Skip to content

Commit

Permalink
Addresses #283 - Remember the stream_id from batch to batch
Browse files Browse the repository at this point in the history
So that subsequent batches update instead of trying to insert and fail
  • Loading branch information
drteeth committed Sep 19, 2024
1 parent bbd9690 commit 0956c4c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 39 deletions.
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events_any_version.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
21 changes: 14 additions & 7 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ defmodule EventStore.Storage.Appender do
events
|> Stream.map(&encode_uuids/1)
|> Stream.chunk_every(1_000)
|> Enum.each(fn batch ->
|> Enum.reduce(stream_id, fn batch, stream_id ->
event_count = length(batch)

with :ok <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
with {:ok, new_stream_id} <-
insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
Logger.debug("Appended #{event_count} event(s) to stream #{inspect(stream_uuid)}")

:ok
new_stream_id
else
{:error, error} -> throw({:error, error})
end
end)

:ok
catch
{:error, error} = reply ->
Logger.warning(
Expand Down Expand Up @@ -110,9 +112,14 @@ defmodule EventStore.Storage.Appender do
params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)

case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found}
{:ok, %Postgrex.Result{}} -> :ok
{:error, error} -> handle_error(error)
{:ok, %Postgrex.Result{num_rows: 0}} ->
{:error, :not_found}

{:ok, %Postgrex.Result{rows: [[stream_id]]}} ->
{:ok, stream_id}

{:error, error} ->
handle_error(error)
end
end

Expand Down

0 comments on commit 0956c4c

Please sign in to comment.