diff --git a/cloud_pipelines_backend/launchers/kubernetes_launchers.py b/cloud_pipelines_backend/launchers/kubernetes_launchers.py index 6c481baa..25cd267b 100644 --- a/cloud_pipelines_backend/launchers/kubernetes_launchers.py +++ b/cloud_pipelines_backend/launchers/kubernetes_launchers.py @@ -922,7 +922,8 @@ def get_refreshed(self) -> "LaunchedKubernetesContainer": def get_log(self) -> str: launcher = self._get_launcher() core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client) - return core_api_client.read_namespaced_pod_log( + # _preload_content=False bypasses the kubernetes client's strict .decode('utf8'); see _get_log_by_pod_key. + response = core_api_client.read_namespaced_pod_log( name=self._pod_name, namespace=self._namespace, container=_MAIN_CONTAINER_NAME, @@ -931,7 +932,17 @@ def get_log(self) -> str: # HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"PodLogOptions \"task-pod-xxxxx\" is invalid: stream: Forbidden: may not be specified","reason":"Invalid","details":{"name":"task-pod-xxxxx","kind":"PodLogOptions","causes":[{"reason":"FieldValueForbidden","message":"Forbidden: may not be specified","field":"stream"}]},"code":422} # stream="All", _request_timeout=launcher._request_timeout, + _preload_content=False, ) + try: + log = response.data.decode("utf-8", errors="replace") + if "\N{REPLACEMENT CHARACTER}" in log: + _logger.warning( + f"Pod log for {self._pod_name} contained invalid UTF-8 bytes; substituted replacement characters." + ) + return log + finally: + response.release_conn() def upload_log(self): launcher = self._get_launcher() @@ -1490,14 +1501,26 @@ def _get_log_by_pod_key(self, pod_name: str) -> str | None: launcher = self._get_launcher() core_api_client = k8s_client_lib.CoreV1Api(api_client=launcher._api_client) try: - log = core_api_client.read_namespaced_pod_log( + # _preload_content=False bypasses the kubernetes client's strict .decode('utf8'), + # which would raise UnicodeDecodeError on torn multi-byte chars in pod logs (e.g. + # tqdm block glyphs split across concurrent-writer chunk boundaries). + response = core_api_client.read_namespaced_pod_log( name=pod_name, namespace=self._namespace, container=_MAIN_CONTAINER_NAME, timestamps=True, _request_timeout=launcher._request_timeout, + _preload_content=False, ) - return log + try: + log = response.data.decode("utf-8", errors="replace") + if "\N{REPLACEMENT CHARACTER}" in log: + _logger.warning( + f"Pod log for {pod_name} contained invalid UTF-8 bytes; substituted replacement characters." + ) + return log + finally: + response.release_conn() except kubernetes.client.exceptions.ApiException as ex: if ex.reason == "Bad Request": # Kubernetes client raises kubernetes.client.exceptions.ApiException when Pod is still in PodInitializing phase