Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions aw_query/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
flood,
limit_events,
merge_events_by_keys,
merge_subwatcher_fields,
period_union,
simplify_string,
sort_by_duration,
Expand Down Expand Up @@ -227,6 +228,17 @@ def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]:
return merge_events_by_keys(events, keys)


@q2_function(merge_subwatcher_fields)
@q2_typecheck
def q2_merge_subwatcher_fields(
    base_events: list, subwatcher_events: list, keys: list, conflict: str = "base_wins"
) -> List[Event]:
    # Query-language binding for the ``merge_subwatcher_fields`` transform: all
    # four arguments are forwarded unchanged and the transform's result is
    # returned as-is.
    # NOTE(review): documented with comments rather than a docstring so that
    # this function's ``__doc__`` stays unset, exactly as before — confirm
    # whether ``q2_function`` relies on that.
    try:
        merged = merge_subwatcher_fields(
            base_events, subwatcher_events, keys, conflict
        )
    except ValueError as err:
        # Surface the transform's ValueError as the query layer's exception
        # type, keeping only its message; ``from None`` drops the chained
        # ValueError from the traceback.
        raise QueryFunctionException(str(err)) from None
    return merged


@q2_function(chunk_events_by_key)
@q2_typecheck
def q2_chunk_events_by_key(events: list, key: str) -> List[Event]:
Expand Down
2 changes: 2 additions & 0 deletions aw_transform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limit_events,
)
from .split_url_events import split_url_events
from .merge_subwatcher_fields import merge_subwatcher_fields
from .simplify import simplify_string
from .flood import flood
from .classify import categorize, tag, Rule
Expand All @@ -33,6 +34,7 @@
"heartbeat_reduce",
"heartbeat_merge",
"merge_events_by_keys",
"merge_subwatcher_fields",
"chunk_events_by_key",
"limit_events",
"filter_keyvals",
Expand Down
150 changes: 150 additions & 0 deletions aw_transform/merge_subwatcher_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import logging
from copy import deepcopy
from typing import List, Optional, Tuple

from aw_core.models import Event
from timeslot import Timeslot

from .filter_period_intersect import _get_event_period

logger = logging.getLogger(__name__)


