-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdemo.py
More file actions
153 lines (121 loc) · 7.12 KB
/
demo.py
File metadata and controls
153 lines (121 loc) · 7.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
"""
Demo script showing the Agent 2 Agent workflow structure without requiring API keys
This script demonstrates the flow and structure of the application
"""
import asyncio
from dataclasses import dataclass
from typing import List
@dataclass
class MockAgentResponse:
"""Mock response from an AI agent for demo purposes"""
cloud_platform: str
agent_name: str
favorite_color: str
message: str
class MockAgent2AgentDemo:
"""Demo implementation showing the Agent 2 Agent workflow structure"""
def __init__(self):
self.agents = self._create_mock_agents()
self.workflow_history: List[MockAgentResponse] = []
def _create_mock_agents(self) -> List[dict]:
"""Create mock agent configurations"""
rainbow_colors = ["Red", "Orange", "Yellow", "Green", "Blue", "Indigo", "Violet"]
agents = [
{"platform": "AWS", "name": "Alexandra", "color": rainbow_colors[0]},
{"platform": "Azure", "name": "Benjamin", "color": rainbow_colors[1]},
{"platform": "GCP", "name": "Catherine", "color": rainbow_colors[2]},
{"platform": "AWS", "name": "David", "color": rainbow_colors[3]},
{"platform": "Azure", "name": "Elena", "color": rainbow_colors[4]},
{"platform": "GCP", "name": "Francis", "color": rainbow_colors[5]},
{"platform": "Multi-Cloud", "name": "Gabriel", "color": rainbow_colors[6]}
]
return agents
async def mock_agent_response(self, agent: dict, task: str) -> MockAgentResponse:
"""Generate a mock response from an agent"""
# Simulate processing time
await asyncio.sleep(0.5)
mock_messages = {
"Alexandra": f"Hello! I'm Alexandra from AWS. My favorite color is {agent['color']}. I'll handle the cloud infrastructure setup for our rainbow presentation using AWS services like EC2 and S3.",
"Benjamin": f"Greetings! Benjamin here from Azure. I love the color {agent['color']}. I'll contribute by setting up Azure DevOps pipelines and Azure Functions for our collaborative project.",
"Catherine": f"Hi everyone! Catherine from GCP reporting in. My favorite color is {agent['color']}. I'll utilize Google Cloud Run and BigQuery to add data analytics to our rainbow-themed presentation.",
"David": f"Hey there! David from AWS. I'm passionate about {agent['color']}. I'll complement Alexandra's work by setting up AWS Lambda functions and API Gateway for our project.",
"Elena": f"Hello team! Elena from Azure here. I absolutely love {agent['color']}. I'll work on Azure Cognitive Services integration to add AI capabilities to our presentation.",
"Francis": f"Bonjour! Francis from GCP. My favorite color is {agent['color']}. I'll implement Google Cloud Storage and Pub/Sub messaging to ensure seamless communication between our services.",
"Gabriel": f"Greetings all! Gabriel, your multi-cloud coordinator. I cherish the color {agent['color']}. I'll orchestrate the integration across all cloud platforms to bring our rainbow vision to life."
}
return MockAgentResponse(
cloud_platform=agent["platform"],
agent_name=agent["name"],
favorite_color=agent["color"],
message=mock_messages.get(agent["name"], f"I'm {agent['name']} from {agent['platform']} and my favorite color is {agent['color']}.")
)
async def execute_demo_workflow(self, main_task: str) -> List[MockAgentResponse]:
"""Execute the demo Agent 2 Agent workflow"""
print(f"🚀 Starting Agent 2 Agent Demo Workflow: {main_task}")
print("=" * 60)
all_responses = []
# Process each agent sequentially
for i, agent in enumerate(self.agents, 1):
print(f"\n📋 Step {i}: Delegating to {agent['name']} on {agent['platform']}")
try:
response = await self.mock_agent_response(agent, main_task)
all_responses.append(response)
self.workflow_history.append(response)
print(f"✅ {agent['name']} ({agent['platform']}) completed their task")
print(f" Favorite Color: {agent['color']}")
print(f" Response: {response.message[:80]}...")
except Exception as e:
print(f"❌ Error with {agent['name']}: {str(e)}")
continue
print(f"\n🔄 Aggregating responses from {len(all_responses)} agents...")
return all_responses
def generate_demo_report(self, responses: List[MockAgentResponse]) -> str:
"""Generate demo report from all agent responses"""
report = "\n" + "=" * 60
report += "\n🌈 DEMO AGENT 2 AGENT WORKFLOW REPORT"
report += "\n" + "=" * 60
report += f"\n\n📊 Total Agents Participated: {len(responses)}"
report += "\n\n🌟 Agent Summary:"
for i, response in enumerate(responses, 1):
report += f"\n\n{i}. Agent: {response.agent_name}"
report += f"\n Cloud Platform: {response.cloud_platform}"
report += f"\n Favorite Color: {response.favorite_color}"
report += f"\n Message: {response.message}"
# Group by cloud platform
cloud_summary = {}
for response in responses:
if response.cloud_platform not in cloud_summary:
cloud_summary[response.cloud_platform] = []
cloud_summary[response.cloud_platform].append(response)
report += "\n\n☁️ Cloud Platform Summary:"
for cloud, agents in cloud_summary.items():
report += f"\n\n{cloud}:"
for agent in agents:
report += f"\n - {agent.agent_name} (Color: {agent.favorite_color})"
report += "\n\n🎯 Next Steps:"
report += "\n1. Configure your Azure OpenAI credentials in .env file"
report += "\n2. Run 'python main.py' to execute the real Agent 2 Agent workflow"
report += "\n3. Watch as real AI agents collaborate across cloud platforms!"
return report
async def main():
"""Main demo function"""
print("🎭 Semantic Kernel Agent 2 Agent Protocol - DEMO MODE")
print("=" * 55)
print("This demo shows the workflow structure without requiring API keys")
print("=" * 55)
# Initialize the demo
demo = MockAgent2AgentDemo()
# Define the main task
main_task = "Create a collaborative rainbow-themed presentation where each agent contributes their expertise from their respective cloud platform"
try:
# Execute the demo workflow
responses = await demo.execute_demo_workflow(main_task)
# Generate and display demo report
demo_report = demo.generate_demo_report(responses)
print(demo_report)
print(f"\n✨ Demo completed successfully with {len(responses)} agent responses!")
except Exception as e:
print(f"❌ Demo failed: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())