@@ -8,6 +8,10 @@ import (
88 "fmt"
99 "sync"
1010
11+ "go.opentelemetry.io/otel"
12+ "go.opentelemetry.io/otel/attribute"
13+ "go.opentelemetry.io/otel/codes"
14+ "go.opentelemetry.io/otel/trace"
1115 "golang.org/x/sync/errgroup"
1216
1317 "github.com/stacklok/toolhive/pkg/logger"
@@ -21,6 +25,7 @@ type defaultAggregator struct {
2125 backendClient vmcp.BackendClient
2226 conflictResolver ConflictResolver
2327 toolConfigMap map [string ]* config.WorkloadToolConfig // Maps backend ID to tool config
28+ tracer trace.Tracer
2429}
2530
2631// NewDefaultAggregator creates a new default aggregator implementation.
@@ -43,12 +48,20 @@ func NewDefaultAggregator(
4348 backendClient : backendClient ,
4449 conflictResolver : conflictResolver ,
4550 toolConfigMap : toolConfigMap ,
51+ tracer : otel .Tracer ("github.com/stacklok/toolhive/pkg/vmcp/aggregator" ),
4652 }
4753}
4854
4955// QueryCapabilities queries a single backend for its MCP capabilities.
5056// Returns the raw capabilities (tools, resources, prompts) from the backend.
5157func (a * defaultAggregator ) QueryCapabilities (ctx context.Context , backend vmcp.Backend ) (* BackendCapabilities , error ) {
58+ ctx , span := a .tracer .Start (ctx , "aggregator.QueryCapabilities" ,
59+ trace .WithAttributes (
60+ attribute .String ("backend.id" , backend .ID ),
61+ ),
62+ )
63+ defer span .End ()
64+
5265 logger .Debugf ("Querying capabilities from backend %s" , backend .ID )
5366
5467 // Create a BackendTarget from the Backend
@@ -58,6 +71,8 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
5871 // Query capabilities using the backend client
5972 capabilities , err := a .backendClient .ListCapabilities (ctx , target )
6073 if err != nil {
74+ span .RecordError (err )
75+ span .SetStatus (codes .Error , err .Error ())
6176 return nil , fmt .Errorf ("%w: %s: %w" , ErrBackendQueryFailed , backend .ID , err )
6277 }
6378
@@ -74,6 +89,12 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
7489 SupportsSampling : capabilities .SupportsSampling ,
7590 }
7691
92+ span .SetAttributes (
93+ attribute .Int ("tools.count" , len (result .Tools )),
94+ attribute .Int ("resources.count" , len (result .Resources )),
95+ attribute .Int ("prompts.count" , len (result .Prompts )),
96+ )
97+
7798 logger .Debugf ("Backend %s: %d tools (after filtering/overrides), %d resources, %d prompts" ,
7899 backend .ID , len (result .Tools ), len (result .Resources ), len (result .Prompts ))
79100
@@ -86,6 +107,13 @@ func (a *defaultAggregator) QueryAllCapabilities(
86107 ctx context.Context ,
87108 backends []vmcp.Backend ,
88109) (map [string ]* BackendCapabilities , error ) {
110+ ctx , span := a .tracer .Start (ctx , "aggregator.QueryAllCapabilities" ,
111+ trace .WithAttributes (
112+ attribute .Int ("backends.count" , len (backends )),
113+ ),
114+ )
115+ defer span .End ()
116+
89117 logger .Infof ("Querying capabilities from %d backends" , len (backends ))
90118
91119 // Use errgroup for parallel queries with context cancellation
@@ -118,13 +146,22 @@ func (a *defaultAggregator) QueryAllCapabilities(
118146
119147 // Wait for all queries to complete
120148 if err := g .Wait (); err != nil {
149+ span .RecordError (err )
150+ span .SetStatus (codes .Error , err .Error ())
121151 return nil , fmt .Errorf ("capability queries failed: %w" , err )
122152 }
123153
124154 if len (capabilities ) == 0 {
125- return nil , fmt .Errorf ("no backends returned capabilities" )
155+ err := fmt .Errorf ("no backends returned capabilities" )
156+ span .RecordError (err )
157+ span .SetStatus (codes .Error , err .Error ())
158+ return nil , err
126159 }
127160
161+ span .SetAttributes (
162+ attribute .Int ("successful.backends" , len (capabilities )),
163+ )
164+
128165 logger .Infof ("Successfully queried %d/%d backends" , len (capabilities ), len (backends ))
129166 return capabilities , nil
130167}
@@ -135,6 +172,13 @@ func (a *defaultAggregator) ResolveConflicts(
135172 ctx context.Context ,
136173 capabilities map [string ]* BackendCapabilities ,
137174) (* ResolvedCapabilities , error ) {
175+ ctx , span := a .tracer .Start (ctx , "aggregator.ResolveConflicts" ,
176+ trace .WithAttributes (
177+ attribute .Int ("backends.count" , len (capabilities )),
178+ ),
179+ )
180+ defer span .End ()
181+
138182 logger .Debugf ("Resolving conflicts across %d backends" , len (capabilities ))
139183
140184 // Group tools by backend for conflict resolution
@@ -150,6 +194,8 @@ func (a *defaultAggregator) ResolveConflicts(
150194 if a .conflictResolver != nil {
151195 resolvedTools , err = a .conflictResolver .ResolveToolConflicts (ctx , toolsByBackend )
152196 if err != nil {
197+ span .RecordError (err )
198+ span .SetStatus (codes .Error , err .Error ())
153199 return nil , fmt .Errorf ("conflict resolution failed: %w" , err )
154200 }
155201 } else {
@@ -191,6 +237,12 @@ func (a *defaultAggregator) ResolveConflicts(
191237 resolved .SupportsSampling = resolved .SupportsSampling || caps .SupportsSampling
192238 }
193239
240+ span .SetAttributes (
241+ attribute .Int ("resolved.tools" , len (resolved .Tools )),
242+ attribute .Int ("resolved.resources" , len (resolved .Resources )),
243+ attribute .Int ("resolved.prompts" , len (resolved .Prompts )),
244+ )
245+
194246 logger .Debugf ("Resolved %d unique tools, %d resources, %d prompts" ,
195247 len (resolved .Tools ), len (resolved .Resources ), len (resolved .Prompts ))
196248
@@ -199,11 +251,20 @@ func (a *defaultAggregator) ResolveConflicts(
199251
200252// MergeCapabilities creates the final unified capability view and routing table.
201253// Uses the backend registry to populate full BackendTarget information for routing.
202- func (* defaultAggregator ) MergeCapabilities (
254+ func (a * defaultAggregator ) MergeCapabilities (
203255 ctx context.Context ,
204256 resolved * ResolvedCapabilities ,
205257 registry vmcp.BackendRegistry ,
206258) (* AggregatedCapabilities , error ) {
259+ ctx , span := a .tracer .Start (ctx , "aggregator.MergeCapabilities" ,
260+ trace .WithAttributes (
261+ attribute .Int ("resolved.tools" , len (resolved .Tools )),
262+ attribute .Int ("resolved.resources" , len (resolved .Resources )),
263+ attribute .Int ("resolved.prompts" , len (resolved .Prompts )),
264+ ),
265+ )
266+ defer span .End ()
267+
207268 logger .Debugf ("Merging capabilities into final view" )
208269
209270 // Create routing table
@@ -304,6 +365,13 @@ func (*defaultAggregator) MergeCapabilities(
304365 },
305366 }
306367
368+ span .SetAttributes (
369+ attribute .Int ("aggregated.tools" , aggregated .Metadata .ToolCount ),
370+ attribute .Int ("aggregated.resources" , aggregated .Metadata .ResourceCount ),
371+ attribute .Int ("aggregated.prompts" , aggregated .Metadata .PromptCount ),
372+ attribute .String ("conflict.strategy" , string (aggregated .Metadata .ConflictStrategy )),
373+ )
374+
307375 logger .Infof ("Merged capabilities: %d tools, %d resources, %d prompts" ,
308376 aggregated .Metadata .ToolCount , aggregated .Metadata .ResourceCount , aggregated .Metadata .PromptCount )
309377
@@ -316,6 +384,13 @@ func (*defaultAggregator) MergeCapabilities(
316384// 3. Resolve conflicts
317385// 4. Merge into final view with full backend information
318386func (a * defaultAggregator ) AggregateCapabilities (ctx context.Context , backends []vmcp.Backend ) (* AggregatedCapabilities , error ) {
387+ ctx , span := a .tracer .Start (ctx , "aggregator.AggregateCapabilities" ,
388+ trace .WithAttributes (
389+ attribute .Int ("backends.count" , len (backends )),
390+ ),
391+ )
392+ defer span .End ()
393+
319394 logger .Infof ("Starting capability aggregation for %d backends" , len (backends ))
320395
321396 // Step 1: Create registry from discovered backends
@@ -325,24 +400,38 @@ func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends
325400 // Step 2: Query all backends
326401 capabilities , err := a .QueryAllCapabilities (ctx , backends )
327402 if err != nil {
403+ span .RecordError (err )
404+ span .SetStatus (codes .Error , err .Error ())
328405 return nil , fmt .Errorf ("failed to query backends: %w" , err )
329406 }
330407
331408 // Step 3: Resolve conflicts
332409 resolved , err := a .ResolveConflicts (ctx , capabilities )
333410 if err != nil {
411+ span .RecordError (err )
412+ span .SetStatus (codes .Error , err .Error ())
334413 return nil , fmt .Errorf ("failed to resolve conflicts: %w" , err )
335414 }
336415
337416 // Step 4: Merge into final view with full backend information
338417 aggregated , err := a .MergeCapabilities (ctx , resolved , registry )
339418 if err != nil {
419+ span .RecordError (err )
420+ span .SetStatus (codes .Error , err .Error ())
340421 return nil , fmt .Errorf ("failed to merge capabilities: %w" , err )
341422 }
342423
343424 // Update metadata with backend count
344425 aggregated .Metadata .BackendCount = len (backends )
345426
427+ span .SetAttributes (
428+ attribute .Int ("aggregated.backends" , aggregated .Metadata .BackendCount ),
429+ attribute .Int ("aggregated.tools" , aggregated .Metadata .ToolCount ),
430+ attribute .Int ("aggregated.resources" , aggregated .Metadata .ResourceCount ),
431+ attribute .Int ("aggregated.prompts" , aggregated .Metadata .PromptCount ),
432+ attribute .String ("conflict.strategy" , string (aggregated .Metadata .ConflictStrategy )),
433+ )
434+
346435 logger .Infof ("Capability aggregation complete: %d backends, %d tools, %d resources, %d prompts" ,
347436 aggregated .Metadata .BackendCount , aggregated .Metadata .ToolCount ,
348437 aggregated .Metadata .ResourceCount , aggregated .Metadata .PromptCount )
0 commit comments