Scatter-gather batching for Elixir, built on GenStage. Individual requests are coalesced into a single bulk call and the results are correlated back to each caller.
Add `:flurry` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:flurry, "~> 0.1.0"}
  ]
end
```

A module that uses Flurry defines a bulk (list-in, list-out) function
and decorates it with @decorate batch(...). Flurry generates a
single-item entry point, runs a GenStage producer/consumer pipeline, and
correlates each caller's request with its result.
```elixir
defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end
end
```

The decorator generates `MyApp.UserBatcher.get/1`. Under concurrency,
N simultaneous calls collapse into one `get_many/1` invocation:
```elixir
MyApp.UserBatcher.get(42)
#=> %User{id: 42, ...}

MyApp.UserBatcher.get(999)
#=> nil
```

A batch is emitted when any of the following conditions is met:

- `batch_size` pending requests have accumulated, or
- the producer's mailbox is empty, meaning no further requests are immediately queued, or
- `max_wait` milliseconds have elapsed since the first pending request was enqueued.
The mailbox-empty check provides minimum latency under low load: a single
request arriving at an idle producer flushes immediately as a batch of one.
The `max_wait` timer (default 200 ms) caps worst-case latency under slow
trickle conditions, where requests arrive one at a time: quickly enough to
keep the mailbox non-empty but too slowly to ever reach `batch_size`.
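The three flush triggers above can be sketched as a single predicate. This is an illustrative model only, not Flurry's actual internals; the module and argument names are invented:

```elixir
defmodule FlushSketch do
  @moduledoc "Illustrative model of the producer's flush decision."

  # A batch flushes when any trigger fires: the size cap is reached,
  # the mailbox is idle with work pending, or the max_wait timer expired.
  def flush?(pending, mailbox_len, elapsed_ms, batch_size, max_wait) do
    pending >= batch_size or
      (mailbox_len == 0 and pending > 0) or
      (max_wait != nil and elapsed_ms >= max_wait)
  end
end

# A single request at an idle producer flushes immediately as a batch of one:
FlushSketch.flush?(1, 0, 0, 100, 200)
#=> true
```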
`use Flurry` generates `start_link/1` and `child_spec/1` on the module.
Add it to a supervision tree:

```elixir
children = [
  # ...
  MyApp.UserBatcher
]
```

The `batch_size:` option caps the size of any single bulk call. This is
necessary because databases such as PostgreSQL impose parameter limits on
`WHERE id IN (...)` queries.
```elixir
# Module-wide default
children = [{MyApp.UserBatcher, batch_size: 500}]

# Per-decorated-function override
@decorate batch(get(id), batch_size: 500)
def get_many(ids), do: ...

@decorate batch(get_with_posts(id), batch_size: 50)
def get_many_with_posts(ids), do: ...
```

When more requests accumulate than `batch_size` allows, Flurry flushes
`batch_size` entries at a time across successive cycles.
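The overflow behavior amounts to chunking the pending list, roughly as follows (an illustrative sketch using standard `Enum.chunk_every/2`, not Flurry's internals):

```elixir
# 1,250 pending entries with batch_size: 500 flush as three bulk calls.
pending = Enum.to_list(1..1_250)

chunk_sizes =
  pending
  |> Enum.chunk_every(500)
  |> Enum.map(&length/1)

chunk_sizes
#=> [500, 500, 250]
```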
The `max_wait:` option sets the maximum time in milliseconds that the first
pending request waits before the producer forces a flush. Defaults to `200`.
Set to `nil` to disable.
```elixir
# Module-wide default
children = [{MyApp.UserBatcher, max_wait: 500}]

# Per-decorated-function override
@decorate batch(get(id), max_wait: 100)
def get_many(ids), do: ...
```

By default, the generated singular entry point is not overridable. Defining
a function with the same name and arity in the module produces a
redefinition error. The `overridable:` option on `use Flurry` allows
wrapping the generated function via `super/1`:
```elixir
defmodule MyApp.UserBatcher do
  use Flurry, repo: MyApp.Repo, overridable: [get: 1]

  @decorate batch(get(id))
  def get_many(ids) do
    Repo.all(from u in User, where: u.id in ^ids)
  end

  def get(id) do
    case super(id) do
      nil -> nil
      user -> %{user | display_name: "[#{user.id}] #{user.name}"}
    end
  end
end
```

The option accepts a keyword list of `name: arity` pairs. At compile time,
Flurry validates that every `:overridable` entry has a matching
`@decorate batch(...)` decoration with the same arity.
The `additive:` option merges list-valued arguments across coalesced callers. This is useful when different callers specify different values that the batch should combine, such as Ecto preloads:
```elixir
@decorate batch(get(id, preloads), additive: [:preloads])
def get_many(ids, preloads) do
  Repo.all(from u in User, where: u.id in ^ids, preload: ^preloads)
end
```

```elixir
# Three concurrent callers with distinct preloads:
MyApp.UserBatcher.get(1, [:posts])
MyApp.UserBatcher.get(2, [:comments])
MyApp.UserBatcher.get(3, [:posts, :profile])

# All three coalesce into one bulk call:
# get_many([1, 2, 3], [:posts, :comments, :profile])
```

Arguments named in `additive:` are excluded from the producer's routing key,
so callers that differ only on additive arguments share a batch. At flush
time, the additive values are concatenated and deduplicated
(`Enum.uniq(list1 ++ list2)`).

Restrictions:

- Values at additive positions must be lists.
- `additive:` and `batch_by:` cannot be combined on the same decoration.
- Every name in `additive:` must appear in the decorator's group arguments.
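The merge semantics described above behave like the following (an illustrative sketch of concatenate-then-deduplicate, using the preload lists from the example):

```elixir
# Preload lists from three coalesced callers:
caller_preloads = [[:posts], [:comments], [:posts, :profile]]

merged =
  caller_preloads
  |> Enum.concat()
  |> Enum.uniq()

merged
#=> [:posts, :comments, :profile]
```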
The `batch_by:` option normalizes the non-batched arguments for coalescing purposes. It accepts a 1-arity function that receives the raw tuple of non-batched arguments and returns the canonical form used for both coalescing and the bulk function's argument positions:
```elixir
@decorate batch(
  get_post(slug, user),
  batch_by: fn {user} -> {user.id} end
)
def get_many(slugs, user_id) do
  Repo.all(from p in Post, where: p.slug in ^slugs and p.user_id == ^user_id)
end
```

With `batch_by:`, the bulk function signature takes the normalized values,
not the raw decorator argument types. Valid values are closures and
captures. `batch_by:` on a single-argument decoration raises at compile time.
The `correlate:` option specifies how the match key is extracted from each returned record. By default, Flurry uses the first decorator argument's name as the record field. Two forms are supported:

- Atom -- names a top-level field on each returned record.
- Function -- a 1-arity function that extracts the key from each record.
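For the function form, a hypothetical sketch (the `external_id` field is invented for illustration):

```elixir
@decorate batch(get(id), correlate: fn record -> record.external_id end)
def get_many(ids) do
  Repo.all(from r in Row, where: r.external_id in ^ids)
end
```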
```elixir
@decorate batch(get(id), correlate: :uuid)
def get_many(ids) do
  Repo.all(from r in Row, where: r.uuid in ^ids)
end
```

The `timeout:` option sets the `GenServer.call/3` timeout for the generated
entry point. Defaults to `5_000` (5 seconds).
```elixir
@decorate batch(get(id), timeout: 30_000)
def get_many(ids) do
  Repo.all(from u in User, where: u.id in ^ids, preload: [:posts, :comments])
end
```

The `returns:` option defaults to `:one`, where each caller's argument
corresponds to at most one returned record. Use `:list` when the bulk
function returns multiple records per key:
```elixir
@decorate batch(get_posts_by_user(user_id), returns: :list)
def get_many_posts_by_user(user_ids) do
  Repo.all(from p in Post, where: p.user_id in ^user_ids)
end
```

Using `:one` on a function that returns duplicate keys raises
`Flurry.AmbiguousBatchError`.
The `on_failure:` option defaults to `:bisect`. When the bulk function raises
or exits for a batch of N entries, Flurry splits the batch in half and
retries each half. The recursion continues until a singleton failure isolates
the problematic entry. Every other caller in the original batch still
receives their correlated record.

Use `:fail_all` to surface a single failure as an error to every caller in
the batch without retrying.

Note that `:bisect` re-invokes the bulk function with smaller subsets of the
same inputs. If the bulk function has non-idempotent side effects, use
`on_failure: :fail_all` to avoid double-writes.
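The bisection strategy can be sketched recursively. This is an illustrative model, not Flurry's implementation; the module name and return shape are invented:

```elixir
defmodule BisectSketch do
  # Returns {successful_results, failed_entries}. A failing batch is split
  # in half and each half retried; singleton failures isolate bad entries.
  def run(bulk_fun, entries) do
    {bulk_fun.(entries), []}
  rescue
    _ ->
      case entries do
        [single] ->
          {[], [single]}

        _ ->
          {left, right} = Enum.split(entries, div(length(entries), 2))
          {ok_l, err_l} = run(bulk_fun, left)
          {ok_r, err_r} = run(bulk_fun, right)
          {ok_l ++ ok_r, err_l ++ err_r}
      end
  end
end

# A bulk function that fails whenever 7 is in the batch:
bulk = fn ids -> if 7 in ids, do: raise("boom"), else: ids end
BisectSketch.run(bulk, [1, 2, 7, 9])
#=> {[1, 2, 9], [7]}
```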
When the decorated function takes more than one argument, the first argument is the batched variable and the remaining arguments determine which callers share a batch. Callers whose non-batched arguments are structurally equal coalesce into the same bulk call.
```elixir
@decorate batch(get_post(slug, user_id, active?))
def get_many_posts(slugs, user_id, active?) do
  Repo.all(
    from p in Post,
    where: p.slug in ^slugs and p.user_id == ^user_id and p.active == ^active?
  )
end
```

Each distinct combination of non-batched arguments has its own pending
list, `batch_size` cap, and slot in the producer's LRU flush rotation.
The bulk function runs in a background consumer process, not in the caller's process. This has consequences for database transactions:

- Writes performed by the bulk function do not participate in the caller's transaction. If the caller's transaction rolls back, those writes are not rolled back.
- Reads performed by the bulk function do not see the caller's uncommitted writes.
- Under `Ecto.Adapters.SQL.Sandbox`, the consumer has no ancestry link to the test process, so queries produce ownership errors under `:manual` mode.
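A common workaround for the sandbox ownership issue is shared mode, which lets other processes, including the batching consumer, reuse the test process's connection. This is standard `Ecto.Adapters.SQL.Sandbox` usage rather than a Flurry API, and shared mode is incompatible with `async: true` tests:

```elixir
# In a test case's setup block: check out a connection and share it
# with all processes, including the batching consumer.
setup do
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Repo)
  Ecto.Adapters.SQL.Sandbox.mode(MyApp.Repo, {:shared, self()})
  :ok
end
```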
The `repo:` option is mandatory. Pass the Ecto repo module, or `:none` for
batchers with no database involvement:

```elixir
use Flurry, repo: MyApp.Repo
use Flurry, repo: :none
```

A related option controls behavior when the generated entry point is called
inside a transaction. With a real repo the default is `:warn`; with
`repo: :none` the default is `:safe`.

- `:warn` -- Logs a warning when `Repo.checked_out?/0` returns true.
- `:safe` -- Suppresses the warning. Use for reads that do not require read-your-writes consistency.
- `:bypass` -- Runs the bulk function inline in the caller's process when inside a transaction, so writes participate in the transaction's commit/rollback semantics. Outside a transaction, batches normally.
Enable global bypass in `test_helper.exs`:

```elixir
ExUnit.start()
Flurry.Testing.enable_bypass_globally()
```

With bypass enabled, every call to a decorated function runs the bulk function inline in the caller's process. The batching pipeline is not exercised in unit tests.

- Calling the bulk function directly (e.g., `get_many/1`) runs in the caller's process and does not coalesce with concurrent singular callers.
- The default additive merge function (`Enum.uniq(list1 ++ list2)`) does not support nested preload trees.
MIT -- see LICENSE.