feat: add callback support to aws batch executor #62984
dondaum wants to merge 1 commit into apache:main
Conversation
providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py
```diff
@@ -127,26 +128,40 @@ def __init__(self, *args, **kwargs):
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
```
If you don't mind, can you narrow down the type for workload here? I think there is a new type that @ferruzzi made, basically for anywhere you need task | callback as a type.
Thank you for the review.
Are you referring to types ExecuteTask and ExecuteCallback? https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/workloads/__init__.py
I think we can only use them if we pin the provider's Airflow core dependency to a version that includes that change, which landed in #61153.
If we want to maintain backwards compatibility, I can't use the new types. Currently the Amazon provider requires apache-airflow>=2.11.0.
@onikolas is likely thinking of SchedulerWorkload from workloads/types.py, but @dondaum may be right: that isn't going to be in 2.11, so we can't use it until we bump the min_ver.
I think we could do a conditional import though:

```python
workload_type_hint = workloads.All
if airflow version > 3.2:  # pseudocode for the version check
    from airflow.executors.workloads.types import SchedulerWorkload
    workload_type_hint = SchedulerWorkload
```

And that will force it to get cleaned up later when we pin the versions up?
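For illustration, one way to make that fallback concrete is a try/except import. The helper below is hypothetical (its name, return convention, and the module path are taken from this discussion, not from merged code); a real version would likely compare the installed Airflow core version instead:

```python
import importlib


def pick_workload_type_hint(types_module: str = "airflow.executors.workloads.types"):
    """Return the narrow SchedulerWorkload type when the installed Airflow
    core ships it; return None so the caller keeps the broad workloads.All
    hint on older cores (e.g. the current apache-airflow>=2.11.0 floor)."""
    try:
        mod = importlib.import_module(types_module)
        return mod.SchedulerWorkload
    except (ImportError, AttributeError):
        return None  # caller falls back to workloads.All


# On a core without the new types module, the helper degrades gracefully.
narrow = pick_workload_type_hint("airflow_nonexistent.workload_types")
# narrow is None here, so real code would keep using workloads.All
```

The AttributeError branch also covers cores where the module exists but does not yet export SchedulerWorkload.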
Let's say we make this conditional import work here. We are still limited by the BaseExecutor type having this:

```python
def queue_workload(self, workload: workloads.All, session: Session) -> None:
    ...
```
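To spell out the constraint above: overriding queue_workload with a *narrower* parameter type than the base class declares is an incompatible override under standard subtyping rules (type checkers such as mypy flag it), so the subclass is effectively tied to the base signature. A toy sketch with stand-in classes, not the actual Airflow types:

```python
class Workload:
    """Stand-in for the broad workloads.All union."""


class CallbackWorkload(Workload):
    """Stand-in for a narrower, callback-only workload type."""


class BaseExecutor:
    def queue_workload(self, workload: Workload) -> str:
        return f"queued {type(workload).__name__}"


class BatchExecutor(BaseExecutor):
    # Narrowing the parameter to CallbackWorkload here would break the base
    # contract: callers holding a BaseExecutor may pass any Workload, and
    # mypy would report an incompatible override. So we keep the broad type.
    def queue_workload(self, workload: Workload) -> str:
        return super().queue_workload(workload)
```

Any narrower handling therefore has to happen inside the method body (e.g. isinstance checks), not in the signature.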
@dondaum This PR has a few issues that need to be addressed before it can be reviewed. Please see our Pull Request quality criteria. Issues found:
What to do next:
Please address the issues above and push again. If you have questions, feel free to ask on the Airflow Slack.
Force-pushed 571de3f to c591fbe
I changed the reference from task to workload where I could make a change. There is a method
Force-pushed c591fbe to 67332d4
Force-pushed 67332d4 to 00c71ae
```diff
 self.log_task_event(
     event="batch job submit failure",
     extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
-    f"Dropping the task. Reason: {failure_reason}",
-    ti_key=key,
+    f"Dropping the workload. Reason: {failure_reason}",
+    ti_key=workload_key,
 )
-self.fail(key=key)
+self.fail(key=workload_key)
```
When a callback workload exceeds max submit attempts, log_task_event is called with ti_key=workload_key. For callbacks, this key is a string UUID, not a TaskInstanceKey named tuple, which will cause errors since Log(task_instance=...) expects a TaskInstanceKey.
Great find. I followed the relevant code. The scheduler uses this log queue to write a Log() entry. The log itself can be initialized without a task instance, but the executor method expects a task instance key: def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey).
So I'm wondering whether we should adjust log_task_event() to accept both kinds of key, which might also require changes to other executors, or whether we should keep callbacks out of this log queue entirely.
@ferruzzi any thoughts on it?
Same point he made in another PR. #63035 (comment)
I don't think we need to change Log() since it can already be instantiated without a task instance.
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/log.py#L78C9-L78C22
What we could do is:

```python
# BaseExecutor
def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey | None):
    ...

# BatchExecutor
self.log_task_event(
    event="batch job submit failure",
    extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
    f"Dropping the workload. Reason: {failure_reason}",
)
```
What is this Log() entry for? I am wondering if callbacks need the entry or are they just for tasks?
Implements executor callback support for the AWS BatchExecutor.
related: #62887
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.