feat: add callback support to aws batch executor #62984

Open
dondaum wants to merge 1 commit into apache:main from dondaum:feat/batch-executor-callback-support

Conversation

Contributor

@dondaum dondaum commented Mar 6, 2026

Implements executor callback support for the AWS BatchExecutor.

related: #62887


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@dondaum dondaum requested a review from o-nikolas as a code owner March 6, 2026 09:32
@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Mar 6, 2026
@ferruzzi ferruzzi mentioned this pull request Mar 10, 2026
2 tasks
Contributor

@o-nikolas o-nikolas left a comment


I still see a lot of other methods whose names, types, variables, and comments mention tasks where they should say workloads; those need cleaning up.

@ferruzzi should have a look too of course

@@ -127,26 +128,40 @@ def __init__(self, *args, **kwargs):
def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
Contributor


If you don't mind, can you narrow down the type for workload here? I think there is a new type that @ferruzzi made for basically anywhere you need task | callback as a type.

Contributor Author

@dondaum dondaum Mar 12, 2026


Thank you for the review.

Are you referring to types ExecuteTask and ExecuteCallback? https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/workloads/__init__.py

I think we can only use it if we pin the provider's Airflow core dependency to the version where this change was implemented, which happened in #61153.

If we want to maintain backwards compatibility, I can't use the new types. Currently the Amazon provider requires apache-airflow>=2.11.0
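The backwards-compatible pattern being discussed can be sketched as an import-time fallback; a minimal sketch, assuming the module path and type name mentioned in this thread exist on newer Airflow cores (they are not verified against a released version here):

```python
import importlib


def resolve_workload_type(module: str, name: str, fallback):
    """Return the narrower type when the installed Airflow core provides it,
    otherwise fall back to the broad union accepted by older cores."""
    try:
        return getattr(importlib.import_module(module), name)
    except (ImportError, AttributeError):
        return fallback


# In the provider this might look like (names taken from the discussion above):
# WorkloadType = resolve_workload_type(
#     "airflow.executors.workloads.types", "SchedulerWorkload", workloads.All
# )
```

Because the fallback is resolved at import time, a provider running on apache-airflow>=2.11.0 keeps working, while installs on newer cores get the narrower hint automatically.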

Contributor


Even for just typing? @ferruzzi thoughts?

Contributor


@o-nikolas is likely thinking of SchedulerWorkload from workloads/types.py, but @dondaum may be right: that isn't going to be in 2.11, so we can't use it until we bump the min_ver.

I think we could do a conditional import though:

workload_type_hint = workloads.All

if airflow_version > (3, 2):  # pseudo-check for Airflow core >= 3.2
    from airflow.executors.workloads.types import SchedulerWorkload

    workload_type_hint = SchedulerWorkload

And that will force it to get cleaned up later when we pin the versions up?

Contributor Author

@dondaum dondaum Mar 13, 2026


Let's say we make this conditional import work here. We are still limited by the BaseExecutor type having this signature:

def queue_workload(self, workload: workloads.All, session: Session) -> None:
    ...
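The constraint is structural: narrowing a parameter type in an override breaks substitutability, so a type checker flags it regardless of how the narrower type is imported. A toy illustration with stand-in classes (these are simplified placeholders, not the real Airflow types):

```python
class ExecuteTask:
    """Stand-in for a task workload."""


class ExecuteCallback:
    """Stand-in for a callback workload."""


class BaseExecutor:
    # Broad union, mirroring workloads.All in the discussion above.
    def queue_workload(self, workload: "ExecuteTask | ExecuteCallback") -> None:
        pass


class BatchExecutor(BaseExecutor):
    # mypy would report a Liskov violation here: the override accepts
    # fewer argument types than the base method it replaces.
    def queue_workload(self, workload: "ExecuteTask") -> None:  # type: ignore[override]
        pass
```

So even with a conditional import in the provider, the narrowed hint only becomes clean once the base class itself exposes the narrower type.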

@potiuk
Member

potiuk commented Mar 12, 2026

@dondaum This PR has a few issues that need to be addressed before it can be reviewed — please see our Pull Request quality criteria.

Issues found:

  • ⚠️ Unresolved review comments: This PR has 3 unresolved review threads from maintainers. Please review and resolve all inline review comments before requesting another review. You can resolve a conversation by clicking 'Resolve conversation' on each thread after addressing the feedback. See pull request guidelines.

Note: Your branch is 278 commits behind main. Some check failures may be caused by changes in the base branch rather than by your PR. Please rebase your branch and push again to get up-to-date CI results.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • Maintainers will then proceed with a normal review.

Please address the issues above and push again. If you have questions, feel free to ask on the Airflow Slack.

@dondaum dondaum force-pushed the feat/batch-executor-callback-support branch from 571de3f to c591fbe Compare March 12, 2026 10:08
Contributor Author

dondaum commented Mar 12, 2026

I still see a lot of other methods whose names, types, variables, and comments mention tasks where they should say workloads; those need cleaning up.

@ferruzzi should have a look too of course

I changed the reference from task to workload where I could make a change. There is a method try_adopt_task_instances() which I have not changed since it is part of the base class. But technically it should work on both tasks and callbacks.

@dondaum dondaum force-pushed the feat/batch-executor-callback-support branch from c591fbe to 67332d4 Compare March 12, 2026 10:46
@dondaum dondaum force-pushed the feat/batch-executor-callback-support branch from 67332d4 to 00c71ae Compare March 12, 2026 11:11
@eladkal eladkal requested review from o-nikolas and vincbeck March 12, 2026 18:04
Comment on lines 365 to +371

      self.log_task_event(
          event="batch job submit failure",
          extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
-         f"Dropping the task. Reason: {failure_reason}",
-         ti_key=key,
+         f"Dropping the workload. Reason: {failure_reason}",
+         ti_key=workload_key,
      )
-     self.fail(key=key)
+     self.fail(key=workload_key)
Contributor


When a callback workload exceeds max submit attempts, log_task_event is called with ti_key=workload_key. For callbacks, this key is a string UUID, not a TaskInstanceKey named tuple, which will cause errors since Log(task_instance=...) expects a TaskInstanceKey.

Contributor Author


Great find. I followed the relevant code. The scheduler uses this log queue to write a Log() entry. The log itself can be initialized without a task instance, but the Executor method expects a task instance key: def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey).

So I'm wondering whether we should adjust log_task_event() to accept both kinds of key, which would perhaps also require changes to other executors, or whether we should keep callbacks out of this log queue entirely.

@ferruzzi any thoughts on it?

Contributor


Same point he made in another PR. #63035 (comment)

Contributor Author


I don't think we need to change Log() since it can already be instantiated without a task instance.

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/log.py#L78C9-L78C22

What we could do is:

# BaseExecutor
def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey | None = None):
    ...

# BatchExecutor
self.log_task_event(
    event="batch job submit failure",
    extra=f"This job has been unsuccessfully attempted too many times ({attempt_number}). "
    f"Dropping the workload. Reason: {failure_reason}",
)
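Alternatively, if we want to keep ti_key for task workloads, a type guard at the call site would keep callback string UUIDs away from Log(task_instance=...); a sketch, using a simplified stand-in for the real TaskInstanceKey:

```python
from typing import NamedTuple


class TaskInstanceKey(NamedTuple):
    """Simplified stand-in for Airflow's TaskInstanceKey named tuple."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int


def maybe_log_task_event(key, log_fn) -> bool:
    """Call log_fn only for real task keys; skip callback string UUIDs."""
    if isinstance(key, TaskInstanceKey):
        log_fn(ti_key=key)
        return True
    return False
```

With this guard, task workloads keep their Log() entry unchanged, and callback workloads simply drop out of the task-event log queue.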

Contributor


What is this Log() entry for? I am wondering whether callbacks need the entry or whether it is just for tasks.

Contributor

@shivaam shivaam left a comment


Thanks. Left a comment.


Labels

area:providers provider:amazon AWS/Amazon - related issues
