Skip to content

Commit 12a13cd

Browse files
author
Dylan Huang
committed
fix ordering of logs
1 parent 5702b38 commit 12a13cd

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

eval_protocol/log_utils/elasticsearch_client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,22 @@ def search(
202202
except Exception:
203203
return None
204204

205-
def search_by_term(self, field: str, value: Any, size: int = 10) -> Optional[Dict[str, Any]]:
205+
def search_by_term(
206+
self, field: str, value: Any, size: int = 10, sort: Optional[List[Dict[str, Any]]] = None
207+
) -> Optional[Dict[str, Any]]:
206208
"""Search documents by exact term match.
207209
208210
Args:
209211
field: Field name to search
210212
value: Value to match
211213
size: Number of results to return
214+
sort: Sort specification
212215
213216
Returns:
214217
Dict containing search results, or None if failed
215218
"""
216219
query = {"term": {field: value}}
217-
return self.search(query, size=size)
220+
return self.search(query, size=size, sort=sort)
218221

219222
def search_by_match(
220223
self, field: str, value: str, size: int = 10, sort: Optional[List[Dict[str, Any]]] = None

eval_protocol/utils/logs_server.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import threading
66
import time
7+
from datetime import datetime
78
from contextlib import asynccontextmanager
89
from queue import Queue
910
from typing import TYPE_CHECKING, Any, Dict, List, Optional
@@ -17,6 +18,7 @@
1718
from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE
1819
from eval_protocol.event_bus import event_bus
1920
from eval_protocol.models import Status
21+
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
2022
from eval_protocol.utils.vite_server import ViteServer
2123
from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient
2224
from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig
@@ -351,11 +353,19 @@ async def get_logs(
351353
raise HTTPException(status_code=503, detail="Elasticsearch is not configured for this logs server")
352354

353355
try:
354-
# Search for logs by rollout_id, sorted by timestamp in descending order (newest first)
355-
sort_spec = [{"@timestamp": {"order": "desc"}}]
356-
search_results = self.elasticsearch_client.search_by_match(
357-
"rollout_id", rollout_id, size=limit, sort=sort_spec
358-
)
356+
# Search for logs by rollout_id using a term filter (exact match),
357+
# sorted by timestamp desc with a secondary deterministic tie-breaker on _id desc
358+
sort_spec = [
359+
{"@timestamp": {"order": "asc"}},
360+
]
361+
query = {
362+
"bool": {
363+
"must": [
364+
{"term": {"rollout_id": rollout_id}},
365+
]
366+
}
367+
}
368+
search_results = self.elasticsearch_client.search(query, size=limit, sort=sort_spec)
359369

360370
if not search_results or "hits" not in search_results:
361371
# Return empty response using Pydantic model
@@ -512,10 +522,14 @@ def serve_logs(port: Optional[int] = None, elasticsearch_config: Optional[Elasti
512522

513523
args = parser.parse_args()
514524

525+
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
526+
515527
# Create server with command line arguments
516528
if args.build_dir:
517-
server = LogsServer(host=args.host, port=args.port, build_dir=args.build_dir)
529+
server = LogsServer(
530+
host=args.host, port=args.port, build_dir=args.build_dir, elasticsearch_config=elasticsearch_config
531+
)
518532
else:
519-
server = LogsServer(host=args.host, port=args.port)
533+
server = LogsServer(host=args.host, port=args.port, elasticsearch_config=elasticsearch_config)
520534

521535
server.run()

0 commit comments

Comments
 (0)