Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,18 @@ only if the subscriber needs to run cleanup before failing.
triggerer restart, the broker redelivers messages that were never
advanced. Subscribers must therefore be idempotent.

When multiple triggers sharing the same key restart together, the first
to re-subscribe creates a fresh group and polling starts immediately.
Triggers that re-subscribe later join as late subscribers (outside the
snapshot of any already-broadcast event), so they may miss events
committed in the window between the first subscription and their own.
Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive
number of seconds (e.g. ``2.0``) to delay the start of polling after a
new group is created, giving concurrent re-subscriptions time to join
before any event is broadcast. This is a best-effort window — it reduces
but does not eliminate the risk of a slow-rejoining trigger missing
events.

**Durability**: the broker advance is gated on persistence. A subscriber's
resolution completes only after every ``TriggerEvent`` it derived from the
event has been stored in the metadata database; the confirmation reaches
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68888.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``[triggerer] shared_stream_cohort_grace_period`` to delay the start of polling after a shared-stream group is created
13 changes: 13 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2930,6 +2930,19 @@ triggerer:
type: float
example: ~
default: "300.0"
shared_stream_cohort_grace_period:
description: |
Seconds to delay the start of polling after a shared-stream group is created, giving triggers
that share the same key a window to subscribe before any event is broadcast. The default of 0
starts polling immediately (no delay). Set to a small positive value (e.g. 2.0–5.0) to reduce
the chance of triggers missing events on triggerer restart, when multiple triggers sharing a key
re-subscribe concurrently and the first to arrive would otherwise start the poll before the
others have joined. This is a best-effort window: triggers that take longer than the grace period
to re-subscribe can still miss events committed after polling starts.
version_added: 3.3.0
type: float
example: ~
default: "0.0"
kerberos:
description: ~
options:
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ def __init__(self):
log=self.log,
max_subscriber_queue=conf.getint("triggerer", "shared_stream_subscriber_queue_size"),
ack_timeout=conf.getfloat("triggerer", "shared_stream_ack_timeout"),
cohort_grace_period=conf.getfloat("triggerer", "shared_stream_cohort_grace_period"),
)
self.blocked_main_thread_warning_threshold = conf.getfloat(
"triggerer", "blocked_main_thread_warning_threshold"
Expand Down
30 changes: 30 additions & 0 deletions airflow-core/src/airflow/triggers/shared_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@
event): the pending advances are abandoned and the broker redelivers
those events.

When multiple triggers sharing the same key restart together, the first
to re-subscribe creates a fresh group and polling begins immediately.
Triggers that re-subscribe later join as ordinary late subscribers (not
counted in the snapshot of earlier events), so they may miss events
committed during the window between the first subscription and their own.
Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive
number of seconds to delay the start of polling after a group is created,
giving concurrent re-subscriptions time to join before any event is
broadcast. This is a best-effort window: triggers that take longer than
the grace period to re-subscribe still miss events committed after
polling starts.

