Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions aw_transform/merge_subwatcher_fields.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from copy import deepcopy
from datetime import timedelta
from typing import List, Optional, Tuple

from aw_core.models import Event
Expand Down Expand Up @@ -78,19 +79,31 @@ def merge_subwatcher_fields(
result: List[Event] = []
for base in base_events:
base_period = _get_event_period(base)
base_is_instant = base_period.duration == timedelta(0)
overlapping: List[Tuple[Event, Timeslot]] = []
boundaries = {base_period.start, base_period.end}

for sub in sub_sorted:
sub_period = _get_event_period(sub)
# Once sub starts after base ends we can stop
if sub_period.start >= base_period.end:
if sub_period.start >= base_period.end and not base_is_instant:
break
# Skip sub events that end before base starts
if sub_period.end <= base_period.start:
continue
ip = base_period.intersection(sub_period)
if ip:
if not ip:
continue
if base_is_instant:
# An instantaneous base needs whichever sub covers that
# instant, even though the intersection itself is zero-length.
overlapping.append((sub, sub_period))
elif ip.duration > timedelta(0):
# A zero-duration intersection on a non-instant base means the
# sub only touched the base at a single point (e.g. an
# instantaneous sub event). That doesn't represent a slice where
# the sub was actually active, so it must not introduce a split
# or color any segment.
overlapping.append((sub, sub_period))
boundaries.add(ip.start)
boundaries.add(ip.end)
Expand All @@ -99,6 +112,29 @@ def merge_subwatcher_fields(
result.append(deepcopy(base))
continue

# A zero-duration base event has no slice for boundaries to split,
# but it would otherwise be silently dropped by the segment loop
# below (boundary_points has a single element so the zip is empty).
# Preserve it as a single enriched event from whichever overlapping
# sub covers the instant, using the same "latest sub wins" rule.
if base_is_instant:
instant_best_sub, instant_best_period = overlapping[0]
for sub, sub_period in overlapping[1:]:
if sub.timestamp > instant_best_sub.timestamp or (
sub.timestamp == instant_best_sub.timestamp
and sub_period.end > instant_best_period.end
):
instant_best_sub = sub
instant_best_period = sub_period
enriched = deepcopy(base)
for key in keys:
if key in instant_best_sub.data:
if conflict == "base_wins" and key in enriched.data:
continue
enriched.data[key] = deepcopy(instant_best_sub.data[key])
result.append(enriched)
continue

boundary_points = sorted(boundaries)
base_segments: List[Event] = []
for start, end in zip(boundary_points, boundary_points[1:]):
Expand All @@ -107,7 +143,10 @@ def merge_subwatcher_fields(
best_sub_period: Optional[Timeslot] = None

for sub, sub_period in overlapping:
if not segment_period.intersection(sub_period):
seg_ip = segment_period.intersection(sub_period)
# Skip subs that only touch this segment at a single point;
# they don't actually cover any of its duration.
if not seg_ip or seg_ip.duration == timedelta(0):
continue

# Later subwatcher events should supersede older overlapping
Expand Down
66 changes: 66 additions & 0 deletions tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,3 +683,69 @@ def test_merge_subwatcher_fields_invalid_conflict():

with pytest.raises(ValueError, match="conflict must be"):
merge_subwatcher_fields(base, sub, ["project"], conflict="invalid")


def test_merge_subwatcher_fields_zero_duration_base_event_preserved():
    """Keep an instantaneous base event that an overlapping sub covers.

    The merge must return exactly one event: still zero-duration, still at
    the base timestamp, and enriched with the sub's requested field.
    """
    instant = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    base_event = Event(
        timestamp=instant,
        duration=timedelta(0),
        data={"app": "vim"},
    )
    # The sub starts 5s before the instant and lasts 10s, so it spans it.
    covering_sub = Event(
        timestamp=instant - timedelta(seconds=5),
        duration=timedelta(seconds=10),
        data={"project": "P"},
    )

    merged = merge_subwatcher_fields([base_event], [covering_sub], ["project"])

    assert len(merged) == 1
    only = merged[0]
    assert only.timestamp == instant
    assert only.duration == timedelta(0)
    assert only.data == {"app": "vim", "project": "P"}


def test_merge_subwatcher_fields_zero_duration_base_event_no_overlap_preserved():
    """Return an instantaneous base event untouched when no sub overlaps it.

    The only sub event starts five minutes after the base instant, so the
    base must come back as a single event with its original data.
    """
    instant = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    base_event = Event(
        timestamp=instant,
        duration=timedelta(0),
        data={"app": "vim"},
    )
    # Starts well after the base instant, so the two never overlap.
    later_sub = Event(
        timestamp=instant + timedelta(minutes=5),
        duration=timedelta(seconds=10),
        data={"project": "P"},
    )

    merged = merge_subwatcher_fields([base_event], [later_sub], ["project"])

    assert len(merged) == 1
    only = merged[0]
    assert only.timestamp == instant
    assert only.duration == timedelta(0)
    assert only.data == {"app": "vim"}


def test_merge_subwatcher_fields_zero_duration_sub_does_not_color_base():
    """Ignore an instantaneous sub event that falls inside a base event.

    A sub that was active for zero time covers no slice of the base, so the
    base must be neither split nor enriched.
    """
    start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    base_length = timedelta(seconds=10)
    base_event = Event(timestamp=start, duration=base_length, data={"app": "vim"})
    # Zero-duration sub placed 3s into the 10s base event.
    instant_sub = Event(
        timestamp=start + timedelta(seconds=3),
        duration=timedelta(0),
        data={"project": "P"},
    )

    merged = merge_subwatcher_fields([base_event], [instant_sub], ["project"])

    # The base passes through unchanged: one event, full length, no new keys.
    assert len(merged) == 1
    only = merged[0]
    assert only.timestamp == start
    assert only.duration == timedelta(seconds=10)
    assert only.data == {"app": "vim"}
Loading