|
| 1 | +from dimo.errors import check_type, check_optional_type, HTTPError |
| 2 | +from typing import Dict, List, Optional, Any, Generator |
| 3 | +from requests import Session, RequestException |
| 4 | +import json |
| 5 | + |
| 6 | + |
| 7 | +class Conversations: |
| 8 | + """ |
| 9 | + Client for the DIMO Conversations API. |
| 10 | +
|
| 11 | + This API enables developers to create conversational AI agents that can query |
| 12 | + vehicle data, telemetry data, and perform web searches on behalf of users. |
| 13 | +
|
| 14 | + Key Features: |
| 15 | + - Create AI agents with access to specific vehicles |
| 16 | + - Query vehicle identity (make, model, owner) via GraphQL |
| 17 | + - Query real-time telemetry (speed, fuel, location) via GraphQL |
| 18 | + - Perform location-based web searches |
| 19 | + - Stream responses in real-time using Server-Sent Events (SSE) |
| 20 | + - Multi-agent delegation architecture with specialized subagents |
| 21 | + """ |
| 22 | + |
| 23 | + def __init__(self, request_method, get_auth_headers, get_full_path, session: Session): |
| 24 | + self._request = request_method |
| 25 | + self._get_auth_headers = get_auth_headers |
| 26 | + self._get_full_path = get_full_path |
| 27 | + self._session = session |
| 28 | + |
| 29 | + def health_check(self) -> Dict: |
| 30 | + """ |
| 31 | + Check the service status and configuration. |
| 32 | +
|
| 33 | + Returns: |
| 34 | + dict: Service health information including status, version, proxy, and default_model |
| 35 | +
|
| 36 | + Example: |
| 37 | + >>> dimo = DIMO("Production") |
| 38 | + >>> health = dimo.conversations.health_check() |
| 39 | + >>> print(health['status']) |
| 40 | + """ |
| 41 | + response = self._request("GET", "Conversations", "/") |
| 42 | + return response |
| 43 | + |
| 44 | + def create_agent( |
| 45 | + self, |
| 46 | + developer_jwt: str, |
| 47 | + user: str, |
| 48 | + vehicle_ids: Optional[List[int]] = None, |
| 49 | + ) -> Dict: |
| 50 | + """ |
| 51 | + Create a new conversational agent for a user with optional vehicle access. |
| 52 | +
|
| 53 | + Args: |
| 54 | + developer_jwt (str): Developer JWT token for authentication |
| 55 | + user (str): Wallet address (0x...) or email identifying the user |
| 56 | + vehicle_ids (list[int], optional): List of vehicle token IDs this agent can access. |
| 57 | + - None (default): Unrestricted access, ownership validated at runtime |
| 58 | + - []: Empty list means no vehicle access (identity queries only) |
| 59 | + - [872, 1234]: Explicit list of allowed vehicles |
| 60 | +
|
| 61 | + Returns: |
| 62 | + dict: Agent information including agentId, mode, user, vehicleIds, and createdAt |
| 63 | +
|
| 64 | + Behavior: |
| 65 | + - One agent per user (idempotent creation) |
| 66 | + - Validates configuration and mode detection |
| 67 | + - Creates/reuses shared identity subagent |
| 68 | + - Creates per-vehicle telemetry subagents with token exchange |
| 69 | + - Creates shared websearch subagent if enabled |
| 70 | +
|
| 71 | + Example: |
| 72 | + >>> dimo = DIMO("Production") |
| 73 | + >>> dev_jwt = "your_developer_jwt" |
| 74 | + >>> agent = dimo.conversations.create_agent( |
| 75 | + ... developer_jwt=dev_jwt, |
| 76 | + ... user="0x1234567890abcdef1234567890abcdef12345678", |
| 77 | + ... vehicle_ids=[872, 1234], |
| 78 | + ... ) |
| 79 | + >>> print(agent['agentId']) |
| 80 | + """ |
| 81 | + check_type("developer_jwt", developer_jwt, str) |
| 82 | + check_type("user", user, str) |
| 83 | + check_optional_type("vehicle_ids", vehicle_ids, list) |
| 84 | + # check_type("enable_websearch", enable_websearch, bool) |
| 85 | + |
| 86 | + body = { |
| 87 | + "user": user, |
| 88 | + "vehicleIds": vehicle_ids, |
| 89 | + # "enableWebsearch": enable_websearch, |
| 90 | + } |
| 91 | + |
| 92 | + response = self._request( |
| 93 | + "POST", |
| 94 | + "Conversations", |
| 95 | + "/agents", |
| 96 | + headers=self._get_auth_headers(developer_jwt), |
| 97 | + data=body, |
| 98 | + ) |
| 99 | + return response |
| 100 | + |
| 101 | + def delete_agent(self, developer_jwt: str, agent_id: str) -> Dict: |
| 102 | + """ |
| 103 | + Delete an agent and all associated resources. |
| 104 | +
|
| 105 | + Args: |
| 106 | + developer_jwt (str): Developer JWT token for authentication |
| 107 | + agent_id (str): The agent ID to delete |
| 108 | +
|
| 109 | + Returns: |
| 110 | + dict: Confirmation message |
| 111 | +
|
| 112 | + Behavior: |
| 113 | + - Deletes Letta agent from server |
| 114 | + - Removes metadata from AgentManager |
| 115 | + - Cleanup errors are logged but don't fail the request |
| 116 | +
|
| 117 | + Example: |
| 118 | + >>> dimo = DIMO("Production") |
| 119 | + >>> dev_jwt = "your_developer_jwt" |
| 120 | + >>> result = dimo.conversations.delete_agent( |
| 121 | + ... developer_jwt=dev_jwt, |
| 122 | + ... agent_id="agent-abc123" |
| 123 | + ... ) |
| 124 | + >>> print(result['message']) |
| 125 | + """ |
| 126 | + check_type("developer_jwt", developer_jwt, str) |
| 127 | + check_type("agent_id", agent_id, str) |
| 128 | + |
| 129 | + response = self._request( |
| 130 | + "DELETE", |
| 131 | + "Conversations", |
| 132 | + f"/agents/{agent_id}", |
| 133 | + headers=self._get_auth_headers(developer_jwt), |
| 134 | + ) |
| 135 | + return response |
| 136 | + |
| 137 | + def send_message( |
| 138 | + self, |
| 139 | + developer_jwt: str, |
| 140 | + agent_id: str, |
| 141 | + message: str, |
| 142 | + vehicle_ids: Optional[List[int]] = None, |
| 143 | + user: Optional[str] = None, |
| 144 | + ) -> Dict: |
| 145 | + """ |
| 146 | + Send a message to an agent and receive the complete response (synchronous). |
| 147 | +
|
| 148 | + Args: |
| 149 | + developer_jwt (str): Developer JWT token for authentication |
| 150 | + agent_id (str): The agent ID to send the message to |
| 151 | + message (str): The message to send to the agent |
| 152 | + vehicle_ids (list[int], optional): Optional vehicle IDs override |
| 153 | + user (str, optional): Optional user override |
| 154 | +
|
| 155 | + Returns: |
| 156 | + dict: Response including agentId, message, response, vehiclesQueried, and timestamp |
| 157 | +
|
| 158 | + Behavior: |
| 159 | + - Synchronous request/response |
| 160 | + - Agent delegates to subagents as needed |
| 161 | + - Returns full response after agent completes reasoning |
| 162 | + - Timeout: 120 seconds for complex queries |
| 163 | +
|
| 164 | + Example: |
| 165 | + >>> dimo = DIMO("Production") |
| 166 | + >>> dev_jwt = "your_developer_jwt" |
| 167 | + >>> response = dimo.conversations.send_message( |
| 168 | + ... developer_jwt=dev_jwt, |
| 169 | + ... agent_id="agent-abc123", |
| 170 | + ... message="What's the make and model of my vehicle?" |
| 171 | + ... ) |
| 172 | + >>> print(response['response']) |
| 173 | + """ |
| 174 | + check_type("developer_jwt", developer_jwt, str) |
| 175 | + check_type("agent_id", agent_id, str) |
| 176 | + check_type("message", message, str) |
| 177 | + check_optional_type("vehicle_ids", vehicle_ids, list) |
| 178 | + check_optional_type("user", user, str) |
| 179 | + |
| 180 | + body = {"message": message} |
| 181 | + if vehicle_ids is not None: |
| 182 | + body["vehicleIds"] = vehicle_ids |
| 183 | + if user is not None: |
| 184 | + body["user"] = user |
| 185 | + |
| 186 | + response = self._request( |
| 187 | + "POST", |
| 188 | + "Conversations", |
| 189 | + f"/agents/{agent_id}/message", |
| 190 | + headers=self._get_auth_headers(developer_jwt), |
| 191 | + data=body, |
| 192 | + ) |
| 193 | + return response |
| 194 | + |
| 195 | + def stream_message( |
| 196 | + self, |
| 197 | + developer_jwt: str, |
| 198 | + agent_id: str, |
| 199 | + message: str, |
| 200 | + vehicle_ids: Optional[List[int]] = None, |
| 201 | + user: Optional[str] = None, |
| 202 | + ) -> Generator[Dict[str, Any], None, None]: |
| 203 | + """ |
| 204 | + Send a message and receive real-time token-by-token streaming response via SSE. |
| 205 | +
|
| 206 | + Args: |
| 207 | + developer_jwt (str): Developer JWT token for authentication |
| 208 | + agent_id (str): The agent ID to send the message to |
| 209 | + message (str): The message to send to the agent |
| 210 | + vehicle_ids (list[int], optional): Optional vehicle IDs override |
| 211 | + user (str, optional): Optional user override |
| 212 | +
|
| 213 | + Yields: |
| 214 | + dict: SSE events with either {"content": "token"} or {"done": true, ...metadata} |
| 215 | +
|
| 216 | + Behavior: |
| 217 | + - Real-time streaming for better UX |
| 218 | + - Token-by-token generation from LLM |
| 219 | + - Final message includes metadata (agentId, vehiclesQueried) |
| 220 | +
|
| 221 | + Example: |
| 222 | + >>> dimo = DIMO("Production") |
| 223 | + >>> dev_jwt = "your_developer_jwt" |
| 224 | + >>> for chunk in dimo.conversations.stream_message( |
| 225 | + ... developer_jwt=dev_jwt, |
| 226 | + ... agent_id="agent-abc123", |
| 227 | + ... message="What's my current speed?" |
| 228 | + ... ): |
| 229 | + ... if "content" in chunk: |
| 230 | + ... print(chunk["content"], end="", flush=True) |
| 231 | + ... elif "done" in chunk: |
| 232 | + ... print(f"\\nVehicles queried: {chunk['vehiclesQueried']}") |
| 233 | + """ |
| 234 | + check_type("developer_jwt", developer_jwt, str) |
| 235 | + check_type("agent_id", agent_id, str) |
| 236 | + check_type("message", message, str) |
| 237 | + check_optional_type("vehicle_ids", vehicle_ids, list) |
| 238 | + check_optional_type("user", user, str) |
| 239 | + |
| 240 | + body = {"message": message} |
| 241 | + if vehicle_ids is not None: |
| 242 | + body["vehicleIds"] = vehicle_ids |
| 243 | + if user is not None: |
| 244 | + body["user"] = user |
| 245 | + |
| 246 | + headers = self._get_auth_headers(developer_jwt) |
| 247 | + headers["Accept"] = "text/event-stream" |
| 248 | + headers["Content-Type"] = "application/json" |
| 249 | + |
| 250 | + # Build full URL |
| 251 | + url = self._get_full_path("Conversations", f"/agents/{agent_id}/stream") |
| 252 | + |
| 253 | + # Make streaming request directly with session |
| 254 | + try: |
| 255 | + response = self._session.request( |
| 256 | + method="POST", |
| 257 | + url=url, |
| 258 | + headers=headers, |
| 259 | + data=json.dumps(body), |
| 260 | + stream=True, |
| 261 | + ) |
| 262 | + response.raise_for_status() |
| 263 | + except RequestException as exc: |
| 264 | + status = getattr(exc.response, "status_code", None) |
| 265 | + body_error = None |
| 266 | + try: |
| 267 | + body_error = exc.response.json() |
| 268 | + except Exception: |
| 269 | + body_error = exc.response.text if exc.response else None |
| 270 | + raise HTTPError(status=status or -1, message=str(exc), body=body_error) |
| 271 | + |
| 272 | + # Parse SSE stream |
| 273 | + for line in response.iter_lines(): |
| 274 | + if line: |
| 275 | + line = line.decode("utf-8") |
| 276 | + if line.startswith("data: "): |
| 277 | + data = line[6:] # Remove "data: " prefix |
| 278 | + try: |
| 279 | + yield json.loads(data) |
| 280 | + except json.JSONDecodeError: |
| 281 | + # Skip malformed JSON |
| 282 | + continue |
| 283 | + |
| 284 | + def get_history( |
| 285 | + self, |
| 286 | + developer_jwt: str, |
| 287 | + agent_id: str, |
| 288 | + limit: int = 100, |
| 289 | + ) -> Dict: |
| 290 | + """ |
| 291 | + Retrieve all messages in a conversation. |
| 292 | +
|
| 293 | + Args: |
| 294 | + developer_jwt (str): Developer JWT token for authentication |
| 295 | + agent_id (str): The agent ID to get history for |
| 296 | + limit (int): Maximum number of messages to return (default: 100) |
| 297 | +
|
| 298 | + Returns: |
| 299 | + dict: Conversation history including agentId, messages array, and total count |
| 300 | +
|
| 301 | + Behavior: |
| 302 | + - Retrieves from Letta server |
| 303 | + - Includes all message roles (user, agent, system) |
| 304 | + - Reverse chronological order (newest first) |
| 305 | +
|
| 306 | + Example: |
| 307 | + >>> dimo = DIMO("Production") |
| 308 | + >>> dev_jwt = "your_developer_jwt" |
| 309 | + >>> history = dimo.conversations.get_history( |
| 310 | + ... developer_jwt=dev_jwt, |
| 311 | + ... agent_id="agent-abc123", |
| 312 | + ... limit=50 |
| 313 | + ... ) |
| 314 | + >>> for msg in history['messages']: |
| 315 | + ... print(f"{msg['role']}: {msg['content']}") |
| 316 | + """ |
| 317 | + check_type("developer_jwt", developer_jwt, str) |
| 318 | + check_type("agent_id", agent_id, str) |
| 319 | + check_type("limit", limit, int) |
| 320 | + |
| 321 | + response = self._request( |
| 322 | + "GET", |
| 323 | + "Conversations", |
| 324 | + f"/agents/{agent_id}/history", |
| 325 | + headers=self._get_auth_headers(developer_jwt), |
| 326 | + params={"limit": limit}, |
| 327 | + ) |
| 328 | + return response |
0 commit comments