Skip to content

Commit 1fb6375

Browse files
committed
refactor(fluentbit): cleanup protocol lambdas and address codex comments
1 parent: 517659b · commit: 1fb6375

2 files changed

Lines changed: 65 additions & 46 deletions

File tree

src/dstack/_internal/server/services/logs/fluentbit.py

Lines changed: 29 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Optional, Protocol, runtime_checkable
1+
from typing import List, Optional, Protocol
22
from uuid import UUID
33

44
from dstack._internal.core.models.logs import (
@@ -21,16 +21,13 @@
2121
logger = get_logger(__name__)
2222

2323

24-
# Check if elasticsearch is available (optional for ship-only mode)
2524
ELASTICSEARCH_AVAILABLE = True
2625
try:
2726
from elasticsearch import Elasticsearch
2827
from elasticsearch.exceptions import ApiError, TransportError
2928
except ImportError:
3029
ELASTICSEARCH_AVAILABLE = False
3130
else:
32-
33-
# Catch both API errors and transport/connection errors
3431
ElasticsearchError: tuple = (ApiError, TransportError) # type: ignore[misc]
3532

3633
class ElasticsearchReader:
@@ -51,9 +48,7 @@ def __init__(
5148
try:
5249
self._client.info()
5350
except ElasticsearchError as e:
54-
raise LogStorageError(
55-
f"Failed to connect to Elasticsearch/OpenSearch: {e}"
56-
) from e
51+
raise LogStorageError(f"Failed to connect to Elasticsearch/OpenSearch: {e}") from e
5752

5853
def read(
5954
self,
@@ -82,12 +77,16 @@ def read(
8277
search_params: dict = {
8378
"index": self._index,
8479
"query": query,
85-
"sort": [{"@timestamp": {"order": sort_order}}],
80+
"sort": [
81+
{"@timestamp": {"order": sort_order}},
82+
{"_id": {"order": sort_order}},
83+
],
8684
"size": request.limit,
8785
}
8886

8987
if request.next_token:
90-
search_params["search_after"] = [request.next_token]
88+
parts = request.next_token.split(":", 1)
89+
search_params["search_after"] = [parts[0], parts[1]]
9190

9291
try:
9392
response = self._client.search(**search_params)
@@ -97,7 +96,7 @@ def read(
9796

9897
hits = response.get("hits", {}).get("hits", [])
9998
logs = []
100-
last_sort_value = None
99+
last_sort_values = None
101100

102101
for hit in hits:
103102
source = hit.get("_source", {})
@@ -108,9 +107,7 @@ def read(
108107
from datetime import datetime
109108

110109
try:
111-
timestamp = datetime.fromisoformat(
112-
timestamp_str.replace("Z", "+00:00")
113-
)
110+
timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
114111
except ValueError:
115112
continue
116113
else:
@@ -125,12 +122,12 @@ def read(
125122
)
126123

127124
sort_values = hit.get("sort")
128-
if sort_values:
129-
last_sort_value = sort_values[0]
125+
if sort_values and len(sort_values) >= 2:
126+
last_sort_values = sort_values
130127

131128
next_token = None
132-
if len(logs) == request.limit and last_sort_value is not None:
133-
next_token = str(last_sort_value)
129+
if len(logs) == request.limit and last_sort_values is not None:
130+
next_token = f"{last_sort_values[0]}:{last_sort_values[1]}"
134131

135132
return JobSubmissionLogs(
136133
logs=logs,
@@ -149,29 +146,13 @@ def close(self) -> None:
149146
FLUENTBIT_AVAILABLE = False
150147
else:
151148

152-
@runtime_checkable
153149
class FluentBitWriter(Protocol):
154-
"""Protocol for Fluent-bit log writers."""
155-
156-
def write(self, tag: str, records: List[dict]) -> None:
157-
"""Write log records to Fluent-bit."""
158-
...
150+
def write(self, tag: str, records: List[dict]) -> None: ...
151+
def close(self) -> None: ...
159152

160-
def close(self) -> None:
161-
"""Close any resources."""
162-
...
163-
164-
@runtime_checkable
165153
class LogReader(Protocol):
166-
"""Protocol for log readers (Interface Segregation Principle)."""
167-
168-
def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs:
169-
"""Read logs from the storage backend."""
170-
...
171-
172-
def close(self) -> None:
173-
"""Close any resources."""
174-
...
154+
def read(self, stream_name: str, request: PollLogsRequest) -> JobSubmissionLogs: ...
155+
def close(self) -> None: ...
175156

176157
class HTTPFluentBitWriter:
177158
"""Writes logs to Fluent-bit via HTTP POST."""
@@ -183,11 +164,21 @@ def __init__(self, host: str, port: int) -> None:
183164
def write(self, tag: str, records: List[dict]) -> None:
184165
for record in records:
185166
try:
186-
self._client.post(
167+
response = self._client.post(
187168
f"{self._endpoint}/{tag}",
188169
json=record,
189170
headers={"Content-Type": "application/json"},
190171
)
172+
response.raise_for_status()
173+
except httpx.HTTPStatusError as e:
174+
logger.error(
175+
"Fluent-bit HTTP request failed with status %d: %s",
176+
e.response.status_code,
177+
e.response.text,
178+
)
179+
raise LogStorageError(
180+
f"Fluent-bit HTTP error: status {e.response.status_code}"
181+
) from e
191182
except httpx.HTTPError as e:
192183
logger.error("Failed to write log to Fluent-bit via HTTP: %s", e)
193184
raise LogStorageError(f"Fluent-bit HTTP error: {e}") from e
@@ -257,7 +248,6 @@ def __init__(
257248
else:
258249
raise LogStorageError(f"Unsupported Fluent-bit protocol: {protocol}")
259250

260-
# Initialize reader based on configuration (Dependency Inversion Principle)
261251
self._reader: LogReader
262252
if es_host:
263253
if not ELASTICSEARCH_AVAILABLE:

src/tests/_internal/server/services/test_fluentbit_logs.py

Lines changed: 36 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,33 @@ def test_write_posts_records(self, mock_httpx_client):
8383
headers={"Content-Type": "application/json"},
8484
)
8585

86-
def test_write_raises_on_http_error(self, mock_httpx_client):
86+
def test_write_calls_raise_for_status(self, mock_httpx_client):
87+
"""Test that response.raise_for_status() is called to detect non-2xx responses."""
88+
mock_response = Mock()
89+
mock_httpx_client.post.return_value = mock_response
90+
writer = HTTPFluentBitWriter(host="localhost", port=8080)
91+
92+
writer.write(tag="test-tag", records=[{"message": "test"}])
93+
94+
mock_response.raise_for_status.assert_called_once()
95+
96+
def test_write_raises_on_http_status_error(self, mock_httpx_client):
97+
"""Test that 4xx/5xx responses are properly detected and raise LogStorageError."""
98+
import httpx
99+
100+
mock_response = Mock()
101+
mock_response.status_code = 500
102+
mock_response.text = "Internal Server Error"
103+
mock_httpx_client.post.return_value = mock_response
104+
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
105+
"Server Error", request=Mock(), response=mock_response
106+
)
107+
writer = HTTPFluentBitWriter(host="localhost", port=8080)
108+
109+
with pytest.raises(LogStorageError, match="Fluent-bit HTTP error: status 500"):
110+
writer.write(tag="test-tag", records=[{"message": "test"}])
111+
112+
def test_write_raises_on_transport_error(self, mock_httpx_client):
87113
import httpx
88114

89115
mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed")
@@ -468,15 +494,15 @@ def test_read_returns_logs(self, mock_es_client):
468494
"message": "Hello",
469495
"stream": "test-stream",
470496
},
471-
"sort": [1696586513234],
497+
"sort": [1696586513234, "doc1"],
472498
},
473499
{
474500
"_source": {
475501
"@timestamp": "2023-10-06T10:01:53.235000+00:00",
476502
"message": "World",
477503
"stream": "test-stream",
478504
},
479-
"sort": [1696586513235],
505+
"sort": [1696586513235, "doc2"],
480506
},
481507
]
482508
}
@@ -499,7 +525,7 @@ def test_read_returns_logs(self, mock_es_client):
499525
assert len(result.logs) == 2
500526
assert result.logs[0].message == "Hello"
501527
assert result.logs[1].message == "World"
502-
assert result.next_token == "1696586513235"
528+
assert result.next_token == "1696586513235:doc2"
503529

504530
def test_read_with_time_filtering(self, mock_es_client):
505531
with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
@@ -540,7 +566,10 @@ def test_read_descending_order(self, mock_es_client):
540566
reader.read("test-stream", request)
541567

542568
call_args = mock_es_client.search.call_args
543-
assert call_args.kwargs["sort"] == [{"@timestamp": {"order": "desc"}}]
569+
assert call_args.kwargs["sort"] == [
570+
{"@timestamp": {"order": "desc"}},
571+
{"_id": {"order": "desc"}},
572+
]
544573

545574
def test_read_with_next_token(self, mock_es_client):
546575
with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:
@@ -553,13 +582,13 @@ def test_read_with_next_token(self, mock_es_client):
553582
request = PollLogsRequest(
554583
run_name="test-run",
555584
job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"),
556-
next_token="1696586513234",
585+
next_token="1696586513234:doc1",
557586
limit=100,
558587
)
559588
reader.read("test-stream", request)
560589

561590
call_args = mock_es_client.search.call_args
562-
assert call_args.kwargs["search_after"] == ["1696586513234"]
591+
assert call_args.kwargs["search_after"] == ["1696586513234", "doc1"]
563592

564593
def test_close_closes_client(self, mock_es_client):
565594
with patch("dstack._internal.server.services.logs.fluentbit.Elasticsearch") as mock:

0 commit comments

Comments
 (0)