Skip to content

Commit aa9cdae

Browse files
artembilancppwfs
authored andcommitted
GH-10760: Support topic filters in messages arrived callback (#10761)
* GH-10760: Support topic filters in messages arrived callback Fixes: #10760 The previous fix with protection against subscribed topics did not include the logic about wildcard support in MQTT subscriptions * Fix `MqttPahoMessageDrivenChannelAdapter` extracting `messageArrivedIfMatched()` which would be called as a fallback common callback. Delegate matching logic to the `MqttTopic.isMatched()` utility * Fix `Mqttv5PahoMessageDrivenChannelAdapter` extracting `messageArrivedIfMatched()` which would be called as a fallback common callback. Delegate matching logic to the `MqttTopicValidator.isMatched()` utility. However, before that check for the `$share/` prefix and strip two subscription parts. Exactly the logic missed in Paho Client * Modify `Mqttv5BackToBackTests.testSharedTopicMqttv5Interaction()` for wildcard to be sure new matching logic works as expected * Some MQTT MDChAs cleanup after review Auto-cherry-pick to `7.0.x` & `6.5.x`**
1 parent cbafc03 commit aa9cdae

File tree

3 files changed

+36
-35
lines changed

3 files changed

+36
-35
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
3030
import org.eclipse.paho.client.mqttv3.MqttException;
3131
import org.eclipse.paho.client.mqttv3.MqttMessage;
32+
import org.eclipse.paho.client.mqttv3.MqttTopic;
3233

3334
import org.springframework.context.ApplicationEventPublisher;
3435
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -72,7 +73,7 @@ public class MqttPahoMessageDrivenChannelAdapter
7273
* Never invoked because MQTT v3 does not support shared subscriptions.
7374
* But it is here for consistency with the {@link ClientManager} requirements
7475
*/
75-
private final ClientManager.DefaultMessageHandler<MqttMessage> defaultMessageHandler = this::messageArrived;
76+
private final ClientManager.DefaultMessageHandler<MqttMessage> defaultMessageHandler = this::messageArrivedIfMatched;
7677

7778
private final MqttPahoClientFactory clientFactory;
7879

@@ -188,7 +189,6 @@ protected void doStart() {
188189
}
189190
}
190191

