From 61498d72d21c7ef9be34e3860fb8b211f1b1937b Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Mon, 4 May 2026 09:23:00 -0700 Subject: [PATCH 1/3] feat: add vulnerability CLI commands for ext-vulnerability-reporting Adds a dedicated `limacharlie vulnerability` command set that wraps the ext-vulnerability-reporting extension's RPC actions (query_cves, query_cve, query_cve_vuln_hosts, query_cve_vuln_packages, query_endpoints, query_host_vuln_packages, query_dashboard, scan_packages) behind typed SDK and Click surfaces. Subscription management stays under `limacharlie extension`; this group is purely the user-facing query / scan-trigger surface. Co-Authored-By: Claude Opus 4.7 (1M context) --- limacharlie/cli.py | 1 + limacharlie/commands/vulnerability.py | 534 ++++++++++++++++++ limacharlie/sdk/vulnerability.py | 286 ++++++++++ .../unit/test_cli_lazy_loading_regression.py | 4 +- tests/unit/test_cli_vulnerability.py | 348 ++++++++++++ tests/unit/test_sdk_vulnerability.py | 167 ++++++ 6 files changed, 1339 insertions(+), 1 deletion(-) create mode 100644 limacharlie/commands/vulnerability.py create mode 100644 limacharlie/sdk/vulnerability.py create mode 100644 tests/unit/test_cli_vulnerability.py create mode 100644 tests/unit/test_sdk_vulnerability.py diff --git a/limacharlie/cli.py b/limacharlie/cli.py index c37c59b..21f123c 100644 --- a/limacharlie/cli.py +++ b/limacharlie/cli.py @@ -146,6 +146,7 @@ def _config_no_warnings() -> bool: "task": ("task", "group"), "user": ("user", "group"), "usp": ("usp", "group"), + "vulnerability": ("vulnerability", "group"), "yara": ("yara", "group"), } diff --git a/limacharlie/commands/vulnerability.py b/limacharlie/commands/vulnerability.py new file mode 100644 index 0000000..dbc9c7a --- /dev/null +++ b/limacharlie/commands/vulnerability.py @@ -0,0 +1,534 @@ +"""Vulnerability commands for LimaCharlie CLI v2. + +Commands for querying the ``ext-vulnerability-reporting`` extension: +list CVEs, list affected endpoints, get the dashboard, drill into a +single host or CVE, and trigger an on-demand scan of a sensor. + +Subscription management lives under ``limacharlie extension``; this +module is purely the user-facing query surface. +""" + +from __future__ import annotations + +import json +from typing import Any + +import click + +from ..cli import pass_context +from ..client import Client +from ..sdk.organization import Organization +from ..sdk.vulnerability import Vulnerability +from ..output import format_output, detect_output_format +from ..discovery import register_explain + + +# --------------------------------------------------------------------------- +# Explain texts +# --------------------------------------------------------------------------- + +_EXPLAIN_GROUP = """\ +Query the vulnerability index built by ext-vulnerability-reporting. + +The extension keeps a per-org index of OS packages reported by +sensors and joins it against a CVE database. Subscribe the org to +the extension first: + + limacharlie extension subscribe --name ext-vulnerability-reporting + +Then either let the default scheduled rule pull os_packages once a +day from each sensor, or run an on-demand scan with: + + limacharlie vulnerability scan --sid + +Once data has flowed in, use the cve / host / dashboard subcommands +to query it. +""" + +_EXPLAIN_SCAN = """\ +Trigger an on-demand os_packages scan for a single sensor. + +The extension tasks the sensor for its installed packages, joins +them against the CVE database, and stores the findings. Use this +when you want fresh data for one host without waiting for the +daily schedule. + +Use --simulate to skip the sensor task and exercise the scan +pipeline with synthetic packages — useful for end-to-end testing +of detections that consume vulnerability findings. + +Examples: + limacharlie vulnerability scan --sid 11111111-2222-3333-4444-555555555555 + limacharlie vulnerability scan --sid --simulate +""" + +_EXPLAIN_DASHBOARD = """\ +Return the dashboard graphs for the org. + +The response is a list of named graphs (severity counts, top CVEs, +…) that mirror what the LC web UI shows. + +Example: + limacharlie vulnerability dashboard +""" + +_EXPLAIN_CVE_LIST = """\ +List CVEs observed across the org's sensors. + +Pagination, filters, and free-text search are all server-side. +Repeat --filter to add multiple values for the same key (treated +as OR by the extension): + + --filter severity=HIGH --filter severity=CRITICAL + +For free-text search use the three flags together: + + --search-field cve --search-op contains --search-value CVE-2025 + +Sort by 'cve', 'count', or 'severity'. + +Examples: + limacharlie vulnerability cve list + limacharlie vulnerability cve list --sort-by severity --limit 50 + limacharlie vulnerability cve list --filter severity=CRITICAL +""" + +_EXPLAIN_CVE_GET = """\ +Return raw details for a single CVE id. + +The response shape is the upstream NVD record (descriptions, CVSS +metrics, weaknesses, references, configurations). + +Examples: + limacharlie vulnerability cve get CVE-2021-44228 +""" + +_EXPLAIN_CVE_HOSTS = """\ +List hosts in the org that have at least one package vulnerable to +the given CVE. + +Sort by 'hostname' (default) or 'platform_string'. + +Examples: + limacharlie vulnerability cve hosts CVE-2021-44228 + limacharlie vulnerability cve hosts CVE-2021-44228 --include-tags +""" + +_EXPLAIN_CVE_PACKAGES = """\ +List (package_name, package_version) pairs in the org affected by +the given CVE, with a count of distinct sensors that have each +pair installed. + +Sort by 'count' (default), 'package_name', or 'package_version'. + +Examples: + limacharlie vulnerability cve packages CVE-2021-44228 + limacharlie vulnerability cve packages CVE-2021-44228 --sort-by package_name --sort-asc +""" + +_EXPLAIN_HOST_LIST = """\ +List endpoints with their vulnerability counts. + +Sort by 'hostname', 'platform_string', 'count', or 'severity'. + +Examples: + limacharlie vulnerability host list + limacharlie vulnerability host list --sort-by count --filter platform=1 +""" + +_EXPLAIN_HOST_PACKAGES = """\ +List vulnerable packages and their CVEs on a single host. + +Sort by 'cve' (default), 'score', 'severity', 'package_name', or +'package_name_package_version_cve'. + +Examples: + limacharlie vulnerability host packages 11111111-2222-3333-4444-555555555555 + limacharlie vulnerability host packages --sort-by score +""" + +register_explain("vulnerability.scan", _EXPLAIN_SCAN) +register_explain("vulnerability.dashboard", _EXPLAIN_DASHBOARD) +register_explain("vulnerability.cve.list", _EXPLAIN_CVE_LIST) +register_explain("vulnerability.cve.get", _EXPLAIN_CVE_GET) +register_explain("vulnerability.cve.hosts", _EXPLAIN_CVE_HOSTS) +register_explain("vulnerability.cve.packages", _EXPLAIN_CVE_PACKAGES) +register_explain("vulnerability.host.list", _EXPLAIN_HOST_LIST) +register_explain("vulnerability.host.packages", _EXPLAIN_HOST_PACKAGES) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _output(ctx: click.Context, data: Any) -> None: + fmt = ctx.obj.output_format or detect_output_format() + if not ctx.obj.quiet: + click.echo(format_output(data, fmt)) + + +def _get_vuln(ctx: click.Context) -> Vulnerability: + client = Client( + oid=ctx.obj.oid, + environment=ctx.obj.environment, + print_debug_fn=ctx.obj.debug_fn, + debug_full_response=ctx.obj.debug_full, + debug_curl=ctx.obj.debug_curl, + debug_verbose=ctx.obj.debug_verbose, + ) + org = Organization(client) + return Vulnerability(org) + + +def _parse_filters( + filter_pairs: tuple[str, ...], + filters_json: str | None, +) -> dict[str, list[str]] | None: + """Combine ``--filter k=v`` repeats with ``--filters-json`` into one dict. + + The two flags are mutually compatible; if both are given, ``--filter`` + pairs are merged on top of the JSON dict. Returns ``None`` when no + filter input was supplied so the SDK omits the key entirely. + """ + out: dict[str, list[str]] = {} + if filters_json is not None: + try: + parsed = json.loads(filters_json) + except json.JSONDecodeError as exc: + raise click.BadParameter(f"invalid JSON: {exc}", param_hint="--filters-json") + if not isinstance(parsed, dict): + raise click.BadParameter( + "must decode to an object of {field: [values]}", + param_hint="--filters-json", + ) + for k, v in parsed.items(): + if not isinstance(v, list): + raise click.BadParameter( + f"value for {k!r} must be a list of strings", + param_hint="--filters-json", + ) + out[str(k)] = [str(x) for x in v] + for pair in filter_pairs: + if "=" not in pair: + raise click.BadParameter( + f"expected KEY=VALUE, got {pair!r}", param_hint="--filter", + ) + key, value = pair.split("=", 1) + out.setdefault(key, []).append(value) + return out or None + + +def _build_search( + field: str | None, + op: str | None, + value: str | None, +) -> dict[str, Any] | None: + """Build the search dict expected by the extension. + + The extension shape is ``{"search": , "op": "is|contains", + "value": }``. All three flags must be set together, or none. + """ + provided = [x is not None for x in (field, op, value)] + if not any(provided): + return None + if not all(provided): + raise click.UsageError( + "--search-field, --search-op, and --search-value must be used together", + ) + return {"search": field, "op": op, "value": value} + + +# --------------------------------------------------------------------------- +# Shared option values +# --------------------------------------------------------------------------- + +_SEARCH_OP_CHOICES = click.Choice(["is", "contains"], case_sensitive=False) + + +def _common_query_options(f): + """Decorate a command with the shared list-query options. + + Order matters in Click: decorators stack bottom-up, so the option + that should appear first in --help is applied last. + """ + f = click.option( + "--include-tags", is_flag=True, default=False, + help="Include sensor tags on each returned row.", + )(f) + f = click.option( + "--search-value", default=None, + help="Free-text search value (used with --search-field/--search-op).", + )(f) + f = click.option( + "--search-op", default=None, type=_SEARCH_OP_CHOICES, + help="Search operator: 'is' (exact) or 'contains' (substring).", + )(f) + f = click.option( + "--search-field", default=None, + help="Field to free-text search on (e.g. 'cve', 'hostname').", + )(f) + f = click.option( + "--filters-json", default=None, + help="Raw filters as JSON object {field: [values]}. Combinable with --filter.", + )(f) + f = click.option( + "--filter", "filter_pairs", multiple=True, metavar="KEY=VALUE", + help="Filter by KEY=VALUE; repeat for multiple values (OR within a key).", + )(f) + f = click.option( + "--sort-asc", is_flag=True, default=False, + help="Sort ascending (default: descending).", + )(f) + f = click.option( + "--sort-by", default=None, + help="Field to sort by (see command help for valid values).", + )(f) + f = click.option( + "--limit", default=None, type=int, + help="Maximum number of rows to return (default: 100).", + )(f) + f = click.option( + "--cursor", default=None, + help="Pagination cursor returned by a previous call.", + )(f) + return f + + +# --------------------------------------------------------------------------- +# Group +# --------------------------------------------------------------------------- + +@click.group("vulnerability") +def group() -> None: + """Query vulnerability data (ext-vulnerability-reporting). + + Provides CVE, host, and dashboard views over the per-org + vulnerability index, plus an on-demand scan trigger. + + \b + Subgroups / commands: + scan Trigger an on-demand os_packages scan + dashboard Per-org vulnerability dashboard graphs + cve list List CVEs observed in the org + cve get Get raw CVE details + cve hosts Hosts affected by a CVE + cve packages Packages affected by a CVE + host list Endpoints with vulnerability counts + host packages Vulnerable packages on a single host + """ + + +# --------------------------------------------------------------------------- +# scan +# --------------------------------------------------------------------------- + +@group.command("scan") +@click.option("--sid", required=True, help="Sensor ID to scan.") +@click.option( + "--simulate", is_flag=True, default=False, + help="Skip the sensor task and exercise the pipeline with simulated packages.", +) +@pass_context +def scan(ctx, sid, simulate) -> None: + """Trigger an on-demand os_packages scan for a sensor. + + \b + Examples: + limacharlie vulnerability scan --sid 11111111-2222-3333-4444-555555555555 + limacharlie vulnerability scan --sid --simulate + """ + v = _get_vuln(ctx) + data = v.scan(sid, simulate=simulate) + _output(ctx, data) + + +# --------------------------------------------------------------------------- +# dashboard +# --------------------------------------------------------------------------- + +@group.command("dashboard") +@click.option("--sort-asc", is_flag=True, default=False, help="Sort ascending.") +@pass_context +def dashboard(ctx, sort_asc) -> None: + """Return the per-org vulnerability dashboard graphs. + + \b + Example: + limacharlie vulnerability dashboard + """ + v = _get_vuln(ctx) + data = v.query_dashboard(sort_asc=sort_asc or None) + _output(ctx, data) + + +# --------------------------------------------------------------------------- +# cve subgroup +# --------------------------------------------------------------------------- + +@group.group("cve") +def cve_group() -> None: + """CVE-centric views: list, get, hosts affected, packages affected.""" + + +@cve_group.command("list") +@_common_query_options +@pass_context +def cve_list(ctx, cursor, limit, sort_by, sort_asc, + filter_pairs, filters_json, + search_field, search_op, search_value, include_tags) -> None: + """List CVEs observed across the org. + + Valid --sort-by values: cve, count, severity. + + \b + Examples: + limacharlie vulnerability cve list --sort-by severity --limit 50 + limacharlie vulnerability cve list --filter severity=CRITICAL --filter severity=HIGH + """ + v = _get_vuln(ctx) + data = v.query_cves( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc or None, + filters=_parse_filters(filter_pairs, filters_json), + search=_build_search(search_field, search_op, search_value), + include_tags=include_tags or None, + ) + _output(ctx, data) + + +@cve_group.command("get") +@click.argument("cve_id") +@pass_context +def cve_get(ctx, cve_id) -> None: + """Return raw details for CVE_ID (e.g. CVE-2021-44228). + + \b + Example: + limacharlie vulnerability cve get CVE-2021-44228 + """ + v = _get_vuln(ctx) + data = v.get_cve(cve_id) + _output(ctx, data) + + +@cve_group.command("hosts") +@click.argument("cve_id") +@_common_query_options +@pass_context +def cve_hosts(ctx, cve_id, cursor, limit, sort_by, sort_asc, + filter_pairs, filters_json, + search_field, search_op, search_value, include_tags) -> None: + """List hosts affected by CVE_ID. + + Valid --sort-by values: hostname (default), platform_string. + + \b + Example: + limacharlie vulnerability cve hosts CVE-2021-44228 --include-tags + """ + v = _get_vuln(ctx) + data = v.query_cve_hosts( + cve_id, + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc or None, + filters=_parse_filters(filter_pairs, filters_json), + search=_build_search(search_field, search_op, search_value), + include_tags=include_tags or None, + ) + _output(ctx, data) + + +@cve_group.command("packages") +@click.argument("cve_id") +@click.option("--cursor", default=None, help="Pagination cursor returned by a previous call.") +@click.option("--limit", default=None, type=int, help="Maximum number of rows to return (default: 100).") +@click.option("--sort-by", default=None, + help="Field to sort by: count (default), package_name, package_version.") +@click.option("--sort-asc", is_flag=True, default=False, help="Sort ascending.") +@pass_context +def cve_packages(ctx, cve_id, cursor, limit, sort_by, sort_asc) -> None: + """List (package_name, package_version) pairs affected by CVE_ID. + + \b + Example: + limacharlie vulnerability cve packages CVE-2021-44228 --sort-by count + """ + v = _get_vuln(ctx) + data = v.query_cve_packages( + cve_id, + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc or None, + ) + _output(ctx, data) + + +# --------------------------------------------------------------------------- +# host subgroup +# --------------------------------------------------------------------------- + +@group.group("host") +def host_group() -> None: + """Endpoint-centric views: list endpoints, packages on a single host.""" + + +@host_group.command("list") +@_common_query_options +@pass_context +def host_list(ctx, cursor, limit, sort_by, sort_asc, + filter_pairs, filters_json, + search_field, search_op, search_value, include_tags) -> None: + """List endpoints with their vulnerability counts. + + Valid --sort-by values: hostname, platform_string, count, severity. + + \b + Example: + limacharlie vulnerability host list --sort-by count --filter platform=1 + """ + v = _get_vuln(ctx) + data = v.query_endpoints( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc or None, + filters=_parse_filters(filter_pairs, filters_json), + search=_build_search(search_field, search_op, search_value), + include_tags=include_tags or None, + ) + _output(ctx, data) + + +@host_group.command("packages") +@click.argument("sid") +@_common_query_options +@pass_context +def host_packages(ctx, sid, cursor, limit, sort_by, sort_asc, + filter_pairs, filters_json, + search_field, search_op, search_value, include_tags) -> None: + """List vulnerable packages and their CVEs on host SID. + + Valid --sort-by values: cve (default), score, severity, package_name, + package_name_package_version_cve. + + \b + Example: + limacharlie vulnerability host packages --sort-by score + """ + v = _get_vuln(ctx) + data = v.query_host_packages( + sid, + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc or None, + filters=_parse_filters(filter_pairs, filters_json), + search=_build_search(search_field, search_op, search_value), + include_tags=include_tags or None, + ) + _output(ctx, data) diff --git a/limacharlie/sdk/vulnerability.py b/limacharlie/sdk/vulnerability.py new file mode 100644 index 0000000..2d87988 --- /dev/null +++ b/limacharlie/sdk/vulnerability.py @@ -0,0 +1,286 @@ +"""Vulnerability reporting SDK for LimaCharlie v2. + +Wraps the ext-vulnerability-reporting extension. The extension keeps a +per-org index of OS packages reported by sensors and joins it against +a CVE database, giving per-CVE / per-host vulnerability views and a +dashboard summary. + +The extension exposes a small set of RPC actions over the standard +extension request channel; this module is a thin typed shim around +:class:`limacharlie.sdk.extensions.Extensions.request` for those. +""" + +from __future__ import annotations + +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + from .organization import Organization + +from .extensions import Extensions + +_EXTENSION_NAME = "ext-vulnerability-reporting" + + +def _build_query( + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + filters: dict[str, list[str]] | None = None, + search: dict[str, Any] | None = None, + include_tags: bool | None = None, +) -> dict[str, Any]: + """Assemble the optional query parameters shared by the list endpoints. + + The extension treats missing keys as defaults (empty cursor, default + limit, default sort), so we only include keys the caller set. + """ + data: dict[str, Any] = {} + if cursor is not None: + data["cursor"] = cursor + if limit is not None: + data["limit"] = limit + if sort_by is not None: + data["sort_by"] = sort_by + if sort_asc is not None: + data["sort_asc"] = sort_asc + if filters is not None: + data["filters"] = filters + if search is not None: + data["search"] = search + if include_tags is not None: + data["include_tags"] = include_tags + return data + + +class Vulnerability: + """Vulnerability reporting client for LimaCharlie.""" + + def __init__(self, org: Organization) -> None: + self._org = org + + @property + def oid(self) -> str: + return self._org.oid + + # ------------------------------------------------------------------ + # CVE-centric views + # ------------------------------------------------------------------ + + def query_cves( + self, + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + filters: dict[str, list[str]] | None = None, + search: dict[str, Any] | None = None, + include_tags: bool | None = None, + ) -> dict[str, Any]: + """List CVEs observed across the org. + + Args: + cursor: Pagination cursor returned by a previous call. + limit: Maximum number of rows to return (default 100). + sort_by: One of ``cve``, ``count``, ``severity``. + sort_asc: Sort ascending instead of descending. + filters: ``{field: [values]}`` filter map (e.g. ``{"severity": ["HIGH","CRITICAL"]}``). + search: ``{"search": field, "op": "is|contains", "value": str}`` substring search. + include_tags: Include sensor tags on returned rows. + + Returns: + ``{"results": [...], "next_cursor": str, "total_return_count": int}``. + """ + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, + "query_cves", + data=_build_query( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc, + filters=filters, + search=search, + include_tags=include_tags, + ), + ) + + def get_cve(self, cve_id: str) -> dict[str, Any]: + """Return raw details for a single CVE id (e.g. ``CVE-2021-44228``).""" + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, + "query_cve", + data={"cve_id": cve_id}, + ) + + def query_cve_hosts( + self, + cve: str, + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + filters: dict[str, list[str]] | None = None, + search: dict[str, Any] | None = None, + include_tags: bool | None = None, + ) -> dict[str, Any]: + """List hosts in the org affected by ``cve``. + + Args: + cve: CVE id (e.g. ``CVE-2021-44228``). + cursor, limit, sort_by, sort_asc: pagination + ordering. + ``sort_by`` accepts ``hostname`` (default) or ``platform_string``. + filters, search, include_tags: see :meth:`query_cves`. + + Returns: + ``{"hosts": [...], "cursor": str, "total": int}``. + """ + data = _build_query( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc, + filters=filters, + search=search, + include_tags=include_tags, + ) + data["cve"] = cve + ext = Extensions(self._org) + return ext.request(_EXTENSION_NAME, "query_cve_vuln_hosts", data=data) + + def query_cve_packages( + self, + cve: str, + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + ) -> dict[str, Any]: + """List ``(package_name, package_version)`` pairs in the org affected by ``cve``. + + ``sort_by`` accepts ``count`` (default), ``package_name``, or + ``package_version``. + + Returns: + ``{"packages": [...], "cursor": str, "total": int}``. + """ + data: dict[str, Any] = {"cve": cve} + if cursor is not None: + data["cursor"] = cursor + if limit is not None: + data["limit"] = limit + if sort_by is not None: + data["sort_by"] = sort_by + if sort_asc is not None: + data["sort_asc"] = sort_asc + ext = Extensions(self._org) + return ext.request(_EXTENSION_NAME, "query_cve_vuln_packages", data=data) + + # ------------------------------------------------------------------ + # Endpoint-centric views + # ------------------------------------------------------------------ + + def query_endpoints( + self, + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + filters: dict[str, list[str]] | None = None, + search: dict[str, Any] | None = None, + include_tags: bool | None = None, + ) -> dict[str, Any]: + """List endpoints with vulnerability counts. + + ``sort_by`` accepts ``hostname``, ``platform_string``, ``count``, or + ``severity``. + + Returns: + ``{"endpoints": [...], "cursor": str, "total_return_count": int}``. + """ + ext = Extensions(self._org) + return ext.request( + _EXTENSION_NAME, + "query_endpoints", + data=_build_query( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc, + filters=filters, + search=search, + include_tags=include_tags, + ), + ) + + def query_host_packages( + self, + sid: str, + *, + cursor: str | None = None, + limit: int | None = None, + sort_by: str | None = None, + sort_asc: bool | None = None, + filters: dict[str, list[str]] | None = None, + search: dict[str, Any] | None = None, + include_tags: bool | None = None, + ) -> dict[str, Any]: + """List vulnerable packages and their CVEs on a single host. + + ``sort_by`` accepts ``cve``, ``score``, ``severity``, + ``package_name``, or ``package_name_package_version_cve``. + + Returns: + ``{"packages": [...], "cursor": str, "total": int}``. + """ + data = _build_query( + cursor=cursor, + limit=limit, + sort_by=sort_by, + sort_asc=sort_asc, + filters=filters, + search=search, + include_tags=include_tags, + ) + data["sid"] = sid + ext = Extensions(self._org) + return ext.request(_EXTENSION_NAME, "query_host_vuln_packages", data=data) + + # ------------------------------------------------------------------ + # Dashboard + # ------------------------------------------------------------------ + + def query_dashboard(self, *, sort_asc: bool | None = None) -> dict[str, Any]: + """Return the per-org dashboard graphs (severity counts, top CVEs, …).""" + data: dict[str, Any] = {} + if sort_asc is not None: + data["sort_asc"] = sort_asc + ext = Extensions(self._org) + return ext.request(_EXTENSION_NAME, "query_dashboard", data=data) + + # ------------------------------------------------------------------ + # Scan trigger + # ------------------------------------------------------------------ + + def scan(self, sid: str, *, simulate: bool = False) -> dict[str, Any]: + """Trigger an on-demand os_packages scan for a sensor. + + Args: + sid: Sensor id to scan. + simulate: If True, the extension returns simulated CVE matches + without tasking the sensor (useful for end-to-end testing). + + Returns: + ``{"message": str, "sid": str}``. + """ + data: dict[str, Any] = {"sid": sid, "simulate": bool(simulate)} + ext = Extensions(self._org) + return ext.request(_EXTENSION_NAME, "scan_packages", data=data) diff --git a/tests/unit/test_cli_lazy_loading_regression.py b/tests/unit/test_cli_lazy_loading_regression.py index fddce0f..63ad63b 100644 --- a/tests/unit/test_cli_lazy_loading_regression.py +++ b/tests/unit/test_cli_lazy_loading_regression.py @@ -52,7 +52,7 @@ "ingestion-key", "installation-key", "integrity", "ioc", "job", "logging", "lookup", "note", "org", "output", "payload", "playbook", "replay", "schema", "search", "secret", "sensor", "sop", "spotcheck", "stream", - "sync", "tag", "task", "user", "usp", "yara", + "sync", "tag", "task", "user", "usp", "vulnerability", "yara", }) # Module filename -> (attribute name, Click command name). @@ -110,6 +110,7 @@ "task": ("group", "task"), "user": ("group", "user"), "usp": ("group", "usp"), + "vulnerability": ("group", "vulnerability"), "yara": ("group", "yara"), } @@ -210,6 +211,7 @@ }), "user": frozenset({"invite", "list", "permissions", "remove"}), "usp": frozenset({"validate"}), + "vulnerability": frozenset({"cve", "dashboard", "host", "scan"}), "yara": frozenset({ "rule-add", "rule-delete", "rules-list", "scan", "source-add", "source-delete", "source-get", "sources-list", diff --git a/tests/unit/test_cli_vulnerability.py b/tests/unit/test_cli_vulnerability.py new file mode 100644 index 0000000..5b4cb99 --- /dev/null +++ b/tests/unit/test_cli_vulnerability.py @@ -0,0 +1,348 @@ +"""Tests for limacharlie vulnerability CLI commands.""" + +from unittest.mock import patch, MagicMock + +from click.testing import CliRunner + +from limacharlie.cli import cli + + +def _patches(): + return ( + patch("limacharlie.commands.vulnerability.Client"), + patch("limacharlie.commands.vulnerability.Organization"), + patch("limacharlie.commands.vulnerability.Vulnerability"), + ) + + +def _invoke(args, mock_vuln_cls, return_value=None): + """Run the CLI with a mocked Vulnerability instance. + + All SDK methods on the mocked instance return ``return_value`` so the + CLI can render output without us needing to know which method got + called from the test set-up. + """ + inst = MagicMock() + mock_vuln_cls.return_value = inst + if return_value is not None: + for name in [ + "scan", "query_dashboard", + "query_cves", "get_cve", + "query_cve_hosts", "query_cve_packages", + "query_endpoints", "query_host_packages", + ]: + getattr(inst, name).return_value = return_value + runner = CliRunner() + result = runner.invoke(cli, ["--output", "json"] + args) + return result, inst + + +# --------------------------------------------------------------------------- +# Help +# --------------------------------------------------------------------------- + + +class TestVulnerabilityHelp: + def test_root_help_lists_subcommands(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "--help"]) + assert result.exit_code == 0 + for cmd in ["scan", "dashboard", "cve", "host"]: + assert cmd in result.output + + def test_cve_subgroup_help(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "cve", "--help"]) + assert result.exit_code == 0 + for cmd in ["list", "get", "hosts", "packages"]: + assert cmd in result.output + + def test_host_subgroup_help(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "host", "--help"]) + assert result.exit_code == 0 + for cmd in ["list", "packages"]: + assert cmd in result.output + + +# --------------------------------------------------------------------------- +# scan +# --------------------------------------------------------------------------- + + +class TestVulnerabilityScan: + def test_scan_basic(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "scan", "--sid", "sid-1"], + cls, + return_value={"message": "ok", "sid": "sid-1"}, + ) + assert result.exit_code == 0, result.output + inst.scan.assert_called_once_with("sid-1", simulate=False) + + def test_scan_simulate(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "scan", "--sid", "sid-1", "--simulate"], + cls, + return_value={"message": "ok", "sid": "sid-1"}, + ) + assert result.exit_code == 0, result.output + inst.scan.assert_called_once_with("sid-1", simulate=True) + + def test_scan_requires_sid(self): + runner = CliRunner() + result = runner.invoke(cli, ["vulnerability", "scan"]) + assert result.exit_code != 0 + + +# --------------------------------------------------------------------------- +# dashboard +# --------------------------------------------------------------------------- + + +class TestVulnerabilityDashboard: + def test_dashboard_default(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "dashboard"], cls, return_value={"values": []}, + ) + assert result.exit_code == 0, result.output + inst.query_dashboard.assert_called_once_with(sort_asc=None) + + def test_dashboard_sort_asc(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "dashboard", "--sort-asc"], cls, + return_value={"values": []}, + ) + assert result.exit_code == 0, result.output + inst.query_dashboard.assert_called_once_with(sort_asc=True) + + +# --------------------------------------------------------------------------- +# cve list +# --------------------------------------------------------------------------- + + +class TestVulnerabilityCveList: + def test_default(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "cve", "list"], cls, return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + inst.query_cves.assert_called_once_with( + cursor=None, + limit=None, + sort_by=None, + sort_asc=None, + filters=None, + search=None, + include_tags=None, + ) + + def test_filter_repeats_become_list(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "list", + "--filter", "severity=HIGH", + "--filter", "severity=CRITICAL", + "--filter", "platform=1", + "--limit", "50", + "--sort-by", "severity", + "--sort-asc", + ], + cls, + return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + inst.query_cves.assert_called_once_with( + cursor=None, + limit=50, + sort_by="severity", + sort_asc=True, + filters={"severity": ["HIGH", "CRITICAL"], "platform": ["1"]}, + search=None, + include_tags=None, + ) + + def test_filters_json_merges_with_filter_pairs(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "list", + "--filters-json", '{"severity": ["LOW"]}', + "--filter", "platform=1", + ], + cls, + return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + kwargs = inst.query_cves.call_args[1] + assert kwargs["filters"] == {"severity": ["LOW"], "platform": ["1"]} + + def test_invalid_filter_pair_rejected(self): + runner = CliRunner() + result = runner.invoke( + cli, ["vulnerability", "cve", "list", "--filter", "no-equals-here"], + ) + assert result.exit_code != 0 + assert "KEY=VALUE" in result.output + + def test_search_requires_all_three(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + cls.return_value.query_cves.return_value = {"results": []} + runner = CliRunner() + result = runner.invoke( + cli, + [ + "vulnerability", "cve", "list", + "--search-field", "cve", + ], + ) + assert result.exit_code != 0 + assert "must be used together" in result.output + + def test_search_built_correctly(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "list", + "--search-field", "cve", + "--search-op", "contains", + "--search-value", "CVE-2025", + ], + cls, + return_value={"results": []}, + ) + assert result.exit_code == 0, result.output + kwargs = inst.query_cves.call_args[1] + assert kwargs["search"] == { + "search": "cve", "op": "contains", "value": "CVE-2025", + } + + +# --------------------------------------------------------------------------- +# cve get / hosts / packages +# --------------------------------------------------------------------------- + + +class TestVulnerabilityCveDrilldowns: + def test_get(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + ["vulnerability", "cve", "get", "CVE-2021-44228"], + cls, + return_value={"cve": {}}, + ) + assert result.exit_code == 0, result.output + inst.get_cve.assert_called_once_with("CVE-2021-44228") + + def test_hosts(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "hosts", "CVE-2021-44228", + "--include-tags", + ], + cls, + return_value={"hosts": []}, + ) + assert result.exit_code == 0, result.output + inst.query_cve_hosts.assert_called_once_with( + "CVE-2021-44228", + cursor=None, + limit=None, + sort_by=None, + sort_asc=None, + filters=None, + search=None, + include_tags=True, + ) + + def test_packages(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "cve", "packages", "CVE-2021-44228", + "--sort-by", "package_name", "--sort-asc", + ], + cls, + return_value={"packages": []}, + ) + assert result.exit_code == 0, result.output + inst.query_cve_packages.assert_called_once_with( + "CVE-2021-44228", + cursor=None, + limit=None, + sort_by="package_name", + sort_asc=True, + ) + + +# --------------------------------------------------------------------------- +# host +# --------------------------------------------------------------------------- + + +class TestVulnerabilityHost: + def test_host_list(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "host", "list", + "--sort-by", "count", + "--filter", "platform=1", + ], + cls, + return_value={"endpoints": []}, + ) + assert result.exit_code == 0, result.output + inst.query_endpoints.assert_called_once_with( + cursor=None, + limit=None, + sort_by="count", + sort_asc=None, + filters={"platform": ["1"]}, + search=None, + include_tags=None, + ) + + def test_host_packages(self): + p1, p2, p3 = _patches() + with p1, p2, p3 as cls: + result, inst = _invoke( + [ + "vulnerability", "host", "packages", "sid-1", + "--sort-by", "score", + ], + cls, + return_value={"packages": []}, + ) + assert result.exit_code == 0, result.output + inst.query_host_packages.assert_called_once_with( + "sid-1", + cursor=None, + limit=None, + sort_by="score", + sort_asc=None, + filters=None, + search=None, + include_tags=None, + ) diff --git a/tests/unit/test_sdk_vulnerability.py b/tests/unit/test_sdk_vulnerability.py new file mode 100644 index 0000000..d7da52c --- /dev/null +++ b/tests/unit/test_sdk_vulnerability.py @@ -0,0 +1,167 @@ +"""Tests for limacharlie.sdk.vulnerability module.""" + +import base64 +import gzip +import json +from unittest.mock import MagicMock + +import pytest + +from limacharlie.sdk.vulnerability import Vulnerability, _EXTENSION_NAME + + +@pytest.fixture +def mock_org(): + org = MagicMock() + org.oid = "test-oid" + org.client = MagicMock() + org.client._jwt = "fake-jwt-token" + return org + + +@pytest.fixture +def vuln(mock_org): + return Vulnerability(mock_org) + + +def _decode(call_args) -> tuple[str, dict]: + """Pull (action, decoded_data) out of a mocked Extensions.request call.""" + params = call_args[1]["params"] + decoded = json.loads(gzip.decompress(base64.b64decode(params["gzdata"]))) + return params["action"], decoded + + +class TestExtensionTarget: + def test_extension_name_constant(self): + assert _EXTENSION_NAME == "ext-vulnerability-reporting" + + def test_oid_property(self, vuln, mock_org): + assert vuln.oid == mock_org.oid + + +class TestQueryCves: + def test_default_omits_unset_keys(self, vuln, mock_org): + mock_org.client.request.return_value = {"results": []} + vuln.query_cves() + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cves" + assert data == {} + + def test_passes_all_options(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_cves( + cursor="c1", + limit=50, + sort_by="severity", + sort_asc=True, + filters={"severity": ["HIGH", "CRITICAL"]}, + search={"search": "cve", "op": "contains", "value": "CVE-2025"}, + include_tags=True, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cves" + assert data == { + "cursor": "c1", + "limit": 50, + "sort_by": "severity", + "sort_asc": True, + "filters": {"severity": ["HIGH", "CRITICAL"]}, + "search": {"search": "cve", "op": "contains", "value": "CVE-2025"}, + "include_tags": True, + } + + +class TestGetCve: + def test_sends_cve_id(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.get_cve("CVE-2021-44228") + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cve" + assert data == {"cve_id": "CVE-2021-44228"} + + +class TestQueryCveHosts: + def test_required_cve_and_options(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_cve_hosts( + "CVE-2021-44228", + limit=10, + sort_by="hostname", + include_tags=True, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cve_vuln_hosts" + assert data["cve"] == "CVE-2021-44228" + assert data["limit"] == 10 + assert data["sort_by"] == "hostname" + assert data["include_tags"] is True + + +class TestQueryCvePackages: + def test_required_cve_and_pagination(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_cve_packages( + "CVE-2021-44228", + cursor="abc", + limit=25, + sort_by="count", + sort_asc=False, + ) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_cve_vuln_packages" + assert data == { + "cve": "CVE-2021-44228", + "cursor": "abc", + "limit": 25, + "sort_by": "count", + "sort_asc": False, + } + + +class TestQueryEndpoints: + def test_passes_options(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_endpoints(sort_by="count", filters={"platform": ["1"]}) + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_endpoints" + assert data == {"sort_by": "count", "filters": {"platform": ["1"]}} + + +class TestQueryHostPackages: + def test_required_sid(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_host_packages("sid-1", sort_by="score") + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_host_vuln_packages" + assert data["sid"] == "sid-1" + assert data["sort_by"] == "score" + + +class TestQueryDashboard: + def test_default_empty(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_dashboard() + action, data = _decode(mock_org.client.request.call_args) + assert action == "query_dashboard" + assert data == {} + + def test_sort_asc(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.query_dashboard(sort_asc=True) + action, data = _decode(mock_org.client.request.call_args) + assert data == {"sort_asc": True} + + +class TestScan: + def test_default_simulate_false(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.scan("sid-42") + action, data = _decode(mock_org.client.request.call_args) + assert action == "scan_packages" + assert data == {"sid": "sid-42", "simulate": False} + + def test_simulate_true(self, vuln, mock_org): + mock_org.client.request.return_value = {} + vuln.scan("sid-42", simulate=True) + action, data = _decode(mock_org.client.request.call_args) + assert data == {"sid": "sid-42", "simulate": True} From 717d7c9136ac1e47eeaf3e73e8c80b73f0815839 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Mon, 4 May 2026 10:14:30 -0700 Subject: [PATCH 2/3] feat: add unwrap=False flag to Extensions.request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in unwrap parameter to limacharlie.sdk.extensions.Extensions.request that returns the inner data field of the standard {data, error, retry} extension envelope instead of the whole response. Default stays False to keep existing callers (feedback, cases, configs, the extension request CLI) byte-compatible. The new vulnerability SDK opts in everywhere so its CLI surfaces clean payloads — JMESPath filters can now address fields directly (e.g. 'results[*].cve' instead of 'data.results[*].cve') and --output table renders rows instead of '{N keys}'. Co-Authored-By: Claude Opus 4.7 (1M context) --- limacharlie/sdk/extensions.py | 17 ++++++++++++++--- limacharlie/sdk/vulnerability.py | 13 ++++++++----- tests/unit/test_sdk_extensions.py | 18 ++++++++++++++++++ 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/limacharlie/sdk/extensions.py b/limacharlie/sdk/extensions.py index 6c00600..7d1ae96 100644 --- a/limacharlie/sdk/extensions.py +++ b/limacharlie/sdk/extensions.py @@ -96,7 +96,7 @@ def delete(self, name: str) -> dict[str, Any]: return self._org.client.request("DELETE", f"extension/definition/{name}") def request(self, extension_name: str, action: str, data: dict[str, Any] | None = None, - is_impersonated: bool = False) -> dict[str, Any]: + is_impersonated: bool = False, unwrap: bool = False) -> dict[str, Any]: """Call an extension. Args: @@ -104,9 +104,17 @@ def request(self, extension_name: str, action: str, data: dict[str, Any] | None action: Action to invoke. data: Request data dict. is_impersonated: If True, impersonate the caller. + unwrap: If True, return just the ``data`` field of the + extension's ``common.Response`` envelope (``{data, error, + retry}``) instead of the whole envelope. Application-level + errors (non-empty ``error``) are surfaced as a non-200 by + the API gateway and therefore raise before reaching here, + so unwrap is safe on the success path. Defaults to + ``False`` to preserve byte-compatibility with existing + callers; new typed SDK wrappers should opt in. Returns: - dict: Extension response. + dict: Extension response (envelope or unwrapped ``data``). """ if data is None: data = {} @@ -120,6 +128,9 @@ def request(self, extension_name: str, action: str, data: dict[str, Any] | None if client._jwt is None: client.refresh_jwt() params["impersonator_jwt"] = client._jwt - return self._org.client.request( + resp = self._org.client.request( "POST", f"extension/request/{extension_name}", params=params ) + if unwrap and isinstance(resp, dict) and "data" in resp: + return resp["data"] + return resp diff --git a/limacharlie/sdk/vulnerability.py b/limacharlie/sdk/vulnerability.py index 2d87988..ac88138 100644 --- a/limacharlie/sdk/vulnerability.py +++ b/limacharlie/sdk/vulnerability.py @@ -107,6 +107,7 @@ def query_cves( search=search, include_tags=include_tags, ), + unwrap=True, ) def get_cve(self, cve_id: str) -> dict[str, Any]: @@ -116,6 +117,7 @@ def get_cve(self, cve_id: str) -> dict[str, Any]: _EXTENSION_NAME, "query_cve", data={"cve_id": cve_id}, + unwrap=True, ) def query_cve_hosts( @@ -152,7 +154,7 @@ def query_cve_hosts( ) data["cve"] = cve ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "query_cve_vuln_hosts", data=data) + return ext.request(_EXTENSION_NAME, "query_cve_vuln_hosts", data=data, unwrap=True) def query_cve_packages( self, @@ -181,7 +183,7 @@ def query_cve_packages( if sort_asc is not None: data["sort_asc"] = sort_asc ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "query_cve_vuln_packages", data=data) + return ext.request(_EXTENSION_NAME, "query_cve_vuln_packages", data=data, unwrap=True) # ------------------------------------------------------------------ # Endpoint-centric views @@ -219,6 +221,7 @@ def query_endpoints( search=search, include_tags=include_tags, ), + unwrap=True, ) def query_host_packages( @@ -252,7 +255,7 @@ def query_host_packages( ) data["sid"] = sid ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "query_host_vuln_packages", data=data) + return ext.request(_EXTENSION_NAME, "query_host_vuln_packages", data=data, unwrap=True) # ------------------------------------------------------------------ # Dashboard @@ -264,7 +267,7 @@ def query_dashboard(self, *, sort_asc: bool | None = None) -> dict[str, Any]: if sort_asc is not None: data["sort_asc"] = sort_asc ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "query_dashboard", data=data) + return ext.request(_EXTENSION_NAME, "query_dashboard", data=data, unwrap=True) # ------------------------------------------------------------------ # Scan trigger @@ -283,4 +286,4 @@ def scan(self, sid: str, *, simulate: bool = False) -> dict[str, Any]: """ data: dict[str, Any] = {"sid": sid, "simulate": bool(simulate)} ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "scan_packages", data=data) + return ext.request(_EXTENSION_NAME, "scan_packages", data=data, unwrap=True) diff --git a/tests/unit/test_sdk_extensions.py b/tests/unit/test_sdk_extensions.py index 8847cb9..cfa7621 100644 --- a/tests/unit/test_sdk_extensions.py +++ b/tests/unit/test_sdk_extensions.py @@ -155,3 +155,21 @@ def test_impersonated_refreshes_jwt_if_none(self, ext, mock_org): mock_org.client.request.return_value = {} ext.request("ext-zeek", "get_logs", {}, is_impersonated=True) mock_org.client.refresh_jwt.assert_called_once() + + def test_unwrap_default_false_returns_envelope(self, ext, mock_org): + envelope = {"data": {"x": 1}, "error": "", "retry": False} + mock_org.client.request.return_value = envelope + assert ext.request("ext-zeek", "ping") == envelope + + def test_unwrap_true_returns_data_field(self, ext, mock_org): + mock_org.client.request.return_value = { + "data": {"x": 1}, "error": "", "retry": False, + } + assert ext.request("ext-zeek", "ping", unwrap=True) == {"x": 1} + + def test_unwrap_true_with_no_data_key_returns_envelope(self, ext, mock_org): + # Defensive: if the API ever returns something that isn't the + # standard envelope, unwrap=True must not crash — just hand back + # whatever we got. + mock_org.client.request.return_value = {"unexpected": "shape"} + assert ext.request("ext-zeek", "ping", unwrap=True) == {"unexpected": "shape"} From a206da93e84afc0916550c40648b742e6e512c92 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Mon, 4 May 2026 11:26:08 -0700 Subject: [PATCH 3/3] refactor: drop --simulate from vulnerability scan The simulate flag was a debug knob from the extension's testing pipeline; it doesn't belong in the user-facing CLI. Co-Authored-By: Claude Opus 4.7 (1M context) --- limacharlie/commands/vulnerability.py | 16 +++------------- limacharlie/sdk/vulnerability.py | 9 ++++----- tests/unit/test_cli_vulnerability.py | 20 ++++++++------------ tests/unit/test_sdk_vulnerability.py | 10 ++-------- 4 files changed, 17 insertions(+), 38 deletions(-) diff --git a/limacharlie/commands/vulnerability.py b/limacharlie/commands/vulnerability.py index dbc9c7a..c0bf4a6 100644 --- a/limacharlie/commands/vulnerability.py +++ b/limacharlie/commands/vulnerability.py @@ -53,13 +53,8 @@ when you want fresh data for one host without waiting for the daily schedule. -Use --simulate to skip the sensor task and exercise the scan -pipeline with synthetic packages — useful for end-to-end testing -of detections that consume vulnerability findings. - Examples: limacharlie vulnerability scan --sid 11111111-2222-3333-4444-555555555555 - limacharlie vulnerability scan --sid --simulate """ _EXPLAIN_DASHBOARD = """\ @@ -324,21 +319,16 @@ def group() -> None: @group.command("scan") @click.option("--sid", required=True, help="Sensor ID to scan.") -@click.option( - "--simulate", is_flag=True, default=False, - help="Skip the sensor task and exercise the pipeline with simulated packages.", -) @pass_context -def scan(ctx, sid, simulate) -> None: +def scan(ctx, sid) -> None: """Trigger an on-demand os_packages scan for a sensor. \b - Examples: + Example: limacharlie vulnerability scan --sid 11111111-2222-3333-4444-555555555555 - limacharlie vulnerability scan --sid --simulate """ v = _get_vuln(ctx) - data = v.scan(sid, simulate=simulate) + data = v.scan(sid) _output(ctx, data) diff --git a/limacharlie/sdk/vulnerability.py b/limacharlie/sdk/vulnerability.py index ac88138..15296ea 100644 --- a/limacharlie/sdk/vulnerability.py +++ b/limacharlie/sdk/vulnerability.py @@ -273,17 +273,16 @@ def query_dashboard(self, *, sort_asc: bool | None = None) -> dict[str, Any]: # Scan trigger # ------------------------------------------------------------------ - def scan(self, sid: str, *, simulate: bool = False) -> dict[str, Any]: + def scan(self, sid: str) -> dict[str, Any]: """Trigger an on-demand os_packages scan for a sensor. Args: sid: Sensor id to scan. - simulate: If True, the extension returns simulated CVE matches - without tasking the sensor (useful for end-to-end testing). Returns: ``{"message": str, "sid": str}``. """ - data: dict[str, Any] = {"sid": sid, "simulate": bool(simulate)} ext = Extensions(self._org) - return ext.request(_EXTENSION_NAME, "scan_packages", data=data, unwrap=True) + return ext.request( + _EXTENSION_NAME, "scan_packages", data={"sid": sid}, unwrap=True, + ) diff --git a/tests/unit/test_cli_vulnerability.py b/tests/unit/test_cli_vulnerability.py index 5b4cb99..3a16340 100644 --- a/tests/unit/test_cli_vulnerability.py +++ b/tests/unit/test_cli_vulnerability.py @@ -80,24 +80,20 @@ def test_scan_basic(self): return_value={"message": "ok", "sid": "sid-1"}, ) assert result.exit_code == 0, result.output - inst.scan.assert_called_once_with("sid-1", simulate=False) - - def test_scan_simulate(self): - p1, p2, p3 = _patches() - with p1, p2, p3 as cls: - result, inst = _invoke( - ["vulnerability", "scan", "--sid", "sid-1", "--simulate"], - cls, - return_value={"message": "ok", "sid": "sid-1"}, - ) - assert result.exit_code == 0, result.output - inst.scan.assert_called_once_with("sid-1", simulate=True) + inst.scan.assert_called_once_with("sid-1") def test_scan_requires_sid(self): runner = CliRunner() result = runner.invoke(cli, ["vulnerability", "scan"]) assert result.exit_code != 0 + def test_scan_rejects_simulate_flag(self): + runner = CliRunner() + result = runner.invoke( + cli, ["vulnerability", "scan", "--sid", "sid-1", "--simulate"], + ) + assert result.exit_code != 0 + # --------------------------------------------------------------------------- # dashboard diff --git a/tests/unit/test_sdk_vulnerability.py b/tests/unit/test_sdk_vulnerability.py index d7da52c..9d9850e 100644 --- a/tests/unit/test_sdk_vulnerability.py +++ b/tests/unit/test_sdk_vulnerability.py @@ -153,15 +153,9 @@ def test_sort_asc(self, vuln, mock_org): class TestScan: - def test_default_simulate_false(self, vuln, mock_org): + def test_sends_sid(self, vuln, mock_org): mock_org.client.request.return_value = {} vuln.scan("sid-42") action, data = _decode(mock_org.client.request.call_args) assert action == "scan_packages" - assert data == {"sid": "sid-42", "simulate": False} - - def test_simulate_true(self, vuln, mock_org): - mock_org.client.request.return_value = {} - vuln.scan("sid-42", simulate=True) - action, data = _decode(mock_org.client.request.call_args) - assert data == {"sid": "sid-42", "simulate": True} + assert data == {"sid": "sid-42"}