From 04d3b3600522ccef37175a11ac14d4ef187fe431 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Sat, 2 May 2026 11:10:48 -0700 Subject: [PATCH] feat: ioc batch-search --info=locations + --limit Surfaces the new info=locations / limit params on POST /insight/{oid}/objects (lc_api-go) which routes to insight-go's get_obj_batch_locations RPC. Adds the params to Insight.batch_search and exposes them as --info / --limit on the batch-search CLI. The whole batch still consumes a single ioc-search rate-limit charge regardless of --info -- that's the entire reason callers should prefer batch-search over fanning out per-IOC search calls. Co-Authored-By: Claude Opus 4.7 (1M context) --- limacharlie/commands/ioc.py | 17 +++++++++++++++-- limacharlie/sdk/insight.py | 12 ++++++++++-- tests/unit/test_sdk_insight.py | 23 +++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/limacharlie/commands/ioc.py b/limacharlie/commands/ioc.py index c07645e4..851e345b 100644 --- a/limacharlie/commands/ioc.py +++ b/limacharlie/commands/ioc.py @@ -139,17 +139,30 @@ def search(ctx: click.Context, ioc_type: str, value: str) -> None: This is more efficient than running individual searches when you have many IOCs to check. Data can also be piped via stdin. +--info controls the response shape: + summary four-bucket prevalence counts per indicator (default). + locations prevalence counts plus a capped list of sensor IDs that + observed each indicator. Use --limit to set the per-indicator + cap (default 100, max 1000). + +The whole batch consumes a single ioc-search rate-limit charge regardless +of --info, which is the main reason to prefer batch-search over fanning +out per-IOC search calls. + Example: limacharlie ioc batch-search --input-file iocs.json cat iocs.yaml | limacharlie ioc batch-search + limacharlie ioc batch-search --input-file iocs.json --info locations --limit 50 """ register_explain("ioc.batch-search", _EXPLAIN_BATCH_SEARCH) @group.command("batch-search") @click.option("--input-file", default=None, type=click.Path(exists=True), help="Path to JSON file with IOCs ({type: [values]}). Reads stdin if omitted.") +@click.option("--info", "info", type=click.Choice(["summary", "locations"]), default="summary", show_default=True, help="Response shape: 'summary' counts only, or 'locations' to also return per-indicator sensor IDs.") +@click.option("--limit", type=int, default=None, help="Per-indicator location cap when --info=locations (default 100, max 1000).") @pass_context -def batch_search(ctx: click.Context, input_file: str | None) -> None: +def batch_search(ctx: click.Context, input_file: str | None, info: str, limit: int | None) -> None: data = _load_input(input_file) if data is None: click.echo( @@ -167,7 +180,7 @@ def batch_search(ctx: click.Context, input_file: str | None) -> None: org = _get_org(ctx) insight = Insight(org) - result = insight.batch_search(data) + result = insight.batch_search(data, info=info, limit=limit) _output(ctx, result) diff --git a/limacharlie/sdk/insight.py b/limacharlie/sdk/insight.py index 28ccb660..a15d5efa 100644 --- a/limacharlie/sdk/insight.py +++ b/limacharlie/sdk/insight.py @@ -64,21 +64,29 @@ def search_ioc(self, obj_type: str, obj_name: str, info: str = "summary", query_params=qp, ) - def batch_search(self, objects: dict[str, list[str]], case_sensitive: bool = True) -> dict[str, Any]: + def batch_search(self, objects: dict[str, list[str]], case_sensitive: bool = True, + info: str = "summary", limit: int | None = None) -> dict[str, Any]: """Batch IOC search. Args: objects: Dict of type -> list of values. case_sensitive: Case-sensitive matching. + info: Type of information to receive ('summary' or 'locations'). + 'summary' returns the four-bucket prevalence counts per indicator. + 'locations' additionally returns a per-indicator capped list of sensor IDs. + limit: Per-indicator location cap when info='locations' (default 100, max 1000). + Ignored when info='summary'. Returns: dict: Batch results. """ - # V1 uses form-encoded params params = { "objects": json.dumps({k: list(v) for k, v in objects.items()}), "case_sensitive": "true" if case_sensitive else "false", + "info": info, } + if limit is not None: + params["limit"] = str(limit) return self.client.request( "POST", f"insight/{self._org.oid}/objects", diff --git a/tests/unit/test_sdk_insight.py b/tests/unit/test_sdk_insight.py index 0702a5e6..ab5bad40 100644 --- a/tests/unit/test_sdk_insight.py +++ b/tests/unit/test_sdk_insight.py @@ -109,6 +109,8 @@ def test_path_and_params(self, insight, mock_org): decoded = json.loads(params["objects"]) assert decoded == objects assert params["case_sensitive"] == "true" + assert params["info"] == "summary" + assert "limit" not in params def test_case_insensitive(self, insight, mock_org): mock_org.client.request.return_value = {"results": {}} @@ -117,6 +119,27 @@ def test_case_insensitive(self, insight, mock_org): params = call_args[1]["params"] assert params["case_sensitive"] == "false" + def test_info_locations(self, insight, mock_org): + mock_org.client.request.return_value = {"results": {}} + insight.batch_search({"domain": ["evil.com"]}, info="locations") + call_args = mock_org.client.request.call_args + params = call_args[1]["params"] + assert params["info"] == "locations" + + def test_limit_included_when_set(self, insight, mock_org): + mock_org.client.request.return_value = {"results": {}} + insight.batch_search({"domain": ["evil.com"]}, info="locations", limit=50) + call_args = mock_org.client.request.call_args + params = call_args[1]["params"] + assert params["limit"] == "50" + + def test_limit_excluded_when_none(self, insight, mock_org): + mock_org.client.request.return_value = {"results": {}} + insight.batch_search({"domain": ["evil.com"]}, info="locations") + call_args = mock_org.client.request.call_args + params = call_args[1]["params"] + assert "limit" not in params + class TestInsightGetObjectInformation: def test_delegates_to_search_ioc(self, insight, mock_org):