diff --git a/src/edgewalker/cli/cli.py b/src/edgewalker/cli/cli.py index 7a4549b..11a2a35 100644 --- a/src/edgewalker/cli/cli.py +++ b/src/edgewalker/cli/cli.py @@ -168,6 +168,14 @@ def run_guided_scan( "-ao", help="Allow scan to proceed with active configuration overrides.", ), + unprivileged: bool = typer.Option( + settings.unprivileged, + "--unprivileged", + help="Run without sudo using TCP connect scans (macOS/no-root).", + ), + verbose: bool = typer.Option( + False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports." + ), ) -> None: """Run a guided security scan. @@ -224,7 +232,15 @@ def run_guided_scan( ensure_telemetry_choice() controller = ScanController() guided = GuidedScanner(controller) - asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds)) + asyncio.run( + guided.automatic_mode( + full_scan=full, + target=target, + full_creds=full_creds, + unprivileged=unprivileged, + verbose=verbose, + ) + ) @app.command() @@ -249,8 +265,16 @@ def clear() -> None: @app.command() -def tui() -> None: +def tui( + unprivileged: bool = typer.Option( + settings.unprivileged, + "--unprivileged", + help="Run without sudo using TCP connect scans (macOS/no-root).", + ), +) -> None: """Launch the interactive Textual TUI.""" + if unprivileged: + update_setting("unprivileged", True) EdgeWalkerApp().run() diff --git a/src/edgewalker/cli/controller.py b/src/edgewalker/cli/controller.py index 6e4b649..f0cd3c0 100644 --- a/src/edgewalker/cli/controller.py +++ b/src/edgewalker/cli/controller.py @@ -35,7 +35,11 @@ def __init__(self, scanner_service: Optional[ScannerService] = None) -> None: self.scanner = scanner_service or ScannerService() async def run_port_scan( - self, full: bool = False, target: str = None + self, + full: bool = False, + target: str = None, + unprivileged: bool = False, + verbose: bool = False, ) -> Optional[PortScanModel]: """Run port scan and display results asynchronously.""" scan_type = "FULL" if full else "QUICK" @@ -50,7 +54,9 @@ async def run_port_scan( try: with utils.console.status(f"[bold green]Scanning {target}..."): - results = await self.scanner.perform_port_scan(target=target, full=full) + results = await self.scanner.perform_port_scan( + target=target, full=full, unprivileged=unprivileged, verbose=verbose + ) except Exception as e: logger.error(f"Scan failed: {str(e)}") logger.error("Make sure nmap is installed and you have sudo privileges.") diff --git a/src/edgewalker/cli/guided.py b/src/edgewalker/cli/guided.py index cbac680..2ba0c11 100644 --- a/src/edgewalker/cli/guided.py +++ b/src/edgewalker/cli/guided.py @@ -30,6 +30,8 @@ async def automatic_mode( full_scan: Optional[bool] = None, target: Optional[str] = None, full_creds: bool = False, + unprivileged: bool = False, + verbose: bool = False, ) -> None: """Run the automatic guided security assessment asynchronously.""" # Step 1: Choose scan type @@ -49,7 +51,9 @@ async def automatic_mode( # Step 3: Run port scan utils.console.print() logger.info(f"Starting {scan_type.lower()} port scan on {target}...") - port_results = await self.controller.run_port_scan(full=full_scan, target=target) + port_results = await self.controller.run_port_scan( + full=full_scan, target=target, unprivileged=unprivileged, verbose=verbose + ) if not port_results: logger.error("Port scan failed. Returning to mode selection.") diff --git a/src/edgewalker/core/config.py b/src/edgewalker/core/config.py index ac13f89..0af29f9 100644 --- a/src/edgewalker/core/config.py +++ b/src/edgewalker/core/config.py @@ -308,6 +308,11 @@ def handle_demo_mode(cls, v: Path) -> Path: description="Run in non-interactive mode (bypass prompts)", ) + unprivileged: bool = Field( + default=False, + description="Run without sudo using TCP connect scans (macOS/no-root).", + ) + suppress_warnings: bool = Field( default=False, description="Suppress configuration and security warnings in the console", @@ -489,6 +494,18 @@ def init_config() -> None: for warning in settings.get_security_warnings(): logger.warning(warning) + # Check for root ownership of config file (common if run with sudo first) + if config_file.exists() and os.getuid() != 0: + try: + if config_file.stat().st_uid == 0: + logger.warning( + f"Config file '{config_file}' is owned by root. " + "Settings changes will not be saved. " + f"Run 'sudo chown {os.getlogin()} \"{config_file}\"' to fix." + ) + except (OSError, AttributeError): + pass + if not config_file.exists(): save_settings(settings) else: @@ -517,9 +534,18 @@ def save_settings(settings_obj: Settings) -> None: data["theme"] = settings_obj.theme # Open with restricted permissions (0o600: read/write for owner only) - fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - with os.fdopen(fd, "w", encoding="utf-8") as f: - yaml.dump(data, f, sort_keys=False) + try: + fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + yaml.dump(data, f, sort_keys=False) + except PermissionError: + logger.warning( + f"Permission denied when saving config to '{config_file}'. " + "If you previously ran with sudo, you may need to fix file ownership: " + f'sudo chown {os.getlogin()} "{config_file}"' + ) + except Exception as e: + logger.error(f"Failed to save settings to {config_file}: {e}") def update_setting(key: str, value: object) -> None: diff --git a/src/edgewalker/core/scanner_service.py b/src/edgewalker/core/scanner_service.py index 350337f..df59b3c 100644 --- a/src/edgewalker/core/scanner_service.py +++ b/src/edgewalker/core/scanner_service.py @@ -81,7 +81,13 @@ def submit_scan_data(self, module: str, data: dict) -> None: # No loop running, use sync version self.telemetry.submit_scan_data_sync(module, data) - async def perform_port_scan(self, target: str, full: bool = False) -> PortScanModel: + async def perform_port_scan( + self, + target: str, + full: bool = False, + unprivileged: bool = False, + verbose: bool = False, + ) -> PortScanModel: """Perform a port scan and return results as a model asynchronously.""" if self.demo_mode and self.demo_service: return await self.demo_service.perform_port_scan(target, full) @@ -94,11 +100,17 @@ async def perform_port_scan(self, target: str, full: bool = False) -> PortScanMo if full: results = await port_scan.full_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, + verbose=verbose, + progress_callback=self.progress_callback, + unprivileged=unprivileged, ) else: results = await port_scan.quick_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, + verbose=verbose, + progress_callback=self.progress_callback, + unprivileged=unprivileged, ) if isinstance(results, dict): diff --git a/src/edgewalker/modules/port_scan/scanner.py b/src/edgewalker/modules/port_scan/scanner.py index cf2c67d..56dde8a 100644 --- a/src/edgewalker/modules/port_scan/scanner.py +++ b/src/edgewalker/modules/port_scan/scanner.py @@ -127,8 +127,11 @@ def fix_nmap_permissions() -> bool: return False -def check_privileges() -> str | None: +def check_privileges(unprivileged: bool = False) -> str | None: """Check for root/sudo privileges or nmap capabilities.""" + if unprivileged: + return None + if check_nmap_permissions(): return None @@ -141,8 +144,11 @@ def check_privileges() -> str | None: return "Port scanning requires root privileges on this OS. Please run with sudo." -def get_nmap_command() -> list[str]: +def get_nmap_command(unprivileged: bool = False) -> list[str]: """Return the base nmap command with sudo if necessary.""" + if unprivileged: + return ["nmap"] + if check_nmap_permissions(): return ["nmap"] @@ -250,6 +256,7 @@ async def _scan_batch( batch_label: str = "", progress_callback: Callable[[str, str], None] | None = None, rich_progress: Optional[tuple[utils.Progress, utils.TaskID]] = None, + unprivileged: bool = False, ) -> tuple[str, set[str]]: """Run one nmap subprocess on a batch of hosts asynchronously.""" if not hosts: @@ -259,7 +266,8 @@ async def _scan_batch( xml_path = xml_fd.name xml_fd.close() - cmd = get_nmap_command() + extra_flags + flags = (["--unprivileged"] + extra_flags) if unprivileged else extra_flags + cmd = get_nmap_command(unprivileged=unprivileged) + flags if ports: cmd += ["-p", ports] cmd += ["-oX", xml_path, "-v", "--stats-every", "10s", "--open"] @@ -358,6 +366,7 @@ async def _parallel_scan( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run parallel scans across hosts asynchronously.""" if not live_hosts: @@ -377,6 +386,7 @@ async def _parallel_scan( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -405,6 +415,7 @@ async def _parallel_scan( label, progress_callback, rich_progress, + unprivileged, ) ) @@ -431,6 +442,7 @@ async def _probe_services( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run service/OS probes per host asynchronously.""" if not host_ports: @@ -455,6 +467,7 @@ async def _probe_services( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -482,6 +495,7 @@ async def _probe_services( ip, progress_callback, rich_progress, + unprivileged, ) ) @@ -516,6 +530,7 @@ def __init__( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> None: """Initialize the PortScanner. @@ -523,10 +538,12 @@ def __init__( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. """ self.target = target or get_default_target() self.verbose = verbose self.progress_callback = progress_callback + self.unprivileged = unprivileged async def scan(self, **kwargs: object) -> PortScanModel: """Execute the scan asynchronously (ScanModule interface).""" @@ -541,7 +558,7 @@ async def quick_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -551,7 +568,9 @@ async def quick_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep( + self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged + ) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -585,6 +604,7 @@ async def quick_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: all_xml, all_hosts_found = await _parallel_scan( @@ -594,6 +614,7 @@ async def quick_scan(self) -> PortScanModel: settings.nmap_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except FileNotFoundError: return PortScanModel( @@ -639,7 +660,7 @@ async def full_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -648,7 +669,9 @@ async def full_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep( + self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged + ) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -682,6 +705,7 @@ async def full_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: disc_xml, _ = await _parallel_scan( @@ -691,6 +715,7 @@ async def full_scan(self) -> PortScanModel: settings.nmap_full_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except Exception: disc_xml = [] @@ -732,10 +757,16 @@ async def full_scan(self) -> PortScanModel: if self.verbose: with utils.get_progress() as progress: probe_xml, _ = await _probe_services( - host_ports, self.verbose, self.progress_callback, progress + host_ports, + self.verbose, + self.progress_callback, + progress, + unprivileged=self.unprivileged, ) else: - probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback) + probe_xml, _ = await _probe_services( + host_ports, self.verbose, self.progress_callback, unprivileged=self.unprivileged + ) hosts_by_ip = {} for xml_data in probe_xml: for host in parse_nmap_xml(xml_data): @@ -764,6 +795,7 @@ async def scan( full: bool = False, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a port scan asynchronously. @@ -772,11 +804,12 @@ async def scan( full: Whether to perform a full scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - scanner = PortScanner(target, verbose, progress_callback) + scanner = PortScanner(target, verbose, progress_callback, unprivileged=unprivileged) return await scanner.full_scan() if full else await scanner.quick_scan() @@ -784,6 +817,7 @@ async def quick_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a quick scan of common IoT ports asynchronously. @@ -791,17 +825,21 @@ async def quick_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).quick_scan() + return await PortScanner( + target, verbose, progress_callback, unprivileged=unprivileged + ).quick_scan() async def full_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a full scan of all ports with OS detection asynchronously. @@ -809,17 +847,21 @@ async def full_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).full_scan() + return await PortScanner( + target, verbose, progress_callback, unprivileged=unprivileged + ).full_scan() async def ping_sweep( target: str, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> list[str]: """Quick ping sweep to find live hosts asynchronously. @@ -827,6 +869,7 @@ async def ping_sweep( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: List of live host IP addresses. @@ -841,7 +884,8 @@ async def ping_sweep( if err: raise ValueError(err) - cmd = get_nmap_command() + ["-sn", "-T4", target] + flags = ["--unprivileged"] if unprivileged else [] + cmd = get_nmap_command(unprivileged=unprivileged) + flags + ["-sn", "-T4", target] logger.debug(f"Executing ping sweep: {' '.join(cmd)}") live_hosts = [] try: diff --git a/src/edgewalker/tui/app.py b/src/edgewalker/tui/app.py index b8cd1de..f1d085a 100644 --- a/src/edgewalker/tui/app.py +++ b/src/edgewalker/tui/app.py @@ -3,7 +3,6 @@ from __future__ import annotations # Standard Library -import sys import threading from typing import TYPE_CHECKING, Iterable @@ -261,30 +260,25 @@ def _check_config_overrides(self) -> bool: return False def _check_nmap_permissions(self) -> None: - """Check nmap permissions early and offer fix if on Linux.""" + """Check nmap permissions early and offer fix or unprivileged mode.""" self.has_nmap_permissions = check_nmap_permissions() - if self.has_nmap_permissions: + if self.has_nmap_permissions or settings.unprivileged: return - if sys.platform.startswith("linux"): - - def on_fix_confirmed(confirmed: bool) -> None: - if confirmed: - with self.suspend(): - success = fix_nmap_permissions() - if success: - self.has_nmap_permissions = True - self.notify("Permissions fixed!") - else: - self.notify("Failed to fix permissions.", severity="error") - - self.push_screen(PermissionModal(), on_fix_confirmed) - elif sys.platform == "darwin": - self.notify( - "Port scanning requires sudo on macOS. Please restart with 'sudo edgewalker'.", - severity="warning", - timeout=10, - ) + def on_permission_choice(choice: str) -> None: + if choice == "fix": + with self.suspend(): + success = fix_nmap_permissions() + if success: + self.has_nmap_permissions = True + self.notify("Permissions fixed!") + else: + self.notify("Failed to fix permissions.", severity="error") + elif choice == "unprivileged": + update_setting("unprivileged", True) + self.notify("Switched to Unprivileged Mode (TCP Connect scans).") + + self.push_screen(PermissionModal(), on_permission_choice) def get_system_commands(self, screen: Screen) -> Iterable[Hit]: """Filter system commands to remove built-in theme switching.""" diff --git a/src/edgewalker/tui/modals/dialogs.py b/src/edgewalker/tui/modals/dialogs.py index 0b04a3d..3fd7a84 100644 --- a/src/edgewalker/tui/modals/dialogs.py +++ b/src/edgewalker/tui/modals/dialogs.py @@ -2,6 +2,9 @@ from __future__ import annotations +# Standard Library +import sys + # Third Party from textual.app import ComposeResult from textual.containers import Container, Horizontal @@ -178,29 +181,45 @@ def on_button_pressed(self, event: Button.Pressed) -> None: self.dismiss(False) -class PermissionModal(ModalScreen[bool]): - """Modal dialog for fixing nmap permissions.""" +class PermissionModal(ModalScreen[str]): + """Modal dialog for fixing nmap permissions or switching to unprivileged mode.""" def compose(self) -> ComposeResult: """Compose the modal layout.""" + is_linux = sys.platform.startswith("linux") + with Container(id="permission-dialog", classes="modal-container"): yield Static("NMAP PERMISSIONS REQUIRED", id="permission-title", classes="modal-title") - yield Static( + + msg = ( "Nmap requires elevated privileges for raw socket access " "(SYN scans and OS detection).\n\n" - "Would you like to apply a one-time permission fix? " - "This will allow EdgeWalker to run scans without sudo.\n\n" - "[bold]Requires sudo password.[/]", - id="permission-text", - classes="modal-body", ) + if is_linux: + msg += ( + "Would you like to apply a one-time permission fix? " + "This will allow EdgeWalker to run scans without sudo.\n\n" + "[bold]Requires sudo password.[/]" + ) + else: + msg += ( + "On macOS, you must run with 'sudo edgewalker' for full scans, " + "or use Unprivileged Mode (TCP Connect scans)." + ) + + yield Static(msg, id="permission-text", classes="modal-body") + with Horizontal(id="permission-buttons", classes="modal-buttons"): yield Button("Cancel", variant="default", id="perm-no") - yield Button("Apply Fix", variant="success", id="perm-yes") + if is_linux: + yield Button("Apply Fix", variant="success", id="perm-fix") + yield Button("Unprivileged Mode", variant="primary", id="perm-unprivileged") def on_button_pressed(self, event: Button.Pressed) -> None: """Handle button presses.""" - if event.button.id == "perm-yes": - self.dismiss(True) + if event.button.id == "perm-fix": + self.dismiss("fix") + elif event.button.id == "perm-unprivileged": + self.dismiss("unprivileged") else: - self.dismiss(False) + self.dismiss("cancel") diff --git a/src/edgewalker/tui/screens/config.py b/src/edgewalker/tui/screens/config.py index cfa513c..d12eac4 100644 --- a/src/edgewalker/tui/screens/config.py +++ b/src/edgewalker/tui/screens/config.py @@ -115,6 +115,17 @@ def compose(self) -> ComposeResult: classes="config-help", ) + with Horizontal(classes="config-row"): + yield Label( + f"Unprivileged Mode{self._get_override_label('unprivileged')}", + classes="config-label", + ) + yield Switch(value=settings.unprivileged, id="unprivileged") + yield Label( + "Run without sudo using TCP connect scans (macOS/no-root).", + classes="config-help", + ) + with Horizontal(classes="config-row"): yield Label("Device ID", classes="config-label") yield Label(settings.device_id, id="device_id_label") @@ -479,6 +490,7 @@ def action_save_and_exit(self) -> None: # Collect and update all settings dynamically based on widget IDs simple_fields = [ "telemetry_enabled", + "unprivileged", "nvd_api_key", "mac_api_key", "nvd_api_url", diff --git a/src/edgewalker/tui/screens/dashboard.py b/src/edgewalker/tui/screens/dashboard.py index 00bc81b..7e44790 100644 --- a/src/edgewalker/tui/screens/dashboard.py +++ b/src/edgewalker/tui/screens/dashboard.py @@ -5,7 +5,6 @@ # Standard Library import io import json -import sys from typing import Callable # Third Party @@ -20,7 +19,7 @@ # First Party from edgewalker import theme -from edgewalker.core.config import get_active_overrides, settings +from edgewalker.core.config import get_active_overrides, settings, update_setting from edgewalker.display import ( build_credential_display, build_port_scan_display, @@ -314,7 +313,7 @@ def action_quick_scan(self) -> None: """Start a guided quick scan.""" if self.app.is_scanning: return - if not getattr(self.app, "has_nmap_permissions", True): + if not getattr(self.app, "has_nmap_permissions", True) and not settings.unprivileged: self.notify("Port scanning requires elevated privileges.", severity="error") return @@ -339,7 +338,7 @@ def action_full_scan(self) -> None: """Start a guided full scan.""" if self.app.is_scanning: return - if not getattr(self.app, "has_nmap_permissions", True): + if not getattr(self.app, "has_nmap_permissions", True) and not settings.unprivileged: self.notify("Port scanning requires elevated privileges.", severity="error") return @@ -382,7 +381,9 @@ async def _run_guided_port_scan(self) -> None: scan_label = "full" if self._full_scan else "quick IoT" self._show_loading(f"Running {scan_label} scan on {target}...") try: - results = await self.app.scanner.perform_port_scan(target=target, full=self._full_scan) + results = await self.app.scanner.perform_port_scan( + target=target, full=self._full_scan, unprivileged=settings.unprivileged + ) self._on_guided_port_done(results) except PermissionError as e: self._handle_permission_error(str(e)) @@ -390,17 +391,12 @@ async def _run_guided_port_scan(self) -> None: self._on_scan_error(f"Port scan failed: {str(e)}") def _handle_permission_error(self, error: str) -> None: - """Handle permission errors by offering to fix them.""" + """Handle permission errors by offering to fix them or switch to unprivileged mode.""" self.app.is_scanning = False self._auto_run = False - # Only offer the fix on Linux - if not sys.platform.startswith("linux"): - self._on_scan_error(error) - return - - def on_fix_confirmed(confirmed: bool) -> None: - if confirmed: + def on_permission_choice(choice: str) -> None: + if choice == "fix": # Suspend textual to allow sudo prompt in terminal try: with self.app.suspend(): @@ -415,10 +411,14 @@ def on_fix_confirmed(confirmed: bool) -> None: else: self.notify("Failed to fix permissions.", severity="error") self._on_scan_error(error) + elif choice == "unprivileged": + update_setting("unprivileged", True) + self.notify("Switched to Unprivileged Mode. Retrying scan...") + self._run_guided_port_scan() else: self._on_scan_error(error) - self.app.push_screen(PermissionModal(), on_fix_confirmed) + self.app.push_screen(PermissionModal(), on_permission_choice) def _on_guided_port_done(self, results: object) -> None: """Handle completion of the guided port scan.""" diff --git a/src/edgewalker/tui/screens/home.py b/src/edgewalker/tui/screens/home.py index f090f70..9bcb05e 100644 --- a/src/edgewalker/tui/screens/home.py +++ b/src/edgewalker/tui/screens/home.py @@ -70,9 +70,12 @@ def on_mount(self) -> None: def _update_permissions(self) -> None: """Update UI based on nmap permissions.""" + # First Party + from edgewalker.core.config import settings # noqa: PLC0415 + has_perms = self.app.has_nmap_permissions btn_scan = self.query_one("#btn-scan", Button) - btn_scan.disabled = not has_perms + btn_scan.disabled = not (has_perms or settings.unprivileged) def watch_app_has_nmap_permissions(self, has_perms: bool) -> None: """React to permission changes.""" @@ -81,9 +84,10 @@ def watch_app_has_nmap_permissions(self, has_perms: bool) -> None: def action_start_guided(self) -> None: """Start the guided scan workflow.""" # First Party + from edgewalker.core.config import settings # noqa: PLC0415 from edgewalker.tui.screens.guided import GuidedAssessmentScreen # noqa: PLC0415 - if self.app.has_nmap_permissions: + if self.app.has_nmap_permissions or settings.unprivileged: self.app.push_screen(GuidedAssessmentScreen()) else: self.notify("Port scanning requires elevated privileges.", severity="error") @@ -116,4 +120,4 @@ def action_select_report(self) -> None: def on_screen_resume(self) -> None: """Handle screen resume.""" - pass + self._update_permissions() diff --git a/tests/test_cli.py b/tests/test_cli.py index 6581301..7545530 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -68,7 +68,9 @@ async def test_run_port_scan(mock_submit, mock_save, mock_input, mock_quick, moc mock_quick.return_value = mock_results res = await cli.ScanController().run_port_scan(full=False) assert res.model_dump(mode="json") == mock_results.model_dump(mode="json") - mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False + ) @pytest.mark.asyncio @@ -155,7 +157,7 @@ def test_typer_scan(mock_guided_cls, mock_logo): result = runner.invoke(app, ["scan", "--target", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=False + full_scan=False, target="1.1.1.1", full_creds=False, unprivileged=False, verbose=False ) @@ -166,7 +168,7 @@ def test_typer_scan_full(mock_guided_cls): result = runner.invoke(app, ["scan", "--full", "-t", "1.1.1.1", "--full-creds"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=True, target="1.1.1.1", full_creds=True + full_scan=True, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -177,7 +179,7 @@ def test_typer_scan_full_creds(mock_guided_cls): result = runner.invoke(app, ["scan", "--full-creds", "-t", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=True + full_scan=False, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -476,6 +478,28 @@ def test_prompt_next_scan_suggest_cve(mock_run, mock_input, mock_status): assert mock_run.called +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_verbose_flag(mock_telemetry, mock_auto): + """--verbose flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--verbose", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("verbose") is True + + +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_unprivileged_flag(mock_telemetry, mock_auto): + """--unprivileged flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--unprivileged", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("unprivileged") is True + + @patch("edgewalker.utils.has_any_results", return_value=True) @patch("edgewalker.cli.results.settings") @patch("edgewalker.utils.get_input", side_effect=["1", "", "0"]) diff --git a/tests/test_port_scan.py b/tests/test_port_scan.py index d3725aa..b1dbd56 100644 --- a/tests/test_port_scan.py +++ b/tests/test_port_scan.py @@ -309,3 +309,81 @@ async def test_probe_services(mock_batch): res_xml, res_found = await scanner._probe_services({"1.1.1.1": [80, 443]}) assert len(res_xml) == 1 assert "1.1.1.1" in res_found + + +# --- Unprivileged mode tests --- + + +def test_check_privileges_unprivileged_mode(): + """check_privileges returns None immediately when unprivileged=True.""" + # Even without root, unprivileged mode skips the check + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.check_privileges(unprivileged=True) is None + + +def test_get_nmap_command_unprivileged(): + """get_nmap_command returns ['nmap'] when unprivileged=True, even without root.""" + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.get_nmap_command(unprivileged=True) == ["nmap"] + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +@patch("tempfile.NamedTemporaryFile") +@patch("builtins.open", new_callable=mock_open, read_data="") +@patch("os.unlink") +async def test_scan_batch_adds_unprivileged_flag(mock_unlink, mock_file, mock_temp, mock_exec): + """_scan_batch prepends --unprivileged to the nmap command when unprivileged=True.""" + mock_temp.return_value.name = "test.xml" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b""] + mock_proc.wait.return_value = 0 + mock_proc.returncode = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + await scanner._scan_batch(["1.1.1.1"], "80", [], 10, unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +async def test_ping_sweep_unprivileged(mock_exec): + """ping_sweep passes --unprivileged flag to nmap when unprivileged=True.""" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b"Nmap scan report for 1.1.1.1", b""] + mock_proc.wait.return_value = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + res = await scanner.ping_sweep("1.1.1.0/24", unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + assert "1.1.1.1" in res + + +def test_port_scanner_stores_unprivileged(): + """PortScanner stores the unprivileged flag.""" + ps = scanner.PortScanner(unprivileged=True) + assert ps.unprivileged is True + + ps_default = scanner.PortScanner() + assert ps_default.unprivileged is False + + +@pytest.mark.asyncio +@patch("edgewalker.modules.port_scan.scanner.ping_sweep", new_callable=AsyncMock, return_value=[]) +@patch("edgewalker.modules.port_scan.scanner._parallel_scan", new_callable=AsyncMock) +async def test_quick_scan_unprivileged_skips_permission_check(mock_parallel, mock_ping): + """quick_scan with unprivileged=True does not raise PermissionError.""" + mock_parallel.return_value = ([], set()) + # On macOS without root, normal mode would raise PermissionError + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + ps = scanner.PortScanner(target="1.1.1.0/24", unprivileged=True) + result = await ps.quick_scan() + assert result.success is True diff --git a/tests/test_scanner_service.py b/tests/test_scanner_service.py index 874358f..56cc40a 100644 --- a/tests/test_scanner_service.py +++ b/tests/test_scanner_service.py @@ -242,3 +242,49 @@ def test_global_submit_scan_data_async(): mock_loop.return_value = mock_loop_instance submit_scan_data("test", {}) mock_loop_instance.create_task.assert_called_once() + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_verbose(scanner_service): + """perform_port_scan passes verbose flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, verbose=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", + verbose=True, + progress_callback=scanner_service.progress_callback, + unprivileged=False, + ) + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_unprivileged(scanner_service): + """perform_port_scan passes unprivileged flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, unprivileged=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", + verbose=False, + progress_callback=scanner_service.progress_callback, + unprivileged=True, + ) + + with patch( + "edgewalker.modules.port_scan.full_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_full: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=True, unprivileged=True) + mock_full.assert_called_once_with( + target="1.1.1.1", + verbose=False, + progress_callback=scanner_service.progress_callback, + unprivileged=True, + )