Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion prompts/default.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ MeshCore is a simple text messaging system with some limitations:

## Special Behaviors

When users send 'ping', respond with 'pong'
When users ask you to ping or trace a node:
- Use the 'ping_node' tool to check connectivity to a specific mesh node
- Use the 'trace_path' tool to diagnose routing through the mesh network
- These are actual network diagnostics, not just text responses

## Examples of Good Formatting

Expand Down
195 changes: 194 additions & 1 deletion src/meshbot/meshcore_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class MeshCoreMessage:
timestamp: float
message_type: str = "direct" # direct, channel, broadcast
channel: Optional[str] = None # Channel ID or name (for channel messages)
path_len: Optional[int] = None # Number of hops the message took (0 = direct)


@dataclass
Expand Down Expand Up @@ -70,6 +71,32 @@ async def ping_node(self, destination: str) -> bool:
"""Ping a node to check connectivity."""
pass

@abstractmethod
async def send_trace(
self, path: Optional[str] = None, auth_code: Optional[int] = None
) -> bool:
"""Send trace packet for routing diagnostics."""
pass

@abstractmethod
async def send_trace_and_wait(
self,
path: Optional[str] = None,
auth_code: Optional[int] = None,
timeout: float = 10.0,
) -> List[Dict[str, Any]]:
"""Send trace packet and wait for responses.

Args:
path: Optional comma-separated path of node IDs
auth_code: Optional authentication code
timeout: Maximum time to wait for responses in seconds

Returns:
List of trace response payloads received within timeout
"""
pass

@abstractmethod
def is_connected(self) -> bool:
"""Check if connected to MeshCore device."""
Expand Down Expand Up @@ -190,6 +217,44 @@ async def ping_node(self, destination: str) -> bool:
# Return True if node exists in contacts
return destination in self._contacts

async def send_trace(
self, path: Optional[str] = None, auth_code: Optional[int] = None
) -> bool:
"""Mock send trace packet."""
if not self._connected:
return False

logger.info(f"Mock: Sending trace packet (path={path}, auth_code={auth_code})")
await asyncio.sleep(0.1)
return True

async def send_trace_and_wait(
self,
path: Optional[str] = None,
auth_code: Optional[int] = None,
timeout: float = 10.0,
) -> List[Dict[str, Any]]:
"""Mock send trace and wait for responses."""
if not self._connected:
return []

logger.info(
f"Mock: Sending trace packet and waiting (path={path}, auth_code={auth_code}, timeout={timeout}s)"
)

# Simulate sending trace
await asyncio.sleep(0.1)

# Simulate mock trace responses
responses = [
{"hop": 0, "node": "local", "latency_ms": 0},
{"hop": 1, "node": "node1", "latency_ms": 45},
{"hop": 2, "node": "node2", "latency_ms": 92},
]
Comment on lines +248 to +253
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mock trace response format doesn't match what the trace_path tool expects. The tool in network.py lines 88-93 expects responses with a "path" key containing a list of steps with "hash" and "snr" fields, but these mock responses have "hop", "node", and "latency_ms" fields instead. This will cause the trace_path tool to display empty results when used with the mock interface.

Consider updating the mock responses to match the expected format:

responses = [
    {"path": [
        {"hash": "local", "snr": 0},
        {"hash": "node1", "snr": 15.5},
        {"hash": "node2", "snr": 12.3}
    ]}
]
Suggested change
# Simulate mock trace responses
responses = [
{"hop": 0, "node": "local", "latency_ms": 0},
{"hop": 1, "node": "node1", "latency_ms": 45},
{"hop": 2, "node": "node2", "latency_ms": 92},
]
# Simulate mock trace responses in expected format
responses = [
{
"path": [
{"hash": "local", "snr": 0},
{"hash": "node1", "snr": 15.5},
{"hash": "node2", "snr": 12.3},
]
}
]
]

Copilot uses AI. Check for mistakes.

await asyncio.sleep(0.5) # Simulate response delay
return responses

async def sync_time(self) -> bool:
"""Mock sync companion node clock."""
if not self._connected:
Expand Down Expand Up @@ -278,6 +343,7 @@ def __init__(self, connection_type: ConnectionType, **kwargs):
self._own_public_key: Optional[str] = None
self._own_node_name: Optional[str] = None
self._message_handlers: List[Callable[[MeshCoreMessage], Any]] = []
self._trace_responses: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The asyncio.Queue is instantiated without calling it as a function. This should be asyncio.Queue() with parentheses to actually create a queue instance. Without the parentheses, _trace_responses will be a reference to the Queue class itself, not an instance, which will cause AttributeErrors when trying to call .put(), .get(), or .empty() methods.

Copilot uses AI. Check for mistakes.

