diff --git a/examples/consumer.rs b/examples/consumer.rs index b9bce60..677d7c5 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -2,6 +2,8 @@ extern crate amqp; extern crate env_logger; use amqp::{Session, Options, Table, Basic, protocol, Channel}; +use amqp::QueueBuilder; +use amqp::ConsumeBuilder; use amqp::TableEntry::LongString; use amqp::protocol::basic; use std::default::Default; @@ -45,15 +47,16 @@ fn main() { println!("Openned channel: {:?}", channel.id); let queue_name = "test_queue"; - //queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table - let queue_declare = channel.queue_declare(queue_name, false, true, false, false, false, Table::new()); + let queue_builder = QueueBuilder::named(queue_name).durable(); + let queue_declare = queue_builder.declare(&mut channel); println!("Queue declare: {:?}", queue_declare); channel.basic_prefetch(10).ok().expect("Failed to prefetch"); //consumer, queue: &str, consumer_tag: &str, no_local: bool, no_ack: bool, exclusive: bool, nowait: bool, arguments: Table println!("Declaring consumers..."); - let consumer_name = channel.basic_consume(consumer_function, queue_name, "", false, false, false, false, Table::new()); + let consume_builder = ConsumeBuilder::new(consumer_function, queue_name); + let consumer_name = consume_builder.basic_consume(&mut channel); println!("Starting consumer {:?}", consumer_name); let my_consumer = MyConsumer { deliveries_number: 0 }; diff --git a/src/builders.rs b/src/builders.rs new file mode 100644 index 0000000..c6b21bb --- /dev/null +++ b/src/builders.rs @@ -0,0 +1,142 @@ +use basic::Basic; +use amqp_error::AMQPResult; +use amq_proto::Table; +use channel::{Channel, Consumer}; +use protocol; + +pub struct ConsumeBuilder + where T: Consumer + 'static +{ + _callback: T, + _queue_name: String, + _tag: String, + _no_local: bool, + _no_ack: bool, + _exclusive: bool, + _nowait: bool, + _arguments: Table, +} + +impl ConsumeBuilder +{ + pub fn new(callback: T, name: S) -> ConsumeBuilder + where S: Into + { + ConsumeBuilder { + _callback: callback, + _queue_name: name.into(), + _tag: "".into(), + _no_local: false, + _no_ack: false, + _exclusive: false, + _nowait: false, + _arguments: Table::new(), + } + } + + pub fn tag(mut self, tag: S) -> ConsumeBuilder + where S: Into + { + self._tag = tag.into(); + self + } + + pub fn no_local(mut self) -> ConsumeBuilder { + self._no_local = true; + self + } + + pub fn no_ack(mut self) -> ConsumeBuilder { + self._no_ack = true; + self + } + + pub fn exclusive(mut self) -> ConsumeBuilder { + self._exclusive = true; + self + } + + pub fn nowait(mut self) -> ConsumeBuilder { + self._nowait = true; + self + } + + pub fn basic_consume(self, channel: &mut Channel) -> AMQPResult { + channel.basic_consume( + self._callback, + self._queue_name, + self._tag, + self._no_local, + self._no_ack, + self._exclusive, + self._nowait, + self._arguments) + } +} + +pub struct QueueBuilder { + _name: String, + _passive: bool, + _durable: bool, + _exclusive: bool, + _auto_delete: bool, + _nowait: bool, + _arguments: Table, +} + +impl QueueBuilder { + pub fn named(name: S) -> QueueBuilder + where S: Into { + QueueBuilder { + _name: name.into(), + _passive: false, + _durable: false, + _exclusive: false, + _auto_delete: false, + _nowait: false, + _arguments: Table::new() + } + } + + pub fn passive(mut self) -> QueueBuilder { + self._passive = true; + self + } + + pub fn durable(mut self) -> QueueBuilder { + self._durable = true; + self + } + + pub fn exclusive(mut self) -> QueueBuilder { + self._exclusive = true; + self + } + + pub fn auto_delete(mut self) -> QueueBuilder { + self._auto_delete = true; + self + } + + pub fn nowait(mut self) -> QueueBuilder { + self._nowait = true; + self + } + + pub fn arguments(mut self, table: Table) -> QueueBuilder { + self._arguments = table; + self + } + + pub fn declare(&self, channel: &mut Channel) -> + AMQPResult { + channel.queue_declare( + self._name.clone(), + self._passive, + self._durable, + self._exclusive, + self._auto_delete, + self._nowait, + self._arguments.clone()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8ff8edc..7847abb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,6 +94,7 @@ mod channel; mod session; mod basic; mod amqp_error; +mod builders; pub const VERSION: &'static str = env!("CARGO_PKG_VERSION"); @@ -103,3 +104,4 @@ pub use basic::{Basic, GetResult}; pub use session::AMQPScheme; pub use amqp_error::AMQPError; pub use amq_proto::{protocol, Table, TableEntry}; +pub use builders::{QueueBuilder, ConsumeBuilder};