Skip to content

Commit f899b50

Browse files
authored
Merge pull request #6 from hannesa2/Erguotou/develop
Erguotou/develop
2 parents 521bb10 + 8dfe6b0 commit f899b50

File tree

4 files changed

+87
-16
lines changed

4 files changed

+87
-16
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ ext {
2121
serviceArchivesBaseName = 'org.eclipse.paho.android.service'
2222
serviceVersion = '1.2.0'
2323

24-
clientVersion = '1.2.0'
24+
clientVersion = '1.2.5'
2525

2626
mavenUrl = "https://repo.eclipse.org/content/repositories/paho-releases/"
2727
}

org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttAndroidClient.java

+50
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,33 @@ public IMqttToken unsubscribe(String[] topic, Object userContext, IMqttActionLis
10281028
return token;
10291029
}
10301030

1031+
/**
1032+
* Removes a published message corresponding to the token.
1033+
* <p>If a publish is requested with QoS1 or Qos2 and the publish callback is
1034+
* not called yet, this function returns true, the publish called will never
1035+
* be called, and a messageId corresponding to the token will become reusable.
1036+
* </p>
1037+
* <p>If the publish callback is already be called, this function returns false.
1038+
* </p>
1039+
* <p>This function might not stop sending the published message.
1040+
* </p>
1041+
* *
1042+
*
1043+
* @param token the token of removing published message
1044+
* @return if the message is removed then true, otherwise false
1045+
* @throws MqttException if there was an error removing the message.
1046+
*/
1047+
@Override
1048+
public boolean removeMessage(IMqttDeliveryToken token) throws MqttException {
1049+
if (token.getMessage().getQos() > 0 && !token.isComplete())
1050+
return false;
1051+
int i = tokenMap.indexOfValue(token);
1052+
if (i > -1) tokenMap.removeAt(i);
1053+
if (callback != null)
1054+
callback.deliveryComplete(token);
1055+
return true;
1056+
}
1057+
10311058
/**
10321059
* Returns the delivery tokens for any outstanding publish operations.
10331060
* <p>
@@ -1176,6 +1203,16 @@ public void setManualAcks(boolean manualAcks) {
11761203
throw new UnsupportedOperationException();
11771204
}
11781205

1206+
/**
1207+
* Will attempt to reconnect to the server after the client has lost connection.
1208+
*
1209+
* @throws MqttException if an error occurs attempting to reconnect
1210+
*/
1211+
@Override
1212+
public void reconnect() throws MqttException {
1213+
mqttService.reconnect();
1214+
}
1215+
11791216
/**
11801217
* Process the results of a connection
11811218
*
@@ -1405,6 +1442,19 @@ public void deleteBufferedMessage(int bufferIndex) {
14051442
mqttService.deleteBufferedMessage(clientHandle, bufferIndex);
14061443
}
14071444

1445+
/**
1446+
* Returns the current number of outgoing in-flight messages being sent by the
1447+
* client. Note that this number cannot be guaranteed to be 100% accurate as
1448+
* some messages may have been sent or queued in the time taken for this method
1449+
* to return.
1450+
*
1451+
* @return the current number of in-flight messages.
1452+
*/
1453+
@Override
1454+
public int getInFlightMessageCount() {
1455+
return mqttService.getInFlightMessageCount(clientHandle);
1456+
}
1457+
14081458
/**
14091459
* Get the SSLSocketFactory using SSL key store and password
14101460
* <p>

org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttConnection.java

+31-15
Original file line numberDiff line numberDiff line change
@@ -710,21 +710,10 @@ public void deliveryComplete(IMqttDeliveryToken messageToken) {
710710

711711
service.traceDebug(TAG, "deliveryComplete(" + messageToken + ")");
712712

713-
MqttMessage message = savedSentMessages.remove(messageToken);
714-
if (message != null) { // If I don't know about the message, it's
715-
// irrelevant
716-
String topic = savedTopics.remove(messageToken);
717-
String activityToken = savedActivityTokens.remove(messageToken);
718-
String invocationContext = savedInvocationContexts.remove(messageToken);
719-
720-
Bundle resultBundle = messageToBundle(null, topic, message);
721-
if (activityToken != null) {
722-
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.SEND_ACTION);
723-
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
724-
resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);
725-
713+
Bundle resultBundle = popSendDetails(messageToken);
714+
if (resultBundle != null) {
715+
if (MqttServiceConstants.SEND_ACTION.equals(resultBundle.getString(MqttServiceConstants.CALLBACK_ACTION)))
726716
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
727-
}
728717
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.MESSAGE_DELIVERED_ACTION);
729718
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
730719
}
@@ -751,6 +740,29 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
751740
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
752741
}
753742

743+
/**
744+
* Removed store details of sent messages in "deliveryComplete"
745+
* callbacks from the mqttClient
746+
*/
747+
private synchronized Bundle popSendDetails(final IMqttDeliveryToken messageToken) {
748+
MqttMessage message = savedSentMessages.remove(messageToken);
749+
if (message != null) { // If I don't know about the message, it's
750+
// irrelevant
751+
String topic = savedTopics.remove(messageToken);
752+
String activityToken = savedActivityTokens.remove(messageToken);
753+
String invocationContext = savedInvocationContexts.remove(messageToken);
754+
755+
Bundle resultBundle = messageToBundle(null, topic, message);
756+
if (activityToken != null) {
757+
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.SEND_ACTION);
758+
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
759+
resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);
760+
}
761+
return resultBundle;
762+
}
763+
return null;
764+
}
765+
754766
/**
755767
* Store details of sent messages so we can handle "deliveryComplete"
756768
* callbacks from the mqttClient
@@ -761,7 +773,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
761773
* @param invocationContext
762774
* @param activityToken
763775
*/
764-
private void storeSendDetails(final String topic, final MqttMessage msg, final IMqttDeliveryToken messageToken,
776+
private synchronized void storeSendDetails(final String topic, final MqttMessage msg, final IMqttDeliveryToken messageToken,
765777
final String invocationContext, final String activityToken) {
766778
savedTopics.put(messageToken, topic);
767779
savedSentMessages.put(messageToken, msg);
@@ -920,6 +932,10 @@ public void deleteBufferedMessage(int bufferIndex) {
920932
myClient.deleteBufferedMessage(bufferIndex);
921933
}
922934

935+
public int getInFlightMessageCount() {
936+
return myClient.getInFlightMessageCount();
937+
}
938+
923939
/**
924940
* General-purpose IMqttActionListener for the Client context
925941
* <p>

org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttService.java

+5
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,11 @@ public void deleteBufferedMessage(String clientHandle, int bufferIndex) {
762762
client.deleteBufferedMessage(bufferIndex);
763763
}
764764

765+
public int getInFlightMessageCount(String clientHandle) {
766+
MqttConnection client = getConnection(clientHandle);
767+
return client.getInFlightMessageCount();
768+
}
769+
765770
/*
766771
* Called in response to a change in network connection - after losing a
767772
* connection to the server, this allows us to wait until we have a usable

0 commit comments

Comments
 (0)