import datetime
import logging
import os
import re
import zipfile
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

# Regex pattern to detect zip file paths: matches "path/to/archive.zip/inner/path"
# Uses [/\\] to support both Unix and Windows path separators since zip internal paths always use "/"
# Case-insensitive to match zipfile.is_zipfile() behavior used by the DAG importer
ZIP_REGEX = re.compile(r"(.*\.zip)(?:[/\\](.*))?$", re.IGNORECASE)

# Matches the end of every path component that carries a ".zip" extension. Unlike the greedy
# ZIP_REGEX this yields *all* candidate archive boundaries, so "a.zip/vendor.zip/x" can be split
# at "a.zip" as well as at "a.zip/vendor.zip".
_ZIP_COMPONENT_END = re.compile(r"\.zip(?=[/\\]|$)", re.IGNORECASE)


def _has_zip_extension(path: str) -> bool:
    """Check if a path has a zip file extension (case-insensitive)."""
    return path.lower().endswith(".zip")


def _normalize_zip_internal_path(path: str) -> str:
    """Return *path* using "/" separators only and without leading/trailing separators."""
    return path.replace("\\", "/").strip("/")


log = logging.getLogger(__name__)


# This loader addresses the issue where template files in zipped DAG packages
# could not be resolved by the standard FileSystemLoader.
# See: https://github.com/apache/airflow/issues/59310
class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
    """
    A Jinja2 template loader that supports resolving templates from zipped DAG packages.

    Search paths may include filesystem directories, zip files, or subdirectories
    within zip files. Searchpath ordering is preserved across zip and non-zip entries.
    """

    def __init__(
        self,
        searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
        encoding: str = "utf-8",
        followlinks: bool = False,
    ) -> None:
        """
        Classify every searchpath entry once as either a zip location or a regular directory.

        :param searchpath: A single path or a sequence of paths to search, in priority order
        :param encoding: Encoding used to decode template files
        :param followlinks: Passed through to the per-directory ``FileSystemLoader`` instances
        """
        if isinstance(searchpath, (str, os.PathLike)):
            searchpath = [searchpath]
        all_paths = [os.fspath(p) for p in searchpath]

        # Both maps are keyed by the entry's index in ``all_paths`` so that get_source can
        # walk the searchpath in its original order regardless of the entry kind.
        self._zip_path_map: dict[int, tuple[str, str]] = {}  # {index: (archive_path, internal_base_path)}
        # Per-path FileSystemLoader for non-zip paths, delegating to Jinja2's own logic
        self._fs_loaders: dict[int, jinja2.FileSystemLoader] = {}
        regular_paths: list[str] = []

        for idx, path in enumerate(all_paths):
            zip_info = self._parse_zip_path(path)
            if zip_info:
                self._zip_path_map[idx] = zip_info
            else:
                regular_paths.append(path)
                self._fs_loaders[idx] = jinja2.FileSystemLoader([path], encoding, followlinks)

        # The parent only needs to set up encoding/followlinks; get_source and list_templates
        # are both overridden, so an empty list of regular paths is fine here.
        super().__init__(regular_paths, encoding, followlinks)

        # Store all paths for reference and error messages
        self._all_searchpaths = all_paths
        self.searchpath = all_paths

    @staticmethod
    def _parse_zip_path(path: str) -> tuple[str, str] | None:
        """
        Parse a path to extract zip archive and internal path components.

        Every ".zip" component boundary is tried, so both "archive.zip" itself and
        "archive.zip/sub/dir" (also when "sub/dir" contains another "*.zip" name) are
        recognised. At most one prefix of a path can be a regular file, so the order in
        which candidates are tried does not affect the result.

        :param path: The path to parse
        :return: Tuple of (archive_path, internal_base_path) if path is a zip path,
            None otherwise. ``internal_base_path`` uses "/" separators and has no
            leading or trailing separator; it is "" for the archive root.
        """
        for boundary in _ZIP_COMPONENT_END.finditer(path):
            archive = path[: boundary.end()]
            if os.path.isfile(archive) and zipfile.is_zipfile(archive):
                return archive, _normalize_zip_internal_path(path[boundary.end() :])
        return None

    def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
        """
        Read a file from inside a zip archive.

        :param archive_path: Path to the zip file
        :param internal_path: Path to the file inside the zip
        :return: The file contents as a string
        :raises TemplateNotFound: If the file doesn't exist in the zip or the archive
            cannot be read
        """
        # Normalize path separators for zip (always forward slashes)
        normalized_path = internal_path.replace(os.sep, "/")
        try:
            with zipfile.ZipFile(archive_path, "r") as zf:
                with zf.open(normalized_path) as f:
                    return f.read().decode(self.encoding)
        except KeyError as exc:
            raise jinja2.TemplateNotFound(internal_path) from exc
        except (OSError, zipfile.BadZipFile) as exc:
            # Keep ``name`` as the template path and carry the details in ``message`` so
            # that ``exc.name`` stays meaningful while ``str(exc)`` is unchanged.
            raise jinja2.TemplateNotFound(
                internal_path,
                f"{internal_path} (error reading from {archive_path}: {exc})",
            ) from exc

    def _get_source_from_single_zip(
        self, archive_path: str, base_internal_path: str, template: str
    ) -> tuple[str, str, Callable[[], bool]] | None:
        """
        Try to get template source from a single zip archive.

        :param archive_path: Path to the zip file
        :param base_internal_path: Base path inside the zip (may be empty)
        :param template: The name of the template to load
        :return: A tuple of (source, filename, up_to_date_func) if found, None otherwise
        :raises TemplateNotFound: If *template* contains path traversal segments
            (raised by ``split_template_path``)
        """
        import posixpath

        from jinja2.loaders import split_template_path

        # split_template_path rejects ".." segments, which blocks traversal out of the base path.
        pieces = split_template_path(template)
        # posixpath.join with an empty first component is the same as "/".join(pieces).
        internal_path = posixpath.join(base_internal_path, *pieces)

        try:
            # Stat before reading: if the archive is rewritten in between, the recorded mtime
            # is the older one and the cached template is correctly reported as stale.
            archive_mtime = os.path.getmtime(archive_path)
            source = self._read_from_zip(archive_path, internal_path)
        except (jinja2.TemplateNotFound, OSError):
            return None

        def up_to_date(archive: str = archive_path, mtime: float = archive_mtime) -> bool:
            try:
                return os.path.getmtime(archive) == mtime
            except OSError:
                return False

        return source, os.path.join(archive_path, internal_path), up_to_date

    def get_source(
        self, environment: jinja2.Environment, template: str
    ) -> tuple[str, str, Callable[[], bool]]:
        """
        Get the template source, filename, and reload helper for a template.

        Searches through searchpaths in order, handling both zip archives and
        regular filesystem paths according to their original order.

        :param environment: The Jinja2 environment
        :param template: The name of the template to load
        :return: A tuple of (source, filename, up_to_date_func)
        :raises TemplateNotFound: If the template cannot be found
        """
        for idx in range(len(self._all_searchpaths)):
            zip_entry = self._zip_path_map.get(idx)
            if zip_entry is not None:
                result = self._get_source_from_single_zip(*zip_entry, template)
                if result is not None:
                    return result
                continue
            fs_loader = self._fs_loaders.get(idx)
            if fs_loader is None:
                continue
            try:
                return fs_loader.get_source(environment, template)
            except jinja2.TemplateNotFound:
                continue

        # Template not found in any searchpath
        searched = ", ".join(repr(p) for p in self._all_searchpaths)
        raise jinja2.TemplateNotFound(template, f"'{template}' not found in search path: {searched}")

    def list_templates(self) -> list[str]:
        """
        Return a list of available templates.

        Combines templates from both zip archives and regular filesystem paths.
        Archives that cannot be opened are skipped.

        :return: A sorted list of template names
        """
        found: set[str] = set()

        # Get templates from zip paths
        for archive_path, base_internal_path in self._zip_path_map.values():
            base = _normalize_zip_internal_path(base_internal_path)
            prefix = f"{base}/" if base else ""
            try:
                with zipfile.ZipFile(archive_path, "r") as zf:
                    names = zf.namelist()
            except (OSError, zipfile.BadZipFile):
                continue
            # Names ending in "/" are directory entries; everything else under the prefix
            # is reported relative to it.
            found.update(
                name[len(prefix) :]
                for name in names
                if not name.endswith("/") and name.startswith(prefix)
            )

        # Get templates from regular paths via per-path FileSystemLoader
        for loader in self._fs_loaders.values():
            try:
                found.update(loader.list_templates())
            except OSError:
                continue

        return sorted(found)
