Single Agent Tool LlamaIndex Instrumentation #86
- Add embedding event handlers (_handle_embedding_start, _handle_embedding_end)
- Extract model name, input texts, and dimension count from embedding events
- Create vendor_detection.py module with VendorRule-based provider detection
- Support 13+ embedding providers (OpenAI, Azure, AWS, Google, Cohere, etc.)
- Add test_embedding_instrumentation.py with single and batch embedding tests
- Update README with embedding documentation and provider list
- Tested successfully with OpenAI embeddings API
| # Get the currently active span to establish parent-child relationship | ||
| # First try to get from active agent context (workflow-based agents) | ||
| parent_span = None |
This might be a good candidate for a helper function, i.e. lines 154-164.
Created a helper function and refactored.
Changed start_agent_invocation -> start_agent and stop_agent_invocation -> stop_agent to match the actual TelemetryHandler API. These methods are called when CBEventType.AGENT_STEP events are emitted (rare in modern LlamaIndex, but used by legacy AgentRunner and custom workflows).
| if not self._handler: | ||
| return None | ||
| current_id = parent_id | ||
| while current_id: |
It might be better to do some defensive programming and use a set to track the visited invocations, similar to what we do in opentelemetry.instrumentation.langchain.callback_handler.LangchainCallbackHandler._find_nearest_agent.
Added a visited check.
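For readers following the thread, here is a minimal sketch of the visited-set guard under discussion. It is not the code from this PR: the function name `find_nearest_agent`, the `parents` mapping, and the `agents` set are hypothetical stand-ins for whatever the handler actually tracks.

```python
from typing import Dict, Optional, Set


def find_nearest_agent(
    start_id: Optional[str],
    parents: Dict[str, Optional[str]],  # hypothetical: run id -> parent run id
    agents: Set[str],                   # hypothetical: run ids known to be agents
) -> Optional[str]:
    """Walk up the parent chain and return the first agent run id, if any."""
    visited: Set[str] = set()
    current_id = start_id
    # Stop when the chain ends or when an id repeats (a cycle in the parent links).
    while current_id is not None and current_id not in visited:
        visited.add(current_id)
        if current_id in agents:
            return current_id
        current_id = parents.get(current_id)
    return None
```

Without the `visited` set, a malformed parent mapping such as `{"a": "b", "b": "a"}` would keep the `while current_id:` loop spinning forever; with it, the walk simply returns `None`.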
| request_presence_penalty=presence_penalty, | ||
| request_stop_sequences=stop if stop else [], | ||
| request_seed=seed, | ||
| parent_run_id=parent_id if parent_id else None, |
I don't think we plan on passing parent_run_id from the instrumentation layer for the types. This needs to go in invocation_manager.py.
cc: @keith-decker @wrisa
Removed
| """ | ||
| Wrap agent.run() to instrument workflow events. | ||
| This creates a root agent span immediately when agent.run() is called, |
Why are we NOT creating a root workflow span?
At the root level (no parent):
If [_is_agent_root()] detects agent markers → creates an AgentInvocation
Otherwise → creates a Workflow (generic chain/sequence)
This is an agent call, so it takes the AgentInvocation path.
In the RAG instrumentation, I created the workflow span.
| # Create root agent invocation before workflow starts | ||
| root_agent = AgentInvocation( | ||
| name=f"agent.{type(instance).__name__}", | ||
| run_id=str(uuid4()), |
There's no need to pass run_id when creating new types as it is a default field. Also, we shouldn't be passing run_id or parent_run_id from the instrumentation layer
Removed
| from uuid import uuid4 | ||
| # Create root agent invocation before workflow starts | ||
| root_agent = AgentInvocation( |
I might be missing something, but it would be better to create a root workflow span and make this root_agent its child.
The design logic here is as follows.
For root-level calls (parent_run_id is None):
Check whether it's an agent using [_is_agent_root()], which looks for agent tags/metadata
If agent → create an AgentInvocation span
If not agent → create a Workflow span (for chains, sequences, etc.)
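The root-level decision described above could be sketched roughly as follows. This is an illustration only: `RunInfo`, the `"agent"` tag, and the `agent_name` metadata key are hypothetical, and the real `_is_agent_root()` may look at different markers.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class RunInfo:
    """Hypothetical description of a run as seen by the callback handler."""
    name: str
    parent_run_id: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


def is_agent_root(run: RunInfo) -> bool:
    # Assumed markers: an "agent" tag or an "agent_name" metadata entry.
    return "agent" in run.tags or bool(run.metadata.get("agent_name"))


def root_span_kind(run: RunInfo) -> Optional[str]:
    """Return which root span type to create, or None for non-root runs."""
    if run.parent_run_id is not None:
        return None  # not a root-level call; handled by the normal child logic
    return "AgentInvocation" if is_agent_root(run) else "Workflow"
```

For example, `root_span_kind(RunInfo("ReActAgent", tags=["agent"]))` yields `"AgentInvocation"`, while a plain root-level chain with no markers yields `"Workflow"`.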
Added workflow span
…alfx/splunk-otel-python-contrib into singleAgent-tool-llamaindex
| # OAuth2 Token Manager for CircuIT | ||
| class OAuth2TokenManager: |
Move this to a util directory and share it with the zero-code example.
Moved.
| return a + b | ||
| def setup_telemetry(): |
This method is not used.
Removed.
| return tracer_provider | ||
| def setup_telemetry_with_memory(): |
How does having an InMemorySpanExporter help with testing vs the Console based exporters?
It enables assertions: you can verify span count, names, attributes, and hierarchy without printing out the spans.
| Wrap agent.run() to instrument workflow events. | ||
| This creates a Workflow span as the root (since ReActAgent inherits from Workflow), | ||
| then creates an AgentInvocation span nested inside it. This matches the CrewAI |
nit: Let's remove reference to CrewAI
Removed
| initial_input=str(user_msg), | ||
| attributes={}, | ||
| ) | ||
| root_workflow.framework = "llamaindex" |
We are already passing "llamaindex" on line 120 when instantiating the Workflow(). This looks redundant
Removed
| self._active_tools = {} # tool_id -> ToolCall | ||
| self._root_agent = None # Reference to the root agent span | ||
| async def instrument_workflow_handler( |
Does llamaindex not support synchronous workflows?
LlamaIndex's Workflow-based agents are fundamentally async-only. The legacy agent is synchronous, but the newer workflow-based agents such as ReActAgent are async-only.
| # Get TelemetryHandler from callback handler if available | ||
| from llama_index.core import Settings | ||
| from opentelemetry import context |
Avoid context management in the instrumentation layer. Please use the genai utils for context management.
Removed
| parent_context = None | ||
| if telemetry_handler: | ||
| # Level 1: Create root workflow span (since ReActAgent inherits from Workflow) | ||
| root_workflow = Workflow( |
Looking at the code flow, I think this will wrap every agent call inside a new workflow span, but we need a single root workflow span to which the agent spans are attached, e.g.
- workflow
  - agent 1
  - agent 2
  - agent 3
Modified.
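One way to picture the requested shape is a single workflow root per run, with a new agent child opened whenever the active agent changes. The sketch below is an assumption about how that could be modeled, not the change that was actually made; `Span`, `RunTrace`, and the idea that each event carries the current agent's name are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Span:
    """Hypothetical minimal span: a name plus child spans."""
    name: str
    children: List["Span"] = field(default_factory=list)


class RunTrace:
    """One root workflow span per run; agents attach to it as children."""

    def __init__(self) -> None:
        self.root = Span("workflow")
        self._current: Optional[Span] = None

    def on_event(self, agent_name: str) -> Span:
        # Open a new agent span only when the active agent changes,
        # so repeated events from the same agent reuse its span.
        if self._current is None or self._current.name != agent_name:
            self._current = Span(agent_name)
            self.root.children.append(self._current)
        return self._current


trace = RunTrace()
for name in ["agent 1", "agent 1", "agent 2", "agent 3"]:
    trace.on_event(name)
assert [child.name for child in trace.root.children] == ["agent 1", "agent 2", "agent 3"]
```

The key property is that the workflow span is created once, outside the per-agent path, rather than once per agent call.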
| # Call the original run() method to get the workflow handler | ||
| handler = wrapped(*args, **kwargs) | ||
| if telemetry_handler and root_workflow and root_agent and parent_context: |
remove parent_context from here
Removed
| def __init__(self, handler: TelemetryHandler): | ||
| self._handler = handler | ||
| self._active_tools = {} # tool_id -> ToolCall | ||
| self._root_agent = None # Reference to the root agent span |
LlamaIndex must have multi-agent support. I might be wrong, but please check. How will we track multi-agent apps?
Found this in the docs - https://developers.llamaindex.ai/python/framework/understanding/agent/multi_agent/
Yes, I actually handled the multi-agent scenario in the code. The PR title is confusing; I will change it.
Added a separate instrumentation mechanism for workflow-based agents, using a wrapper function:
✅ Dual instrumentation paths for backward compatibility
✅ Non-invasive: Uses monkey-patching, no code changes needed
✅ Async-aware: Background task doesn't block user code
✅ Context propagation: Manual for workflows, automatic for callbacks
✅ Shared TelemetryHandler: Both paths use same handler instance
User code:
    agent = ReActAgent(tools=[...])
    handler = agent.run("Calculate 5*3+2")
        ↓
        ↓ (intercepted by wrap_agent_run)
        ↓
wrap_agent_run():
    Line 137: handler = wrapped(*args, **kwargs)
        ↓
        └─→ Calls original ReActAgent.run()
              ↓
              └─→ Returns WorkflowHandler
                  (contains stream of events)
    Line 159: await instrumentor.instrument_workflow_handler(handler, ...)
        ↓
        └─→ Pass handler to instrumentor
    Line 169: return handler  # Return to user
        ↓
        └─→ User can await it
instrument_workflow_handler(workflow_handler):
    Line 43: async for event in workflow_handler.stream_events():
             ↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
             THIS IS THE CONSUMPTION!
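The flow above can be approximated with a small, self-contained asyncio sketch. It is only a model of the idea: `FakeWorkflowHandler` is a hypothetical stand-in for the object returned by `agent.run()`, the event names are invented, and scheduling the consumer with `asyncio.create_task` is an assumption about how the "background task" bullet is realized.

```python
import asyncio
from typing import AsyncIterator, Callable, List


class FakeWorkflowHandler:
    """Hypothetical stand-in for the handler returned by agent.run()."""

    def __init__(self, events: List[str]) -> None:
        self._events = list(events)

    async def stream_events(self) -> AsyncIterator[str]:
        for event in self._events:
            await asyncio.sleep(0)  # yield control, as a real stream would
            yield event


async def instrument_workflow_handler(handler, seen: List[str]) -> None:
    # This loop is the consumption step: each event is observed as it arrives.
    async for event in handler.stream_events():
        seen.append(event)


def wrap_agent_run(wrapped: Callable, seen: List[str]) -> Callable:
    def wrapper(*args, **kwargs):
        handler = wrapped(*args, **kwargs)  # call the original run()
        # Schedule the consumer in the background so the caller is not blocked.
        task = asyncio.create_task(instrument_workflow_handler(handler, seen))
        handler._instrumentation_task = task  # keep a reference to the task
        return handler  # hand the handler back to user code

    return wrapper


async def main() -> List[str]:
    seen: List[str] = []
    events = ["tool_call", "tool_result", "stop"]
    run = wrap_agent_run(lambda msg: FakeWorkflowHandler(events), seen)
    handler = run("Calculate 5*3+2")
    await handler._instrumentation_task  # wait for the consumer to finish
    return seen
```

Running `asyncio.run(main())` returns the events in the order they were streamed, which shows the wrapper observing the run without changing what the caller receives.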
Below is a sample trace when running this app locally
https://shw-playground.signalfx.com/#/apm/traces/f37aebbad8b26e16fd20810c5b23a017?tab=waterfall