Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions limacharlie/commands/ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,30 @@ def search(ctx: click.Context, ioc_type: str, value: str) -> None:
This is more efficient than running individual searches when you have
many IOCs to check. Data can also be piped via stdin.

--info controls the response shape:
summary four-bucket prevalence counts per indicator (default).
locations prevalence counts plus a capped list of sensor IDs that
observed each indicator. Use --limit to set the per-indicator
cap (default 100, max 1000).

The whole batch consumes a single ioc-search rate-limit charge regardless
of --info, which is the main reason to prefer batch-search over fanning
out per-IOC search calls.

Example:
limacharlie ioc batch-search --input-file iocs.json
cat iocs.yaml | limacharlie ioc batch-search
limacharlie ioc batch-search --input-file iocs.json --info locations --limit 50
"""
register_explain("ioc.batch-search", _EXPLAIN_BATCH_SEARCH)


@group.command("batch-search")
@click.option("--input-file", default=None, type=click.Path(exists=True), help="Path to JSON file with IOCs ({type: [values]}). Reads stdin if omitted.")
@click.option("--info", "info", type=click.Choice(["summary", "locations"]), default="summary", show_default=True, help="Response shape: 'summary' counts only, or 'locations' to also return per-indicator sensor IDs.")
@click.option("--limit", type=int, default=None, help="Per-indicator location cap when --info=locations (default 100, max 1000).")
@pass_context
def batch_search(ctx: click.Context, input_file: str | None) -> None:
def batch_search(ctx: click.Context, input_file: str | None, info: str, limit: int | None) -> None:
data = _load_input(input_file)
if data is None:
click.echo(
Expand All @@ -167,7 +180,7 @@ def batch_search(ctx: click.Context, input_file: str | None) -> None:

org = _get_org(ctx)
insight = Insight(org)
result = insight.batch_search(data)
result = insight.batch_search(data, info=info, limit=limit)
_output(ctx, result)


Expand Down
12 changes: 10 additions & 2 deletions limacharlie/sdk/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,29 @@ def search_ioc(self, obj_type: str, obj_name: str, info: str = "summary",
query_params=qp,
)

def batch_search(self, objects: dict[str, list[str]], case_sensitive: bool = True) -> dict[str, Any]:
def batch_search(self, objects: dict[str, list[str]], case_sensitive: bool = True,
info: str = "summary", limit: int | None = None) -> dict[str, Any]:
"""Batch IOC search.

Args:
objects: Dict of type -> list of values.
case_sensitive: Case-sensitive matching.
info: Type of information to receive ('summary' or 'locations').
'summary' returns the four-bucket prevalence counts per indicator.
'locations' additionally returns a per-indicator capped list of sensor IDs.
limit: Per-indicator location cap when info='locations' (default 100, max 1000).
Ignored when info='summary'.

Returns:
dict: Batch results.
"""
# V1 uses form-encoded params
params = {
"objects": json.dumps({k: list(v) for k, v in objects.items()}),
"case_sensitive": "true" if case_sensitive else "false",
"info": info,
}
if limit is not None:
params["limit"] = str(limit)
return self.client.request(
"POST",
f"insight/{self._org.oid}/objects",
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/test_sdk_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def test_path_and_params(self, insight, mock_org):
decoded = json.loads(params["objects"])
assert decoded == objects
assert params["case_sensitive"] == "true"
assert params["info"] == "summary"
assert "limit" not in params

def test_case_insensitive(self, insight, mock_org):
mock_org.client.request.return_value = {"results": {}}
Expand All @@ -117,6 +119,27 @@ def test_case_insensitive(self, insight, mock_org):
params = call_args[1]["params"]
assert params["case_sensitive"] == "false"

def test_info_locations(self, insight, mock_org):
mock_org.client.request.return_value = {"results": {}}
insight.batch_search({"domain": ["evil.com"]}, info="locations")
call_args = mock_org.client.request.call_args
params = call_args[1]["params"]
assert params["info"] == "locations"

def test_limit_included_when_set(self, insight, mock_org):
mock_org.client.request.return_value = {"results": {}}
insight.batch_search({"domain": ["evil.com"]}, info="locations", limit=50)
call_args = mock_org.client.request.call_args
params = call_args[1]["params"]
assert params["limit"] == "50"

def test_limit_excluded_when_none(self, insight, mock_org):
mock_org.client.request.return_value = {"results": {}}
insight.batch_search({"domain": ["evil.com"]}, info="locations")
call_args = mock_org.client.request.call_args
params = call_args[1]["params"]
assert "limit" not in params


class TestInsightGetObjectInformation:
def test_delegates_to_search_ioc(self, insight, mock_org):
Expand Down