Skip to content
20 changes: 10 additions & 10 deletions contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
}
opts = append(opts, WithConfig(conf))
wrapped := WrapConsumer(c, opts...)
wrapped.tracer.FetchClusterIDAsync(func() string {
wrapped.tracer.FetchClusterIDAsync(func(ctx context.Context) string {
return clusterIDFromConfigOrFetch(conf, func() string {
return fetchClusterIDFromConsumer(c)
return fetchClusterIDFromConsumer(ctx, c)
})
})
return wrapped, nil
Expand All @@ -57,22 +57,22 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
}
opts = append(opts, WithConfig(conf))
wrapped := WrapProducer(p, opts...)
wrapped.tracer.FetchClusterIDAsync(func() string {
wrapped.tracer.FetchClusterIDAsync(func(ctx context.Context) string {
return clusterIDFromConfigOrFetch(conf, func() string {
return fetchClusterIDFromProducer(p)
return fetchClusterIDFromProducer(ctx, p)
})
})
return wrapped, nil
}

func fetchClusterIDFromConsumer(c *kafka.Consumer) string {
func fetchClusterIDFromConsumer(ctx context.Context, c *kafka.Consumer) string {
admin, err := kafka.NewAdminClientFromConsumer(c)
if err != nil {
instr.Logger().Warn("failed to create admin client from consumer for cluster ID: %s", err)
return ""
}
defer admin.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
clusterID, err := admin.ClusterID(ctx)
if err != nil {
Expand All @@ -82,14 +82,14 @@ func fetchClusterIDFromConsumer(c *kafka.Consumer) string {
return clusterID
}

func fetchClusterIDFromProducer(p *kafka.Producer) string {
func fetchClusterIDFromProducer(ctx context.Context, p *kafka.Producer) string {
admin, err := kafka.NewAdminClientFromProducer(p)
if err != nil {
instr.Logger().Warn("failed to create admin client from producer for cluster ID: %s", err)
return ""
}
defer admin.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
clusterID, err := admin.ClusterID(ctx)
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
// Close calls the underlying Consumer.Close and if polling is enabled, finishes
// any remaining span.
func (c *Consumer) Close() error {
c.tracer.WaitForClusterID()
c.tracer.StopClusterIDFetch()
err := c.Consumer.Close()
// we only close the previous span if consuming via the events channel is
// not enabled, because otherwise there would be a data race from the
Expand Down Expand Up @@ -255,7 +255,7 @@ func (p *Producer) Events() chan kafka.Event {
// Close calls the underlying Producer.Close and also closes the internal
// wrapping producer channel.
func (p *Producer) Close() {
p.tracer.WaitForClusterID()
p.tracer.StopClusterIDFetch()
close(p.produceChannel)
p.Producer.Close()
}
Expand Down
20 changes: 10 additions & 10 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
}
opts = append(opts, WithConfig(conf))
wrapped := WrapConsumer(c, opts...)
wrapped.tracer.FetchClusterIDAsync(func() string {
wrapped.tracer.FetchClusterIDAsync(func(ctx context.Context) string {
return clusterIDFromConfigOrFetch(conf, func() string {
return fetchClusterIDFromConsumer(c)
return fetchClusterIDFromConsumer(ctx, c)
})
})
return wrapped, nil
Expand All @@ -57,22 +57,22 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
}
opts = append(opts, WithConfig(conf))
wrapped := WrapProducer(p, opts...)
wrapped.tracer.FetchClusterIDAsync(func() string {
wrapped.tracer.FetchClusterIDAsync(func(ctx context.Context) string {
return clusterIDFromConfigOrFetch(conf, func() string {
return fetchClusterIDFromProducer(p)
return fetchClusterIDFromProducer(ctx, p)
})
})
return wrapped, nil
}

func fetchClusterIDFromConsumer(c *kafka.Consumer) string {
func fetchClusterIDFromConsumer(ctx context.Context, c *kafka.Consumer) string {
admin, err := kafka.NewAdminClientFromConsumer(c)
if err != nil {
instr.Logger().Warn("failed to create admin client from consumer for cluster ID: %s", err)
return ""
}
defer admin.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
clusterID, err := admin.ClusterID(ctx)
if err != nil {
Expand All @@ -82,14 +82,14 @@ func fetchClusterIDFromConsumer(c *kafka.Consumer) string {
return clusterID
}

func fetchClusterIDFromProducer(p *kafka.Producer) string {
func fetchClusterIDFromProducer(ctx context.Context, p *kafka.Producer) string {
admin, err := kafka.NewAdminClientFromProducer(p)
if err != nil {
instr.Logger().Warn("failed to create admin client from producer for cluster ID: %s", err)
return ""
}
defer admin.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
clusterID, err := admin.ClusterID(ctx)
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
// Close calls the underlying Consumer.Close and if polling is enabled, finishes
// any remaining span.
func (c *Consumer) Close() error {
c.tracer.WaitForClusterID()
c.tracer.StopClusterIDFetch()
err := c.Consumer.Close()
// we only close the previous span if consuming via the events channel is
// not enabled, because otherwise there would be a data race from the
Expand Down Expand Up @@ -255,7 +255,7 @@ func (p *Producer) Events() chan kafka.Event {
// Close calls the underlying Producer.Close and also closes the internal
// wrapping producer channel.
func (p *Producer) Close() {
p.tracer.WaitForClusterID()
p.tracer.StopClusterIDFetch()
close(p.produceChannel)
p.Producer.Close()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafkatrace

import "github.com/DataDog/dd-trace-go/v2/instrumentation/kafkaclusterid"

// NormalizeBootstrapServers canonicalizes a comma-separated list of Kafka
// broker addresses so that equivalent lists produce the same cache key.
func NormalizeBootstrapServers(bootstrapServers string) string {
	normalized := kafkaclusterid.NormalizeBootstrapServers(bootstrapServers)
	return normalized
}

// GetCachedClusterID looks up a previously cached cluster ID for the given
// bootstrap servers key. The boolean reports whether an entry was found.
func GetCachedClusterID(bootstrapServers string) (string, bool) {
	id, ok := kafkaclusterid.GetCachedID(bootstrapServers)
	return id, ok
}

// SetCachedClusterID caches a cluster ID for the given bootstrap servers key.
// It delegates to the shared kafkaclusterid package, so the cache is shared
// with other integrations that import that package.
func SetCachedClusterID(bootstrapServers, clusterID string) {
	kafkaclusterid.SetCachedID(bootstrapServers, clusterID)
}

// ResetClusterIDCache clears the cluster ID cache. This is intended for use in
// tests, to avoid cached cluster IDs leaking between test cases.
func ResetClusterIDCache() {
	kafkaclusterid.ResetCache()
}

This file was deleted.

40 changes: 17 additions & 23 deletions contrib/confluentinc/confluent-kafka-go/kafkatrace/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
"math"
"net"
"strings"
"sync"

"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/internal"

Check failure on line 16 in contrib/confluentinc/confluent-kafka-go/kafkatrace/tracer.go

View workflow job for this annotation

GitHub Actions / lint

[golangci] reported by reviewdog 🐶 File is not properly formatted (gofmt) Raw Output: contrib/confluentinc/confluent-kafka-go/kafkatrace/tracer.go:16:1: File is not properly formatted (gofmt) "github.com/DataDog/dd-trace-go/v2/internal" ^ 1 issues: * gofmt: 1
"github.com/DataDog/dd-trace-go/v2/instrumentation/kafkaclusterid"
)

type Tracer struct {
PrevSpan *tracer.Span
ClusterIDFetcher kafkaclusterid.Fetcher
ctx context.Context
consumerServiceName string
producerServiceName string
Expand All @@ -27,9 +28,6 @@
analyticsRate float64
bootstrapServers string
groupID string
clusterID string
clusterIDMu sync.RWMutex
clusterIDReady chan struct{}
tagFns map[string]func(msg Message) any
dsmEnabled bool
ckgoVersion CKGoVersion
Expand All @@ -41,34 +39,30 @@
}

func (tr *Tracer) ClusterID() string {
tr.clusterIDMu.RLock()
defer tr.clusterIDMu.RUnlock()
return tr.clusterID
return tr.ClusterIDFetcher.ID()
}

func (tr *Tracer) SetClusterID(id string) {
tr.clusterIDMu.Lock()
defer tr.clusterIDMu.Unlock()
tr.clusterID = id
tr.ClusterIDFetcher.SetID(id)
}

// FetchClusterIDAsync launches a background goroutine to fetch the cluster ID.
// Use WaitForClusterID to block until the fetch completes.
func (tr *Tracer) FetchClusterIDAsync(fetchFn func() string) {
tr.clusterIDReady = make(chan struct{})
go func() {
defer close(tr.clusterIDReady)
if id := fetchFn(); id != "" {
tr.SetClusterID(id)
}
}()
// The goroutine is cancelled when StopClusterIDFetch is called.
func (tr *Tracer) FetchClusterIDAsync(fetchFn func(ctx context.Context) string) {
tr.ClusterIDFetcher.FetchAsync(fetchFn)
}

// StopClusterIDFetch cancels any in-flight cluster ID fetch and waits for the
// goroutine to finish cleanup. This returns near-instantly because the context
// cancellation causes in-flight network calls to abort.
func (tr *Tracer) StopClusterIDFetch() {
	// Delegates cancellation and cleanup to the shared fetcher.
	tr.ClusterIDFetcher.Stop()
}

// WaitForClusterID blocks until any in-flight async cluster ID fetch completes.
// WaitForClusterID blocks until any in-flight cluster ID fetch completes.
// Use this in tests to ensure the cluster ID is available before asserting.
func (tr *Tracer) WaitForClusterID() {
if tr.clusterIDReady != nil {
<-tr.clusterIDReady
}
tr.ClusterIDFetcher.Wait()
}

type Option interface {
Expand Down
8 changes: 7 additions & 1 deletion contrib/segmentio/kafka-go/internal/tracing/dsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
if tr.kafkaCfg.ConsumerGroupID != "" {
edges = append(edges, "group:"+tr.kafkaCfg.ConsumerGroupID)
}
if tr.ClusterID() != "" {
edges = append(edges, "kafka_cluster_id:"+tr.ClusterID())
}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
Expand All @@ -34,7 +37,7 @@ func (tr *Tracer) SetConsumeDSMCheckpoint(msg Message) {
if tr.kafkaCfg.ConsumerGroupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset())
tracer.TrackKafkaCommitOffsetWithCluster(tr.kafkaCfg.ConsumerGroupID, msg.GetTopic(), int32(msg.GetPartition()), msg.GetOffset(), tr.ClusterID())
}
}

Expand All @@ -51,6 +54,9 @@ func (tr *Tracer) SetProduceDSMCheckpoint(msg Message, writer Writer) {
}

edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
if tr.ClusterID() != "" {
edges = append(edges, "kafka_cluster_id:"+tr.ClusterID())
}
carrier := MessageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
Expand Down
23 changes: 23 additions & 0 deletions contrib/segmentio/kafka-go/internal/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package tracing

import (
"context"
"math"

"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/instrumentation/kafkaclusterid"
)

var instr *instrumentation.Instrumentation
Expand All @@ -25,6 +27,7 @@ type Tracer struct {
analyticsRate float64
dataStreamsEnabled bool
kafkaCfg KafkaConfig
clusterIDFetcher kafkaclusterid.Fetcher
}

// Option describes options for the Kafka integration.
Expand Down Expand Up @@ -93,6 +96,26 @@ func WithDataStreams() Option {
})
}

// ClusterID returns the Kafka cluster ID resolved by the background fetcher,
// or the empty string if it has not been determined yet.
func (tr *Tracer) ClusterID() string {
	id := tr.clusterIDFetcher.ID()
	return id
}

// SetClusterID records the Kafka cluster ID on the underlying fetcher.
func (tr *Tracer) SetClusterID(id string) {
	tr.clusterIDFetcher.SetID(id)
}

// FetchClusterIDAsync starts an asynchronous cluster ID fetch using fetchFn.
// The fetch can be cancelled with StopClusterIDFetch and awaited with
// WaitForClusterID.
func (tr *Tracer) FetchClusterIDAsync(fetchFn func(ctx context.Context) string) {
	tr.clusterIDFetcher.FetchAsync(fetchFn)
}

// WaitForClusterID blocks until any in-flight cluster ID fetch completes.
// Primarily useful in tests to ensure the cluster ID is available before
// asserting on spans or checkpoints.
func (tr *Tracer) WaitForClusterID() {
	tr.clusterIDFetcher.Wait()
}

// StopClusterIDFetch cancels any in-flight cluster ID fetch and waits for it
// to finish cleanup.
func (tr *Tracer) StopClusterIDFetch() {
	tr.clusterIDFetcher.Stop()
}

// Logger exposes the package-level instrumentation logger.
func Logger() instrumentation.Logger {
	logger := instr.Logger()
	return logger
}
Loading
Loading