Skip to content

Commit

Permalink
feat: automatically reject message on failures
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Feb 11, 2025
1 parent f8e5d85 commit e3d64c0
Show file tree
Hide file tree
Showing 16 changed files with 88 additions and 92 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ services:
ports:
- 5672:5672
- 15672:15672
command: >
sh -c "rabbitmq-plugins enable rabbitmq_management && rabbitmq-server"
33 changes: 2 additions & 31 deletions lib/lepus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ module Lepus
class Error < StandardError
end

# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < Error
end

# Error that is raised when an invalid value is returned from {#work}
class InvalidConsumerReturnError < Error
def initialize(value)
Expand All @@ -50,33 +46,8 @@ class InvalidConsumerConfigError < Error
class ShutdownError < Error
end

module Processes
class ProcessMissingError < RuntimeError
def initialize
super("The process that was running this job no longer exists")
end
end

class ProcessExitError < RuntimeError
def initialize(status)
message = "Process pid=#{status.pid} exited unexpectedly."
if status.exitstatus
message += " Exited with status #{status.exitstatus}."
end

if status.signaled?
message += " Received unhandled signal #{status.termsig}."
end

super(message)
end
end

class ProcessPrunedError < RuntimeError
def initialize(last_heartbeat_at)
super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}")
end
end
# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < ShutdownError
end

extend self
Expand Down
13 changes: 8 additions & 5 deletions lib/lepus/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Lepus
class Configuration
DEFAULT_RABBITMQ_URL = "amqp://guest:guest@localhost:5672"
DEFAULT_RECOVERY_ATTEMPTS = 10
DEFAULT_RECOVERY_INTERVAL = 5.0
DEFAULT_RECOVER_FROM_CONNECTION_CLOSE = true
DEFAULT_CONSUMERS_DIRECTORY = Pathname.new("app/consumers")

Expand All @@ -20,6 +21,9 @@ class Configuration
# @return [Integer] max number of recovery attempts, nil means forever
attr_accessor :recovery_attempts

# @return [Integer] the interval in seconds between network recovery attempts.
attr_accessor :recovery_interval

# @return [Pathname] the directory where the consumers are stored.
attr_reader :consumers_directory

Expand All @@ -40,6 +44,7 @@ def initialize
@connection_name = "Lepus (#{Lepus::VERSION})"
@rabbitmq_url = ENV.fetch("RABBITMQ_URL", DEFAULT_RABBITMQ_URL) || DEFAULT_RABBITMQ_URL
@recovery_attempts = DEFAULT_RECOVERY_ATTEMPTS
@recovery_interval = DEFAULT_RECOVERY_INTERVAL
@recover_from_connection_close = DEFAULT_RECOVER_FROM_CONNECTION_CLOSE
@consumers_directory = DEFAULT_CONSUMERS_DIRECTORY
@process_heartbeat_interval = 60
Expand Down Expand Up @@ -67,18 +72,16 @@ def connection_config
connection_name: connection_name,
recover_from_connection_close: recover_from_connection_close,
recovery_attempts: recovery_attempts,
network_recovery_interval: recovery_interval,
recovery_attempts_exhausted: recovery_attempts_exhausted
}.compact
end

# @return [Proc] that is passed to Bunny’s recovery_attempts_exhausted block. Nil if recovery_attempts is nil.
# @return [Proc, NilClass] Proc that is passed to Bunny’s recovery_attempts_exhausted block.
def recovery_attempts_exhausted
return nil unless recovery_attempts
return unless recovery_attempts

proc do
# We need to have this since Bunny’s multi-threading is cumbersome here.
# Session reconnection seems not to be done in the main thread. If we want to
# achieve a restart of the app we need to modify the thread behaviour.
Thread.current.abort_on_exception = true
raise Lepus::MaxRecoveryAttemptsExhaustedError
end
Expand Down
20 changes: 17 additions & 3 deletions lib/lepus/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def config
return if @abstract_class == true
return @config if defined?(@config)