**``shared_stream_subscriber_queue_size`` in ack mode**: the bound is
still "unprocessed raw events per subscriber". The manager does **not**
wait for outstanding resolutions before pulling the next event from the
Expand Down Expand Up @@ -508,6 +520,7 @@ def __init__(
on_poll_terminate: Callable[[_SharedStreamGroup], None],
max_subscriber_queue: int,
ack_timeout: float,
cohort_grace_period: float = 0.0,
log: BoundLogger,
_now: Callable[[], float] = time.monotonic,
) -> None:
Expand All @@ -518,6 +531,7 @@ def __init__(
self._on_poll_terminate = on_poll_terminate
self._max_subscriber_queue = max_subscriber_queue
self._ack_timeout = ack_timeout
self._cohort_grace_period = cohort_grace_period
self._now = _now
self._subscribers: dict[int, asyncio.Queue] = {}
# Subscribers already force-failed (queue overflow or ack timeout);
Expand Down Expand Up @@ -685,6 +699,8 @@ async def _poll(self) -> None:
producer: SharedStreamProducer | None = None
terminal_exc: BaseException | None = None
try:
if self._cohort_grace_period > 0:
await asyncio.sleep(self._cohort_grace_period)
if ack_required:
# A factory failure flows through the terminal broadcast
# path below, like any other poll failure.
Expand Down Expand Up @@ -1161,6 +1177,17 @@ class SharedStreamManager:

The manager is single-event-loop and not thread-safe. The triggerer's
``TriggerRunner`` is its sole owner.

:param log: Bound logger; defaults to the module logger.
:param max_subscriber_queue: Per-subscriber buffer size. A subscriber whose queue is full
is force-failed rather than blocking the poll loop.
:param ack_timeout: Per-event ack timeout in seconds. A subscriber that has not finished
processing an event within this window is force-failed via :class:`AckTimeout`.
:param cohort_grace_period: Seconds to delay the start of polling after a new group is
created. When > 0, the poll loop sleeps for this duration before calling
``open_stream``/``open_shared_stream``, giving triggers that share the same key a window
to subscribe before any event is broadcast — useful on triggerer restart where concurrent
re-subscriptions would otherwise race against the first poll. Default 0 (no delay).
"""

def __init__(
Expand All @@ -1169,11 +1196,13 @@ def __init__(
log: BoundLogger | None = None,
max_subscriber_queue: int = DEFAULT_SUBSCRIBER_QUEUE_MAX,
ack_timeout: float = DEFAULT_ACK_TIMEOUT,
cohort_grace_period: float = 0.0,
_now: Callable[[], float] = time.monotonic,
) -> None:
self.log = log or structlog.get_logger(__name__)
self._max_subscriber_queue = max_subscriber_queue
self._ack_timeout = ack_timeout
self._cohort_grace_period = cohort_grace_period
self._now = _now
self._groups: dict[Hashable, _SharedStreamGroup] = {}
# Allocator for trigger-event persist-confirmation seqs; unique per
Expand Down Expand Up @@ -1205,6 +1234,7 @@ def subscribe(
on_poll_terminate=self._handle_poll_terminate,
max_subscriber_queue=self._max_subscriber_queue,
ack_timeout=self._ack_timeout,
cohort_grace_period=self._cohort_grace_period,
log=self.log,
_now=self._now,
)
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ def test_shared_stream_ack_timeout_config_wiring(self) -> None:
trigger_runner = TriggerRunner()
assert trigger_runner._shared_streams._ack_timeout == 60.0

@conf_vars({("triggerer", "shared_stream_cohort_grace_period"): "3.0"})
def test_shared_stream_cohort_grace_period_config_wiring(self) -> None:
"""[triggerer] shared_stream_cohort_grace_period is wired into SharedStreamManager."""
trigger_runner = TriggerRunner()
assert trigger_runner._shared_streams._cohort_grace_period == 3.0

@pytest.mark.asyncio
async def test_block_watchdog_does_not_log_when_threshold_is_not_exceeded(self) -> None:
with conf_vars({("triggerer", "blocked_main_thread_warning_threshold"): "0.5"}):
Expand Down
75 changes: 75 additions & 0 deletions airflow-core/tests/unit/triggers/test_shared_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,81 @@ async def filter_shared_stream(self, shared_stream):
await manager.unsubscribe(2, key)


@pytest.mark.asyncio
async def test_cohort_grace_period_includes_late_joining_subscriber():
"""Subscribers that join during the cohort grace period are in the first event's snapshot."""
first_event_processed = asyncio.Event()

class _GraceProducer(SharedStreamProducer):
async def open_stream(self):
yield {"msg": "first"}, "p1"
# Fires after _poll has processed the yield and looped back for the next item.
first_event_processed.set()
await asyncio.Event().wait()

async def advance(self, batch):
pass

class _GraceTrigger(_ProgrammableSharedStreamTrigger):
@classmethod
def create_shared_stream_producer(cls, kwargs):
return _GraceProducer()

async def filter_shared_stream(self, shared_stream):
async for _ in shared_stream:
yield TriggerEvent({})

t1, t2 = _GraceTrigger(), _GraceTrigger()
key = t1.shared_stream_key()
manager = SharedStreamManager(cohort_grace_period=0.05)
try:
manager.subscribe(trigger_id=1, trigger=t1, key=key)
# Poll is sleeping through the grace period; t2 joins before polling starts.
manager.subscribe(trigger_id=2, trigger=t2, key=key)

await asyncio.wait_for(first_event_processed.wait(), timeout=1.0)

entry = next(iter(manager._groups[key]._outstanding.values()), None)
assert entry is not None
assert entry.pending == {1, 2}, "both subscribers must be in the initial cohort snapshot"
finally:
await manager.unsubscribe(1, key)
await manager.unsubscribe(2, key)


@pytest.mark.asyncio
async def test_cohort_grace_period_zero_starts_poll_immediately():
"""With cohort_grace_period=0 (default), polling starts without any delay."""
poll_reached = asyncio.Event()

class _ImmediateProducer(SharedStreamProducer):
async def open_stream(self):
poll_reached.set()
await asyncio.Event().wait()
yield {}, "p" # pragma: no cover

async def advance(self, batch):
pass

class _ImmediateTrigger(_ProgrammableSharedStreamTrigger):
@classmethod
def create_shared_stream_producer(cls, kwargs):
return _ImmediateProducer()

async def filter_shared_stream(self, shared_stream):
async for _ in shared_stream:
yield TriggerEvent({}) # pragma: no cover

t = _ImmediateTrigger()
key = t.shared_stream_key()
manager = SharedStreamManager(cohort_grace_period=0.0)
try:
manager.subscribe(trigger_id=1, trigger=t, key=key)
await asyncio.wait_for(poll_reached.wait(), timeout=0.5)
finally:
await manager.unsubscribe(1, key)


@pytest.mark.asyncio
async def test_subscriber_unsubscribe_during_outstanding_event():
"""When a subscriber leaves while still on an event, the group advances without it."""
Expand Down
Loading