From 27b57aa97654c48b17194177f7d83339f97ab27d Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Wed, 17 Jun 2026 17:54:19 -0700 Subject: [PATCH] kubernetes: swallow log-acquisition errors to prevent false SYSTEM_ERROR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the Kubernetes API returns a malformed response (truncated body, broken UTF-8, broken JSON), the kubernetes client throws inside read_namespaced_pod_log before returning any data. That exception propagates into the orchestrator's _retry wrapper (max 5 attempts), which — after exhausting retries — marks the execution SYSTEM_ERROR and skips all downstream tasks, even though the training job itself completed successfully. Catch log-retrieval errors in: - LaunchedKubernetesContainer.get_log (single-pod path; the new `except Exception` has no preceding handler, so it swallows every Exception, ApiException included) - LaunchedKubernetesJob._get_log_by_pod_key (multi-pod path; the new `except Exception` follows the existing ApiException handling, so it only catches errors that handling does not) On failure, log a warning with the full traceback (observable in Observe) and return an empty string (get_log) or None (_get_log_by_pod_key). The execution proceeds to its correct terminal state (SUCCEEDED or FAILED) with an empty log rather than being incorrectly promoted to SYSTEM_ERROR. The existing ApiException / "Bad Request" handling for PodInitializing in _get_log_by_pod_key is preserved and unaffected. 
--- .../launchers/kubernetes_launchers.py | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/cloud_pipelines_backend/launchers/kubernetes_launchers.py b/cloud_pipelines_backend/launchers/kubernetes_launchers.py index 6c481baa..21d1fa84 100644 --- a/cloud_pipelines_backend/launchers/kubernetes_launchers.py +++ b/cloud_pipelines_backend/launchers/kubernetes_launchers.py @@ -922,16 +922,29 @@ def get_refreshed(self) -> "LaunchedKubernetesContainer": def get_log(self) -> str: launcher = self._get_launcher() core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client) - return core_api_client.read_namespaced_pod_log( - name=self._pod_name, - namespace=self._namespace, - container=_MAIN_CONTAINER_NAME, - timestamps=True, - # Disabled stream="All" due to error in some new GKE clusters - # HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"PodLogOptions \"task-pod-xxxxx\" is invalid: stream: Forbidden: may not be specified","reason":"Invalid","details":{"name":"task-pod-xxxxx","kind":"PodLogOptions","causes":[{"reason":"FieldValueForbidden","message":"Forbidden: may not be specified","field":"stream"}]},"code":422} - # stream="All", - _request_timeout=launcher._request_timeout, - ) + try: + return core_api_client.read_namespaced_pod_log( + name=self._pod_name, + namespace=self._namespace, + container=_MAIN_CONTAINER_NAME, + timestamps=True, + # Disabled stream="All" due to error in some new GKE clusters + # HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"PodLogOptions \"task-pod-xxxxx\" is invalid: stream: Forbidden: may not be specified","reason":"Invalid","details":{"name":"task-pod-xxxxx","kind":"PodLogOptions","causes":[{"reason":"FieldValueForbidden","message":"Forbidden: may not be specified","field":"stream"}]},"code":422} + # stream="All", + _request_timeout=launcher._request_timeout, + ) + except Exception: + # The 
Kubernetes API can return truncated or otherwise malformed responses + # (broken UTF-8, broken JSON) that cause the client to throw before we can + # read the log. Surfacing that exception fails the entire execution via the + # orchestrator's retry wrapper. A missing log is far less harmful than a + # false SYSTEM_ERROR on an otherwise-healthy run. + _logger.warning( + "Failed to retrieve log for pod %s; log will be missing.", + self._pod_name, + exc_info=True, + ) + return "" def upload_log(self): launcher = self._get_launcher() @@ -1504,6 +1517,18 @@ def _get_log_by_pod_key(self, pod_name: str) -> str | None: # See https://github.com/TangleML/tangle/issues/139 return None raise + except Exception: + # The Kubernetes API can return truncated or otherwise malformed responses + # (broken UTF-8, broken JSON) that cause the client to throw before we can + # read the log. Surfacing that exception fails the entire execution via the + # orchestrator's retry wrapper. A missing log is far less harmful than a + # false SYSTEM_ERROR on an otherwise-healthy run. + _logger.warning( + "Failed to retrieve log for pod %s; log will be missing for this pod.", + pod_name, + exc_info=True, + ) + return None def _get_all_logs(self) -> dict[str, str]: logs = {}