-
Notifications
You must be signed in to change notification settings - Fork 16.8k
Move ExecutorCallback execution into a supervised process #62645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2f6a8e1
0f671a1
64fa0f5
3b82e1e
2596cb9
170348f
9965f46
428549a
b5afbe4
43d814a
51e1b6b
79f9293
937618f
a2f3252
6df881f
21a255f
845db57
e558b31
b8052b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,10 +35,7 @@ | |
|
|
||
| import structlog | ||
|
|
||
| from airflow.executors import workloads | ||
| from airflow.executors.base_executor import BaseExecutor | ||
| from airflow.executors.workloads.callback import execute_callback_workload | ||
| from airflow.utils.state import CallbackState, TaskInstanceState | ||
|
|
||
| # add logger to parameter of setproctitle to support logging | ||
| if sys.platform == "darwin": | ||
|
|
@@ -51,9 +48,23 @@ | |
| if TYPE_CHECKING: | ||
| from structlog.typing import FilteringBoundLogger as Logger | ||
|
|
||
| from airflow.executors.workloads import ExecutorWorkload | ||
| from airflow.executors.workloads.types import WorkloadResultType | ||
|
|
||
|
|
||
| def _get_execution_api_server_url(team_conf) -> str: | ||
| """ | ||
| Resolve the execution API server URL from team-specific configuration. | ||
|
|
||
| :param team_conf: Team-specific executor configuration (ExecutorConf or AirflowConfigParser) | ||
| """ | ||
| base_url = team_conf.get("api", "base_url", fallback="/") | ||
| if base_url.startswith("/"): | ||
| base_url = f"http://localhost:8080{base_url}" | ||
| default_execution_api_server = f"{base_url.rstrip('/')}/execution/" | ||
| return team_conf.get("core", "execution_api_server_url", fallback=default_execution_api_server) | ||
|
|
||
|
|
||
| def _get_executor_process_title_prefix(team_name: str | None) -> str: | ||
| """ | ||
| Build the process title prefix for LocalExecutor workers. | ||
|
|
@@ -66,7 +77,7 @@ def _get_executor_process_title_prefix(team_name: str | None) -> str: | |
|
|
||
| def _run_worker( | ||
| logger_name: str, | ||
| input: SimpleQueue[workloads.All | None], | ||
| input: SimpleQueue[ExecutorWorkload | None], | ||
| output: Queue[WorkloadResultType], | ||
| unread_messages: multiprocessing.sharedctypes.Synchronized[int], | ||
| team_conf, | ||
|
|
@@ -99,73 +110,35 @@ def _run_worker( | |
| with unread_messages: | ||
| unread_messages.value -= 1 | ||
|
|
||
| # Handle different workload types | ||
| if isinstance(workload, workloads.ExecuteTask): | ||
| try: | ||
| _execute_work(log, workload, team_conf) | ||
| output.put((workload.ti.key, TaskInstanceState.SUCCESS, None)) | ||
| except Exception as e: | ||
| log.exception("Task execution failed.") | ||
| output.put((workload.ti.key, TaskInstanceState.FAILED, e)) | ||
|
|
||
| elif isinstance(workload, workloads.ExecuteCallback): | ||
| output.put((workload.callback.id, CallbackState.RUNNING, None)) | ||
| try: | ||
| _execute_callback(log, workload, team_conf) | ||
| output.put((workload.callback.id, CallbackState.SUCCESS, None)) | ||
| except Exception as e: | ||
| log.exception("Callback execution failed") | ||
| output.put((workload.callback.id, CallbackState.FAILED, e)) | ||
|
|
||
| else: | ||
| raise ValueError(f"LocalExecutor does not know how to handle {type(workload)}") | ||
|
|
||
|
|
||
| def _execute_work(log: Logger, workload: workloads.ExecuteTask, team_conf) -> None: | ||
| """ | ||
| Execute command received and stores result state in queue. | ||
|
|
||
| :param log: Logger instance | ||
| :param workload: The workload to execute | ||
| :param team_conf: Team-specific executor configuration | ||
| """ | ||
| from airflow.sdk.execution_time.supervisor import supervise | ||
| if workload.running_state is not None: | ||
| output.put((workload.key, workload.running_state, None)) | ||
|
|
||
| setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} {workload.ti.id}", log) | ||
|
|
||
| base_url = team_conf.get("api", "base_url", fallback="/") | ||
| # If it's a relative URL, use localhost:8080 as the default | ||
| if base_url.startswith("/"): | ||
| base_url = f"http://localhost:8080{base_url}" | ||
| default_execution_api_server = f"{base_url.rstrip('/')}/execution/" | ||
|
|
||
| # This will return the exit code of the task process, but we don't care about that, just if the | ||
| # _supervisor_ had an error reporting the state back (which will result in an exception.) | ||
| supervise( | ||
| # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this. | ||
| ti=workload.ti, # type: ignore[arg-type] | ||
| dag_rel_path=workload.dag_rel_path, | ||
| bundle_info=workload.bundle_info, | ||
| token=workload.token, | ||
| server=team_conf.get("core", "execution_api_server_url", fallback=default_execution_api_server), | ||
| log_path=workload.log_path, | ||
| ) | ||
| try: | ||
| _execute_workload(log, workload, team_conf) | ||
| output.put((workload.key, workload.success_state, None)) | ||
ferruzzi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| except Exception as e: | ||
| log.exception("Workload execution failed.", workload_type=type(workload).__name__) | ||
| output.put((workload.key, workload.failure_state, e)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hm. I can see where you are going. I don't think it's quite that simple though. With the two pops how I have it, one of them will pop and the other is expected to be a no-op. A task will pop from queued_tasks and the queued_callbacks pop will just not do anything, and vice versa. But you are right that I missed the case where it's not in either. How do you feel about this: for workload in workload_list:
self.activity_queue.put(workload)
# A valid workload will exist in exactly one of these dicts.
# One will succeed, the other will fail gracefully and return None.
removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop(workload.key, None)
if not removed:
raise KeyError(f"Workload {workload.key} was not found in any queue.")
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I implemented that in e558b31, if you have another idea I can revert it |
||
|
|
||
|
|
||
| def _execute_callback(log: Logger, workload: workloads.ExecuteCallback, team_conf) -> None: | ||
| def _execute_workload(log: Logger, workload: ExecutorWorkload, team_conf) -> None: | ||
| """ | ||
| Execute a callback workload. | ||
| Execute any workload type in a supervised subprocess. | ||
|
|
||
| All workload types are run in a supervised child process, providing process isolation, | ||
| stdout/stderr capture, signal handling, and crash detection. | ||
|
|
||
| :param log: Logger instance | ||
| :param workload: The ExecuteCallback workload to execute | ||
| :param workload: The workload to execute (ExecuteTask or ExecuteCallback) | ||
| :param team_conf: Team-specific executor configuration | ||
| """ | ||
| setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} {workload.callback.id}", log) | ||
|
|
||
| success, error_msg = execute_callback_workload(workload.callback, log) | ||
| from airflow.sdk.execution_time.supervisor import supervise_workload | ||
|
|
||
| if not success: | ||
| raise RuntimeError(error_msg or "Callback execution failed") | ||
| supervise_workload( | ||
| workload, | ||
| server=_get_execution_api_server_url(team_conf), | ||
| proctitle=f"{_get_executor_process_title_prefix(team_conf.team_name)} {workload.display_name}", | ||
| ) | ||
|
|
||
|
|
||
| class LocalExecutor(BaseExecutor): | ||
|
|
@@ -184,7 +157,7 @@ class LocalExecutor(BaseExecutor): | |
| serve_logs: bool = True | ||
| supports_callbacks: bool = True | ||
|
|
||
| activity_queue: SimpleQueue[workloads.All | None] | ||
| activity_queue: SimpleQueue[ExecutorWorkload | None] | ||
| result_queue: SimpleQueue[WorkloadResultType] | ||
| workers: dict[int, multiprocessing.Process] | ||
| _unread_messages: multiprocessing.sharedctypes.Synchronized[int] | ||
|
|
@@ -213,6 +186,7 @@ def start(self) -> None: | |
|
|
||
| # Mypy sees this value as `SynchronizedBase[c_uint]`, but that isn't the right runtime type behaviour | ||
| # (it looks like an int to python) | ||
|
|
||
| self._unread_messages = multiprocessing.Value(ctypes.c_uint) | ||
|
|
||
| if self.is_mp_using_fork: | ||
|
|
@@ -331,11 +305,13 @@ def terminate(self): | |
| def _process_workloads(self, workload_list): | ||
| for workload in workload_list: | ||
| self.activity_queue.put(workload) | ||
| # Remove from appropriate queue based on workload type | ||
| if isinstance(workload, workloads.ExecuteTask): | ||
| del self.queued_tasks[workload.ti.key] | ||
| elif isinstance(workload, workloads.ExecuteCallback): | ||
| del self.queued_callbacks[workload.callback.id] | ||
| # A valid workload will exist in exactly one of these dicts. | ||
| # One pop will succeed, the other will return None gracefully. | ||
| removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop( | ||
| workload.key, None | ||
| ) | ||
| if not removed: | ||
| raise KeyError(f"Workload {workload.key} was not found in any queue") | ||
| with self._unread_messages: | ||
| self._unread_messages.value += len(workload_list) | ||
| self._check_workers() | ||
Uh oh!
There was an error while loading. Please reload this page.