diff --git a/lib/lepus/producer.rb b/lib/lepus/producer.rb new file mode 100644 index 0000000..11bdcae --- /dev/null +++ b/lib/lepus/producer.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Lepus + class Producer + DEFAULT_EXCHANGE_OPTIONS = { + type: :topic, + durable: true, + auto_delete: false + }.freeze + + DEFAULT_PUBLISH_OPTIONS = { + expiration: 7 * (60 * 60 * 24), + }.freeze + + def initialize(exchange_name, **options) + @exchange_name = exchange_name + @exchange_options = DEFAULT_EXCHANGE_OPTIONS.merge(options) + end + + def publish(message, **options) + payload = if message.is_a?(String) + options[:content_type] ||= "text/plain" + message + else + options[:content_type] ||= "application/json" + MultiJson.dump(message) + end + + bunny.with_channel do |channel| + exchange = channel.exchange(@exchange_name, @exchange_options) + exchange.publish( + payload, + DEFAULT_PUBLISH_OPTIONS.merge(options) + ) + end + end + + def bunny + Thread.current[:lepus_bunny] ||= Lepus.config.create_connection(suffix: "producer") + end + end +end diff --git a/lib/lepus/publisher.rb b/lib/lepus/publisher.rb deleted file mode 100644 index cafeedc..0000000 --- a/lib/lepus/publisher.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -module Lepus - class Publisher - DEFAULT_EXCHANGE_OPTIONS = {type: :topic, durable: true, auto_delete: false}.freeze - DEFAULT_PUBLISH_OPTIONS = {expiration: 7 * (60 * 60 * 24), content_type: "application/json"}.freeze - - def initialize(exchange_name, **options) - @exchange_name = "pipeline.#{exchange_name}" - @exchange_options = DEFAULT_EXCHANGE_OPTIONS.merge(options) - end - - def publish(message, **options) - bunny.with_channel do |channel| - exchange = channel.exchange(@exchange_name, @exchange_options) - exchange.publish(message.to_json, DEFAULT_PUBLISH_OPTIONS.merge(options)) - end - end - - def bunny - Thread.current[:lepus_bunny] ||= Lepus.config.create_connection(suffix: "publisher") - end - end -end diff --git a/spec/lepus/producer_spec.rb b/spec/lepus/producer_spec.rb new file mode 100644 index 0000000..8806cfa --- /dev/null +++ b/spec/lepus/producer_spec.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Lepus::Producer do + let(:exchange_name) { "test_exchange" } + let(:producer) { described_class.new(exchange_name) } + + describe "#initialize" do + it "sets the exchange name" do + expect(producer.instance_variable_get(:@exchange_name)).to eq(exchange_name) + end + + it "sets the exchange options" do + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS + ) + + producer = described_class.new(exchange_name, type: :direct, durable: false, auto_delete: true) + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct, durable: false, auto_delete: true) + ) + + producer = described_class.new(exchange_name, type: :direct) + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct) + ) + + producer = described_class.new(exchange_name, durable: false) + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS.merge(durable: false) + ) + + producer = described_class.new(exchange_name, auto_delete: true) + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS.merge(auto_delete: true) + ) + + producer = described_class.new(exchange_name, type: :direct, durable: false) + expect(producer.instance_variable_get(:@exchange_options)).to eq( + described_class::DEFAULT_EXCHANGE_OPTIONS.merge(type: :direct, durable: false) + ) + end + end + + describe "#publish" do + let(:options) { { expiration: 60 } } + + context "when the message is different than String" do + let(:message) { { key: "value" } } + + it "publishes the message to the exchange as JSON" do + bunny = instance_double("Bunny::Session") + channel = instance_double("Bunny::Channel") + exchange = instance_double("Bunny::Exchange") + + allow(producer).to receive(:bunny).and_return(bunny) + expect(bunny).to receive(:with_channel).and_yield(channel) + allow(channel).to receive(:exchange).and_return(exchange) + allow(exchange).to receive(:publish) + + producer.publish(message, **options) + + expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS) + expect(exchange).to have_received(:publish).with(MultiJson.dump(message), + described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "application/json") + ) + end + end + + context "when the message is a String" do + let(:message) { "test message" } + + it "publishes the message to the exchange as text" do + bunny = instance_double("Bunny::Session") + channel = instance_double("Bunny::Channel") + exchange = instance_double("Bunny::Exchange") + + allow(producer).to receive(:bunny).and_return(bunny) + expect(bunny).to receive(:with_channel).and_yield(channel) + allow(channel).to receive(:exchange).and_return(exchange) + allow(exchange).to receive(:publish) + + producer.publish(message, **options) + + expect(channel).to have_received(:exchange).with(exchange_name, described_class::DEFAULT_EXCHANGE_OPTIONS) + expect(exchange).to have_received(:publish).with(message, + described_class::DEFAULT_PUBLISH_OPTIONS.merge(options).merge(content_type: "text/plain") + ) + end + end + end +end