diff --git a/airflow-core/docs/authoring-and-scheduling/timetable.rst b/airflow-core/docs/authoring-and-scheduling/timetable.rst index 28957370a47ae..72f98c43d0128 100644 --- a/airflow-core/docs/authoring-and-scheduling/timetable.rst +++ b/airflow-core/docs/authoring-and-scheduling/timetable.rst @@ -151,6 +151,63 @@ must be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta` def example_dag(): pass +.. versionadded:: 3.0.0 + The ``run_immediately`` argument was introduced in Airflow 3. + +The optional ``run_immediately`` argument controls which cron point is scheduled when a Dag is first +enabled or re-enabled after a pause. It has no effect when ``catchup=True`` or when prior Dag runs +already exist (in those cases the scheduler always continues from where it left off). + +* ``run_immediately=True`` *(default)* — schedule the **most recent past** cron point immediately. +* ``run_immediately=False`` — skip the past cron point and wait for the **next future** cron point. +* ``run_immediately=timedelta(...)`` — schedule the most recent past cron point only if it fired + within the given window; otherwise wait for the **next future** cron point. + +.. code-block:: python + + from datetime import timedelta + + from airflow.timetables.trigger import CronTriggerTimetable + + + @dag( + # Runs every 10 minutes. + # run_immediately=False: on first enable, skip any past slot that is more than + # 5 minutes old (the minimum buffer) and wait for the next 10-minute boundary. + schedule=CronTriggerTimetable( + "*/10 * * * *", + timezone="UTC", + run_immediately=False, + ), + start_date=datetime(2024, 1, 1), + catchup=False, + ..., + ) + def example_dag(): + pass + + + @dag( + # Runs hourly. + # run_immediately=timedelta(minutes=10): on first enable, run the most recent + # past slot only if it fired within the last 10 minutes; otherwise wait for next. + schedule=CronTriggerTimetable( + "0 * * * *", + timezone="UTC", + run_immediately=timedelta(minutes=10), + ), + start_date=datetime(2024, 1, 1), + catchup=False, + ..., + ) + def example_dag_with_buffer(): + pass + +.. note:: + + ``run_immediately`` is a parameter of ``CronTriggerTimetable``, **not** of the ``DAG`` + constructor. Passing it directly to ``DAG(run_immediately=...)`` has no effect. + .. _MultipleCronTriggerTimetable: @@ -169,7 +226,7 @@ This is similar to CronTriggerTimetable_ except it takes multiple cron expressio def example_dag(): pass -The same optional ``interval`` argument as CronTriggerTimetable_ is also available. +The same optional ``interval`` and ``run_immediately`` arguments as CronTriggerTimetable_ are also available. .. code-block:: python @@ -350,10 +407,11 @@ The following is another example showing the difference in the case of skipping Suppose there are two running Dags with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the Dags at 3PM on January 31st and re-enable them at 3PM on February 2nd, -- `CronTriggerTimetable`_ skips the Dag runs that were supposed to trigger on February 1st and 2nd. The next Dag run will be triggered at 12AM on February 3rd. -- `CronDataIntervalTimetable`_ skips the Dag runs that were supposed to trigger on February 1st only. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. +- Both `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ skip the Dag run that was supposed to trigger on February 1st. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. + +The difference between the two timetables in this scenario is the ``run_id`` timestamp: for ``CronTriggerTimetable``, the ``run_id`` reflects midnight on February 2nd (the trigger time), while for ``CronDataIntervalTimetable``, the ``run_id`` reflects midnight on February 1st (the start of the data interval being processed). -In these examples, you see how a trigger timetable creates Dag runs more intuitively and similar to what +In the first example (enabling a new Dag), you see how a trigger timetable creates Dag runs more intuitively and similar to what people expect a workflow to behave, while a data interval timetable is designed heavily around the data interval it processes, and does not reflect a workflow's own properties. diff --git a/airflow-core/src/airflow/timetables/trigger.py b/airflow-core/src/airflow/timetables/trigger.py index d530d5b898a82..87a0a3a5f1caa 100644 --- a/airflow-core/src/airflow/timetables/trigger.py +++ b/airflow-core/src/airflow/timetables/trigger.py @@ -87,12 +87,15 @@ def next_dagrun_info( else: next_start_time = self._align_to_next(restriction.earliest) else: - start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] if last_automated_data_interval is not None: - start_time_candidates.append(self._get_next(last_automated_data_interval.end)) - elif restriction.earliest is None: - # Run immediately has no effect if there is restriction on earliest - start_time_candidates.append(self._calc_first_run()) + # After pause/resume: always pick the most recent past boundary + start_time_candidates = [ + self._align_to_prev(coerce_datetime(utcnow())), + self._get_next(last_automated_data_interval.end), + ] + else: + # First run: respect run_immediately regardless of whether start_date is set + start_time_candidates = [self._calc_first_run()] if restriction.earliest is not None: start_time_candidates.append(self._align_to_next(restriction.earliest)) next_start_time = max(start_time_candidates) @@ -165,19 +168,17 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable): :param timezone: Which timezone to use to interpret the cron string :param interval: timedelta that defines the data interval start. Default 0. - *run_immediately* controls, if no *start_time* is given to the DAG, when - the first run of the DAG should be scheduled. It has no effect if there - already exist runs for this DAG. + *run_immediately* controls which cron point is scheduled when a Dag is + first enabled. It always takes effect regardless of whether ``start_date`` + is set, but has no effect when ``catchup=True`` or when prior Dag runs + already exist. - * If *True*, always run immediately the most recent possible DAG run. - * If *False*, wait to run until the next scheduled time in the future. - * If passed a ``timedelta``, will run the most recent possible DAG run - if that run's ``data_interval_end`` is within timedelta of now. - * If *None*, the timedelta is calculated as 10% of the time between the - most recent past scheduled time and the next scheduled time. E.g. if - running every hour, this would run the previous time if less than 6 - minutes had past since the previous run time, otherwise it would wait - until the next hour. + .. versionadded:: 3.0.0 + + * If *True* (default), always run the most recent past cron point immediately. + * If *False*, skip the past cron point and wait for the next future one. + * If passed a ``timedelta``, run the most recent past cron point only if it + is within that timedelta of now; otherwise wait for the next future one. """ def __init__( @@ -186,7 +187,7 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, ) -> None: super().__init__(cron, timezone) self._interval = interval @@ -215,26 +216,21 @@ def serialize(self) -> dict[str, Any]: def _calc_first_run(self) -> DateTime: """ - If no start_time is set, determine the start. + Determine which cron point to schedule next based on ``run_immediately``. - If True, always prefer past run, if False, never. If None, if within 10% of next run, - if timedelta, if within that timedelta from past run. + If *True*, always run the most recent past cron point. + If *False*, always wait for the next future cron point. + If a ``timedelta``, run the most recent past cron point only if it falls + within that window of now; otherwise wait for the next future cron point. """ now = coerce_datetime(utcnow()) past_run_time = self._align_to_prev(now) - next_run_time = self._align_to_next(now) if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true. return past_run_time - - gap_between_runs = next_run_time - past_run_time - gap_to_past = now - past_run_time if isinstance(self._run_immediately, datetime.timedelta): - buffer_between_runs = self._run_immediately - else: - buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5)) - if gap_to_past <= buffer_between_runs: - return past_run_time - return next_run_time + if now - past_run_time <= self._run_immediately: + return past_run_time + return self._align_to_next(now) class MultipleCronTriggerTimetable(Timetable): @@ -253,7 +249,7 @@ def __init__( *crons: str, timezone: str | Timezone | FixedTimezone, interval: datetime.timedelta | relativedelta = datetime.timedelta(), - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, ) -> None: if not crons: raise ValueError("cron expression required") @@ -399,7 +395,7 @@ def __init__( *, timezone: str | Timezone | FixedTimezone, run_offset: int | datetime.timedelta | relativedelta | None = None, - run_immediately: bool | datetime.timedelta = False, + run_immediately: bool | datetime.timedelta = True, # todo: AIP-76 we can't infer partition date from this, so we need to store it separately. key_format: str = r"%Y-%m-%dT%H:%M:%S", ) -> None: @@ -485,15 +481,15 @@ def next_dagrun_info_v2( else: next_start_time = self._align_to_next(restriction.earliest) else: - prev_candidate = self._align_to_prev(coerce_datetime(utcnow())) - start_time_candidates = [prev_candidate] if last_dagrun_info is not None: - next_candidate = self._get_next(last_dagrun_info.run_after) - start_time_candidates.append(next_candidate) - elif restriction.earliest is None: - # Run immediately has no effect if there is restriction on earliest - first_run = self._calc_first_run() - start_time_candidates.append(first_run) + # After pause/resume: always pick the most recent past boundary + start_time_candidates = [ + self._align_to_prev(coerce_datetime(utcnow())), + self._get_next(last_dagrun_info.run_after), + ] + else: + # First run: respect run_immediately regardless of whether start_date is set + start_time_candidates = [self._calc_first_run()] if restriction.earliest is not None: earliest = self._align_to_next(restriction.earliest) start_time_candidates.append(earliest) diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py b/airflow-core/tests/unit/timetables/test_trigger_timetable.py index 114080685ca5a..8ef0479d30abb 100644 --- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py +++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py @@ -59,7 +59,7 @@ [ pytest.param( None, - YESTERDAY + DELTA_FROM_MIDNIGHT, + CURRENT_TIME + DELTA_FROM_MIDNIGHT, id="first-run", ), pytest.param( @@ -79,11 +79,11 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule( last_automated_data_interval: DataInterval | None, next_start_time: pendulum.DateTime, ) -> None: - """If ``catchup=False`` and start_date is a day before""" + """If ``catchup=False`` and start_date is a day before, run_immediately=False skips the past run""" timetable = CronTriggerTimetable( "30 16 * * *", timezone=utc, - run_immediately=False, # Should have no effect since earliest is not None + run_immediately=False, ) next_info = timetable.next_dagrun_info( last_automated_data_interval=last_automated_data_interval, @@ -480,7 +480,8 @@ def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: dic ("run_immediately", "current_time", "correct_interval"), [ (True, WAY_AFTER, PREVIOUS), - (False, JUST_AFTER, PREVIOUS), + (True, JUST_AFTER, PREVIOUS), + (False, JUST_AFTER, NEXT), (False, WAY_AFTER, NEXT), (datetime.timedelta(minutes=10), JUST_AFTER, PREVIOUS), (datetime.timedelta(minutes=10), WAY_AFTER, NEXT), @@ -501,18 +502,20 @@ def test_run_immediately(catchup, run_immediately, current_time, correct_interva @pytest.mark.parametrize("catchup", [True, False]) -def test_run_immediately_fast_dag(catchup): +def test_run_immediately_false_skips_to_next(catchup): + """With run_immediately=False, always skip to the next cron point regardless of how close now is.""" timetable = CronTriggerTimetable( - "*/10 3 * * *", # Runs every 10 minutes, so falls back to 5 min hardcoded limit on buffer time + "*/10 3 * * *", timezone=utc, run_immediately=False, ) + next_10min = DagRunInfo.exact(pendulum.datetime(year=2024, month=8, day=15, hour=3, minute=10)) with time_machine.travel(JUST_AFTER, tick=False): next_info = timetable.next_dagrun_info( last_automated_data_interval=None, restriction=TimeRestriction(earliest=None, latest=None, catchup=catchup), ) - assert next_info == PREVIOUS + assert next_info == next_10min @pytest.mark.parametrize( @@ -593,7 +596,7 @@ def test_multi_serialization(): "expressions": ["@every 30s", "*/2 * * * *"], "timezone": "UTC", "interval": 600.0, - "run_immediately": False, + "run_immediately": True, } tt = MultipleCronTriggerTimetable.deserialize(data) @@ -603,7 +606,7 @@ def test_multi_serialization(): assert tt._timetables[1]._expression == "*/2 * * * *" assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc assert tt._timetables[0]._interval == tt._timetables[1]._interval == datetime.timedelta(minutes=10) - assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is False + assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is True @pytest.mark.db_test @@ -818,6 +821,80 @@ def test_next_run_info_from_dag_model(schedule, partition_key, expected, dag_mak assert info == expected +def test_run_immediately_false_with_start_date(): + """run_immediately=False should be respected even when start_date (earliest) is set. + + With run_immediately=False, the timetable always skips the past cron point + and waits for the next future one — regardless of how recently the boundary fired. + """ + timetable = CronTriggerTimetable( + "*/10 * * * *", + timezone=utc, + run_immediately=False, + ) + # 4 minutes past the 7:10 boundary — should still skip to 7:20 + current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 20, tz=utc)) + + +def test_run_immediately_true_with_start_date(): + """run_immediately=True (default) runs the most recent past cron point even when start_date is set.""" + timetable = CronTriggerTimetable( + "*/10 * * * *", + timezone=utc, + run_immediately=True, + ) + # Unpause at 7:14 — should run 7:10 immediately + current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 10, tz=utc)) + + +def test_run_immediately_false_after_unpause(): + """After a pause/resume with prior runs, the most recent past boundary is always scheduled. + + run_immediately only affects the very first run (when no prior runs exist). + After a pause, both CronTriggerTimetable and CronDataIntervalTimetable skip + all missed runs except the most recent one, which is triggered immediately. + """ + timetable = CronTriggerTimetable( + "0 0 * * *", # @daily + timezone=utc, + run_immediately=False, + ) + # Last run was Jan 31 midnight, unpause at 3PM Feb 2 + last_interval = DataInterval.exact(pendulum.datetime(2024, 1, 31, tz=utc)) + current_time = pendulum.datetime(2024, 2, 2, 15, tz=utc) + with time_machine.travel(current_time): + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_interval, + restriction=TimeRestriction( + earliest=pendulum.datetime(2024, 1, 1, tz=utc), + latest=None, + catchup=False, + ), + ) + # Feb 1 is skipped; Feb 2 midnight (most recent past boundary) is triggered immediately + assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 2, 2, tz=utc)) + + def test_generate_run_id_without_partition_key() -> None: """ Tests the generate_run_id method of CronPartitionTimetable.