diff --git a/Gemfile.lock b/Gemfile.lock index 9695737..5797c72 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - lepus (0.0.1.beta1) + lepus (0.0.1.beta2) bunny concurrent-ruby multi_json @@ -20,7 +20,7 @@ GEM amq-protocol (~> 2.3, >= 2.3.1) sorted_set (~> 1, >= 1.0.2) coderay (1.1.3) - concurrent-ruby (1.3.4) + concurrent-ruby (1.3.5) crack (1.0.0) bigdecimal rexml diff --git a/docker-compose.yml b/docker-compose.yml index d686ef3..5a7aa79 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,3 +4,5 @@ services: ports: - 5672:5672 - 15672:15672 + command: > + sh -c "rabbitmq-plugins enable rabbitmq_management && rabbitmq-server" diff --git a/gemfiles/rails52.gemfile.lock b/gemfiles/rails52.gemfile.lock index f0e942f..35445e0 100644 --- a/gemfiles/rails52.gemfile.lock +++ b/gemfiles/rails52.gemfile.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - lepus (0.0.1.beta1) + lepus (0.0.1.beta2) bunny concurrent-ruby multi_json diff --git a/gemfiles/rails61.gemfile.lock b/gemfiles/rails61.gemfile.lock index 98f9a34..85828f8 100644 --- a/gemfiles/rails61.gemfile.lock +++ b/gemfiles/rails61.gemfile.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - lepus (0.0.1.beta1) + lepus (0.0.1.beta2) bunny concurrent-ruby multi_json diff --git a/lib/lepus.rb b/lib/lepus.rb index 2afe342..749deee 100644 --- a/lib/lepus.rb +++ b/lib/lepus.rb @@ -31,10 +31,6 @@ module Lepus class Error < StandardError end - # Error that is raised when the Bunny recovery attempts are exhausted. - class MaxRecoveryAttemptsExhaustedError < Error - end - # Error that is raised when an invalid value is returned from {#work} class InvalidConsumerReturnError < Error def initialize(value) @@ -47,33 +43,11 @@ def initialize(value) class InvalidConsumerConfigError < Error end - module Processes - class ProcessMissingError < RuntimeError - def initialize - super("The process that was running this job no longer exists") - end - end - - class ProcessExitError < RuntimeError - def initialize(status) - message = "Process pid=#{status.pid} exited unexpectedly." - if status.exitstatus - message += " Exited with status #{status.exitstatus}." - end - - if status.signaled? - message += " Received unhandled signal #{status.termsig}." - end - - super(message) - end - end + class ShutdownError < Error + end - class ProcessPrunedError < RuntimeError - def initialize(last_heartbeat_at) - super("Process was found dead and pruned (last heartbeat at: #{last_heartbeat_at}") - end - end + # Error that is raised when the Bunny recovery attempts are exhausted. + class MaxRecoveryAttemptsExhaustedError < ShutdownError end extend self diff --git a/lib/lepus/configuration.rb b/lib/lepus/configuration.rb index 91ef20e..f2701d1 100644 --- a/lib/lepus/configuration.rb +++ b/lib/lepus/configuration.rb @@ -5,6 +5,7 @@ module Lepus class Configuration DEFAULT_RABBITMQ_URL = "amqp://guest:guest@localhost:5672" DEFAULT_RECOVERY_ATTEMPTS = 10 + DEFAULT_RECOVERY_INTERVAL = 5.0 DEFAULT_RECOVER_FROM_CONNECTION_CLOSE = true DEFAULT_CONSUMERS_DIRECTORY = Pathname.new("app/consumers") @@ -20,6 +21,9 @@ class Configuration # @return [Integer] max number of recovery attempts, nil means forever attr_accessor :recovery_attempts + # @return [Integer] the interval in seconds between network recovery attempts. + attr_accessor :recovery_interval + # @return [Pathname] the directory where the consumers are stored. attr_reader :consumers_directory @@ -40,6 +44,7 @@ def initialize @connection_name = "Lepus (#{Lepus::VERSION})" @rabbitmq_url = ENV.fetch("RABBITMQ_URL", DEFAULT_RABBITMQ_URL) || DEFAULT_RABBITMQ_URL @recovery_attempts = DEFAULT_RECOVERY_ATTEMPTS + @recovery_interval = DEFAULT_RECOVERY_INTERVAL @recover_from_connection_close = DEFAULT_RECOVER_FROM_CONNECTION_CLOSE @consumers_directory = DEFAULT_CONSUMERS_DIRECTORY @process_heartbeat_interval = 60 @@ -67,18 +72,16 @@ def connection_config connection_name: connection_name, recover_from_connection_close: recover_from_connection_close, recovery_attempts: recovery_attempts, + network_recovery_interval: recovery_interval, recovery_attempts_exhausted: recovery_attempts_exhausted }.compact end - # @return [Proc] that is passed to Bunny’s recovery_attempts_exhausted block. Nil if recovery_attempts is nil. + # @return [Proc, NilClass] Proc that is passed to Bunny’s recovery_attempts_exhausted block. def recovery_attempts_exhausted - return nil unless recovery_attempts + return unless recovery_attempts proc do - # We need to have this since Bunny’s multi-threading is cumbersome here. - # Session reconnection seems not to be done in the main thread. If we want to - # achieve a restart of the app we need to modify the thread behaviour. Thread.current.abort_on_exception = true raise Lepus::MaxRecoveryAttemptsExhaustedError end diff --git a/lib/lepus/consumer.rb b/lib/lepus/consumer.rb index 40521e8..e89c167 100644 --- a/lib/lepus/consumer.rb +++ b/lib/lepus/consumer.rb @@ -25,7 +25,7 @@ def config return if @abstract_class == true return @config if defined?(@config) - name = name.split("::").last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase + name = Primitive::String.new(to_s).underscore.split("/").last @config = ConsumerConfig.new(queue: name, exchange: name) end @@ -43,13 +43,14 @@ def use(middleware, opts = {}) if middleware.is_a?(Symbol) || middleware.is_a?(String) begin require_relative "middlewares/#{middleware}" - class_name = middleware.to_s.split("_").map(&:capitalize).join + class_name = Primitive::String.new(middleware.to_s).classify class_name = "JSON" if class_name == "Json" middleware = Lepus::Middlewares.const_get(class_name) rescue LoadError, NameError raise ArgumentError, "Middleware #{middleware} not found" end end + middlewares << middleware.new(**opts) end @@ -103,6 +104,13 @@ def process_delivery(delivery_info, metadata, payload) nest_middleware(middleware, next_middleware) end .call(message) + rescue Lepus::InvalidConsumerReturnError + raise + rescue Exception => ex # rubocop:disable Lint/RescueException + # @TODO: add error handling + logger.error(ex) + + reject! end protected @@ -136,6 +144,14 @@ def requeue! end alias_method :requeue, :requeue! + # Helper method to nack a message. + # + # @return [:nack] + def nack! + :nack + end + alias_method :nack, :nack! + private def work_proc @@ -153,7 +169,7 @@ def nest_middleware(middleware, next_middleware) end def verify_result(result) - return if %i[ack reject requeue].include?(result) + return if %i[ack reject requeue nack].include?(result) raise InvalidConsumerReturnError, result end diff --git a/lib/lepus/consumer_config.rb b/lib/lepus/consumer_config.rb index d16c44b..500e6d8 100644 --- a/lib/lepus/consumer_config.rb +++ b/lib/lepus/consumer_config.rb @@ -60,9 +60,7 @@ def channel_args end def exchange_args - name = @exchange_opts[:name] - raise InvalidConsumerConfigError, "Exchange name is required" if name.nil? - [name, @exchange_opts.reject { |k, v| k == :name }] + [exchange_name, @exchange_opts.reject { |k, v| k == :name }] end def consumer_queue_args @@ -113,10 +111,12 @@ def binds_args protected + def exchange_name + @exchange_opts[:name] || raise(InvalidConsumerConfigError, "Exchange name is required") + end + def queue_name - name = @queue_opts[:name] - raise InvalidConsumerConfigError, "Queue name is required" if name.nil? - name + @queue_opts[:name] || raise(InvalidConsumerConfigError, "Queue name is required") end def retry_queue_name diff --git a/lib/lepus/consumer_wrapper.rb b/lib/lepus/consumer_wrapper.rb index 448ec77..102054e 100644 --- a/lib/lepus/consumer_wrapper.rb +++ b/lib/lepus/consumer_wrapper.rb @@ -36,6 +36,8 @@ def process_result(result, delivery_tag) channel.reject(delivery_tag) when :requeue channel.reject(delivery_tag, true) + when :nack + channel.nack(delivery_tag, false, true) else raise Lepus::InvalidConsumerReturnError, result end diff --git a/lib/lepus/lifecycle_hooks.rb b/lib/lepus/lifecycle_hooks.rb index 97ba94a..8cc1f28 100644 --- a/lib/lepus/lifecycle_hooks.rb +++ b/lib/lepus/lifecycle_hooks.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true module Lepus + # @TODO: Move after/before fork hooks to this module module LifecycleHooks def self.included(base) base.extend ClassMethods diff --git a/lib/lepus/primitive/string.rb b/lib/lepus/primitive/string.rb new file mode 100644 index 0000000..b4defac --- /dev/null +++ b/lib/lepus/primitive/string.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +begin + require "dry/inflector" +rescue LoadError + # noop +end + +begin + require "active_support/inflector" +rescue LoadError + # noop +end + +module Lepus::Primitive + class String < ::String + def classify + new_str = if defined?(Dry::Inflector) + Dry::Inflector.new.classify(self) + elsif defined?(ActiveSupport::Inflector) + ActiveSupport::Inflector.classify(self) + else + split("/").collect do |c| + c.split("_").collect(&:capitalize).join + end.join("::") + end + + self.class.new(new_str) + end + + def constantize + if defined?(Dry::Inflector) + Dry::Inflector.new.constantize(self) + elsif defined?(ActiveSupport::Inflector) + ActiveSupport::Inflector.constantize(self) + else + Object.const_get(self) + end + end + + def underscore + new_str = sub(/^::/, "") + .gsub("::", "/") + .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2') + .gsub(/([a-z\d])([A-Z])/, '\1_\2') + .tr("-", "_") + .tr(".", "_") + .gsub(/\s/, "_") + .gsub(/__+/, "_") + .downcase + + self.class.new(new_str) + end + end +end diff --git a/lib/lepus/process.rb b/lib/lepus/process.rb index ed03e4d..b70e0cc 100644 --- a/lib/lepus/process.rb +++ b/lib/lepus/process.rb @@ -115,9 +115,6 @@ def deregister(pruned: false) end def prune - # error = Lepus::Processes::ProcessPrunedError.new(last_heartbeat_at) - # fail_all_claimed_executions_with(error) - deregister(pruned: true) end diff --git a/lib/lepus/processes/consumer.rb b/lib/lepus/processes/consumer.rb index 7eb44b9..fcd37f9 100644 --- a/lib/lepus/processes/consumer.rb +++ b/lib/lepus/processes/consumer.rb @@ -8,7 +8,8 @@ class Consumer < Base def initialize(class_name:, **options) @consumer_class = class_name - @consumer_class = @consumer_class.constantize if @consumer_class.is_a?(String) + @consumer_class = Lepus::Primitive::String.new(@consumer_class).constantize if @consumer_class.is_a?(String) + super(**options) end @@ -51,7 +52,7 @@ def run end def shutdown - @consumers.to_a.each(&:cancel) + @subscriptions.to_a.each(&:cancel) @channel&.close @bunny&.close @@ -70,8 +71,10 @@ def setup_consumer! @bunny = Thread.current[:lepus_bunny] || Lepus.config.create_connection @channel = Thread.current[:lepus_channel] || begin @bunny.create_channel(nil, 1, true).tap do |channel| - channel.prefetch(1) - channel.on_uncaught_exception { |error| handle_thread_error(error) } + channel.prefetch(1) # @TODO make this configurable + channel.on_uncaught_exception { |error| + handle_thread_error(error) + } end end @@ -83,7 +86,7 @@ def setup_consumer! @error_queue = @channel.queue(*args) end - @consumers = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future, need to test how this works + @subscriptions = Array.new((_threads = 1)) do |n| # may add multiple consumers in the future main_queue = @channel.queue(*consumer_class.config.consumer_queue_args) consumer_class.config.binds_args.each do |opts| main_queue.bind(@exchange, **opts) @@ -101,9 +104,10 @@ def setup_consumer! end main_queue.subscribe_with(consumer_wrapper) end - # rescue Lepus::InvalidConsumerConfigError => e - # # shutdown if the consumer config is invalid - # raise e + rescue Bunny::TCPConnectionFailed, Bunny::PossibleAuthenticationFailureError + raise Lepus::ShutdownError + rescue Lepus::InvalidConsumerConfigError + raise Lepus::ShutdownError end end end diff --git a/lib/lepus/processes/procline.rb b/lib/lepus/processes/procline.rb index baa4a4f..f62034f 100644 --- a/lib/lepus/processes/procline.rb +++ b/lib/lepus/processes/procline.rb @@ -3,9 +3,9 @@ module Lepus::Processes module Procline # Sets the procline ($0) - # lepus-supervisor(0.1.0): + # [lepus-supervisor: ] def procline(string) - $0 = "lepus-#{self.class.name.split("::").last.downcase}: #{string}" + $0 = "[lepus-#{kind.downcase}: #{string}]" end end end diff --git a/lib/lepus/processes/registrable.rb b/lib/lepus/processes/registrable.rb index ea32ffb..0efcf9a 100644 --- a/lib/lepus/processes/registrable.rb +++ b/lib/lepus/processes/registrable.rb @@ -45,7 +45,7 @@ def launch_heartbeat wrap_in_app_executor { heartbeat } end - @heartbeat_task.add_observer do |_, _, error| + @heartbeat_task.add_observer do |_time, _result, error| handle_thread_error(error) if error end diff --git a/lib/lepus/processes/runnable.rb b/lib/lepus/processes/runnable.rb index b53e65b..30d174d 100644 --- a/lib/lepus/processes/runnable.rb +++ b/lib/lepus/processes/runnable.rb @@ -55,20 +55,21 @@ def boot end def shutting_down? - stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered? + stopped? || (running_as_fork? && supervisor_went_away?) || !registered? # || finished? end def run raise NotImplementedError end - def finished? - running_inline? && all_work_completed? - end + # @TODO Add it to the inline mode + # def finished? + # running_inline? && all_work_completed? + # end - def all_work_completed? - false - end + # def all_work_completed? + # false + # end def shutdown end @@ -76,9 +77,9 @@ def shutdown def set_procline end - def running_inline? - mode.inline? - end + # def running_inline? + # mode.inline? + # end def running_async? mode.async? diff --git a/lib/lepus/supervisor.rb b/lib/lepus/supervisor.rb index 4d70f5e..811e07e 100644 --- a/lib/lepus/supervisor.rb +++ b/lib/lepus/supervisor.rb @@ -125,6 +125,7 @@ def terminate_gracefully term_forks shutdown_timeout = 5 + puts "\nWaiting up to #{shutdown_timeout} seconds for processes to terminate gracefully..." Timer.wait_until(shutdown_timeout, -> { all_forks_terminated? }) do reap_terminated_forks end @@ -177,13 +178,9 @@ def reap_and_replace_terminated_forks def reap_terminated_forks loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + pid, _ = ::Process.waitpid2(-1, ::Process::WNOHANG) break unless pid - if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end - configured_processes.delete(pid) end rescue SystemCallError @@ -194,20 +191,12 @@ def replace_fork(pid, status) Lepus.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| if (terminated_fork = forks.delete(pid)) payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) start_process(configured_processes.delete(pid)) end end end - def handle_claimed_jobs_by(terminated_fork, status) - # if registered_process = process.supervisees.find_by(name: terminated_fork.name) - # error = Processes::ProcessExitError.new(status) - # registered_process.fail_all_claimed_executions_with(error) - # end - end - def all_forks_terminated? forks.empty? end diff --git a/lib/lepus/supervisor/config.rb b/lib/lepus/supervisor/config.rb index c6b36e5..1c7e417 100644 --- a/lib/lepus/supervisor/config.rb +++ b/lib/lepus/supervisor/config.rb @@ -6,7 +6,7 @@ module Lepus class Supervisor < Processes::Base class Config - class Process < Struct.new(:process_class, :attributes) + class ProcessStruct < Struct.new(:process_class, :attributes) def instantiate process_class.new(**attributes) end @@ -37,7 +37,7 @@ def consumers def consumer_processes @consumer_processes ||= consumers.map do |class_name| - Process.new(Lepus::Processes::Consumer, {class_name: class_name}) + ProcessStruct.new(Lepus::Processes::Consumer, {class_name: class_name}) end end end diff --git a/lib/lepus/supervisor/maintenance.rb b/lib/lepus/supervisor/maintenance.rb index 6ea3540..b5ed1d9 100644 --- a/lib/lepus/supervisor/maintenance.rb +++ b/lib/lepus/supervisor/maintenance.rb @@ -3,9 +3,6 @@ class Supervisor < Processes::Base module Maintenance def self.included(base) base.send :include, InstanceMethods - base.class_eval do - after_boot :fail_orphaned_executions - end end module InstanceMethods @@ -32,12 +29,6 @@ def prune_dead_processes Lepus::Process.prune(excluding: process) end end - - def fail_orphaned_executions - wrap_in_app_executor do - # ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) - end - end end end end diff --git a/lib/lepus/version.rb b/lib/lepus/version.rb index 787f1ab..c87b98d 100644 --- a/lib/lepus/version.rb +++ b/lib/lepus/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Lepus - VERSION = "0.0.1.beta1" + VERSION = "0.0.1.beta2" end diff --git a/spec/lepus/consumer_spec.rb b/spec/lepus/consumer_spec.rb index 5c7a8c3..3495c4b 100644 --- a/spec/lepus/consumer_spec.rb +++ b/spec/lepus/consumer_spec.rb @@ -116,6 +116,14 @@ ).to be :requeue end + it "allows returning :nack" do + allow(instance).to receive(:perform).and_return(:nack) + + expect( + instance.process_delivery(delivery_info, metadata, payload) + ).to be :nack + end + it "raises an error if #perform does not return a valid symbol" do allow(instance).to receive(:perform).and_return(:blorg) @@ -170,6 +178,20 @@ def perform(_message) end end + describe "#nack" do + let(:instance) do + Class.new(Lepus::Consumer) do + def perform(_message) + nack + end + end.new + end + + it "returns :nack when called in #perform" do + expect(instance.perform(message)).to eq(:nack) + end + end + describe ".use" do let(:instance) do Class.new(Lepus::Consumer) do diff --git a/spec/lepus/primitive/string_spec.rb b/spec/lepus/primitive/string_spec.rb new file mode 100644 index 0000000..896c17a --- /dev/null +++ b/spec/lepus/primitive/string_spec.rb @@ -0,0 +1,129 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Lepus::Primitive::String do + describe "#classify" do + context "when dry-inflector is available" do + before do + stub_const("Dry::Inflector", Class.new { + def classify(string) + "Classified" + end + }) + end + + it "returns the classified string" do + expect(described_class.new("string").classify).to eq("Classified") + end + end + + context "when active-support is available" do + before do + stub_const("ActiveSupport::Inflector", Class.new { + def self.classify(string) + "Classified" + end + }) + end + + it "returns the classified string" do + expect(described_class.new("string").classify).to eq("Classified") + end + end + + context "when no inflector is available" do + it "returns the classified string" do + expect(described_class.new("my_string").classify).to eq("MyString") + end + end + end + + describe "#constantize" do + let(:constant) { Class.new } + + before do + stub_const("MyConstant", constant) + end + + context "when dry-inflector is available" do + before do + stub_const("Dry::Inflector", Class.new { + def constantize(string) + MyConstant if string == "MyConstant" + end + }) + end + + it "returns the constantized string" do + expect(described_class.new("MyConstant").constantize).to eq(constant) + end + end + + context "when active-support is available" do + before do + stub_const("ActiveSupport::Inflector", Class.new { + def self.constantize(string) + MyConstant if string == "MyConstant" + end + }) + end + + it "returns the constantized string" do + expect(described_class.new("MyConstant").constantize).to eq(constant) + end + end + end + + describe "#underscore" do + subject { described_class.new(arg).underscore } + + context "with capitalized string" do + let(:arg) { "User" } + + it { is_expected.to eq("user") } + end + + context "with camelized string" do + let(:arg) { "UserName" } + + it { is_expected.to eq("user_name") } + end + + context "with parameterized string" do + let(:arg) { "foo-bar" } + + it { is_expected.to eq("foo_bar") } + end + + context "with camelized string under a namespace" do + let(:arg) { "Apiv2::UserName" } + + it { is_expected.to eq("apiv2/user_name") } + end + + context "with camelized string with a root namespace" do + let(:arg) { "::UserName" } + + it { is_expected.to eq("user_name") } + end + + context "with a dot in the string" do + let(:arg) { "user.name" } + + it { is_expected.to eq("user_name") } + end + + context "with a space in the string" do + let(:arg) { "user name" } + + it { is_expected.to eq("user_name") } + end + + context "with multiple underscores in the string" do + let(:arg) { "user_______name" } + + it { is_expected.to eq("user_name") } + end + end +end