Skip to content

Commit 4210e98

Browse files
committed
feat(fluentbit): validate next_token format and raise ServerClientError for malformed tokens
1 parent 1fb6375 commit 4210e98

2 files changed

Lines changed: 39 additions & 0 deletions

File tree

src/dstack/_internal/server/services/logs/fluentbit.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import List, Optional, Protocol
22
from uuid import UUID
33

4+
from dstack._internal.core.errors import ServerClientError
45
from dstack._internal.core.models.logs import (
56
JobSubmissionLogs,
67
LogEvent,
@@ -86,6 +87,11 @@ def read(
8687

8788
if request.next_token:
8889
parts = request.next_token.split(":", 1)
90+
if len(parts) != 2 or not parts[0] or not parts[1]:
91+
raise ServerClientError(
92+
f"Invalid next_token: {request.next_token}. "
93+
"Must be in format 'timestamp:document_id'."
94+
)
8995
search_params["search_after"] = [parts[0], parts[1]]
9096

9197
try:

src/tests/_internal/server/services/test_fluentbit_logs.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest_asyncio
77
from sqlalchemy.ext.asyncio import AsyncSession
88

9+
from dstack._internal.core.errors import ServerClientError
910
from dstack._internal.server.models import ProjectModel
1011
from dstack._internal.server.schemas.logs import PollLogsRequest
1112
from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent
@@ -590,6 +591,38 @@ def test_read_with_next_token(self, mock_es_client):
590591
call_args = mock_es_client.search.call_args
591592
assert call_args.kwargs["search_after"] == ["1696586513234", "doc1"]
592593

594+
def test_read_with_malformed_next_token_raises_client_error(self, mock_es_client):
595+
"""Test that malformed next_token raises ServerClientError (400) instead of IndexError (500)."""
596+
with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
597+
mock.return_value = mock_es_client
598+
reader = ElasticsearchReader(
599+
host="http://localhost:9200",
600+
index="dstack-logs",
601+
)
602+
603+
request = PollLogsRequest(
604+
run_name="test-run",
605+
job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
606+
next_token="invalid_token_no_colon",
607+
limit=100,
608+
)
609+
with pytest.raises(ServerClientError, match="Invalid next_token"):
610+
reader.read("test-stream", request)
611+
612+
request.next_token = ":"
613+
with pytest.raises(ServerClientError, match="Invalid next_token"):
614+
reader.read("test-stream", request)
615+
616+
request.next_token = ":doc1"
617+
with pytest.raises(ServerClientError, match="Invalid next_token"):
618+
reader.read("test-stream", request)
619+
620+
request.next_token = "1696586513234:"
621+
with pytest.raises(ServerClientError, match="Invalid next_token"):
622+
reader.read("test-stream", request)
623+
624+
mock_es_client.search.assert_not_called()
625+
593626
def test_close_closes_client(self, mock_es_client):
594627
with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
595628
mock.return_value = mock_es_client

0 commit comments

Comments
 (0)