Skip to content

Commit 3173ea4

Browse files
committed
PubSub: add support for Kafka (#2259)
1 parent 4279841 commit 3173ea4

File tree

6 files changed

+142
-4
lines changed

6 files changed

+142
-4
lines changed

docs/source/pubsub.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ Brokers
8585

8686
The following protocols are supported:
8787

88+
.. note::
89+
90+
Pub/Sub client dependencies will vary based on the selected broker. ``requirements-pubsub.txt`` contains all requirements for supported brokers, as a reference point.
91+
92+
8893
MQTT
8994
^^^^
9095

@@ -99,6 +104,23 @@ Example directive:
99104
channel: messages/a/data # optional
100105
hidden: false # default
101106
107+
Kafka
108+
^^^^^
109+
110+
Example directive:
111+
112+
.. code-block:: yaml
113+
114+
pubsub:
115+
name: Kafka
116+
broker:
117+
url: tcp://localhost:9092
118+
channel: messages-a-data
119+
# if using authentication:
120+
# sasl_mechanism: PLAIN # default PLAIN
121+
# sasl_security_protocol: SASL_PLAINTEXT # default SASL_SSL
122+
hidden: true # default false
123+
102124
HTTP
103125
^^^^
104126

@@ -113,12 +135,16 @@ Example directive:
113135
channel: messages-a-data # optional
114136
hidden: true # default false
115137
138+
Additional information
139+
----------------------
140+
116141
.. note::
117142

118143
For any Pub/Sub endpoints requiring authentication, encode the ``url`` value as follows:
119144

120145
* ``mqtt://username:password@localhost:1883``
121146
* ``https://username:password@localhost``
147+
* ``tcp://username:password@localhost:9092``
122148

123149
As with any section of the pygeoapi configuration, environment variables may be used as needed, for example
124150
to set username/password information in a URL. If ``pubsub.broker.url`` contains authentication, and
@@ -131,5 +157,6 @@ Example directive:
131157

132158
If a ``channel`` is not defined, only the relevant OGC API endpoint is used.
133159

160+
134161
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html
135162
.. _`AsyncAPI`: https://www.asyncapi.com

pygeoapi/plugin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
},
8989
'pubsub': {
9090
'HTTP': 'pygeoapi.pubsub.http.HTTPPubSubClient',
91+
'Kafka': 'pygeoapi.pubsub.kafka.KafkaPubSubClient',
9192
'MQTT': 'pygeoapi.pubsub.mqtt.MQTTPubSubClient'
9293
}
9394
}

pygeoapi/pubsub/http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
class HTTPPubSubClient(BasePubSubClient):
4242
"""HTTP client"""
4343

44-
def __init__(self, broker_url):
44+
def __init__(self, publisher_def):
4545
"""
4646
Initialize object
4747
@@ -50,7 +50,7 @@ def __init__(self, broker_url):
5050
:returns: pygeoapi.pubsub.http.HTTPPubSubClient
5151
"""
5252

53-
super().__init__(broker_url)
53+
super().__init__(publisher_def)
5454
self.name = 'HTTP'
5555
self.type = 'http'
5656
self.auth = None

pygeoapi/pubsub/kafka.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <tomkralidis@gmail.com>
4+
#
5+
# Copyright (c) 2026 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
import logging
31+
32+
from kafka import errors, KafkaProducer
33+
34+
from pygeoapi.pubsub.base import BasePubSubClient, PubSubClientConnectionError
35+
from pygeoapi.util import to_json
36+
37+
LOGGER = logging.getLogger(__name__)
38+
39+
40+
class KafkaPubSubClient(BasePubSubClient):
    """Kafka client"""

    def __init__(self, publisher_def):
        """
        Initialize object

        :param publisher_def: publisher definition

        :returns: pygeoapi.pubsub.kafka.KafkaPubSubClient
        """

        super().__init__(publisher_def)
        self.name = 'Kafka'
        self.type = 'kafka'

        # Accept the underscore keys used in the documentation
        # (sasl_mechanism / sasl_security_protocol) and fall back to the
        # dotted spelling for configurations that already use it.
        self.sasl_mechanism = publisher_def.get(
            'sasl_mechanism',
            publisher_def.get('sasl.mechanism', 'PLAIN'))
        self.security_protocol = publisher_def.get(
            'sasl_security_protocol',
            publisher_def.get('security.protocol', 'SASL_SSL'))

        msg = f'Initializing to broker {self.broker_safe_url} with id {self.client_id}'  # noqa
        LOGGER.debug(msg)

    def connect(self) -> None:
        """
        Connect to a Kafka broker

        :raises PubSubClientConnectionError: if no broker is available

        :returns: None
        """

        # NOTE(review): assumes self.broker_url is a parsed URL exposing
        # hostname/port/username/password (set by the base class) - confirm
        # fall back to the standard Kafka port when the URL omits one,
        # instead of building "host:None"
        port = self.broker_url.port or 9092

        # NOTE(review): if callers already pass a JSON-encoded string to
        # pub(), to_json will encode it a second time - confirm
        args = {
            'bootstrap_servers': f'{self.broker_url.hostname}:{port}',
            'client_id': self.client_id,
            'value_serializer': lambda v: to_json(v).encode('utf-8')
        }

        # SASL settings are only sent when the URL carries credentials;
        # key names follow kafka-python's KafkaProducer keyword arguments
        if None not in [self.broker_url.username, self.broker_url.password]:
            args.update({
                'security_protocol': self.security_protocol,
                'sasl_mechanism': self.sasl_mechanism,
                'sasl_plain_username': self.broker_url.username,
                'sasl_plain_password': self.broker_url.password
            })

        LOGGER.debug('Creating Kafka producer')
        try:
            self.producer = KafkaProducer(**args)
        except errors.NoBrokersAvailable as err:
            raise PubSubClientConnectionError(err) from err

    def pub(self, channel: str, message: str) -> bool:
        """
        Publish a message to a broker/channel

        :param channel: `str` of topic
        :param message: `str` of message

        :returns: `bool` of publish result
        """

        LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
        LOGGER.debug(f'Channel: {channel}')
        LOGGER.debug(f'Message: {message}')
        LOGGER.debug('Sanitizing channel for Kafka')
        # '/' and ':' are not valid in Kafka topic names
        channel = channel.replace('/', '-')
        channel = channel.replace(':', '-')
        LOGGER.debug(f'Sanitized channel for Kafka: {channel}')

        try:
            future = self.producer.send(channel, value=message)
            self.producer.flush()
            # send() is asynchronous; get() surfaces any delivery error
            future.get(timeout=10)
        except errors.KafkaError as err:
            LOGGER.error(f'Publish to Kafka failed: {err}')
            return False

        return True

    def __repr__(self):
        return f'<KafkaPubSubClient> {self.broker_safe_url}'

pygeoapi/pubsub/mqtt.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
class MQTTPubSubClient(BasePubSubClient):
4040
"""MQTT client"""
4141

42-
def __init__(self, broker_url):
42+
def __init__(self, publisher_def):
4343
"""
4444
Initialize object
4545
@@ -48,7 +48,7 @@ def __init__(self, broker_url):
4848
:returns: pygeoapi.pubsub.mqtt.MQTTPubSubClient
4949
"""
5050

51-
super().__init__(broker_url)
51+
super().__init__(publisher_def)
5252
self.type = 'mqtt'
5353
self.port = self.broker_url.port
5454

requirements-pubsub.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
kafka-python
12
paho-mqtt

0 commit comments

Comments
 (0)