Skip to content

Commit

Permalink
daemon improvements (#1)
Browse files Browse the repository at this point in the history
* feat: add Primitive::String with with smart detection for ActiveSupport and Dry inflector (And rasoable fallback)

* disable lifecycle hooks

* feat: automatically reject message on failures

* chore(lint): fix rubocop offenses

* fix: fix broken specs

* feat: add back lifecycle

* feat: update version to beta2
  • Loading branch information
marcosgz authored Feb 11, 2025
1 parent ac9be46 commit 485af74
Show file tree
Hide file tree
Showing 22 changed files with 283 additions and 97 deletions.
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand All @@ -20,7 +20,7 @@ GEM
amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2)
coderay (1.1.3)
concurrent-ruby (1.3.4)
concurrent-ruby (1.3.5)
crack (1.0.0)
bigdecimal
rexml
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ services:
ports:
- 5672:5672
- 15672:15672
command: >
sh -c "rabbitmq-plugins enable rabbitmq_management && rabbitmq-server"
2 changes: 1 addition & 1 deletion gemfiles/rails52.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails61.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand Down
34 changes: 4 additions & 30 deletions lib/lepus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ module Lepus
class Error < StandardError
end

# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < Error
end

# Error that is raised when an invalid value is returned from {#work}
class InvalidConsumerReturnError < Error
def initialize(value)
Expand All @@ -47,33 +43,11 @@ def initialize(value)
class InvalidConsumerConfigError < Error
end

module Processes
class ProcessMissingError < RuntimeError
def initialize
super("The process that was running this job no longer exists")
end
end

class ProcessExitError < RuntimeError
def initialize(status)
message = "Process pid=#{status.pid} exited unexpectedly."
if status.exitstatus
message += " Exited with status #{status.exitstatus}."
end

if status.signaled?
message += " Received unhandled signal #{status.termsig}."
end

super(message)
end
end
class ShutdownError < Error
end

class ProcessPrunedError < RuntimeError
def initialize(last_heartbeat_at)
super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}")
end
end
# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < ShutdownError
end

extend self
Expand Down
13 changes: 8 additions & 5 deletions lib/lepus/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Lepus
class Configuration
DEFAULT_RABBITMQ_URL = "amqp://guest:guest@localhost:5672"
DEFAULT_RECOVERY_ATTEMPTS = 10
DEFAULT_RECOVERY_INTERVAL = 5.0
DEFAULT_RECOVER_FROM_CONNECTION_CLOSE = true
DEFAULT_CONSUMERS_DIRECTORY = Pathname.new("app/consumers")

Expand All @@ -20,6 +21,9 @@ class Configuration
# @return [Integer] max number of recovery attempts, nil means forever
attr_accessor :recovery_attempts

# @return [Integer] the interval in seconds between network recovery attempts.
attr_accessor :recovery_interval

# @return [Pathname] the directory where the consumers are stored.
attr_reader :consumers_directory

Expand All @@ -40,6 +44,7 @@ def initialize
@connection_name = "Lepus (#{Lepus::VERSION})"
@rabbitmq_url = ENV.fetch("RABBITMQ_URL", DEFAULT_RABBITMQ_URL) || DEFAULT_RABBITMQ_URL
@recovery_attempts = DEFAULT_RECOVERY_ATTEMPTS
@recovery_interval = DEFAULT_RECOVERY_INTERVAL
@recover_from_connection_close = DEFAULT_RECOVER_FROM_CONNECTION_CLOSE
@consumers_directory = DEFAULT_CONSUMERS_DIRECTORY
@process_heartbeat_interval = 60
Expand Down Expand Up @@ -67,18 +72,16 @@ def connection_config
connection_name: connection_name,
recover_from_connection_close: recover_from_connection_close,
recovery_attempts: recovery_attempts,
network_recovery_interval: recovery_interval,
recovery_attempts_exhausted: recovery_attempts_exhausted
}.compact
end

