From 3674c0a8ac0062c3282be5fbf98b46c661142ea3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 5 Aug 2025 11:29:20 +0500 Subject: [PATCH] Return logs external_url for AWS and GCP --- src/dstack/_internal/core/models/logs.py | 3 ++- .../_internal/server/services/logs/aws.py | 14 +++++++++++++- .../_internal/server/services/logs/gcp.py | 17 ++++++++++++++++- src/tests/_internal/server/routers/test_logs.py | 2 ++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/core/models/logs.py b/src/dstack/_internal/core/models/logs.py index 7dd5cb87de..887176f247 100644 --- a/src/dstack/_internal/core/models/logs.py +++ b/src/dstack/_internal/core/models/logs.py @@ -23,4 +23,5 @@ class LogEvent(CoreModel): class JobSubmissionLogs(CoreModel): logs: List[LogEvent] - next_token: Optional[str] + external_url: Optional[str] = None + next_token: Optional[str] = None diff --git a/src/dstack/_internal/server/services/logs/aws.py b/src/dstack/_internal/server/services/logs/aws.py index dbd6a750a6..692ae1348e 100644 --- a/src/dstack/_internal/server/services/logs/aws.py +++ b/src/dstack/_internal/server/services/logs/aws.py @@ -1,5 +1,7 @@ import itertools import operator +import urllib +import urllib.parse from contextlib import contextmanager from datetime import datetime, timedelta, timezone from typing import Iterator, List, Optional, Set, Tuple, TypedDict @@ -64,6 +66,7 @@ def __init__(self, *, group: str, region: Optional[str] = None) -> None: self._client = session.client("logs") self._check_group_exists(group) self._group = group + self._region = self._client.meta.region_name # Stores names of already created streams. # XXX: This set acts as an unbound cache. 
If this becomes a problem (in case of _very_ long # running server and/or lots of jobs, consider replacing it with an LRU cache, e.g., @@ -103,7 +106,11 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi ) for cw_event in cw_events ] - return JobSubmissionLogs(logs=logs, next_token=next_token) + return JobSubmissionLogs( + logs=logs, + external_url=self._get_stream_external_url(stream), + next_token=next_token, + ) def _get_log_events_with_retry( self, stream: str, request: PollLogsRequest @@ -181,6 +188,11 @@ def _get_log_events( return events, next_token + def _get_stream_external_url(self, stream: str) -> str: + quoted_group = urllib.parse.quote(self._group, safe="") + quoted_stream = urllib.parse.quote(stream, safe="") + return f"https://console.aws.amazon.com/cloudwatch/home?region={self._region}#logsV2:log-groups/log-group/{quoted_group}/log-events/{quoted_stream}" + def write_logs( self, project: ProjectModel, diff --git a/src/dstack/_internal/server/services/logs/gcp.py b/src/dstack/_internal/server/services/logs/gcp.py index 6e9314df26..7faa727dc1 100644 --- a/src/dstack/_internal/server/services/logs/gcp.py +++ b/src/dstack/_internal/server/services/logs/gcp.py @@ -1,3 +1,4 @@ +import urllib.parse from typing import List from uuid import UUID @@ -48,6 +49,7 @@ class GCPLogStorage(LogStorage): # (https://cloud.google.com/logging/docs/analyze/custom-index). def __init__(self, project_id: str): + self.project_id = project_id try: self.client = logging_v2.Client(project=project_id) self.logger = self.client.logger(name=self.LOG_NAME) @@ -106,7 +108,11 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi "GCP Logging read request limit exceeded." " It's recommended to increase default entries.list request quota from 60 per minute." 
) - return JobSubmissionLogs(logs=logs, next_token=next_token if len(logs) > 0 else None) + return JobSubmissionLogs( + logs=logs, + external_url=self._get_stream_external_url(stream_name), + next_token=next_token if len(logs) > 0 else None, + ) def write_logs( self, @@ -162,3 +168,12 @@ def _get_stream_name( self, project_name: str, run_name: str, job_submission_id: UUID, producer: LogProducer ) -> str: return f"{project_name}-{run_name}-{job_submission_id}-{producer.value}" + + def _get_stream_external_url(self, stream_name: str) -> str: + log_name_resource_name = self._get_log_name_resource_name() + query = f'logName="{log_name_resource_name}" AND labels.stream="{stream_name}"' + quoted_query = urllib.parse.quote(query, safe="") + return f"https://console.cloud.google.com/logs/query;query={quoted_query}?project={self.project_id}" + + def _get_log_name_resource_name(self) -> str: + return f"projects/{self.project_id}/logs/{self.LOG_NAME}" diff --git a/src/tests/_internal/server/routers/test_logs.py b/src/tests/_internal/server/routers/test_logs.py index 0364edee46..33d904d56a 100644 --- a/src/tests/_internal/server/routers/test_logs.py +++ b/src/tests/_internal/server/routers/test_logs.py @@ -75,6 +75,7 @@ async def test_returns_logs( "message": "IQ==", }, ], + "external_url": None, "next_token": None, } response = await client.post( @@ -96,5 +97,6 @@ async def test_returns_logs( "message": "IQ==", }, ], + "external_url": None, "next_token": None, }