Skip to content

Commit 5ee02c8

Browse files
Merge pull request #308 from ymorin-orange/nyma/gl292-retain-expiry
python: handle retain in IoT3 SDK, allow filtering in IQM
2 parents e1527f3 + 730a40a commit 5ee02c8

5 files changed

Lines changed: 315 additions & 32 deletions

File tree

python/iot3/src/iot3/core/mqtt.py

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,26 @@ def __init__(
8181
and thus receiving messages from specific topics. If no msg_cb
8282
is provided, it is not possible to subscribe, and only emitting
8383
is permitted. msg_cb must be a callable that accepts at least
84-
those keyword arguments: data, topic, payload. Ideally, to be
85-
future-proof, it should be declared with:
84+
those keyword arguments: data, topic, payload, retain. Ideally,
85+
to be future-proof, it should be declared with:
8686
def my_cb(
8787
*_args,
88-
*,
8988
data: Any,
9089
topic: str,
9190
payload: bytes,
91+
retain: bool | int,
9292
**_kwargs,
9393
) -> None
9494
9595
If msg_cb_data is specified, it is passed as the data keyword
9696
argument of msg_cb, otherwise None is passed.
97+
98+
Note: the retain flag will be set to False if the message was
99+
not published in retain; it will be set to True if the message
100+
was published in retain without a Message Expiry Interval; it
101+
will be set to a non-zero integer representing the remaining
102+
Message Expiry Interval if the message was published in retain
103+
with a non-zero Message Expiry Interval.
97104
"""
98105

99106
self.msg_cb = msg_cb
@@ -171,27 +178,49 @@ def stop(self):
171178
self.client.disconnect()
172179
self.client.loop_stop()
173180

174-
def publish(self, *, topic: str, payload: bytes | str):
181+
def publish(
182+
self,
183+
*,
184+
topic: str,
185+
payload: bytes | str,
186+
retain: bool | int = False,
187+
):
175188
"""Publish an MQTT message
176189
177190
:param topic: The MQTT topic to post on.
178191
:param payload: The payload to post.
192+
:param retain: Whether the message should be set to retain; can be
193+
either a bool(), in which case the message will not
194+
be retained (False) or retained indefinitely (True),
195+
or a strictly positive int(), in which case the message
196+
will be sent in retain with a Message Expiry Interval
197+
set to the specified value (min: 1, max: 2^32-1).
179198
"""
180199
with self.span_ctxmgr_cb(
181200
name="IoT3 Core MQTT Message",
182201
kind=otel.SpanKind.PRODUCER,
183202
) as span:
184-
new_traceparent = span.to_traceparent()
185-
span.set_attribute(key="iot3.core.mqtt.topic", value=topic)
186-
span.set_attribute(key="iot3.core.mqtt.payload_size", value=len(payload))
187203
properties = paho.mqtt.properties.Properties(
188204
paho.mqtt.packettypes.PacketTypes.PUBLISH,
189205
)
206+
new_traceparent = span.to_traceparent()
207+
span.set_attribute(key="iot3.core.mqtt.topic", value=topic)
208+
span.set_attribute(key="iot3.core.mqtt.payload_size", value=len(payload))
209+
if retain:
210+
span.set_attribute(key="iot3.core.mqtt.retain", value=True)
211+
if isinstance(retain, int):
212+
if retain < 1:
213+
raise ValueError(
214+
f"retain must be strictly positive (not: {retain})"
215+
)
216+
properties.MessageExpiryInterval = retain
217+
retain = True
190218
if new_traceparent:
191219
properties.UserProperty = ("traceparent", new_traceparent)
192220
msg_info = self.client.publish(
193221
topic=topic,
194222
payload=payload,
223+
retain=retain,
195224
properties=properties,
196225
)
197226
if msg_info.rc:
@@ -214,7 +243,7 @@ def subscribe(self, *, topics: list[str]):
214243
with self.subscriptions_lock:
215244
sub = topics.difference(self.subscriptions)
216245
if sub and self.client.is_connected():
217-
self.client.subscribe(list(map(lambda t: (t, 0), sub)))
246+
self._do_subscribe(topics=sub)
218247
self.subscriptions.update(topics)
219248

220249
def subscribe_replace(self, *, topics: list[str]):
@@ -242,7 +271,7 @@ def subscribe_replace(self, *, topics: list[str]):
242271
if unsub:
243272
self.client.unsubscribe(list(unsub))
244273
if sub:
245-
self.client.subscribe(list(map(lambda t: (t, 0), sub)))
274+
self._do_subscribe(topics=sub)
246275
self.subscriptions.clear()
247276
self.subscriptions.update(topics)
248277

@@ -270,6 +299,19 @@ def unsubscribe_all(self):
270299
with self.subscriptions_lock:
271300
self.unsubscribe(topics=self.subscriptions)
272301

302+
def _do_subscribe(
303+
self,
304+
*,
305+
topics: list[str],
306+
):
307+
opts = paho.mqtt.client.SubscribeOptions(
308+
qos=2,
309+
retainAsPublished=True,
310+
)
311+
self.client.subscribe(
312+
list(map(lambda t: (t, opts), self.subscriptions)),
313+
)
314+
273315
# In theory, we would not need this method, as we could very well
274316
# have set self.client.on_message = msg_cb and be done with
275317
# that. Having this intermediate __on_message() allows us to do
@@ -285,9 +327,13 @@ def __on_message(
285327
"kind": otel.SpanKind.CONSUMER,
286328
}
287329
try:
288-
properties = dict(message.properties.UserProperty)
289-
span_kwargs["span_links"] = [properties["traceparent"]]
290-
except Exception:
330+
properties = message.properties
331+
except TypeError:
332+
properties = None
333+
try:
334+
user_properties = dict(properties.UserProperty)
335+
span_kwargs["span_links"] = [user_properties["traceparent"]]
336+
except (AttributeError, KeyError):
291337
# There was ultimately no traceparent in that message, ignore
292338
pass
293339
with self.span_ctxmgr_cb(**span_kwargs) as span:
@@ -297,10 +343,22 @@ def __on_message(
297343
key="iot3.core.mqtt.payload_size",
298344
value=len(message.payload),
299345
)
346+
if message.retain:
347+
span.set_attribute(key="iot3.core.mqtt.retain", value=True)
348+
try:
349+
retain = properties.MessageExpiryInterval
350+
if retain < 1:
351+
# Glitch, it should not happen: can't be zero
352+
retain = 1
353+
except AttributeError:
354+
retain = True
355+
else:
356+
retain = False
300357
self.msg_cb(
301358
data=self.msg_cb_data,
302359
topic=message.topic,
303360
payload=message.payload,
361+
retain=retain,
304362
)
305363

306364
def __on_connect(
@@ -313,6 +371,4 @@ def __on_connect(
313371
):
314372
with self.subscriptions_lock:
315373
if self.subscriptions:
316-
self.client.subscribe(
317-
list(map(lambda t: (t, 0), self.subscriptions)),
318-
)
374+
self._do_subscribe(topics=self.subscriptions)

python/its-interqueuemanager/its-iqm.cfg

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,41 @@ socket-path = /run/mosquitto/mqtt.socket
2727
# Name of the local inter-queue; default: interQueue
2828
#interqueue = interQueue
2929

30+
# Sections which name starts with 'filtering.' (with the dot) followed
31+
# by an arbitrary identifier, define filtering rules and actions.
32+
#[filter.NAME]
33+
# The prefix or regexp to match topic against; only one kind of match can
34+
# be specified for a given filter (either by prefix or by regexp), but
35+
# multiple such match can be specified with a multi-line value.
36+
# Placeholders {{XXX}} will be replaced before attempting a match:
37+
# - {{instance-id}}: replaced by general.instance-id
38+
# - {{prefix}}: if general.prefix is set, this placeholder is replaced
39+
# with it, with a trailing '/' appended; otherwise, an empty string;
40+
# - {{suffix}}: replaced by general.suffix if set, an empty string otherwise;
41+
# - {{inQueue}}, {{outQueue}}, {{interQueue}}: placeholders matching the
42+
# inQueue, outQueue, and interQueue, respectively.
43+
#in_prefix =
44+
# prefix-str-1
45+
# prefix-str-2
46+
#in_regex = regex-1
47+
# regex-2
48+
#out_prefix = prefix-str
49+
#out_regex = regex
50+
# What to do with the message:
51+
# - drop the message if the topic matches:
52+
#drop
53+
# - set the message as retained or as not retained:
54+
#retain = True / False
55+
# - set the message as retained and set property Message Expiry Interval
56+
# to the specified value (in seconds):
57+
#retain = INT
58+
# - set the message as retained and set property Message Expiry Interval
59+
# to the value from the JSON payload object at the specified dotted
60+
# path; if the path does not exist or is not an integer, use the
61+
# fallback value if specified, otherwise same as if retain was not
62+
# specified:
63+
#retain = json:DOTTED.PATH.IN.JSON.PAYLOAD [FALLBACK_INT]
64+
3065
[authority]
3166
# Type of central authority, one of: file, http, mqtt
3267
type = file
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Software Name: its-interqueuemanager
2+
# SPDX-FileCopyrightText: Copyright (c) 2025 Orange
3+
# SPDX-License-Identifier: MIT
4+
# Author: Yann E. MORIN <yann.morin@orange.com>
5+
6+
import itertools
7+
import json
8+
import logging
9+
import re
10+
11+
12+
class Filter:
13+
def __init__(
14+
self,
15+
*,
16+
name: str,
17+
filter_cfg: dict,
18+
instance_id: str,
19+
prefix: str,
20+
suffix: str,
21+
queues: dict,
22+
):
23+
self.name = name
24+
25+
placeholders = {
26+
"instance-id": instance_id,
27+
"prefix": prefix,
28+
"suffix": suffix,
29+
}
30+
placeholders.update(queues)
31+
32+
self._patterns = None
33+
for filter_type, filter_kind in itertools.product(
34+
["in", "out"],
35+
["prefix", "regex"],
36+
):
37+
try:
38+
patterns = filter_cfg[filter_type + "_" + filter_kind]
39+
except KeyError:
40+
continue
41+
42+
if self._patterns is not None:
43+
raise ValueError(f"Filter {name} defines multiple patterns")
44+
patterns = list(filter(None, patterns.split("\n")))
45+
for ph in placeholders:
46+
patterns = list(
47+
map(
48+
lambda s: s.replace("{{" + ph + "}}", placeholders[ph]),
49+
patterns,
50+
),
51+
)
52+
if filter_kind == "regex":
53+
patterns = list(map(re.compile, patterns))
54+
55+
self._patterns = patterns
56+
self._type = filter_type
57+
self._kind = filter_kind
58+
59+
if patterns is None:
60+
raise ValueError(f"Filter '{name}' does not define patterns")
61+
62+
self._drop = "drop" in filter_cfg
63+
64+
match filter_cfg.get("retain"):
65+
case None:
66+
self._retain = None
67+
case "True" | "true" | "False" | "false" as b:
68+
self._retain = b in ["True", "true"]
69+
case str(s) if re.match("^\d+$", s):
70+
self._retain = int(s)
71+
case str(s) if s.startswith("json:") and re.match(".* \d+$", s):
72+
path, fallback = s[5:].split(" ")
73+
self._retain = {
74+
"path": path.split("."),
75+
"fallback": int(fallback),
76+
}
77+
case str(s) if s.startswith("json:"):
78+
self._retain = {
79+
"path": s[5:].split("."),
80+
"fallback": None,
81+
}
82+
case _v:
83+
raise ValueError(f"Unable to parse retain value '{_v}'")
84+
85+
logging.debug(f"Created new filter {name}:")
86+
logging.debug(f" - type: {self._type}")
87+
logging.debug(f" - kind: {self._kind}")
88+
logging.debug(f" - patterns: {self._patterns}")
89+
logging.debug(f" - drop: {self._drop}")
90+
logging.debug(f" - retain: {self._retain}")
91+
92+
@property
93+
def type(self):
94+
return self._type
95+
96+
def apply(
97+
self,
98+
*,
99+
topic: str,
100+
payload: bytes,
101+
retain: bool | int,
102+
):
103+
for pattern in self._patterns:
104+
if self._kind == "prefix" and topic.startswith(pattern):
105+
break
106+
if self._kind == "regex" and pattern.match(topic):
107+
break
108+
else:
109+
logging.debug(f"{self.name}[{self._type}]: no match for {topic}")
110+
return topic, payload, retain
111+
112+
logging.debug(
113+
f"{self.name}[{self._type}]: match for {topic} with {self._kind} {pattern}"
114+
)
115+
116+
if self._drop:
117+
logging.debug(f"{self.name}: - dropped")
118+
return None, None, None
119+
120+
match self._retain:
121+
case None:
122+
logging.debug("{self.name}: - retain: None")
123+
case bool(b):
124+
logging.debug(f"{self.name}: - retain: {b}")
125+
retain = b
126+
case int(i):
127+
logging.debug(f"{self.name}: - retain: {i}")
128+
retain = i
129+
case {"path": path, "fallback": fallback}:
130+
logging.debug(f"{self.name}: - retain: {path}")
131+
try:
132+
data = json.loads(payload)
133+
for p in path:
134+
data = data[p]
135+
except (json.decoder.JSONDecodeError, KeyError, TypeError):
136+
if fallback is not None:
137+
logging.debug(f"{self.name}: - using fallback {fallback}")
138+
retain = fallback
139+
else:
140+
retain = data
141+
142+
logging.debug(f"{self.name}: Result of filter:")
143+
logging.debug(f"{self.name}: - topic: {topic}")
144+
logging.debug(f"{self.name}: - payload: {payload}")
145+
logging.debug(f"{self.name}: - retain: {retain}")
146+
147+
return topic, payload, retain

0 commit comments

Comments
 (0)