# @return [Proc] that is passed to Bunny’s recovery_attempts_exhausted block. Nil if recovery_attempts is nil.
# @return [Proc, NilClass] Proc that is passed to Bunny’s recovery_attempts_exhausted block.
def recovery_attempts_exhausted
return nil unless recovery_attempts
return unless recovery_attempts

proc do
# We need to have this since Bunny’s multi-threading is cumbersome here.
# Session reconnection seems not to be done in the main thread. If we want to
# achieve a restart of the app we need to modify the thread behaviour.
Thread.current.abort_on_exception = true
raise Lepus::MaxRecoveryAttemptsExhaustedError
end
Expand Down
22 changes: 19 additions & 3 deletions lib/lepus/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def config
return if @abstract_class == true
return @config if defined?(@config)

name = name.split("::").last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
name = Primitive::String.new(to_s).underscore.split("/").last
@config = ConsumerConfig.new(queue: name, exchange: name)
end

Expand All @@ -43,13 +43,14 @@ def use(middleware, opts = {})
if middleware.is_a?(Symbol) || middleware.is_a?(String)
begin
require_relative "middlewares/#{middleware}"
class_name = middleware.to_s.split("_").map(&:capitalize).join
class_name = Primitive::String.new(middleware.to_s).classify
class_name = "JSON" if class_name == "Json"
middleware = Lepus::Middlewares.const_get(class_name)
rescue LoadError, NameError
raise ArgumentError, "Middleware #{middleware} not found"
end
end

middlewares << middleware.new(**opts)
end

Expand Down Expand Up @@ -103,6 +104,13 @@ def process_delivery(delivery_info, metadata, payload)
nest_middleware(middleware, next_middleware)
end
.call(message)
rescue Lepus::InvalidConsumerReturnError
raise
rescue Exception => ex # rubocop:disable Lint/RescueException
# @TODO: add error handling
logger.error(ex)

reject!
end

protected
Expand Down Expand Up @@ -136,6 +144,14 @@ def requeue!
end
alias_method :requeue, :requeue!

# Helper method to nack a message.
#
# @return [:nack]
def nack!
:nack
end
alias_method :nack, :nack!

private

def work_proc
Expand All @@ -153,7 +169,7 @@ def nest_middleware(middleware, next_middleware)
end

def verify_result(result)
return if %i[ack reject requeue].include?(result)
return if %i[ack reject requeue nack].include?(result)

raise InvalidConsumerReturnError, result
end
Expand Down
12 changes: 6 additions & 6 deletions lib/lepus/consumer_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def channel_args
end

def exchange_args
name = @exchange_opts[:name]
raise InvalidConsumerConfigError, "Exchange name is required" if name.nil?
[name, @exchange_opts.reject { |k, v| k == :name }]
[exchange_name, @exchange_opts.reject { |k, v| k == :name }]
end

def consumer_queue_args
Expand Down Expand Up @@ -113,10 +111,12 @@ def binds_args

protected

def exchange_name
@exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required")
end

def queue_name
name = @queue_opts[:name]
raise InvalidConsumerConfigError, "Queue name is required" if name.nil?
name
@queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required")
end

def retry_queue_name
Expand Down
2 changes: 2 additions & 0 deletions lib/lepus/consumer_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def process_result(result, delivery_tag)
channel.reject(delivery_tag)
when :requeue
channel.reject(delivery_tag, true)
when :nack
channel.nack(delivery_tag, false, true)
else
raise Lepus::InvalidConsumerReturnError, result
end
Expand Down
1 change: 1 addition & 0 deletions lib/lepus/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

module Lepus
# @TODO: Move after/before fork hooks to this module
module LifecycleHooks
def self.included(base)
base.extend ClassMethods
Expand Down
55 changes: 55 additions & 0 deletions lib/lepus/primitive/string.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

begin
require "dry/inflector"
rescue LoadError
# noop
end

begin
require "active_support/inflector"
rescue LoadError
# noop
end