191-
@SuppressWarnings("deprecation")
192192
private void connect() throws MqttException {
193193
this.lock.lock();
194194
try {
@@ -374,14 +374,6 @@ public void connectionLost(Throwable cause) {
374374

375375
@Override
376376
public void messageArrived(String topic, MqttMessage mqttMessage) {
377-
// Just simple check since MQTT v3 does not support shared subscriptions.
378-
boolean subscribed = Arrays.asList(getTopic()).contains(topic);
379-
if (!subscribed) {
380-
logger.trace(() ->
381-
"Arrived message on topic '" + topic + "' this channel adapter is not subscribed to. Ignoring...");
382-
return;
383-
}
384-
385377
AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(topic, mqttMessage);
386378
if (builder != null) {
387379
if (isManualAcks()) {
@@ -451,6 +443,17 @@ public void connectComplete(boolean reconnect, String serverURI) {
451443
}
452444
}
453445

446+
private void messageArrivedIfMatched(String topic, MqttMessage mqttMessage) {
447+
for (String subscribedTopic : getTopic()) {
448+
if (MqttTopic.isMatched(subscribedTopic, topic)) {
449+
messageArrived(topic, mqttMessage);
450+
return;
451+
}
452+
}
453+
logger.trace(() ->
454+
"Arrived message on topic '" + topic + "' this channel adapter is not subscribed to. Ignoring...");
455+
}
456+
454457
/**
455458
* Used to complete message arrival when {@link #isManualAcks()} is true.
456459
*

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.eclipse.paho.mqttv5.common.MqttMessage;
3838
import org.eclipse.paho.mqttv5.common.MqttSubscription;
3939
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
40+
import org.eclipse.paho.mqttv5.common.util.MqttTopicValidator;
4041

4142
import org.springframework.beans.factory.BeanCreationException;
4243
import org.springframework.context.ApplicationEventPublisher;
@@ -94,7 +95,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter
9495
/**
9596
* Used as a fallback option for shared subscriptions unrouted messages
9697
*/
97-
private final ClientManager.DefaultMessageHandler<MqttMessage> defaultMessageHandler = this::messageArrived;
98+
private final ClientManager.DefaultMessageHandler<MqttMessage> defaultMessageHandler = this::messageArrivedIfMatched;
9899

99100
private final MqttConnectionOptions connectionOptions;
100101

@@ -301,12 +302,16 @@ protected void doStop() {
301302
this.readyToSubscribeOnStart = true;
302303

303304
}
304-
if (getClientManager() == null) {
305+
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager = getClientManager();
306+
if (clientManager == null) {
305307
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
306308
if (getConnectionInfo().isAutomaticReconnect()) {
307309
MqttUtils.stopClientReconnectCycle(this.mqttClient);
308310
}
309311
}
312+
else {
313+
clientManager.removeDefaultMessageHandler(this.defaultMessageHandler);
314+
}
310315
}
311316
}
312317
catch (MqttException ex) {
@@ -325,10 +330,6 @@ private void unsubscribe(String... topics) throws MqttException {
325330
catch (ConcurrentModificationException ex) {
326331
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
327332
}
328-
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager = getClientManager();
329-
if (clientManager != null) {
330-
clientManager.removeDefaultMessageHandler(this.defaultMessageHandler);
331-
}
332333
}
333334

334335
@Override
@@ -380,10 +381,10 @@ public void addTopic(String topic, int qos) {
380381
public void removeTopic(String... topic) {
381382
this.topicLock.lock();
382383
try {
384+
super.removeTopic(topic);
383385
if (this.mqttClient != null && this.mqttClient.isConnected()) {
384386
unsubscribe(topic);
385387
}
386-
super.removeTopic(topic);
387388
if (!CollectionUtils.isEmpty(this.subscriptions)) {
388389
this.subscriptions.removeIf((sub) -> ObjectUtils.containsElement(topic, sub.getTopic()));
389390
}
@@ -398,12 +399,6 @@ public void removeTopic(String... topic) {
398399

399400
@Override
400401
public void messageArrived(String topic, MqttMessage mqttMessage) {
401-
if (!isTopicSubscribed(topic)) {
402-
logger.trace(() ->
403-
"Arrived message on topic '" + topic + "' this channel adapter is not subscribed to. Ignoring...");
404-
return;
405-
}
406-
407402
Map<String, Object> headers = this.headerMapper.toHeaders(mqttMessage.getProperties());
408403
headers.put(MqttHeaders.ID, mqttMessage.getId());
409404
headers.put(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos());
@@ -438,17 +433,6 @@ public void messageArrived(String topic, MqttMessage mqttMessage) {
438433
}
439434
}
440435

441-
private boolean isTopicSubscribed(String receivedTopic) {
442-
for (String subscribedTopic : getTopic()) {
443-
if (subscribedTopic.equals(receivedTopic)
444-
|| subscribedTopic.startsWith("$share/") && subscribedTopic.endsWith(receivedTopic)) {
445-
446-
return true;
447-
}
448-
}
449-
return false;
450-
}
451-
452436
@Override
453437
public void disconnected(MqttDisconnectResponse disconnectResponse) {
454438
if (isRunning()) {
@@ -550,6 +534,20 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {
550534

551535
}
552536

537+
private void messageArrivedIfMatched(String topic, MqttMessage mqttMessage) {
538+
for (String subscribedTopic : getTopic()) {
539+
if (subscribedTopic.startsWith("$share/")) {
540+
subscribedTopic = subscribedTopic.split("/", 3)[2];
541+
}
542+
if (MqttTopicValidator.isMatched(subscribedTopic, topic)) {
543+
messageArrived(topic, mqttMessage);
544+
return;
545+
}
546+
}
547+
logger.trace(() ->
548+
"Arrived message on topic '" + topic + "' this channel adapter is not subscribed to. Ignoring...");
549+
}
550+
553551
private static String obtainServerUrlFromOptions(MqttConnectionOptions connectionOptions) {
554552
Assert.notNull(connectionOptions, "'connectionOptions' must not be null");
555553
String[] serverURIs = connectionOptions.getServerURIs();

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public void testSimpleMqttv5Interaction() {
139139

140140
@Test
141141
public void testSharedTopicMqttv5Interaction() {
142-
this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/testTopic");
142+
this.mqttv5MessageDrivenChannelAdapter.addTopic("$share/group/#");
143143

144144
String testPayload = "shared topic payload";
145145
this.mqttOutFlowInput.send(

0 commit comments

Comments
 (0)