name = Primitive::String.new(self).underscore.split("/").last
name = Primitive::String.new(to_s).underscore.split("/").last
@config = ConsumerConfig.new(queue: name, exchange: name)
end

Expand All @@ -43,13 +43,14 @@ def use(middleware, opts = {})
if middleware.is_a?(Symbol) || middleware.is_a?(String)
begin
require_relative "middlewares/#{middleware}"
class_name = Primitive::String.new(middleware).classify
class_name = Primitive::String.new(middleware.to_s).classify
class_name = "JSON" if class_name == "Json"
middleware = Lepus::Middlewares.const_get(class_name)
rescue LoadError, NameError
raise ArgumentError, "Middleware #{middleware} not found"
end
end

middlewares << middleware.new(**opts)
end

Expand Down Expand Up @@ -103,6 +104,11 @@ def process_delivery(delivery_info, metadata, payload)
nest_middleware(middleware, next_middleware)
end
.call(message)
rescue Exception => ex
# @TODO: add error handling
logger.error(ex)

reject!
end

protected
Expand Down Expand Up @@ -136,6 +142,14 @@ def requeue!
end
alias_method :requeue, :requeue!

# Helper method to nack a message.
#
# @return [:nack]
def nack!
:nack
end
alias_method :nack, :nack!

private

def work_proc
Expand All @@ -153,7 +167,7 @@ def nest_middleware(middleware, next_middleware)
end

def verify_result(result)
return if %i[ack reject requeue].include?(result)
return if %i[ack reject requeue nack].include?(result)

raise InvalidConsumerReturnError, result
end
Expand Down
12 changes: 6 additions & 6 deletions lib/lepus/consumer_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def channel_args
end

def exchange_args
name = @exchange_opts[:name]
raise InvalidConsumerConfigError, "Exchange name is required" if name.nil?
[name, @exchange_opts.reject { |k, v| k == :name }]
[exchange_name, @exchange_opts.reject { |k, v| k == :name }]
end

def consumer_queue_args
Expand Down Expand Up @@ -113,10 +111,12 @@ def binds_args

protected

def exchange_name
@exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required")
end

def queue_name
name = @queue_opts[:name]
raise InvalidConsumerConfigError, "Queue name is required" if name.nil?
name
@queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required")
end

def retry_queue_name
Expand Down
2 changes: 2 additions & 0 deletions lib/lepus/consumer_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def process_result(result, delivery_tag)
channel.reject(delivery_tag)
when :requeue
channel.reject(delivery_tag, true)
when :nack
channel.nack(delivery_tag, false, true)
else
raise Lepus::InvalidConsumerReturnError, result
end
Expand Down
3 changes: 0 additions & 3 deletions lib/lepus/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def deregister(pruned: false)
end

def prune
# error = Lepus::Processes::ProcessPrunedError.new(last_heartbeat_at)
# fail_all_claimed_executions_with(error)

deregister(pruned: true)
end

Expand Down
17 changes: 10 additions & 7 deletions lib/lepus/processes/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run
end

def shutdown
@consumers.to_a.each(&:cancel)
@subscriptions.to_a.each(&:cancel)
@channel&.close
@bunny&.close

Expand All @@ -71,8 +71,10 @@ def setup_consumer!
@bunny = Thread.current[:lepus_bunny] || Lepus.config.create_connection
@channel = Thread.current[:lepus_channel] || begin
@bunny.create_channel(nil, 1, true).tap do |channel|
channel.prefetch(1)
channel.on_uncaught_exception { |error| handle_thread_error(error) }
channel.prefetch(1) # @TODO make this configurable
channel.on_uncaught_exception { |error|
handle_thread_error(error)
}
end
end

Expand All @@ -84,7 +86,7 @@ def setup_consumer!
@error_queue = @channel.queue(*args)
end

