Skip to content

Commit

Permalink
feat: add simple producer
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Nov 19, 2024
1 parent 3277e3f commit cf6c933
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 24 deletions.
42 changes: 42 additions & 0 deletions lib/lepus/producer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module Lepus
class Producer
DEFAULT_EXCHANGE_OPTIONS = {
type: :topic,
durable: true,
auto_delete: false
}.freeze

DEFAULT_PUBLISH_OPTIONS = {
expiration: 7 * (60 * 60 * 24),
}.freeze

def initialize(exchange_name, **options)
@exchange_name = exchange_name
@exchange_options = DEFAULT_EXCHANGE_OPTIONS.merge(options)
end

def publish(message, **options)
payload = if message.is_a?(String)
options[:content_type] ||= "text/plain"
message
else
options[:content_type] ||= "application/json"
MultiJson.dump(message)
end

bunny.with_channel do |channel|
exchange = channel.exchange(@exchange_name, @exchange_options)
exchange.publish(
payload,
DEFAULT_PUBLISH_OPTIONS.merge(options)
)
end
end

def bunny
Thread.current[:lepus_bunny] ||= Lepus.config.create_connection(suffix: "producer")
end
end
end
24 changes: 0 additions & 24 deletions lib/lepus/publisher.rb

This file was deleted.

93 changes: 93 additions & 0 deletions spec/lepus/producer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe Lepus::Producer do
let(:exchange_name) { "test_exchange" }
let(:producer) { described_class.new(exchange_name) }

describe "#initialize" do
it "sets the exchange name" do
expect(producer.instance_variable_get(:@exchange_name)).to eq(exchange_name)
end

it "sets the exchange options" do
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS
)

producer = described_class.new(exchange_name, type: :direct, durable: false, auto_delete: true)
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct, durable: false, auto_delete: true)
)

producer = described_class.new(exchange_name, type: :direct)
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct)
)

producer = described_class.new(exchange_name, durable: false)
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS.merge(durable: false)
)

producer = described_class.new(exchange_name, auto_delete: true)
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS.merge(auto_delete: true)
)

producer = described_class.new(exchange_name, type: :direct, durable: false)
expect(producer.instance_variable_get(:@exchange_options)).to eq(
described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct, durable: false)
)
end
end

describe "#publish" do
let(:options) { { expiration: 60 } }

context "when the message is different than String" do
let(:message) { { key: "value" } }

it "publishes the message to the exchange as JSON" do
bunny = instance_double("Bunny::Session")
channel = instance_double("Bunny::Channel")
exchange = instance_double("Bunny::Exchange")

allow(producer).to receive(:bunny).and_return(bunny)
expect(bunny).to receive(:with_channel).and_yield(channel)
allow(channel).to receive(:exchange).and_return(exchange)
allow(exchange).to receive(:publish)

producer.publish(message, **options)

expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS)
expect(exchange).to have_received(:publish).with(MultiJson.dump(message),
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json")
)
end
end

context "when the message is a String" do
let(:message) { "test message" }

it "publishes the message to the exchange as text" do
bunny = instance_double("Bunny::Session")
channel = instance_double("Bunny::Channel")
exchange = instance_double("Bunny::Exchange")

allow(producer).to receive(:bunny).and_return(bunny)
expect(bunny).to receive(:with_channel).and_yield(channel)
allow(channel).to receive(:exchange).and_return(exchange)
allow(exchange).to receive(:publish)

producer.publish(message, **options)

expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS)
expect(exchange).to have_received(:publish).with(message,
described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain")
)
end
end
end
end

0 comments on commit cf6c933

Please sign in to comment.