Skip to content

Commit

Permalink
Fixes (#185)
Browse files Browse the repository at this point in the history
* Support outgoing pubrel during retransmissions

* Implement 'From' trait for serialization error

* Reorganize methods in state for readability

* Add responses to incoming packets in event queue

* Catch error in example

Co-authored-by: tekjar <[email protected]>
  • Loading branch information
Ravi Teja and tekjar authored Nov 13, 2020
1 parent dcb256a commit e74d915
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 103 deletions.
21 changes: 12 additions & 9 deletions rumqttc/examples/asyncpubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
});

loop {
match eventloop.poll().await? {
Event::Incoming(i) => {
match eventloop.poll().await {
Ok(Event::Incoming(Incoming::Publish(p))) => {
println!("Topic: {}, Payload: {:?}", p.topic, p.payload)
}
Ok(Event::Incoming(i)) => {
println!("Incoming = {:?}", i);

// Extract topic (String) & payload (Bytes)
if let Incoming::Publish(p) = i {
println!("Topic: {}, Payload: {:?}", p.topic, p.payload);
}
}
Event::Outgoing(o) => println!("Outgoing = {:?}", o),
Ok(Event::Outgoing(o)) => println!("Outgoing = {:?}", o),
Err(e) => {
println!("Error = {:?}", e);
continue;
}
}
}
}
Expand All @@ -41,9 +43,10 @@ async fn requests(client: AsyncClient) {

for i in 1..=10 {
client
.publish("hello/world", QoS::AtLeastOnce, false, vec![i; i as usize])
.publish("hello/world", QoS::ExactlyOnce, false, vec![i; 1000 * 1024])
.await
.unwrap();

time::sleep(Duration::from_secs(1)).await;
}

Expand Down
192 changes: 98 additions & 94 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub enum StateError {
Serialization(mqtt4bytes::Error),
}

impl From<mqtt4bytes::Error> for StateError {
fn from(e: mqtt4bytes::Error) -> StateError {
StateError::Serialization(e)
}
}

/// State of the mqtt connection.
// Design: Methods will just modify the state of the object without doing any network operations
// Design: All inflight queues are maintained in a pre initialized vec with index as packet id.
Expand All @@ -48,7 +54,6 @@ pub struct MqttState {
/// Collision ping count. Collisions stop user requests
/// which inturn trigger pings. Multiple pings without
/// resolving collisions will result in error
/// Last incoming packet time
pub collision_ping_count: usize,
/// Last incoming packet time
last_incoming: Instant,
Expand Down Expand Up @@ -137,6 +142,7 @@ impl MqttState {
pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
match request {
Request::Publish(publish) => self.outgoing_publish(publish)?,
Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?,
Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
Request::PingReq => self.outgoing_ping()?,
Expand Down Expand Up @@ -174,37 +180,39 @@ impl MqttState {
Ok(())
}

/// Adds next packet identifier to QoS 1 and 2 publish packets and returns
/// it buy wrapping publish in packet
fn outgoing_publish(&mut self, publish: Publish) -> Result<(), StateError> {
debug!("Publish. Topc = {}", publish.topic);
fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
Ok(())
}

let publish = match publish.qos {
QoS::AtMostOnce => publish,
QoS::AtLeastOnce | QoS::ExactlyOnce => self.add_packet_id_and_save(publish)?,
};
fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
Ok(())
}

debug!(
"Publish. Pkid = {:?}, Payload Size = {:?}",
publish.pkid,
publish.payload.len()
);
/// Results in a publish notification in all the QoS cases. Replys with an ack
/// in case of QoS1 and Replys rec in case of QoS while also storing the message
fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
let qos = publish.qos;

publish
.write(&mut self.write)
.map_err(StateError::Serialization)?;
match qos {
QoS::AtMostOnce => Ok(()),
QoS::AtLeastOnce => {
let pkid = publish.pkid;
PubAck::new(pkid).write(&mut self.write)?;

let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
self.events.push_back(event);
Ok(())
Ok(())
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
PubRec::new(pkid).write(&mut self.write)?;
self.incoming_pub[pkid as usize] = Some(pkid);
Ok(())
}
}
}

fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result<(), StateError> {
if let Some(publish) = self.check_collision(puback.pkid) {
publish
.write(&mut self.write)
.map_err(StateError::Serialization)?;

publish.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
self.events.push_back(event);
self.collision_ping_count = 0;
Expand All @@ -222,23 +230,15 @@ impl MqttState {
}
}

fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
Ok(())
}

fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
Ok(())
}

fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<(), StateError> {
match mem::replace(&mut self.outgoing_pub[pubrec.pkid as usize], None) {
Some(_) => {
// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
PubRel::new(pubrec.pkid)
.write(&mut self.write)
.map_err(StateError::Serialization)?;
PubRel::new(pubrec.pkid).write(&mut self.write)?;

let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);
Ok(())
}
None => {
Expand All @@ -248,40 +248,12 @@ impl MqttState {
}
}

/// Results in a publish notification in all the QoS cases. Replys with an ack
/// in case of QoS1 and Replys rec in case of QoS while also storing the message
fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
let qos = publish.qos;

match qos {
QoS::AtMostOnce => Ok(()),
QoS::AtLeastOnce => {
let pkid = publish.pkid;
PubAck::new(pkid)
.write(&mut self.write)
.map_err(StateError::Serialization)?;

Ok(())
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
PubRec::new(pkid)
.write(&mut self.write)
.map_err(StateError::Serialization)?;

self.incoming_pub[pkid as usize] = Some(pkid);
Ok(())
}
}
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<(), StateError> {
match mem::replace(&mut self.incoming_pub[pubrel.pkid as usize], None) {
Some(_) => {
PubComp::new(pubrel.pkid)
.write(&mut self.write)
.map_err(StateError::Serialization)?;

PubComp::new(pubrel.pkid).write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
self.events.push_back(event);
Ok(())
}
None => {
Expand All @@ -293,10 +265,9 @@ impl MqttState {

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<(), StateError> {
if let Some(publish) = self.check_collision(pubcomp.pkid) {
publish
.write(&mut self.write)
.map_err(StateError::Serialization)?;

publish.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
self.events.push_back(event);
self.collision_ping_count = 0;
}

Expand All @@ -312,6 +283,43 @@ impl MqttState {
}
}

fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
self.await_pingresp = false;
Ok(())
}

/// Adds next packet identifier to QoS 1 and 2 publish packets and returns
/// it buy wrapping publish in packet
fn outgoing_publish(&mut self, publish: Publish) -> Result<(), StateError> {
let publish = match publish.qos {
QoS::AtMostOnce => publish,
QoS::AtLeastOnce | QoS::ExactlyOnce => self.save_publish(publish)?,
};

debug!(
"Publish. Topic = {}, Pkid = {:?}, Payload Size = {:?}",
publish.topic,
publish.pkid,
publish.payload.len()
);

publish.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
self.events.push_back(event);
Ok(())
}

fn outgoing_pubrel(&mut self, pubrel: PubRel) -> Result<(), StateError> {
let pubrel = self.save_pubrel(pubrel)?;

debug!("Pubrel. Pkid = {}", pubrel.pkid);
PubRel::new(pubrel.pkid).write(&mut self.write)?;

let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid));
self.events.push_back(event);
Ok(())
}

/// check when the last control packet/pingreq packet is received and return
/// the status which tells if keep alive time has exceeded
/// NOTE: status will be checked for zero keepalive times also
Expand Down Expand Up @@ -341,20 +349,12 @@ impl MqttState {
elapsed_out.as_millis()
);

PingReq
.write(&mut self.write)
.map_err(StateError::Serialization)?;

PingReq.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::PingReq);
self.events.push_back(event);
Ok(())
}

fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
self.await_pingresp = false;
Ok(())
}

fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
let pkid = self.next_pkid();
subscription.pkid = pkid;
Expand All @@ -364,10 +364,7 @@ impl MqttState {
subscription.topics, subscription.pkid
);

subscription
.write(&mut self.write)
.map_err(StateError::Serialization)?;

subscription.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid));
self.events.push_back(event);
Ok(())
Expand All @@ -382,10 +379,7 @@ impl MqttState {
unsub.topics, unsub.pkid
);

