diff --git a/TODO.md b/TODO.md index edb9a2ff..0e97155d 100644 --- a/TODO.md +++ b/TODO.md @@ -245,6 +245,7 @@ Replaced manual per-emitter field coordination with SecurityEvent intermediate r - [x] **Later architectural sprint: imperfect observation and source coverage** — implemented a training-friendly `complete` default plus overlay-compatible named observation profiles that apply deterministic source-level drop/delay/coverage semantics without modeling contradictions. The policy covers endpoint, network, proxy/web, firewall, IDS, Windows, Sysmon, Zeek, syslog, bash history, and eCAR source families, while ground truth preserves canonical truth and records source evidence status. Verification passed: focused observation/config/ground-truth tests, `uv run eforge validate-config`, Ruff checks/format checks, full normal `uv run pytest -v` (`3036 passed, 15 skipped`), and slow-inclusive `uv run pytest -v --include-slow` (`3050 passed, 1 skipped`). - [x] Observation-aware automated eval and manifest — generation now writes `OBSERVATION_MANIFEST.json` beside ground truth, `eforge eval` loads it when present, coverage-style causality metrics report raw and observation-adjusted scores for expected non-visible evidence, and correctness/contradiction checks remain strict. Verification passed with config validation, Ruff checks/format checks, focused eval/manifest tests, and full normal `uv run pytest -v` (`3047 passed, 15 skipped`). - [x] Post-host-activity score check — synced `dev`, cleaned up stale TODOs, regenerated/evaluated `scenarios/iteration-test` from the current iteration-test prompt with `enterprise_standard` observation, and ran one blind expert-panel review without entering another fix loop. Automated eval passed at `92.39` over `108,858` records; blind synthetic-confidence averaged `82.75`. Highest-leverage follow-ups are Linux SSH/syslog lifecycle ordering, Zeek observation-tree consistency, X.509 metadata coherence, Windows OS-build/local-SID identity, and static web asset manifests. +- [x] Current-dev calibration pass — regenerated and evaluated `scenarios/iteration-test` from current `dev`, fixed actionable cleanliness issues in OCSP optional-field rendering, observation-manifest accounting for sensor-filtered network evidence, Kerberos/domain-logon causal ordering, storyline event timing, storyline trace matching, temporal trace comparison, and visible Windows logon-before-process ordering. Verification passed with `uv run eforge validate-config`, scenario validation with only expected sensor/observation/pivot-linkability warnings, quantitative eval at `94.64` with all hard gates passing, Ruff checks, focused regressions (`164 passed`), and full normal `uv run pytest -v` (`3075 passed, 15 skipped`). - [x] Full slow-suite regression cleanup after loop-65 merge — explicit-proxy storyline beacons now preserve authored hostname+destination IP pairs only when the storyline marks that pair as intentional, normal proxy-origin DNS resolution remains intact, and the parallel-generation LogonID assertion treats Type 7 unlock reuse as valid slice-of-time Windows behavior. Verified with targeted proxy/parallel tests, `uv run ruff check .`, `uv run ruff format --check .`, and `uv run pytest -v --include-slow` (`2875 passed, 23 skipped`). Detection Engineer blind review completed for the regenerated Loop 61 dataset at `scenarios/iteration-test/data`; reviewer verdict: Synthetic, 63/100 confidence. Main findings: one PROXY-01 sshd accepted-login lifecycle gap/self-source artifact and Windows 4648 explicit-credential caller PID/image provenance ambiguity around `WS-MCHEN-01`. diff --git a/scenarios/ITERATION-TEST-PROMPT.md b/scenarios/ITERATION-TEST-PROMPT.md index 554c1455..1b09ab47 100644 --- a/scenarios/ITERATION-TEST-PROMPT.md +++ b/scenarios/ITERATION-TEST-PROMPT.md @@ -95,12 +95,13 @@ `src_ip`. Produces ASA 106023 denies + Zeek S0 conn entries on external-facing sensors only (not internal sensors). - 2. Web Scan (+0h30m): External attacker runs web vulnerability scanning against WEB-EXT-01. + 2. Web Scan (+0h31m): External attacker runs web vulnerability scanning against WEB-EXT-01. Use a `web_scan` event with `source_ip: "185.70.41.45"`, `dst_ip: "10.10.3.10"`, `dst_port: 443`, `hostname: "ehr-portal.meridianhcs.com"`, `preset: nikto`, `rate: 10`, and exactly one termination field: `duration: "20m"`. Do not use - `src_ip`. Run concurrently with the port scan. Expect 733100 threat-detection - alerts during this phase. + `src_ip`. Start one minute after the port scan so timing checks do not see + identical step timestamps, while still overlapping the scan activity. Expect + 733100 threat-detection alerts during this phase. 3. Rogue Device (+0h45m): Attacker plugs rogue laptop into network, obtains IP via DHCP. Use a `dhcp_lease` event on the parent storyline `system` for the rogue device. @@ -172,7 +173,7 @@ interval: "10m", duration: "1h30m", jitter: 0.3, hostname, user_agent, method: GET, orig_bytes/resp_bytes for realistic sizing). - 18. Blocked C2 (+4h30m): Attacker malware on DC-01 also attempts to beacon directly to + 18. Blocked C2 (+4h31m): Attacker malware on DC-01 also attempts to beacon directly to 45.33.32.30:443 — blocked by firewall (server_vlan → external not in policy). Use beacon event with action: deny, interval: "30m", duration: "1h30m". Denied attempts visible to internal sensors only. @@ -185,7 +186,7 @@ length_range: [10, 18], interval: "30s", duration: "45m", rcode_distribution for mostly NXDOMAIN). - 21. Collection (+5h): Authenticate to FILE-SRV-01 with backdoor account svc_mhsync + 21. Collection (+5h01m): Authenticate to FILE-SRV-01 with backdoor account svc_mhsync (logon event, type 3), enumerate shares, stage financial and patient data, compress with PowerShell Compress-Archive. diff --git a/src/evidenceforge/config/evaluation/causal_pairs.yaml b/src/evidenceforge/config/evaluation/causal_pairs.yaml index ef2b036a..1d71f4a1 100644 --- a/src/evidenceforge/config/evaluation/causal_pairs.yaml +++ b/src/evidenceforge/config/evaluation/causal_pairs.yaml @@ -163,3 +163,7 @@ pairs: match_fields: before: TargetUserName after: TargetUserName + # 4624 rows occur on target systems while 4768 rows occur on DCs, and the + # shared username key is weak. A later matching TGT is not proof that a + # target-host logon inverted Kerberos causality. + allow_missing_prior: true diff --git a/src/evidenceforge/config/formats/zeek_ocsp.yaml b/src/evidenceforge/config/formats/zeek_ocsp.yaml index f43092fc..4772b068 100644 --- a/src/evidenceforge/config/formats/zeek_ocsp.yaml +++ b/src/evidenceforge/config/formats/zeek_ocsp.yaml @@ -81,7 +81,7 @@ output: "serialNumber": {{ serialNumber | tojson }}, "certStatus": {{ certStatus | tojson }}, "thisUpdate": {{ thisUpdate | tojson }}, - "nextUpdate": {{ nextUpdate | tojson }}, - "revoketime": {{ revoketime | tojson }}, - "revokereason": {{ revokereason | tojson }} + "nextUpdate": {{ nextUpdate | tojson }}{% if revoketime is not none %}, + "revoketime": {{ revoketime | tojson }}{% endif %}{% if revokereason is not none %}, + "revokereason": {{ revokereason | tojson }}{% endif %} } diff --git a/src/evidenceforge/evaluation/pillars/causality.py b/src/evidenceforge/evaluation/pillars/causality.py index c07a4244..1669fe51 100644 --- a/src/evidenceforge/evaluation/pillars/causality.py +++ b/src/evidenceforge/evaluation/pillars/causality.py @@ -33,6 +33,7 @@ import ipaddress import logging +import re from collections import defaultdict from datetime import UTC, datetime, timedelta from typing import Any @@ -174,12 +175,25 @@ def _build_host_time_index( "mapped_src_ip", "mapped_dst_ip", "client_addr", + "host", + "server_name", ): ip_val = rec.fields.get(ip_field) if ip_val and ip_val not in (hostname, ""): - index[f"{ip_val}|{bucket}"][format_name].append(rec) + normalized = CausalityScorer._normalize_index_value(ip_val) + if normalized: + index[f"{normalized}|{bucket}"][format_name].append(rec) return dict(index) + @classmethod + def _normalize_index_value(cls, value: Any) -> str: + if value is None: + return "" + text = str(value).strip().lower() + if not text or text == "-": + return "" + return cls._normalize_beacon_host(text) or text + # --- Trace finding --- def _find_traces( @@ -282,6 +296,8 @@ def _search_for_event_indexed( forward_extra_secs = 3600 else: forward_extra_secs = 3600 + elif event_type == "connection": + forward_extra_secs = self._connection_trace_forward_secs(event) total_fwd_secs = TIME_TOLERANCE.total_seconds() + forward_extra_secs bwd_secs = TIME_TOLERANCE.total_seconds() @@ -296,6 +312,12 @@ def _search_for_event_indexed( explicit_src = event.details.get("source_ip") if explicit_src and explicit_src != event.system_ip: lookup_keys.append(explicit_src) + explicit_dst = event.details.get("dst_ip") + if explicit_dst: + lookup_keys.append(str(explicit_dst)) + expected_hostname = event.details.get("hostname") + if expected_hostname: + lookup_keys.append(str(expected_hostname).lower()) seen: set[int] = set() for hostname_key in lookup_keys: @@ -320,6 +342,23 @@ def _search_for_event_indexed( seen.add(id(record)) return found + @staticmethod + def _connection_trace_forward_secs(event: ResolvedEvent) -> int: + """Allow modest forward trace drift for web-style connection steps. + + Storyline timestamps often describe the beginning of a human-readable + step, while web exploit/upload evidence can fan out into several + request, endpoint, and network observations a few minutes later. + """ + detail_sets = event.sub_details if event.sub_details else [event.details] + web_markers = {"method", "uri", "user_agent", "status_code"} + for details in detail_sets: + if web_markers & details.keys(): + return 600 + if details.get("service") in {"http", "https"}: + return 600 + return 0 + def _record_matches( self, record: ParsedRecord, @@ -351,20 +390,24 @@ def _record_matches( return ( f.get("EventID") == 4688 and self._host_matches(f.get("Computer"), event.system) + and self._process_detail_matches(f, event) and ( self._user_matches(f.get("SubjectUserName"), event.actor) or self._user_matches(f.get("TargetUserName"), event.actor) ) ) if format_name == "bash_history": - return self._host_matches(f.get("hostname"), event.system) and self._user_matches( - f.get("username"), event.actor + return ( + self._host_matches(f.get("hostname"), event.system) + and self._user_matches(f.get("username"), event.actor) + and self._process_detail_matches(f, event) ) if format_name == "ecar": return ( f.get("object") == "PROCESS" and f.get("action") == "CREATE" and self._host_matches(f.get("hostname"), event.system) + and self._process_detail_matches(f, event) and self._user_matches(f.get("principal"), event.actor) ) elif event_type == "connection": @@ -392,23 +435,33 @@ def _record_matches( ) elif event_type == "create_remote_thread": if format_name == "windows_event_sysmon": - return f.get("EventID") == 8 and self._host_matches(f.get("Computer"), event.system) + return ( + f.get("EventID") == 8 + and self._host_matches(f.get("Computer"), event.system) + and self._process_detail_matches(f, event) + ) if format_name == "ecar": return ( f.get("object") == "THREAD" and f.get("action") == "REMOTE_CREATE" and self._host_matches(f.get("hostname"), event.system) + and self._process_detail_matches(f, event) + and self._user_matches(f.get("principal"), event.actor) ) elif event_type == "process_access": if format_name == "windows_event_sysmon": - return f.get("EventID") == 10 and self._host_matches( - f.get("Computer"), event.system + return ( + f.get("EventID") == 10 + and self._host_matches(f.get("Computer"), event.system) + and self._process_detail_matches(f, event) ) if format_name == "ecar": return ( f.get("object") == "PROCESS" and f.get("action") == "OPEN" and self._host_matches(f.get("hostname"), event.system) + and self._process_detail_matches(f, event) + and self._user_matches(f.get("principal"), event.actor) ) elif event_type == "service_installed": if format_name == "windows_event_security": @@ -458,15 +511,26 @@ def _record_matches( elif event_type == "ssh_session": if format_name == "syslog": msg = f.get("message", "") - return self._host_matches(f.get("hostname"), event.system) and ( + if not self._host_matches(f.get("hostname"), event.system) or not ( "Accepted" in msg or "session opened" in msg - ) + ): + return False + if event.actor and event.actor not in msg: + return False + expected_src = event.details.get("source_ip") + if expected_src and "Accepted" in msg and f" from {expected_src} " not in msg: + return False + return True if format_name == "ecar": - return ( + if not ( f.get("object") == "USER_SESSION" and f.get("action") == "LOGIN" and self._host_matches(f.get("hostname"), event.system) - ) + and self._user_matches(f.get("principal"), event.actor) + ): + return False + expected_src = event.details.get("source_ip") + return not expected_src or f.get("src_ip") == expected_src elif event_type == "rdp_session": if format_name == "windows_event_security": return ( @@ -497,6 +561,7 @@ def _record_matches( ) elif event_type == "beacon": expected_dst = event.details.get("dst_ip", "") + expected_hostname = event.details.get("hostname", "") expected_port = event.details.get("dst_port") action = event.details.get("action", "allow") if action == "deny": @@ -517,14 +582,22 @@ def _record_matches( denied = f.get("status_code") == 403 or f.get("cache_result") == "DENIED" if not denied: return False - return self._beacon_dst_matches(f, expected_dst) + if not self._beacon_source_matches(f, event): + return False + return self._beacon_dst_matches(f, expected_dst) or self._beacon_dst_matches( + f, expected_hostname + ) else: if format_name == "zeek_conn": return ( f.get("id.resp_h") == expected_dst and f.get("id.resp_p") == expected_port ) if format_name in ("proxy_access", "web_access", "zeek_http"): - return self._beacon_dst_matches(f, expected_dst) + if not self._beacon_source_matches(f, event): + return False + return self._beacon_dst_matches(f, expected_dst) or self._beacon_dst_matches( + f, expected_hostname + ) elif event_type == "dns_query": expected_query = event.details.get("query", "") if format_name == "zeek_dns": @@ -537,14 +610,23 @@ def _record_matches( expected_src = event.details.get("source_ip") if format_name == "web_access": source_ok = not expected_src or f.get("client_ip") == expected_src - return source_ok and self._host_matches(record.source_host, event.system) + return ( + source_ok + and self._host_matches(record.source_host, event.system) + and self._web_scan_profile_matches(f, event) + ) if format_name == "zeek_http": source_ok = not expected_src or f.get("id.orig_h") == expected_src - return source_ok and f.get("id.resp_h", f.get("dst_ip", "")) == expected_dst + return ( + source_ok + and f.get("id.resp_h", f.get("dst_ip", "")) == expected_dst + and self._web_scan_profile_matches(f, event) + ) if format_name == "zeek_conn": source_ok = not expected_src or f.get("id.orig_h") == expected_src port_ok = expected_port is None or f.get("id.resp_p") == expected_port - return source_ok and f.get("id.resp_h") == expected_dst and port_ok + state_ok = f.get("conn_state") == "SF" + return source_ok and f.get("id.resp_h") == expected_dst and port_ok and state_ok elif event_type == "credential_spray": target_accounts = event.details.get("target_accounts", []) if format_name == "windows_event_security": @@ -584,18 +666,126 @@ def _record_matches( return f.get("EventID") == expected_id elif event_type == "logoff": if format_name == "windows_event_security": - return f.get("EventID") in (4634, 4647) + if f.get("EventID") not in (4634, 4647) or not self._host_matches( + f.get("Computer"), event.system + ): + return False + username = f.get("TargetUserName") or f.get("SubjectUserName") + return self._user_matches(username, event.actor) if format_name == "syslog": msg = f.get("message", "") - return "session closed" in msg or "Disconnected from" in msg + return ( + self._host_matches(f.get("hostname"), event.system) + and event.actor in msg + and ("session closed" in msg or "Disconnected from" in msg) + ) if format_name == "bash_history": - return f.get("command", "").startswith("exit") or f.get("command", "").startswith( - "logout" + return ( + self._host_matches(f.get("hostname"), event.system) + and self._user_matches(f.get("username"), event.actor) + and ( + f.get("command", "").startswith("exit") + or f.get("command", "").startswith("logout") + ) + ) + if format_name == "ecar": + return ( + f.get("object") == "USER_SESSION" + and f.get("action") == "LOGOUT" + and self._host_matches(f.get("hostname"), event.system) + and self._user_matches(f.get("principal"), event.actor) ) elif event_type == "raw": + return self._raw_record_matches(f, format_name, event) + return False + + def _raw_record_matches( + self, + fields: dict[str, Any], + format_name: str, + event: ResolvedEvent, + ) -> bool: + target_format = event.details.get("target_format") + if target_format and format_name != target_format: + return False + expected_fields = event.details.get("fields") + if not isinstance(expected_fields, dict): return True + for key, expected in expected_fields.items(): + if key == "timestamp": + continue + actual = fields.get(key) + if key == "hostname": + if not self._host_matches(actual, str(expected)): + return False + continue + if key == "message": + if not self._message_fragment_matches(expected, actual): + return False + continue + if actual is not None and str(actual) != str(expected): + return False + return True + + @staticmethod + def _message_fragment_matches(expected: Any, actual: Any) -> bool: + if actual is None: + return False + expected_text = str(expected) + actual_text = str(actual) + if expected_text in actual_text or actual_text in expected_text: + return True + expected_tokens = { + token + for token in re.findall(r"[A-Za-z0-9_./:%=,-]{12,}", expected_text) + if not token.startswith("[") + } + actual_tokens = set(re.findall(r"[A-Za-z0-9_./:%=,-]{12,}", actual_text)) + return bool(expected_tokens & actual_tokens) + + @staticmethod + def _process_detail_sets(event: ResolvedEvent) -> list[dict[str, Any]]: + detail_sets = event.sub_details if event.sub_details else [event.details] + process_details = [ + details + for details in detail_sets + if details.get("process_name") or details.get("command_line") + ] + return process_details + + @classmethod + def _process_detail_matches(cls, fields: dict[str, Any], event: ResolvedEvent) -> bool: + process_details = cls._process_detail_sets(event) + if not process_details: + return True + record_image = str( + fields.get("NewProcessName") + or fields.get("SourceImage") + or fields.get("image_path") + or fields.get("process_name") + or fields.get("command") + or "" + ).lower() + record_command = str( + fields.get("CommandLine") or fields.get("command_line") or fields.get("command") or "" + ).lower() + for details in process_details: + process_name = str(details.get("process_name") or "").lower() + command_line = str(details.get("command_line") or "").lower() + image_ok = not process_name or record_image.endswith(process_name.rsplit("\\", 1)[-1]) + command_ok = not command_line or command_line in record_command + if image_ok and command_ok: + return True return False + @staticmethod + def _web_scan_profile_matches(fields: dict[str, Any], event: ResolvedEvent) -> bool: + preset = str(event.details.get("preset") or "").lower() + if preset == "nikto": + user_agent = str(fields.get("user_agent") or "").lower() + return "nikto" in user_agent + return True + def _connection_matches_zeek(self, fields: dict, event: ResolvedEvent) -> bool: orig_h = fields.get("id.orig_h", "") resp_h = fields.get("id.resp_h", "") @@ -603,12 +793,39 @@ def _connection_matches_zeek(self, fields: dict, event: ResolvedEvent) -> bool: proxy_mode = getattr(self, "_proxy_mode", "transparent") proxy_ips = getattr(self, "_proxy_ips", set()) + if "source_ip" in details and "dst_ip" in details: + source_ip = details["source_ip"] + dst_ip = details["dst_ip"] + if ( + orig_h == source_ip + and resp_h == dst_ip + and self._connection_port_matches(fields, details) + ): + return True + if ( + proxy_mode == "explicit" + and orig_h == source_ip + and resp_h in proxy_ips + and self._connection_port_matches(fields, details) + ): + return True + if ( + proxy_mode == "explicit" + and orig_h in proxy_ips + and resp_h == dst_ip + and self._connection_port_matches(fields, details) + ): + return True + return False + if event.system_ip and orig_h == event.system_ip: if "dst_ip" in details: if proxy_mode == "explicit" and resp_h in proxy_ips: - return True - return resp_h == details["dst_ip"] - return True + return self._connection_port_matches(fields, details) + return resp_h == details["dst_ip"] and self._connection_port_matches( + fields, details + ) + return self._connection_port_matches(fields, details) if ( proxy_mode == "explicit" @@ -616,33 +833,90 @@ def _connection_matches_zeek(self, fields: dict, event: ResolvedEvent) -> bool: and "dst_ip" in details and resp_h == details["dst_ip"] ): - return True + return self._connection_port_matches(fields, details) if "dst_ip" in details and resp_h == details["dst_ip"]: - return True + return self._connection_port_matches(fields, details) if "source_ip" in details and orig_h == details["source_ip"]: - return True + return self._connection_port_matches(fields, details) return False @staticmethod - def _connection_ip_matches(fields: dict, event: ResolvedEvent) -> bool: - src_ip = fields.get("src_ip", "") - dst_ip = fields.get("dst_ip", "") - detail_sets = event.sub_details if event.sub_details else [event.details] - ip_details = [d for d in detail_sets if "source_ip" in d or "dst_ip" in d] - if not ip_details: + def _connection_port_matches(fields: dict[str, Any], details: dict[str, Any]) -> bool: + expected_port = details.get("dst_port") + if expected_port is None: return True - for details in ip_details: - src_ok = True - dst_ok = True - if "source_ip" in details: - src_ok = src_ip == details["source_ip"] or dst_ip == details["source_ip"] - if "dst_ip" in details: - dst_ok = dst_ip == details["dst_ip"] or src_ip == details["dst_ip"] - if src_ok and dst_ok: + for port_field in ("id.resp_p", "dst_port"): + actual_port = fields.get(port_field) + if actual_port is None: + continue + try: + return int(actual_port) == int(expected_port) + except (TypeError, ValueError): + return str(actual_port) == str(expected_port) + return True + + @staticmethod + def _connection_detail_sets(event: ResolvedEvent) -> list[dict[str, Any]]: + detail_sets = event.sub_details if event.sub_details else [event.details] + constrained = [ + details + for details in detail_sets + if "source_ip" in details or "dst_ip" in details or "dst_port" in details + ] + if any("dst_ip" in details for details in constrained): + return [details for details in constrained if "dst_ip" in details] + return constrained or [event.details] + + @classmethod + def _connection_detail_matches( + cls, + fields: dict[str, Any], + details: dict[str, Any], + *, + src_field: str, + dst_field: str, + ) -> bool: + if "source_ip" in details and fields.get(src_field) != details["source_ip"]: + return False + if "dst_ip" in details and fields.get(dst_field) != details["dst_ip"]: + return False + return cls._connection_port_matches(fields, details) + + @classmethod + def _connection_ip_matches(cls, fields: dict, event: ResolvedEvent) -> bool: + for details in cls._connection_detail_sets(event): + if cls._connection_detail_matches( + fields, + details, + src_field="src_ip", + dst_field="dst_ip", + ): return True return False + @staticmethod + def _expected_usernames_for_event(event: ResolvedEvent) -> set[str]: + details = event.details + expected: set[str] = set() + target_username = details.get("target_username") + if isinstance(target_username, str) and target_username: + expected.add(target_username) + target_accounts = details.get("target_accounts") + if isinstance(target_accounts, list): + expected.update(str(account) for account in target_accounts if account) + success = details.get("success") + if isinstance(success, dict) and success.get("account"): + expected.add(str(success["account"])) + return expected or {event.actor} + + @classmethod + def _username_indicator_matches(cls, record_user: Any, event: ResolvedEvent) -> bool: + return any( + cls._user_matches(record_user, username) + for username in cls._expected_usernames_for_event(event) + ) + @staticmethod def _user_matches(record_user: Any, expected: str) -> bool: if record_user is None: @@ -686,6 +960,20 @@ def _beacon_dst_matches(cls, fields: dict, expected_dst: str) -> bool: return any(cls._beacon_host_matches(candidate, expected) for candidate in candidates) + def _beacon_source_matches(self, fields: dict[str, Any], event: ResolvedEvent) -> bool: + expected_src = event.details.get("source_ip") or event.system_ip + if not expected_src: + return True + proxy_ips = getattr(self, "_proxy_ips", set()) + client_ip = fields.get("client_ip") + if client_ip: + return self._ip_matches(client_ip, expected_src) + orig_h = fields.get("id.orig_h") + resp_h = fields.get("id.resp_h") + if orig_h and resp_h in proxy_ips: + return self._ip_matches(orig_h, expected_src) + return True + @staticmethod def _normalize_beacon_host(value: Any) -> str: """Normalize a beacon destination host/IP for exact comparisons.""" @@ -997,10 +1285,23 @@ def _check_indicators( f = trace.fields details = self._best_sub_detail(event, f) if event.sub_details else event.details - for uf in ["TargetUserName", "SubjectUserName", "principal", "username"]: - if uf in f and f[uf]: - checks.append(("username", self._user_matches(f[uf], event.actor))) - break + if ( + "group_member_added" in event.event_types + and f.get("EventID") in (4728, 4732, 4756) + and details.get("member_name") + ): + member_name = str(details["member_name"]).lower() + member_field = str(f.get("MemberName") or f.get("MemberSid") or "").lower() + checks.append(("username", member_name in member_field)) + else: + for uf in ["TargetUserName", "SubjectUserName", "principal", "username"]: + if uf in f and f[uf]: + if self._is_process_indicator_trace(f): + user_ok = self._user_matches(f[uf], event.actor) + else: + user_ok = self._username_indicator_matches(f[uf], event) + checks.append(("username", user_ok)) + break for hf in ["Computer", "hostname"]: if hf in f and f[hf]: checks.append(("hostname", self._host_matches(f[hf], event.system))) @@ -1008,7 +1309,7 @@ def _check_indicators( if "source_ip" in details: for ipf in ["IpAddress", "id.orig_h", "src_ip"]: if ipf in f and f[ipf] and f[ipf] != "-": - source_ok = f[ipf] == details["source_ip"] + source_ok = self._ip_matches(f[ipf], details["source_ip"]) if not source_ok and self._is_explicit_proxy_egress_trace(f, details): source_ok = True checks.append(("source_ip", source_ok)) @@ -1016,13 +1317,34 @@ def _check_indicators( if "dst_ip" in details: for df in ["id.resp_h", "dst_ip"]: if df in f and f[df]: - dst_ok = f[df] == details["dst_ip"] + dst_ok = self._ip_matches(f[df], details["dst_ip"]) if not dst_ok and self._is_explicit_proxy_client_trace(f, event): dst_ok = True checks.append(("dst_ip", dst_ok)) break return checks + @staticmethod + def _ip_matches(actual: Any, expected: Any) -> bool: + if actual == expected: + return True + try: + actual_ip = ipaddress.ip_address(str(actual)) + expected_ip = ipaddress.ip_address(str(expected)) + except ValueError: + return str(actual) == str(expected) + if actual_ip.version == 6 and getattr(actual_ip, "ipv4_mapped", None) is not None: + actual_ip = actual_ip.ipv4_mapped + if expected_ip.version == 6 and getattr(expected_ip, "ipv4_mapped", None) is not None: + expected_ip = expected_ip.ipv4_mapped + return actual_ip == expected_ip + + @staticmethod + def _is_process_indicator_trace(fields: dict[str, Any]) -> bool: + return fields.get("EventID") == 4688 or ( + fields.get("object") == "PROCESS" and fields.get("action") == "CREATE" + ) + def _is_explicit_proxy_client_trace(self, fields: dict, event: ResolvedEvent) -> bool: if getattr(self, "_proxy_mode", "transparent") != "explicit": return False @@ -1041,17 +1363,31 @@ def _is_explicit_proxy_egress_trace(self, fields: dict, details: dict[str, Any]) def _best_sub_detail(event: ResolvedEvent, fields: dict) -> dict[str, Any]: if len(event.sub_details) <= 1: return event.sub_details[0] if event.sub_details else event.details - trace_ips: set[str] = set() - for ip_field in ("IpAddress", "id.orig_h", "id.resp_h", "src_ip", "dst_ip"): - val = fields.get(ip_field) - if val and val != "-": - trace_ips.add(val) - if not trace_ips: + source_values = { + str(fields[ip_field]) + for ip_field in ("IpAddress", "id.orig_h", "src_ip") + if fields.get(ip_field) and fields.get(ip_field) != "-" + } + dest_values = { + str(fields[ip_field]) + for ip_field in ("id.resp_h", "dst_ip") + if fields.get(ip_field) and fields.get(ip_field) != "-" + } + all_values = source_values | dest_values + if not all_values: return event.details best_detail = event.details best_score = -1 for sd in event.sub_details: - score = sum(1 for k in ("source_ip", "dst_ip") if sd.get(k) and sd[k] in trace_ips) + score = 0 + if sd.get("source_ip"): + score += 2 if str(sd["source_ip"]) in source_values else -2 + if sd.get("dst_ip"): + score += 2 if str(sd["dst_ip"]) in dest_values else -2 + for key in ("source_ip", "dst_ip"): + value = sd.get(key) + if value and str(value) in all_values: + score += 1 if score > best_score: best_score = score best_detail = sd @@ -1164,16 +1500,18 @@ def _score_temporal_integrity( correct = 0 excluded = 0 failures: list[str] = [] - prev_earliest: datetime | None = None + prev_expected: datetime | None = None for event in resolved: if not event.traces: if self._event_observation_exempt(event, context): excluded += 1 + prev_expected = event.time continue total += 1 if len(failures) < 10: failures.append(f"Event {event.index}: no traces to verify timing") + prev_expected = event.time continue trace_times = [] @@ -1190,7 +1528,11 @@ def _score_temporal_integrity( total += 1 earliest = min(trace_times) time_ok = abs((earliest - event.time).total_seconds()) <= TIME_TOLERANCE.total_seconds() - order_ok = prev_earliest is None or earliest >= prev_earliest - timedelta(seconds=5) + # Storyline events can overlap, and source-specific telemetry can arrive after the + # action began. Treat a later event as ordered when its evidence does not predate the + # previous event's intended time, rather than requiring it to follow the previous + # event's earliest matched trace. + order_ok = prev_expected is None or earliest >= prev_expected - timedelta(seconds=5) if time_ok and order_ok: correct += 1 @@ -1205,7 +1547,7 @@ def _score_temporal_integrity( if not order_ok: failures.append(f"Event {event.index}: out of order relative to previous") - prev_earliest = earliest + prev_expected = event.time score = (100.0 * correct / total) if total > 0 else 100.0 raw_score = (100.0 * raw_correct / raw_total) if raw_total > 0 else 100.0 diff --git a/src/evidenceforge/events/dispatcher.py b/src/evidenceforge/events/dispatcher.py index 0a73719b..dbabcea8 100644 --- a/src/evidenceforge/events/dispatcher.py +++ b/src/evidenceforge/events/dispatcher.py @@ -119,6 +119,17 @@ def source_evidence_status(self) -> dict[str, dict[str, dict[str, int]]]: for cluster_id, source_summaries in sorted(self._source_evidence_status.items()) } + def record_filtered_network_observation(self) -> None: + """Record that a storyline network event was filtered before emitter dispatch. + + Some caller paths skip unobservable network connections before building a + full SecurityEvent. The manifest still needs a source-status entry so + eval can distinguish expected sensor-placement loss from missing evidence. + """ + for format_name in self.emitters: + if format_name in _NETWORK_FORMATS: + self._record_cluster_observation(format_name, "filtered") + def _is_suppressed(self, timestamp: datetime) -> bool: """Return True if the event falls before the output window (warm-up period).""" if self.output_start_time is None: @@ -301,6 +312,19 @@ def _record_observation( ) -> None: """Record source evidence status for storyline/red-herring ground truth.""" cluster_id = event.storyline_cluster_id + if not cluster_id: + return + self._record_cluster_observation(format_name, status, cluster_id=cluster_id) + + def _record_cluster_observation( + self, + format_name: str, + status: ObservationStatus, + *, + cluster_id: str | None = None, + ) -> None: + """Record source evidence status for the active or supplied cluster.""" + cluster_id = cluster_id or self.storyline_cluster_id if not cluster_id: return source = source_family_for_format(format_name) diff --git a/src/evidenceforge/generation/activity/generator.py b/src/evidenceforge/generation/activity/generator.py index d87f5ad8..fb249dc6 100644 --- a/src/evidenceforge/generation/activity/generator.py +++ b/src/evidenceforge/generation/activity/generator.py @@ -5518,6 +5518,8 @@ def generate_connection( self.dispatcher.visibility_engine if self.dispatcher else None ) if visibility and not visibility.is_connection_visible(src_ip, dst_ip): + if self.dispatcher is not None: + self.dispatcher.record_filtered_network_observation() logger.debug( f"Skipping connection {src_ip} -> {dst_ip}: " f"not observable by any configured sensor" diff --git a/src/evidenceforge/generation/emitters/windows.py b/src/evidenceforge/generation/emitters/windows.py index cd1dfc82..fa8c4496 100644 --- a/src/evidenceforge/generation/emitters/windows.py +++ b/src/evidenceforge/generation/emitters/windows.py @@ -1498,6 +1498,36 @@ def _shift_spooled_process_creates_after_visible_parent_unlocked(self) -> None: if not changed: break + def _shift_spooled_process_creates_after_logons_unlocked(self) -> None: + """Prevent spooled Security 4688 rows from preceding same-session 4624 rows.""" + logon_times: dict[tuple[str, str], datetime] = {} + for _, event in self._iter_spooled_rows_unlocked(): + if event.get("EventID") != 4624 or str(event.get("LogonType") or "") == "7": + continue + ts = event.get("TimeCreated") + logon_id = str(event.get("TargetLogonId") or "") + key = (str(event.get("Computer", "")), logon_id) + if isinstance(ts, datetime) and logon_id: + logon_times[key] = min(ts, logon_times.get(key, ts)) + + updates: list[tuple[str, str, int]] = [] + for rowid, event in self._iter_spooled_rows_unlocked(): + ts = event.get("TimeCreated") + if not isinstance(ts, datetime) or event.get("EventID") != 4688: + continue + logon_id = str(event.get("SubjectLogonId") or "") + if not logon_id or logon_id in {"0x3e7", "0x3e4", "0x3e5", "-"}: + continue + key = (str(event.get("Computer", "")), logon_id) + logon_time = logon_times.get(key) + if logon_time is not None and ts <= logon_time: + event["TimeCreated"] = logon_time + timedelta(milliseconds=1) + updates.append((_spool_encode(event), self._event_sort_key(event), rowid)) + if len(updates) >= 1000: + self._update_spooled_events_unlocked(updates) + updates.clear() + self._update_spooled_events_unlocked(updates) + def _shift_spooled_logoffs_after_dependents_unlocked(self) -> None: """Prevent spooled 4634 records from preceding same-session dependents.""" latest_dependent: dict[tuple[str, str], datetime] = {} @@ -1629,12 +1659,14 @@ def _flush_unlocked(self) -> None: if self._spooled_count: self._spool_event_dicts_unlocked() + self._shift_spooled_process_creates_after_logons_unlocked() self._shift_spooled_process_creates_after_visible_parent_unlocked() self._shift_spooled_process_terminations_after_dependents_unlocked() self._shift_spooled_logoffs_after_dependents_unlocked() self._suppress_spooled_duplicate_lock_unlock_transitions_unlocked() events = self._iter_spooled_events_unlocked() else: + self._shift_process_creates_after_logons() self._shift_process_creates_after_visible_parent() self._shift_process_terminations_after_dependents() self._shift_logoffs_after_dependents() @@ -1800,12 +1832,36 @@ def _sort_key(index_and_event: tuple[int, dict[str, Any]]) -> tuple[datetime, in dropped_indexes.add(index) break - if dropped_indexes: - self._event_dicts = [ - event - for index, event in enumerate(self._event_dicts) - if index not in dropped_indexes - ] + if dropped_indexes: + self._event_dicts = [ + event + for index, event in enumerate(self._event_dicts) + if index not in dropped_indexes + ] + + def _shift_process_creates_after_logons(self) -> None: + """Prevent visible Security 4688 rows from preceding same-session 4624 rows.""" + logon_times: dict[tuple[str, str], datetime] = {} + for event in self._event_dicts: + if event.get("EventID") != 4624 or str(event.get("LogonType") or "") == "7": + continue + ts = event.get("TimeCreated") + logon_id = str(event.get("TargetLogonId") or "") + key = (str(event.get("Computer", "")), logon_id) + if isinstance(ts, datetime) and logon_id: + logon_times[key] = min(ts, logon_times.get(key, ts)) + + for event in self._event_dicts: + ts = event.get("TimeCreated") + if not isinstance(ts, datetime) or event.get("EventID") != 4688: + continue + logon_id = str(event.get("SubjectLogonId") or "") + if not logon_id or logon_id in {"0x3e7", "0x3e4", "0x3e5", "-"}: + continue + key = (str(event.get("Computer", "")), logon_id) + logon_time = logon_times.get(key) + if logon_time is not None and ts <= logon_time: + event["TimeCreated"] = logon_time + timedelta(milliseconds=1) def _shift_process_creates_after_visible_parent(self) -> None: """Prevent visible Security 4688 children from preceding parent 4688 rows.""" diff --git a/tests/unit/test_dispatcher.py b/tests/unit/test_dispatcher.py index 20ebe794..f6021a52 100644 --- a/tests/unit/test_dispatcher.py +++ b/tests/unit/test_dispatcher.py @@ -227,6 +227,19 @@ def test_network_visibility_records_filtered_source_status(self): zeek.emit.assert_not_called() assert dispatcher.source_evidence_status["story-001"]["zeek"] == {"filtered": 1} + def test_pre_dispatch_network_skip_records_filtered_source_status(self): + """Pre-dispatch unobservable storyline connections are reflected in manifests.""" + sm = MagicMock(spec=StateManager) + zeek = _make_mock_emitter("zeek_conn", handles=True) + ecar = _make_mock_emitter("ecar", handles=True) + dispatcher = EventDispatcher(state_manager=sm, emitters={"zeek_conn": zeek, "ecar": ecar}) + dispatcher.storyline_cluster_id = "story-001" + + dispatcher.record_filtered_network_observation() + + assert dispatcher.source_evidence_status["story-001"]["zeek"] == {"filtered": 1} + assert "ecar" not in dispatcher.source_evidence_status["story-001"] + def test_all_emitter_formats_map_to_source_families(self): """Every current emitter belongs to a source-observation family.""" from evidenceforge.generation.engine.emitter_setup import _build_emitter_classes diff --git a/tests/unit/test_emitters.py b/tests/unit/test_emitters.py index 58f15ee7..2f7263fc 100644 --- a/tests/unit/test_emitters.py +++ b/tests/unit/test_emitters.py @@ -599,6 +599,60 @@ def test_process_create_shifted_after_visible_parent_create(self, format_def, te assert emitter._event_dicts[0]["TimeCreated"] == parent_time + timedelta(milliseconds=1) + def test_process_create_shifted_after_visible_logon(self, format_def, temp_output): + """Security 4688 should not visibly precede its same-session 4624 row.""" + emitter = WindowsEventEmitter(format_def, temp_output, buffer_size=10) + process_time = datetime(2024, 1, 15, 10, 0, 0, tzinfo=UTC) + logon_time = process_time + timedelta(milliseconds=1) + + emitter._event_dicts = [ + { + "EventID": 4688, + "TimeCreated": process_time, + "Computer": "WIN-TEST-01.corp.local", + "SubjectLogonId": "0xabc123", + "NewProcessId": "0x1084", + }, + { + "EventID": 4624, + "TimeCreated": logon_time, + "Computer": "WIN-TEST-01.corp.local", + "TargetLogonId": "0xabc123", + "LogonType": 11, + }, + ] + + emitter._shift_process_creates_after_logons() + + assert emitter._event_dicts[0]["TimeCreated"] == logon_time + timedelta(milliseconds=1) + + def test_process_create_not_shifted_after_type7_unlock(self, format_def, temp_output): + """Type 7 unlock 4624 rows are not original session creation events.""" + emitter = WindowsEventEmitter(format_def, temp_output, buffer_size=10) + process_time = datetime(2024, 1, 15, 10, 0, 0, tzinfo=UTC) + unlock_time = process_time + timedelta(minutes=5) + + emitter._event_dicts = [ + { + "EventID": 4688, + "TimeCreated": process_time, + "Computer": "WIN-TEST-01.corp.local", + "SubjectLogonId": "0xabc123", + "NewProcessId": "0x1084", + }, + { + "EventID": 4624, + "TimeCreated": unlock_time, + "Computer": "WIN-TEST-01.corp.local", + "TargetLogonId": "0xabc123", + "LogonType": 7, + }, + ] + + emitter._shift_process_creates_after_logons() + + assert emitter._event_dicts[0]["TimeCreated"] == process_time + def test_spooled_process_create_shifted_after_visible_parent_create( self, format_def, temp_output ): @@ -631,6 +685,36 @@ def test_spooled_process_create_shifted_after_visible_parent_create( assert child["TimeCreated"] == parent_time + timedelta(milliseconds=1) emitter._cleanup_spool_unlocked() + def test_spooled_process_create_shifted_after_visible_logon(self, format_def, temp_output): + """Spooled Security 4688 fixups should preserve logon-before-process ordering.""" + emitter = WindowsEventEmitter(format_def, temp_output, buffer_size=10) + process_time = datetime(2024, 1, 15, 10, 0, 0, tzinfo=UTC) + logon_time = process_time + timedelta(milliseconds=1) + emitter._event_dicts = [ + { + "EventID": 4688, + "TimeCreated": process_time, + "Computer": "WIN-TEST-01.corp.local", + "SubjectLogonId": "0xabc123", + "NewProcessId": "0x1084", + }, + { + "EventID": 4624, + "TimeCreated": logon_time, + "Computer": "WIN-TEST-01.corp.local", + "TargetLogonId": "0xabc123", + "LogonType": 11, + }, + ] + + emitter._spool_event_dicts_unlocked() + emitter._shift_spooled_process_creates_after_logons_unlocked() + events = list(emitter._iter_spooled_events_unlocked()) + + process = next(event for event in events if event["EventID"] == 4688) + assert process["TimeCreated"] == logon_time + timedelta(milliseconds=1) + emitter._cleanup_spool_unlocked() + def test_windows_time_created_spreads_large_same_timestamp_clusters(self): """Dense same-host Windows/Sysmon timestamp ties should not compress into microseconds.""" base_time = datetime(2024, 1, 15, 10, 0, 0, tzinfo=UTC) diff --git a/tests/unit/test_eval_cross_source.py b/tests/unit/test_eval_cross_source.py index 6adba27f..6e86c4f3 100644 --- a/tests/unit/test_eval_cross_source.py +++ b/tests/unit/test_eval_cross_source.py @@ -705,6 +705,36 @@ def test_beacon_allow_proxy_matches_host_field(self): assert scorer._beacon_dst_matches(fields, "evil.example.com") assert not scorer._beacon_dst_matches(fields, "other.example.com") + def test_search_finds_explicit_proxy_beacon_by_hostname(self): + """Beacon evidence can be indexed by proxy host, not only by origin IPs.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + proxy_rec = _record( + "proxy_access", + {"host": "api.evil.example.com", "status_code": 200, "method": "GET"}, + ts=T0 + timedelta(seconds=10), + ) + event = ResolvedEvent( + index=0, + time=T0, + actor="attacker", + system="DC-01", + system_ip="10.10.2.10", + activity="allowed c2", + details={ + "dst_ip": "45.33.32.30", + "dst_port": 443, + "hostname": "api.evil.example.com", + }, + event_types=["beacon"], + ) + scorer = CrossSourceScorer() + index = scorer._build_host_time_index({"proxy_access": [proxy_rec]}) + + traces = scorer._search_for_event_indexed(event, "beacon", index) + + assert traces == [proxy_rec] + def test_beacon_allow_proxy_matches_ip_url_host(self): """_beacon_dst_matches should match IP found in the URL authority host.""" scorer = CrossSourceScorer() @@ -795,6 +825,623 @@ def test_beacon_deny_proxy_200_does_not_match_deny(self): scorer = CrossSourceScorer() assert not scorer._record_matches(proxy_rec, "proxy_access", event, "beacon") + def test_logoff_matcher_accepts_ecar_logout(self): + """eCAR USER_SESSION/LOGOUT rows should satisfy logoff event presence.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + logout_rec = _record( + "ecar", + { + "hostname": "APP-INT-01", + "object": "USER_SESSION", + "action": "LOGOUT", + "principal": "root", + }, + ts=T0, + ) + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="APP-INT-01", + system_ip="10.10.2.30", + activity="logout", + details={}, + event_types=["logoff"], + ) + scorer = CrossSourceScorer() + + assert scorer._record_matches(logout_rec, "ecar", event, "logoff") + + def test_logoff_matcher_rejects_wrong_windows_user(self): + """Windows logoff rows should not attach to another user's same-host session.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="svc_mhsync", + system="FILE-SRV-01", + system_ip="10.10.2.20", + activity="logout", + details={}, + event_types=["logoff"], + ) + scorer = CrossSourceScorer() + + assert not scorer._record_matches( + _record( + "windows_event_security", + { + "EventID": 4634, + "Computer": "FILE-SRV-01", + "TargetUserName": "sophia.martinez", + }, + ts=T0, + ), + "windows_event_security", + event, + "logoff", + ) + assert scorer._record_matches( + _record( + "windows_event_security", + { + "EventID": 4634, + "Computer": "FILE-SRV-01", + "TargetUserName": "svc_mhsync", + }, + ts=T0, + ), + "windows_event_security", + event, + "logoff", + ) + + def test_zeek_connection_match_requires_authored_source_ip(self): + """A same-destination Zeek row should not match if source_ip disagrees.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="attacker", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="SQL injection", + details={"source_ip": "185.70.41.45", "dst_ip": "10.10.3.10"}, + event_types=["connection"], + ) + scorer = CrossSourceScorer() + + assert not scorer._connection_matches_zeek( + {"id.orig_h": "10.10.3.20", "id.resp_h": "10.10.3.10"}, + event, + ) + assert scorer._connection_matches_zeek( + {"id.orig_h": "185.70.41.45", "id.resp_h": "10.10.3.10"}, + event, + ) + + def test_zeek_connection_match_prefers_explicit_tuple_over_story_host(self): + """Explicit source/destination/port should beat the storyline system IP fallback.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="APP-INT-01", + system_ip="10.10.2.30", + activity="failed ssh pivot", + details={ + "source_ip": "10.10.3.10", + "dst_ip": "10.10.3.20", + "dst_port": 22, + }, + event_types=["connection"], + ) + scorer = CrossSourceScorer() + + assert not scorer._connection_matches_zeek( + { + "id.orig_h": "10.10.2.30", + "id.orig_p": 8, + "id.resp_h": "10.10.3.20", + "id.resp_p": 0, + }, + event, + ) + assert not scorer._connection_matches_zeek( + { + "id.orig_h": "10.10.3.10", + "id.orig_p": 50000, + "id.resp_h": "10.10.3.20", + "id.resp_p": 8080, + }, + event, + ) + assert scorer._connection_matches_zeek( + { + "id.orig_h": "10.10.3.10", + "id.orig_p": 50000, + "id.resp_h": "10.10.3.20", + "id.resp_p": 22, + }, + event, + ) + + def test_ecar_connection_match_uses_directional_ip_roles(self): + """A reverse callback should not match an earlier inbound upload tuple.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="apache", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="upload and reverse shell", + details={"dst_ip": "45.33.32.30"}, + event_types=["connection"], + sub_details=[ + { + "source_ip": "185.70.41.45", + "dst_ip": "10.10.3.10", + "description": "web shell upload", + }, + {"dst_ip": "45.33.32.30", "description": "reverse shell callback"}, + ], + ) + + assert not CrossSourceScorer._connection_ip_matches( + {"src_ip": "10.10.3.10", "dst_ip": "185.70.41.45"}, + event, + ) + assert CrossSourceScorer._connection_ip_matches( + {"src_ip": "10.10.3.10", "dst_ip": "45.33.32.30"}, + event, + ) + + def test_ecar_connection_match_ignores_partial_source_only_detail_when_dst_exists(self): + """Mixed connection/session details should not match by source IP alone.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="APP-INT-01", + system_ip="10.10.2.30", + activity="ssh pivot", + details={"dst_ip": "10.10.3.20", "dst_port": 22, "source_ip": "10.10.3.10"}, + event_types=["connection", "ssh_session"], + sub_details=[ + {"dst_ip": "10.10.3.20", "dst_port": 22, "source_ip": "10.10.3.10"}, + {"source_ip": "10.10.3.10"}, + ], + ) + + assert not CrossSourceScorer._connection_ip_matches( + {"src_ip": "10.10.3.10", "dst_ip": "10.10.3.20", "dst_port": 8080}, + event, + ) + assert CrossSourceScorer._connection_ip_matches( + {"src_ip": "10.10.3.10", "dst_ip": "10.10.3.20", "dst_port": 22}, + event, + ) + + def test_ssh_session_match_requires_actor_and_source_for_accept_line(self): + """SSH session traces should not attach unrelated same-host logins.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="APP-INT-01", + system_ip="10.10.2.30", + activity="ssh pivot", + details={"source_ip": "10.10.3.10"}, + event_types=["ssh_session"], + ) + scorer = CrossSourceScorer() + + assert not scorer._record_matches( + _record( + "syslog", + { + "hostname": "APP-INT-01", + "message": "Accepted password for aisha.johnson from 10.10.1.35 port 58516 ssh2", + }, + ts=T0, + ), + "syslog", + event, + "ssh_session", + ) + assert scorer._record_matches( + _record( + "syslog", + { + "hostname": "APP-INT-01", + "message": "Accepted password for root from 10.10.3.10 port 36592 ssh2", + }, + ts=T0, + ), + "syslog", + event, + "ssh_session", + ) + + def test_failed_logon_indicator_uses_target_username(self): + """Failed-logon rows should be checked against the target account, not actor.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="LT-MRIVERA-02", + system_ip="10.10.1.99", + activity="wrong password fumble", + details={"target_username": "aisha.johnson"}, + event_types=["failed_logon"], + ) + + assert CrossSourceScorer._username_indicator_matches("aisha.johnson", event) + assert not CrossSourceScorer._username_indicator_matches("root", event) + + def test_ipv4_mapped_source_indicator_matches_plain_ipv4(self): + """Windows IPv4-mapped addresses should not create source mismatch noise.""" + assert CrossSourceScorer._ip_matches("::ffff:10.10.1.99", "10.10.1.99") + + def test_group_member_indicator_uses_member_name_not_group_target(self): + """4728 TargetUserName is the group, while MemberName carries the account.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="SYSTEM", + system="DC-01", + system_ip="10.10.2.10", + activity="add backdoor account", + details={"member_name": "svc_mhsync", "group_name": "Domain Admins"}, + event_types=["group_member_added"], + ) + trace = _record( + "windows_event_security", + { + "EventID": 4728, + "Computer": "DC-01", + "TargetUserName": "Domain Admins", + "MemberName": "CN=svc_mhsync,CN=Users,DC=corp,DC=local", + }, + ts=T0, + ) + + assert CausalityScorer()._check_indicators(event, trace)[0] == ("username", True) + + def test_web_scan_matcher_requires_nikto_profile_evidence(self): + """Web scan traces should not attach generic favicon/browser requests.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="nikto web scan", + details={ + "source_ip": "185.70.41.45", + "dst_ip": "10.10.3.10", + "dst_port": 443, + "preset": "nikto", + }, + event_types=["web_scan"], + ) + scorer = CrossSourceScorer() + + assert not scorer._record_matches( + ParsedRecord( + source_format="web_access", + raw="test", + fields={ + "client_ip": "185.70.41.45", + "user_agent": "Mozilla/5.0 Chrome/121.0", + }, + timestamp=T0, + source_host="WEB-EXT-01", + ), + "web_access", + event, + "web_scan", + ) + assert scorer._record_matches( + ParsedRecord( + source_format="web_access", + raw="test", + fields={ + "client_ip": "185.70.41.45", + "user_agent": "Mozilla/5.00 (Nikto/2.1.6)", + }, + timestamp=T0, + source_host="WEB-EXT-01", + ), + "web_access", + event, + "web_scan", + ) + assert not scorer._record_matches( + _record( + "zeek_conn", + { + "id.orig_h": "185.70.41.45", + "id.resp_h": "10.10.3.10", + "id.resp_p": 443, + "conn_state": "S0", + }, + ts=T0, + ), + "zeek_conn", + event, + "web_scan", + ) + assert not scorer._record_matches( + _record( + "zeek_conn", + { + "id.orig_h": "185.70.41.45", + "id.resp_h": "10.10.3.10", + "id.resp_p": 443, + "conn_state": "RSTR", + }, + ts=T0, + ), + "zeek_conn", + event, + "web_scan", + ) + + def test_process_matcher_requires_storyline_process_detail(self): + """Generic same-host process creates should not attach to precise process steps.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="SYSTEM", + system="DC-01", + system_ip="10.10.2.10", + activity="clear security log", + details={ + "process_name": r"C:\Windows\System32\wevtutil.exe", + "command_line": "wevtutil cl Security", + }, + event_types=["process"], + ) + scorer = CrossSourceScorer() + + assert not scorer._record_matches( + _record( + "windows_event_security", + { + "EventID": 4688, + "Computer": "DC-01", + "SubjectUserName": "SYSTEM", + "NewProcessName": r"C:\Windows\System32\RuntimeBroker.exe", + "CommandLine": "RuntimeBroker.exe -Embedding", + }, + ts=T0, + ), + "windows_event_security", + event, + "process", + ) + assert scorer._record_matches( + _record( + "windows_event_security", + { + "EventID": 4688, + "Computer": "DC-01", + "SubjectUserName": "SYSTEM", + "NewProcessName": r"C:\Windows\System32\wevtutil.exe", + "CommandLine": "wevtutil cl Security", + }, + ts=T0, + ), + "windows_event_security", + event, + "process", + ) + + def test_process_indicator_uses_actor_not_target_account(self): + """Process traces in account-management steps should validate the actor principal.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="SYSTEM", + system="DC-01", + system_ip="10.10.2.10", + activity="create backdoor account", + details={"target_username": "svc_mhsync"}, + event_types=["process", "account_created"], + ) + trace = _record( + "ecar", + { + "hostname": "DC-01", + "object": "PROCESS", + "action": "CREATE", + "principal": "SYSTEM", + }, + ts=T0, + ) + + assert CausalityScorer()._check_indicators(event, trace)[0] == ("username", True) + + def test_beacon_proxy_matcher_requires_expected_source_host(self): + """Same C2 hostname from another host should not attach to this beacon step.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="root", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="beacon", + details={"dst_ip": "45.33.32.30", "hostname": "api.example.net", "dst_port": 443}, + event_types=["beacon"], + ) + scorer = CrossSourceScorer() + scorer._proxy_ips = {"10.10.3.20"} + + assert not scorer._record_matches( + _record( + "zeek_http", + { + "id.orig_h": "10.10.2.10", + "id.resp_h": "10.10.3.20", + "host": "api.example.net", + "status_code": 200, + }, + ts=T0, + ), + "zeek_http", + event, + "beacon", + ) + assert scorer._record_matches( + _record( + "zeek_http", + { + "id.orig_h": "10.10.3.10", + "id.resp_h": "10.10.3.20", + "host": "api.example.net", + "status_code": 200, + }, + ts=T0, + ), + "zeek_http", + event, + "beacon", + ) + + def test_best_sub_detail_prefers_directional_ip_roles(self): + """Indicator checks should choose the reverse-shell detail for callback traces.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="apache", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="upload and reverse shell", + details={"dst_ip": "45.33.32.30"}, + event_types=["connection"], + sub_details=[ + {"source_ip": "185.70.41.45", "dst_ip": "10.10.3.10"}, + {"dst_ip": "45.33.32.30"}, + ], + ) + + best = CrossSourceScorer._best_sub_detail( + event, + {"src_ip": "10.10.3.10", "dst_ip": "45.33.32.30"}, + ) + + assert best == {"dst_ip": "45.33.32.30"} + + def test_raw_matcher_requires_target_format_and_fields(self): + """Raw storyline rows should not match every record in the time window.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + event = ResolvedEvent( + index=0, + time=T0, + actor="apache", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="raw apache error", + details={ + "target_format": "syslog", + "fields": { + "hostname": "WEB-EXT-01", + "app_name": "apache2", + "message": "SQLSTATE[42000]: syntax error near UNION SELECT", + }, + }, + event_types=["raw"], + ) + scorer = CrossSourceScorer() + + assert not scorer._record_matches( + _record("ecar", {"hostname": "WEB-EXT-01", "object": "FLOW"}, ts=T0), + "ecar", + event, + "raw", + ) + assert not scorer._record_matches( + _record("syslog", {"hostname": "WEB-EXT-01", "app_name": "sshd"}, ts=T0), + "syslog", + event, + "raw", + ) + assert scorer._record_matches( + _record( + "syslog", + { + "hostname": "WEB-EXT-01", + "app_name": "apache2", + "message": "PHP message: SQLSTATE[42000]: syntax error near UNION SELECT", + }, + ts=T0, + ), + "syslog", + event, + "raw", + ) + + def test_http_connection_search_allows_modest_forward_trace_drift(self): + """Web exploit steps may render exact network evidence a few minutes later.""" + from evidenceforge.evaluation.storyline import ResolvedEvent + + zeek_rec = _record( + "zeek_conn", + { + "id.orig_h": "185.70.41.45", + "id.resp_h": "10.10.3.10", + "id.resp_p": 443, + }, + ts=T0 + timedelta(minutes=5), + ) + event = ResolvedEvent( + index=0, + time=T0, + actor="apache", + system="WEB-EXT-01", + system_ip="10.10.3.10", + activity="SQL injection", + details={ + "source_ip": "185.70.41.45", + "dst_ip": "10.10.3.10", + "dst_port": 443, + "method": "POST", + "uri": "/ehr/patient/search", + }, + event_types=["connection"], + ) + scorer = CrossSourceScorer() + index = scorer._build_host_time_index({"zeek_conn": [zeek_rec]}) + + assert scorer._search_for_event_indexed(event, "connection", index) == [zeek_rec] + class TestPortScanSourceIp: """port_scan events with external source_ip must use that IP for matching.""" diff --git a/tests/unit/test_eval_signal_integrity.py b/tests/unit/test_eval_signal_integrity.py index 57a8a237..5a993b2a 100644 --- a/tests/unit/test_eval_signal_integrity.py +++ b/tests/unit/test_eval_signal_integrity.py @@ -210,6 +210,7 @@ def test_all_events_found(self): "EventID": 4688, "Computer": "WS-01", "SubjectUserName": "jsmith", + "NewProcessName": "C:\\Windows\\System32\\cmd.exe", }, ts=T0 + timedelta(hours=2), ), @@ -459,6 +460,7 @@ def test_same_actor_is_linkable(self): "EventID": 4688, "Computer": "WS-01", "SubjectUserName": "jsmith", + "NewProcessName": "C:\\Windows\\System32\\cmd.exe", }, ts=T0 + timedelta(hours=2), ), @@ -540,6 +542,7 @@ def test_correct_order(self): "EventID": 4688, "Computer": "WS-01", "SubjectUserName": "jsmith", + "NewProcessName": "C:\\Windows\\System32\\cmd.exe", }, ts=T0 + timedelta(hours=2), ), @@ -550,6 +553,57 @@ def test_correct_order(self): ti = next(s for s in result.sub_scores if s.key == "temporal_integrity") assert ti.score == 100.0 + def test_delayed_previous_trace_does_not_create_false_order_failure(self): + """Source delay on an earlier step should not make overlapping later evidence fail.""" + scenario = _scenario_with_storyline( + [ + { + "id": "evt-test-15a", + "time": "+1h", + "actor": "jsmith", + "system": "WS-01", + "activity": "Login to workstation", + "events": [{"type": "logon"}], + }, + { + "id": "evt-test-15b", + "time": "+1h1m", + "actor": "jsmith", + "system": "WS-01", + "activity": "Execute command", + "events": [{"type": "process", "process_name": "cmd.exe"}], + }, + ] + ) + records = { + "windows_event_security": [ + _record( + "windows_event_security", + { + "EventID": 4624, + "TargetUserName": "jsmith", + "Computer": "WS-01", + }, + ts=T0 + timedelta(hours=1, seconds=90), + ), + _record( + "windows_event_security", + { + "EventID": 4688, + "Computer": "WS-01", + "SubjectUserName": "jsmith", + "NewProcessName": "C:\\Windows\\System32\\cmd.exe", + }, + ts=T0 + timedelta(hours=1, minutes=1, seconds=10), + ), + ], + } + + result = SignalIntegrityScorer().score(records, scenario) + + ti = next(s for s in result.sub_scores if s.key == "temporal_integrity") + assert ti.score == 100.0 + def test_out_of_tolerance(self): """Trace timestamp far from expected time should fail.""" scenario = _scenario_with_storyline( diff --git a/tests/unit/test_eval_temporal.py b/tests/unit/test_eval_temporal.py index 417391a2..f72335bf 100644 --- a/tests/unit/test_eval_temporal.py +++ b/tests/unit/test_eval_temporal.py @@ -366,6 +366,36 @@ def test_dns_weak_rule_skips_later_matching_answer(self): result = scorer._score_causal_ordering(records, scenario) assert result.score == 100.0 + def test_kerberos_domain_logon_weak_rule_skips_later_matching_tgt(self): + """A later user TGT on a DC is not proof a target-host 4624 is inverted.""" + base = T0 + self._AFTER_GRACE + records = { + "windows_event_security": [ + _record( + "windows_event_security", + { + "EventID": 4624, + "Computer": "WS-01", + "TargetUserName": "jsmith", + }, + ts=base, + ), + _record( + "windows_event_security", + { + "EventID": 4768, + "Computer": "DC-01", + "TargetUserName": "jsmith", + }, + ts=base + timedelta(minutes=5), + ), + ] + } + scenario = _make_scenario() + scorer = CausalityScorer() + result = scorer._score_causal_ordering(records, scenario) + assert result.score == 100.0 + def test_causal_ordering_counts_failures_after_sample_cap(self): """Failures beyond the diagnostic sample cap still count against the score.""" base = T0 + self._AFTER_GRACE diff --git a/tests/unit/test_zeek_ssl.py b/tests/unit/test_zeek_ssl.py index 762c5297..3bde0763 100644 --- a/tests/unit/test_zeek_ssl.py +++ b/tests/unit/test_zeek_ssl.py @@ -629,6 +629,8 @@ def test_tls_analyzer_logs_have_stage_timestamp_offsets(self): assert ssl_ts < x509_ts <= conn_ts + ((ssl_window.max_ms + x509_window.max_ms) / 1000) assert x509_ts < ocsp_ts < conn_ts + 6.1 assert ocsp_row["id"] == "Focsp12345678901" + assert "revoketime" not in ocsp_row + assert "revokereason" not in ocsp_row assert "uid" not in ocsp_row assert "id.orig_h" not in ocsp_row assert "id.resp_h" not in ocsp_row