From 80f135532d659fdfe7bdba80660b626f0d1762ff Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:35:16 +0000 Subject: [PATCH 1/2] feat: add --unprivileged and --verbose flags to scan command Threads unprivileged=True from CLI through to nmap, bypassing the privilege check and passing --unprivileged to all nmap invocations (ping sweep, port scan, service probe) so users on macOS can run without sudo using TCP connect scans. Also threads verbose=True from CLI through to the scanner so --verbose prints phase-by-phase nmap progress, discovered hosts, and open ports as they are found. --- src/edgewalker/cli/cli.py | 8 ++- src/edgewalker/cli/controller.py | 4 +- src/edgewalker/cli/guided.py | 4 +- src/edgewalker/core/scanner_service.py | 6 +- src/edgewalker/modules/port_scan/scanner.py | 56 +++++++++++---- tests/test_cli.py | 30 ++++++-- tests/test_port_scan.py | 78 +++++++++++++++++++++ tests/test_scanner_service.py | 37 ++++++++++ 8 files changed, 199 insertions(+), 24 deletions(-) diff --git a/src/edgewalker/cli/cli.py b/src/edgewalker/cli/cli.py index b3c9280..3d55586 100644 --- a/src/edgewalker/cli/cli.py +++ b/src/edgewalker/cli/cli.py @@ -168,6 +168,12 @@ def run_guided_scan( "-ao", help="Allow scan to proceed with active configuration overrides.", ), + unprivileged: bool = typer.Option( + False, "--unprivileged", help="Run without sudo using TCP connect scans (macOS/no-root)." + ), + verbose: bool = typer.Option( + False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports." + ), ) -> None: """Run a guided security scan. @@ -217,7 +223,7 @@ def run_guided_scan( ensure_telemetry_choice() controller = ScanController() guided = GuidedScanner(controller) - asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds)) + asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds, unprivileged=unprivileged, verbose=verbose)) @app.command() diff --git a/src/edgewalker/cli/controller.py b/src/edgewalker/cli/controller.py index 6e4b649..4c6a8f7 100644 --- a/src/edgewalker/cli/controller.py +++ b/src/edgewalker/cli/controller.py @@ -35,7 +35,7 @@ def __init__(self, scanner_service: Optional[ScannerService] = None) -> None: self.scanner = scanner_service or ScannerService() async def run_port_scan( - self, full: bool = False, target: str = None + self, full: bool = False, target: str = None, unprivileged: bool = False, verbose: bool = False ) -> Optional[PortScanModel]: """Run port scan and display results asynchronously.""" scan_type = "FULL" if full else "QUICK" @@ -50,7 +50,7 @@ async def run_port_scan( try: with utils.console.status(f"[bold green]Scanning {target}..."): - results = await self.scanner.perform_port_scan(target=target, full=full) + results = await self.scanner.perform_port_scan(target=target, full=full, unprivileged=unprivileged, verbose=verbose) except Exception as e: logger.error(f"Scan failed: {str(e)}") logger.error("Make sure nmap is installed and you have sudo privileges.") diff --git a/src/edgewalker/cli/guided.py b/src/edgewalker/cli/guided.py index cbac680..ba2c0bc 100644 --- a/src/edgewalker/cli/guided.py +++ b/src/edgewalker/cli/guided.py @@ -30,6 +30,8 @@ async def automatic_mode( full_scan: Optional[bool] = None, target: Optional[str] = None, full_creds: bool = False, + unprivileged: bool = False, + verbose: bool = False, ) -> None: """Run the automatic guided security assessment asynchronously.""" # Step 1: Choose scan type @@ -49,7 +51,7 @@ async def automatic_mode( # Step 3: Run port scan utils.console.print() logger.info(f"Starting {scan_type.lower()} port scan on {target}...") - port_results = await self.controller.run_port_scan(full=full_scan, target=target) + port_results = await self.controller.run_port_scan(full=full_scan, target=target, unprivileged=unprivileged, verbose=verbose) if not port_results: logger.error("Port scan failed. Returning to mode selection.") diff --git a/src/edgewalker/core/scanner_service.py b/src/edgewalker/core/scanner_service.py index 350337f..b05c849 100644 --- a/src/edgewalker/core/scanner_service.py +++ b/src/edgewalker/core/scanner_service.py @@ -81,7 +81,7 @@ def submit_scan_data(self, module: str, data: dict) -> None: # No loop running, use sync version self.telemetry.submit_scan_data_sync(module, data) - async def perform_port_scan(self, target: str, full: bool = False) -> PortScanModel: + async def perform_port_scan(self, target: str, full: bool = False, unprivileged: bool = False, verbose: bool = False) -> PortScanModel: """Perform a port scan and return results as a model asynchronously.""" if self.demo_mode and self.demo_service: return await self.demo_service.perform_port_scan(target, full) @@ -94,11 +94,11 @@ async def perform_port_scan(self, target: str, full: bool = False) -> PortScanMo if full: results = await port_scan.full_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) else: results = await port_scan.quick_scan( - target=target, verbose=False, progress_callback=self.progress_callback + target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged ) if isinstance(results, dict): diff --git a/src/edgewalker/modules/port_scan/scanner.py b/src/edgewalker/modules/port_scan/scanner.py index e7890fe..d911318 100644 --- a/src/edgewalker/modules/port_scan/scanner.py +++ b/src/edgewalker/modules/port_scan/scanner.py @@ -126,8 +126,11 @@ def fix_nmap_permissions() -> bool: return False -def check_privileges() -> str | None: +def check_privileges(unprivileged: bool = False) -> str | None: """Check for root/sudo privileges or nmap capabilities.""" + if unprivileged: + return None + if check_nmap_permissions(): return None @@ -140,8 +143,11 @@ def check_privileges() -> str | None: return "Port scanning requires root privileges on this OS. Please run with sudo." -def get_nmap_command() -> list[str]: +def get_nmap_command(unprivileged: bool = False) -> list[str]: """Return the base nmap command with sudo if necessary.""" + if unprivileged: + return ["nmap"] + if check_nmap_permissions(): return ["nmap"] @@ -245,6 +251,7 @@ async def _scan_batch( batch_label: str = "", progress_callback: Callable[[str, str], None] | None = None, rich_progress: Optional[tuple[utils.Progress, utils.TaskID]] = None, + unprivileged: bool = False, ) -> tuple[str, set[str]]: """Run one nmap subprocess on a batch of hosts asynchronously.""" if not hosts: @@ -254,7 +261,8 @@ async def _scan_batch( xml_path = xml_fd.name xml_fd.close() - cmd = get_nmap_command() + extra_flags + flags = (["--unprivileged"] + extra_flags) if unprivileged else extra_flags + cmd = get_nmap_command(unprivileged=unprivileged) + flags if ports: cmd += ["-p", ports] cmd += ["-oX", xml_path, "-v", "--stats-every", "10s", "--open"] @@ -349,6 +357,7 @@ async def _parallel_scan( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run parallel scans across hosts asynchronously.""" if not live_hosts: @@ -368,6 +377,7 @@ async def _parallel_scan( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -396,6 +406,7 @@ async def _parallel_scan( label, progress_callback, rich_progress, + unprivileged, ) ) @@ -422,6 +433,7 @@ async def _probe_services( verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, progress: Optional[utils.Progress] = None, + unprivileged: bool = False, ) -> tuple[list[str], set[str]]: """Run service/OS probes per host asynchronously.""" if not host_ports: @@ -446,6 +458,7 @@ async def _probe_services( verbose, progress_callback=progress_callback, rich_progress=rich_progress, + unprivileged=unprivileged, ) return ([xml_data] if xml_data else [], found) @@ -473,6 +486,7 @@ async def _probe_services( ip, progress_callback, rich_progress, + unprivileged, ) ) @@ -507,6 +521,7 @@ def __init__( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> None: """Initialize the PortScanner. @@ -514,10 +529,12 @@ def __init__( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. """ self.target = target or get_default_target() self.verbose = verbose self.progress_callback = progress_callback + self.unprivileged = unprivileged async def scan(self, **kwargs: object) -> PortScanModel: """Execute the scan asynchronously (ScanModule interface).""" @@ -531,7 +548,7 @@ async def quick_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -541,7 +558,7 @@ async def quick_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -575,6 +592,7 @@ async def quick_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: all_xml, all_hosts_found = await _parallel_scan( @@ -584,6 +602,7 @@ async def quick_scan(self) -> PortScanModel: settings.nmap_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except FileNotFoundError: return PortScanModel( @@ -628,7 +647,7 @@ async def full_scan(self) -> PortScanModel: err = validate_target(self.target) if err: raise ValueError(err) - err = check_privileges() + err = check_privileges(unprivileged=self.unprivileged) if err: raise PermissionError(err) @@ -637,7 +656,7 @@ async def full_scan(self) -> PortScanModel: print(f"{Colors.CYAN}-{Colors.RESET}" * 50) sys.stdout.flush() - live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback) + live_hosts = await ping_sweep(self.target, self.verbose, self.progress_callback, unprivileged=self.unprivileged) if not live_hosts: return PortScanModel( id=str(uuid.uuid4()), @@ -671,6 +690,7 @@ async def full_scan(self) -> PortScanModel: self.verbose, self.progress_callback, progress, + unprivileged=self.unprivileged, ) else: disc_xml, _ = await _parallel_scan( @@ -680,6 +700,7 @@ async def full_scan(self) -> PortScanModel: settings.nmap_full_timeout, self.verbose, self.progress_callback, + unprivileged=self.unprivileged, ) except Exception: disc_xml = [] @@ -721,10 +742,10 @@ async def full_scan(self) -> PortScanModel: if self.verbose: with utils.get_progress() as progress: probe_xml, _ = await _probe_services( - host_ports, self.verbose, self.progress_callback, progress + host_ports, self.verbose, self.progress_callback, progress, unprivileged=self.unprivileged ) else: - probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback) + probe_xml, _ = await _probe_services(host_ports, self.verbose, self.progress_callback, unprivileged=self.unprivileged) hosts_by_ip = {} for xml_data in probe_xml: for host in parse_nmap_xml(xml_data): @@ -753,6 +774,7 @@ async def scan( full: bool = False, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a port scan asynchronously. @@ -761,11 +783,12 @@ async def scan( full: Whether to perform a full scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - scanner = PortScanner(target, verbose, progress_callback) + scanner = PortScanner(target, verbose, progress_callback, unprivileged=unprivileged) return await scanner.full_scan() if full else await scanner.quick_scan() @@ -773,6 +796,7 @@ async def quick_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a quick scan of common IoT ports asynchronously. @@ -780,17 +804,19 @@ async def quick_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).quick_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).quick_scan() async def full_scan( target: str | None = None, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> PortScanModel: """Perform a full scan of all ports with OS detection asynchronously. @@ -798,17 +824,19 @@ async def full_scan( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: PortScanModel with scan results. """ - return await PortScanner(target, verbose, progress_callback).full_scan() + return await PortScanner(target, verbose, progress_callback, unprivileged=unprivileged).full_scan() async def ping_sweep( target: str, verbose: bool = False, progress_callback: Callable[[str, str], None] | None = None, + unprivileged: bool = False, ) -> list[str]: """Quick ping sweep to find live hosts asynchronously. @@ -816,6 +844,7 @@ async def ping_sweep( target: IP, CIDR range, or hostname to scan. verbose: Whether to print verbose output. progress_callback: Optional callback for progress updates. + unprivileged: Run without sudo using TCP connect scans. Returns: List of live host IP addresses. @@ -830,7 +859,8 @@ async def ping_sweep( if err: raise ValueError(err) - cmd = get_nmap_command() + ["-sn", "-T4", target] + flags = ["--unprivileged"] if unprivileged else [] + cmd = get_nmap_command(unprivileged=unprivileged) + flags + ["-sn", "-T4", target] live_hosts = [] try: process = await asyncio.create_subprocess_exec( diff --git a/tests/test_cli.py b/tests/test_cli.py index 6581301..0cb2368 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -68,7 +68,7 @@ async def test_run_port_scan(mock_submit, mock_save, mock_input, mock_quick, moc mock_quick.return_value = mock_results res = await cli.ScanController().run_port_scan(full=False) assert res.model_dump(mode="json") == mock_results.model_dump(mode="json") - mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None) + mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False) @pytest.mark.asyncio @@ -155,7 +155,7 @@ def test_typer_scan(mock_guided_cls, mock_logo): result = runner.invoke(app, ["scan", "--target", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=False + full_scan=False, target="1.1.1.1", full_creds=False, unprivileged=False, verbose=False ) @@ -166,7 +166,7 @@ def test_typer_scan_full(mock_guided_cls): result = runner.invoke(app, ["scan", "--full", "-t", "1.1.1.1", "--full-creds"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=True, target="1.1.1.1", full_creds=True + full_scan=True, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -177,7 +177,7 @@ def test_typer_scan_full_creds(mock_guided_cls): result = runner.invoke(app, ["scan", "--full-creds", "-t", "1.1.1.1"]) assert result.exit_code == 0 mock_guided.automatic_mode.assert_called_once_with( - full_scan=False, target="1.1.1.1", full_creds=True + full_scan=False, target="1.1.1.1", full_creds=True, unprivileged=False, verbose=False ) @@ -476,6 +476,28 @@ def test_prompt_next_scan_suggest_cve(mock_run, mock_input, mock_status): assert mock_run.called +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_verbose_flag(mock_telemetry, mock_auto): + """--verbose flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--verbose", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("verbose") is True + + +@patch("edgewalker.cli.guided.GuidedScanner.automatic_mode", new_callable=AsyncMock) +@patch("edgewalker.utils.ensure_telemetry_choice") +def test_run_guided_scan_unprivileged_flag(mock_telemetry, mock_auto): + """--unprivileged flag is accepted and passed through to automatic_mode.""" + result = runner.invoke(app, ["scan", "--unprivileged", "--target", "1.1.1.1"]) + assert result.exit_code == 0 + mock_auto.assert_called_once() + _, kwargs = mock_auto.call_args + assert kwargs.get("unprivileged") is True + + @patch("edgewalker.utils.has_any_results", return_value=True) @patch("edgewalker.cli.results.settings") @patch("edgewalker.utils.get_input", side_effect=["1", "", "0"]) diff --git a/tests/test_port_scan.py b/tests/test_port_scan.py index d3725aa..b1dbd56 100644 --- a/tests/test_port_scan.py +++ b/tests/test_port_scan.py @@ -309,3 +309,81 @@ async def test_probe_services(mock_batch): res_xml, res_found = await scanner._probe_services({"1.1.1.1": [80, 443]}) assert len(res_xml) == 1 assert "1.1.1.1" in res_found + + +# --- Unprivileged mode tests --- + + +def test_check_privileges_unprivileged_mode(): + """check_privileges returns None immediately when unprivileged=True.""" + # Even without root, unprivileged mode skips the check + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.check_privileges(unprivileged=True) is None + + +def test_get_nmap_command_unprivileged(): + """get_nmap_command returns ['nmap'] when unprivileged=True, even without root.""" + with patch("os.geteuid", return_value=1000): + with patch("sys.platform", "darwin"): + assert scanner.get_nmap_command(unprivileged=True) == ["nmap"] + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +@patch("tempfile.NamedTemporaryFile") +@patch("builtins.open", new_callable=mock_open, read_data="") +@patch("os.unlink") +async def test_scan_batch_adds_unprivileged_flag(mock_unlink, mock_file, mock_temp, mock_exec): + """_scan_batch prepends --unprivileged to the nmap command when unprivileged=True.""" + mock_temp.return_value.name = "test.xml" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b""] + mock_proc.wait.return_value = 0 + mock_proc.returncode = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + await scanner._scan_batch(["1.1.1.1"], "80", [], 10, unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + + +@pytest.mark.asyncio +@patch("asyncio.create_subprocess_exec") +async def test_ping_sweep_unprivileged(mock_exec): + """ping_sweep passes --unprivileged flag to nmap when unprivileged=True.""" + mock_proc = AsyncMock() + mock_proc.stdout.readline.side_effect = [b"Nmap scan report for 1.1.1.1", b""] + mock_proc.wait.return_value = 0 + mock_exec.return_value = mock_proc + + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + res = await scanner.ping_sweep("1.1.1.0/24", unprivileged=True) + + call_args = mock_exec.call_args[0] + assert "--unprivileged" in call_args + assert "1.1.1.1" in res + + +def test_port_scanner_stores_unprivileged(): + """PortScanner stores the unprivileged flag.""" + ps = scanner.PortScanner(unprivileged=True) + assert ps.unprivileged is True + + ps_default = scanner.PortScanner() + assert ps_default.unprivileged is False + + +@pytest.mark.asyncio +@patch("edgewalker.modules.port_scan.scanner.ping_sweep", new_callable=AsyncMock, return_value=[]) +@patch("edgewalker.modules.port_scan.scanner._parallel_scan", new_callable=AsyncMock) +async def test_quick_scan_unprivileged_skips_permission_check(mock_parallel, mock_ping): + """quick_scan with unprivileged=True does not raise PermissionError.""" + mock_parallel.return_value = ([], set()) + # On macOS without root, normal mode would raise PermissionError + with patch("os.geteuid", return_value=1000), patch("sys.platform", "darwin"): + ps = scanner.PortScanner(target="1.1.1.0/24", unprivileged=True) + result = await ps.quick_scan() + assert result.success is True diff --git a/tests/test_scanner_service.py b/tests/test_scanner_service.py index 874358f..c63a356 100644 --- a/tests/test_scanner_service.py +++ b/tests/test_scanner_service.py @@ -242,3 +242,40 @@ def test_global_submit_scan_data_async(): mock_loop.return_value = mock_loop_instance submit_scan_data("test", {}) mock_loop_instance.create_task.assert_called_once() + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_verbose(scanner_service): + """perform_port_scan passes verbose flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, verbose=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=True, progress_callback=scanner_service.progress_callback, unprivileged=False + ) + + +@pytest.mark.asyncio +async def test_perform_port_scan_passes_unprivileged(scanner_service): + """perform_port_scan passes unprivileged flag to port_scan functions.""" + mock_results = PortScanModel(success=True, target="1.1.1.1") + with patch( + "edgewalker.modules.port_scan.quick_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_quick: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=False, unprivileged=True) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) + + with patch( + "edgewalker.modules.port_scan.full_scan", new_callable=AsyncMock, return_value=mock_results + ) as mock_full: + with patch("edgewalker.core.scanner_service.save_results"): + await scanner_service.perform_port_scan("1.1.1.1", full=True, unprivileged=True) + mock_full.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + ) From 1957faee118d5f70060e1364c4b2bef7b2b7138a Mon Sep 17 00:00:00 2001 From: Steven Marks Date: Mon, 9 Mar 2026 15:36:20 +0000 Subject: [PATCH 2/2] test: formatting issue in test file --- tests/test_cli.py | 4 +++- tests/test_scanner_service.py | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 0cb2368..7545530 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -68,7 +68,9 @@ async def test_run_port_scan(mock_submit, mock_save, mock_input, mock_quick, moc mock_quick.return_value = mock_results res = await cli.ScanController().run_port_scan(full=False) assert res.model_dump(mode="json") == mock_results.model_dump(mode="json") - mock_quick.assert_called_once_with(target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False) + mock_quick.assert_called_once_with( + target="1.1.1.1", verbose=False, progress_callback=None, unprivileged=False + ) @pytest.mark.asyncio diff --git a/tests/test_scanner_service.py b/tests/test_scanner_service.py index c63a356..56cc40a 100644 --- a/tests/test_scanner_service.py +++ b/tests/test_scanner_service.py @@ -254,7 +254,10 @@ async def test_perform_port_scan_passes_verbose(scanner_service): with patch("edgewalker.core.scanner_service.save_results"): await scanner_service.perform_port_scan("1.1.1.1", full=False, verbose=True) mock_quick.assert_called_once_with( - target="1.1.1.1", verbose=True, progress_callback=scanner_service.progress_callback, unprivileged=False + target="1.1.1.1", + verbose=True, + progress_callback=scanner_service.progress_callback, + unprivileged=False, ) @@ -268,7 +271,10 @@ async def test_perform_port_scan_passes_unprivileged(scanner_service): with patch("edgewalker.core.scanner_service.save_results"): await scanner_service.perform_port_scan("1.1.1.1", full=False, unprivileged=True) mock_quick.assert_called_once_with( - target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + target="1.1.1.1", + verbose=False, + progress_callback=scanner_service.progress_callback, + unprivileged=True, ) with patch( @@ -277,5 +283,8 @@ async def test_perform_port_scan_passes_unprivileged(scanner_service): with patch("edgewalker.core.scanner_service.save_results"): await scanner_service.perform_port_scan("1.1.1.1", full=True, unprivileged=True) mock_full.assert_called_once_with( - target="1.1.1.1", verbose=False, progress_callback=scanner_service.progress_callback, unprivileged=True + target="1.1.1.1", + verbose=False, + progress_callback=scanner_service.progress_callback, + unprivileged=True, )