SWARM_TOOLS = [
{
"type": "function",
"function": {
"name": "task_update",
"description": "Update the status of a task. Use 'in_progress' when starting, 'completed' when done.",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "The task ID"},
"status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
"end result": {"kind": "string", "description": "Outcome or output of the duty"},
},
"required": ["task_id", "status"],
},
},
},
{
"kind": "operate",
"operate": {
"title": "inbox_send",
"description": "Ship a message to a different agent (e.g., 'chief' or a employee title).",
"parameters": {
"kind": "object",
"properties": {
"to": {"kind": "string", "description": "Recipient agent title"},
"message": {"kind": "string", "description": "Message content material"},
},
"required": ["to", "message"],
},
},
},
{
"kind": "operate",
"operate": {
"title": "inbox_receive",
"description": "Examine and devour all messages in your inbox.",
"parameters": {
"kind": "object",
"properties": {},
},
},
},
{
"kind": "operate",
"operate": {
"title": "task_list",
"description": "Record duties assigned to you or all staff duties.",
"parameters": {
"kind": "object",
"properties": {
"proprietor": {"kind": "string", "description": "Filter by proprietor title (non-obligatory)"},
},
},
},
},
]
class SwarmAgent:
def __init__(
self,
title: str,
position: str,
system_prompt: str,
task_board: TaskBoard,
inbox: InboxSystem,
registry: TeamRegistry,
):
self.title = title
self.position = position
self.system_prompt = system_prompt
self.task_board = task_board
self.inbox = inbox
self.registry = registry
self.conversation_history: checklist[dict] = []
self.inbox.register(title)
self.registry.register(title, position)
def _build_system_prompt(self) -> str:
coord_protocol = f"""
## Coordination Protocol (auto-injected — you might be agent '{self.title}')
You might be a part of an AI agent swarm. Your position: {self.position}
Your title: {self.title}
Obtainable instruments (equal to ClawTeam CLI):
- task_list: Examine your assigned duties (like `clawteam job checklist`)
- task_update: Replace job standing to in_progress/accomplished/failed (like `clawteam job replace`)
- inbox_send: Ship messages to different brokers (like `clawteam inbox ship`)
- inbox_receive: Examine your inbox for messages (like `clawteam inbox obtain`)
WORKFLOW:
1. Examine your duties with task_list
2. Mark a job as in_progress whenever you begin
3. Do the work (assume, analyze, produce output)
4. Mark the duty as accomplished along with your end result
5. Ship a abstract message to 'chief' when achieved
"""
return self.system_prompt + "n" + coord_protocol
def _handle_tool_call(self, tool_name: str, args: dict) -> str:
if tool_name == "task_update":
standing = TaskStatus(args["status"])
end result = args.get("end result", "")
self.task_board.update_status(args["task_id"], standing, end result)
if standing == TaskStatus.COMPLETED:
self.registry.increment_completed(self.title)
return json.dumps({"okay": True, "task_id": args["task_id"], "new_status": args["status"]})
elif tool_name == "inbox_send":
self.inbox.ship(self.title, args["to"], args["message"])
return json.dumps({"okay": True, "sent_to": args["to"]})
elif tool_name == "inbox_receive":
msgs = self.inbox.obtain(self.title)
if not msgs:
return json.dumps({"messages": [], "notice": "No new messages"})
return json.dumps({
"messages": [
{"from": m.sender, "content": m.content, "time": m.timestamp}
for m in msgs
]
})
elif tool_name == "task_list":
proprietor = args.get("proprietor", self.title)
duties = self.task_board.get_tasks(proprietor=proprietor)
return json.dumps({"duties": [t.to_dict() for t in tasks]})
return json.dumps({"error": f"Unknown software: {tool_name}"})
def run(self, user_message: str, max_iterations: int = 6) -> str:
self.conversation_history.append({"position": "consumer", "content material": user_message})
for iteration in vary(max_iterations):
attempt:
response = consumer.chat.completions.create(
mannequin=MODEL,
messages=[
{"role": "system", "content": self._build_system_prompt()},
*self.conversation_history,
],
instruments=SWARM_TOOLS,
tool_choice="auto",
temperature=0.4,
)
besides Exception as e:
return f"[API Error] {e}"
alternative = response.decisions[0]
msg = alternative.message
assistant_msg = {"position": "assistant", "content material": msg.content material or ""}
if msg.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
}
for tc in msg.tool_calls
]
self.conversation_history.append(assistant_msg)
if not msg.tool_calls:
return msg.content material or "(No response)"
for tc in msg.tool_calls:
fn_name = tc.operate.title
fn_args = json.hundreds(tc.operate.arguments)
end result = self._handle_tool_call(fn_name, fn_args)
self.conversation_history.append({
"position": "software",
"tool_call_id": tc.id,
"content material": end result,
})
return "(Agent reached max iterations)"
