diff --git a/src/edgewalker/modules/cve_scan/scanner.py b/src/edgewalker/modules/cve_scan/scanner.py index 8cc85ff..a5e1e56 100644 --- a/src/edgewalker/modules/cve_scan/scanner.py +++ b/src/edgewalker/modules/cve_scan/scanner.py @@ -53,7 +53,7 @@ async def search_cves_async( ) logger.debug(f"NVD Response: {response.status_code}") - if response.status_code == 403: + if response.status_code == 429: # Rate limit hit, wait and retry once logger.warning( f"NVD Rate limit hit (403). Waiting {settings.nvd_rate_limit_delay * 2}s..." diff --git a/src/edgewalker/modules/password_scan/scanner.py b/src/edgewalker/modules/password_scan/scanner.py index ae2cb6e..735e1d5 100644 --- a/src/edgewalker/modules/password_scan/scanner.py +++ b/src/edgewalker/modules/password_scan/scanner.py @@ -162,6 +162,7 @@ async def scan(self) -> PasswordScanResultModel: found_cred = None login_status = StatusEnum.failed + i = -1 for i, (user, pw) in enumerate(creds): logger.debug( @@ -704,7 +705,7 @@ async def scan_host( ) -> dict: """Backward compatible scan_host function asynchronously.""" scanner = PasswordScanner(target, top_n, verbose, progress_callback) - results = await scanner.scan_host(host, ports) + results = await scanner.scan_host(host, "", ports) out = {"host": host, "services": {}} for r in results: status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure" diff --git a/tests/test_cve_scan.py b/tests/test_cve_scan.py index d8602cc..52b6efa 100644 --- a/tests/test_cve_scan.py +++ b/tests/test_cve_scan.py @@ -97,15 +97,15 @@ async def test_search_cves_async_real_calls(): @pytest.mark.asyncio async def test_search_cves_async_rate_limit(): - mock_response_403 = MagicMock() - mock_response_403.status_code = 403 + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 mock_response_200 = MagicMock() mock_response_200.status_code = 200 mock_response_200.json.return_value = {"vulnerabilities": []} client = AsyncMock() - client.get.side_effect = [mock_response_403, mock_response_200] + client.get.side_effect = [mock_response_429, mock_response_200] with patch("asyncio.sleep", new_callable=AsyncMock): res = await scanner.search_cves_async(client, "product", "1.0") @@ -217,3 +217,38 @@ async def test_cve_scanner_scan_interface(): await s.scan(hosts=None) mock_scan.assert_called_with([]) + + +@pytest.mark.asyncio +async def test_search_cves_async_rate_limit_uses_429(): + """Bug fix: NVD rate limiting uses HTTP 429, not 403.""" + mock_response_429 = MagicMock() + mock_response_429.status_code = 429 + + mock_response_200 = MagicMock() + mock_response_200.status_code = 200 + mock_response_200.json.return_value = {"vulnerabilities": []} + + client = AsyncMock() + client.get.side_effect = [mock_response_429, mock_response_200] + + with patch("asyncio.sleep", new_callable=AsyncMock): + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 2 + + +@pytest.mark.asyncio +async def test_search_cves_async_403_is_not_retried(): + """Bug fix: 403 Forbidden should not be retried (it's not a rate limit).""" + mock_response_403 = MagicMock() + mock_response_403.status_code = 403 + + client = AsyncMock() + client.get.return_value = mock_response_403 + + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + res = await scanner.search_cves_async(client, "product", "1.0") + assert res == [] + assert client.get.call_count == 1 # no retry + mock_sleep.assert_not_called() diff --git a/tests/test_password_scan.py b/tests/test_password_scan.py index b87d897..693ca69 100644 --- a/tests/test_password_scan.py +++ b/tests/test_password_scan.py @@ -929,3 +929,40 @@ async def test_password_scanner_scan_host_by_name_smb(): ) as mock_scan: await s.scan_host("1.1.1.1", "00:11:22:33:44:55", {"smb": 4455}) mock_scan.assert_called_once() + + +@pytest.mark.asyncio +async def test_async_service_scanner_scan_empty_credentials(): + """Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty.""" + + class TestScanner(scanner.AsyncServiceScanner): + def service_name(self): + return "ssh" + + def service_enum(self): + return scanner.ServiceEnum.ssh + + async def attempt_login(self, u, p): + return False, False + + s = TestScanner("1.1.1.1", 22) + with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True): + with patch("edgewalker.modules.password_scan.scanner.load_credentials", return_value=[]): + res = await s.scan() + # Should complete without UnboundLocalError + assert res.login_attempt == scanner.StatusEnum.failed + + +@pytest.mark.asyncio +async def test_scan_host_backward_compat_passes_mac(): + """Bug fix: backward compat scan_host must pass mac as second positional arg.""" + with patch( + "edgewalker.modules.password_scan.scanner.PasswordScanner.scan_host", new_callable=AsyncMock + ) as mock_scan: + mock_scan.return_value = [] + await scanner.scan_host("1.1.1.1", {"ssh": 22}) + args, kwargs = mock_scan.call_args + # args[0]=host, args[1]=mac, args[2]=ports + assert args[0] == "1.1.1.1" + assert args[1] == "" # mac must be passed explicitly + assert args[2] == {"ssh": 22}