Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion openaq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging

__version__ = "1.0.0rc3"
__version__ = "1.0.0rc4"


logger = logging.getLogger(__name__)
Expand All @@ -18,6 +18,7 @@
GatewayTimeoutError,
HTTPRateLimitError,
IdentifierOutOfBoundsError,
InvalidParameterError,
NotAuthorizedError,
NotFoundError,
RateLimitError,
Expand All @@ -34,6 +35,7 @@
"NotAuthorizedError",
"NotFoundError",
"TimeoutError",
"InvalidParameterError",
"ValidationError",
"GatewayTimeoutError",
"HTTPRateLimitError",
Expand All @@ -43,4 +45,5 @@
"ServerError",
"ServiceUnavailableError",
"BadGatewayError",
"__version__",
]
54 changes: 24 additions & 30 deletions openaq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
from pathlib import Path
from types import TracebackType
from typing import Mapping
from urllib.parse import urljoin, urlparse

from openaq import __version__
from openaq.core.exceptions import ApiKeyMissingError, RateLimitError
from openaq.core.transport import (
DEFAULT_LIMITS,
DEFAULT_TIMEOUT,
Headers,
Limits,
Response,
Timeout,
Transport,
)
from openaq.models.countries import Countries
Expand Down Expand Up @@ -83,17 +82,11 @@ class OpenAQ:
auto_wait: Whether to automatically wait when rate limited. Defaults to
True.
base_url: The base URL for the API endpoint.
transport: The transport instance for making HTTP requests. For internal
_transport: The transport instance for making HTTP requests. For internal
use.
rate_limit_override: Override the default rate limit capacity of 60
requests per minute. Useful for accounts with a higher rate limit.
Defaults to None.
timeout: Timeout configuration for HTTP requests. Defaults to 5 seconds
for connection and pool, and 8 seconds for read to account for the
API's 6 second processing limit. Pass None for no timeout.
limits: Connection pool limits for the HTTP transport. Defaults to 20
maximum connections with 10 keepalive connections. Keepalive
connections expire after 30 seconds.

Note:
An API key can either be passed directly to the OpenAQ client class at
Expand Down Expand Up @@ -131,9 +124,7 @@ def __init__(
headers: Mapping[str, str] | None = None,
auto_wait: bool = True,
base_url: str = DEFAULT_BASE_URL,
transport: Transport | None = None,
timeout: float | Timeout | None = DEFAULT_TIMEOUT,
limits: Limits = DEFAULT_LIMITS,
_transport: Transport | None = None, # internal use only
rate_limit_override: int | None = None,
) -> None:
"""Initializes the OpenAQ client.
Expand All @@ -146,14 +137,8 @@ def __init__(
auto_wait: Whether to automatically wait when rate limited. Defaults
to True.
base_url: The base URL for the API endpoint.
transport: The transport instance for making HTTP requests. For
_transport: The transport instance for making HTTP requests. For
internal use.
timeout: Timeout configuration for HTTP requests. Defaults to 5
seconds for connection and 8 seconds for read. Pass None for no
timeout.
limits: Connection pool limits for the HTTP transport. Defaults to
20 maximum connections with 10 keepalive connections expiring
after 30 seconds.
rate_limit_override: Initial rate limit capacity in requests per
minute. Defaults to 60 and is corrected automatically from
server response headers after the first request.
Expand All @@ -163,12 +148,17 @@ def __init__(
URL is used.
"""
self._api_key = _resolve_api_key(api_key)
self._base_url = base_url
parsed = urlparse(base_url)
if not parsed.scheme or not parsed.netloc:
raise ValueError(
f"Invalid base_url, must be a fully qualified URL: {base_url!r}"
)
self._base_url = parsed.geturl()
self._auto_wait = auto_wait
self._transport = (
transport
if transport is not None
else Transport(timeout=timeout, limits=limits)
_transport
if _transport is not None
else Transport(timeout=DEFAULT_TIMEOUT, limits=DEFAULT_LIMITS)
)
self._headers = Headers(headers or {})

Expand All @@ -181,8 +171,8 @@ def __init__(
)

self._user_agent = f"openaq-python-{__version__}-{platform.python_version()}"
assert self._api_key is not None
self._headers["X-API-Key"] = self._api_key
if self._api_key:
self._headers["X-API-Key"] = self._api_key
self._headers["User-Agent"] = self._user_agent
self._headers["Accept"] = ACCEPT_HEADER

Expand Down Expand Up @@ -251,9 +241,13 @@ def _check_rate_limit(self) -> None:
self._wait_for_rate_limit_reset()
self._rate_limit_remaining = self._rate_limit_capacity
else:
message = f"Rate limit exceeded. Limit resets in {self._rate_limit_reset_seconds} seconds"
logger.error(message)
raise RateLimitError(message)
logger.error(
"Rate limit exceeded. Limit resets in %s seconds",
self._rate_limit_reset_seconds,
)
raise RateLimitError(
f"Rate limit exceeded. Limit resets in {self._rate_limit_reset_seconds} seconds"
)
self._rate_limit_remaining -= 1

