Skip to content

Refactor _executable_task_instances_to_queued to make logic more readable#66878

Open
ashb wants to merge 1 commit into
apache:mainfrom
astronomer:scheduler-enqueue-readability
Open

Refactor _executable_task_instances_to_queued to make logic more readable#66878
ashb wants to merge 1 commit into
apache:mainfrom
astronomer:scheduler-enqueue-readability

Conversation

@ashb

@ashb ashb commented May 13, 2026

Copy link
Copy Markdown
Member

I was looking at the core scheduler logic, and the _executable_task_instances_to_queued was rather large, and hard for even me to understand, and past me wrote a good chunk of it!

This splits it into two (plus a helper) focused methods to aid readability:

  • _acquire_pool_capacity: takes the advisory lock (lifetime of the session, so longer than just this fn) and reads pool utilisation via SELECT FOR UPDATE. Returns (pools, max_tis, starved_pools) so caller can short-circuit when all pools are full before doing any TI selection work.

  • _select_task_instances_to_queue: given pre-computed pool capacity, selects eligible SCHEDULED TIs and moves them to QUEUED. Accepts the pools dict and starved_pools set as parameters, making it directly testable without needing a real lock or DB pool read. This uses the new _build_schedulable_tis_query helper fn to build the complex query.

_critical_section_enqueue_task_instances now calls these two methods in sequence, making the two-phase structure (acquire capacity, then select and queue) more explicit.

All test call sites updated to call _select_task_instances_to_queue directly with a make_pool_stats() helper, removing the dependency on pool row locking in unit tests.

No behaviour changes, just refactoring.

@ashb ashb requested a review from XD-DENG as a code owner May 13, 2026 16:06
@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label May 13, 2026
@ashb ashb changed the title Extract _acquire_pool_capacity from _critical_section_enqueue_task_instances Refactor _executable_task_instances_to_queued to make logic more readable May 13, 2026
@ashb ashb added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label May 13, 2026
@ashb ashb added this to the Airflow 3.3.0 milestone May 13, 2026

def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -> list[TI]:
def _acquire_pool_capacity(
self, max_tis: int, session: Session

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, max_tis: int, session: Session
self, max_tis: int, *, session: Session

Or, convert it all to be kwarg only. Same with the others.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we had the rule with session as kwarg mainly for the @provide_session decorator which is not used here. So a session needs to be provided by caller anyway. So it is rather a nit.

@ashb

This comment was marked as resolved.

…stances

Split the monolithic `_executable_task_instances_to_queued` into two
focused methods:

- `_acquire_pool_capacity`: takes the advisory lock and reads pool
  utilisation via SELECT FOR UPDATE. Returns (pools, max_tis,
  starved_pools) so callers can short-circuit when all pools are full
  before doing any TI selection work.

- `_select_task_instances_to_queue`: given pre-computed pool capacity,
  selects eligible SCHEDULED TIs and moves them to QUEUED. Accepts the
  pools dict and starved_pools set as parameters, making it directly
  testable without needing a real lock or DB pool read.

`_critical_section_enqueue_task_instances` now calls these two methods
in sequence, making the two-phase structure (acquire capacity, then
select and queue) visible at the orchestration level.

All test call sites updated to call `_select_task_instances_to_queue`
directly with a `make_pool_stats()` helper, removing the dependency on
pool row locking in unit tests.
@ashb ashb force-pushed the scheduler-enqueue-readability branch from 007fd8b to 0e16ac2 Compare May 14, 2026 11:21
clear_db_triggers()


def make_pool_stats(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nicer making this a real @fixture

@jscheffl jscheffl left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the break-up of the code and logic. That makes totally sense and due to the good pydoc makes it much more readable.

two nit from my side, then LGTM, hope we have this in 3.3.0? Would be cool as a clean start on a new minor.

max_tis: int,
pools: dict[str, PoolStats],
starved_pools: set[str],
session: Session,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
session: Session,
*,
session: Session,

.limit(max_tis)
)

def _mark_task_instances_queued(self, executable_tis: list[TI], session: Session) -> list[TI]:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _mark_task_instances_queued(self, executable_tis: list[TI], session: Session) -> list[TI]:
def _mark_task_instances_queued(self, executable_tis: list[TI], *, session: Session) -> list[TI]:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants