diff --git a/aw_query/functions.py b/aw_query/functions.py index b02ce5e..1421b08 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -23,6 +23,7 @@ flood, limit_events, merge_events_by_keys, + merge_subwatcher_fields, period_union, simplify_string, sort_by_duration, @@ -227,6 +228,17 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]: return merge_events_by_keys(events, keys) +@q2_function(merge_subwatcher_fields) +@q2_typecheck +def q2_merge_subwatcher_fields( + base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins" +) -> List[Event]: + try: + return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict) + except ValueError as exc: + raise QueryFunctionException(str(exc)) from None + + @q2_function(chunk_events_by_key) @q2_typecheck def q2_chunk_events_by_key(events: list, key: str) -> List[Event]: diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index 1297e42..d17e0fe 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -11,6 +11,7 @@ limit_events, ) from .split_url_events import split_url_events +from .merge_subwatcher_fields import merge_subwatcher_fields from .simplify import simplify_string from .flood import flood from .classify import categorize, tag, Rule @@ -33,6 +34,7 @@ "heartbeat_reduce", "heartbeat_merge", "merge_events_by_keys", + "merge_subwatcher_fields", "chunk_events_by_key", "limit_events", "filter_keyvals", diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py new file mode 100644 index 0000000..2f86040 --- /dev/null +++ b/aw_transform/merge_subwatcher_fields.py @@ -0,0 +1,150 @@ +import logging +from copy import deepcopy +from typing import List, Optional, Tuple + +from aw_core.models import Event +from timeslot import Timeslot + +from .filter_period_intersect import _get_event_period + +logger = logging.getLogger(__name__) + + +def merge_subwatcher_fields( + base_events: List[Event], + subwatcher_events: List[Event], + keys: List[str], + conflict: str = "base_wins", +) -> List[Event]: + """ + Split each event in *base_events* on overlapping subwatcher boundaries and + copy the named *keys* from the matching subwatcher event into each segment's + ``data`` dict. + + Unlike the ``concat`` workaround, this does not fabricate extra duration. + App/title/duration aggregations stay correct because the output still covers + exactly the same total time as *base_events*; only the segmentation changes + where subwatcher fields actually change. + + This is the backend primitive that lets every client (webui, native UIs, + exporters) categorize by subwatcher fields (browser ``url``/``$domain``; + editor ``project``/``file``/``language``) without bespoke per-watcher + client-side code. + + Args: + base_events: The canonical window/afk-filtered stream to enrich. + subwatcher_events: Events from a subwatcher bucket (e.g. aw-watcher-vim, + aw-watcher-web). Should already be clipped via + ``filter_period_intersect`` before passing here. + keys: Which keys to copy from the subwatcher event into the base event. + Keys already present in the base event are left untouched when + ``conflict="base_wins"`` (default). + conflict: ``"base_wins"`` (default) — base event's existing keys are + never overwritten; subwatcher fields are purely additive. + ``"sub_wins"`` — subwatcher fields overwrite base fields. + + Returns: + A new list of base event segments with subwatcher fields injected. + Events in *base_events* that have no overlapping subwatcher event are + returned with their original data unchanged. + + Example:: + + window_events = query_bucket(bid_window) + editor_events = flood(query_bucket(bid_editor)) + editor_events = filter_period_intersect(editor_events, window_events) + window_events = merge_subwatcher_fields( + window_events, editor_events, ["project", "file", "language"] + ) + # Now categorize(window_events, ...) can match on "project"/"file" + + Note on overlap: + Base events are split at the clipped subwatcher boundaries so each + output segment only carries the subwatcher fields that were actually + present during that slice of time. If multiple subwatcher events cover + the same slice, the most recent one wins so a later transition does not + get smeared backward by an older, longer pulse. + """ + if conflict not in ("base_wins", "sub_wins"): + raise ValueError( + f"conflict must be 'base_wins' or 'sub_wins', got {conflict!r}" + ) + if not subwatcher_events or not keys: + return [deepcopy(e) for e in base_events] + + # Build a sorted copy so we can do a linear scan + sub_sorted = sorted(subwatcher_events, key=lambda e: e.timestamp) + + result: List[Event] = [] + for base in base_events: + base_period = _get_event_period(base) + overlapping: List[Tuple[Event, Timeslot]] = [] + boundaries = {base_period.start, base_period.end} + + for sub in sub_sorted: + sub_period = _get_event_period(sub) + # Once sub starts after base ends we can stop + if sub_period.start >= base_period.end: + break + # Skip sub events that end before base starts + if sub_period.end <= base_period.start: + continue + ip = base_period.intersection(sub_period) + if ip: + overlapping.append((sub, sub_period)) + boundaries.add(ip.start) + boundaries.add(ip.end) + + if not overlapping: + result.append(deepcopy(base)) + continue + + boundary_points = sorted(boundaries) + base_segments: List[Event] = [] + for start, end in zip(boundary_points, boundary_points[1:]): + segment_period = Timeslot(start, end) + best_sub: Optional[Event] = None + best_sub_period: Optional[Timeslot] = None + + for sub, sub_period in overlapping: + if not segment_period.intersection(sub_period): + continue + + # Later subwatcher events should supersede older overlapping + # pulses on the shared slice instead of letting stale data linger. + if ( + best_sub is None + or sub.timestamp > best_sub.timestamp + or ( + sub.timestamp == best_sub.timestamp + and best_sub_period is not None + and sub_period.end > best_sub_period.end + ) + ): + best_sub = sub + best_sub_period = sub_period + + enriched = deepcopy(base) + enriched.timestamp = start + enriched.duration = end - start + + if best_sub is not None: + for key in keys: + if key in best_sub.data: + if conflict == "base_wins" and key in enriched.data: + continue + enriched.data[key] = deepcopy(best_sub.data[key]) + + if ( + base_segments + and base_segments[-1].timestamp + base_segments[-1].duration + == enriched.timestamp + and base_segments[-1].data == enriched.data + ): + base_segments[-1].duration += enriched.duration + else: + base_segments.append(enriched) + + result.extend(base_segments) + + return result diff --git a/tests/test_query2.py b/tests/test_query2.py index db413e2..c3c7de2 100644 --- a/tests/test_query2.py +++ b/tests/test_query2.py @@ -323,6 +323,21 @@ def test_query2_function_invalid_argument_count(): query(qname, example_query, starttime, endtime, ds) +def test_query2_merge_subwatcher_fields_invalid_conflict(): + ds = mock_ds + qname = "asd" + starttime = iso8601.parse_date("1970-01-01") + endtime = iso8601.parse_date("1970-01-02") + example_query = """ + base = []; + sub = []; + RETURN = merge_subwatcher_fields(base, sub, ["project"], "invalid"); + """ + + with pytest.raises(QueryFunctionException, match="conflict must be"): + query(qname, example_query, starttime, endtime, ds) + + @pytest.mark.parametrize("datastore", param_datastore_objects()) def test_query2_function_in_function(datastore): qname = "asd" diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 3aa119c..df627db 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -1,11 +1,13 @@ from pprint import pprint from datetime import datetime, timedelta, timezone +import pytest from aw_core.models import Event from aw_transform import ( filter_period_intersect, filter_keyvals_regex, filter_keyvals, + merge_subwatcher_fields, period_union, sort_by_timestamp, sort_by_duration, @@ -469,3 +471,215 @@ def test_union_no_overlap(): dur = sum((e.duration for e in events_union), timedelta(0)) assert dur == timedelta(hours=5, minutes=0) assert sorted(events_union, key=lambda e: e.timestamp) + + +def test_merge_subwatcher_fields_basic(): + """Fully overlapping subwatcher fields are injected without changing duration.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [ + Event( + timestamp=now, + duration=td1h, + data={"app": "vim", "title": "file.py"}, + ) + ] + sub = [ + Event( + timestamp=now, + duration=td1h, + data={"project": "myproject", "file": "file.py", "language": "python"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project", "file", "language"]) + + assert len(result) == 1 + # Original base fields preserved + assert result[0].data["app"] == "vim" + assert result[0].data["title"] == "file.py" + # Subwatcher fields injected + assert result[0].data["project"] == "myproject" + assert result[0].data["language"] == "python" + # Exact overlap means no extra segmentation + assert result[0].timestamp == now + assert result[0].duration == td1h + + +def test_merge_subwatcher_fields_partial_overlap_splits_base(): + """Partial overlap only enriches the covered slice of the base event.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td15m = timedelta(minutes=15) + td30m = timedelta(minutes=30) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [ + Event( + timestamp=now + td15m, + duration=td30m, + data={"project": "myproject"}, + ) + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 3 + assert [event.duration for event in result] == [td15m, td30m, td15m] + assert [event.timestamp for event in result] == [ + now, + now + td15m, + now + td15m + td30m, + ] + assert "project" not in result[0].data + assert result[1].data["project"] == "myproject" + assert "project" not in result[2].data + assert sum_durations(result) == td1h + + +def test_merge_subwatcher_fields_overlapping_subwatchers_prefer_latest_start(): + """Newer overlapping subwatcher events take over from their own start time.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td20m = timedelta(minutes=20) + td40m = timedelta(minutes=40) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [ + Event(timestamp=now, duration=td40m, data={"project": "alpha"}), + Event(timestamp=now + td20m, duration=td40m, data={"project": "beta"}), + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 2 + assert [event.timestamp for event in result] == [now, now + td20m] + assert [event.duration for event in result] == [td20m, td40m] + assert [event.data.get("project") for event in result] == ["alpha", "beta"] + + by_project = { + event.data.get("project"): event.duration + for event in merge_events_by_keys(result, ["project"]) + } + assert by_project["alpha"] == td20m + assert by_project["beta"] == td40m + assert sum_durations(result) == td1h + + +def test_merge_subwatcher_fields_no_overlap(): + """Base events with no overlapping subwatcher event are returned unchanged.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + # Subwatcher event is entirely after the base event + sub = [ + Event( + timestamp=now + 2 * td1h, + duration=td1h, + data={"project": "other"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 1 + assert "project" not in result[0].data + assert result[0].data["app"] == "vim" + + +def test_merge_subwatcher_fields_base_wins_conflict(): + """With conflict='base_wins' (default), existing base keys are not overwritten.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py", "project": "p"})] + + result = merge_subwatcher_fields( + base, sub, ["file", "project"], conflict="base_wins" + ) + # base's "file" must not be overwritten + assert result[0].data["file"] == "base.py" + # "project" not in base → injected from sub + assert result[0].data["project"] == "p" + + +def test_merge_subwatcher_fields_sub_wins_conflict(): + """With conflict='sub_wins', subwatcher fields overwrite base fields.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py"})] + + result = merge_subwatcher_fields(base, sub, ["file"], conflict="sub_wins") + assert result[0].data["file"] == "sub.py" + + +def test_merge_subwatcher_fields_multiple_subsegments_preserve_duration(): + """Repeated subwatcher values aggregate to their true covered duration.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td15m = timedelta(minutes=15) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [ + Event(timestamp=now, duration=td15m, data={"project": "alpha"}), + Event(timestamp=now + td15m, duration=td15m, data={"project": "beta"}), + Event( + timestamp=now + 2 * td15m, + duration=td15m, + data={"project": "alpha"}, + ), + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + by_project = { + event.data.get("project"): event.duration + for event in merge_events_by_keys(result, ["project"]) + } + by_app = { + event.data.get("app"): event.duration + for event in merge_events_by_keys(result, ["app"]) + } + + assert by_project["alpha"] == 2 * td15m + assert by_project["beta"] == td15m + assert by_project[None] == td15m + assert by_app["vim"] == td1h + assert sum_durations(result) == td1h + + +def test_merge_subwatcher_fields_empty_inputs(): + """Empty sub or keys returns a defensive copy of base (not the same list).""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + + # Empty subwatcher list — returns a new list, data unchanged + result = merge_subwatcher_fields(base, [], ["project"]) + assert result[0].data == {"app": "vim"} + assert result is not base + + # Empty keys list + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + result = merge_subwatcher_fields(base, sub, []) + assert "project" not in result[0].data + assert result is not base + + # Both empty + result = merge_subwatcher_fields(base, [], []) + assert result[0].data == {"app": "vim"} + assert result is not base + + +def test_merge_subwatcher_fields_invalid_conflict(): + """Invalid conflict value raises ValueError immediately.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + + with pytest.raises(ValueError, match="conflict must be"): + merge_subwatcher_fields(base, sub, ["project"], conflict="invalid")