Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/edgewalker/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
print_logo,
)


def apply_colorblind_theme() -> None:
"""Switch the active theme to the colorblind-safe skin."""
# First Party
from edgewalker import theme as _theme
from edgewalker.core.config import settings
settings.theme = "colorblind"
_theme.load_active_theme()

# ============================================================================
# TYPER APP SETUP
# ============================================================================
Expand Down Expand Up @@ -168,6 +177,15 @@ def run_guided_scan(
"-ao",
help="Allow scan to proceed with active configuration overrides.",
),
unprivileged: bool = typer.Option(
False, "--unprivileged", help="Run without sudo using TCP connect scans (macOS/no-root)."
),
verbose: bool = typer.Option(
False, "--verbose", help="Print detailed nmap progress and discovered hosts/ports."
),
colorblind: bool = typer.Option(
False, "--colorblind", help="Use colorblind-safe palette (Okabe-Ito) instead of default theme."
),
) -> None:
"""Run a guided security scan.

Expand Down Expand Up @@ -214,10 +232,13 @@ def run_guided_scan(
)
raise typer.Exit()

if colorblind:
apply_colorblind_theme()

ensure_telemetry_choice()
controller = ScanController()
guided = GuidedScanner(controller)
asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds))
asyncio.run(guided.automatic_mode(full_scan=full, target=target, full_creds=full_creds, unprivileged=unprivileged, verbose=verbose))


@app.command()
Expand Down
4 changes: 2 additions & 2 deletions src/edgewalker/cli/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, scanner_service: Optional[ScannerService] = None) -> None:
self.scanner = scanner_service or ScannerService()

async def run_port_scan(
self, full: bool = False, target: str = None
self, full: bool = False, target: str = None, unprivileged: bool = False, verbose: bool = False
) -> Optional[PortScanModel]:
"""Run port scan and display results asynchronously."""
scan_type = "FULL" if full else "QUICK"
Expand All @@ -50,7 +50,7 @@ async def run_port_scan(

try:
with utils.console.status(f"[bold green]Scanning {target}..."):
results = await self.scanner.perform_port_scan(target=target, full=full)
results = await self.scanner.perform_port_scan(target=target, full=full, unprivileged=unprivileged, verbose=verbose)
except Exception as e:
logger.error(f"Scan failed: {str(e)}")
logger.error("Make sure nmap is installed and you have sudo privileges.")
Expand Down
4 changes: 3 additions & 1 deletion src/edgewalker/cli/guided.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ async def automatic_mode(
full_scan: Optional[bool] = None,
target: Optional[str] = None,
full_creds: bool = False,
unprivileged: bool = False,
verbose: bool = False,
) -> None:
"""Run the automatic guided security assessment asynchronously."""
# Step 1: Choose scan type
Expand All @@ -49,7 +51,7 @@ async def automatic_mode(
# Step 3: Run port scan
utils.console.print()
logger.info(f"Starting {scan_type.lower()} port scan on {target}...")
port_results = await self.controller.run_port_scan(full=full_scan, target=target)
port_results = await self.controller.run_port_scan(full=full_scan, target=target, unprivileged=unprivileged, verbose=verbose)

if not port_results:
logger.error("Port scan failed. Returning to mode selection.")
Expand Down
18 changes: 14 additions & 4 deletions src/edgewalker/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ def get_cache_dir() -> Path:
return Path(os.environ.get("EW_CACHE_DIR", user_cache_dir("edgewalker")))


def get_data_dir() -> Path:
"""Return the user-facing data directory for scan results.

