From da08553652ca7c57cb558c3bc4154881c7d5f54d Mon Sep 17 00:00:00 2001 From: Tyler Pachal Date: Sun, 26 Jan 2025 10:18:33 -0400 Subject: [PATCH 1/5] Add Telemetry --- lib/event_store.ex | 26 +++++++++++-- lib/event_store/supervisor.ex | 8 +++- lib/event_store/telemetry.ex | 69 +++++++++++++++++++++++++++++++++++ mix.exs | 2 + mix.lock | 1 + 5 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 lib/event_store/telemetry.ex diff --git a/lib/event_store.ex b/lib/event_store.ex index a02560a8..6b55148c 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -254,7 +254,7 @@ defmodule EventStore do quote bind_quoted: [opts: opts] do @behaviour EventStore - alias EventStore.{Config, EventData, PubSub, Subscriptions} + alias EventStore.{Config, EventData, PubSub, Subscriptions, Telemetry} alias EventStore.Snapshots.{SnapshotData, Snapshotter} alias EventStore.Subscriptions.Subscription alias EventStore.Streams.Stream @@ -306,7 +306,17 @@ defmodule EventStore do {conn, opts} = parse_opts(opts) opts = Keyword.merge(opts, overrides) - Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + telemetry_metadata = %{ + stream_uuid: stream_uuid, + expected_version: expected_version, + event_count: length(events) + } + + # TODO: The Github issue says to measure the "count" and "total_time". We can measure those here, but would it + # be better to measure them at a "lower" level (i.e. in the Stream module)? + Telemetry.measure_span(:append_to_stream, telemetry_metadata, fn -> + Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + end) end def link_to_stream( @@ -335,7 +345,17 @@ defmodule EventStore do def read_stream_forward(stream_uuid, start_version, count, opts) do {conn, opts} = parse_opts(opts) - Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + telemetry_metadata = %{ + stream_uuid: stream_uuid, + start_version: start_version, + count: count + } + + # TODO: This is not flexible enough if we want to include part of the result in the :stop measurements. For + # example, if we want to return how many events were actually read (as opposed to the "requested" count). + Telemetry.measure_span(:read_stream_forward, telemetry_metadata, fn -> + Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + end) end def read_all_streams_forward( diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index 5a3c95ce..8be99bb0 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -10,7 +10,8 @@ defmodule EventStore.Supervisor do Notifications, PubSub, Serializer, - Subscriptions + Subscriptions, + Telemetry } @doc """ @@ -90,7 +91,10 @@ defmodule EventStore.Supervisor do Supervisor.child_spec({Registry, keys: :unique, name: subscriptions_registry_name}, id: subscriptions_registry_name ), - {Notifications.Supervisor, {name, config}} + {Notifications.Supervisor, {name, config}}, + # TODO: Should this be opt-in? It adds load on the database. + # TODO: Only run one poller per cluster. + Telemetry.poller_child_spec(conn: conn, schema: schema) ] ++ PubSub.child_spec(name) :ok = Config.associate(name, self(), event_store, config) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex new file mode 100644 index 00000000..0fe25503 --- /dev/null +++ b/lib/event_store/telemetry.ex @@ -0,0 +1,69 @@ +defmodule EventStore.Telemetry do + @moduledoc false + alias EventStore.Storage + alias EventStore.Storage.Subscription + + require Logger + + def poller_child_spec(opts) do + conn = Keyword.fetch!(opts, :conn) + schema = Keyword.fetch!(opts, :schema) + # TODO: Do the period and init_delay need to be configurable? + period = Keyword.get(opts, :period) || :timer.seconds(15) + init_delay = Keyword.get(opts, :init_delay) || :timer.seconds(5) + + {:telemetry_poller, + period: period, + init_delay: init_delay, + measurements: [ + {__MODULE__, :subscriptions, [conn, schema]} + ]} + end + + def subscriptions(conn, schema) do + case Storage.subscriptions(conn, schema: schema) do + {:ok, subscriptions} -> + Enum.each(subscriptions, fn %Subscription{} = subscription -> + # TODO: The Github issue mentions including a "lag" metric, but for that we need to know the total count of + # events, which may be expensive to query for. Do we want to implement that? + measurements = %{ + last_seen: subscription.last_seen + } + + # TODO: The Github issue mentions having including the "last processed event". How do we get that? + metadata = %{ + stream_uuid: subscription.stream_uuid, + subscription_name: subscription.subscription_name + } + + execute(:subscription, measurements, metadata) + end) + + # TODO: How do we want to handle these errors? + {:error, _error} -> + Logger.warning("Failed to emit subscription telemetry") + end + end + + def execute(event_name, measurements, metadata) do + :telemetry.execute([event_name_prefix(), event_name], measurements, metadata) + end + + def measure_span(event_name, metadata, func) do + :telemetry.span([event_name_prefix(), event_name], metadata, fn -> + case func.() do + :ok = result -> + {result, Map.put(metadata, :result, result)} + + {:ok, _result} = result -> + {result, Map.put(metadata, :result, result)} + + {:error, error} = result -> + {result, Map.merge(metadata, %{error: error, result: nil})} + end + end) + end + + # TODO: Should this be :eventstore like in the mix.exs file? Or :event_store to match the module names? + defp event_name_prefix, do: :event_store +end diff --git a/mix.exs b/mix.exs index 21f339d2..6f53d6ae 100644 --- a/mix.exs +++ b/mix.exs @@ -42,6 +42,8 @@ defmodule EventStore.Mixfile do {:fsm, "~> 0.3"}, {:gen_stage, "~> 1.2"}, {:postgrex, "~> 0.17"}, + {:telemetry, "~> 1.0"}, + {:telemetry_poller, "~> 1.0"}, # Optional dependencies {:jason, "~> 1.4", optional: true}, diff --git a/mix.lock b/mix.lock index 210c2141..312bc731 100644 --- a/mix.lock +++ b/mix.lock @@ -17,4 +17,5 @@ "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, } From 1628fde8b375b4850c26f357841bd14870b45880 Mon Sep 17 00:00:00 2001 From: Tyler Pachal Date: Sun, 26 Jan 2025 23:12:24 -0400 Subject: [PATCH 2/5] Add subscription lag measurement --- lib/event_store/telemetry.ex | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex index 0fe25503..97b4ceae 100644 --- a/lib/event_store/telemetry.ex +++ b/lib/event_store/telemetry.ex @@ -1,7 +1,6 @@ defmodule EventStore.Telemetry do @moduledoc false alias EventStore.Storage - alias EventStore.Storage.Subscription require Logger @@ -21,26 +20,24 @@ defmodule EventStore.Telemetry do end def subscriptions(conn, schema) do - case Storage.subscriptions(conn, schema: schema) do - {:ok, subscriptions} -> - Enum.each(subscriptions, fn %Subscription{} = subscription -> - # TODO: The Github issue mentions including a "lag" metric, but for that we need to know the total count of - # events, which may be expensive to query for. Do we want to implement that? - measurements = %{ - last_seen: subscription.last_seen - } + with {:ok, stream_info} <- Storage.stream_info(conn, "$all", schema: schema), + {:ok, subscriptions} <- Storage.subscriptions(conn, schema: schema) do + Enum.each(subscriptions, fn subscription -> + measurements = %{ + last_seen: subscription.last_seen, + lag: stream_info.stream_version - subscription.last_seen + } - # TODO: The Github issue mentions having including the "last processed event". How do we get that? - metadata = %{ - stream_uuid: subscription.stream_uuid, - subscription_name: subscription.subscription_name - } + # TODO: The Github issue mentions having including the "last processed event". How do we get that? + metadata = %{ + stream_uuid: subscription.stream_uuid, + subscription_name: subscription.subscription_name + } - execute(:subscription, measurements, metadata) - end) - - # TODO: How do we want to handle these errors? - {:error, _error} -> + execute(:subscription, measurements, metadata) + end) + else + _ -> Logger.warning("Failed to emit subscription telemetry") end end From ebb1485b19ceb0a85b5a3b2aac2e035e6711fb25 Mon Sep 17 00:00:00 2001 From: Tyler Pachal Date: Sun, 26 Jan 2025 23:28:05 -0400 Subject: [PATCH 3/5] Move TODOs to GH comments --- lib/event_store.ex | 4 ---- lib/event_store/supervisor.ex | 2 -- lib/event_store/telemetry.ex | 3 --- 3 files changed, 9 deletions(-) diff --git a/lib/event_store.ex b/lib/event_store.ex index 6b55148c..c4c773d6 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -312,8 +312,6 @@ defmodule EventStore do event_count: length(events) } - # TODO: The Github issue says to measure the "count" and "total_time". We can measure those here, but would it - # be better to measure them at a "lower" level (i.e. in the Stream module)? Telemetry.measure_span(:append_to_stream, telemetry_metadata, fn -> Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) end) @@ -351,8 +349,6 @@ defmodule EventStore do count: count } - # TODO: This is not flexible enough if we want to include part of the result in the :stop measurements. For - # example, if we want to return how many events were actually read (as opposed to the "requested" count). Telemetry.measure_span(:read_stream_forward, telemetry_metadata, fn -> Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) end) diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index 8be99bb0..7ef65507 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -92,8 +92,6 @@ defmodule EventStore.Supervisor do id: subscriptions_registry_name ), {Notifications.Supervisor, {name, config}}, - # TODO: Should this be opt-in? It adds load on the database. - # TODO: Only run one poller per cluster. Telemetry.poller_child_spec(conn: conn, schema: schema) ] ++ PubSub.child_spec(name) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex index 97b4ceae..d4b72a7b 100644 --- a/lib/event_store/telemetry.ex +++ b/lib/event_store/telemetry.ex @@ -7,7 +7,6 @@ defmodule EventStore.Telemetry do def poller_child_spec(opts) do conn = Keyword.fetch!(opts, :conn) schema = Keyword.fetch!(opts, :schema) - # TODO: Do the period and init_delay need to be configurable? period = Keyword.get(opts, :period) || :timer.seconds(15) init_delay = Keyword.get(opts, :init_delay) || :timer.seconds(5) @@ -28,7 +27,6 @@ defmodule EventStore.Telemetry do lag: stream_info.stream_version - subscription.last_seen } - # TODO: The Github issue mentions having including the "last processed event". How do we get that? metadata = %{ stream_uuid: subscription.stream_uuid, subscription_name: subscription.subscription_name @@ -61,6 +59,5 @@ defmodule EventStore.Telemetry do end) end - # TODO: Should this be :eventstore like in the mix.exs file? Or :event_store to match the module names? defp event_name_prefix, do: :event_store end From 84ff4e8e15f580ec07f52069797a9cd818658b78 Mon Sep 17 00:00:00 2001 From: Tyler Pachal Date: Sun, 26 Jan 2025 23:44:54 -0400 Subject: [PATCH 4/5] poller configuration --- lib/event_store/supervisor.ex | 2 +- lib/event_store/telemetry.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index 7ef65507..dee1fa47 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -92,7 +92,7 @@ defmodule EventStore.Supervisor do id: subscriptions_registry_name ), {Notifications.Supervisor, {name, config}}, - Telemetry.poller_child_spec(conn: conn, schema: schema) + Telemetry.poller_child_spec(config) ] ++ PubSub.child_spec(name) :ok = Config.associate(name, self(), event_store, config) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex index d4b72a7b..f6179a8d 100644 --- a/lib/event_store/telemetry.ex +++ b/lib/event_store/telemetry.ex @@ -7,8 +7,8 @@ defmodule EventStore.Telemetry do def poller_child_spec(opts) do conn = Keyword.fetch!(opts, :conn) schema = Keyword.fetch!(opts, :schema) - period = Keyword.get(opts, :period) || :timer.seconds(15) - init_delay = Keyword.get(opts, :init_delay) || :timer.seconds(5) + period = Keyword.get(opts, :telemetry_poller_period) || :timer.seconds(15) + init_delay = Keyword.get(opts, :telemetry_poller_init_delay) || :timer.seconds(5) {:telemetry_poller, period: period, From 5ccc643871ce9cef7d73d29f618329bc8ffa7be6 Mon Sep 17 00:00:00 2001 From: Tyler Pachal Date: Mon, 27 Jan 2025 14:18:51 -0400 Subject: [PATCH 5/5] Fix metric prefix --- lib/event_store/telemetry.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex index f6179a8d..9b8705f4 100644 --- a/lib/event_store/telemetry.ex +++ b/lib/event_store/telemetry.ex @@ -59,5 +59,5 @@ defmodule EventStore.Telemetry do end) end - defp event_name_prefix, do: :event_store + defp event_name_prefix, do: :eventstore end