diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 55242f45f0..4ea0e0ed72 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -16,8 +16,6 @@ "@cloudscape-design/global-styles": "^1.0.33", "@hookform/resolvers": "^2.9.10", "@reduxjs/toolkit": "^1.9.1", - "@xterm/addon-fit": "^0.10.0", - "@xterm/xterm": "^5.5.0", "ace-builds": "^1.36.3", "classnames": "^2.5.1", "css-minimizer-webpack-plugin": "^4.2.2", @@ -5665,19 +5663,6 @@ } } }, - "node_modules/@xterm/addon-fit": { - "version": "0.10.0", - "resolved": "https://registry.npmjs.org/@xterm/addon-fit/-/addon-fit-0.10.0.tgz", - "integrity": "sha512-UFYkDm4HUahf2lnEyHvio51TNGiLK66mqP2JoATy7hRZeXaGMRDr00JiSF7m63vR5WKATF605yEggJKsw0JpMQ==", - "peerDependencies": { - "@xterm/xterm": "^5.0.0" - } - }, - "node_modules/@xterm/xterm": { - "version": "5.5.0", - "resolved": "https://registry.npmjs.org/@xterm/xterm/-/xterm-5.5.0.tgz", - "integrity": "sha512-hqJHYaQb5OptNunnyAnkHyM8aCjZ1MEIDTQu1iIbbTD/xops91NB5yq1ZK/dC2JDbVWtF23zUtl9JE2NqwT87A==" - }, "node_modules/@xtuc/ieee754": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@xtuc/ieee754/-/ieee754-1.2.0.tgz", diff --git a/frontend/package.json b/frontend/package.json index f4de75c603..81aa79258f 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -105,8 +105,6 @@ "@cloudscape-design/global-styles": "^1.0.33", "@hookform/resolvers": "^2.9.10", "@reduxjs/toolkit": "^1.9.1", - "@xterm/addon-fit": "^0.10.0", - "@xterm/xterm": "^5.5.0", "ace-builds": "^1.36.3", "classnames": "^2.5.1", "css-minimizer-webpack-plugin": "^4.2.2", diff --git a/frontend/src/components/Code/index.tsx b/frontend/src/components/Code/index.tsx index c0c844b9a2..180d4586a7 100644 --- a/frontend/src/components/Code/index.tsx +++ b/frontend/src/components/Code/index.tsx @@ -1,4 +1,4 @@ -import React from 'react'; +import React, { forwardRef } from 'react'; import classNames from 'classnames'; import Box from '@cloudscape-design/components/box'; @@ -8,12 +8,12 @@ export interface Props extends React.PropsWithChildren { className?: string; } -export const Code: React.FC = ({ children, className }) => { +export const Code = forwardRef(({ children, className }, ref) => { return ( -
+
{children}
); -}; +}); diff --git a/frontend/src/components/index.ts b/frontend/src/components/index.ts index cd79f5add9..f68297f1d2 100644 --- a/frontend/src/components/index.ts +++ b/frontend/src/components/index.ts @@ -56,7 +56,6 @@ export { default as PropertyFilter } from '@cloudscape-design/components/propert export type { PropertyFilterProps } from '@cloudscape-design/components/property-filter'; export type { LineChartProps } from '@cloudscape-design/components/line-chart/interfaces'; export type { ModalProps } from '@cloudscape-design/components/modal'; -export type { TilesProps } from '@cloudscape-design/components/tiles'; // custom components export { NavigateLink } from './NavigateLink'; diff --git a/frontend/src/index.tsx b/frontend/src/index.tsx index 047d917335..1f92f308cb 100644 --- a/frontend/src/index.tsx +++ b/frontend/src/index.tsx @@ -12,7 +12,6 @@ import 'ace-builds/css/ace.css'; import 'ace-builds/css/theme/cloud_editor.css'; import 'ace-builds/css/theme/cloud_editor_dark.css'; import 'assets/css/index.css'; -import '@xterm/xterm/css/xterm.css'; import 'locale'; diff --git a/frontend/src/pages/Runs/Details/Logs/helpers.ts b/frontend/src/pages/Runs/Details/Logs/helpers.ts index 72b4d27ca4..94ee018676 100644 --- a/frontend/src/pages/Runs/Details/Logs/helpers.ts +++ b/frontend/src/pages/Runs/Details/Logs/helpers.ts @@ -7,3 +7,18 @@ export const getJobSubmissionId = (run?: IRun): string | undefined => { return lastJob.job_submissions[lastJob.job_submissions.length - 1]?.id; }; + +export const decodeLogs = (logs: ILogItem[]): ILogItem[] => { + return logs.map((log: ILogItem) => { + let { message } = log; + + try { + message = atob(message); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + } catch (e) { + return log; + } + + return { ...log, message }; + }); +}; diff --git a/frontend/src/pages/Runs/Details/Logs/index.tsx b/frontend/src/pages/Runs/Details/Logs/index.tsx index 6fc5501039..fdc0f1cfd2 100644 --- a/frontend/src/pages/Runs/Details/Logs/index.tsx +++ b/frontend/src/pages/Runs/Details/Logs/index.tsx @@ -1,43 +1,66 @@ -import React, { useEffect, useRef, useState } from 'react'; +import React, { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from 'react'; import { useTranslation } from 'react-i18next'; import classNames from 'classnames'; -import { Mode } from '@cloudscape-design/global-styles'; -import { FitAddon } from '@xterm/addon-fit'; -import { Terminal } from '@xterm/xterm'; -import { Container, Header, ListEmptyMessage, Loader, TextContent } from 'components'; +import { Box, Button, Code, Container, Header, ListEmptyMessage, Loader, TextContent } from 'components'; -import { useAppSelector } from 'hooks'; +import { useLocalStorageState } from 'hooks/useLocalStorageState'; import { useLazyGetProjectLogsQuery } from 'services/project'; -import { selectSystemMode } from 'App/slice'; +import { decodeLogs } from './helpers'; import { IProps } from './types'; import styles from './styles.module.scss'; -const LIMIT_LOG_ROWS = 1000; +const LIMIT_LOG_ROWS = 100; +const LOADING_SCROLL_GAP = 300; export const Logs: React.FC = ({ className, projectName, runName, jobSubmissionId }) => { const { t } = useTranslation(); - const appliedTheme = useAppSelector(selectSystemMode); + const codeRef = useRef(null); + const nextTokenRef = useRef(undefined); + const scrollPositionByBottom = useRef(0); - const terminalInstance = useRef(new Terminal({ scrollback: 10000000 })); - const fitAddonInstance = useRef(new FitAddon()); const [logsData, setLogsData] = useState([]); const [isLoading, setIsLoading] = useState(false); - const [getProjectLogs] = useLazyGetProjectLogsQuery(); + const [isEnabledDecoding, setIsEnabledDecoding] = useLocalStorageState('enable-encode-logs', false); + // const [isShowTimestamp, setIsShowTimestamp] = useLocalStorageState('enable-showing-timestamp-logs', false); + + const logsForView = useMemo(() => { + if (isEnabledDecoding) { + return decodeLogs(logsData); + } + + return logsData; + }, [logsData, isEnabledDecoding]); + + const saveScrollPositionByBottom = () => { + if (!codeRef.current) return; - const writeDataToTerminal = (logs: ILogItem[]) => { - logs.forEach((logItem) => { - terminalInstance.current.write(logItem.message.replace(/(? { + if (!codeRef.current) return; - fitAddonInstance.current.fit(); + const { clientHeight, scrollHeight } = codeRef.current; + codeRef.current.scrollTo(0, scrollHeight - clientHeight - scrollPositionByBottom.current); }; - const getNextLogItems = (nextToken?: string) => { + const checkNeedMoreLoadingData = () => { + if (!codeRef.current) return; + + const { clientHeight, scrollHeight } = codeRef.current; + + if (scrollHeight - clientHeight <= LOADING_SCROLL_GAP) { + getLogItems(); + } + }; + + const getLogItems = (nextToken?: string) => { setIsLoading(true); if (!jobSubmissionId) { @@ -47,86 +70,131 @@ export const Logs: React.FC = ({ className, projectName, runName, jobSub getProjectLogs({ project_name: projectName, run_name: runName, - descending: false, - job_submission_id: jobSubmissionId ?? '', + descending: true, + job_submission_id: jobSubmissionId, next_token: nextToken, limit: LIMIT_LOG_ROWS, }) .unwrap() .then((response) => { - setLogsData((old) => [...old, ...response.logs]); - - writeDataToTerminal(response.logs); - - if (response.next_token) { - getNextLogItems(response.next_token); - } else { - setIsLoading(false); - } + saveScrollPositionByBottom(); + const reversed = response.logs.toReversed(); + setLogsData((old) => [...reversed, ...old]); + nextTokenRef.current = response.next_token; + setIsLoading(false); }) .catch(() => setIsLoading(false)); }; + const getNextLogItems = () => { + if (nextTokenRef.current) { + getLogItems(nextTokenRef.current); + } + }; + + const toggleDecodeLogs = () => { + saveScrollPositionByBottom(); + setIsEnabledDecoding(!isEnabledDecoding); + }; + useEffect(() => { - if (appliedTheme === Mode.Light) { - terminalInstance.current.options.theme = { - foreground: '#000716', - background: '#ffffff', - selectionBackground: '#B4D5FE', - }; + getLogItems(); + }, []); + + useLayoutEffect(() => { + if (logsForView.length && logsForView.length <= LIMIT_LOG_ROWS) { + scrollToBottom(); } else { - terminalInstance.current.options.theme = { - foreground: '#b6bec9', - background: '#161d26', - }; + restoreScrollPositionByBottom(); } - }, [appliedTheme]); - useEffect(() => { - terminalInstance.current.loadAddon(fitAddonInstance.current); + if (logsForView.length) checkNeedMoreLoadingData(); + }, [logsForView]); - getNextLogItems(); + const onScroll = useCallback( + (event) => { + const element = event.target as HTMLDivElement; - const onResize = () => { - fitAddonInstance.current.fit(); - }; + if (element.scrollTop <= LOADING_SCROLL_GAP && !isLoading) { + getNextLogItems(); + } + }, + [isLoading, logsForView], + ); + + useEffect(() => { + if (!codeRef.current) return; - window.addEventListener('resize', onResize); + codeRef.current.addEventListener('scroll', onScroll); return () => { - window.removeEventListener('resize', onResize); + if (codeRef.current) codeRef.current.removeEventListener('scroll', onScroll); }; - }, []); + }, [codeRef.current, onScroll]); - useEffect(() => { - const element = document.getElementById('terminal'); + const scrollToBottom = () => { + if (!codeRef.current) return; - if (terminalInstance.current && element) { - terminalInstance.current.open(element); - } - }, []); + const { clientHeight, scrollHeight } = codeRef.current; + codeRef.current.scrollTo(0, scrollHeight - clientHeight); + }; return (
-
- {t('projects.run.log')} - +
+
+
{t('projects.run.log')}
- + + + +
+ +
+
} > - {!isLoading && !logsData.length && ( + {!isLoading && !logsForView.length && ( )} -
+ {!logsForView.length && } + + {Boolean(logsForView.length) && ( + + {logsForView.map((log, i) => ( +

+ {/*{isShowTimestamp && {log.timestamp}}*/} + {log.message} +

+ ))} +
+ )}
diff --git a/frontend/src/pages/Runs/Details/Logs/styles.module.scss b/frontend/src/pages/Runs/Details/Logs/styles.module.scss index ba87fd6d43..ee9dba742a 100644 --- a/frontend/src/pages/Runs/Details/Logs/styles.module.scss +++ b/frontend/src/pages/Runs/Details/Logs/styles.module.scss @@ -4,16 +4,33 @@ display: flex; gap: 10px; align-items: center; + padding-top: 4px; + + .headerTitle { + flex-shrink: 0; + margin-top: -4px; + } + + .switchers { + margin-left: auto; + display: flex; + gap: 24px; + } } .loader { position: relative; - top: -2px; height: 20px; background-color: rgba(awsui.$color-background-container-content, .8); color: #6e6e6e; } +.mainLoader { + margin-top: auto; + margin-bottom: auto; + transform: translateY(-24px); +} + .logs { display: flex; flex-direction: column; @@ -56,6 +73,23 @@ .terminal { flex-grow: 1; min-height: 0; + height: 0; + overflow-y: auto; + background-color: awsui.$color-background-layout-main; + + code { + color: awsui.$color-text-body-default !important; + } + + p { + padding: 0 !important; + font-size: awsui.$font-size-body-s !important; + line-height: awsui.$line-height-body-s !important; + + .timestamp { + padding-right: 8px; + } + } } .scroll { diff --git a/src/dstack/_internal/server/schemas/logs.py b/src/dstack/_internal/server/schemas/logs.py index f97d4fde37..fd84ba6ff7 100644 --- a/src/dstack/_internal/server/schemas/logs.py +++ b/src/dstack/_internal/server/schemas/logs.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Optional -from pydantic import UUID4, Field, validator +from pydantic import UUID4, Field from dstack._internal.core.models.common import CoreModel @@ -15,11 +15,3 @@ class PollLogsRequest(CoreModel): next_token: Optional[str] = None limit: int = Field(100, ge=0, le=1000) diagnose: bool = False - - @validator("descending") - @classmethod - def validate_descending(cls, v): - # Descending is not supported until we migrate from base64-encoded logs to plain text logs. - if v is True: - raise ValueError("descending: true is not supported") - return v diff --git a/src/dstack/_internal/server/services/logs/aws.py b/src/dstack/_internal/server/services/logs/aws.py index 616db94dbb..dbd6a750a6 100644 --- a/src/dstack/_internal/server/services/logs/aws.py +++ b/src/dstack/_internal/server/services/logs/aws.py @@ -55,6 +55,8 @@ class CloudWatchLogStorage(LogStorage): PAST_EVENT_MAX_DELTA = int((timedelta(days=14)).total_seconds()) * 1000 - CLOCK_DRIFT # "None of the log events in the batch can be more than 2 hours in the future." FUTURE_EVENT_MAX_DELTA = int((timedelta(hours=2)).total_seconds()) * 1000 - CLOCK_DRIFT + # Maximum number of retries when polling for log events to skip empty pages. + MAX_RETRIES = 10 def __init__(self, *, group: str, region: Optional[str] = None) -> None: with self._wrap_boto_errors(): @@ -80,7 +82,7 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi next_token: Optional[str] = None with self._wrap_boto_errors(): try: - cw_events, next_token = self._get_log_events(stream, request) + cw_events, next_token = self._get_log_events_with_retry(stream, request) except botocore.exceptions.ClientError as e: if not self._is_resource_not_found_exception(e): raise @@ -101,7 +103,47 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi ) for cw_event in cw_events ] - return JobSubmissionLogs(logs=logs, next_token=next_token if len(logs) > 0 else None) + return JobSubmissionLogs(logs=logs, next_token=next_token) + + def _get_log_events_with_retry( + self, stream: str, request: PollLogsRequest + ) -> Tuple[List[_CloudWatchLogEvent], Optional[str]]: + current_request = request + previous_next_token = request.next_token + + for attempt in range(self.MAX_RETRIES): + cw_events, next_token = self._get_log_events(stream, current_request) + + if cw_events: + return cw_events, next_token + + if not next_token or next_token == previous_next_token: + return [], None + + previous_next_token = next_token + current_request = PollLogsRequest( + run_name=request.run_name, + job_submission_id=request.job_submission_id, + start_time=request.start_time, + end_time=request.end_time, + descending=request.descending, + next_token=next_token, + limit=request.limit, + diagnose=request.diagnose, + ) + + if not request.descending: + logger.debug( + "Stream %s: exhausted %d retries without finding logs, returning empty response", + stream, + self.MAX_RETRIES, + ) + # Only return the next token after exhausting retries if going descending— + # AWS CloudWatch guarantees more logs in that case. In ascending mode, + # next token is always returned, even if no logs remain. + # So descending works reliably; ascending has limits if gaps are too large. + # In the future, UI/CLI should handle retries, and we can return next token for ascending too. + return [], next_token if request.descending else None def _get_log_events( self, stream: str, request: PollLogsRequest @@ -115,7 +157,7 @@ def _get_log_events( } if request.start_time: - parameters["startTime"] = datetime_to_unix_time_ms(request.start_time) + 1 + parameters["startTime"] = datetime_to_unix_time_ms(request.start_time) if request.end_time: parameters["endTime"] = datetime_to_unix_time_ms(request.end_time) diff --git a/src/dstack/_internal/server/services/logs/filelog.py b/src/dstack/_internal/server/services/logs/filelog.py index 10cbe3d1a2..823222a409 100644 --- a/src/dstack/_internal/server/services/logs/filelog.py +++ b/src/dstack/_internal/server/services/logs/filelog.py @@ -1,5 +1,6 @@ +import os from pathlib import Path -from typing import List, Union +from typing import Generator, List, Optional, Tuple, Union from uuid import UUID from dstack._internal.core.errors import ServerClientError @@ -37,18 +38,17 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi producer=log_producer, ) + if request.descending: + return self._poll_logs_descending(log_file_path, request) + else: + return self._poll_logs_ascending(log_file_path, request) + + def _poll_logs_ascending( + self, log_file_path: Path, request: PollLogsRequest + ) -> JobSubmissionLogs: start_line = 0 if request.next_token: - try: - start_line = int(request.next_token) - if start_line < 0: - raise ServerClientError( - f"Invalid next_token: {request.next_token}. Must be a non-negative integer." - ) - except ValueError: - raise ServerClientError( - f"Invalid next_token: {request.next_token}. Must be a valid integer." - ) + start_line = self._next_token(request) logs = [] next_token = None @@ -94,6 +94,102 @@ def poll_logs(self, project: ProjectModel, request: PollLogsRequest) -> JobSubmi return JobSubmissionLogs(logs=logs, next_token=next_token) + def _poll_logs_descending( + self, log_file_path: Path, request: PollLogsRequest + ) -> JobSubmissionLogs: + start_offset = self._next_token(request) + + candidate_logs = [] + + try: + line_generator = self._read_lines_reversed(log_file_path, start_offset) + + for line_bytes, line_start_offset in line_generator: + try: + line_str = line_bytes.decode("utf-8") + log_event = LogEvent.__response__.parse_raw(line_str) + except Exception: + continue # Skip malformed lines + + if request.end_time is not None and log_event.timestamp > request.end_time: + continue + if request.start_time and log_event.timestamp <= request.start_time: + break + + candidate_logs.append((log_event, line_start_offset)) + + if len(candidate_logs) > request.limit: + break + except FileNotFoundError: + return JobSubmissionLogs(logs=[], next_token=None) + + logs = [log for log, offset in candidate_logs[: request.limit]] + next_token = None + if len(candidate_logs) > request.limit: + # We fetched one more than the limit, so there are more pages. + # The next token should point to the start of the last log we are returning. + _last_log_event, last_log_offset = candidate_logs[request.limit - 1] + next_token = str(last_log_offset) + + return JobSubmissionLogs(logs=logs, next_token=next_token) + + @staticmethod + def _read_lines_reversed( + filepath: Path, start_offset: Optional[int] = None, chunk_size: int = 8192 + ) -> Generator[Tuple[bytes, int], None, None]: + """ + A generator that yields lines from a file in reverse order, along with the byte + offset of the start of each line. This is memory-efficient for large files. + """ + with open(filepath, "rb") as f: + f.seek(0, os.SEEK_END) + file_size = f.tell() + cursor = file_size + + # If a start_offset is provided, optimize by starting the read + # from a more specific location instead of the end of the file. + if start_offset is not None and start_offset < file_size: + # To get the full content of the line that straddles the offset, + # we need to find its end (the next newline character). + f.seek(start_offset) + chunk = f.read(chunk_size) + newline_pos = chunk.find(b"\n") + if newline_pos != -1: + # Found the end of the line. The cursor for reverse reading + # should start from this point to include the full line. + cursor = start_offset + newline_pos + 1 + else: + # No newline found, which means the rest of the file is one line. + # The default cursor pointing to file_size is correct. + pass + + buffer = b"" + + while cursor > 0: + seek_pos = max(0, cursor - chunk_size) + amount_to_read = cursor - seek_pos + f.seek(seek_pos) + chunk = f.read(amount_to_read) + cursor = seek_pos + + buffer = chunk + buffer + + while b"\n" in buffer: + newline_pos = buffer.rfind(b"\n") + line = buffer[newline_pos + 1 :] + line_start_offset = cursor + newline_pos + 1 + + # Skip lines that start at or after the start_offset + if start_offset is None or line_start_offset < start_offset: + yield line, line_start_offset + + buffer = buffer[:newline_pos] + + # The remaining buffer is the first line of the file. + # Only yield it if we're not using start_offset or if it starts before start_offset + if buffer and (start_offset is None or 0 < start_offset): + yield buffer, 0 + def write_logs( self, project: ProjectModel, @@ -148,3 +244,17 @@ def _runner_log_event_to_log_event(self, runner_log_event: RunnerLogEvent) -> Lo log_source=LogEventSource.STDOUT, message=runner_log_event.message.decode(errors="replace"), ) + + def _next_token(self, request: PollLogsRequest) -> Optional[int]: + next_token = request.next_token + if next_token is None: + return None + try: + value = int(next_token) + if value < 0: + raise ValueError("Offset must be non-negative") + return value + except (ValueError, TypeError): + raise ServerClientError( + f"Invalid next_token: {next_token}. Must be a non-negative integer." + ) diff --git a/src/tests/_internal/server/services/test_logs.py b/src/tests/_internal/server/services/test_logs.py index 0b94209175..892ed2d77b 100644 --- a/src/tests/_internal/server/services/test_logs.py +++ b/src/tests/_internal/server/services/test_logs.py @@ -9,11 +9,10 @@ import pytest import pytest_asyncio from freezegun import freeze_time -from pydantic import ValidationError from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.core.errors import ServerClientError -from dstack._internal.core.models.logs import LogEvent, LogEventSource +from dstack._internal.core.models.logs import LogEvent, LogEventSource, LogProducer from dstack._internal.server.models import ProjectModel from dstack._internal.server.schemas.logs import PollLogsRequest from dstack._internal.server.schemas.runner import LogEvent as RunnerLogEvent @@ -496,31 +495,59 @@ async def test_next_token_beyond_file_end( class TestPollLogsRequestValidation: - def test_descending_true_not_supported(self): - """Test that descending: true raises a validation error.""" - with pytest.raises(ValidationError, match="descending: true is not supported"): - PollLogsRequest( - run_name="test-run", - job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), - descending=True, - ) + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_basic( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test basic descending log polling functionality.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) - def test_descending_false_is_supported(self): - """Test that descending: false works correctly.""" - request = PollLogsRequest( - run_name="test-run", + # Write test logs + log_storage.write_logs( + project=project, + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + runner_logs=[ + RunnerLogEvent(timestamp=1696586513234, message=b"Log1"), + RunnerLogEvent(timestamp=1696586513235, message=b"Log2"), + RunnerLogEvent(timestamp=1696586513236, message=b"Log3"), + RunnerLogEvent(timestamp=1696586513237, message=b"Log4"), + RunnerLogEvent(timestamp=1696586513238, message=b"Log5"), + ], + job_logs=[], + ) + + # Test descending polling + poll_request = PollLogsRequest( + run_name="test_run", job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), - descending=False, + limit=10, + diagnose=True, + descending=True, ) - assert request.descending is False + job_submission_logs = log_storage.poll_logs(project, poll_request) + + # Should return logs in descending order (newest first) + assert len(job_submission_logs.logs) == 5 + assert job_submission_logs.logs[0].message == "Log5" + assert job_submission_logs.logs[1].message == "Log4" + assert job_submission_logs.logs[2].message == "Log3" + assert job_submission_logs.logs[3].message == "Log2" + assert job_submission_logs.logs[4].message == "Log1" + assert job_submission_logs.next_token is None # All logs returned @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_poll_logs_with_limit(self, test_db, session: AsyncSession, tmp_path: Path): + async def test_poll_logs_descending_with_limit( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test descending log polling with limit smaller than total logs.""" project = await create_project(session=session) log_storage = FileLogStorage(tmp_path) - # Write more logs than the limit + # Write test logs log_storage.write_logs( project=project, run_name="test_run", @@ -534,18 +561,6 @@ async def test_poll_logs_with_limit(self, test_db, session: AsyncSession, tmp_pa ], job_logs=[], ) - logs = log_storage.poll_logs( - project, - PollLogsRequest( - run_name="test_run", - job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), - start_time=None, - end_time=None, - limit=1000, - diagnose=True, - ), - ).logs - assert len(logs) == 5 # Test with limit smaller than total logs poll_request = PollLogsRequest( @@ -553,26 +568,232 @@ async def test_poll_logs_with_limit(self, test_db, session: AsyncSession, tmp_pa job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), limit=3, diagnose=True, + descending=True, ) job_submission_logs = log_storage.poll_logs(project, poll_request) - # Should return only the first 3 logs and provide next_token + # Should return only the last 3 logs in descending order assert len(job_submission_logs.logs) == 3 - assert job_submission_logs.logs[0].message == "Log1" - assert job_submission_logs.logs[1].message == "Log2" + assert job_submission_logs.logs[0].message == "Log5" + assert job_submission_logs.logs[1].message == "Log4" assert job_submission_logs.logs[2].message == "Log3" - # Should have next_token pointing to line 3 (fourth log) - assert job_submission_logs.next_token == "3" + # Should have next_token for pagination + assert job_submission_logs.next_token is not None - # Test with limit of 1 and time filtering - poll_request.limit = 1 - poll_request.start_time = logs[3].timestamp + # Test next page + poll_request.next_token = job_submission_logs.next_token job_submission_logs = log_storage.poll_logs(project, poll_request) - assert len(job_submission_logs.logs) == 1 - assert job_submission_logs.logs[0].message == "Log5" - # Should not have next_token since we reached end of file + + # Should return remaining logs in descending order + assert len(job_submission_logs.logs) == 2 + assert job_submission_logs.logs[0].message == "Log2" + assert job_submission_logs.logs[1].message == "Log1" + assert job_submission_logs.next_token is None # No more logs + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_with_time_filtering( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test descending log polling with time filtering.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) + + # Write test logs with different timestamps + log_storage.write_logs( + project=project, + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + runner_logs=[ + RunnerLogEvent( + timestamp=1696586513234, message=b"Log1" + ), # 2023-10-06T10:01:53.234 + RunnerLogEvent( + timestamp=1696586513235, message=b"Log2" + ), # 2023-10-06T10:01:53.235 + RunnerLogEvent( + timestamp=1696586513236, message=b"Log3" + ), # 2023-10-06T10:01:53.236 + RunnerLogEvent( + timestamp=1696586513237, message=b"Log4" + ), # 2023-10-06T10:01:53.237 + RunnerLogEvent( + timestamp=1696586513238, message=b"Log5" + ), # 2023-10-06T10:01:53.238 + ], + job_logs=[], + ) + + # Filter logs between 2023-10-06T10:01:53.235 and 2023-10-06T10:01:53.237 + poll_request = PollLogsRequest( + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + start_time=datetime(2023, 10, 6, 10, 1, 53, 235000, timezone.utc), + end_time=datetime(2023, 10, 6, 10, 1, 53, 237000, timezone.utc), + limit=10, + diagnose=True, + descending=True, + ) + job_submission_logs = log_storage.poll_logs(project, poll_request) + + # Should return logs in descending order within the time range + assert len(job_submission_logs.logs) == 2 + assert job_submission_logs.logs[0].message == "Log4" # timestamp 237 + assert job_submission_logs.logs[1].message == "Log3" # timestamp 236 + assert job_submission_logs.next_token is None # No more logs in range + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_invalid_next_token( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test descending log polling with invalid next_token.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) + + # Test with non-integer next_token + poll_request = PollLogsRequest( + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + next_token="invalid", + limit=10, + diagnose=True, + descending=True, + ) + with pytest.raises(ServerClientError): + log_storage.poll_logs(project, poll_request) + + # Test with negative next_token + poll_request.next_token = "-1" + with pytest.raises(ServerClientError): + log_storage.poll_logs(project, poll_request) + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_empty_file( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test descending log polling with empty log file.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) + + # Test with non-existent log file + poll_request = PollLogsRequest( + run_name="nonexistent_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=10, + diagnose=True, + descending=True, + ) + job_submission_logs = log_storage.poll_logs(project, poll_request) + + # Should return empty logs without error + assert len(job_submission_logs.logs) == 0 assert job_submission_logs.next_token is None + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_pagination_workflow( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test complete descending pagination workflow.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) + + # Write test logs + log_storage.write_logs( + project=project, + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + runner_logs=[ + RunnerLogEvent(timestamp=1696586513234, message=b"Log1"), + RunnerLogEvent(timestamp=1696586513235, message=b"Log2"), + RunnerLogEvent(timestamp=1696586513236, message=b"Log3"), + RunnerLogEvent(timestamp=1696586513237, message=b"Log4"), + RunnerLogEvent(timestamp=1696586513238, message=b"Log5"), + ], + job_logs=[], + ) + + # First page: get last 2 logs + poll_request = PollLogsRequest( + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=2, + diagnose=True, + descending=True, + ) + job_submission_logs = log_storage.poll_logs(project, poll_request) + + assert len(job_submission_logs.logs) == 2 + assert job_submission_logs.logs[0].message == "Log5" + assert job_submission_logs.logs[1].message == "Log4" + assert job_submission_logs.next_token is not None + + # Second page: get next 2 logs + poll_request.next_token = job_submission_logs.next_token + job_submission_logs = log_storage.poll_logs(project, poll_request) + + assert len(job_submission_logs.logs) == 2 + assert job_submission_logs.logs[0].message == "Log3" + assert job_submission_logs.logs[1].message == "Log2" + assert job_submission_logs.next_token is not None + + # Third page: get remaining log + poll_request.next_token = job_submission_logs.next_token + job_submission_logs = log_storage.poll_logs(project, poll_request) + + assert len(job_submission_logs.logs) == 1 + assert job_submission_logs.logs[0].message == "Log1" + assert job_submission_logs.next_token is None # No more logs + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_poll_logs_descending_malformed_lines( + self, test_db, session: AsyncSession, tmp_path: Path + ): + """Test descending log polling with malformed log lines.""" + project = await create_project(session=session) + log_storage = FileLogStorage(tmp_path) + + # Create log file with malformed lines + log_file_path = log_storage._get_log_file_path( + project_name=project.name, + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + producer=LogProducer.RUNNER, + ) + log_file_path.parent.mkdir(exist_ok=True, parents=True) + + with open(log_file_path, "w") as f: + f.write( + '{"timestamp": "2023-10-06T10:01:53.234Z", "log_source": "stdout", "message": "Log1"}\n' + ) + f.write("invalid json line\n") + f.write( + '{"timestamp": "2023-10-06T10:01:53.235Z", "log_source": "stdout", "message": "Log2"}\n' + ) + f.write("another invalid line\n") + f.write( + '{"timestamp": "2023-10-06T10:01:53.236Z", "log_source": "stdout", "message": "Log3"}\n' + ) + + # Test descending polling + poll_request = PollLogsRequest( + run_name="test_run", + job_submission_id=UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e"), + limit=10, + diagnose=True, + descending=True, + ) + job_submission_logs = log_storage.poll_logs(project, poll_request) + + # Should return only valid logs in descending order, skipping malformed lines + assert len(job_submission_logs.logs) == 3 + assert job_submission_logs.logs[0].message == "Log3" + assert job_submission_logs.logs[1].message == "Log2" + assert job_submission_logs.logs[2].message == "Log1" + class TestCloudWatchLogStorage: FAKE_NOW = datetime(2023, 10, 6, 10, 1, 54, tzinfo=timezone.utc) @@ -699,87 +920,158 @@ async def test_poll_logs_empty_response( poll_logs_request: PollLogsRequest, descending: bool, ): - mock_client.get_log_events.return_value["events"] = [] + # Test with no next token - should not trigger retrying + mock_client.get_log_events.return_value = { + "events": [], + "nextBackwardToken": None, # No next token + "nextForwardToken": None, # No next token + } poll_logs_request.descending = descending job_submission_logs = log_storage.poll_logs(project, poll_logs_request) assert job_submission_logs.logs == [] + # When no next token is provided initially, retrying doesn't trigger assert mock_client.get_log_events.call_count == 1 @pytest.mark.asyncio - async def test_poll_logs_descending_some_responses_are_empty( + async def test_poll_logs_descending_empty_response_max_tries( self, project: ProjectModel, log_storage: CloudWatchLogStorage, mock_client: Mock, poll_logs_request: PollLogsRequest, ): - # Test that the current implementation returns the events from a single API call - mock_client.get_log_events.return_value = { - "events": [ - {"timestamp": 1696586513234, "message": "SGVsbG8="}, - {"timestamp": 1696586513235, "message": "V29ybGQ="}, - ], - "nextBackwardToken": "bwd3", - "nextForwardToken": "fwd", - } - poll_logs_request.descending = True - poll_logs_request.limit = 3 - job_submission_logs = log_storage.poll_logs(project, poll_logs_request) - - assert job_submission_logs.logs == [ - LogEvent( - timestamp=datetime(2023, 10, 6, 10, 1, 53, 235000, tzinfo=timezone.utc), - log_source=LogEventSource.STDOUT, - message="V29ybGQ=", - ), - LogEvent( - timestamp=datetime(2023, 10, 6, 10, 1, 53, 234000, tzinfo=timezone.utc), - log_source=LogEventSource.STDOUT, - message="SGVsbG8=", - ), + # Test that we retry up to MAX_RETRIES times when getting empty responses with changing tokens + # Need to provide exactly 10 responses for MAX_RETRIES + mock_client.get_log_events.side_effect = [ + { + "events": [], + "nextBackwardToken": "bwd1", + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd2", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd3", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd4", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd5", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd6", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd7", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd8", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd9", # Different token + "nextForwardToken": "fwd", + }, + { + "events": [], + "nextBackwardToken": "bwd10", # Different token + "nextForwardToken": "fwd", + }, ] - assert mock_client.get_log_events.call_count == 1 - - @pytest.mark.asyncio - async def test_poll_logs_descending_empty_response_with_same_token( - self, - project: ProjectModel, - log_storage: CloudWatchLogStorage, - mock_client: Mock, - poll_logs_request: PollLogsRequest, - ): - # Test empty response from a single API call - mock_client.get_log_events.return_value = { - "events": [], - "nextBackwardToken": "bwd", - "nextForwardToken": "fwd", - } poll_logs_request.descending = True job_submission_logs = log_storage.poll_logs(project, poll_logs_request) assert job_submission_logs.logs == [] - assert mock_client.get_log_events.call_count == 1 + # For descending requests, we return the next token even when no logs found + assert job_submission_logs.next_token == "bwd10" + assert mock_client.get_log_events.call_count == 10 # MAX_RETRIES @pytest.mark.asyncio - async def test_poll_logs_descending_empty_response_max_tries( + async def test_poll_logs_ascending_empty_response_max_tries( self, project: ProjectModel, log_storage: CloudWatchLogStorage, mock_client: Mock, poll_logs_request: PollLogsRequest, ): - # Test empty response from a single API call - mock_client.get_log_events.return_value = { - "events": [], - "nextBackwardToken": "bwd1", - "nextForwardToken": "fwd", - } - poll_logs_request.descending = True + # Test that for ascending requests, we return None next_token when no logs found after max retries + # Need to provide exactly 10 responses for MAX_RETRIES + mock_client.get_log_events.side_effect = [ + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd1", + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd2", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd3", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd4", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd5", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd6", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd7", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd8", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd9", # Different token + }, + { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd10", # Different token + }, + ] + poll_logs_request.descending = False job_submission_logs = log_storage.poll_logs(project, poll_logs_request) assert job_submission_logs.logs == [] - assert mock_client.get_log_events.call_count == 1 + # For ascending requests, we return None when no logs found after max retries + assert job_submission_logs.next_token is None + assert mock_client.get_log_events.call_count == 10 # MAX_RETRIES @pytest.mark.asyncio async def test_poll_logs_request_params_asc_no_diag_no_dates( @@ -789,6 +1081,14 @@ async def test_poll_logs_request_params_asc_no_diag_no_dates( mock_client: Mock, poll_logs_request: PollLogsRequest, ): + # Ensure response has events to avoid retrying + mock_client.get_log_events.return_value = { + "events": [ + {"timestamp": 1696586513234, "message": "Hello"}, + ], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd", + } poll_logs_request.descending = False poll_logs_request.limit = 5 poll_logs_request.diagnose = False @@ -798,10 +1098,8 @@ async def test_poll_logs_request_params_asc_no_diag_no_dates( logGroupName="test-group", logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job", limit=5, - startFromHead=True, - endTime=mock_client.get_log_events.call_args.kwargs[ - "endTime" - ], # endTime is set to "now" + startFromHead=True, # For ascending requests + endTime=mock_client.get_log_events.call_args.kwargs["endTime"], # endTime is auto-set ) @pytest.mark.asyncio @@ -812,10 +1110,12 @@ async def test_poll_logs_request_params_desc_diag_with_dates( mock_client: Mock, poll_logs_request: PollLogsRequest, ): - # Ensure the response has events - mock_client.get_log_events.return_value["events"] = [ - {"timestamp": 1696586513234, "message": "SGVsbG8="} - ] + # Ensure the response has events to avoid retrying + mock_client.get_log_events.return_value = { + "events": [{"timestamp": 1696586513234, "message": "SGVsbG8="}], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd", + } poll_logs_request.start_time = datetime( 2023, 10, 6, 10, 1, 53, 234000, tzinfo=timezone.utc ) @@ -829,8 +1129,8 @@ async def test_poll_logs_request_params_desc_diag_with_dates( logGroupName="test-group", logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/runner", limit=10, - startFromHead=False, - startTime=1696586513235, # start_time + 1ms + startFromHead=False, # For descending requests + startTime=1696586513234, # start_time (no +1ms increment) endTime=1696672913234, ) @@ -1232,14 +1532,20 @@ async def test_poll_logs_descending_non_empty_response_on_first_call( mock_client: Mock, poll_logs_request: PollLogsRequest, ): - mock_client.get_log_events.return_value["events"] = [ - {"timestamp": 1696586513234, "message": "Hello"}, - {"timestamp": 1696586513235, "message": "World"}, - ] + # Ensure response has events to avoid retrying + mock_client.get_log_events.return_value = { + "events": [ + {"timestamp": 1696586513234, "message": "Hello"}, + {"timestamp": 1696586513235, "message": "World"}, + ], + "nextBackwardToken": "bwd456", + "nextForwardToken": "fwd", + } poll_logs_request.descending = True poll_logs_request.limit = 2 job_submission_logs = log_storage.poll_logs(project, poll_logs_request) + # Events should be reversed for descending order assert job_submission_logs.logs == [ LogEvent( timestamp=datetime(2023, 10, 6, 10, 1, 53, 235000, tzinfo=timezone.utc), @@ -1252,6 +1558,9 @@ async def test_poll_logs_descending_non_empty_response_on_first_call( message="Hello", ), ] + # Should return nextBackwardToken for descending requests + assert job_submission_logs.next_token == "bwd456" + assert mock_client.get_log_events.call_count == 1 @pytest.mark.asyncio async def test_next_token_ascending_pagination( @@ -1284,7 +1593,7 @@ async def test_next_token_ascending_pagination( logGroupName="test-group", logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job", limit=2, - startFromHead=True, + startFromHead=True, # For ascending requests endTime=mock_client.get_log_events.call_args.kwargs["endTime"], # endTime is auto-set ) @@ -1322,7 +1631,7 @@ async def test_next_token_descending_pagination( logGroupName="test-group", logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/job", limit=2, - startFromHead=False, + startFromHead=False, # For descending requests ) @pytest.mark.asyncio @@ -1369,6 +1678,7 @@ async def test_next_token_none_when_no_logs( poll_logs_request: PollLogsRequest, ): """Test that next_token is None when no logs are returned""" + # Test with no next token initially - should not trigger retrying mock_client.get_log_events.return_value = { "events": [], "nextBackwardToken": "bwd", @@ -1376,11 +1686,20 @@ async def test_next_token_none_when_no_logs( } poll_logs_request.limit = 10 + poll_logs_request.descending = False result = log_storage.poll_logs(project, poll_logs_request) assert len(result.logs) == 0 assert result.next_token is None # Should be None when no logs returned + # Test descending behavior with no next token initially + poll_logs_request.descending = True + result = log_storage.poll_logs(project, poll_logs_request) + + assert len(result.logs) == 0 + # For descending requests with no initial next token, we return None + assert result.next_token is None + @pytest.mark.asyncio async def test_next_token_with_time_filtering( self, @@ -1414,7 +1733,7 @@ async def test_next_token_with_time_filtering( logStreamName="test-proj/test-run/1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e/runner", limit=100, startFromHead=False, - startTime=1696586513235, # start_time + 1ms + startTime=1696586513234, # start_time (no +1ms increment) endTime=1696672913234, nextToken="time_token", ) @@ -1483,7 +1802,7 @@ async def test_next_token_pagination_workflow( "nextBackwardToken": "bwd", "nextForwardToken": "token_page2", }, - # Second call - returns final logs without next_token + # Second call - returns final logs with next_token { "events": [ {"timestamp": 1696586513236, "message": "!"}, @@ -1521,3 +1840,275 @@ async def test_next_token_pagination_workflow( # Second call should have nextToken second_call = mock_client.get_log_events.call_args_list[1] assert second_call.kwargs["nextToken"] == "token_page2" + + @pytest.mark.asyncio + async def test_poll_logs_retrying_multiple_empty_responses( + self, + project: ProjectModel, + log_storage: CloudWatchLogStorage, + mock_client: Mock, + poll_logs_request: PollLogsRequest, + ): + """Test retrying behavior when multiple empty responses are returned before finding logs""" + # First 3 calls return empty, 4th call returns events + mock_client.get_log_events.side_effect = [ + { + "events": [], + "nextBackwardToken": "bwd1", + "nextForwardToken": "fwd1", + }, + { + "events": [], + "nextBackwardToken": "bwd2", + "nextForwardToken": "fwd2", + }, + { + "events": [], + "nextBackwardToken": "bwd3", + "nextForwardToken": "fwd3", + }, + { + "events": [ + {"timestamp": 1696586513234, "message": "Hello"}, + {"timestamp": 1696586513235, "message": "World"}, + ], + "nextBackwardToken": "bwd4", + "nextForwardToken": "fwd4", + }, + ] + + poll_logs_request.descending = True + poll_logs_request.limit = 2 + result = log_storage.poll_logs(project, poll_logs_request) + + # Should return events from the 4th call, reversed for descending order + assert len(result.logs) == 2 + assert result.logs[0].message == "World" + assert result.logs[1].message == "Hello" + assert result.next_token == "bwd4" + assert mock_client.get_log_events.call_count == 4 + + @pytest.mark.asyncio + async def test_poll_logs_retrying_with_changing_tokens( + self, + project: ProjectModel, + log_storage: CloudWatchLogStorage, + mock_client: Mock, + poll_logs_request: PollLogsRequest, + ): + """Test retrying behavior when tokens change between calls""" + # Test that we continue retrying as long as tokens change + mock_client.get_log_events.side_effect = [ + { + "events": [], + "nextBackwardToken": "bwd1", + "nextForwardToken": "fwd1", + }, + { + "events": [], + "nextBackwardToken": "bwd2", # Different token + "nextForwardToken": "fwd2", + }, + { + "events": [ + {"timestamp": 1696586513234, "message": "Found"}, + ], + "nextBackwardToken": "bwd3", + "nextForwardToken": "fwd3", + }, + ] + + poll_logs_request.descending = True + poll_logs_request.limit = 1 + result = log_storage.poll_logs(project, poll_logs_request) + + assert len(result.logs) == 1 + assert result.logs[0].message == "Found" + assert result.next_token == "bwd3" + assert mock_client.get_log_events.call_count == 3 + + @pytest.mark.asyncio + async def test_poll_logs_descending_some_responses_are_empty( + self, + project: ProjectModel, + log_storage: CloudWatchLogStorage, + mock_client: Mock, + poll_logs_request: PollLogsRequest, + ): + # Test retrying logic: first call returns empty, second call returns events + mock_client.get_log_events.side_effect = [ + { + "events": [], + "nextBackwardToken": "bwd1", + "nextForwardToken": "fwd", + }, + { + "events": [ + {"timestamp": 1696586513234, "message": "SGVsbG8="}, + {"timestamp": 1696586513235, "message": "V29ybGQ="}, + ], + "nextBackwardToken": "bwd3", + "nextForwardToken": "fwd", + }, + ] + poll_logs_request.descending = True + poll_logs_request.limit = 3 + job_submission_logs = log_storage.poll_logs(project, poll_logs_request) + + # Should return events from second call, reversed for descending order + assert job_submission_logs.logs == [ + LogEvent( + timestamp=datetime(2023, 10, 6, 10, 1, 53, 235000, tzinfo=timezone.utc), + log_source=LogEventSource.STDOUT, + message="V29ybGQ=", + ), + LogEvent( + timestamp=datetime(2023, 10, 6, 10, 1, 53, 234000, tzinfo=timezone.utc), + log_source=LogEventSource.STDOUT, + message="SGVsbG8=", + ), + ] + assert job_submission_logs.next_token == "bwd3" + assert mock_client.get_log_events.call_count == 2 + + @pytest.mark.asyncio + async def test_poll_logs_descending_empty_response_with_same_token( + self, + project: ProjectModel, + log_storage: CloudWatchLogStorage, + mock_client: Mock, + poll_logs_request: PollLogsRequest, + ): + # Test that when next token doesn't change, we stop retrying + mock_client.get_log_events.return_value = { + "events": [], + "nextBackwardToken": "bwd", + "nextForwardToken": "fwd", + } + poll_logs_request.descending = True + poll_logs_request.next_token = "bwd" # Same as returned token + job_submission_logs = log_storage.poll_logs(project, poll_logs_request) + + assert job_submission_logs.logs == [] + assert job_submission_logs.next_token is None + assert mock_client.get_log_events.call_count == 1 + + +class TestFileLogStorageReadLinesReversed: + # No changes to the first 6 tests, they will now pass. + def test_basic_file(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2\nline3\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [ + (b"", 18), + (b"line3", 12), + (b"line2", 6), + (b"line1", 0), + ] + + def test_file_without_trailing_newline(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [ + (b"line2", 6), + (b"line1", 0), + ] + + def test_empty_file(self, tmp_path: Path): + file = tmp_path / "test.txt" + file.touch() + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [] + + def test_single_line_file(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"the only line" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [(b"the only line", 0)] + + def test_file_with_empty_lines(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"lineA\n\nlineC\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [ + (b"", 13), + (b"lineC", 7), + (b"", 6), + (b"lineA", 0), + ] + + def test_file_with_only_newlines(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"\n\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file)) + assert lines == [ + (b"", 2), + (b"", 1), + ] + + def test_large_file_spanning_multiple_chunks(self, tmp_path: Path): + file = tmp_path / "large_file.txt" + line_content = b"abcdefghi" # 9 bytes + 1 newline = 10 bytes per line + num_lines = 5 + content = b"\n".join([line_content] * num_lines) + file.write_bytes(content) + # Pass the small chunk_size directly to the method + lines = list(FileLogStorage._read_lines_reversed(file, chunk_size=10)) + assert len(lines) == num_lines + assert lines[0] == (line_content, 40) + assert lines[1] == (line_content, 30) + assert lines[2] == (line_content, 20) + assert lines[3] == (line_content, 10) + assert lines[4] == (line_content, 0) + + # The rest of the tests will now pass without modification + def test_start_offset_in_middle_of_line(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2\nline3\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file, start_offset=10)) + assert lines == [ + (b"line2", 6), + (b"line1", 0), + ] + + def test_start_offset_at_line_boundary(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2\nline3\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file, start_offset=12)) + assert lines == [ + (b"line2", 6), + (b"line1", 0), + ] + + def test_start_offset_zero(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2\nline3\n" + file.write_bytes(content) + lines = list(FileLogStorage._read_lines_reversed(file, start_offset=0)) + assert lines == [] + + def test_start_offset_larger_than_file(self, tmp_path: Path): + file = tmp_path / "test.txt" + content = b"line1\nline2\n" + file.write_bytes(content) + lines_with_offset = list(FileLogStorage._read_lines_reversed(file, start_offset=1000)) + lines_without_offset = list(FileLogStorage._read_lines_reversed(file)) + assert lines_with_offset == lines_without_offset + assert lines_with_offset == [(b"", 12), (b"line2", 6), (b"line1", 0)] + + def test_long_line_larger_than_chunk(self, tmp_path: Path): + file = tmp_path / "long_line.txt" + content = b"a" * 25 + file.write_bytes(content) + # Pass the small chunk_size directly + lines = list(FileLogStorage._read_lines_reversed(file, chunk_size=10)) + assert lines == [(content, 0)]