Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie
chatBuilder.Use(innerClient => new PerServiceCallChatHistoryPersistingChatClient(innerClient));
}

// DeferredOpenTelemetryChatClient is registered last so it sits as the innermost decorator, directly
// above the leaf client and below FunctionInvokingChatClient. It is inert until an OpenTelemetryAgent
// activates it. Placing OpenTelemetry below FICC ensures the chat span closes before tools are invoked,
// so FICC emits execute_tool spans on the agent source.
chatBuilder.Use(innerClient => new DeferredOpenTelemetryChatClient(innerClient));

var agentChatClient = chatBuilder.Build(services);

if (options?.ChatOptions?.Tools is { Count: > 0 })
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI;

/// <summary>
/// A pass-through <see cref="IChatClient"/> decorator that holds the place where OpenTelemetry chat
/// instrumentation belongs in a <see cref="ChatClientAgent"/> pipeline: immediately above the leaf
/// client and beneath the <see cref="FunctionInvokingChatClient"/>.
/// </summary>
/// <remarks>
/// <para>
/// Until <see cref="Activate"/> runs, every call is handed straight to the inner client. After activation
/// (performed by an <see cref="OpenTelemetryAgent"/> that wraps the owning agent, using its resolved source
/// name), calls are routed through an <see cref="OpenTelemetryChatClient"/> that wraps the same inner client,
/// so chat spans are produced below the <see cref="FunctionInvokingChatClient"/>.
/// </para>
/// <para>
/// Keeping the instrumentation below FICC matters for tool telemetry: the chat span has already closed by the
/// time FICC invokes tools, which leaves <see cref="System.Diagnostics.Activity.Current"/> pointing at the
/// invoke_agent span so that FICC emits execute_tool spans on the agent source.
/// </para>
/// </remarks>
internal sealed class DeferredOpenTelemetryChatClient : DelegatingChatClient
{
    // Serializes activation so that only one instrumented wrapper is ever built.
    private readonly object _gate = new();

    // The client that currently receives calls: the inner client while inert, the instrumented wrapper afterwards.
    // Volatile because it is read on the request path without taking the lock.
    private volatile IChatClient _route;

    // The OpenTelemetryChatClient resolved from the instrumented wrapper; null while the slot is inert.
    private OpenTelemetryChatClient? _instrumentation;

    /// <summary>
    /// Creates the slot in its inert state, forwarding everything to <paramref name="innerClient"/>.
    /// </summary>
    /// <param name="innerClient">The chat client that receives calls directly until the slot is activated.</param>
    public DeferredOpenTelemetryChatClient(IChatClient innerClient)
        : base(innerClient)
    {
        this._route = innerClient;
    }

    /// <summary>Gets a value indicating whether <see cref="Activate"/> has switched the slot over to instrumentation.</summary>
    public bool IsActive
    {
        get
        {
            // Activation is detected by the route no longer being the inner client itself.
            return !ReferenceEquals(this._route, this.InnerClient);
        }
    }

    /// <summary>
    /// Gets or sets whether the activated <see cref="OpenTelemetryChatClient"/> may include potentially
    /// sensitive information (for example message content) in telemetry. While the slot is inert the getter
    /// returns <see langword="false"/> and the setter does nothing; the owning <see cref="OpenTelemetryAgent"/>
    /// is expected to apply and propagate the value after activation.
    /// </summary>
    public bool EnableSensitiveData
    {
        get => this._instrumentation is { } client && client.EnableSensitiveData;
        set
        {
            OpenTelemetryChatClient? client = this._instrumentation;
            if (client is null)
            {
                // Inert slot: there is nothing to configure yet.
                return;
            }

            client.EnableSensitiveData = value;
        }
    }

    /// <summary>
    /// Switches the slot to route calls through an <see cref="OpenTelemetryChatClient"/> that wraps the inner
    /// client and emits under <paramref name="sourceName"/>. Only the first call has any effect; later calls,
    /// including calls that lose a race against another thread, return without changing anything.
    /// </summary>
    /// <param name="sourceName">The telemetry source name under which chat spans are emitted.</param>
    public void Activate(string sourceName)
    {
        // Fast path: skip the lock entirely once the slot is already active.
        if (!this.IsActive)
        {
            lock (this._gate)
            {
                // Re-check under the lock in case another thread activated the slot in the meantime.
                if (!this.IsActive)
                {
                    IChatClient instrumented = this.InnerClient.AsBuilder().UseOpenTelemetry(sourceName: sourceName).Build();

                    // Remember the OpenTelemetryChatClient first so EnableSensitiveData can reach it (the agent's
                    // value may be set after construction, e.g. via the UseOpenTelemetry configure callback),
                    // and only then publish the new route through the volatile field.
                    this._instrumentation = instrumented.GetService(typeof(OpenTelemetryChatClient)) as OpenTelemetryChatClient;
                    this._route = instrumented;
                }
            }
        }
    }

    /// <inheritdoc/>
    public override Task<ChatResponse> GetResponseAsync(
        IEnumerable<ChatMessage> messages, ChatOptions? options = null, CancellationToken cancellationToken = default)
    {
        return this._route.GetResponseAsync(messages, options, cancellationToken);
    }

    /// <inheritdoc/>
    public override IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
        IEnumerable<ChatMessage> messages, ChatOptions? options = null, CancellationToken cancellationToken = default)
    {
        return this._route.GetStreamingResponseAsync(messages, options, cancellationToken);
    }

    /// <inheritdoc/>
    public override object? GetService(Type serviceType, object? serviceKey = null)
    {
        _ = Throw.IfNull(serviceType);

        // Un-keyed requests for this slot's own type (or any of its base contracts) resolve to the slot itself.
        bool resolvesToSelf = serviceKey is null && serviceType.IsInstanceOfType(this);
        if (resolvesToSelf)
        {
            return this;
        }

        // Everything else goes to the current route so that, once activated, lookups such as
        // OpenTelemetryChatClient and ActivitySource find the instrumentation instead of the bare leaf.
        return this._route.GetService(serviceType, serviceKey);
    }

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            IChatClient route = this._route;
            if (!ReferenceEquals(route, this.InnerClient))
            {
                // Activated: the route is the OpenTelemetryChatClient wrapper around the inner client. Dispose it
                // so its own telemetry resources are released. It also disposes the inner client, which is
                // idempotent with the base.Dispose call below.
                route.Dispose();
            }
        }

        base.Dispose(disposing);
    }
}
109 changes: 45 additions & 64 deletions dotnet/src/Microsoft.Agents.AI/OpenTelemetryAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public sealed class OpenTelemetryAgent : DelegatingAIAgent, IDisposable
/// </summary>
private readonly bool _autoWireChatClient;

/// <summary>
/// The auto-wired below-FICC telemetry slot, when one was activated. Cached so that updates to
/// <see cref="EnableSensitiveData"/> made after construction can be propagated to it.
/// </summary>
private DeferredOpenTelemetryChatClient? _innerTelemetrySlot;

/// <summary>Initializes a new instance of the <see cref="OpenTelemetryAgent"/> class.</summary>
/// <param name="innerAgent">The underlying <see cref="AIAgent"/> to be augmented with telemetry capabilities.</param>
/// <param name="sourceName">
Expand Down Expand Up @@ -91,6 +97,8 @@ public OpenTelemetryAgent(AIAgent innerAgent, string? sourceName, bool autoWireC
this._otelClient = new OpenTelemetryChatClient(
new ForwardingChatClient(this),
sourceName: this._sourceName);

this.TryActivateInnerChatClientTelemetry();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -120,7 +128,16 @@ public OpenTelemetryAgent(AIAgent innerAgent, string? sourceName, bool autoWireC
public bool EnableSensitiveData
{
get => this._otelClient.EnableSensitiveData;
set => this._otelClient.EnableSensitiveData = value;
set
{
this._otelClient.EnableSensitiveData = value;

// Keep the auto-wired below-FICC slot in sync so its chat span captures message content too.
if (this._innerTelemetrySlot is { } slot)
{
slot.EnableSensitiveData = value;
}
}
}

/// <inheritdoc/>
Expand Down Expand Up @@ -204,84 +221,52 @@ public ForwardedOptions(AgentRunOptions? options, AgentSession? session, Activit
}

/// <summary>
/// If auto-wiring is enabled and the inner agent is a <see cref="ChatClientAgent"/> whose underlying
/// <see cref="IChatClient"/> is not already instrumented with <see cref="OpenTelemetryChatClient"/>, returns a
/// new <see cref="ChatClientAgentRunOptions"/> with a <see cref="ChatClientAgentRunOptions.ChatClientFactory"/>
/// that wraps the chat client with <see cref="OpenTelemetryChatClient"/>. When <paramref name="options"/> is a
/// plain <see cref="AgentRunOptions"/> (the base type, not <see cref="ChatClientAgentRunOptions"/>), the base
/// properties are copied onto the new <see cref="ChatClientAgentRunOptions"/> so high-level callers that pass
/// the abstract <see cref="AgentRunOptions"/> still benefit from auto-wiring and propagate their settings to
/// the inner agent. Otherwise, returns <paramref name="options"/> unchanged.
/// When auto-wiring is enabled and the inner agent is a <see cref="ChatClientAgent"/> whose underlying
/// <see cref="IChatClient"/> is not already instrumented, activates the in-place
/// <see cref="DeferredOpenTelemetryChatClient"/> slot so that chat spans are emitted below the
/// <see cref="FunctionInvokingChatClient"/> under this agent's source name. Positioning OpenTelemetry below FICC
/// is what allows FICC to emit execute_tool spans on the agent source. Respects
/// <see cref="ChatClientAgentOptions.UseProvidedChatClientAsIs"/> and is a no-op when no slot is reachable.
/// </summary>
private AgentRunOptions? GetRunOptionsWithChatClientWiring(AgentRunOptions? options)
private void TryActivateInnerChatClientTelemetry()
{
if (!this._autoWireChatClient)
{
return options;
return;
}

// The auto-wiring only applies when a ChatClientAgent is reachable from the inner agent. Otherwise, no-op.
// Auto-wiring only applies when a ChatClientAgent is reachable from the inner agent. Otherwise, no-op.
// Use GetService rather than a type check so wrapping agents that expose a nested ChatClientAgent are supported.
var chatClientAgent = this.InnerAgent.GetService<ChatClientAgent>();
if (chatClientAgent is null)
{
return options;
return;
}

// Respect ChatClientAgentOptions.UseProvidedChatClientAsIs: don't decorate the chat client when the user opted out.
if (chatClientAgent.GetService<ChatClientAgentOptions>()?.UseProvidedChatClientAsIs is true)
{
return options;
return;
}

// Capture the underlying IChatClient and check whether it is already instrumented.
// Don't activate when the chat client is already instrumented (e.g. the caller added their own
// OpenTelemetryChatClient), to avoid emitting duplicate chat spans.
var chatClient = chatClientAgent.GetService<IChatClient>();
if (chatClient is null || chatClient.GetService(typeof(OpenTelemetryChatClient)) is not null)
{
return options;
}

string sourceName = this._sourceName;
bool enableSensitiveData = this.EnableSensitiveData;
static IChatClient WrapIfNeeded(IChatClient cc, string sourceName, bool enableSensitiveData) =>
cc.GetService(typeof(OpenTelemetryChatClient)) is not null
? cc
: cc.AsBuilder().UseOpenTelemetry(sourceName: sourceName, configure: o => o.EnableSensitiveData = enableSensitiveData).Build();

if (options is ChatClientAgentRunOptions ccOptions)
{
// Don't mutate the caller's options; clone and chain any caller-provided factory.
// If the user factory already returns an OpenTelemetry-instrumented client, don't double-wrap.
var clone = (ChatClientAgentRunOptions)ccOptions.Clone();
var userFactory = clone.ChatClientFactory;
clone.ChatClientFactory = cc => WrapIfNeeded(userFactory is null ? cc : userFactory(cc), sourceName, enableSensitiveData);
return clone;
return;
}

// For a plain AgentRunOptions (or null), create a ChatClientAgentRunOptions and preserve
// any base AgentRunOptions properties from the caller so they reach the inner agent.
var newOptions = new ChatClientAgentRunOptions
{
ChatClientFactory = cc => WrapIfNeeded(cc, sourceName, enableSensitiveData),
};

if (options is not null)
// Activate the pre-placed slot in-place (below FICC) rather than wrapping a new OpenTelemetryChatClient
// around the whole pipeline on each run. Cache it and seed its EnableSensitiveData from the current
// value so a later change to this agent's EnableSensitiveData can be propagated to the inner chat span.
if (chatClient.GetService(typeof(DeferredOpenTelemetryChatClient)) is DeferredOpenTelemetryChatClient slot)
{
CopyBaseAgentRunOptions(options, newOptions);
slot.Activate(this._sourceName);
slot.EnableSensitiveData = this.EnableSensitiveData;
this._innerTelemetrySlot = slot;
}

return newOptions;
}

#pragma warning disable MEAI001 // ContinuationToken is experimental; copy it through to preserve caller-provided value.
private static void CopyBaseAgentRunOptions(AgentRunOptions source, AgentRunOptions target)
{
target.ContinuationToken = source.ContinuationToken;
target.AllowBackgroundResponses = source.AllowBackgroundResponses;
target.AdditionalProperties = source.AdditionalProperties?.Clone();
target.ResponseFormat = source.ResponseFormat;
}
#pragma warning restore MEAI001

/// <summary>The stub <see cref="IChatClient"/> used to delegate from the <see cref="OpenTelemetryChatClient"/> into the inner <see cref="AIAgent"/>.</summary>
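/// <param name="parentAgent">The <see cref="OpenTelemetryAgent"/> whose current activity is updated and whose inner agent is invoked for each forwarded call.</param>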
Expand All @@ -295,11 +280,9 @@ public async Task<ChatResponse> GetResponseAsync(
// Update the current activity to reflect the agent invocation.
parentAgent.UpdateCurrentActivity(fo?.CurrentActivity);

// If enabled, wire the underlying chat client with OpenTelemetryChatClient via ChatClientFactory.
var runOptions = parentAgent.GetRunOptionsWithChatClientWiring(fo?.Options);

// Invoke the inner agent.
var response = await parentAgent.InnerAgent.RunAsync(messages, fo?.Session, runOptions, cancellationToken).ConfigureAwait(false);
// Invoke the inner agent. Chat-level telemetry is emitted by the in-place DeferredOpenTelemetryChatClient
// slot (below FICC), activated once at construction; no per-run chat-client wiring is needed here.
var response = await parentAgent.InnerAgent.RunAsync(messages, fo?.Session, fo?.Options, cancellationToken).ConfigureAwait(false);

// Wrap the response in a ChatResponse so we can pass it back through OpenTelemetryChatClient.
return response.AsChatResponse();
Expand All @@ -313,11 +296,9 @@ public async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
// Update the current activity to reflect the agent invocation.
parentAgent.UpdateCurrentActivity(fo?.CurrentActivity);

// If enabled, wire the underlying chat client with OpenTelemetryChatClient via ChatClientFactory.
var runOptions = parentAgent.GetRunOptionsWithChatClientWiring(fo?.Options);

// Invoke the inner agent.
await foreach (var update in parentAgent.InnerAgent.RunStreamingAsync(messages, fo?.Session, runOptions, cancellationToken).ConfigureAwait(false))
// Invoke the inner agent. Chat-level telemetry is emitted by the in-place DeferredOpenTelemetryChatClient
// slot (below FICC), activated once at construction; no per-run chat-client wiring is needed here.
await foreach (var update in parentAgent.InnerAgent.RunStreamingAsync(messages, fo?.Session, fo?.Options, cancellationToken).ConfigureAwait(false))
{
// Wrap the response updates in ChatResponseUpdates so we can pass them back through OpenTelemetryChatClient.
yield return update.AsChatResponseUpdate();
Expand Down
Loading
Loading