|
| 1 | +"""Agentic Chat Agent Implementation. |
| 2 | +
|
| 3 | +This module implements a conversational chat agent using CrewAI Flow framework |
| 4 | +integrated with Cloudbase Agent. The agent uses LiteLLM for model interactions and |
| 5 | +supports streaming responses. |
| 6 | +""" |
| 7 | + |
| 8 | +import os |
| 9 | +import sys |
| 10 | +import pysqlite3 |
| 11 | +from pathlib import Path |
| 12 | + |
| 13 | +# Configure HOME and CrewAI storage directories for SCF environment |
| 14 | +os.environ["HOME"] = "/tmp" |
| 15 | +os.environ["CREWAI_STORAGE_DIR"] = "/tmp" |
| 16 | + |
| 17 | +# Create required directories |
| 18 | +credentials_dir = Path("/tmp/.local/share/crewai/credentials") |
| 19 | +credentials_dir.mkdir(parents=True, exist_ok=True) |
| 20 | + |
| 21 | +# Replace sqlite3 with pysqlite3 for SCF compatibility |
| 22 | +sys.modules['sqlite3'] = pysqlite3 |
| 23 | + |
| 24 | +from crewai import Crew, Agent, Task |
| 25 | + |
| 26 | +try: |
| 27 | + from crewai.flow import Flow, start, persist |
| 28 | +except ModuleNotFoundError as exc: |
| 29 | + raise ImportError( |
| 30 | + "crewai.flow is required. Please install a CrewAI version that includes Flow (e.g., crewai>=1.7.2)." |
| 31 | + ) from exc |
| 32 | +from litellm import acompletion |
| 33 | +from crewai.events.event_bus import crewai_event_bus |
| 34 | +from ag_ui.core import EventType |
| 35 | +from cloudbase_agent.crewai import CrewAIAgent as _BaseCrewAIAgent |
| 36 | +from cloudbase_agent.crewai.converters import CopilotKitState |
| 37 | +from cloudbase_agent.crewai.context import flow_context |
| 38 | +from cloudbase_agent.crewai.events import BridgedTextMessageChunkEvent |
| 39 | + |
| 40 | +from dotenv import load_dotenv |
| 41 | +load_dotenv() |
| 42 | + |
| 43 | +class CrewAIAgent(_BaseCrewAIAgent): |
| 44 | + """Override run to avoid BaseAgent.as_current cross-context reset.""" |
| 45 | + |
| 46 | + async def run(self, run_input): |
| 47 | + if getattr(self, "_should_fix_event_ids", True): |
| 48 | + async for event in self._run_internal(run_input): |
| 49 | + yield super()._fix_event_ids(event, run_input.thread_id, run_input.run_id) |
| 50 | + else: |
| 51 | + async for event in self._run_internal(run_input): |
| 52 | + yield event |
| 53 | + |
| 54 | + |
| 55 | +@persist() |
| 56 | +class AgenticChatFlow(Flow[CopilotKitState]): |
| 57 | + """Conversational chat flow using CrewAI framework. |
| 58 | +
|
| 59 | + This flow implements a basic chat agent that processes user messages |
| 60 | + and generates streaming responses using LiteLLM completion API. |
| 61 | +
|
| 62 | + :ivar state: Flow state containing conversation messages and CopilotKit context |
| 63 | + :type state: CopilotKitState |
| 64 | + """ |
| 65 | + |
| 66 | + @start() |
| 67 | + async def chat(self) -> None: |
| 68 | + """Process chat messages and generate streaming responses. |
| 69 | +
|
| 70 | + This method is the entry point of the flow. It sends messages to the LLM, |
| 71 | + streams the response back, and updates the conversation state. |
| 72 | +
|
| 73 | + The method: |
| 74 | + 1. Constructs messages with system prompt and conversation history |
| 75 | + 2. Calls LiteLLM completion API with streaming enabled |
| 76 | + 3. Wraps the response in copilotkit_stream for proper formatting |
| 77 | + 4. Appends the assistant's response to conversation state |
| 78 | +
|
| 79 | + :raises Exception: If LLM completion fails or streaming encounters errors |
| 80 | + """ |
| 81 | + system_prompt = "You are a helpful assistant." |
| 82 | + |
| 83 | + try: |
| 84 | + model_name = os.getenv("OPENAI_MODEL") |
| 85 | + base_url = os.getenv("OPENAI_BASE_URL") |
| 86 | + api_key = os.getenv("OPENAI_API_KEY") |
| 87 | + tools = getattr(self.state.copilotkit, "actions", []) |
| 88 | + tools_arg = tools if tools else None |
| 89 | + |
| 90 | + # Run the model and stream the response |
| 91 | + stream = await acompletion( |
| 92 | + model=model_name, |
| 93 | + messages=[{"role": "system", "content": system_prompt}, *self.state.messages], |
| 94 | + tools=tools_arg, |
| 95 | + parallel_tool_calls=False, |
| 96 | + stream=False, |
| 97 | + base_url=base_url, |
| 98 | + api_key=api_key, |
| 99 | + custom_llm_provider="openai", |
| 100 | + ) |
| 101 | + |
| 102 | + message = stream.choices[0].message |
| 103 | + content = message.content if hasattr(message, "content") else None |
| 104 | + |
| 105 | + if content: |
| 106 | + flow = flow_context.get(None) |
| 107 | + if flow is not None: |
| 108 | + crewai_event_bus.emit( |
| 109 | + flow, |
| 110 | + BridgedTextMessageChunkEvent( |
| 111 | + type=EventType.TEXT_MESSAGE_CHUNK, |
| 112 | + message_id=getattr(message, "id", None) or stream.id, |
| 113 | + role="assistant", |
| 114 | + delta=content, |
| 115 | + ), |
| 116 | + ) |
| 117 | + |
| 118 | + self.state.messages.append(message) |
| 119 | + except Exception as e: |
| 120 | + print(f"[CrewAI Flow Chat] {e}") |
| 121 | + |
| 122 | + |
| 123 | +def build_chat_workflow() -> AgenticChatFlow: |
| 124 | + """Build and return a new chat workflow instance. |
| 125 | +
|
| 126 | + This factory function creates a fresh instance of AgenticChatFlow |
| 127 | + for each conversation to ensure proper isolation. |
| 128 | +
|
| 129 | + :return: New instance of the chat workflow |
| 130 | + :rtype: AgenticChatFlow |
| 131 | + """ |
| 132 | + return AgenticChatFlow() |
0 commit comments