From a40d3fbbd61865d056337a6d9d5dccc872f020d7 Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 13:21:53 -0700 Subject: [PATCH 1/6] refactor automatic rate limiting --- openaq/__init__.py | 2 +- openaq/_async/client.py | 155 ++++++++++++++--- openaq/_sync/client.py | 71 +++++++- openaq/shared/client.py | 44 +---- tests/unit/async/test_async_client.py | 233 +++++++++++++++++++------- tests/unit/sync/test_sync_client.py | 209 +++++++++++++++++++++-- tests/unit/test_shared_client.py | 119 +++---------- 7 files changed, 585 insertions(+), 248 deletions(-) diff --git a/openaq/__init__.py b/openaq/__init__.py index fedcc311..47302f7e 100644 --- a/openaq/__init__.py +++ b/openaq/__init__.py @@ -2,7 +2,7 @@ import logging -__version__ = "1.0.0rc1" +__version__ = "1.0.0rc2" logger = logging.getLogger(__name__) diff --git a/openaq/_async/client.py b/openaq/_async/client.py index bf4ac61f..a8ba634c 100644 --- a/openaq/_async/client.py +++ b/openaq/_async/client.py @@ -1,12 +1,15 @@ from __future__ import annotations import asyncio +from datetime import datetime import logging +import platform from types import TracebackType from typing import Mapping import httpx +from openaq import __version__ from openaq._async.models.countries import Countries from openaq._async.models.instruments import Instruments from openaq._async.models.licenses import Licenses @@ -18,6 +21,7 @@ from openaq._async.models.providers import Providers from openaq._async.models.sensors import Sensors from openaq.shared.client import DEFAULT_BASE_URL, BaseClient +from openaq.shared.exceptions import RateLimitError from .transport import AsyncTransport @@ -65,13 +69,21 @@ class AsyncOpenAQ(BaseClient[AsyncTransport]): def __init__( self, api_key: str | None = None, - headers: Mapping[str, str] = {}, + headers: Mapping[str, str] | None = None, auto_wait: bool = True, base_url: str = DEFAULT_BASE_URL, - transport: AsyncTransport = AsyncTransport(), + transport: AsyncTransport | None = None, + rate_limit_override: int | None = None, ) -> None: + if transport is None: + transport = AsyncTransport() + if headers is None: + headers = {} super().__init__(transport, headers, api_key, auto_wait, base_url) - + self._user_agent = ( + f"openaq-python-async-{__version__}-{platform.python_version()}" + ) + self.resolve_headers() self.countries = Countries(self) self.instruments = Instruments(self) self.licenses = Licenses(self) @@ -82,17 +94,86 @@ def __init__( self.providers = Providers(self) self.parameters = Parameters(self) self.sensors = Sensors(self) + rate_limit = rate_limit_override if rate_limit_override is not None else 60 + self._rate_limit_capacity = float(rate_limit) + self._rate_limit_remaining = self._rate_limit_capacity + self._lock = asyncio.Lock() + self._in_flight_requests = 0 + self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") + self._rate_limit_synced_event = asyncio.Event() + self._sync_in_progress = False @property def transport(self) -> AsyncTransport: return self._transport - async def _wait_for_rate_limit_reset(self) -> None: - """Wait asynchronously until the rate limit resets.""" - wait_seconds = self._rate_limit_reset_seconds - if wait_seconds > 0: - logger.info(f"Rate limit hit. Waiting {wait_seconds} seconds for reset...") - await asyncio.sleep(wait_seconds) + async def _acquire_token(self) -> None: + """Acquire a rate limit token before making a request. + + Checks available capacity against in-flight requests in the current + time window. If capacity is available, increments the in-flight counter + and returns immediately. If the window has rolled over, resets remaining + capacity accounting for any still in-flight requests from the previous + window before granting the token. + + If no capacity is available and auto_wait is enabled, sleeps until the + next window opens and then grants the token. If auto_wait is disabled, + raises RateLimitError immediately. + + Raises: + RateLimitError: If capacity is exhausted and auto_wait is False. + """ + async with self._lock: + now = datetime.now() + window_id = now.strftime("%Y%m%d%H%M") + + if self._current_window_id != window_id: + self._rate_limit_remaining = ( + self._rate_limit_capacity - self._in_flight_requests + ) + self._current_window_id = window_id + + available = self._rate_limit_remaining - self._in_flight_requests + if available >= 1.0: + self._in_flight_requests += 1 + return + + if not self._auto_wait: + raise RateLimitError("Rate limit exceeded") + + seconds_until_next_min = 60 - now.second - (now.microsecond / 1_000_000) + wait = seconds_until_next_min + 0.5 + + await asyncio.sleep(wait) + + async with self._lock: + self._rate_limit_remaining = ( + self._rate_limit_capacity - self._in_flight_requests + ) + self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") + self._in_flight_requests += 1 + + def _set_rate_limit(self, headers: httpx.Headers | Mapping[str, str]) -> None: + """Synchronize local rate limit state with API provided response headers. + + Reads the x-ratelimit-remaining and x-ratelimit-limit headers from the + HTTP response and updates the local capacity and remaining token counts. + This corrects any drift between the client-side estimates and the + server's actual counts, such as at window boundaries or after bursts. + + Args: + headers: The response headers from the HTTP client. + """ + x_ratelimit_remaining_header = headers.get("x-ratelimit-remaining") + x_ratelimit_limit_header = headers.get("x-ratelimit-limit") + + try: + if x_ratelimit_limit_header is not None: + self._rate_limit_capacity = float(x_ratelimit_limit_header) + if x_ratelimit_remaining_header is not None: + self._rate_limit_remaining = float(x_ratelimit_remaining_header) + except ValueError as e: + logger.error(f"API sent malformed rate limit headers: {e}") async def _do( self, @@ -104,17 +185,24 @@ async def _do( ) = None, headers: httpx.Headers | Mapping[str, str] | None = None, ) -> httpx.Response: - """Execute an HTTP request with rate limit handling. + """Execute an HTTP request with rate limit handling and state synchronization. - Checks rate limits before making the request. If auto_wait is enabled - and rate limited, waits for the limit to reset. Otherwise raises - RateLimitError if rate limited. + On the first request, designates the calling coroutine as the + initial sync request. All other coroutines that arrive before the first + response is received will wait until the server has confirmed the true + rate limit state via response headers. Subsequent requests proceed + directly to token acquisition. + + Once a token is acquired, builds the request headers, constructs the + full URL, and dispatches the request via the transport layer. On + completion, synchronizes local rate limit state from the response + headers and decrements the in-flight counter. Args: method: HTTP method. path: API endpoint path. params: Query parameters. - headers: HTTP request headers. + headers: Additional request headers. Returns: HTTP response object. @@ -122,17 +210,38 @@ async def _do( Raises: RateLimitError: If rate limited and auto_wait is False. """ - self._check_rate_limit() + is_initial_request = False + if not self._rate_limit_synced_event.is_set(): + async with self._lock: + if ( + not self._rate_limit_synced_event.is_set() + and not self._sync_in_progress + ): + self._sync_in_progress = True + is_initial_request = True - if self._auto_wait and self._is_rate_limited(): - await self._wait_for_rate_limit_reset() + if not is_initial_request: + await self._rate_limit_synced_event.wait() - request_headers = self.build_request_headers(headers) - url = self._base_url + path - data = await self.transport.send_request( - method=method, url=url, params=params, headers=request_headers - ) - return data + await self._acquire_token() + + try: + request_headers = self.build_request_headers(headers) + url = self._base_url + path + data = await self.transport.send_request( + method=method, url=url, params=params, headers=request_headers + ) + self._set_rate_limit(data.headers) + return data + + finally: + async with self._lock: + self._in_flight_requests = max(0, self._in_flight_requests - 1) + if is_initial_request: + self._sync_in_progress = False + + if is_initial_request: + self._rate_limit_synced_event.set() async def _get( self, diff --git a/openaq/_sync/client.py b/openaq/_sync/client.py index 86fd2b97..b8778ae8 100644 --- a/openaq/_sync/client.py +++ b/openaq/_sync/client.py @@ -1,12 +1,15 @@ from __future__ import annotations +from datetime import datetime, timedelta import logging +import platform import time from types import TracebackType from typing import Mapping import httpx +from openaq import __version__ from openaq._sync.models.countries import Countries from openaq._sync.models.instruments import Instruments from openaq._sync.models.licenses import Licenses @@ -18,6 +21,7 @@ from openaq._sync.models.providers import Providers from openaq._sync.models.sensors import Sensors from openaq.shared.client import DEFAULT_BASE_URL, BaseClient +from openaq.shared.exceptions import RateLimitError from .transport import Transport @@ -63,15 +67,34 @@ class OpenAQ(BaseClient[Transport]): """ + _rate_limit_reset_datetime: datetime + _rate_limit_remaining: int + _request_count: int + def __init__( self, api_key: str | None = None, - headers: Mapping[str, str] = {}, + headers: Mapping[str, str] | None = None, auto_wait: bool = True, base_url: str = DEFAULT_BASE_URL, - _transport: Transport = Transport(), + transport: Transport | None = None, + rate_limit_override: int | None = None, ) -> None: - super().__init__(_transport, headers, api_key, auto_wait, base_url) + if transport is None: + transport = Transport() + if headers is None: + headers = {} + super().__init__(transport, headers, api_key, auto_wait, base_url) + self._user_agent = ( + f"openaq-python-sync-{__version__}-{platform.python_version()}" + ) + self.resolve_headers() + self._request_count = 0 + rate_limit = rate_limit_override if rate_limit_override is not None else 60 + self._rate_limit_capacity = float(rate_limit) + self._rate_limit_reset_datetime = datetime.min + self._rate_limit_remaining = self._rate_limit_capacity + self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") self.countries = Countries(self) self.instruments = Instruments(self) @@ -84,6 +107,45 @@ def __init__( self.parameters = Parameters(self) self.sensors = Sensors(self) + @property + def _rate_limit_reset_seconds(self) -> int: + return int((self._rate_limit_reset_datetime - datetime.now()).total_seconds()) + + def _is_rate_limited(self) -> bool: + return ( + self._rate_limit_remaining == 0 + and self._rate_limit_reset_datetime > datetime.now() + ) + + def _check_rate_limit(self) -> None: + now = datetime.now() + window_id = now.strftime("%Y%m%d%H%M") + + if self._current_window_id != window_id: + self._rate_limit_remaining = self._rate_limit_capacity + self._current_window_id = window_id + return + + if self._rate_limit_remaining <= 0: + if self._auto_wait: + self._wait_for_rate_limit_reset() + self._rate_limit_remaining = self._rate_limit_capacity + self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") + else: + message = f"Rate limit exceeded. Limit resets in {self._rate_limit_reset_seconds} seconds" + logger.error(message) + raise RateLimitError(message) + + def _set_rate_limit(self, headers: httpx.Headers) -> None: + rate_limit_remaining = self._get_int_header(headers, 'x-ratelimit-remaining', 0) + rate_limit_reset_seconds = self._get_int_header( + headers, 'x-ratelimit-reset', 60 + ) + now = (datetime.now() + timedelta(seconds=0.5)).replace(microsecond=0) + rate_limit_reset_datetime = now + timedelta(seconds=rate_limit_reset_seconds) + self._rate_limit_remaining = rate_limit_remaining + self._rate_limit_reset_datetime = rate_limit_reset_datetime + def _wait_for_rate_limit_reset(self) -> None: """Wait until the rate limit resets.""" wait_seconds = self._rate_limit_reset_seconds @@ -120,8 +182,7 @@ def _do( RateLimitError: If rate limited and auto_wait is False. """ self._check_rate_limit() - if self._auto_wait and self._is_rate_limited(): - self._wait_for_rate_limit_reset() + self._rate_limit_remaining -= 1 request_headers = self.build_request_headers(headers) url = self._base_url + path data = self.transport.send_request( diff --git a/openaq/shared/client.py b/openaq/shared/client.py index 22207109..8b021fb2 100644 --- a/openaq/shared/client.py +++ b/openaq/shared/client.py @@ -3,11 +3,8 @@ from __future__ import annotations import logging -import math import os -import platform -from abc import ABC -from datetime import datetime, timedelta +from abc import ABC, abstractmethod from pathlib import Path from typing import Generic, Mapping, TypeVar @@ -15,7 +12,7 @@ from openaq._async.transport import AsyncTransport from openaq._sync.transport import Transport -from openaq.shared.exceptions import ApiKeyMissingError, RateLimitError +from openaq.shared.exceptions import ApiKeyMissingError from openaq.shared.types import OpenAQConfig logger = logging.getLogger(__name__) @@ -27,12 +24,9 @@ except ImportError: _has_toml = False -from openaq import __version__ ACCEPT_HEADER = "application/json" -DEFAULT_USER_AGENT = f"openaq-python-{__version__}-{platform.python_version()}" - DEFAULT_BASE_URL = "https://api.openaq.org/v3/" TTransport = TypeVar('TTransport', Transport, AsyncTransport) @@ -69,8 +63,6 @@ class BaseClient(ABC, Generic[TTransport]): _user_agent: str _auto_wait: bool _transport: TTransport - _rate_limit_reset_datetime: datetime - _rate_limit_remaining: float def __init__( self, @@ -96,10 +88,6 @@ def __init__( self._headers = httpx.Headers(headers) self._transport: TTransport = transport self._base_url = base_url - self._user_agent = DEFAULT_USER_AGENT - self.resolve_headers() - self._rate_limit_reset_datetime = datetime.min - self._rate_limit_remaining = math.inf self._auto_wait = auto_wait self._check_api_key_url() @@ -204,23 +192,6 @@ def resolve_headers(self) -> None: self._headers["User-Agent"] = self._user_agent self._headers["Accept"] = ACCEPT_HEADER - def _is_rate_limited(self) -> bool: - return ( - self._rate_limit_remaining == 0 - and self._rate_limit_reset_datetime > datetime.now() - ) - - def _check_rate_limit(self) -> None: - if self._is_rate_limited(): - if not self._auto_wait: - message = f"Rate limit exceeded. Limit resets in {self._rate_limit_reset_seconds} seconds" - logger.error(message) - raise RateLimitError(message) - - @property - def _rate_limit_reset_seconds(self) -> int: - return int((self._rate_limit_reset_datetime - datetime.now()).total_seconds()) - def _get_int_header(self, headers: httpx.Headers, key: str, default: int) -> int: """Extract integer from header, avoiding Any types. @@ -238,15 +209,8 @@ def _get_int_header(self, headers: httpx.Headers, key: str, default: int) -> int except (KeyError, ValueError): return default - def _set_rate_limit(self, headers: httpx.Headers) -> None: - rate_limit_remaining = self._get_int_header(headers, 'x-ratelimit-remaining', 0) - rate_limit_reset_seconds = self._get_int_header( - headers, 'x-ratelimit-reset', 60 - ) - now = (datetime.now() + timedelta(seconds=0.5)).replace(microsecond=0) - rate_limit_reset_datetime = now + timedelta(seconds=rate_limit_reset_seconds) - self._rate_limit_remaining = rate_limit_remaining - self._rate_limit_reset_datetime = rate_limit_reset_datetime + @abstractmethod + def _set_rate_limit(self, headers: httpx.Headers) -> None: ... def _get_openaq_config() -> OpenAQConfig | None: diff --git a/tests/unit/async/test_async_client.py b/tests/unit/async/test_async_client.py index c7b650e4..aec23d84 100644 --- a/tests/unit/async/test_async_client.py +++ b/tests/unit/async/test_async_client.py @@ -1,9 +1,12 @@ import os import platform -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path from unittest import mock +import httpx + + import pytest from openaq import __version__ @@ -13,6 +16,9 @@ from ..mocks import AsyncMockTransport +ASYNC_USER_AGENT = f"openaq-python-async-{__version__}-{platform.python_version()}" + + @pytest.fixture def mock_config_file(): mock_toml_content = b"""api-key='test_api_key'""" @@ -46,10 +52,7 @@ def test_default_client_params(self, setup): assert self.client._base_url == "https://api.openaq.org/v3/" def test_default_headers(self, setup): - assert ( - self.client.headers["User-Agent"] - == f"openaq-python-{__version__}-{platform.python_version()}" - ) + assert self.client.headers["User-Agent"] == ASYNC_USER_AGENT assert self.client.headers["Accept"] == "application/json" def test_custom_headers(self, setup): @@ -104,65 +107,6 @@ def test_api_key_arg_override_env_vars_config( """ assert self.client.api_key == "abc123-def456-ghi789" - @mock.patch('openaq.shared.client.datetime') - @mock.patch('asyncio.sleep') - @mock.patch('openaq._async.client.logger') - @pytest.mark.asyncio - async def test_wait_for_rate_limit_reset_waits_when_positive( - self, mock_logger, mock_sleep, mock_datetime, setup - ): - """Test that asyncio.sleep is called with correct duration when wait_seconds > 0.""" - now = datetime(2026, 2, 12, 0, 0, 0) - mock_datetime.now.return_value = now - - # Set reset time to 5 seconds in the future - self.client._rate_limit_reset_datetime = now + timedelta(seconds=5) - - await self.client._wait_for_rate_limit_reset() - - mock_sleep.assert_called_once_with(5) - mock_logger.info.assert_called_once_with( - "Rate limit hit. Waiting 5 seconds for reset..." - ) - - @mock.patch('openaq.shared.client.datetime') - @mock.patch('asyncio.sleep') - @mock.patch('openaq._async.client.logger') - @pytest.mark.asyncio - async def test_wait_for_rate_limit_reset_does_not_wait_when_zero( - self, mock_logger, mock_sleep, mock_datetime, setup - ): - """Test that asyncio.sleep is not called when wait_seconds is 0.""" - now = datetime(2026, 2, 12, 0, 0, 0) - mock_datetime.now.return_value = now - - # Set reset time to now (0 seconds wait) - self.client._rate_limit_reset_datetime = now - - await self.client._wait_for_rate_limit_reset() - - mock_sleep.assert_not_called() - mock_logger.info.assert_not_called() - - @mock.patch('openaq.shared.client.datetime') - @mock.patch('asyncio.sleep') - @mock.patch('openaq._async.client.logger') - @pytest.mark.asyncio - async def test_wait_for_rate_limit_reset_does_not_wait_when_negative( - self, mock_logger, mock_sleep, mock_datetime, setup - ): - """Test that asyncio.sleep is not called when wait_seconds is negative.""" - now = datetime(2026, 2, 12, 0, 0, 0) - mock_datetime.now.return_value = now - - # Set reset time to 5 seconds in the past - self.client._rate_limit_reset_datetime = now - timedelta(seconds=5) - - await self.client._wait_for_rate_limit_reset() - - mock_sleep.assert_not_called() - mock_logger.info.assert_not_called() - @pytest.mark.asyncio async def test_close_closes_transport(self, setup): """Test that close() calls transport.close().""" @@ -198,3 +142,164 @@ async def test_context_manager_exit_closes_even_with_exception(self, setup): raise ValueError("Test exception") self.client.transport.close.assert_called_once() + + @pytest.mark.asyncio + async def test_acquire_token_grants_when_capacity_available(self, setup): + """Test that token is granted immediately when capacity is available.""" + self.client._rate_limit_remaining = 60.0 + self.client._in_flight_requests = 0 + initial_in_flight = self.client._in_flight_requests + + await self.client._acquire_token() + + assert self.client._in_flight_requests == initial_in_flight + 1 + + @pytest.mark.asyncio + async def test_acquire_token_raises_when_exhausted_and_auto_wait_false(self, setup): + """Test that RateLimitError is raised when exhausted and auto_wait is False.""" + from openaq.shared.exceptions import RateLimitError + + self.client._auto_wait = False + self.client._rate_limit_remaining = 0.0 + self.client._in_flight_requests = 0 + + with pytest.raises(RateLimitError): + await self.client._acquire_token() + + @pytest.mark.asyncio + async def test_acquire_token_resets_capacity_on_new_window(self, setup): + """Test that capacity resets when minute window rolls over.""" + self.client._rate_limit_remaining = 0.0 + self.client._in_flight_requests = 0 + self.client._current_window_id = "202602240000" + + with mock.patch('openaq._async.client.datetime') as mock_datetime: + now = datetime(2026, 2, 24, 0, 1, 0) + mock_datetime.now.return_value = now + + await self.client._acquire_token() + + assert self.client._in_flight_requests == 1 + + @pytest.mark.asyncio + async def test_acquire_token_accounts_for_in_flight_on_window_reset(self, setup): + """Test that in-flight requests are subtracted from capacity on window reset.""" + self.client._rate_limit_remaining = 0.0 + self.client._in_flight_requests = 5 + self.client._current_window_id = "202602240000" + + with mock.patch('openaq._async.client.datetime') as mock_datetime: + now = datetime(2026, 2, 24, 0, 1, 0) + mock_datetime.now.return_value = now + + await self.client._acquire_token() + + assert self.client._rate_limit_remaining == self.client._rate_limit_capacity - 5 + + @pytest.mark.asyncio + async def test_acquire_token_decrements_in_flight_on_completion(self, setup): + """Test that in-flight counter is decremented after request completes.""" + mock_response = mock.MagicMock() + mock_response.headers = {} + self.client.transport.send_request = mock.AsyncMock(return_value=mock_response) + + await self.client._do("get", "/test") + + assert self.client._in_flight_requests == 0 + + @pytest.mark.asyncio + async def test_acquire_token_decrements_in_flight_on_exception(self, setup): + """Test that in-flight counter is decremented even when request raises.""" + self.client.transport.send_request = mock.AsyncMock( + side_effect=Exception("network error") + ) + + with pytest.raises(Exception, match="network error"): + await self.client._do("get", "/test") + + assert self.client._in_flight_requests == 0 + + @pytest.mark.asyncio + async def test_set_rate_limit_updates_remaining_from_headers(self, setup): + """Test that remaining is updated from x-ratelimit-remaining header.""" + headers = httpx.Headers( + {"x-ratelimit-remaining": "45", "x-ratelimit-limit": "60"} + ) + self.client._set_rate_limit(headers) + assert self.client._rate_limit_remaining == 45.0 + + @pytest.mark.asyncio + async def test_set_rate_limit_updates_capacity_from_headers(self, setup): + """Test that capacity is updated from x-ratelimit-limit header.""" + headers = httpx.Headers( + {"x-ratelimit-remaining": "45", "x-ratelimit-limit": "120"} + ) + self.client._set_rate_limit(headers) + assert self.client._rate_limit_capacity == 120.0 + + @pytest.mark.asyncio + async def test_set_rate_limit_server_remaining_overrides_local_count(self, setup): + """Test that server-provided remaining overrides optimistic local tracking.""" + + self.client._rate_limit_remaining = 50.0 + headers = httpx.Headers( + {"x-ratelimit-remaining": "30", "x-ratelimit-limit": "60"} + ) + self.client._set_rate_limit(headers) + assert self.client._rate_limit_remaining == 30.0 + + @pytest.mark.asyncio + async def test_rate_limit_synced_event_set_after_first_request(self, setup): + """Test that rate limit sync event is set after the first request completes.""" + mock_response = mock.MagicMock() + mock_response.headers = {} + self.client.transport.send_request = mock.AsyncMock(return_value=mock_response) + + assert not self.client._rate_limit_synced_event.is_set() + await self.client._do("get", "/test") + assert self.client._rate_limit_synced_event.is_set() + + @pytest.mark.asyncio + async def test_rate_limit_synced_event_set_even_on_request_failure(self, setup): + """Test that sync event is set even when the first request fails.""" + self.client.transport.send_request = mock.AsyncMock( + side_effect=Exception("network error") + ) + + with pytest.raises(Exception): + await self.client._do("get", "/test") + + assert self.client._rate_limit_synced_event.is_set() + + @pytest.mark.asyncio + async def test_default_rate_limit_override(self, setup): + """Test that default rate limit capacity is set to 60.""" + assert self.client._rate_limit_capacity == 60.0 + + @pytest.mark.asyncio + async def test_custom_rate_limit_override(self): + """Test that rate_limit_override sets custom capacity.""" + client = AsyncOpenAQ( + api_key="abc123-def456-ghi789", + transport=AsyncMockTransport(), + rate_limit_override=30, + ) + assert client._rate_limit_capacity == 30.0 + assert client._rate_limit_remaining == 30.0 + + @pytest.mark.asyncio + async def test_blocks_after_custom_limit(self): + """Test that client raises after exhausting custom rate limit.""" + from openaq.shared.exceptions import RateLimitError + + client = AsyncOpenAQ( + api_key="abc123-def456-ghi789", + transport=AsyncMockTransport(), + auto_wait=False, + rate_limit_override=5, + ) + for _ in range(5): + await client._acquire_token() + + with pytest.raises(RateLimitError): + await client._acquire_token() diff --git a/tests/unit/sync/test_sync_client.py b/tests/unit/sync/test_sync_client.py index e3bfa586..ba722f60 100644 --- a/tests/unit/sync/test_sync_client.py +++ b/tests/unit/sync/test_sync_client.py @@ -1,18 +1,24 @@ import os import platform -from datetime import datetime, timedelta, timezone +import httpx +from datetime import datetime, timedelta from pathlib import Path from unittest import mock import pytest +from freezegun import freeze_time + from openaq import __version__ from openaq._sync.client import OpenAQ -from openaq.shared.exceptions import ApiKeyMissingError +from openaq.shared.exceptions import ApiKeyMissingError, RateLimitError from ..mocks import MockTransport +SYNC_USER_AGENT = f"openaq-python-sync-{__version__}-{platform.python_version()}" + + @pytest.fixture def mock_config_file(): mock_toml_content = b"""api-key='test_api_key'""" @@ -26,7 +32,7 @@ def mock_config_file(): class TestClient: @pytest.fixture() def setup(self): - self.client = OpenAQ(api_key="abc123-def456-ghi789", _transport=MockTransport) + self.client = OpenAQ(api_key="abc123-def456-ghi789", transport=MockTransport) @pytest.fixture() def mock_openaq_api_key_env_vars(self): @@ -44,17 +50,14 @@ def test_default_client_params(self, setup): assert self.client._base_url == "https://api.openaq.org/v3/" def test_default_headers(self, setup): - assert ( - self.client.headers["User-Agent"] - == f"openaq-python-{__version__}-{platform.python_version()}" - ) + assert self.client.headers["User-Agent"] == SYNC_USER_AGENT assert self.client.headers["Accept"] == "application/json" def test_custom_headers(self, setup): self.client = OpenAQ( api_key="abc123-def456-ghi789", base_url="https://mycustom.openaq.org", - _transport=MockTransport(), + transport=MockTransport(), ) assert self.client.headers["X-API-Key"] == "abc123-def456-ghi789" @@ -62,7 +65,7 @@ def test_client_params(self, setup): self.client = OpenAQ( api_key="abc123-def456-ghi789", base_url="https://mycustom.openaq.org", - _transport=MockTransport(), + transport=MockTransport(), ) assert self.client._base_url == "https://mycustom.openaq.org" @@ -70,17 +73,17 @@ def test_api_env_var(self, mock_openaq_api_key_env_vars): """ tests that api_key is set from environment variable """ - client = OpenAQ(_transport=MockTransport) + client = OpenAQ(transport=MockTransport) assert client.api_key == "openaq-1a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6p" @pytest.mark.usefixtures("mock_config_file") def test_api_key_from_config(self): if int(platform.python_version_tuple()[1]) >= 11: - client = OpenAQ(_transport=MockTransport) + client = OpenAQ(transport=MockTransport) assert client.api_key == "test_api_key" else: with pytest.raises(ApiKeyMissingError): - client = OpenAQ(_transport=MockTransport) + client = OpenAQ(transport=MockTransport) def test_api_key_arg_override_env_var(self, setup, mock_openaq_api_key_env_vars): """ @@ -102,7 +105,7 @@ def test_api_key_arg_override_env_vars_config( """ assert self.client.api_key == "abc123-def456-ghi789" - @mock.patch('openaq.shared.client.datetime') + @mock.patch('openaq._sync.client.datetime') @mock.patch('time.sleep') @mock.patch('openaq._sync.client.logger') def test_wait_for_rate_limit_reset_waits_when_positive( @@ -122,7 +125,7 @@ def test_wait_for_rate_limit_reset_waits_when_positive( "Rate limit hit. Waiting 5 seconds for reset." ) - @mock.patch('openaq.shared.client.datetime') + @mock.patch('openaq._sync.client.datetime') @mock.patch('time.sleep') @mock.patch('openaq._sync.client.logger') def test_wait_for_rate_limit_reset_does_not_wait_when_zero( @@ -140,7 +143,7 @@ def test_wait_for_rate_limit_reset_does_not_wait_when_zero( mock_sleep.assert_not_called() mock_logger.info.assert_not_called() - @mock.patch('openaq.shared.client.datetime') + @mock.patch('openaq._sync.client.datetime') @mock.patch('time.sleep') @mock.patch('openaq._sync.client.logger') def test_wait_for_rate_limit_reset_does_not_wait_when_negative( @@ -189,3 +192,179 @@ def test_context_manager_exit_closes_even_with_exception(self, setup): raise ValueError("Test exception") self.client.transport.close.assert_called_once() + + def test_blocks_after_custom_limit(self): + """Test that client blocks after exhausting custom rate limit.""" + from openaq.shared.exceptions import RateLimitError + + client = OpenAQ( + api_key="abc123-def456-ghi789", + transport=MockTransport(), + auto_wait=False, + rate_limit_override=5, + ) + for _ in range(5): + client._check_rate_limit() + client._rate_limit_remaining -= 1 + + with pytest.raises(RateLimitError): + client._check_rate_limit() + + def test_allows_exactly_override_requests(self): + """Test that client allows exactly rate_limit_override requests before blocking.""" + from openaq.shared.exceptions import RateLimitError + + limit = 10 + client = OpenAQ( + api_key="abc123-def456-ghi789", + transport=MockTransport(), + auto_wait=False, + rate_limit_override=limit, + ) + success_count = 0 + for _ in range(limit + 5): + try: + client._check_rate_limit() + client._rate_limit_remaining -= 1 + success_count += 1 + except RateLimitError: + break + + assert success_count == limit + + def test_raises_when_exhausted_and_auto_wait_false(self, setup): + """Test that RateLimitError is raised when exhausted and auto_wait is False.""" + from openaq.shared.exceptions import RateLimitError + + self.client._auto_wait = False + self.client._rate_limit_remaining = 0.0 + self.client._rate_limit_reset_datetime = datetime.now() + timedelta(seconds=30) + + with pytest.raises(RateLimitError): + self.client._check_rate_limit() + + @freeze_time("2026-02-12T00:00:00") + def test_error_message_includes_reset_seconds(self, setup): + """Test that RateLimitError message includes seconds until reset.""" + self.client._auto_wait = False + self.client._rate_limit_remaining = 0.0 + self.client._rate_limit_reset_datetime = datetime(2026, 2, 12, 0, 0, 30) + self.client._current_window_id = datetime.now().strftime("%Y%m%d%H%M") + + with pytest.raises(RateLimitError, match="30"): + self.client._check_rate_limit() + + @mock.patch('time.sleep') + def test_waits_when_exhausted_and_auto_wait_true(self, mock_sleep, setup): + """Test that sleep is called when exhausted and auto_wait is True.""" + self.client._auto_wait = True + self.client._rate_limit_remaining = 0.0 + self.client._rate_limit_reset_datetime = datetime.now() + timedelta(seconds=10) + + self.client._check_rate_limit() + + mock_sleep.assert_called_once() + + @mock.patch('time.sleep') + def test_resets_capacity_after_wait(self, mock_sleep, setup): + """Test that remaining is reset to full capacity after waiting.""" + self.client._auto_wait = True + self.client._rate_limit_remaining = 0.0 + self.client._rate_limit_reset_datetime = datetime.now() + timedelta(seconds=10) + + self.client._check_rate_limit() + + assert self.client._rate_limit_remaining == self.client._rate_limit_capacity + + @mock.patch('openaq._sync.client.datetime') + def test_resets_capacity_on_new_window(self, mock_datetime, setup): + """Test that capacity resets automatically when minute window rolls over.""" + now = datetime(2026, 2, 12, 0, 1, 0) + mock_datetime.now.return_value = now + + self.client._rate_limit_remaining = 0.0 + self.client._current_window_id = "202602120000" + + try: + self.client._check_rate_limit() + except Exception as e: + pytest.fail(f"Unexpected exception raised: {e}") + + assert self.client._rate_limit_remaining == self.client._rate_limit_capacity + + @mock.patch('openaq._sync.client.datetime') + def test_does_not_reset_capacity_within_same_window(self, mock_datetime, setup): + """Test that capacity is not reset when still within the same window.""" + now = datetime(2026, 2, 12, 0, 0, 30) + mock_datetime.now.return_value = now + self.client._current_window_id = now.strftime("%Y%m%d%H%M") + self.client._rate_limit_remaining = 5.0 + + self.client._check_rate_limit() + + assert self.client._rate_limit_remaining == 5.0 + + @mock.patch('openaq._sync.client.datetime') + def test_updates_window_id_on_rollover(self, mock_datetime, setup): + """Test that window ID is updated when minute window rolls over.""" + now = datetime(2026, 2, 12, 0, 1, 0) + mock_datetime.now.return_value = now + self.client._current_window_id = "202602120000" + + self.client._check_rate_limit() + + assert self.client._current_window_id == "202602120001" + + def test_set_rate_limit_updates_remaining_from_headers(self, setup): + """Test that remaining is updated from x-ratelimit-remaining header.""" + headers = httpx.Headers( + {"x-ratelimit-remaining": "45", "x-ratelimit-reset": "60"} + ) + self.client._set_rate_limit(headers) + assert self.client._rate_limit_remaining == 45 + + def test_set_rate_limit_server_remaining_overrides_local_count(self, setup): + """Test that server-provided remaining overrides optimistic local tracking.""" + self.client._rate_limit_remaining = 50.0 + headers = httpx.Headers( + {"x-ratelimit-remaining": "30", "x-ratelimit-reset": "60"} + ) + self.client._set_rate_limit(headers) + assert self.client._rate_limit_remaining == 30 + + def test_set_rate_limit_updates_reset_datetime_from_headers(self, setup): + """Test that reset datetime is calculated correctly from x-ratelimit-reset header.""" + headers = httpx.Headers( + {"x-ratelimit-remaining": "45", "x-ratelimit-reset": "30"} + ) + before = datetime.now() + timedelta(seconds=29) + self.client._set_rate_limit(headers) + after = datetime.now() + timedelta(seconds=32) + + assert before < self.client._rate_limit_reset_datetime < after + + def test_set_rate_limit_missing_headers_does_not_raise(self, setup): + """Test that entirely missing rate limit headers are handled gracefully.""" + headers = httpx.Headers({}) + try: + self.client._set_rate_limit(headers) + except Exception as e: + pytest.fail(f"Unexpected exception raised: {e}") + + def test_set_rate_limit_remaining_allows_further_requests(self, setup): + """Test that after server sync, requests are allowed up to the new remaining count.""" + from openaq.shared.exceptions import RateLimitError + + headers = httpx.Headers( + {"x-ratelimit-remaining": "2", "x-ratelimit-reset": "30"} + ) + self.client._set_rate_limit(headers) + self.client._auto_wait = False + + self.client._check_rate_limit() + self.client._rate_limit_remaining -= 1 + self.client._check_rate_limit() + self.client._rate_limit_remaining -= 1 + + with pytest.raises(RateLimitError): + self.client._check_rate_limit() diff --git a/tests/unit/test_shared_client.py b/tests/unit/test_shared_client.py index 2ae6b258..16588859 100644 --- a/tests/unit/test_shared_client.py +++ b/tests/unit/test_shared_client.py @@ -1,22 +1,22 @@ import os import platform -from datetime import datetime from http import HTTPStatus from pathlib import Path from unittest import mock from unittest.mock import mock_open, patch +from openaq import __version__ + + import httpx import pytest -from freezegun import freeze_time from openaq.shared.client import ( - DEFAULT_USER_AGENT, BaseClient, _get_openaq_config, _has_toml, ) -from openaq.shared.exceptions import ApiKeyMissingError, RateLimitError +from openaq.shared.exceptions import ApiKeyMissingError from tests.unit.mocks import MockTransport @@ -46,7 +46,16 @@ def test__get_openaq_config_file_exists(): assert result == None +DEFAULT_USER_AGENT = f"openaq-python-{__version__}-{platform.python_version()}" + + class SharedClient(BaseClient): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._user_agent = DEFAULT_USER_AGENT + self.resolve_headers() + def close(): pass @@ -56,6 +65,9 @@ def _do(): def _get(): pass + def _set_rate_limit(self, headers): + pass + @pytest.fixture def mock_config_file(): @@ -140,8 +152,8 @@ def test_headers_property(self): assert self.instance.headers == { 'this': 'that', 'Accept': 'application/json', - 'User-Agent': DEFAULT_USER_AGENT, 'X-API-Key': 'abc123-def456-ghi789', + 'User-Agent': DEFAULT_USER_AGENT, } with pytest.raises(AttributeError): self.instance.headers = {'openaq': 'api'} @@ -154,8 +166,8 @@ def test_build_request_headers(self): 'this': 'that', 'Accept': 'application/json', 'Accept-Language': 'en-US,en;q=0.5', - 'User-Agent': DEFAULT_USER_AGENT, 'X-API-Key': 'abc123-def456-ghi789', + 'User-Agent': DEFAULT_USER_AGENT, } def test_build_request_headers_none(self): @@ -163,106 +175,13 @@ def test_build_request_headers_none(self): assert request_headers == { 'this': 'that', 'Accept': 'application/json', - 'User-Agent': DEFAULT_USER_AGENT, 'X-API-Key': 'abc123-def456-ghi789', + 'User-Agent': DEFAULT_USER_AGENT, } - def test___check_rate_limit(self): - mock_response = httpx.Response( - HTTPStatus.OK, - content="Test content", - headers={ - 'X-RateLimit-Used': '0', - 'X-RateLimit-Reset': '60', - 'X-RateLimit-Remaining': '0', - 'X-RateLimit-Limit': '60', - }, - ) - self.instance._set_rate_limit(mock_response.headers) - with pytest.raises(RateLimitError): - self.instance._check_rate_limit() - - @freeze_time("2025-02-27T01:00:00") - def test__is_rate_limited(self): - mock_response = httpx.Response( - HTTPStatus.OK, - content="Test content", - headers={ - 'X-RateLimit-Used': '0', - 'X-RateLimit-Reset': '60', - 'X-RateLimit-Remaining': '59', - 'X-RateLimit-Limit': '60', - }, - ) - self.instance._set_rate_limit(mock_response.headers) - assert self.instance._rate_limit_reset_seconds == 60 - assert self.instance._is_rate_limited() == False - with freeze_time("2025-02-27T01:00:59"): - assert self.instance._is_rate_limited() == False - assert self.instance._rate_limit_reset_seconds == 1 - - @freeze_time("2025-02-27T01:00:00") - def test__is_rate_limited_true(self): - mock_response = httpx.Response( - HTTPStatus.OK, - content="Test content", - headers={ - 'X-RateLimit-Used': '0', - 'X-RateLimit-Reset': '30', - 'X-RateLimit-Remaining': '0', - 'X-RateLimit-Limit': '60', - }, - ) - self.instance._set_rate_limit(mock_response.headers) - with freeze_time("2025-02-27T01:00:29"): - assert self.instance._rate_limit_remaining == 0 - assert self.instance._rate_limit_reset_seconds == 1 - assert self.instance._rate_limit_reset_datetime == datetime( - year=2025, month=2, day=27, hour=1, minute=0, second=30 - ) - assert self.instance._is_rate_limited() - - @freeze_time("2025-02-27T01:00:00") - def test__is_rate_limited_false(self): - mock_response = httpx.Response( - HTTPStatus.OK, - content="Test content", - headers={ - 'X-RateLimit-Used': '0', - 'X-RateLimit-Reset': '30', - 'X-RateLimit-Remaining': '60', - 'X-RateLimit-Limit': '60', - }, - ) - self.instance._set_rate_limit(mock_response.headers) - with freeze_time("2025-02-27T01:00:31"): - assert self.instance._rate_limit_remaining == 60 - assert self.instance._rate_limit_reset_seconds == -1 - assert self.instance._rate_limit_reset_datetime == datetime( - year=2025, month=2, day=27, hour=1, minute=0, second=30 - ) - assert self.instance._is_rate_limited() == False - def test_auto_wait_default_is_true(self): """Test that auto_wait defaults to True.""" instance = SharedClient( MockTransport, api_key='openaq-1a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6p' ) assert instance._auto_wait == True - - def test_auto_wait_false_raises_rate_limit_error(self): - """Test that auto_wait=True doesn't raise RateLimitError when rate limited.""" - instance = SharedClient( - MockTransport, - api_key='openaq-1a2b3c4d5e6f7g8h9i0j1k2l3m4n5o6p', - ) - mock_response = httpx.Response( - HTTPStatus.OK, - content="Test content", - headers={ - 'X-RateLimit-Remaining': '0', - 'X-RateLimit-Reset': '60', - }, - ) - instance._set_rate_limit(mock_response.headers) - self.instance._check_rate_limit() # Should not raise RateLimitError From 7a95ed9f7a208abd13423ed13e09c706d2d259bc Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 13:25:20 -0700 Subject: [PATCH 2/6] mypy type fix --- openaq/_sync/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openaq/_sync/client.py b/openaq/_sync/client.py index b8778ae8..c7a66fec 100644 --- a/openaq/_sync/client.py +++ b/openaq/_sync/client.py @@ -91,7 +91,7 @@ def __init__( self.resolve_headers() self._request_count = 0 rate_limit = rate_limit_override if rate_limit_override is not None else 60 - self._rate_limit_capacity = float(rate_limit) + self._rate_limit_capacity = int(rate_limit) self._rate_limit_reset_datetime = datetime.min self._rate_limit_remaining = self._rate_limit_capacity self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") From 3b62e1cd91c01082087cc63c172dd8ee3afdc84b Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 15:54:35 -0700 Subject: [PATCH 3/6] add aditional sync client tests --- tests/unit/sync/test_sync_client.py | 64 +++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/unit/sync/test_sync_client.py b/tests/unit/sync/test_sync_client.py index ba722f60..98d96d23 100644 --- a/tests/unit/sync/test_sync_client.py +++ b/tests/unit/sync/test_sync_client.py @@ -368,3 +368,67 @@ def test_set_rate_limit_remaining_allows_further_requests(self, setup): with pytest.raises(RateLimitError): self.client._check_rate_limit() + + def test_do_calls_transport_with_correct_args(self, setup): + """Test that _do constructs the correct URL and passes method to transport.""" + mock_response = mock.MagicMock() + mock_response.headers = httpx.Headers({}) + self.client.transport.send_request = mock.Mock(return_value=mock_response) + + self.client._do("get", "locations/1") + + call_kwargs = self.client.transport.send_request.call_args + assert call_kwargs.kwargs["url"] == "https://api.openaq.org/v3/locations/1" + assert call_kwargs.kwargs["method"] == "get" + + def test_do_passes_params_to_transport(self, setup): + """Test that _do passes query params through to the transport.""" + mock_response = mock.MagicMock() + mock_response.headers = httpx.Headers({}) + self.client.transport.send_request = mock.Mock(return_value=mock_response) + + self.client._do("get", "/test", params={"limit": 100, "page": 1}) + + call_kwargs = self.client.transport.send_request.call_args + assert call_kwargs.kwargs["params"] == {"limit": 100, "page": 1} + + def test_do_passes_custom_headers_to_transport(self, setup): + """Test that _do merges custom headers into the request.""" + mock_response = mock.MagicMock() + mock_response.headers = httpx.Headers({}) + self.client.transport.send_request = mock.Mock(return_value=mock_response) + + self.client._do("get", "/test", headers={"X-Custom-Header": "value"}) + + call_kwargs = self.client.transport.send_request.call_args + assert "X-Custom-Header" in call_kwargs.kwargs["headers"] + + def test_do_syncs_rate_limit_from_response_headers(self, setup): + """Test that _do updates rate limit state from response headers.""" + mock_response = mock.MagicMock() + mock_response.headers = httpx.Headers( + { + "x-ratelimit-remaining": "42", + "x-ratelimit-reset": "30", + } + ) + self.client.transport.send_request = mock.Mock(return_value=mock_response) + + self.client._do("get", "/test") + + assert self.client._rate_limit_remaining == 42 + + def test_do_raises_before_sending_when_rate_limited(self, setup): + """Test that _do raises RateLimitError without calling transport when exhausted.""" + from openaq.shared.exceptions import RateLimitError + + self.client._auto_wait = False + self.client._rate_limit_remaining = 0 + self.client._rate_limit_reset_datetime = datetime.now() + timedelta(seconds=30) + self.client._current_window_id = datetime.now().strftime("%Y%m%d%H%M") + self.client.transport.send_request = mock.Mock() + + with pytest.raises(RateLimitError): + self.client._do("get", "/test") + + self.client.transport.send_request.assert_not_called() From 5b0db9e82574ffd5027f40bf7d913bb91a9eed17 Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 16:56:21 -0700 Subject: [PATCH 4/6] add httpx configuration options --- openaq/_async/client.py | 30 ++++++++++++++++++++---- openaq/_async/transport.py | 10 +++++--- openaq/_sync/client.py | 30 +++++++++++++++++------- openaq/_sync/transport.py | 14 +++++++++--- openaq/shared/client.py | 2 -- openaq/shared/transport.py | 10 ++++++++ tests/unit/async/test_async_client.py | 33 ++++++++++++++++++++++++--- tests/unit/sync/test_sync_client.py | 33 ++++++++++++++++++++++++--- tests/unit/test_shared_client.py | 10 ++------ 9 files changed, 138 insertions(+), 34 deletions(-) diff --git a/openaq/_async/client.py b/openaq/_async/client.py index a8ba634c..ac212edc 100644 --- a/openaq/_async/client.py +++ b/openaq/_async/client.py @@ -1,9 +1,9 @@ from __future__ import annotations import asyncio -from datetime import datetime import logging import platform +from datetime import datetime from types import TracebackType from typing import Mapping @@ -22,6 +22,7 @@ from openaq._async.models.sensors import Sensors from openaq.shared.client import DEFAULT_BASE_URL, BaseClient from openaq.shared.exceptions import RateLimitError +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT from .transport import AsyncTransport @@ -34,9 +35,20 @@ class AsyncOpenAQ(BaseClient[AsyncTransport]): Args: api_key: The API key for accessing the service. headers: Additional headers to be sent with the request. - auto_wait: Whether to automatically wait when rate limited. Defaults to True. + auto_wait: Whether to automatically wait when rate limited. Defaults to + True. base_url: The base URL for the API endpoint. - _transport: The transport instance for making HTTP requests. For internal use. + transport: The transport instance for making HTTP requests. For internal + use. + rate_limit_override: Override the default rate limit capacity of 60 + requests per minute. + Useful for accounts with a higher rate limit. Defaults to 60. + timeout: Timeout configuration for HTTP requests. Defaults to 5 seconds + for connection, write, and pool, and 8 seconds for read to account + for the API's 6 second processing limit. Pass None for no timeout. + limits: Connection pool limits for the HTTP transport. Defaults to 20 + maximum connections with 10 keepalive connections. Keepalive + connections expire after 30 seconds. Note: An API key can either be passed directly to the OpenAQ client class at @@ -66,6 +78,14 @@ class AsyncOpenAQ(BaseClient[AsyncTransport]): """ + _rate_limit_capacity: float + _rate_limit_remaining: float + _in_flight_requests: int + _current_window_id: str + _sync_in_progress: bool + _lock: asyncio.Lock + _rate_limit_synced_event: asyncio.Event + def __init__( self, api_key: str | None = None, @@ -73,10 +93,12 @@ def __init__( auto_wait: bool = True, base_url: str = DEFAULT_BASE_URL, transport: AsyncTransport | None = None, + timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT, + limits: httpx.Limits = DEFAULT_LIMITS, rate_limit_override: int | None = None, ) -> None: if transport is None: - transport = AsyncTransport() + transport = AsyncTransport(timeout=timeout, limits=limits) if headers is None: headers = {} super().__init__(transport, headers, api_key, auto_wait, base_url) diff --git a/openaq/_async/transport.py b/openaq/_async/transport.py index 19cd2ced..b680ae68 100644 --- a/openaq/_async/transport.py +++ b/openaq/_async/transport.py @@ -3,14 +3,18 @@ import httpx -from ..shared.transport import check_response +from ..shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT, check_response logger = logging.getLogger(__name__) class AsyncTransport: - def __init__(self) -> None: - self.client = httpx.AsyncClient(timeout=15.0) + def __init__( + self, + timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT, + limits: httpx.Limits = DEFAULT_LIMITS, + ) -> None: + self.client = httpx.AsyncClient(timeout=timeout, limits=limits) async def send_request( self, diff --git a/openaq/_sync/client.py b/openaq/_sync/client.py index c7a66fec..7210ed19 100644 --- a/openaq/_sync/client.py +++ b/openaq/_sync/client.py @@ -1,9 +1,9 @@ from __future__ import annotations -from datetime import datetime, timedelta import logging import platform import time +from datetime import datetime, timedelta from types import TracebackType from typing import Mapping @@ -22,6 +22,7 @@ from openaq._sync.models.sensors import Sensors from openaq.shared.client import DEFAULT_BASE_URL, BaseClient from openaq.shared.exceptions import RateLimitError +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT from .transport import Transport @@ -34,9 +35,20 @@ class OpenAQ(BaseClient[Transport]): Args: api_key: The API key for accessing the service. headers: Additional headers to be sent with the request. - auto_wait: Whether to automatically wait when rate limited. Defaults to True. + auto_wait: Whether to automatically wait when rate limited. Defaults to + True. base_url: The base URL for the API endpoint. - _transport: The transport instance for making HTTP requests. For internal use. + transport: The transport instance for making HTTP requests. For internal + use. + rate_limit_override: Override the default rate limit capacity of 60 + requests per minute. + Useful for accounts with a higher rate limit. Defaults to 60. + timeout: Timeout configuration for HTTP requests. Defaults to 5 seconds + for connection, write, and pool, and 8 seconds for read to account + for the API's 6 second processing limit. Pass None for no timeout. + limits: Connection pool limits for the HTTP transport. Defaults to 20 + maximum connections with 10 keepalive connections. Keepalive + connections expire after 30 seconds. Note: An API key can either be passed directly to the OpenAQ client class at @@ -68,8 +80,9 @@ class OpenAQ(BaseClient[Transport]): """ _rate_limit_reset_datetime: datetime - _rate_limit_remaining: int - _request_count: int + _rate_limit_remaining: float + _rate_limit_capacity: float + _current_window_id: str def __init__( self, @@ -78,10 +91,12 @@ def __init__( auto_wait: bool = True, base_url: str = DEFAULT_BASE_URL, transport: Transport | None = None, + timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT, + limits: httpx.Limits = DEFAULT_LIMITS, rate_limit_override: int | None = None, ) -> None: if transport is None: - transport = Transport() + transport = Transport(timeout=timeout, limits=limits) if headers is None: headers = {} super().__init__(transport, headers, api_key, auto_wait, base_url) @@ -89,9 +104,8 @@ def __init__( f"openaq-python-sync-{__version__}-{platform.python_version()}" ) self.resolve_headers() - self._request_count = 0 rate_limit = rate_limit_override if rate_limit_override is not None else 60 - self._rate_limit_capacity = int(rate_limit) + self._rate_limit_capacity = float(rate_limit) self._rate_limit_reset_datetime = datetime.min self._rate_limit_remaining = self._rate_limit_capacity self._current_window_id = datetime.now().strftime("%Y%m%d%H%M") diff --git a/openaq/_sync/transport.py b/openaq/_sync/transport.py index f60612f6..a3ee92ea 100644 --- a/openaq/_sync/transport.py +++ b/openaq/_sync/transport.py @@ -3,14 +3,22 @@ import httpx -from openaq.shared.transport import check_response +from openaq.shared.transport import ( + DEFAULT_LIMITS, + DEFAULT_TIMEOUT, + check_response, +) logger = logging.getLogger(__name__) class Transport: - def __init__(self) -> None: - self.client = httpx.Client(timeout=15.0) + def __init__( + self, + timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT, + limits: httpx.Limits = DEFAULT_LIMITS, + ) -> None: + self.client = httpx.Client(timeout=timeout, limits=limits) def send_request( self, diff --git a/openaq/shared/client.py b/openaq/shared/client.py index 8b021fb2..20c301bb 100644 --- a/openaq/shared/client.py +++ b/openaq/shared/client.py @@ -46,8 +46,6 @@ class BaseClient(ABC, Generic[TTransport]): _auto_wait: Whether the client should automatically wait when rate limited instead of raising an exception. _base_url: The base URL of the OpenAQ API. - _rate_limit_reset_datetime: When the current rate limit resets. - _rate_limit_remaining: Number of requests remaining in the current rate limit window. Args: transport: The transport mechanism used for making requests to the OpenAQ API. diff --git a/openaq/shared/transport.py b/openaq/shared/transport.py index 7172f426..420e6926 100644 --- a/openaq/shared/transport.py +++ b/openaq/shared/transport.py @@ -23,6 +23,16 @@ logger = logging.getLogger(__name__) +# Set connect, write and pool to 5, and read to 8 since server times out at 6 anyways. +DEFAULT_TIMEOUT = httpx.Timeout(5.0, read=8.0) + +# connection pool sized to the 60 req/min API rate limit +DEFAULT_LIMITS = httpx.Limits( + max_connections=20, + max_keepalive_connections=10, + keepalive_expiry=30.0, +) + def check_response(res: httpx.Response) -> httpx.Response: """Checks the HTTP response of the request. diff --git a/tests/unit/async/test_async_client.py b/tests/unit/async/test_async_client.py index aec23d84..3a9351e7 100644 --- a/tests/unit/async/test_async_client.py +++ b/tests/unit/async/test_async_client.py @@ -5,8 +5,7 @@ from unittest import mock import httpx - - +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT import pytest from openaq import __version__ @@ -15,7 +14,6 @@ from ..mocks import AsyncMockTransport - ASYNC_USER_AGENT = f"openaq-python-async-{__version__}-{platform.python_version()}" @@ -30,6 +28,7 @@ def mock_config_file(): class TestAsyncClient: + @pytest.fixture() def setup(self): self.client = AsyncOpenAQ( @@ -303,3 +302,31 @@ async def test_blocks_after_custom_limit(self): with pytest.raises(RateLimitError): await client._acquire_token() + + def test_default_timeout_applied_to_transport(self): + """Test that default timeout is applied to the transport.""" + client = AsyncOpenAQ(api_key="abc123-def456-ghi789") + assert client.transport.client.timeout == DEFAULT_TIMEOUT + + def test_custom_timeout_passed_to_transport(self): + """Test that a custom timeout is passed through to the transport.""" + custom_timeout = httpx.Timeout(10.0, read=15.0) + client = AsyncOpenAQ(api_key="abc123-def456-ghi789", timeout=custom_timeout) + assert client.transport.client.timeout == custom_timeout + + def test_default_limits_applied_to_transport(self): + """Test that default connection limits are applied to the transport.""" + with mock.patch('openaq._async.transport.httpx.AsyncClient') as mock_client: + AsyncOpenAQ(api_key="abc123-def456-ghi789") + mock_client.assert_called_once_with( + timeout=DEFAULT_TIMEOUT, limits=DEFAULT_LIMITS + ) + + def test_custom_limits_passed_to_transport(self): + """Test that custom connection limits are passed through to the transport.""" + custom_limits = httpx.Limits(max_connections=5, max_keepalive_connections=2) + with mock.patch('openaq._async.transport.httpx.AsyncClient') as mock_client: + AsyncOpenAQ(api_key="abc123-def456-ghi789", limits=custom_limits) + mock_client.assert_called_once_with( + timeout=DEFAULT_TIMEOUT, limits=custom_limits + ) diff --git a/tests/unit/sync/test_sync_client.py b/tests/unit/sync/test_sync_client.py index 98d96d23..3bd6f1cd 100644 --- a/tests/unit/sync/test_sync_client.py +++ b/tests/unit/sync/test_sync_client.py @@ -1,21 +1,20 @@ import os import platform -import httpx from datetime import datetime, timedelta from pathlib import Path from unittest import mock +import httpx +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT import pytest from freezegun import freeze_time - from openaq import __version__ from openaq._sync.client import OpenAQ from openaq.shared.exceptions import ApiKeyMissingError, RateLimitError from ..mocks import MockTransport - SYNC_USER_AGENT = f"openaq-python-sync-{__version__}-{platform.python_version()}" @@ -432,3 +431,31 @@ def test_do_raises_before_sending_when_rate_limited(self, setup): self.client._do("get", "/test") self.client.transport.send_request.assert_not_called() + + def test_default_timeout_applied_to_transport(self): + """Test that default timeout is applied to the transport.""" + client = OpenAQ(api_key="abc123-def456-ghi789") + assert client.transport.client.timeout == DEFAULT_TIMEOUT + + def test_custom_timeout_passed_to_transport(self): + """Test that a custom timeout is passed through to the transport.""" + custom_timeout = httpx.Timeout(10.0, read=15.0) + client = OpenAQ(api_key="abc123-def456-ghi789", timeout=custom_timeout) + assert client.transport.client.timeout == custom_timeout + + def test_default_limits_applied_to_transport(self): + """Test that default connection limits are applied to the transport.""" + with mock.patch('openaq._sync.transport.httpx.Client') as mock_client: + OpenAQ(api_key="abc123-def456-ghi789") + mock_client.assert_called_once_with( + timeout=DEFAULT_TIMEOUT, limits=DEFAULT_LIMITS + ) + + def test_custom_limits_passed_to_transport(self): + """Test that custom connection limits are passed through to the transport.""" + custom_limits = httpx.Limits(max_connections=5, max_keepalive_connections=2) + with mock.patch('openaq._sync.transport.httpx.Client') as mock_client: + OpenAQ(api_key="abc123-def456-ghi789", limits=custom_limits) + mock_client.assert_called_once_with( + timeout=DEFAULT_TIMEOUT, limits=custom_limits + ) diff --git a/tests/unit/test_shared_client.py b/tests/unit/test_shared_client.py index 16588859..7c3ccce6 100644 --- a/tests/unit/test_shared_client.py +++ b/tests/unit/test_shared_client.py @@ -5,17 +5,11 @@ from unittest import mock from unittest.mock import mock_open, patch -from openaq import __version__ - - import httpx import pytest -from openaq.shared.client import ( - BaseClient, - _get_openaq_config, - _has_toml, -) +from openaq import __version__ +from openaq.shared.client import BaseClient, _get_openaq_config, _has_toml from openaq.shared.exceptions import ApiKeyMissingError from tests.unit.mocks import MockTransport From 26c9ed9711f490ef8cab2dc55512e71d82d264d9 Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 17:08:03 -0700 Subject: [PATCH 5/6] style fixes --- tests/unit/async/test_async_client.py | 2 +- tests/unit/sync/test_sync_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/async/test_async_client.py b/tests/unit/async/test_async_client.py index 3a9351e7..6a5f18f8 100644 --- a/tests/unit/async/test_async_client.py +++ b/tests/unit/async/test_async_client.py @@ -5,12 +5,12 @@ from unittest import mock import httpx -from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT import pytest from openaq import __version__ from openaq._async.client import AsyncOpenAQ from openaq.shared.exceptions import ApiKeyMissingError +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT from ..mocks import AsyncMockTransport diff --git a/tests/unit/sync/test_sync_client.py b/tests/unit/sync/test_sync_client.py index 3bd6f1cd..39e88093 100644 --- a/tests/unit/sync/test_sync_client.py +++ b/tests/unit/sync/test_sync_client.py @@ -5,13 +5,13 @@ from unittest import mock import httpx -from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT import pytest from freezegun import freeze_time from openaq import __version__ from openaq._sync.client import OpenAQ from openaq.shared.exceptions import ApiKeyMissingError, RateLimitError +from openaq.shared.transport import DEFAULT_LIMITS, DEFAULT_TIMEOUT from ..mocks import MockTransport From d15d2dd309a2679a8476bdacc6599e0ab3a4aabd Mon Sep 17 00:00:00 2001 From: Russ Biggs Date: Tue, 24 Feb 2026 17:20:15 -0700 Subject: [PATCH 6/6] changelog --- CHANGELOG.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7838d261..9338880a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [1.0.0rc2] - 2026-02-24 + +### Updated + +- HTTPX client configuration settings for timeout +- HTTPX client configuration settings for limits + +### Fixed + +- Fixed creation of headers on client classes to remove mutable default argument. +- Fixed creation of transport on client classes to remove mutable default argument. +- automatic rate limiting for AsyncOpenAQ, no longer relies solely on HTTP +headers, a more reliable method for async usage. + ## [1.0.0rc1] - 2026-02-13 **Breaking changes**