Skip to content
Merged

Dev #48

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ __pycache__

# Podman/Docker container storage artifacts
~/.fuzzforge/

# User-specific hub config (generated at runtime)
hub-config.json
238 changes: 106 additions & 132 deletions README.md

Large diffs are not rendered by default.

Binary file removed assets/demopart1.gif
Binary file not shown.
Binary file removed assets/demopart2.gif
Binary file not shown.
41 changes: 36 additions & 5 deletions fuzzforge-mcp/src/fuzzforge_mcp/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,46 @@ async def lifespan(_: FastMCP) -> AsyncGenerator[Settings]:

Typical workflow:
1. Initialize a project with `init_project`
2. Set project assets with `set_project_assets` (optional, only needed once for the source directory)
2. Set project assets with `set_project_assets` — path to the directory containing
target files (firmware images, binaries, source code, etc.)
3. List available hub servers with `list_hub_servers`
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`

Hub workflow:
1. List available hub servers with `list_hub_servers`
2. Discover tools from servers with `discover_hub_tools`
3. Execute hub tools with `execute_hub_tool`
Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools.

File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools

Stateful tools:
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.

Firmware analysis pipeline (when analyzing firmware images):
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
The outputs of steps 3 and 4 feed into a final triage summary.

radare2-mcp agent context (upstream tool — no embedded context):
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
returns an empty result, fall back to `list_symbols` and search for dangerous function names
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
- For string extraction, use `run_command` with `iz` (data section strings).
The `list_all_strings` tool may return garbled output for large binaries.
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
""",
lifespan=lifespan,
)
Expand Down
6 changes: 3 additions & 3 deletions fuzzforge-mcp/src/fuzzforge_mcp/resources/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ async def list_executions() -> list[dict[str, Any]]:

return [
{
"execution_id": exec_id,
"has_results": storage.get_execution_results(project_path, exec_id) is not None,
"execution_id": entry["execution_id"],
"has_results": storage.get_execution_results(project_path, entry["execution_id"]) is not None,
}
for exec_id in execution_ids
for entry in execution_ids
]

except Exception as exception:
Expand Down
80 changes: 76 additions & 4 deletions fuzzforge-mcp/src/fuzzforge_mcp/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from tarfile import open as Archive # noqa: N812
from typing import Any
from uuid import uuid4

logger = logging.getLogger("fuzzforge-mcp")

Expand Down Expand Up @@ -79,13 +81,15 @@ def init_project(self, project_path: Path) -> Path:
storage_path = self._get_project_path(project_path)
storage_path.mkdir(parents=True, exist_ok=True)
(storage_path / "runs").mkdir(parents=True, exist_ok=True)
(storage_path / "output").mkdir(parents=True, exist_ok=True)

# Create .gitignore to avoid committing large files
gitignore_path = storage_path / ".gitignore"
if not gitignore_path.exists():
gitignore_path.write_text(
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"output/\n"
"!config.json\n"
)

Expand Down Expand Up @@ -141,17 +145,85 @@ def set_project_assets(self, project_path: Path, assets_path: Path) -> Path:
logger.info("Set project assets: %s -> %s", project_path.name, assets_path)
return assets_path

def list_executions(self, project_path: Path) -> list[str]:
"""List all execution IDs for a project.
def get_project_output_path(self, project_path: Path) -> Path | None:
    """Return the project's writable output directory, if it exists.

    This is the directory that gets mounted into hub tool containers
    at /app/output.

    :param project_path: Path to the project directory.
    :returns: Path to the output directory, or None if the project has
        not been initialized yet.

    """
    candidate = self._get_project_path(project_path) / "output"
    return candidate if candidate.exists() else None

def record_execution(
    self,
    project_path: Path,
    server_name: str,
    tool_name: str,
    arguments: dict[str, Any],
    result: dict[str, Any],
) -> str:
    """Record an execution result to the project's runs directory.

    Creates runs/<execution_id>/metadata.json under the project's
    storage path, containing the tool invocation and its result.

    :param project_path: Path to the project directory.
    :param server_name: Hub server name.
    :param tool_name: Tool name that was executed.
    :param arguments: Arguments passed to the tool.
    :param result: Execution result dictionary.
    :returns: Execution ID.

    """
    # Capture the clock once so the timestamp embedded in the execution
    # ID and the "timestamp" metadata field always agree (previously two
    # separate now() calls could straddle a second boundary).
    now = datetime.now(tz=UTC)
    execution_id = f"{now.strftime('%Y%m%dT%H%M%SZ')}_{uuid4().hex[:8]}"
    run_dir = self._get_project_path(project_path) / "runs" / execution_id
    run_dir.mkdir(parents=True, exist_ok=True)

    metadata = {
        "execution_id": execution_id,
        "timestamp": now.isoformat(),
        "server": server_name,
        "tool": tool_name,
        "arguments": arguments,
        # Surfaced at the top level so listings can report success
        # without digging into the full result payload.
        "success": result.get("success", False),
        "result": result,
    }
    # default=str: results may contain non-JSON-serializable values
    # (paths, datetimes); stringify rather than fail the recording.
    (run_dir / "metadata.json").write_text(json.dumps(metadata, indent=2, default=str))

    logger.info("Recorded execution %s: %s:%s", execution_id, server_name, tool_name)
    return execution_id

def list_executions(self, project_path: Path) -> list[dict[str, Any]]:
    """List all executions for a project with summary metadata.

    :param project_path: Path to the project directory.
    :returns: List of execution summaries (id, timestamp, server, tool,
        success), newest first.

    """
    runs_dir = self._get_project_path(project_path) / "runs"
    if not runs_dir.exists():
        return []

    executions: list[dict[str, Any]] = []
    # Execution IDs begin with a UTC timestamp, so a reverse
    # lexicographic sort yields newest-first ordering.
    for run_dir in sorted(runs_dir.iterdir(), reverse=True):
        if not run_dir.is_dir():
            continue
        meta_path = run_dir / "metadata.json"
        try:
            meta = json.loads(meta_path.read_text())
        except (OSError, json.JSONDecodeError):
            # Missing, unreadable, or corrupt metadata: still surface
            # the run by its directory name instead of failing the
            # whole listing.
            executions.append({"execution_id": run_dir.name})
            continue
        executions.append({
            "execution_id": meta.get("execution_id", run_dir.name),
            "timestamp": meta.get("timestamp"),
            "server": meta.get("server"),
            "tool": meta.get("tool"),
            "success": meta.get("success"),
        })
    return executions

def get_execution_results(
self,
Expand Down
92 changes: 87 additions & 5 deletions fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,41 @@

mcp: FastMCP = FastMCP()

# Name of the convention tool that hub servers can implement to provide
# rich usage context for AI agents (known issues, workflow tips, rules, etc.).
_AGENT_CONTEXT_TOOL = "get_agent_context"

# Global hub executor instance (lazy initialization)
_hub_executor: HubExecutor | None = None


async def _fetch_agent_context(
    executor: HubExecutor,
    server_name: str,
    tools: list[Any],
) -> str | None:
    """Fetch the server's agent-usage context via the convention tool.

    Invokes get_agent_context when the server advertises it and extracts
    the text payload from the first content item.

    :returns: The context string, or None when the server does not
        implement the convention or the call fails for any reason.
    """
    provides_context = any(tool.name == _AGENT_CONTEXT_TOOL for tool in tools)
    if not provides_context:
        return None
    try:
        outcome = await executor.execute_tool(
            identifier=f"hub:{server_name}:{_AGENT_CONTEXT_TOOL}",
            arguments={},
        )
        if outcome.success and outcome.result:
            payload = outcome.result.get("content", [])
            if payload and isinstance(payload, list):
                text: str = payload[0].get("text", "")
                return text
    except Exception:  # noqa: BLE001, S110 - best-effort context fetch
        pass
    return None


def _get_hub_executor() -> HubExecutor:
"""Get or create the hub executor instance.

Expand All @@ -50,19 +81,25 @@ def _get_hub_executor() -> HubExecutor:


@mcp.tool
async def list_hub_servers() -> dict[str, Any]:
async def list_hub_servers(category: str | None = None) -> dict[str, Any]:
"""List all registered MCP hub servers.

Returns information about configured hub servers, including
their connection type, status, and discovered tool count.

:param category: Optional category to filter by (e.g. "binary-analysis",
"web-security", "reconnaissance"). Only servers in this category
are returned.
:return: Dictionary with list of hub servers.

"""
try:
executor = _get_hub_executor()
servers = executor.list_servers()

if category:
servers = [s for s in servers if s.get("category") == category]

return {
"servers": servers,
"count": len(servers),
Expand Down Expand Up @@ -93,7 +130,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:

if server_name:
tools = await executor.discover_server_tools(server_name)
return {

# Convention: auto-fetch agent context if server provides it.
agent_context = await _fetch_agent_context(executor, server_name, tools)

# Hide the convention tool from the agent's tool list.
visible_tools = [t for t in tools if t.name != "get_agent_context"]

Comment on lines +137 to +139
result: dict[str, Any] = {
"server": server_name,
"tools": [
{
Expand All @@ -102,15 +146,24 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"description": t.description,
"parameters": [p.model_dump() for p in t.parameters],
}
for t in tools
for t in visible_tools
],
"count": len(tools),
"count": len(visible_tools),
}
if agent_context:
result["agent_context"] = agent_context
return result
else:
results = await executor.discover_all_tools()
all_tools = []
contexts: dict[str, str] = {}
for server, tools in results.items():
ctx = await _fetch_agent_context(executor, server, tools)
if ctx:
contexts[server] = ctx
Comment on lines 158 to +163
for tool in tools:
if tool.name == "get_agent_context":
continue
all_tools.append({
"identifier": tool.identifier,
"name": tool.name,
Expand All @@ -119,11 +172,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"parameters": [p.model_dump() for p in tool.parameters],
})

return {
result = {
"servers_discovered": len(results),
"tools": all_tools,
"count": len(all_tools),
}
if contexts:
result["agent_contexts"] = contexts
return result

except Exception as e:
if isinstance(e, ToolError):
Expand Down Expand Up @@ -183,6 +239,11 @@ async def execute_hub_tool(
Always use /app/uploads/<filename> or /app/samples/<filename> when
passing file paths to hub tools — do NOT use the host path.

Tool outputs are persisted to a writable shared volume:
- /app/output/ (writable — extraction results, reports, etc.)
Files written here survive container destruction and are available
to subsequent tool calls. The host path is .fuzzforge/output/.

"""
try:
executor = _get_hub_executor()
Expand All @@ -191,6 +252,7 @@ async def execute_hub_tool(
# Mounts the assets directory at the standard paths used by hub tools:
# /app/uploads — binwalk, and other tools that use UPLOAD_DIR
# /app/samples — yara, capa, and other tools that use SAMPLES_DIR
# /app/output — writable volume for tool outputs (persists across calls)
extra_volumes: list[str] = []
try:
storage = get_storage()
Expand All @@ -202,6 +264,9 @@ async def execute_hub_tool(
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
Comment on lines +267 to +269
except Exception: # noqa: BLE001 - never block tool execution due to asset injection failure
extra_volumes = []

Expand All @@ -212,6 +277,20 @@ async def execute_hub_tool(
extra_volumes=extra_volumes or None,
)

# Record execution history for list_executions / get_execution_results.
try:
storage = get_storage()
project_path = get_project_path()
storage.record_execution(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
arguments=arguments or {},
result=result.to_dict(),
)
Comment on lines +284 to +290
except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues
pass

return result.to_dict()

except Exception as e:
Expand Down Expand Up @@ -372,6 +451,9 @@ async def start_hub_server(server_name: str) -> dict[str, Any]:
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
except Exception: # noqa: BLE001 - never block server start due to asset injection failure
extra_volumes = []

Expand Down
Loading
Loading