Skip to content

Commit

Permalink
Merge pull request #289 from bitfield-co/283-eventstoreappend_to_stre…
Browse files Browse the repository at this point in the history
…am-is-not-able-to-append-more-than-1000-events-to-a-new-stream

Remember the generated stream_id across batches of events being appeneded.
  • Loading branch information
jwilger authored Oct 21, 2024
2 parents 5e17ea1 + 0956c4c commit 54429c3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 43 deletions.
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
new_events_indexes.stream_version
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
36 changes: 20 additions & 16 deletions lib/event_store/sql/statements/insert_events_any_version.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ WITH
SET stream_version = stream_version + $2::bigint
WHERE stream_id = 0
RETURNING stream_version - $2::bigint as initial_stream_version
),
linked_stream_events AS (
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream
)
INSERT INTO "<%= schema %>".stream_events
(
event_id,
stream_id,
stream_version,
original_stream_id,
original_stream_version
)
SELECT
new_events_indexes.event_id,
0,
linked_stream.initial_stream_version + new_events_indexes.index,
stream.stream_id,
stream.initial_stream_version + new_events_indexes.index
FROM
new_events_indexes, linked_stream, stream;

SELECT stream_id from stream;
21 changes: 14 additions & 7 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ defmodule EventStore.Storage.Appender do
events
|> Stream.map(&encode_uuids/1)
|> Stream.chunk_every(1_000)
|> Enum.each(fn batch ->
|> Enum.reduce(stream_id, fn batch, stream_id ->
event_count = length(batch)

with :ok <- insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
with {:ok, new_stream_id} <-
insert_event_batch(conn, stream_id, stream_uuid, batch, event_count, opts) do
Logger.debug("Appended #{event_count} event(s) to stream #{inspect(stream_uuid)}")

:ok
new_stream_id
else
{:error, error} -> throw({:error, error})
end
end)

:ok
catch
{:error, error} = reply ->
Logger.warning(
Expand Down Expand Up @@ -110,9 +112,14 @@ defmodule EventStore.Storage.Appender do
params = [stream_id_or_uuid, event_count] ++ build_insert_parameters(events)

case Postgrex.query(conn, statement, params, opts) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :not_found}
{:ok, %Postgrex.Result{}} -> :ok
{:error, error} -> handle_error(error)
{:ok, %Postgrex.Result{num_rows: 0}} ->
{:error, :not_found}

{:ok, %Postgrex.Result{rows: [[stream_id]]}} ->
{:ok, stream_id}

{:error, error} ->
handle_error(error)
end
end

Expand Down
4 changes: 1 addition & 3 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,7 @@ defmodule EventStore.Streams.Stream do

append_to_stream(conn, stream_uuid, expected_version, events, opts)
else
# We should never get here, but just in case we break something in another
# part of the app, this will give us better output in the tests.
{:error, :already_retried_once}
{:error, {:already_retried_once, :duplicate_stream_uuid}}
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
exclude = [:ignore, :manual, :migration, :slow]
exclude = [:ignore, :manual, :migration]

ExUnit.start(exclude: exclude)

0 comments on commit 54429c3

Please sign in to comment.