Summary
Implement a new mcp_mqtt · proxy binding in a new runtime/binding-mcp-mqtt/
module. Accepts mcp streams from mcp · proxy and produces mqtt streams to
a single mqtt · client exit. Exposes MQTT pub/sub operations as MCP tools and
resource templates, making MQTT topics accessible to AI agents without exposing
wire-protocol details.
Aggregation across multiple brokers or topic namespaces is handled by
mcp · proxy upstream — mcp_mqtt enforces a single exit and does not use
routes.
Stream type
- Accepts:
mcp
- Produces:
mqtt
Module
runtime/binding-mcp-mqtt/ and specs/binding-mcp-mqtt.spec/
Follows the same module structure as binding-mcp-kafka.
MCP tools
publish
Publish a message to an MQTT topic.
Produces a MqttPublishBeginEx stream followed by MqttPublishDataEx payload.
MqttPublishDataEx field mapping:
MqttPublishDataEx {
deferred: 0,
qos: <tool input qos>,
flags: RETAIN if retain=true, else 0,
packetId: 0,
expiryInterval: -1,
contentType: null,
format: TEXT if payload is UTF-8 string, else BINARY,
responseTopic: null,
correlation: empty,
properties: []
}
On success, returns an empty MCP tool result content array.
inputSchema:
{
"type": "object",
"properties": {
"topic": { "type": "string", "description": "MQTT topic to publish to" },
"payload": { "type": "string", "description": "Message payload" },
"qos": { "type": "integer", "enum": [0, 1, 2], "default": 0 },
"retain": { "type": "boolean", "default": false }
},
"required": ["topic", "payload"]
}
Maps to MqttPublishBeginEx:
MqttPublishBeginEx {
clientId: <session identity field from McpBeginEx — read #1668 merged code>,
topic: <tool input topic>,
flags: RETAIN if retain=true, else 0,
qos: <tool input qos>
}
If options.publish is configured, the binding rejects publish calls whose
topic does not match any configured pattern, returning an MCP tool error
before opening any mqtt stream.
receive
Subscribe to an MQTT topic filter and receive up to limit messages.
Produces a MqttSubscribeBeginEx stream, collects MqttSubscribeDataEx frames
until limit messages are received or the stream ends, closes the subscribe
stream, and returns the collected messages as an array. Bounded and
request/response style — not a persistent subscription.
inputSchema:
{
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "MQTT topic filter — supports + and # wildcards"
},
"limit": {
"type": "integer",
"description": "Maximum number of messages to receive",
"default": 10
}
},
"required": ["topic"]
}
Maps to MqttSubscribeBeginEx:
MqttSubscribeBeginEx {
clientId: <session identity field from McpBeginEx — read #1668 merged code>,
qos: AT_LEAST_ONCE,
filters: [
MqttTopicFilter {
pattern: <tool input topic>,
qos: AT_LEAST_ONCE,
flags: 0
}
]
}
Output: array of objects derived from MqttSubscribeDataEx frames, up to limit:
{
topic: MqttSubscribeDataEx.topic,
payload: DATA frame payload bytes as UTF-8 string,
qos: MqttSubscribeDataEx.qos,
retain: (MqttSubscribeDataEx.flags >> MqttSubscribeFlags.RETAIN) & 1
}
If options.subscribe is configured, the binding rejects receive calls whose
topic does not match any configured pattern.
MCP resources
The binding exposes MQTT topics as MCP resource templates. The mqtt:// URI
prefix is added by the binding when constructing MCP resource URIs from the
bare topic patterns in options.subscribe.
resources/list
Returns MCP resource templates derived from options.subscribe patterns.
The mqtt:// prefix is prepended to each bare topic pattern as-is — MQTT
wildcards (+, #) are preserved unchanged in the URI template:
{
"resourceTemplates": [
{ "uriTemplate": "mqtt://sensors/temperature/+" },
{ "uriTemplate": "mqtt://sensors/humidity/+" }
]
}
Resolution is mechanical in both directions: strip mqtt:// to get the MQTT
topic; prepend mqtt:// to get the resource URI. No parameter extraction or
RFC 6570 variable substitution is involved.
If options.subscribe is absent, resources/list returns an empty template
list — MQTT has no native topic discovery mechanism.
resources/read
Opens a MqttSubscribeBeginEx stream for the concrete topic derived by
stripping mqtt:// from the resource URI, waits for the first message
(retained or otherwise), returns it, then closes the stream.
If no message arrives within the configured timeout, returns a clear error:
"No message received on topic '{topic}' within {timeout}ms".
Timeout is configurable via options.readTimeout (default: 5000ms).
resources/subscribe
Opens a persistent MqttSubscribeBeginEx stream for the topic derived from
the resource URI and forwards each MqttSubscribeDataEx frame as an MCP
notifications/resources/updated event to the MCP client. When the MCP
client unsubscribes, an END frame arrives on the mcp stream — the binding
responds by sending END on the mqtt subscribe stream to close it cleanly.
If options.subscribe is configured, resource URIs not matching any configured
pattern are rejected before opening any mqtt stream.
Configuration
mcp_mqtt0:
type: mcp_mqtt
kind: proxy
options:
subscribe:
- sensors/temperature/+
- sensors/humidity/+
publish:
- commands/+/set
- devices/+/control
readTimeout: 5000
exit: mqtt_client0
options.subscribe (optional)
Array of bare MQTT topic patterns (supporting + and # wildcards) that
constrain which topics may be subscribed to via the receive tool and
resources/read / resources/subscribe. Also drives resources/list —
the binding advertises one MCP resource template per configured pattern.
When absent: no topic constraint on subscribe operations; resources/list
returns an empty template list.
options.publish (optional)
Array of bare MQTT topic patterns that constrain which topics the publish
tool will accept. When absent: no topic constraint on publish operations.
options.readTimeout (optional)
Timeout in milliseconds for resources/read when no message is immediately
available. Default: 5000.
Both subscribe and publish absent means fully open — all topic access is
then constrained downstream by mqtt · client guarded routes and broker ACLs.
Single exit
mcp_mqtt enforces a single exit — no routes. Aggregation across multiple
brokers or topic namespaces is handled by mcp · proxy upstream, which
federates multiple mcp_mqtt bindings. Each mcp_mqtt binding contributes
its own resource templates and tools to the federated MCP view.
Spec test coverage
Add scenarios to specs/binding-mcp-mqtt.spec/ covering:
publish tool → correct MqttPublishBeginEx + MqttPublishDataEx
publish with retain: true → MqttPublishBeginEx flags includes RETAIN
publish rejected when topic does not match options.publish pattern
receive tool → MqttSubscribeBeginEx with correct topic filter, returns
messages up to limit, stream closed after limit
receive with wildcard filter (+, #) → correct MqttTopicFilter.pattern
receive rejected when topic does not match options.subscribe pattern
resources/list → returns correct URI templates from options.subscribe
resources/read → subscribe stream opened, first message returned, stream closed
resources/read timeout → clear error returned when no message within timeout
resources/subscribe → persistent stream, notifications/resources/updated
forwarded per message
- Resource URI rejected when not matching
options.subscribe pattern
Dependencies
Additional context
Summary
Implement a new
mcp_mqtt · proxybinding in a newruntime/binding-mcp-mqtt/module. Accepts
mcpstreams frommcp · proxyand producesmqttstreams toa single
mqtt · clientexit. Exposes MQTT pub/sub operations as MCP tools andresource templates, making MQTT topics accessible to AI agents without exposing
wire-protocol details.
Aggregation across multiple brokers or topic namespaces is handled by
mcp · proxyupstream —mcp_mqttenforces a single exit and does not useroutes.
Stream type
mcpmqttModule
runtime/binding-mcp-mqtt/andspecs/binding-mcp-mqtt.spec/Follows the same module structure as
binding-mcp-kafka.MCP tools
publishPublish a message to an MQTT topic.
Produces a
MqttPublishBeginExstream followed byMqttPublishDataExpayload.MqttPublishDataExfield mapping:On success, returns an empty MCP tool result content array.
inputSchema:
{ "type": "object", "properties": { "topic": { "type": "string", "description": "MQTT topic to publish to" }, "payload": { "type": "string", "description": "Message payload" }, "qos": { "type": "integer", "enum": [0, 1, 2], "default": 0 }, "retain": { "type": "boolean", "default": false } }, "required": ["topic", "payload"] }Maps to
MqttPublishBeginEx:If
options.publishis configured, the binding rejectspublishcalls whosetopicdoes not match any configured pattern, returning an MCP tool errorbefore opening any
mqttstream.receiveSubscribe to an MQTT topic filter and receive up to
limitmessages.Produces a
MqttSubscribeBeginExstream, collectsMqttSubscribeDataExframesuntil
limitmessages are received or the stream ends, closes the subscribestream, and returns the collected messages as an array. Bounded and
request/response style — not a persistent subscription.
inputSchema:
{ "type": "object", "properties": { "topic": { "type": "string", "description": "MQTT topic filter — supports + and # wildcards" }, "limit": { "type": "integer", "description": "Maximum number of messages to receive", "default": 10 } }, "required": ["topic"] }Maps to
MqttSubscribeBeginEx:Output: array of objects derived from
MqttSubscribeDataExframes, up tolimit:If
options.subscribeis configured, the binding rejectsreceivecalls whosetopicdoes not match any configured pattern.MCP resources
The binding exposes MQTT topics as MCP resource templates. The
mqtt://URIprefix is added by the binding when constructing MCP resource URIs from the
bare topic patterns in
options.subscribe.resources/listReturns MCP resource templates derived from
options.subscribepatterns.The
mqtt://prefix is prepended to each bare topic pattern as-is — MQTTwildcards (
+,#) are preserved unchanged in the URI template:{ "resourceTemplates": [ { "uriTemplate": "mqtt://sensors/temperature/+" }, { "uriTemplate": "mqtt://sensors/humidity/+" } ] }Resolution is mechanical in both directions: strip
mqtt://to get the MQTTtopic; prepend
mqtt://to get the resource URI. No parameter extraction orRFC 6570 variable substitution is involved.
If
options.subscribeis absent,resources/listreturns an empty templatelist — MQTT has no native topic discovery mechanism.
resources/readOpens a
MqttSubscribeBeginExstream for the concrete topic derived bystripping
mqtt://from the resource URI, waits for the first message(retained or otherwise), returns it, then closes the stream.
If no message arrives within the configured timeout, returns a clear error:
"No message received on topic '{topic}' within {timeout}ms".Timeout is configurable via
options.readTimeout(default: 5000ms).resources/subscribeOpens a persistent
MqttSubscribeBeginExstream for the topic derived fromthe resource URI and forwards each
MqttSubscribeDataExframe as an MCPnotifications/resources/updatedevent to the MCP client. When the MCPclient unsubscribes, an END frame arrives on the
mcpstream — the bindingresponds by sending END on the
mqttsubscribe stream to close it cleanly.If
options.subscribeis configured, resource URIs not matching any configuredpattern are rejected before opening any
mqttstream.Configuration
options.subscribe (optional)
Array of bare MQTT topic patterns (supporting
+and#wildcards) thatconstrain which topics may be subscribed to via the
receivetool andresources/read/resources/subscribe. Also drivesresources/list—the binding advertises one MCP resource template per configured pattern.
When absent: no topic constraint on subscribe operations;
resources/listreturns an empty template list.
options.publish (optional)
Array of bare MQTT topic patterns that constrain which topics the
publishtool will accept. When absent: no topic constraint on publish operations.
options.readTimeout (optional)
Timeout in milliseconds for
resources/readwhen no message is immediatelyavailable. Default:
5000.Both
subscribeandpublishabsent means fully open — all topic access isthen constrained downstream by
mqtt · clientguarded routes and broker ACLs.Single exit
mcp_mqttenforces a singleexit— no routes. Aggregation across multiplebrokers or topic namespaces is handled by
mcp · proxyupstream, whichfederates multiple
mcp_mqttbindings. Eachmcp_mqttbinding contributesits own resource templates and tools to the federated MCP view.
Spec test coverage
Add scenarios to
specs/binding-mcp-mqtt.spec/covering:publishtool → correctMqttPublishBeginEx+MqttPublishDataExpublishwithretain: true→MqttPublishBeginExflags includesRETAINpublishrejected when topic does not matchoptions.publishpatternreceivetool →MqttSubscribeBeginExwith correct topic filter, returnsmessages up to
limit, stream closed afterlimitreceivewith wildcard filter (+,#) → correctMqttTopicFilter.patternreceiverejected when topic does not matchoptions.subscribepatternresources/list→ returns correct URI templates fromoptions.subscriberesources/read→ subscribe stream opened, first message returned, stream closedresources/readtimeout → clear error returned when no message within timeoutresources/subscribe→ persistent stream,notifications/resources/updatedforwarded per message
options.subscribepatternDependencies
binding-mcp: implement mcp · server binding(binding-mcp: implement mcp · server binding #1668) — defines themcpstream type
binding-mqtt— existing; provides themqttstream type (no changes required)Additional context
binding-mcp-kafka(binding-mcp-kafka: implement mcp_kafka · proxy binding #1671)