Skip to content

Commit f7db0d4

Browse files
authored
Merge pull request #325 from mathieu1fb/feature/mqtt-auto-reconnect
java: add MQTT auto reconnect to IoT3 Core
2 parents 4a2bd88 + 0017b55 commit f7db0d4

3 files changed

Lines changed: 29 additions & 7 deletions

File tree

java/iot3/core/src/main/java/com/orange/iot3core/IoT3Core.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,19 @@ public void mqttPublish(String topic, String message, boolean retain) {
205205
if(mqttClient != null) mqttClient.publishMessage(topic, message, retain);
206206
}
207207

208+
/**
209+
* Check that the MQTT connection is established
210+
*/
211+
public boolean isMqttConnected() {
212+
if(mqttClient != null) return mqttClient.isConnected();
213+
else return false;
214+
}
215+
208216
/**
209217
* Check that the MQTT connection is secured with TLS
210218
*/
211219
public boolean isMqttConnectionSecured() {
212-
if(mqttClient != null) return mqttClient.isConnected() && mqttClient.isConnectionSecured();
213-
else return false;
220+
return isMqttConnected() && mqttClient.isConnectionSecured();
214221
}
215222

216223
/**

java/iot3/core/src/main/java/com/orange/iot3core/clients/MqttClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
1111
import com.hivemq.client.mqtt.datatypes.MqttQos;
12+
import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect;
13+
import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnectBuilder;
1214
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
1315
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
1416
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
@@ -26,6 +28,7 @@
2628
import java.nio.charset.StandardCharsets;
2729
import java.util.HashMap;
2830
import java.util.Map;
31+
import java.util.concurrent.TimeUnit;
2932
import java.util.logging.Level;
3033
import java.util.logging.Logger;
3134

@@ -57,6 +60,10 @@ public MqttClient(String serverHost,
5760
.identifier(clientId)
5861
.serverHost(serverHost)
5962
.serverPort(serverPort)
63+
.automaticReconnect()
64+
.initialDelay(500, TimeUnit.MILLISECONDS)
65+
.maxDelay(5, TimeUnit.SECONDS)
66+
.applyAutomaticReconnect()
6067
.addDisconnectedListener(context1 -> {
6168
LOGGER.log(Level.INFO, "Disconnected from MQTT broker " + serverHost);
6269
callback.connectionLost(context1.getCause());

java/iot3/mobility/src/main/java/com/orange/iot3mobility/IoT3Mobility.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ public void mqttUnsubscriptionComplete(Throwable unsubscribeFailure) {
146146
TrueTime.initTrueTime();
147147
}
148148

149+
/**
150+
* Check that the connection is established.
151+
*/
152+
public boolean isConnected() {
153+
if(ioT3Core != null) return ioT3Core.isMqttConnected();
154+
else return false;
155+
}
156+
149157
/**
150158
* Retrieve the IoT3Core instance powering IoT3Mobility.
151159
*/
@@ -324,8 +332,8 @@ public void sendCam(CAM cam) {
324332
String geoExtension = QuadTileHelper.quadKeyToQuadTopic(quadkey);
325333
String topic = context + "/inQueue/v2x/cam/" + uuid + geoExtension;
326334

327-
// send the message
328-
if(ioT3Core != null) ioT3Core.mqttPublish(topic, cam.getJsonCAM().toString());
335+
// send the message only if the client is connected
336+
if(isConnected()) ioT3Core.mqttPublish(topic, cam.getJsonCAM().toString());
329337
}
330338

331339
/**
@@ -392,7 +400,7 @@ public void sendDenm(DENM denm) {
392400
String geoExtension = QuadTileHelper.quadKeyToQuadTopic(quadkey);
393401
String topic = context + "/inQueue/v2x/denm/" + uuid + geoExtension;
394402

395-
// send the message
403+
// send the message even if the client is disconnected, so it will be queued
396404
if(ioT3Core != null) ioT3Core.mqttPublish(topic, denm.getJsonDENM().toString());
397405
}
398406

@@ -410,8 +418,8 @@ public void sendCpm(CPM cpm) {
410418
String geoExtension = QuadTileHelper.quadKeyToQuadTopic(quadkey);
411419
String topic = context + "/inQueue/v2x/cpm/" + uuid + geoExtension;
412420

413-
// send the message
414-
if(ioT3Core != null) ioT3Core.mqttPublish(topic, cpm.getJson().toString());
421+
// send the message only if the client is connected
422+
if(isConnected()) ioT3Core.mqttPublish(topic, cpm.getJson().toString());
415423
}
416424

417425
/**

0 commit comments

Comments
 (0)