org.eclipse.paho.mqttv5.common.MqttException: Internal error, caused by no new message IDs being available #1070

Open
PengZhouSY opened this issue Mar 13, 2025 · 0 comments


I use Flink to write data to MQTT with QoS set to 0. After the job has been running for a while, the following error occurs:
java.lang.RuntimeException: Internal error, caused by no new message IDs being available (32001)
    at com.kitegogo.sink.MqttV5Sink.invoke(MqttV5Sink.java:57)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.eclipse.paho.mqttv5.common.MqttException: Internal error, caused by no new message IDs being available
    at org.eclipse.paho.mqttv5.client.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:32)
    at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454)
    at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:511)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:155)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:218)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1530)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1499)
    at com.kitegogo.sink.MqttV5Sink.invoke(MqttV5Sink.java:54)
    ... 14 common frames omitted
I use the following dependency:

    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
        <version>1.2.5</version>
    </dependency>
My code is as follows:

` public class MqttV5Sink<T> extends RichSinkFunction<T> {
private transient MqttAsyncClient client;
private final String topic;
private final String clientName;

public MqttV5Sink(String topic, String clientName) {
    this.topic = topic;
    this.clientName = clientName;
}

@Override
public void open(Configuration parameters) throws Exception {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConfig mqttConfig = new MqttConfig();
    String clientId = "UavSimulator_"+ clientName+"_" + UUID.randomUUID().toString();
    MqttConnectionOptions connOpts = new MqttConnectionOptions();
    connOpts.setUserName(mqttConfig.getUserName());
    connOpts.setPassword(mqttConfig.getPassword().getBytes());
    connOpts.setCleanStart(true);
    client = new MqttAsyncClient(mqttConfig.getHost(), clientId, persistence);
    IMqttToken token = client.connect(connOpts);
    token.waitForCompletion();

    log.debug(clientId+" Connected to MQTT broker successfully");
}



@Override
public void invoke(T value, Context context){
    String msg = (String) value;
    MqttMessage message = new MqttMessage(msg.getBytes());
    message.setQos(0);
    try{

        log.debug("{} sending message: {}", client.getClientId(), new String(message.getPayload()));
        client.publish(topic, message);

    }catch (Exception e){
        throw new RuntimeException(e);
    }
}

@Override
public void close() throws Exception {
    if (client != null && client.isConnected()) {
        client.disconnect();
    }
    super.close();
}

}`
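
For context on the error text: MQTT packet identifiers are 16-bit, so at most 65535 of them can be outstanding on one connection at a time. The sketch below is a simplified, hypothetical model of an identifier pool that fails with the same message once every identifier is in use and none has been released. The class `MessageIdPool` and its methods are made up for illustration and are not Paho's implementation. Whether Paho 1.2.5 really holds identifiers for QoS 0 publishes is an assumption I have not confirmed.

```java
import java.util.BitSet;

/**
 * Hypothetical model of a 16-bit MQTT packet identifier pool.
 * Illustration only; this is NOT how Paho's ClientState is written.
 */
final class MessageIdPool {
    private static final int MIN_ID = 1;      // 0 is not a valid packet identifier
    private static final int MAX_ID = 65535;  // largest 16-bit value

    private final BitSet inUse = new BitSet(MAX_ID + 1);
    private int next = MIN_ID;

    /** Returns a free identifier, or throws once all 65535 are taken. */
    synchronized int acquire() {
        // Scan the whole ID space at most once, starting from the last position.
        for (int i = 0; i < MAX_ID; i++) {
            int candidate = next;
            next = (next == MAX_ID) ? MIN_ID : next + 1;
            if (!inUse.get(candidate)) {
                inUse.set(candidate);
                return candidate;
            }
        }
        throw new IllegalStateException("no new message IDs being available");
    }

    /** Marks an identifier as free again, e.g. once a send has completed. */
    synchronized void release(int id) {
        inUse.clear(id);
    }

    /** Number of identifiers currently held. */
    synchronized int inUseCount() {
        return inUse.cardinality();
    }
}
```

In this model, calling `acquire()` 65535 times without any `release()` makes the next call throw, which matches the symptom of publishing for a while and then failing.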
