From 5f31b2cdc02e29eae9b49b98044c972a6e50377c Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Tue, 12 May 2026 20:13:36 -0700 Subject: [PATCH] vulnerability: fix search payload key + complete extension coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CLI's search dict was sending {"search": , ...} but the extension's parseSearchOp reads s["field"] — so every --search-* call was silently dropped server-side. Switch to {"field": ...} and update the assertion that locked in the wrong shape. Round out the rest of the surface the extension exposes: - new flags: --include-enrichment, --filter-via-state on the list commands; --normalized-package-name on cve hosts; --rollup-subpackages on host packages; --include-enrichment on cve get / cve packages - new subcommands: - vuln cve epss-history (query_epss_history) - vuln finding resolve / bulk-resolve / list / reset (set_finding_resolution, bulk_set_finding_resolution, list_finding_resolutions, reset_asset_findings) - vuln snapshot list (query_daily_snapshots) - help text now mentions lc_risk as a valid --sort-by on cve list and host packages Co-Authored-By: Claude Opus 4.7 (1M context) --- limacharlie/commands/vulnerability.py | 425 ++++++++++++++++-- limacharlie/sdk/vulnerability.py | 236 +++++++++- .../unit/test_cli_lazy_loading_regression.py | 4 +- tests/unit/test_cli_vulnerability.py | 343 +++++++++++++- tests/unit/test_sdk_vulnerability.py | 136 +++++- 5 files changed, 1090 insertions(+), 54 deletions(-) diff --git a/limacharlie/commands/vulnerability.py b/limacharlie/commands/vulnerability.py index c0bf4a6..fa3d912 100644 --- a/limacharlie/commands/vulnerability.py +++ b/limacharlie/commands/vulnerability.py @@ -2,10 +2,11 @@ Commands for querying the ``ext-vulnerability-reporting`` extension: list CVEs, list affected endpoints, get the dashboard, drill into a -single host or CVE, and trigger an on-demand scan of a sensor. +single host or CVE, trigger an on-demand scan, manage finding +resolutions, and read daily / EPSS time-series. Subscription management lives under ``limacharlie extension``; this -module is purely the user-facing query surface. +module is purely the user-facing query + write surface. """ from __future__ import annotations @@ -41,8 +42,8 @@ limacharlie vulnerability scan --sid -Once data has flowed in, use the cve / host / dashboard subcommands -to query it. +Once data has flowed in, use the cve / host / dashboard / finding / +snapshot subcommands to query it. """ _EXPLAIN_SCAN = """\ @@ -80,7 +81,7 @@ --search-field cve --search-op contains --search-value CVE-2025 -Sort by 'cve', 'count', or 'severity'. +Sort by 'cve', 'count', 'severity', or 'lc_risk'. Examples: limacharlie vulnerability cve list @@ -92,7 +93,9 @@ Return raw details for a single CVE id. The response shape is the upstream NVD record (descriptions, CVSS -metrics, weaknesses, references, configurations). +metrics, weaknesses, references, configurations). KEV / EPSS / +exploit-ref enrichment is attached by default; pass +``--no-include-enrichment`` for cheap admin lookups. Examples: limacharlie vulnerability cve get CVE-2021-44228 @@ -104,6 +107,11 @@ Sort by 'hostname' (default) or 'platform_string'. +When you already know the normalized package name (e.g. you reached +this view by drilling from cve packages), pass it via +``--normalized-package-name`` so the resolution overlay can read +host-scope rows too — without it, only org-scope state hits land. + Examples: limacharlie vulnerability cve hosts CVE-2021-44228 limacharlie vulnerability cve hosts CVE-2021-44228 --include-tags @@ -121,6 +129,17 @@ limacharlie vulnerability cve packages CVE-2021-44228 --sort-by package_name --sort-asc """ +_EXPLAIN_CVE_EPSS_HISTORY = """\ +Return per-day EPSS score / percentile rows for one CVE. + +Used to render a sparkline of how the exploit-prediction score has +moved over time. Result is ordered snapshot_date ASC. + +Examples: + limacharlie vulnerability cve epss-history CVE-2024-1234 + limacharlie vulnerability cve epss-history CVE-2024-1234 --days 30 +""" + _EXPLAIN_HOST_LIST = """\ List endpoints with their vulnerability counts. @@ -134,22 +153,103 @@ _EXPLAIN_HOST_PACKAGES = """\ List vulnerable packages and their CVEs on a single host. -Sort by 'cve' (default), 'score', 'severity', 'package_name', or -'package_name_package_version_cve'. +Sort by 'cve' (default), 'score', 'severity', 'lc_risk', +'package_name', or 'package_name_package_version_cve'. + +Pass ``--rollup-subpackages`` to collapse rows for binary packages +built from the same source-package family (e.g. the vim / openssh / +systemd binaries co-installed from one source) into a single row +per (root, version, CVE). Examples: limacharlie vulnerability host packages 11111111-2222-3333-4444-555555555555 limacharlie vulnerability host packages --sort-by score """ +_EXPLAIN_FINDING_RESOLVE = """\ +Set or clear the resolution on a single finding. + +Identify the finding either by ``--fingerprint`` OR by +``--cve`` + ``--normalized-package-name`` (+ ``--sid`` when +``--scope host``). Pass ``--reopen`` (or omit ``--resolution``) to +remove the overlay row and let the finding revert to implicit +"open". + +Examples: + # Mark a chrome CVE accepted org-wide for 90 days + limacharlie vulnerability finding resolve --scope org \\ + --cve CVE-2024-1234 --normalized-package-name chrome \\ + --resolution accepted --expires-at 2026-08-12T00:00:00Z + + # Reopen the same finding + limacharlie vulnerability finding resolve --scope org \\ + --cve CVE-2024-1234 --normalized-package-name chrome --reopen +""" + +_EXPLAIN_FINDING_BULK_RESOLVE = """\ +Apply one resolution to up to 100 findings in a single call. + +``--targets-json`` takes a JSON array; each entry needs ``scope`` + +either ``fingerprint`` OR ``cve`` + ``normalized_package_name`` +(+ ``sid`` for host scope). Per-target failures are returned +alongside successes so partial successes are observable. + +Example: + limacharlie vulnerability finding bulk-resolve \\ + --resolution mitigated \\ + --targets-json '[{"scope":"org","fingerprint":"deadbeef..."}, + {"scope":"host","cve":"CVE-2024-1","normalized_package_name":"openssl","sid":""}]' +""" + +_EXPLAIN_FINDING_LIST = """\ +Page through the org's resolved findings (rows in vuln_finding_state). + +Only resolved rows are stored; an absent row means the finding is +implicitly open. Filters stack as AND. + +Examples: + limacharlie vulnerability finding list + limacharlie vulnerability finding list --scope org --resolution accepted +""" + +_EXPLAIN_FINDING_RESET = """\ +Wipe every stored finding for one sensor. + +Use this when a host has been reformatted / reimaged or +decommissioned and the existing findings no longer reflect reality. +Org-scope fingerprints that were only on this sensor fire +vuln_finding.closed events. The next package scan repopulates +findings from scratch. + +Example: + limacharlie vulnerability finding reset --sid 11111111-2222-3333-4444-555555555555 +""" + +_EXPLAIN_SNAPSHOT_LIST = """\ +Read the daily open-finding burndown counts. + +Returns persisted per-day per-severity counts (and the KEV subset) +ordered (snapshot_date ASC, severity ASC). + +Examples: + limacharlie vulnerability snapshot list + limacharlie vulnerability snapshot list --days 90 --severity critical --severity high +""" + register_explain("vulnerability.scan", _EXPLAIN_SCAN) register_explain("vulnerability.dashboard", _EXPLAIN_DASHBOARD) register_explain("vulnerability.cve.list", _EXPLAIN_CVE_LIST) register_explain("vulnerability.cve.get", _EXPLAIN_CVE_GET) register_explain("vulnerability.cve.hosts", _EXPLAIN_CVE_HOSTS) register_explain("vulnerability.cve.packages", _EXPLAIN_CVE_PACKAGES) +register_explain("vulnerability.cve.epss-history", _EXPLAIN_CVE_EPSS_HISTORY) register_explain("vulnerability.host.list", _EXPLAIN_HOST_LIST) register_explain("vulnerability.host.packages", _EXPLAIN_HOST_PACKAGES) +register_explain("vulnerability.finding.resolve", _EXPLAIN_FINDING_RESOLVE) +register_explain("vulnerability.finding.bulk-resolve", _EXPLAIN_FINDING_BULK_RESOLVE) +register_explain("vulnerability.finding.list", _EXPLAIN_FINDING_LIST) +register_explain("vulnerability.finding.reset", _EXPLAIN_FINDING_RESET) +register_explain("vulnerability.snapshot.list", _EXPLAIN_SNAPSHOT_LIST) # --------------------------------------------------------------------------- @@ -220,8 +320,10 @@ def _build_search( ) -> dict[str, Any] | None: """Build the search dict expected by the extension. - The extension shape is ``{"search": , "op": "is|contains", - "value": }``. All three flags must be set together, or none. + The extension shape is ``{"field": , "op": "is|contains", + "value": }`` — see parseSearchOp in + ext-vulnerability-reporting/ext/datasource.go which reads + ``s["field"]``. All three flags must be set together, or none. """ provided = [x is not None for x in (field, op, value)] if not any(provided): @@ -230,7 +332,7 @@ def _build_search( raise click.UsageError( "--search-field, --search-op, and --search-value must be used together", ) - return {"search": field, "op": op, "value": value} + return {"field": field, "op": op, "value": value} # --------------------------------------------------------------------------- @@ -238,6 +340,13 @@ def _build_search( # --------------------------------------------------------------------------- _SEARCH_OP_CHOICES = click.Choice(["is", "contains"], case_sensitive=False) +_SCOPE_CHOICES = click.Choice(["org", "host"], case_sensitive=False) +_RESOLUTION_CHOICES = click.Choice( + ["mitigated", "accepted", "false_positive"], case_sensitive=False, +) +_SEVERITY_CHOICES = click.Choice( + ["critical", "high", "medium", "low"], case_sensitive=False, +) def _common_query_options(f): @@ -246,6 +355,10 @@ def _common_query_options(f): Order matters in Click: decorators stack bottom-up, so the option that should appear first in --help is applied last. """ + f = click.option( + "--filter-via-state/--no-filter-via-state", default=None, + help="Apply state-overlay filters (default true server-side).", + )(f) f = click.option( "--include-tags", is_flag=True, default=False, help="Include sensor tags on each returned row.", @@ -289,6 +402,13 @@ def _common_query_options(f): return f +def _enrichment_option(f): + return click.option( + "--include-enrichment/--no-include-enrichment", default=None, + help="Attach KEV / EPSS data (default true server-side).", + )(f) + + # --------------------------------------------------------------------------- # Group # --------------------------------------------------------------------------- @@ -298,18 +418,25 @@ def group() -> None: """Query vulnerability data (ext-vulnerability-reporting). Provides CVE, host, and dashboard views over the per-org - vulnerability index, plus an on-demand scan trigger. + vulnerability index, plus an on-demand scan trigger, finding + resolution overlay, and burndown / EPSS time-series. \b Subgroups / commands: - scan Trigger an on-demand os_packages scan - dashboard Per-org vulnerability dashboard graphs - cve list List CVEs observed in the org - cve get Get raw CVE details - cve hosts Hosts affected by a CVE - cve packages Packages affected by a CVE - host list Endpoints with vulnerability counts - host packages Vulnerable packages on a single host + scan Trigger an on-demand os_packages scan + dashboard Per-org vulnerability dashboard graphs + cve list List CVEs observed in the org + cve get Get raw CVE details + cve hosts Hosts affected by a CVE + cve packages Packages affected by a CVE + cve epss-history Per-day EPSS history for one CVE + host list Endpoints with vulnerability counts + host packages Vulnerable packages on a single host + finding resolve Set/clear resolution on a finding + finding bulk-resolve Bulk-apply a resolution + finding list List resolved findings + finding reset Wipe all findings for a sensor + snapshot list Daily open-finding burndown counts """ @@ -357,18 +484,21 @@ def dashboard(ctx, sort_asc) -> None: @group.group("cve") def cve_group() -> None: - """CVE-centric views: list, get, hosts affected, packages affected.""" + """CVE-centric views: list, get, hosts affected, packages affected, EPSS history.""" @cve_group.command("list") @_common_query_options +@_enrichment_option @pass_context def cve_list(ctx, cursor, limit, sort_by, sort_asc, filter_pairs, filters_json, - search_field, search_op, search_value, include_tags) -> None: + search_field, search_op, search_value, + include_tags, filter_via_state, + include_enrichment) -> None: """List CVEs observed across the org. - Valid --sort-by values: cve, count, severity. + Valid --sort-by values: cve, count, severity, lc_risk. \b Examples: @@ -384,14 +514,17 @@ def cve_list(ctx, cursor, limit, sort_by, sort_asc, filters=_parse_filters(filter_pairs, filters_json), search=_build_search(search_field, search_op, search_value), include_tags=include_tags or None, + include_enrichment=include_enrichment, + filter_via_state=filter_via_state, ) _output(ctx, data) @cve_group.command("get") @click.argument("cve_id") +@_enrichment_option @pass_context -def cve_get(ctx, cve_id) -> None: +def cve_get(ctx, cve_id, include_enrichment) -> None: """Return raw details for CVE_ID (e.g. CVE-2021-44228). \b @@ -399,17 +532,23 @@ def cve_get(ctx, cve_id) -> None: limacharlie vulnerability cve get CVE-2021-44228 """ v = _get_vuln(ctx) - data = v.get_cve(cve_id) + data = v.get_cve(cve_id, include_enrichment=include_enrichment) _output(ctx, data) @cve_group.command("hosts") @click.argument("cve_id") @_common_query_options +@click.option( + "--normalized-package-name", default=None, + help="Normalized package name; when set, host-scope resolution rows are read.", +) @pass_context def cve_hosts(ctx, cve_id, cursor, limit, sort_by, sort_asc, filter_pairs, filters_json, - search_field, search_op, search_value, include_tags) -> None: + search_field, search_op, search_value, + include_tags, filter_via_state, + normalized_package_name) -> None: """List hosts affected by CVE_ID. Valid --sort-by values: hostname (default), platform_string. @@ -428,6 +567,8 @@ def cve_hosts(ctx, cve_id, cursor, limit, sort_by, sort_asc, filters=_parse_filters(filter_pairs, filters_json), search=_build_search(search_field, search_op, search_value), include_tags=include_tags or None, + filter_via_state=filter_via_state, + normalized_package_name=normalized_package_name, ) _output(ctx, data) @@ -439,8 +580,10 @@ def cve_hosts(ctx, cve_id, cursor, limit, sort_by, sort_asc, @click.option("--sort-by", default=None, help="Field to sort by: count (default), package_name, package_version.") @click.option("--sort-asc", is_flag=True, default=False, help="Sort ascending.") +@_enrichment_option @pass_context -def cve_packages(ctx, cve_id, cursor, limit, sort_by, sort_asc) -> None: +def cve_packages(ctx, cve_id, cursor, limit, sort_by, sort_asc, + include_enrichment) -> None: """List (package_name, package_version) pairs affected by CVE_ID. \b @@ -454,10 +597,30 @@ def cve_packages(ctx, cve_id, cursor, limit, sort_by, sort_asc) -> None: limit=limit, sort_by=sort_by, sort_asc=sort_asc or None, + include_enrichment=include_enrichment, ) _output(ctx, data) +@cve_group.command("epss-history") +@click.argument("cve_id") +@click.option( + "--days", default=None, type=int, + help="Days back from today (default 90, max 365).", +) +@pass_context +def cve_epss_history(ctx, cve_id, days) -> None: + """Per-day EPSS score / percentile history for CVE_ID. + + \b + Example: + limacharlie vulnerability cve epss-history CVE-2024-1234 --days 30 + """ + v = _get_vuln(ctx) + data = v.query_epss_history(cve_id, days=days) + _output(ctx, data) + + # --------------------------------------------------------------------------- # host subgroup # --------------------------------------------------------------------------- @@ -472,7 +635,8 @@ def host_group() -> None: @pass_context def host_list(ctx, cursor, limit, sort_by, sort_asc, filter_pairs, filters_json, - search_field, search_op, search_value, include_tags) -> None: + search_field, search_op, search_value, + include_tags, filter_via_state) -> None: """List endpoints with their vulnerability counts. Valid --sort-by values: hostname, platform_string, count, severity. @@ -490,6 +654,7 @@ def host_list(ctx, cursor, limit, sort_by, sort_asc, filters=_parse_filters(filter_pairs, filters_json), search=_build_search(search_field, search_op, search_value), include_tags=include_tags or None, + filter_via_state=filter_via_state, ) _output(ctx, data) @@ -497,14 +662,21 @@ def host_list(ctx, cursor, limit, sort_by, sort_asc, @host_group.command("packages") @click.argument("sid") @_common_query_options +@_enrichment_option +@click.option( + "--rollup-subpackages/--no-rollup-subpackages", default=None, + help="Collapse binaries from the same source package family into one row.", +) @pass_context def host_packages(ctx, sid, cursor, limit, sort_by, sort_asc, filter_pairs, filters_json, - search_field, search_op, search_value, include_tags) -> None: + search_field, search_op, search_value, + include_tags, filter_via_state, + include_enrichment, rollup_subpackages) -> None: """List vulnerable packages and their CVEs on host SID. - Valid --sort-by values: cve (default), score, severity, package_name, - package_name_package_version_cve. + Valid --sort-by values: cve (default), score, severity, lc_risk, + package_name, package_name_package_version_cve. \b Example: @@ -520,5 +692,196 @@ def host_packages(ctx, sid, cursor, limit, sort_by, sort_asc, filters=_parse_filters(filter_pairs, filters_json), search=_build_search(search_field, search_op, search_value), include_tags=include_tags or None, + include_enrichment=include_enrichment, + filter_via_state=filter_via_state, + rollup_subpackages=rollup_subpackages, + ) + _output(ctx, data) + + +# --------------------------------------------------------------------------- +# finding subgroup +# --------------------------------------------------------------------------- + +@group.group("finding") +def finding_group() -> None: + """Finding resolution overlay: resolve, bulk-resolve, list, reset.""" + + +@finding_group.command("resolve") +@click.option("--scope", type=_SCOPE_CHOICES, required=True, + help="Resolution scope: org (org-wide) or host (single sid).") +@click.option("--fingerprint", default=None, + help="Fingerprint hex (alternative to --cve + --normalized-package-name).") +@click.option("--cve", default=None, + help="CVE id (when --fingerprint is not supplied).") +@click.option("--normalized-package-name", default=None, + help="Normalized package name (when --fingerprint is not supplied).") +@click.option("--sid", default=None, + help="Sensor id (required when --scope host and --fingerprint not supplied).") +@click.option("--resolution", type=_RESOLUTION_CHOICES, default=None, + help="Resolution to apply. Omit (or pass --reopen) to clear.") +@click.option("--reopen", is_flag=True, default=False, + help="Explicitly reopen the finding (delete the overlay row).") +@click.option("--expires-at", default=None, + help="RFC3339 expiry; only meaningful with --resolution accepted.") +@click.option("--case-number", type=int, default=None, + help="Optional ext-cases linkage.") +@pass_context +def finding_resolve(ctx, scope, fingerprint, cve, normalized_package_name, sid, + resolution, reopen, expires_at, case_number) -> None: + """Set or clear the resolution on a single finding. + + \b + Example: + limacharlie vulnerability finding resolve --scope org \\ + --cve CVE-2024-1234 --normalized-package-name chrome \\ + --resolution accepted --expires-at 2026-08-12T00:00:00Z + """ + if reopen and resolution is not None: + raise click.UsageError("--reopen is incompatible with --resolution") + if fingerprint is None and (cve is None or normalized_package_name is None): + raise click.UsageError( + "must supply --fingerprint OR (--cve + --normalized-package-name)", + ) + if scope == "host" and fingerprint is None and sid is None: + raise click.UsageError("--scope host requires --sid (or --fingerprint)") + v = _get_vuln(ctx) + data = v.set_finding_resolution( + scope=scope, + fingerprint=fingerprint, + cve=cve, + normalized_package_name=normalized_package_name, + sid=sid, + resolution=None if reopen else resolution, + expires_at=expires_at, + case_number=case_number, + ) + _output(ctx, data) + + +@finding_group.command("bulk-resolve") +@click.option("--targets-json", required=True, + help='JSON array of targets. Each: {"scope": "org|host", ' + '"fingerprint": "..."} or {"scope": "...", "cve": "...", ' + '"normalized_package_name": "...", "sid": "..."}.') +@click.option("--resolution", type=_RESOLUTION_CHOICES, default=None, + help="Resolution to apply. Omit (or pass --reopen) to clear all.") +@click.option("--reopen", is_flag=True, default=False, + help="Explicitly reopen every target (delete the overlay rows).") +@click.option("--expires-at", default=None, + help="RFC3339 expiry; only meaningful with --resolution accepted.") +@click.option("--case-number", type=int, default=None, + help="Optional ext-cases linkage.") +@pass_context +def finding_bulk_resolve(ctx, targets_json, resolution, reopen, + expires_at, case_number) -> None: + """Apply one resolution to up to 100 findings. + + \b + Example: + limacharlie vulnerability finding bulk-resolve \\ + --resolution mitigated \\ + --targets-json '[{"scope":"org","fingerprint":"..."}]' + """ + if reopen and resolution is not None: + raise click.UsageError("--reopen is incompatible with --resolution") + try: + targets = json.loads(targets_json) + except json.JSONDecodeError as exc: + raise click.BadParameter(f"invalid JSON: {exc}", param_hint="--targets-json") + if not isinstance(targets, list): + raise click.BadParameter( + "must decode to a JSON array of target objects", + param_hint="--targets-json", + ) + v = _get_vuln(ctx) + data = v.bulk_set_finding_resolution( + targets, + resolution=None if reopen else resolution, + expires_at=expires_at, + case_number=case_number, + ) + _output(ctx, data) + + +@finding_group.command("list") +@click.option("--scope", type=_SCOPE_CHOICES, default=None, + help="Filter by scope.") +@click.option("--resolution", "resolutions", type=_RESOLUTION_CHOICES, + multiple=True, + help="Filter by resolution; repeat for multiple.") +@click.option("--cursor", default=None, help="Pagination cursor.") +@click.option("--limit", type=int, default=None, + help="Page size (default 100, max 1000).") +@pass_context +def finding_list(ctx, scope, resolutions, cursor, limit) -> None: + """Page through the org's resolved findings. + + \b + Example: + limacharlie vulnerability finding list --scope org --resolution accepted + """ + v = _get_vuln(ctx) + data = v.list_finding_resolutions( + scope=scope, + resolutions=list(resolutions) if resolutions else None, + cursor=cursor, + limit=limit, + ) + _output(ctx, data) + + +@finding_group.command("reset") +@click.option("--sid", required=True, help="Sensor ID whose findings to wipe.") +@click.confirmation_option( + prompt="This wipes every stored finding for the sensor. Continue?", +) +@pass_context +def finding_reset(ctx, sid) -> None: + """Wipe every stored finding for one sensor. + + Use after reformat / reimage / decommission. The next scan + repopulates findings from scratch. + + \b + Example: + limacharlie vulnerability finding reset --sid 11111111-... + """ + v = _get_vuln(ctx) + data = v.reset_asset_findings(sid) + _output(ctx, data) + + +# --------------------------------------------------------------------------- +# snapshot subgroup +# --------------------------------------------------------------------------- + +@group.group("snapshot") +def snapshot_group() -> None: + """Daily burndown snapshots of open-finding counts.""" + + +@snapshot_group.command("list") +@click.option( + "--days", default=None, type=int, + help="Days back from today (default 30, max 365).", +) +@click.option( + "--severity", "severities", type=_SEVERITY_CHOICES, multiple=True, + help="Severity filter; repeat for multiple. Defaults to all four.", +) +@pass_context +def snapshot_list(ctx, days, severities) -> None: + """Read daily per-severity open-finding counts. + + \b + Example: + limacharlie vulnerability snapshot list --days 90 --severity critical --severity high + """ + v = _get_vuln(ctx) + data = v.query_daily_snapshots( + days=days, + severities=list(severities) if severities else None, ) _output(ctx, data) diff --git a/limacharlie/sdk/vulnerability.py b/limacharlie/sdk/vulnerability.py index 15296ea..48886bd 100644 --- a/limacharlie/sdk/vulnerability.py +++ b/limacharlie/sdk/vulnerability.py @@ -2,8 +2,8 @@ Wraps the ext-vulnerability-reporting extension. The extension keeps a per-org index of OS packages reported by sensors and joins it against -a CVE database, giving per-CVE / per-host vulnerability views and a -dashboard summary. +a CVE database, giving per-CVE / per-host vulnerability views, a +dashboard summary, and per-finding resolution state. The extension exposes a small set of RPC actions over the standard extension request channel; this module is a thin typed shim around @@ -31,6 +31,8 @@ def _build_query( filters: dict[str, list[str]] | None = None, search: dict[str, Any] | None = None, include_tags: bool | None = None, + include_enrichment: bool | None = None, + filter_via_state: bool | None = None, ) -> dict[str, Any]: """Assemble the optional query parameters shared by the list endpoints. @@ -52,6 +54,10 @@ def _build_query( data["search"] = search if include_tags is not None: data["include_tags"] = include_tags + if include_enrichment is not None: + data["include_enrichment"] = include_enrichment + if filter_via_state is not None: + data["filter_via_state"] = filter_via_state return data @@ -79,17 +85,21 @@ def query_cves( filters: dict[str, list[str]] | None = None, search: dict[str, Any] | None = None, include_tags: bool | None = None, + include_enrichment: bool | None = None, + filter_via_state: bool | None = None, ) -> dict[str, Any]: """List CVEs observed across the org. Args: cursor: Pagination cursor returned by a previous call. limit: Maximum number of rows to return (default 100). - sort_by: One of ``cve``, ``count``, ``severity``. + sort_by: One of ``cve``, ``count``, ``severity``, ``lc_risk``. sort_asc: Sort ascending instead of descending. filters: ``{field: [values]}`` filter map (e.g. ``{"severity": ["HIGH","CRITICAL"]}``). - search: ``{"search": field, "op": "is|contains", "value": str}`` substring search. + search: ``{"field": , "op": "is|contains", "value": }`` substring search. include_tags: Include sensor tags on returned rows. + include_enrichment: Attach KEV / EPSS data (default true server-side). + filter_via_state: Apply state-based filters (default true server-side). Returns: ``{"results": [...], "next_cursor": str, "total_return_count": int}``. @@ -106,17 +116,27 @@ def query_cves( filters=filters, search=search, include_tags=include_tags, + include_enrichment=include_enrichment, + filter_via_state=filter_via_state, ), unwrap=True, ) - def get_cve(self, cve_id: str) -> dict[str, Any]: + def get_cve( + self, + cve_id: str, + *, + include_enrichment: bool | None = None, + ) -> dict[str, Any]: """Return raw details for a single CVE id (e.g. ``CVE-2021-44228``).""" + data: dict[str, Any] = {"cve_id": cve_id} + if include_enrichment is not None: + data["include_enrichment"] = include_enrichment ext = Extensions(self._org) return ext.request( _EXTENSION_NAME, "query_cve", - data={"cve_id": cve_id}, + data=data, unwrap=True, ) @@ -131,6 +151,8 @@ def query_cve_hosts( filters: dict[str, list[str]] | None = None, search: dict[str, Any] | None = None, include_tags: bool | None = None, + filter_via_state: bool | None = None, + normalized_package_name: str | None = None, ) -> dict[str, Any]: """List hosts in the org affected by ``cve``. @@ -139,6 +161,9 @@ def query_cve_hosts( cursor, limit, sort_by, sort_asc: pagination + ordering. ``sort_by`` accepts ``hostname`` (default) or ``platform_string``. filters, search, include_tags: see :meth:`query_cves`. + filter_via_state: Apply state-based filters (default true). + normalized_package_name: When set, the resolution overlay reads + host-scope rows; otherwise org-scope only. Returns: ``{"hosts": [...], "cursor": str, "total": int}``. @@ -151,8 +176,11 @@ def query_cve_hosts( filters=filters, search=search, include_tags=include_tags, + filter_via_state=filter_via_state, ) data["cve"] = cve + if normalized_package_name is not None: + data["normalized_package_name"] = normalized_package_name ext = Extensions(self._org) return ext.request(_EXTENSION_NAME, "query_cve_vuln_hosts", data=data, unwrap=True) @@ -164,6 +192,7 @@ def query_cve_packages( limit: int | None = None, sort_by: str | None = None, sort_asc: bool | None = None, + include_enrichment: bool | None = None, ) -> dict[str, Any]: """List ``(package_name, package_version)`` pairs in the org affected by ``cve``. @@ -182,6 +211,8 @@ def query_cve_packages( data["sort_by"] = sort_by if sort_asc is not None: data["sort_asc"] = sort_asc + if include_enrichment is not None: + data["include_enrichment"] = include_enrichment ext = Extensions(self._org) return ext.request(_EXTENSION_NAME, "query_cve_vuln_packages", data=data, unwrap=True) @@ -199,6 +230,7 @@ def query_endpoints( filters: dict[str, list[str]] | None = None, search: dict[str, Any] | None = None, include_tags: bool | None = None, + filter_via_state: bool | None = None, ) -> dict[str, Any]: """List endpoints with vulnerability counts. @@ -220,6 +252,7 @@ def query_endpoints( filters=filters, search=search, include_tags=include_tags, + filter_via_state=filter_via_state, ), unwrap=True, ) @@ -235,10 +268,13 @@ def query_host_packages( filters: dict[str, list[str]] | None = None, search: dict[str, Any] | None = None, include_tags: bool | None = None, + include_enrichment: bool | None = None, + filter_via_state: bool | None = None, + rollup_subpackages: bool | None = None, ) -> dict[str, Any]: """List vulnerable packages and their CVEs on a single host. - ``sort_by`` accepts ``cve``, ``score``, ``severity``, + ``sort_by`` accepts ``cve``, ``score``, ``severity``, ``lc_risk``, ``package_name``, or ``package_name_package_version_cve``. Returns: @@ -252,8 +288,12 @@ def query_host_packages( filters=filters, search=search, include_tags=include_tags, + include_enrichment=include_enrichment, + filter_via_state=filter_via_state, ) data["sid"] = sid + if rollup_subpackages is not None: + data["rollup_subpackages"] = rollup_subpackages ext = Extensions(self._org) return ext.request(_EXTENSION_NAME, "query_host_vuln_packages", data=data, unwrap=True) @@ -286,3 +326,185 @@ def scan(self, sid: str) -> dict[str, Any]: return ext.request( _EXTENSION_NAME, "scan_packages", data={"sid": sid}, unwrap=True, ) + + # ------------------------------------------------------------------ + # Finding resolution overlay + # ------------------------------------------------------------------ + + def set_finding_resolution( + self, + *, + scope: str, + fingerprint: str | None = None, + cve: str | None = None, + normalized_package_name: str | None = None, + sid: str | None = None, + resolution: str | None = None, + expires_at: str | None = None, + case_number: int | None = None, + ) -> dict[str, Any]: + """Set or clear the resolution on a single finding. + + Either ``fingerprint`` OR (``cve`` + ``normalized_package_name`` + [+ ``sid`` for ``scope=host``]) must be supplied. + + ``resolution`` is one of ``mitigated``, ``accepted``, + ``false_positive``. Passing ``None`` reopens the finding (deletes + the overlay row). + + ``expires_at`` is RFC3339 and only meaningful when + ``resolution=accepted``. + """ + data: dict[str, Any] = {"scope": scope} + if fingerprint is not None: + data["fingerprint"] = fingerprint + if cve is not None: + data["cve"] = cve + if normalized_package_name is not None: + data["normalized_package_name"] = normalized_package_name + if sid is not None: + data["sid"] = sid + if resolution is not None: + data["resolution"] = resolution + if expires_at is not None: + data["expires_at"] = expires_at + if case_number is not None: + data["case_number"] = case_number + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "set_finding_resolution", data=data, unwrap=True, + ) + + def bulk_set_finding_resolution( + self, + targets: list[dict[str, Any]], + *, + resolution: str | None = None, + expires_at: str | None = None, + case_number: int | None = None, + ) -> dict[str, Any]: + """Apply one resolution to up to 100 findings. + + Args: + targets: List of finding identifiers. Each entry needs + ``scope`` + (``fingerprint`` OR + ``cve`` + ``normalized_package_name`` [+ ``sid`` for host]). + resolution: ``mitigated`` / ``accepted`` / ``false_positive``. + ``None`` reopens every target. + expires_at: RFC3339 expiry (only meaningful when accepted). + case_number: Optional ext-cases linkage. + + Returns: + ``{"applied": int, "errors": [...], "results": [...]}``. + """ + data: dict[str, Any] = {"targets": list(targets)} + if resolution is not None: + data["resolution"] = resolution + if expires_at is not None: + data["expires_at"] = expires_at + if case_number is not None: + data["case_number"] = case_number + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "bulk_set_finding_resolution", data=data, unwrap=True, + ) + + def list_finding_resolutions( + self, + *, + scope: str | None = None, + resolutions: list[str] | None = None, + cursor: str | None = None, + limit: int | None = None, + ) -> dict[str, Any]: + """Page through the org's resolved findings. + + Filters stack as AND. ``resolutions``, when set, must be a subset + of ``{mitigated, accepted, false_positive}``. + + Returns: + ``{"resolutions": [...], "next_cursor": str}``. + """ + data: dict[str, Any] = {} + if scope is not None: + data["scope"] = scope + if resolutions is not None: + data["resolutions"] = resolutions + if cursor is not None: + data["cursor"] = cursor + if limit is not None: + data["limit"] = limit + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "list_finding_resolutions", data=data, unwrap=True, + ) + + def reset_asset_findings(self, sid: str) -> dict[str, Any]: + """Wipe every stored finding for one sensor. + + Use when the host was reformatted/reimaged or decommissioned and + the existing findings no longer reflect reality. Org-scope + fingerprints that were only on this sensor fire + ``vuln_finding.closed`` events. The next package scan repopulates + findings from scratch. + + Returns: + ``{"sid": str, "closed": int}``. + """ + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "reset_asset_findings", data={"sid": sid}, unwrap=True, + ) + + # ------------------------------------------------------------------ + # Daily snapshots / EPSS history + # ------------------------------------------------------------------ + + def query_daily_snapshots( + self, + *, + days: int | None = None, + severities: list[str] | None = None, + ) -> dict[str, Any]: + """Read the per-day per-severity open-finding burndown counts. + + Args: + days: Days back from today (default 30 server-side, max 365). + severities: Severity filter (defaults to all four buckets). + + Returns: + ``{"snapshots": [...]}`` ordered (snapshot_date ASC, severity ASC). + """ + data: dict[str, Any] = {} + if days is not None: + data["days"] = days + if severities is not None: + data["severities"] = severities + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "query_daily_snapshots", data=data, unwrap=True, + ) + + def query_epss_history( + self, + cve: str, + *, + days: int | None = None, + ) -> dict[str, Any]: + """Read the per-day EPSS history for one CVE. + + Args: + cve: CVE id (e.g. ``CVE-2024-1234``). + days: Days back (default 90 server-side, max 365). + + Returns: + ``{"history": [{"snapshot_date": ..., "score": ..., "percentile": ...}, ...]}`` + ordered snapshot_date ASC. + """ + data: dict[str, Any] = {"cve": cve} + if days is not None: + data["days"] = days + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, "query_epss_history", data=data, unwrap=True, + ) diff --git a/tests/unit/test_cli_lazy_loading_regression.py b/tests/unit/test_cli_lazy_loading_regression.py index 63ad63b..2154225 100644 --- a/tests/unit/test_cli_lazy_loading_regression.py +++ b/tests/unit/test_cli_lazy_loading_regression.py @@ -211,7 +211,9 @@ }), "user": frozenset({"invite", "list", "permissions", "remove"}), "usp": frozenset({"validate"}), - "vulnerability": frozenset({"cve", "dashboard", "host", "scan"}), + "vulnerability": frozenset({ + "cve", "dashboard", "finding", "host", "scan", "snapshot", + }), "yara": frozenset({ "rule-add", "rule-delete", "rules-list", "scan", "source-add", "source-delete", "source-get", "sources-list", diff --git a/tests/unit/test_cli_vulnerability.py b/tests/unit/test_cli_vulnerability.py index 3a16340..cefd279 100644 --- a/tests/unit/test_cli_vulnerability.py +++ b/tests/unit/test_cli_vulnerability.py @@ -1,5 +1,7 @@ """Tests for limacharlie vulnerability CLI commands.""" +import json + from unittest.mock import patch, MagicMock from click.testing import CliRunner @@ -15,7 +17,18 @@ def _patches(): ) -def _invoke(args, mock_vuln_cls, return_value=None): +_SDK_METHODS = [ + "scan", "query_dashboard", + "query_cves", "get_cve", + "query_cve_hosts", "query_cve_packages", + "query_endpoints", "query_host_packages", + "set_finding_resolution", "bulk_set_finding_resolution", + "list_finding_resolutions", "reset_asset_findings", + "query_daily_snapshots", "query_epss_history", +] + + +def _invoke(args, mock_vuln_cls, return_value=None, stdin=None): """Run the CLI with a mocked Vulnerability instance. All SDK methods on the mocked instance return ``return_value`` so the @@ -25,15 +38,10 @@ def _invoke(args, mock_vuln_cls, return_value=None): inst = MagicMock() mock_vuln_cls.return_value = inst if return_value is not None: - for name in [ - "scan", "query_dashboard", - "query_cves", "get_cve", - "query_cve_hosts", "query_cve_packages", - "query_endpoints", "query_host_packages", - ]: + for name in _SDK_METHODS: getattr(inst, name).return_value = return_value runner = CliRunner() - result = runner.invoke(cli, ["--output", "json"] + args) + result = runner.invoke(cli, ["--output", "json"] + args, input=stdin) return result, inst @@ -47,14 +55,14 @@ def test_root_help_lists_subcommands(self): runner = CliRunner() result = runner.invoke(cli, ["vulnerability", "--help"]) assert result.exit_code == 0 - for cmd in ["scan", "dashboard", "cve", "host"]: + for cmd in ["scan", "dashboard", "cve", "host", "finding", "snapshot"]: assert cmd in result.output def test_cve_subgroup_help(self): runner = CliRunner() result = runner.invoke(cli, ["vulnerability", "cve", "--help"]) assert result.exit_code == 0 - for cmd in ["list", "get", "hosts", "packages"]: + for cmd in ["list", "get", "hosts", "packages", "epss-history"]: assert cmd in result.output def test_host_subgroup_help(self): @@ -64,6 +72,19 @@ def test_host_subgroup_help(self): for cmd in ["list", "packages"]: assert cmd in result.output + def test_finding_subgroup_help(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "finding", "--help"]) + assert result.exit_code == 0 + for cmd in ["resolve", "bulk-resolve", "list", "reset"]: + assert cmd in result.output + + def test_snapshot_subgroup_help(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "snapshot", "--help"]) + assert result.exit_code == 0 + assert "list" in result.output + # --------------------------------------------------------------------------- # scan @@ -142,6 +163,8 @@ def test_default(self): filters=None, search=None, include_tags=None, + include_enrichment=None, + filter_via_state=None, ) def test_filter_repeats_become_list(self): @@ -169,6 +192,8 @@ def test_filter_repeats_become_list(self): filters={"severity": ["HIGH", "CRITICAL"], "platform": ["1"]}, search=None, include_tags=None, + include_enrichment=None, + filter_via_state=None, ) def test_filters_json_merges_with_filter_pairs(self): @@ -211,6 +236,7 @@ def test_search_requires_all_three(self): assert "must be used together" in result.output def test_search_built_correctly(self): + """Search dict uses 'field' key (matches parseSearchOp in extension).""" p1, p2, p3 = _patches() with p1, p2, p3 as cls: result, inst = _invoke( @@ -226,12 +252,34 @@ def test_search_built_correctly(self): assert result.exit_code == 0, result.output kwargs = inst.query_cves.call_args[1] assert kwargs["search"] == { - "search": "cve", "op": "contains", "value": "CVE-2025", + "field": "cve", "op": "contains", "value": "CVE-2025", } + def test_include_enrichment_toggles(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "cve", "list", "--no-include-enrichment"], + cls, + return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + assert inst.query_cves.call_args[1]["include_enrichment"] is False + + def test_filter_via_state_toggles(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "cve", "list", "--no-filter-via-state"], + cls, + return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + assert inst.query_cves.call_args[1]["filter_via_state"] is False + # --------------------------------------------------------------------------- -# cve get / hosts / packages +# cve get / hosts / packages / epss-history # --------------------------------------------------------------------------- @@ -245,7 +293,25 @@ def test_get(self): return_value={"cve": {}}, ) assert result.exit_code == 0, result.output - inst.get_cve.assert_called_once_with("CVE-2021-44228") + inst.get_cve.assert_called_once_with( + "CVE-2021-44228", include_enrichment=None, + ) + + def test_get_no_enrichment(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "get", "CVE-2021-44228", + "--no-include-enrichment", + ], + cls, + return_value={"cve": {}}, + ) + assert result.exit_code == 0, result.output + inst.get_cve.assert_called_once_with( + "CVE-2021-44228", include_enrichment=False, + ) def test_hosts(self): p1, p2, p3 = _patches() @@ -254,6 +320,7 @@ def test_hosts(self): [ "vulnerability", "cve", "hosts", "CVE-2021-44228", "--include-tags", + "--normalized-package-name", "openssl", ], cls, return_value={"hosts": []}, @@ -268,6 +335,8 @@ def test_hosts(self): filters=None, search=None, include_tags=True, + filter_via_state=None, + normalized_package_name="openssl", ) def test_packages(self): @@ -288,6 +357,23 @@ def test_packages(self): limit=None, sort_by="package_name", sort_asc=True, + include_enrichment=None, + ) + + def test_epss_history(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "epss-history", "CVE-2024-1234", + "--days", "30", + ], + cls, + return_value={"history": []}, + ) + assert result.exit_code == 0, result.output + inst.query_epss_history.assert_called_once_with( + "CVE-2024-1234", days=30, ) @@ -318,6 +404,7 @@ def test_host_list(self): filters={"platform": ["1"]}, search=None, include_tags=None, + filter_via_state=None, ) def test_host_packages(self): @@ -327,6 +414,7 @@ def test_host_packages(self): [ "vulnerability", "host", "packages", "sid-1", "--sort-by", "score", + "--rollup-subpackages", ], cls, return_value={"packages": []}, @@ -341,4 +429,233 @@ def test_host_packages(self): filters=None, search=None, include_tags=None, + include_enrichment=None, + filter_via_state=None, + rollup_subpackages=True, + ) + + +# --------------------------------------------------------------------------- +# finding +# --------------------------------------------------------------------------- + + +class TestVulnerabilityFinding: + def test_resolve_by_cve_and_package(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "finding", "resolve", + "--scope", "org", + "--cve", "CVE-2024-1234", + "--normalized-package-name", "chrome", + "--resolution", "accepted", + "--expires-at", "2026-08-12T00:00:00Z", + ], + cls, + return_value={"scope": "org", "fingerprint": "abc"}, + ) + assert result.exit_code == 0, result.output + inst.set_finding_resolution.assert_called_once_with( + scope="org", + fingerprint=None, + cve="CVE-2024-1234", + normalized_package_name="chrome", + sid=None, + resolution="accepted", + expires_at="2026-08-12T00:00:00Z", + case_number=None, + ) + + def test_resolve_reopen_clears_resolution(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "finding", "resolve", + "--scope", "org", + "--fingerprint", "deadbeef", + "--reopen", + ], + cls, + return_value={"scope": "org", "fingerprint": "deadbeef"}, + ) + assert result.exit_code == 0, result.output + inst.set_finding_resolution.assert_called_once_with( + scope="org", + fingerprint="deadbeef", + cve=None, + normalized_package_name=None, + sid=None, + resolution=None, + expires_at=None, + case_number=None, + ) + + def test_resolve_rejects_reopen_with_resolution(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + cls.return_value.set_finding_resolution.return_value = {} + runner = CliRunner() + result = runner.invoke(cli, [ + "vulnerability", "finding", "resolve", + "--scope", "org", + "--fingerprint", "x", + "--resolution", "mitigated", + "--reopen", + ]) + assert result.exit_code != 0 + assert "incompatible" in result.output + + def test_resolve_requires_fingerprint_or_pair(self): + runner = CliRunner() + result = runner.invoke(cli, [ + "vulnerability", "finding", "resolve", "--scope", "org", + "--resolution", "mitigated", + ]) + assert result.exit_code != 0 + assert "fingerprint" in result.output.lower() + + def test_resolve_host_requires_sid(self): + runner = CliRunner() + result = runner.invoke(cli, [ + "vulnerability", "finding", "resolve", "--scope", "host", + "--cve", "CVE-2024-1", "--normalized-package-name", "openssl", + "--resolution", "mitigated", + ]) + assert result.exit_code != 0 + assert "--sid" in result.output + + def test_bulk_resolve(self): + p1, p2, p3 = _patches() + targets = [ + {"scope": "org", "fingerprint": "aaaa"}, + {"scope": "host", "cve": "CVE-2024-1", + "normalized_package_name": "openssl", "sid": "sid-2"}, + ] + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "finding", "bulk-resolve", + "--resolution", "mitigated", + "--targets-json", json.dumps(targets), + ], + cls, + return_value={"applied": 2}, + ) + assert result.exit_code == 0, result.output + inst.bulk_set_finding_resolution.assert_called_once_with( + targets, + resolution="mitigated", + expires_at=None, + case_number=None, + ) + + def test_bulk_resolve_rejects_non_array(self): + runner = CliRunner() + result = runner.invoke(cli, [ + "vulnerability", "finding", "bulk-resolve", + "--targets-json", '{"scope":"org"}', + "--resolution", "mitigated", + ]) + assert result.exit_code != 0 + assert "array" in result.output.lower() + + def test_list_no_filters(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "finding", "list"], + cls, + return_value={"resolutions": []}, + ) + assert result.exit_code == 0, result.output + inst.list_finding_resolutions.assert_called_once_with( + scope=None, resolutions=None, cursor=None, limit=None, + ) + + def test_list_multi_resolution_filter(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "finding", "list", + "--scope", "org", + "--resolution", "accepted", + "--resolution", "mitigated", + "--limit", "50", + ], + cls, + return_value={"resolutions": []}, + ) + assert result.exit_code == 0, result.output + inst.list_finding_resolutions.assert_called_once_with( + scope="org", + resolutions=["accepted", "mitigated"], + cursor=None, + limit=50, + ) + + def test_reset_requires_confirmation(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + # Provide --yes-equivalent via stdin "y" + result, inst = _invoke( + ["vulnerability", "finding", "reset", "--sid", "sid-1"], + cls, + return_value={"sid": "sid-1", "closed": 0}, + stdin="y\n", + ) + assert result.exit_code == 0, result.output + inst.reset_asset_findings.assert_called_once_with("sid-1") + + def test_reset_aborted_when_declined(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "finding", "reset", "--sid", "sid-1"], + cls, + return_value={"sid": "sid-1", "closed": 0}, + stdin="n\n", + ) + assert result.exit_code != 0 + inst.reset_asset_findings.assert_not_called() + + +# --------------------------------------------------------------------------- +# snapshot +# --------------------------------------------------------------------------- + + +class TestVulnerabilitySnapshot: + def test_list_defaults(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "snapshot", "list"], + cls, + return_value={"snapshots": []}, + ) + assert result.exit_code == 0, result.output + inst.query_daily_snapshots.assert_called_once_with( + days=None, severities=None, + ) + + def test_list_with_filters(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "snapshot", "list", + "--days", "90", + "--severity", "critical", + "--severity", "high", + ], + cls, + return_value={"snapshots": []}, + ) + assert result.exit_code == 0, result.output + inst.query_daily_snapshots.assert_called_once_with( + days=90, severities=["critical", "high"], ) diff --git a/tests/unit/test_sdk_vulnerability.py b/tests/unit/test_sdk_vulnerability.py index 9d9850e..c110130 100644 --- a/tests/unit/test_sdk_vulnerability.py +++ b/tests/unit/test_sdk_vulnerability.py @@ -55,8 +55,10 @@ def test_passes_all_options(self, vuln, mock_org): sort_by="severity", sort_asc=True, filters={"severity": ["HIGH", "CRITICAL"]}, - search={"search": "cve", "op": "contains", "value": "CVE-2025"}, + search={"field": "cve", "op": "contains", "value": "CVE-2025"}, include_tags=True, + include_enrichment=False, + filter_via_state=False, ) action, data = _decode(mock_org.client.request.call_args) assert action == "query_cves" @@ -66,8 +68,10 @@ def test_passes_all_options(self, vuln, mock_org): "sort_by": "severity", "sort_asc": True, "filters": {"severity": ["HIGH", "CRITICAL"]}, - "search": {"search": "cve", "op": "contains", "value": "CVE-2025"}, + "search": {"field": "cve", "op": "contains", "value": "CVE-2025"}, "include_tags": True, + "include_enrichment": False, + "filter_via_state": False, } @@ -79,6 +83,13 @@ def test_sends_cve_id(self, vuln, mock_org): assert action == "query_cve" assert data == {"cve_id": "CVE-2021-44228"} + def test_sends_include_enrichment(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.get_cve("CVE-2021-44228", include_enrichment=False) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cve" + assert data == {"cve_id": "CVE-2021-44228", "include_enrichment": False} + class TestQueryCveHosts: def test_required_cve_and_options(self, vuln, mock_org): @@ -88,6 +99,8 @@ def test_required_cve_and_options(self, vuln, mock_org): limit=10, sort_by="hostname", include_tags=True, + normalized_package_name="openssl", + filter_via_state=False, ) action, data = _decode(mock_org.client.request.call_args) assert action == "query_cve_vuln_hosts" @@ -95,6 +108,8 @@ def test_required_cve_and_options(self, vuln, mock_org): assert data["limit"] == 10 assert data["sort_by"] == "hostname" assert data["include_tags"] is True + assert data["normalized_package_name"] == "openssl" + assert data["filter_via_state"] is False class TestQueryCvePackages: @@ -106,6 +121,7 @@ def test_required_cve_and_pagination(self, vuln, mock_org): limit=25, sort_by="count", sort_asc=False, + include_enrichment=False, ) action, data = _decode(mock_org.client.request.call_args) assert action == "query_cve_vuln_packages" @@ -115,6 +131,7 @@ def test_required_cve_and_pagination(self, vuln, mock_org): "limit": 25, "sort_by": "count", "sort_asc": False, + "include_enrichment": False, } @@ -136,6 +153,20 @@ def test_required_sid(self, vuln, mock_org): assert data["sid"] == "sid-1" assert data["sort_by"] == "score" + def test_passes_rollup_and_enrichment(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_host_packages( + "sid-1", + rollup_subpackages=True, + include_enrichment=False, + filter_via_state=False, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_host_vuln_packages" + assert data["rollup_subpackages"] is True + assert data["include_enrichment"] is False + assert data["filter_via_state"] is False + class TestQueryDashboard: def test_default_empty(self, vuln, mock_org): @@ -159,3 +190,104 @@ def test_sends_sid(self, vuln, mock_org): action, data = _decode(mock_org.client.request.call_args) assert action == "scan_packages" assert data == {"sid": "sid-42"} + + +class TestFindingResolution: + def test_set_by_cve_and_package(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.set_finding_resolution( + scope="org", + cve="CVE-2024-1", + normalized_package_name="openssl", + resolution="accepted", + expires_at="2026-12-31T23:59:59Z", + case_number=42, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "set_finding_resolution" + assert data == { + "scope": "org", + "cve": "CVE-2024-1", + "normalized_package_name": "openssl", + "resolution": "accepted", + "expires_at": "2026-12-31T23:59:59Z", + "case_number": 42, + } + + def test_set_reopen_omits_resolution(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.set_finding_resolution(scope="org", fingerprint="deadbeef") + action, data = _decode(mock_org.client.request.call_args) + assert action == "set_finding_resolution" + assert data == {"scope": "org", "fingerprint": "deadbeef"} + assert "resolution" not in data + + def test_bulk_set(self, vuln, mock_org): + mock_org.client.request.return_value = {} + targets = [ + {"scope": "org", "fingerprint": "aaaa"}, + {"scope": "host", "cve": "CVE-2024-1", + "normalized_package_name": "openssl", "sid": "sid-2"}, + ] + vuln.bulk_set_finding_resolution(targets, resolution="mitigated") + action, data = _decode(mock_org.client.request.call_args) + assert action == "bulk_set_finding_resolution" + assert data == {"targets": targets, "resolution": "mitigated"} + + def test_list(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.list_finding_resolutions( + scope="org", resolutions=["accepted"], cursor="c", limit=10, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "list_finding_resolutions" + assert data == { + "scope": "org", + "resolutions": ["accepted"], + "cursor": "c", + "limit": 10, + } + + def test_list_defaults(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.list_finding_resolutions() + action, data = _decode(mock_org.client.request.call_args) + assert action == "list_finding_resolutions" + assert data == {} + + def test_reset_asset_findings(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.reset_asset_findings("sid-9") + action, data = _decode(mock_org.client.request.call_args) + assert action == "reset_asset_findings" + assert data == {"sid": "sid-9"} + + +class TestDailySnapshots: + def test_defaults(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_daily_snapshots() + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_daily_snapshots" + assert data == {} + + def test_with_filters(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_daily_snapshots(days=90, severities=["critical", "high"]) + action, data = _decode(mock_org.client.request.call_args) + assert data == {"days": 90, "severities": ["critical", "high"]} + + +class TestEpssHistory: + def test_defaults(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_epss_history("CVE-2024-1") + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_epss_history" + assert data == {"cve": "CVE-2024-1"} + + def test_with_days(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_epss_history("CVE-2024-1", days=30) + action, data = _decode(mock_org.client.request.call_args) + assert data == {"cve": "CVE-2024-1", "days": 30}