Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions airflow-core/docs/authoring-and-scheduling/timetable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,63 @@ must be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta`
def example_dag():
pass

.. versionadded:: 3.0.0
The ``run_immediately`` argument was introduced in Airflow 3.

The optional ``run_immediately`` argument controls which cron point is scheduled for the first run of a
Dag, that is, when the Dag is enabled and no prior Dag runs exist. It has no effect when prior Dag runs
already exist, such as when a Dag is re-enabled after a pause (the scheduler then continues from where it
left off), or when ``catchup=True`` and a ``start_date`` is set.

* ``run_immediately=True`` *(default)* — schedule the **most recent past** cron point immediately.
* ``run_immediately=False`` — skip the past cron point and wait for the **next future** cron point.
* ``run_immediately=timedelta(...)`` — schedule the most recent past cron point only if it fired
within the given window; otherwise wait for the **next future** cron point.

.. code-block:: python

from datetime import datetime, timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
# Runs every 10 minutes.
# run_immediately=False: on first enable, skip the most recent past slot, however
# recently it fired, and wait for the next 10-minute boundary.
schedule=CronTriggerTimetable(
"*/10 * * * *",
timezone="UTC",
run_immediately=False,
),
start_date=datetime(2024, 1, 1),
catchup=False,
...,
)
def example_dag():
pass


@dag(
# Runs hourly.
# run_immediately=timedelta(minutes=10): on first enable, run the most recent
# past slot only if it fired within the last 10 minutes; otherwise wait for next.
schedule=CronTriggerTimetable(
"0 * * * *",
timezone="UTC",
run_immediately=timedelta(minutes=10),
),
start_date=datetime(2024, 1, 1),
catchup=False,
...,
)
def example_dag_with_buffer():
pass

.. note::

``run_immediately`` is a parameter of ``CronTriggerTimetable``, **not** of the ``DAG``
constructor. Passing it directly to ``DAG(run_immediately=...)`` is not supported.


.. _MultipleCronTriggerTimetable:

Expand All @@ -169,7 +226,7 @@ This is similar to CronTriggerTimetable_ except it takes multiple cron expressio
def example_dag():
pass

The same optional ``interval`` argument as CronTriggerTimetable_ is also available.
The same optional ``interval`` and ``run_immediately`` arguments as CronTriggerTimetable_ are also available.

.. code-block:: python

Expand Down Expand Up @@ -350,10 +407,11 @@ The following is another example showing the difference in the case of skipping

Suppose there are two running Dags with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the Dags at 3PM on January 31st and re-enable them at 3PM on February 2nd,

- `CronTriggerTimetable`_ skips the Dag runs that were supposed to trigger on February 1st and 2nd. The next Dag run will be triggered at 12AM on February 3rd.
- `CronDataIntervalTimetable`_ skips the Dag runs that were supposed to trigger on February 1st only. A Dag run for February 2nd is immediately triggered after you re-enable the Dag.
- Both `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ skip the Dag run that was supposed to trigger on February 1st. A Dag run for February 2nd is immediately triggered after you re-enable the Dag.

The difference between the two timetables in this scenario is the ``run_id`` timestamp: for ``CronTriggerTimetable``, the ``run_id`` reflects midnight on February 2nd (the trigger time), while for ``CronDataIntervalTimetable``, the ``run_id`` reflects midnight on February 1st (the start of the data interval being processed).

In these examples, you see how a trigger timetable creates Dag runs more intuitively and similar to what
In the first example (enabling a new Dag), you see how a trigger timetable creates Dag runs more intuitively,
closer to how people expect a workflow to behave, while a data interval timetable is designed heavily around
the data interval it processes, and does not reflect a workflow's own properties.

Expand Down
78 changes: 37 additions & 41 deletions airflow-core/src/airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ def next_dagrun_info(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
start_time_candidates.append(self._calc_first_run())
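# Prior runs exist (e.g. after pause/resume): take the later of the most recent past
# boundary and the boundary following the last run, so missed runs are skipped.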
start_time_candidates = [
self._align_to_prev(coerce_datetime(utcnow())),
self._get_next(last_automated_data_interval.end),
]
else:
# First run: respect run_immediately regardless of whether start_date is set
start_time_candidates = [self._calc_first_run()]
if restriction.earliest is not None:
start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
Expand Down Expand Up @@ -165,19 +168,17 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable):
:param timezone: Which timezone to use to interpret the cron string
:param interval: timedelta that defines the data interval start. Default 0.

*run_immediately* controls, if no *start_time* is given to the DAG, when
the first run of the DAG should be scheduled. It has no effect if there
already exist runs for this DAG.
*run_immediately* controls which cron point is scheduled for the first run
of a Dag, i.e. when no prior Dag runs exist. With ``catchup=False`` it is
honored whether or not ``start_date`` is set; with ``catchup=True`` it is
only consulted when no ``start_date`` is set. It has no effect once prior
Dag runs exist.

* If *True*, always run immediately the most recent possible DAG run.
* If *False*, wait to run until the next scheduled time in the future.
* If passed a ``timedelta``, will run the most recent possible DAG run
if that run's ``data_interval_end`` is within timedelta of now.
* If *None*, the timedelta is calculated as 10% of the time between the
most recent past scheduled time and the next scheduled time. E.g. if
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
.. versionadded:: 3.0.0

* If *True* (default), always run the most recent past cron point immediately.
* If *False*, skip the past cron point and wait for the next future one.
* If passed a ``timedelta``, run the most recent past cron point only if it
is within that timedelta of now; otherwise wait for the next future one.
"""

def __init__(
Expand All @@ -186,7 +187,7 @@ def __init__(
*,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
) -> None:
super().__init__(cron, timezone)
self._interval = interval
Expand Down Expand Up @@ -215,26 +216,21 @@ def serialize(self) -> dict[str, Any]:

def _calc_first_run(self) -> DateTime:
"""
If no start_time is set, determine the start.
Determine which cron point to schedule next based on ``run_immediately``.

If True, always prefer past run, if False, never. If None, if within 10% of next run,
if timedelta, if within that timedelta from past run.
If *True*, always run the most recent past cron point.
If *False*, always wait for the next future cron point.
If a ``timedelta``, run the most recent past cron point only if it falls
within that window of now; otherwise wait for the next future cron point.
"""
now = coerce_datetime(utcnow())
past_run_time = self._align_to_prev(now)
next_run_time = self._align_to_next(now)
if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true.
return past_run_time

gap_between_runs = next_run_time - past_run_time
gap_to_past = now - past_run_time
if isinstance(self._run_immediately, datetime.timedelta):
buffer_between_runs = self._run_immediately
else:
buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5))
if gap_to_past <= buffer_between_runs:
return past_run_time
return next_run_time
if now - past_run_time <= self._run_immediately:
return past_run_time
return self._align_to_next(now)


class MultipleCronTriggerTimetable(Timetable):
Expand All @@ -253,7 +249,7 @@ def __init__(
*crons: str,
timezone: str | Timezone | FixedTimezone,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
) -> None:
if not crons:
raise ValueError("cron expression required")
Expand Down Expand Up @@ -399,7 +395,7 @@ def __init__(
*,
timezone: str | Timezone | FixedTimezone,
run_offset: int | datetime.timedelta | relativedelta | None = None,
run_immediately: bool | datetime.timedelta = False,
run_immediately: bool | datetime.timedelta = True,
# todo: AIP-76 we can't infer partition date from this, so we need to store it separately.
key_format: str = r"%Y-%m-%dT%H:%M:%S",
) -> None:
Expand Down Expand Up @@ -485,15 +481,15 @@ def next_dagrun_info_v2(
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
start_time_candidates = [prev_candidate]
if last_dagrun_info is not None:
next_candidate = self._get_next(last_dagrun_info.run_after)
start_time_candidates.append(next_candidate)
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
first_run = self._calc_first_run()
start_time_candidates.append(first_run)
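# Prior runs exist (e.g. after pause/resume): take the later of the most recent past
# boundary and the boundary following the last run, so missed runs are skipped.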
start_time_candidates = [
self._align_to_prev(coerce_datetime(utcnow())),
self._get_next(last_dagrun_info.run_after),
]
else:
# First run: respect run_immediately regardless of whether start_date is set
start_time_candidates = [self._calc_first_run()]
if restriction.earliest is not None:
earliest = self._align_to_next(restriction.earliest)
start_time_candidates.append(earliest)
Expand Down
95 changes: 86 additions & 9 deletions airflow-core/tests/unit/timetables/test_trigger_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
[
pytest.param(
None,
YESTERDAY + DELTA_FROM_MIDNIGHT,
CURRENT_TIME + DELTA_FROM_MIDNIGHT,
id="first-run",
),
pytest.param(
Expand All @@ -79,11 +79,11 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
last_automated_data_interval: DataInterval | None,
next_start_time: pendulum.DateTime,
) -> None:
"""If ``catchup=False`` and start_date is a day before"""
"""If ``catchup=False`` and start_date is a day before, run_immediately=False skips the past run"""
timetable = CronTriggerTimetable(
"30 16 * * *",
timezone=utc,
run_immediately=False, # Should have no effect since earliest is not None
run_immediately=False,
)
next_info = timetable.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
Expand Down Expand Up @@ -480,7 +480,8 @@ def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: dic
("run_immediately", "current_time", "correct_interval"),
[
(True, WAY_AFTER, PREVIOUS),
(False, JUST_AFTER, PREVIOUS),
(True, JUST_AFTER, PREVIOUS),
(False, JUST_AFTER, NEXT),
(False, WAY_AFTER, NEXT),
(datetime.timedelta(minutes=10), JUST_AFTER, PREVIOUS),
(datetime.timedelta(minutes=10), WAY_AFTER, NEXT),
Expand All @@ -501,18 +502,20 @@ def test_run_immediately(catchup, run_immediately, current_time, correct_interva


@pytest.mark.parametrize("catchup", [True, False])
def test_run_immediately_fast_dag(catchup):
def test_run_immediately_false_skips_to_next(catchup):
"""With run_immediately=False, always skip to the next cron point regardless of how close now is."""
timetable = CronTriggerTimetable(
"*/10 3 * * *", # Runs every 10 minutes, so falls back to 5 min hardcoded limit on buffer time
"*/10 3 * * *",
timezone=utc,
run_immediately=False,
)
next_10min = DagRunInfo.exact(pendulum.datetime(year=2024, month=8, day=15, hour=3, minute=10))
with time_machine.travel(JUST_AFTER, tick=False):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=None,
restriction=TimeRestriction(earliest=None, latest=None, catchup=catchup),
)
assert next_info == PREVIOUS
assert next_info == next_10min


@pytest.mark.parametrize(
Expand Down Expand Up @@ -593,7 +596,7 @@ def test_multi_serialization():
"expressions": ["@every 30s", "*/2 * * * *"],
"timezone": "UTC",
"interval": 600.0,
"run_immediately": False,
"run_immediately": True,
}

tt = MultipleCronTriggerTimetable.deserialize(data)
Expand All @@ -603,7 +606,7 @@ def test_multi_serialization():
assert tt._timetables[1]._expression == "*/2 * * * *"
assert tt._timetables[0]._timezone == tt._timetables[1]._timezone == utc
assert tt._timetables[0]._interval == tt._timetables[1]._interval == datetime.timedelta(minutes=10)
assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is False
assert tt._timetables[0]._run_immediately == tt._timetables[1]._run_immediately is True


@pytest.mark.db_test
Expand Down Expand Up @@ -818,6 +821,80 @@ def test_next_run_info_from_dag_model(schedule, partition_key, expected, dag_mak
assert info == expected


def test_run_immediately_false_with_start_date() -> None:
    """``run_immediately=False`` is honored on the first run even when ``start_date`` is set.

    With no prior runs and ``catchup=False``, the timetable must skip the most
    recent past cron point and schedule the next future one, regardless of how
    recently the past boundary fired.
    """
    timetable = CronTriggerTimetable(
        "*/10 * * * *",
        timezone=utc,
        run_immediately=False,
    )
    # 4 minutes past the 7:10 boundary — should still skip to 7:20.
    current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc)
    # Freeze the clock (tick=False), like the other run_immediately tests in
    # this module, so "now" is exactly ``current_time`` for the whole call.
    with time_machine.travel(current_time, tick=False):
        next_info = timetable.next_dagrun_info(
            last_automated_data_interval=None,  # no prior runs: this is the first run
            restriction=TimeRestriction(
                earliest=pendulum.datetime(2024, 1, 1, tz=utc),  # the Dag's start_date
                latest=None,
                catchup=False,
            ),
        )
    assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 20, tz=utc))


def test_run_immediately_true_with_start_date() -> None:
    """``run_immediately=True`` schedules the most recent past cron point even when ``start_date`` is set.

    With no prior runs and ``catchup=False``, the first run must be the cron
    point immediately before "now" rather than the next future one.
    """
    timetable = CronTriggerTimetable(
        "*/10 * * * *",
        timezone=utc,
        run_immediately=True,
    )
    # First scheduling decision happens at 7:14 — should run the 7:10 slot immediately.
    current_time = pendulum.datetime(2024, 1, 1, 7, 14, tz=utc)
    # Freeze the clock (tick=False), like the other run_immediately tests in
    # this module, so "now" is exactly ``current_time`` for the whole call.
    with time_machine.travel(current_time, tick=False):
        next_info = timetable.next_dagrun_info(
            last_automated_data_interval=None,  # no prior runs: this is the first run
            restriction=TimeRestriction(
                earliest=pendulum.datetime(2024, 1, 1, tz=utc),  # the Dag's start_date
                latest=None,
                catchup=False,
            ),
        )
    assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 1, 1, 7, 10, tz=utc))


def test_run_immediately_false_after_unpause() -> None:
    """After a pause/resume with prior runs, the most recent past boundary is scheduled.

    ``run_immediately`` only affects the very first run (when no prior runs
    exist). Here a prior run exists, so even with ``run_immediately=False`` the
    timetable skips the missed runs except the most recent one, which is
    triggered immediately.
    """
    timetable = CronTriggerTimetable(
        "0 0 * * *",  # @daily
        timezone=utc,
        run_immediately=False,
    )
    # Last run was Jan 31 midnight; the Dag is unpaused at 3PM on Feb 2.
    last_interval = DataInterval.exact(pendulum.datetime(2024, 1, 31, tz=utc))
    current_time = pendulum.datetime(2024, 2, 2, 15, tz=utc)
    # Freeze the clock (tick=False), like the other run_immediately tests in
    # this module, so "now" is exactly ``current_time`` for the whole call.
    with time_machine.travel(current_time, tick=False):
        next_info = timetable.next_dagrun_info(
            last_automated_data_interval=last_interval,
            restriction=TimeRestriction(
                earliest=pendulum.datetime(2024, 1, 1, tz=utc),
                latest=None,
                catchup=False,
            ),
        )
    # Feb 1 is skipped; Feb 2 midnight (most recent past boundary) is triggered immediately.
    assert next_info == DagRunInfo.exact(pendulum.datetime(2024, 2, 2, tz=utc))


def test_generate_run_id_without_partition_key() -> None:
"""
Tests the generate_run_id method of CronPartitionTimetable.
Expand Down
Loading