def _get_resolved_searchpath(self) -> list[str]:
    """
    Build the template search path, anchoring relative entries inside a zipped DAG.

    The DAG folder always comes first. When that folder is a zip file, every relative
    ``template_searchpath`` entry (e.g. ``"templates"``) is joined onto the zip path;
    absolute entries, and all entries of non-zipped DAGs, are passed through untouched.
    """
    from airflow.sdk.definitions._internal.templater import _has_zip_extension

    dag_folder = self.folder
    extra_paths = self.template_searchpath

    # Nothing configured: only the DAG folder is searched.
    if not extra_paths:
        return [dag_folder]

    # Regular (non-zipped) DAG: entries are used exactly as given.
    if not _has_zip_extension(dag_folder):
        return [dag_folder, *extra_paths]

    return [
        dag_folder,
        *(entry if os.path.isabs(entry) else os.path.join(dag_folder, entry) for entry in extra_paths),
    ]
@pytest.fixture
def zipped_dag_with_template(tmp_path: Path) -> tuple[str, str]:
    """
    Create a zip file containing a DAG and SQL template files.

    Structure:
        test_dag.zip/
        ├── test_dag.py
        ├── root_query.sql
        └── templates/
            └── query.sql

    Returns:
        tuple: (path to zip file, path to DAG file within zip)
    """
    zip_path = tmp_path / "test_dag.zip"
    dag_content = """
from airflow import DAG
from datetime import datetime

with DAG(dag_id="test_zip_dag", start_date=datetime(2024, 1, 1)) as dag:
    pass
"""
    with zipfile.ZipFile(zip_path, "w") as zf:
        # A simple DAG file at the archive root
        zf.writestr("test_dag.py", dag_content)
        # Template in a subdirectory
        zf.writestr("templates/query.sql", "SELECT column_a FROM {{ params.table_name }}")
        # Template at root level
        zf.writestr("root_query.sql", "SELECT * FROM {{ params.table }}")

    # The fileloc as it would appear for a DAG loaded from the zip
    return str(zip_path), os.path.join(str(zip_path), "test_dag.py")


