Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

daemon improvements #1

Merged
merged 7 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand All @@ -20,7 +20,7 @@ GEM
amq-protocol (~> 2.3, >= 2.3.1)
sorted_set (~> 1, >= 1.0.2)
coderay (1.1.3)
concurrent-ruby (1.3.4)
concurrent-ruby (1.3.5)
crack (1.0.0)
bigdecimal
rexml
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ services:
ports:
- 5672:5672
- 15672:15672
command: >
sh -c "rabbitmq-plugins enable rabbitmq_management && rabbitmq-server"
2 changes: 1 addition & 1 deletion gemfiles/rails52.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails61.gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
lepus (0.0.1.beta1)
lepus (0.0.1.beta2)
bunny
concurrent-ruby
multi_json
Expand Down
34 changes: 4 additions & 30 deletions lib/lepus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ module Lepus
class Error < StandardError
end

# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < Error
end

# Error that is raised when an invalid value is returned from {#work}
class InvalidConsumerReturnError < Error
def initialize(value)
Expand All @@ -47,33 +43,11 @@ def initialize(value)
class InvalidConsumerConfigError < Error
end

module Processes
class ProcessMissingError < RuntimeError
def initialize
super("The process that was running this job no longer exists")
end
end

class ProcessExitError < RuntimeError
def initialize(status)
message = "Process pid=#{status.pid} exited unexpectedly."
if status.exitstatus
message += " Exited with status #{status.exitstatus}."
end

if status.signaled?
message += " Received unhandled signal #{status.termsig}."
end

super(message)
end
end
class ShutdownError < Error
end

class ProcessPrunedError < RuntimeError
def initialize(last_heartbeat_at)
super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}")
end
end
# Error that is raised when the Bunny recovery attempts are exhausted.
class MaxRecoveryAttemptsExhaustedError < ShutdownError
end

extend self
Expand Down
13 changes: 8 additions & 5 deletions lib/lepus/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Lepus
class Configuration
DEFAULT_RABBITMQ_URL = "amqp://guest:guest@localhost:5672"
DEFAULT_RECOVERY_ATTEMPTS = 10
DEFAULT_RECOVERY_INTERVAL = 5.0
DEFAULT_RECOVER_FROM_CONNECTION_CLOSE = true
DEFAULT_CONSUMERS_DIRECTORY = Pathname.new("app/consumers")

Expand All @@ -20,6 +21,9 @@ class Configuration
# @return [Integer] max number of recovery attempts, nil means forever
attr_accessor :recovery_attempts

# @return [Integer] the interval in seconds between network recovery attempts.
attr_accessor :recovery_interval

# @return [Pathname] the directory where the consumers are stored.
attr_reader :consumers_directory

Expand All @@ -40,6 +44,7 @@ def initialize
@connection_name = "Lepus (#{Lepus::VERSION})"
@rabbitmq_url = ENV.fetch("RABBITMQ_URL", DEFAULT_RABBITMQ_URL) || DEFAULT_RABBITMQ_URL
@recovery_attempts = DEFAULT_RECOVERY_ATTEMPTS
@recovery_interval = DEFAULT_RECOVERY_INTERVAL
@recover_from_connection_close = DEFAULT_RECOVER_FROM_CONNECTION_CLOSE
@consumers_directory = DEFAULT_CONSUMERS_DIRECTORY
@process_heartbeat_interval = 60
Expand Down Expand Up @@ -67,18 +72,16 @@ def connection_config
connection_name: connection_name,
recover_from_connection_close: recover_from_connection_close,
recovery_attempts: recovery_attempts,
network_recovery_interval: recovery_interval,
recovery_attempts_exhausted: recovery_attempts_exhausted
}.compact
end

# @return [Proc] that is passed to Bunny’s recovery_attempts_exhausted block. Nil if recovery_attempts is nil.
# @return [Proc, NilClass] Proc that is passed to Bunny’s recovery_attempts_exhausted block.
def recovery_attempts_exhausted
return nil unless recovery_attempts
return unless recovery_attempts

proc do
# We need to have this since Bunny’s multi-threading is cumbersome here.
# Session reconnection seems not to be done in the main thread. If we want to
# achieve a restart of the app we need to modify the thread behaviour.
Thread.current.abort_on_exception = true
raise Lepus::MaxRecoveryAttemptsExhaustedError
end
Expand Down
22 changes: 19 additions & 3 deletions lib/lepus/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def config
return if @abstract_class == true
return @config if defined?(@config)

name = name.split("::").last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
name = Primitive::String.new(to_s).underscore.split("/").last
@config = ConsumerConfig.new(queue: name, exchange: name)
end

Expand All @@ -43,13 +43,14 @@ def use(middleware, opts = {})
if middleware.is_a?(Symbol) || middleware.is_a?(String)
begin
require_relative "middlewares/#{middleware}"
class_name = middleware.to_s.split("_").map(&:capitalize).join
class_name = Primitive::String.new(middleware.to_s).classify
class_name = "JSON" if class_name == "Json"
middleware = Lepus::Middlewares.const_get(class_name)
rescue LoadError, NameError
raise ArgumentError, "Middleware #{middleware} not found"
end
end

middlewares << middleware.new(**opts)
end

Expand Down Expand Up @@ -103,6 +104,13 @@ def process_delivery(delivery_info, metadata, payload)
nest_middleware(middleware, next_middleware)
end
.call(message)
rescue Lepus::InvalidConsumerReturnError
raise
rescue Exception => ex # rubocop:disable Lint/RescueException
# @TODO: add error handling
logger.error(ex)

reject!
end

protected
Expand Down Expand Up @@ -136,6 +144,14 @@ def requeue!
end
alias_method :requeue, :requeue!