def merge_subwatcher_fields(
    base_events: List[Event],
    subwatcher_events: List[Event],
    keys: List[str],
    conflict: str = "base_wins",
) -> List[Event]:
    """
    Split each event in *base_events* on overlapping subwatcher boundaries and
    copy the named *keys* from the matching subwatcher event into each segment's
    ``data`` dict.

    Unlike the ``concat`` workaround, this does not fabricate extra duration.
    App/title/duration aggregations stay correct because the output still covers
    exactly the same total time as *base_events*; only the segmentation changes
    where subwatcher fields actually change.

    This is the backend primitive that lets every client (webui, native UIs,
    exporters) categorize by subwatcher fields (browser ``url``/``$domain``;
    editor ``project``/``file``/``language``) without bespoke per-watcher
    client-side code.

    Args:
        base_events: The canonical window/afk-filtered stream to enrich.
        subwatcher_events: Events from a subwatcher bucket (e.g. aw-watcher-vim,
            aw-watcher-web). Should already be clipped via
            ``filter_period_intersect`` before passing here.
        keys: Which keys to copy from the subwatcher event into the base event.
            Keys already present in the base event are left untouched when
            ``conflict="base_wins"`` (default).
        conflict: ``"base_wins"`` (default) — base event's existing keys are
            never overwritten; subwatcher fields are purely additive.
            ``"sub_wins"`` — subwatcher fields overwrite base fields.

    Returns:
        A new list of base event segments with subwatcher fields injected.
        Events in *base_events* that have no overlapping subwatcher event are
        returned with their original data unchanged. The input events are never
        mutated; every returned event is a deep copy.

    Raises:
        ValueError: If *conflict* is neither ``"base_wins"`` nor ``"sub_wins"``.

    Example::

        window_events = query_bucket(bid_window)
        editor_events = flood(query_bucket(bid_editor))
        editor_events = filter_period_intersect(editor_events, window_events)
        window_events = merge_subwatcher_fields(
            window_events, editor_events, ["project", "file", "language"]
        )
        # Now categorize(window_events, ...) can match on "project"/"file"

    Note on overlap:
        Base events are split at the clipped subwatcher boundaries so each
        output segment only carries the subwatcher fields that were actually
        present during that slice of time. If multiple subwatcher events cover
        the same slice, the most recent one wins so a later transition does not
        get smeared backward by an older, longer pulse.

    Note on zero-duration events:
        A subwatcher event with no duration covers no time and is ignored, so
        it can neither split a base event nor lend its fields to neighbouring
        slices. A zero-duration base event that lies strictly inside one or
        more subwatcher events is kept as a single event and enriched from the
        most recent of them; it is never dropped.
    """
    if conflict not in ("base_wins", "sub_wins"):
        raise ValueError(
            f"conflict must be 'base_wins' or 'sub_wins', got {conflict!r}"
        )
    if not subwatcher_events or not keys:
        return [deepcopy(e) for e in base_events]

    # Sort by start time so the per-base scan below can stop early, and compute
    # each subwatcher period exactly once instead of once per base event.
    sub_periods: List[Tuple[Event, Timeslot]] = [
        (sub, _get_event_period(sub))
        for sub in sorted(subwatcher_events, key=lambda e: e.timestamp)
    ]

    result: List[Event] = []
    for base in base_events:
        base_period = _get_event_period(base)
        overlapping: List[Tuple[Event, Timeslot]] = []
        boundaries = {base_period.start, base_period.end}

        for sub, sub_period in sub_periods:
            # Sorted by start: once one starts at/after the base end, no later
            # subwatcher event can overlap this base event either.
            if sub_period.start >= base_period.end:
                break
            # Skip events that ended before the base started, and events that
            # cover no time at all.
            if (
                sub_period.end <= base_period.start
                or sub_period.end <= sub_period.start
            ):
                continue
            overlapping.append((sub, sub_period))
            # Split points are the subwatcher edges clipped to the base event.
            boundaries.add(max(sub_period.start, base_period.start))
            boundaries.add(min(sub_period.end, base_period.end))

        if not overlapping:
            result.append(deepcopy(base))
            continue

        boundary_points = sorted(boundaries)
        if len(boundary_points) < 2:
            # Zero-duration base event: there is no slice to split, so emit it
            # once (enriched) rather than letting it vanish from the output.
            instant = deepcopy(base)
            _copy_subwatcher_fields(
                instant, _latest_subwatcher_event(overlapping), keys, conflict
            )
            result.append(instant)
            continue

        base_segments: List[Event] = []
        for start, end in zip(boundary_points, boundary_points[1:]):
            # Only subwatcher events sharing a non-empty stretch of time with
            # this slice may contribute fields to it.
            covering = [
                (sub, sub_period)
                for sub, sub_period in overlapping
                if sub_period.start < end and sub_period.end > start
            ]

            enriched = deepcopy(base)
            enriched.timestamp = start
            enriched.duration = end - start
            _copy_subwatcher_fields(
                enriched, _latest_subwatcher_event(covering), keys, conflict
            )

            # Re-join neighbouring slices that ended up with identical data so
            # the output is only segmented where the fields actually change.
            if (
                base_segments
                and base_segments[-1].timestamp + base_segments[-1].duration
                == enriched.timestamp
                and base_segments[-1].data == enriched.data
            ):
                base_segments[-1].duration += enriched.duration
            else:
                base_segments.append(enriched)

        result.extend(base_segments)

    return result


def _latest_subwatcher_event(
    candidates: List[Tuple[Event, Timeslot]],
) -> Optional[Event]:
    """Return the candidate with the latest timestamp (ties: the latest end), or None if empty."""
    winner = max(
        candidates,
        key=lambda candidate: (candidate[0].timestamp, candidate[1].end),
        default=None,
    )
    return None if winner is None else winner[0]


def _copy_subwatcher_fields(
    target: Event, source: Optional[Event], keys: List[str], conflict: str
) -> None:
    """Copy *keys* present in ``source.data`` into ``target.data``, honouring *conflict*."""
    if source is None:
        return
    for key in keys:
        if key not in source.data:
            continue
        if conflict == "base_wins" and key in target.data:
            continue
        target.data[key] = deepcopy(source.data[key])
15 changes: 15 additions & 0 deletions tests/test_query2.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ def test_query2_function_invalid_argument_count():
query(qname, example_query, starttime, endtime, ds)


def test_query2_merge_subwatcher_fields_invalid_conflict():
    """An unsupported ``conflict`` mode must surface as a QueryFunctionException."""
    query_name = "asd"
    period_start = iso8601.parse_date("1970-01-01")
    period_end = iso8601.parse_date("1970-01-02")
    # Empty inputs on purpose: the query should fail on the fourth argument
    # alone, before any event data matters.
    bad_conflict_query = """
    base = [];
    sub = [];
    RETURN = merge_subwatcher_fields(base, sub, ["project"], "invalid");
    """

    with pytest.raises(QueryFunctionException, match="conflict must be"):
        query(query_name, bad_conflict_query, period_start, period_end, mock_ds)


@pytest.mark.parametrize("datastore", param_datastore_objects())
def test_query2_function_in_function(datastore):
qname = "asd"
Expand Down
Loading
Loading