From 662aba1edcb4b283d4a47ffa73a9c3cd4be7e62f Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 31 May 2026 18:07:39 +0000 Subject: [PATCH 1/6] feat(transform): add merge_subwatcher_fields for editor/browser field enrichment Adds a new backend transform `merge_subwatcher_fields(base, sub, keys)` that enriches base window events with extra fields from a subwatcher (editor, browser) by finding the longest-overlapping subwatcher event per base event and copying the named keys into the base event's data dict. Unlike the concat workaround used in aw-webui#851, this approach: - Preserves timestamps, durations, and event count exactly (no phantom events) - Keeps app/title/duration aggregations correct by construction - Lives in the backend so every client (webui, native UIs, exporters) benefits - Handles both editor (project/file/language) and browser (url/$domain) fields through one mechanism, closing both #1305 and #352 Exposes as q2_merge_subwatcher_fields in the query2 function registry. Adds 6 tests covering: basic enrichment, no-overlap passthrough, base_wins and sub_wins conflict modes, attach-longest N:1 overlap selection, and empty inputs. Closes ActivityWatch/activitywatch#1305, ActivityWatch/activitywatch#352 --- aw_query/functions.py | 9 ++ aw_transform/__init__.py | 2 + aw_transform/merge_subwatcher_fields.py | 101 +++++++++++++++++++ tests/test_transforms.py | 125 ++++++++++++++++++++++++ 4 files changed, 237 insertions(+) create mode 100644 aw_transform/merge_subwatcher_fields.py diff --git a/aw_query/functions.py b/aw_query/functions.py index b02ce5e7..74f92530 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -23,6 +23,7 @@ flood, limit_events, merge_events_by_keys, + merge_subwatcher_fields, period_union, simplify_string, sort_by_duration, @@ -227,6 +228,14 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]: return merge_events_by_keys(events, keys) +@q2_function(merge_subwatcher_fields) +@q2_typecheck +def q2_merge_subwatcher_fields( + base_events: list, subwatcher_events: list, keys: list +) -> List[Event]: + return merge_subwatcher_fields(base_events, subwatcher_events, keys) + + @q2_function(chunk_events_by_key) @q2_typecheck def q2_chunk_events_by_key(events: list, key: str) -> List[Event]: diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py index 1297e42c..d17e0fed 100644 --- a/aw_transform/__init__.py +++ b/aw_transform/__init__.py @@ -11,6 +11,7 @@ limit_events, ) from .split_url_events import split_url_events +from .merge_subwatcher_fields import merge_subwatcher_fields from .simplify import simplify_string from .flood import flood from .classify import categorize, tag, Rule @@ -33,6 +34,7 @@ "heartbeat_reduce", "heartbeat_merge", "merge_events_by_keys", + "merge_subwatcher_fields", "chunk_events_by_key", "limit_events", "filter_keyvals", diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py new file mode 100644 index 00000000..0fee881b --- /dev/null +++ b/aw_transform/merge_subwatcher_fields.py @@ -0,0 +1,101 @@ +import logging +from copy import deepcopy +from typing import List, Optional + +from aw_core.models import Event + +from .filter_period_intersect import _get_event_period + +logger = logging.getLogger(__name__) + + +def merge_subwatcher_fields( + base_events: List[Event], + subwatcher_events: List[Event], + keys: List[str], + conflict: str = "base_wins", +) -> List[Event]: + """ + For each event in *base_events*, find the longest-overlapping event in + *subwatcher_events* and copy the named *keys* from that subwatcher event + into the base event's ``data`` dict. + + Timestamps, durations, and event count of *base_events* are **unchanged** + — no phantom events are created. This makes duration/app/title aggregations + stay correct, unlike the ``concat`` workaround. + + This is the backend primitive that lets every client (webui, native UIs, + exporters) categorize by subwatcher fields (browser ``url``/``$domain``; + editor ``project``/``file``/``language``) without bespoke per-watcher + client-side code. + + Args: + base_events: The canonical window/afk-filtered stream to enrich. + subwatcher_events: Events from a subwatcher bucket (e.g. aw-watcher-vim, + aw-watcher-web). Should already be clipped via + ``filter_period_intersect`` before passing here. + keys: Which keys to copy from the subwatcher event into the base event. + Keys already present in the base event are left untouched when + ``conflict="base_wins"`` (default). + conflict: ``"base_wins"`` (default) — base event's existing keys are + never overwritten; subwatcher fields are purely additive. + ``"sub_wins"`` — subwatcher fields overwrite base fields. + + Returns: + A new list of base events with subwatcher fields injected. Events in + *base_events* that have no overlapping subwatcher event are returned + with their original data unchanged. + + Example:: + + window_events = query_bucket(bid_window) + editor_events = flood(query_bucket(bid_editor)) + editor_events = filter_period_intersect(editor_events, window_events) + window_events = merge_subwatcher_fields( + window_events, editor_events, ["project", "file", "language"] + ) + # Now categorize(window_events, ...) can match on "project"/"file" + + Note on N:1 overlap: + When multiple subwatcher events overlap a single base event, the one + with the **longest overlap duration** is used (attach-longest strategy). + This matches heartbeat granularity and avoids splitting base events. + """ + if not subwatcher_events or not keys: + return base_events + + # Build a sorted copy so we can do a linear scan + sub_sorted = sorted(subwatcher_events, key=lambda e: e.timestamp) + + result: List[Event] = [] + for base in base_events: + base_period = _get_event_period(base) + best_sub: Optional[Event] = None + best_overlap_secs: float = 0.0 + + for sub in sub_sorted: + sub_period = _get_event_period(sub) + # Once sub starts after base ends we can stop + if sub_period.start >= base_period.end: + break + # Skip sub events that end before base starts + if sub_period.end <= base_period.start: + continue + ip = base_period.intersection(sub_period) + if ip: + overlap_secs = ip.duration.total_seconds() + if overlap_secs > best_overlap_secs: + best_overlap_secs = overlap_secs + best_sub = sub + + enriched = deepcopy(base) + if best_sub is not None: + for key in keys: + if key in best_sub.data: + if conflict == "base_wins" and key in enriched.data: + pass # base keeps its value + else: + enriched.data[key] = best_sub.data[key] + result.append(enriched) + + return result diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 3aa119ca..513ad2ab 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -6,6 +6,7 @@ filter_period_intersect, filter_keyvals_regex, filter_keyvals, + merge_subwatcher_fields, period_union, sort_by_timestamp, sort_by_duration, @@ -469,3 +470,127 @@ def test_union_no_overlap(): dur = sum((e.duration for e in events_union), timedelta(0)) assert dur == timedelta(hours=5, minutes=0) assert sorted(events_union, key=lambda e: e.timestamp) + + +def test_merge_subwatcher_fields_basic(): + """Subwatcher fields are injected into overlapping base events.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [ + Event( + timestamp=now, + duration=td1h, + data={"app": "vim", "title": "file.py"}, + ) + ] + sub = [ + Event( + timestamp=now, + duration=td1h, + data={"project": "myproject", "file": "file.py", "language": "python"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project", "file", "language"]) + + assert len(result) == 1 + # Original base fields preserved + assert result[0].data["app"] == "vim" + assert result[0].data["title"] == "file.py" + # Subwatcher fields injected + assert result[0].data["project"] == "myproject" + assert result[0].data["language"] == "python" + # Timestamps and durations unchanged + assert result[0].timestamp == now + assert result[0].duration == td1h + + +def test_merge_subwatcher_fields_no_overlap(): + """Base events with no overlapping subwatcher event are returned unchanged.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + # Subwatcher event is entirely after the base event + sub = [ + Event( + timestamp=now + 2 * td1h, + duration=td1h, + data={"project": "other"}, + ) + ] + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 1 + assert "project" not in result[0].data + assert result[0].data["app"] == "vim" + + +def test_merge_subwatcher_fields_base_wins_conflict(): + """With conflict='base_wins' (default), existing base keys are not overwritten.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py", "project": "p"})] + + result = merge_subwatcher_fields( + base, sub, ["file", "project"], conflict="base_wins" + ) + # base's "file" must not be overwritten + assert result[0].data["file"] == "base.py" + # "project" not in base → injected from sub + assert result[0].data["project"] == "p" + + +def test_merge_subwatcher_fields_sub_wins_conflict(): + """With conflict='sub_wins', subwatcher fields overwrite base fields.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim", "file": "base.py"})] + sub = [Event(timestamp=now, duration=td1h, data={"file": "sub.py"})] + + result = merge_subwatcher_fields(base, sub, ["file"], conflict="sub_wins") + assert result[0].data["file"] == "sub.py" + + +def test_merge_subwatcher_fields_attach_longest(): + """When multiple subwatcher events overlap a base event, the longest overlap wins.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td30m = timedelta(minutes=30) + td1h = timedelta(hours=1) + + # Base event: 12:00 – 13:00 + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + # Short overlap: 12:00 – 12:30 (30 min) + sub_short = Event(timestamp=now, duration=td30m, data={"project": "short"}) + # Long overlap: 12:30 – 13:00 (30 min) — same overlap here, but added first + # Make one clearly longer: 11:45 – 13:00 (75 min overlap into base) + sub_long = Event( + timestamp=now - timedelta(minutes=15), + duration=td1h + timedelta(minutes=15), + data={"project": "long"}, + ) + result = merge_subwatcher_fields(base, [sub_short, sub_long], ["project"]) + assert result[0].data["project"] == "long" + + +def test_merge_subwatcher_fields_empty_inputs(): + """Empty sub or keys returns base unchanged.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + + # Empty subwatcher list + result = merge_subwatcher_fields(base, [], ["project"]) + assert result[0].data == {"app": "vim"} + + # Empty keys list + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + result = merge_subwatcher_fields(base, sub, []) + assert "project" not in result[0].data + + # Both empty + result = merge_subwatcher_fields(base, [], []) + assert result[0].data == {"app": "vim"} From e78081b2f2962e86ba79a14f4d6de36281653d27 Mon Sep 17 00:00:00 2001 From: Bob Date: Sun, 31 May 2026 18:18:03 +0000 Subject: [PATCH 2/6] fix(transform): address Greptile P1/P2 findings on merge_subwatcher_fields - P1: early return now returns deepcopy list instead of original reference, matching the contract 'Returns: A new list of base events' - P2a: validate conflict param before early return; invalid values raise ValueError rather than silently behaving as 'sub_wins' - P2b: forward conflict param through q2_merge_subwatcher_fields wrapper so query2 callers can opt into 'sub_wins' instead of being locked to default - test: extend empty-inputs test to assert result is not the same list object - test: add test_merge_subwatcher_fields_invalid_conflict for P2a validation --- aw_query/functions.py | 4 ++-- aw_transform/merge_subwatcher_fields.py | 6 +++++- tests/test_transforms.py | 19 +++++++++++++++++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/aw_query/functions.py b/aw_query/functions.py index 74f92530..a87d3324 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -231,9 +231,9 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]: @q2_function(merge_subwatcher_fields) @q2_typecheck def q2_merge_subwatcher_fields( - base_events: list, subwatcher_events: list, keys: list + base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins" ) -> List[Event]: - return merge_subwatcher_fields(base_events, subwatcher_events, keys) + return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict) @q2_function(chunk_events_by_key) diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py index 0fee881b..6cb7f3fe 100644 --- a/aw_transform/merge_subwatcher_fields.py +++ b/aw_transform/merge_subwatcher_fields.py @@ -61,8 +61,12 @@ def merge_subwatcher_fields( with the **longest overlap duration** is used (attach-longest strategy). This matches heartbeat granularity and avoids splitting base events. """ + if conflict not in ("base_wins", "sub_wins"): + raise ValueError( + f"conflict must be 'base_wins' or 'sub_wins', got {conflict!r}" + ) if not subwatcher_events or not keys: - return base_events + return [deepcopy(e) for e in base_events] # Build a sorted copy so we can do a linear scan sub_sorted = sorted(subwatcher_events, key=lambda e: e.timestamp) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 513ad2ab..a2562d9f 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -1,6 +1,7 @@ from pprint import pprint from datetime import datetime, timedelta, timezone +import pytest from aw_core.models import Event from aw_transform import ( filter_period_intersect, @@ -577,20 +578,34 @@ def test_merge_subwatcher_fields_attach_longest(): def test_merge_subwatcher_fields_empty_inputs(): - """Empty sub or keys returns base unchanged.""" + """Empty sub or keys returns a defensive copy of base (not the same list).""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) td1h = timedelta(hours=1) base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] - # Empty subwatcher list + # Empty subwatcher list — returns a new list, data unchanged result = merge_subwatcher_fields(base, [], ["project"]) assert result[0].data == {"app": "vim"} + assert result is not base # Empty keys list sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] result = merge_subwatcher_fields(base, sub, []) assert "project" not in result[0].data + assert result is not base # Both empty result = merge_subwatcher_fields(base, [], []) assert result[0].data == {"app": "vim"} + assert result is not base + + +def test_merge_subwatcher_fields_invalid_conflict(): + """Invalid conflict value raises ValueError immediately.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td1h = timedelta(hours=1) + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [Event(timestamp=now, duration=td1h, data={"project": "p"})] + + with pytest.raises(ValueError, match="conflict must be"): + merge_subwatcher_fields(base, sub, ["project"], conflict="invalid") From a397b0d494577adef5d8d306124db966e8231ae6 Mon Sep 17 00:00:00 2001 From: Bob Date: Fri, 5 Jun 2026 21:21:56 +0000 Subject: [PATCH 3/6] fix(transform): split subwatcher enrichment by overlap --- aw_transform/merge_subwatcher_fields.py | 74 ++++++++++++++++--------- tests/test_transforms.py | 73 ++++++++++++++++++------ 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py index 6cb7f3fe..4d971a4a 100644 --- a/aw_transform/merge_subwatcher_fields.py +++ b/aw_transform/merge_subwatcher_fields.py @@ -1,8 +1,9 @@ import logging from copy import deepcopy -from typing import List, Optional +from typing import List, Optional, Tuple from aw_core.models import Event +from timeslot import Timeslot from .filter_period_intersect import _get_event_period @@ -16,13 +17,14 @@ def merge_subwatcher_fields( conflict: str = "base_wins", ) -> List[Event]: """ - For each event in *base_events*, find the longest-overlapping event in - *subwatcher_events* and copy the named *keys* from that subwatcher event - into the base event's ``data`` dict. + Split each event in *base_events* on overlapping subwatcher boundaries and + copy the named *keys* from the matching subwatcher event into each segment's + ``data`` dict. - Timestamps, durations, and event count of *base_events* are **unchanged** - — no phantom events are created. This makes duration/app/title aggregations - stay correct, unlike the ``concat`` workaround. + Unlike the ``concat`` workaround, this does not fabricate extra duration. + App/title/duration aggregations stay correct because the output still covers + exactly the same total time as *base_events*; only the segmentation changes + where subwatcher fields actually change. This is the backend primitive that lets every client (webui, native UIs, exporters) categorize by subwatcher fields (browser ``url``/``$domain``; @@ -42,9 +44,9 @@ def merge_subwatcher_fields( ``"sub_wins"`` — subwatcher fields overwrite base fields. Returns: - A new list of base events with subwatcher fields injected. Events in - *base_events* that have no overlapping subwatcher event are returned - with their original data unchanged. + A new list of base event segments with subwatcher fields injected. + Events in *base_events* that have no overlapping subwatcher event are + returned with their original data unchanged. Example:: @@ -56,10 +58,10 @@ def merge_subwatcher_fields( ) # Now categorize(window_events, ...) can match on "project"/"file" - Note on N:1 overlap: - When multiple subwatcher events overlap a single base event, the one - with the **longest overlap duration** is used (attach-longest strategy). - This matches heartbeat granularity and avoids splitting base events. + Note on overlap: + Base events are split at the clipped subwatcher boundaries. When + multiple subwatcher events overlap the same output segment, the one + with the **longest overlap with that segment** is used. """ if conflict not in ("base_wins", "sub_wins"): raise ValueError( @@ -74,8 +76,8 @@ def merge_subwatcher_fields( result: List[Event] = [] for base in base_events: base_period = _get_event_period(base) - best_sub: Optional[Event] = None - best_overlap_secs: float = 0.0 + overlapping: List[Tuple[Event, Timeslot]] = [] + boundaries = {base_period.start, base_period.end} for sub in sub_sorted: sub_period = _get_event_period(sub) @@ -87,19 +89,41 @@ def merge_subwatcher_fields( continue ip = base_period.intersection(sub_period) if ip: + overlapping.append((sub, sub_period)) + boundaries.add(ip.start) + boundaries.add(ip.end) + + if not overlapping: + result.append(deepcopy(base)) + continue + + boundary_points = sorted(boundaries) + for start, end in zip(boundary_points, boundary_points[1:]): + segment_period = Timeslot(start, end) + best_sub: Optional[Event] = None + best_overlap_secs: float = 0.0 + + for sub, sub_period in overlapping: + ip = segment_period.intersection(sub_period) + if not ip: + continue + overlap_secs = ip.duration.total_seconds() if overlap_secs > best_overlap_secs: best_overlap_secs = overlap_secs best_sub = sub - enriched = deepcopy(base) - if best_sub is not None: - for key in keys: - if key in best_sub.data: - if conflict == "base_wins" and key in enriched.data: - pass # base keeps its value - else: - enriched.data[key] = best_sub.data[key] - result.append(enriched) + enriched = deepcopy(base) + enriched.timestamp = start + enriched.duration = end - start + + if best_sub is not None: + for key in keys: + if key in best_sub.data: + if conflict == "base_wins" and key in enriched.data: + continue + enriched.data[key] = deepcopy(best_sub.data[key]) + + result.append(enriched) return result diff --git a/tests/test_transforms.py b/tests/test_transforms.py index a2562d9f..a80e08ef 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -474,7 +474,7 @@ def test_union_no_overlap(): def test_merge_subwatcher_fields_basic(): - """Subwatcher fields are injected into overlapping base events.""" + """Fully overlapping subwatcher fields are injected without changing duration.""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) td1h = timedelta(hours=1) @@ -501,11 +501,42 @@ def test_merge_subwatcher_fields_basic(): # Subwatcher fields injected assert result[0].data["project"] == "myproject" assert result[0].data["language"] == "python" - # Timestamps and durations unchanged + # Exact overlap means no extra segmentation assert result[0].timestamp == now assert result[0].duration == td1h +def test_merge_subwatcher_fields_partial_overlap_splits_base(): + """Partial overlap only enriches the covered slice of the base event.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td15m = timedelta(minutes=15) + td30m = timedelta(minutes=30) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [ + Event( + timestamp=now + td15m, + duration=td30m, + data={"project": "myproject"}, + ) + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 3 + assert [event.duration for event in result] == [td15m, td30m, td15m] + assert [event.timestamp for event in result] == [ + now, + now + td15m, + now + td15m + td30m, + ] + assert "project" not in result[0].data + assert result[1].data["project"] == "myproject" + assert "project" not in result[2].data + assert sum_durations(result) == td1h + + def test_merge_subwatcher_fields_no_overlap(): """Base events with no overlapping subwatcher event are returned unchanged.""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) @@ -556,25 +587,33 @@ def test_merge_subwatcher_fields_sub_wins_conflict(): assert result[0].data["file"] == "sub.py" -def test_merge_subwatcher_fields_attach_longest(): - """When multiple subwatcher events overlap a base event, the longest overlap wins.""" +def test_merge_subwatcher_fields_multiple_subsegments_preserve_duration(): + """Multiple subwatcher slices should not over-attribute time to any key.""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) - td30m = timedelta(minutes=30) + td5m = timedelta(minutes=5) + td10m = timedelta(minutes=10) + td20m = timedelta(minutes=20) td1h = timedelta(hours=1) - # Base event: 12:00 – 13:00 base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] - # Short overlap: 12:00 – 12:30 (30 min) - sub_short = Event(timestamp=now, duration=td30m, data={"project": "short"}) - # Long overlap: 12:30 – 13:00 (30 min) — same overlap here, but added first - # Make one clearly longer: 11:45 – 13:00 (75 min overlap into base) - sub_long = Event( - timestamp=now - timedelta(minutes=15), - duration=td1h + timedelta(minutes=15), - data={"project": "long"}, - ) - result = merge_subwatcher_fields(base, [sub_short, sub_long], ["project"]) - assert result[0].data["project"] == "long" + sub = [ + Event(timestamp=now + td5m, duration=td10m, data={"project": "p1"}), + Event( + timestamp=now + timedelta(minutes=35), + duration=td20m, + data={"project": "p2"}, + ), + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + project_durations = { + event.data["project"]: event.duration + for event in result + if "project" in event.data + } + assert project_durations == {"p1": td10m, "p2": td20m} + assert sum_durations(result) == td1h def test_merge_subwatcher_fields_empty_inputs(): From c46cd001149d3ffadf61f0c32ad25aa7739a2423 Mon Sep 17 00:00:00 2001 From: Bob Date: Fri, 5 Jun 2026 21:23:07 +0000 Subject: [PATCH 4/6] fix(transform): collapse adjacent split segments --- aw_transform/merge_subwatcher_fields.py | 19 +++++++++++---- tests/test_transforms.py | 32 +++++++++++++++---------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py index 4d971a4a..6bb4ead1 100644 --- a/aw_transform/merge_subwatcher_fields.py +++ b/aw_transform/merge_subwatcher_fields.py @@ -59,9 +59,9 @@ def merge_subwatcher_fields( # Now categorize(window_events, ...) can match on "project"/"file" Note on overlap: - Base events are split at the clipped subwatcher boundaries. When - multiple subwatcher events overlap the same output segment, the one - with the **longest overlap with that segment** is used. + Base events are split at the clipped subwatcher boundaries so each + output segment only carries the subwatcher fields that were actually + present during that slice of time. """ if conflict not in ("base_wins", "sub_wins"): raise ValueError( @@ -98,6 +98,7 @@ def merge_subwatcher_fields( continue boundary_points = sorted(boundaries) + base_segments: List[Event] = [] for start, end in zip(boundary_points, boundary_points[1:]): segment_period = Timeslot(start, end) best_sub: Optional[Event] = None @@ -124,6 +125,16 @@ def merge_subwatcher_fields( continue enriched.data[key] = deepcopy(best_sub.data[key]) - result.append(enriched) + if ( + base_segments + and base_segments[-1].timestamp + base_segments[-1].duration + == enriched.timestamp + and base_segments[-1].data == enriched.data + ): + base_segments[-1].duration += enriched.duration + else: + base_segments.append(enriched) + + result.extend(base_segments) return result diff --git a/tests/test_transforms.py b/tests/test_transforms.py index a80e08ef..ade43365 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -588,31 +588,37 @@ def test_merge_subwatcher_fields_sub_wins_conflict(): def test_merge_subwatcher_fields_multiple_subsegments_preserve_duration(): - """Multiple subwatcher slices should not over-attribute time to any key.""" + """Repeated subwatcher values aggregate to their true covered duration.""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) - td5m = timedelta(minutes=5) - td10m = timedelta(minutes=10) - td20m = timedelta(minutes=20) + td15m = timedelta(minutes=15) td1h = timedelta(hours=1) base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] sub = [ - Event(timestamp=now + td5m, duration=td10m, data={"project": "p1"}), + Event(timestamp=now, duration=td15m, data={"project": "alpha"}), + Event(timestamp=now + td15m, duration=td15m, data={"project": "beta"}), Event( - timestamp=now + timedelta(minutes=35), - duration=td20m, - data={"project": "p2"}, + timestamp=now + 2 * td15m, + duration=td15m, + data={"project": "alpha"}, ), ] result = merge_subwatcher_fields(base, sub, ["project"]) - project_durations = { - event.data["project"]: event.duration - for event in result - if "project" in event.data + by_project = { + event.data.get("project"): event.duration + for event in merge_events_by_keys(result, ["project"]) + } + by_app = { + event.data.get("app"): event.duration + for event in merge_events_by_keys(result, ["app"]) } - assert project_durations == {"p1": td10m, "p2": td20m} + + assert by_project["alpha"] == 2 * td15m + assert by_project["beta"] == td15m + assert by_project[None] == td15m + assert by_app["vim"] == td1h assert sum_durations(result) == td1h From 5823d153f296a7965020bee675a266e6f9777649 Mon Sep 17 00:00:00 2001 From: Bob Date: Fri, 5 Jun 2026 21:41:35 +0000 Subject: [PATCH 5/6] fix(query2): normalize merge_subwatcher_fields conflict errors --- aw_query/functions.py | 5 ++++- tests/test_query2.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/aw_query/functions.py b/aw_query/functions.py index a87d3324..1421b084 100644 --- a/aw_query/functions.py +++ b/aw_query/functions.py @@ -233,7 +233,10 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]: def q2_merge_subwatcher_fields( base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins" ) -> List[Event]: - return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict) + try: + return merge_subwatcher_fields(base_events, subwatcher_events, keys, conflict) + except ValueError as exc: + raise QueryFunctionException(str(exc)) from None @q2_function(chunk_events_by_key) diff --git a/tests/test_query2.py b/tests/test_query2.py index db413e28..c3c7de2d 100644 --- a/tests/test_query2.py +++ b/tests/test_query2.py @@ -323,6 +323,21 @@ def test_query2_function_invalid_argument_count(): query(qname, example_query, starttime, endtime, ds) +def test_query2_merge_subwatcher_fields_invalid_conflict(): + ds = mock_ds + qname = "asd" + starttime = iso8601.parse_date("1970-01-01") + endtime = iso8601.parse_date("1970-01-02") + example_query = """ + base = []; + sub = []; + RETURN = merge_subwatcher_fields(base, sub, ["project"], "invalid"); + """ + + with pytest.raises(QueryFunctionException, match="conflict must be"): + query(qname, example_query, starttime, endtime, ds) + + @pytest.mark.parametrize("datastore", param_datastore_objects()) def test_query2_function_in_function(datastore): qname = "asd" From e83b17f92292f8a512d627cb4f77a3c9788ce266 Mon Sep 17 00:00:00 2001 From: Bob Date: Fri, 5 Jun 2026 21:51:06 +0000 Subject: [PATCH 6/6] fix(transform): prefer latest overlapping subwatcher event --- aw_transform/merge_subwatcher_fields.py | 24 ++++++++++++++------ tests/test_transforms.py | 29 +++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/aw_transform/merge_subwatcher_fields.py b/aw_transform/merge_subwatcher_fields.py index 6bb4ead1..2f860403 100644 --- a/aw_transform/merge_subwatcher_fields.py +++ b/aw_transform/merge_subwatcher_fields.py @@ -61,7 +61,9 @@ def merge_subwatcher_fields( Note on overlap: Base events are split at the clipped subwatcher boundaries so each output segment only carries the subwatcher fields that were actually - present during that slice of time. + present during that slice of time. If multiple subwatcher events cover + the same slice, the most recent one wins so a later transition does not + get smeared backward by an older, longer pulse. """ if conflict not in ("base_wins", "sub_wins"): raise ValueError( @@ -102,17 +104,25 @@ def merge_subwatcher_fields( for start, end in zip(boundary_points, boundary_points[1:]): segment_period = Timeslot(start, end) best_sub: Optional[Event] = None - best_overlap_secs: float = 0.0 + best_sub_period: Optional[Timeslot] = None for sub, sub_period in overlapping: - ip = segment_period.intersection(sub_period) - if not ip: + if not segment_period.intersection(sub_period): continue - overlap_secs = ip.duration.total_seconds() - if overlap_secs > best_overlap_secs: - best_overlap_secs = overlap_secs + # Later subwatcher events should supersede older overlapping + # pulses on the shared slice instead of letting stale data linger. + if ( + best_sub is None + or sub.timestamp > best_sub.timestamp + or ( + sub.timestamp == best_sub.timestamp + and best_sub_period is not None + and sub_period.end > best_sub_period.end + ) + ): best_sub = sub + best_sub_period = sub_period enriched = deepcopy(base) enriched.timestamp = start diff --git a/tests/test_transforms.py b/tests/test_transforms.py index ade43365..df627dbd 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -537,6 +537,35 @@ def test_merge_subwatcher_fields_partial_overlap_splits_base(): assert sum_durations(result) == td1h +def test_merge_subwatcher_fields_overlapping_subwatchers_prefer_latest_start(): + """Newer overlapping subwatcher events take over from their own start time.""" + now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc) + td20m = timedelta(minutes=20) + td40m = timedelta(minutes=40) + td1h = timedelta(hours=1) + + base = [Event(timestamp=now, duration=td1h, data={"app": "vim"})] + sub = [ + Event(timestamp=now, duration=td40m, data={"project": "alpha"}), + Event(timestamp=now + td20m, duration=td40m, data={"project": "beta"}), + ] + + result = merge_subwatcher_fields(base, sub, ["project"]) + + assert len(result) == 2 + assert [event.timestamp for event in result] == [now, now + td20m] + assert [event.duration for event in result] == [td20m, td40m] + assert [event.data.get("project") for event in result] == ["alpha", "beta"] + + by_project = { + event.data.get("project"): event.duration + for event in merge_events_by_keys(result, ["project"]) + } + assert by_project["alpha"] == td20m + assert by_project["beta"] == td40m + assert sum_durations(result) == td1h + + def test_merge_subwatcher_fields_no_overlap(): """Base events with no overlapping subwatcher event are returned unchanged.""" now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)