# SQLite storage for network events and node names
from .storage import MeshBotStorage
Expand Down Expand Up @@ -348,6 +414,15 @@ async def connect(self) -> None:
self._meshcore.subscribe(EventType.STATUS_RESPONSE, self._on_network_event)
logger.info("Subscribed to network events for situational awareness")

# Subscribe to trace responses for diagnostic tools
try:
self._meshcore.subscribe(EventType.TRACE_DATA, self._on_trace_response)
logger.info("Subscribed to trace response events")
except AttributeError:
logger.warning(
"TRACE_DATA event type not available in this meshcore version"
)

# Sync node names from contacts list (leverages automatic contact discovery)
await self._sync_node_names_from_contacts()

Expand Down Expand Up @@ -525,6 +600,107 @@ async def ping_node(self, destination: str) -> bool:
logger.error(f"Failed to send status request: {e}")
return False

async def send_trace(
self, path: Optional[str] = None, auth_code: Optional[int] = None
) -> bool:
"""Send trace packet for routing diagnostics.

Args:
path: Optional comma-separated path of node IDs to trace through
auth_code: Optional authentication code for the trace

Returns:
True if trace was sent successfully, False otherwise
"""
if not self._connected or not self._meshcore:
return False

try:
# Build trace command parameters
params = {}
if path is not None:
params["path"] = path
if auth_code is not None:
params["auth_code"] = auth_code

logger.info(f"Sending trace packet: {params if params else 'no parameters'}")

# Send trace command
result = await self._meshcore.commands.send_trace(**params)

if result is not None:
logger.info("Trace packet sent successfully")
return True
else:
logger.warning("Trace command returned None")
return False

except Exception as e:
logger.error(f"Failed to send trace: {type(e).__name__}: {e}")
return False

async def send_trace_and_wait(
self,
path: Optional[str] = None,
auth_code: Optional[int] = None,
timeout: float = 10.0,
) -> List[Dict[str, Any]]:
"""Send trace packet and wait for responses.

Args:
path: Optional comma-separated path of node IDs
auth_code: Optional authentication code
timeout: Maximum time to wait for responses in seconds

Returns:
List of trace response payloads received within timeout
"""
if not self._connected or not self._meshcore:
return []

# Clear any old responses from queue
while not self._trace_responses.empty():
try:
self._trace_responses.get_nowait()
except asyncio.QueueEmpty:
break

# Send trace packet
success = await self.send_trace(path=path, auth_code=auth_code)
if not success:
logger.warning("Failed to send trace packet")
return []

logger.info(f"Waiting up to {timeout}s for trace responses...")

# Collect responses until timeout
responses = []
end_time = asyncio.get_event_loop().time() + timeout

try:
while True:
time_left = end_time - asyncio.get_event_loop().time()
if time_left <= 0:
break

try:
# Wait for next response with remaining timeout
response = await asyncio.wait_for(
self._trace_responses.get(), timeout=time_left
)
responses.append(response)
logger.info(f"Received trace response {len(responses)}: {response}")

except asyncio.TimeoutError:
# No more responses within timeout
break

except Exception as e:
logger.error(f"Error collecting trace responses: {e}")

logger.info(f"Collected {len(responses)} trace response(s)")
return responses

def is_connected(self) -> bool:
"""Check if real MeshCore is connected."""
return bool(self._connected and self._meshcore and self._meshcore.is_connected)
Expand Down Expand Up @@ -619,9 +795,10 @@ async def _on_message_received(self, event) -> None:
sender_timestamp = payload.get("sender_timestamp", 0)
msg_type = payload.get("type", "PRIV")
channel = payload.get("channel", "0") # Extract channel ID
path_len = payload.get("path_len") # Number of hops the message took

logger.info(
f"Processing message: sender={sender}, content='{content}', type={msg_type}, channel={channel}"
f"Processing message: sender={sender}, content='{content}', type={msg_type}, channel={channel}, path_len={path_len}"
)

# Map MeshCore message types to our types
Expand All @@ -638,6 +815,7 @@ async def _on_message_received(self, event) -> None:
),
message_type=message_type,
channel=str(channel) if channel is not None else None,
path_len=path_len if path_len is not None else None,
)

logger.info(f"Created MeshCoreMessage: {message}")
Expand Down Expand Up @@ -856,6 +1034,21 @@ def get_recent_network_events(self, limit: int = 10) -> List[str]:
logger.error(f"Error getting network events: {e}")
return []

async def _on_trace_response(self, event) -> None:
"""Handle trace response events."""
try:
logger.info(f"Trace response received: {event}")
payload = event.payload if hasattr(event, "payload") else {}

# Add trace response to queue for waiting methods
await self._trace_responses.put(payload)

# Log trace response details
logger.info(f"Trace response payload: {payload}")

except Exception as e:
logger.error(f"Error processing trace response: {e}")

