Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/edgewalker/modules/cve_scan/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def search_cves_async(
)
logger.debug(f"NVD Response: {response.status_code}")

if response.status_code == 403:
if response.status_code == 429:
# Rate limit hit, wait and retry once
logger.warning(
f"NVD Rate limit hit (403). Waiting {settings.nvd_rate_limit_delay * 2}s..."
Expand Down
3 changes: 2 additions & 1 deletion src/edgewalker/modules/password_scan/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ async def scan(self) -> PasswordScanResultModel:

found_cred = None
login_status = StatusEnum.failed
i = -1

for i, (user, pw) in enumerate(creds):
logger.debug(
Expand Down Expand Up @@ -704,7 +705,7 @@ async def scan_host(
) -> dict:
"""Backward compatible scan_host function asynchronously."""
scanner = PasswordScanner(target, top_n, verbose, progress_callback)
results = await scanner.scan_host(host, ports)
results = await scanner.scan_host(host, "", ports)
out = {"host": host, "services": {}}
for r in results:
status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure"
Expand Down
41 changes: 38 additions & 3 deletions tests/test_cve_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ async def test_search_cves_async_real_calls():

@pytest.mark.asyncio
async def test_search_cves_async_rate_limit():
mock_response_403 = MagicMock()
mock_response_403.status_code = 403
mock_response_429 = MagicMock()
mock_response_429.status_code = 429

mock_response_200 = MagicMock()
mock_response_200.status_code = 200
mock_response_200.json.return_value = {"vulnerabilities": []}

client = AsyncMock()
client.get.side_effect = [mock_response_403, mock_response_200]
client.get.side_effect = [mock_response_429, mock_response_200]

with patch("asyncio.sleep", new_callable=AsyncMock):
res = await scanner.search_cves_async(client, "product", "1.0")
Expand Down Expand Up @@ -217,3 +217,38 @@ async def test_cve_scanner_scan_interface():

await s.scan(hosts=None)
mock_scan.assert_called_with([])


@pytest.mark.asyncio
async def test_search_cves_async_rate_limit_uses_429():
"""Bug fix: NVD rate limiting uses HTTP 429, not 403."""
mock_response_429 = MagicMock()
mock_response_429.status_code = 429

mock_response_200 = MagicMock()
mock_response_200.status_code = 200
mock_response_200.json.return_value = {"vulnerabilities": []}

client = AsyncMock()
client.get.side_effect = [mock_response_429, mock_response_200]

with patch("asyncio.sleep", new_callable=AsyncMock):
res = await scanner.search_cves_async(client, "product", "1.0")
assert res == []
assert client.get.call_count == 2


@pytest.mark.asyncio
async def test_search_cves_async_403_is_not_retried():
"""Bug fix: 403 Forbidden should not be retried (it's not a rate limit)."""
mock_response_403 = MagicMock()
mock_response_403.status_code = 403

client = AsyncMock()
client.get.return_value = mock_response_403

with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
res = await scanner.search_cves_async(client, "product", "1.0")
assert res == []
assert client.get.call_count == 1 # no retry
mock_sleep.assert_not_called()
37 changes: 37 additions & 0 deletions tests/test_password_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,3 +929,40 @@ async def test_password_scanner_scan_host_by_name_smb():
) as mock_scan:
await s.scan_host("1.1.1.1", "00:11:22:33:44:55", {"smb": 4455})
mock_scan.assert_called_once()


@pytest.mark.asyncio
async def test_async_service_scanner_scan_empty_credentials():
"""Bug fix: tested_count should be 0, not UnboundLocalError, when creds list is empty."""

class TestScanner(scanner.AsyncServiceScanner):
def service_name(self):
return "ssh"

def service_enum(self):
return scanner.ServiceEnum.ssh

async def attempt_login(self, u, p):
return False, False

s = TestScanner("1.1.1.1", 22)
with patch.object(s, "is_port_open", new_callable=AsyncMock, return_value=True):
with patch("edgewalker.modules.password_scan.scanner.load_credentials", return_value=[]):
res = await s.scan()
# Should complete without UnboundLocalError
assert res.login_attempt == scanner.StatusEnum.failed


@pytest.mark.asyncio
async def test_scan_host_backward_compat_passes_mac():
"""Bug fix: backward compat scan_host must pass mac as second positional arg."""
with patch(
"edgewalker.modules.password_scan.scanner.PasswordScanner.scan_host", new_callable=AsyncMock
) as mock_scan:
mock_scan.return_value = []
await scanner.scan_host("1.1.1.1", {"ssh": 22})
args, kwargs = mock_scan.call_args
# args[0]=host, args[1]=mac, args[2]=ports
assert args[0] == "1.1.1.1"
assert args[1] == "" # mac must be passed explicitly
assert args[2] == {"ssh": 22}