From 4f8912185a459b268328c00339a76e56668c820b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 24 Jun 2026 13:27:59 +0900 Subject: [PATCH 1/2] Correct the example and docstring for the coordinators --- airflow-core/src/airflow/config_templates/config.yml | 7 +++++++ .../airflow/sdk/coordinators/executable/coordinator.py | 7 +++---- .../src/airflow/sdk/coordinators/java/coordinator.py | 9 ++++----- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index a8ce5506c534e..e0cd830889f92 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2091,6 +2091,7 @@ sdk: "jdk-17": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": { + "jars_root": ["/opt/airflow/java-bundles"], "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"] }, @@ -2099,6 +2100,12 @@ sdk: "worker_container_repository": "apache/airflow", "worker_container_tag": "3.3.0" } + }, + "go-sdk": { + "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", + "kwargs": { + "executables_root": ["/opt/airflow/executable-bundles"] + } } } default: ~ diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py index ed11f97165d1a..2275628d0b1c8 100644 --- a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py +++ b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py @@ -370,12 +370,11 @@ class ExecutableCoordinator(SubprocessCoordinator): Configuration is taken from the ``[sdk] coordinators`` entry that constructs this instance:: - { - "name": "go", + "go": { "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": { - "executables_root": ["~/airflow/executable-bundles"], - }, + "executables_root": ["~/airflow/executable-bundles"] + } } :param executables_root: A list of directories scanned for executable diff --git a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py index cb74c64dfd48f..da35d0f37b84e 100644 --- a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py +++ b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py @@ -174,14 +174,13 @@ class JavaCoordinator(SubprocessCoordinator): Configuration is taken from the ``[sdk] coordinators`` entry that constructs this instance:: - { - "name": "jdk-17", + "jdk-17": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": { - "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", - "jvm_args": ["-Xmx1024m"], "jars_root": ["~/airflow/jars"], - }, + "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", + "jvm_args": ["-Xmx1024m"] + } } :param java_executable: Path to the ``java`` command (defaults to From e70c9f8e3350feac940fb1f8be5d191557bffcca Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 24 Jun 2026 14:42:59 +0900 Subject: [PATCH 2/2] Guard the config.yml coordinators example against drift --- .../execution_time/test_coordinator.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py index 475b587bf7f1f..40db5378afccf 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py +++ b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py @@ -22,7 +22,8 @@ import pytest -from airflow.sdk.configuration import conf +from airflow.sdk._shared.module_loading import import_string +from airflow.sdk.configuration import conf, retrieve_configuration_description from airflow.sdk.execution_time.coordinator import ( BaseCoordinator, CoordinatorManager, @@ -199,3 +200,32 @@ def test_extra_for_queue_does_not_instantiate_coordinator(self, sdk_config): "pod_template_file": "/opt/airflow/pod_templates/boom.yaml" } assert manager._created_coordinators == {} + + +class TestConfigYamlCoordinatorsExample: + """Guard the ``[sdk] coordinators`` example in ``config.yml`` against drift. + + Nothing else exercises the example, so a broken one (e.g. dropping the + required ``jars_root`` kwarg) can ship unnoticed. Loading it through + CoordinatorManager and constructing every entry keeps the example honest. + """ + + def test_every_example_coordinator_constructs(self, sdk_config): + description = retrieve_configuration_description() + coordinators_example = description["sdk"]["options"]["coordinators"]["example"] + specs = json.loads(coordinators_example) + assert specs, "config.yml [sdk] coordinators example must not be empty" + + # The example's own queue_to_coordinator illustrates different keys, so + # route every coordinator through a synthetic queue to construct each one. + queue_to_coordinator = {f"queue-{key}": key for key in specs} + sdk_config( + coordinators=coordinators_example, + queue_to_coordinator=json.dumps(queue_to_coordinator), + ) + manager = CoordinatorManager.from_config() + assert set(manager._coordinator_specs) == set(specs) + + for queue, key in queue_to_coordinator.items(): + coordinator = manager.for_queue(queue) + assert isinstance(coordinator, import_string(specs[key]["classpath"]))