QoS 2 messages loss on new instance #883

Open
rubenbaer opened this issue Mar 5, 2025 · 0 comments
Labels
Status: Available No one has claimed responsibility for resolving this issue.


Bug Description

Current Situation

When a QoS 2 PUBLISH is received, the message is stored internally, and a PUBREC is sent. Once the broker receives PUBREC, message ownership is transferred to the client.
When the client receives a subsequent PUBREL, the on_message(..) callback is executed. If on_message(..) completes successfully or a manual acknowledgment is performed, a PUBCOMP is sent.

Problem

If the client disconnects and reconnects, the broker has already received the PUBREC and will not resend the PUBLISH. If a new client instance is created instead, the broker still does not resend the PUBLISH, but the new instance no longer holds the internally stored message, so the message is lost.

For the broker, a reconnection and a new client instance are indistinguishable. However, the client can differentiate between them based on session state. These scenarios should be handled separately.
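The current flow and the resulting loss can be modelled with a small in-memory sketch. This is not paho-mqtt's actual code: the class `Qos2Receiver`, its method names, and the `sent` list are hypothetical and only mirror the steps described above.

```python
class Qos2Receiver:
    """Toy model of the current receive flow (hypothetical, not paho's code)."""

    def __init__(self, on_message):
        self.on_message = on_message
        self.stored = {}   # mid -> payload, held in memory only
        self.sent = []     # packets this client "sent", kept for inspection

    def handle_publish(self, mid, payload):
        # Store the message and acknowledge receipt; the handler is not called yet.
        self.stored[mid] = payload
        self.sent.append(("PUBREC", mid))

    def handle_pubrel(self, mid):
        # Deliver the stored message, if this instance still has it.
        if mid in self.stored:
            self.on_message(self.stored[mid])  # may raise
            del self.stored[mid]
        self.sent.append(("PUBCOMP", mid))


def failing_handler(payload):
    raise RuntimeError("Processing failed")


first = Qos2Receiver(failing_handler)
first.handle_publish(1, b"msg")
try:
    first.handle_pubrel(1)
except RuntimeError:
    pass  # the first instance stops here, as in the reproduction

# A new instance has no stored message; the broker only resends PUBREL.
delivered = []
second = Qos2Receiver(delivered.append)
second.handle_pubrel(1)
print(second.sent, delivered)  # [('PUBCOMP', 1)] []
```

The second instance completes the handshake with PUBCOMP although no handler ever processed the payload.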

Expected Behavior

The client should adhere to the message delivery retry specification. This requires retaining the following information after a connection loss:

  • Unacknowledged PUBLISH: If a PUBLISH has been received and passed to the handler but is not yet acknowledged, it must be reprocessed if the client restarts.
  • Outgoing PUBREC Packets: If a PUBREC has been sent, the message was already acknowledged and must not be reprocessed. Otherwise, this would violate the exactly-once guarantee.
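As a rough illustration of what retaining this information could look like, the sketch below keeps the packet IDs for which a PUBREC was sent in a JSON file so that a new instance can read them back. The class `PubrecStore` and the file layout are assumptions made for this example; paho-mqtt does not provide such a store.

```python
import json
import os
import tempfile


class PubrecStore:
    """Hypothetical file-backed set of packet IDs for which PUBREC was sent."""

    def __init__(self, path):
        self.path = path
        self.mids = set()
        if os.path.exists(path):
            with open(path) as f:
                self.mids = set(json.load(f))

    def _save(self):
        with open(self.path, "w") as f:
            json.dump(sorted(self.mids), f)

    def add(self, mid):
        self.mids.add(mid)
        self._save()

    def discard(self, mid):
        self.mids.discard(mid)
        self._save()

    def __contains__(self, mid):
        return mid in self.mids


path = os.path.join(tempfile.mkdtemp(), "pubrec.json")
store = PubrecStore(path)
store.add(1)                   # PUBREC sent for packet ID 1

restarted = PubrecStore(path)  # simulates a new client instance
print(1 in restarted)          # True: already acknowledged, must not be reprocessed
print(2 in restarted)          # False: never acknowledged, may be reprocessed
```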

Advantages

This approach lets the application react to PUBREL appropriately, for example by committing processed data.
Unacknowledged messages will be retransmitted by the broker, and with a commit mechanism or PUBREL, no messages should be lost.

Additionally, clients can distinguish between:

  • Recoverable errors: Resolvable by reconnection. The message should not be reprocessed.
  • Application errors: Require a new client instance. The message should be reprocessed.
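One way an application might act on this distinction is sketched below. `FakeClient`, `run`, `reconnect`, and `supervise` are hypothetical names used only for this illustration, and treating `ConnectionError` as the recoverable case is an assumption.

```python
class FakeClient:
    """Stand-in for an MQTT client; `run` and `reconnect` are hypothetical names."""

    def __init__(self, failures):
        self.failures = failures  # shared list of exceptions to raise, in order
        self.reconnects = 0

    def run(self):
        if self.failures:
            raise self.failures.pop(0)

    def reconnect(self):
        self.reconnects += 1


def supervise(make_client, max_instances=3):
    """Reconnect on connection errors; replace the instance on any other error."""
    clients = [make_client()]
    while True:
        try:
            clients[-1].run()
            return clients
        except ConnectionError:
            # Recoverable error: keep the instance, nothing is reprocessed.
            clients[-1].reconnect()
        except Exception:
            # Application error: a new instance should reprocess the message.
            if len(clients) >= max_instances:
                raise
            clients.append(make_client())


failures = [ConnectionError("link dropped"), ValueError("bad payload")]
clients = supervise(lambda: FakeClient(failures))
print(len(clients), clients[0].reconnects)  # 2 1
```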

Note

Under this logic, the PUBLISH message itself is not part of the session state. Instead, handle_publish should dispatch the message to the handler and retain only the message ID.
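A minimal sketch of the proposed flow follows, assuming the handler runs when the PUBLISH arrives and only the packet ID is retained. `ProposedReceiver`, the `on_release` commit hook, and the shared `state` set are hypothetical; this is not how paho-mqtt is implemented today.

```python
class ProposedReceiver:
    """Hypothetical model of the proposed flow; not paho-mqtt's implementation."""

    def __init__(self, on_message, on_release=None, pubrec_sent=None):
        self.on_message = on_message
        self.on_release = on_release  # optional commit hook (assumption)
        self.pubrec_sent = set() if pubrec_sent is None else pubrec_sent
        self.sent = []

    def handle_publish(self, mid, payload):
        if mid not in self.pubrec_sent:
            # First delivery, or a retry after a failed handler: process now.
            self.on_message(payload)   # if this raises, no PUBREC is sent
            self.pubrec_sent.add(mid)  # retain only the packet ID
        self.sent.append(("PUBREC", mid))

    def handle_pubrel(self, mid):
        if mid in self.pubrec_sent and self.on_release is not None:
            self.on_release(mid)       # e.g. commit the processed data
        self.pubrec_sent.discard(mid)
        self.sent.append(("PUBCOMP", mid))


def failing(payload):
    raise RuntimeError("Processing failed")


state = set()  # stands in for session state that survives the instance
first = ProposedReceiver(failing, pubrec_sent=state)
try:
    first.handle_publish(1, b"msg")
except RuntimeError:
    pass  # no PUBREC was sent, so the broker will retransmit the PUBLISH

processed, committed = [], []
second = ProposedReceiver(processed.append, committed.append, pubrec_sent=state)
second.handle_publish(1, b"msg")  # retransmitted PUBLISH is processed
second.handle_publish(1, b"msg")  # duplicate is acknowledged, not reprocessed
second.handle_pubrel(1)
print(processed, committed, second.sent)
```

In this model a failed handler leaves the PUBLISH unacknowledged, so the retransmission reaches the new instance, while a duplicate PUBLISH after PUBREC is acknowledged without a second call to the handler.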

Reproduction

The reproduction script produces the following log:

Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (0, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Sending SUBSCRIBE (d0, m1) [(b'236a73a0-e388-48c1-bb7e-e221189d8b3a', {QoS=2, noLocal=False, retainAsPublished=False, retainHandling=0})]
Received SUBACK
Received PUBLISH (d0, q2, r0, m1), '236a73a0-e388-48c1-bb7e-e221189d8b3a', properties=[], ...  (3 bytes)
Sending PUBREC (Mid: 1)
Received PUBREL (Mid: 1)
Caught exception in on_message: Unable to ack: Processing failed, msg = b'msg', qos = 2, topic = 236a73a0-e388-48c1-bb7e-e221189d8b3a
Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'd11461cb-2fd4-40fd-8cba-e51fda4334a1' properties=[SessionExpiryInterval : 120]
Received CONNACK (1, Success) properties=[ReceiveMaximum : 20, TopicAliasMaximum : 10]
Received PUBREL (Mid: 1)
Sending PUBCOMP (Mid: 1)

It is generated by this example:

import paho.mqtt.client as mqtt
from paho.mqtt import publish
import threading
import uuid
import time

host = "127.0.0.1"
client_id = str(uuid.uuid4())
topic = str(uuid.uuid4())
qos = 2

def new_client():
    def on_message(client, userdata, msg: mqtt.MQTTMessage):
        raise Exception(f"Unable to ack: Processing failed, msg = {msg.payload}, qos = {msg.qos}, topic = {msg.topic}")

    def on_connect(client, userdata, flags, reason_code, properties):
        if not flags.session_present:
            client.subscribe(topic, qos=qos)

    client = mqtt.Client(protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    client.on_message = on_message
    client.on_connect = on_connect
    client.on_log = lambda a, b, c, msg: print(msg)

    properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
    properties.SessionExpiryInterval = 120

    client.connect(host=host, clean_start=False, properties=properties)

    return client

def publish_msg():
    time.sleep(1)
    publish.single(topic, "msg", qos=qos, hostname=host)

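# First instance: on_message raises after PUBREL, which aborts loop_forever().
try:
    threading.Thread(target=publish_msg).start()
    new_client().loop_forever()
except Exception:
    ...

# Second instance with the same client_id: the broker resumes the session and
# resends only PUBREL, so on_message is never called and nothing is raised.
try:
    new_client().loop_forever()
except Exception:
    print("THIS WILL NEVER HAPPEN")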

The docker-compose.yml:

services:
  mosquitto:
    image: eclipse-mosquitto
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

The mosquitto.conf:

persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883

Environment

  • Python version: 3.11, 3.13
  • Library version: 2.1.0
  • Operating system (including version): Linux
  • MQTT server (name, version, configuration, hosting details): mosquitto:latest


@github-actions github-actions bot added the Status: Available No one has claimed responsibility for resolving this issue. label Mar 5, 2025