module Lepus::Primitive
class String < ::String
def classify
new_str = if defined?(Dry::Inflector)
Dry::Inflector.new.classify(self)
elsif defined?(ActiveSupport::Inflector)
ActiveSupport::Inflector.classify(self)
else
split("/").collect do |c|
c.split("_").collect(&:capitalize).join
end.join("::")
end

self.class.new(new_str)
end

def constantize
if defined?(Dry::Inflector)
Dry::Inflector.new.constantize(self)
elsif defined?(ActiveSupport::Inflector)
ActiveSupport::Inflector.constantize(self)
else
Object.const_get(self)
end
end

def underscore
new_str = sub(/^::/, "")
.gsub("::", "/")
.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
.gsub(/([a-z\d])([A-Z])/, '\1_\2')
.tr("-", "_")
.tr(".", "_")
.gsub(/\s/, "_")
.gsub(/__+/, "_")
.downcase

self.class.new(new_str)
end
end
end
3 changes: 0 additions & 3 deletions lib/lepus/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def deregister(pruned: false)
end

def prune
# error = Lepus::Processes::ProcessPrunedError.new(last_heartbeat_at)
# fail_all_claimed_executions_with(error)

deregister(pruned: true)
end

Expand Down
20 changes: 12 additions & 8 deletions lib/lepus/processes/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class Consumer < Base

def initialize(class_name:, **options)
@consumer_class = class_name
@consumer_class = @consumer_class.constantize if @consumer_class.is_a?(String)
@consumer_class = Lepus::Primitive::String.new(@consumer_class).constantize if @consumer_class.is_a?(String)

super(**options)
end

Expand Down Expand Up @@ -51,7 +52,7 @@ def run
end

def shutdown
@consumers.to_a.each(&:cancel)
@subscriptions.to_a.each(&:cancel)
@channel&.close
@bunny&.close

Expand All @@ -70,8 +71,10 @@ def setup_consumer!
@bunny = Thread.current[:lepus_bunny] || Lepus.config.create_connection
@channel = Thread.current[:lepus_channel] || begin
@bunny.create_channel(nil, 1, true).tap do |channel|
channel.prefetch(1)
channel.on_uncaught_exception { |error| handle_thread_error(error) }
channel.prefetch(1) # @TODO make this configurable
channel.on_uncaught_exception { |error|
handle_thread_error(error)
}
end
end

Expand All @@ -83,7 +86,7 @@ def setup_consumer!
@error_queue = @channel.queue(*args)
end

@consumers = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future, need to test how this works
@subscriptions = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future
main_queue = @channel.queue(*consumer_class.config.consumer_queue_args)
consumer_class.config.binds_args.each do |opts|
main_queue.bind(@exchange, **opts)
Expand All @@ -101,9 +104,10 @@ def setup_consumer!
end
main_queue.subscribe_with(consumer_wrapper)
end
# rescue Lepus::InvalidConsumerConfigError => e
# # shutdown if the consumer config is invalid
# raise e
rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError
raise Lepus::ShutdownError
rescue Lepus::InvalidConsumerConfigError
raise Lepus::ShutdownError
end
end
end
4 changes: 2 additions & 2 deletions lib/lepus/processes/procline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Lepus::Processes
module Procline
# Sets the procline ($0)
# lepus-supervisor(0.1.0): <string>
# [lepus-supervisor: <string>]
def procline(string)
$0 = "lepus-#{self.class.name.split("::").last.downcase}: #{string}"
$0 = "[lepus-#{kind.downcase}: #{string}]"
end
end
end
2 changes: 1 addition & 1 deletion lib/lepus/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def launch_heartbeat
wrap_in_app_executor { heartbeat }
end

@heartbeat_task.add_observer do |_, _, error|
@heartbeat_task.add_observer do |_time, _result, error|
handle_thread_error(error) if error
end

Expand Down
Loading

0 comments on commit 485af74

Please sign in to comment.