Amazon MSK Connect simplifies the deployment, monitoring, and automatic scaling of connectors that transfer data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. It is fully compatible with Kafka Connect and supports Amazon MSK, Apache Kafka, and Apache Kafka-compatible clusters. MSK Connect uses custom plugins as the containers for connector implementation code, including any custom business logic.
Custom Connect plugins use JMX to expose runtime metrics. Although MSK Connect sends a set of connector metrics to Amazon CloudWatch, it does not currently export the JMX metrics emitted by connector plugins natively. These metrics can instead be exported by modifying the custom plugin code directly.
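As a minimal sketch of what reading JMX metrics from plugin code can look like, the snippet below reads one attribute from the JVM's platform MBean server using only the JDK's `javax.management` API. It assumes the code runs in the same JVM as the MBeans it reads (typically the case for connector code running inside a Kafka Connect worker). The class name `JmxAttributeReader` is hypothetical, and the built-in `java.lang:type=Runtime` MBean stands in for a connector's own MBeans.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxAttributeReader {

    /**
     * Reads one attribute from an MBean registered in this JVM's platform MBean server.
     * In-process code needs no remote JMX connection to do this.
     */
    public static Object readAttribute(String objectName, String attribute) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        return server.getAttribute(new ObjectName(objectName), attribute);
    }

    public static void main(String[] args) throws Exception {
        // java.lang:type=Runtime is always registered; Uptime is reported in milliseconds.
        Object uptime = readAttribute("java.lang:type=Runtime", "Uptime");
        System.out.println("JVM uptime (ms): " + uptime);
    }
}
```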
This custom connector exports Debezium MySQL and MongoDB metrics to Amazon CloudWatch, enabling real-time monitoring of change data capture (CDC) operations in Amazon Managed Streaming for Apache Kafka (Amazon MSK) environments.
While Debezium provides powerful CDC capabilities, monitoring its operations in Amazon MSK environments presents several challenges:
- Limited visibility: Standard Debezium connectors expose metrics through JMX, but these metrics aren't readily accessible in managed Kafka environments.
- Monitoring gap: Without direct access to JMX metrics, it's difficult to track critical metrics such as replication lag and CDC progress.
- Integration needs: Amazon MSK users need CloudWatch integration for consistent monitoring across their AWS infrastructure.
This project provides specialized connectors that extend the Debezium MySQL and MongoDB Kafka connectors to capture operational metrics and publish them to Amazon CloudWatch. It supports Debezium versions 2.5.2 and 2.7.3 and MongoDB Kafka connector version 1.10.0, so users can monitor the health and performance of their CDC pipelines through CloudWatch metrics.
The connectors capture essential Debezium JMX metrics, such as streaming, snapshot, and schema history metrics, providing comprehensive visibility into the CDC process.
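Debezium registers each of these metric groups as an MBean with a predictable object name. The sketch below builds those names using the pattern given in the Debezium MySQL connector documentation, `debezium.mysql:type=connector-metrics,context=<streaming|snapshot|schema-history>,server=<logical name>`, where the logical name corresponds to `topic.prefix` in Debezium 2.x. Treat the pattern as an assumption to verify against your Debezium version; the helper class `DebeziumMBeanNames` is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class DebeziumMBeanNames {

    /** Metric groups named above, mapped to Debezium's "context" key values. */
    public enum Context {
        STREAMING("streaming"),
        SNAPSHOT("snapshot"),
        SCHEMA_HISTORY("schema-history");

        public final String key;

        Context(String key) {
            this.key = key;
        }
    }

    /** ObjectName of one Debezium MySQL metrics MBean for the given logical server name. */
    public static ObjectName mysqlMetrics(Context context, String serverName)
            throws MalformedObjectNameException {
        return new ObjectName("debezium.mysql:type=connector-metrics,context="
                + context.key + ",server=" + serverName);
    }

    /** Property-list pattern matching every Debezium MySQL metrics MBean in the JVM. */
    public static ObjectName allMysqlMetricsPattern() throws MalformedObjectNameException {
        return new ObjectName("debezium.mysql:type=connector-metrics,*");
    }

    public static void main(String[] args) throws Exception {
        for (Context c : Context.values()) {
            System.out.println(mysqlMetrics(c, "my-mysql-server"));
        }
    }
}
```

A pattern such as `allMysqlMetricsPattern()` can be passed to `MBeanServer.queryNames` to discover all groups at once instead of hard-coding each name.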
Project structure:
.
└── custom_module/              # Root directory for all connector modules
    ├── debezium/               # Debezium connector implementations
    │   ├── 2.5.2/              # Debezium 2.5.2 implementation
    │   │   └── src/            # Source code for 2.5.2 version
    │   └── 2.7.3/              # Debezium 2.7.3 implementation
    │       └── src/            # Source code for 2.7.3 version
    └── mongodb/                # MongoDB connector implementation
        └── 1.10.0/             # MongoDB 1.10.0 implementation
            └── src/            # Source code for MongoDB connector
Prerequisites:
- Java Development Kit (JDK) 11 or later
- Apache Maven 3.6.0 or later
- An AWS account with CloudWatch access
- An Apache Kafka Connect runtime environment
- Access to an Amazon MSK cluster
- IAM permissions to publish CloudWatch metrics
Installation options:
- Option A: Download the pre-packaged plugins
- Option B: Rebuild the plugin with a new JAR that contains your custom logic
Option A: Download pre-packaged plugins
Download the required connector plugin(s) directly from the plugin directory (available as zip files):
# For Debezium 2.7.3
custom_module/debezium/2.7.3/target/msk-debezium-mysql-metrics-connector-0.0.3-SNAPSHOT.jar
# For Debezium 2.5.2
custom_module/debezium/2.5.2/target/msk-debezium-mysql-metrics-connector-0.0.1-SNAPSHOT.jar
- Upload the plugin(s) to Amazon S3, then configure your MSK connector to use the uploaded plugin along with the relevant MSK connector configuration properties.
Option B: Rebuild the plugin
- Clone the repository:
git clone [repository-url]
cd custom_module
- Build the desired connector version:
For Debezium 2.7.3:
cd debezium/2.7.3
mvn clean package
For MongoDB 1.10.0:
cd mongodb/1.10.0
mvn clean package
- Prepare a custom plugin (a zip file) that contains the generated JAR file and the related Debezium connector dependencies, and upload it to S3. For the list of dependencies, refer to the corresponding pre-packaged plugin zip file in the repository, or follow the connector-specific README file for further details.
Configure each connector with the required properties:
For Debezium MySQL:
{
"connector.class": "com.amazonaws.msk.debezium.mysql.connect.DebeziumMySqlMetricsConnector",
"database.server.name": "my-mysql-server",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz-password",
"connect.jmx.port": "7098",
"cw.namespace": "MyDebeziumMetrics",
"cw.region": "us-east-1"
}
For MongoDB:
{
"connector.class": "com.amazonaws.msk.kafka.mongodb.connect.KafkaMongodbMetricsSinkConnector",
"connection.uri": "mongodb://mongodb-host:27017",
"database": "mydb",
"connect.jmx.port": "1098",
"cw.namespace": "MyMongoMetrics",
"cw.region": "us-east-1"
}
For detailed deployment instructions and configuration options, refer to the version-specific README files:
- Debezium 2.5.2: custom_module/debezium/2.5.2/README.md
- Debezium 2.7.3: custom_module/debezium/2.7.3/README.md
- MongoDB 1.10.0: custom_module/mongodb/1.10.0/README.md
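Kafka Connect hands connector properties to the connector as a `Map<String, String>`, so values such as `connect.jmx.port` arrive as strings and need parsing. The sketch below shows one way to validate the metrics-related properties from the examples above and fail fast with a clear message. It assumes all three are required; the class `MetricsExporterConfig` is hypothetical, and the real connectors may apply defaults or different rules (see the version-specific READMEs).

```java
import java.util.Map;

/** Hypothetical holder for the metrics-related connector properties shown above. */
public final class MetricsExporterConfig {

    public final int jmxPort;
    public final String namespace;
    public final String region;

    private MetricsExporterConfig(int jmxPort, String namespace, String region) {
        this.jmxPort = jmxPort;
        this.namespace = namespace;
        this.region = region;
    }

    /** Validates raw connector properties; throws IllegalArgumentException on bad input. */
    public static MetricsExporterConfig from(Map<String, String> props) {
        String namespace = require(props, "cw.namespace");
        String region = require(props, "cw.region");
        String portText = require(props, "connect.jmx.port");

        int port;
        try {
            port = Integer.parseInt(portText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("connect.jmx.port must be an integer, got: " + portText, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("connect.jmx.port out of range (1-65535): " + port);
        }
        return new MetricsExporterConfig(port, namespace, region);
    }

    /** Returns the trimmed value for key, or throws if it is missing or blank. */
    private static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        MetricsExporterConfig config = MetricsExporterConfig.from(Map.of(
                "connect.jmx.port", "7098",
                "cw.namespace", "MyDebeziumMetrics",
                "cw.region", "us-east-1"));
        System.out.println(config.namespace + " in " + config.region + ", JMX port " + config.jmxPort);
    }
}
```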
Configuring metric filters for Debezium 2.7.3:
{
"cw.debezium.stream.metrics.include": "MilliSecondsBehindSource,NumberOfCommittedTransactions",
"cw.debezium.snapshot.metrics.include": "RemainingTableCount,TotalTableCount",
"cw.debezium.schema.history.metrics.include": "LastDatabaseOffset,LastProcessedTimestamp"
}
Common issues:
- JMX Connection Failures:
Error: Unable to connect to JMX server
Solution:
- Verify that the JMX port is available and not blocked by a firewall
- Check the JMX URL format in the connector logs
- Ensure the connector has the permissions required for JMX operations
- CloudWatch Metrics Not Appearing:
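- Verify that the IAM role used by the connector is allowed to publish CloudWatch metrics (cloudwatch:PutMetricData)
- Check the cw.namespace configuration
- Ensure metrics are being collected (check the connector logs)
- Verify that cw.region matches the region in which you are viewing CloudWatch metrics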
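When troubleshooting JMX connection failures, it helps to know what a standard JMX URL looks like and to test reachability independently of the connector. The sketch below builds the conventional RMI-based service URL (`service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi`) and attempts a connection with the JDK's `JMXConnectorFactory`. It assumes the connector's JMX server uses this standard RMI form, which may differ from the exact URL the connector logs; `JmxConnectionCheck` is a hypothetical name.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxConnectionCheck {

    /** Conventional RMI-based JMX service URL for a host and port. */
    public static JMXServiceURL serviceUrl(String host, int port) throws MalformedURLException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    /** Returns true if a connection opens and the server answers a simple query. */
    public static boolean canConnect(JMXServiceURL url) {
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            connection.getMBeanCount(); // one round trip to confirm the server responds
            return true;
        } catch (IOException e) {
            // Refused connections, unreachable hosts, and RMI registry lookups all surface here.
            System.err.println("JMX connection to " + url + " failed: " + e);
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        JMXServiceURL url = serviceUrl("localhost", 7098);
        System.out.println(url + " reachable: " + canConnect(url));
    }
}
```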
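Metric filters are another possible cause of missing metrics: if an include list such as cw.debezium.stream.metrics.include names an attribute that does not exist (for example, a typo), that metric would not be published. The sketch below illustrates how comma-separated include lists like those in the Debezium 2.7.3 filter example might be parsed and applied. It assumes exact, case-sensitive matching and that an empty list means "publish everything"; `MetricIncludeFilter` is hypothetical, and the connector's actual behavior may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MetricIncludeFilter {

    /** Parses a comma-separated include list, trimming whitespace and dropping empty entries. */
    public static Set<String> parseIncludeList(String value) {
        if (value == null || value.isBlank()) {
            return Set.of();
        }
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /** Keeps attributes named in the include list; an empty list keeps everything (assumption). */
    public static Map<String, Object> filter(Map<String, Object> attributes, Set<String> include) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : attributes.entrySet()) {
            if (include.isEmpty() || include.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> include = parseIncludeList("MilliSecondsBehindSource, NumberOfCommittedTransactions");
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("MilliSecondsBehindSource", 120L);
        attributes.put("QueueTotalCapacity", 8192);
        attributes.put("NumberOfCommittedTransactions", 42L);
        // Prints only the two included attributes; QueueTotalCapacity is dropped.
        System.out.println(filter(attributes, include));
    }
}
```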
The connector captures metrics from Debezium and MongoDB connectors through JMX, processes them, and publishes them to CloudWatch, enabling real-time monitoring of CDC operations.
Source DB (MySQL/MongoDB) --> Debezium/MongoDB Connector --> JMX Metrics Collection --> CloudWatch Metrics
                                          |
                                          v
                                    Kafka Connect
                                    (MSK Cluster)
Key component interactions:
- The connector initializes a JMX server on the configured port (connect.jmx.port)
- JMXMetricsExporter runs at scheduled intervals
- Metrics are collected from MBeans
- The CloudWatch client publishes metrics with the configured dimensions
- Metrics are organized by database/server name in CloudWatch
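The interactions listed above can be sketched with JDK classes only: a `ScheduledExecutorService` periodically queries MBeans matching a pattern, keeps numeric attributes, and tags each value with the MBean's `server` key as its dimension. This README does not show JMXMetricsExporter's code, so everything here is an assumption: `ScheduledJmxExporter`, `DataPoint`, `MetricPublisher`, and the demo MBean are hypothetical, the sketch reads MBeans in-process rather than through the JMX port, and `MetricPublisher` stands in for the AWS SDK CloudWatch client (which would send the points with PutMetricData) to keep the example dependency-free.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ScheduledJmxExporter {

    /** One numeric sample plus its server-name dimension. */
    public static final class DataPoint {
        public final String metric;
        public final double value;
        public final String serverName;
        DataPoint(String metric, double value, String serverName) {
            this.metric = metric;
            this.value = value;
            this.serverName = serverName;
        }
        @Override public String toString() { return metric + "=" + value + " [server=" + serverName + "]"; }
    }

    /** Hypothetical stand-in for a CloudWatch client that would call PutMetricData. */
    public interface MetricPublisher {
        void publish(String namespace, List<DataPoint> points);
    }

    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ObjectName pattern;
    private final String namespace;
    private final MetricPublisher publisher;

    public ScheduledJmxExporter(ObjectName pattern, String namespace, MetricPublisher publisher) {
        this.pattern = pattern;
        this.namespace = namespace;
        this.publisher = publisher;
    }

    /** Reads every readable numeric attribute of every MBean matching the pattern. */
    public List<DataPoint> collectOnce() throws Exception {
        List<DataPoint> points = new ArrayList<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            String serverName = name.getKeyProperty("server"); // null if there is no server key
            for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                if (!info.isReadable()) continue;
                try {
                    Object value = server.getAttribute(name, info.getName());
                    if (value instanceof Number) { // skip booleans, strings, composite data
                        points.add(new DataPoint(info.getName(), ((Number) value).doubleValue(), serverName));
                    }
                } catch (Exception unreadable) { /* skip this attribute; keep polling the rest */ }
            }
        }
        return points;
    }

    /** Polls at a fixed rate and hands non-empty batches to the publisher. */
    public void start(long periodSeconds) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                List<DataPoint> points = collectOnce();
                if (!points.isEmpty()) publisher.publish(namespace, points);
            } catch (Exception e) {
                System.err.println("Metric export failed: " + e); // keep the schedule alive
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    public void stop() { scheduler.shutdownNow(); }

    /** Demo MBean standing in for a Debezium streaming-metrics MBean. */
    public interface DemoStreamingMBean {
        long getMilliSecondsBehindSource();
        boolean isConnected();
    }

    public static class DemoStreaming implements DemoStreamingMBean {
        public long getMilliSecondsBehindSource() { return 250L; }
        public boolean isConnected() { return true; }
    }

    public static void main(String[] args) throws Exception {
        ManagementFactory.getPlatformMBeanServer().registerMBean(new DemoStreaming(), new ObjectName(
                "debezium.mysql:type=connector-metrics,context=streaming,server=my-mysql-server"));
        ScheduledJmxExporter exporter = new ScheduledJmxExporter(
                new ObjectName("debezium.mysql:type=connector-metrics,*"), "MyDebeziumMetrics",
                (ns, points) -> System.out.println(ns + ": " + points));
        exporter.start(5);
        Thread.sleep(1000); // long enough for the first (immediate) poll
        exporter.stop();
    }
}
```

Catching exceptions inside the scheduled task is deliberate: a `ScheduledExecutorService` suppresses all later runs of a periodic task once one run throws, which would silently stop metric export.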