diff --git a/Gemfile.lock b/Gemfile.lock index eabaffa..9695737 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -102,6 +102,7 @@ GEM zeitwerk (2.6.18) PLATFORMS + ruby x86_64-linux DEPENDENCIES diff --git a/gemfiles/rails52.gemfile.lock b/gemfiles/rails52.gemfile.lock index 2712b0a..f0e942f 100644 --- a/gemfiles/rails52.gemfile.lock +++ b/gemfiles/rails52.gemfile.lock @@ -5,6 +5,7 @@ PATH bunny concurrent-ruby multi_json + thor zeitwerk GEM @@ -90,6 +91,7 @@ GEM marcel (1.0.4) method_source (1.1.0) mini_mime (1.1.5) + mini_portile2 (2.8.8) minitest (5.25.1) multi_json (1.15.0) net-imap (0.4.17) @@ -102,6 +104,9 @@ GEM net-smtp (0.5.0) net-protocol nio4r (2.7.4) + nokogiri (1.15.6) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) nokogiri (1.15.6-x86_64-linux) racc (~> 1.4) parallel (1.26.3) @@ -218,6 +223,7 @@ GEM zeitwerk (2.6.18) PLATFORMS + ruby x86_64-linux DEPENDENCIES diff --git a/gemfiles/rails61.gemfile.lock b/gemfiles/rails61.gemfile.lock index 3fa63a5..98f9a34 100644 --- a/gemfiles/rails61.gemfile.lock +++ b/gemfiles/rails61.gemfile.lock @@ -5,6 +5,7 @@ PATH bunny concurrent-ruby multi_json + thor zeitwerk GEM @@ -107,6 +108,7 @@ GEM marcel (1.0.4) method_source (1.1.0) mini_mime (1.1.5) + mini_portile2 (2.8.8) minitest (5.25.1) multi_json (1.15.0) net-imap (0.4.17) @@ -119,6 +121,9 @@ GEM net-smtp (0.5.0) net-protocol nio4r (2.7.4) + nokogiri (1.15.6) + mini_portile2 (~> 2.8.2) + racc (~> 1.4) nokogiri (1.15.6-x86_64-linux) racc (~> 1.4) parallel (1.26.3) @@ -236,6 +241,7 @@ GEM zeitwerk (2.6.18) PLATFORMS + ruby x86_64-linux DEPENDENCIES diff --git a/lib/lepus/processes/callbacks.rb b/lib/lepus/processes/callbacks.rb index a3259aa..a7038d1 100644 --- a/lib/lepus/processes/callbacks.rb +++ b/lib/lepus/processes/callbacks.rb @@ -12,10 +12,14 @@ def run_process_callbacks(name) self.class.send(:"before_#{name}_callbacks").each do |method| send(method) end - yield if block_given? + + result = yield if block_given? + self.class.send(:"after_#{name}_callbacks").each do |method| send(method) end + + result end end @@ -25,6 +29,7 @@ def inherited(base) base.instance_variable_set(:@after_boot_callbacks, after_boot_callbacks.dup) base.instance_variable_set(:@before_shutdown_callbacks, before_shutdown_callbacks.dup) base.instance_variable_set(:@after_shutdown_callbacks, after_shutdown_callbacks.dup) + super end def before_boot(*methods) diff --git a/lib/lepus/processes/consumer.rb b/lib/lepus/processes/consumer.rb index 2d5eea1..e6b6316 100644 --- a/lib/lepus/processes/consumer.rb +++ b/lib/lepus/processes/consumer.rb @@ -77,8 +77,9 @@ def setup_consumer! main_queue.bind(@exchange, **opts) end + consumer_instance = consumer_class.new consumer_wrapper = Lepus::ConsumerWrapper.new( - consumer_class.new, + consumer_instance, main_queue.channel, main_queue, "#{consumer_class.name}-#{n + 1}" diff --git a/lib/lepus/producer.rb b/lib/lepus/producer.rb index 11bdcae..ce493ed 100644 --- a/lib/lepus/producer.rb +++ b/lib/lepus/producer.rb @@ -9,7 +9,7 @@ class Producer }.freeze DEFAULT_PUBLISH_OPTIONS = { - expiration: 7 * (60 * 60 * 24), + expiration: 7 * (60 * 60 * 24) }.freeze def initialize(exchange_name, **options) diff --git a/lib/lepus/supervisor.rb b/lib/lepus/supervisor.rb index 6f3161c..e85cb60 100644 --- a/lib/lepus/supervisor.rb +++ b/lib/lepus/supervisor.rb @@ -79,7 +79,9 @@ def check_bunny_connection end def start_processes - configuration.configured_processes.each { |configured_process| start_process(configured_process) } + configuration.configured_processes.each do |configured_process| + start_process(configured_process) + end end def supervise @@ -104,10 +106,18 @@ def start_process(configured_process) instance.mode = :fork end + if process_instance.respond_to?(:before_fork) + process_instance.send(:before_fork) + end + pid = fork do process_instance.start end + if process_instance.respond_to?(:after_fork) + process_instance.send(:after_fork) + end + configured_processes[pid] = configured_process forks[pid] = process_instance end diff --git a/spec/lepus/producer_spec.rb b/spec/lepus/producer_spec.rb index 8806cfa..e3ccce9 100644 --- a/spec/lepus/producer_spec.rb +++ b/spec/lepus/producer_spec.rb @@ -44,15 +44,15 @@ end describe "#publish" do - let(:options) { { expiration: 60 } } + let(:options) { {expiration: 60} } context "when the message is different than String" do - let(:message) { { key: "value" } } + let(:message) { {key: "value"} } it "publishes the message to the exchange as JSON" do - bunny = instance_double("Bunny::Session") - channel = instance_double("Bunny::Channel") - exchange = instance_double("Bunny::Exchange") + bunny = instance_double(Bunny::Session) + channel = instance_double(Bunny::Channel) + exchange = instance_double(Bunny::Exchange) allow(producer).to receive(:bunny).and_return(bunny) expect(bunny).to receive(:with_channel).and_yield(channel) @@ -63,8 +63,7 @@ expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS) expect(exchange).to have_received(:publish).with(MultiJson.dump(message), - described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json") - ) + described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json")) end end @@ -72,9 +71,9 @@ let(:message) { "test message" } it "publishes the message to the exchange as text" do - bunny = instance_double("Bunny::Session") - channel = instance_double("Bunny::Channel") - exchange = instance_double("Bunny::Exchange") + bunny = instance_double(Bunny::Session) + channel = instance_double(Bunny::Channel) + exchange = instance_double(Bunny::Exchange) allow(producer).to receive(:bunny).and_return(bunny) expect(bunny).to receive(:with_channel).and_yield(channel) @@ -85,8 +84,7 @@ expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS) expect(exchange).to have_received(:publish).with(message, - described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain") - ) + described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain")) end end end