def _make_dag_at(fileloc: str, *, dag_id: str = "test_zipped_dag", **dag_kwargs) -> DAG:
    """Build a DAG and set ``fileloc`` manually, as it would be set when loading from *fileloc*."""
    dag = DAG(dag_id=dag_id, schedule=None, start_date=DEFAULT_ZIP_DATE, **dag_kwargs)
    dag.fileloc = fileloc
    return dag


class TestZippedDagTemplateResolution:
    """
    Test suite for template resolution in zipped DAGs.

    These tests verify that ZipAwareFileSystemLoader correctly loads
    templates from within zip archives, fixing Issue #59310.
    """

    def test_dag_folder_property_for_zipped_dag(self, zipped_dag_with_template):
        """
        Test that DAG.folder returns the zip file path for zipped DAGs.

        For a DAG with fileloc='/path/to/test.zip/dag.py',
        folder should return '/path/to/test.zip'.
        """
        zip_path, dag_fileloc = zipped_dag_with_template
        dag = _make_dag_at(dag_fileloc)

        # The folder property uses os.path.dirname(fileloc)
        assert dag.folder == zip_path

    def test_template_file_loads_from_zipped_dag(self, zipped_dag_with_template):
        """
        Test that template files can be loaded from within zip archives.

        This verifies the fix for Issue #59310 - templates inside zipped DAGs
        should be accessible via ZipAwareFileSystemLoader.
        """
        _, dag_fileloc = zipped_dag_with_template
        jinja_env = _make_dag_at(dag_fileloc).get_template_env()

        # Load template from subdirectory inside zip
        template = jinja_env.get_template("templates/query.sql")
        assert template.render(params={"table_name": "users"}) == "SELECT column_a FROM users"

    def test_root_level_template_loads_from_zipped_dag(self, zipped_dag_with_template):
        """Test that templates at the root level of the zip can be loaded."""
        _, dag_fileloc = zipped_dag_with_template
        jinja_env = _make_dag_at(dag_fileloc).get_template_env()

        template = jinja_env.get_template("root_query.sql")
        assert template.render(params={"table": "orders"}) == "SELECT * FROM orders"

    def test_template_searchpath_in_zipped_dag(self, zipped_dag_with_template):
        """
        Test that template_searchpath works correctly for zipped DAGs.

        When template_searchpath points to a subdirectory inside the zip,
        templates should be resolved relative to that path.
        """
        zip_path, dag_fileloc = zipped_dag_with_template

        # Absolute path to the templates folder inside the zip
        template_path = os.path.join(zip_path, "templates")
        jinja_env = _make_dag_at(dag_fileloc, template_searchpath=[template_path]).get_template_env()

        # Template should be found relative to the searchpath
        template = jinja_env.get_template("query.sql")
        assert template.render(params={"table_name": "products"}) == "SELECT column_a FROM products"

    def test_template_searchpath_relative_path_in_zipped_dag(self, zipped_dag_with_template):
        """
        Test that relative template_searchpath works correctly for zipped DAGs.

        This is the core fix for Issue #59310 - when users specify:
            template_searchpath=["templates"]
        it should resolve to "{zip_path}/templates" automatically.
        """
        _, dag_fileloc = zipped_dag_with_template
        jinja_env = _make_dag_at(dag_fileloc, template_searchpath=["templates"]).get_template_env()

        template = jinja_env.get_template("query.sql")
        assert template.render(params={"table_name": "users"}) == "SELECT column_a FROM users"

    def test_regular_dag_template_resolution_works(self, tmp_path: Path):
        """
        Test that template resolution works correctly for regular (non-zipped) DAGs.

        This ensures ZipAwareFileSystemLoader maintains backward compatibility
        with regular filesystem paths.
        """
        # Create a regular directory structure (not zipped)
        template_dir = tmp_path / "dags" / "templates"
        template_dir.mkdir(parents=True)
        (template_dir / "query.sql").write_text("SELECT * FROM {{ params.table }}")

        dag_file = tmp_path / "dags" / "test_dag.py"
        dag_file.write_text("# DAG file")

        jinja_env = _make_dag_at(str(dag_file), dag_id="test_regular_dag").get_template_env()

        # This should work for regular directories
        template = jinja_env.get_template("templates/query.sql")
        assert template.render(params={"table": "users"}) == "SELECT * FROM users"