def _set_rate_limit(self, headers: Headers) -> None:
Expand Down Expand Up @@ -286,7 +280,7 @@ def _wait_for_rate_limit_reset(self) -> None:
"""
wait_seconds = self._rate_limit_reset_seconds
if wait_seconds > 0:
logger.info(f"Rate limit hit. Waiting {wait_seconds} seconds for reset.")
logger.info("Rate limit hit. Waiting %s seconds for reset.", wait_seconds)
time.sleep(wait_seconds)

def _get_int_header(self, headers: Headers, key: str, default: int) -> int:
Expand Down Expand Up @@ -349,7 +343,7 @@ def _do(
"""
self._check_rate_limit()
request_headers = self._build_request_headers(headers)
url = self._base_url + path
url = urljoin(self._base_url, path.lstrip("/"))
data = self._transport.send_request(
method=method, url=url, params=params, headers=request_headers
)
Expand Down
94 changes: 48 additions & 46 deletions openaq/core/transport.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Base class and utlity functions for working with client transport."""
"""Base class and utility functions for working with client transport."""

from __future__ import annotations

import http.client
import json
import logging
import ssl
import threading
import time
import urllib.parse
Expand Down Expand Up @@ -336,7 +337,7 @@ def release(self, pc: PooledConnection, *, discard: bool = False) -> None:

def close_all(self) -> None:
"""Closes all idle connections in the pool and resets its state."""
with self._lock:
with self._has_capacity:
for q in self._idle.values():
for pc in q:
try:
Expand All @@ -345,6 +346,7 @@ def close_all(self) -> None:
pass
self._idle.clear()
self._total = 0
self._has_capacity.notify_all()


def _encode_params(
Expand Down Expand Up @@ -434,21 +436,31 @@ def _raw_request(
for attempt in range(2):
pc = self._pool.acquire(host, self._pool_timeout)
try:
if self._read_timeout is not None:
if pc.conn.sock is not None:
pc.conn.sock.settimeout(self._read_timeout)

pc.conn.request(method, path, headers=dict(headers))
raw = pc.conn.getresponse()

# After connect, set socket timeout for the read.
if self._read_timeout is not None and pc.conn.sock is not None:
pc.conn.sock.settimeout(self._read_timeout)

raw = pc.conn.getresponse()
body = raw.read()
resp = Response(raw.status, body, raw.msg)
self._pool.release(pc)
return resp

except ssl.SSLCertVerificationError as exc:
self._pool.release(pc, discard=True)
logger.error(
"SSL certificate verification failed for %s: %s. "
"On macOS, run 'Install Certificates.command' in your Python "
"installation directory to fix this.",
host,
exc,
)
raise

except ssl.SSLError as exc:
self._pool.release(pc, discard=True)
logger.error("SSL error for %s: %s", host, exc)
raise

except (OSError, http.client.HTTPException) as exc:
self._pool.release(pc, discard=True)
if attempt == 1:
Expand Down Expand Up @@ -495,7 +507,7 @@ def send_request(
if parsed.query:
path = f"{path}?{parsed.query}"

res = self._raw_request(method, host, path, headers)
res = self._raw_request(method.upper(), host, path, headers)
logger.debug("Received response: %s from %s", res.status_code, url)
return check_response(res)

Expand All @@ -504,6 +516,20 @@ def close(self) -> None:
self._pool.close_all()


_HTTP_SATUS_MAP = {
HTTPStatus.BAD_REQUEST: BadRequestError,
HTTPStatus.NOT_FOUND: NotFoundError,
HTTPStatus.REQUEST_TIMEOUT: TimeoutError,
HTTPStatus.FORBIDDEN: ForbiddenError,
HTTPStatus.UNPROCESSABLE_ENTITY: ValidationError,
HTTPStatus.TOO_MANY_REQUESTS: HTTPRateLimitError,
HTTPStatus.UNAUTHORIZED: NotAuthorizedError,
HTTPStatus.INTERNAL_SERVER_ERROR: ServerError,
HTTPStatus.BAD_GATEWAY: BadGatewayError,
HTTPStatus.SERVICE_UNAVAILABLE: ServiceUnavailableError,
}


def check_response(res: Response) -> Response:
"""Checks the HTTP response of the request.

