diff --git a/lib/event_store.ex b/lib/event_store.ex index a02560a8..c4c773d6 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -254,7 +254,7 @@ defmodule EventStore do quote bind_quoted: [opts: opts] do @behaviour EventStore - alias EventStore.{Config, EventData, PubSub, Subscriptions} + alias EventStore.{Config, EventData, PubSub, Subscriptions, Telemetry} alias EventStore.Snapshots.{SnapshotData, Snapshotter} alias EventStore.Subscriptions.Subscription alias EventStore.Streams.Stream @@ -306,7 +306,15 @@ defmodule EventStore do {conn, opts} = parse_opts(opts) opts = Keyword.merge(opts, overrides) - Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + telemetry_metadata = %{ + stream_uuid: stream_uuid, + expected_version: expected_version, + event_count: length(events) + } + + Telemetry.measure_span(:append_to_stream, telemetry_metadata, fn -> + Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts) + end) end def link_to_stream( @@ -335,7 +343,15 @@ defmodule EventStore do def read_stream_forward(stream_uuid, start_version, count, opts) do {conn, opts} = parse_opts(opts) - Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + telemetry_metadata = %{ + stream_uuid: stream_uuid, + start_version: start_version, + count: count + } + + Telemetry.measure_span(:read_stream_forward, telemetry_metadata, fn -> + Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts) + end) end def read_all_streams_forward( diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index 5a3c95ce..dee1fa47 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -10,7 +10,8 @@ defmodule EventStore.Supervisor do Notifications, PubSub, Serializer, - Subscriptions + Subscriptions, + Telemetry } @doc """ @@ -90,7 +91,8 @@ defmodule EventStore.Supervisor do Supervisor.child_spec({Registry, keys: :unique, name: subscriptions_registry_name}, id: subscriptions_registry_name ), - {Notifications.Supervisor, {name, config}} + {Notifications.Supervisor, {name, config}}, + Telemetry.poller_child_spec(config) ] ++ PubSub.child_spec(name) :ok = Config.associate(name, self(), event_store, config) diff --git a/lib/event_store/telemetry.ex b/lib/event_store/telemetry.ex new file mode 100644 index 00000000..9b8705f4 --- /dev/null +++ b/lib/event_store/telemetry.ex @@ -0,0 +1,63 @@ +defmodule EventStore.Telemetry do + @moduledoc false + alias EventStore.Storage + + require Logger + + def poller_child_spec(opts) do + conn = Keyword.fetch!(opts, :conn) + schema = Keyword.fetch!(opts, :schema) + period = Keyword.get(opts, :telemetry_poller_period) || :timer.seconds(15) + init_delay = Keyword.get(opts, :telemetry_poller_init_delay) || :timer.seconds(5) + + {:telemetry_poller, + period: period, + init_delay: init_delay, + measurements: [ + {__MODULE__, :subscriptions, [conn, schema]} + ]} + end + + def subscriptions(conn, schema) do + with {:ok, stream_info} <- Storage.stream_info(conn, "$all", schema: schema), + {:ok, subscriptions} <- Storage.subscriptions(conn, schema: schema) do + Enum.each(subscriptions, fn subscription -> + measurements = %{ + last_seen: subscription.last_seen, + lag: stream_info.stream_version - subscription.last_seen + } + + metadata = %{ + stream_uuid: subscription.stream_uuid, + subscription_name: subscription.subscription_name + } + + execute(:subscription, measurements, metadata) + end) + else + _ -> + Logger.warning("Failed to emit subscription telemetry") + end + end + + def execute(event_name, measurements, metadata) do + :telemetry.execute([event_name_prefix(), event_name], measurements, metadata) + end + + def measure_span(event_name, metadata, func) do + :telemetry.span([event_name_prefix(), event_name], metadata, fn -> + case func.() do + :ok = result -> + {result, Map.put(metadata, :result, result)} + + {:ok, _result} = result -> + {result, Map.put(metadata, :result, result)} + + {:error, error} = result -> + {result, Map.merge(metadata, %{error: error, result: nil})} + end + end) + end + + defp event_name_prefix, do: :eventstore +end diff --git a/mix.exs b/mix.exs index 21f339d2..6f53d6ae 100644 --- a/mix.exs +++ b/mix.exs @@ -42,6 +42,8 @@ defmodule EventStore.Mixfile do {:fsm, "~> 0.3"}, {:gen_stage, "~> 1.2"}, {:postgrex, "~> 0.17"}, + {:telemetry, "~> 1.0"}, + {:telemetry_poller, "~> 1.0"}, # Optional dependencies {:jason, "~> 1.4", optional: true}, diff --git a/mix.lock b/mix.lock index 210c2141..312bc731 100644 --- a/mix.lock +++ b/mix.lock @@ -17,4 +17,5 @@ "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, }