diff --git a/lib/swarm.ex b/lib/swarm.ex index dd22f95..47c00d1 100644 --- a/lib/swarm.ex +++ b/lib/swarm.ex @@ -49,8 +49,8 @@ defmodule Swarm do @spec register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term} @spec register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) :: {:ok, pid} | {:error, term} - def register_name(name, m, f, a, timeout \\ :infinity) - def register_name(name, m, f, a, timeout), do: Swarm.Registry.register(name, m, f, a, timeout) + def register_name(name, m, f, a, timeout \\ :infinity, restart \\ :temporary) + def register_name(name, m, f, a, timeout, restart), do: Swarm.Registry.register(name, m, f, a, timeout, restart) @doc """ Either finds the named process in the swarm or registers it using the register function. @@ -58,10 +58,10 @@ defmodule Swarm do @spec whereis_or_register_name(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term} @spec whereis_or_register_name(term, atom(), atom(), [term], non_neg_integer() | :infinity) :: {:ok, pid} | {:error, term} - def whereis_or_register_name(name, m, f, a, timeout \\ :infinity) + def whereis_or_register_name(name, m, f, a, timeout \\ :infinity, restart \\ :temporary) - def whereis_or_register_name(name, m, f, a, timeout), - do: Swarm.Registry.whereis_or_register(name, m, f, a, timeout) + def whereis_or_register_name(name, m, f, a, timeout, restart), + do: Swarm.Registry.whereis_or_register(name, m, f, a, timeout, restart) @doc """ Unregisters the given name from the registry. diff --git a/lib/swarm/distribution/ring.ex b/lib/swarm/distribution/ring.ex index a85583c..c4b92a0 100644 --- a/lib/swarm/distribution/ring.ex +++ b/lib/swarm/distribution/ring.ex @@ -7,5 +7,10 @@ defmodule Swarm.Distribution.Ring do def add_node(ring, node, weight), do: HashRing.add_node(ring, node, weight) def add_nodes(ring, nodes), do: HashRing.add_nodes(ring, nodes) def remove_node(ring, node), do: HashRing.remove_node(ring, node) - def key_to_node(ring, key), do: HashRing.key_to_node(ring, key) + def key_to_node(ring, key) do + case HashRing.key_to_node(ring, key) do + {:error, _reason} -> :undefined + node -> node + end + end end diff --git a/lib/swarm/registry.ex b/lib/swarm/registry.ex index 2a4a98f..9d63b91 100644 --- a/lib/swarm/registry.ex +++ b/lib/swarm/registry.ex @@ -9,7 +9,7 @@ defmodule Swarm.Registry do ## Public API defdelegate register(name, pid), to: Tracker, as: :track - defdelegate register(name, module, fun, args, timeout), to: Tracker, as: :track + defdelegate register(name, module, fun, args, timeout, restart), to: Tracker, as: :track @spec unregister(term) :: :ok def unregister(name) do @@ -31,13 +31,13 @@ defmodule Swarm.Registry do end @spec whereis_or_register(term, atom(), atom(), [term]) :: {:ok, pid} | {:error, term} - def whereis_or_register(name, m, f, a, timeout \\ :infinity) + def whereis_or_register(name, m, f, a, timeout \\ :infinity, restart \\ :temporary) @spec whereis_or_register(term, atom(), atom(), [term], non_neg_integer() | :infinity) :: {:ok, pid} | {:error, term} - def whereis_or_register(name, module, fun, args, timeout) do + def whereis_or_register(name, module, fun, args, timeout, restart) do with :undefined <- whereis(name), - {:ok, pid} <- register(name, module, fun, args, timeout) do + {:ok, pid} <- register(name, module, fun, args, timeout, restart) do {:ok, pid} else pid when is_pid(pid) -> diff --git a/lib/swarm/tracker/tracker.ex b/lib/swarm/tracker/tracker.ex index 920a912..ae107ab 100644 --- a/lib/swarm/tracker/tracker.ex +++ b/lib/swarm/tracker/tracker.ex @@ -83,9 +83,13 @@ defmodule Swarm.Tracker do the new owner, after initiating the handoff process as described in the documentation. A track call will return an error tagged tuple, `{:error, :no_node_available}`, if there is no node available to start the process. Provide a timeout value to limit the track call duration. A value of `:infinity` can be used to block indefinitely. + The optional restart parameter indicates when a handoff should occur. If `:temporary` and a process terminates, it will not be + handed off (only if the node goes down abruptly); if `:transient`, the terminate reason is inspected, and if it's `:normal`, + `:shutdown` or `{:shutdown, term()}` a handoff isn't started, only if the termination reason is something else; and finally if it's + `:permanent`, the reason is just ignored, a handoff is always performed. """ - def track(name, m, f, a, timeout) when is_atom(m) and is_atom(f) and is_list(a), - do: GenStateMachine.call(__MODULE__, {:track, name, %{mfa: {m, f, a}}}, timeout) + def track(name, m, f, a, timeout, restart) when is_atom(m) and is_atom(f) and is_list(a) and restart in [:permanent, :transient, :temporary], + do: GenStateMachine.call(__MODULE__, {:track, name, %{mfa: {m, f, a}, restart: restart}}, timeout) @doc """ Stops tracking the given process (pid). @@ -1184,7 +1188,7 @@ defmodule Swarm.Tracker do debug "The node #{worker_name} was not found in the registry" entry(name: name, pid: pid, meta: %{mfa: _mfa} = meta) = obj -> case Strategy.remove_node(state.strategy, state.self) |> Strategy.key_to_node(name) do - {:error, {:invalid_ring, :no_nodes}} -> + :undefined -> debug "Cannot handoff #{inspect name} because there is no other node left" other_node -> debug "#{inspect name} has requested to be terminated and resumed on another node" @@ -1265,7 +1269,31 @@ defmodule Swarm.Tracker do end defp handle_monitor(ref, pid, reason, %TrackerState{} = state) do - case Registry.get_by_ref(ref) do + action = + case Registry.get_by_ref(ref) do + :undefined -> :undefined + + entry(pid: ^pid, meta: %{mfa: _mfa, restart: restart}) = obj -> + case restart do + :permanent -> + {:handoff, {:restart, :permanent}} + + :transient -> + case reason do + :normal -> {:down, obj} + :shutdown -> {:down, obj} + {:shutdown, _reason} -> {:down, obj} + _ -> {:handoff, {:restart, :transient, reason}} + end + + :temporary -> + {:down, obj} + end + + entry(pid: ^pid) = obj -> {:down, obj} + end + + case action do :undefined -> debug( "#{inspect(pid)} is down: #{inspect(reason)}, but no registration found, ignoring.." @@ -1273,10 +1301,19 @@ defmodule Swarm.Tracker do :keep_state_and_data - entry(name: name, pid: ^pid) = obj -> + {:down, entry(name: name) = obj} -> debug("#{inspect(name)} is down: #{inspect(reason)}") {:ok, new_state} = remove_registration(obj, state) {:keep_state, new_state} + + {:handoff, condition} -> + debug( + "handing off #{inspect(pid)} due to #{inspect condition}" + ) + + state + |> nodedown(node(pid)) + |> handle_node_status() end end @@ -1594,9 +1631,9 @@ defmodule Swarm.Tracker do entry(name: rname, pid: rpid), state ) do - GenStateMachine.cast({__MODULE__, remote_node}, {:untrack, rpid}) + GenStateMachine.cast({__MODULE__, remote_node}, {:event, self(), state.clock, {:untrack, rpid}}) send(rpid, {:swarm, :die}) - GenStateMachine.cast({__MODULE__, remote_node}, {:track, rname, lpid, lmeta}) + GenStateMachine.cast({__MODULE__, remote_node}, {:event, self(), state.clock, {:track, rname, lpid, lmeta}}) state end