From a8c234b7722348713223a7ba5b9d6b851e2e4f81 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Tue, 28 Apr 2026 10:03:52 -0400 Subject: [PATCH 1/4] Add a starting version to streams --- lib/event_store/sql/init.ex | 3 ++- .../statements/query_stream_events_backward.sql.eex | 5 +++-- .../sql/statements/query_stream_events_forward.sql.eex | 5 +++-- .../sql/statements/query_stream_info.sql.eex | 1 + lib/event_store/sql/statements/query_streams.sql.eex | 1 + lib/event_store/storage/stream.ex | 4 +++- lib/event_store/streams/stream_info.ex | 3 ++- lib/event_store/tasks/migrations.ex | 3 ++- priv/event_store/migrations/v1.4.0.sql | 10 ++++++++++ test/event_store_test.exs | 10 +++------- 10 files changed, 30 insertions(+), 15 deletions(-) create mode 100644 priv/event_store/migrations/v1.4.0.sql diff --git a/lib/event_store/sql/init.ex b/lib/event_store/sql/init.ex index ea529782..186b8a85 100644 --- a/lib/event_store/sql/init.ex +++ b/lib/event_store/sql/init.ex @@ -39,6 +39,7 @@ defmodule EventStore.Sql.Init do stream_id bigserial PRIMARY KEY NOT NULL, stream_uuid text NOT NULL, stream_version bigint default 0 NOT NULL, + start_version bigint default 0 NOT NULL, created_at timestamp with time zone DEFAULT NOW() NOT NULL, deleted_at timestamp with time zone ); @@ -54,7 +55,7 @@ defmodule EventStore.Sql.Init do # Create `$all` stream defp seed_all_stream do """ - INSERT INTO streams (stream_id, stream_uuid, stream_version) VALUES (0, '$all', 0); + INSERT INTO streams (stream_id, stream_uuid, stream_version, start_version) VALUES (0, '$all', 0, 0); """ end diff --git a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex index e107a1cd..31a8b1c0 100644 --- a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex @@ -12,8 +12,9 @@ SELECT FROM "<%= schema %>".stream_events se INNER JOIN "<%= schema 
%>".streams s ON s.stream_id = se.original_stream_id INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id -WHERE - se.stream_id = $1 AND (se.stream_version <= $2 OR $2 = -1) +WHERE se.stream_id = $1 + AND (se.stream_version <= $2 OR $2 = -1) + AND se.stream_version >= s.start_version ORDER BY se.stream_version DESC LIMIT $3; diff --git a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex index 13d6236c..cc1217b6 100644 --- a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex @@ -12,8 +12,9 @@ SELECT FROM "<%= schema %>".stream_events se INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id -WHERE - se.stream_id = $1 AND se.stream_version >= $2 +WHERE se.stream_id = $1 + AND se.stream_version >= $2 + AND se.stream_version >= s.start_version ORDER BY se.stream_version ASC LIMIT $3; diff --git a/lib/event_store/sql/statements/query_stream_info.sql.eex b/lib/event_store/sql/statements/query_stream_info.sql.eex index b5d74b3d..d6902749 100644 --- a/lib/event_store/sql/statements/query_stream_info.sql.eex +++ b/lib/event_store/sql/statements/query_stream_info.sql.eex @@ -2,6 +2,7 @@ SELECT stream_id, stream_uuid, stream_version, + start_version, created_at, deleted_at FROM "<%= schema %>".streams diff --git a/lib/event_store/sql/statements/query_streams.sql.eex b/lib/event_store/sql/statements/query_streams.sql.eex index 87e02924..95d5f3c4 100644 --- a/lib/event_store/sql/statements/query_streams.sql.eex +++ b/lib/event_store/sql/statements/query_streams.sql.eex @@ -2,6 +2,7 @@ SELECT stream_id, stream_uuid, stream_version, + start_version, created_at, deleted_at FROM "<%= schema %>".streams diff --git a/lib/event_store/storage/stream.ex b/lib/event_store/storage/stream.ex index 253b19b0..f643a49a 100644 
--- a/lib/event_store/storage/stream.ex +++ b/lib/event_store/storage/stream.ex @@ -98,15 +98,17 @@ defmodule EventStore.Storage.Stream do end defp to_stream_info(row) do - [stream_id, stream_uuid, stream_version, created_at, deleted_at] = row + [stream_id, stream_uuid, stream_version, start_version, created_at, deleted_at] = row stream_version = if is_nil(stream_version), do: 0, else: stream_version + start_version = if is_nil(start_version), do: 0, else: start_version status = if is_nil(deleted_at), do: :created, else: :deleted %StreamInfo{ stream_uuid: stream_uuid, stream_id: stream_id, stream_version: stream_version, + start_version: start_version, created_at: created_at, deleted_at: deleted_at, status: status diff --git a/lib/event_store/streams/stream_info.ex b/lib/event_store/streams/stream_info.ex index e87412d8..6818ddbc 100644 --- a/lib/event_store/streams/stream_info.ex +++ b/lib/event_store/streams/stream_info.ex @@ -5,12 +5,13 @@ defmodule EventStore.Streams.StreamInfo do stream_uuid: String.t(), stream_id: non_neg_integer() | nil, stream_version: non_neg_integer(), + start_version: non_neg_integer(), created_at: DateTime.t(), deleted_at: DateTime.t() | nil, status: :created | :deleted | nil } - defstruct [:stream_uuid, :stream_id, :created_at, :deleted_at, :status, stream_version: 0] + defstruct [:stream_uuid, :stream_id, :created_at, :deleted_at, :status, stream_version: 0, start_version: 0] def new(stream_uuid) do %StreamInfo{stream_uuid: stream_uuid} diff --git a/lib/event_store/tasks/migrations.ex b/lib/event_store/tasks/migrations.ex index 8b3dbc19..4d5147bf 100644 --- a/lib/event_store/tasks/migrations.ex +++ b/lib/event_store/tasks/migrations.ex @@ -19,7 +19,8 @@ defmodule EventStore.Tasks.Migrations do "1.1.0", "1.2.0", "1.3.0", - "1.3.2" + "1.3.2", + "1.4.0" ] @dialyzer {:no_return, exec: 2, handle_response: 1} diff --git a/priv/event_store/migrations/v1.4.0.sql b/priv/event_store/migrations/v1.4.0.sql new file mode 100644 index 
00000000..0a1350f7 --- /dev/null +++ b/priv/event_store/migrations/v1.4.0.sql @@ -0,0 +1,10 @@ +DO $$ +BEGIN + + ALTER TABLE streams ADD COLUMN start_version bigint DEFAULT 0; + + -- record schema migration + INSERT INTO schema_migrations (major_version, minor_version, patch_version) VALUES (1, 4, 0); + +END; +$$ LANGUAGE plpgsql; diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 630c0775..be40a1cc 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -167,7 +167,7 @@ defmodule EventStore.EventStoreTest do {:ok, recorded_events} = EventStore.read_stream_backward(stream_uuid) - assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events) + assert_recorded_events(stream_uuid, 10..1//-1, Enum.reverse(events), recorded_events) end test "stream backward", %{stream_uuid: stream_uuid, events: events} do @@ -176,7 +176,7 @@ defmodule EventStore.EventStoreTest do recorded_events = EventStore.stream_backward(stream_uuid, -1, batch_size: 5) |> Enum.to_list() - assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events) + assert_recorded_events(stream_uuid, 10..1//-1, Enum.reverse(events), recorded_events) end test "stream all backward", %{stream_uuid: stream_uuid, events: events} do @@ -184,7 +184,7 @@ defmodule EventStore.EventStoreTest do recorded_events = EventStore.stream_all_backward(-1, batch_size: 5) |> Enum.to_list() - assert_recorded_events(stream_uuid, 10..1, Enum.reverse(events), recorded_events) + assert_recorded_events(stream_uuid, 10..1//-1, Enum.reverse(events), recorded_events) end end @@ -484,10 +484,6 @@ defmodule EventStore.EventStoreTest do defstruct([:data]) end - test "record snapshot" do - assert record_snapshot() != nil - end - test "read a snapshot" do snapshot = record_snapshot() From b1ffb82368035a654430b4137d908e7f50af4478 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Tue, 28 Apr 2026 10:42:09 -0400 Subject: [PATCH 2/4] Rename to stream_origin and do some 
design-by-writing-documentation --- CHANGELOG.md | 16 +++++----- guides/Subscriptions.md | 6 ++++ guides/Usage.md | 20 ++++++++++++ lib/event_store.ex | 14 +++++--- lib/event_store/sql/init.ex | 4 +-- .../query_stream_events_backward.sql.eex | 2 +- .../query_stream_events_forward.sql.eex | 2 +- .../sql/statements/query_stream_info.sql.eex | 2 +- .../sql/statements/query_streams.sql.eex | 2 +- lib/event_store/storage.ex | 4 +-- lib/event_store/storage/reader.ex | 16 +++++----- lib/event_store/storage/stream.ex | 6 ++-- lib/event_store/streams/stream.ex | 32 +++++++++---------- lib/event_store/streams/stream_info.ex | 4 +-- priv/event_store/migrations/v1.4.0.sql | 2 +- test/storage/read_events_test.exs | 8 ++--- 16 files changed, 85 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 333933bc..6413c7dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -205,10 +205,10 @@ Previous usage: ```elixir EventStore.append_to_stream(stream_uuid, expected_version, events, timeout) EventStore.link_to_stream(stream_uuid, expected_version, events_or_event_ids, timeout) -EventStore.read_stream_forward(stream_uuid, start_version, count, timeout) -EventStore.read_all_streams_forward(start_version, count, timeout) -EventStore.stream_forward(stream_uuid, start_version, read_batch_size, timeout) -EventStore.stream_all_forward(start_version, read_batch_size, timeout) +EventStore.read_stream_forward(stream_uuid, stream_origin, count, timeout) +EventStore.read_all_streams_forward(stream_origin, count, timeout) +EventStore.stream_forward(stream_uuid, stream_origin, read_batch_size, timeout) +EventStore.stream_all_forward(stream_origin, read_batch_size, timeout) ``` Usage now: @@ -216,10 +216,10 @@ Usage now: ```elixir EventStore.append_to_stream(stream_uuid, expected_version, events, timeout: timeout) EventStore.link_to_stream(stream_uuid, expected_version, events_or_event_ids, timeout: timeout) -EventStore.read_stream_forward(stream_uuid, start_version, count, 
timeout: timeout) -EventStore.read_all_streams_forward(start_version, count, timeout: timeout) -EventStore.stream_forward(stream_uuid, start_version, read_batch_size: read_batch_size, timeout: timeout) -EventStore.stream_all_forward(start_version, read_batch_size: read_batch_size, timeout: timeout) +EventStore.read_stream_forward(stream_uuid, stream_origin, count, timeout: timeout) +EventStore.read_all_streams_forward(stream_origin, count, timeout: timeout) +EventStore.stream_forward(stream_uuid, stream_origin, read_batch_size: read_batch_size, timeout: timeout) +EventStore.stream_all_forward(stream_origin, read_batch_size: read_batch_size, timeout: timeout) ``` ### Upgrading diff --git a/guides/Subscriptions.md b/guides/Subscriptions.md index 0314189e..642ed562 100644 --- a/guides/Subscriptions.md +++ b/guides/Subscriptions.md @@ -97,6 +97,12 @@ By default subscriptions are created from the stream origin; they will receive a - `:current` - subscribe to events from the current version. - `event_number` (integer) - specify an exact event number to subscribe from. This will be the same as the stream version for single stream subscriptions. +Note that in all cases the streaming will never go "past" the stream's origin, which +can be set with `EventStore.trim/2`. This functionality allows garbage collection of +events that are not part of any stream. The current origin is returned in the `stream_origin` +field of the `EventStore.stream_info/1` result. If the event number passed in is +smaller than the stream's origin, the stream will still start at the origin. + ### Acknowledge received events Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event and to indicate the subscription is ready to receive the next event. 
diff --git a/guides/Usage.md b/guides/Usage.md index f06f7bbd..d53b990d 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -217,3 +217,23 @@ Hard delete a stream that should exist: ```elixir :ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard) ``` + +## Stream origin and garbage collection + +By default, a stream starts at the first event added to it. But this means +that the event store will grow without bounds, as there is no way to remove +events from a stream. + +To alleviate this problem, streams have variable origins: if you pass +`trim: true` to a `MyApp.EventStore.append_to_stream` call, then the origin +of the stream will be set to the first event passed into that call. This +functionality currently allows you to pass in a "fresh start" event, which +in CQRS/ES is used for various purposes like "closing the books". In effect, +all events that were in the stream before are now invisible: subscriptions +will start at this new origin, meaning that they won't need to traverse +all of the event history on start (or, alternatively, see incomplete +data when starting at the end or somewhere in the middle of a stream). + +In the (hopefully near) future, this will allow EventStore to implement garbage +collection: any events that are not visible in any streams can be safely deleted. This +way, by judiciously trimming streams, database sizes can be kept under control. diff --git a/lib/event_store.ex b/lib/event_store.ex index a02560a8..719fed1b 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -641,7 +641,11 @@ defmodule EventStore do - `opts` an optional keyword list containing: - `name` the name of the event store if provided to `start_link/1`. - `timeout` an optional timeout for the database transaction, in - milliseconds. Defaults to 15,000ms. + milliseconds. Defaults to 15,000ms. + - `trim` a boolean defaulting to false. 
If true, then the stream will be "trimmed", + which means that the stream's origin will be set to the first event in this call. In other + words, new subscriptions starting from `:origin` will start with the events passed into + this call. Returns `:ok` on success, or an `{:error, reason}` tagged tuple. The returned error may be due to one of the following reasons: @@ -729,7 +733,7 @@ defmodule EventStore do - `stream_uuid` is used to uniquely identify a stream. - `start_version` optionally, the stream version of the first event to read. - Defaults to the beginning of the stream if not set. + Defaults to the origin of the stream if not set. - `count` optionally, the maximum number of events to read. Defaults to to returning 1,000 events from the stream. @@ -754,7 +758,7 @@ defmodule EventStore do they were originally written. - `start_version` optionally, the stream version of the first event to read. - Defaults to the beginning of the stream if not set. + Defaults to the origin of the stream if not set. - `count` optionally, the maximum number of events to read. Defaults to returning 1,000 events from all streams. @@ -825,7 +829,7 @@ defmodule EventStore do originally written. - `start_version` optionally, the stream version of the first event to read. - Defaults to the beginning of the stream if not set. + Defaults to the origin of the stream if not set. - `opts` an optional keyword list containing: - `name` the name of the event store if provided to `start_link/1`. @@ -845,7 +849,7 @@ defmodule EventStore do written. - `start_version` optionally, the stream version of the first event to read. - Defaults to the beginning of the stream if not set. + Defaults to the origin of the stream if not set. - `opts` an optional keyword list containing: - `name` the name of the event store if provided to `start_link/1`. 
diff --git a/lib/event_store/sql/init.ex b/lib/event_store/sql/init.ex index 186b8a85..7d3e6bd5 100644 --- a/lib/event_store/sql/init.ex +++ b/lib/event_store/sql/init.ex @@ -39,7 +39,7 @@ defmodule EventStore.Sql.Init do stream_id bigserial PRIMARY KEY NOT NULL, stream_uuid text NOT NULL, stream_version bigint default 0 NOT NULL, - start_version bigint default 0 NOT NULL, + stream_origin bigint default 0 NOT NULL, created_at timestamp with time zone DEFAULT NOW() NOT NULL, deleted_at timestamp with time zone ); @@ -55,7 +55,7 @@ defmodule EventStore.Sql.Init do # Create `$all` stream defp seed_all_stream do """ - INSERT INTO streams (stream_id, stream_uuid, stream_version, start_version) VALUES (0, '$all', 0, 0); + INSERT INTO streams (stream_id, stream_uuid, stream_version, stream_origin) VALUES (0, '$all', 0, 0); """ end diff --git a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex index 31a8b1c0..19f300b7 100644 --- a/lib/event_store/sql/statements/query_stream_events_backward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_backward.sql.eex @@ -14,7 +14,7 @@ INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id WHERE se.stream_id = $1 AND (se.stream_version <= $2 OR $2 = -1) - AND se.stream_version >= s.start_version + AND se.stream_version >= s.stream_origin ORDER BY se.stream_version DESC LIMIT $3; diff --git a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex index cc1217b6..c97ff285 100644 --- a/lib/event_store/sql/statements/query_stream_events_forward.sql.eex +++ b/lib/event_store/sql/statements/query_stream_events_forward.sql.eex @@ -14,7 +14,7 @@ INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id INNER JOIN "<%= schema %>".events e ON se.event_id = 
e.event_id WHERE se.stream_id = $1 AND se.stream_version >= $2 - AND se.stream_version >= s.start_version + AND se.stream_version >= s.stream_origin ORDER BY se.stream_version ASC LIMIT $3; diff --git a/lib/event_store/sql/statements/query_stream_info.sql.eex b/lib/event_store/sql/statements/query_stream_info.sql.eex index d6902749..1178814e 100644 --- a/lib/event_store/sql/statements/query_stream_info.sql.eex +++ b/lib/event_store/sql/statements/query_stream_info.sql.eex @@ -2,7 +2,7 @@ SELECT stream_id, stream_uuid, stream_version, - start_version, + stream_origin, created_at, deleted_at FROM "<%= schema %>".streams diff --git a/lib/event_store/sql/statements/query_streams.sql.eex b/lib/event_store/sql/statements/query_streams.sql.eex index 95d5f3c4..ba3418cd 100644 --- a/lib/event_store/sql/statements/query_streams.sql.eex +++ b/lib/event_store/sql/statements/query_streams.sql.eex @@ -2,7 +2,7 @@ SELECT stream_id, stream_uuid, stream_version, - start_version, + stream_origin, created_at, deleted_at FROM "<%= schema %>".streams diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index 0f2af5ac..e55d3505 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -30,7 +30,7 @@ defmodule EventStore.Storage do Read events for the given stream forward from the starting version, use zero for all events for the stream. """ - defdelegate read_stream_forward(conn, stream_id, start_version, count, opts), + defdelegate read_stream_forward(conn, stream_id, stream_origin, count, opts), to: Reader, as: :read_forward @@ -38,7 +38,7 @@ defmodule EventStore.Storage do Read events for the given stream backward from the starting version, use -1 for all events for the stream. 
""" - defdelegate read_stream_backward(conn, stream_id, start_version, count, opts), + defdelegate read_stream_backward(conn, stream_id, stream_origin, count, opts), to: Reader, as: :read_backward diff --git a/lib/event_store/storage/reader.ex b/lib/event_store/storage/reader.ex index b818e6cc..777fe51d 100644 --- a/lib/event_store/storage/reader.ex +++ b/lib/event_store/storage/reader.ex @@ -10,8 +10,8 @@ defmodule EventStore.Storage.Reader do @doc """ Read events from a single stream forwards from the given starting version. """ - def read_forward(conn, stream_id, start_version, count, opts) do - case Reader.Query.read_events_forward(conn, stream_id, start_version, count, opts) do + def read_forward(conn, stream_id, stream_origin, count, opts) do + case Reader.Query.read_events_forward(conn, stream_id, stream_origin, count, opts) do {:ok, []} = reply -> reply {:ok, rows} -> map_rows_to_event_data(rows) {:error, reason} -> failed_to_read(stream_id, reason) @@ -21,8 +21,8 @@ defmodule EventStore.Storage.Reader do @doc """ Read events from a single stream backwards from the given starting version. 
""" - def read_backward(conn, stream_id, start_version, count, opts) do - case Reader.Query.read_events_backward(conn, stream_id, start_version, count, opts) do + def read_backward(conn, stream_id, stream_origin, count, opts) do + case Reader.Query.read_events_backward(conn, stream_id, stream_origin, count, opts) do {:ok, []} = reply -> reply {:ok, rows} -> map_rows_to_event_data(rows) {:error, reason} -> failed_to_read(stream_id, reason) @@ -106,20 +106,20 @@ defmodule EventStore.Storage.Reader do defmodule Query do @moduledoc false - def read_events_forward(conn, stream_id, start_version, count, opts) do + def read_events_forward(conn, stream_id, stream_origin, count, opts) do {schema, opts} = Keyword.pop(opts, :schema) query = Statements.query_stream_events_forward(schema) - do_query(conn, query, [stream_id, start_version, count], opts) + do_query(conn, query, [stream_id, stream_origin, count], opts) end - def read_events_backward(conn, stream_id, start_version, count, opts) do + def read_events_backward(conn, stream_id, stream_origin, count, opts) do {schema, opts} = Keyword.pop(opts, :schema) query = Statements.query_stream_events_backward(schema) - do_query(conn, query, [stream_id, start_version, count], opts) + do_query(conn, query, [stream_id, stream_origin, count], opts) end defp do_query(conn, query, params, opts) do diff --git a/lib/event_store/storage/stream.ex b/lib/event_store/storage/stream.ex index f643a49a..7aad39bc 100644 --- a/lib/event_store/storage/stream.ex +++ b/lib/event_store/storage/stream.ex @@ -98,17 +98,17 @@ defmodule EventStore.Storage.Stream do end defp to_stream_info(row) do - [stream_id, stream_uuid, stream_version, start_version, created_at, deleted_at] = row + [stream_id, stream_uuid, stream_version, stream_origin, created_at, deleted_at] = row stream_version = if is_nil(stream_version), do: 0, else: stream_version - start_version = if is_nil(start_version), do: 0, else: start_version + stream_origin = if is_nil(stream_origin), 
do: 0, else: stream_origin status = if is_nil(deleted_at), do: :created, else: :deleted %StreamInfo{ stream_uuid: stream_uuid, stream_id: stream_id, stream_version: stream_version, - start_version: start_version, + stream_origin: stream_origin, created_at: created_at, deleted_at: deleted_at, status: status diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 39088b62..c3ed6f31 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -59,27 +59,27 @@ defmodule EventStore.Streams.Stream do def paginate_streams(conn, opts), do: Storage.paginate_streams(conn, opts) - def read_stream_forward(conn, stream_uuid, start_version, count, opts) do + def read_stream_forward(conn, stream_uuid, stream_origin, count, opts) do with {:ok, stream} <- stream_info(conn, stream_uuid, :stream_exists, opts) do - read_storage_forward(conn, stream, start_version, count, opts) + read_storage_forward(conn, stream, stream_origin, count, opts) end end - def read_stream_backward(conn, stream_uuid, start_version, count, opts) do + def read_stream_backward(conn, stream_uuid, stream_origin, count, opts) do with {:ok, stream} <- stream_info(conn, stream_uuid, :stream_exists, opts) do - read_storage_backward(conn, stream, start_version, count, opts) + read_storage_backward(conn, stream, stream_origin, count, opts) end end - def stream_forward(conn, stream_uuid, start_version, opts) do + def stream_forward(conn, stream_uuid, stream_origin, opts) do with {:ok, stream} <- stream_info(conn, stream_uuid, :stream_exists, opts) do - stream_storage_forward(conn, stream, start_version, opts) + stream_storage_forward(conn, stream, stream_origin, opts) end end - def stream_backward(conn, stream_uuid, start_version, opts) do + def stream_backward(conn, stream_uuid, stream_origin, opts) do with {:ok, stream} <- stream_info(conn, stream_uuid, :stream_exists, opts) do - stream_storage_backward(conn, stream, start_version, opts) + 
stream_storage_backward(conn, stream, stream_origin, opts) end end @@ -218,26 +218,26 @@ defmodule EventStore.Streams.Stream do Storage.append_to_stream(conn, stream_id, prepared_events, opts) end - defp read_storage_forward(conn, %StreamInfo{} = stream, start_version, count, opts) do + defp read_storage_forward(conn, %StreamInfo{} = stream, stream_origin, count, opts) do %StreamInfo{stream_id: stream_id} = stream {serializer, opts} = Keyword.pop(opts, :serializer) with {:ok, recorded_events} <- - Storage.read_stream_forward(conn, stream_id, start_version, count, opts) do + Storage.read_stream_forward(conn, stream_id, stream_origin, count, opts) do deserialized_events = deserialize_recorded_events(recorded_events, serializer) {:ok, deserialized_events} end end - defp read_storage_backward(conn, %StreamInfo{} = stream, start_version, count, opts) do + defp read_storage_backward(conn, %StreamInfo{} = stream, stream_origin, count, opts) do %StreamInfo{stream_id: stream_id} = stream {serializer, opts} = Keyword.pop(opts, :serializer) with {:ok, recorded_events} <- - Storage.read_stream_backward(conn, stream_id, start_version, count, opts) do + Storage.read_stream_backward(conn, stream_id, stream_origin, count, opts) do deserialized_events = deserialize_recorded_events(recorded_events, serializer) {:ok, deserialized_events} @@ -248,11 +248,11 @@ defmodule EventStore.Streams.Stream do defp stream_storage_forward(conn, stream, 0, opts), do: stream_storage_forward(conn, stream, 1, opts) - defp stream_storage_forward(conn, stream, start_version, opts) do + defp stream_storage_forward(conn, stream, stream_origin, opts) do read_batch_size = Keyword.fetch!(opts, :read_batch_size) Elixir.Stream.resource( - fn -> start_version end, + fn -> stream_origin end, fn next_version -> case read_storage_forward(conn, stream, next_version, read_batch_size, opts) do {:ok, []} -> {:halt, next_version} @@ -270,11 +270,11 @@ defmodule EventStore.Streams.Stream do stream_storage_backward(conn, 
stream, stream_version, opts) end - defp stream_storage_backward(conn, stream, start_version, opts) do + defp stream_storage_backward(conn, stream, stream_origin, opts) do read_batch_size = Keyword.fetch!(opts, :read_batch_size) Elixir.Stream.resource( - fn -> start_version end, + fn -> stream_origin end, fn next_version when next_version <= 0 -> {:halt, 0} diff --git a/lib/event_store/streams/stream_info.ex b/lib/event_store/streams/stream_info.ex index 6818ddbc..cb67ab29 100644 --- a/lib/event_store/streams/stream_info.ex +++ b/lib/event_store/streams/stream_info.ex @@ -5,13 +5,13 @@ defmodule EventStore.Streams.StreamInfo do stream_uuid: String.t(), stream_id: non_neg_integer() | nil, stream_version: non_neg_integer(), - start_version: non_neg_integer(), + stream_origin: non_neg_integer(), created_at: DateTime.t(), deleted_at: DateTime.t() | nil, status: :created | :deleted | nil } - defstruct [:stream_uuid, :stream_id, :created_at, :deleted_at, :status, stream_version: 0, start_version: 0] + defstruct [:stream_uuid, :stream_id, :created_at, :deleted_at, :status, stream_version: 0, stream_origin: 0] def new(stream_uuid) do %StreamInfo{stream_uuid: stream_uuid} diff --git a/priv/event_store/migrations/v1.4.0.sql b/priv/event_store/migrations/v1.4.0.sql index 0a1350f7..e427f6d9 100644 --- a/priv/event_store/migrations/v1.4.0.sql +++ b/priv/event_store/migrations/v1.4.0.sql @@ -1,7 +1,7 @@ DO $$ BEGIN - ALTER TABLE streams ADD COLUMN start_version bigint DEFAULT 0; + ALTER TABLE streams ADD COLUMN stream_origin bigint DEFAULT 0; -- record schema migration INSERT INTO schema_migrations (major_version, minor_version, patch_version) VALUES (1, 4, 0); diff --git a/test/storage/read_events_test.exs b/test/storage/read_events_test.exs index 4d1d8b02..abfb2b97 100644 --- a/test/storage/read_events_test.exs +++ b/test/storage/read_events_test.exs @@ -168,16 +168,16 @@ defmodule EventStore.Storage.ReadEventsTest do end end - defp read_all_stream_forward(context, 
start_version, count) do + defp read_all_stream_forward(context, stream_origin, count) do %{conn: conn, schema: schema} = context - Storage.read_stream_forward(conn, 0, start_version, count, schema: schema) + Storage.read_stream_forward(conn, 0, stream_origin, count, schema: schema) end - defp read_stream_forward(context, stream_id, start_version, count) do + defp read_stream_forward(context, stream_id, stream_origin, count) do %{conn: conn, schema: schema} = context - Storage.read_stream_forward(conn, stream_id, start_version, count, schema: schema) + Storage.read_stream_forward(conn, stream_id, stream_origin, count, schema: schema) end defp pluck(enumerable, field), do: Enum.map(enumerable, &Map.get(&1, field)) From 2a25b5d6d91b942343cdf627e393a243c6100817 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Tue, 28 Apr 2026 12:17:39 -0400 Subject: [PATCH 3/4] Implement trim option to set origin to start of the passed-in event batch --- lib/event_store.ex | 2 +- lib/event_store/sql/statements.ex | 1 + .../statements/trim_stream_to_next.sql.eex | 3 + lib/event_store/storage/appender.ex | 25 ++++++++ lib/event_store/streams/stream.ex | 60 +++++++++---------- test/event_store_test.exs | 33 ++++++++++ 6 files changed, 92 insertions(+), 32 deletions(-) create mode 100644 lib/event_store/sql/statements/trim_stream_to_next.sql.eex diff --git a/lib/event_store.ex b/lib/event_store.ex index 719fed1b..429718df 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -294,7 +294,7 @@ defmodule EventStore do Supervisor.stop(supervisor, :normal, timeout) end - @accepted_overrides_append_to_stream [:created_at_override] + @accepted_overrides_append_to_stream [:created_at_override, :trim] def append_to_stream(stream_uuid, expected_version, events, opts \\ []) diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index c9801731..9c18c106 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -14,6 +14,7 @@ 
defmodule EventStore.Sql.Statements do {:insert_events, [:schema, :stream_id, :number_of_events, :created_at]}, {:insert_events_any_version, [:schema, :stream_id, :number_of_events, :created_at]}, {:insert_link_events, [:schema, :number_of_events]}, + {:trim_stream_to_next, [:schema]}, {:soft_delete_stream, [:schema]}, {:hard_delete_stream, [:schema]}, {:insert_subscription, [:schema]}, diff --git a/lib/event_store/sql/statements/trim_stream_to_next.sql.eex b/lib/event_store/sql/statements/trim_stream_to_next.sql.eex new file mode 100644 index 00000000..c4e2a493 --- /dev/null +++ b/lib/event_store/sql/statements/trim_stream_to_next.sql.eex @@ -0,0 +1,3 @@ +UPDATE "<%= schema %>".streams +SET stream_origin = stream_version + 1 +WHERE stream_uuid = $1; diff --git a/lib/event_store/storage/appender.ex b/lib/event_store/storage/appender.ex index 13c2f40e..4e9dfe42 100644 --- a/lib/event_store/storage/appender.ex +++ b/lib/event_store/storage/appender.ex @@ -16,6 +16,15 @@ defmodule EventStore.Storage.Appender do def append(conn, stream_id, events, opts) do [%RecordedEvent{stream_uuid: stream_uuid} | _] = events + {trim, opts} = Keyword.pop(opts, :trim, false) + + if trim do + # We could probably do this by being really smart in the insert query, but + # these queries are already somewhat unwieldy. We don't trim all the time, so + # the extra db roundtrip is unlikely to be noticeable. 
+ :ok = trim_stream_to_next(conn, stream_uuid, opts) + end + try do events |> Stream.map(&encode_uuids/1) @@ -131,6 +140,22 @@ defmodule EventStore.Storage.Appender do end end + defp trim_stream_to_next(conn, stream_uuid, opts) do + {schema, opts} = Keyword.pop(opts, :schema) + statement = Statements.trim_stream_to_next(schema) + + case Postgrex.query(conn, statement, [stream_uuid], opts) do + {:ok, %Postgrex.Result{num_rows: 0}} -> + {:error, :not_found} + + {:ok, _} -> + :ok + + {:error, error} -> + handle_error(error) + end + end + defp append_if(params, true, value) when not is_nil(value), do: params ++ [value] defp append_if(params, _, _), do: params diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index c3ed6f31..5cd0af6b 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -4,40 +4,38 @@ defmodule EventStore.Streams.Stream do alias EventStore.{EventData, RecordedEvent, Storage, UUID} alias EventStore.Streams.StreamInfo - def append_to_stream(conn, stream_uuid, expected_version, events, opts) - when length(events) < 1000 do - {serializer, new_opts} = Keyword.pop(opts, :serializer) - - with {:ok, stream} <- stream_info(conn, stream_uuid, expected_version, new_opts), - :ok <- do_append_to_storage(conn, stream, events, expected_version, serializer, new_opts) do - :ok - end - |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts) - end - def append_to_stream(conn, stream_uuid, expected_version, events, opts) do {serializer, new_opts} = Keyword.pop(opts, :serializer) - transaction( - conn, - fn transaction -> - with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts), - :ok <- - do_append_to_storage( - transaction, - stream, - events, - expected_version, - serializer, - new_opts - ) do - :ok - else - {:error, error} -> Postgrex.rollback(transaction, error) - end - end, - new_opts - ) + if length(events) < 1000 and not Keyword.has_key?(opts, :trim) 
do + # This will be written as a single batch, so we don't need a db call to initiate a transaction. + with {:ok, stream} <- stream_info(conn, stream_uuid, expected_version, new_opts), + :ok <- + do_append_to_storage(conn, stream, events, expected_version, serializer, new_opts) do + :ok + end + else + transaction( + conn, + fn transaction -> + with {:ok, stream} <- stream_info(transaction, stream_uuid, expected_version, new_opts), + :ok <- + do_append_to_storage( + transaction, + stream, + events, + expected_version, + serializer, + new_opts + ) do + :ok + else + {:error, error} -> Postgrex.rollback(transaction, error) + end + end, + new_opts + ) + end |> maybe_retry_once(conn, stream_uuid, expected_version, events, opts) end diff --git a/test/event_store_test.exs b/test/event_store_test.exs index be40a1cc..fb22140d 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -188,6 +188,39 @@ defmodule EventStore.EventStoreTest do end end + describe "read events with non-zero origin" do + setup do + stream_uuid = UUID.uuid4() + old_events = EventFactory.create_events(5) + new_events = EventFactory.create_events(10) + + [stream_uuid: stream_uuid, old_events: old_events, new_events: new_events] + end + + # Not testing the streaming versions again, as they end up doing the same query anyway. 
+ + test "read stream forward", %{stream_uuid: stream_uuid, old_events: old_events, new_events: new_events} do + :ok = EventStore.append_to_stream(stream_uuid, 0, old_events) + :ok = EventStore.append_to_stream(stream_uuid, 5, new_events, trim: true) + + {:ok, recorded_events} = EventStore.read_stream_forward(stream_uuid, 0) + + assert_recorded_events(stream_uuid, 6..15, new_events, recorded_events) + end + + test "read stream backward", %{stream_uuid: stream_uuid, old_events: old_events, new_events: new_events} do + :ok = EventStore.append_to_stream(stream_uuid, 0, old_events) + :ok = EventStore.append_to_stream(stream_uuid, 5, new_events, trim: true) + + {:ok, recorded_events} = EventStore.read_stream_backward(stream_uuid) + + assert_recorded_events(stream_uuid, 15..6//-1, Enum.reverse(new_events), recorded_events) + end + + + + end + test "unicode character support" do unicode_text = "Unicode characters are supported ✅" stream_uuid = UUID.uuid4() From 680a55423e184465a6e9ab63025ca9667536c355 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Tue, 28 Apr 2026 14:59:06 -0400 Subject: [PATCH 4/4] Format --- lib/event_store/streams/stream_info.ex | 10 +++++++++- test/event_store_test.exs | 15 ++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/lib/event_store/streams/stream_info.ex b/lib/event_store/streams/stream_info.ex index cb67ab29..159f9872 100644 --- a/lib/event_store/streams/stream_info.ex +++ b/lib/event_store/streams/stream_info.ex @@ -11,7 +11,15 @@ defmodule EventStore.Streams.StreamInfo do status: :created | :deleted | nil } - defstruct [:stream_uuid, :stream_id, :created_at, :deleted_at, :status, stream_version: 0, stream_origin: 0] + defstruct [ + :stream_uuid, + :stream_id, + :created_at, + :deleted_at, + :status, + stream_version: 0, + stream_origin: 0 + ] def new(stream_uuid) do %StreamInfo{stream_uuid: stream_uuid} diff --git a/test/event_store_test.exs b/test/event_store_test.exs index fb22140d..ed4b2876 100644 --- 
a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -199,7 +199,11 @@ defmodule EventStore.EventStoreTest do # Not testing the streaming versions again, as they end up doing the same query anyway. - test "read stream forward", %{stream_uuid: stream_uuid, old_events: old_events, new_events: new_events} do + test "read stream forward", %{ + stream_uuid: stream_uuid, + old_events: old_events, + new_events: new_events + } do :ok = EventStore.append_to_stream(stream_uuid, 0, old_events) :ok = EventStore.append_to_stream(stream_uuid, 5, new_events, trim: true) @@ -208,7 +212,11 @@ defmodule EventStore.EventStoreTest do assert_recorded_events(stream_uuid, 6..15, new_events, recorded_events) end - test "read stream backward", %{stream_uuid: stream_uuid, old_events: old_events, new_events: new_events} do + test "read stream backward", %{ + stream_uuid: stream_uuid, + old_events: old_events, + new_events: new_events + } do :ok = EventStore.append_to_stream(stream_uuid, 0, old_events) :ok = EventStore.append_to_stream(stream_uuid, 5, new_events, trim: true) @@ -216,9 +224,6 @@ defmodule EventStore.EventStoreTest do assert_recorded_events(stream_uuid, 15..6//-1, Enum.reverse(new_events), recorded_events) end - - - end test "unicode character support" do