Skip to content

Commit 652c8d8

Browse files
authored
[CLI]: dstack event --watch (#3533)
This adds the new `--watch` option to the `dstack event` command, which allows tracking events in realtime.
1 parent 2410c47 commit 652c8d8

4 files changed

Lines changed: 449 additions & 4 deletions

File tree

src/dstack/_internal/cli/commands/event.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
from dataclasses import asdict
33

44
from dstack._internal.cli.commands import APIBaseCommand
5-
from dstack._internal.cli.services.events import EventListFilters, EventPaginator, print_event
5+
from dstack._internal.cli.services.events import (
6+
EventListFilters,
7+
EventPaginator,
8+
EventTracker,
9+
print_event,
10+
)
611
from dstack._internal.cli.utils.common import (
712
get_start_time,
813
)
@@ -29,6 +34,12 @@ def _register(self):
2934
list_parser.set_defaults(subfunc=self._list)
3035

3136
for parser in [self._parser, list_parser]:
37+
parser.add_argument(
38+
"-w",
39+
"--watch",
40+
help="Watch events in realtime",
41+
action="store_true",
42+
)
3243
parser.add_argument(
3344
"--since",
3445
help=(
@@ -106,7 +117,11 @@ def _list(self, args: argparse.Namespace):
106117
since = get_start_time(args.since)
107118
filters = _build_filters(args, self.api)
108119

109-
if since is not None:
120+
if args.watch:
121+
events = EventTracker(
122+
client=self.api.client.events, filters=filters, since=since
123+
).stream_forever()
124+
elif since is not None:
110125
events = EventPaginator(self.api.client.events).list(
111126
filters=filters, since=since, ascending=True
112127
)

src/dstack/_internal/cli/services/events.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
import time
12
import uuid
23
from collections.abc import Iterator
34
from dataclasses import asdict, dataclass
4-
from datetime import datetime
5+
from datetime import datetime, timedelta
56
from typing import Optional
67

78
from rich.text import Text
89

9-
from dstack._internal.cli.utils.common import console
10+
from dstack._internal.cli.utils.common import LIVE_TABLE_PROVISION_INTERVAL_SECS, console
1011
from dstack._internal.core.models.events import Event, EventTargetType
1112
from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT
1213
from dstack.api.server._events import EventsAPIClient
@@ -50,6 +51,82 @@ def list(
5051
prev_recorded_at = events[-1].recorded_at
5152

5253

54+
class EventTracker:
55+
"""
56+
Tracks new events from the server. Implements a sliding window mechanism to avoid
57+
missing events that are commited with a delay.
58+
"""
59+
60+
def __init__(
61+
self,
62+
client: EventsAPIClient,
63+
filters: EventListFilters,
64+
since: Optional[datetime],
65+
event_delay_tolerance: timedelta = timedelta(seconds=20),
66+
) -> None:
67+
self._client = client
68+
self._filters = filters
69+
self._since = since
70+
self._event_delay_tolerance = event_delay_tolerance
71+
self._seen_events: dict[uuid.UUID, _SeenEvent] = {}
72+
self._latest_event: Optional[Event] = None
73+
74+
def poll(self) -> Iterator[Event]:
75+
"""
76+
Fetches the next batch of events from the server.
77+
"""
78+
79+
if self._since is None and self._latest_event is None:
80+
# First batch without `since` - fetch some recent events
81+
event_stream = reversed(self._client.list(ascending=False, **asdict(self._filters)))
82+
else:
83+
configured_since = self._since or datetime.fromtimestamp(0)
84+
latest_event_recorded_at = (
85+
self._latest_event.recorded_at
86+
if self._latest_event is not None
87+
else datetime.fromtimestamp(0)
88+
)
89+
since = max(
90+
configured_since.astimezone(),
91+
latest_event_recorded_at.astimezone() - self._event_delay_tolerance,
92+
)
93+
self._cleanup_seen_events(before=since)
94+
event_stream = EventPaginator(self._client).list(self._filters, since, ascending=True)
95+
96+
for event in event_stream:
97+
if event.id not in self._seen_events:
98+
self._seen_events[event.id] = _SeenEvent(recorded_at=event.recorded_at)
99+
yield event
100+
self._latest_event = event
101+
102+
def stream_forever(
103+
self,
104+
update_interval: timedelta = timedelta(seconds=LIVE_TABLE_PROVISION_INTERVAL_SECS),
105+
) -> Iterator[Event]:
106+
"""
107+
Yields events as they are received from the server.
108+
"""
109+
110+
while True:
111+
for event in self.poll():
112+
yield event
113+
time.sleep(update_interval.total_seconds())
114+
115+
def _cleanup_seen_events(self, before: datetime) -> None:
116+
ids_to_delete = {
117+
event_id
118+
for event_id, seen_event in self._seen_events.items()
119+
if seen_event.recorded_at.astimezone() < before.astimezone()
120+
}
121+
for event_id in ids_to_delete:
122+
del self._seen_events[event_id]
123+
124+
125+
@dataclass
126+
class _SeenEvent:
127+
recorded_at: datetime
128+
129+
53130
def print_event(event: Event) -> None:
54131
recorded_at = event.recorded_at.astimezone().strftime("%Y-%m-%d %H:%M:%S")
55132
targets = ", ".join(f"{target.type} {target.name}" for target in event.targets)

src/dstack/_internal/server/routers/events.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ async def list_events(
3333
3434
The results are paginated. To get the next page, pass `recorded_at` and `id` of
3535
the last event from the previous page as `prev_recorded_at` and `prev_id`.
36+
37+
NOTE: Some events may become available in the API with a delay after their `recorded_at`.
38+
This should be taken into account when using the API to monitor recent events,
39+
so that delayed events are not missed during pagination.
3640
"""
3741
return CustomORJSONResponse(
3842
await events_services.list_events(

0 commit comments

Comments
 (0)