Expand All @@ -528,42 +554,18 @@ def check_response(res: Response) -> Response:
"""
if res.status_code >= HTTPStatus.OK and res.status_code < HTTPStatus.BAD_REQUEST:
return res
elif res.status_code == HTTPStatus.BAD_REQUEST:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise BadRequestError(res.text)
elif res.status_code == HTTPStatus.NOT_FOUND:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise NotFoundError(res.text)
elif res.status_code == HTTPStatus.REQUEST_TIMEOUT:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise TimeoutError(res.text)
elif res.status_code == HTTPStatus.FORBIDDEN:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise ForbiddenError(res.text)
elif res.status_code == HTTPStatus.UNPROCESSABLE_ENTITY:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise ValidationError(res.text)
elif res.status_code == HTTPStatus.TOO_MANY_REQUESTS:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise HTTPRateLimitError(res.text)
elif res.status_code == HTTPStatus.UNAUTHORIZED:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise NotAuthorizedError(res.text)
elif res.status_code == HTTPStatus.INTERNAL_SERVER_ERROR:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise ServerError(res.text)
elif res.status_code == HTTPStatus.BAD_GATEWAY:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise BadGatewayError(res.text)
elif res.status_code == HTTPStatus.SERVICE_UNAVAILABLE:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise ServiceUnavailableError(res.text)
elif res.status_code == HTTPStatus.GATEWAY_TIMEOUT:
logger.exception(f"HTTP {res.status_code} - {res.text}")
if res.status_code == HTTPStatus.GATEWAY_TIMEOUT:
logger.error("HTTP %s - %s", res.status_code, res.text)
raise GatewayTimeoutError(
"Your request timed out on the server. "
"Consider reducing the complexity of your request."
)
else:
logger.exception(f"HTTP {res.status_code} - {res.text}")
raise Exception
try:
http_status = HTTPStatus(res.status_code)
except ValueError:
http_status = None
exc_class = (
_HTTP_SATUS_MAP.get(http_status, ServerError) if http_status else ServerError
)
logger.error("HTTP %s - %s", res.status_code, res.text)
raise exc_class(res.text)
6 changes: 3 additions & 3 deletions openaq/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ def validate_datetime_params(
if date_to:
if not date_check(date_from) or not date_check(date_to):
raise InvalidParameterError(
f"Invalid date_from or date_to, must be either date type or ISO-8601 formatted date string, got {type(date_from)} and {type(date_to)}"
f"Invalid date_from or date_to, must be either datetime.date type or ISO-8601 formatted date string, got {type(date_from)} and {type(date_to)}"
)
date_from_date = to_date(date_from)
date_to_date = to_date(date_to)
Expand All @@ -888,7 +888,7 @@ def validate_datetime_params(
if datetime_to is not None:
if not datetime_check(datetime_from) or not datetime_check(datetime_to):
raise InvalidParameterError(
f"Invalid datetime_from or datetime_to, must be either datetime type or ISO-8601 formatted string, got {type(datetime_from)} and {type(datetime_to)}"
f"Invalid datetime_from or datetime_to, must be either datetime.datetime type or ISO-8601 formatted string, got {type(datetime_from)} and {type(datetime_to)}"
)
datetime_from_datetime = to_datetime(datetime_from)
datetime_to_datetime = to_datetime(datetime_to)
Expand All @@ -902,7 +902,7 @@ def validate_datetime_params(
elif datetime_from is not None:
if not datetime_check(datetime_from):
raise InvalidParameterError(
f"Invalid datetime_from, must be either datetime type or ISO-8601 formatted string, got {type(datetime_from)}"
f"Invalid datetime_from, must be either datetime.datetime type or ISO-8601 formatted string, got {type(datetime_from)}"
)
datetime_from_datetime = to_datetime(datetime_from)
if not datetime_from_lesser_check(datetime_from_datetime):
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Base class for OpenAQ API resource models."""

from __future__ import annotations

from typing import TYPE_CHECKING
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/countries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the countries endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import CountriesResponse
from openaq.core.types import SortOrder
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/instruments.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the instruments endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import InstrumentsResponse
from openaq.core.types import SortOrder
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/licenses.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the licenses endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import LicensesResponse
from openaq.core.types import SortOrder
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/locations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the locations endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import (
LatestResponse,
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/manufacturers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the manufacturers endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import InstrumentsResponse, ManufacturersResponse
from openaq.core.types import SortOrder
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/measurements.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the measurements endpoints of the OpenAQ API."""

import datetime
from typing import overload

Expand Down
2 changes: 2 additions & 0 deletions openaq/models/owners.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the owners endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import OwnersResponse
from openaq.core.types import SortOrder
Expand Down
2 changes: 2 additions & 0 deletions openaq/models/parameters.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Resource model for interacting with the parameters endpoints of the OpenAQ API."""

from openaq.core.models import build_query_params
from openaq.core.responses import LatestResponse, ParametersResponse
from openaq.core.types import ParameterType, SortOrder
Expand Down
Loading
Loading