Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,54 @@
# Environment variables for multi-node execution.
_MULTI_NODE_NODE_INDEX_ENV_VAR_NAME = "_TANGLE_MULTI_NODE_NODE_INDEX"

# Optional URL template for linking to pod logs in an external observability
# platform when log acquisition fails. Set via TANGLE_LOG_SEARCH_URL_TEMPLATE.
# The environment variable is read once, when this module is imported; when it
# is unset or empty, no search link is added to the placeholder log message.
# Three placeholders are substituted at runtime (plain text replacement):
#   {pod_name}          — the Kubernetes pod name
#   {start_iso8601_ms}  — ISO 8601 UTC ms timestamp, started_at minus 5 min padding
#                         (e.g. "2026-06-17T20:24:11.000Z"), or 24 h before now as fallback
#   {end_iso8601_ms}    — ISO 8601 UTC ms timestamp, ended_at plus 5 min padding,
#                         or now when the pod is still running / end time is unavailable
# Example for Observe: see oasis-backend deployment config.
_LOG_SEARCH_URL_TEMPLATE: str | None = os.environ.get("TANGLE_LOG_SEARCH_URL_TEMPLATE")

# Padding applied around the pod's lifetime when building the search window:
# subtracted from started_at and added to ended_at by
# _format_log_unavailable_message.
_LOG_SEARCH_URL_TS_PADDING = datetime.timedelta(minutes=5)


def _to_iso8601_ms(dt: datetime.datetime) -> str:
    """Render ``dt`` in UTC as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

    The value is first converted to UTC with ``astimezone``; the fractional
    part is truncated (not rounded) to whole milliseconds.
    """
    as_utc = dt.astimezone(datetime.timezone.utc)
    # Integer division drops the sub-millisecond remainder.
    millis = as_utc.microsecond // 1000
    seconds_part = as_utc.strftime("%Y-%m-%dT%H:%M:%S")
    return f"{seconds_part}.{millis:03d}Z"


def _format_log_unavailable_message(
    pod_name: str,
    namespace: str,
    started_at: "datetime.datetime | None",
    ended_at: "datetime.datetime | None",
) -> str:
    """Build the single-line placeholder text used in place of a pod's log.

    The message always names the pod and namespace. When
    ``_LOG_SEARCH_URL_TEMPLATE`` is set, a " Search: <url>" suffix is added
    whose time window is derived from ``started_at`` / ``ended_at``.

    Args:
        pod_name: Pod name; also substituted for ``{pod_name}`` in the URL.
        namespace: Namespace shown in the message.
        started_at: Start time; padded backwards by
            ``_LOG_SEARCH_URL_TS_PADDING``. Falls back to 24 hours before
            the current time when missing.
        ended_at: End time; padded forwards by
            ``_LOG_SEARCH_URL_TS_PADDING``. Falls back to the current time
            when missing.

    Returns:
        The bracketed message terminated by a newline.
    """
    parts = [
        "[Log unavailable: Kubernetes API returned a malformed response. ",
        f"Pod: {pod_name}, Namespace: {namespace}.",
    ]
    if _LOG_SEARCH_URL_TEMPLATE:
        current_time = datetime.datetime.now(tz=datetime.timezone.utc)

        if started_at:
            window_start = started_at - _LOG_SEARCH_URL_TS_PADDING
        else:
            window_start = current_time - datetime.timedelta(hours=24)

        if ended_at:
            window_end = ended_at + _LOG_SEARCH_URL_TS_PADDING
        else:
            window_end = current_time

        # Plain text substitution, applied one placeholder at a time.
        search_url = _LOG_SEARCH_URL_TEMPLATE.replace("{pod_name}", pod_name)
        search_url = search_url.replace(
            "{start_iso8601_ms}", _to_iso8601_ms(window_start)
        )
        search_url = search_url.replace(
            "{end_iso8601_ms}", _to_iso8601_ms(window_end)
        )
        parts.append(f" Search: {search_url}")
    parts.append("]\n")
    return "".join(parts)


_T = typing.TypeVar("_T")

Expand Down Expand Up @@ -944,7 +992,9 @@ def get_log(self) -> str:
self._pod_name,
exc_info=True,
)
return ""
return _format_log_unavailable_message(
self._pod_name, self._namespace, self.started_at, self.ended_at
)

def upload_log(self):
launcher = self._get_launcher()
Expand Down Expand Up @@ -1528,7 +1578,9 @@ def _get_log_by_pod_key(self, pod_name: str) -> str | None:
pod_name,
exc_info=True,
)
return None
return _format_log_unavailable_message(
pod_name, self._namespace, self.started_at, self.ended_at
)

def _get_all_logs(self) -> dict[str, str]:
logs = {}
Expand Down