class TestOperatorTemplateFieldsInZippedDag:
    """Tests for resolving template fields in operators for zipped DAGs."""

    class DummyOperator(BaseOperator):
        template_fields = ("sql", "query_list")
        template_ext = (".sql",)

        def __init__(self, sql: str, query_list: list[str] | None = None, **kwargs):
            super().__init__(**kwargs)
            self.sql = sql
            self.query_list = query_list or []

        def execute(self, context): ...

    def test_operator_template_fields_resolve_in_zipped_dag(self, zipped_dag_with_template):
        """Test that operator template fields are resolved for zipped DAGs."""
        _, dag_fileloc = zipped_dag_with_template
        dag = _make_dag_at(dag_fileloc)

        # Operator with SQL file reference
        task = self.DummyOperator(task_id="test_task", sql="templates/query.sql")
        task.dag = dag

        task.resolve_template_files()

        # SQL should be resolved to file content
        assert "SELECT column_a FROM" in task.sql

    def test_operator_multiple_template_files_in_zipped_dag(self, zipped_dag_with_template):
        """Test that multiple template files in list fields are resolved."""
        _, dag_fileloc = zipped_dag_with_template
        dag = _make_dag_at(dag_fileloc)

        task = self.DummyOperator(
            task_id="test_task",
            sql="templates/query.sql",
            query_list=["templates/query.sql", "root_query.sql"],
        )
        task.dag = dag

        task.resolve_template_files()

        # Both template files should be resolved
        assert "SELECT column_a FROM" in task.sql
        assert "SELECT column_a FROM" in task.query_list[0]
        assert "SELECT * FROM" in task.query_list[1]


