Skip to content

Commit

Permalink
Fix purge packet ids
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell committed May 28, 2024
1 parent 96a5805 commit 4bd69f2
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
packet_id = undefined :: undefined | non_neg_integer(),
queued :: non_neg_integer(),
expiry :: non_neg_integer(),
qos :: 0..2,
qos = 0 :: 0..2,
message :: mqtt_packet_map:mqtt_packet()
}).

Expand Down Expand Up @@ -818,11 +818,7 @@ relay_publish(#{ type := publish, message := Msg } = MqttMsg, State) ->
% ---------------------------------------------------------------------------------------

cleanup_pending_qos0(#state{ pending = Pending } = State) ->
Pending1 = queue:filter(
fun(#queued{ qos = QoS }) ->
QoS =:= 0
end,
Pending),
Pending1 = queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Pending),
State#state{ pending = Pending1 }.

resend_unacknowledged(#state{ awaiting_ack = AwaitAck } = State) ->
Expand Down Expand Up @@ -988,10 +984,13 @@ maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
case queue:len(Queue) > ?MAX_INFLIGHT orelse size(WaitAcks) > ?MAX_INFLIGHT_ACK of
true ->
PurgedQueue = purge(Queue),
AcksInPurgedQueue = queue:all(
fun(#queued{ qos = N }) -> N > 0 end,
PacketIds = queue:fold(
fun
(#queued{ qos = 0 }, Acc) -> Acc;
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
end,
[],
PurgedQueue),
PacketIds = [ PacketId || #queued{ packet_id = PacketId } <- AcksInPurgedQueue ],
State#state{
pending = PurgedQueue,
awaiting_ack = maps:with(PacketIds, WaitAcks)
Expand Down

0 comments on commit 4bd69f2

Please sign in to comment.