Skip to content

Commit dbf3a52

Browse files
committed
user-facing phrasing for Niko
1 parent 9f81809 commit dbf3a52

3 files changed

Lines changed: 12 additions & 6 deletions

File tree

airflow-core/src/airflow/executors/base_executor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,10 @@ def queue_workload(self, workload: workloads.All, session: Session) -> None:
241241
)
242242
self.queued_callbacks[workload.callback.id] = workload
243243
else:
244-
raise ValueError(f"Un-handled workload kind {type(workload).__name__!r} in {type(self).__name__}")
244+
raise ValueError(
245+
f"Un-handled workload type {type(workload).__name__!r} in {type(self).__name__}. "
246+
f"Workload must be one of: ExecuteTask, ExecuteCallback."
247+
)
245248

246249
def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, workloads.All]]:
247250
"""

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,14 +379,14 @@ def _get_workload_team_name(self, workload: SchedulerWorkload, session: Session)
379379

380380
if team_name:
381381
self.log.debug(
382-
"Resolved team name '%s' for workload %s (dag_id=%s)",
382+
"Resolved team name '%s' for task or callback %s (dag_id=%s)",
383383
team_name,
384384
workload,
385385
dag_id,
386386
)
387387
else:
388388
self.log.debug(
389-
"No team found for workload %s (dag_id=%s) - DAG may not have bundle or team association",
389+
"No team found for task or callback %s (dag_id=%s) - DAG may not have bundle or team association",
390390
workload,
391391
dag_id,
392392
)
@@ -3268,15 +3268,17 @@ def _try_to_load_executor(
32683268
executor = _executor
32693269

32703270
if executor is not None:
3271-
self.log.debug("Found executor %s for workload %s (team: %s)", executor.name, workload, team_name)
3271+
self.log.debug(
3272+
"Found executor %s for task or callback %s (team: %s)", executor.name, workload, team_name
3273+
)
32723274
else:
32733275
# This case should not happen unless some (as of now unknown) edge case occurs or direct DB
32743276
# modification, since the DAG parser will validate the tasks in the DAG and ensure the executor
32753277
# they request is available and if not, disallow the DAG to be scheduled.
32763278
# Keeping this exception handling because this is a critical issue if we do somehow find
32773279
# ourselves here and the user should get some feedback about that.
32783280
self.log.warning(
3279-
"Executor, %s, was not found but a workload was configured to use it",
3281+
"Executor, %s, was not found but a Task or Callback was configured to use it",
32803282
workload.get_executor_name(),
32813283
)
32823284

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8220,7 +8220,8 @@ def test_multi_team_try_to_load_executor_explicit_executor_team_mismatch(
82208220

82218221
# Should log a warning when no executor is found
82228222
mock_log.warning.assert_called_once_with(
8223-
"Executor, %s, was not found but a workload was configured to use it", "secondary_exec"
8223+
"Executor, %s, was not found but a Task or Callback was configured to use it",
8224+
"secondary_exec"
82248225
)
82258226

82268227
# Should return None since we failed to resolve an executor due to the mismatch. In practice, this

0 commit comments

Comments
 (0)