feat(dsm): add kafka_cluster_id to segmentio/kafka-go integration#4477
feat(dsm): add kafka_cluster_id to segmentio/kafka-go integration#4477robcarlan-datadog wants to merge 7 commits intorob.carlan/dsm-kafka-cluster-idfrom
Conversation
76111c5 to
7d0d2b3
Compare
BenchmarksBenchmark execution time: 2026-03-05 21:21:38 Comparing candidate commit e39a9ab in PR branch Found 0 performance improvements and 0 performance regressions! Performance is the same for 157 metrics, 7 unstable metrics.
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files🚀 New features to boost your workflow:
|
7d0d2b3 to
4c9fc7d
Compare
68e76bf to
3ed2642
Compare
This comment has been minimized.
This comment has been minimized.
52c3314 to
01f3ee5
Compare
Move cluster ID async fetching and caching into internal/kafkaclusterid so it can be reused across kafka integrations (confluent, sarama, kafka-go). Replace blocking WaitForClusterID in Close() with context cancellation: Close() now cancels the fetch context and waits only for goroutine cleanup (microseconds) instead of blocking up to 2s for the network call. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…le access Move from internal/kafkaclusterid to instrumentation/kafkaclusterid so that contrib modules with separate go.mod files can import the shared package. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Auto-fetch the Kafka cluster ID via metadata request in WrapReader and WrapWriter. Plumb cluster ID through DSM edge tags, offset tracking, and span tags for both produce and consume paths. - Add ClusterID field to KafkaConfig - Add WithClusterID option for manual override - Auto-detect cluster ID from broker addresses in WrapReader/WrapWriter - Add kafka_cluster_id to DSM checkpoint edge tags - Use TrackKafkaCommitOffsetWithCluster for cluster-aware lag tracking - Tag produce and consume spans with messaging.kafka.cluster_id - Update existing tests and add TestReadMessageFunctionalWithClusterID Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use FetchClusterIDAsync/WaitForClusterID pattern for segmentio/kafka-go. Normalize bootstrap servers for cache key, reduce timeout from 10s to 2s, and use ClusterID() getter for thread-safe access. Close() waits for the goroutine to complete. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove the public WithClusterID option since cluster ID is now auto-detected. Add Close() to KafkaWriter to wait for async fetch. Add WaitForClusterID calls in test helper for deterministic behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace duplicated cluster ID fetching, caching, and normalization code with the shared instrumentation/kafkaclusterid package. Use context-aware FetchAsync for cancellation support on Close. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
6dcd1dc to
f98f0a4
Compare
ac5efc0 to
e39a9ab
Compare
f98f0a4 to
e916411
Compare
2d54c45 to
6dec38b
Compare
Summary
kafka_cluster_idsupport to thesegmentio/kafka-goDSM integration, building on feat(dsm): add kafka_cluster_id to confluent-kafka-go #4470WrapReader/WrapWriter— no external API changes requiredChanges
KafkaConfig.ClusterIDfield +WithClusterIDoption (for manual override)fetchClusterID()usingkafka.Client.Metadata()— called automatically fromWrapReader/WrapWriterkafka_cluster_idadded to DSM checkpoint edge tags (produce + consume)TrackKafkaCommitOffsetWithClusterused for cluster-aware lag trackingmessaging.kafka.cluster_idspan tag on produce and consume spansTestReadMessageFunctionalWithClusterIDTest plan
TestReadMessageFunctionalWithClusterIDvalidates cluster ID in spans and DSM pathway hashesgo vet ./contrib/segmentio/kafka-go/...passes🤖 Generated with Claude Code