@consumers = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future, need to test how this works
@subscriptions = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future
main_queue = @channel.queue(*consumer_class.config.consumer_queue_args)
consumer_class.config.binds_args.each do |opts|
main_queue.bind(@exchange, **opts)
Expand All @@ -102,9 +104,10 @@ def setup_consumer!
end
main_queue.subscribe_with(consumer_wrapper)
end
# rescue Lepus::InvalidConsumerConfigError => e
# # shutdown if the consumer config is invalid
# raise e
rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError => e
raise Lepus::ShutdownError
rescue Lepus::InvalidConsumerConfigError => e
raise Lepus::ShutdownError
end
end
end
2 changes: 0 additions & 2 deletions lib/lepus/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
module Lepus::Processes
module Interruptible
def wake_up
puts "[Interruptible |> #{self.class.name}] Waking up process..."
interrupt
end

Expand All @@ -12,7 +11,6 @@ def wake_up
SELF_PIPE_BLOCK_SIZE = 11

def interrupt
puts "[Interruptible |> #{self.class.name}] Interrupting process..."
self_pipe[:writer].write_nonblock(".")
rescue Errno::EAGAIN, Errno::EINTR
# Ignore writes that would block and retry
Expand Down
4 changes: 2 additions & 2 deletions lib/lepus/processes/procline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Lepus::Processes
module Procline
# Sets the procline ($0)
# lepus-supervisor(0.1.0): <string>
# [lepus-supervisor: <string>]
def procline(string)
$0 = "lepus-#{self.class.name.split("::").last.downcase}: #{string}"
$0 = "[lepus-#{kind.downcase}: #{string}]"
end
end
end
2 changes: 1 addition & 1 deletion lib/lepus/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def launch_heartbeat
wrap_in_app_executor { heartbeat }
end

@heartbeat_task.add_observer do |_, _, error|
@heartbeat_task.add_observer do |_time, _result, error|
handle_thread_error(error) if error
end

Expand Down
21 changes: 11 additions & 10 deletions lib/lepus/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,31 @@ def boot
end

def shutting_down?
stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered?
stopped? || (running_as_fork? && supervisor_went_away?) || !registered? # || finished?
end

def run
raise NotImplementedError
end

def finished?
running_inline? && all_work_completed?
end
# @TODO Add it to the inline mode
# def finished?
# running_inline? && all_work_completed?
# end

def all_work_completed?
false
end
# def all_work_completed?
# false
# end

def shutdown
end

def set_procline
end

def running_inline?
mode.inline?
end
# def running_inline?
# mode.inline?
# end

def running_async?
mode.async?
Expand Down
14 changes: 3 additions & 11 deletions lib/lepus/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def reap_terminated_forks
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
handle_claimed_jobs_by(terminated_fork, status)
end
# if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
# handle_claimed_jobs_by(terminated_fork, status)
# end

configured_processes.delete(pid)
end
Expand All @@ -194,20 +194,12 @@ def replace_fork(pid, status)
Lepus.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if (terminated_fork = forks.delete(pid))
payload[:fork] = terminated_fork
handle_claimed_jobs_by(terminated_fork, status)

start_process(configured_processes.delete(pid))
end
end
end

def handle_claimed_jobs_by(terminated_fork, status)
# if registered_process = process.supervisees.find_by(name: terminated_fork.name)
# error = Processes::ProcessExitError.new(status)
# registered_process.fail_all_claimed_executions_with(error)
# end
end

def all_forks_terminated?
forks.empty?
end
Expand Down
4 changes: 2 additions & 2 deletions lib/lepus/supervisor/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
module Lepus
class Supervisor < Processes::Base
class Config
class Process < Struct.new(:process_class, :attributes)
class ProcessStruct < Struct.new(:process_class, :attributes)
def instantiate
process_class.new(**attributes)
end
Expand Down Expand Up @@ -37,7 +37,7 @@ def consumers

def consumer_processes
@consumer_processes ||= consumers.map do |class_name|
Process.new(Lepus::Processes::Consumer, {class_name: class_name})
ProcessStruct.new(Lepus::Processes::Consumer, {class_name: class_name})
end
end
end
Expand Down
Loading

0 comments on commit e3d64c0

Please sign in to comment.