From e5a78f1586d9c2c1576953dcd609cc607f72d45f Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Wed, 17 Jun 2026 18:23:32 -0700 Subject: [PATCH] kubernetes: surface log-unavailable reason and observability link in placeholder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When log acquisition fails, replace the empty return value with a human-readable message that includes the pod name, namespace, and — when TANGLE_LOG_SEARCH_URL_TEMPLATE is set — a direct link to the pod's logs in the configured observability platform. The URL template supports three placeholders substituted at runtime: {pod_name} — Kubernetes pod name {start_iso8601_ms} — ISO 8601 UTC timestamp with millisecond precision (e.g. "2026-06-17T20:24:11.000Z"), computed as started_at minus 5 min of padding; falls back to 24 h before now if the start time is not available in memory. {end_iso8601_ms} — ISO 8601 UTC timestamp with millisecond precision, computed as ended_at plus 5 min of padding; falls back to now if the pod is still running or the end time is unavailable. Both started_at values (LaunchedKubernetesContainer from pod container state, LaunchedKubernetesJob from job status) are in-memory reads — no additional database queries are required to compute the time range. The placeholder is stored in GCS via upload_log and returned verbatim by the log-read API, so it surfaces wherever logs are displayed without any frontend or schema changes. --- .../launchers/kubernetes_launchers.py | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/cloud_pipelines_backend/launchers/kubernetes_launchers.py b/cloud_pipelines_backend/launchers/kubernetes_launchers.py index 21d1fa8..19498f8 100644 --- a/cloud_pipelines_backend/launchers/kubernetes_launchers.py +++ b/cloud_pipelines_backend/launchers/kubernetes_launchers.py @@ -69,6 +69,54 @@ # Environment variables for multi-node execution. _MULTI_NODE_NODE_INDEX_ENV_VAR_NAME = "_TANGLE_MULTI_NODE_NODE_INDEX" +# Optional URL template for linking to pod logs in an external observability +# platform when log acquisition fails. Set via TANGLE_LOG_SEARCH_URL_TEMPLATE. 
+# Three placeholders are substituted at runtime: +# {pod_name} — the Kubernetes pod name +# {start_iso8601_ms} — ISO 8601 UTC ms timestamp, started_at minus 5 min padding +# (e.g. "2026-06-17T20:24:11.000Z"), or 24 h before now as fallback +# {end_iso8601_ms} — ISO 8601 UTC ms timestamp, ended_at plus 5 min padding, +# or now when the pod is still running / end time is unavailable +# Example for Observe: see oasis-backend deployment config. +_LOG_SEARCH_URL_TEMPLATE: str | None = os.environ.get("TANGLE_LOG_SEARCH_URL_TEMPLATE") + +_LOG_SEARCH_URL_TS_PADDING = datetime.timedelta(minutes=5) + + +def _to_iso8601_ms(dt: datetime.datetime) -> str: + """Format a datetime as an ISO 8601 UTC string with millisecond precision.""" + utc = dt.astimezone(datetime.timezone.utc) + return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z" + + +def _format_log_unavailable_message( + pod_name: str, + namespace: str, + started_at: "datetime.datetime | None", + ended_at: "datetime.datetime | None", +) -> str: + """Return a placeholder log string when the Kubernetes API cannot be read.""" + msg = ( + f"[Log unavailable: Kubernetes API returned a malformed response. " + f"Pod: {pod_name}, Namespace: {namespace}." 
+ ) + if _LOG_SEARCH_URL_TEMPLATE: + now = datetime.datetime.now(tz=datetime.timezone.utc) + from_dt = ( + (started_at - _LOG_SEARCH_URL_TS_PADDING) + if started_at + else (now - datetime.timedelta(hours=24)) + ) + to_dt = (ended_at + _LOG_SEARCH_URL_TS_PADDING) if ended_at else now + url = ( + _LOG_SEARCH_URL_TEMPLATE.replace("{pod_name}", pod_name) + .replace("{start_iso8601_ms}", _to_iso8601_ms(from_dt)) + .replace("{end_iso8601_ms}", _to_iso8601_ms(to_dt)) + ) + msg += f" Search: {url}" + msg += "]\n" + return msg + _T = typing.TypeVar("_T") @@ -944,7 +992,9 @@ def get_log(self) -> str: self._pod_name, exc_info=True, ) - return "" + return _format_log_unavailable_message( + self._pod_name, self._namespace, self.started_at, self.ended_at + ) def upload_log(self): launcher = self._get_launcher() @@ -1528,7 +1578,9 @@ def _get_log_by_pod_key(self, pod_name: str) -> str | None: pod_name, exc_info=True, ) - return None + return _format_log_unavailable_message( + pod_name, self._namespace, self.started_at, self.ended_at + ) def _get_all_logs(self) -> dict[str, str]: logs = {}