From e3d64c0b8eec93006248e26a22228409336730a4 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Tue, 11 Feb 2025 09:34:05 -0300 Subject: [PATCH] feat: automatically reject message on failures --- docker-compose.yml | 2 ++ lib/lepus.rb | 33 ++-------------------------- lib/lepus/configuration.rb | 13 ++++++----- lib/lepus/consumer.rb | 20 ++++++++++++++--- lib/lepus/consumer_config.rb | 12 +++++----- lib/lepus/consumer_wrapper.rb | 2 ++ lib/lepus/process.rb | 3 --- lib/lepus/processes/consumer.rb | 17 ++++++++------ lib/lepus/processes/interruptible.rb | 2 -- lib/lepus/processes/procline.rb | 4 ++-- lib/lepus/processes/registrable.rb | 2 +- lib/lepus/processes/runnable.rb | 21 +++++++++--------- lib/lepus/supervisor.rb | 14 +++--------- lib/lepus/supervisor/config.rb | 4 ++-- lib/lepus/supervisor/maintenance.rb | 9 -------- spec/lepus/consumer_spec.rb | 22 +++++++++++++++++++ 16 files changed, 88 insertions(+), 92 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d686ef3..5a7aa79 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,3 +4,5 @@ services: ports: - 5672:5672 - 15672:15672 + command: > + sh -c "rabbitmq-plugins enable rabbitmq_management && rabbitmq-server" diff --git a/lib/lepus.rb b/lib/lepus.rb index 3e48d33..749deee 100644 --- a/lib/lepus.rb +++ b/lib/lepus.rb @@ -31,10 +31,6 @@ module Lepus class Error < StandardError end - # Error that is raised when the Bunny recovery attempts are exhausted. - class MaxRecoveryAttemptsExhaustedError < Error - end - # Error that is raised when an invalid value is returned from {#work} class InvalidConsumerReturnError < Error def initialize(value) @@ -50,33 +46,8 @@ class InvalidConsumerConfigError < Error class ShutdownError < Error end - module Processes - class ProcessMissingError < RuntimeError - def initialize - super("The process that was running this job no longer exists") - end - end - - class ProcessExitError < RuntimeError - def initialize(status) - message = "Process pid=#{status.pid} exited unexpectedly." - if status.exitstatus - message += " Exited with status #{status.exitstatus}." - end - - if status.signaled? - message += " Received unhandled signal #{status.termsig}." - end - - super(message) - end - end - - class ProcessPrunedError < RuntimeError - def initialize(last_heartbeat_at) - super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}") - end - end + # Error that is raised when the Bunny recovery attempts are exhausted. + class MaxRecoveryAttemptsExhaustedError < ShutdownError end extend self diff --git a/lib/lepus/configuration.rb b/lib/lepus/configuration.rb index 91ef20e..f2701d1 100644 --- a/lib/lepus/configuration.rb +++ b/lib/lepus/configuration.rb @@ -5,6 +5,7 @@ module Lepus class Configuration DEFAULT_RABBITMQ_URL = "amqp://guest:guest@localhost:5672" DEFAULT_RECOVERY_ATTEMPTS = 10 + DEFAULT_RECOVERY_INTERVAL = 5.0 DEFAULT_RECOVER_FROM_CONNECTION_CLOSE = true DEFAULT_CONSUMERS_DIRECTORY = Pathname.new("app/consumers") @@ -20,6 +21,9 @@ class Configuration # @return [Integer] max number of recovery attempts, nil means forever attr_accessor :recovery_attempts + # @return [Integer] the interval in seconds between network recovery attempts. + attr_accessor :recovery_interval + # @return [Pathname] the directory where the consumers are stored. attr_reader :consumers_directory @@ -40,6 +44,7 @@ def initialize @connection_name = "Lepus (#{Lepus::VERSION})" @rabbitmq_url = ENV.fetch("RABBITMQ_URL", DEFAULT_RABBITMQ_URL) || DEFAULT_RABBITMQ_URL @recovery_attempts = DEFAULT_RECOVERY_ATTEMPTS + @recovery_interval = DEFAULT_RECOVERY_INTERVAL @recover_from_connection_close = DEFAULT_RECOVER_FROM_CONNECTION_CLOSE @consumers_directory = DEFAULT_CONSUMERS_DIRECTORY @process_heartbeat_interval = 60 @@ -67,18 +72,16 @@ def connection_config connection_name: connection_name, recover_from_connection_close: recover_from_connection_close, recovery_attempts: recovery_attempts, + network_recovery_interval: recovery_interval, recovery_attempts_exhausted: recovery_attempts_exhausted }.compact end - # @return [Proc] that is passed to Bunny’s recovery_attempts_exhausted block. Nil if recovery_attempts is nil. + # @return [Proc, NilClass] Proc that is passed to Bunny’s recovery_attempts_exhausted block. def recovery_attempts_exhausted - return nil unless recovery_attempts + return unless recovery_attempts proc do - # We need to have this since Bunny’s multi-threading is cumbersome here. - # Session reconnection seems not to be done in the main thread. If we want to - # achieve a restart of the app we need to modify the thread behaviour. Thread.current.abort_on_exception = true raise Lepus::MaxRecoveryAttemptsExhaustedError end diff --git a/lib/lepus/consumer.rb b/lib/lepus/consumer.rb index 4aa2d7d..8677481 100644 --- a/lib/lepus/consumer.rb +++ b/lib/lepus/consumer.rb @@ -25,7 +25,7 @@ def config return if @abstract_class == true return @config if defined?(@config) - name = Primitive::String.new(self).underscore.split("/").last + name = Primitive::String.new(to_s).underscore.split("/").last @config = ConsumerConfig.new(queue: name, exchange: name) end @@ -43,13 +43,14 @@ def use(middleware, opts = {}) if middleware.is_a?(Symbol) || middleware.is_a?(String) begin require_relative "middlewares/#{middleware}" - class_name = Primitive::String.new(middleware).classify + class_name = Primitive::String.new(middleware.to_s).classify class_name = "JSON" if class_name == "Json" middleware = Lepus::Middlewares.const_get(class_name) rescue LoadError, NameError raise ArgumentError, "Middleware #{middleware} not found" end end + middlewares << middleware.new(**opts) end @@ -103,6 +104,11 @@ def process_delivery(delivery_info, metadata, payload) nest_middleware(middleware, next_middleware) end .call(message) + rescue Exception => ex + # @TODO: add error handling + logger.error(ex) + + reject! end protected @@ -136,6 +142,14 @@ def requeue! end alias_method :requeue, :requeue! + # Helper method to nack a message. + # + # @return [:nack] + def nack! + :nack + end + alias_method :nack, :nack! + private def work_proc @@ -153,7 +167,7 @@ def nest_middleware(middleware, next_middleware) end def verify_result(result) - return if %i[ack reject requeue].include?(result) + return if %i[ack reject requeue nack].include?(result) raise InvalidConsumerReturnError, result end diff --git a/lib/lepus/consumer_config.rb b/lib/lepus/consumer_config.rb index d16c44b..500e6d8 100644 --- a/lib/lepus/consumer_config.rb +++ b/lib/lepus/consumer_config.rb @@ -60,9 +60,7 @@ def channel_args end def exchange_args - name = @exchange_opts[:name] - raise InvalidConsumerConfigError, "Exchange name is required" if name.nil? - [name, @exchange_opts.reject { |k, v| k == :name }] + [exchange_name, @exchange_opts.reject { |k, v| k == :name }] end def consumer_queue_args @@ -113,10 +111,12 @@ def binds_args protected + def exchange_name + @exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required") + end + def queue_name - name = @queue_opts[:name] - raise InvalidConsumerConfigError, "Queue name is required" if name.nil? - name + @queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required") end def retry_queue_name diff --git a/lib/lepus/consumer_wrapper.rb b/lib/lepus/consumer_wrapper.rb index 448ec77..102054e 100644 --- a/lib/lepus/consumer_wrapper.rb +++ b/lib/lepus/consumer_wrapper.rb @@ -36,6 +36,8 @@ def process_result(result, delivery_tag) channel.reject(delivery_tag) when :requeue channel.reject(delivery_tag, true) + when :nack + channel.nack(delivery_tag, false, true) else raise Lepus::InvalidConsumerReturnError, result end diff --git a/lib/lepus/process.rb b/lib/lepus/process.rb index ed03e4d..b70e0cc 100644 --- a/lib/lepus/process.rb +++ b/lib/lepus/process.rb @@ -115,9 +115,6 @@ def deregister(pruned: false) end def prune - # error = Lepus::Processes::ProcessPrunedError.new(last_heartbeat_at) - # fail_all_claimed_executions_with(error) - deregister(pruned: true) end diff --git a/lib/lepus/processes/consumer.rb b/lib/lepus/processes/consumer.rb index f803bdf..728ff92 100644 --- a/lib/lepus/processes/consumer.rb +++ b/lib/lepus/processes/consumer.rb @@ -52,7 +52,7 @@ def run end def shutdown - @consumers.to_a.each(&:cancel) + @subscriptions.to_a.each(&:cancel) @channel&.close @bunny&.close @@ -71,8 +71,10 @@ def setup_consumer! @bunny = Thread.current[:lepus_bunny] || Lepus.config.create_connection @channel = Thread.current[:lepus_channel] || begin @bunny.create_channel(nil, 1, true).tap do |channel| - channel.prefetch(1) - channel.on_uncaught_exception { |error| handle_thread_error(error) } + channel.prefetch(1) # @TODO make this configurable + channel.on_uncaught_exception { |error| + handle_thread_error(error) + } end end @@ -84,7 +86,7 @@ def setup_consumer! @error_queue = @channel.queue(*args) end - @consumers = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future, need to test how this works + @subscriptions = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future main_queue = @channel.queue(*consumer_class.config.consumer_queue_args) consumer_class.config.binds_args.each do |opts| main_queue.bind(@exchange, **opts) @@ -102,9 +104,10 @@ def setup_consumer! end main_queue.subscribe_with(consumer_wrapper) end - # rescue Lepus::InvalidConsumerConfigError => e - # # shutdown if the consumer config is invalid - # raise e + rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e + raise Lepus::ShutdownError + rescue Lepus::InvalidConsumerConfigError => e + raise Lepus::ShutdownError end end end diff --git a/lib/lepus/processes/interruptible.rb b/lib/lepus/processes/interruptible.rb index f96fb08..9e634fc 100644 --- a/lib/lepus/processes/interruptible.rb +++ b/lib/lepus/processes/interruptible.rb @@ -3,7 +3,6 @@ module Lepus::Processes module Interruptible def wake_up - puts "[Interruptible |> #{self.class.name}] Waking up process..." interrupt end @@ -12,7 +11,6 @@ def wake_up SELF_PIPE_BLOCK_SIZE = 11 def interrupt - puts "[Interruptible |> #{self.class.name}] Interrupting process..." self_pipe[:writer].write_nonblock(".") rescue Errno::EAGAIN, Errno::EINTR # Ignore writes that would block and retry diff --git a/lib/lepus/processes/procline.rb b/lib/lepus/processes/procline.rb index baa4a4f..f62034f 100644 --- a/lib/lepus/processes/procline.rb +++ b/lib/lepus/processes/procline.rb @@ -3,9 +3,9 @@ module Lepus::Processes module Procline # Sets the procline ($0) - # lepus-supervisor(0.1.0): + # [lepus-supervisor: ] def procline(string) - $0 = "lepus-#{self.class.name.split("::").last.downcase}: #{string}" + $0 = "[lepus-#{kind.downcase}: #{string}]" end end end diff --git a/lib/lepus/processes/registrable.rb b/lib/lepus/processes/registrable.rb index ea32ffb..0efcf9a 100644 --- a/lib/lepus/processes/registrable.rb +++ b/lib/lepus/processes/registrable.rb @@ -45,7 +45,7 @@ def launch_heartbeat wrap_in_app_executor { heartbeat } end - @heartbeat_task.add_observer do |_, _, error| + @heartbeat_task.add_observer do |_time, _result, error| handle_thread_error(error) if error end diff --git a/lib/lepus/processes/runnable.rb b/lib/lepus/processes/runnable.rb index b53e65b..30d174d 100644 --- a/lib/lepus/processes/runnable.rb +++ b/lib/lepus/processes/runnable.rb @@ -55,20 +55,21 @@ def boot end def shutting_down? - stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered? + stopped? || (running_as_fork? && supervisor_went_away?) || !registered? # || finished? end def run raise NotImplementedError end - def finished? - running_inline? && all_work_completed? - end + # @TODO Add it to the inline mode + # def finished? + # running_inline? && all_work_completed? + # end - def all_work_completed? - false - end + # def all_work_completed? + # false + # end def shutdown end @@ -76,9 +77,9 @@ def shutdown def set_procline end - def running_inline? - mode.inline? - end + # def running_inline? + # mode.inline? + # end def running_async? mode.async? diff --git a/lib/lepus/supervisor.rb b/lib/lepus/supervisor.rb index 0d3f1d0..37b526b 100644 --- a/lib/lepus/supervisor.rb +++ b/lib/lepus/supervisor.rb @@ -180,9 +180,9 @@ def reap_terminated_forks pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) break unless pid - if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end + # if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) + # handle_claimed_jobs_by(terminated_fork, status) + # end configured_processes.delete(pid) end @@ -194,20 +194,12 @@ def replace_fork(pid, status) Lepus.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| if (terminated_fork = forks.delete(pid)) payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) start_process(configured_processes.delete(pid)) end end end - def handle_claimed_jobs_by(terminated_fork, status) - # if registered_process = process.supervisees.find_by(name: terminated_fork.name) - # error = Processes::ProcessExitError.new(status) - # registered_process.fail_all_claimed_executions_with(error) - # end - end - def all_forks_terminated? forks.empty? end diff --git a/lib/lepus/supervisor/config.rb b/lib/lepus/supervisor/config.rb index c6b36e5..1c7e417 100644 --- a/lib/lepus/supervisor/config.rb +++ b/lib/lepus/supervisor/config.rb @@ -6,7 +6,7 @@ module Lepus class Supervisor < Processes::Base class Config - class Process < Struct.new(:process_class, :attributes) + class ProcessStruct < Struct.new(:process_class, :attributes) def instantiate process_class.new(**attributes) end @@ -37,7 +37,7 @@ def consumers def consumer_processes @consumer_processes ||= consumers.map do |class_name| - Process.new(Lepus::Processes::Consumer, {class_name: class_name}) + ProcessStruct.new(Lepus::Processes::Consumer, {class_name: class_name}) end end end diff --git a/lib/lepus/supervisor/maintenance.rb b/lib/lepus/supervisor/maintenance.rb index 6ea3540..b5ed1d9 100644 --- a/lib/lepus/supervisor/maintenance.rb +++ b/lib/lepus/supervisor/maintenance.rb @@ -3,9 +3,6 @@ class Supervisor < Processes::Base module Maintenance def self.included(base) base.send :include, InstanceMethods - base.class_eval do - after_boot :fail_orphaned_executions - end end module InstanceMethods @@ -32,12 +29,6 @@ def prune_dead_processes Lepus::Process.prune(excluding: process) end end - - def fail_orphaned_executions - wrap_in_app_executor do - # ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) - end - end end end end diff --git a/spec/lepus/consumer_spec.rb b/spec/lepus/consumer_spec.rb index 5c7a8c3..3495c4b 100644 --- a/spec/lepus/consumer_spec.rb +++ b/spec/lepus/consumer_spec.rb @@ -116,6 +116,14 @@ ).to be :requeue end + it "allows returning :nack" do + allow(instance).to receive(:perform).and_return(:nack) + + expect( + instance.process_delivery(delivery_info, metadata, payload) + ).to be :nack + end + it "raises an error if #perform does not return a valid symbol" do allow(instance).to receive(:perform).and_return(:blorg) @@ -170,6 +178,20 @@ def perform(_message) end end + describe "#nack" do + let(:instance) do + Class.new(Lepus::Consumer) do + def perform(_message) + nack + end + end.new + end + + it "returns :nack when called in #perform" do + expect(instance.perform(message)).to eq(:nack) + end + end + describe ".use" do let(:instance) do Class.new(Lepus::Consumer) do