From 0949486103b6c421e7e51301c64620bcb962a53b Mon Sep 17 00:00:00 2001 From: Simon Morley Date: Fri, 6 Mar 2026 13:12:22 +0000 Subject: [PATCH 1/2] fix: four bugs in cve_scan and password_scan modules - cve_scan/scanner.py: remove unreachable duplicate code block after return in _scan_service (dead code from a bad copy-paste refactor) - cve_scan/scanner.py: fix NVD rate-limit detection using wrong HTTP status code; NVD returns 429 (Too Many Requests) for rate limits, not 403 (Forbidden). Retrying on 403 was pointless and indicated an auth problem. - password_scan/scanner.py: fix UnboundLocalError when credentials list is empty; variable i was unbound if the for-loop never executed, causing tested_count=i+1 to crash. Initialize i=-1 before the loop. - password_scan/scanner.py: fix backward-compat scan_host() calling scanner.scan_host(host, ports) without the required mac argument, passing ports into the mac parameter and leaving ports missing. --- src/edgewalker/modules/cve_scan/scanner.py | 37 +---------------- .../modules/password_scan/scanner.py | 3 +- tests/test_cve_scan.py | 41 +++++++++++++++++-- tests/test_password_scan.py | 38 +++++++++++++++++ 4 files changed, 79 insertions(+), 40 deletions(-) diff --git a/src/edgewalker/modules/cve_scan/scanner.py b/src/edgewalker/modules/cve_scan/scanner.py index 9370cf2..1082eb3 100644 --- a/src/edgewalker/modules/cve_scan/scanner.py +++ b/src/edgewalker/modules/cve_scan/scanner.py @@ -53,7 +53,7 @@ async def search_cves_async( response = await client.get( settings.nvd_api_url, params=params, headers=headers, timeout=30 ) - if response.status_code == 403: + if response.status_code == 429: # Rate limit hit, wait and retry once await asyncio.sleep(settings.nvd_rate_limit_delay * 2) response = await client.get( @@ -266,41 +266,6 @@ async def _scan_service( version=svc["version"], cves=cves, ) - """Scan a single service for CVEs asynchronously.""" - if self.progress_callback: - self.progress_callback( - "cve_check", - f"Checking {svc['product']} {svc['version']} on {svc['ip']}:{svc['port']}", - ) - - cve_dicts = await search_cves_async( - client, svc["product"], svc["version"], self.verbose, semaphore - ) - - cves = [ - CveModel( - id=c["id"], - description=c.get("description", ""), - severity=c["severity"], - score=c["score"], - ) - for c in cve_dicts - ] - - if cves and self.progress_callback: - self.progress_callback( - "cve_found", - f"{len(cves)} CVE(s) found for {svc['product']} {svc['version']}", - ) - - return CveScanResultModel( - ip=svc["ip"], - port=svc["port"], - service=svc["service"], - product=svc["product"], - version=svc["version"], - cves=cves, - ) def _build_empty_model(self, skipped_no_version: int) -> CveScanModel: return CveScanModel( diff --git a/src/edgewalker/modules/password_scan/scanner.py b/src/edgewalker/modules/password_scan/scanner.py index 28f7ee3..083bcf5 100644 --- a/src/edgewalker/modules/password_scan/scanner.py +++ b/src/edgewalker/modules/password_scan/scanner.py @@ -161,6 +161,7 @@ async def scan(self) -> PasswordScanResultModel: found_cred = None login_status = StatusEnum.failed + i = -1 for i, (user, pw) in enumerate(creds): if self.rich_progress: @@ -661,7 +662,7 @@ async def scan_host( ) -> dict: """Backward compatible scan_host function asynchronously.""" scanner = PasswordScanner(target, top_n, verbose, progress_callback) - results = await scanner.scan_host(host, ports) + results = await scanner.scan_host(host, "", ports) out = {"host": host, "services": {}} for r in results: status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure" diff --git a/tests/test_cve_scan.py b/tests/test_cve_scan.py index d8602cc..52b6efa 100644 --- a/tests/test_cve_scan.py +++ b/tests/test_cve_scan.py @@ -97,15 +97,15 @@ async def test_search_cves_async_real_calls(): @pytest.mark.asyncio async def test_search_cves_async_rate_limit(): - mock_response_403 = MagicMock() - mock_response_403.status_code = 403 + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 mock_response_200 = MagicMock() mock_response_200.status_code = 200 mock_response_200.json.return_value = {"vulnerabilities": []} client = AsyncMock() - client.get.side_effect = [mock_response_403, mock_response_200] + client.get.side_effect = [mock_response_429, mock_response_200] with patch("asyncio.sleep", new_callable=AsyncMock): res = await scanner.search_cves_async(client, "product", "1.0") @@ -217,3 +217,38 @@ async def test_cve_scanner_scan_interface(): await s.scan(hosts=None) mock_scan.assert_called_with([]) + + +@pytest.mark.asyncio +async def test_search_cves_async_rate_limit_uses_429(): + """Bug fix: NVD rate limiting uses HTTP 429, not 403.""" + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 + + mock_response_200 = MagicMock() + mock_response_200.status_code = 200 + mock_response_200.json.return_value = {"vulnerabilities": []} + + client = AsyncMock() + client.get.side_effect = [mock_response_429, mock_response_200] + + with patch("asyncio.sleep", new_callable=AsyncMock): + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 2 + + +@pytest.mark.asyncio +async def test_search_cves_async_403_is_not_retried(): + """Bug fix: 403 Forbidden should not be retried (it's not a rate limit).""" + mock_response_403 = MagicMock() + mock_response_403.status_code = 403 + + client = AsyncMock() + client.get.return_value = mock_response_403 + + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 1 # no retry + mock_sleep.assert_not_called() diff --git a/tests/test_password_scan.py b/tests/test_password_scan.py index b87d897..3d9b397 100644 --- a/tests/test_password_scan.py +++ b/tests/test_password_scan.py @@ -929,3 +929,41 @@ async def test_password_scanner_scan_host_by_name_smb(): ) as mock_scan: await s.scan_host("1.1.1.1", "00:11:22:33:44:55", {"smb": 4455}) mock_scan.assert_called_once() + + +@pytest.mark.asyncio +async def test_async_service_scanner_scan_empty_credentials(): + """Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty.""" + class TestScanner(scanner.AsyncServiceScanner): + def service_name(self): + return "ssh" + + def service_enum(self): + return scanner.ServiceEnum.ssh + + async def attempt_login(self, u, p): + return False, False + + s = TestScanner("1.1.1.1", 22) + with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True): + with patch( + "edgewalker.modules.password_scan.scanner.load_credentials", return_value=[] + ): + res = await s.scan() + # Should complete without UnboundLocalError + assert res.login_attempt == scanner.StatusEnum.failed + + +@pytest.mark.asyncio +async def test_scan_host_backward_compat_passes_mac(): + """Bug fix: backward compat scan_host must pass mac as second positional arg.""" + with patch( + "edgewalker.modules.password_scan.scanner.PasswordScanner.scan_host", new_callable=AsyncMock + ) as mock_scan: + mock_scan.return_value = [] + await scanner.scan_host("1.1.1.1", {"ssh": 22}) + args, kwargs = mock_scan.call_args + # args[0]=host, args[1]=mac, args[2]=ports + assert args[0] == "1.1.1.1" + assert args[1] == "" # mac must be passed explicitly + assert args[2] == {"ssh": 22} From d0c183c6ce081884da856c99cd8a8296dd78aa1b Mon Sep 17 00:00:00 2001 From: Steven Marks Date: Mon, 9 Mar 2026 14:33:03 +0000 Subject: [PATCH 2/2] test: formatting issue in test file --- tests/test_password_scan.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_password_scan.py b/tests/test_password_scan.py index 3d9b397..693ca69 100644 --- a/tests/test_password_scan.py +++ b/tests/test_password_scan.py @@ -934,6 +934,7 @@ async def test_password_scanner_scan_host_by_name_smb(): @pytest.mark.asyncio async def test_async_service_scanner_scan_empty_credentials(): """Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty.""" + class TestScanner(scanner.AsyncServiceScanner): def service_name(self): return "ssh" @@ -946,9 +947,7 @@ async def attempt_login(self, u, p): s = TestScanner("1.1.1.1", 22) with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True): - with patch( - "edgewalker.modules.password_scan.scanner.load_credentials", return_value=[] - ): + with patch("edgewalker.modules.password_scan.scanner.load_credentials", return_value=[]): res = await s.scan() # Should complete without UnboundLocalError assert res.login_attempt == scanner.StatusEnum.failed @@ -965,5 +964,5 @@ async def test_scan_host_backward_compat_passes_mac(): args, kwargs = mock_scan.call_args # args[0]=host, args[1]=mac, args[2]=ports assert args[0] == "1.1.1.1" - assert args[1] == "" # mac must be passed explicitly + assert args[1] == "" # mac must be passed explicitly assert args[2] == {"ssh": 22}