-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathmain.py
More file actions
187 lines (154 loc) · 6.05 KB
/
main.py
File metadata and controls
187 lines (154 loc) · 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import argparse
import uvicorn
from agentmesh.common import logger
from agentmesh.common import load_config, config, ModelFactory
from agentmesh.protocol import AgentTeam, Agent, Task
from agentmesh.tools.tool_manager import ToolManager
def create_team_from_config(team_name):
"""Create a team from configuration."""
# Get teams configuration
teams_config = config().get("teams", {})
# Check if the specified team exists
if team_name not in teams_config:
print(f"Error: Team '{team_name}' not found in configuration.")
available_teams = list(teams_config.keys())
print(f"Available teams: {', '.join(available_teams)}")
return None
# Get team configuration
team_config = teams_config[team_name]
# Initialize ModelFactory
model_factory = ModelFactory()
# Get team's model
team_model_name = team_config.get("model", "gpt-4o")
team_model = model_factory.get_model(team_model_name)
# Get team's max_steps (default to 100 if not specified)
team_max_steps = team_config.get("max_steps", 100)
# Create team with the model
team = AgentTeam(
name=team_name,
description=team_config.get("description", ""),
rule=team_config.get("rule", ""),
model=team_model,
max_steps=team_max_steps
)
# Create and add agents to the team
agents_config = team_config.get("agents", [])
for agent_config in agents_config:
# Check if agent has a specific model
if agent_config.get("model"):
agent_model = model_factory.get_model(agent_config.get("model"))
else:
agent_model = team_model
# Get agent's max_steps
agent_max_steps = agent_config.get("max_steps")
agent = Agent(
name=agent_config.get("name", ""),
system_prompt=agent_config.get("system_prompt", ""),
model=agent_model, # Use agent's model if specified, otherwise will use team's model
description=agent_config.get("description", ""),
max_steps=agent_max_steps
)
# Add tools to the agent if specified
tool_names = agent_config.get("tools", [])
tool_manager = ToolManager()
for tool_name in tool_names:
tool = tool_manager.create_tool(tool_name)
if tool:
agent.add_tool(tool)
else:
if tool_name == "browser":
logger.warning(
"Tool 'Browser' loaded failed, "
"please install the required dependency with: \n"
"'pip install browser-use>=0.1.40' or 'pip install agentmesh-sdk[full]'\n"
)
else:
logger.warning(f"Tool '{tool_name}' not found for agent '{agent.name}'\n")
# Add agent to team
team.add(agent)
return team
def list_available_teams():
"""List all available teams from configuration."""
teams_config = config().get("teams", {})
if not teams_config:
print("No teams found in configuration.")
return
print("Available teams:")
for team_name, team_config in teams_config.items():
print(f" - {team_name}: {team_config.get('description', '')}")
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="AgentMesh - Multi-Agent Framework")
parser.add_argument("-t", "--team", help="Specify the team to run")
parser.add_argument("-l", "--list", action="store_true", help="List available teams")
parser.add_argument("-q", "--query", help="Direct query to run (non-interactive mode)")
parser.add_argument("-s", "--server", action="store_true", help="Start API server")
args = parser.parse_args()
# Load configuration
load_config()
# Load tools
ToolManager().load_tools("agentmesh/tools")
# Start API server if requested
if args.server:
print("Starting AgentMesh API server...")
print("Server will be available at: http://localhost:8000")
print("API documentation: http://localhost:8000/docs")
print("Press Ctrl+C to stop the server")
try:
uvicorn.run(
"agentmesh.api.app:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
except KeyboardInterrupt:
print("\nServer stopped.")
return
# List teams if requested
if args.list:
list_available_teams()
return
# If no team is specified, show usage
if not args.team:
parser.print_help()
return
# Create team from configuration
team = create_team_from_config(args.team)
if not team:
return
# If a direct query is provided, run it in non-interactive mode
if args.query:
print(f"Team '{team.name}' loaded successfully.")
print(f"User task: {args.query}")
print()
# Run the team with the user's query
team.run(Task(content=args.query), output_mode="print")
return
# Otherwise, run in interactive mode
print(f"Team '{team.name}' loaded successfully.")
print(f"Description: {team.description}")
print(f"Number of agents: {len(team.agents)}")
print("\nEnter your task (type 'exit' to quit):")
count = 0
# Interactive loop
while True:
try:
if count > 0:
team = create_team_from_config(args.team)
count += 1
user_input = input("> ")
if user_input.lower() in ["exit", "quit", "q"]:
print("Exiting AgentMesh. Goodbye!")
break
if user_input.strip():
# Run the team with the user's task
team.run(Task(content=user_input), output_mode="print")
print("\nEnter your next task (type 'exit' to quit):")
except KeyboardInterrupt:
print("\nExiting AgentMesh. Goodbye!")
break
except Exception as e:
logger.error(f"Error: {e}")
if __name__ == "__main__":
main()