Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
542 changes: 89 additions & 453 deletions lib/phoenix/tracker.ex

Large diffs are not rendered by default.

497 changes: 497 additions & 0 deletions lib/phoenix/tracker/shard.ex

Large diffs are not rendered by default.

25 changes: 18 additions & 7 deletions lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Phoenix.Tracker.State do
@type context :: %{name => clock}
@type values :: ets_id | :extracted | %{tag => {pid, topic, key, meta}}
@type value :: {{topic, pid, key}, meta, tag}
@type key_meta :: {key, meta}
@type delta :: %State{mode: :delta}
@type pid_lookup :: {pid, topic, key}

Expand Down Expand Up @@ -50,13 +51,13 @@ defmodule Phoenix.Tracker.State do
%Phoenix.Tracker.State{...}

"""
@spec new(name) :: t
def new(replica) do
@spec new(name, atom) :: t
def new(replica, shard_name) do
reset_delta(%State{
replica: replica,
context: %{replica => 0},
mode: :normal,
values: :ets.new(:values, [:ordered_set]),
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
pids: :ets.new(:pids, [:duplicate_bag]),
replicas: %{replica => :up}})
end
Expand Down Expand Up @@ -113,12 +114,22 @@ defmodule Phoenix.Tracker.State do
@doc """
Returns a list of elements for the topic who belong to an online replica.
"""
@spec get_by_topic(t, topic) :: [value]
@spec get_by_topic(t, topic) :: [key_meta]
def get_by_topic(%State{values: values} = state, topic) do
replicas = down_replicas(state)
:ets.select(values, [{ {{topic, :_, :_}, :_, {:"$1", :_}},
not_in(:"$1", replicas), [:"$_"]}])
tracked_values(values, topic, down_replicas(state))
end

@doc """
Performs table lookup for tracked elements in the topic, filtering out
those present on downed replicas.
"""
def tracked_values(table, topic, down_replicas) do
:ets.select(table,
[{{{topic, :_, :"$1"}, :"$2", {:"$3", :_}},
not_in(:"$3", down_replicas),
[{{:"$1", :"$2"}}]}])
end

defp not_in(_pos, []), do: []
defp not_in(pos, replicas), do: [not: ors(pos, replicas)]
defp ors(pos, [rep]), do: {:"==", pos, {rep}}
Expand Down
18 changes: 10 additions & 8 deletions test/phoenix/pubsub/pg2_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule Phoenix.PubSub.PG2Test do
@node1 :"node1@127.0.0.1"
@node2 :"node2@127.0.0.1"

@receive_timeout 500

setup config do
size = config[:pool_size] || 1
if config[:pool_size] do
Expand All @@ -29,15 +31,15 @@ defmodule Phoenix.PubSub.PG2Test do

PubSub.subscribe(config.pubsub, config.topic)
:ok = PubSub.direct_broadcast(@node1, config.pubsub, config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout
:ok = PubSub.direct_broadcast!(@node1, config.pubsub, config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast(@node2, config.pubsub, config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast!(@node2, config.pubsub, config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout
end

@tag pool_size: size, topic: topic
Expand All @@ -46,15 +48,15 @@ defmodule Phoenix.PubSub.PG2Test do

PubSub.subscribe(config.pubsub, config.topic)
:ok = PubSub.direct_broadcast_from(@node1, config.pubsub, self(), config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout
:ok = PubSub.direct_broadcast_from!(@node1, config.pubsub, self(), config.topic, :ping)
assert_receive {@node1, :ping}
assert_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast_from(@node2, config.pubsub, self(), config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout

:ok = PubSub.direct_broadcast_from!(@node2, config.pubsub, self(), config.topic, :ping)
refute_receive {@node1, :ping}
refute_receive {@node1, :ping}, @receive_timeout
end
end

Expand Down
24 changes: 12 additions & 12 deletions test/phoenix/tracker/delta_generation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
|> Enum.sort()
end

defp new(node) do
State.new(node)
defp new(node, config) do
State.new(node, :"#{node} #{config.test}")
end

defp new_pid() do
Expand All @@ -23,9 +23,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
|> Enum.sort()
end

test "generations" do
s1 = new(:s1)
s2 = new(:s2)
test "generations", config do
s1 = new(:s1, config)
s2 = new(:s2, config)
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
assert [gen1, gen1, gen1] = gens = push(s1, [], s1.delta, [2, 5, 6])
assert keys(gen1) == ["user1"]
Expand Down Expand Up @@ -86,9 +86,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
assert sorted_clouds(gen3.clouds) == [{:s1, 3}, {:s1, 4}, {:s2, 1}, {:s2, 2}]
end

test "does not include non-contiguous deltas" do
s1 = new(:s1)
s3 = new(:s3)
test "does not include non-contiguous deltas", config do
s1 = new(:s1, config)
s3 = new(:s3, config)
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
old_s3 = s3 = State.join(s3, new_pid(), "lobby", "user3", %{})
s3 = State.reset_delta(s3)
Expand All @@ -99,10 +99,10 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
assert [^gen1, ^gen1, ^gen1] = push(s1, gens, s3.delta, [5, 10, 15])
end

test "remove_down_replicas" do
s1 = new(:s1)
s2 = new(:s2)
s3 = new(:s3)
test "remove_down_replicas", config do
s1 = new(:s1, config)
s2 = new(:s2, config)
s3 = new(:s3, config)
s2 = State.join(s2, new_pid(), "lobby", "user2", %{})
assert [gen1, gen1, gen1] = gens = push(s1, [], s2.delta, [5, 10, 15])
assert [pruned_gen1, pruned_gen1, pruned_gen1] = DeltaGeneration.remove_down_replicas(gens, :s2)
Expand Down
Loading