From b349c1093f90cc6ea1facb8a70f9171423ef708f Mon Sep 17 00:00:00 2001 From: Benjamin Moss Date: Thu, 19 Sep 2024 11:12:28 -0400 Subject: [PATCH] Addresses #283 - Remember the stream_id from batch to batch So that subsequent batches update instead of trying to insert and fail --- .../sql/statements/insert_events.sql.eex | 36 ++++++++++--------- .../insert_events_any_version.sql.eex | 36 ++++++++++--------- lib/event_store/storage/appender.ex | 20 +++++++---- 3 files changed, 53 insertions(+), 39 deletions(-) diff --git a/lib/event_store/sql/statements/insert_events.sql.eex b/lib/event_store/sql/statements/insert_events.sql.eex index 49bcae15..3d444fca 100644 --- a/lib/event_store/sql/statements/insert_events.sql.eex +++ b/lib/event_store/sql/statements/insert_events.sql.eex @@ -57,20 +57,24 @@ WITH SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version + ), + linked_stream_events AS ( + INSERT INTO "<%= schema %>".stream_events + ( + event_id, + stream_id, + stream_version, + original_stream_id, + original_stream_version + ) + SELECT + new_events_indexes.event_id, + 0, + linked_stream.initial_stream_version + new_events_indexes.index, + stream.stream_id, + new_events_indexes.stream_version + FROM + new_events_indexes, linked_stream, stream ) -INSERT INTO "<%= schema %>".stream_events -( - event_id, - stream_id, - stream_version, - original_stream_id, - original_stream_version -) -SELECT - new_events_indexes.event_id, - 0, - linked_stream.initial_stream_version + new_events_indexes.index, - stream.stream_id, - new_events_indexes.stream_version -FROM - new_events_indexes, linked_stream, stream; + +SELECT stream_id from stream; diff --git a/lib/event_store/sql/statements/insert_events_any_version.sql.eex b/lib/event_store/sql/statements/insert_events_any_version.sql.eex index 7be918e9..cfc250a0 100644 --- a/lib/event_store/sql/statements/insert_events_any_version.sql.eex +++ b/lib/event_store/sql/statements/insert_events_any_version.sql.eex @@ -57,20 +57,24 @@ WITH SET stream_version = stream_version + $2::bigint WHERE stream_id = 0 RETURNING stream_version - $2::bigint as initial_stream_version + ), + linked_stream_events AS ( + INSERT INTO "<%= schema %>".stream_events + ( + event_id, + stream_id, + stream_version, + original_stream_id, + original_stream_version + ) + SELECT + new_events_indexes.event_id, + 0, + linked_stream.initial_stream_version + new_events_indexes.index, + stream.stream_id, + stream.initial_stream_version + new_events_indexes.index + FROM + new_events_indexes, linked_stream, stream ) -INSERT INTO "<%= schema %>".stream_events -( - event_id, - stream_id, - stream_version, - original_stream_id, - original_stream_version -) -SELECT - new_events_indexes.event_id, - 0, - linked_stream.initial_stream_version + new_events_indexes.index, - stream.stream_id, - stream.initial_stream_version + new_events_indexes.index -FROM - new_events_indexes, linked_stream, stream; + +SELECT stream_id from stream; diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index ffd6b5c2..1b501efa 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -20,17 +20,18 @@ defmodule EventStore.Storage.Appender do events |> Stream.map(&encode_uuids/1) |> Stream.chunk_every(1_000) - |> Enum.each(fn batch -> + |> Enum.reduce(stream_id, fn batch, stream_id -> event_count = length(batch) - with :ok <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do + with {:ok, new_stream_id} <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do Logger.debug("Appended #{event_count} event(s) to stream #{inspect(stream_uuid)}") - - :ok + new_stream_id else {:error, error} -> throw({:error, error}) end end) + + :ok catch {:error, error} = reply -> Logger.warning( @@ -110,9 +111,14 @@ defmodule EventStore.Storage.Appender do params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events) case Postgrex.query(conn, statement, params, opts) do - {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found} - {:ok, %Postgrex.Result{}} -> :ok - {:error, error} -> handle_error(error) + {:ok, %Postgrex.Result{num_rows: 0}} -> + {:error, :not_found} + + {:ok, %Postgrex.Result{rows: [[stream_id]]}} -> + {:ok, stream_id} + + {:error, error} -> + handle_error(error) end end