diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 15cd5f5ad..a9f83d7e9 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -41,7 +41,7 @@ impl From> for ClientError { /// from the broker, i.e. move ahead. #[derive(Clone, Debug)] pub struct AsyncClient { - request_tx: Sender, + pub request_tx: Sender, } impl AsyncClient { diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9b1ce8c5..70b6a34f4 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -161,6 +161,7 @@ impl EventLoop { match self.select().await { Ok(v) => Ok(v), Err(e) => { + log::error!("connection error : {e}"); self.clean(); Err(e) } diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 408741e35..e567c6565 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -221,6 +221,10 @@ impl MqttState { } fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result, StateError> { + if puback.pkid > self.max_inflight { + return Ok(None); + } + let publish = self .outgoing_pub .get_mut(puback.pkid as usize) @@ -319,7 +323,7 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet fn outgoing_publish(&mut self, mut publish: Publish) -> Result, StateError> { - if publish.qos != QoS::AtMostOnce { + if publish.qos != QoS::AtMostOnce && publish.pkid <= self.max_inflight { if publish.pkid == 0 { publish.pkid = self.next_pkid(); } diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 854aa7b0f..5d415dc3f 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -65,7 +65,7 @@ pub enum StateError { #[error("Connection failed with reason '{reason:?}' ")] ConnFail { reason: ConnectReturnCode }, #[error("Connection closed by peer abruptly")] - ConnectionAborted + ConnectionAborted, } impl From for StateError {