diff --git a/.claude/permissions-standard.json b/.claude/permissions-standard.json index 2726425..7bde2a7 100644 --- a/.claude/permissions-standard.json +++ b/.claude/permissions-standard.json @@ -21,6 +21,14 @@ "mcp__crowdstrike__cloud_get_iom_detections", "mcp__crowdstrike__cloud_query_assets", "mcp__crowdstrike__cloud_compliance_by_account", + "mcp__crowdstrike__case_query_access_tags", + "mcp__crowdstrike__case_get_access_tags", + "mcp__crowdstrike__case_aggregate_access_tags", + "mcp__crowdstrike__case_get_rtr_file_metadata", + "mcp__crowdstrike__case_get_rtr_recent_files", + "mcp__crowdstrike__correlation_list_templates", + "mcp__crowdstrike__correlation_get_template", + "mcp__crowdstrike__spotlight_supported_evaluations", "mcp__crowdstrike__update_alert_status", "mcp__crowdstrike__case_create", "mcp__crowdstrike__case_update", diff --git a/common/api_scopes.py b/common/api_scopes.py index 35669c0..ae95a0a 100644 --- a/common/api_scopes.py +++ b/common/api_scopes.py @@ -16,8 +16,6 @@ "query_alerts_v2": ["alerts:read"], "get_alerts_v2": ["alerts:read"], "update_alerts_v3": ["alerts:write"], - # Detects (Endpoint) - "get_detect_summaries": ["detects:read"], # Hosts "query_devices_by_filter": ["hosts:read"], "get_device_details": ["hosts:read"], @@ -27,6 +25,9 @@ "query_rules": ["correlation-rules:read"], "get_rules": ["correlation-rules:read"], "update_rules": ["correlation-rules:write"], + # Correlation Rules — Templates (v1.6.1) + "queries_templates_get_v1Mixin0": ["correlation-rules:read"], + "entities_templates_get_v1Mixin0": ["correlation-rules:read"], # CSPM Registration "get_aws_account": ["cspm-registration:read"], "get_azure_account": ["cspm-registration:read"], @@ -43,6 +44,12 @@ "entities_files_upload_post_v1": ["cases:write"], "entities_fields_get_v1": ["cases:read"], "queries_fields_get_v1": ["cases:read"], + # Case Management — Access Tags & RTR (v1.6.1) + "queries_access_tags_get_v1": ["cases:read"], + "entities_access_tags_get_v1": 
["cases:read"], + "aggregates_access_tags_post_v1": ["cases:read"], + "entities_get_rtr_file_metadata_post_v1": ["cases:read"], + "entities_retrieve_rtr_recent_file_post_v1": ["cases:read"], # Cloud Security "combined_cloud_risks": ["cloud-security:read"], "query_iom_entities": ["cloud-security-detections:read"], @@ -50,6 +57,16 @@ "query_assets": ["cloud-security-assets:read"], "get_assets": ["cloud-security-assets:read"], "get_combined_compliance_by_account": ["cloud-security-assets:read"], + # Spotlight Evaluation Logic + "combinedSupportedEvaluationExt": ["spotlight-vulnerabilities:read"], + # CAO Hunting + "search_queries": ["cao-hunting:read"], + "get_queries": ["cao-hunting:read"], + "aggregate_queries": ["cao-hunting:read"], + "search_guides": ["cao-hunting:read"], + "get_guides": ["cao-hunting:read"], + "aggregate_guides": ["cao-hunting:read"], + "create_export_archive": ["cao-hunting:read"], } diff --git a/docs/superpowers/plans/2026-04-01-falconpy-v1.6.1-upgrade.md b/docs/superpowers/plans/2026-04-01-falconpy-v1.6.1-upgrade.md new file mode 100644 index 0000000..126260b --- /dev/null +++ b/docs/superpowers/plans/2026-04-01-falconpy-v1.6.1-upgrade.md @@ -0,0 +1,1247 @@ +# FalconPy v1.6.1 Upgrade Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Bump FalconPy to v1.6.1, add 8 new read-only MCP tools across case management, correlation rules, and spotlight modules. + +**Architecture:** Extend two existing modules (`case_management.py`, `correlation.py`) with new read-only tools using already-imported FalconPy service classes. Create one new module (`spotlight.py`) for Spotlight Evaluation Logic. Update smoke tests, api scopes, and permission presets. 
+ +**Tech Stack:** Python 3, FalconPy SDK (>=1.6.1), FastMCP, pytest + +--- + +## File Structure + +| File | Action | Responsibility | +|------|--------|----------------| +| `requirements.txt` | Modify | Bump falconpy version | +| `modules/case_management.py` | Modify | Add 5 new read tools (access tags + RTR metadata) | +| `modules/correlation.py` | Modify | Add 2 new read tools (template query/get) | +| `modules/spotlight.py` | Create | New module with 1 read tool (supported evaluations) | +| `common/api_scopes.py` | Modify | Add scope mappings for 8 new operations | +| `.claude/permissions-standard.json` | Modify | Add 8 new read tools | +| `tests/test_case_management_new_tools.py` | Create | Tests for 5 new case management tools | +| `tests/test_correlation_templates.py` | Create | Tests for 2 new correlation template tools | +| `tests/test_spotlight.py` | Create | Tests for spotlight module | +| `tests/test_smoke_tools_list.py` | Modify | Add 8 new tools to expected sets | + +--- + +### Task 1: Bump FalconPy Version + +**Files:** +- Modify: `requirements.txt:1` + +- [ ] **Step 1: Update requirements.txt** + +In `requirements.txt`, change line 1: + +```python +# old +crowdstrike-falconpy>=1.6.0 +# new +crowdstrike-falconpy>=1.6.1 +``` + +- [ ] **Step 2: Commit** + +```bash +git add requirements.txt +git commit -m "chore: bump falconpy dependency to >=1.6.1" +``` + +--- + +### Task 2: Add Case Management Access Tags Tools (3 tools) + +**Files:** +- Modify: `modules/case_management.py` +- Create: `tests/test_case_management_new_tools.py` + +- [ ] **Step 1: Write failing tests for access tags tools** + +Create `tests/test_case_management_new_tools.py`: + +```python +"""Tests for new case management tools added in FalconPy v1.6.1.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture +def case_module(mock_client): + """Create 
CaseManagementModule with mocked API.""" + with patch("modules.case_management.CaseManagement") as MockCM: + mock_cm = MagicMock() + MockCM.return_value = mock_cm + from modules.case_management import CaseManagementModule + + module = CaseManagementModule(mock_client) + module.falcon = mock_cm + return module + + +class TestCaseQueryAccessTags: + """Test case_query_access_tags tool.""" + + def test_returns_tag_ids(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": ["tag-001", "tag-002"], + "meta": {"pagination": {"total": 2}}, + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_query_access_tags() + ) + assert "tag-001" in result + assert "tag-002" in result + + def test_handles_empty_results(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": [], + "meta": {"pagination": {"total": 0}}, + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_query_access_tags() + ) + assert "no access tags" in result.lower() or "0" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_query_access_tags() + ) + assert "failed" in result.lower() + + +class TestCaseGetAccessTags: + """Test case_get_access_tags tool.""" + + def test_returns_tag_details(self, case_module): + case_module.falcon.get_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"id": "tag-001", "name": "SOC-Team", "description": "SOC team access"} + ] + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_access_tags(tag_ids=["tag-001"]) + ) + assert "SOC-Team" in result + assert "tag-001" in result + + def test_handles_api_error(self, 
case_module): + case_module.falcon.get_access_tags.return_value = { + "status_code": 404, + "body": {"errors": [{"message": "Not found"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_access_tags(tag_ids=["bad-id"]) + ) + assert "failed" in result.lower() + + +class TestCaseAggregateAccessTags: + """Test case_aggregate_access_tags tool.""" + + def test_returns_aggregation_data(self, case_module): + case_module.falcon.aggregate_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"name": "tag_count", "buckets": [{"label": "SOC", "count": 5}]} + ] + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_aggregate_access_tags( + date_ranges=[], + field="name", + filter="", + name="tag_count", + type="terms", + ) + ) + assert "tag_count" in result or "SOC" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.aggregate_access_tags.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_aggregate_access_tags( + date_ranges=[], + field="name", + filter="", + name="tag_count", + type="terms", + ) + ) + assert "failed" in result.lower() + + +class TestToolRegistration: + """Verify new tools register correctly.""" + + def test_access_tag_tools_register_as_read(self, case_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + case_module.register_tools(server) + assert "case_query_access_tags" in case_module.tools + assert "case_get_access_tags" in case_module.tools + assert "case_aggregate_access_tags" in case_module.tools +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `python -m pytest tests/test_case_management_new_tools.py -v` +Expected: FAIL — methods `case_query_access_tags`, `case_get_access_tags`, `case_aggregate_access_tags` don't exist yet. 
+ +- [ ] **Step 3: Implement access tags tools in case_management.py** + +Add to the module docstring at the top of `modules/case_management.py`, after `case_get_fields`: + +```python + case_query_access_tags — Query available access tags + case_get_access_tags — Get access tag details by ID + case_aggregate_access_tags — Aggregate access tag data +``` + +Add tool registrations in `register_tools()`, after the `case_get_fields` registration block (before the closing of `register_tools`): + +```python + self._add_tool( + server, + self.case_query_access_tags, + name="case_query_access_tags", + description="Query available case access tags with optional FQL filtering. Returns tag IDs for understanding case access controls.", + ) + self._add_tool( + server, + self.case_get_access_tags, + name="case_get_access_tags", + description="Get access tag details by ID — name, description, and scope.", + ) + self._add_tool( + server, + self.case_aggregate_access_tags, + name="case_aggregate_access_tags", + description="Aggregate case access tag data (counts, groupings by field).", + ) +``` + +Add the tool methods after `case_get_fields` and before the `# Internal methods` comment: + +```python + async def case_query_access_tags( + self, + filter: Annotated[Optional[str], "FQL filter expression for access tags"] = None, + limit: Annotated[int, "Maximum tags to return (default: 100)"] = 100, + offset: Annotated[int, "Pagination offset (default: 0)"] = 0, + ) -> str: + """Query available case access tags.""" + try: + kwargs = {"limit": min(limit, 500), "offset": offset} + if filter: + kwargs["filter"] = filter + + response = self.falcon.query_access_tags(**kwargs) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to query access tags: {format_api_error(response, 'Failed to query access tags', operation='queries_access_tags_get_v1')}", + raw=True, + ) + + tag_ids = response.get("body", {}).get("resources", []) + total = response.get("body", 
{}).get("meta", {}).get("pagination", {}).get("total", len(tag_ids)) + + lines = [f"Access Tags: {len(tag_ids)} returned (of {total} total)", ""] + if not tag_ids: + lines.append("No access tags found.") + else: + for i, tag_id in enumerate(tag_ids, 1): + lines.append(f"{i}. {tag_id}") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to query access tags: {e}", raw=True) + + async def case_get_access_tags( + self, + tag_ids: Annotated[list[str], "List of access tag IDs to retrieve"], + ) -> str: + """Get access tag details by ID.""" + try: + response = self.falcon.get_access_tags(ids=tag_ids) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get access tags: {format_api_error(response, 'Failed to get access tags', operation='entities_access_tags_get_v1')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Access Tag Details ({len(resources)} tags)", ""] + + for tag in resources: + lines.append(f"### {tag.get('name', 'Unknown')}") + lines.append(f"- **ID**: {tag.get('id', 'N/A')}") + if tag.get("description"): + lines.append(f"- **Description**: {tag['description']}") + lines.append("") + lines.append("```json") + lines.append(json.dumps(tag, indent=2, default=str)) + lines.append("```") + lines.append("") + + if not resources: + lines.append("No access tags found for the provided IDs.") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get access tags: {e}", raw=True) + + async def case_aggregate_access_tags( + self, + date_ranges: Annotated[list, "Date range specifications for aggregation"], + field: Annotated[str, "Field to aggregate on (e.g. 'name', 'id')"], + filter: Annotated[str, "FQL filter to scope the aggregation"], + name: Annotated[str, "Name for this aggregation result"], + type: Annotated[str, "Aggregation type (e.g. 
'terms', 'date_range', 'count')"], + ) -> str: + """Aggregate case access tag data.""" + try: + body = [ + { + "date_ranges": date_ranges, + "field": field, + "filter": filter, + "name": name, + "type": type, + } + ] + response = self.falcon.aggregate_access_tags(body=body) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to aggregate access tags: {format_api_error(response, 'Failed to aggregate access tags', operation='aggregates_access_tags_post_v1')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = ["Access Tag Aggregation Results", ""] + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to aggregate access tags: {e}", raw=True) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `python -m pytest tests/test_case_management_new_tools.py -v` +Expected: All tests PASS. 
+ +- [ ] **Step 5: Commit** + +```bash +git add modules/case_management.py tests/test_case_management_new_tools.py +git commit -m "feat(case-management): add access tags query/get/aggregate tools" +``` + +--- + +### Task 3: Add Case Management RTR Metadata Tools (2 tools) + +**Files:** +- Modify: `modules/case_management.py` +- Modify: `tests/test_case_management_new_tools.py` + +- [ ] **Step 1: Write failing tests for RTR metadata tools** + +Append to `tests/test_case_management_new_tools.py`: + +```python +class TestCaseGetRtrFileMetadata: + """Test case_get_rtr_file_metadata tool.""" + + def test_returns_file_metadata(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "file-001", + "file_name": "suspicious.exe", + "file_size": 1024, + "sha256": "abc123def456", + } + ] + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_rtr_file_metadata(case_id="case-123") + ) + assert "suspicious.exe" in result + assert "file-001" in result + + def test_handles_no_files(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_rtr_file_metadata(case_id="case-123") + ) + assert "no rtr" in result.lower() or "0" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_rtr_file_metadata(case_id="case-123") + ) + assert "failed" in result.lower() + + +class TestCaseGetRtrRecentFiles: + """Test case_get_rtr_recent_files tool.""" + + def test_returns_recent_files(self, case_module): + case_module.falcon.get_rtr_recent_files.return_value = { + "status_code": 200, + "body": { + "resources": [ 
+ { + "id": "file-002", + "file_name": "collected.log", + "created_on": "2026-03-31T12:00:00Z", + } + ] + }, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_rtr_recent_files(case_id="case-123") + ) + assert "collected.log" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.get_rtr_recent_files.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + case_module.case_get_rtr_recent_files(case_id="case-123") + ) + assert "failed" in result.lower() + + +class TestRtrToolRegistration: + """Verify RTR tools register correctly.""" + + def test_rtr_tools_register_as_read(self, case_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + case_module.register_tools(server) + assert "case_get_rtr_file_metadata" in case_module.tools + assert "case_get_rtr_recent_files" in case_module.tools +``` + +- [ ] **Step 2: Run tests to verify new tests fail** + +Run: `python -m pytest tests/test_case_management_new_tools.py::TestCaseGetRtrFileMetadata -v` +Expected: FAIL — method `case_get_rtr_file_metadata` doesn't exist yet. 
+ +- [ ] **Step 3: Implement RTR metadata tools** + +Add to the module docstring at the top of `modules/case_management.py`, after the access tags entries: + +```python + case_get_rtr_file_metadata — Get RTR-collected file metadata for a case + case_get_rtr_recent_files — Get recent RTR file activity for a case +``` + +Add tool registrations in `register_tools()`, after the `case_aggregate_access_tags` registration: + +```python + self._add_tool( + server, + self.case_get_rtr_file_metadata, + name="case_get_rtr_file_metadata", + description="Get metadata about RTR-collected files attached to a case — filename, size, hash, collection time.", + ) + self._add_tool( + server, + self.case_get_rtr_recent_files, + name="case_get_rtr_recent_files", + description="Retrieve recent RTR file collection activity for a case.", + ) +``` + +Add the tool methods after `case_aggregate_access_tags` and before the `# Internal methods` comment: + +```python + async def case_get_rtr_file_metadata( + self, + case_id: Annotated[str, "Case ID to retrieve RTR file metadata for"], + ) -> str: + """Get metadata about RTR-collected files attached to a case.""" + try: + response = self.falcon.get_rtr_file_metadata(body={"case_id": case_id}) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get RTR file metadata: {format_api_error(response, 'Failed to get RTR file metadata', operation='entities_get_rtr_file_metadata_post_v1')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = [f"RTR File Metadata for Case {case_id} ({len(resources)} files)", ""] + + if not resources: + lines.append("No RTR files found for this case.") + else: + for i, f in enumerate(resources, 1): + lines.append(f"{i}. 
**{f.get('file_name', 'Unknown')}**") + lines.append(f" - ID: {f.get('id', 'N/A')}") + if f.get("file_size"): + lines.append(f" - Size: {f['file_size']} bytes") + if f.get("sha256"): + lines.append(f" - SHA256: {f['sha256']}") + if f.get("created_on"): + lines.append(f" - Collected: {f['created_on']}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get RTR file metadata: {e}", raw=True) + + async def case_get_rtr_recent_files( + self, + case_id: Annotated[str, "Case ID to retrieve recent RTR files for"], + ) -> str: + """Retrieve recent RTR file collection activity for a case.""" + try: + response = self.falcon.get_rtr_recent_files(body={"case_id": case_id}) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get RTR recent files: {format_api_error(response, 'Failed to get RTR recent files', operation='entities_retrieve_rtr_recent_file_post_v1')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Recent RTR Files for Case {case_id} ({len(resources)} files)", ""] + + if not resources: + lines.append("No recent RTR files found for this case.") + else: + for i, f in enumerate(resources, 1): + lines.append(f"{i}. 
**{f.get('file_name', 'Unknown')}**") + lines.append(f" - ID: {f.get('id', 'N/A')}") + if f.get("created_on"): + lines.append(f" - Collected: {f['created_on']}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get RTR recent files: {e}", raw=True) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `python -m pytest tests/test_case_management_new_tools.py -v` +Expected: All tests PASS (including the access tags tests from Task 2). + +- [ ] **Step 5: Commit** + +```bash +git add modules/case_management.py tests/test_case_management_new_tools.py +git commit -m "feat(case-management): add RTR file metadata and recent files tools" +``` + +--- + +### Task 4: Add Correlation Rules Template Tools (2 tools) + +**Files:** +- Modify: `modules/correlation.py` +- Create: `tests/test_correlation_templates.py` + +- [ ] **Step 1: Write failing tests for template tools** + +Create `tests/test_correlation_templates.py`: + +```python +"""Tests for correlation rule template tools added in FalconPy v1.6.1.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +MOCK_TEMPLATE = { + "id": "template-uuid-001", + "name": "Lateral Movement - RDP Brute Force", + "description": "Detects repeated RDP login failures indicating brute force attempts.", + "severity": 60, + "search": { + "filter": '#event_simpleName=UserLogonFailed2 LogonType=10 | groupBy([aid, UserName], function=count()) | count > 10', + }, + "created_on": "2026-01-15T00:00:00Z", + "updated_on": "2026-03-20T00:00:00Z", +} + + +@pytest.fixture +def correlation_module(mock_client): + """Create CorrelationModule with mocked API.""" + with patch("modules.correlation.CorrelationRules") as MockCR: 
+ mock_cr = MagicMock() + MockCR.return_value = mock_cr + from modules.correlation import CorrelationModule + + module = CorrelationModule(mock_client) + module.falcon = mock_cr + return module + + +class TestCorrelationListTemplates: + """Test correlation_list_templates tool.""" + + def test_returns_template_ids(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 200, + "body": { + "resources": ["template-uuid-001", "template-uuid-002"], + "meta": {"pagination": {"total": 2}}, + }, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_list_templates() + ) + assert "template-uuid-001" in result + assert "template-uuid-002" in result + + def test_handles_empty_results(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 200, + "body": { + "resources": [], + "meta": {"pagination": {"total": 0}}, + }, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_list_templates() + ) + assert "no templates" in result.lower() or "0" in result + + def test_handles_api_error(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_list_templates() + ) + assert "failed" in result.lower() + + +class TestCorrelationGetTemplate: + """Test correlation_get_template tool.""" + + def test_returns_template_details(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 200, + "body": {"resources": [MOCK_TEMPLATE]}, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_get_template( + template_ids=["template-uuid-001"] + ) + ) + assert "Lateral Movement" in result + assert "template-uuid-001" in result + assert "RDP" in result + + 
def test_handles_not_found(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_get_template( + template_ids=["bad-id"] + ) + ) + assert "no templates found" in result.lower() + + def test_handles_api_error(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + correlation_module.correlation_get_template( + template_ids=["template-uuid-001"] + ) + ) + assert "failed" in result.lower() + + +class TestTemplateToolRegistration: + """Verify template tools register correctly.""" + + def test_template_tools_register_as_read(self, correlation_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + correlation_module.register_tools(server) + assert "correlation_list_templates" in correlation_module.tools + assert "correlation_get_template" in correlation_module.tools +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `python -m pytest tests/test_correlation_templates.py -v` +Expected: FAIL — methods `correlation_list_templates`, `correlation_get_template` don't exist yet. + +- [ ] **Step 3: Implement template tools in correlation.py** + +Add to the module docstring at the top of `modules/correlation.py`, after `correlation_export_rule`: + +```python + correlation_list_templates — List available rule templates + correlation_get_template — Get full template details +``` + +Add tool registrations in `register_tools()`, after the `correlation_import_to_iac` registration: + +```python + self._add_tool( + server, + self.correlation_list_templates, + name="correlation_list_templates", + description="List available CrowdStrike correlation rule templates with optional filtering. 
Templates are pre-built detection patterns.", + ) + self._add_tool( + server, + self.correlation_get_template, + name="correlation_get_template", + description="Get full template details by ID, including CQL logic and configuration.", + ) +``` + +Add the tool methods after `correlation_import_to_iac` and before `_rule_to_template`: + +```python + async def correlation_list_templates( + self, + filter: Annotated[Optional[str], "FQL filter expression for templates"] = None, + limit: Annotated[int, "Maximum templates to return (default: 100)"] = 100, + offset: Annotated[int, "Pagination offset (default: 0)"] = 0, + ) -> str: + """List available correlation rule templates.""" + try: + kwargs = {"limit": min(limit, 500), "offset": offset} + if filter: + kwargs["filter"] = filter + + if self._use_harness: + response = self.falcon.command("queries_templates_get_v1Mixin0", **kwargs) + else: + response = self.falcon.query_templates(**kwargs) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to list templates: {format_api_error(response, 'Failed to query templates', operation='queries_templates_get_v1Mixin0')}", + raw=True, + ) + + template_ids = response.get("body", {}).get("resources", []) + total = response.get("body", {}).get("meta", {}).get("pagination", {}).get("total", len(template_ids)) + + lines = [f"Correlation Rule Templates: {len(template_ids)} returned (of {total} total)", ""] + + if not template_ids: + lines.append("No templates found.") + else: + for i, tid in enumerate(template_ids, 1): + lines.append(f"{i}. 
{tid}") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to list templates: {e}", raw=True) + + async def correlation_get_template( + self, + template_ids: Annotated[list[str], "List of template IDs to retrieve"], + ) -> str: + """Get full details for correlation rule templates.""" + try: + if self._use_harness: + response = self.falcon.command("entities_templates_get_v1Mixin0", ids=template_ids) + else: + response = self.falcon.get_templates(ids=template_ids) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get templates: {format_api_error(response, 'Failed to get template details', operation='entities_templates_get_v1Mixin0')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + + if not resources: + return format_text_response( + f"No templates found for IDs: {template_ids}", + raw=True, + ) + + lines = [f"Correlation Rule Template Details ({len(resources)} templates)", ""] + + for template in resources: + lines.append(f"### {template.get('name', 'Unknown')}") + lines.append(f"- ID: {template.get('id', 'N/A')}") + lines.append(f"- Severity: {template.get('severity', 'N/A')}") + if template.get("description"): + lines.append(f"- Description: {template['description']}") + lines.append(f"- Created: {template.get('created_on', 'N/A')}") + lines.append(f"- Updated: {template.get('updated_on', 'N/A')}") + + search = template.get("search", {}) + if search and search.get("filter"): + lines.append("\n**CQL Filter:**") + lines.append("```") + lines.append(search["filter"]) + lines.append("```") + + lines.append("") + lines.append("**Full Template JSON:**") + lines.append("```json") + lines.append(json.dumps(template, indent=2, default=str)) + lines.append("```") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get templates: {e}", raw=True) 
+``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `python -m pytest tests/test_correlation_templates.py -v` +Expected: All tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add modules/correlation.py tests/test_correlation_templates.py +git commit -m "feat(correlation): add rule template list and get tools" +``` + +--- + +### Task 5: Create Spotlight Module (1 tool) + +**Files:** +- Create: `modules/spotlight.py` +- Create: `tests/test_spotlight.py` + +- [ ] **Step 1: Write failing tests for spotlight module** + +Create `tests/test_spotlight.py`: + +```python +"""Tests for Spotlight evaluation logic module.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture +def spotlight_module(mock_client): + """Create SpotlightModule with mocked API.""" + with patch("modules.spotlight.SpotlightEvaluationLogic") as MockSEL: + mock_sel = MagicMock() + MockSEL.return_value = mock_sel + from modules.spotlight import SpotlightModule + + module = SpotlightModule(mock_client) + module.falcon = mock_sel + return module + + +class TestSpotlightSupportedEvaluations: + """Test spotlight_supported_evaluations tool.""" + + def test_returns_evaluation_data(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "eval-001", + "name": "Windows Kernel Vulnerability", + "platforms": ["Windows"], + "cve_ids": ["CVE-2024-1234"], + } + ] + }, + } + result = asyncio.get_event_loop().run_until_complete( + spotlight_module.spotlight_supported_evaluations() + ) + assert "Windows Kernel Vulnerability" in result + assert "eval-001" in result + + def test_handles_empty_results(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = 
asyncio.get_event_loop().run_until_complete( + spotlight_module.spotlight_supported_evaluations() + ) + assert "no evaluation" in result.lower() or "0" in result + + def test_handles_api_error(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.get_event_loop().run_until_complete( + spotlight_module.spotlight_supported_evaluations() + ) + assert "failed" in result.lower() + + def test_passes_filter_parameter(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + asyncio.get_event_loop().run_until_complete( + spotlight_module.spotlight_supported_evaluations(filter="platform:'Windows'") + ) + spotlight_module.falcon.combined_supported_evaluation.assert_called_once_with( + filter="platform:'Windows'" + ) + + +class TestSpotlightToolRegistration: + """Verify spotlight tool registers correctly.""" + + def test_tool_registers_as_read(self, spotlight_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + spotlight_module.register_tools(server) + assert "spotlight_supported_evaluations" in spotlight_module.tools +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `python -m pytest tests/test_spotlight.py -v` +Expected: FAIL — `modules/spotlight.py` doesn't exist yet. + +- [ ] **Step 3: Create spotlight module** + +Create `modules/spotlight.py`: + +```python +""" +Spotlight Module — vulnerability evaluation logic via the SpotlightEvaluationLogic API. 
+ +Tools: + spotlight_supported_evaluations — Get supported vulnerability evaluation logic +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Annotated, Optional + +from falconpy import SpotlightEvaluationLogic + +from common.errors import format_api_error +from modules.base import BaseModule +from utils import format_text_response + +if TYPE_CHECKING: + from mcp.server.fastmcp import FastMCP + + +class SpotlightModule(BaseModule): + """Spotlight vulnerability evaluation logic queries.""" + + def __init__(self, client): + super().__init__(client) + self.falcon = SpotlightEvaluationLogic(auth_object=self.client.auth_object) + self._log("Initialized") + + def register_tools(self, server: FastMCP) -> None: + self._add_tool( + server, + self.spotlight_supported_evaluations, + name="spotlight_supported_evaluations", + description=( + "Get supported vulnerability evaluation logic — assessment methods, " + "OS/platform coverage, and evaluation criteria. Use to check if Spotlight " + "can evaluate a specific CVE or what platforms are covered." + ), + ) + + async def spotlight_supported_evaluations( + self, + filter: Annotated[Optional[str], "FQL filter expression (e.g. platform:'Windows')"] = None, + ) -> str: + """Get combined supported evaluation logic.""" + try: + kwargs = {} + if filter: + kwargs["filter"] = filter + + response = self.falcon.combined_supported_evaluation(**kwargs) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get supported evaluations: {format_api_error(response, 'Failed to get evaluations', operation='combinedSupportedEvaluationExt')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Spotlight Supported Evaluations ({len(resources)} results)", ""] + + if not resources: + lines.append("No evaluation logic found matching the filter.") + else: + for i, ev in enumerate(resources, 1): + lines.append(f"{i}. 
**{ev.get('name', 'Unknown')}**") + lines.append(f" - ID: {ev.get('id', 'N/A')}") + if ev.get("platforms"): + lines.append(f" - Platforms: {', '.join(ev['platforms'])}") + if ev.get("cve_ids"): + lines.append(f" - CVEs: {', '.join(ev['cve_ids'][:10])}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get supported evaluations: {e}", raw=True) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `python -m pytest tests/test_spotlight.py -v` +Expected: All tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add modules/spotlight.py tests/test_spotlight.py +git commit -m "feat(spotlight): add evaluation logic module with supported evaluations tool" +``` + +--- + +### Task 6: Update API Scopes + +**Files:** +- Modify: `common/api_scopes.py` + +- [ ] **Step 1: Add scope mappings for new operations** + +Add to the `OPERATION_SCOPES` dict in `common/api_scopes.py`, after the existing Case Management block: + +```python + # Case Management — Access Tags & RTR (v1.6.1) + "queries_access_tags_get_v1": ["cases:read"], + "entities_access_tags_get_v1": ["cases:read"], + "aggregates_access_tags_post_v1": ["cases:read"], + "entities_get_rtr_file_metadata_post_v1": ["cases:read"], + "entities_retrieve_rtr_recent_file_post_v1": ["cases:read"], +``` + +Add after the existing Correlation Rules block: + +```python + # Correlation Rules — Templates (v1.6.1) + "queries_templates_get_v1Mixin0": ["correlation-rules:read"], + "entities_templates_get_v1Mixin0": ["correlation-rules:read"], +``` + +Add a new Spotlight section: + +```python + # Spotlight Evaluation Logic + "combinedSupportedEvaluationExt": ["spotlight-vulnerabilities:read"], +``` + +Also remove the stale Detects entry that references a deprecated operation: + +```python + # Remove this line: + 
"get_detect_summaries": ["detects:read"], +``` + +- [ ] **Step 2: Run existing tests to verify nothing broke** + +Run: `python -m pytest tests/ -v` +Expected: All existing tests PASS. + +- [ ] **Step 3: Commit** + +```bash +git add common/api_scopes.py +git commit -m "chore(scopes): add scope mappings for v1.6.1 operations, remove stale detects entry" +``` + +--- + +### Task 7: Update Smoke Tests + +**Files:** +- Modify: `tests/test_smoke_tools_list.py` + +- [ ] **Step 1: Update expected tool sets** + +In `tests/test_smoke_tools_list.py`, add to `EXPECTED_READ_TOOLS` set: + +```python + "case_query_access_tags", + "case_get_access_tags", + "case_aggregate_access_tags", + "case_get_rtr_file_metadata", + "case_get_rtr_recent_files", + "correlation_list_templates", + "correlation_get_template", + "spotlight_supported_evaluations", +``` + +Add to `_FALCONPY_PATCHES` list: + +```python + "modules.spotlight.SpotlightEvaluationLogic", +``` + +Add to the `_patch_falconpy()` context manager, add inside the `with` block: + +```python + patch.multiple("modules.spotlight", SpotlightEvaluationLogic=MagicMock()), +``` + +- [ ] **Step 2: Run smoke tests to verify they pass** + +Run: `python -m pytest tests/test_smoke_tools_list.py -v` +Expected: All 4 smoke tests PASS. 
+ +- [ ] **Step 3: Commit** + +```bash +git add tests/test_smoke_tools_list.py +git commit -m "test: update smoke tests for 8 new v1.6.1 tools" +``` + +--- + +### Task 8: Update Permission Presets + +**Files:** +- Modify: `.claude/permissions-standard.json` + +- [ ] **Step 1: Add new tools to standard preset** + +In `.claude/permissions-standard.json`, add to the `"allow"` array (after the existing `cloud_compliance_by_account` entry): + +```json + "mcp__crowdstrike__case_query_access_tags", + "mcp__crowdstrike__case_get_access_tags", + "mcp__crowdstrike__case_aggregate_access_tags", + "mcp__crowdstrike__case_get_rtr_file_metadata", + "mcp__crowdstrike__case_get_rtr_recent_files", + "mcp__crowdstrike__correlation_list_templates", + "mcp__crowdstrike__correlation_get_template", + "mcp__crowdstrike__spotlight_supported_evaluations" +``` + +Note: `permissions-full.json` uses `"mcp__crowdstrike__*"` wildcard so it already covers new tools. `permissions-minimal.json` is intentionally limited — no changes needed. + +- [ ] **Step 2: Verify JSON is valid** + +Run: `python -c "import json; json.load(open('.claude/permissions-standard.json')); print('Valid JSON')"` +Expected: `Valid JSON` + +- [ ] **Step 3: Commit** + +```bash +git add .claude/permissions-standard.json +git commit -m "chore(permissions): add v1.6.1 tools to standard preset" +``` + +--- + +### Task 9: Run Full Test Suite + +- [ ] **Step 1: Run all tests** + +Run: `python -m pytest tests/ -v` +Expected: All tests PASS. + +- [ ] **Step 2: Run linting** + +Run: `ruff check . && ruff format --check .` +Expected: No errors. + +- [ ] **Step 3: Fix any lint issues** + +If ruff reports issues, fix them and re-run. 
+ +- [ ] **Step 4: Final commit if lint fixes were needed** + +```bash +git add -u +git commit -m "style: apply ruff fixes for v1.6.1 tools" +``` diff --git a/docs/superpowers/specs/2026-03-31-cao-hunting-module-design.md b/docs/superpowers/specs/2026-03-31-cao-hunting-module-design.md new file mode 100644 index 0000000..a1b1818 --- /dev/null +++ b/docs/superpowers/specs/2026-03-31-cao-hunting-module-design.md @@ -0,0 +1,110 @@ +# CAO Hunting Module Design + +## Context + +CrowdStrike's CAO (Custom Assessment Operations) Hunting API provides access to curated intelligence queries and hunting guides. The FalconPy SDK (`>= 1.6.0`) already ships a `CAOHunting` service class with 7 operations. This module exposes 5 of those as MCP tools, following the established patterns in this project. + +## Tools (5, all read-tier) + +### `cao_search_queries` +Search and retrieve intelligence queries. Combines `search_queries` (returns IDs) + `get_queries` (hydrates details) — same pattern as `cloud_security._query_assets`. + +**Parameters:** +- `filter: Optional[str]` — FQL filter expression +- `q: Optional[str]` — Free-text search +- `sort: Optional[str]` — Sort field and direction (e.g. `created_on|desc`) +- `include_translated_content: bool = False` — Include AI-translated content (SPL, etc.) +- `max_results: int = 20` — Max queries to return + +### `cao_get_queries` +Retrieve intelligence queries by known IDs. + +**Parameters:** +- `ids: str` — Comma-separated intelligence query IDs +- `include_translated_content: bool = False` + +### `cao_search_guides` +Search and retrieve hunting guides. Same search+hydrate pattern. + +**Parameters:** +- `filter: Optional[str]` — FQL filter expression +- `q: Optional[str]` — Free-text search +- `sort: Optional[str]` — Sort field and direction +- `max_results: int = 20` + +### `cao_get_guides` +Retrieve hunting guides by known IDs. 
+ +**Parameters:** +- `ids: str` — Comma-separated hunting guide IDs + +### `cao_aggregate` +Aggregate intelligence queries or hunting guides. Decomposes the aggregation body into explicit parameters for MCP usability. + +**Parameters:** +- `field: str` — Field to aggregate on (e.g. `severity`, `tags`, `created_on`) +- `type: str = "terms"` — Aggregation type (`terms`, `date_range`, `range`, `cardinality`) +- `resource_type: str = "queries"` — What to aggregate: `queries` or `guides` +- `filter: Optional[str]` — FQL filter to scope the aggregation +- `size: int = 10` — Number of buckets to return + +## Dropped + +- **`cao_export_archive`** — Binary file export. Unusual pattern for this project; deferred to a future PR. + +## Files + +### New: `modules/cao_hunting.py` + +``` +CAOHuntingModule(BaseModule) +├── __init__(client) → CAOHunting(auth_object=...) +├── register_tools(server) → 5 × _add_tool(..., tier="read") +├── cao_search_queries(...) → _search_queries() +├── cao_get_queries(...) → _get_queries_by_ids() +├── cao_search_guides(...) → _search_guides() +├── cao_get_guides(...) → _get_guides_by_ids() +├── cao_aggregate(...) 
→ _aggregate() +├── _search_queries() → search_queries + get_queries +├── _get_queries_by_ids() → get_queries +├── _search_guides() → search_guides + get_guides +├── _get_guides_by_ids() → get_guides +└── _aggregate() → aggregate_queries | aggregate_guides +``` + +### Modify: `common/api_scopes.py` + +Add 7 operation→scope mappings (all `cao-hunting:read`): + +```python +"search_queries": ["cao-hunting:read"], +"get_queries": ["cao-hunting:read"], +"aggregate_queries": ["cao-hunting:read"], +"search_guides": ["cao-hunting:read"], +"get_guides": ["cao-hunting:read"], +"aggregate_guides": ["cao-hunting:read"], +"create_export_archive": ["cao-hunting:read"], +``` + +### New: `tests/test_cao_hunting.py` + +Test classes: +- `TestSearchQueries` — search+hydrate flow, empty results, API errors (403 with scope message) +- `TestGetQueries` — direct get by IDs +- `TestSearchGuides` — search+hydrate flow +- `TestGetGuides` — direct get by IDs +- `TestAggregate` — terms aggregation, guides vs queries routing +- `TestToolRegistration` — all 5 tools register, all are read-tier + +### Modify: `tests/test_smoke_tools_list.py` + +- Add `"modules.cao_hunting.CAOHunting"` to `_FALCONPY_PATCHES` +- Add `patch.multiple("modules.cao_hunting", CAOHunting=MagicMock())` to `_patch_falconpy()` +- Add 5 tool names to `EXPECTED_READ_TOOLS` + +## Verification + +1. `pytest tests/test_cao_hunting.py` — unit tests pass +2. `pytest tests/test_smoke_tools_list.py` — smoke tests pass (new tools in expected sets) +3. `pytest tests/` — full test suite passes +4. 
`ruff check modules/cao_hunting.py tests/test_cao_hunting.py` — lint clean diff --git a/docs/superpowers/specs/2026-04-01-falconpy-v1.6.1-upgrade-design.md b/docs/superpowers/specs/2026-04-01-falconpy-v1.6.1-upgrade-design.md new file mode 100644 index 0000000..80bd015 --- /dev/null +++ b/docs/superpowers/specs/2026-04-01-falconpy-v1.6.1-upgrade-design.md @@ -0,0 +1,149 @@ +# FalconPy v1.6.1 Upgrade — Design Spec + +**Date**: 2026-04-01 +**Scope**: Bump FalconPy dependency, add 8 new read-only MCP tools, gain free improvements on existing tools + +## Context + +FalconPy v1.6.1 was released with significant additions: new service collections, expanded operations on existing services, deprecation notices, and bug fixes. This spec captures the subset of changes relevant to our MCP, filtered through these criteria: + +- **Read-only**: No write operations in this iteration +- **Agent-valuable**: Operations an AI agent would meaningfully use during investigation/triage +- **Non-admin**: Skip infrastructure/administration operations (NGSIEM data connectors, parser management) + +Our MCP currently has 9 modules with 31 tools. This upgrade brings it to 10 modules with 39 tools. + +## Changes + +### 1. Dependency Bump + +**File**: `requirements.txt` + +Change `crowdstrike-falconpy>=1.6.0` to `crowdstrike-falconpy>=1.6.1`. 
+ +This alone provides: +- Enhanced filter/sort fields on Hosts (`query_devices_by_filter`: 11 new sort fields including `device_policies.application-abuse-prevention`, `device_policies.data-protection-cloud`, `pointer_size`, `safe_mode`) +- Enhanced filter/sort fields on Cloud Security Assets (`cloud_risks.rule`, `cloud_risks.severity`, `cloud_risks.status`, public exposure sort fields) +- Enhanced filter fields on Container Images (`index_digest`, `ai_related`, `ai_vulnerability_count`) +- Enhanced filter fields on Container Vulnerabilities (`ai_related`, `index_digest`) +- Bug fixes: base_url setter validation, case management alias corrections, installation tokens truthiness fix, Falcon Container route formatting fix + +No code changes required for these improvements. + +### 2. Case Management Expansion (5 new tools) + +**File**: `modules/case_management.py` + +Extend the existing module with 5 new read-only tools using the existing `CaseManagement` service class (no new imports). + +#### Access Tags (3 tools) + +| Tool Name | FalconPy Operation | Description | +|-----------|--------------------|-------------| +| `case_query_access_tags` | `queries_access_tags_get_v1` | Query available access tags with FQL filtering. Returns tag IDs. | +| `case_get_access_tags` | `entities_access_tags_get_v1` | Get access tag details by ID. Returns tag name, description, and scope. | +| `case_aggregate_access_tags` | `aggregates_access_tags_post_v1` | Aggregate access tag data (counts, groupings). | + +**Agent value**: Understanding case access controls — "who can see this case?", "what access scoping exists?" + +#### RTR File Metadata (2 tools) + +| Tool Name | FalconPy Operation | Description | +|-----------|--------------------|-------------| +| `case_get_rtr_file_metadata` | `entities_get_rtr_file_metadata_post_v1` | Get metadata about RTR-collected files attached to a case (filename, size, hash, collection time). 
| +| `case_get_rtr_recent_files` | `entities_retrieve_rtr_recent_file_post_v1` | Retrieve recent RTR file collection activity for a case. | + +**Agent value**: During investigations, the agent can see what files were collected from endpoints via RTR without needing to download them. Answers "what evidence has been collected?" and "when was this file pulled?" + +**Excluded**: `entities_retrieve_rtr_file_post_v1` (actual file download) — returns binary content, deferred until we have a binary content handling pattern. + +All 5 tools: `tier="read"`. + +### 3. Correlation Rules Templates (2 new tools) + +**File**: `modules/correlation.py` + +Extend the existing module with 2 new read-only tools. Uses the existing `CorrelationRules` service class (with `APIHarnessV2` fallback, same pattern as existing tools). + +| Tool Name | FalconPy Operation | Description | +|-----------|--------------------|-------------| +| `correlation_list_templates` | `queries_templates_get_v1Mixin0` | Query available rule templates with filtering. Returns template IDs. | +| `correlation_get_template` | `entities_templates_get_v1Mixin0` | Get full template details by ID, including CQL logic and configuration. | + +**Agent value**: Browse CrowdStrike-provided detection templates — "what out-of-box detections exist for lateral movement?", "show me the template logic for this detection type." + +Both tools: `tier="read"`. + +### 4. 
Spotlight Evaluation Logic (1 new tool, new module) + +**File**: `modules/spotlight.py` (new) + +New module following the standard `BaseModule` pattern: + +```python +class SpotlightModule(BaseModule): + def __init__(self, client): + super().__init__(client) + self.falcon = SpotlightEvaluationLogic(auth_object=self.client.auth_object) +``` + +| Tool Name | FalconPy Operation | Description | +|-----------|--------------------|-------------| +| `spotlight_supported_evaluations` | `combinedSupportedEvaluationExt` | Get combined supported evaluation logic — vulnerability assessment methods, OS/platform coverage, evaluation criteria. | + +**Agent value**: "Can Spotlight evaluate CVE-2024-XXXX?", "what platforms does this vulnerability evaluation cover?" + +Tool: `tier="read"`. + +**Note**: This is a single-tool module. The modular architecture supports this cleanly, and Spotlight has room to grow (SpotlightVulnerabilities service exists). Users without a Spotlight license can exclude it via `--modules`. + +Auto-discovered by `registry.py` — no changes needed to the registry. + +### 5. API Scopes + +**File**: `common/api_scopes.py` + +Add scope mappings for all 8 new operations: + +- Case Management access tags/RTR: likely `cases:read` +- Correlation Rules templates: likely `correlation-rules:read` +- Spotlight evaluation logic: likely `spotlight-vulnerabilities:read` + +Exact scopes to be confirmed from FalconPy endpoint definitions during implementation. + +### 6. Permission Presets + +**Files**: `.claude/permissions-*.json` + +- `permissions-minimal.json`: No changes (query-only, doesn't include case/correlation tools) +- `permissions-standard.json`: Add the 8 new read tools (SOC analyst needs these) +- `permissions-full.json`: No changes — its `mcp__crowdstrike__*` wildcard already covers the new tools + +### 7. 
Tests + +New test coverage for all 8 tools: + +- `tests/test_case_management.py`: Extend with tests for 5 new tools +- `tests/test_correlation.py`: Extend with tests for 2 new tools (template operations) +- `tests/test_spotlight.py`: New test file for the spotlight module +- `tests/test_smoke_tools_list.py`: Update expected tool count (31 → 39) + +Test pattern follows existing convention: mock FalconPy service responses, verify tool returns formatted output. + +## What's Excluded + +| Category | Reason | +|----------|--------| +| All write operations | Read-only iteration | +| NGSIEM admin (data connectors, parsers) | Infrastructure admin, not investigation | +| Sensor Download v3 | Not agent-valuable for investigation | +| ML Exclusions v2 | Deferred — read-only exclusion queries may be added later | +| IOA Exclusions v2 | Deferred — same as above | +| Cloud Policies suppression rules | Deferred | +| Cloud Security Detections combined IOM by rule | Deferred | +| RTR file download | Binary content — needs content handling pattern first | +| Admission Control Policies | Kubernetes-specific, new service collection, deferred | +| Data Protection Configuration | Specialized DLP, deferred | +| ASPM group operations | Niche application security, deferred | +| Serverless Exports | Data export, deferred | +| Overwatch Dashboard | Deprecated by CrowdStrike | diff --git a/modules/cao_hunting.py b/modules/cao_hunting.py new file mode 100644 index 0000000..e113e4b --- /dev/null +++ b/modules/cao_hunting.py @@ -0,0 +1,449 @@ +""" +CAO Hunting Module — intelligence queries and hunting guides. 
+ +Tools: + cao_search_queries — Search and retrieve intelligence queries + cao_get_queries — Get intelligence queries by IDs + cao_search_guides — Search and retrieve hunting guides + cao_get_guides — Get hunting guides by IDs + cao_aggregate — Aggregate intelligence queries or hunting guides +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Annotated, Optional + +from common.errors import format_api_error +from modules.base import BaseModule +from utils import format_text_response + +if TYPE_CHECKING: + from mcp.server.fastmcp import FastMCP + +try: + from falconpy import CAOHunting + + CAO_HUNTING_AVAILABLE = True +except ImportError: + CAO_HUNTING_AVAILABLE = False + + +class CAOHuntingModule(BaseModule): + """Intelligence queries and hunting guides from CrowdStrike CAO.""" + + def __init__(self, client): + super().__init__(client) + + if not CAO_HUNTING_AVAILABLE: + raise ImportError("CAOHunting FalconPy class not available. Ensure crowdstrike-falconpy >= 1.6.0 is installed.") + + try: + self._cao_hunting = CAOHunting(auth_object=self.client.auth_object) + except Exception as e: + raise ImportError(f"CAOHunting init failed: {e}") from e + + self._log("Initialized") + + def register_tools(self, server: FastMCP) -> None: + self._add_tool( + server, + self.cao_search_queries, + name="cao_search_queries", + description=( + "Search CrowdStrike intelligence queries (CAO Hunting). " + "Returns curated threat-hunting queries with metadata, " + "filterable by FQL or free-text search." + ), + ) + self._add_tool( + server, + self.cao_get_queries, + name="cao_get_queries", + description=("Retrieve specific intelligence queries by their IDs. Use after cao_search_queries or when IDs are already known."), + ) + self._add_tool( + server, + self.cao_search_guides, + name="cao_search_guides", + description=( + "Search CrowdStrike hunting guides (CAO Hunting). 
" + "Returns curated threat-hunting guides with metadata, " + "filterable by FQL or free-text search." + ), + ) + self._add_tool( + server, + self.cao_get_guides, + name="cao_get_guides", + description=("Retrieve specific hunting guides by their IDs. Use after cao_search_guides or when IDs are already known."), + ) + self._add_tool( + server, + self.cao_aggregate, + name="cao_aggregate", + description=("Aggregate intelligence queries or hunting guides by field. Supports terms, date_range, range, and cardinality aggregations."), + ) + + # ------------------------------------------------------------------ + # Tools + # ------------------------------------------------------------------ + + async def cao_search_queries( + self, + filter: Annotated[Optional[str], "FQL filter expression for intelligence queries"] = None, + q: Annotated[Optional[str], "Free-text search across query metadata"] = None, + sort: Annotated[Optional[str], "Sort field and direction (e.g. 'created_on|desc')"] = None, + include_translated_content: Annotated[bool, "Include AI-translated content (SPL, etc.)"] = False, + max_results: Annotated[int, "Maximum queries to return (default: 20)"] = 20, + ) -> str: + """Search and retrieve intelligence queries.""" + result = self._search_queries(filter, q, sort, include_translated_content, max_results) + + if not result.get("success"): + return format_text_response(f"Failed to search intelligence queries: {result.get('error')}", raw=True) + + queries = result["queries"] + lines = [ + f"Intelligence Queries: {result['count']} returned (of {result['total']} total)", + "", + ] + + if not queries: + lines.append("No intelligence queries found matching the criteria.") + else: + for i, q_item in enumerate(queries, 1): + lines.append(f"{i}. 
{q_item['name']}") + if q_item.get("description"): + lines.append(f" {q_item['description'][:200]}") + lines.append(f" ID: {q_item['id']}") + if q_item.get("tags"): + lines.append(f" Tags: {', '.join(q_item['tags'][:10])}") + if q_item.get("created_on"): + lines.append(f" Created: {q_item['created_on']}") + if q_item.get("modified_on"): + lines.append(f" Modified: {q_item['modified_on']}") + if q_item.get("content"): + lines.append(f" Content: {q_item['content'][:300]}") + if q_item.get("translated_content"): + for lang, text in q_item["translated_content"].items(): + lines.append(f" [{lang}]: {text[:200]}") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + + async def cao_get_queries( + self, + ids: Annotated[str, "Comma-separated intelligence query IDs"], + include_translated_content: Annotated[bool, "Include AI-translated content"] = False, + ) -> str: + """Retrieve intelligence queries by IDs.""" + id_list = [i.strip() for i in ids.split(",") if i.strip()] + if not id_list: + return format_text_response("No valid IDs provided.", raw=True) + + result = self._get_queries_by_ids(id_list, include_translated_content) + + if not result.get("success"): + return format_text_response(f"Failed to get intelligence queries: {result.get('error')}", raw=True) + + queries = result["queries"] + lines = [f"Intelligence Queries: {len(queries)} returned", ""] + + if not queries: + lines.append("No intelligence queries found for the given IDs.") + else: + for i, q_item in enumerate(queries, 1): + lines.append(f"{i}. 
{q_item['name']}") + if q_item.get("description"): + lines.append(f" {q_item['description'][:200]}") + lines.append(f" ID: {q_item['id']}") + if q_item.get("tags"): + lines.append(f" Tags: {', '.join(q_item['tags'][:10])}") + if q_item.get("created_on"): + lines.append(f" Created: {q_item['created_on']}") + if q_item.get("modified_on"): + lines.append(f" Modified: {q_item['modified_on']}") + if q_item.get("content"): + lines.append(f" Content: {q_item['content'][:300]}") + if q_item.get("translated_content"): + for lang, text in q_item["translated_content"].items(): + lines.append(f" [{lang}]: {text[:200]}") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + + async def cao_search_guides( + self, + filter: Annotated[Optional[str], "FQL filter expression for hunting guides"] = None, + q: Annotated[Optional[str], "Free-text search across guide metadata"] = None, + sort: Annotated[Optional[str], "Sort field and direction"] = None, + max_results: Annotated[int, "Maximum guides to return (default: 20)"] = 20, + ) -> str: + """Search and retrieve hunting guides.""" + result = self._search_guides(filter, q, sort, max_results) + + if not result.get("success"): + return format_text_response(f"Failed to search hunting guides: {result.get('error')}", raw=True) + + guides = result["guides"] + lines = [ + f"Hunting Guides: {result['count']} returned (of {result['total']} total)", + "", + ] + + if not guides: + lines.append("No hunting guides found matching the criteria.") + else: + for i, g in enumerate(guides, 1): + lines.append(f"{i}. 
{g['name']}") + if g.get("description"): + lines.append(f" {g['description'][:200]}") + lines.append(f" ID: {g['id']}") + if g.get("tags"): + lines.append(f" Tags: {', '.join(g['tags'][:10])}") + if g.get("created_on"): + lines.append(f" Created: {g['created_on']}") + if g.get("modified_on"): + lines.append(f" Modified: {g['modified_on']}") + if g.get("content"): + lines.append(f" Content: {g['content'][:300]}") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + + async def cao_get_guides( + self, + ids: Annotated[str, "Comma-separated hunting guide IDs"], + ) -> str: + """Retrieve hunting guides by IDs.""" + id_list = [i.strip() for i in ids.split(",") if i.strip()] + if not id_list: + return format_text_response("No valid IDs provided.", raw=True) + + result = self._get_guides_by_ids(id_list) + + if not result.get("success"): + return format_text_response(f"Failed to get hunting guides: {result.get('error')}", raw=True) + + guides = result["guides"] + lines = [f"Hunting Guides: {len(guides)} returned", ""] + + if not guides: + lines.append("No hunting guides found for the given IDs.") + else: + for i, g in enumerate(guides, 1): + lines.append(f"{i}. {g['name']}") + if g.get("description"): + lines.append(f" {g['description'][:200]}") + lines.append(f" ID: {g['id']}") + if g.get("tags"): + lines.append(f" Tags: {', '.join(g['tags'][:10])}") + if g.get("created_on"): + lines.append(f" Created: {g['created_on']}") + if g.get("modified_on"): + lines.append(f" Modified: {g['modified_on']}") + if g.get("content"): + lines.append(f" Content: {g['content'][:300]}") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + + async def cao_aggregate( + self, + field: Annotated[str, "Field to aggregate on (e.g. 
'severity', 'tags', 'created_on')"], + type: Annotated[str, "Aggregation type: 'terms', 'date_range', 'range', 'cardinality'"] = "terms", + resource_type: Annotated[str, "What to aggregate: 'queries' or 'guides'"] = "queries", + filter: Annotated[Optional[str], "FQL filter to scope the aggregation"] = None, + size: Annotated[int, "Number of buckets to return (default: 10)"] = 10, + ) -> str: + """Aggregate intelligence queries or hunting guides by field.""" + if resource_type not in ("queries", "guides"): + return format_text_response("Invalid resource_type: must be 'queries' or 'guides'.", raw=True) + + result = self._aggregate(resource_type, field, type, filter, size) + + if not result.get("success"): + return format_text_response(f"Failed to aggregate {resource_type}: {result.get('error')}", raw=True) + + buckets = result["buckets"] + lines = [ + f"Aggregation: {field} ({type}) on {resource_type}", + "", + ] + + if not buckets: + lines.append("No aggregation results.") + else: + for b in buckets: + label = b.get("label", b.get("key", "unknown")) + count = b.get("count", 0) + lines.append(f" {label}: {count}") + + return format_text_response("\n".join(lines), raw=True) + + # ------------------------------------------------------------------ + # Internal methods + # ------------------------------------------------------------------ + + def _search_queries(self, filter=None, q=None, sort=None, include_translated_content=False, max_results=20): + try: + kwargs = {"limit": min(max_results, 100)} + if filter: + kwargs["filter"] = filter + if q: + kwargs["q"] = q + if sort: + kwargs["sort"] = sort + + r = self._cao_hunting.search_queries(**kwargs) + if r["status_code"] != 200: + return { + "success": False, + "error": format_api_error(r, "Failed to search queries", operation="search_queries"), + } + + query_ids = r.get("body", {}).get("resources", []) + total = r.get("body", {}).get("meta", {}).get("pagination", {}).get("total", len(query_ids)) + + if not query_ids: + 
return {"success": True, "queries": [], "count": 0, "total": total} + + return self._get_queries_by_ids(query_ids, include_translated_content, total=total) + except Exception as e: + return {"success": False, "error": f"Error searching intelligence queries: {e}"} + + def _get_queries_by_ids(self, ids, include_translated_content=False, total=None): + try: + kwargs = {"ids": ids} + if include_translated_content: + kwargs["include_translated_content"] = "__all__" + + r = self._cao_hunting.get_queries(**kwargs) + if r["status_code"] != 200: + return { + "success": False, + "error": format_api_error(r, "Failed to get queries", operation="get_queries"), + } + + resources = r.get("body", {}).get("resources", []) + + queries = [] + for q in resources: + entry = { + "id": q.get("id", ""), + "name": q.get("name", ""), + "description": q.get("description", ""), + "content": q.get("content", ""), + "tags": q.get("tags", []), + "created_on": q.get("created_on", ""), + "modified_on": q.get("modified_on", ""), + } + if include_translated_content and q.get("translated_content"): + entry["translated_content"] = q["translated_content"] + queries.append(entry) + + return { + "success": True, + "queries": queries, + "count": len(queries), + "total": total if total is not None else len(queries), + } + except Exception as e: + return {"success": False, "error": f"Error getting intelligence queries: {e}"} + + def _search_guides(self, filter=None, q=None, sort=None, max_results=20): + try: + kwargs = {"limit": min(max_results, 100)} + if filter: + kwargs["filter"] = filter + if q: + kwargs["q"] = q + if sort: + kwargs["sort"] = sort + + r = self._cao_hunting.search_guides(**kwargs) + if r["status_code"] != 200: + return { + "success": False, + "error": format_api_error(r, "Failed to search guides", operation="search_guides"), + } + + guide_ids = r.get("body", {}).get("resources", []) + total = r.get("body", {}).get("meta", {}).get("pagination", {}).get("total", len(guide_ids)) + + if not 
guide_ids: + return {"success": True, "guides": [], "count": 0, "total": total} + + return self._get_guides_by_ids(guide_ids, total=total) + except Exception as e: + return {"success": False, "error": f"Error searching hunting guides: {e}"} + + def _get_guides_by_ids(self, ids, total=None): + try: + r = self._cao_hunting.get_guides(ids=ids) + if r["status_code"] != 200: + return { + "success": False, + "error": format_api_error(r, "Failed to get guides", operation="get_guides"), + } + + resources = r.get("body", {}).get("resources", []) + + guides = [] + for g in resources: + guides.append( + { + "id": g.get("id", ""), + "name": g.get("name", ""), + "description": g.get("description", ""), + "content": g.get("content", ""), + "tags": g.get("tags", []), + "created_on": g.get("created_on", ""), + "modified_on": g.get("modified_on", ""), + } + ) + + return { + "success": True, + "guides": guides, + "count": len(guides), + "total": total if total is not None else len(guides), + } + except Exception as e: + return {"success": False, "error": f"Error getting hunting guides: {e}"} + + def _aggregate(self, resource_type, field, agg_type, filter=None, size=10): + try: + body = [{"field": field, "type": agg_type, "size": size}] + if filter: + body[0]["filter"] = filter + + if resource_type == "queries": + r = self._cao_hunting.aggregate_queries(body=body) + op = "aggregate_queries" + else: + r = self._cao_hunting.aggregate_guides(body=body) + op = "aggregate_guides" + + if r["status_code"] != 200: + return { + "success": False, + "error": format_api_error(r, f"Failed to aggregate {resource_type}", operation=op), + } + + resources = r.get("body", {}).get("resources", []) + + buckets = [] + for agg in resources: + for bucket in agg.get("buckets", []): + buckets.append( + { + "key": bucket.get("key", ""), + "label": bucket.get("label", bucket.get("key", "")), + "count": bucket.get("count", 0), + } + ) + + return {"success": True, "buckets": buckets} + except Exception as e: + 
return {"success": False, "error": f"Error aggregating {resource_type}: {e}"} diff --git a/modules/case_management.py b/modules/case_management.py index 0688a8e..5330b55 100644 --- a/modules/case_management.py +++ b/modules/case_management.py @@ -12,6 +12,11 @@ case_delete_tags — Remove tags from a case case_upload_file — Upload file attachment to a case case_get_fields — List available case field definitions + case_query_access_tags — Query available access tags + case_get_access_tags — Get access tag details by ID + case_aggregate_access_tags — Aggregate access tag data + case_get_rtr_file_metadata — Get RTR-collected file metadata for a case + case_get_rtr_recent_files — Get recent RTR file activity for a case """ from __future__ import annotations @@ -122,6 +127,36 @@ def register_tools(self, server: FastMCP) -> None: name="case_get_fields", description="List available case field definitions and their types.", ) + self._add_tool( + server, + self.case_query_access_tags, + name="case_query_access_tags", + description="Query available case access tags with optional FQL filtering. 
Returns tag IDs for understanding case access controls.", + ) + self._add_tool( + server, + self.case_get_access_tags, + name="case_get_access_tags", + description="Get access tag details by ID — name, description, and scope.", + ) + self._add_tool( + server, + self.case_aggregate_access_tags, + name="case_aggregate_access_tags", + description="Aggregate case access tag data (counts, groupings by field).", + ) + self._add_tool( + server, + self.case_get_rtr_file_metadata, + name="case_get_rtr_file_metadata", + description="Get metadata about RTR-collected files attached to a case — filename, size, hash, collection time.", + ) + self._add_tool( + server, + self.case_get_rtr_recent_files, + name="case_get_rtr_recent_files", + description="Retrieve recent RTR file collection activity for a case.", + ) # ------------------------------------------------------------------ # Tools @@ -421,6 +456,180 @@ async def case_get_fields(self) -> str: return format_text_response("\n".join(lines), raw=True) + async def case_query_access_tags( + self, + filter: Annotated[Optional[str], "FQL filter expression for access tags"] = None, + limit: Annotated[int, "Maximum tags to return (default: 100)"] = 100, + offset: Annotated[int, "Pagination offset (default: 0)"] = 0, + ) -> str: + """Query available case access tags.""" + try: + kwargs = {"limit": min(limit, 500), "offset": offset} + if filter: + kwargs["filter"] = filter + + response = self.falcon.query_access_tags(**kwargs) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to query access tags: {format_api_error(response, 'Failed to query access tags', operation='queries_access_tags_get_v1')}", + raw=True, + ) + + tag_ids = response.get("body", {}).get("resources", []) + total = response.get("body", {}).get("meta", {}).get("pagination", {}).get("total", len(tag_ids)) + + lines = [f"Access Tags: {len(tag_ids)} returned (of {total} total)", ""] + if not tag_ids: + lines.append("No access tags found.") + 
else: + for i, tag_id in enumerate(tag_ids, 1): + lines.append(f"{i}. {tag_id}") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to query access tags: {e}", raw=True) + + async def case_get_access_tags( + self, + tag_ids: Annotated[list[str], "List of access tag IDs to retrieve"], + ) -> str: + """Get access tag details by ID.""" + try: + response = self.falcon.get_access_tags(ids=tag_ids) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to get access tags: {format_api_error(response, 'Failed to get access tags', operation='entities_access_tags_get_v1')}", + raw=True, + ) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Access Tag Details ({len(resources)} tags)", ""] + + for tag in resources: + lines.append(f"### {tag.get('name', 'Unknown')}") + lines.append(f"- **ID**: {tag.get('id', 'N/A')}") + if tag.get("description"): + lines.append(f"- **Description**: {tag['description']}") + lines.append("") + lines.append("```json") + lines.append(json.dumps(tag, indent=2, default=str)) + lines.append("```") + lines.append("") + + if not resources: + lines.append("No access tags found for the provided IDs.") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get access tags: {e}", raw=True) + + async def case_aggregate_access_tags( + self, + date_ranges: Annotated[list, "Date range specifications for aggregation"], + field: Annotated[str, "Field to aggregate on (e.g. 'name', 'id')"], + filter: Annotated[str, "FQL filter to scope the aggregation"], + name: Annotated[str, "Name for this aggregation result"], + type: Annotated[str, "Aggregation type (e.g. 
'terms', 'date_range', 'count')"], + ) -> str: + """Aggregate case access tag data.""" + try: + body = [ + { + "date_ranges": date_ranges, + "field": field, + "filter": filter, + "name": name, + "type": type, + } + ] + response = self.falcon.aggregate_access_tags(body=body) + + if response["status_code"] != 200: + err = format_api_error(response, "Failed to aggregate access tags", operation="aggregates_access_tags_post_v1") + return format_text_response(f"Failed to aggregate access tags: {err}", raw=True) + + resources = response.get("body", {}).get("resources", []) + lines = ["Access Tag Aggregation Results", ""] + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to aggregate access tags: {e}", raw=True) + + async def case_get_rtr_file_metadata( + self, + case_id: Annotated[str, "Case ID to retrieve RTR file metadata for"], + ) -> str: + """Get metadata about RTR-collected files attached to a case.""" + try: + response = self.falcon.get_rtr_file_metadata(body={"case_id": case_id}) + + if response["status_code"] != 200: + err = format_api_error(response, "Failed to get RTR file metadata", operation="entities_get_rtr_file_metadata_post_v1") + return format_text_response(f"Failed to get RTR file metadata: {err}", raw=True) + + resources = response.get("body", {}).get("resources", []) + lines = [f"RTR File Metadata for Case {case_id} ({len(resources)} files)", ""] + + if not resources: + lines.append("No RTR files found for this case.") + else: + for i, f in enumerate(resources, 1): + lines.append(f"{i}. 
**{f.get('file_name', 'Unknown')}**") + lines.append(f" - ID: {f.get('id', 'N/A')}") + if f.get("file_size"): + lines.append(f" - Size: {f['file_size']} bytes") + if f.get("sha256"): + lines.append(f" - SHA256: {f['sha256']}") + if f.get("created_on"): + lines.append(f" - Collected: {f['created_on']}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get RTR file metadata: {e}", raw=True) + + async def case_get_rtr_recent_files( + self, + case_id: Annotated[str, "Case ID to retrieve recent RTR files for"], + ) -> str: + """Retrieve recent RTR file collection activity for a case.""" + try: + response = self.falcon.get_rtr_recent_files(body={"case_id": case_id}) + + if response["status_code"] != 200: + err = format_api_error(response, "Failed to get RTR recent files", operation="entities_retrieve_rtr_recent_file_post_v1") + return format_text_response(f"Failed to get RTR recent files: {err}", raw=True) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Recent RTR Files for Case {case_id} ({len(resources)} files)", ""] + + if not resources: + lines.append("No recent RTR files found for this case.") + else: + for i, f in enumerate(resources, 1): + lines.append(f"{i}. 
**{f.get('file_name', 'Unknown')}**") + lines.append(f" - ID: {f.get('id', 'N/A')}") + if f.get("created_on"): + lines.append(f" - Collected: {f['created_on']}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get RTR recent files: {e}", raw=True) + # ------------------------------------------------------------------ # Internal methods # ------------------------------------------------------------------ diff --git a/modules/correlation.py b/modules/correlation.py index 4eced11..a87a9b7 100644 --- a/modules/correlation.py +++ b/modules/correlation.py @@ -6,6 +6,8 @@ correlation_get_rule — Get full rule details correlation_update_rule — Enable/disable rules with audit comment correlation_export_rule — Export rule in structured format + correlation_list_templates — List available rule templates + correlation_get_template — Get full template details """ from __future__ import annotations @@ -108,6 +110,18 @@ def register_tools(self, server: FastMCP) -> None: ), tier="write", ) + self._add_tool( + server, + self.correlation_list_templates, + name="correlation_list_templates", + description="List available CrowdStrike correlation rule templates with optional filtering. 
Templates are pre-built detection patterns.", + ) + self._add_tool( + server, + self.correlation_get_template, + name="correlation_get_template", + description="Get full template details by ID, including CQL logic and configuration.", + ) # ------------------------------------------------------------------ # Tools @@ -468,6 +482,96 @@ async def correlation_import_to_iac( ] return format_text_response("\n".join(lines), raw=True) + async def correlation_list_templates( + self, + filter: Annotated[Optional[str], "FQL filter expression for templates"] = None, + limit: Annotated[int, "Maximum templates to return (default: 100)"] = 100, + offset: Annotated[int, "Pagination offset (default: 0)"] = 0, + ) -> str: + """List available correlation rule templates.""" + try: + kwargs = {"limit": min(limit, 500), "offset": offset} + if filter: + kwargs["filter"] = filter + + if self._use_harness: + response = self.falcon.command("queries_templates_get_v1Mixin0", **kwargs) + else: + response = self.falcon.query_templates(**kwargs) + + if response["status_code"] != 200: + return format_text_response( + f"Failed to list templates: {format_api_error(response, 'Failed to query templates', operation='queries_templates_get_v1Mixin0')}", + raw=True, + ) + + template_ids = response.get("body", {}).get("resources", []) + total = response.get("body", {}).get("meta", {}).get("pagination", {}).get("total", len(template_ids)) + + lines = [f"Correlation Rule Templates: {len(template_ids)} returned (of {total} total)", ""] + + if not template_ids: + lines.append("No templates found.") + else: + for i, tid in enumerate(template_ids, 1): + lines.append(f"{i}. 
{tid}") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to list templates: {e}", raw=True) + + async def correlation_get_template( + self, + template_ids: Annotated[list[str], "List of template IDs to retrieve"], + ) -> str: + """Get full details for correlation rule templates.""" + try: + if self._use_harness: + response = self.falcon.command("entities_templates_get_v1Mixin0", ids=template_ids) + else: + response = self.falcon.get_templates(ids=template_ids) + + if response["status_code"] != 200: + err = format_api_error(response, "Failed to get template details", operation="entities_templates_get_v1Mixin0") + return format_text_response(f"Failed to get templates: {err}", raw=True) + + resources = response.get("body", {}).get("resources", []) + + if not resources: + return format_text_response( + f"No templates found for IDs: {template_ids}", + raw=True, + ) + + lines = [f"Correlation Rule Template Details ({len(resources)} templates)", ""] + + for template in resources: + lines.append(f"### {template.get('name', 'Unknown')}") + lines.append(f"- ID: {template.get('id', 'N/A')}") + lines.append(f"- Severity: {template.get('severity', 'N/A')}") + if template.get("description"): + lines.append(f"- Description: {template['description']}") + lines.append(f"- Created: {template.get('created_on', 'N/A')}") + lines.append(f"- Updated: {template.get('updated_on', 'N/A')}") + + search = template.get("search", {}) + if search and search.get("filter"): + lines.append("\n**CQL Filter:**") + lines.append("```") + lines.append(search["filter"]) + lines.append("```") + + lines.append("") + lines.append("**Full Template JSON:**") + lines.append("```json") + lines.append(json.dumps(template, indent=2, default=str)) + lines.append("```") + lines.append("") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get templates: {e}", 
raw=True) + # ------------------------------------------------------------------ # IaC template conversion helpers # ------------------------------------------------------------------ diff --git a/modules/spotlight.py b/modules/spotlight.py new file mode 100644 index 0000000..0b356d1 --- /dev/null +++ b/modules/spotlight.py @@ -0,0 +1,80 @@ +""" +Spotlight Module — vulnerability evaluation logic via the SpotlightEvaluationLogic API. + +Tools: + spotlight_supported_evaluations — Get supported vulnerability evaluation logic +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Annotated, Optional + +from falconpy import SpotlightEvaluationLogic + +from common.errors import format_api_error +from modules.base import BaseModule +from utils import format_text_response + +if TYPE_CHECKING: + from mcp.server.fastmcp import FastMCP + + +class SpotlightModule(BaseModule): + """Spotlight vulnerability evaluation logic queries.""" + + def __init__(self, client): + super().__init__(client) + self.falcon = SpotlightEvaluationLogic(auth_object=self.client.auth_object) + self._log("Initialized") + + def register_tools(self, server: FastMCP) -> None: + self._add_tool( + server, + self.spotlight_supported_evaluations, + name="spotlight_supported_evaluations", + description=( + "Get supported vulnerability evaluation logic — assessment methods, " + "OS/platform coverage, and evaluation criteria. Use to check if Spotlight " + "can evaluate a specific CVE or what platforms are covered." + ), + ) + + async def spotlight_supported_evaluations( + self, + filter: Annotated[Optional[str], "FQL filter expression (e.g. 
platform:'Windows')"] = None, + ) -> str: + """Get combined supported evaluation logic.""" + try: + kwargs = {} + if filter: + kwargs["filter"] = filter + + response = self.falcon.combined_supported_evaluation(**kwargs) + + if response["status_code"] != 200: + err = format_api_error(response, "Failed to get evaluations", operation="combinedSupportedEvaluationExt") + return format_text_response(f"Failed to get supported evaluations: {err}", raw=True) + + resources = response.get("body", {}).get("resources", []) + lines = [f"Spotlight Supported Evaluations ({len(resources)} results)", ""] + + if not resources: + lines.append("No evaluation logic found matching the filter.") + else: + for i, ev in enumerate(resources, 1): + lines.append(f"{i}. **{ev.get('name', 'Unknown')}**") + lines.append(f" - ID: {ev.get('id', 'N/A')}") + if ev.get("platforms"): + lines.append(f" - Platforms: {', '.join(ev['platforms'])}") + if ev.get("cve_ids"): + lines.append(f" - CVEs: {', '.join(ev['cve_ids'][:10])}") + lines.append("") + + lines.append("```json") + lines.append(json.dumps(resources, indent=2, default=str)) + lines.append("```") + + return format_text_response("\n".join(lines), raw=True) + except Exception as e: + return format_text_response(f"Failed to get supported evaluations: {e}", raw=True) diff --git a/requirements.txt b/requirements.txt index d457668..874f17c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -crowdstrike-falconpy>=1.6.0 +crowdstrike-falconpy>=1.6.1 mcp>=1.12.1 uvicorn>=0.27.0 python-dotenv>=1.0.0 diff --git a/tests/test_cao_hunting.py b/tests/test_cao_hunting.py new file mode 100644 index 0000000..94e7971 --- /dev/null +++ b/tests/test_cao_hunting.py @@ -0,0 +1,333 @@ +"""Tests for CAOHuntingModule — intelligence queries and hunting guides.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture 
+def cao_module(mock_client): + """Create a CAOHuntingModule with mocked CAOHunting service.""" + with patch("modules.cao_hunting.CAOHunting"): + from modules.cao_hunting import CAOHuntingModule + + module = CAOHuntingModule(mock_client) + module._cao_hunting = MagicMock() + return module + + +# ------------------------------------------------------------------ +# Search Queries +# ------------------------------------------------------------------ + + +class TestSearchQueries: + """Test cao_search_queries tool.""" + + def test_returns_queries_with_hydration(self, cao_module): + """Search returns IDs, then hydrates with full details.""" + cao_module._cao_hunting.search_queries.return_value = { + "status_code": 200, + "body": { + "resources": ["q1", "q2"], + "meta": {"pagination": {"total": 50}}, + }, + } + cao_module._cao_hunting.get_queries.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"id": "q1", "name": "Ransomware Hunt", "description": "Detect ransomware", "tags": ["ransomware"]}, + {"id": "q2", "name": "Lateral Movement", "description": "Detect lateral movement", "tags": ["lateral"]}, + ], + }, + } + + result = asyncio.run(cao_module.cao_search_queries()) + assert "Ransomware Hunt" in result + assert "Lateral Movement" in result + assert "50 total" in result + + def test_empty_results(self, cao_module): + """No matching queries returns empty message.""" + cao_module._cao_hunting.search_queries.return_value = { + "status_code": 200, + "body": { + "resources": [], + "meta": {"pagination": {"total": 0}}, + }, + } + + result = asyncio.run(cao_module.cao_search_queries(filter="tags:'nonexistent'")) + assert "No intelligence queries found" in result + + def test_passes_filter_and_q(self, cao_module): + """Filter and q parameters are forwarded to the API.""" + cao_module._cao_hunting.search_queries.return_value = { + "status_code": 200, + "body": {"resources": [], "meta": {"pagination": {"total": 0}}}, + } + + 
asyncio.run(cao_module.cao_search_queries(filter="tags:'apt'", q="ransomware", sort="created_on|desc")) + cao_module._cao_hunting.search_queries.assert_called_once_with(limit=20, filter="tags:'apt'", q="ransomware", sort="created_on|desc") + + def test_search_api_error(self, cao_module): + """403 error includes scope guidance.""" + cao_module._cao_hunting.search_queries.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Insufficient permissions"}]}, + } + + result = asyncio.run(cao_module.cao_search_queries()) + assert "Failed to search intelligence queries" in result + assert "403" in result + + def test_translated_content(self, cao_module): + """Include translated content when requested.""" + cao_module._cao_hunting.search_queries.return_value = { + "status_code": 200, + "body": { + "resources": ["q1"], + "meta": {"pagination": {"total": 1}}, + }, + } + cao_module._cao_hunting.get_queries.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "q1", + "name": "Test Query", + "translated_content": {"SPL": "index=main sourcetype=..."}, + }, + ], + }, + } + + result = asyncio.run(cao_module.cao_search_queries(include_translated_content=True)) + assert "SPL" in result + cao_module._cao_hunting.get_queries.assert_called_once_with(ids=["q1"], include_translated_content="__all__") + + +# ------------------------------------------------------------------ +# Get Queries +# ------------------------------------------------------------------ + + +class TestGetQueries: + """Test cao_get_queries tool.""" + + def test_get_by_ids(self, cao_module): + """Direct get by comma-separated IDs.""" + cao_module._cao_hunting.get_queries.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"id": "q1", "name": "Query One", "tags": ["tag1"]}, + ], + }, + } + + result = asyncio.run(cao_module.cao_get_queries(ids="q1")) + assert "Query One" in result + + def test_empty_ids(self, cao_module): + """Empty IDs string returns 
error.""" + result = asyncio.run(cao_module.cao_get_queries(ids="")) + assert "No valid IDs" in result + + def test_multiple_ids(self, cao_module): + """Comma-separated IDs are parsed correctly.""" + cao_module._cao_hunting.get_queries.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + + asyncio.run(cao_module.cao_get_queries(ids="q1, q2, q3")) + cao_module._cao_hunting.get_queries.assert_called_once_with(ids=["q1", "q2", "q3"]) + + +# ------------------------------------------------------------------ +# Search Guides +# ------------------------------------------------------------------ + + +class TestSearchGuides: + """Test cao_search_guides tool.""" + + def test_returns_guides_with_hydration(self, cao_module): + """Search returns IDs, then hydrates with full details.""" + cao_module._cao_hunting.search_guides.return_value = { + "status_code": 200, + "body": { + "resources": ["g1"], + "meta": {"pagination": {"total": 10}}, + }, + } + cao_module._cao_hunting.get_guides.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"id": "g1", "name": "APT Hunting Guide", "description": "How to hunt APTs"}, + ], + }, + } + + result = asyncio.run(cao_module.cao_search_guides()) + assert "APT Hunting Guide" in result + assert "10 total" in result + + def test_empty_results(self, cao_module): + """No matching guides returns empty message.""" + cao_module._cao_hunting.search_guides.return_value = { + "status_code": 200, + "body": {"resources": [], "meta": {"pagination": {"total": 0}}}, + } + + result = asyncio.run(cao_module.cao_search_guides()) + assert "No hunting guides found" in result + + def test_search_api_error(self, cao_module): + """API error is reported.""" + cao_module._cao_hunting.search_guides.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + + result = asyncio.run(cao_module.cao_search_guides()) + assert "Failed to search hunting guides" in result + + +# 
------------------------------------------------------------------ +# Get Guides +# ------------------------------------------------------------------ + + +class TestGetGuides: + """Test cao_get_guides tool.""" + + def test_get_by_ids(self, cao_module): + """Direct get by IDs.""" + cao_module._cao_hunting.get_guides.return_value = { + "status_code": 200, + "body": { + "resources": [ + {"id": "g1", "name": "Guide One", "content": "Step 1: ..."}, + ], + }, + } + + result = asyncio.run(cao_module.cao_get_guides(ids="g1")) + assert "Guide One" in result + + def test_empty_ids(self, cao_module): + """Empty IDs string returns error.""" + result = asyncio.run(cao_module.cao_get_guides(ids=" ")) + assert "No valid IDs" in result + + +# ------------------------------------------------------------------ +# Aggregate +# ------------------------------------------------------------------ + + +class TestAggregate: + """Test cao_aggregate tool.""" + + def test_terms_aggregation_queries(self, cao_module): + """Terms aggregation on intelligence queries.""" + cao_module._cao_hunting.aggregate_queries.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "buckets": [ + {"key": "high", "label": "high", "count": 42}, + {"key": "medium", "label": "medium", "count": 18}, + ], + }, + ], + }, + } + + result = asyncio.run(cao_module.cao_aggregate(field="severity")) + assert "high: 42" in result + assert "medium: 18" in result + + def test_aggregation_guides(self, cao_module): + """Aggregation routes to guides when resource_type=guides.""" + cao_module._cao_hunting.aggregate_guides.return_value = { + "status_code": 200, + "body": {"resources": [{"buckets": []}]}, + } + + asyncio.run(cao_module.cao_aggregate(field="tags", resource_type="guides")) + cao_module._cao_hunting.aggregate_guides.assert_called_once() + cao_module._cao_hunting.aggregate_queries.assert_not_called() + + def test_invalid_resource_type(self, cao_module): + """Invalid resource_type returns error.""" + 
result = asyncio.run(cao_module.cao_aggregate(field="severity", resource_type="invalid")) + assert "Invalid resource_type" in result + + def test_aggregation_with_filter(self, cao_module): + """Filter is included in aggregation body.""" + cao_module._cao_hunting.aggregate_queries.return_value = { + "status_code": 200, + "body": {"resources": [{"buckets": []}]}, + } + + asyncio.run(cao_module.cao_aggregate(field="tags", filter="severity:'high'", size=5)) + call_kwargs = cao_module._cao_hunting.aggregate_queries.call_args + body = call_kwargs.kwargs.get("body") + assert body[0]["filter"] == "severity:'high'" + assert body[0]["size"] == 5 + + def test_aggregation_api_error(self, cao_module): + """API error is reported.""" + cao_module._cao_hunting.aggregate_queries.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Access denied"}]}, + } + + result = asyncio.run(cao_module.cao_aggregate(field="severity")) + assert "Failed to aggregate queries" in result + + +# ------------------------------------------------------------------ +# Tool Registration +# ------------------------------------------------------------------ + + +class TestToolRegistration: + """Verify tool registration.""" + + def test_all_tools_registered(self, cao_module): + """All 5 tools should be registered.""" + mock_server = MagicMock() + mock_server.tool.return_value = lambda fn: fn + + cao_module.register_tools(mock_server) + + expected = { + "cao_search_queries", + "cao_get_queries", + "cao_search_guides", + "cao_get_guides", + "cao_aggregate", + } + assert set(cao_module.tools) == expected + + def test_all_tools_are_read_tier(self, cao_module): + """All tools should register even with allow_writes=False.""" + cao_module.allow_writes = False + mock_server = MagicMock() + mock_server.tool.return_value = lambda fn: fn + + cao_module.register_tools(mock_server) + assert len(cao_module.tools) == 5 diff --git a/tests/test_case_management_new_tools.py 
b/tests/test_case_management_new_tools.py new file mode 100644 index 0000000..eafec9d --- /dev/null +++ b/tests/test_case_management_new_tools.py @@ -0,0 +1,195 @@ +"""Tests for new case management tools added in FalconPy v1.6.1.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture +def case_module(mock_client): + """Create CaseManagementModule with mocked API.""" + with patch("modules.case_management.CaseManagement") as MockCM: + mock_cm = MagicMock() + MockCM.return_value = mock_cm + from modules.case_management import CaseManagementModule + + module = CaseManagementModule(mock_client) + module.falcon = mock_cm + return module + + +class TestCaseQueryAccessTags: + """Test case_query_access_tags tool.""" + + def test_returns_tag_ids(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": ["tag-001", "tag-002"], + "meta": {"pagination": {"total": 2}}, + }, + } + result = asyncio.run(case_module.case_query_access_tags()) + assert "tag-001" in result + assert "tag-002" in result + + def test_handles_empty_results(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 200, + "body": { + "resources": [], + "meta": {"pagination": {"total": 0}}, + }, + } + result = asyncio.run(case_module.case_query_access_tags()) + assert "no access tags" in result.lower() or "0" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.query_access_tags.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.run(case_module.case_query_access_tags()) + assert "failed" in result.lower() + + +class TestCaseGetAccessTags: + """Test case_get_access_tags tool.""" + + def test_returns_tag_details(self, case_module): + case_module.falcon.get_access_tags.return_value = { 
+ "status_code": 200, + "body": {"resources": [{"id": "tag-001", "name": "SOC-Team", "description": "SOC team access"}]}, + } + result = asyncio.run(case_module.case_get_access_tags(tag_ids=["tag-001"])) + assert "SOC-Team" in result + assert "tag-001" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.get_access_tags.return_value = { + "status_code": 404, + "body": {"errors": [{"message": "Not found"}]}, + } + result = asyncio.run(case_module.case_get_access_tags(tag_ids=["bad-id"])) + assert "failed" in result.lower() + + +class TestCaseAggregateAccessTags: + """Test case_aggregate_access_tags tool.""" + + def test_returns_aggregation_data(self, case_module): + case_module.falcon.aggregate_access_tags.return_value = { + "status_code": 200, + "body": {"resources": [{"name": "tag_count", "buckets": [{"label": "SOC", "count": 5}]}]}, + } + result = asyncio.run( + case_module.case_aggregate_access_tags( + date_ranges=[], + field="name", + filter="", + name="tag_count", + type="terms", + ) + ) + assert "tag_count" in result or "SOC" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.aggregate_access_tags.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.run( + case_module.case_aggregate_access_tags( + date_ranges=[], + field="name", + filter="", + name="tag_count", + type="terms", + ) + ) + assert "failed" in result.lower() + + +class TestCaseGetRtrFileMetadata: + """Test case_get_rtr_file_metadata tool.""" + + def test_returns_file_metadata(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "file-001", + "file_name": "suspicious.exe", + "file_size": 1024, + "sha256": "abc123def456", + } + ] + }, + } + result = asyncio.run(case_module.case_get_rtr_file_metadata(case_id="case-123")) + assert "suspicious.exe" in result + assert "file-001" in result + + 
def test_handles_no_files(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = asyncio.run(case_module.case_get_rtr_file_metadata(case_id="case-123")) + assert "no rtr" in result.lower() or "0" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.get_rtr_file_metadata.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.run(case_module.case_get_rtr_file_metadata(case_id="case-123")) + assert "failed" in result.lower() + + +class TestCaseGetRtrRecentFiles: + """Test case_get_rtr_recent_files tool.""" + + def test_returns_recent_files(self, case_module): + case_module.falcon.get_rtr_recent_files.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "file-002", + "file_name": "collected.log", + "created_on": "2026-03-31T12:00:00Z", + } + ] + }, + } + result = asyncio.run(case_module.case_get_rtr_recent_files(case_id="case-123")) + assert "collected.log" in result + + def test_handles_api_error(self, case_module): + case_module.falcon.get_rtr_recent_files.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.run(case_module.case_get_rtr_recent_files(case_id="case-123")) + assert "failed" in result.lower() + + +class TestToolRegistration: + """Verify new tools register correctly.""" + + def test_all_new_tools_register_as_read(self, case_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + case_module.register_tools(server) + assert "case_query_access_tags" in case_module.tools + assert "case_get_access_tags" in case_module.tools + assert "case_aggregate_access_tags" in case_module.tools + assert "case_get_rtr_file_metadata" in case_module.tools + assert "case_get_rtr_recent_files" in case_module.tools diff --git a/tests/test_correlation_import.py b/tests/test_correlation_import.py 
index 797f437..eecaab0 100644 --- a/tests/test_correlation_import.py +++ b/tests/test_correlation_import.py @@ -119,7 +119,7 @@ class TestDryRunMode: def test_dry_run_returns_yaml(self, correlation_module): _mock_get_rules(correlation_module.falcon, MOCK_RULE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( correlation_module.correlation_import_to_iac( rule_id="rule-uuid-123", vendor="aws", @@ -133,7 +133,7 @@ def test_dry_run_returns_yaml(self, correlation_module): def test_dry_run_does_not_write_file(self, correlation_module, tmp_path): correlation_module._detections_repo_path = str(tmp_path) _mock_get_rules(correlation_module.falcon, MOCK_RULE) - asyncio.get_event_loop().run_until_complete( + asyncio.run( correlation_module.correlation_import_to_iac( rule_id="rule-uuid-123", vendor="aws", @@ -153,7 +153,7 @@ def test_writes_yaml_file(self, correlation_module, tmp_path): (tmp_path / "resources" / "detections" / "aws").mkdir(parents=True) _mock_get_rules(correlation_module.falcon, MOCK_RULE) - asyncio.get_event_loop().run_until_complete( + asyncio.run( correlation_module.correlation_import_to_iac( rule_id="rule-uuid-123", vendor="aws", @@ -173,7 +173,7 @@ def test_refuses_overwrite_existing_file(self, correlation_module, tmp_path): (target_dir / "aws_-_cloudtrail_-_suspicious_iam_activity.yaml").write_text("existing") _mock_get_rules(correlation_module.falcon, MOCK_RULE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( correlation_module.correlation_import_to_iac( rule_id="rule-uuid-123", vendor="aws", @@ -185,7 +185,7 @@ def test_refuses_overwrite_existing_file(self, correlation_module, tmp_path): def test_falls_back_to_dry_run_when_path_not_writable(self, correlation_module): correlation_module._detections_repo_path = None _mock_get_rules(correlation_module.falcon, MOCK_RULE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( correlation_module.correlation_import_to_iac( 
rule_id="rule-uuid-123", vendor="aws", @@ -202,7 +202,7 @@ class TestVendorValidation: def test_rejects_invalid_vendor(self, correlation_module): _mock_get_rules(correlation_module.falcon, MOCK_RULE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( correlation_module.correlation_import_to_iac( rule_id="rule-uuid-123", vendor="invalid_vendor", diff --git a/tests/test_correlation_templates.py b/tests/test_correlation_templates.py new file mode 100644 index 0000000..71e8978 --- /dev/null +++ b/tests/test_correlation_templates.py @@ -0,0 +1,112 @@ +"""Tests for correlation rule template tools added in FalconPy v1.6.1.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +MOCK_TEMPLATE = { + "id": "template-uuid-001", + "name": "Lateral Movement - RDP Brute Force", + "description": "Detects repeated RDP login failures indicating brute force attempts.", + "severity": 60, + "search": { + "filter": "#event_simpleName=UserLogonFailed2 LogonType=10 | groupBy([aid, UserName], function=count()) | count > 10", + }, + "created_on": "2026-01-15T00:00:00Z", + "updated_on": "2026-03-20T00:00:00Z", +} + + +@pytest.fixture +def correlation_module(mock_client): + """Create CorrelationModule with mocked API.""" + with patch("modules.correlation.CorrelationRules") as MockCR: + mock_cr = MagicMock() + MockCR.return_value = mock_cr + from modules.correlation import CorrelationModule + + module = CorrelationModule(mock_client) + module.falcon = mock_cr + return module + + +class TestCorrelationListTemplates: + """Test correlation_list_templates tool.""" + + def test_returns_template_ids(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 200, + "body": { + "resources": ["template-uuid-001", "template-uuid-002"], + "meta": {"pagination": {"total": 2}}, + }, + } + result = 
asyncio.run(correlation_module.correlation_list_templates()) + assert "template-uuid-001" in result + assert "template-uuid-002" in result + + def test_handles_empty_results(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 200, + "body": { + "resources": [], + "meta": {"pagination": {"total": 0}}, + }, + } + result = asyncio.run(correlation_module.correlation_list_templates()) + assert "no templates" in result.lower() or "0" in result + + def test_handles_api_error(self, correlation_module): + correlation_module.falcon.query_templates.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.run(correlation_module.correlation_list_templates()) + assert "failed" in result.lower() + + +class TestCorrelationGetTemplate: + """Test correlation_get_template tool.""" + + def test_returns_template_details(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 200, + "body": {"resources": [MOCK_TEMPLATE]}, + } + result = asyncio.run(correlation_module.correlation_get_template(template_ids=["template-uuid-001"])) + assert "Lateral Movement" in result + assert "template-uuid-001" in result + assert "RDP" in result + + def test_handles_not_found(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = asyncio.run(correlation_module.correlation_get_template(template_ids=["bad-id"])) + assert "no templates found" in result.lower() + + def test_handles_api_error(self, correlation_module): + correlation_module.falcon.get_templates.return_value = { + "status_code": 500, + "body": {"errors": [{"message": "Internal error"}]}, + } + result = asyncio.run(correlation_module.correlation_get_template(template_ids=["template-uuid-001"])) + assert "failed" in result.lower() + + +class TestTemplateToolRegistration: + """Verify template tools 
register correctly.""" + + def test_template_tools_register_as_read(self, correlation_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + correlation_module.register_tools(server) + assert "correlation_list_templates" in correlation_module.tools + assert "correlation_get_template" in correlation_module.tools diff --git a/tests/test_response_module.py b/tests/test_response_module.py index 49a8099..af33b93 100644 --- a/tests/test_response_module.py +++ b/tests/test_response_module.py @@ -60,7 +60,7 @@ class TestHostContainPreFlight: def test_preview_returns_device_details(self, response_module): _mock_device_lookup(response_module.hosts, MOCK_DEVICE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="abc123", reason="Cryptominer confirmed", @@ -74,7 +74,7 @@ def test_preview_returns_device_details(self, response_module): def test_already_contained_returns_noop(self, response_module): _mock_device_lookup(response_module.hosts, MOCK_CONTAINED_DEVICE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="abc123", reason="Cryptominer confirmed", @@ -85,7 +85,7 @@ def test_already_contained_returns_noop(self, response_module): def test_excluded_tag_blocks_containment(self, response_module): _mock_device_lookup(response_module.hosts, MOCK_DNC_DEVICE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="abc123", reason="Test", @@ -99,7 +99,7 @@ def test_device_not_found_returns_error(self, response_module): "status_code": 200, "body": {"resources": []}, } - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="nonexistent", reason="Test", @@ -118,7 +118,7 @@ def test_contain_succeeds_with_confirm(self, response_module): "status_code": 202, "body": {"resources": [{"id": "abc123"}]}, } 
- result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="abc123", reason="Cryptominer confirmed", @@ -136,7 +136,7 @@ def test_contain_api_failure(self, response_module): "status_code": 403, "body": {"errors": [{"message": "Insufficient permissions"}]}, } - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_contain( device_id="abc123", reason="Test", @@ -151,7 +151,7 @@ class TestHostLiftContainment: def test_preview_shows_contained_device(self, response_module): _mock_device_lookup(response_module.hosts, MOCK_CONTAINED_DEVICE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_lift_containment( device_id="abc123", reason="Investigation complete", @@ -163,7 +163,7 @@ def test_preview_shows_contained_device(self, response_module): def test_not_contained_returns_noop(self, response_module): _mock_device_lookup(response_module.hosts, MOCK_DEVICE) - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_lift_containment( device_id="abc123", reason="Test", @@ -178,7 +178,7 @@ def test_lift_succeeds_with_confirm(self, response_module): "status_code": 202, "body": {"resources": [{"id": "abc123"}]}, } - result = asyncio.get_event_loop().run_until_complete( + result = asyncio.run( response_module.host_lift_containment( device_id="abc123", reason="Investigation complete", @@ -200,7 +200,7 @@ def test_contain_writes_audit_entry(self, response_module, tmp_path): "status_code": 202, "body": {"resources": [{"id": "abc123"}]}, } - asyncio.get_event_loop().run_until_complete( + asyncio.run( response_module.host_contain( device_id="abc123", reason="Cryptominer confirmed", diff --git a/tests/test_smoke_tools_list.py b/tests/test_smoke_tools_list.py index 6476d3a..7d909e0 100644 --- a/tests/test_smoke_tools_list.py +++ b/tests/test_smoke_tools_list.py @@ -11,6 +11,7 @@ # We patch 
these so no real auth is required. _FALCONPY_PATCHES = [ "modules.alerts.Alerts", + "modules.cao_hunting.CAOHunting", "modules.case_management.CaseManagement", "modules.cloud_registration.CSPMRegistration", "modules.cloud_security.CloudSecurity", @@ -21,6 +22,7 @@ "modules.hosts.Hosts", "modules.ngsiem.NGSIEM", "modules.response.Hosts", + "modules.spotlight.SpotlightEvaluationLogic", ] # Expected tool sets — update these when adding/removing tools @@ -38,12 +40,25 @@ "case_query", "case_get", "case_get_fields", + "cao_search_queries", + "cao_get_queries", + "cao_search_guides", + "cao_get_guides", + "cao_aggregate", "cloud_list_accounts", "cloud_policy_settings", "cloud_get_risks", "cloud_get_iom_detections", "cloud_query_assets", "cloud_compliance_by_account", + "case_query_access_tags", + "case_get_access_tags", + "case_aggregate_access_tags", + "case_get_rtr_file_metadata", + "case_get_rtr_recent_files", + "correlation_list_templates", + "correlation_get_template", + "spotlight_supported_evaluations", } EXPECTED_WRITE_TOOLS = { @@ -67,6 +82,7 @@ def _patch_falconpy(): """Patch all FalconPy service classes to MagicMock so no real auth is needed.""" with ( patch.multiple("modules.alerts", Alerts=MagicMock()), + patch.multiple("modules.cao_hunting", CAOHunting=MagicMock()), patch.multiple("modules.case_management", CaseManagement=MagicMock()), patch.multiple("modules.cloud_registration", CSPMRegistration=MagicMock()), patch.multiple("modules.cloud_security", CloudSecurity=MagicMock(), CloudSecurityDetections=MagicMock(), CloudSecurityAssets=MagicMock()), @@ -74,6 +90,7 @@ def _patch_falconpy(): patch.multiple("modules.hosts", Hosts=MagicMock()), patch.multiple("modules.ngsiem", NGSIEM=MagicMock()), patch.multiple("modules.response", Hosts=MagicMock()), + patch.multiple("modules.spotlight", SpotlightEvaluationLogic=MagicMock()), ): yield diff --git a/tests/test_spotlight.py b/tests/test_spotlight.py new file mode 100644 index 0000000..9cf528e --- /dev/null +++ 
b/tests/test_spotlight.py @@ -0,0 +1,79 @@ +"""Tests for Spotlight evaluation logic module.""" + +import asyncio +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + + +@pytest.fixture +def spotlight_module(mock_client): + """Create SpotlightModule with mocked API.""" + with patch("modules.spotlight.SpotlightEvaluationLogic") as MockSEL: + mock_sel = MagicMock() + MockSEL.return_value = mock_sel + from modules.spotlight import SpotlightModule + + module = SpotlightModule(mock_client) + module.falcon = mock_sel + return module + + +class TestSpotlightSupportedEvaluations: + """Test spotlight_supported_evaluations tool.""" + + def test_returns_evaluation_data(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 200, + "body": { + "resources": [ + { + "id": "eval-001", + "name": "Windows Kernel Vulnerability", + "platforms": ["Windows"], + "cve_ids": ["CVE-2024-1234"], + } + ] + }, + } + result = asyncio.run(spotlight_module.spotlight_supported_evaluations()) + assert "Windows Kernel Vulnerability" in result + assert "eval-001" in result + + def test_handles_empty_results(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 200, + "body": {"resources": []}, + } + result = asyncio.run(spotlight_module.spotlight_supported_evaluations()) + assert "no evaluation" in result.lower() or "0" in result + + def test_handles_api_error(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + "status_code": 403, + "body": {"errors": [{"message": "Forbidden"}]}, + } + result = asyncio.run(spotlight_module.spotlight_supported_evaluations()) + assert "failed" in result.lower() + + def test_passes_filter_parameter(self, spotlight_module): + spotlight_module.falcon.combined_supported_evaluation.return_value = { + 
"status_code": 200, + "body": {"resources": []}, + } + asyncio.run(spotlight_module.spotlight_supported_evaluations(filter="platform:'Windows'")) + spotlight_module.falcon.combined_supported_evaluation.assert_called_once_with(filter="platform:'Windows'") + + +class TestSpotlightToolRegistration: + """Verify spotlight tool registers correctly.""" + + def test_tool_registers_as_read(self, spotlight_module): + server = MagicMock() + server.tool.return_value = lambda fn: fn + spotlight_module.register_tools(server) + assert "spotlight_supported_evaluations" in spotlight_module.tools