diff --git a/src/openai/resources/vector_stores/files.py b/src/openai/resources/vector_stores/files.py
index b7e1ea9f92..ff078efbc8 100644
--- a/src/openai/resources/vector_stores/files.py
+++ b/src/openai/resources/vector_stores/files.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import time
 from typing import TYPE_CHECKING, Dict, Union, Optional
 from typing_extensions import Literal, assert_never
 
@@ -331,6 +332,7 @@ def create_and_poll(
         vector_store_id: str,
         attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
         chunking_strategy: FileChunkingStrategyParam | Omit = omit,
         # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
         # The extra values given here take precedence over values defined on the client or passed to this method.
@@ -355,6 +357,7 @@ def create_and_poll(
             file_id,
             vector_store_id=vector_store_id,
             poll_interval_ms=poll_interval_ms,
+            max_wait_seconds=max_wait_seconds,
         )
 
     def poll(
@@ -363,6 +366,7 @@ def poll(
         *,
         vector_store_id: str,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
     ) -> VectorStoreFile:
         """Wait for the vector store file to finish processing.
 
@@ -373,6 +377,7 @@ def poll(
         if is_given(poll_interval_ms):
             headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)
 
+        start = time.monotonic()  # monotonic: wall-clock adjustments must not shorten or extend the wait
         while True:
             response = self.with_raw_response.retrieve(
                 file_id,
@@ -382,6 +387,13 @@ def poll(
 
             file = response.parse()
             if file.status == "in_progress":
+                elapsed = time.monotonic() - start
+                remaining = max_wait_seconds - elapsed
+                if remaining <= 0:
+                    raise RuntimeError(
+                        f"Giving up on waiting for vector store file {file_id} to finish processing after {max_wait_seconds} seconds."
+                    )
+
                 if not is_given(poll_interval_ms):
                     from_header = response.headers.get("openai-poll-after-ms")
                     if from_header is not None:
@@ -389,7 +401,7 @@ def poll(
                     else:
                         poll_interval_ms = 1000
 
-                self._sleep(poll_interval_ms / 1000)
+                self._sleep(min(poll_interval_ms / 1000, remaining))
             elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
                 return file
             else:
@@ -420,6 +432,7 @@ def upload_and_poll(
         file: FileTypes,
         attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
         chunking_strategy: FileChunkingStrategyParam | Omit = omit,
     ) -> VectorStoreFile:
         """Add a file to a vector store and poll until processing is complete."""
@@ -429,6 +442,7 @@ def upload_and_poll(
             file_id=file_obj.id,
             chunking_strategy=chunking_strategy,
             poll_interval_ms=poll_interval_ms,
+            max_wait_seconds=max_wait_seconds,
             attributes=attributes,
         )
 
@@ -785,6 +799,7 @@ async def create_and_poll(
         vector_store_id: str,
         attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
         chunking_strategy: FileChunkingStrategyParam | Omit = omit,
         # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
         # The extra values given here take precedence over values defined on the client or passed to this method.
@@ -809,6 +824,7 @@ async def create_and_poll(
             file_id,
             vector_store_id=vector_store_id,
             poll_interval_ms=poll_interval_ms,
+            max_wait_seconds=max_wait_seconds,
         )
 
     async def poll(
@@ -817,6 +833,7 @@ async def poll(
         *,
         vector_store_id: str,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
     ) -> VectorStoreFile:
         """Wait for the vector store file to finish processing.
 
@@ -827,6 +844,7 @@ async def poll(
         if is_given(poll_interval_ms):
             headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)
 
+        start = time.monotonic()  # monotonic: wall-clock adjustments must not shorten or extend the wait
         while True:
             response = await self.with_raw_response.retrieve(
                 file_id,
@@ -836,6 +854,13 @@ async def poll(
 
             file = response.parse()
             if file.status == "in_progress":
+                elapsed = time.monotonic() - start
+                remaining = max_wait_seconds - elapsed
+                if remaining <= 0:
+                    raise RuntimeError(
+                        f"Giving up on waiting for vector store file {file_id} to finish processing after {max_wait_seconds} seconds."
+                    )
+
                 if not is_given(poll_interval_ms):
                     from_header = response.headers.get("openai-poll-after-ms")
                     if from_header is not None:
@@ -843,7 +868,7 @@ async def poll(
                     else:
                         poll_interval_ms = 1000
 
-                await self._sleep(poll_interval_ms / 1000)
+                await self._sleep(min(poll_interval_ms / 1000, remaining))
             elif file.status == "cancelled" or file.status == "completed" or file.status == "failed":
                 return file
             else:
@@ -876,6 +901,7 @@ async def upload_and_poll(
         file: FileTypes,
         attributes: Optional[Dict[str, Union[str, float, bool]]] | Omit = omit,
         poll_interval_ms: int | Omit = omit,
+        max_wait_seconds: float = 30 * 60,
         chunking_strategy: FileChunkingStrategyParam | Omit = omit,
     ) -> VectorStoreFile:
         """Add a file to a vector store and poll until processing is complete."""
@@ -884,6 +910,7 @@ async def upload_and_poll(
             vector_store_id=vector_store_id,
             file_id=file_obj.id,
             poll_interval_ms=poll_interval_ms,
+            max_wait_seconds=max_wait_seconds,
             chunking_strategy=chunking_strategy,
             attributes=attributes,
         )
diff --git a/tests/api_resources/vector_stores/test_files.py b/tests/api_resources/vector_stores/test_files.py
index 53aa5ee041..80a7df7df6 100644
--- a/tests/api_resources/vector_stores/test_files.py
+++ b/tests/api_resources/vector_stores/test_files.py
@@ -7,6 +7,7 @@
 
 import pytest
 
+import openai.resources.vector_stores.files as files_module
 from openai import OpenAI, AsyncOpenAI
 from tests.utils import assert_matches_type
 from openai._utils import assert_signatures_in_sync
@@ -20,6 +21,35 @@
 
 base_url = os.environ.get("TEST_API_BASE_URL", "http://127.0.0.1:4010")
 
 
+class _FakeClock:
+    """Stand-in for the `time` module that replays scripted `monotonic()` readings."""
+
+    def __init__(self, *readings: float) -> None:
+        self._readings = iter(readings)
+
+    def monotonic(self) -> float:
+        return next(self._readings)
+
+
+class _VectorStoreFilePollResponse:
+    """Minimal raw-response double exposing the `headers` and `parse()` that `poll()` reads."""
+
+    headers: dict[str, str] = {}
+
+    def __init__(self, status: str = "in_progress") -> None:
+        self._file = VectorStoreFile(
+            id="file-abc123",
+            created_at=0,
+            object="vector_store.file",
+            status=status,
+            usage_bytes=0,
+            vector_store_id="vs_abc123",
+        )
+
+    def parse(self) -> VectorStoreFile:
+        return self._file
+
+
 class TestFiles:
     parametrize = pytest.mark.parametrize("client", [False, True], indirect=True, ids=["loose", "strict"])
@@ -75,6 +105,50 @@ def test_path_params_create(self, client: OpenAI) -> None:
                 file_id="file_id",
             )
 
+    def test_poll_times_out_for_stuck_in_progress(self, client: OpenAI, monkeypatch: pytest.MonkeyPatch) -> None:
+        files = client.vector_stores.files
+        clock = _FakeClock(0.0, 2.0)  # readings: poll start, first in-progress check
+
+        def retrieve(*_args: object, **_kwargs: object) -> _VectorStoreFilePollResponse:
+            return _VectorStoreFilePollResponse()
+
+        monkeypatch.setattr(files.with_raw_response, "retrieve", retrieve)
+        monkeypatch.setattr(files, "_sleep", lambda _seconds: None)
+        monkeypatch.setattr(files_module, "time", clock)  # rebinds the module's name only, not the global `time`
+
+        with pytest.raises(
+            RuntimeError,
+            match=r"Giving up on waiting for vector store file file-abc123 to finish processing after 1.0 seconds.",
+        ):
+            files.poll(
+                "file-abc123",
+                vector_store_id="vs_abc123",
+                poll_interval_ms=1,
+                max_wait_seconds=1.0,
+            )
+
+    def test_poll_limits_sleep_to_remaining_timeout(self, client: OpenAI, monkeypatch: pytest.MonkeyPatch) -> None:
+        files = client.vector_stores.files
+        sleep_durations: list[float] = []
+        clock = _FakeClock(0.0, 0.25, 1.25)  # readings: poll start, first check, second check
+
+        def retrieve(*_args: object, **_kwargs: object) -> _VectorStoreFilePollResponse:
+            return _VectorStoreFilePollResponse()
+
+        monkeypatch.setattr(files.with_raw_response, "retrieve", retrieve)
+        monkeypatch.setattr(files, "_sleep", sleep_durations.append)
+        monkeypatch.setattr(files_module, "time", clock)  # rebinds the module's name only, not the global `time`
+
+        with pytest.raises(RuntimeError):
+            files.poll(
+                "file-abc123",
+                vector_store_id="vs_abc123",
+                poll_interval_ms=60_000,
+                max_wait_seconds=1.0,
+            )
+
+        assert sleep_durations == [pytest.approx(0.75)]
+
     @parametrize
     def test_method_retrieve(self, client: OpenAI) -> None:
         file = client.vector_stores.files.retrieve(
@@ -380,6 +454,60 @@ async def test_path_params_create(self, async_client: AsyncOpenAI) -> None:
                 file_id="file_id",
             )
 
+    async def test_poll_times_out_for_stuck_in_progress(
+        self, async_client: AsyncOpenAI, monkeypatch: pytest.MonkeyPatch
+    ) -> None:
+        files = async_client.vector_stores.files
+        clock = _FakeClock(0.0, 2.0)  # readings: poll start, first in-progress check
+
+        async def retrieve(*_args: object, **_kwargs: object) -> _VectorStoreFilePollResponse:
+            return _VectorStoreFilePollResponse()
+
+        async def sleep(_seconds: float) -> None:
+            return None
+
+        monkeypatch.setattr(files.with_raw_response, "retrieve", retrieve)
+        monkeypatch.setattr(files, "_sleep", sleep)
+        monkeypatch.setattr(files_module, "time", clock)  # leaves the event loop's own time.monotonic() untouched
+
+        with pytest.raises(
+            RuntimeError,
+            match=r"Giving up on waiting for vector store file file-abc123 to finish processing after 1.0 seconds.",
+        ):
+            await files.poll(
+                "file-abc123",
+                vector_store_id="vs_abc123",
+                poll_interval_ms=1,
+                max_wait_seconds=1.0,
+            )
+
+    async def test_poll_limits_sleep_to_remaining_timeout(
+        self, async_client: AsyncOpenAI, monkeypatch: pytest.MonkeyPatch
+    ) -> None:
+        files = async_client.vector_stores.files
+        sleep_durations: list[float] = []
+        clock = _FakeClock(0.0, 0.25, 1.25)  # readings: poll start, first check, second check
+
+        async def retrieve(*_args: object, **_kwargs: object) -> _VectorStoreFilePollResponse:
+            return _VectorStoreFilePollResponse()
+
+        async def sleep(seconds: float) -> None:
+            sleep_durations.append(seconds)
+
+        monkeypatch.setattr(files.with_raw_response, "retrieve", retrieve)
+        monkeypatch.setattr(files, "_sleep", sleep)
+        monkeypatch.setattr(files_module, "time", clock)  # leaves the event loop's own time.monotonic() untouched
+
+        with pytest.raises(RuntimeError):
+            await files.poll(
+                "file-abc123",
+                vector_store_id="vs_abc123",
+                poll_interval_ms=60_000,
+                max_wait_seconds=1.0,
+            )
+
+        assert sleep_durations == [pytest.approx(0.75)]
+
     @parametrize
     async def test_method_retrieve(self, async_client: AsyncOpenAI) -> None:
         file = await async_client.vector_stores.files.retrieve(