async def _sync_node_names_from_contacts(self) -> None:
"""Sync node names from the contacts list.

Expand Down
2 changes: 2 additions & 0 deletions src/meshbot/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def register_all_tools(agent: Any) -> None:
agent: The Pydantic AI agent to register tools with
"""
from .fun import register_fun_tools
from .network import register_network_tools
from .nodes import register_node_tools
from .utility import register_utility_tools
from .weather import register_weather_tool
Expand All @@ -18,6 +19,7 @@ def register_all_tools(agent: Any) -> None:
register_node_tools(agent)
register_utility_tools(agent)
register_fun_tools(agent)
register_network_tools(agent)
register_weather_tool(agent)


Expand Down
99 changes: 99 additions & 0 deletions src/meshbot/tools/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Network diagnostic tools for mesh network operations."""

import logging
from typing import Any, Optional

from pydantic_ai import RunContext

from .logging_wrapper import create_logging_tool_decorator

logger = logging.getLogger(__name__)


def register_network_tools(agent: Any) -> None:
"""Register network diagnostic tools.

Args:
agent: The Pydantic AI agent to register tools with
"""
# Create logging tool decorator
tool = create_logging_tool_decorator(agent)

@tool()
async def ping_node(ctx: RunContext[Any], destination: str) -> str:
"""Ping a mesh node to check connectivity and measure latency.

This sends a status request to the specified node and waits for a response.
Use this to test if a node is reachable and measure round-trip time.
Comment on lines +24 to +27
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring states "check connectivity and measure latency" and "measure round-trip time", but the function only returns a boolean success/failure and doesn't actually measure or report any latency or round-trip time information. Consider either:

  1. Updating the docstring to remove mentions of latency measurement: "Ping a mesh node to check connectivity."
  2. Enhancing the function to actually measure and return latency information if that's a desired feature.
Suggested change
"""Ping a mesh node to check connectivity and measure latency.
This sends a status request to the specified node and waits for a response.
Use this to test if a node is reachable and measure round-trip time.
"""Ping a mesh node to check connectivity.
This sends a status request to the specified node and waits for a response.
Use this to test if a node is reachable.

Copilot uses AI. Check for mistakes.

Args:
destination: Node ID (public key or shortened ID) to ping

Returns:
Success/failure message with ping results
"""
try:
logger.info(f"Pinging node: {destination}")

success = await ctx.deps.meshcore.ping_node(destination)

if success:
return f"✓ Ping successful to {destination[:16]}..."
else:
return f"✗ Ping failed to {destination[:16]}... (node may be offline or unreachable)"

except Exception as e:
logger.error(f"Error pinging node {destination}: {e}")
return f"Error pinging node: {str(e)[:100]}"

@tool()
async def trace_path(
ctx: RunContext[Any],
path: Optional[str] = None,
auth_code: Optional[int] = None,
timeout: float = 10.0,
) -> str:
"""Send a trace packet for mesh network routing diagnostics.

This sends a trace packet through the mesh network to diagnose routing
and network topology. Like traceroute for mesh networks. Waits for responses.

Args:
path: Optional comma-separated path of node IDs to trace through (e.g., "node1,node2,node3")
auth_code: Optional authentication code for the trace
timeout: Maximum time to wait for responses in seconds (default: 10s)

Returns:
Trace results showing path and signal information
"""
try:
if path:
logger.info(f"Tracing path: {path}")
else:
logger.info("Sending trace packet (automatic path)")

# Send trace and wait for responses
responses = await ctx.deps.meshcore.send_trace_and_wait(
path=path, auth_code=auth_code, timeout=timeout
)

if not responses:
return f"✗ No trace responses received within {timeout}s\n(Device may be busy, disconnected, or path unreachable)"

# Format responses
msg = f"✓ Trace complete\n"
msg += "Path:\n"

for i, response in enumerate(responses):
step_path = response.get("path", [])
for n, step in enumerate(step_path):
hash = step.get("hash")
snr = step.get("snr", "unknown")
if hash:
msg += f"{n + 1}. {hash} (SNR: {snr})\n"
Comment on lines +87 to +93
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nested loop structure processes response.get("path", []) for each response and iterates through the path steps, but the outer loop variable i is never used. If multiple responses each contain their own path, this could result in duplicated or confusing output.

Consider clarifying the logic:

  • If each response represents a separate trace result, label them accordingly (e.g., "Trace result 1:", "Trace result 2:")
  • If only one response is expected, consider simplifying to just process responses[0]
  • If responses represent incremental updates to the same path, clarify this in the comments

Copilot uses AI. Check for mistakes.

return msg.rstrip()

except Exception as e:
logger.error(f"Error sending trace: {e}")
return f"Error sending trace: {str(e)[:100]}"
Loading