class TestZipAwareFileSystemLoader:
    """Tests for the ZipAwareFileSystemLoader class directly."""

    def test_loader_list_templates_in_zip(self, zipped_dag_with_template):
        """Test that list_templates() returns templates from zip archives."""
        from airflow.sdk.definitions._internal.templater import ZipAwareFileSystemLoader

        zip_path, _ = zipped_dag_with_template

        templates = ZipAwareFileSystemLoader([zip_path]).list_templates()

        assert "test_dag.py" in templates
        assert "templates/query.sql" in templates
        assert "root_query.sql" in templates

    def test_loader_mixed_searchpath(self, tmp_path: Path, zipped_dag_with_template):
        """Test that the loader works with mixed searchpaths (zip and regular directories)."""
        import jinja2

        from airflow.sdk.definitions._internal.templater import ZipAwareFileSystemLoader

        zip_path, _ = zipped_dag_with_template

        # Create a regular directory with a template
        regular_dir = tmp_path / "regular_templates"
        regular_dir.mkdir()
        (regular_dir / "regular.sql").write_text("SELECT 1")

        env = jinja2.Environment(loader=ZipAwareFileSystemLoader([zip_path, str(regular_dir)]))

        # Should load from zip
        from_zip = env.get_template("templates/query.sql")
        assert "SELECT column_a" in from_zip.render(params={"table_name": "test"})

        # Should load from regular directory
        assert env.get_template("regular.sql").render() == "SELECT 1"

    def test_loader_respects_searchpath_order(self, tmp_path: Path, zipped_dag_with_template):
        """Test that searchpath order is respected when templates overlap."""
        import jinja2

        from airflow.sdk.definitions._internal.templater import ZipAwareFileSystemLoader

        zip_path, _ = zipped_dag_with_template

        # Create a regular directory with a template that matches the zip path
        regular_dir = tmp_path / "regular_templates"
        (regular_dir / "templates").mkdir(parents=True)
        (regular_dir / "templates" / "query.sql").write_text("SELECT 42")

        # Regular path first should win
        env = jinja2.Environment(loader=ZipAwareFileSystemLoader([str(regular_dir), zip_path]))
        assert env.get_template("templates/query.sql").render() == "SELECT 42"

        # Zip path first should win
        env = jinja2.Environment(loader=ZipAwareFileSystemLoader([zip_path, str(regular_dir)]))
        rendered = env.get_template("templates/query.sql").render(params={"table_name": "t"})
        assert rendered == "SELECT column_a FROM t"

    def test_zip_path_traversal_is_blocked(self, zipped_dag_with_template):
        """Test that path traversal attempts are blocked inside zip paths."""
        import jinja2

        from airflow.sdk.definitions._internal.templater import ZipAwareFileSystemLoader

        zip_path, _ = zipped_dag_with_template
        env = jinja2.Environment(loader=ZipAwareFileSystemLoader([zip_path]))

        with pytest.raises(jinja2.TemplateNotFound):
            env.get_template("../templates/query.sql")

    def test_loader_case_insensitive_zip_extension(self, tmp_path: Path):
        """Test that zip detection works with uppercase .ZIP extension."""
        import jinja2

        from airflow.sdk.definitions._internal.templater import ZipAwareFileSystemLoader

        zip_path = tmp_path / "test_dag.ZIP"
        with zipfile.ZipFile(zip_path, "w") as zf:
            zf.writestr("templates/query.sql", "SELECT 1 FROM {{ params.table }}")

        env = jinja2.Environment(loader=ZipAwareFileSystemLoader([str(zip_path)]))
        template = env.get_template("templates/query.sql")
        assert "SELECT 1" in template.render(params={"table": "test"})

    def test_resolved_searchpath_case_insensitive_zip(self, tmp_path: Path):
        """Test that _get_resolved_searchpath detects .ZIP extension."""
        zip_path = tmp_path / "dags.ZIP"
        with zipfile.ZipFile(zip_path, "w") as zf:
            zf.writestr("dag.py", "pass")
            zf.writestr("templates/query.sql", "SELECT 1")

        dag = _make_dag_at(os.path.join(str(zip_path), "dag.py"), dag_id="test_zip_case")
        dag.template_searchpath = ["templates"]

        searchpath = dag._get_resolved_searchpath()

        # The DAG folder (the zip itself) always comes first
        assert searchpath[0] == str(zip_path)
        # Relative "templates" should be resolved against the zip path
        assert os.path.join(str(zip_path), "templates") in searchpath