From a22d900055b7cb07263e8f007b2a126ea13f644a Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 26 Feb 2026 22:23:08 +0900 Subject: [PATCH 1/2] feat: make temporal partition_mapper timezone aware --- .../src/airflow/partition_mappers/temporal.py | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index 9c86bace56b42..d7187173d5062 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -18,10 +18,14 @@ from abc import ABC, abstractmethod from datetime import datetime, timedelta -from typing import Any +from typing import TYPE_CHECKING, Any +from airflow._shared.timezones.timezone import parse_timezone from airflow.partition_mappers.base import PartitionMapper +if TYPE_CHECKING: + from pendulum import FixedTimezone, Timezone + class _BaseTemporalMapper(PartitionMapper, ABC): """Base class for Temporal Partition Mappers.""" @@ -30,11 +34,16 @@ class _BaseTemporalMapper(PartitionMapper, ABC): def __init__( self, - input_format: str = "%Y-%m-%dT%H:%M:%S", + *, + timezone: str | Timezone | FixedTimezone, + input_format: str = "%Y-%m-%dT%H:%M:%S%z", output_format: str | None = None, ): self.input_format = input_format self.output_format = output_format or self.default_output_format + if isinstance(timezone, str): + timezone = parse_timezone(timezone) + self._timezone = timezone def to_downstream(self, key: str) -> str: dt = datetime.strptime(key, self.input_format) @@ -50,7 +59,10 @@ def format(self, dt: datetime) -> str: return dt.strftime(self.output_format) def serialize(self) -> dict[str, Any]: + from airflow.serialization.encoders import encode_timezone + return { + "timezone": encode_timezone(self._timezone), "input_format": self.input_format, "output_format": self.output_format, } @@ -58,6 +70,7 @@ def serialize(self) -> dict[str, Any]: @classmethod def deserialize(cls, data: dict[str, Any]) -> PartitionMapper: return cls( + timezone=parse_timezone(data["timezone"]), input_format=data["input_format"], output_format=data["output_format"], ) @@ -66,7 +79,7 @@ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper: class StartOfHourMapper(_BaseTemporalMapper): """Map a time-based partition key to hour.""" - default_output_format = "%Y-%m-%dT%H" + default_output_format = "%Y-%m-%dT%H%z" def normalize(self, dt: datetime) -> datetime: return dt.replace(minute=0, second=0, microsecond=0) @@ -75,7 +88,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfDayMapper(_BaseTemporalMapper): """Map a time-based partition key to day.""" - default_output_format = "%Y-%m-%d" + default_output_format = "%Y-%m-%d%z" def normalize(self, dt: datetime) -> datetime: return dt.replace(hour=0, minute=0, second=0, microsecond=0) @@ -84,7 +97,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfWeekMapper(_BaseTemporalMapper): """Map a time-based partition key to week.""" - default_output_format = "%Y-%m-%d (W%V)" + default_output_format = "%Y-%m-%d (W%V)%z" def normalize(self, dt: datetime) -> datetime: start = dt - timedelta(days=dt.weekday()) @@ -94,7 +107,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfMonthMapper(_BaseTemporalMapper): """Map a time-based partition key to month.""" - default_output_format = "%Y-%m" + default_output_format = "%Y-%m%z" def normalize(self, dt: datetime) -> datetime: return dt.replace( @@ -109,7 +122,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfQuarterMapper(_BaseTemporalMapper): """Map a time-based partition key to quarter.""" - default_output_format = "%Y-Q{quarter}" + default_output_format = "%Y-Q{quarter}%z" def normalize(self, dt: datetime) -> datetime: quarter = (dt.month - 1) // 3 @@ -131,7 +144,7 @@ def format(self, dt: datetime) -> str: class StartOfYearMapper(_BaseTemporalMapper): """Map a time-based partition key to year.""" - default_output_format = "%Y" + default_output_format = "%Y%z" def normalize(self, dt: datetime) -> datetime: return dt.replace( From db5fbd9c1400465033b1cd769c760181258d2111 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 27 Mar 2026 15:54:54 +0800 Subject: [PATCH 2/2] fixup! feat: make temporal partition_mapper timezone aware --- .../src/airflow/partition_mappers/temporal.py | 22 ++++++------ .../unit/partition_mappers/test_temporal.py | 35 +++++++++++++++++-- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index d7187173d5062..d1a4c7c904868 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -20,7 +20,7 @@ from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any -from airflow._shared.timezones.timezone import parse_timezone +from airflow._shared.timezones.timezone import make_aware, parse_timezone from airflow.partition_mappers.base import PartitionMapper if TYPE_CHECKING: @@ -35,8 +35,8 @@ class _BaseTemporalMapper(PartitionMapper, ABC): def __init__( self, *, - timezone: str | Timezone | FixedTimezone, - input_format: str = "%Y-%m-%dT%H:%M:%S%z", + timezone: str | Timezone | FixedTimezone = "UTC", + input_format: str = "%Y-%m-%dT%H:%M:%S", output_format: str | None = None, ): self.input_format = input_format @@ -47,6 +47,8 @@ def __init__( def to_downstream(self, key: str) -> str: dt = datetime.strptime(key, self.input_format) + if dt.tzinfo is None: + dt = make_aware(dt, self._timezone) normalized = self.normalize(dt) return self.format(normalized) @@ -70,7 +72,7 @@ def serialize(self) -> dict[str, Any]: @classmethod def deserialize(cls, data: dict[str, Any]) -> PartitionMapper: return cls( - timezone=parse_timezone(data["timezone"]), + timezone=parse_timezone(data.get("timezone", "UTC")), input_format=data["input_format"], output_format=data["output_format"], ) @@ -79,7 +81,7 @@ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper: class StartOfHourMapper(_BaseTemporalMapper): """Map a time-based partition key to hour.""" - default_output_format = "%Y-%m-%dT%H%z" + default_output_format = "%Y-%m-%dT%H" def normalize(self, dt: datetime) -> datetime: return dt.replace(minute=0, second=0, microsecond=0) @@ -88,7 +90,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfDayMapper(_BaseTemporalMapper): """Map a time-based partition key to day.""" - default_output_format = "%Y-%m-%d%z" + default_output_format = "%Y-%m-%d" def normalize(self, dt: datetime) -> datetime: return dt.replace(hour=0, minute=0, second=0, microsecond=0) @@ -97,7 +99,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfWeekMapper(_BaseTemporalMapper): """Map a time-based partition key to week.""" - default_output_format = "%Y-%m-%d (W%V)%z" + default_output_format = "%Y-%m-%d (W%V)" def normalize(self, dt: datetime) -> datetime: start = dt - timedelta(days=dt.weekday()) @@ -107,7 +109,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfMonthMapper(_BaseTemporalMapper): """Map a time-based partition key to month.""" - default_output_format = "%Y-%m%z" + default_output_format = "%Y-%m" def normalize(self, dt: datetime) -> datetime: return dt.replace( @@ -122,7 +124,7 @@ def normalize(self, dt: datetime) -> datetime: class StartOfQuarterMapper(_BaseTemporalMapper): """Map a time-based partition key to quarter.""" - default_output_format = "%Y-Q{quarter}%z" + default_output_format = "%Y-Q{quarter}" def normalize(self, dt: datetime) -> datetime: quarter = (dt.month - 1) // 3 @@ -144,7 +146,7 @@ def format(self, dt: datetime) -> str: class StartOfYearMapper(_BaseTemporalMapper): """Map a time-based partition key to year.""" - default_output_format = "%Y%z" + default_output_format = "%Y" def normalize(self, dt: datetime) -> datetime: return dt.replace( diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py index 89ad98cebbccc..c6b5b1d760d71 100644 --- a/airflow-core/tests/unit/partition_mappers/test_temporal.py +++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py @@ -46,7 +46,7 @@ def test_to_downstream( mapper_cls: type[_BaseTemporalMapper], expected_downstream_key: str, ): - pm = mapper_cls() + pm = mapper_cls(timezone="UTC") assert pm.to_downstream("2026-02-10T14:30:45") == expected_downstream_key @pytest.mark.parametrize( @@ -61,8 +61,9 @@ def test_to_downstream( ], ) def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str): - pm = mapper_cls() + pm = mapper_cls(timezone="UTC") assert pm.serialize() == { + "timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": expected_outut_format, } @@ -81,6 +82,7 @@ def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_f def test_deserialize(self, mapper_cls): pm = mapper_cls.deserialize( { + "timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "customized-format", } @@ -88,3 +90,32 @@ def test_deserialize(self, mapper_cls): assert isinstance(pm, mapper_cls) assert pm.input_format == "%Y-%m-%dT%H:%M:%S" assert pm.output_format == "customized-format" + + @pytest.mark.parametrize( + "mapper_cls", + [ + StartOfHourMapper, + StartOfDayMapper, + StartOfWeekMapper, + StartOfMonthMapper, + StartOfQuarterMapper, + StartOfYearMapper, + ], + ) + def test_deserialize_legacy_no_timezone(self, mapper_cls): + """Deserializing data without a timezone key defaults to UTC.""" + pm = mapper_cls.deserialize( + { + "input_format": "%Y-%m-%dT%H:%M:%S", + "output_format": "customized-format", + } + ) + assert isinstance(pm, mapper_cls) + + def test_to_downstream_timezone_aware(self): + """Input is interpreted as local time in the given timezone.""" + pm = StartOfDayMapper(timezone="America/New_York") + # 2026-02-10T23:00:00 in New York local time → start-of-day is 2026-02-10 + assert pm.to_downstream("2026-02-10T23:00:00") == "2026-02-10" + # 2026-02-11T01:00:00 in New York local time → start-of-day is 2026-02-11 + assert pm.to_downstream("2026-02-11T01:00:00") == "2026-02-11"