Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include a restart parameter #139

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/swarm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ defmodule Swarm do
@spec register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
@spec register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
{:ok, pid} | {:error, term}
def register_name(name, m, f, a, timeout \\ :infinity)
def register_name(name, m, f, a, timeout), do: Swarm.Registry.register(name, m, f, a, timeout)
def register_name(name, m, f, a, timeout \\ :infinity, restart \\ :temporary)
def register_name(name, m, f, a, timeout, restart), do: Swarm.Registry.register(name, m, f, a, timeout, restart)

@doc """
Either finds the named process in the swarm or registers it using the register function.
"""
@spec whereis_or_register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
@spec whereis_or_register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
{:ok, pid} | {:error, term}
def whereis_or_register_name(name, m, f, a, timeout \\ :infinity)
def whereis_or_register_name(name, m, f, a, timeout \\ :infinity, restart \\ :temporary)

def whereis_or_register_name(name, m, f, a, timeout),
do: Swarm.Registry.whereis_or_register(name, m, f, a, timeout)
def whereis_or_register_name(name, m, f, a, timeout, restart),
do: Swarm.Registry.whereis_or_register(name, m, f, a, timeout, restart)

@doc """
Unregisters the given name from the registry.
Expand Down
7 changes: 6 additions & 1 deletion lib/swarm/distribution/ring.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@ defmodule Swarm.Distribution.Ring do
def add_node(ring, node, weight), do: HashRing.add_node(ring, node, weight)
def add_nodes(ring, nodes), do: HashRing.add_nodes(ring, nodes)
def remove_node(ring, node), do: HashRing.remove_node(ring, node)
def key_to_node(ring, key), do: HashRing.key_to_node(ring, key)
def key_to_node(ring, key) do
case HashRing.key_to_node(ring, key) do
{:error, _reason} -> :undefined
node -> node
end
end
end
8 changes: 4 additions & 4 deletions lib/swarm/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Swarm.Registry do
## Public API

defdelegate register(name, pid), to: Tracker, as: :track
defdelegate register(name, module, fun, args, timeout), to: Tracker, as: :track
defdelegate register(name, module, fun, args, timeout, restart), to: Tracker, as: :track

@spec unregister(term) :: :ok
def unregister(name) do
Expand All @@ -31,13 +31,13 @@ defmodule Swarm.Registry do
end

@spec whereis_or_register(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term}
def whereis_or_register(name, m, f, a, timeout \\ :infinity)
def whereis_or_register(name, m, f, a, timeout \\ :infinity, restart \\ :temporary)

@spec whereis_or_register(term, atom(), atom(), [term], non_neg_integer() | :infinity) ::
{:ok, pid} | {:error, term}
def whereis_or_register(name, module, fun, args, timeout) do
def whereis_or_register(name, module, fun, args, timeout, restart) do
with :undefined <- whereis(name),
{:ok, pid} <- register(name, module, fun, args, timeout) do
{:ok, pid} <- register(name, module, fun, args, timeout, restart) do
{:ok, pid}
else
pid when is_pid(pid) ->
Expand Down
51 changes: 44 additions & 7 deletions lib/swarm/tracker/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ defmodule Swarm.Tracker do
the new owner, after initiating the handoff process as described in the documentation.
A track call will return an error tagged tuple, `{:error, :no_node_available}`, if there is no node available to start the process.
Provide a timeout value to limit the track call duration. A value of `:infinity` can be used to block indefinitely.
The optional restart parameter indicates when a handoff should occur. If `:temporary` and a process terminates, it will not be
handed off (only if the node goes down abruptly); if `:transient`, the terminate reason is inspected, and if it's `:normal`,
`:shutdown` or `{:shutdown, term()}` a handoff isn't started, only if the termination reason is something else; and finally if it's
`:permanent`, the reason is just ignored, a handoff is always performed.
"""
def track(name, m, f, a, timeout) when is_atom(m) and is_atom(f) and is_list(a),
do: GenStateMachine.call(__MODULE__, {:track, name, %{mfa: {m, f, a}}}, timeout)
def track(name, m, f, a, timeout, restart) when is_atom(m) and is_atom(f) and is_list(a) and restart in [:permanent, :transient, :temporary],
do: GenStateMachine.call(__MODULE__, {:track, name, %{mfa: {m, f, a}, restart: restart}}, timeout)

@doc """
Stops tracking the given process (pid).
Expand Down Expand Up @@ -1184,7 +1188,7 @@ defmodule Swarm.Tracker do
debug "The node #{worker_name} was not found in the registry"
entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj ->
case Strategy.remove_node(state.strategy, state.self) |> Strategy.key_to_node(name) do
{:error, {:invalid_ring, :no_nodes}} ->
:undefined ->
debug "Cannot handoff #{inspect name} because there is no other node left"
other_node ->
debug "#{inspect name} has requested to be terminated and resumed on another node"
Expand Down Expand Up @@ -1265,18 +1269,51 @@ defmodule Swarm.Tracker do
end

defp handle_monitor(ref, pid, reason, %TrackerState{} = state) do
case Registry.get_by_ref(ref) do
action =
case Registry.get_by_ref(ref) do
:undefined -> :undefined

entry(pid: ^pid, meta: %{mfa: _mfa, restart: restart}) = obj ->
case restart do
:permanent ->
{:handoff, {:restart, :permanent}}

:transient ->
case reason do
:normal -> {:down, obj}
:shutdown -> {:down, obj}
{:shutdown, _reason} -> {:down, obj}
_ -> {:handoff, {:restart, :transient, reason}}
end

:temporary ->
{:down, obj}
end

entry(pid: ^pid) = obj -> {:down, obj}
end

case action do
:undefined ->
debug(
"#{inspect(pid)} is down: #{inspect(reason)}, but no registration found, ignoring.."
)

:keep_state_and_data

entry(name: name, pid: ^pid) = obj ->
{:down, entry(name: name) = obj} ->
debug("#{inspect(name)} is down: #{inspect(reason)}")
{:ok, new_state} = remove_registration(obj, state)
{:keep_state, new_state}

{:handoff, condition} ->
debug(
"handing off #{inspect(pid)} due to #{inspect condition}"
)

state
|> nodedown(node(pid))
|> handle_node_status()
end
end

Expand Down Expand Up @@ -1594,9 +1631,9 @@ defmodule Swarm.Tracker do
entry(name: rname, pid: rpid),
state
) do
GenStateMachine.cast({__MODULE__, remote_node}, {:untrack, rpid})
GenStateMachine.cast({__MODULE__, remote_node}, {:event, self(), state.clock, {:untrack, rpid}})
send(rpid, {:swarm, :die})
GenStateMachine.cast({__MODULE__, remote_node}, {:track, rname, lpid, lmeta})
GenStateMachine.cast({__MODULE__, remote_node}, {:event, self(), state.clock, {:track, rname, lpid, lmeta}})
state
end

Expand Down