Skip to content

Commit 78bf050

Browse files
author
AgentPatterns
committed
feat(examples/python): add runnable example
1 parent adcc7ee commit 78bf050

6 files changed

Lines changed: 615 additions & 0 deletions

File tree

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Multi-Agent Collaboration - Python Implementation
2+
3+
Runnable implementation of a multi-agent collaboration flow where specialized
4+
agents align on one launch decision across multiple rounds.
5+
6+
---
7+
8+
## Quick start
9+
10+
```bash
11+
# (optional) create venv
12+
python -m venv .venv && source .venv/bin/activate
13+
14+
# install dependencies
15+
pip install -r requirements.txt
16+
17+
# set API key
18+
export OPENAI_API_KEY="sk-..."
19+
20+
# run the agent
21+
python main.py
22+
```
23+
24+
## Full walkthrough
25+
26+
Read the complete implementation guide:
27+
https://agentpatterns.tech/en/agent-patterns/multi-agent-collaboration
28+
29+
## What's inside
30+
31+
- Role-based team loop (`demand`, `finance`, `risk`)
32+
- Shared context board passed to every role
33+
- Collaboration gateway with strict contribution contract
34+
- Round policy: conflict detection + consensus rule
35+
- Runtime budgets (`max_rounds`, `max_messages`, `max_seconds`)
36+
- Final synthesis after team alignment
37+
- Trace and history for auditability
38+
39+
## Project layout
40+
41+
```text
42+
examples/
43+
agent-patterns/
44+
multi-agent-collaboration/
45+
python/
46+
README.md
47+
main.py
48+
llm.py
49+
gateway.py
50+
signals.py
51+
requirements.txt
52+
```
53+
54+
## Notes
55+
56+
- Code and README are English-only by design.
57+
- The website provides multilingual explanations and theory.
58+
59+
## License
60+
61+
MIT
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from typing import Any
5+
6+
7+
class StopRun(Exception):
8+
def __init__(self, reason: str):
9+
super().__init__(reason)
10+
self.reason = reason
11+
12+
13+
@dataclass(frozen=True)
14+
class Budget:
15+
max_rounds: int = 3
16+
max_messages: int = 12
17+
max_seconds: int = 40
18+
min_go_votes: int = 2
19+
20+
21+
ALLOWED_STANCES = {"go", "caution", "block"}
22+
23+
24+
25+
def _is_number(value: Any) -> bool:
26+
return isinstance(value, (int, float)) and not isinstance(value, bool)
27+
28+
29+
30+
def validate_contribution(raw: Any, *, allowed_agents: set[str]) -> dict[str, Any]:
31+
if not isinstance(raw, dict):
32+
raise StopRun("invalid_contribution:not_object")
33+
34+
required = {"agent", "stance", "summary", "confidence", "actions"}
35+
if not required.issubset(raw.keys()):
36+
raise StopRun("invalid_contribution:missing_keys")
37+
38+
agent = raw["agent"]
39+
stance = raw["stance"]
40+
summary = raw["summary"]
41+
confidence = raw["confidence"]
42+
actions = raw["actions"]
43+
44+
if not isinstance(agent, str) or not agent.strip():
45+
raise StopRun("invalid_contribution:agent")
46+
agent = agent.strip()
47+
if agent not in allowed_agents:
48+
raise StopRun(f"invalid_contribution:agent_not_allowed:{agent}")
49+
50+
if not isinstance(stance, str) or stance.strip() not in ALLOWED_STANCES:
51+
raise StopRun("invalid_contribution:stance")
52+
stance = stance.strip()
53+
54+
if not isinstance(summary, str) or not summary.strip():
55+
raise StopRun("invalid_contribution:summary")
56+
57+
if not _is_number(confidence):
58+
raise StopRun("invalid_contribution:confidence_type")
59+
confidence = float(confidence)
60+
if not (0.0 <= confidence <= 1.0):
61+
raise StopRun("invalid_contribution:confidence_range")
62+
63+
if not isinstance(actions, list) or not actions:
64+
raise StopRun("invalid_contribution:actions")
65+
66+
normalized_actions: list[str] = []
67+
for item in actions:
68+
if not isinstance(item, str) or not item.strip():
69+
raise StopRun("invalid_contribution:action_item")
70+
normalized_actions.append(item.strip())
71+
72+
# Ignore unknown keys to tolerate extra LLM fields.
73+
return {
74+
"agent": agent,
75+
"stance": stance,
76+
"summary": summary.strip(),
77+
"confidence": round(confidence, 3),
78+
"actions": normalized_actions[:3],
79+
}
80+
81+
82+
83+
def detect_conflicts(contributions: list[dict[str, Any]]) -> list[str]:
84+
if not contributions:
85+
return ["no_contributions"]
86+
87+
stances = {item["stance"] for item in contributions}
88+
conflicts: list[str] = []
89+
90+
if "go" in stances and "caution" in stances and "block" not in stances:
91+
conflicts.append("go_vs_caution")
92+
if "block" in stances and len(stances) > 1:
93+
conflicts.append("blocking_vs_non_block")
94+
if len(stances) == 3:
95+
conflicts.append("high_divergence")
96+
97+
return conflicts
98+
99+
100+
101+
def decide_round_outcome(
102+
contributions: list[dict[str, Any]],
103+
*,
104+
min_go_votes: int,
105+
) -> str | None:
106+
go_votes = sum(1 for item in contributions if item["stance"] == "go")
107+
caution_votes = sum(1 for item in contributions if item["stance"] == "caution")
108+
block_votes = sum(1 for item in contributions if item["stance"] == "block")
109+
110+
if block_votes >= 2:
111+
return "no_go"
112+
113+
if block_votes > 0:
114+
return None
115+
if go_votes >= min_go_votes and caution_votes == 0:
116+
return "go"
117+
if go_votes >= min_go_votes and caution_votes > 0:
118+
return "go_with_caution"
119+
return None
120+
121+
122+
class CollaborationGateway:
123+
def __init__(self, *, allow: set[str], budget: Budget):
124+
self.allow = set(allow)
125+
self.budget = budget
126+
self.message_count = 0
127+
128+
def _consume_message_budget(self) -> None:
129+
self.message_count += 1
130+
if self.message_count > self.budget.max_messages:
131+
raise StopRun("max_messages")
132+
133+
def accept(self, raw: Any, *, expected_agent: str) -> dict[str, Any]:
134+
if expected_agent not in self.allow:
135+
raise StopRun(f"agent_denied:{expected_agent}")
136+
137+
self._consume_message_budget()
138+
contribution = validate_contribution(raw, allowed_agents=self.allow)
139+
140+
if contribution["agent"] != expected_agent:
141+
raise StopRun(f"invalid_contribution:agent_mismatch:{expected_agent}")
142+
143+
return contribution
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import os
5+
from typing import Any
6+
7+
from openai import APIConnectionError, APITimeoutError, OpenAI
8+
9+
MODEL = os.getenv("OPENAI_MODEL", "gpt-4.1-mini")
10+
LLM_TIMEOUT_SECONDS = float(os.getenv("OPENAI_TIMEOUT_SECONDS", "60"))
11+
12+
13+
class LLMTimeout(Exception):
14+
pass
15+
16+
17+
class LLMEmpty(Exception):
18+
pass
19+
20+
21+
COMMON_RULES = """
22+
Return exactly one JSON object with this shape:
23+
{
24+
"agent": "<role_name>",
25+
"stance": "go|caution|block",
26+
"summary": "one short paragraph",
27+
"confidence": 0.0,
28+
"actions": ["action 1", "action 2"]
29+
}
30+
31+
Rules:
32+
- Use only the provided facts.
33+
- Keep actions concrete and operational.
34+
- Do not output markdown or extra keys.
35+
""".strip()
36+
37+
AGENT_PROMPTS = {
38+
"demand_analyst": (
39+
"You are Demand Analyst. Focus on demand capacity, conversion, and traffic risks. "
40+
"Decide whether launch is feasible from growth and operational demand perspective."
41+
),
42+
"finance_analyst": (
43+
"You are Finance Analyst. Focus on revenue, margin, campaign cost, and downside exposure. "
44+
"Decide if launch economics are acceptable."
45+
),
46+
"risk_analyst": (
47+
"You are Risk Analyst. Focus on payment reliability, chargebacks, and incidents. "
48+
"Prioritize safety and compliance risk containment."
49+
),
50+
"legal_analyst": (
51+
"You are Legal Analyst. Focus on regulatory, compliance, consumer protection, and policy constraints. "
52+
"Flag launch blockers and required mitigations."
53+
),
54+
}
55+
56+
FINAL_SYSTEM_PROMPT = """
57+
You are a launch readiness editor.
58+
Write a short operations brief in English.
59+
Include:
60+
- final decision (go/go_with_caution/no_go)
61+
- why the team agreed
62+
- top 2 immediate actions
63+
Use only evidence from collaboration history.
64+
""".strip()
65+
66+
67+
68+
def _get_client() -> OpenAI:
69+
api_key = os.getenv("OPENAI_API_KEY")
70+
if not api_key:
71+
raise EnvironmentError(
72+
"OPENAI_API_KEY is not set. Run: export OPENAI_API_KEY='sk-...'"
73+
)
74+
return OpenAI(api_key=api_key)
75+
76+
77+
78+
def _round_summaries(history: list[dict[str, Any]], limit: int = 2) -> list[dict[str, Any]]:
79+
summaries: list[dict[str, Any]] = []
80+
for row in history[-limit:]:
81+
summaries.append(
82+
{
83+
"round": row.get("round"),
84+
"decision": row.get("decision"),
85+
"conflicts": row.get("conflicts", []),
86+
"stances": [
87+
{
88+
"agent": item.get("agent"),
89+
"stance": item.get("stance"),
90+
"confidence": item.get("confidence"),
91+
}
92+
for item in row.get("contributions", [])
93+
],
94+
}
95+
)
96+
return summaries
97+
98+
99+
100+
def propose_contribution(
101+
*,
102+
role: str,
103+
goal: str,
104+
shared_context: dict[str, Any],
105+
history: list[dict[str, Any]],
106+
open_conflicts: list[str],
107+
) -> dict[str, Any]:
108+
system = AGENT_PROMPTS.get(role)
109+
if not system:
110+
raise ValueError(f"unknown_role:{role}")
111+
112+
payload = {
113+
"goal": goal,
114+
"role": role,
115+
"shared_context": shared_context,
116+
"recent_rounds": _round_summaries(history, limit=2),
117+
"open_conflicts": open_conflicts,
118+
}
119+
120+
client = _get_client()
121+
try:
122+
completion = client.chat.completions.create(
123+
model=MODEL,
124+
temperature=0,
125+
timeout=LLM_TIMEOUT_SECONDS,
126+
response_format={"type": "json_object"},
127+
messages=[
128+
{"role": "system", "content": f"{system}\n\n{COMMON_RULES}"},
129+
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
130+
],
131+
)
132+
except (APITimeoutError, APIConnectionError) as exc:
133+
raise LLMTimeout("llm_timeout") from exc
134+
135+
text = completion.choices[0].message.content or "{}"
136+
try:
137+
return json.loads(text)
138+
except json.JSONDecodeError:
139+
return {"invalid": True, "raw": text}
140+
141+
142+
143+
def compose_final_answer(
144+
*,
145+
goal: str,
146+
final_decision: str,
147+
history: list[dict[str, Any]],
148+
) -> str:
149+
payload = {
150+
"goal": goal,
151+
"final_decision": final_decision,
152+
"history": history,
153+
}
154+
155+
client = _get_client()
156+
try:
157+
completion = client.chat.completions.create(
158+
model=MODEL,
159+
temperature=0,
160+
timeout=LLM_TIMEOUT_SECONDS,
161+
messages=[
162+
{"role": "system", "content": FINAL_SYSTEM_PROMPT},
163+
{"role": "user", "content": json.dumps(payload, ensure_ascii=True)},
164+
],
165+
)
166+
except (APITimeoutError, APIConnectionError) as exc:
167+
raise LLMTimeout("llm_timeout") from exc
168+
169+
text = (completion.choices[0].message.content or "").strip()
170+
if not text:
171+
raise LLMEmpty("llm_empty")
172+
return text

0 commit comments

Comments
 (0)