From 756752d2b399de1d0c83b3f473d4de18153ab64b Mon Sep 17 00:00:00 2001 From: Humza Sami Date: Thu, 17 Jul 2025 17:38:28 +0500 Subject: [PATCH 1/3] feat: added Architect files functionality into nexus --- .../nexus/architect/NEXUS_DOCUMENTATION.md | 1351 +++++++++++++++++ primisai/nexus/architect/__init__.py | 15 + primisai/nexus/architect/builder.py | 537 +++++++ primisai/nexus/architect/evaluator.py | 517 +++++++ primisai/nexus/architect/expander.py | 56 + primisai/nexus/architect/manager.py | 188 +++ primisai/nexus/architect/prompter.py | 330 ++++ primisai/nexus/architect/prompts.py | 169 +++ primisai/nexus/architect/schemas.py | 53 + primisai/nexus/architect/structurer.py | 69 + 10 files changed, 3285 insertions(+) create mode 100644 primisai/nexus/architect/NEXUS_DOCUMENTATION.md create mode 100644 primisai/nexus/architect/__init__.py create mode 100644 primisai/nexus/architect/builder.py create mode 100644 primisai/nexus/architect/evaluator.py create mode 100644 primisai/nexus/architect/expander.py create mode 100644 primisai/nexus/architect/manager.py create mode 100644 primisai/nexus/architect/prompter.py create mode 100644 primisai/nexus/architect/prompts.py create mode 100644 primisai/nexus/architect/schemas.py create mode 100644 primisai/nexus/architect/structurer.py diff --git a/primisai/nexus/architect/NEXUS_DOCUMENTATION.md b/primisai/nexus/architect/NEXUS_DOCUMENTATION.md new file mode 100644 index 0000000..1d6ab70 --- /dev/null +++ b/primisai/nexus/architect/NEXUS_DOCUMENTATION.md @@ -0,0 +1,1351 @@ +# PrimisAI Nexus Framework - User Guide + +## 1. Introduction +PrimisAI Nexus is a powerful Python framework for building and managing hierarchical AI agent systems. It provides a flexible architecture for creating specialized AI agents that can work together under supervisor coordination, making it ideal for complex task automation and multi-agent workflows. 
+ +### Key Features +- Hierarchical agent management +- Flexible configuration system +- Built-in conversation history tracking +- Comprehensive logging system +- Tool integration capabilities +- Support for stateful and stateless agents +- YAML-based configuration + +## 2. Installation + +### Via pip +```bash +pip install primisai +``` + +### From source +```bash +git clone git@github.com:PrimisAI/nexus.git +cd nexus +pip install -e . +``` + +### Environment Setup +Create a `.env` file with your LLM configuration: +```env +LLM_MODEL=your-model-name +LLM_API_KEY=your-api-key +LLM_BASE_URL=your-api-base-url +``` + +## 3. Core Components + +### 3.1 AI Base Class +The foundation for all AI interactions in the framework. + +```python +def __init__(self, llm_config: Dict[str, str]): + """ + Initialize AI instance. + + Args: + llm_config (Dict[str, str]): Configuration containing: + - api_key: OpenAI API key + - model: Model name + - base_url: API base URL + """ + +def generate_response( + self, + messages: List[Dict[str, str]], + tools: Optional[List[Dict[str, Any]]] = None, + use_tools: bool = False +) -> ChatCompletion: + """ + Generate response using OpenAI's API. + + Args: + messages: Conversation history + tools: Optional tool configurations + use_tools: Whether to use tools + + Returns: + ChatCompletion: OpenAI API response + """ +``` + +```python +from primisai.nexus.core import AI + +llm_config = { + "api_key": "your-api-key", + "model": "your-model", + "base_url": "your-base-url" +} + +ai = AI(llm_config=llm_config) +``` + +### 3.2 Agent Class +Extends AI base class with specialized capabilities. 
+```python
+class Agent(AI):
+    """Specialized AI agent with tool usage capabilities."""
+
+    def __init__(
+        self,
+        name: str,
+        llm_config: Dict[str, str],
+        workflow_id: Optional[str] = None,
+        tools: Optional[List[Dict[str, Any]]] = None,
+        system_message: Optional[str] = None,
+        use_tools: bool = False,
+        keep_history: bool = True,
+        output_schema: Optional[Dict[str, Any]] = None,
+        strict: bool = False
+    ):
+        """
+        Initialize Agent.
+
+        Args:
+            name: Agent name
+            llm_config: LLM configuration
+            workflow_id: Optional workflow identifier
+            tools: List of available tools
+            system_message: Initial system message
+            use_tools: Enable tool usage
+            keep_history: Maintain chat history
+            output_schema (Optional[Dict[str, Any]]): Schema for agent's output format.
+            strict (bool): If True, always enforce output schema.
+        """
+
+    def chat(self, query: str, sender_name: Optional[str] = None) -> str:
+        """
+        Process a chat interaction.
+
+        Args:
+            query: User input
+            sender_name: Name of message sender
+
+        Returns:
+            str: Agent's response
+        """
+
+    def get_chat_history(self) -> List[Dict[str, str]]:
+        """
+        Get current chat history.
+
+        Returns:
+            List[Dict[str, str]]: Chat history
+        """
+```
+
+
+```python
+from primisai.nexus.core import Agent
+
+agent = Agent(
+    name="SpecializedAgent",
+    llm_config=llm_config,
+    system_message="Your specialized role description",
+    tools=optional_tools,
+    use_tools=True,
+    keep_history=True
+)
+```
+
+### 3.3 Supervisor Class
+Manages and coordinates multiple agents.
+
+```python
+class Supervisor(AI):
+    """Manages and coordinates multiple AI agents."""
+
+    def __init__(
+        self,
+        name: str,
+        llm_config: Dict[str, str],
+        workflow_id: Optional[str] = None,
+        is_assistant: bool = False,
+        system_message: Optional[str] = None
+    ):
+        """
+        Initialize Supervisor. 
+ + Args: + name: Supervisor name + llm_config: LLM configuration + workflow_id: Optional workflow identifier + is_assistant: Whether this is an assistant supervisor + system_message: Initial system message + """ + + def register_agent(self, agent: Union[Agent, 'Supervisor']) -> None: + """ + Register an agent or assistant supervisor. + + Args: + agent: Agent or assistant supervisor to register + """ + + def chat( + self, + query: str, + sender_name: Optional[str] = None, + supervisor_chain: Optional[List[str]] = None + ) -> str: + """ + Process user input and coordinate responses. + + Args: + query: User input + sender_name: Name of message sender + supervisor_chain: Chain of supervisors in delegation + + Returns: + str: Final response + """ + + def start_interactive_session(self) -> None: + """Start interactive chat session.""" + + def display_agent_graph(self, indent: str = "", skip_header: bool = False) -> None: + """ + Display hierarchical structure. + + Args: + indent: Current indentation level + skip_header: Whether to skip printing supervisor header + """ +``` + +```python +from primisai.nexus.core import Supervisor + +supervisor = Supervisor( + name="MainSupervisor", + llm_config=llm_config, + system_message="Your coordination instructions", + workflow_id=optional_workflow_id, + is_assistant=False +) +``` + +## 4. Basic Usage + +### 4.1 Creating a Simple Agent System +```python +from primisai.nexus.core import Agent, Supervisor + +# Create supervisor +supervisor = Supervisor( + name="MainSupervisor", + llm_config=llm_config, + system_message="You are the main coordinator." +) + +# Create agents +agent1 = Agent( + name="Agent1", + llm_config=llm_config, + system_message="You are specialized in task A." +) + +agent2 = Agent( + name="Agent2", + llm_config=llm_config, + system_message="You are specialized in task B." 
+) + +# Register agents +supervisor.register_agent(agent1) +supervisor.register_agent(agent2) + +# Start interaction +response = supervisor.chat("What can you help me with?") +``` + +### 4.2 Interactive Sessions +```python +# Start an interactive chat session +supervisor.start_interactive_session() +``` + +## 5. Advanced Features + +### 5.1 Hierarchical Structure +```python +# Create main supervisor +main_supervisor = Supervisor( + name="MainSupervisor", + llm_config=llm_config, + system_message="Main coordination instructions" +) + +# Create assistant supervisor +assistant_supervisor = Supervisor( + name="AssistantSupervisor", + llm_config=llm_config, + is_assistant=True, + system_message="Specialized domain coordination" +) + +# Create specialized agents +agent1 = Agent(name="Agent1", llm_config=llm_config) +agent2 = Agent(name="Agent2", llm_config=llm_config) + +# Build hierarchy +assistant_supervisor.register_agent(agent1) +assistant_supervisor.register_agent(agent2) +main_supervisor.register_agent(assistant_supervisor) + +# Display hierarchy +main_supervisor.display_agent_graph() +``` + +### 5.2 Stateful vs Stateless Agents +```python +# Stateful agent (maintains conversation history) +stateful_agent = Agent( + name="StatefulAgent", + llm_config=llm_config, + keep_history=True +) + +# Stateless agent (treats each interaction independently) +stateless_agent = Agent( + name="StatelessAgent", + llm_config=llm_config, + keep_history=False +) +``` + +## 6. 
Configuration Management + +### 6.1 YAML Configuration +```yaml +# config.yaml +supervisor: + name: MainSupervisor + type: supervisor + llm_config: + model: ${LLM_MODEL} + api_key: ${LLM_API_KEY} + base_url: ${LLM_BASE_URL} + system_message: "Main supervisor instructions" + children: + - name: Agent1 + type: agent + llm_config: + model: ${LLM_MODEL} + api_key: ${LLM_API_KEY} + base_url: ${LLM_BASE_URL} + system_message: "Agent1 instructions" + tools: + - name: tool_name + type: function + python_path: path.to.function + description: "Tool description" +``` + +### 6.2 Loading Configuration +```python +from primisai.nexus.config import load_yaml_config, AgentFactory + +config = load_yaml_config('path/to/config.yaml') +factory = AgentFactory() +supervisor = factory.create_from_config(config) +``` + +## 7. Tools Integration + +### 7.1 Creating Tools +```python +# Define tool function +def calculator(num1: int, num2: int, operation: str) -> float: + if operation == "add": return num1 + num2 + elif operation == "multiply": return num1 * num2 + raise ValueError("Unsupported operation") + +# Define tool metadata +calculator_metadata = { + "type": "function", + "function": { + "name": "calculate", + "description": "Perform calculations", + "parameters": { + "type": "object", + "properties": { + "num1": {"type": "integer"}, + "num2": {"type": "integer"}, + "operation": { + "type": "string", + "enum": ["add", "multiply"] + } + }, + "required": ["num1", "num2", "operation"] + } + } +} + +# Create tool-enabled agent +calculator_agent = Agent( + name="CalculatorAgent", + llm_config=llm_config, + tools=[{"tool": calculator, "metadata": calculator_metadata}], + use_tools=True +) +``` + +## 8. Real-World Examples + +### 8.1 Multiple Tool Integration +This example demonstrates how to create an agent with multiple tools and a supervisor to manage complex tasks. 
+ +```python +from primisai.nexus.core import Agent, Supervisor + +# Tool 1: Calculator +calculator_metadata = { + "type": "function", + "function": { + "name": "calculate", + "description": "Perform basic arithmetic operations.", + "parameters": { + "type": "object", + "properties": { + "num1": {"type": "integer", "description": "First number"}, + "num2": {"type": "integer", "description": "Second number"}, + "operation": { + "type": "string", + "description": "Mathematical operation to perform", + "enum": ["add", "subtract", "multiply", "divide"] + } + }, + "required": ["num1", "num2", "operation"] + } + } +} + +def calculator(num1: int, num2: int, operation: str) -> float: + if operation == "add": return num1 + num2 + elif operation == "subtract": return num1 - num2 + elif operation == "multiply": return num1 * num2 + elif operation == "divide": return num1 / num2 + else: raise ValueError("Invalid operation") + +# Tool 2: Text Processor +text_processor_metadata = { + "type": "function", + "function": { + "name": "process_text", + "description": "Process text with various operations.", + "parameters": { + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to process"}, + "operation": { + "type": "string", + "description": "Text operation to perform", + "enum": ["uppercase", "lowercase", "reverse"] + } + }, + "required": ["text", "operation"] + } + } +} + +def text_processor(text: str, operation: str) -> str: + if operation == "uppercase": return text.upper() + elif operation == "lowercase": return text.lower() + elif operation == "reverse": return text[::-1] + else: raise ValueError("Invalid operation") + +# Create agents with their respective tools +calculator_agent = Agent( + name="Calculator_Agent", + llm_config=llm_config, + system_message="You are a mathematical calculator agent.", + tools=[{"tool": calculator, "metadata": calculator_metadata}], + use_tools=True +) + +text_agent = Agent( + name="Text_Processor_Agent", + 
llm_config=llm_config, + system_message="You are a text processing agent.", + tools=[{"tool": text_processor, "metadata": text_processor_metadata}], + use_tools=True +) + +# Create and set up supervisor +supervisor = Supervisor( + name="Multi_Tool_Supervisor", + llm_config=llm_config, + system_message="You are a helpful supervisor who can assign tasks to agents." +) + +supervisor.register_agent(calculator_agent) +supervisor.register_agent(text_agent) + +# Example usage +query = """I need three things done: +1. Add 15 and 27 +2. Convert 'Hello World' to uppercase +3. Multiply 8 by 6""" + +response = supervisor.chat(query) +print(f"Response: {response}") +``` + +### 8.2 Context-Aware Task Management +This example shows how to implement a task management system with context awareness using YAML configuration. + +```yaml +# config.yaml +supervisor: + name: ContextTaskManager + type: supervisor + llm_config: + model: ${LLM_MODEL} + api_key: ${LLM_API_KEY} + base_url: ${LLM_BASE_URL} + system_message: "You are the context-aware task management supervisor." + children: + - name: StatefulTaskManager + type: agent + llm_config: + model: ${LLM_MODEL} + api_key: ${LLM_API_KEY} + base_url: ${LLM_BASE_URL} + system_message: "You are a stateful task manager that remembers context." 
+ keep_history: true + tools: + - name: add_task_with_context + type: function + python_path: task_tools.add_task_with_context + description: "Add a task with context information" + parameters: + task: + type: string + description: "The task to be added" + context: + type: string + description: "The context/category for the task" +``` + +```python +# task_tools.py +context_tasks = {} + +def add_task_with_context(task: str, context: str) -> str: + if context not in context_tasks: + context_tasks[context] = [] + context_tasks[context].append(task) + return f"Task added to {context}: {task}" + +def list_context_tasks(context: str) -> str: + if context not in context_tasks: + return f"No tasks found in context: {context}" + tasks = context_tasks[context] + return "\n".join(f"{i+1}. {task}" for i, task in enumerate(tasks)) + +def get_contexts() -> str: + if not context_tasks: + return "No contexts available" + return "\n".join(f"- {context} ({len(tasks)} tasks)" + for context, tasks in context_tasks.items()) + +# main.py +from primisai.nexus.config import load_yaml_config, AgentFactory + +config = load_yaml_config('config.yaml') +factory = AgentFactory() +context_manager = factory.create_from_config(config) + +# Example usage +context_manager.chat("Add a task 'Update documentation' to the 'Development' context") +context_manager.chat("What tasks are in the Development context?") +``` + +### 8.3 Hierarchical Agent Structure with History Tracking +This example demonstrates how to create a complex hierarchical structure with history tracking. + +```python +from primisai.nexus.core import Agent, Supervisor + +# Create specialized agents +math_agent = Agent( + name="MathAgent", + llm_config=llm_config, + system_message="You are a mathematical computation specialist." +) + +writing_agent = Agent( + name="WritingAgent", + llm_config=llm_config, + system_message="You are a creative writing specialist." 
+) + +# Create assistant supervisor +domain_supervisor = Supervisor( + name="DomainSupervisor", + llm_config=llm_config, + is_assistant=True, + system_message="You manage specialized domain tasks." +) + +# Create main supervisor +main_supervisor = Supervisor( + name="MainSupervisor", + llm_config=llm_config, + system_message="You coordinate all operations." +) + +# Build hierarchy +domain_supervisor.register_agent(math_agent) +domain_supervisor.register_agent(writing_agent) +main_supervisor.register_agent(domain_supervisor) + +# Display hierarchy +main_supervisor.display_agent_graph() + +# Example complex interaction +query = """I need two things: +1. Calculate 25 * 4 +2. Write a haiku about mathematics""" + +response = main_supervisor.chat(query) + +# View conversation history +print("\nMain Supervisor History:") +for msg in main_supervisor.get_chat_history(): + print(f"Role: {msg['role']}") + print(f"Content: {msg['content']}\n") + +print("\nMath Agent History:") +for msg in math_agent.get_chat_history(): + print(f"Role: {msg['role']}") + print(f"Content: {msg['content']}\n") +``` + +### 8.4 Terminal Command Execution Agent +This example shows how to create an agent that can execute terminal commands with proper error handling. 
+ +```python +import subprocess +from primisai.nexus.core import Agent + +# Tool definition +def execute_command(argument: str): + try: + result = subprocess.run( + argument, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + output = result.stdout + result.stderr + if result.returncode == 0: + return {"status": "success", "output": output.strip()} + else: + return {"status": "error", "output": output.strip()} + except Exception as e: + return {"status": "error", "output": str(e)} + +command_tool_metadata = { + "type": "function", + "function": { + "name": "execute_command", + "description": "Execute a command on the terminal", + "parameters": { + "type": "object", + "properties": { + "argument": { + "type": "string", + "description": "Command to execute" + } + }, + "required": ["argument"] + } + } +} + +# Create command execution agent +terminal_agent = Agent( + name="TerminalAgent", + llm_config=llm_config, + system_message="You can execute terminal commands safely.", + tools=[{"tool": execute_command, "metadata": command_tool_metadata}], + use_tools=True +) + +# Example usage +response = terminal_agent.chat("List files in the current directory") +print(response) +``` + +### 8.5 Stateful vs Stateless Agents Comparison +This example demonstrates the difference between agents that maintain conversation history and those that don't. + +```python +from primisai.nexus.core import Agent + +# Initialize two agents with different history settings +stateful_agent = Agent( + name="StatefulAgent", + llm_config=llm_config, + system_message="You are a helpful assistant.", + keep_history=True # default behavior +) + +stateless_agent = Agent( + name="StatelessAgent", + llm_config=llm_config, + system_message="You are a helpful assistant.", + keep_history=False +) + +# Test queries that reference previous interactions +queries = [ + "What is the capital of France?", + "What is the population of this city?", + "Is this city bigger than London?" 
+] + +print("\nTesting Stateful Agent (remembers context):") +print("=======================================") +for query in queries: + response = stateful_agent.chat(query) + print(f"\nQuery: {query}") + print(f"Response: {response}") + +print("\nTesting Stateless Agent (no context memory):") +print("=========================================") +for query in queries: + response = stateless_agent.chat(query) + print(f"\nQuery: {query}") + print(f"Response: {response}") +``` + +### 8.6 Multi-Argument Function Calling +This example shows how to create and use tools with multiple arguments. + +```python +from primisai.nexus.core import Agent + +# Define a complex tool with multiple arguments +complex_calculator_metadata = { + "type": "function", + "function": { + "name": "complex_calculate", + "description": "Perform mathematical operations with multiple numbers.", + "parameters": { + "type": "object", + "properties": { + "num1": { + "type": "integer", + "description": "First number" + }, + "num2": { + "type": "integer", + "description": "Second number" + }, + "num3": { + "type": "integer", + "description": "Third number" + }, + "operation": { + "type": "string", + "description": "Operation to perform", + "enum": ["sum", "product", "average"] + } + }, + "required": ["num1", "num2", "num3", "operation"] + } + } +} + +def complex_calculate(num1: int, num2: int, num3: int, operation: str) -> float: + if operation == "sum": + return num1 + num2 + num3 + elif operation == "product": + return num1 * num2 * num3 + elif operation == "average": + return (num1 + num2 + num3) / 3 + raise ValueError(f"Unsupported operation: {operation}") + +# Create agent with the complex calculator tool +math_agent = Agent( + name="ComplexMathAgent", + llm_config=llm_config, + system_message="You are a mathematical agent capable of complex calculations.", + tools=[{"tool": complex_calculate, "metadata": complex_calculator_metadata}], + use_tools=True +) + +# Example usage +query = "Calculate the 
average of numbers 10, 15, and 20" +response = math_agent.chat(query) +print(f"Query: {query}") +print(f"Response: {response}") +``` + +### 8.7 Multi-Tool Agent +This example demonstrates how to create a single agent with multiple diverse tools. + +```python +from primisai.nexus.core import Agent + +# Tool 1: Text Analysis +text_analysis_metadata = { + "type": "function", + "function": { + "name": "analyze_text", + "description": "Analyze text properties", + "parameters": { + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to analyze"}, + "analysis_type": { + "type": "string", + "enum": ["word_count", "char_count", "sentence_count"] + } + }, + "required": ["text", "analysis_type"] + } + } +} + +def analyze_text(text: str, analysis_type: str) -> int: + if analysis_type == "word_count": + return len(text.split()) + elif analysis_type == "char_count": + return len(text) + elif analysis_type == "sentence_count": + return len([s for s in text.split('.') if s.strip()]) + raise ValueError("Invalid analysis type") + +# Tool 2: Data Conversion +data_conversion_metadata = { + "type": "function", + "function": { + "name": "convert_units", + "description": "Convert between different units", + "parameters": { + "type": "object", + "properties": { + "value": {"type": "number", "description": "Value to convert"}, + "from_unit": { + "type": "string", + "enum": ["km", "miles", "kg", "lbs"] + }, + "to_unit": { + "type": "string", + "enum": ["km", "miles", "kg", "lbs"] + } + }, + "required": ["value", "from_unit", "to_unit"] + } + } +} + +def convert_units(value: float, from_unit: str, to_unit: str) -> float: + conversions = { + ("km", "miles"): lambda x: x * 0.621371, + ("miles", "km"): lambda x: x * 1.60934, + ("kg", "lbs"): lambda x: x * 2.20462, + ("lbs", "kg"): lambda x: x * 0.453592 + } + + if (from_unit, to_unit) in conversions: + return conversions[(from_unit, to_unit)](value) + raise ValueError("Unsupported conversion") + +# Tool 3: Date 
Operations +date_operations_metadata = { + "type": "function", + "function": { + "name": "date_operation", + "description": "Perform date calculations", + "parameters": { + "type": "object", + "properties": { + "date": { + "type": "string", + "description": "Date in YYYY-MM-DD format" + }, + "operation": { + "type": "string", + "enum": ["add_days", "subtract_days"] + }, + "days": { + "type": "integer", + "description": "Number of days" + } + }, + "required": ["date", "operation", "days"] + } + } +} + +from datetime import datetime, timedelta + +def date_operation(date: str, operation: str, days: int) -> str: + date_obj = datetime.strptime(date, "%Y-%m-%d") + if operation == "add_days": + result = date_obj + timedelta(days=days) + elif operation == "subtract_days": + result = date_obj - timedelta(days=days) + else: + raise ValueError("Invalid operation") + return result.strftime("%Y-%m-%d") + +# Create multi-tool agent +multi_tool_agent = Agent( + name="MultiToolAgent", + llm_config=llm_config, + system_message="""You are a versatile agent capable of: + 1. Text analysis + 2. Unit conversions + 3. Date calculations + Use the appropriate tool based on the user's request.""", + tools=[ + {"tool": analyze_text, "metadata": text_analysis_metadata}, + {"tool": convert_units, "metadata": data_conversion_metadata}, + {"tool": date_operation, "metadata": date_operations_metadata} + ], + use_tools=True +) + +# Example usage +queries = [ + "How many words are in the text 'The quick brown fox jumps over the lazy dog'?", + "Convert 100 kilometers to miles", + "What date is 30 days from 2024-03-15?" +] + +for query in queries: + print(f"\nQuery: {query}") + response = multi_tool_agent.chat(query) + print(f"Response: {response}") +``` + +### 8.8 Complex Tool Implementation with Error Handling and Validation +This example shows a more sophisticated tool implementation with proper error handling and input validation. 
+ +```python +from typing import Dict, Any, Optional, Union +import json +from datetime import datetime +import re + +# Complex tool for data processing +data_processor_metadata = { + "type": "function", + "function": { + "name": "process_data", + "description": "Process and validate various types of data", + "parameters": { + "type": "object", + "properties": { + "data": { + "type": "string", + "description": "Data to process (JSON string)" + }, + "data_type": { + "type": "string", + "enum": ["email", "phone", "date", "address"], + "description": "Type of data to validate" + }, + "format_output": { + "type": "boolean", + "description": "Whether to format the output" + } + }, + "required": ["data", "data_type"] + } + } +} + +class DataValidationError(Exception): + """Custom exception for data validation errors.""" + pass + +def validate_email(email: str) -> bool: + pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' + return bool(re.match(pattern, email)) + +def validate_phone(phone: str) -> bool: + pattern = r'^\+?1?\d{9,15}$' + return bool(re.match(pattern, phone)) + +def validate_date(date_str: str) -> bool: + try: + datetime.strptime(date_str, "%Y-%m-%d") + return True + except ValueError: + return False + +def format_address(address: Dict[str, str]) -> str: + return f"{address.get('street', '')}, {address.get('city', '')}, {address.get('state', '')} {address.get('zip', '')}" + +def process_data(data: str, data_type: str, format_output: Optional[bool] = False) -> Dict[str, Any]: + try: + # Parse input data + try: + parsed_data = json.loads(data) + except json.JSONDecodeError: + parsed_data = data + + result = { + "valid": False, + "formatted_data": None, + "error": None + } + + # Process based on data type + if data_type == "email": + result["valid"] = validate_email(parsed_data) + if result["valid"] and format_output: + result["formatted_data"] = parsed_data.lower() + + elif data_type == "phone": + result["valid"] = validate_phone(parsed_data) + if result["valid"] and 
format_output: + # Format phone number: +1-XXX-XXX-XXXX + digits = re.sub(r'\D', '', parsed_data) + if len(digits) == 10: + result["formatted_data"] = f"+1-{digits[:3]}-{digits[3:6]}-{digits[6:]}" + else: + result["formatted_data"] = parsed_data + + elif data_type == "date": + result["valid"] = validate_date(parsed_data) + if result["valid"] and format_output: + date_obj = datetime.strptime(parsed_data, "%Y-%m-%d") + result["formatted_data"] = date_obj.strftime("%B %d, %Y") + + elif data_type == "address": + if isinstance(parsed_data, dict) and all(k in parsed_data for k in ["street", "city", "state", "zip"]): + result["valid"] = True + if format_output: + result["formatted_data"] = format_address(parsed_data) + else: + raise DataValidationError("Invalid address format") + + return result + + except DataValidationError as e: + return {"valid": False, "error": str(e)} + except Exception as e: + return {"valid": False, "error": f"Processing error: {str(e)}"} + +# Create agent with complex data processing tool +data_processing_agent = Agent( + name="DataProcessor", + llm_config=llm_config, + system_message="You are a data processing specialist with advanced validation capabilities.", + tools=[{"tool": process_data, "metadata": data_processor_metadata}], + use_tools=True +) + +# Example usage +test_queries = [ + 'Validate and format the email "user@example.com"', + 'Validate and format the phone number "1234567890"', + 'Validate and format the date "2024-03-15"', + '''Validate and format the address { + "street": "123 Main St", + "city": "Springfield", + "state": "IL", + "zip": "62701" + }''' +] + +for query in test_queries: + print(f"\nQuery: {query}") + response = data_processing_agent.chat(query) + print(f"Response: {response}") +``` + +### 8.9 Schema-Aware Agents +This example demonstrates how to create agents with structured outputs using output schemas and validation. 
+ +```python +from primisai.nexus.core import Agent, Supervisor + +# Example 1: Code Generation Agent with Schema +code_agent = Agent( + name="CodeWriter", + llm_config=llm_config, + system_message="You are a skilled programmer who writes clean, documented code.", + output_schema='''{ + "type": "object", + "properties": { + "description": { + "type": "string", + "description": "Explanation of the code's purpose" + }, + "code": { + "type": "string", + "description": "The actual code implementation" + }, + "language": { + "type": "string", + "description": "Programming language used" + } + }, + "required": ["description", "code"] + }''', + strict=True # Always enforce schema +) + +# Example 2: Analysis Agent with Non-Strict Schema +analysis_agent = Agent( + name="DataAnalyst", + llm_config=llm_config, + system_message="You analyze data and provide structured insights.", + output_schema='''{ + "type": "object", + "properties": { + "summary": { + "type": "string", + "description": "Brief analysis summary" + }, + "key_points": { + "type": "array", + "items": {"type": "string"}, + "description": "List of key findings" + }, + "recommendations": { + "type": "array", + "items": {"type": "string"}, + "description": "Suggested actions" + } + }, + "required": ["summary", "key_points"] + }''', + strict=False # Allow fallback to unstructured responses +) + +# Example 3: Content Creation Agent with Metadata Schema +content_agent = Agent( + name="ContentCreator", + llm_config=llm_config, + system_message="You create structured content with metadata.", + output_schema='''{ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "Content title" + }, + "content": { + "type": "string", + "description": "Main content body" + }, + "metadata": { + "type": "object", + "properties": { + "category": {"type": "string"}, + "tags": { + "type": "array", + "items": {"type": "string"} + }, + "readingTime": {"type": "string"} + } + } + }, + "required": ["title", 
"content"] + }''', + strict=True +) + +# Create a supervisor to manage schema-aware agents +supervisor = Supervisor( + name="ContentSupervisor", + llm_config=llm_config, + system_message="You coordinate between different content creation and analysis tasks." +) + +supervisor.register_agent(code_agent) +supervisor.register_agent(analysis_agent) +supervisor.register_agent(content_agent) +``` + +### 8.10 Task Management System with Structured Outputs +This example shows how to build a task management system where each agent produces structured outputs for reliable downstream processing. + +```python +from primisai.nexus.core import Agent, Supervisor + +# Task Creation Agent with Schema +task_creator = Agent( + name="TaskCreator", + llm_config=llm_config, + system_message="You create well-structured task definitions.", + output_schema='''{ + "type": "object", + "properties": { + "task_id": { + "type": "string", + "description": "Unique identifier for the task" + }, + "title": { + "type": "string", + "description": "Task title" + }, + "description": { + "type": "string", + "description": "Detailed task description" + }, + "requirements": { + "type": "array", + "items": {"type": "string"}, + "description": "List of requirements" + }, + "estimated_hours": { + "type": "number", + "description": "Estimated hours to complete" + }, + "priority": { + "type": "string", + "enum": ["low", "medium", "high", "urgent"] + } + }, + "required": ["task_id", "title", "description", "priority"] + }''', + strict=True +) + +# Task Analyzer with Complex Schema +task_analyzer = Agent( + name="TaskAnalyzer", + llm_config=llm_config, + system_message="You analyze tasks and provide structured assessments.", + output_schema='''{ + "type": "object", + "properties": { + "analysis": { + "type": "object", + "properties": { + "complexity_score": { + "type": "integer", + "description": "Task complexity (1-10)" + }, + "risk_factors": { + "type": "array", + "items": { + "type": "object", + "properties": 
{ + "factor": {"type": "string"}, + "severity": {"type": "string", "enum": ["low", "medium", "high"]}, + "mitigation": {"type": "string"} + } + } + }, + "dependencies": { + "type": "array", + "items": {"type": "string"} + } + } + }, + "recommendations": { + "type": "object", + "properties": { + "team_size": {"type": "integer"}, + "skill_requirements": { + "type": "array", + "items": {"type": "string"} + }, + "suggested_timeline": { + "type": "object", + "properties": { + "start_date": {"type": "string"}, + "end_date": {"type": "string"}, + "milestones": { + "type": "array", + "items": { + "type": "object", + "properties": { + "description": {"type": "string"}, + "date": {"type": "string"} + } + } + } + } + } + } + } + }, + "required": ["analysis", "recommendations"] + }''', + strict=True +) + +# Progress Reporter with Metrics Schema +progress_reporter = Agent( + name="ProgressReporter", + llm_config=llm_config, + system_message="You generate structured progress reports with metrics.", + output_schema='''{ + "type": "object", + "properties": { + "project_metrics": { + "type": "object", + "properties": { + "completion_percentage": {"type": "number"}, + "hours_logged": {"type": "number"}, + "tasks_completed": {"type": "integer"}, + "tasks_remaining": {"type": "integer"} + } + }, + "milestone_status": { + "type": "array", + "items": { + "type": "object", + "properties": { + "milestone": {"type": "string"}, + "status": {"type": "string", "enum": ["pending", "in_progress", "completed", "delayed"]}, + "actual_vs_planned": {"type": "string"} + } + } + }, + "key_achievements": { + "type": "array", + "items": {"type": "string"} + }, + "blockers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "description": {"type": "string"}, + "impact": {"type": "string"}, + "resolution_plan": {"type": "string"} + } + } + }, + "next_steps": { + "type": "array", + "items": {"type": "string"} + } + }, + "required": ["project_metrics", "milestone_status", 
"next_steps"] + }''', + strict=True +) + +# Create Project Manager Supervisor +project_manager = Supervisor( + name="ProjectManager", + llm_config=llm_config, + system_message="""You are a project manager coordinating task creation, analysis, and reporting. + Ensure all outputs maintain their structured format for reliable project tracking.""" +) + +# Register agents +project_manager.register_agent(task_creator) +project_manager.register_agent(task_analyzer) +project_manager.register_agent(progress_reporter) + +``` \ No newline at end of file diff --git a/primisai/nexus/architect/__init__.py b/primisai/nexus/architect/__init__.py new file mode 100644 index 0000000..a1ee720 --- /dev/null +++ b/primisai/nexus/architect/__init__.py @@ -0,0 +1,15 @@ +from .expander import WorkflowExpander +from .structurer import WorkflowStructurer +from .builder import WorkflowBuilder +from .prompter import Prompter +from .evaluator import Evaluator +from .schemas import * +from .prompts import * + +__all__ = [ + "WorkflowExpander", + "WorkflowStructurer", + "WorkflowBuilder", + "Prompter", + "Evaluator", +] diff --git a/primisai/nexus/architect/builder.py b/primisai/nexus/architect/builder.py new file mode 100644 index 0000000..7ad9e6a --- /dev/null +++ b/primisai/nexus/architect/builder.py @@ -0,0 +1,537 @@ +from primisai.nexus.core import Agent, Supervisor +from typing import Dict, Any +from primisai.nexus.architect.schemas import Tool, AgentDefinition, SupervisorDefinition, WorkflowDefinition + + +class ValidationError(Exception): + """Custom exception for validation errors""" + pass + + +class ToolBuilder: + + def __init__(self, tool_definition: Tool): + self.definition = tool_definition + + def build(self) -> Dict[str, Any]: + """Convert Tool definition to Nexus tool format""" + try: + # Create the tool function from implementation string + namespace = {} + exec(self.definition.implementation, namespace) + tool_func = namespace[self.definition.metadata.function.name] + + # 
Construct the metadata in the correct format + metadata = { + "type": self.definition.metadata.type, + "function": { + "name": self.definition.metadata.function.name, + "description": self.definition.metadata.function.description, + "parameters": { + "type": self.definition.metadata.function.parameters.type, + "properties": { + prop.argument: { + "type": prop.type, + "description": prop.description + } for prop in self.definition.metadata.function.parameters.properties + }, + "required": self.definition.metadata.function.parameters.required + } + } + } + + return {"tool": tool_func, "metadata": metadata} + + except Exception as e: + print(f"Error in tool building: {str(e)}") + raise + + def validate(self) -> bool: + """Run validation tests based on constraints""" + try: + # Validate function implementation + namespace = {} + exec(self.definition.implementation, namespace) + tool_func = namespace[self.definition.metadata.function.name] + + # Validate function signature matches parameters + import inspect + sig = inspect.signature(tool_func) + param_names = set(sig.parameters.keys()) + required_params = set(p.argument for p in self.definition.metadata.function.parameters.properties) + + if param_names != required_params: + print(f"Parameter mismatch: function has {param_names}, metadata requires {required_params}") + return False + + return True + + except Exception as e: + print(f"Tool validation failed: {str(e)}") + return False + + +class AgentBuilder: + + def __init__(self, agent_definition: AgentDefinition, llm_config: Dict[str, str]): + self.definition = agent_definition + self.llm_config = llm_config + + def validate(self) -> bool: + """ + Validate agent definition meets all requirements. + + Validates: + 1. Basic requirements (name, system message) + 2. Tool configuration + 3. History management settings + 4. Output schema configuration # New validation + """ + try: + # 1. Basic validation + self._validate_basic_requirements() + + # 2. 
Tool validation + self._validate_tool_configuration() + + # 3. History management validation + self._validate_history_management() + + # 4. Output schema validation # New method + self._validate_output_schema() + + return True + except ValidationError as e: + print(f"Agent validation failed: {str(e)}") + return False + except Exception as e: + print(f"Unexpected error in agent validation: {str(e)}") + return False + + def _validate_basic_requirements(self): + """Validate basic agent requirements""" + if not self.definition.name or not self.definition.name.strip(): + raise ValidationError("Agent name cannot be empty") + + if not self.definition.system_message or not self.definition.system_message.strip(): + raise ValidationError("System message cannot be empty") + + def _validate_tool_configuration(self): + """Validate tool configuration consistency""" + if self.definition.use_tools and not self.definition.tools: + raise ValidationError("Agent is configured to use tools but no tools provided") + + if not self.definition.use_tools and self.definition.tools: + raise ValidationError("Tools provided but agent not configured to use them") + + # Validate each tool + if self.definition.tools: + tool_names = set() + for tool in self.definition.tools: + # Check for duplicate tool names + tool_name = tool.metadata.function.name + if tool_name in tool_names: + raise ValidationError(f"Duplicate tool name found: {tool_name}") + tool_names.add(tool_name) + + def _validate_history_management(self): + """Validate history management settings""" + if self.definition.keep_history is None: + raise ValidationError("History management setting must be specified") + + def _validate_output_schema(self): + """Validate output schema configuration if provided""" + if self.definition.output_schema: + try: + # Try to parse the schema string as JSON + import json + schema = json.loads(self.definition.output_schema) + + # Basic schema validation + if not isinstance(schema, dict): + raise 
ValidationError("Output schema must be a valid JSON object") + + if "type" not in schema: + raise ValidationError("Output schema must have 'type' field") + + if schema["type"] != "object": + raise ValidationError("Output schema type must be 'object'") + + if "properties" not in schema: + raise ValidationError("Output schema must have 'properties' field") + + except json.JSONDecodeError: + raise ValidationError("Output schema must be valid JSON") + + def build(self) -> Agent: + """Build agent if validation passes""" + if not self.validate(): + raise ValidationError(f"Validation failed for agent {self.definition.name}") + + # Build tools first + tools = [] + if self.definition.tools: + for tool_def in self.definition.tools: + tool_builder = ToolBuilder(tool_def) + if tool_builder.validate(): + tools.append(tool_builder.build()) + else: + raise ValidationError(f"Tool validation failed for {tool_def.metadata.function.name}") + + # Parse output schema if provided + output_schema = None + if self.definition.output_schema: + try: + import json + output_schema = json.loads(self.definition.output_schema) + except json.JSONDecodeError as e: + raise ValidationError(f"Invalid output schema JSON: {str(e)}") + + return Agent(name=self.definition.name, + system_message=self.definition.system_message, + llm_config=self.llm_config, + tools=tools if tools else None, + use_tools=self.definition.use_tools, + keep_history=self.definition.keep_history, + output_schema=output_schema, + strict=self.definition.strict) + + +class SupervisorBuilder: + + def __init__(self, supervisor_definition: SupervisorDefinition, llm_config: Dict[str, str]): + self.definition = supervisor_definition + self.llm_config = llm_config + + def validate(self) -> bool: + """ + Validate supervisor definition meets all requirements. + + Validates: + 1. Basic requirements (name, system message) + 2. Management structure + 3. Assistant supervisor constraints + """ + try: + # 1. 
Basic validation + self._validate_basic_requirements() + + # 2. Management structure validation + self._validate_management_structure() + + # 3. Assistant supervisor constraints + self._validate_assistant_constraints() + + return True + + except ValidationError as e: + print(f"Supervisor validation failed: {str(e)}") + return False + except Exception as e: + print(f"Unexpected error in supervisor validation: {str(e)}") + return False + + def _validate_basic_requirements(self): + """Validate basic supervisor requirements""" + if not self.definition.name or not self.definition.name.strip(): + raise ValidationError("Supervisor name cannot be empty") + + if not self.definition.system_message or not self.definition.system_message.strip(): + raise ValidationError("System message cannot be empty") + + def _validate_management_structure(self): + """Validate management structure is consistent""" + # Check for duplicate entries + managed_components = (self.definition.managed_agents + self.definition.managed_assistant_supervisors) + if len(set(managed_components)) != len(managed_components): + raise ValidationError("Duplicate component names in management structure") + + def _validate_assistant_constraints(self): + """Validate assistant supervisor specific constraints""" + if self.definition.is_assistant: + # Assistant supervisors shouldn't manage other assistant supervisors + if self.definition.managed_assistant_supervisors: + raise ValidationError("Assistant supervisors cannot manage other assistant supervisors") + + def build(self, id_) -> Supervisor: + """Build supervisor if validation passes""" + if not self.validate(): + raise ValidationError(f"Validation failed for supervisor {self.definition.name}") + + return Supervisor(name=self.definition.name, + system_message=self.definition.system_message, + llm_config=self.llm_config, + is_assistant=self.definition.is_assistant, + workflow_id=id_) + + +class WorkflowBuilder: + + def __init__(self, + agents_system_messages, + 
workflow_definition: WorkflowDefinition, + llm_config: Dict[str, str], + workflow_id: str = "000"): + self.definition = workflow_definition + self.llm_config = llm_config + self.components = {} # Store built components + if agents_system_messages: + + self._update_system_messages(agents_system_messages) + self.workflow_id = workflow_id + + def _update_system_messages(self, agent_messages: Dict[str, str]): + """Update system messages in the workflow definition""" + # Update main supervisor + if self.definition.main_supervisor.name in agent_messages: + self.definition.main_supervisor.system_message = agent_messages[self.definition.main_supervisor.name] + + # Update assistant supervisors + for supervisor in self.definition.assistant_supervisors: + if supervisor.name in agent_messages: + supervisor.system_message = agent_messages[supervisor.name] + + # Update agents + for agent in self.definition.agents: + if agent.name in agent_messages: + agent.system_message = agent_messages[agent.name] + + def build_and_validate(self) -> Supervisor: + """Build and validate all components, then assemble the workflow""" + # 1. Build and validate main supervisor + main_sup_builder = SupervisorBuilder(self.definition.main_supervisor, self.llm_config) + if main_sup_builder.validate(): + main_supervisor = main_sup_builder.build(self.workflow_id) + self.components['main_supervisor'] = main_supervisor + + # 2. Build and validate assistant supervisors + for asst_sup_def in self.definition.assistant_supervisors: + asst_sup_builder = SupervisorBuilder(asst_sup_def, self.llm_config) + if asst_sup_builder.validate(): + asst_supervisor = asst_sup_builder.build(self.workflow_id) + self.components[asst_sup_def.name] = asst_supervisor + + # 3. Build and validate agents + for agent_def in self.definition.agents: + agent_builder = AgentBuilder(agent_def, self.llm_config) + if agent_builder.validate(): + agent = agent_builder.build() + self.components[agent_def.name] = agent + + # 4. 
Connect components based on management structure + self._connect_components() + + return self.components['main_supervisor'] + + def _connect_components(self): + """ + Connect all components based on management structure. + Ensures agents are only registered with their designated supervisors. + """ + # 1. First, connect assistant supervisors with their agents + for asst_sup_def in self.definition.assistant_supervisors: + asst_sup = self.components[asst_sup_def.name] + for agent_name in asst_sup_def.managed_agents: + if agent_name not in self.components: + raise ValueError(f"Agent {agent_name} not found for assistant supervisor {asst_sup_def.name}") + asst_sup.register_agent(self.components[agent_name]) + + # 2. Then, connect components to main supervisor + main_sup = self.components['main_supervisor'] + + # a. Connect direct agents (only those specifically managed by main supervisor) + for agent_name in self.definition.main_supervisor.managed_agents: + if agent_name not in self.components: + raise ValueError(f"Agent {agent_name} not found for main supervisor") + main_sup.register_agent(self.components[agent_name]) + + # b. Connect assistant supervisors to main supervisor + for asst_sup_name in self.definition.main_supervisor.managed_assistant_supervisors: + if asst_sup_name not in self.components: + raise ValueError(f"Assistant supervisor {asst_sup_name} not found") + main_sup.register_agent(self.components[asst_sup_name]) + + def save_workflow_to_file(self, output_path: str) -> None: + """ + Save the workflow implementation to a Python file. 
+ + Args: + output_path (str): Path where the Python file should be saved + """ + try: + with open(output_path, 'w') as file: + # Write imports + file.write('''import os +import json # Added for output schema parsing +from dotenv import load_dotenv +from primisai.nexus.core import Agent, Supervisor + +# Load environment variables +load_dotenv() + +# LLM Configuration +llm_config = { + 'model': os.getenv('LLM_MODEL'), + 'api_key': os.getenv('LLM_API_KEY'), + 'base_url': os.getenv('LLM_BASE_URL') +} + ''') + + # Write tool functions and their metadata + file.write("\n# Tool Definitions\n") + tool_definitions = self._generate_tool_definitions() + file.write(tool_definitions) + + # Write agent creation + file.write("\n# Agent Definitions\n") + agent_definitions = self._generate_agent_definitions() + file.write(agent_definitions) + + # Write assistant supervisor creation + file.write("\n# Assistant Supervisor Definitions\n") + asst_sup_definitions = self._generate_assistant_supervisor_definitions() + file.write(asst_sup_definitions) + + # Write main supervisor creation and component registration + file.write("\n# Main Supervisor Definition and Component Registration\n") + main_sup_definition = self._generate_main_supervisor_definition() + file.write(main_sup_definition) + + # Write main execution block + file.write(''' +if __name__ == "__main__": + # Display the workflow structure + print("\\nGenerated Workflow Structure:") + main_supervisor.display_agent_graph() + + # Start interactive session + print("\\nStarting interactive session. 
Type 'exit' to end.") + while True: + user_input = input("\\nUser: ").strip() + if user_input.lower() == "exit": + break + + try: + response = main_supervisor.chat(user_input) + print(f"Response: {response}") + except Exception as e: + print(f"Error: {str(e)}") + ''') + + print(f"Workflow implementation saved to {output_path}") + + except Exception as e: + raise Exception(f"Error saving workflow to file: {str(e)}") + + def _generate_tool_definitions(self) -> str: + """Generate code for tool definitions.""" + tool_code = [] + tool_vars = [] # To keep track of tool variable names + + for agent_def in self.definition.agents: + if agent_def.tools: + for tool in agent_def.tools: + # Add tool function implementation + tool_code.append(tool.implementation) + + # Add tool metadata + tool_name = tool.metadata.function.name + metadata_var = f"{tool_name}_metadata" + tool_vars.append((tool_name, metadata_var)) + + metadata_code = f''' +{metadata_var} = {{ + "type": "{tool.metadata.type}", + "function": {{ + "name": "{tool.metadata.function.name}", + "description": "{tool.metadata.function.description}", + "parameters": {{ + "type": "{tool.metadata.function.parameters.type}", + "properties": {{''' + + # Add properties + for prop in tool.metadata.function.parameters.properties: + metadata_code += f''' + "{prop.argument}": {{"type": "{prop.type}", "description": "{prop.description}"}},''' + + metadata_code += f''' + }}, + "required": {tool.metadata.function.parameters.required} + }} + }} +}} + +{tool_name}_tool = {{"tool": {tool_name}, "metadata": {metadata_var}}} + ''' + tool_code.append(metadata_code) + + return "\n".join(tool_code) + + def _generate_agent_definitions(self) -> str: + """Generate code for agent definitions.""" + agent_code = [] + + for agent_def in self.definition.agents: + tools_list = [] + if agent_def.tools: + tools_list = [f"{tool.metadata.function.name}_tool" for tool in agent_def.tools] + + # Format output schema if provided + output_schema_str = 
(f"json.loads('''{agent_def.output_schema}''')" if agent_def.output_schema else "None") + + agent_code.append(f''' +{agent_def.name.lower()} = Agent( + name="{agent_def.name}", + system_message="""{agent_def.system_message}""", + llm_config=llm_config, + tools=[{", ".join(tools_list)}] if {bool(tools_list)} else None, + use_tools={agent_def.use_tools}, + keep_history={agent_def.keep_history}, + output_schema={output_schema_str}, + strict={agent_def.strict} +)''') + + return "\n".join(agent_code) + + def _generate_assistant_supervisor_definitions(self) -> str: + """Generate code for assistant supervisor definitions.""" + sup_code = [] + + for sup_def in self.definition.assistant_supervisors: + sup_code.append(f''' +{sup_def.name.lower()} = Supervisor( + name="{sup_def.name}", + system_message="""{sup_def.system_message}""", + llm_config=llm_config, + is_assistant=True +) + +# Register agents with {sup_def.name} +{" ".join(f'{sup_def.name.lower()}.register_agent({agent_name.lower()});' + for agent_name in sup_def.managed_agents)} +''') + + return "\n".join(sup_code) + + def _generate_main_supervisor_definition(self) -> str: + """Generate code for main supervisor definition and final registration.""" + main_sup_def = self.definition.main_supervisor + + code = f''' +main_supervisor = Supervisor( + name="{main_sup_def.name}", + system_message="""{main_sup_def.system_message}""", + llm_config=llm_config, + is_assistant=False +) + +# Register direct agents with main supervisor +{" ".join(f'main_supervisor.register_agent({agent_name.lower()});' + for agent_name in main_sup_def.managed_agents)} + +# Register assistant supervisors with main supervisor +{" ".join(f'main_supervisor.register_agent({sup_name.lower()});' + for sup_name in main_sup_def.managed_assistant_supervisors)} +''' + + return code diff --git a/primisai/nexus/architect/evaluator.py b/primisai/nexus/architect/evaluator.py new file mode 100644 index 0000000..3c68a39 --- /dev/null +++ 
b/primisai/nexus/architect/evaluator.py @@ -0,0 +1,517 @@ +from primisai.nexus.core import AI +from typing import Dict +import json +import random +from typing import List, Dict, Any, Tuple +from pathlib import Path +from tqdm import tqdm +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock +import threading +import time +import uuid + + +class Evaluator: + + def __init__(self, llm_config: Dict[str, str], benchmark_path: str, subset_size: int = 10): + """ + Initialize Evaluator with benchmark data. + + Args: + llm_config: LLM configuration for AI evaluator + benchmark_path: Path to the JSONL benchmark file + subset_size: Number of examples to test (default: 10) + """ + self.ai = AI(llm_config) + self.benchmark_path = benchmark_path + self.subset_size = subset_size + self.benchmark_data = [] + self.test_subset = [] + self.results = [] + self.feed_chat_history = [] + # New attributes for tracking QA pairs and AI feedback + self.all_qa_pairs = [] + self.failed_qa_pairs = [] + self.latest_ai_feedback = None + self._results_lock = Lock() + self._progress_lock = Lock() + self.correct_answers = 0 + self.wrong_answers = 0 + # Load benchmark data + self._load_benchmark() + + def _load_benchmark(self): + """Load benchmark data and randomly select good quality examples.""" + try: + with open(self.benchmark_path, 'r', encoding='utf-8') as file: + for line in file: + data = json.loads(line.strip()) + # Only keep good quality examples + # if data.get('quality', '').lower() == 'good': + self.benchmark_data.append(data) + + print(f"Loaded {len(self.benchmark_data)} examples from benchmark") + + # Randomly select subset_size examples instead of taking the first ones + if len(self.benchmark_data) >= self.subset_size: + self.test_subset = random.sample(self.benchmark_data, self.subset_size) + else: + self.test_subset = self.benchmark_data + print(f"Warning: Only {len(self.benchmark_data)} examples available, using all") + + except 
Exception as e: + raise Exception(f"Error loading benchmark: {str(e)}") + + def evaluate_supervisor(self, main_supervisor_or_factory, workflow_id, iteration, is_factory=False) -> Dict[str, Any]: + """ + Evaluate the supervisor on the test subset with parallel processing. + + Args: + main_supervisor_or_factory: Either a supervisor object or a factory function to create supervisors + is_factory: If True, main_supervisor_or_factory is a factory function + + Returns: + Dict: Evaluation results with scores and details + """ + # Reset counters and results + self.correct_answers = 0 + self.wrong_answers = 0 + self.results = [] + + # Add thread-safe supervisor creation lock + supervisor_creation_lock = threading.Lock() + + print(f"Testing supervisor on {len(self.test_subset)} examples...") + + # Initialize progress bar + progress_bar = tqdm(total=len(self.test_subset), desc="Evaluating", unit="question") + + def process_example(example_with_index): + i, example = example_with_index + test_query = example['question'] + expected_answer = example['answer'] + + try: + # Create supervisor instance for this thread with thread-safe unique ID + if is_factory: + with supervisor_creation_lock: # Ensure thread-safe supervisor creation + # Create truly unique ID using UUID and thread ID + + unique_suffix = f"{uuid.uuid4().hex[:8]}__{iteration}__{example.get('id', i)}__{i}" + supervisor = main_supervisor_or_factory(workflow_id, unique_suffix) + else: + supervisor = main_supervisor_or_factory + + # Get response from supervisor + response = supervisor.chat(test_query) + path = "/home/humza/office/primisai/nexus/nexus_workflows/" + path = path + workflow_id + "_" + str(unique_suffix) + path = path + "/history.jsonl" + messages = [] + with open(path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line: + messages.append(json.loads(line)) + + chat = """""" + + for i, msg in enumerate(messages): + if msg['role'] == "system": + continue + + if msg['sender_type'] == 
"user": + if i == len(messages): + continue + else: + temp = "User to " + messages[i + 1]['sender_name'] + ": " + msg['content'] + chat = chat + temp + "\n\n" + + elif messages[i]['sender_type'] == "main_supervisor" and messages[i - 1]['sender_type'] == "main_supervisor": + content = msg['content'] + to = messages[i - 1]['tool_calls'][0]['function']['name'].replace("delegate_to_", "") + from_ = msg['sender_name'] + temp = from_ + " to " + to + " : " + content + chat = chat + temp + "\n\n" + + elif msg['sender_type'] == "agent" and messages[i - 1]['sender_type'] == "main_supervisor": + content = msg['content'] + to = messages[i - 1]['sender_name'] + from_ = msg['sender_name'] + temp = from_ + " to " + to + " : " + content + chat = chat + temp + "\n\n" + + elif msg['sender_type'] == "main_supervisor" and msg['role'] == "assistant" and msg['content'] is not None: + content = msg['content'] + to = "User" + from_ = msg['sender_name'] + temp = from_ + " to " + to + " : " + content + chat = chat + temp + "\n\n" + + # Check answer correctness using LLM + is_correct = self._check_answer_correctness(expected_answer, response) + + # Create result detail + result_detail = { + 'id': example.get('id', i), + 'question': test_query, + 'expected_answer': expected_answer, + 'actual_response': response, + 'is_correct': is_correct, + 'difficulty': example.get('difficulty', 'unknown'), + 'index': i, + 'chat': chat + } + + return result_detail, is_correct, None + + except Exception as e: + result_detail = { + 'id': example.get('id', i), + 'question': test_query, + 'expected_answer': expected_answer, + 'actual_response': f"ERROR: {str(e)}", + 'is_correct': False, + 'difficulty': example.get('difficulty', 'unknown'), + 'index': i + } + return result_detail, False, str(e) + + # Use parallel processing with multiple supervisor instances + max_workers = min(8, len(self.test_subset)) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all tasks + future_to_index = 
{executor.submit(process_example, (i, example)): i for i, example in enumerate(self.test_subset)} + + # Process completed tasks as they finish + for future in as_completed(future_to_index): + try: + result_detail, is_correct, error = future.result() + + # Thread-safe updates + with self._results_lock: + self.results.append(result_detail) + if is_correct: + self.correct_answers += 1 + else: + self.wrong_answers += 1 + + # Thread-safe progress bar update + with self._progress_lock: + completed_count = len(self.results) + current_accuracy = self.correct_answers / self.subset_size if self.subset_size > 0 else 0 + progress_bar.set_postfix({ + 'Accuracy': f"{current_accuracy:.2%}", + 'Correct': self.correct_answers, + 'Wrong': self.wrong_answers + }) + progress_bar.update(1) + + except Exception as e: + print(f"Unexpected error processing example: {str(e)}") + with self._results_lock: + self.wrong_answers += 1 + with self._progress_lock: + progress_bar.update(1) + + # Close progress bar + progress_bar.close() + + # Sort results by original index to maintain order + self.results.sort(key=lambda x: x['index']) + + # Remove the index field as it was only needed for sorting + for result in self.results: + del result['index'] + + # Calculate final metrics + total_examples = len(self.test_subset) + accuracy = self.correct_answers / total_examples if total_examples > 0 else 0 + + evaluation_results = { + 'total_examples': total_examples, + 'correct_answers': self.correct_answers, + 'wrong_answers': self.wrong_answers, + 'accuracy': accuracy, + 'error_rate': self.wrong_answers / total_examples if total_examples > 0 else 0, + 'detailed_results': self.results + } + + print(f"\n=== Final Evaluation Results ===") + print(f"Total Examples: {total_examples}") + print(f"Correct Answers: {self.correct_answers}") + print(f"Wrong Answers: {self.wrong_answers}") + print(f"Final Accuracy: {accuracy:.2%}") + print(f"Error Rate: {evaluation_results['error_rate']:.2%}") + + return 
evaluation_results + + def generate_feedback_summary(self, old_system_messages, evaluation_results: Dict[str, Any]) -> str: + """ + Generate AI-powered structured feedback based on evaluation results and current system messages. + + Args: + system_messages: Current workflow system messages for agents + evaluation_results: Results from evaluate_supervisor() + + Returns: + str: AI-generated feedback with detailed analysis + """ + accuracy = evaluation_results['accuracy'] + detailed_results = evaluation_results['detailed_results'] + + # Prepare all question-answer pairs for AI analysis + qa_pairs = [] + failed_examples = [] + FAILED_METRIC = """Here are some examples from FAILED examples\n\n""" + # Randomly select up to 10 failed examples to include in the feedback + failed_results = [r for r in detailed_results if r['is_correct'] == False] + # If there are more than 10 failed examples, select a random subset + # Include all failed examples but only show chat for 10 + selected_for_chat = random.sample(failed_results, min(15, len(failed_results))) if len(failed_results) > 10 else failed_results + chat_ids = [result['id'] for result in selected_for_chat] + + # Build the failed examples text + for result in failed_results: + example_text = f"{result['question']}\nExpected Answer: {result['expected_answer']}\nPredicted Answer: {result['actual_response']}\nIs Correct: {result['is_correct']}\n" + + # Only include chat for selected examples + if result['id'] in chat_ids: + example_text += f"\nChat of this Example:\n{result['chat']}" + + FAILED_METRIC += example_text + "\n\n---\n\n" + + # Store in class attributes for future reference + self.all_qa_pairs = qa_pairs + self.failed_qa_pairs = failed_examples + + # Create comprehensive prompt for AI feedback generation + system_message = { + "role": + "system", + "content": ( + "You are an AI Workflow Performance Analyst specializing in identifying specific agent failures and providing precise system message improvements.\n" + 
"\n" + "YOUR ROLE:\n" + "Analyze failed examples where AI agents couldn't solve questions correctly. You will receive:\n" + "1. The original question\n" + "2. Expected correct answer\n" + "3. Actual predicted answer (wrong)\n" + "4. Complete conversation/chat between AI agents trying to solve the question\n" + "5. Current system message guidelines for each agent\n" + "\n" + "ANALYSIS PROCESS:\n" + "1. Read the agent conversation carefully to identify WHERE the failure occurred\n" + "2. Identify WHICH SPECIFIC AGENT made the critical error\n" + "3. Compare the agent's behavior in the chat with their current system message guidelines\n" + "4. Determine what guideline is missing, unclear, or causing the issue\n" + "5. Provide exact system message modifications\n" + "\n" + "YOUR OUTPUT WILL BE USED BY:\n" + "A prompt engineer who will directly update the agent system messages based on your recommendations.\n" + "\n" + "REQUIRED OUTPUT FORMAT (follow this exactly every time):\n" + "\n" + "# [EXACT_AGENT_NAME]\n" + "Issue: [Describe what the agent did wrong based on the chat conversation]\n" + "Root Cause: [What system message guideline is missing or inadequate]\n" + "Chat Evidence: \"[Exact quote from agent's response in the conversation]\"\n" + "Action Required: ADD/REMOVE/MODIFY\n" + "Guideline Change: \"[Exact text to add, remove, or modify in the system message]\"\n" + "\n" + "# [ANOTHER_AGENT_NAME]\n" + "NO CHANGE REQUIRED\n" + "\n" + "# [THIRD_AGENT_NAME]\n" + "Issue: [Describe what the agent did wrong]\n" + "Root Cause: [What system message guideline is missing]\n" + "Chat Evidence: \"[Exact quote from agent's response]\"\n" + "Action Required: ADD/REMOVE/MODIFY\n" + "Guideline Change: \"[Exact text to add, remove, or modify]\"\n" + "\n" + "CRITICAL REQUIREMENTS:\n" + "- Create one heading per agent (using exact agent names from the chat)\n" + "- If an agent performed correctly or didn't cause issues, write 'NO CHANGE REQUIRED' under their heading\n" + "- For 
agents with issues, provide all details under their single heading\n" + "- Quote specific parts of the agent conversation as evidence\n" + "- Provide actionable system message text that can be directly copy-pasted\n" + "- Focus on guideline gaps, not just describing what went wrong\n" + "- Each recommendation must be tied to specific chat evidence\n" + "- Be precise about ADD/REMOVE/MODIFY actions for system messages\n" + "\n" + "Your analysis must be thorough but concise, focusing on the most critical system message improvements needed for each agent." + ) + } + + # Prepare evaluation summary focusing on patterns not specifics + evaluation_summary = f"""EVALUATION RESULTS SUMMARY: + - Total Questions: {evaluation_results['total_examples']} + - Correct Answers: {evaluation_results['correct_answers']} + - Wrong Answers: {evaluation_results['wrong_answers']} + - Accuracy: {accuracy:.2%} + - Error Rate: {evaluation_results['error_rate']:.2%} + """ + + user_message = { + "role": + "user", + "content": + f"""CURRENT AGENT SYSTEM MESSAGES: +{old_system_messages} + +## EVALUATION SUMMARY +{evaluation_summary} + +## FAILED EXAMPLES WITH AGENT CONVERSATIONS +{FAILED_METRIC} + +Please analyze each failed example by carefully reading the agent conversations. For each agent that participated: + +1. Examine their specific responses and decisions in the chat +2. Identify which agents made critical errors that led to wrong answers +3. Compare their behavior against their current system message guidelines +4. Determine what specific guidelines are missing or inadequate +5. Provide exact system message modifications for each problematic agent + +Focus on agent-specific issues revealed through their actual chat behavior. If an agent performed correctly and didn't contribute to the failure, indicate "NO CHANGE REQUIRED" for that agent. 
+ +Your analysis will be used to directly update individual agent system messages, so be precise about which agent needs what specific guideline changes.""" + } + # if self.feed_chat_history == []: + # self.feed_chat_history.append(system_message) + # self.feed_chat_history.append(user_message) + # msgs = [system_message, user_message] + # else: + # self.feed_chat_history.append(user_message) + # msgs = self._construct_messages_with_system_positioning(system_message) + + try: + messages_ = [system_message, user_message] + response = self.ai.generate_response(messages_) + ai_feedback = response.choices[0].message.content + self.feed_chat_history.append({'role': 'assistant', 'content': ai_feedback}) + # Store the AI feedback for reference + self.latest_ai_feedback = ai_feedback + + return ai_feedback + + except Exception as e: + print(f"Error generating AI feedback: {str(e)}") + return self._generate_basic_feedback(evaluation_results) + + def _check_answer_correctness(self, actual: str, predicted: str) -> bool: + """ + Check answer correctness using LLM evaluation with retry mechanism. + + Args: + actual: The correct/expected answer + predicted: The supervisor's predicted response + + Returns: + bool: True if answers match, False otherwise + """ + system_message = { + "role": + "system", + "content": + """You are an expert Reasoning and Analysis Agent. You will be given two answers: Actual (correct) vs Predicted (from AI system). +Your task is to determine if both answers convey the same meaning or conclusion, even if worded differently. +PREDICTED ANSWER can be long and will have different explaination but you have to carefully analyze the meaning of the answer. See if Actual answer is found inside Predicted answer. +Actual Answer can be in Predicted answer but in different wording or explaination. 
+ +IMPORTANT: Reply with ONLY "1" or "0": +- 1 = Same meaning/conclusion (correct) +- 0 = Different meaning/conclusion (incorrect) + +Consider: +- Semantic equivalence (same meaning, different words) +- Numerical accuracy for math problems +- Logical consistency +- Core facts and conclusions""" + } + + user_message = { + "role": + "user", + "content": + f"""Please analyze these two answers and determine if the Predicted Answer matches the Actual Answer in logic and reasoning. + +Actual Answer: {actual} +Predicted Answer: {predicted} + +Return ONLY: 1 (same) or 0 (different)""" + } + + messages = [system_message, user_message] + + # Retry mechanism (up to 3 attempts) + for attempt in range(3): + try: + response = self.ai.generate_response(messages) + generated_content = response.choices[0].message.content.strip() + + # Extract digit from response using regex + digit_match = re.search(r'\b([01])\b', generated_content) + if digit_match: + result = digit_match.group(1) + return result == '1' # Return True if 1, False if 0 + + # If no clear 1 or 0 found, try again + if attempt < 2: # Don't print on last attempt + print(f"Warning: LLM evaluator returned unclear response: '{generated_content}'. Retrying...") + + except Exception as e: + if attempt < 2: # Don't print on last attempt + print(f"Error in LLM evaluation (attempt {attempt + 1}): {str(e)}. Retrying...") + continue + + # If all attempts failed, fall back to simple string matching + print("Warning: LLM evaluation failed after 3 attempts. Falling back to simple string matching.") + return self._simple_string_matching(actual, predicted) + + def _simple_string_matching(self, actual: str, predicted: str) -> bool: + """ + Fallback method for simple string matching. 
+ + Args: + actual: Expected answer + predicted: Predicted answer + + Returns: + bool: True if strings match (case-insensitive) + """ + actual_clean = actual.lower().strip() + predicted_clean = predicted.lower().strip() + + # Check if expected answer is contained in response or vice versa + return (actual_clean in predicted_clean) or (predicted_clean in actual_clean) + + def _construct_messages_with_system_positioning(self, sys_msg) -> List[Dict[str, str]]: + """ + Construct message list with system message positioned 2 places before the current message. + + Returns: + List[Dict]: Properly positioned messages for API call + """ + messages = [] + + # If we have enough history, insert system message at correct position + if len(self.feed_chat_history) >= 2: + # Add all but last 2 messages + messages.extend(self.feed_chat_history[:-2]) + # Add system message + messages.append(sys_msg) + # Add last 2 messages + messages.extend(self.feed_chat_history[-2:]) + elif len(self.feed_chat_history) == 1: + # Add system message, then the single history message + messages.append(sys_msg) + messages.extend(self.feed_chat_history) + else: + # No history yet, just system message + messages.append(sys_msg) + + return messages diff --git a/primisai/nexus/architect/expander.py b/primisai/nexus/architect/expander.py new file mode 100644 index 0000000..958c34a --- /dev/null +++ b/primisai/nexus/architect/expander.py @@ -0,0 +1,56 @@ +from primisai.nexus.core import AI +from typing import Dict, Any + + +class WorkflowExpander: + + def __init__(self, llm_config: Dict[str, str]): + """ + Initialize WorkflowExpander with LLM configuration. + + Args: + llm_config: Dictionary containing LLM configuration + (api_key, model, base_url) + """ + self.ai = AI(llm_config) + + def expand_workflow_query(self, user_query: str, nexus_guidelines: str) -> str: + """ + Expand a high-level workflow query into detailed component pseudocode. 
+ + Args: + user_query: User's high-level workflow description + nexus_guidelines: Basic guidelines about Nexus framework + + Returns: + str: Expanded workflow description as structured pseudocode + """ + # Construct the prompt + messages = [{ + "role": + "system", + "content": ( + "You are an advanced workflow architect specializing in designing intelligent, modular, and efficient workflows using the Nexus framework. " + "Your job is to deeply analyze high-level workflow requests and decompose them into a clear, structured set of components, strictly following the Nexus guidelines. " + "For each workflow, identify the absolute minimum set of agents, supervisors, and tools required, ensuring no unnecessary complexity. " + "For every component, clearly define its purpose, responsibilities, and how it interacts with other components. " + "Always create output schemas that are tailored to the nature of the task—if the task requires logic or reasoning, design the schema accordingly. " + "IMPORTANT: When passing the user query to agents, use the exact original wording without any modification. " + "Do not provide implementation code—focus on architecture, structure, and clarity. " + "Be precise, concise, and ensure your output is easy to follow for both humans and machines. " + "DONT USE TOOLS UNTILL UNLESS NEEDED") + }, { + "role": + "user", + "content": (f"Nexus Framework Guidelines:\n{nexus_guidelines}\n\n" + f"User Query Request: {user_query}\n\n" + "Please provide a detailed description of the workflow components needed, " + "including supervisors, agents, and tools. 
Focus on their roles and responsibilities.")
+        }]
+
+        # Get response from LLM
+        try:
+            response = self.ai.generate_response(messages)
+            return response.choices[0].message.content
+        except Exception as e:
+            raise RuntimeError(f"Error in workflow expansion: {str(e)}") from e
diff --git a/primisai/nexus/architect/manager.py b/primisai/nexus/architect/manager.py
new file mode 100644
index 0000000..fc62a48
--- /dev/null
+++ b/primisai/nexus/architect/manager.py
@@ -0,0 +1,188 @@
+# primisai/nexus/architect/manager.py
+
+import os
+import datetime
+import logging
+from typing import Dict, Any, List, Optional
+
+from .expander import WorkflowExpander
+from .structurer import WorkflowStructurer
+from .builder import WorkflowBuilder
+from .prompter import Prompter
+from .evaluator import Evaluator
+from . import prompts  # Use relative import within the package
+from .schemas import StructuredWorkflow  # Import the schema for type hinting
+
+# Configure logging for the library module
+logger = logging.getLogger(__name__)
+
+
+class Architect:
+    """
+    A high-level manager class to design, build, and optimize a multi-agent AI workflow.
+
+    This class takes a user's high-level query and a JSONL dataset of
+    question/answer pairs to iteratively generate and refine a Python-based
+    multi-agent system. It orchestrates the Expander, Structurer, Prompter,
+    Evaluator, and Builder components to produce an optimized workflow.
+    """
+
+    def __init__(self,
+                 user_query: str,
+                 benchmark_path: str,
+                 llm_config: Dict[str, str],
+                 output_dir: str = "./optimized_workflows",
+                 workflow_name: str = "optimized_workflow",
+                 subset_size: int = 10,
+                 max_iterations: int = 5):
+        """
+        Initializes the Architect.
+
+        Args:
+            user_query (str): The high-level user requirement for the workflow.
+            benchmark_path (str): The file path to the JSONL training data.
+                Each line must be a JSON object with "question" and "answer" keys. 
+ llm_config (Dict[str, str]): Configuration for the language model, + e.g., {'model': '...', 'api_key': '...'}. + output_dir (str): Directory to save the final workflow file and logs. + workflow_name (str): The base name for the output Python file. + subset_size (int): Number of samples from the benchmark to use for each evaluation. + max_iterations (int): The maximum number of optimization loops to run. + """ + # --- Validate Inputs --- + if not os.path.exists(benchmark_path): + raise FileNotFoundError(f"Benchmark file not found at: {benchmark_path}") + if not all(k in llm_config for k in ['model', 'api_key']): + raise ValueError("llm_config must contain 'model' and 'api_key'.") + + # --- Store Configuration --- + self.user_query = user_query + self.benchmark_path = benchmark_path + self.llm_config = llm_config + self.output_dir = output_dir + self.workflow_name = workflow_name + self.subset_size = subset_size + self.max_iterations = max_iterations + + # --- Initialize Core Components --- + logger.info("Initializing architect components...") + self.expander = WorkflowExpander(self.llm_config) + self.structurer = WorkflowStructurer(self.llm_config) + self.evaluator = Evaluator(self.llm_config, self.benchmark_path, subset_size=self.subset_size) + self.prompter: Optional[Prompter] = None # Initialized after agents are known + + # --- Internal State --- + self.structured_workflow: Optional[StructuredWorkflow] = None + self.system_messages: Dict[str, str] = {} + self.performance_history: List[Dict[str, Any]] = [] + self.workflow_id = self._generate_workflow_id() + + os.makedirs(self.output_dir, exist_ok=True) + logger.info(f"WorkflowArchitect initialized. 
Output will be saved in '{self.output_dir}'.") + + def _generate_workflow_id(self) -> str: + """Creates a unique identifier for this workflow build session.""" + timestamp = datetime.datetime.now().strftime("%d-%m-%Y__%Hh-%Mmin") + benchmark_name = os.path.splitext(os.path.basename(self.benchmark_path))[0] + return f"{self.workflow_name}_subset={self.subset_size}_iter={self.max_iterations}_{benchmark_name}_{timestamp}" + + @staticmethod + def _extract_system_messages(system_messages_obj: Any, agents_names: List[str]) -> Dict[str, str]: + """ + Extracts system messages into a dictionary, handling both dict and Pydantic objects. + """ + if isinstance(system_messages_obj, dict): + return system_messages_obj + + system_messages_dict = {} + for agent_name in agents_names: + try: + system_messages_dict[agent_name] = getattr(system_messages_obj, agent_name) + except AttributeError: + logger.warning(f"Could not get system message for agent '{agent_name}'. Using default.") + system_messages_dict[agent_name] = "You are a helpful AI assistant." 
+ return system_messages_dict + + def _create_supervisor_instance(self, workflow_id: str, unique_suffix: int): + """Factory function to create a supervisor instance for evaluation.""" + run_id = f"{workflow_id}_{unique_suffix}" + builder = WorkflowBuilder(self.system_messages, self.structured_workflow, self.llm_config, run_id) + return builder.build_and_validate() + + def _save_workflow_to_file(self, accuracy: float, iteration: int) -> str: + """Builds the workflow with current system messages and saves it to a file.""" + accuracy_percent = f"{accuracy*100:.1f}" + filename = f"{self.workflow_name}_iter_{iteration+1}_acc_{accuracy_percent}p.py" + output_path = os.path.join(self.output_dir, filename) + + builder = WorkflowBuilder(self.system_messages, self.structured_workflow, self.llm_config, self.workflow_id) + builder.build_and_validate() + builder.save_workflow_to_file(output_path) + logger.info(f"Workflow saved to: {output_path}") + return output_path + + def build_and_optimize(self) -> Dict[str, Any]: + """ + Executes the full workflow design and optimization process. + + Returns: + Dict[str, Any]: A dictionary containing the final results, including + final accuracy, path to the saved file, and performance history. 
+ """ + logger.info("Step 1: Designing initial workflow from user query...") + expanded_workflow = self.expander.expand_workflow_query(self.user_query, prompts.nexus_guidelines) + self.structured_workflow = self.structurer.structure_workflow(expanded_workflow) + + agents_names = [self.structured_workflow.main_supervisor.name] + [agent.name for agent in self.structured_workflow.agents] + logger.info(f"Identified agents: {', '.join(agents_names)}") + + logger.info("Step 2: Generating initial system prompts for all agents...") + self.prompter = Prompter(agents_names, self.llm_config) + initial_messages_obj = self.prompter.generate_warmup_system_messages(self.user_query, self.structured_workflow) + self.system_messages = self._extract_system_messages(initial_messages_obj, agents_names) + + logger.info(f"Step 3: Starting optimization loop for {self.max_iterations} iterations...") + final_accuracy = 0.0 + output_path = "" + + for i in range(self.max_iterations): + logger.info(f"\n{'='*20} Iteration {i+1}/{self.max_iterations} {'='*20}") + + logger.info("Evaluating current workflow performance...") + evaluation_results = self.evaluator.evaluate_supervisor( + lambda suffix: self._create_supervisor_instance(self.workflow_id, suffix), self.workflow_id, iteration=i, is_factory=True) + accuracy = evaluation_results['accuracy'] + final_accuracy = accuracy + logger.info(f"Iteration {i+1} Accuracy: {accuracy:.2%}") + + output_path = self._save_workflow_to_file(accuracy, i) + + self.performance_history.append({ + "iteration": i + 1, + "accuracy": accuracy, + "system_messages": self.system_messages.copy(), + "saved_path": output_path + }) + + if accuracy >= 0.95 and i < self.max_iterations - 1: + logger.info("Achieved >95% accuracy. 
Stopping optimization early.") + break + + if i < self.max_iterations - 1: + logger.info("Generating feedback for improvement...") + feedback = self.evaluator.generate_feedback_summary(str(self.system_messages), evaluation_results) + + logger.info("Updating system messages based on feedback...") + self.system_messages = self.prompter.update_system_messages_with_feedback(self.system_messages, accuracy, feedback) + else: + logger.info(f"Max iterations ({self.max_iterations}) reached.") + + logger.info("\nWorkflow optimization complete.") + logger.info(f"Final Accuracy: {final_accuracy:.2%}") + logger.info(f"Most recent workflow saved at: {output_path}") + + return { + "final_accuracy": final_accuracy, + "output_path": output_path, + "history": self.performance_history, + } diff --git a/primisai/nexus/architect/prompter.py b/primisai/nexus/architect/prompter.py new file mode 100644 index 0000000..580b45d --- /dev/null +++ b/primisai/nexus/architect/prompter.py @@ -0,0 +1,330 @@ +from primisai.nexus.core import AI +from typing import Dict, Any, List +from copy import deepcopy +from pydantic import BaseModel, create_model + + +def process_system_messages(old_system_messages, new_system_messages): + """ + Process system messages by: + 1. Removing agent names (first part before ':' in first 100 characters) + 2. 
Keeping old message if new message contains 'NO_CHANGE' + """ + processed_messages = {} + + def clean_message(message): + """Remove agent name prefix from message""" + # Take first 100 characters to find the colon + first_part = message[:100] + colon_index = first_part.find(':') + + if colon_index != -1: + # Remove everything before and including the colon, then strip whitespace + cleaned = message[colon_index + 1:].strip() + return cleaned + else: + # If no colon found in first 100 chars, return original message + return message + + # Process each agent + for agent_name in new_system_messages.keys(): + new_message = new_system_messages[agent_name] + + # Check if new message contains NO_CHANGE + if 'NO_CHANGE' in new_message: + # Use old message and clean it + if agent_name in old_system_messages: + processed_messages[agent_name] = clean_message(old_system_messages[agent_name]) + else: + processed_messages[agent_name] = new_message # fallback + else: + # Use new message and clean it + processed_messages[agent_name] = clean_message(new_message) + + return processed_messages + + +def extract_system_messages(system_messages_obj, agents_names): + """Extract system messages from either dict or Pydantic object""" + if isinstance(system_messages_obj, dict): + return system_messages_obj + else: + # Handle Pydantic object + system_messages_dict = {} + for agent_name in agents_names: + try: + system_messages_dict[agent_name] = getattr(system_messages_obj, agent_name) + except AttributeError: + print(f"Warning: Could not get system message for agent {agent_name}") + system_messages_dict[agent_name] = "No system message available" + return system_messages_dict + + +def create_dynamic_schema(agent_names): + """ + Create a dynamic Pydantic model with fields for each agent name. + + Args: + agent_names: List of agent names + + Returns: + Dynamically created Pydantic model class + """ + fields = {agent_name: (str, ...) 
for agent_name in agent_names} + return create_model('ResponseStructure', **fields) + + +class Prompter: + + def __init__(self, agent_names, llm_config: Dict[str, str]): + """ + Initialize Prompter with LLM configuration and conversation management. + + Args: + llm_config: Dictionary containing LLM configuration + (api_key, model, base_url) + """ + self.ai = AI(llm_config) + self.model = llm_config.get("model", "gpt-4.1") + self.conversation_history = [] # Store all messages in conversation + self.agent_names = agent_names # List of agent names for system messages + self.system_message = { + "role": + "system", + "content": ( + "You are an AI Prompt Engineer specializing in updating system messages for AI agents based on performance feedback analysis.\n" + "\n" + "YOUR ROLE:\n" + "You receive detailed feedback from a Performance Analyst who has identified specific agent failures by analyzing failed examples and agent conversations. Your job is to implement precise system message updates to fix these identified issues.\n" + "\n" + "INPUT YOU RECEIVE:\n" + "1. Current system messages for all agents\n" + "2. Structured feedback in format:\n" + " # [AGENT_NAME]\n" + " Issue: [what went wrong]\n" + " Root Cause: [missing/inadequate guideline]\n" + " Chat Evidence: [quote from agent conversation]\n" + " Action Required: ADD/REMOVE/MODIFY\n" + " Guideline Change: [exact text to implement]\n" + "\n" + "YOUR SYSTEM MESSAGE UPDATE PROCESS:\n" + "1. For each agent in the feedback, read their current system message\n" + "2. Implement the EXACT changes specified in 'Action Required' and 'Guideline Change'\n" + "3. If Action = ADD: Add the guideline text to appropriate section\n" + "4. If Action = REMOVE: Remove the specified problematic text\n" + "5. If Action = MODIFY: Replace old text with new guideline text\n" + "6. 
Preserve all existing working guidelines unless explicitly told to remove them\n" + "If needed try to add few shot example from feedback to enhance performance of Agent" + "\n" + "CRITICAL RULES:\n" + "- NEVER remove or change the core agent identity (\"You are XYZ Agent...\")\n" + "- Only modify what the feedback explicitly specifies\n" + "- If feedback says 'NO CHANGE REQUIRED' for an agent, output: 'AGENT_NAME: NO_CHANGE'\n" + "- Keep all other working system message parts intact\n" + "- If Accuracy of current system messages is lesser than last one then get the last ones and update them based on feedback" + "- Make surgical, targeted updates based on evidence-based feedback\n" + "\n" + "OUTPUT FORMAT (use exactly):\n" + "AGENT_NAME: [complete updated system message]\n" + "or\n" + "AGENT_NAME: NO_CHANGE\n" + "\n" + "FORMATTING REQUIREMENTS:\n" + "- DO NOT include agent names inside the system message content\n" + "- Start system messages with 'You are...' directly\n" + "- DO NOT add headers or prefixes within the message text\n" + "- Maintain clean, professional system message structure\n" + "\n" + "VALIDATION CHECK:\n" + "Before finalizing each update, verify:\n" + "- Does this change address the specific issue identified in the chat evidence?\n" + "- Have I preserved the agent's core identity and working capabilities?\n" + "- Is the new guideline clear and actionable?\n" + "\n" + "Your updates will directly fix the reasoning failures identified through actual agent conversation analysis.") + } + + self.current_system_messages = None + self.ResponseStructure = create_dynamic_schema(agent_names) + + def generate_warmup_system_messages(self, user_query: str, workflow: str) -> str: + """ + Generate initial system messages for workflow components. 
+ + Args: + workflow: Detailed workflow description + + Returns: + str: Generated system messages for all components + """ + # Create user message for initial generation + + system_message = { + "role": + "system", + "content": ( + "You are an expert AI Prompt Engineer. Your task is to generate clear, effective system messages for each agent in a workflow, based on the provided User Query and workflow description.\n\n" + "INSTRUCTIONS:\n" + "- For each agent, write a system message that starts with 'You are...' and clearly defines the agent's role and responsibilities in the workflow.\n" + "- Ensure each system message is actionable, unambiguous, and tailored to the agent's function.\n" + "- Do not include agent names inside the message content; only use them as keys.\n" + "- Avoid unnecessary headers or prefixes. Focus on clarity and completeness.\n" + "- If possible, include a brief example of the agent's expected behavior.\n\n" + "OUTPUT FORMAT:\n" + "[complete system message for that agent]\n\n" + "Begin by analyzing the User Query and workflow, then generate 1-2 line system messages for all supervisors and agents required to accomplish the workflow." 
+ ) + } + + user_message = { + "role": + "user", + "content": + f"""This is User Query on which workflow is generated\n\n{user_query}\n{workflow}.\n Generate all supervisor, agents name and their system messages """ + } + + # Add to conversation history + # self.conversation_history.append(user_message) + + # Construct messages with system message at correct position + # messages = self._construct_messages_with_system_positioning() + messages = [system_message, user_message] + + # Get response from LLM + try: + + generated_content = self.ai.client.beta.chat.completions.parse(messages=messages, + response_format=self.ResponseStructure, + model=self.model) + + # response = self.ai.generate_response(messages) + # generated_content = response.choices[0].message.content + + # # Store the generated system messages + # self.current_system_messages = generated_content + + # Add assistant response to conversation history + assistant_message = {"role": "assistant", "content": str(generated_content.choices[0].message.parsed)} + self.conversation_history.append(user_message) + self.conversation_history.append(assistant_message) + + return generated_content.choices[0].message.parsed + + except Exception as e: + raise Exception(f"Error in system message generation: {str(e)}") + + def update_system_messages_with_feedback(self, old_system_messages, accuracy, feedback: str) -> str: + """ + Update system messages based on performance feedback. + + Args: + feedback: Feedback containing issues and improvement suggestions + Expected format: + # POSSIBLE ISSUES + ... + # IMPROVEMENTS + ... + + Returns: + str: Updated system messages + """ + + # Create feedback message + feedback_message = f'''CURRENT SYSTEM MESSAGES: +{old_system_messages} + +Accuracy on these system messages: {accuracy} + +PERFORMANCE ANALYST FEEDBACK: +{feedback} + +IMPLEMENTATION INSTRUCTIONS: +1. Read the structured feedback for each agent carefully +2. 
For agents with "NO CHANGE REQUIRED" in feedback → Output: "AGENT_NAME: NO_CHANGE" +3. For agents with specific issues identified: + - Implement the EXACT "Action Required" (ADD/REMOVE/MODIFY) + - Keep all other existing system message intact +4. REDUNDANCY CHECK: If the "Guideline Change" already exists in current system message → Output: "AGENT_NAME: NO_CHANGE"""" +OUTPUT FORMAT REQUIREMENTS: +- AGENT_NAME: NO_CHANGE (if no update needed) +- AGENT_NAME: [complete updated system message] (if update needed) +- DO NOT include explanations or analysis text +- DO NOT write agent names within the system message content + +The feedback is based on actual agent conversation analysis - implement changes precisely to fix the identified reasoning failures.''' + + # Add to conversation history + + self.conversation_history.append({'role': 'user', 'content': feedback_message}) + + # Construct messages with system message at correct position + messages = self._construct_messages_with_system_positioning() + + # messages_ = [self.system_message, {'role': 'user', 'content': feedback_message}] + + # Get response from LLM + try: + + response = self.ai.client.beta.chat.completions.parse(messages=messages, + response_format=self.ResponseStructure, + model=self.model) + # response = self.ai.generate_response(messages) + # updated_content = response.choices[0].message.content + + # Update stored system messages + # self.current_system_messages = updated_content + + # Add assistant response to conversation history + assistant_message = {"role": "assistant", "content": str(response.choices[0].message.parsed)} + self.conversation_history.append(assistant_message) + + new_system_messages = extract_system_messages(response.choices[0].message.parsed, self.agent_names) + result = process_system_messages(old_system_messages, new_system_messages) + return result + + except Exception as e: + raise Exception(f"Error in system message update: {str(e)}") + + def 
_construct_messages_with_system_positioning(self) -> List[Dict[str, str]]:
+        """
+        Construct message list with system message positioned 2 places before the current message.
+
+        Returns:
+            List[Dict]: Properly positioned messages for API call
+        """
+        messages = []
+
+        # If we have enough history, insert system message at correct position
+        if len(self.conversation_history) >= 2:
+            # Add all but last 2 messages
+            messages.extend(self.conversation_history[:-2])
+            # Add system message
+            messages.append(self.system_message)
+            # Add last 2 messages
+            messages.extend(self.conversation_history[-2:])
+        elif len(self.conversation_history) == 1:
+            # Add system message, then the single history message
+            messages.append(self.system_message)
+            messages.extend(self.conversation_history)
+        else:
+            # No history yet, just system message
+            messages.append(self.system_message)
+
+        return messages
+    def get_conversation_summary(self) -> Dict[str, Any]:
+        """Get a summary of the conversation state.
+
+        Returns:
+            Dict: Summary including message count, iterations, and current state
+        """
+        user_messages = [msg for msg in self.conversation_history if msg["role"] == "user"]
+        assistant_messages = [msg for msg in self.conversation_history if msg["role"] == "assistant"]
+
+        return {
+            "total_messages": len(self.conversation_history),
+            "user_messages": len(user_messages),
+            "assistant_messages": len(assistant_messages),
+            "iterations": len(assistant_messages),  # Each assistant response is an iteration
+            "has_current_system_messages": self.current_system_messages is not None,
+            "last_update": self.conversation_history[-1]["content"][:100] + "..." if self.conversation_history else None
+        }
diff --git a/primisai/nexus/architect/prompts.py b/primisai/nexus/architect/prompts.py
new file mode 100644
index 0000000..5929167
--- /dev/null
+++ b/primisai/nexus/architect/prompts.py
@@ -0,0 +1,169 @@
+nexus_guidelines = """NEXUS FRAMEWORK - A HIGH-LEVEL GUIDE
+1. 
OVERVIEW +Nexus is an advanced framework for building hierarchical AI agent systems that enables coordinated task execution through specialized agents and supervisors. + +2. CORE ARCHITECTURE + A. Supervisor Layer + • Controls overall workflow coordination + • Manages agent delegation and communication + • Can be hierarchically structured + + B. Agent Layer + • Performs specialized tasks + • Contains domain-specific knowledge + • Can be stateful or stateless + • Can enforce structured outputs using schemas + + C. Tool Layer + • Extends agent capabilities + • Provides specific functionalities + • Integrates with external systems + +3. KEY COMPONENTS + A. Main Supervisor + • Top-level coordinator + • Manages workflow distribution + • Handles high-level decision making + + B. Assistant Supervisors + • Domain-specific coordinators + • Manage subset of agents + • Handle specialized workflows + + C. Specialized Agents + • Task-focused AI entities + • Configurable behavior + • Tool integration capability + • Can enforce structured outputs + +4. OUTPUT SCHEMAS + A. Purpose + • Ensure consistent agent responses + • Validate output structure + • Enable reliable downstream processing + + B. Schema Configuration + • Define expected response format + • Specify required fields + • Set validation constraints + + C. Validation Modes + • Strict: Always enforce schema + • Non-strict: Allow fallback to unstructured + + D. Common Schema Types + • Analysis results + • Code generation + • Data processing + • Content creation + +5. 
SCHEMA EXAMPLES + Analysis Schema: + { + "type": "object", + "properties": { + "summary": {"type": "string", "description": "Brief analysis summary"}, + "key_points": { + "type": "array", + "items": {"type": "string"}, + "description": "List of key findings" + }, + "recommendations": { + "type": "array", + "items": {"type": "string"}, + "description": "Suggested actions" + } + }, + "required": ["summary", "key_points"] + } + + Code Schema: + { + "type": "object", + "properties": { + "description": {"type": "string", "description": "Code explanation"}, + "code": {"type": "string", "description": "Implementation"}, + "language": {"type": "string", "description": "Programming language"} + }, + "required": ["description", "code"] + } + + Content Schema: + { + "type": "object", + "properties": { + "title": {"type": "string", "description": "Content title"}, + "body": {"type": "string", "description": "Main content"}, + "metadata": { + "type": "object", + "properties": { + "category": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}} + } + } + }, + "required": ["title", "body"] + } +""" + +expanded_workflow = """Below is a detailed, yet simplified, workflow design for bedtime story generation using the Nexus framework guidelines. Each component’s role, responsibilities, and interactions are described following the Nexus’s Supervisor, Agent, and Tool layers. + +────────────────────────────── +1. Main Supervisor + +Role: +• Acts as the top-level coordinator for the entire bedtime story generation workflow. + +Responsibilities: +• Receives the initial request for a bedtime story. +• Determines overall story parameters (such as tone, length, and target audience) from any user inputs or defaults. +• Delegates the generation tasks to specialized assistant components. +• Oversees the communication flow and collects the final output from the agents before passing it back to the user. + +────────────────────────────── +2. 
Assistant Supervisor – Story Structure Coordinator + +Role: +• Functions as a domain-specific coordinator focused on narrative aspects. + +Responsibilities: +• Breaks down the story generation task into clear narrative components (introduction, conflict, resolution). +• Determines style guidelines (such as soothing tone, simple language, and engaging storytelling) suitable for a bedtime context. +• Coordinating with the Specialized Agent to ensure that the generated narrative adheres to the desired structure and style. + +────────────────────────────── +3. Specialized Agent – Story Content Generator + +Role: +• A task-focused AI entity responsible for generating creative story content. + +Responsibilities: +• Uses domain-specific knowledge to generate a bedtime story based on the parameters and narrative structure set by the assistant supervisor. +• Ensures that the story is engaging, calming, and age-appropriate. +• Can be configured to produce story variations should the user require alternative options. +• Handles iterative improvements if minor adjustments are needed (for example, softening certain parts of the narrative). + +────────────────────────────── +4. Tool Layer Component – Language Generation/Enhancement Tool + +Role: +• Extends the agent’s capability by providing specialized text generation and language processing functions. + +Responsibilities: +• Integrates with the underlying language model to convert narrative outlines into a full prose story. +• Refines the language, ensuring that it flows naturally and is optimized for a bedtime storytelling experience. +• May include post-generation editing functions such as grammar checks and style adjustments. +• Operates as a plug-in resource that the Story Content Generator calls to ensure that the final text is both creative and polished. + +────────────────────────────── +Workflow Summary + +1. The Main Supervisor receives a request for a bedtime story and determines overall parameters. +2. 
The Main Supervisor delegates narrative planning to the Assistant Supervisor, which organizes story structure and style guidelines. +3. The Specialized Agent (Story Content Generator) creates the narrative, leveraging the language processing capabilities of the Tool Layer. +4. The Tool Layer refines the text, ensuring smooth language and intended tone. +5. Finally, the Main Supervisor collects the refined bedtime story and returns it to the user in a straightforward, easy-to-understand manner. + +────────────────────────────── +This streamlined workflow leverages the hierarchical and coordinated nature of the Nexus framework while ensuring that the components are clear in their roles and interactions. The design keeps the process simple, yet modular enough to allow future adjustments such as additional personalization or more complex narrative elements, if needed.""" + diff --git a/primisai/nexus/architect/schemas.py b/primisai/nexus/architect/schemas.py new file mode 100644 index 0000000..5a1868f --- /dev/null +++ b/primisai/nexus/architect/schemas.py @@ -0,0 +1,53 @@ +from pydantic import BaseModel +from typing import List, Optional + +# Tool Schema +class ParameterProperty(BaseModel): + argument: str + type: str + description: str + +class ToolParameters(BaseModel): + type: str + properties: List[ParameterProperty] + required: List[str] + +class ToolFunctionDef(BaseModel): + name: str + description: str + parameters: ToolParameters + +class ToolMetadata(BaseModel): + type: str + function: ToolFunctionDef + +class Tool(BaseModel): + metadata: ToolMetadata + implementation: str + validation_constraints: List[str] + +# Agent Schema +class AgentDefinition(BaseModel): + name: str + system_message: str + use_tools: bool + keep_history: bool + tools: List[Tool] + output_schema: Optional[str] = None + strict: bool = False + validation_constraints: List[str] + +# Supervisor Schema +class SupervisorDefinition(BaseModel): + name: str + is_assistant: bool + system_message: 
str + managed_agents: List[str] + managed_assistant_supervisors: List[str] + validation_constraints: List[str] + +# Complete Workflow Schema +class WorkflowDefinition(BaseModel): + main_supervisor: SupervisorDefinition + assistant_supervisors: List[SupervisorDefinition] + agents: List[AgentDefinition] \ No newline at end of file diff --git a/primisai/nexus/architect/structurer.py b/primisai/nexus/architect/structurer.py new file mode 100644 index 0000000..26ad6fa --- /dev/null +++ b/primisai/nexus/architect/structurer.py @@ -0,0 +1,69 @@ +import os +from typing import Dict +from primisai.nexus.core import AI +from primisai.nexus.architect.schemas import WorkflowDefinition + + +class WorkflowStructurer: + + def __init__(self, llm_config: Dict[str, str]): + """ + Initialize WorkflowStructurer with LLM configuration. + + Args: + llm_config: Dictionary containing LLM configuration + """ + self.llm_config = llm_config + self.ai = AI(self.llm_config) + + curr_dir = os.path.dirname(os.path.abspath(__file__)) + doc_path = os.path.join(curr_dir, "NEXUS_DOCUMENTATION.md") + + with open(doc_path, "r", encoding="utf-8") as file: + self.nexus_documentation = file.read() + + def structure_workflow(self, expanded_workflow: str) -> WorkflowDefinition: + """ + Convert expanded workflow description into structured component definitions. + + Args: + expanded_workflow: Detailed workflow description from previous step + + Returns: + WorkflowDefinition: Structured workflow components + """ + messages = [{ + "role": + "system", + "content": ("You are a workflow structure expert. Your task is to convert " + "the expanded workflow description into structured component " + "definitions following the provided schema. Include validation " + "constraints for each component. USE same system messages provided in query if provided. 
DONT change them") + }, { + "role": + "system", + "content": (f"Nexus Documentation:\n\n {self.nexus_documentation}" + "Based on the examples provided in the Nexus documentation, " + "please structure the expanded workflow description into " + "component definitions. Include supervisors, agents, and tools. " + "Also, add validation constraints for each component.") + }, { + "role": + "user", + "content": ( + f"Given the following workflow description:\n\n{expanded_workflow}\n\n" + "Please provide structured definitions for all components including " + "supervisors, agents, and tools. For tools, include both metadata " + "and complete Python function implementations. Also include validation " + "constraints for each component. Also try not to add whitespace in the " + "names of components. Add a proper (detailed) system messages for all the components. Detailing all the details. Covering all the points. USE same system messages provided in query. DONT change them" + ) + }] + + try: + completion = self.ai.client.beta.chat.completions.parse(messages=messages, + response_format=WorkflowDefinition, + model=self.llm_config["model"]) + return completion.choices[0].message.parsed + except Exception as e: + raise Exception(f"Error in workflow structuring: {str(e)}") From be1614e24f1ce44f98d06667a4d66942df022b4a Mon Sep 17 00:00:00 2001 From: Humza Sami Date: Thu, 17 Jul 2025 20:24:21 +0500 Subject: [PATCH 2/3] feat: created example file for testing Architect functionality --- .gitignore | 1 + examples/create_workflow_using_architect.py | 36 ++++++++++++++++++++ primisai/nexus/architect/__init__.py | 1 + primisai/nexus/architect/evaluator.py | 6 ++-- primisai/nexus/architect/expander.py | 13 ++++---- primisai/nexus/architect/manager.py | 37 ++++++++++++--------- primisai/nexus/architect/structurer.py | 2 +- 7 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 examples/create_workflow_using_architect.py diff --git a/.gitignore b/.gitignore index 
bca92bc..9f4a1b8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ logs* nexus_workflows chat.rapidgpt .vscode +optimized_workflows/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/examples/create_workflow_using_architect.py b/examples/create_workflow_using_architect.py new file mode 100644 index 0000000..4efffd4 --- /dev/null +++ b/examples/create_workflow_using_architect.py @@ -0,0 +1,36 @@ +import os, sys +from dotenv import load_dotenv + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from primisai.nexus.architect import Architect + +load_dotenv() + +llm_config = {'model': os.getenv('LLM_MODEL'), 'api_key': os.getenv('LLM_API_KEY'), 'base_url': os.getenv('LLM_BASE_URL')} + +user_query = """Design a workflow to solve GRE/SAT-style riddle questions and math problems. Use 1 supervisor and 1 Question Ansering Agent""" + +"""JSON object containing the keys "question" and "answer". +Example line: {"question": "What is 2+2?", "answer": "4"}""" + +benchmark_path = "path_to_the_data.jsonl" + +architect = Architect(user_query=user_query, + benchmark_path=benchmark_path, + llm_config=llm_config, + workflow_name="math_riddle_solver", + subset_size=5, + max_iterations=2) + +results = architect.build_and_optimize() + +# --- 3. 
Review the Results --- +print("\n\n" + "=" * 25 + " FINAL SUMMARY " + "=" * 25) +print(f"✅ Process Completed!") +print(f"Final Accuracy: {results['final_accuracy']:.2%}") +print(f"Final optimized workflow file saved to: {results['output_path']}") +print("\n--- Performance History ---") +for record in results['history']: + print(f" - Iteration {record['iteration']}: Accuracy={record['accuracy']:.2%}, Saved to -> {record['saved_path']}") +print("=" * 67) diff --git a/primisai/nexus/architect/__init__.py b/primisai/nexus/architect/__init__.py index a1ee720..e9f006c 100644 --- a/primisai/nexus/architect/__init__.py +++ b/primisai/nexus/architect/__init__.py @@ -5,6 +5,7 @@ from .evaluator import Evaluator from .schemas import * from .prompts import * +from .manager import Architect __all__ = [ "WorkflowExpander", diff --git a/primisai/nexus/architect/evaluator.py b/primisai/nexus/architect/evaluator.py index 3c68a39..01fb9e9 100644 --- a/primisai/nexus/architect/evaluator.py +++ b/primisai/nexus/architect/evaluator.py @@ -240,7 +240,7 @@ def process_example(example_with_index): 'detailed_results': self.results } - print(f"\n=== Final Evaluation Results ===") + print(f"\n=== Evaluation Results ===") print(f"Total Examples: {total_examples}") print(f"Correct Answers: {self.correct_answers}") print(f"Wrong Answers: {self.wrong_answers}") @@ -279,8 +279,8 @@ def generate_feedback_summary(self, old_system_messages, evaluation_results: Dic example_text = f"{result['question']}\nExpected Answer: {result['expected_answer']}\nPredicted Answer: {result['actual_response']}\nIs Correct: {result['is_correct']}\n" # Only include chat for selected examples - if result['id'] in chat_ids: - example_text += f"\nChat of this Example:\n{result['chat']}" + # if result['id'] in chat_ids: + # example_text += f"\nChat of this Example:\n{result['chat']}" FAILED_METRIC += example_text + "\n\n---\n\n" diff --git a/primisai/nexus/architect/expander.py b/primisai/nexus/architect/expander.py index 
958c34a..33887e8 100644 --- a/primisai/nexus/architect/expander.py +++ b/primisai/nexus/architect/expander.py @@ -30,15 +30,14 @@ def expand_workflow_query(self, user_query: str, nexus_guidelines: str) -> str: "role": "system", "content": ( - "You are an advanced workflow architect specializing in designing intelligent, modular, and efficient workflows using the Nexus framework. " - "Your job is to deeply analyze high-level workflow requests and decompose them into a clear, structured set of components, strictly following the Nexus guidelines. " - "For each workflow, identify the absolute minimum set of agents, supervisors, and tools required, ensuring no unnecessary complexity. " - "For every component, clearly define its purpose, responsibilities, and how it interacts with other components. " - "Always create output schemas that are tailored to the nature of the task—if the task requires logic or reasoning, design the schema accordingly. " + "You are an advanced workflow architect specializing in designing intelligent, modular, and efficient workflows using the Nexus framework." + "Your job is to deeply analyze high-level workflow requests and decompose them into a clear, structured set of components, strictly following the Nexus guidelines." + "For each workflow, identify the absolute minimum set of agents, supervisors, and tools required, ensuring no unnecessary complexity." "IMPORTANT: When passing the user query to agents, use the exact original wording without any modification. " "Do not provide implementation code—focus on architecture, structure, and clarity. " - "Be precise, concise, and ensure your output is easy to follow for both humans and machines. " - "DONT USE TOOLS UNTILL UNLESS NEEDED") + "Be precise, concise, and ensure your output is easy to follow for both humans and machines." + "DONT USE TOOLS UNTILL UNLESS NEEDED. DONT USE Sub supervisors. Only supervisor and agents. Dont Use any output Schemas for Agents. 
Supervisor Cannot ask any feedback question from user" + ) }, { "role": "user", diff --git a/primisai/nexus/architect/manager.py b/primisai/nexus/architect/manager.py index fc62a48..7911b91 100644 --- a/primisai/nexus/architect/manager.py +++ b/primisai/nexus/architect/manager.py @@ -1,17 +1,14 @@ -# primisai/nexus/architect/manager.py - import os import datetime import logging from typing import Dict, Any, List, Optional -from .expander import WorkflowExpander -from .structurer import WorkflowStructurer -from .builder import WorkflowBuilder -from .prompter import Prompter -from .evaluator import Evaluator -from . import prompts # Use relative import within the package -from .schemas import StructuredWorkflow # Import the schema for type hinting +from primisai.nexus.architect.expander import WorkflowExpander +from primisai.nexus.architect.structurer import WorkflowStructurer +from primisai.nexus.architect.builder import WorkflowBuilder +from primisai.nexus.architect.prompter import Prompter +from primisai.nexus.architect.evaluator import Evaluator +from primisai.nexus.architect import prompts # Configure logging for the library module logger = logging.getLogger(__name__) @@ -36,12 +33,18 @@ def __init__(self, subset_size: int = 10, max_iterations: int = 5): """ - Initializes the WorkflowArchitect. + Initializes the Architect. Args: user_query (str): The high-level user requirement for the workflow. benchmark_path (str): The file path to the JSONL training data. - Each line must be a JSON object with "question" and "answer" keys. + + IMPORTANT: Each line in this file must be a valid + JSON object containing the keys "question" and "answer". + + Example line: + {"question": "What is 2+2?", "answer": "4"} + llm_config (Dict[str, str]): Configuration for the language model, e.g., {'model': '...', 'api_key': '...'}. output_dir (str): Directory to save the final workflow file and logs. 
@@ -72,7 +75,7 @@ def __init__(self, self.prompter: Optional[Prompter] = None # Initialized after agents are known # --- Internal State --- - self.structured_workflow: Optional[StructuredWorkflow] = None + self.structured_workflow = None self.system_messages: Dict[str, str] = {} self.performance_history: List[Dict[str, Any]] = [] self.workflow_id = self._generate_workflow_id() @@ -134,7 +137,7 @@ def build_and_optimize(self) -> Dict[str, Any]: self.structured_workflow = self.structurer.structure_workflow(expanded_workflow) agents_names = [self.structured_workflow.main_supervisor.name] + [agent.name for agent in self.structured_workflow.agents] - logger.info(f"Identified agents: {', '.join(agents_names)}") + logger.info(f"Created agents: {', '.join(agents_names)}") logger.info("Step 2: Generating initial system prompts for all agents...") self.prompter = Prompter(agents_names, self.llm_config) @@ -149,11 +152,13 @@ def build_and_optimize(self) -> Dict[str, Any]: logger.info(f"\n{'='*20} Iteration {i+1}/{self.max_iterations} {'='*20}") logger.info("Evaluating current workflow performance...") - evaluation_results = self.evaluator.evaluate_supervisor( - lambda suffix: self._create_supervisor_instance(self.workflow_id, suffix), self.workflow_id, iteration=i, is_factory=True) + evaluation_results = self.evaluator.evaluate_supervisor(self._create_supervisor_instance, + self.workflow_id, + iteration=i, + is_factory=True) + accuracy = evaluation_results['accuracy'] final_accuracy = accuracy - logger.info(f"Iteration {i+1} Accuracy: {accuracy:.2%}") output_path = self._save_workflow_to_file(accuracy, i) diff --git a/primisai/nexus/architect/structurer.py b/primisai/nexus/architect/structurer.py index 26ad6fa..daaf2d6 100644 --- a/primisai/nexus/architect/structurer.py +++ b/primisai/nexus/architect/structurer.py @@ -38,7 +38,7 @@ def structure_workflow(self, expanded_workflow: str) -> WorkflowDefinition: "content": ("You are a workflow structure expert. 
Your task is to convert " "the expanded workflow description into structured component " "definitions following the provided schema. Include validation " - "constraints for each component. USE same system messages provided in query if provided. DONT change them") + "constraints for each component. Don't Use Sub Supervisors.") }, { "role": "system", From d7b32d0c5ad2c9d40a2d6563bbe45ecb579a2ec8 Mon Sep 17 00:00:00 2001 From: Humza Sami Date: Thu, 17 Jul 2025 20:50:45 +0500 Subject: [PATCH 3/3] refactor: added docstrings in architect files --- primisai/nexus/architect/builder.py | 23 ++++++++++++++++ primisai/nexus/architect/evaluator.py | 36 ++++++++++++++++++++++---- primisai/nexus/architect/expander.py | 28 +++++++++++++++++--- primisai/nexus/architect/prompter.py | 29 +++++++++++++++++---- primisai/nexus/architect/structurer.py | 29 ++++++++++++++++++--- 5 files changed, 129 insertions(+), 16 deletions(-) diff --git a/primisai/nexus/architect/builder.py b/primisai/nexus/architect/builder.py index 7ad9e6a..382e5e4 100644 --- a/primisai/nexus/architect/builder.py +++ b/primisai/nexus/architect/builder.py @@ -9,8 +9,31 @@ class ValidationError(Exception): class ToolBuilder: + """ + Translates a structured workflow definition into executable Python code. + + This class acts as the "code generator" or "compiler" in the Architect + pipeline. It takes a formal, structured definition of a workflow (as + produced by the `WorkflowStructurer`) and dynamically generates a runnable + Python script. + + The generated script orchestrates the execution of the various components + (nodes) in the correct order, handling the data flow between them as + defined by the edges of the workflow graph. The final output is a + self-contained piece of code ready to be executed or passed to the + `Evaluator` for performance assessment. + """ def __init__(self, tool_definition: Tool): + """ + Initializes the ToolBuilder with a complete workflow definition. 
+ + Args: + tool_definition (Tool): A structured object containing the full + specification of the workflow to be built. This object includes + the name, description, a list of component `nodes`, and a list + of `edges` that define the data flow graph. + """ self.definition = tool_definition def build(self) -> Dict[str, Any]: diff --git a/primisai/nexus/architect/evaluator.py b/primisai/nexus/architect/evaluator.py index 01fb9e9..b3fa52f 100644 --- a/primisai/nexus/architect/evaluator.py +++ b/primisai/nexus/architect/evaluator.py @@ -14,15 +14,41 @@ class Evaluator: + """ + Evaluates the performance of generated agentic workflows against a benchmark. + + This class provides a framework for systematically testing the quality and + correctness of workflows created by the "Architect" system. It operates by + running a generated workflow against a set of predefined tasks from a + benchmark file. + + A key feature of this evaluator is its use of a Large Language Model (LLM) + as a "judge" to score the output of the workflow. The LLM compares the + workflow's final result against the ground-truth or expected outcome defined + in the benchmark, providing a flexible and nuanced assessment of performance. + """ def __init__(self, llm_config: Dict[str, str], benchmark_path: str, subset_size: int = 10): """ - Initialize Evaluator with benchmark data. - + Initializes the Evaluator by loading the benchmark dataset. + + This constructor configures the LLM that will act as the evaluator and + loads the benchmark problems from the specified JSONL file. It can also + limit the evaluation to a smaller subset of the benchmark for faster testing. + Args: - llm_config: LLM configuration for AI evaluator - benchmark_path: Path to the JSONL benchmark file - subset_size: Number of examples to test (default: 10) + llm_config (Dict[str, str]): The configuration for the Language Model + that will be used as the AI judge for scoring the results. 
+ benchmark_path (str): The file path to the benchmark dataset. The file + is expected to be in JSONL (JSON Lines) format, where each line + is a JSON object representing a single test case. + subset_size (int, optional): The number of examples to load from the + benchmark file. If set, only the first `subset_size` examples + will be used. Defaults to 10 for quick evaluations. + + Raises: + FileNotFoundError: If the file at `benchmark_path` does not exist. + ValueError: If `subset_size` is not a positive integer. """ self.ai = AI(llm_config) self.benchmark_path = benchmark_path diff --git a/primisai/nexus/architect/expander.py b/primisai/nexus/architect/expander.py index 33887e8..20776ca 100644 --- a/primisai/nexus/architect/expander.py +++ b/primisai/nexus/architect/expander.py @@ -2,15 +2,37 @@ from typing import Dict, Any + class WorkflowExpander: + """ + Analyzes a user's initial query and expands it into a detailed, narrative plan. + + This class serves as the first step in the "Architect" workflow creation + pipeline. It takes a concise, high-level user request and uses a Large + Language Model (LLM) to flesh it out into a comprehensive description. + + The primary goal of the expansion is to reason about the user's intent and + propose a concrete plan. This includes identifying the individual tasks, + determining the number and roles of AI agents required (e.g., a researcher, + a writer), and deciding if a "supervisor" agent is needed to manage the + overall workflow. + + The output of this class is a rich, natural-language text that serves as a + more detailed "expanded prompt" for the next component in the system, the + `WorkflowStructurer`. + """ def __init__(self, llm_config: Dict[str, str]): """ - Initialize WorkflowExpander with LLM configuration. + Initializes the WorkflowExpander with LLM configuration. + + This constructor sets up the connection to the Large Language Model (LLM) + that will be used to perform the query expansion and analysis. 
Args: - llm_config: Dictionary containing LLM configuration - (api_key, model, base_url) + llm_config (Dict[str, str]): A dictionary containing the configuration + for the Language Model client. This typically includes essential + details like 'api_key', 'model', and 'base_url'. """ self.ai = AI(llm_config) diff --git a/primisai/nexus/architect/prompter.py b/primisai/nexus/architect/prompter.py index 580b45d..d4f4b83 100644 --- a/primisai/nexus/architect/prompter.py +++ b/primisai/nexus/architect/prompter.py @@ -75,14 +75,33 @@ def create_dynamic_schema(agent_names): class Prompter: + """ + A specialized class for crafting and managing prompts for AI agents. + + Within a multi-agent framework, this class acts as a central prompt + engineering hub. It is responsible for creating contextually-aware and + role-specific prompts that guide each agent's behavior and responses. + + By encapsulating prompt logic here, we can easily manage system messages, + task instructions, and the formatting of conversational history to ensure + agents perform their designated functions effectively. + """ - def __init__(self, agent_names, llm_config: Dict[str, str]): + def __init__(self, agent_names: List[str], llm_config: Dict[str, str]): """ - Initialize Prompter with LLM configuration and conversation management. - + Initializes the Prompter instance. + + This constructor sets up the prompter with the names of the agents it will + be generating prompts for, and the configuration for the Language Model + that might be used in the prompting process. + Args: - llm_config: Dictionary containing LLM configuration - (api_key, model, base_url) + agent_names (List[str]): A list of unique string identifiers for the + AI agents in the system. This allows the prompter to tailor + instructions for specific agent roles. + llm_config (Dict[str, str]): A dictionary containing the configuration + for the Language Model client. 
It typically includes essential + keys like 'api_key', 'model', and 'base_url'. """ self.ai = AI(llm_config) self.model = llm_config.get("model", "gpt-4.1") diff --git a/primisai/nexus/architect/structurer.py b/primisai/nexus/architect/structurer.py index daaf2d6..9027f0f 100644 --- a/primisai/nexus/architect/structurer.py +++ b/primisai/nexus/architect/structurer.py @@ -5,13 +5,36 @@ class WorkflowStructurer: + """ + Handles the structuring phase of the "Architect" agentic workflow creation. + + This class is responsible for taking a high-level, natural language user + prompt (an "expanded prompt") and converting it into a formal, structured + JSON output. It achieves this by using a Large Language Model (LLM) that is + guided by a 'NEXUS_DOCUMENTATION.md' file. This file defines the available + components and rules for building a valid workflow. + + The structured output generated by this class is designed to be consumed by + a "Builder" component, which then translates the structure into executable code. + """ def __init__(self, llm_config: Dict[str, str]): """ - Initialize WorkflowStructurer with LLM configuration. - + Initializes the WorkflowStructurer instance. + + This constructor configures the internal AI client with the provided + LLM settings. It also locates and loads the critical 'NEXUS_DOCUMENTATION.md' + file from the same directory. This documentation is stored in memory + to be used as context for the LLM during the structuring process. + Args: - llm_config: Dictionary containing LLM configuration + llm_config (Dict[str, str]): A dictionary containing the configuration + for the Language Model client, which may include API keys, model + names, endpoints, etc. + + Raises: + FileNotFoundError: If the 'NEXUS_DOCUMENTATION.md' file is not found + in the same directory as this module. """ self.llm_config = llm_config self.ai = AI(self.llm_config)