Skip to content

Commit

Permalink
add after and before for to the consumer class
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Jan 21, 2025
1 parent 48c04e8 commit ac9be46
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 16 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ GEM
zeitwerk (2.6.18)

PLATFORMS
ruby
x86_64-linux

DEPENDENCIES
Expand Down
6 changes: 6 additions & 0 deletions gemfiles/rails52.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ PATH
bunny
concurrent-ruby
multi_json
thor
zeitwerk

GEM
Expand Down Expand Up @@ -90,6 +91,7 @@ GEM
marcel (1.0.4)
method_source (1.1.0)
mini_mime (1.1.5)
mini_portile2 (2.8.8)
minitest (5.25.1)
multi_json (1.15.0)
net-imap (0.4.17)
Expand All @@ -102,6 +104,9 @@ GEM
net-smtp (0.5.0)
net-protocol
nio4r (2.7.4)
nokogiri (1.15.6)
mini_portile2 (~> 2.8.2)
racc (~> 1.4)
nokogiri (1.15.6-x86_64-linux)
racc (~> 1.4)
parallel (1.26.3)
Expand Down Expand Up @@ -218,6 +223,7 @@ GEM
zeitwerk (2.6.18)

PLATFORMS
ruby
x86_64-linux

DEPENDENCIES
Expand Down
6 changes: 6 additions & 0 deletions gemfiles/rails61.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ PATH
bunny
concurrent-ruby
multi_json
thor
zeitwerk

GEM
Expand Down Expand Up @@ -107,6 +108,7 @@ GEM
marcel (1.0.4)
method_source (1.1.0)
mini_mime (1.1.5)
mini_portile2 (2.8.8)
minitest (5.25.1)
multi_json (1.15.0)
net-imap (0.4.17)
Expand All @@ -119,6 +121,9 @@ GEM
net-smtp (0.5.0)
net-protocol
nio4r (2.7.4)
nokogiri (1.15.6)
mini_portile2 (~> 2.8.2)
racc (~> 1.4)
nokogiri (1.15.6-x86_64-linux)
racc (~> 1.4)
parallel (1.26.3)
Expand Down Expand Up @@ -236,6 +241,7 @@ GEM
zeitwerk (2.6.18)

PLATFORMS
ruby
x86_64-linux

DEPENDENCIES
Expand Down
7 changes: 6 additions & 1 deletion lib/lepus/processes/callbacks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ def run_process_callbacks(name)
self.class.send(:"before_#{name}_callbacks").each do |method|
send(method)
end
yield if block_given?

result = yield if block_given?

self.class.send(:"after_#{name}_callbacks").each do |method|
send(method)
end

result
end
end

Expand All @@ -25,6 +29,7 @@ def inherited(base)
base.instance_variable_set(:@after_boot_callbacks, after_boot_callbacks.dup)
base.instance_variable_set(:@before_shutdown_callbacks, before_shutdown_callbacks.dup)
base.instance_variable_set(:@after_shutdown_callbacks, after_shutdown_callbacks.dup)
super
end

def before_boot(*methods)
Expand Down
15 changes: 14 additions & 1 deletion lib/lepus/processes/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ def metadata
super.merge(consumer_class: consumer_class.to_s)
end

def before_fork
return unless @consumer_class.respond_to?(:before_fork, true)

@consumer_class.send(:before_fork)
end

def after_fork
return unless @consumer_class.respond_to?(:after_fork, true)

@consumer_class.send(:after_fork)
end

private

SLEEP_INTERVAL = 5
Expand Down Expand Up @@ -77,8 +89,9 @@ def setup_consumer!
main_queue.bind(@exchange, **opts)
end

consumer_instance = consumer_class.new
consumer_wrapper = Lepus::ConsumerWrapper.new(
consumer_class.new,
consumer_instance,
main_queue.channel,
main_queue,
"#{consumer_class.name}-#{n + 1}"
Expand Down
2 changes: 1 addition & 1 deletion lib/lepus/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Producer
}.freeze

DEFAULT_PUBLISH_OPTIONS = {
expiration: 7 * (60 * 60 * 24),
expiration: 7 * (60 * 60 * 24)
}.freeze

def initialize(exchange_name, **options)
Expand Down
6 changes: 5 additions & 1 deletion lib/lepus/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def check_bunny_connection
end

def start_processes
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
configuration.configured_processes.each do |configured_process|
start_process(configured_process)
end
end

def supervise
Expand All @@ -104,7 +106,9 @@ def start_process(configured_process)
instance.mode = :fork
end

process_instance.before_fork
pid = fork do
process_instance.after_fork
process_instance.start
end

Expand Down
22 changes: 10 additions & 12 deletions spec/lepus/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
end

describe "#publish" do
let(:options) { { expiration: 60 } }
let(:options) { {expiration: 60} }

context "when the message is different than String" do
let(:message) { { key: "value" } }
let(:message) { {key: "value"} }

it "publishes the message to the exchange as JSON" do
bunny = instance_double("Bunny::Session")
channel = instance_double("Bunny::Channel")
exchange = instance_double("Bunny::Exchange")
bunny = instance_double(Bunny::Session)
channel = instance_double(Bunny::Channel)
exchange = instance_double(Bunny::Exchange)

allow(producer).to receive(:bunny).and_return(bunny)
expect(bunny).to receive(:with_channel).and_yield(channel)
Expand All @@ -63,18 +63,17 @@

expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS)
expect(exchange).to have_received(:publish).with(MultiJson.dump(message),
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json")
)
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json"))
end
end

context "when the message is a String" do
let(:message) { "test message" }

it "publishes the message to the exchange as text" do
bunny = instance_double("Bunny::Session")
channel = instance_double("Bunny::Channel")
exchange = instance_double("Bunny::Exchange")
bunny = instance_double(Bunny::Session)
channel = instance_double(Bunny::Channel)
exchange = instance_double(Bunny::Exchange)

allow(producer).to receive(:bunny).and_return(bunny)
expect(bunny).to receive(:with_channel).and_yield(channel)
Expand All @@ -85,8 +84,7 @@

expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS)
expect(exchange).to have_received(:publish).with(message,
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain")
)
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain"))
end
end
end
Expand Down

0 comments on commit ac9be46

Please sign in to comment.