From 0341f5b7d223a53a18aeb2e74c6e2e3aad065539 Mon Sep 17 00:00:00 2001 From: njx <3771829673@qq.com> Date: Sat, 15 Nov 2025 00:42:10 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agents/base_agent.py | 50 +++++- src/agents/mcpmark_agent.py | 113 +++++++++++-- src/base/task_manager.py | 29 +++- src/config/config_schema.py | 12 +- src/errors.py | 163 ++++++++++++++++-- src/evaluator.py | 16 +- src/exceptions.py | 319 ++++++++++++++++++++++++++++++++++++ src/factory.py | 14 +- src/model_config.py | 6 +- src/services.py | 7 +- 10 files changed, 680 insertions(+), 49 deletions(-) create mode 100644 src/exceptions.py diff --git a/src/agents/base_agent.py b/src/agents/base_agent.py index 661a1994..abcc5eb0 100644 --- a/src/agents/base_agent.py +++ b/src/agents/base_agent.py @@ -10,6 +10,11 @@ from typing import Any, Dict, List, Optional, Callable from src.logger import get_logger +from src.exceptions import ( + MissingConfigurationError, + InvalidConfigurationError, + MCPServiceError, +) from .mcp import MCPStdioServer, MCPHttpServer from .utils import TokenUsageTracker @@ -144,13 +149,20 @@ async def _create_mcp_server(self) -> Any: return self._create_stdio_server() if self.mcp_service in self.HTTP_SERVICES: return self._create_http_server() - raise ValueError(f"Unsupported MCP service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported MCP service. STDIO services: {self.STDIO_SERVICES}, HTTP services: {self.HTTP_SERVICES}" + ) def _create_stdio_server(self) -> MCPStdioServer: if self.mcp_service == "notion": notion_key = self.service_config.get("notion_key") if not notion_key: - raise ValueError("Notion API key required") + raise MissingConfigurationError( + config_key="notion_key", + service="notion" + ) return MCPStdioServer( command="npx", args=["-y", "@notionhq/notion-mcp-server"], @@ -165,7 +177,10 @@ def _create_stdio_server(self) -> MCPStdioServer: if self.mcp_service == "filesystem": test_directory = self.service_config.get("test_directory") if not test_directory: - raise ValueError("Test directory required for filesystem service") + raise MissingConfigurationError( + config_key="test_directory", + service="filesystem" + ) return MCPStdioServer( command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", str(test_directory)], @@ -199,7 +214,11 @@ def _create_stdio_server(self) -> MCPStdioServer: password = self.service_config.get("password") database = self.service_config.get("current_database") or self.service_config.get("database") if not all([username, password, database]): - raise ValueError("PostgreSQL requires username, password, and database") + missing = [k for k, v in [("username", username), ("password", password), ("database", database)] if not v] + raise MissingConfigurationError( + config_key=", ".join(missing), + service="postgres" + ) database_url = f"postgresql://{username}:{password}@{host}:{port}/{database}" return MCPStdioServer( command="pipx", @@ -211,7 +230,11 @@ def _create_stdio_server(self) -> MCPStdioServer: api_key = self.service_config.get("api_key") backend_url = self.service_config.get("backend_url") if not all([api_key, backend_url]): - raise ValueError("Insforge requires api_key and backend_url") + missing = [k for k, v in [("api_key", api_key), ("backend_url", backend_url)] if not v] + raise MissingConfigurationError( + config_key=", ".join(missing), + service="insforge" + ) return MCPStdioServer( command="npx", args=["-y", "@insforge/mcp@dev"], @@ -221,13 +244,20 @@ def _create_stdio_server(self) -> MCPStdioServer: }, ) - raise ValueError(f"Unsupported stdio service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported stdio service. Supported: {self.STDIO_SERVICES}" + ) def _create_http_server(self) -> MCPHttpServer: if self.mcp_service == "github": github_token = self.service_config.get("github_token") if not github_token: - raise ValueError("GitHub token required") + raise MissingConfigurationError( + config_key="github_token", + service="github" + ) return MCPHttpServer( url="https://api.githubcopilot.com/mcp/", headers={ @@ -235,7 +265,11 @@ def _create_http_server(self) -> MCPHttpServer: "User-Agent": "MCPMark/1.0", }, ) - raise ValueError(f"Unsupported HTTP service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported HTTP service. Supported: {self.HTTP_SERVICES}" + ) # ------------------------------------------------------------------ # Message/Tool formatting helpers diff --git a/src/agents/mcpmark_agent.py b/src/agents/mcpmark_agent.py index 66c13109..013db7d0 100644 --- a/src/agents/mcpmark_agent.py +++ b/src/agents/mcpmark_agent.py @@ -15,6 +15,15 @@ import nest_asyncio from src.logger import get_logger +from src.exceptions import ( + AgentExecutionError, + AgentTimeoutError, + LLMRateLimitError, + LLMQuotaExceededError, + LLMContextWindowExceededError, + MissingConfigurationError, + InvalidConfigurationError, +) from .base_agent import BaseMCPAgent from .mcp import MCPStdioServer, MCPHttpServer @@ -133,9 +142,22 @@ async def _execute_with_strategy(): if isinstance(e, asyncio.TimeoutError): error_msg = f"Execution timed out after {self.timeout} seconds" logger.error(error_msg) + # Convert to AgentTimeoutError but don't raise - return error result + timeout_error = AgentTimeoutError( + agent_name=self.__class__.__name__, + timeout=self.timeout, + cause=e + ) + error_msg = str(timeout_error) else: - error_msg = f"Agent execution failed: {e}" - logger.error(error_msg, exc_info=True) + # Check if it's already an MCPMarkException + if isinstance(e, (AgentExecutionError, AgentTimeoutError, + LLMRateLimitError, LLMQuotaExceededError, + LLMContextWindowExceededError)): + error_msg = str(e) + else: + error_msg = f"Agent execution failed: {e}" + logger.error(error_msg, exc_info=True) self.usage_tracker.update( success=False, @@ -606,17 +628,46 @@ async def _execute_litellm_tool_loop( logger.warning(f"| ✗ LLM call timed out on turn {turn_count + 1}") consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures: - raise Exception(f"Too many consecutive failures ({consecutive_failures})") + raise AgentTimeoutError( + agent_name=self.__class__.__name__, + timeout=self.timeout / 2, + cause=asyncio.TimeoutError(f"Too many consecutive failures ({consecutive_failures})") + ) await asyncio.sleep(8 ** consecutive_failures) # Exponential backoff continue except Exception as e: logger.error(f"| ✗ LLM call failed on turn {turn_count + 1}: {e}") consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures: - raise + # Convert to appropriate exception type + error_str = str(e) + if "ContextWindowExceededError" in error_str: + raise LLMContextWindowExceededError( + model_name=self.litellm_input_model_name, + cause=e + ) + elif "RateLimitError" in error_str or "rate limit" in error_str.lower(): + raise LLMRateLimitError( + model_name=self.litellm_input_model_name, + cause=e + ) + elif "quota" in error_str.lower() or "account balance" in error_str.lower(): + raise LLMQuotaExceededError( + model_name=self.litellm_input_model_name, + cause=e + ) + else: + raise AgentExecutionError( + agent_name=self.__class__.__name__, + reason=str(e), + cause=e + ) if "ContextWindowExceededError" in str(e): - raise - elif "RateLimitError" in str(e): + raise LLMContextWindowExceededError( + model_name=self.litellm_input_model_name, + cause=e + ) + elif "RateLimitError" in str(e) or "rate limit" in str(e).lower(): await asyncio.sleep(12 ** consecutive_failures) else: await asyncio.sleep(2 ** consecutive_failures) @@ -789,7 +840,11 @@ async def _create_mcp_server(self) -> Any: elif self.mcp_service in self.HTTP_SERVICES: return self._create_http_server() else: - raise ValueError(f"Unsupported MCP service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported MCP service. STDIO services: {self.STDIO_SERVICES}, HTTP services: {self.HTTP_SERVICES}" + ) def _create_stdio_server(self) -> MCPStdioServer: @@ -797,7 +852,10 @@ def _create_stdio_server(self) -> MCPStdioServer: if self.mcp_service == "notion": notion_key = self.service_config.get("notion_key") if not notion_key: - raise ValueError("Notion API key required") + raise MissingConfigurationError( + config_key="notion_key", + service="notion" + ) return MCPStdioServer( command="npx", @@ -813,7 +871,10 @@ def _create_stdio_server(self) -> MCPStdioServer: elif self.mcp_service == "filesystem": test_directory = self.service_config.get("test_directory") if not test_directory: - raise ValueError("Test directory required for filesystem service") + raise MissingConfigurationError( + config_key="test_directory", + service="filesystem" + ) return MCPStdioServer( command="npx", @@ -846,7 +907,11 @@ def _create_stdio_server(self) -> MCPStdioServer: database = self.service_config.get("current_database") or self.service_config.get("database") if not all([username, password, database]): - raise ValueError("PostgreSQL requires username, password, and database") + missing = [k for k, v in [("username", username), ("password", password), ("database", database)] if not v] + raise MissingConfigurationError( + config_key=", ".join(missing), + service="postgres" + ) database_url = f"postgresql://{username}:{password}@{host}:{port}/{database}" @@ -860,7 +925,11 @@ def _create_stdio_server(self) -> MCPStdioServer: api_key = self.service_config.get("api_key") backend_url = self.service_config.get("backend_url") if not all([api_key, backend_url]): - raise ValueError("Insforge requires api_key and backend_url") + missing = [k for k, v in [("api_key", api_key), ("backend_url", backend_url)] if not v] + raise MissingConfigurationError( + config_key=", ".join(missing), + service="insforge" + ) return MCPStdioServer( command="npx", args=["-y", "@insforge/mcp@dev"], @@ -871,7 +940,11 @@ def _create_stdio_server(self) -> MCPStdioServer: ) else: - raise ValueError(f"Unsupported stdio service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported stdio service. Supported: {self.STDIO_SERVICES}" + ) def _create_http_server(self) -> MCPHttpServer: @@ -879,7 +952,10 @@ def _create_http_server(self) -> MCPHttpServer: if self.mcp_service == "github": github_token = self.service_config.get("github_token") if not github_token: - raise ValueError("GitHub token required") + raise MissingConfigurationError( + config_key="github_token", + service="github" + ) return MCPHttpServer( url="https://api.githubcopilot.com/mcp/", @@ -895,7 +971,10 @@ def _create_http_server(self) -> MCPHttpServer: api_key = self.service_config.get("api_key", "") if not api_key: - raise ValueError("Supabase requires api_key (use secret key from 'supabase status')") + raise MissingConfigurationError( + config_key="api_key", + service="supabase" + ) # Supabase CLI exposes MCP at /mcp endpoint mcp_url = f"{api_url}/mcp" @@ -909,5 +988,9 @@ def _create_http_server(self) -> MCPHttpServer: ) else: - raise ValueError(f"Unsupported HTTP service: {self.mcp_service}") + raise InvalidConfigurationError( + config_key="mcp_service", + value=self.mcp_service, + reason=f"Unsupported HTTP service. Supported: {self.HTTP_SERVICES}" + ) diff --git a/src/base/task_manager.py b/src/base/task_manager.py index 65923008..41503cc6 100644 --- a/src/base/task_manager.py +++ b/src/base/task_manager.py @@ -17,6 +17,7 @@ from src.logger import get_logger from src.results_reporter import TaskResult +from src.exceptions import TaskVerificationError logger = get_logger(__name__) @@ -247,13 +248,29 @@ def run_verification(self, task: BaseTask) -> subprocess.CompletedProcess: Default implementation runs the verification command. Services can override this to add environment variables or custom logic. + + Raises: + TaskVerificationError: If verification fails or times out """ - return subprocess.run( - self._get_verification_command(task), - capture_output=True, # Capture stdout and stderr for logging - text=True, - timeout=300, - ) + try: + return subprocess.run( + self._get_verification_command(task), + capture_output=True, # Capture stdout and stderr for logging + text=True, + timeout=300, + ) + except subprocess.TimeoutExpired as e: + raise TaskVerificationError( + task_name=task.name, + reason=f"Verification script timed out after 300s: {e}", + cause=e + ) + except subprocess.CalledProcessError as e: + raise TaskVerificationError( + task_name=task.name, + reason=f"Verification script failed with exit code {e.returncode}", + cause=e + ) # ========================================================================= # Abstract Methods - Minimal Set Required diff --git a/src/config/config_schema.py b/src/config/config_schema.py index 907ed9f5..f09bb7be 100644 --- a/src/config/config_schema.py +++ b/src/config/config_schema.py @@ -17,6 +17,7 @@ from dotenv import load_dotenv from src.logger import get_logger +from src.exceptions import MissingConfigurationError, InvalidConfigurationError logger = get_logger(__name__) @@ -42,11 +43,18 @@ class ConfigValue: def validate(self) -> bool: """Validate the configuration value.""" if self.required and self.value is None: - raise ValueError(f"Required configuration '{self.key}' is missing") + raise MissingConfigurationError( + config_key=self.key, + service=None, # Could be passed from parent if available + ) if self.validator and self.value is not None: if not self.validator(self.value): - raise ValueError(f"Invalid value for '{self.key}': {self.value}") + raise InvalidConfigurationError( + config_key=self.key, + value=self.value, + reason="Validation failed" + ) return True diff --git a/src/errors.py b/src/errors.py index d39d5f05..37c6ae25 100644 --- a/src/errors.py +++ b/src/errors.py @@ -1,12 +1,20 @@ #!/usr/bin/env python3 """ -Simple Error Handling for MCPMark -================================== +Error Handling for MCPMark +=========================== -Provides basic error standardization and retry logic. +Provides error standardization, retry logic, and exception utilities. """ -from typing import Optional +from typing import Optional, Union, Dict, Any +import asyncio + +# Lazy import to avoid circular dependencies +try: + from src.exceptions import MCPMarkException +except ImportError: + # Fallback for when exceptions module is not available + MCPMarkException = Exception """Retryable error detection via minimal substring matching (lower-case).""" @@ -29,14 +37,47 @@ } -def is_retryable_error(error: str) -> bool: - """Return True if the error string contains any retryable pattern.""" - error_lower = str(error or "").lower() - return any(pattern in error_lower for pattern in RETRYABLE_PATTERNS) +def is_retryable_error(error: Union[str, Exception]) -> bool: + """ + Determine if an error is retryable. + + Args: + error: Error message string or Exception instance + + Returns: + True if the error is retryable, False otherwise + """ + # If it's an MCPMarkException, use its retryable flag + if isinstance(error, MCPMarkException): + return error.retryable + + # Otherwise, check error message against patterns + error_str = str(error).lower() + return any(pattern in error_str for pattern in RETRYABLE_PATTERNS) -def standardize_error_message(error: str, mcp_service: Optional[str] = None) -> str: - """Standardize error messages for consistent reporting.""" +def standardize_error_message( + error: Union[str, Exception], + mcp_service: Optional[str] = None +) -> str: + """ + Standardize error messages for consistent reporting. + + Args: + error: Error message string or Exception instance + mcp_service: Optional MCP service name for context + + Returns: + Standardized error message + """ + # If it's an MCPMarkException, use its formatted message + if isinstance(error, MCPMarkException): + error_msg = str(error) + # Add service prefix if provided and not already in message + if mcp_service and mcp_service.lower() not in error_msg.lower(): + return f"{mcp_service.title()} {error_msg}" + return error_msg + error_str = str(error).strip() # Common standardizations @@ -63,3 +104,105 @@ def standardize_error_message(error: str, mcp_service: Optional[str] = None) -> return f"{mcp_service.title()} {base_msg}" return base_msg + + +def extract_error_message(error: Union[str, Exception]) -> str: + """ + Extract error message from exception or string. + + Args: + error: Error message string or Exception instance + + Returns: + Error message string + """ + if isinstance(error, Exception): + return str(error) + return str(error) + + +def convert_to_mcpmark_exception( + error: Exception, + context: Optional[Dict[str, Any]] = None +) -> MCPMarkException: + """ + Convert a standard exception to MCPMarkException. + + This function provides backward compatibility by converting + existing exceptions to the new exception hierarchy. + + Args: + error: Standard exception to convert + context: Additional context information + + Returns: + Appropriate MCPMarkException subclass + """ + # Import here to avoid circular dependencies + from src.exceptions import ( + MCPMarkException, + ConfigurationError, + ServiceError, + MCPServiceTimeoutError, + MCPServiceAuthenticationError, + AgentTimeoutError, + ) + + error_str = str(error).lower() + context = context or {} + + # Handle specific exception types + if isinstance(error, ValueError): + if "configuration" in error_str or "config" in error_str: + return ConfigurationError( + message=str(error), + context=context, + cause=error + ) + elif "service" in error_str or "mcp" in error_str: + service_name = context.get("service_name", "unknown") + return ServiceError( + message=str(error), + context=context, + cause=error + ) + + elif isinstance(error, RuntimeError): + if "timeout" in error_str: + service_name = context.get("service_name", "unknown") + timeout = context.get("timeout", 0) + return MCPServiceTimeoutError( + service_name=service_name, + timeout=timeout, + cause=error + ) + elif "authentication" in error_str or "unauthorized" in error_str: + service_name = context.get("service_name", "unknown") + return MCPServiceAuthenticationError( + service_name=service_name, + reason=str(error), + cause=error + ) + + elif isinstance(error, FileNotFoundError): + return ConfigurationError( + message=f"File not found: {error}", + context=context, + cause=error + ) + + elif isinstance(error, asyncio.TimeoutError): + agent_name = context.get("agent_name", "unknown") + timeout = context.get("timeout", 0) + return AgentTimeoutError( + agent_name=agent_name, + timeout=timeout, + cause=error + ) + + # Default: wrap as generic MCPMarkException + return MCPMarkException( + message=str(error), + context=context, + cause=error + ) diff --git a/src/evaluator.py b/src/evaluator.py index 31f1b711..674e2b26 100644 --- a/src/evaluator.py +++ b/src/evaluator.py @@ -12,6 +12,12 @@ from src.results_reporter import EvaluationReport, ResultsReporter, TaskResult from src.errors import is_retryable_error from src.agents import AGENT_REGISTRY +from src.exceptions import ( + InvalidConfigurationError, + StateSetupError, + AgentExecutionError, + TaskExecutionError, +) # Initialize logger logger = get_logger(__name__) @@ -33,7 +39,11 @@ def __init__( self.timeout = timeout self.agent_name = (agent_name or "mcpmark").lower() if self.agent_name not in AGENT_REGISTRY: - raise ValueError(f"Unsupported agent '{agent_name}'. Available: {sorted(AGENT_REGISTRY)}") + raise InvalidConfigurationError( + config_key="agent_name", + value=agent_name, + reason=f"Unsupported agent. Available: {sorted(AGENT_REGISTRY)}" + ) # Initialize model configuration self.reasoning_effort = reasoning_effort @@ -190,10 +200,12 @@ def _run_single_task(self, task) -> TaskResult: if not setup_success: logger.error(f"| State setup failed for task: {task.name}") task_total_time = time.time() - task_start_time + # Use StateSetupError for better error classification + error_msg = f"State setup failed for service '{self.mcp_service}' (task: {task.name})" return TaskResult( task_name=task.name, success=False, - error_message="State Duplication Error", + error_message=error_msg, verification_error=None, verification_output=None, category_id=task.category_id, diff --git a/src/exceptions.py b/src/exceptions.py new file mode 100644 index 00000000..383c7d50 --- /dev/null +++ b/src/exceptions.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +MCPMark Custom Exception Classes +================================= + +Provides a hierarchical exception system for better error handling, +classification, and debugging. +""" + +from typing import Optional, Dict, Any + + +class MCPMarkException(Exception): + """Base exception for all MCPMark errors.""" + + def __init__( + self, + message: str, + error_code: Optional[str] = None, + context: Optional[Dict[str, Any]] = None, + retryable: bool = False, + cause: Optional[Exception] = None, + ): + super().__init__(message) + self.message = message + self.error_code = error_code or self.__class__.__name__ + self.context = context or {} + self.retryable = retryable + self.cause = cause + + # Preserve exception chain if cause is provided + if cause: + self.__cause__ = cause + + def __str__(self) -> str: + """Return formatted error message with context.""" + msg = self.message + if self.context: + ctx_str = ", ".join(f"{k}={v}" for k, v in self.context.items() if v is not None) + if ctx_str: + msg = f"{msg} (Context: {ctx_str})" + return msg + + def to_dict(self) -> Dict[str, Any]: + """Convert exception to dictionary for serialization.""" + return { + "error_type": self.__class__.__name__, + "error_code": self.error_code, + "message": self.message, + "context": self.context, + "retryable": self.retryable, + "cause": str(self.cause) if self.cause else None, + } + + +# Configuration Errors +class ConfigurationError(MCPMarkException): + """Base class for configuration-related errors.""" + pass + + +class MissingConfigurationError(ConfigurationError): + """Raised when required configuration is missing.""" + + def __init__(self, config_key: str, service: Optional[str] = None, **kwargs): + message = f"Missing required configuration: {config_key}" + if service: + message += f" for service: {service}" + context = {"config_key": config_key} + if service: + context["service"] = service + super().__init__(message, context=context, **kwargs) + + +class InvalidConfigurationError(ConfigurationError): + """Raised when configuration value is invalid.""" + + def __init__(self, config_key: str, value: Any, reason: Optional[str] = None, **kwargs): + message = f"Invalid configuration value for '{config_key}': {value}" + if reason: + message += f" - {reason}" + context = {"config_key": config_key, "value": str(value)} + if reason: + context["reason"] = reason + super().__init__(message, context=context, **kwargs) + + +class ModelConfigurationError(ConfigurationError): + """Raised when model configuration is invalid.""" + + def __init__(self, model_name: str, reason: str, **kwargs): + message = f"Model configuration error for '{model_name}': {reason}" + context = {"model_name": model_name, "reason": reason} + super().__init__(message, context=context, **kwargs) + + +# Service Errors +class ServiceError(MCPMarkException): + """Base class for service-related errors.""" + pass + + +class MCPServiceError(ServiceError): + """Base class for MCP service errors.""" + + def __init__(self, message: str, service_name: str, **kwargs): + context = kwargs.get("context", {}) + context["service_name"] = service_name + kwargs["context"] = context + super().__init__(message, **kwargs) + + +class MCPServiceUnavailableError(MCPServiceError): + """Raised when MCP service is unavailable.""" + + def __init__(self, service_name: str, reason: Optional[str] = None, **kwargs): + message = f"MCP service '{service_name}' is unavailable" + if reason: + message += f": {reason}" + super().__init__(message, service_name, retryable=True, **kwargs) + + +class MCPServiceTimeoutError(MCPServiceError): + """Raised when MCP service operation times out.""" + + def __init__(self, service_name: str, timeout: float, operation: Optional[str] = None, **kwargs): + message = f"MCP service '{service_name}' operation timed out after {timeout}s" + if operation: + message += f" during {operation}" + context = {"timeout": timeout} + if operation: + context["operation"] = operation + super().__init__(message, service_name, context=context, retryable=True, **kwargs) + + +class MCPServiceAuthenticationError(MCPServiceError): + """Raised when MCP service authentication fails.""" + + def __init__(self, service_name: str, reason: Optional[str] = None, **kwargs): + message = f"Authentication failed for MCP service '{service_name}'" + if reason: + message += f": {reason}" + super().__init__(message, service_name, retryable=False, **kwargs) + + +class StateManagerError(ServiceError): + """Base class for state manager errors.""" + pass + + +class StateSetupError(StateManagerError): + """Raised when state setup fails.""" + + def __init__(self, service_name: str, task_name: Optional[str] = None, reason: Optional[str] = None, **kwargs): + message = f"State setup failed for service '{service_name}'" + if task_name: + message += f" (task: {task_name})" + if reason: + message += f": {reason}" + context = {"service_name": service_name} + if task_name: + context["task_name"] = task_name + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=True, **kwargs) + + +class StateCleanupError(StateManagerError): + """Raised when state cleanup fails.""" + + def __init__(self, service_name: str, task_name: Optional[str] = None, reason: Optional[str] = None, **kwargs): + message = f"State cleanup failed for service '{service_name}'" + if task_name: + message += f" (task: {task_name})" + if reason: + message += f": {reason}" + context = {"service_name": service_name} + if task_name: + context["task_name"] = task_name + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=False, **kwargs) + + +class StateDuplicationError(StateManagerError): + """Raised when state duplication fails (e.g., resource already exists).""" + + def __init__(self, service_name: str, resource: Optional[str] = None, **kwargs): + message = f"State duplication error for service '{service_name}'" + if resource: + message += f" (resource: {resource})" + context = {"service_name": service_name} + if resource: + context["resource"] = resource + super().__init__(message, context=context, retryable=True, **kwargs) + + +class TaskManagerError(ServiceError): + """Base class for task manager errors.""" + pass + + +class TaskNotFoundError(TaskManagerError): + """Raised when a task is not found.""" + + def __init__(self, task_name: str, service: Optional[str] = None, **kwargs): + message = f"Task not found: {task_name}" + if service: + message += f" (service: {service})" + context = {"task_name": task_name} + if service: + context["service"] = service + super().__init__(message, context=context, retryable=False, **kwargs) + + +class TaskVerificationError(TaskManagerError): + """Raised when task verification fails.""" + + def __init__(self, task_name: str, reason: Optional[str] = None, **kwargs): + message = f"Task verification failed for '{task_name}'" + if reason: + message += f": {reason}" + context = {"task_name": task_name} + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=False, **kwargs) + + +# Agent Errors +class AgentError(MCPMarkException): + """Base class for agent execution errors.""" + pass + + +class AgentExecutionError(AgentError): + """Raised when agent execution fails.""" + + def __init__(self, agent_name: str, reason: Optional[str] = None, **kwargs): + message = f"Agent '{agent_name}' execution failed" + if reason: + message += f": {reason}" + context = {"agent_name": agent_name} + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=True, **kwargs) + + +class AgentTimeoutError(AgentError): + """Raised when agent execution times out.""" + + def __init__(self, agent_name: str, timeout: float, **kwargs): + message = f"Agent '{agent_name}' execution timed out after {timeout}s" + context = {"agent_name": agent_name, "timeout": timeout} + super().__init__(message, context=context, retryable=True, **kwargs) + + +class LLMError(AgentError): + """Base class for LLM-related errors.""" + pass + + +class LLMRateLimitError(LLMError): + """Raised when LLM rate limit is exceeded.""" + + def __init__(self, model_name: str, **kwargs): + message = f"Rate limit exceeded for model '{model_name}'" + context = {"model_name": model_name} + super().__init__(message, context=context, retryable=True, **kwargs) + + +class LLMQuotaExceededError(LLMError): + """Raised when LLM quota is exceeded.""" + + def __init__(self, model_name: str, **kwargs): + message = f"Quota exceeded for model '{model_name}'" + context = {"model_name": model_name} + super().__init__(message, context=context, retryable=False, **kwargs) + + +class LLMContextWindowExceededError(LLMError): + """Raised when LLM context window is exceeded.""" + + def __init__(self, model_name: str, **kwargs): + message = f"Context window exceeded for model '{model_name}'" + context = {"model_name": model_name} + super().__init__(message, context=context, retryable=False, **kwargs) + + +# Task Execution Errors +class TaskExecutionError(MCPMarkException): + """Base class for task execution errors.""" + pass + + +class TaskSetupError(TaskExecutionError): + """Raised when task setup fails.""" + + def __init__(self, task_name: str, reason: Optional[str] = None, **kwargs): + message = f"Task setup failed for '{task_name}'" + if reason: + message += f": {reason}" + context = {"task_name": task_name} + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=True, **kwargs) + + +class TaskCleanupError(TaskExecutionError): + """Raised when task cleanup fails.""" + + def __init__(self, task_name: str, reason: Optional[str] = None, **kwargs): + message = f"Task cleanup failed for '{task_name}'" + if reason: + message += f": {reason}" + context = {"task_name": task_name} + if reason: + context["reason"] = reason + super().__init__(message, context=context, retryable=False, **kwargs) + diff --git a/src/factory.py b/src/factory.py index eecc3148..80034d50 100644 --- a/src/factory.py +++ b/src/factory.py @@ -21,6 +21,7 @@ from src.base.task_manager import BaseTaskManager from src.config.config_schema import ConfigRegistry from src.services import get_service_definition, get_supported_mcp_services +from src.exceptions import ConfigurationError @dataclass @@ -37,9 +38,16 @@ def import_class(module_path: str): """Dynamically import a class from module path string.""" if not module_path: return None - module_name, class_name = module_path.rsplit(".", 1) - module = importlib.import_module(module_name) - return getattr(module, class_name) + try: + module_name, class_name = module_path.rsplit(".", 1) + module = importlib.import_module(module_name) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + raise ConfigurationError( + message=f"Failed to import class from '{module_path}': {e}", + context={"module_path": module_path}, + cause=e + ) def apply_config_mapping(config: dict, mapping: dict) -> dict: diff --git a/src/model_config.py b/src/model_config.py index f8fadde9..dd8e08a8 100644 --- a/src/model_config.py +++ b/src/model_config.py @@ -11,6 +11,7 @@ from typing import Dict, List from src.logger import get_logger +from src.exceptions import ModelConfigurationError # Initialize logger logger = get_logger(__name__) @@ -185,8 +186,9 @@ def __init__(self, model_name: str): self.api_key = os.getenv(model_info["api_key_var"]) if not self.api_key: - raise ValueError( - f"Missing required environment variable: {model_info['api_key_var']}" + raise ModelConfigurationError( + model_name=model_name, + reason=f"Missing required environment variable: {model_info['api_key_var']}" ) self.litellm_input_model_name = model_info.get("litellm_input_model_name", model_name) diff --git a/src/services.py b/src/services.py index 041ce311..849eebbc 100644 --- a/src/services.py +++ b/src/services.py @@ -445,7 +445,12 @@ def get_service_definition(service_name: str) -> dict: """Get MCP service definition by name.""" if service_name not in SERVICES: - raise ValueError(f"Unknown MCP service: {service_name}") + from src.exceptions import InvalidConfigurationError + raise InvalidConfigurationError( + config_key="service_name", + value=service_name, + reason=f"Unknown MCP service. Available: {sorted(SERVICES.keys())}" + ) return SERVICES[service_name] From 89d803924762a209fd51b329ea6c483a0fe3f84b Mon Sep 17 00:00:00 2001 From: njx <3771829673@qq.com> Date: Sat, 15 Nov 2025 00:53:42 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agents/mcpmark_agent.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/agents/mcpmark_agent.py b/src/agents/mcpmark_agent.py index 013db7d0..b6bc5705 100644 --- a/src/agents/mcpmark_agent.py +++ b/src/agents/mcpmark_agent.py @@ -349,14 +349,17 @@ async def _execute_anthropic_native_tool_loop( tools=tools, system=system_text ) - if turn_count == 1: - self.litellm_run_model_name = response['model'].split("/")[-1] + # Check for errors immediately after API call, before accessing response if error_msg: break + # Now safe to access response fields + if turn_count == 1 and response and "model" in response: + self.litellm_run_model_name = response['model'].split("/")[-1] + # Update token usage - if "usage" in response: + if response and "usage" in response: usage = response["usage"] input_tokens = usage.get("input_tokens", 0) output_tokens = usage.get("output_tokens", 0) @@ -370,7 +373,7 @@ async def _execute_anthropic_native_tool_loop( ## TODO: add reasoning tokens for claude # Extract blocks from response - blocks = response.get("content", []) + blocks = response.get("content", []) if response else [] tool_uses = [b for b in blocks if b.get("type") == "tool_use"] thinking_blocks = [b for b in blocks if b.get("type") == "thinking"] text_blocks = [b for b in blocks if b.get("type") == "text"] From fc9d6002c83160c5c0d70e0314371052912ea39d Mon Sep 17 00:00:00 2001 From: njx <3771829673@qq.com> Date: Sat, 15 Nov 2025 01:01:11 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agents/mcpmark_agent.py | 41 ++++++++++++++++++++----------------- src/base/task_manager.py | 12 ++++++++++- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/agents/mcpmark_agent.py b/src/agents/mcpmark_agent.py index b6bc5705..27a1507b 100644 --- a/src/agents/mcpmark_agent.py +++ b/src/agents/mcpmark_agent.py @@ -641,15 +641,19 @@ async def _execute_litellm_tool_loop( except Exception as e: logger.error(f"| ✗ LLM call failed on turn {turn_count + 1}: {e}") consecutive_failures += 1 + + # Handle specific error types that should not be retried + error_str = str(e) + if "ContextWindowExceededError" in error_str: + raise LLMContextWindowExceededError( + model_name=self.litellm_input_model_name, + cause=e + ) + + # Check if we've exceeded max consecutive failures if consecutive_failures >= max_consecutive_failures: # Convert to appropriate exception type - error_str = str(e) - if "ContextWindowExceededError" in error_str: - raise LLMContextWindowExceededError( - model_name=self.litellm_input_model_name, - cause=e - ) - elif "RateLimitError" in error_str or "rate limit" in error_str.lower(): + if "RateLimitError" in error_str or "rate limit" in error_str.lower(): raise LLMRateLimitError( model_name=self.litellm_input_model_name, cause=e @@ -665,12 +669,9 @@ async def _execute_litellm_tool_loop( reason=str(e), cause=e ) - if "ContextWindowExceededError" in str(e): - raise LLMContextWindowExceededError( - model_name=self.litellm_input_model_name, - cause=e - ) - elif "RateLimitError" in str(e) or "rate limit" in str(e).lower(): + + # Retry with exponential backoff for recoverable errors + if "RateLimitError" in error_str or "rate limit" in error_str.lower(): await asyncio.sleep(12 ** consecutive_failures) else: await asyncio.sleep(2 ** consecutive_failures) @@ -699,9 +700,13 @@ async def _execute_litellm_tool_loop( # Get response message choices = response.choices - if len(choices): - message = choices[0].message - message_dict = message.model_dump() if hasattr(message, 'model_dump') else dict(message) + if not len(choices): + logger.warning("| No choices in response, ending task") + ended_normally = False + break + + message = choices[0].message + message_dict = message.model_dump() if hasattr(message, 'model_dump') else dict(message) # Log assistant's text content if present if hasattr(message, 'content') and message.content: @@ -766,9 +771,7 @@ async def _execute_litellm_tool_loop( continue else: # Log end reason - if not choices: - logger.info("|\n|\n| Task ended with no messages generated by the model.") - elif choices[0].finish_reason == "stop": + if choices[0].finish_reason == "stop": logger.info("|\n|\n| Task ended with the finish reason from messages being 'stop'.") # No tool/function call, add message and we're done diff --git a/src/base/task_manager.py b/src/base/task_manager.py index 41503cc6..7617ff6d 100644 --- a/src/base/task_manager.py +++ b/src/base/task_manager.py @@ -253,12 +253,22 @@ def run_verification(self, task: BaseTask) -> subprocess.CompletedProcess: TaskVerificationError: If verification fails or times out """ try: - return subprocess.run( + result = subprocess.run( self._get_verification_command(task), capture_output=True, # Capture stdout and stderr for logging text=True, timeout=300, + check=False, # Don't raise on non-zero exit, we check returncode below ) + # Check return code manually and raise if non-zero + if result.returncode != 0: + raise subprocess.CalledProcessError( + result.returncode, + self._get_verification_command(task), + result.stdout, + result.stderr + ) + return result except subprocess.TimeoutExpired as e: raise TaskVerificationError( task_name=task.name,