feat(dsm): add kafka_cluster_id to segmentio/kafka-go integration #4477

Open
robcarlan-datadog wants to merge 7 commits into rob.carlan/dsm-kafka-cluster-id from rob.carlan/kafka-go-cluster-id

Conversation

@robcarlan-datadog
Contributor

Summary

  • Adds kafka_cluster_id support to the segmentio/kafka-go DSM integration, building on feat(dsm): add kafka_cluster_id to confluent-kafka-go #4470
  • Auto-fetches the Kafka cluster ID via a metadata request in WrapReader/WrapWriter; no external API changes required
  • Plumbs cluster ID through DSM edge tags, offset tracking, and span tags

Changes

  • KafkaConfig.ClusterID field + WithClusterID option (for manual override)
  • fetchClusterID() using kafka.Client.Metadata(), called automatically from WrapReader/WrapWriter
  • kafka_cluster_id added to DSM checkpoint edge tags (produce + consume)
  • TrackKafkaCommitOffsetWithCluster used for cluster-aware lag tracking
  • messaging.kafka.cluster_id span tag on produce and consume spans
  • Existing tests updated to account for cluster ID; new TestReadMessageFunctionalWithClusterID
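
A minimal sketch of how the cluster ID might be appended to DSM checkpoint edge tags. The helper name `edgeTags`, its parameters, and the exact tag ordering are assumptions for illustration and are not taken from the diff; only the `kafka_cluster_id` tag key comes from the description above. The tag is omitted when the ID is empty, so pathways stay unchanged when auto-detection fails.

```go
package main

import "fmt"

// edgeTags is a hypothetical helper (not the integration's real function).
// It builds the edge tags for a produce ("out") or consume ("in") checkpoint
// and appends kafka_cluster_id only when a cluster ID is known.
func edgeTags(direction, topic, group, clusterID string) []string {
	tags := []string{"direction:" + direction, "topic:" + topic, "type:kafka"}
	if group != "" {
		// Consumer group applies to the consume path only.
		tags = append(tags, "group:"+group)
	}
	if clusterID != "" {
		tags = append(tags, "kafka_cluster_id:"+clusterID)
	}
	return tags
}

func main() {
	// Produce path with a detected cluster ID.
	fmt.Println(edgeTags("out", "orders", "", "abc123"))
	// Consume path where the cluster ID could not be fetched.
	fmt.Println(edgeTags("in", "orders", "billing", ""))
}
```

Because edge tags feed the pathway hash, adding the tag changes the hash for the same topic, which is presumably why the existing tests needed updating.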

Test plan

  • Existing integration tests pass with cluster ID auto-detected
  • TestReadMessageFunctionalWithClusterID validates cluster ID in spans and DSM pathway hashes
  • go vet ./contrib/segmentio/kafka-go/... passes

🤖 Generated with Claude Code

@github-actions github-actions bot added the apm:ecosystem label (contrib/* related feature requests or bugs) Feb 26, 2026
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/kafka-go-cluster-id branch from 76111c5 to 7d0d2b3 Compare February 26, 2026 17:29
@pr-commenter

pr-commenter bot commented Feb 26, 2026

Benchmarks

Benchmark execution time: 2026-03-05 21:21:38

Comparing candidate commit e39a9ab in PR branch rob.carlan/kafka-go-cluster-id with baseline commit f98f0a4 in branch rob.carlan/dsm-kafka-cluster-id.

Found 0 performance improvements and 0 performance regressions! Performance is the same for 157 metrics; 7 metrics are unstable.

Explanation

This is an A/B test comparing a candidate commit's performance against that of a baseline commit. Performance changes are noted in the tables below as:

  • 🟩 = significantly better candidate vs. baseline
  • 🟥 = significantly worse candidate vs. baseline

We compute a confidence interval (CI) over the relative difference of means between metrics from the candidate and baseline commits, considering the baseline as the reference.

If the CI is entirely outside the configured SIGNIFICANT_IMPACT_THRESHOLD (or the deprecated UNCONFIDENCE_THRESHOLD), the change is considered significant.

Feel free to reach out to #apm-benchmarking-platform on Slack if you have any questions.

More details about the CI and significant changes

You can imagine this CI as a range of values that is likely to contain the true difference of means between the candidate and baseline commits.

CIs of the difference of means are often centered around 0%, because changes are often small:

---------------------------------(------|---^--------)-------------------------------->
                              -0.6%    0%  0.3%     +1.2%
                                 |          |        |
         lower bound of the CI --'          |        |
sample mean (center of the CI) -------------'        |
         upper bound of the CI ----------------------'

As described above, a change is considered significant if the CI is entirely outside the configured SIGNIFICANT_IMPACT_THRESHOLD (or the deprecated UNCONFIDENCE_THRESHOLD).

For instance, for an execution time metric, this confidence interval indicates a significantly worse performance:

----------------------------------------|---------|---(---------^---------)---------->
                                       0%        1%  1.3%      2.2%      3.1%
                                                  |   |         |         |
       significant impact threshold --------------'   |         |         |
                      lower bound of CI --------------'         |         |
       sample mean (center of the CI) --------------------------'         |
                      upper bound of CI ----------------------------------'
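
The rule described above can be sketched as a small function. This is an illustration only: the function name `verdict` is hypothetical, the threshold is assumed to be symmetric around 0%, and the metric is assumed to be an execution time (so a positive relative difference is worse). The numbers in `main` are the ones shown in the two diagrams, with the 1% threshold from the second.

```go
package main

import "fmt"

// verdict classifies a confidence interval [lower, upper] over the relative
// difference of means (in percent) against a significance threshold.
// Assumption: for an execution-time metric, an increase is a regression.
func verdict(lower, upper, threshold float64) string {
	switch {
	case lower > threshold:
		// The whole CI lies above +threshold.
		return "significantly worse"
	case upper < -threshold:
		// The whole CI lies below -threshold.
		return "significantly better"
	default:
		// The CI overlaps the threshold band, so no significant change.
		return "not significant"
	}
}

func main() {
	fmt.Println(verdict(-0.6, 1.2, 1.0)) // prints "not significant"
	fmt.Println(verdict(1.3, 3.1, 1.0))  // prints "significantly worse"
}
```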

@codecov

codecov bot commented Feb 26, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 59.86%. Comparing base (f98f0a4) to head (e39a9ab).

Additional details and impacted files

see 12 files with indirect coverage changes

@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/kafka-go-cluster-id branch from 7d0d2b3 to 4c9fc7d Compare February 26, 2026 18:38
@robcarlan-datadog robcarlan-datadog marked this pull request as ready for review February 26, 2026 18:46
@robcarlan-datadog robcarlan-datadog requested review from a team as code owners February 26, 2026 18:46
@robcarlan-datadog robcarlan-datadog added the AI Generated label (Largely based on code generated by an AI or LLM. This label is the same across all dd-trace-* repos) Mar 2, 2026
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/kafka-go-cluster-id branch from 68e76bf to 3ed2642 Compare March 4, 2026 16:15
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/kafka-go-cluster-id branch from 52c3314 to 01f3ee5 Compare March 4, 2026 17:58
robcarlan-datadog and others added 7 commits March 5, 2026 15:36
Move cluster ID async fetching and caching into internal/kafkaclusterid
so it can be reused across kafka integrations (confluent, sarama, kafka-go).

Replace blocking WaitForClusterID in Close() with context cancellation:
Close() now cancels the fetch context and waits only for goroutine cleanup
(microseconds) instead of blocking up to 2s for the network call.
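
A rough sketch of the cancellation pattern this commit describes, using only the standard library. The type `fetcher`, the functions `startFetch`, `slowFetch`, and `fastFetch`, and the constant `defaultTimeout` are hypothetical names for illustration; the real package's API is not shown in this conversation. The fetch function stands in for the broker metadata request and is assumed to honor context cancellation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// defaultTimeout mirrors the 2s budget mentioned in the commit messages.
const defaultTimeout = 2 * time.Second

// fetchFunc is a hypothetical stand-in for the broker metadata request.
type fetchFunc func(ctx context.Context) (string, error)

// fetcher is an illustrative type, not the shared package's real API.
type fetcher struct {
	mu     sync.RWMutex
	id     string
	cancel context.CancelFunc
	done   chan struct{}
}

// startFetch launches the fetch in a goroutine and returns immediately.
func startFetch(fetch fetchFunc, timeout time.Duration) *fetcher {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	f := &fetcher{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(f.done)
		id, err := fetch(ctx)
		if err != nil {
			return // leave the ID empty on failure or cancellation
		}
		f.mu.Lock()
		f.id = id
		f.mu.Unlock()
	}()
	return f
}

// ClusterID returns the fetched ID, or "" if it is not (yet) available.
func (f *fetcher) ClusterID() string {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.id
}

// Close cancels any in-flight fetch and waits only for the goroutine to exit.
func (f *fetcher) Close() {
	f.cancel()
	<-f.done
}

// slowFetch simulates a broker that does not answer before cancellation.
func slowFetch(ctx context.Context) (string, error) {
	select {
	case <-time.After(10 * time.Second):
		return "late-cluster", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// fastFetch simulates a broker that answers immediately.
func fastFetch(_ context.Context) (string, error) { return "cluster-1", nil }

func main() {
	f := startFetch(slowFetch, defaultTimeout)
	start := time.Now()
	f.Close()
	fmt.Println("close returned in under 1s:", time.Since(start) < time.Second)
	fmt.Printf("cluster ID after cancelled fetch: %q\n", f.ClusterID())
}
```

Close stays fast in this sketch only because the fetch function returns promptly once its context is cancelled; a fetch that ignores the context would still block Close until it finished.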

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…le access

Move from internal/kafkaclusterid to instrumentation/kafkaclusterid so
that contrib modules with separate go.mod files can import the shared
package.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Auto-fetch the Kafka cluster ID via metadata request in WrapReader and
WrapWriter. Plumb cluster ID through DSM edge tags, offset tracking,
and span tags for both produce and consume paths.

- Add ClusterID field to KafkaConfig
- Add WithClusterID option for manual override
- Auto-detect cluster ID from broker addresses in WrapReader/WrapWriter
- Add kafka_cluster_id to DSM checkpoint edge tags
- Use TrackKafkaCommitOffsetWithCluster for cluster-aware lag tracking
- Tag produce and consume spans with messaging.kafka.cluster_id
- Update existing tests and add TestReadMessageFunctionalWithClusterID

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Use FetchClusterIDAsync/WaitForClusterID pattern for segmentio/kafka-go.
Normalize bootstrap servers for cache key, reduce timeout from 10s to
2s, and use ClusterID() getter for thread-safe access. Close() waits
for the goroutine to complete.
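
One plausible way to normalize bootstrap servers into a cache key, shown as a sketch. The actual normalization used in this commit is not visible in the conversation, so the function name `normalizeBootstrapServers` and the chosen rules (trim whitespace, drop empty entries, sort, rejoin) are assumptions. The goal is that equivalent broker lists map to the same key regardless of ordering or spacing.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// normalizeBootstrapServers is a hypothetical helper that turns a
// comma-separated broker list into a canonical cache key.
func normalizeBootstrapServers(servers string) string {
	parts := strings.Split(servers, ",")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		// Drop surrounding whitespace and skip empty entries such as
		// the one produced by a trailing comma.
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	// Sort so that broker order does not affect the key.
	sort.Strings(out)
	return strings.Join(out, ",")
}

func main() {
	a := normalizeBootstrapServers("broker-b:9092, broker-a:9092")
	b := normalizeBootstrapServers("broker-a:9092,broker-b:9092,")
	fmt.Println(a == b, a) // prints "true broker-a:9092,broker-b:9092"
}
```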

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Remove the public WithClusterID option since cluster ID is now
auto-detected. Add Close() to KafkaWriter to wait for async fetch.
Add WaitForClusterID calls in test helper for deterministic behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Replace duplicated cluster ID fetching, caching, and normalization code
with the shared instrumentation/kafkaclusterid package. Use context-aware
FetchAsync for cancellation support on Close.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/dsm-kafka-cluster-id branch from 6dcd1dc to f98f0a4 Compare March 5, 2026 21:01
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/kafka-go-cluster-id branch from ac5efc0 to e39a9ab Compare March 5, 2026 21:01
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/dsm-kafka-cluster-id branch from f98f0a4 to e916411 Compare March 6, 2026 17:09
@robcarlan-datadog robcarlan-datadog requested a review from a team as a code owner March 6, 2026 17:09
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/dsm-kafka-cluster-id branch from 2d54c45 to 6dec38b Compare March 6, 2026 21:59

Labels

AI Generated: Largely based on code generated by an AI or LLM. This label is the same across all dd-trace-* repos
apm:ecosystem: contrib/* related feature requests or bugs

2 participants