Defaults to ~/.edgewalker so results are easy to find and separate
from the application config in Library/Application Support (macOS)
or ~/.config (Linux). Override with EW_DATA_DIR.
"""
return Path(os.environ.get("EW_DATA_DIR", Path.home() / ".edgewalker"))


# Legacy constants for backward compatibility (evaluated at load time)
# Note: Tests should use EW_CONFIG_DIR/EW_CACHE_DIR env vars BEFORE importing this module
# or we need to ensure they are used dynamically.
Expand Down Expand Up @@ -128,7 +138,7 @@ def validate_urls(cls, v: str, info: ValidationInfo) -> str:
def handle_demo_mode(cls, v: Path) -> Path:
"""Ensure output_dir points to demo_scans when in demo mode."""
if os.environ.get("EW_DEMO_MODE") == "1":
return v.parent / "demo_scans"
return get_data_dir() / "demo_scans"
return v

api_timeout: int = Field(
Expand Down Expand Up @@ -287,11 +297,11 @@ def handle_demo_mode(cls, v: Path) -> Path:
)
output_dir: Path = Field(
default_factory=lambda: (
get_config_dir() / "demo_scans"
get_data_dir() / "demo_scans"
if os.environ.get("EW_DEMO_MODE") == "1"
else get_config_dir() / "scans"
else get_data_dir() / "scans"
),
description="Output directory",
description="Output directory for scan results (~/.edgewalker/scans by default)",
)
creds_file: Path = Field(
default=Path(__file__).parent.parent / "data" / "creds.csv",
Expand Down
6 changes: 3 additions & 3 deletions src/edgewalker/core/scanner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def submit_scan_data(self, module: str, data: dict) -> None:
# No loop running, use sync version
self.telemetry.submit_scan_data_sync(module, data)

async def perform_port_scan(self, target: str, full: bool = False) -> PortScanModel:
async def perform_port_scan(self, target: str, full: bool = False, unprivileged: bool = False, verbose: bool = False) -> PortScanModel:
"""Perform a port scan and return results as a model asynchronously."""
if self.demo_mode and self.demo_service:
return await self.demo_service.perform_port_scan(target, full)
Expand All @@ -94,11 +94,11 @@ async def perform_port_scan(self, target: str, full: bool = False) -> PortScanMo

if full:
results = await port_scan.full_scan(
target=target, verbose=False, progress_callback=self.progress_callback
target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged
)
else:
results = await port_scan.quick_scan(
target=target, verbose=False, progress_callback=self.progress_callback
target=target, verbose=verbose, progress_callback=self.progress_callback, unprivileged=unprivileged
)

if isinstance(results, dict):
Expand Down
37 changes: 1 addition & 36 deletions src/edgewalker/modules/cve_scan/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def search_cves_async(
response = await client.get(
settings.nvd_api_url, params=params, headers=headers, timeout=30
)
if response.status_code == 403:
if response.status_code == 429:
# Rate limit hit, wait and retry once
await asyncio.sleep(settings.nvd_rate_limit_delay * 2)
response = await client.get(
Expand Down Expand Up @@ -266,41 +266,6 @@ async def _scan_service(
version=svc["version"],
cves=cves,
)
"""Scan a single service for CVEs asynchronously."""
if self.progress_callback:
self.progress_callback(
"cve_check",
f"Checking {svc['product']} {svc['version']} on {svc['ip']}:{svc['port']}",
)

cve_dicts = await search_cves_async(
client, svc["product"], svc["version"], self.verbose, semaphore
)

cves = [
CveModel(
id=c["id"],
description=c.get("description", ""),
severity=c["severity"],
score=c["score"],
)
for c in cve_dicts
]

if cves and self.progress_callback:
self.progress_callback(
"cve_found",
f"{len(cves)} CVE(s) found for {svc['product']} {svc['version']}",
)

return CveScanResultModel(
ip=svc["ip"],
port=svc["port"],
service=svc["service"],
product=svc["product"],
version=svc["version"],
cves=cves,
)

def _build_empty_model(self, skipped_no_version: int) -> CveScanModel:
return CveScanModel(
Expand Down
3 changes: 2 additions & 1 deletion src/edgewalker/modules/password_scan/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ async def scan(self) -> PasswordScanResultModel:

found_cred = None
login_status = StatusEnum.failed
i = -1

for i, (user, pw) in enumerate(creds):
if self.rich_progress:
Expand Down Expand Up @@ -661,7 +662,7 @@ async def scan_host(
) -> dict:
"""Backward compatible scan_host function asynchronously."""
scanner = PasswordScanner(target, top_n, verbose, progress_callback)
results = await scanner.scan_host(host, ports)
results = await scanner.scan_host(host, "", ports)
out = {"host": host, "services": {}}
for r in results:
status = "vulnerable" if r.login_attempt == StatusEnum.successful else "secure"
Expand Down
Loading
Loading