# Helper method to nack a message.
#
# @return [:nack]
def nack!
:nack
end
alias_method :nack, :nack!

private

def work_proc
Expand All @@ -153,7 +169,7 @@ def nest_middleware(middleware, next_middleware)
end

def verify_result(result)
return if %i[ack reject requeue].include?(result)
return if %i[ack reject requeue nack].include?(result)

raise InvalidConsumerReturnError, result
end
Expand Down
12 changes: 6 additions & 6 deletions lib/lepus/consumer_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def channel_args
end

def exchange_args
name = @exchange_opts[:name]
raise InvalidConsumerConfigError, "Exchange name is required" if name.nil?
[name, @exchange_opts.reject { |k, v| k == :name }]
[exchange_name, @exchange_opts.reject { |k, v| k == :name }]
end

def consumer_queue_args
Expand Down Expand Up @@ -113,10 +111,12 @@ def binds_args

protected

def exchange_name
@exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required")
end

def queue_name
name = @queue_opts[:name]
raise InvalidConsumerConfigError, "Queue name is required" if name.nil?
name
@queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required")
end

def retry_queue_name
Expand Down
2 changes: 2 additions & 0 deletions lib/lepus/consumer_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def process_result(result, delivery_tag)
channel.reject(delivery_tag)
when :requeue
channel.reject(delivery_tag, true)
when :nack
channel.nack(delivery_tag, false, true)
else
raise Lepus::InvalidConsumerReturnError, result
end
Expand Down
1 change: 1 addition & 0 deletions lib/lepus/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

module Lepus
# @TODO: Move after/before fork hooks to this module
module LifecycleHooks
def self.included(base)
base.extend ClassMethods
Expand Down
55 changes: 55 additions & 0 deletions lib/lepus/primitive/string.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

begin
require "dry/inflector"
rescue LoadError
# noop
end

begin
require "active_support/inflector"
rescue LoadError
# noop
end

module Lepus::Primitive
class String < ::String
def classify
new_str = if defined?(Dry::Inflector)
Dry::Inflector.new.classify(self)
elsif defined?(ActiveSupport::Inflector)
ActiveSupport::Inflector.classify(self)
else
split("/").collect do |c|
c.split("_").collect(&:capitalize).join
end.join("::")
end

self.class.new(new_str)
end

def constantize
if defined?(Dry::Inflector)
Dry::Inflector.new.constantize(self)
elsif defined?(ActiveSupport::Inflector)
ActiveSupport::Inflector.constantize(self)
else
Object.const_get(self)
end
end

def underscore
new_str = sub(/^::/, "")
.gsub("::", "/")
.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
.gsub(/([a-z\d])([A-Z])/, '\1_\2')
.tr("-", "_")
.tr(".", "_")
.gsub(/\s/, "_")
.gsub(/__+/, "_")
.downcase

self.class.new(new_str)
end
end
end
3 changes: 0 additions & 3 deletions lib/lepus/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def deregister(pruned: false)
end

def prune
# error = Lepus::Processes::ProcessPrunedError.new(last_heartbeat_at)
# fail_all_claimed_executions_with(error)

deregister(pruned: true)
end

Expand Down
20 changes: 12 additions & 8 deletions lib/lepus/processes/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class Consumer < Base

def initialize(class_name:, **options)
@consumer_class = class_name
@consumer_class = @consumer_class.constantize if @consumer_class.is_a?(String)
@consumer_class = Lepus::Primitive::String.new(@consumer_class).constantize if @consumer_class.is_a?(String)

super(**options)
end

Expand Down Expand Up @@ -51,7 +52,7 @@ def run
end

def shutdown
@consumers.to_a.each(&:cancel)
@subscriptions.to_a.each(&:cancel)
@channel&.close
@bunny&.close

Expand All @@ -70,8 +71,10 @@ def setup_consumer!
@bunny = Thread.current[:lepus_bunny] || Lepus.config.create_connection
@channel = Thread.current[:lepus_channel] || begin
@bunny.create_channel(nil, 1, true).tap do |channel|
channel.prefetch(1)
channel.on_uncaught_exception { |error| handle_thread_error(error) }
channel.prefetch(1) # @TODO make this configurable
channel.on_uncaught_exception { |error|
handle_thread_error(error)
}
end
end

Expand All @@ -83,7 +86,7 @@ def setup_consumer!
@error_queue = @channel.queue(*args)
end

@consumers = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future, need to test how this works
@subscriptions = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future
main_queue = @channel.queue(*consumer_class.config.consumer_queue_args)
consumer_class.config.binds_args.each do |opts|
main_queue.bind(@exchange, **opts)
Expand All @@ -101,9 +104,10 @@ def setup_consumer!
end
main_queue.subscribe_with(consumer_wrapper)
end
# rescue Lepus::InvalidConsumerConfigError => e
# # shutdown if the consumer config is invalid
# raise e
rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError
raise Lepus::ShutdownError
rescue Lepus::InvalidConsumerConfigError
raise Lepus::ShutdownError
end
end
end
4 changes: 2 additions & 2 deletions lib/lepus/processes/procline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Lepus::Processes
module Procline
# Sets the procline ($0)
# lepus-supervisor(0.1.0): <string>
# [lepus-supervisor: <string>]
def procline(string)
$0 = "lepus-#{self.class.name.split("::").last.downcase}: #{string}"
$0 = "[lepus-#{kind.downcase}: #{string}]"
end
end
end
2 changes: 1 addition & 1 deletion lib/lepus/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def launch_heartbeat
wrap_in_app_executor { heartbeat }
end

@heartbeat_task.add_observer do |_, _, error|
@heartbeat_task.add_observer do |_time, _result, error|
handle_thread_error(error) if error
end

Expand Down
Loading