diff --git a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst index b60b39cf821f3..693632f6a75dc 100644 --- a/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst +++ b/airflow-core/docs/authoring-and-scheduling/event-scheduling.rst @@ -410,6 +410,18 @@ only if the subscriber needs to run cleanup before failing. triggerer restart, the broker redelivers messages that were never advanced. Subscribers must therefore be idempotent. +When multiple triggers sharing the same key restart together, the first +to re-subscribe creates a fresh group and polling starts immediately. +Triggers that re-subscribe later join as late subscribers (outside the +snapshot of any already-broadcast event), so they may miss events +committed in the window between the first subscription and their own. +Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive +number of seconds (e.g. ``2.0``) to delay the start of polling after a +new group is created, giving concurrent re-subscriptions time to join +before any event is broadcast. This is a best-effort window — it reduces +but does not eliminate the risk of a slow-rejoining trigger missing +events. + **Durability**: the broker advance is gated on persistence. A subscriber's resolution completes only after every ``TriggerEvent`` it derived from the event has been stored in the metadata database; the confirmation reaches diff --git a/airflow-core/newsfragments/68888.feature.rst b/airflow-core/newsfragments/68888.feature.rst new file mode 100644 index 0000000000000..456f9907e9dfe --- /dev/null +++ b/airflow-core/newsfragments/68888.feature.rst @@ -0,0 +1 @@ +Add ``[triggerer] shared_stream_cohort_grace_period`` to delay the start of polling after a shared-stream group is created diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index d8bbd8442b6a6..660ede631b358 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2947,6 +2947,19 @@ triggerer: type: float example: ~ default: "300.0" + shared_stream_cohort_grace_period: + description: | + Seconds to delay the start of polling after a shared-stream group is created, giving triggers + that share the same key a window to subscribe before any event is broadcast. The default of 0 + starts polling immediately (no delay). Set to a small positive value (e.g. 2.0–5.0) to reduce + the chance of triggers missing events on triggerer restart, when multiple triggers sharing a key + re-subscribe concurrently and the first to arrive would otherwise start the poll before the + others have joined. This is a best-effort window: triggers that take longer than the grace period + to re-subscribe can still miss events committed after polling starts. + version_added: 3.3.0 + type: float + example: ~ + default: "0.0" kerberos: description: ~ options: diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 4e5249fee7941..3e1c84fb1e4b5 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1208,6 +1208,7 @@ def __init__(self): log=self.log, max_subscriber_queue=conf.getint("triggerer", "shared_stream_subscriber_queue_size"), ack_timeout=conf.getfloat("triggerer", "shared_stream_ack_timeout"), + cohort_grace_period=conf.getfloat("triggerer", "shared_stream_cohort_grace_period"), ) self.blocked_main_thread_warning_threshold = conf.getfloat( "triggerer", "blocked_main_thread_warning_threshold" diff --git a/airflow-core/src/airflow/triggers/shared_stream.py b/airflow-core/src/airflow/triggers/shared_stream.py index 93ccc3e80dafa..4b8ae16c8fb65 100644 --- a/airflow-core/src/airflow/triggers/shared_stream.py +++ b/airflow-core/src/airflow/triggers/shared_stream.py @@ -110,6 +110,18 @@ event): the pending advances are abandoned and the broker redelivers those events. +When multiple triggers sharing the same key restart together, the first +to re-subscribe creates a fresh group and polling begins immediately. +Triggers that re-subscribe later join as ordinary late subscribers (not +counted in the snapshot of earlier events), so they may miss events +committed during the window between the first subscription and their own. +Set ``[triggerer] shared_stream_cohort_grace_period`` to a positive +number of seconds to delay the start of polling after a group is created, +giving concurrent re-subscriptions time to join before any event is +broadcast. This is a best-effort window: triggers that take longer than +the grace period to re-subscribe still miss events committed after +polling starts. + **``shared_stream_subscriber_queue_size`` in ack mode**: the bound is still "unprocessed raw events per subscriber". The manager does **not** wait for outstanding resolutions before pulling the next event from the @@ -508,6 +520,7 @@ def __init__( on_poll_terminate: Callable[[_SharedStreamGroup], None], max_subscriber_queue: int, ack_timeout: float, + cohort_grace_period: float = 0.0, log: BoundLogger, _now: Callable[[], float] = time.monotonic, ) -> None: @@ -518,6 +531,7 @@ def __init__( self._on_poll_terminate = on_poll_terminate self._max_subscriber_queue = max_subscriber_queue self._ack_timeout = ack_timeout + self._cohort_grace_period = cohort_grace_period self._now = _now self._subscribers: dict[int, asyncio.Queue] = {} # Subscribers already force-failed (queue overflow or ack timeout); @@ -685,6 +699,8 @@ async def _poll(self) -> None: producer: SharedStreamProducer | None = None terminal_exc: BaseException | None = None try: + if self._cohort_grace_period > 0: + await asyncio.sleep(self._cohort_grace_period) if ack_required: # A factory failure flows through the terminal broadcast # path below, like any other poll failure. @@ -1161,6 +1177,17 @@ class SharedStreamManager: The manager is single-event-loop and not thread-safe. The triggerer's ``TriggerRunner`` is its sole owner. + + :param log: Bound logger; defaults to the module logger. + :param max_subscriber_queue: Per-subscriber buffer size. A subscriber whose queue is full + is force-failed rather than blocking the poll loop. + :param ack_timeout: Per-event ack timeout in seconds. A subscriber that has not finished + processing an event within this window is force-failed via :class:`AckTimeout`. + :param cohort_grace_period: Seconds to delay the start of polling after a new group is + created. When > 0, the poll loop sleeps for this duration before calling + ``open_stream``/``open_shared_stream``, giving triggers that share the same key a window + to subscribe before any event is broadcast — useful on triggerer restart where concurrent + re-subscriptions would otherwise race against the first poll. Default 0 (no delay). """ def __init__( @@ -1169,11 +1196,13 @@ def __init__( log: BoundLogger | None = None, max_subscriber_queue: int = DEFAULT_SUBSCRIBER_QUEUE_MAX, ack_timeout: float = DEFAULT_ACK_TIMEOUT, + cohort_grace_period: float = 0.0, _now: Callable[[], float] = time.monotonic, ) -> None: self.log = log or structlog.get_logger(__name__) self._max_subscriber_queue = max_subscriber_queue self._ack_timeout = ack_timeout + self._cohort_grace_period = cohort_grace_period self._now = _now self._groups: dict[Hashable, _SharedStreamGroup] = {} # Allocator for trigger-event persist-confirmation seqs; unique per @@ -1205,6 +1234,7 @@ def subscribe( on_poll_terminate=self._handle_poll_terminate, max_subscriber_queue=self._max_subscriber_queue, ack_timeout=self._ack_timeout, + cohort_grace_period=self._cohort_grace_period, log=self.log, _now=self._now, ) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 9332c2a2342be..4af2b982036f8 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -991,6 +991,12 @@ def test_shared_stream_ack_timeout_config_wiring(self) -> None: trigger_runner = TriggerRunner() assert trigger_runner._shared_streams._ack_timeout == 60.0 + @conf_vars({("triggerer", "shared_stream_cohort_grace_period"): "3.0"}) + def test_shared_stream_cohort_grace_period_config_wiring(self) -> None: + """[triggerer] shared_stream_cohort_grace_period is wired into SharedStreamManager.""" + trigger_runner = TriggerRunner() + assert trigger_runner._shared_streams._cohort_grace_period == 3.0 + @pytest.mark.asyncio async def test_block_watchdog_does_not_log_when_threshold_is_not_exceeded(self) -> None: with conf_vars({("triggerer", "blocked_main_thread_warning_threshold"): "0.5"}): diff --git a/airflow-core/tests/unit/triggers/test_shared_stream.py b/airflow-core/tests/unit/triggers/test_shared_stream.py index 76baa8bff6141..2c4c9bae5a1c8 100644 --- a/airflow-core/tests/unit/triggers/test_shared_stream.py +++ b/airflow-core/tests/unit/triggers/test_shared_stream.py @@ -1243,6 +1243,81 @@ async def filter_shared_stream(self, shared_stream): await manager.unsubscribe(2, key) +@pytest.mark.asyncio +async def test_cohort_grace_period_includes_late_joining_subscriber(): + """Subscribers that join during the cohort grace period are in the first event's snapshot.""" + first_event_processed = asyncio.Event() + + class _GraceProducer(SharedStreamProducer): + async def open_stream(self): + yield {"msg": "first"}, "p1" + # Fires after _poll has processed the yield and looped back for the next item. + first_event_processed.set() + await asyncio.Event().wait() + + async def advance(self, batch): + pass + + class _GraceTrigger(_ProgrammableSharedStreamTrigger): + @classmethod + def create_shared_stream_producer(cls, kwargs): + return _GraceProducer() + + async def filter_shared_stream(self, shared_stream): + async for _ in shared_stream: + yield TriggerEvent({}) + + t1, t2 = _GraceTrigger(), _GraceTrigger() + key = t1.shared_stream_key() + manager = SharedStreamManager(cohort_grace_period=0.05) + try: + manager.subscribe(trigger_id=1, trigger=t1, key=key) + # Poll is sleeping through the grace period; t2 joins before polling starts. + manager.subscribe(trigger_id=2, trigger=t2, key=key) + + await asyncio.wait_for(first_event_processed.wait(), timeout=1.0) + + entry = next(iter(manager._groups[key]._outstanding.values()), None) + assert entry is not None + assert entry.pending == {1, 2}, "both subscribers must be in the initial cohort snapshot" + finally: + await manager.unsubscribe(1, key) + await manager.unsubscribe(2, key) + + +@pytest.mark.asyncio +async def test_cohort_grace_period_zero_starts_poll_immediately(): + """With cohort_grace_period=0 (default), polling starts without any delay.""" + poll_reached = asyncio.Event() + + class _ImmediateProducer(SharedStreamProducer): + async def open_stream(self): + poll_reached.set() + await asyncio.Event().wait() + yield {}, "p" # pragma: no cover + + async def advance(self, batch): + pass + + class _ImmediateTrigger(_ProgrammableSharedStreamTrigger): + @classmethod + def create_shared_stream_producer(cls, kwargs): + return _ImmediateProducer() + + async def filter_shared_stream(self, shared_stream): + async for _ in shared_stream: + yield TriggerEvent({}) # pragma: no cover + + t = _ImmediateTrigger() + key = t.shared_stream_key() + manager = SharedStreamManager(cohort_grace_period=0.0) + try: + manager.subscribe(trigger_id=1, trigger=t, key=key) + await asyncio.wait_for(poll_reached.wait(), timeout=0.5) + finally: + await manager.unsubscribe(1, key) + + @pytest.mark.asyncio async def test_subscriber_unsubscribe_during_outstanding_event(): """When a subscriber leaves while still on an event, the group advances without it."""