binding-mcp-mqtt: implement mcp_mqtt · proxy binding #1728

@jfallows

Summary

Implement a new mcp_mqtt · proxy binding in a new runtime/binding-mcp-mqtt/
module. Accepts mcp streams from mcp · proxy and produces mqtt streams to
a single mqtt · client exit. Exposes MQTT pub/sub operations as MCP tools and
resource templates, making MQTT topics accessible to AI agents without exposing
wire-protocol details.

Aggregation across multiple brokers or topic namespaces is handled by
mcp · proxy upstream — mcp_mqtt enforces a single exit and does not use
routes.

Stream type

  • Accepts: mcp
  • Produces: mqtt

Module

runtime/binding-mcp-mqtt/ and specs/binding-mcp-mqtt.spec/

Follows the same module structure as binding-mcp-kafka.

MCP tools

publish

Publish a message to an MQTT topic.

Opens an mqtt publish stream with MqttPublishBeginEx, then sends the payload
in a DATA frame carrying MqttPublishDataEx.

MqttPublishDataEx field mapping:

MqttPublishDataEx {
    deferred:       0,
    qos:            <tool input qos>,
    flags:          RETAIN if retain=true, else 0,
    packetId:       0,
    expiryInterval: -1,
    contentType:    null,
    format:         TEXT if payload is UTF-8 string, else BINARY,
    responseTopic:  null,
    correlation:    empty,
    properties:     []
}

On success, returns an empty MCP tool result content array.

inputSchema:

{
  "type": "object",
  "properties": {
    "topic":   { "type": "string", "description": "MQTT topic to publish to" },
    "payload": { "type": "string", "description": "Message payload" },
    "qos":     { "type": "integer", "enum": [0, 1, 2], "default": 0 },
    "retain":  { "type": "boolean", "default": false }
  },
  "required": ["topic", "payload"]
}

Maps to MqttPublishBeginEx:

MqttPublishBeginEx {
    clientId: <session identity field from McpBeginEx — read #1668 merged code>,
    topic:    <tool input topic>,
    flags:    RETAIN if retain=true, else 0,
    qos:      <tool input qos>
}

If options.publish is configured, the binding rejects publish calls whose
topic does not match any configured pattern, returning an MCP tool error
before opening any mqtt stream.
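
The topic check described above can be sketched roughly as follows. This is an illustration only, not the binding's implementation: the function names topic_matches and publish_allowed are hypothetical, the matching follows the usual MQTT wildcard rules for + and #, and special handling of $-prefixed topics is left out.

```python
from typing import Optional, Sequence


def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a concrete MQTT topic matches a pattern using + and #."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level, and it
            # matches all remaining topic levels (including none).
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing '#', both sides must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


def publish_allowed(patterns: Optional[Sequence[str]], topic: str) -> bool:
    """Hypothetical guard: None means options.publish is absent (no constraint)."""
    if patterns is None:
        return True
    return any(topic_matches(p, topic) for p in patterns)
```

With the patterns from the Configuration example, publish_allowed(["commands/+/set", "devices/+/control"], "commands/lamp/set") is accepted, while "commands/lamp/get" would be rejected with an MCP tool error before any mqtt stream is opened.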

receive

Subscribe to an MQTT topic filter and receive up to limit messages.

Produces a MqttSubscribeBeginEx stream, collects MqttSubscribeDataEx frames
until limit messages are received or the stream ends, closes the subscribe
stream, and returns the collected messages as an array. Bounded and
request/response style — not a persistent subscription.

inputSchema:

{
  "type": "object",
  "properties": {
    "topic": {
      "type": "string",
      "description": "MQTT topic filter — supports + and # wildcards"
    },
    "limit": {
      "type": "integer",
      "description": "Maximum number of messages to receive",
      "default": 10
    }
  },
  "required": ["topic"]
}

Maps to MqttSubscribeBeginEx:

MqttSubscribeBeginEx {
    clientId: <session identity field from McpBeginEx — read #1668 merged code>,
    qos:      AT_LEAST_ONCE,
    filters: [
        MqttTopicFilter {
            pattern: <tool input topic>,
            qos:     AT_LEAST_ONCE,
            flags:   0
        }
    ]
}

Output: array of objects derived from MqttSubscribeDataEx frames, up to limit:

{
    topic:   MqttSubscribeDataEx.topic,
    payload: DATA frame payload bytes as UTF-8 string,
    qos:     MqttSubscribeDataEx.qos,
    retain:  (MqttSubscribeDataEx.flags >> MqttSubscribeFlags.RETAIN) & 1
}
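
As a rough sketch of the output mapping above, the following collects frames into the result array. It assumes each frame is available as a (data_ex, payload) pair, where data_ex is a plain dict with topic, qos and flags keys. The helper name collect_messages and the RETAIN_BIT value are placeholders for illustration, not values taken from the mqtt binding.

```python
from typing import Any, Dict, Iterable, List, Tuple

# Assumed bit position of RETAIN in MqttSubscribeDataEx.flags (placeholder).
RETAIN_BIT = 0


def collect_messages(
    frames: Iterable[Tuple[Dict[str, Any], bytes]],
    limit: int = 10,
    retain_bit: int = RETAIN_BIT,
) -> List[Dict[str, Any]]:
    """Collect up to `limit` messages, or fewer if the stream ends first."""
    messages: List[Dict[str, Any]] = []
    if limit <= 0:
        return messages
    for data_ex, payload in frames:
        messages.append({
            "topic": data_ex["topic"],
            # Payload bytes rendered as a UTF-8 string; invalid bytes are
            # replaced here, which is an assumption of this sketch.
            "payload": payload.decode("utf-8", errors="replace"),
            "qos": data_ex["qos"],
            "retain": bool((data_ex["flags"] >> retain_bit) & 1),
        })
        if len(messages) >= limit:
            # Limit reached: the binding would close the subscribe stream here.
            break
    return messages
```

Checking the limit after appending, rather than before reading the next frame, avoids consuming a message that would then be dropped.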

If options.subscribe is configured, the binding rejects receive calls whose
topic does not match any configured pattern.

MCP resources

The binding exposes MQTT topics as MCP resource templates. The mqtt:// URI
prefix is added by the binding when constructing MCP resource URIs from the
bare topic patterns in options.subscribe.

resources/list

Returns MCP resource templates derived from options.subscribe patterns.
The mqtt:// prefix is prepended to each bare topic pattern as-is — MQTT
wildcards (+, #) are preserved unchanged in the URI template:

{
  "resourceTemplates": [
    { "uriTemplate": "mqtt://sensors/temperature/+" },
    { "uriTemplate": "mqtt://sensors/humidity/+" }
  ]
}

Resolution is mechanical in both directions: strip mqtt:// to get the MQTT
topic; prepend mqtt:// to get the resource URI. No parameter extraction or
RFC 6570 variable substitution is involved.

If options.subscribe is absent, resources/list returns an empty template
list — MQTT has no native topic discovery mechanism.
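
The two-way mapping and the resources/list result can be sketched as below. The function names are hypothetical, and raising ValueError for a URI without the mqtt:// prefix is an assumption of this sketch rather than specified behavior.

```python
from typing import Dict, List, Optional, Sequence

MQTT_URI_PREFIX = "mqtt://"


def to_resource_uri(topic: str) -> str:
    """Prepend mqtt:// to a bare topic or pattern; wildcards are left as-is."""
    return MQTT_URI_PREFIX + topic


def to_topic(uri: str) -> str:
    """Strip mqtt:// from a resource URI to recover the MQTT topic."""
    if not uri.startswith(MQTT_URI_PREFIX):
        raise ValueError(f"not an mqtt resource URI: {uri!r}")
    return uri[len(MQTT_URI_PREFIX):]


def list_resource_templates(
    subscribe: Optional[Sequence[str]],
) -> Dict[str, List[Dict[str, str]]]:
    """Build the resources/list result; absent options.subscribe yields no templates."""
    return {
        "resourceTemplates": [
            {"uriTemplate": to_resource_uri(pattern)}
            for pattern in (subscribe or [])
        ]
    }
```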

resources/read

Opens a MqttSubscribeBeginEx stream for the concrete topic derived by
stripping mqtt:// from the resource URI, waits for the first message
(retained or otherwise), returns it, then closes the stream.

If no message arrives within the configured timeout, returns a clear error:
"No message received on topic '{topic}' within {timeout}ms".

Timeout is configurable via options.readTimeout (default: 5000ms).
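
To make the timeout behavior concrete, here is a minimal blocking sketch. It assumes incoming messages are delivered on a queue.Queue, which is purely illustrative: the binding itself is stream-driven and would schedule the timeout rather than block. The name read_first_message is hypothetical.

```python
import queue
from typing import Any


def read_first_message(messages: "queue.Queue[Any]", topic: str,
                       timeout_ms: int = 5000) -> Any:
    """Return the first message, or raise with the error text specified above."""
    try:
        # queue.Queue.get takes its timeout in seconds.
        return messages.get(timeout=timeout_ms / 1000.0)
    except queue.Empty:
        raise TimeoutError(
            f"No message received on topic '{topic}' within {timeout_ms}ms"
        ) from None
```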

resources/subscribe

Opens a persistent MqttSubscribeBeginEx stream for the topic derived from
the resource URI and forwards each MqttSubscribeDataEx frame as an MCP
notifications/resources/updated event to the MCP client. When the MCP
client unsubscribes, an END frame arrives on the mcp stream — the binding
responds by sending END on the mqtt subscribe stream to close it cleanly.

If options.subscribe is configured, resource URIs not matching any configured
pattern are rejected before opening any mqtt stream.
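
The per-message forwarding might look like the sketch below, which emits one JSON-RPC notification per received frame. It assumes the notification carries the subscribed resource URI in params.uri, as in the MCP resources/updated notification. Whether the binding reports the subscribed URI or a URI derived from each message's concrete topic is not stated here, so treat that choice as an assumption. The generator name is hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator


def updated_notifications(uri: str, frames: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    """Yield one notifications/resources/updated message per received frame."""
    for _frame in frames:
        yield {
            "jsonrpc": "2.0",
            "method": "notifications/resources/updated",
            "params": {"uri": uri},
        }
    # The iterable ending stands in for the mqtt subscribe stream closing.
```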

Configuration

mcp_mqtt0:
  type: mcp_mqtt
  kind: proxy
  options:
    subscribe:
      - sensors/temperature/+
      - sensors/humidity/+
    publish:
      - commands/+/set
      - devices/+/control
    readTimeout: 5000
  exit: mqtt_client0

options.subscribe (optional)

Array of bare MQTT topic patterns (supporting + and # wildcards) that
constrain which topics may be subscribed to via the receive tool and
resources/read / resources/subscribe. Also drives resources/list:
the binding advertises one MCP resource template per configured pattern.

When absent: no topic constraint on subscribe operations; resources/list
returns an empty template list.

options.publish (optional)

Array of bare MQTT topic patterns that constrain which topics the publish
tool will accept. When absent: no topic constraint on publish operations.

options.readTimeout (optional)

Timeout in milliseconds for resources/read when no message is immediately
available. Default: 5000.

Both subscribe and publish absent means fully open — all topic access is
then constrained downstream by mqtt · client guarded routes and broker ACLs.
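
The defaults described above can be summarized in a small sketch. The McpMqttOptions class and parse_options function are hypothetical names used for illustration; the binding's actual options model may differ.

```python
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional


@dataclass(frozen=True)
class McpMqttOptions:
    subscribe: Optional[List[str]] = None  # None: no subscribe constraint
    publish: Optional[List[str]] = None    # None: no publish constraint
    read_timeout_ms: int = 5000            # options.readTimeout default

    @property
    def fully_open(self) -> bool:
        """True when neither subscribe nor publish is configured."""
        return self.subscribe is None and self.publish is None


def parse_options(raw: Optional[Mapping[str, Any]]) -> McpMqttOptions:
    """Apply the documented defaults to a parsed options mapping."""
    raw = raw or {}
    return McpMqttOptions(
        subscribe=list(raw["subscribe"]) if "subscribe" in raw else None,
        publish=list(raw["publish"]) if "publish" in raw else None,
        read_timeout_ms=int(raw.get("readTimeout", 5000)),
    )
```

Keeping None distinct from an empty list preserves the difference between "not configured" (open) and "configured with no patterns".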

Single exit

mcp_mqtt enforces a single exit — no routes. Aggregation across multiple
brokers or topic namespaces is handled by mcp · proxy upstream, which
federates multiple mcp_mqtt bindings. Each mcp_mqtt binding contributes
its own resource templates and tools to the federated MCP view.

Spec test coverage

Add scenarios to specs/binding-mcp-mqtt.spec/ covering:

  • publish tool → correct MqttPublishBeginEx + MqttPublishDataEx
  • publish with retain: true → MqttPublishBeginEx flags include RETAIN
  • publish rejected when topic does not match options.publish pattern
  • receive tool → MqttSubscribeBeginEx with correct topic filter, returns
    messages up to limit, stream closed after limit
  • receive with wildcard filter (+, #) → correct MqttTopicFilter.pattern
  • receive rejected when topic does not match options.subscribe pattern
  • resources/list → returns correct URI templates from options.subscribe
  • resources/read → subscribe stream opened, first message returned, stream closed
  • resources/read timeout → clear error returned when no message within timeout
  • resources/subscribe → persistent stream, notifications/resources/updated
    forwarded per message
  • Resource URI rejected when not matching options.subscribe pattern
