|
| 1 | +defmodule Shared.EventStoreListener do |
| 2 | + use GenServer |
| 3 | + require Logger |
| 4 | + alias EventStore.RecordedEvent |
| 5 | + |
| 6 | + defmodule ErrorContext do |
| 7 | + defstruct error_count: 0, max_retries: 3, delay_factor: 10 |
| 8 | + |
| 9 | + @type t :: %__MODULE__{ |
| 10 | + error_count: integer, |
| 11 | + max_retries: integer, |
| 12 | + delay_factor: integer |
| 13 | + } |
| 14 | + |
| 15 | + def new do |
| 16 | + %__MODULE__{error_count: 0, max_retries: 3} |
| 17 | + end |
| 18 | + |
| 19 | + def record_error(%__MODULE__{} = context) do |
| 20 | + Map.update(context, :error_count, 1, fn error_count -> error_count + 1 end) |
| 21 | + end |
| 22 | + |
| 23 | + def retry?(%__MODULE__{error_count: error_count, max_retries: max_retries}) do |
| 24 | + error_count <= max_retries |
| 25 | + end |
| 26 | + |
| 27 | + def retry_count(%__MODULE__{error_count: error_count}) do |
| 28 | + error_count - 1 |
| 29 | + end |
| 30 | + |
| 31 | + def delay(%__MODULE__{ |
| 32 | + error_count: error_count, |
| 33 | + max_retries: max_retries, |
| 34 | + delay_factor: delay_factor |
| 35 | + }) |
| 36 | + when error_count <= max_retries do |
| 37 | + # Exponential backoff |
| 38 | + sleep_duration = (:math.pow(2, error_count) * delay_factor) |> round() |
| 39 | + |
| 40 | + Process.sleep(sleep_duration) |
| 41 | + end |
| 42 | + end |
| 43 | + |
| 44 | + @type domain_event :: struct() |
| 45 | + @type metadata :: map() |
| 46 | + @type error_context :: ErrorContext.t() |
| 47 | + @type state :: map() | list() |
| 48 | + @type handle_result :: :ok | {:error, reason :: any()} |
| 49 | + |
| 50 | + @callback handle(domain_event(), metadata()) :: handle_result() |
| 51 | + @callback handle(domain_event(), metadata(), state()) :: handle_result() |
| 52 | + @callback on_error( |
| 53 | + error :: term(), |
| 54 | + failed_event :: domain_event(), |
| 55 | + metadata :: metadata(), |
| 56 | + error_context :: error_context() |
| 57 | + ) :: |
| 58 | + {:retry, error_context :: error_context()} |
| 59 | + | {:retry, delay :: non_neg_integer(), error_context :: error_context()} |
| 60 | + | :skip |
| 61 | + | {:stop, reason :: term()} |
| 62 | + |
| 63 | + @callback on_error( |
| 64 | + error :: term(), |
| 65 | + stacktrace :: list(), |
| 66 | + failed_event :: domain_event(), |
| 67 | + metadata :: metadata(), |
| 68 | + error_context :: error_context() |
| 69 | + ) :: |
| 70 | + {:retry, error_context :: error_context()} |
| 71 | + | {:retry, delay :: non_neg_integer(), error_context :: error_context()} |
| 72 | + | :skip |
| 73 | + | {:stop, reason :: term()} |
| 74 | + |
| 75 | + defmacro __using__(opts) do |
| 76 | + opts = opts || [] |
| 77 | + |
| 78 | + quote location: :keep do |
| 79 | + @opts unquote(opts) || [] |
| 80 | + @name @opts[:name] || __MODULE__ |
| 81 | + |
| 82 | + @behaviour Shared.EventStoreListener |
| 83 | + |
| 84 | + # Verhindere, dass`@subscription_key` eine Warnung produzieren, falls nicht gesetzt. |
| 85 | + unless Module.get_attribute(__MODULE__, :subscription_key) do |
| 86 | + Module.put_attribute(__MODULE__, :subscription_key, nil) |
| 87 | + end |
| 88 | + |
| 89 | + # Adds default handle method |
| 90 | + @before_compile unquote(__MODULE__) |
| 91 | + |
| 92 | + def start_link(opts \\ []) do |
| 93 | + opts = Keyword.merge(@opts, opts) |
| 94 | + Shared.EventStoreListener.start_link(@name, __MODULE__, opts) |
| 95 | + end |
| 96 | + |
| 97 | + def child_spec(opts) do |
| 98 | + default = %{ |
| 99 | + id: @name, |
| 100 | + start: {__MODULE__, :start_link, [opts]}, |
| 101 | + restart: :permanent, |
| 102 | + type: :worker |
| 103 | + } |
| 104 | + |
| 105 | + Supervisor.child_spec(default, []) |
| 106 | + end |
| 107 | + end |
| 108 | + end |
| 109 | + |
| 110 | + def start_link(name, handler_module, opts) do |
| 111 | + default_opts = %{ |
| 112 | + name: nil, |
| 113 | + handler_module: nil, |
| 114 | + subscription_key: nil, |
| 115 | + subscription: nil, |
| 116 | + start_from: :origin, |
| 117 | + event_store: nil |
| 118 | + } |
| 119 | + |
| 120 | + opts = Enum.into(opts, default_opts) |
| 121 | + opts[:event_store] || raise "Event Store(event_store: My.EventStore) configuration is missing" |
| 122 | + |
| 123 | + state = %{opts | handler_module: handler_module, name: name} |
| 124 | + |
| 125 | + GenServer.start_link(__MODULE__, state, name: name) |
| 126 | + end |
| 127 | + |
| 128 | + defmacro __before_compile__(_env) do |
| 129 | + quote generated: true do |
| 130 | + def init(state) do |
| 131 | + state = |
| 132 | + case @subscription_key do |
| 133 | + subscription_key when is_binary(subscription_key) and subscription_key != "" -> |
| 134 | + %{state | subscription_key: subscription_key} |
| 135 | + |
| 136 | + _ -> |
| 137 | + state |
| 138 | + end |
| 139 | + |
| 140 | + {:ok, state} |
| 141 | + end |
| 142 | + |
| 143 | + defoverridable init: 1 |
| 144 | + |
| 145 | + def handle(_event, _metadata), do: :ok |
| 146 | + defoverridable handle: 2 |
| 147 | + |
| 148 | + def handle(event, metadata, _state), do: handle(event, metadata) |
| 149 | + defoverridable handle: 3 |
| 150 | + |
| 151 | + def on_error({:error, reason}, _event, _metadata, error_context), |
| 152 | + do: {:retry, error_context} |
| 153 | + |
| 154 | + defoverridable on_error: 4 |
| 155 | + |
| 156 | + def on_error(error, _stacktrace, event, metadata, error_context), |
| 157 | + do: on_error(error, event, metadata, error_context) |
| 158 | + |
| 159 | + defoverridable on_error: 5 |
| 160 | + end |
| 161 | + end |
| 162 | + |
| 163 | + @impl true |
| 164 | + def init( |
| 165 | + %{name: handler_name, handler_module: handler_module, event_store: event_store} = state |
| 166 | + ) do |
| 167 | + with {:ok, new_state} <- handler_module.init(state), |
| 168 | + subscription_key = new_state[:subscription_key] || subscription_key_for(handler_name), |
| 169 | + start_from = new_state[:start_from] || :origin, |
| 170 | + {:ok, subscription} <- |
| 171 | + event_store.subscribe_to_all_streams( |
| 172 | + subscription_key, |
| 173 | + self(), |
| 174 | + start_from: start_from |
| 175 | + ) do |
| 176 | + {:ok, %{new_state | subscription: subscription}} |
| 177 | + end |
| 178 | + end |
| 179 | + |
| 180 | + @impl true |
| 181 | + def handle_info({:subscribed, _subscription}, %{name: name} = state) do |
| 182 | + Logger.debug(fn -> |
| 183 | + "#{name} sucessfully subscribed to event store." |
| 184 | + end) |
| 185 | + |
| 186 | + {:noreply, state} |
| 187 | + end |
| 188 | + |
| 189 | + @impl true |
| 190 | + def handle_info({:events, events}, %{name: name} = state) do |
| 191 | + Logger.debug(fn -> "#{name} received events: #{inspect(events)}" end) |
| 192 | + |
| 193 | + try do |
| 194 | + Enum.each(events, fn event -> handle_event(event, state, ErrorContext.new()) end) |
| 195 | + {:noreply, state} |
| 196 | + catch |
| 197 | + {:error, reason} -> |
| 198 | + {:stop, reason, state} |
| 199 | + end |
| 200 | + end |
| 201 | + |
| 202 | + @impl true |
| 203 | + def handle_call(:get_state, _from, state) do |
| 204 | + {:reply, state, state} |
| 205 | + end |
| 206 | + |
| 207 | + defp handle_event( |
| 208 | + %RecordedEvent{} = event, |
| 209 | + %{name: name} = state, |
| 210 | + %ErrorContext{} = error_context |
| 211 | + ) do |
| 212 | + case delegate_event_to_handler(event, state) do |
| 213 | + :ok -> |
| 214 | + ack_event(event, state) |
| 215 | + |
| 216 | + {:ok, _} -> |
| 217 | + ack_event(event, state) |
| 218 | + |
| 219 | + {:error, reason} -> |
| 220 | + Logger.error(fn -> |
| 221 | + "#{name} failed to handle event #{inspect(event)} due to #{inspect(reason)}" |
| 222 | + end) |
| 223 | + |
| 224 | + handle_error({:error, reason}, current_stacktrace(), event, state, error_context) |
| 225 | + |
| 226 | + {:error, reason, stacktrace} -> |
| 227 | + Logger.error(fn -> |
| 228 | + "#{name} failed to handle event #{inspect(event)} due to #{inspect(reason)}" |
| 229 | + end) |
| 230 | + |
| 231 | + handle_error({:error, reason}, stacktrace, event, state, error_context) |
| 232 | + end |
| 233 | + end |
| 234 | + |
| 235 | + defp delegate_event_to_handler( |
| 236 | + %RecordedEvent{} = event, |
| 237 | + %{ |
| 238 | + handler_module: handler_module |
| 239 | + } = state |
| 240 | + ) do |
| 241 | + try do |
| 242 | + {domain_event, metadata} = Shared.EventStoreEvent.unwrap(event) |
| 243 | + handler_module.handle(domain_event, metadata, state) |
| 244 | + rescue |
| 245 | + error -> |
| 246 | + {:error, error, __STACKTRACE__} |
| 247 | + end |
| 248 | + end |
| 249 | + |
| 250 | + defp handle_error( |
| 251 | + error, |
| 252 | + stacktrace, |
| 253 | + event, |
| 254 | + %{handler_module: handler_module, name: name} = state, |
| 255 | + context |
| 256 | + ) do |
| 257 | + %RecordedEvent{data: domain_event, metadata: metadata} = event |
| 258 | + |
| 259 | + case handler_module.on_error(error, stacktrace, domain_event, metadata, context) do |
| 260 | + {:retry, %ErrorContext{} = context} -> |
| 261 | + context = ErrorContext.record_error(context) |
| 262 | + |
| 263 | + if ErrorContext.retry?(context) do |
| 264 | + ErrorContext.delay(context) |
| 265 | + |
| 266 | + Logger.warn(fn -> |
| 267 | + "#{name} is retrying (#{context.error_count}/#{context.max_retries}) failed event #{ |
| 268 | + inspect(event) |
| 269 | + }" |
| 270 | + end) |
| 271 | + |
| 272 | + handle_event(event, state, context) |
| 273 | + else |
| 274 | + reason = |
| 275 | + "#{name} is dying due to bad event after #{ErrorContext.retry_count(context)} retries #{ |
| 276 | + inspect(error) |
| 277 | + }, Stacktrace: #{inspect(stacktrace)}" |
| 278 | + |
| 279 | + Logger.warn(reason) |
| 280 | + |
| 281 | + throw({:error, reason}) |
| 282 | + end |
| 283 | + |
| 284 | + :skip -> |
| 285 | + Logger.debug(fn -> |
| 286 | + "#{name} is skipping event #{inspect(event)}" |
| 287 | + end) |
| 288 | + |
| 289 | + ack_event(event, state) |
| 290 | + |
| 291 | + {:stop, reason} -> |
| 292 | + reason = "#{name} has requested to stop in on_error/5 callback with #{inspect(reason)}" |
| 293 | + |
| 294 | + Logger.warn(reason) |
| 295 | + throw({:error, reason}) |
| 296 | + |
| 297 | + error -> |
| 298 | + Logger.warn(fn -> |
| 299 | + "#{name} on_error/5 returned an invalid response #{inspect(error)}" |
| 300 | + end) |
| 301 | + |
| 302 | + throw(error) |
| 303 | + end |
| 304 | + end |
| 305 | + |
| 306 | + defp current_stacktrace do |
| 307 | + case Process.info(self(), :current_stacktrace) do |
| 308 | + {:current_stacktrace, stacktrace} -> stacktrace |
| 309 | + nil -> "Process is not alive. No stacktrace available" |
| 310 | + end |
| 311 | + end |
| 312 | + |
| 313 | + defp ack_event(event, %{subscription: subscription, event_store: event_store}) do |
| 314 | + :ok = event_store.ack(subscription, event) |
| 315 | + end |
| 316 | + |
| 317 | + @deprecated """ |
| 318 | + Set `subscription_key` on initialization. Otherwise a change of the file name would break the subscription and all the events get processed again. |
| 319 | + """ |
| 320 | + defp subscription_key_for(handler) do |
| 321 | + subscription_key = |
| 322 | + handler |
| 323 | + |> Atom.to_string() |
| 324 | + |> String.split(".") |
| 325 | + |> Enum.at(-1) |
| 326 | + |> Macro.underscore() |
| 327 | + |> Kernel.<>("_event_listener") |
| 328 | + |
| 329 | + Logger.warn( |
| 330 | + "Please specify a `subscription_key` on initialization for `#{handler}`. Otherwise a change of the file name would break the subscription and all the events get processed again. Default was: \"#{ |
| 331 | + subscription_key |
| 332 | + }\"" |
| 333 | + ) |
| 334 | + |
| 335 | + subscription_key |
| 336 | + end |
| 337 | +end |
0 commit comments