Skip to content

Commit

Permalink
added builders for better ergonomics
Browse files Browse the repository at this point in the history
  • Loading branch information
gagath committed Apr 2, 2018
1 parent 234dbdb commit 798096f
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 3 deletions.
9 changes: 6 additions & 3 deletions examples/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ extern crate amqp;
extern crate env_logger;

use amqp::{Session, Options, Table, Basic, protocol, Channel};
use amqp::QueueBuilder;
use amqp::ConsumeBuilder;
use amqp::TableEntry::LongString;
use amqp::protocol::basic;
use std::default::Default;
Expand Down Expand Up @@ -45,15 +47,16 @@ fn main() {
println!("Openned channel: {:?}", channel.id);

let queue_name = "test_queue";
//queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table
let queue_declare = channel.queue_declare(queue_name, false, true, false, false, false, Table::new());
let queue_builder = QueueBuilder::named(queue_name).durable();
let queue_declare = queue_builder.declare(&mut channel);

println!("Queue declare: {:?}", queue_declare);
channel.basic_prefetch(10).ok().expect("Failed to prefetch");
//consumer, queue: &str, consumer_tag: &str, no_local: bool, no_ack: bool, exclusive: bool, nowait: bool, arguments: Table
println!("Declaring consumers...");

let consumer_name = channel.basic_consume(consumer_function, queue_name, "", false, false, false, false, Table::new());
let consume_builder = ConsumeBuilder::new(consumer_function, queue_name);
let consumer_name = consume_builder.basic_consume(&mut channel);
println!("Starting consumer {:?}", consumer_name);

let my_consumer = MyConsumer { deliveries_number: 0 };
Expand Down
142 changes: 142 additions & 0 deletions src/builders.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use basic::Basic;
use amqp_error::AMQPResult;
use amq_proto::Table;
use channel::{Channel, Consumer};
use protocol;

pub struct ConsumeBuilder<T>
where T: Consumer + 'static
{
_callback: T,
_queue_name: String,
_tag: String,
_no_local: bool,
_no_ack: bool,
_exclusive: bool,
_nowait: bool,
_arguments: Table,
}

impl <T: Consumer + 'static> ConsumeBuilder<T>
{
pub fn new<S>(callback: T, name: S) -> ConsumeBuilder<T>
where S: Into<String>
{
ConsumeBuilder {
_callback: callback,
_queue_name: name.into(),
_tag: "".into(),
_no_local: false,
_no_ack: false,
_exclusive: false,
_nowait: false,
_arguments: Table::new(),
}
}

pub fn tag<S>(mut self, tag: S) -> ConsumeBuilder<T>
where S: Into<String>
{
self._tag = tag.into();
self
}

pub fn no_local(mut self) -> ConsumeBuilder<T> {
self._no_local = true;
self
}

pub fn no_ack(mut self) -> ConsumeBuilder<T> {
self._no_ack = true;
self
}

pub fn exclusive(mut self) -> ConsumeBuilder<T> {
self._exclusive = true;
self
}

pub fn nowait(mut self) -> ConsumeBuilder<T> {
self._nowait = true;
self
}

pub fn basic_consume(self, channel: &mut Channel) -> AMQPResult<String> {
channel.basic_consume(
self._callback,
self._queue_name,
self._tag,
self._no_local,
self._no_ack,
self._exclusive,
self._nowait,
self._arguments)
}
}

pub struct QueueBuilder {
_name: String,
_passive: bool,
_durable: bool,
_exclusive: bool,
_auto_delete: bool,
_nowait: bool,
_arguments: Table,
}

impl QueueBuilder {
pub fn named<S>(name: S) -> QueueBuilder
where S: Into<String> {
QueueBuilder {
_name: name.into(),
_passive: false,
_durable: false,
_exclusive: false,
_auto_delete: false,
_nowait: false,
_arguments: Table::new()
}
}

pub fn passive(mut self) -> QueueBuilder {
self._passive = true;
self
}

pub fn durable(mut self) -> QueueBuilder {
self._durable = true;
self
}

pub fn exclusive(mut self) -> QueueBuilder {
self._exclusive = true;
self
}

pub fn auto_delete(mut self) -> QueueBuilder {
self._auto_delete = true;
self
}

pub fn nowait(mut self) -> QueueBuilder {
self._nowait = true;
self
}

pub fn arguments(mut self, table: Table) -> QueueBuilder {
self._arguments = table;
self
}

pub fn declare(&self, channel: &mut Channel) ->
AMQPResult<protocol::queue::DeclareOk> {
channel.queue_declare(
self._name.clone(),
self._passive,
self._durable,
self._exclusive,
self._auto_delete,
self._nowait,
self._arguments.clone())
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ mod channel;
mod session;
mod basic;
mod amqp_error;
mod builders;

pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");

Expand All @@ -103,3 +104,4 @@ pub use basic::{Basic, GetResult};
pub use session::AMQPScheme;
pub use amqp_error::AMQPError;
pub use amq_proto::{protocol, Table, TableEntry};
pub use builders::{QueueBuilder, ConsumeBuilder};

0 comments on commit 798096f

Please sign in to comment.