16 changes: 8 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -205,21 +205,21 @@ Previous usage:
```elixir
EventStore.append_to_stream(stream_uuid, expected_version, events, timeout)
EventStore.link_to_stream(stream_uuid, expected_version, events_or_event_ids, timeout)
EventStore.read_stream_forward(stream_uuid, start_version, count, timeout)
EventStore.read_all_streams_forward(start_version, count, timeout)
EventStore.stream_forward(stream_uuid, start_version, read_batch_size, timeout)
EventStore.stream_all_forward(start_version, read_batch_size, timeout)
EventStore.read_stream_forward(stream_uuid, stream_origin, count, timeout)
EventStore.read_all_streams_forward(stream_origin, count, timeout)
EventStore.stream_forward(stream_uuid, stream_origin, read_batch_size, timeout)
EventStore.stream_all_forward(stream_origin, read_batch_size, timeout)
```

Usage now:

```elixir
EventStore.append_to_stream(stream_uuid, expected_version, events, timeout: timeout)
EventStore.link_to_stream(stream_uuid, expected_version, events_or_event_ids, timeout: timeout)
EventStore.read_stream_forward(stream_uuid, start_version, count, timeout: timeout)
EventStore.read_all_streams_forward(start_version, count, timeout: timeout)
EventStore.stream_forward(stream_uuid, start_version, read_batch_size: read_batch_size, timeout: timeout)
EventStore.stream_all_forward(start_version, read_batch_size: read_batch_size, timeout: timeout)
EventStore.read_stream_forward(stream_uuid, stream_origin, count, timeout: timeout)
EventStore.read_all_streams_forward(stream_origin, count, timeout: timeout)
EventStore.stream_forward(stream_uuid, stream_origin, read_batch_size: read_batch_size, timeout: timeout)
EventStore.stream_all_forward(stream_origin, read_batch_size: read_batch_size, timeout: timeout)
```
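For example (the stream name and values here are illustrative, not from the library docs), a call that previously passed the timeout as the last positional argument would now pass it as a keyword option:

```elixir
# Before: timeout as the last positional argument
EventStore.read_stream_forward("example-stream", 0, 1_000, 5_000)

# After: timeout as a keyword option
EventStore.read_stream_forward("example-stream", 0, 1_000, timeout: 5_000)
```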

### Upgrading
6 changes: 6 additions & 0 deletions guides/Subscriptions.md
@@ -97,6 +97,12 @@ By default subscriptions are created from the stream origin; they will receive a
- `:current` - subscribe to events from the current version.
- `event_number` (integer) - specify an exact event number to subscribe from. This will be the same as the stream version for single stream subscriptions.

Note that in all cases streaming will never go "past" the stream's origin, which
can be set with `EventStore.trim/2`. This allows garbage collection of events
that are no longer part of any stream. The current origin is returned in the
`stream_origin` field of the `EventStore.stream_info/1` result. If the event
number passed in is smaller than the stream's origin, the stream will still
start at the origin.
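As a sketch (the stream name, subscription name, and start value here are illustrative assumptions, not part of the library docs), the origin can be inspected and a subscription started below it:

```elixir
# Inspect the stream's current origin (advanced by trimming)
{:ok, info} = MyApp.EventStore.stream_info("example-stream")
origin = info.stream_origin

# Subscribing from an event number below the origin still starts at the origin
{:ok, _subscription} =
  MyApp.EventStore.subscribe_to_stream("example-stream", "example-subscription", self(),
    start_from: 0
  )
```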

### Acknowledge received events

Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event and to indicate the subscription is ready to receive the next event.
20 changes: 20 additions & 0 deletions guides/Usage.md
@@ -217,3 +217,23 @@ Hard delete a stream that should exist:
```elixir
:ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard)
```

## Stream origin and garbage collection

By default, a stream starts at the first event added to it. But this means
that the event store will grow without bounds, as there is no way to remove
events from a stream.

To alleviate this problem, streams have variable origins: if you pass
`trim: true` to a `MyApp.EventStore.append_to_stream` call, the origin of
the stream is set to the first event passed into that call. This lets you
append a "fresh start" event, which in CQRS/ES serves purposes such as
"closing the books". In effect, all events previously in the stream become
invisible: subscriptions will start at the new origin, so they neither need
to traverse the entire event history on start nor see incomplete data when
starting at the end or somewhere in the middle of a stream.

In the (hopefully near) future, this will allow EventStore to implement garbage
collection: any events that are not visible in any streams can be safely deleted. This
way, by judiciously trimming streams, database sizes can be kept under control.
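
A minimal sketch of trimming (the event type, data, and stream name here are illustrative assumptions):

```elixir
# Append a summary "fresh start" event and trim the stream: the stream's
# origin moves to this event, hiding all earlier events from new reads
# and subscriptions.
events = [
  %EventStore.EventData{
    event_type: "Elixir.MyApp.BooksClosed",
    data: %{balance: 100},
    metadata: %{}
  }
]

:ok = MyApp.EventStore.append_to_stream("accounting-ledger", :any_version, events, trim: true)
```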
16 changes: 10 additions & 6 deletions lib/event_store.ex
@@ -294,7 +294,7 @@ defmodule EventStore do
Supervisor.stop(supervisor, :normal, timeout)
end

@accepted_overrides_append_to_stream [:created_at_override]
@accepted_overrides_append_to_stream [:created_at_override, :trim]

def append_to_stream(stream_uuid, expected_version, events, opts \\ [])

@@ -641,7 +641,11 @@
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database transaction, in
milliseconds. Defaults to 15,000ms.
milliseconds. Defaults to 15,000ms.
  - `trim` a boolean, defaulting to `false`. If `true`, the stream will be "trimmed":
    its origin is set to the first event in this call. In other words, new
    subscriptions starting from `:origin` will begin with the events passed into
    this call.

Returns `:ok` on success, or an `{:error, reason}` tagged tuple. The returned
error may be due to one of the following reasons:
@@ -729,7 +733,7 @@
- `stream_uuid` is used to uniquely identify a stream.

- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
Defaults to the origin of the stream if not set.

- `count` optionally, the maximum number of events to read.
Defaults to returning 1,000 events from the stream.
Expand All @@ -754,7 +758,7 @@ defmodule EventStore do
they were originally written.

- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
Defaults to the origin of the stream if not set.

- `count` optionally, the maximum number of events to read.
Defaults to returning 1,000 events from all streams.
@@ -825,7 +829,7 @@
originally written.

- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
Defaults to the origin of the stream if not set.

- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
@@ -845,7 +849,7 @@
written.

- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
Defaults to the origin of the stream if not set.

- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
3 changes: 2 additions & 1 deletion lib/event_store/sql/init.ex
@@ -39,6 +39,7 @@ defmodule EventStore.Sql.Init do
stream_id bigserial PRIMARY KEY NOT NULL,
stream_uuid text NOT NULL,
stream_version bigint default 0 NOT NULL,
stream_origin bigint default 0 NOT NULL,
created_at timestamp with time zone DEFAULT NOW() NOT NULL,
deleted_at timestamp with time zone
);
@@ -54,7 +55,7 @@
# Create `$all` stream
defp seed_all_stream do
"""
INSERT INTO streams (stream_id, stream_uuid, stream_version) VALUES (0, '$all', 0);
INSERT INTO streams (stream_id, stream_uuid, stream_version, stream_origin) VALUES (0, '$all', 0, 0);
"""
end

1 change: 1 addition & 0 deletions lib/event_store/sql/statements.ex
@@ -14,6 +14,7 @@ defmodule EventStore.Sql.Statements do
{:insert_events, [:schema, :stream_id, :number_of_events, :created_at]},
{:insert_events_any_version, [:schema, :stream_id, :number_of_events, :created_at]},
{:insert_link_events, [:schema, :number_of_events]},
{:trim_stream_to_next, [:schema]},
{:soft_delete_stream, [:schema]},
{:hard_delete_stream, [:schema]},
{:insert_subscription, [:schema]},
@@ -12,8 +12,9 @@ SELECT
FROM "<%= schema %>".stream_events se
INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id
INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id
WHERE
se.stream_id = $1 AND (se.stream_version <= $2 OR $2 = -1)
WHERE se.stream_id = $1
AND (se.stream_version <= $2 OR $2 = -1)
AND se.stream_version >= s.stream_origin
ORDER BY
se.stream_version DESC
LIMIT $3;
@@ -12,8 +12,9 @@ SELECT
FROM "<%= schema %>".stream_events se
INNER JOIN "<%= schema %>".streams s ON s.stream_id = se.original_stream_id
INNER JOIN "<%= schema %>".events e ON se.event_id = e.event_id
WHERE
se.stream_id = $1 AND se.stream_version >= $2
WHERE se.stream_id = $1
AND se.stream_version >= $2
AND se.stream_version >= s.stream_origin
ORDER BY
se.stream_version ASC
LIMIT $3;
1 change: 1 addition & 0 deletions lib/event_store/sql/statements/query_stream_info.sql.eex
@@ -2,6 +2,7 @@ SELECT
stream_id,
stream_uuid,
stream_version,
stream_origin,
created_at,
deleted_at
FROM "<%= schema %>".streams
1 change: 1 addition & 0 deletions lib/event_store/sql/statements/query_streams.sql.eex
@@ -2,6 +2,7 @@ SELECT
stream_id,
stream_uuid,
stream_version,
stream_origin,
created_at,
deleted_at
FROM "<%= schema %>".streams
3 changes: 3 additions & 0 deletions lib/event_store/sql/statements/trim_stream_to_next.sql.eex
@@ -0,0 +1,3 @@
UPDATE "<%= schema %>".streams
SET stream_origin = stream_version + 1
WHERE stream_uuid = $1;
4 changes: 2 additions & 2 deletions lib/event_store/storage.ex
@@ -30,15 +30,15 @@ defmodule EventStore.Storage do
Read events for the given stream forward from the starting version; use zero
to read all events in the stream.
"""
defdelegate read_stream_forward(conn, stream_id, start_version, count, opts),
defdelegate read_stream_forward(conn, stream_id, stream_origin, count, opts),
to: Reader,
as: :read_forward

@doc """
Read events for the given stream backward from the starting version; use -1
to read all events in the stream.
"""
defdelegate read_stream_backward(conn, stream_id, start_version, count, opts),
defdelegate read_stream_backward(conn, stream_id, stream_origin, count, opts),
to: Reader,
as: :read_backward

25 changes: 25 additions & 0 deletions lib/event_store/storage/appender.ex
@@ -16,6 +16,15 @@ defmodule EventStore.Storage.Appender do
def append(conn, stream_id, events, opts) do
[%RecordedEvent{stream_uuid: stream_uuid} | _] = events

{trim, opts} = Keyword.pop(opts, :trim, false)

if trim do
# We could probably do this by being really smart in the insert query, but
# these queries are already somewhat unwieldy. We don't trim all the time, so
# the extra db roundtrip is unlikely to be noticeable.
:ok = trim_stream_to_next(conn, stream_uuid, opts)
end

try do
events
|> Stream.map(&encode_uuids/1)
@@ -131,6 +140,22 @@
end
end

defp trim_stream_to_next(conn, stream_uuid, opts) do
{schema, opts} = Keyword.pop(opts, :schema)
statement = Statements.trim_stream_to_next(schema)

case Postgrex.query(conn, statement, [stream_uuid], opts) do
{:ok, %Postgrex.Result{num_rows: 0}} ->
{:error, :not_found}

{:ok, _} ->
:ok

{:error, error} ->
handle_error(error)
end
end

defp append_if(params, true, value) when not is_nil(value), do: params ++ [value]
defp append_if(params, _, _), do: params

16 changes: 8 additions & 8 deletions lib/event_store/storage/reader.ex
@@ -10,8 +10,8 @@ defmodule EventStore.Storage.Reader do
@doc """
Read events from a single stream forwards from the given starting version.
"""
def read_forward(conn, stream_id, start_version, count, opts) do
case Reader.Query.read_events_forward(conn, stream_id, start_version, count, opts) do
def read_forward(conn, stream_id, stream_origin, count, opts) do
case Reader.Query.read_events_forward(conn, stream_id, stream_origin, count, opts) do
{:ok, []} = reply -> reply
{:ok, rows} -> map_rows_to_event_data(rows)
{:error, reason} -> failed_to_read(stream_id, reason)
@@ -21,8 +21,8 @@
@doc """
Read events from a single stream backwards from the given starting version.
"""
def read_backward(conn, stream_id, start_version, count, opts) do
case Reader.Query.read_events_backward(conn, stream_id, start_version, count, opts) do
def read_backward(conn, stream_id, stream_origin, count, opts) do
case Reader.Query.read_events_backward(conn, stream_id, stream_origin, count, opts) do
{:ok, []} = reply -> reply
{:ok, rows} -> map_rows_to_event_data(rows)
{:error, reason} -> failed_to_read(stream_id, reason)
@@ -106,20 +106,20 @@
defmodule Query do
@moduledoc false

def read_events_forward(conn, stream_id, start_version, count, opts) do
def read_events_forward(conn, stream_id, stream_origin, count, opts) do
{schema, opts} = Keyword.pop(opts, :schema)

query = Statements.query_stream_events_forward(schema)

do_query(conn, query, [stream_id, start_version, count], opts)
do_query(conn, query, [stream_id, stream_origin, count], opts)
end

def read_events_backward(conn, stream_id, start_version, count, opts) do
def read_events_backward(conn, stream_id, stream_origin, count, opts) do
{schema, opts} = Keyword.pop(opts, :schema)

query = Statements.query_stream_events_backward(schema)

do_query(conn, query, [stream_id, start_version, count], opts)
do_query(conn, query, [stream_id, stream_origin, count], opts)
end

defp do_query(conn, query, params, opts) do
4 changes: 3 additions & 1 deletion lib/event_store/storage/stream.ex
@@ -98,15 +98,17 @@
end

defp to_stream_info(row) do
[stream_id, stream_uuid, stream_version, created_at, deleted_at] = row
[stream_id, stream_uuid, stream_version, stream_origin, created_at, deleted_at] = row

stream_version = if is_nil(stream_version), do: 0, else: stream_version
stream_origin = if is_nil(stream_origin), do: 0, else: stream_origin
status = if is_nil(deleted_at), do: :created, else: :deleted

%StreamInfo{
stream_uuid: stream_uuid,
stream_id: stream_id,
stream_version: stream_version,
stream_origin: stream_origin,
created_at: created_at,
deleted_at: deleted_at,
status: status