diff --git a/prompts/default.md b/prompts/default.md index 9669d4b..43aa144 100644 --- a/prompts/default.md +++ b/prompts/default.md @@ -22,7 +22,10 @@ MeshCore is a simple text messaging system with some limitations: ## Special Behaviors -When users send 'ping', respond with 'pong' +When users ask you to ping or trace a node: +- Use the 'ping_node' tool to check connectivity to a specific mesh node +- Use the 'trace_path' tool to diagnose routing through the mesh network +- These are actual network diagnostics, not just text responses ## Examples of Good Formatting diff --git a/src/meshbot/meshcore_interface.py b/src/meshbot/meshcore_interface.py index 7205854..5b14536 100644 --- a/src/meshbot/meshcore_interface.py +++ b/src/meshbot/meshcore_interface.py @@ -30,6 +30,7 @@ class MeshCoreMessage: timestamp: float message_type: str = "direct" # direct, channel, broadcast channel: Optional[str] = None # Channel ID or name (for channel messages) + path_len: Optional[int] = None # Number of hops the message took (0 = direct) @dataclass @@ -70,6 +71,32 @@ async def ping_node(self, destination: str) -> bool: """Ping a node to check connectivity.""" pass + @abstractmethod + async def send_trace( + self, path: Optional[str] = None, auth_code: Optional[int] = None + ) -> bool: + """Send trace packet for routing diagnostics.""" + pass + + @abstractmethod + async def send_trace_and_wait( + self, + path: Optional[str] = None, + auth_code: Optional[int] = None, + timeout: float = 10.0, + ) -> List[Dict[str, Any]]: + """Send trace packet and wait for responses. + + Args: + path: Optional comma-separated path of node IDs + auth_code: Optional authentication code + timeout: Maximum time to wait for responses in seconds + + Returns: + List of trace response payloads received within timeout + """ + pass + @abstractmethod def is_connected(self) -> bool: """Check if connected to MeshCore device.""" @@ -190,6 +217,44 @@ async def ping_node(self, destination: str) -> bool: # Return True if node exists in contacts return destination in self._contacts + async def send_trace( + self, path: Optional[str] = None, auth_code: Optional[int] = None + ) -> bool: + """Mock send trace packet.""" + if not self._connected: + return False + + logger.info(f"Mock: Sending trace packet (path={path}, auth_code={auth_code})") + await asyncio.sleep(0.1) + return True + + async def send_trace_and_wait( + self, + path: Optional[str] = None, + auth_code: Optional[int] = None, + timeout: float = 10.0, + ) -> List[Dict[str, Any]]: + """Mock send trace and wait for responses.""" + if not self._connected: + return [] + + logger.info( + f"Mock: Sending trace packet and waiting (path={path}, auth_code={auth_code}, timeout={timeout}s)" + ) + + # Simulate sending trace + await asyncio.sleep(0.1) + + # Simulate mock trace responses + responses = [ + {"hop": 0, "node": "local", "latency_ms": 0}, + {"hop": 1, "node": "node1", "latency_ms": 45}, + {"hop": 2, "node": "node2", "latency_ms": 92}, + ] + + await asyncio.sleep(0.5) # Simulate response delay + return responses + async def sync_time(self) -> bool: """Mock sync companion node clock.""" if not self._connected: @@ -278,6 +343,7 @@ def __init__(self, connection_type: ConnectionType, **kwargs): self._own_public_key: Optional[str] = None self._own_node_name: Optional[str] = None self._message_handlers: List[Callable[[MeshCoreMessage], Any]] = [] + self._trace_responses: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() # SQLite storage for network events and node names from .storage import MeshBotStorage @@ -348,6 +414,15 @@ async def connect(self) -> None: self._meshcore.subscribe(EventType.STATUS_RESPONSE, self._on_network_event) logger.info("Subscribed to network events for situational awareness") + # Subscribe to trace responses for diagnostic tools + try: + self._meshcore.subscribe(EventType.TRACE_DATA, self._on_trace_response) + logger.info("Subscribed to trace response events") + except AttributeError: + logger.warning( + "TRACE_DATA event type not available in this meshcore version" + ) + # Sync node names from contacts list (leverages automatic contact discovery) await self._sync_node_names_from_contacts() @@ -525,6 +600,107 @@ async def ping_node(self, destination: str) -> bool: logger.error(f"Failed to send status request: {e}") return False + async def send_trace( + self, path: Optional[str] = None, auth_code: Optional[int] = None + ) -> bool: + """Send trace packet for routing diagnostics. + + Args: + path: Optional comma-separated path of node IDs to trace through + auth_code: Optional authentication code for the trace + + Returns: + True if trace was sent successfully, False otherwise + """ + if not self._connected or not self._meshcore: + return False + + try: + # Build trace command parameters + params = {} + if path is not None: + params["path"] = path + if auth_code is not None: + params["auth_code"] = auth_code + + logger.info(f"Sending trace packet: {params if params else 'no parameters'}") + + # Send trace command + result = await self._meshcore.commands.send_trace(**params) + + if result is not None: + logger.info("Trace packet sent successfully") + return True + else: + logger.warning("Trace command returned None") + return False + + except Exception as e: + logger.error(f"Failed to send trace: {type(e).__name__}: {e}") + return False + + async def send_trace_and_wait( + self, + path: Optional[str] = None, + auth_code: Optional[int] = None, + timeout: float = 10.0, + ) -> List[Dict[str, Any]]: + """Send trace packet and wait for responses. + + Args: + path: Optional comma-separated path of node IDs + auth_code: Optional authentication code + timeout: Maximum time to wait for responses in seconds + + Returns: + List of trace response payloads received within timeout + """ + if not self._connected or not self._meshcore: + return [] + + # Clear any old responses from queue + while not self._trace_responses.empty(): + try: + self._trace_responses.get_nowait() + except asyncio.QueueEmpty: + break + + # Send trace packet + success = await self.send_trace(path=path, auth_code=auth_code) + if not success: + logger.warning("Failed to send trace packet") + return [] + + logger.info(f"Waiting up to {timeout}s for trace responses...") + + # Collect responses until timeout + responses = [] + end_time = asyncio.get_event_loop().time() + timeout + + try: + while True: + time_left = end_time - asyncio.get_event_loop().time() + if time_left <= 0: + break + + try: + # Wait for next response with remaining timeout + response = await asyncio.wait_for( + self._trace_responses.get(), timeout=time_left + ) + responses.append(response) + logger.info(f"Received trace response {len(responses)}: {response}") + + except asyncio.TimeoutError: + # No more responses within timeout + break + + except Exception as e: + logger.error(f"Error collecting trace responses: {e}") + + logger.info(f"Collected {len(responses)} trace response(s)") + return responses + def is_connected(self) -> bool: """Check if real MeshCore is connected.""" return bool(self._connected and self._meshcore and self._meshcore.is_connected) @@ -619,9 +795,10 @@ async def _on_message_received(self, event) -> None: sender_timestamp = payload.get("sender_timestamp", 0) msg_type = payload.get("type", "PRIV") channel = payload.get("channel", "0") # Extract channel ID + path_len = payload.get("path_len") # Number of hops the message took logger.info( - f"Processing message: sender={sender}, content='{content}', type={msg_type}, channel={channel}" + f"Processing message: sender={sender}, content='{content}', type={msg_type}, channel={channel}, path_len={path_len}" ) # Map MeshCore message types to our types @@ -638,6 +815,7 @@ async def _on_message_received(self, event) -> None: ), message_type=message_type, channel=str(channel) if channel is not None else None, + path_len=path_len if path_len is not None else None, ) logger.info(f"Created MeshCoreMessage: {message}") @@ -856,6 +1034,21 @@ def get_recent_network_events(self, limit: int = 10) -> List[str]: logger.error(f"Error getting network events: {e}") return [] + async def _on_trace_response(self, event) -> None: + """Handle trace response events.""" + try: + logger.info(f"Trace response received: {event}") + payload = event.payload if hasattr(event, "payload") else {} + + # Add trace response to queue for waiting methods + await self._trace_responses.put(payload) + + # Log trace response details + logger.info(f"Trace response payload: {payload}") + + except Exception as e: + logger.error(f"Error processing trace response: {e}") + async def _sync_node_names_from_contacts(self) -> None: """Sync node names from the contacts list. diff --git a/src/meshbot/tools/__init__.py b/src/meshbot/tools/__init__.py index 37c862d..2af0bbe 100644 --- a/src/meshbot/tools/__init__.py +++ b/src/meshbot/tools/__init__.py @@ -10,6 +10,7 @@ def register_all_tools(agent: Any) -> None: agent: The Pydantic AI agent to register tools with """ from .fun import register_fun_tools + from .network import register_network_tools from .nodes import register_node_tools from .utility import register_utility_tools from .weather import register_weather_tool @@ -18,6 +19,7 @@ def register_all_tools(agent: Any) -> None: register_node_tools(agent) register_utility_tools(agent) register_fun_tools(agent) + register_network_tools(agent) register_weather_tool(agent) diff --git a/src/meshbot/tools/network.py b/src/meshbot/tools/network.py new file mode 100644 index 0000000..cae0613 --- /dev/null +++ b/src/meshbot/tools/network.py @@ -0,0 +1,99 @@ +"""Network diagnostic tools for mesh network operations.""" + +import logging +from typing import Any, Optional + +from pydantic_ai import RunContext + +from .logging_wrapper import create_logging_tool_decorator + +logger = logging.getLogger(__name__) + + +def register_network_tools(agent: Any) -> None: + """Register network diagnostic tools. + + Args: + agent: The Pydantic AI agent to register tools with + """ + # Create logging tool decorator + tool = create_logging_tool_decorator(agent) + + @tool() + async def ping_node(ctx: RunContext[Any], destination: str) -> str: + """Ping a mesh node to check connectivity and measure latency. + + This sends a status request to the specified node and waits for a response. + Use this to test if a node is reachable and measure round-trip time. + + Args: + destination: Node ID (public key or shortened ID) to ping + + Returns: + Success/failure message with ping results + """ + try: + logger.info(f"Pinging node: {destination}") + + success = await ctx.deps.meshcore.ping_node(destination) + + if success: + return f"✓ Ping successful to {destination[:16]}..." + else: + return f"✗ Ping failed to {destination[:16]}... (node may be offline or unreachable)" + + except Exception as e: + logger.error(f"Error pinging node {destination}: {e}") + return f"Error pinging node: {str(e)[:100]}" + + @tool() + async def trace_path( + ctx: RunContext[Any], + path: Optional[str] = None, + auth_code: Optional[int] = None, + timeout: float = 10.0, + ) -> str: + """Send a trace packet for mesh network routing diagnostics. + + This sends a trace packet through the mesh network to diagnose routing + and network topology. Like traceroute for mesh networks. Waits for responses. + + Args: + path: Optional comma-separated path of node IDs to trace through (e.g., "node1,node2,node3") + auth_code: Optional authentication code for the trace + timeout: Maximum time to wait for responses in seconds (default: 10s) + + Returns: + Trace results showing path and signal information + """ + try: + if path: + logger.info(f"Tracing path: {path}") + else: + logger.info("Sending trace packet (automatic path)") + + # Send trace and wait for responses + responses = await ctx.deps.meshcore.send_trace_and_wait( + path=path, auth_code=auth_code, timeout=timeout + ) + + if not responses: + return f"✗ No trace responses received within {timeout}s\n(Device may be busy, disconnected, or path unreachable)" + + # Format responses + msg = f"✓ Trace complete\n" + msg += "Path:\n" + + for i, response in enumerate(responses): + step_path = response.get("path", []) + for n, step in enumerate(step_path): + hash = step.get("hash") + snr = step.get("snr", "unknown") + if hash: + msg += f"{n + 1}. {hash} (SNR: {snr})\n" + + return msg.rstrip() + + except Exception as e: + logger.error(f"Error sending trace: {e}") + return f"Error sending trace: {str(e)[:100]}"