diff --git a/src/edgewalker/cli/cli.py b/src/edgewalker/cli/cli.py index b3c9280..14e14bd 100644 --- a/src/edgewalker/cli/cli.py +++ b/src/edgewalker/cli/cli.py @@ -41,6 +41,15 @@ print_logo, ) + +def apply_colorblind_theme() -> None: + """Switch the active theme to the colorblind-safe skin.""" + # First Party + from edgewalker import theme as _theme + from edgewalker.core.config import settings + settings.theme = "colorblind" + _theme.load_active_theme() + # ============================================================================ # TYPER APP SETUP # ============================================================================ @@ -168,6 +177,15 @@ def run_guided_scan( "-ao", help="Allow scan to proceed with active configuration overrides.", ), + unprivileged: bool = typer.Option( + False, "--unprivileged", help="Run without sudo using TCP connect scans (macOS/no-root)." + ), + verbose: bool = typer.Option( + False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports." + ), + colorblind: bool = typer.Option( + False, "--colorblind", help="Use colorblind-safe palette (Okabe-Ito) instead of default theme." + ), ) -> None: """Run a guided security scan. @@ -214,10 +232,13 @@ def run_guided_scan( ) raise typer.Exit() + if colorblind: + apply_colorblind_theme() + ensure_telemetry_choice() controller = ScanController() guided = GuidedScanner(controller) - asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds)) + asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds, unprivileged=unprivileged, verbose=verbose)) @app.command() diff --git a/src/edgewalker/cli/controller.py b/src/edgewalker/cli/controller.py index 6e4b649..4c6a8f7 100644 --- a/src/edgewalker/cli/controller.py +++ b/src/edgewalker/cli/controller.py @@ -35,7 +35,7 @@ def __init__(self, scanner_service: Optional[ScannerService] = None) -> None: self.scanner = scanner_service or ScannerService() async def run_port_scan( - self, full: bool = False, target: str = None + self, full: bool = False, target: str = None, unprivileged: bool = False, verbose: bool = False ) -> Optional[PortScanModel]: """Run port scan and display results asynchronously.""" scan_type = "FULL" if full else "QUICK" @@ -50,7 +50,7 @@ async def run_port_scan( try: with utils.console.status(f"[bold green]Scanning {target}..."): - results = await self.scanner.perform_port_scan(target=target, full=full) + results = await self.scanner.perform_port_scan(target=target, full=full, unprivileged=unprivileged, verbose=verbose) except Exception as e: logger.error(f"Scan failed: {str(e)}") logger.error("Make sure nmap is installed and you have sudo privileges.") diff --git a/src/edgewalker/cli/guided.py b/src/edgewalker/cli/guided.py index cbac680..ba2c0bc 100644 --- a/src/edgewalker/cli/guided.py +++ b/src/edgewalker/cli/guided.py @@ -30,6 +30,8 @@ async def automatic_mode( full_scan: Optional[bool] = None, target: Optional[str] = None, full_creds: bool = False, + unprivileged: bool = False, + verbose: bool = False, ) -> None: """Run the automatic guided security assessment asynchronously.""" # Step 1: Choose scan type @@ -49,7 +51,7 @@ async def automatic_mode( # Step 3: Run port scan utils.console.print() logger.info(f"Starting {scan_type.lower()} port scan on {target}...") - port_results = await self.controller.run_port_scan(full=full_scan, target=target) + port_results = await self.controller.run_port_scan(full=full_scan, target=target, unprivileged=unprivileged, verbose=verbose) if not port_results: logger.error("Port scan failed. Returning to mode selection.") diff --git a/src/edgewalker/core/config.py b/src/edgewalker/core/config.py index a7904c1..84122cd 100644 --- a/src/edgewalker/core/config.py +++ b/src/edgewalker/core/config.py @@ -38,6 +38,16 @@ def get_cache_dir() -> Path: return Path(os.environ.get("EW_CACHE_DIR", user_cache_dir("edgewalker"))) +def get_data_dir() -> Path: + """Return the user-facing data directory for scan results. + + Defaults to ~/.edgewalker so results are easy to find and separate + from the application config in Library/Application Support (macOS) + or ~/.config (Linux). Override with EW_DATA_DIR. + """ + return Path(os.environ.get("EW_DATA_DIR", Path.home() / ".edgewalker")) + + # Legacy constants for backward compatibility (evaluated at load time) # Note: Tests should use EW_CONFIG_DIR/EW_CACHE_DIR env vars BEFORE importing this module # or we need to ensure they are used dynamically. @@ -128,7 +138,7 @@ def validate_urls(cls, v: str, info: ValidationInfo) -> str: def handle_demo_mode(cls, v: Path) -> Path: """Ensure output_dir points to demo_scans when in demo mode.""" if os.environ.get("EW_DEMO_MODE") == "1": - return v.parent / "demo_scans" + return get_data_dir() / "demo_scans" return v api_timeout: int = Field( @@ -287,11 +297,11 @@ def handle_demo_mode(cls, v: Path) -> Path: ) output_dir: Path = Field( default_factory=lambda: ( - get_config_dir() / "demo_scans" + get_data_dir() / "demo_scans" if os.environ.get("EW_DEMO_MODE") == "1" - else get_config_dir() / "scans" + else get_data_dir() / "scans" ), - description="Output directory", + description="Output directory for scan results (~/.edgewalker/scans by default)", ) creds_file: Path = Field( default=Path(__file__).parent.parent / "data" / "creds.csv", diff --git a/src/edgewalker/core/scanner_service.py b/src/edgewalker/core/scanner_service.py index 350337f..b05c849 100644 --- a/src/edgewalker/core/scanner_service.py +++ b/src/edgewalker/core/scanner_service.py @@ -81,7 +81,7 @@ def submit_scan_data(self, module: str, data: dict) -> None: # No loop running, use sync version self.telemetry.submit_scan_data_sync(module, data) - async def perform_port_scan(self, target: str, full: bool = False) -> PortScanModel: + async def perform_port_scan(self, target: str, full: bool = False, unprivileged: bool = False, verbose: bool = False) -> PortScanModel: """Perform a port scan and return results as a model asynchronously.""" if self.demo_mode and self.demo_service: return await self.demo_service.perform_port_scan(target, full) @@ -94,11 +94,11 @@ async def perform_port_scan(self, target: str, full: bool = False) -> PortScanMo if full: results = await port_scan.full_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) else: results = await port_scan.quick_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) if isinstance(results, dict): diff --git a/src/edgewalker/modules/cve_scan/scanner.py b/src/edgewalker/modules/cve_scan/scanner.py index 9370cf2..1082eb3 100644 --- a/src/edgewalker/modules/cve_scan/scanner.py +++ b/src/edgewalker/modules/cve_scan/scanner.py @@ -53,7 +53,7 @@ async def search_cves_async( response = await client.get( settings.nvd_api_url, params=params, headers=headers, timeout=30 ) - if response.status_code == 403: + if response.status_code == 429: # Rate limit hit, wait and retry once await asyncio.sleep(settings.nvd_rate_limit_delay * 2) response = await client.get( @@ -266,41 +266,6 @@ async def _scan_service( version=svc["version"], cves=cves, ) - """Scan a single service for CVEs asynchronously.""" - if self.progress_callback: - self.progress_callback( - "cve_check", - f"Checking {svc['product']} {svc['version']} on {svc['ip']}:{svc['port']}", - ) - - cve_dicts = await search_cves_async( - client, svc["product"], svc["version"], self.verbose, semaphore - ) - - cves = [ - CveModel( - id=c["id"], - description=c.get("description", ""), - severity=c["severity"], - score=c["score"], - ) - for c in cve_dicts - ] - - if cves and self.progress_callback: - self.progress_callback( - "cve_found", - f"{len(cves)} CVE(s) found for {svc['product']} {svc['version']}", - ) - - return CveScanResultModel( - ip=svc["ip"], - port=svc["port"], - service=svc["service"], - product=svc["product"], - version=svc["version"], - cves=cves, - ) def _build_empty_model(self, skipped_no_version: int) -> CveScanModel: return CveScanModel( diff --git a/src/edgewalker/modules/password_scan/scanner.py b/src/edgewalker/modules/password_scan/scanner.py index 28f7ee3..083bcf5 100644 --- a/src/edgewalker/modules/password_scan/scanner.py +++ b/src/edgewalker/modules/password_scan/scanner.py @@ -161,6 +161,7 @@ async def scan(self) -> PasswordScanResultModel: found_cred = None login_status = StatusEnum.failed + i = -1 for i, (user, pw) in enumerate(creds): if self.rich_progress: @@ -661,7 +662,7 @@ async def scan_host( ) -> dict: """Backward compatible scan_host function asynchronously.""" scanner = PasswordScanner(target, top_n, verbose, progress_callback) - results = await scanner.scan_host(host, ports) + results = await scanner.scan_host(host, "", ports) out = {"host": host, "services": {}} for r in results: status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure" diff --git a/src/edgewalker/modules/port_scan/scanner.py b/src/edgewalker/modules/port_scan/scanner.py index e7890fe..d911318 100644 --- a/src/edgewalker/modules/port_scan/scanner.py +++ b/src/edgewalker/modules/port_scan/scanner.py @@ -126,8 +126,11 @@ def fix_nmap_permissions() -> bool: return False -def check_privileges() -> str | None: +def check_privileges(unprivileged: bool = False) -> str | None: """Check for root/sudo privileges or nmap capabilities.""" + if unprivileged: + return None + if check_nmap_permissions(): return None @@ -140,8 +143,11 @@ def check_privileges() -> str | None: return "Port scanning requires root privileges on this OS. Please run with sudo." -def get_nmap_command() -> list[str]: +def get_nmap_command(unprivileged: bool = False) -> list[str]: """Return the base nmap command with sudo if necessary.""" + if unprivileged: + return ["nmap"] + if check_nmap_permissions(): return ["nmap"] @@ -245,6 +251,7 @@ async def _scan_batch( batch_label: str = "", progress_callback: Callable[[str, str], None] | None = None, rich_progress: Optional[tuple[utils.Progress, utils.TaskID]] = None, + unprivileged: bool = False, ) -> tuple[str, set[str]]: """Run one nmap subprocess on a batch of hosts asynchronously.""" if not hosts: @@ -254,7 +261,8 @@ async def _scan_batch( xml_path = xml_fd.name xml_fd.close() - cmd = get_nmap_command() + extra_flags + flags = (["--unprivileged"] + extra_flags) if unprivileged else extra_flags + cmd = get_nmap_command(unprivileged=unprivileged) + flags if ports: cmd += ["-p", ports] cmd += ["-oX", xml_path, "-v", "--stats-every", "10s", "--open"] @@ -349,6 +357,7 @@ async def _parallel_scan( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run parallel scans across hosts asynchronously.""" if not live_hosts: @@ -368,6 +377,7 @@ async def _parallel_scan( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -396,6 +406,7 @@ async def _parallel_scan( label, progress_callback, rich_progress, + unprivileged, ) ) @@ -422,6 +433,7 @@ async def _probe_services( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run service/OS probes per host asynchronously.""" if not host_ports: @@ -446,6 +458,7 @@ async def _probe_services( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -473,6 +486,7 @@ async def _probe_services( ip, progress_callback, rich_progress, + unprivileged, ) ) @@ -507,6 +521,7 @@ def __init__( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> None: """Initialize the PortScanner. @@ -514,10 +529,12 @@ def __init__( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. """ self.target = target or get_default_target() self.verbose = verbose self.progress_callback = progress_callback + self.unprivileged = unprivileged async def scan(self, **kwargs: object) -> PortScanModel: """Execute the scan asynchronously (ScanModule interface).""" @@ -531,7 +548,7 @@ async def quick_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -541,7 +558,7 @@ async def quick_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -575,6 +592,7 @@ async def quick_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: all_xml, all_hosts_found = await _parallel_scan( @@ -584,6 +602,7 @@ async def quick_scan(self) -> PortScanModel: settings.nmap_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except FileNotFoundError: return PortScanModel( @@ -628,7 +647,7 @@ async def full_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -637,7 +656,7 @@ async def full_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -671,6 +690,7 @@ async def full_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: disc_xml, _ = await _parallel_scan( @@ -680,6 +700,7 @@ async def full_scan(self) -> PortScanModel: settings.nmap_full_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except Exception: disc_xml = [] @@ -721,10 +742,10 @@ async def full_scan(self) -> PortScanModel: if self.verbose: with utils.get_progress() as progress: probe_xml, _ = await _probe_services( - host_ports, self.verbose, self.progress_callback, progress + host_ports, self.verbose, self.progress_callback, progress, unprivileged=self.unprivileged ) else: - probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback) + probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback, unprivileged=self.unprivileged) hosts_by_ip = {} for xml_data in probe_xml: for host in parse_nmap_xml(xml_data): @@ -753,6 +774,7 @@ async def scan( full: bool = False, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a port scan asynchronously. @@ -761,11 +783,12 @@ async def scan( full: Whether to perform a full scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - scanner = PortScanner(target, verbose, progress_callback) + scanner = PortScanner(target, verbose, progress_callback, unprivileged=unprivileged) return await scanner.full_scan() if full else await scanner.quick_scan() @@ -773,6 +796,7 @@ async def quick_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a quick scan of common IoT ports asynchronously. @@ -780,17 +804,19 @@ async def quick_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).quick_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).quick_scan() async def full_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a full scan of all ports with OS detection asynchronously. @@ -798,17 +824,19 @@ async def full_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).full_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).full_scan() async def ping_sweep( target: str, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> list[str]: """Quick ping sweep to find live hosts asynchronously. @@ -816,6 +844,7 @@ async def ping_sweep( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: List of live host IP addresses. @@ -830,7 +859,8 @@ async def ping_sweep( if err: raise ValueError(err) - cmd = get_nmap_command() + ["-sn", "-T4", target] + flags = ["--unprivileged"] if unprivileged else [] + cmd = get_nmap_command(unprivileged=unprivileged) + flags + ["-sn", "-T4", target] live_hosts = [] try: process = await asyncio.create_subprocess_exec( diff --git a/src/edgewalker/skins/colorblind.yaml b/src/edgewalker/skins/colorblind.yaml new file mode 100644 index 0000000..964af73 --- /dev/null +++ b/src/edgewalker/skins/colorblind.yaml @@ -0,0 +1,53 @@ +metadata: + name: "Colorblind Safe (Okabe-Ito)" + author: "Periphery" + +# Palette: Okabe & Ito (2008) — universal colorblind-safe scientific standard. +# Works for deuteranopia, protanopia, and tritanopia. +# +# Sky Blue #56B4E9 — primary accent, borders, headers +# Orange #E69F00 — warnings +# Vermillion #D55E00 — danger / critical (distinct from orange at all types) +# Blue #0072B2 — secondary / active elements +# Bluish Grn #009E73 — success (readable by red-green blind as distinct from orange) +# Yellow #F0E442 — highlight +# Foreground #E8E8E8 — high-contrast white-ish text on dark bg + +theme: + primary: "#0072B2" + secondary: "#56B4E9" + warning: "#E69F00" + error: "#D55E00" + success: "#009E73" + accent: "#56B4E9" + foreground: "#E8E8E8" + background: "#0f0f1a" + surface: "#1a1a2e" + panel: "#16213e" + boost: "#1e1e3a" + dark: true + lavender: "#56B4E9" + highlight: "#F0E442" + variables: + muted: "#888888" + text: "#E8E8E8" + danger: "#D55E00" + surface-alt: "#16213e" + +icons: + scan: "⌕" + check: "[OK]" + fail: "[X]" + bullet: "•" + arrow: "->" + warn: "[!]" + skull: "[!]" + vulnerable: "[X]" + circle: "o" + circle_filled: "*" + step: ">" + bar_full: "█" + bar_empty: "░" + plus: "[+]" + info: "[*]" + alert: "[!]" diff --git a/tests/test_cli.py b/tests/test_cli.py index 6581301..75aa7c8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -68,7 +68,7 @@ async def test_run_port_scan(mock_submit, mock_save, mock_input, mock_quick, moc mock_quick.return_value = mock_results res = await cli.ScanController().run_port_scan(full=False) assert res.model_dump(mode="json") == mock_results.model_dump(mode="json") - mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None) + mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False) @pytest.mark.asyncio @@ -155,7 +155,7 @@ def test_typer_scan(mock_guided_cls, mock_logo): result = runner.invoke(app, ["scan", "--target", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=False + full_scan=False, target="1.1.1.1", full_creds=False, unprivileged=False, verbose=False ) @@ -166,7 +166,7 @@ def test_typer_scan_full(mock_guided_cls): result = runner.invoke(app, ["scan", "--full", "-t", "1.1.1.1", "--full-creds"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=True, target="1.1.1.1", full_creds=True + full_scan=True, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -177,7 +177,7 @@ def test_typer_scan_full_creds(mock_guided_cls): result = runner.invoke(app, ["scan", "--full-creds", "-t", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=True + full_scan=False, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -476,6 +476,42 @@ def test_prompt_next_scan_suggest_cve(mock_run, mock_input, mock_status): assert mock_run.called +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_colorblind_flag(mock_telemetry, mock_auto): + """--colorblind flag is accepted and applies the colorblind theme.""" + from edgewalker import theme as t + original_accent = t.ACCENT + result = runner.invoke(app, ["scan", "--colorblind", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + # restore default theme so other tests aren't affected + from edgewalker.core.config import settings + settings.theme = "periphery" + t.load_active_theme() + + +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_verbose_flag(mock_telemetry, mock_auto): + """--verbose flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--verbose", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("verbose") is True + + +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_unprivileged_flag(mock_telemetry, mock_auto): + """--unprivileged flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--unprivileged", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("unprivileged") is True + + @patch("edgewalker.utils.has_any_results", return_value=True) @patch("edgewalker.cli.results.settings") @patch("edgewalker.utils.get_input", side_effect=["1", "", "0"]) diff --git a/tests/test_config.py b/tests/test_config.py index c43f5be..47f4170 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -144,3 +144,26 @@ def test_update_setting_readonly(): with pytest.raises(AttributeError, match="read-only"): update_setting("device_id", "new-id") + + +def test_default_output_dir_is_not_inside_config_dir(tmp_path): + """output_dir default must not nest inside the config/Application Support directory.""" + with patch.dict(os.environ, {"EW_CONFIG_DIR": str(tmp_path)}): + from edgewalker.core.config import Settings, get_config_dir, get_data_dir + s = Settings() + config_dir = get_config_dir() + data_dir = get_data_dir() + # output_dir must live under data_dir, not config_dir + assert str(s.output_dir).startswith(str(data_dir)), ( + f"output_dir ({s.output_dir}) should be under data_dir ({data_dir})" + ) + assert not str(s.output_dir).startswith(str(config_dir)), ( + f"output_dir ({s.output_dir}) must not be inside config_dir ({config_dir})" + ) + + +def test_get_data_dir_respects_env_override(): + """EW_DATA_DIR env var overrides the data directory.""" + from edgewalker.core.config import get_data_dir + with patch.dict(os.environ, {"EW_DATA_DIR": "/tmp/ew_data"}): + assert str(get_data_dir()) == "/tmp/ew_data" diff --git a/tests/test_cve_scan.py b/tests/test_cve_scan.py index d8602cc..52b6efa 100644 --- a/tests/test_cve_scan.py +++ b/tests/test_cve_scan.py @@ -97,15 +97,15 @@ async def test_search_cves_async_real_calls(): @pytest.mark.asyncio async def test_search_cves_async_rate_limit(): - mock_response_403 = MagicMock() - mock_response_403.status_code = 403 + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 mock_response_200 = MagicMock() mock_response_200.status_code = 200 mock_response_200.json.return_value = {"vulnerabilities": []} client = AsyncMock() - client.get.side_effect = [mock_response_403, mock_response_200] + client.get.side_effect = [mock_response_429, mock_response_200] with patch("asyncio.sleep", new_callable=AsyncMock): res = await scanner.search_cves_async(client, "product", "1.0") @@ -217,3 +217,38 @@ async def test_cve_scanner_scan_interface(): await s.scan(hosts=None) mock_scan.assert_called_with([]) + + +@pytest.mark.asyncio +async def test_search_cves_async_rate_limit_uses_429(): + """Bug fix: NVD rate limiting uses HTTP 429, not 403.""" + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 + + mock_response_200 = MagicMock() + mock_response_200.status_code = 200 + mock_response_200.json.return_value = {"vulnerabilities": []} + + client = AsyncMock() + client.get.side_effect = [mock_response_429, mock_response_200] + + with patch("asyncio.sleep", new_callable=AsyncMock): + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 2 + + +@pytest.mark.asyncio +async def test_search_cves_async_403_is_not_retried(): + """Bug fix: 403 Forbidden should not be retried (it's not a rate limit).""" + mock_response_403 = MagicMock() + mock_response_403.status_code = 403 + + client = AsyncMock() + client.get.return_value = mock_response_403 + + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 1 # no retry + mock_sleep.assert_not_called() diff --git a/tests/test_password_scan.py b/tests/test_password_scan.py index b87d897..3d9b397 100644 --- a/tests/test_password_scan.py +++ b/tests/test_password_scan.py @@ -929,3 +929,41 @@ async def test_password_scanner_scan_host_by_name_smb(): ) as mock_scan: await s.scan_host("1.1.1.1", "00:11:22:33:44:55", {"smb": 4455}) mock_scan.assert_called_once() + + +@pytest.mark.asyncio +async def test_async_service_scanner_scan_empty_credentials(): + """Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty.""" + class TestScanner(scanner.AsyncServiceScanner): + def service_name(self): + return "ssh" + + def service_enum(self): + return scanner.ServiceEnum.ssh + + async def attempt_login(self, u, p): + return False, False + + s = TestScanner("1.1.1.1", 22) + with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True): + with patch( + "edgewalker.modules.password_scan.scanner.load_credentials", return_value=[] + ): + res = await s.scan() + # Should complete without UnboundLocalError + assert res.login_attempt == scanner.StatusEnum.failed + + +@pytest.mark.asyncio +async def test_scan_host_backward_compat_passes_mac(): + """Bug fix: backward compat scan_host must pass mac as second positional arg.""" + with patch( + "edgewalker.modules.password_scan.scanner.PasswordScanner.scan_host", new_callable=AsyncMock + ) as mock_scan: + mock_scan.return_value = [] + await scanner.scan_host("1.1.1.1", {"ssh": 22}) + args, kwargs = mock_scan.call_args + # args[0]=host, args[1]=mac, args[2]=ports + assert args[0] == "1.1.1.1" + assert args[1] == "" # mac must be passed explicitly + assert args[2] == {"ssh": 22} diff --git a/tests/test_port_scan.py b/tests/test_port_scan.py index d3725aa..b1dbd56 100644 --- a/tests/test_port_scan.py +++ b/tests/test_port_scan.py @@ -309,3 +309,81 @@ async def test_probe_services(mock_batch): res_xml, res_found = await scanner._probe_services({"1.1.1.1": [80, 443]}) assert len(res_xml) == 1 assert "1.1.1.1" in res_found + + +# --- Unprivileged mode tests --- + + +def test_check_privileges_unprivileged_mode(): + """check_privileges returns None immediately when unprivileged=True.""" + # Even without root, unprivileged mode skips the check + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.check_privileges(unprivileged=True) is None + + +def test_get_nmap_command_unprivileged(): + """get_nmap_command returns ['nmap'] when unprivileged=True, even without root.""" + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.get_nmap_command(unprivileged=True) == ["nmap"] + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +@patch("tempfile.NamedTemporaryFile") +@patch("builtins.open", new_callable=mock_open, read_data="") +@patch("os.unlink") +async def test_scan_batch_adds_unprivileged_flag(mock_unlink, mock_file, mock_temp, mock_exec): + """_scan_batch prepends --unprivileged to the nmap command when unprivileged=True.""" + mock_temp.return_value.name = "test.xml" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b""] + mock_proc.wait.return_value = 0 + mock_proc.returncode = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + await scanner._scan_batch(["1.1.1.1"], "80", [], 10, unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +async def test_ping_sweep_unprivileged(mock_exec): + """ping_sweep passes --unprivileged flag to nmap when unprivileged=True.""" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b"Nmap scan report for 1.1.1.1", b""] + mock_proc.wait.return_value = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + res = await scanner.ping_sweep("1.1.1.0/24", unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + assert "1.1.1.1" in res + + +def test_port_scanner_stores_unprivileged(): + """PortScanner stores the unprivileged flag.""" + ps = scanner.PortScanner(unprivileged=True) + assert ps.unprivileged is True + + ps_default = scanner.PortScanner() + assert ps_default.unprivileged is False + + +@pytest.mark.asyncio +@patch("edgewalker.modules.port_scan.scanner.ping_sweep", new_callable=AsyncMock, return_value=[]) +@patch("edgewalker.modules.port_scan.scanner._parallel_scan", new_callable=AsyncMock) +async def test_quick_scan_unprivileged_skips_permission_check(mock_parallel, mock_ping): + """quick_scan with unprivileged=True does not raise PermissionError.""" + mock_parallel.return_value = ([], set()) + # On macOS without root, normal mode would raise PermissionError + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + ps = scanner.PortScanner(target="1.1.1.0/24", unprivileged=True) + result = await ps.quick_scan() + assert result.success is True diff --git a/tests/test_scanner_service.py b/tests/test_scanner_service.py index 874358f..c63a356 100644 --- a/tests/test_scanner_service.py +++ b/tests/test_scanner_service.py @@ -242,3 +242,40 @@ def test_global_submit_scan_data_async(): mock_loop.return_value = mock_loop_instance submit_scan_data("test", {}) mock_loop_instance.create_task.assert_called_once() + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_verbose(scanner_service): + """perform_port_scan passes verbose flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, verbose=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=True, progress_callback=scanner_service.progress_callback, unprivileged=False + ) + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_unprivileged(scanner_service): + """perform_port_scan passes unprivileged flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, unprivileged=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) + + with patch( + "edgewalker.modules.port_scan.full_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_full: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=True, unprivileged=True) + mock_full.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) diff --git a/tests/test_theme.py b/tests/test_theme.py index 777b032..97bfc20 100644 --- a/tests/test_theme.py +++ b/tests/test_theme.py @@ -32,3 +32,38 @@ def test_icons(): assert theme.ICON_PLUS == "[+]" assert theme.ICON_INFO == "[*]" assert theme.ICON_ALERT == "[!]" + + +def test_colorblind_skin_exists_and_has_required_keys(): + """colorblind.yaml skin exists and defines all required color roles.""" + from pathlib import Path + skin_path = Path(__file__).parent.parent / "src/edgewalker/skins/colorblind.yaml" + assert skin_path.exists(), "colorblind.yaml skin file missing" + + import yaml + data = yaml.safe_load(skin_path.read_text()) + theme_section = data.get("theme", {}) + for key in ("primary", "accent", "success", "warning", "error", "foreground"): + assert key in theme_section, f"colorblind skin missing key: {key}" + + +def test_colorblind_skin_loads_via_theme_manager(): + """ThemeManager can load the colorblind skin without errors.""" + from edgewalker.core.theme_manager import ThemeManager + tm = ThemeManager() + data = tm.load_theme("colorblind") + assert data.get("theme", {}).get("accent") is not None + + +def test_colorblind_flag_reloads_theme(): + """Passing colorblind=True to apply_colorblind_theme switches the active theme.""" + from edgewalker import theme as t + from edgewalker.cli.cli import apply_colorblind_theme + original_accent = t.ACCENT + apply_colorblind_theme() + # accent should now be the colorblind skin's value, not the periphery cyan + assert t.ACCENT != "#00FFFF" + # restore + from edgewalker.core.config import settings + settings.theme = "periphery" + t.load_active_theme()