Skip to content

Commit edbf605

Browse files
committed
fix(k8s): Preserve task history during API rate limiting
- Handle 429 errors in KubernetesExecutor task publishing retry logic
- Detect orphaned tasks and record TaskInstanceHistory in failure handler
- Add detailed logging for rate limiting scenarios
1 parent 316ded5 commit edbf605

2 files changed

Lines changed: 35 additions & 1 deletion

File tree

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1633,6 +1633,28 @@ def fetch_handle_failure_context(
16331633
# about to retry so we record the task instance history. For other states, the task
16341634
# instance was cleared and already recorded in the task instance history.
16351635
ti.prepare_db_for_next_try(session)
1636+
elif ti.state is None and ti.start_date is not None and ti.end_date is None:
1637+
# If the task instance state is None but has a start_date without end_date,
1638+
# it likely means the task was running but became orphaned and its state was reset.
1639+
# This can happen during scheduler restarts when executors fail to adopt running tasks
1640+
# (e.g., due to Kubernetes API 429 errors). We should still record the task instance
1641+
# history to maintain complete log history for troubleshooting.
1642+
from airflow.models.taskinstancehistory import TaskInstanceHistory
1643+
1644+
log.info(
1645+
"Recording task instance history for orphaned task %s that was previously running "
1646+
"(start_date: %s, state reset to None)",
1647+
ti.key,
1648+
ti.start_date,
1649+
)
1650+
# Temporarily set state to RUNNING to trigger proper history recording
1651+
original_state = ti.state
1652+
ti.state = TaskInstanceState.RUNNING
1653+
try:
1654+
TaskInstanceHistory.record_ti(ti, session=session)
1655+
finally:
1656+
# Restore the original state
1657+
ti.state = original_state
16361658

16371659
ti.state = State.UP_FOR_RETRY
16381660
email_for_state = operator.attrgetter("email_on_retry")

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -380,11 +380,12 @@ def sync(self) -> None:
380380
body = {"message": e.body}
381381

382382
retries = self.task_publish_retries[key]
383-
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
383+
# In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries
384384
message = body.get("message", "")
385385
if (
386386
(str(e.status) == "403" and "exceeded quota" in message)
387387
or (str(e.status) == "409" and "object has been modified" in message)
388+
or str(e.status) == "429" # Add support for rate limiting errors
388389
) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
389390
self.log.warning(
390391
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
@@ -682,6 +683,17 @@ def adopt_launched_task(
682683
)
683684
except ApiException as e:
684685
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
686+
687+
# Log detailed information for rate limiting errors (429) which can cause task history loss
688+
if str(e.status) == "429":
689+
self.log.warning(
690+
"Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. "
691+
"This may cause task history loss if the task was previously running. "
692+
"Consider implementing rate limiting backoff or increasing API quota.",
693+
pod.metadata.name,
694+
ti_key,
695+
)
696+
685697
return
686698

687699
del tis_to_flush_by_key[ti_key]

0 commit comments

Comments (0)