unsub
.write(&mut self.write)
.map_err(StateError::Serialization)?;

unsub.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid));
self.events.push_back(event);
Ok(())
Expand All @@ -394,10 +388,7 @@ impl MqttState {
fn outgoing_disconnect(&mut self) -> Result<(), StateError> {
debug!("Disconnect");

Disconnect
.write(&mut self.write)
.map_err(StateError::Serialization)?;

Disconnect.write(&mut self.write)?;
let event = Event::Outgoing(Outgoing::Disconnect);
self.events.push_back(event);
Ok(())
Expand All @@ -416,14 +407,27 @@ impl MqttState {
None
}

fn save_pubrel(&mut self, mut pubrel: PubRel) -> Result<PubRel, StateError> {
let pubrel = match pubrel.pkid {
// consider PacketIdentifier(0) as uninitialized packets
0 => {
pubrel.pkid = self.next_pkid();
pubrel
}
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
Ok(pubrel)
}

/// Add publish packet to the state and return the packet. This method clones the
/// publish packet to save it to the state.
fn add_packet_id_and_save(&mut self, mut publish: Publish) -> Result<Publish, StateError> {
fn save_publish(&mut self, mut publish: Publish) -> Result<Publish, StateError> {
let publish = match publish.pkid {
// consider PacketIdentifier(0) as uninitialized packets
0 => {
let pkid = self.next_pkid();
publish.pkid = pkid;
publish.pkid = self.next_pkid();
publish
}
_ => publish,
Expand Down

0 comments on commit e74d915

Please sign in to comment.