From 0949486103b6c421e7e51301c64620bcb962a53b Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:12:22 +0000 Subject: [PATCH 1/4] fix: four bugs in cve_scan and password_scan modules - cve_scan/scanner.py: remove unreachable duplicate code block after return in _scan_service (dead code from a bad copy-paste refactor) - cve_scan/scanner.py: fix NVD rate-limit detection using wrong HTTP status code; NVD returns 429 (Too Many Requests) for rate limits, not 403 (Forbidden). Retrying on 403 was pointless and indicated an auth problem. - password_scan/scanner.py: fix UnboundLocalError when credentials list is empty; variable i was unbound if the for-loop never executed, causing tested_count=i+1 to crash. Initialize i=-1 before the loop. - password_scan/scanner.py: fix backward-compat scan_host() calling scanner.scan_host(host, ports) without the required mac argument, passing ports into the mac parameter and leaving ports missing. --- src/edgewalker/modules/cve_scan/scanner.py | 37 +---------------- .../modules/password_scan/scanner.py | 3 +- tests/test_cve_scan.py | 41 +++++++++++++++++-- tests/test_password_scan.py | 38 +++++++++++++++++ 4 files changed, 79 insertions(+), 40 deletions(-) diff --git a/src/edgewalker/modules/cve_scan/scanner.py b/src/edgewalker/modules/cve_scan/scanner.py index 9370cf2..1082eb3 100644 --- a/src/edgewalker/modules/cve_scan/scanner.py +++ b/src/edgewalker/modules/cve_scan/scanner.py @@ -53,7 +53,7 @@ async def search_cves_async( response = await client.get( settings.nvd_api_url, params=params, headers=headers, timeout=30 ) - if response.status_code == 403: + if response.status_code == 429: # Rate limit hit, wait and retry once await asyncio.sleep(settings.nvd_rate_limit_delay * 2) response = await client.get( @@ -266,41 +266,6 @@ async def _scan_service( version=svc["version"], cves=cves, ) - """Scan a single service for CVEs asynchronously.""" - if self.progress_callback: - self.progress_callback( - "cve_check", - f"Checking {svc['product']} {svc['version']} on {svc['ip']}:{svc['port']}", - ) - - cve_dicts = await search_cves_async( - client, svc["product"], svc["version"], self.verbose, semaphore - ) - - cves = [ - CveModel( - id=c["id"], - description=c.get("description", ""), - severity=c["severity"], - score=c["score"], - ) - for c in cve_dicts - ] - - if cves and self.progress_callback: - self.progress_callback( - "cve_found", - f"{len(cves)} CVE(s) found for {svc['product']} {svc['version']}", - ) - - return CveScanResultModel( - ip=svc["ip"], - port=svc["port"], - service=svc["service"], - product=svc["product"], - version=svc["version"], - cves=cves, - ) def _build_empty_model(self, skipped_no_version: int) -> CveScanModel: return CveScanModel( diff --git a/src/edgewalker/modules/password_scan/scanner.py b/src/edgewalker/modules/password_scan/scanner.py index 28f7ee3..083bcf5 100644 --- a/src/edgewalker/modules/password_scan/scanner.py +++ b/src/edgewalker/modules/password_scan/scanner.py @@ -161,6 +161,7 @@ async def scan(self) -> PasswordScanResultModel: found_cred = None login_status = StatusEnum.failed + i = -1 for i, (user, pw) in enumerate(creds): if self.rich_progress: @@ -661,7 +662,7 @@ async def scan_host( ) -> dict: """Backward compatible scan_host function asynchronously.""" scanner = PasswordScanner(target, top_n, verbose, progress_callback) - results = await scanner.scan_host(host, ports) + results = await scanner.scan_host(host, "", ports) out = {"host": host, "services": {}} for r in results: status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure" diff --git a/tests/test_cve_scan.py b/tests/test_cve_scan.py index d8602cc..52b6efa 100644 --- a/tests/test_cve_scan.py +++ b/tests/test_cve_scan.py @@ -97,15 +97,15 @@ async def test_search_cves_async_real_calls(): @pytest.mark.asyncio async def test_search_cves_async_rate_limit(): - mock_response_403 = MagicMock() - mock_response_403.status_code = 403 + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 mock_response_200 = MagicMock() mock_response_200.status_code = 200 mock_response_200.json.return_value = {"vulnerabilities": []} client = AsyncMock() - client.get.side_effect = [mock_response_403, mock_response_200] + client.get.side_effect = [mock_response_429, mock_response_200] with patch("asyncio.sleep", new_callable=AsyncMock): res = await scanner.search_cves_async(client, "product", "1.0") @@ -217,3 +217,38 @@ async def test_cve_scanner_scan_interface(): await s.scan(hosts=None) mock_scan.assert_called_with([]) + + +@pytest.mark.asyncio +async def test_search_cves_async_rate_limit_uses_429(): + """Bug fix: NVD rate limiting uses HTTP 429, not 403.""" + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 + + mock_response_200 = MagicMock() + mock_response_200.status_code = 200 + mock_response_200.json.return_value = {"vulnerabilities": []} + + client = AsyncMock() + client.get.side_effect = [mock_response_429, mock_response_200] + + with patch("asyncio.sleep", new_callable=AsyncMock): + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 2 + + +@pytest.mark.asyncio +async def test_search_cves_async_403_is_not_retried(): + """Bug fix: 403 Forbidden should not be retried (it's not a rate limit).""" + mock_response_403 = MagicMock() + mock_response_403.status_code = 403 + + client = AsyncMock() + client.get.return_value = mock_response_403 + + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 1 # no retry + mock_sleep.assert_not_called() diff --git a/tests/test_password_scan.py b/tests/test_password_scan.py index b87d897..3d9b397 100644 --- a/tests/test_password_scan.py +++ b/tests/test_password_scan.py @@ -929,3 +929,41 @@ async def test_password_scanner_scan_host_by_name_smb(): ) as mock_scan: await s.scan_host("1.1.1.1", "00:11:22:33:44:55", {"smb": 4455}) mock_scan.assert_called_once() + + +@pytest.mark.asyncio +async def test_async_service_scanner_scan_empty_credentials(): + """Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty.""" + class TestScanner(scanner.AsyncServiceScanner): + def service_name(self): + return "ssh" + + def service_enum(self): + return scanner.ServiceEnum.ssh + + async def attempt_login(self, u, p): + return False, False + + s = TestScanner("1.1.1.1", 22) + with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True): + with patch( + "edgewalker.modules.password_scan.scanner.load_credentials", return_value=[] + ): + res = await s.scan() + # Should complete without UnboundLocalError + assert res.login_attempt == scanner.StatusEnum.failed + + +@pytest.mark.asyncio +async def test_scan_host_backward_compat_passes_mac(): + """Bug fix: backward compat scan_host must pass mac as second positional arg.""" + with patch( + "edgewalker.modules.password_scan.scanner.PasswordScanner.scan_host", new_callable=AsyncMock + ) as mock_scan: + mock_scan.return_value = [] + await scanner.scan_host("1.1.1.1", {"ssh": 22}) + args, kwargs = mock_scan.call_args + # args[0]=host, args[1]=mac, args[2]=ports + assert args[0] == "1.1.1.1" + assert args[1] == "" # mac must be passed explicitly + assert args[2] == {"ssh": 22} From 80f135532d659fdfe7bdba80660b626f0d1762ff Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:35:16 +0000 Subject: [PATCH 2/4] feat: add --unprivileged and --verbose flags to scan command Threads unprivileged=True from CLI through to nmap, bypassing the privilege check and passing --unprivileged to all nmap invocations (ping sweep, port scan, service probe) so users on macOS can run without sudo using TCP connect scans. Also threads verbose=True from CLI through to the scanner so --verbose prints phase-by-phase nmap progress, discovered hosts, and open ports as they are found. --- src/edgewalker/cli/cli.py | 8 ++- src/edgewalker/cli/controller.py | 4 +- src/edgewalker/cli/guided.py | 4 +- src/edgewalker/core/scanner_service.py | 6 +- src/edgewalker/modules/port_scan/scanner.py | 56 +++++++++++---- tests/test_cli.py | 30 ++++++-- tests/test_port_scan.py | 78 +++++++++++++++++++++ tests/test_scanner_service.py | 37 ++++++++++ 8 files changed, 199 insertions(+), 24 deletions(-) diff --git a/src/edgewalker/cli/cli.py b/src/edgewalker/cli/cli.py index b3c9280..3d55586 100644 --- a/src/edgewalker/cli/cli.py +++ b/src/edgewalker/cli/cli.py @@ -168,6 +168,12 @@ def run_guided_scan( "-ao", help="Allow scan to proceed with active configuration overrides.", ), + unprivileged: bool = typer.Option( + False, "--unprivileged", help="Run without sudo using TCP connect scans (macOS/no-root)." + ), + verbose: bool = typer.Option( + False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports." + ), ) -> None: """Run a guided security scan. @@ -217,7 +223,7 @@ def run_guided_scan( ensure_telemetry_choice() controller = ScanController() guided = GuidedScanner(controller) - asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds)) + asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds, unprivileged=unprivileged, verbose=verbose)) @app.command() diff --git a/src/edgewalker/cli/controller.py b/src/edgewalker/cli/controller.py index 6e4b649..4c6a8f7 100644 --- a/src/edgewalker/cli/controller.py +++ b/src/edgewalker/cli/controller.py @@ -35,7 +35,7 @@ def __init__(self, scanner_service: Optional[ScannerService] = None) -> None: self.scanner = scanner_service or ScannerService() async def run_port_scan( - self, full: bool = False, target: str = None + self, full: bool = False, target: str = None, unprivileged: bool = False, verbose: bool = False ) -> Optional[PortScanModel]: """Run port scan and display results asynchronously.""" scan_type = "FULL" if full else "QUICK" @@ -50,7 +50,7 @@ async def run_port_scan( try: with utils.console.status(f"[bold green]Scanning {target}..."): - results = await self.scanner.perform_port_scan(target=target, full=full) + results = await self.scanner.perform_port_scan(target=target, full=full, unprivileged=unprivileged, verbose=verbose) except Exception as e: logger.error(f"Scan failed: {str(e)}") logger.error("Make sure nmap is installed and you have sudo privileges.") diff --git a/src/edgewalker/cli/guided.py b/src/edgewalker/cli/guided.py index cbac680..ba2c0bc 100644 --- a/src/edgewalker/cli/guided.py +++ b/src/edgewalker/cli/guided.py @@ -30,6 +30,8 @@ async def automatic_mode( full_scan: Optional[bool] = None, target: Optional[str] = None, full_creds: bool = False, + unprivileged: bool = False, + verbose: bool = False, ) -> None: """Run the automatic guided security assessment asynchronously.""" # Step 1: Choose scan type @@ -49,7 +51,7 @@ async def automatic_mode( # Step 3: Run port scan utils.console.print() logger.info(f"Starting {scan_type.lower()} port scan on {target}...") - port_results = await self.controller.run_port_scan(full=full_scan, target=target) + port_results = await self.controller.run_port_scan(full=full_scan, target=target, unprivileged=unprivileged, verbose=verbose) if not port_results: logger.error("Port scan failed. Returning to mode selection.") diff --git a/src/edgewalker/core/scanner_service.py b/src/edgewalker/core/scanner_service.py index 350337f..b05c849 100644 --- a/src/edgewalker/core/scanner_service.py +++ b/src/edgewalker/core/scanner_service.py @@ -81,7 +81,7 @@ def submit_scan_data(self, module: str, data: dict) -> None: # No loop running, use sync version self.telemetry.submit_scan_data_sync(module, data) - async def perform_port_scan(self, target: str, full: bool = False) -> PortScanModel: + async def perform_port_scan(self, target: str, full: bool = False, unprivileged: bool = False, verbose: bool = False) -> PortScanModel: """Perform a port scan and return results as a model asynchronously.""" if self.demo_mode and self.demo_service: return await self.demo_service.perform_port_scan(target, full) @@ -94,11 +94,11 @@ async def perform_port_scan(self, target: str, full: bool = False) -> PortScanMo if full: results = await port_scan.full_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) else: results = await port_scan.quick_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) if isinstance(results, dict): diff --git a/src/edgewalker/modules/port_scan/scanner.py b/src/edgewalker/modules/port_scan/scanner.py index e7890fe..d911318 100644 --- a/src/edgewalker/modules/port_scan/scanner.py +++ b/src/edgewalker/modules/port_scan/scanner.py @@ -126,8 +126,11 @@ def fix_nmap_permissions() -> bool: return False -def check_privileges() -> str | None: +def check_privileges(unprivileged: bool = False) -> str | None: """Check for root/sudo privileges or nmap capabilities.""" + if unprivileged: + return None + if check_nmap_permissions(): return None @@ -140,8 +143,11 @@ def check_privileges() -> str | None: return "Port scanning requires root privileges on this OS. Please run with sudo." -def get_nmap_command() -> list[str]: +def get_nmap_command(unprivileged: bool = False) -> list[str]: """Return the base nmap command with sudo if necessary.""" + if unprivileged: + return ["nmap"] + if check_nmap_permissions(): return ["nmap"] @@ -245,6 +251,7 @@ async def _scan_batch( batch_label: str = "", progress_callback: Callable[[str, str], None] | None = None, rich_progress: Optional[tuple[utils.Progress, utils.TaskID]] = None, + unprivileged: bool = False, ) -> tuple[str, set[str]]: """Run one nmap subprocess on a batch of hosts asynchronously.""" if not hosts: @@ -254,7 +261,8 @@ async def _scan_batch( xml_path = xml_fd.name xml_fd.close() - cmd = get_nmap_command() + extra_flags + flags = (["--unprivileged"] + extra_flags) if unprivileged else extra_flags + cmd = get_nmap_command(unprivileged=unprivileged) + flags if ports: cmd += ["-p", ports] cmd += ["-oX", xml_path, "-v", "--stats-every", "10s", "--open"] @@ -349,6 +357,7 @@ async def _parallel_scan( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run parallel scans across hosts asynchronously.""" if not live_hosts: @@ -368,6 +377,7 @@ async def _parallel_scan( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -396,6 +406,7 @@ async def _parallel_scan( label, progress_callback, rich_progress, + unprivileged, ) ) @@ -422,6 +433,7 @@ async def _probe_services( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run service/OS probes per host asynchronously.""" if not host_ports: @@ -446,6 +458,7 @@ async def _probe_services( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -473,6 +486,7 @@ async def _probe_services( ip, progress_callback, rich_progress, + unprivileged, ) ) @@ -507,6 +521,7 @@ def __init__( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> None: """Initialize the PortScanner. @@ -514,10 +529,12 @@ def __init__( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. """ self.target = target or get_default_target() self.verbose = verbose self.progress_callback = progress_callback + self.unprivileged = unprivileged async def scan(self, **kwargs: object) -> PortScanModel: """Execute the scan asynchronously (ScanModule interface).""" @@ -531,7 +548,7 @@ async def quick_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -541,7 +558,7 @@ async def quick_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -575,6 +592,7 @@ async def quick_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: all_xml, all_hosts_found = await _parallel_scan( @@ -584,6 +602,7 @@ async def quick_scan(self) -> PortScanModel: settings.nmap_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except FileNotFoundError: return PortScanModel( @@ -628,7 +647,7 @@ async def full_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -637,7 +656,7 @@ async def full_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -671,6 +690,7 @@ async def full_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: disc_xml, _ = await _parallel_scan( @@ -680,6 +700,7 @@ async def full_scan(self) -> PortScanModel: settings.nmap_full_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except Exception: disc_xml = [] @@ -721,10 +742,10 @@ async def full_scan(self) -> PortScanModel: if self.verbose: with utils.get_progress() as progress: probe_xml, _ = await _probe_services( - host_ports, self.verbose, self.progress_callback, progress + host_ports, self.verbose, self.progress_callback, progress, unprivileged=self.unprivileged ) else: - probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback) + probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback, unprivileged=self.unprivileged) hosts_by_ip = {} for xml_data in probe_xml: for host in parse_nmap_xml(xml_data): @@ -753,6 +774,7 @@ async def scan( full: bool = False, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a port scan asynchronously. @@ -761,11 +783,12 @@ async def scan( full: Whether to perform a full scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - scanner = PortScanner(target, verbose, progress_callback) + scanner = PortScanner(target, verbose, progress_callback, unprivileged=unprivileged) return await scanner.full_scan() if full else await scanner.quick_scan() @@ -773,6 +796,7 @@ async def quick_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a quick scan of common IoT ports asynchronously. @@ -780,17 +804,19 @@ async def quick_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).quick_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).quick_scan() async def full_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a full scan of all ports with OS detection asynchronously. @@ -798,17 +824,19 @@ async def full_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).full_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).full_scan() async def ping_sweep( target: str, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> list[str]: """Quick ping sweep to find live hosts asynchronously. @@ -816,6 +844,7 @@ async def ping_sweep( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: List of live host IP addresses. @@ -830,7 +859,8 @@ async def ping_sweep( if err: raise ValueError(err) - cmd = get_nmap_command() + ["-sn", "-T4", target] + flags = ["--unprivileged"] if unprivileged else [] + cmd = get_nmap_command(unprivileged=unprivileged) + flags + ["-sn", "-T4", target] live_hosts = [] try: process = await asyncio.create_subprocess_exec( diff --git a/tests/test_cli.py b/tests/test_cli.py index 6581301..0cb2368 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -68,7 +68,7 @@ async def test_run_port_scan(mock_submit, mock_save, mock_input, mock_quick, moc mock_quick.return_value = mock_results res = await cli.ScanController().run_port_scan(full=False) assert res.model_dump(mode="json") == mock_results.model_dump(mode="json") - mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None) + mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False) @pytest.mark.asyncio @@ -155,7 +155,7 @@ def test_typer_scan(mock_guided_cls, mock_logo): result = runner.invoke(app, ["scan", "--target", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=False + full_scan=False, target="1.1.1.1", full_creds=False, unprivileged=False, verbose=False ) @@ -166,7 +166,7 @@ def test_typer_scan_full(mock_guided_cls): result = runner.invoke(app, ["scan", "--full", "-t", "1.1.1.1", "--full-creds"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=True, target="1.1.1.1", full_creds=True + full_scan=True, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -177,7 +177,7 @@ def test_typer_scan_full_creds(mock_guided_cls): result = runner.invoke(app, ["scan", "--full-creds", "-t", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=True + full_scan=False, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -476,6 +476,28 @@ def test_prompt_next_scan_suggest_cve(mock_run, mock_input, mock_status): assert mock_run.called +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_verbose_flag(mock_telemetry, mock_auto): + """--verbose flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--verbose", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("verbose") is True + + +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_unprivileged_flag(mock_telemetry, mock_auto): + """--unprivileged flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--unprivileged", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("unprivileged") is True + + @patch("edgewalker.utils.has_any_results", return_value=True) @patch("edgewalker.cli.results.settings") @patch("edgewalker.utils.get_input", side_effect=["1", "", "0"]) diff --git a/tests/test_port_scan.py b/tests/test_port_scan.py index d3725aa..b1dbd56 100644 --- a/tests/test_port_scan.py +++ b/tests/test_port_scan.py @@ -309,3 +309,81 @@ async def test_probe_services(mock_batch): res_xml, res_found = await scanner._probe_services({"1.1.1.1": [80, 443]}) assert len(res_xml) == 1 assert "1.1.1.1" in res_found + + +# --- Unprivileged mode tests --- + + +def test_check_privileges_unprivileged_mode(): + """check_privileges returns None immediately when unprivileged=True.""" + # Even without root, unprivileged mode skips the check + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.check_privileges(unprivileged=True) is None + + +def test_get_nmap_command_unprivileged(): + """get_nmap_command returns ['nmap'] when unprivileged=True, even without root.""" + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.get_nmap_command(unprivileged=True) == ["nmap"] + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +@patch("tempfile.NamedTemporaryFile") +@patch("builtins.open", new_callable=mock_open, read_data="") +@patch("os.unlink") +async def test_scan_batch_adds_unprivileged_flag(mock_unlink, mock_file, mock_temp, mock_exec): + """_scan_batch prepends --unprivileged to the nmap command when unprivileged=True.""" + mock_temp.return_value.name = "test.xml" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b""] + mock_proc.wait.return_value = 0 + mock_proc.returncode = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + await scanner._scan_batch(["1.1.1.1"], "80", [], 10, unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +async def test_ping_sweep_unprivileged(mock_exec): + """ping_sweep passes --unprivileged flag to nmap when unprivileged=True.""" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b"Nmap scan report for 1.1.1.1", b""] + mock_proc.wait.return_value = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + res = await scanner.ping_sweep("1.1.1.0/24", unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + assert "1.1.1.1" in res + + +def test_port_scanner_stores_unprivileged(): + """PortScanner stores the unprivileged flag.""" + ps = scanner.PortScanner(unprivileged=True) + assert ps.unprivileged is True + + ps_default = scanner.PortScanner() + assert ps_default.unprivileged is False + + +@pytest.mark.asyncio +@patch("edgewalker.modules.port_scan.scanner.ping_sweep", new_callable=AsyncMock, return_value=[]) +@patch("edgewalker.modules.port_scan.scanner._parallel_scan", new_callable=AsyncMock) +async def test_quick_scan_unprivileged_skips_permission_check(mock_parallel, mock_ping): + """quick_scan with unprivileged=True does not raise PermissionError.""" + mock_parallel.return_value = ([], set()) + # On macOS without root, normal mode would raise PermissionError + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + ps = scanner.PortScanner(target="1.1.1.0/24", unprivileged=True) + result = await ps.quick_scan() + assert result.success is True diff --git a/tests/test_scanner_service.py b/tests/test_scanner_service.py index 874358f..c63a356 100644 --- a/tests/test_scanner_service.py +++ b/tests/test_scanner_service.py @@ -242,3 +242,40 @@ def test_global_submit_scan_data_async(): mock_loop.return_value = mock_loop_instance submit_scan_data("test", {}) mock_loop_instance.create_task.assert_called_once() + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_verbose(scanner_service): + """perform_port_scan passes verbose flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, verbose=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=True, progress_callback=scanner_service.progress_callback, unprivileged=False + ) + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_unprivileged(scanner_service): + """perform_port_scan passes unprivileged flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, unprivileged=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) + + with patch( + "edgewalker.modules.port_scan.full_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_full: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=True, unprivileged=True) + mock_full.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) From 014ba94c3a330f38116694d63e0fb16c2b64160c Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:44:26 +0000 Subject: [PATCH 3/4] feat: add --colorblind flag with Okabe-Ito safe palette MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a colorblind.yaml skin using the Okabe-Ito (2008) palette — the scientific standard for colorblind accessibility, safe for deuteranopia, protanopia, and tritanopia. Sky blue replaces cyan, orange replaces yellow warnings, vermillion replaces red for danger, and all icons fall back to ASCII text forms ([OK], [X], [!]) so meaning is never conveyed by color alone. --colorblind hot-swaps the theme before the scan runs so all output including results, risk grades, and CVE badges renders in the safe palette. --- src/edgewalker/cli/cli.py | 15 ++++++++ src/edgewalker/skins/colorblind.yaml | 53 ++++++++++++++++++++++++++++ tests/test_cli.py | 14 ++++++++ tests/test_theme.py | 35 ++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 src/edgewalker/skins/colorblind.yaml diff --git a/src/edgewalker/cli/cli.py b/src/edgewalker/cli/cli.py index 3d55586..14e14bd 100644 --- a/src/edgewalker/cli/cli.py +++ b/src/edgewalker/cli/cli.py @@ -41,6 +41,15 @@ print_logo, ) + +def apply_colorblind_theme() -> None: + """Switch the active theme to the colorblind-safe skin.""" + # First Party + from edgewalker import theme as _theme + from edgewalker.core.config import settings + settings.theme = "colorblind" + _theme.load_active_theme() + # ============================================================================ # TYPER APP SETUP # ============================================================================ @@ -174,6 +183,9 @@ def run_guided_scan( verbose: bool = typer.Option( False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports." ), + colorblind: bool = typer.Option( + False, "--colorblind", help="Use colorblind-safe palette (Okabe-Ito) instead of default theme." + ), ) -> None: """Run a guided security scan. @@ -220,6 +232,9 @@ def run_guided_scan( ) raise typer.Exit() + if colorblind: + apply_colorblind_theme() + ensure_telemetry_choice() controller = ScanController() guided = GuidedScanner(controller) diff --git a/src/edgewalker/skins/colorblind.yaml b/src/edgewalker/skins/colorblind.yaml new file mode 100644 index 0000000..964af73 --- /dev/null +++ b/src/edgewalker/skins/colorblind.yaml @@ -0,0 +1,53 @@ +metadata: + name: "Colorblind Safe (Okabe-Ito)" + author: "Periphery" + +# Palette: Okabe & Ito (2008) — universal colorblind-safe scientific standard. +# Works for deuteranopia, protanopia, and tritanopia. +# +# Sky Blue #56B4E9 — primary accent, borders, headers +# Orange #E69F00 — warnings +# Vermillion #D55E00 — danger / critical (distinct from orange at all types) +# Blue #0072B2 — secondary / active elements +# Bluish Grn #009E73 — success (readable by red-green blind as distinct from orange) +# Yellow #F0E442 — highlight +# Foreground #E8E8E8 — high-contrast white-ish text on dark bg + +theme: + primary: "#0072B2" + secondary: "#56B4E9" + warning: "#E69F00" + error: "#D55E00" + success: "#009E73" + accent: "#56B4E9" + foreground: "#E8E8E8" + background: "#0f0f1a" + surface: "#1a1a2e" + panel: "#16213e" + boost: "#1e1e3a" + dark: true + lavender: "#56B4E9" + highlight: "#F0E442" + variables: + muted: "#888888" + text: "#E8E8E8" + danger: "#D55E00" + surface-alt: "#16213e" + +icons: + scan: "⌕" + check: "[OK]" + fail: "[X]" + bullet: "•" + arrow: "->" + warn: "[!]" + skull: "[!]" + vulnerable: "[X]" + circle: "o" + circle_filled: "*" + step: ">" + bar_full: "█" + bar_empty: "░" + plus: "[+]" + info: "[*]" + alert: "[!]" diff --git a/tests/test_cli.py b/tests/test_cli.py index 0cb2368..75aa7c8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -476,6 +476,20 @@ def test_prompt_next_scan_suggest_cve(mock_run, mock_input, mock_status): assert mock_run.called +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_colorblind_flag(mock_telemetry, mock_auto): + """--colorblind flag is accepted and applies the colorblind theme.""" + from edgewalker import theme as t + original_accent = t.ACCENT + result = runner.invoke(app, ["scan", "--colorblind", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + # restore default theme so other tests aren't affected + from edgewalker.core.config import settings + settings.theme = "periphery" + t.load_active_theme() + + @patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) @patch("edgewalker.utils.ensure_telemetry_choice") def test_run_guided_scan_verbose_flag(mock_telemetry, mock_auto): diff --git a/tests/test_theme.py b/tests/test_theme.py index 777b032..97bfc20 100644 --- a/tests/test_theme.py +++ b/tests/test_theme.py @@ -32,3 +32,38 @@ def test_icons(): assert theme.ICON_PLUS == "[+]" assert theme.ICON_INFO == "[*]" assert theme.ICON_ALERT == "[!]" + + +def test_colorblind_skin_exists_and_has_required_keys(): + """colorblind.yaml skin exists and defines all required color roles.""" + from pathlib import Path + skin_path = Path(__file__).parent.parent / "src/edgewalker/skins/colorblind.yaml" + assert skin_path.exists(), "colorblind.yaml skin file missing" + + import yaml + data = yaml.safe_load(skin_path.read_text()) + theme_section = data.get("theme", {}) + for key in ("primary", "accent", "success", "warning", "error", "foreground"): + assert key in theme_section, f"colorblind skin missing key: {key}" + + +def test_colorblind_skin_loads_via_theme_manager(): + """ThemeManager can load the colorblind skin without errors.""" + from edgewalker.core.theme_manager import ThemeManager + tm = ThemeManager() + data = tm.load_theme("colorblind") + assert data.get("theme", {}).get("accent") is not None + + +def test_colorblind_flag_reloads_theme(): + """Passing colorblind=True to apply_colorblind_theme switches the active theme.""" + from edgewalker import theme as t + from edgewalker.cli.cli import apply_colorblind_theme + original_accent = t.ACCENT + apply_colorblind_theme() + # accent should now be the colorblind skin's value, not the periphery cyan + assert t.ACCENT != "#00FFFF" + # restore + from edgewalker.core.config import settings + settings.theme = "periphery" + t.load_active_theme() From a73dfc4b7290cf1a66c8f8094da65ca9b7dfcb48 Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:48:30 +0000 Subject: [PATCH 4/4] fix: save scan results to ~/.edgewalker/scans not Application Support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit output_dir was defaulting to get_config_dir()/scans which buried scan results inside ~/Library/Application Support/edgewalker/scans on macOS — a hidden system directory users can't easily find. Now uses a dedicated get_data_dir() (defaulting to ~/.edgewalker) so results land in ~/.edgewalker/scans, visible and accessible from the terminal. Override with EW_DATA_DIR env var. --- src/edgewalker/core/config.py | 18 ++++++++++++++---- tests/test_config.py | 23 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/edgewalker/core/config.py b/src/edgewalker/core/config.py index a7904c1..84122cd 100644 --- a/src/edgewalker/core/config.py +++ b/src/edgewalker/core/config.py @@ -38,6 +38,16 @@ def get_cache_dir() -> Path: return Path(os.environ.get("EW_CACHE_DIR", user_cache_dir("edgewalker"))) +def get_data_dir() -> Path: + """Return the user-facing data directory for scan results. + + Defaults to ~/.edgewalker so results are easy to find and separate + from the application config in Library/Application Support (macOS) + or ~/.config (Linux). Override with EW_DATA_DIR. + """ + return Path(os.environ.get("EW_DATA_DIR", Path.home() / ".edgewalker")) + + # Legacy constants for backward compatibility (evaluated at load time) # Note: Tests should use EW_CONFIG_DIR/EW_CACHE_DIR env vars BEFORE importing this module # or we need to ensure they are used dynamically. @@ -128,7 +138,7 @@ def validate_urls(cls, v: str, info: ValidationInfo) -> str: def handle_demo_mode(cls, v: Path) -> Path: """Ensure output_dir points to demo_scans when in demo mode.""" if os.environ.get("EW_DEMO_MODE") == "1": - return v.parent / "demo_scans" + return get_data_dir() / "demo_scans" return v api_timeout: int = Field( @@ -287,11 +297,11 @@ def handle_demo_mode(cls, v: Path) -> Path: ) output_dir: Path = Field( default_factory=lambda: ( - get_config_dir() / "demo_scans" + get_data_dir() / "demo_scans" if os.environ.get("EW_DEMO_MODE") == "1" - else get_config_dir() / "scans" + else get_data_dir() / "scans" ), - description="Output directory", + description="Output directory for scan results (~/.edgewalker/scans by default)", ) creds_file: Path = Field( default=Path(__file__).parent.parent / "data" / "creds.csv", diff --git a/tests/test_config.py b/tests/test_config.py index c43f5be..47f4170 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -144,3 +144,26 @@ def test_update_setting_readonly(): with pytest.raises(AttributeError, match="read-only"): update_setting("device_id", "new-id") + + +def test_default_output_dir_is_not_inside_config_dir(tmp_path): + """output_dir default must not nest inside the config/Application Support directory.""" + with patch.dict(os.environ, {"EW_CONFIG_DIR": str(tmp_path)}): + from edgewalker.core.config import Settings, get_config_dir, get_data_dir + s = Settings() + config_dir = get_config_dir() + data_dir = get_data_dir() + # output_dir must live under data_dir, not config_dir + assert str(s.output_dir).startswith(str(data_dir)), ( + f"output_dir ({s.output_dir}) should be under data_dir ({data_dir})" + ) + assert not str(s.output_dir).startswith(str(config_dir)), ( + f"output_dir ({s.output_dir}) must not be inside config_dir ({config_dir})" + ) + + +def test_get_data_dir_respects_env_override(): + """EW_DATA_DIR env var overrides the data directory.""" + from edgewalker.core.config import get_data_dir + with patch.dict(os.environ, {"EW_DATA_DIR": "/tmp/ew_data"}): + assert str(get_data_dir()) == "/tmp/ew_data"