diff --git a/.gitignore b/.gitignore index cfece36..e44b9d4 100644 --- a/.gitignore +++ b/.gitignore @@ -172,3 +172,4 @@ scratches/ .ralphex/ +mise.local.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 96cc321..674cabd 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,6 +3,7 @@ repos: rev: v5.0.0 hooks: - id: check-yaml + args: [--unsafe] - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit diff --git a/CHANGELOG.md b/CHANGELOG.md index e8f7bff..2847c3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `ClientOptions.trust_env: bool = False` — opt-in to environment-driven HTTP proxy + configuration (`HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, `NO_PROXY`) and netrc-based + proxy credentials on both backends. Netrc is used for proxy credentials only; kubex's + per-request bearer `Authorization` header always wins over any netrc-derived target-host + Basic auth. + +### Changed + +- The httpx backend now passes `trust_env=False` to `httpx.AsyncClient` by default, + overriding httpx's own library default of `True`. Users who relied on httpx silently + honoring `HTTP_PROXY` / `HTTPS_PROXY` / `NO_PROXY` must opt in explicitly with + `ClientOptions(trust_env=True)`. This aligns httpx behavior with aiohttp, which has + always defaulted to `trust_env=False`. + ## [0.1.0-beta.1] - 2026-05-06 Initial public beta release. diff --git a/CLAUDE.md b/CLAUDE.md index cd8261e..85004fe 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -70,7 +70,7 @@ kubex/ # Main package — PEP 420 namespace package (no │ └── _protocol.py # ApiProtocol[ResourceType], type aliases, SubresourceNotAvailable, namespace helpers ├── client/ # HTTP client implementations │ ├── client.py # BaseClient ABC, create_client() factory, ClientChoise enum -│ ├── options.py # ClientOptions pydantic model — timeout, log_api_warnings, proxy, keep_alive, keep_alive_timeout, buffer_size, ws_max_message_size, pool_size, pool_size_per_host +│ ├── options.py # ClientOptions pydantic model — timeout, log_api_warnings, proxy, keep_alive, keep_alive_timeout, buffer_size, ws_max_message_size, pool_size, pool_size_per_host, trust_env │ ├── websocket.py # WebSocketConnection ABC — abstraction used by exec and attach subresources │ ├── httpx.py # HttpxClient implementation (exec via httpx-ws) │ └── aiohttp.py # AioHttpClient implementation (exec via aiohttp ws_connect) @@ -147,13 +147,25 @@ scripts/codegen/ # OpenAPI → Pydantic v2 code generator test/ # Test suite ├── e2e/ # End-to-end tests (testcontainers + K3S) -│ ├── conftest.py # Fixtures: K3S container, client fixtures, temp namespace +│ ├── conftest.py # Fixtures: K3S container, client fixtures, temp namespace, bearer-token SA fixtures (sa_token, kubernetes_token_config), client_choice +│ ├── _helpers.py # Shared e2e helpers: create_busybox_pod(), wait_for_pod_running(), +│ # mint_sa_token(k3s, *, sa_name, crb_name, clusterrole="view") — provisions a SA + ClusterRoleBinding and returns a short-lived JWT │ ├── test_core_api_pod.py # Pod CRUD tests │ ├── test_core_api_namespaces.py # Namespace listing tests │ ├── test_subresource_apis.py # E2E tests for Status, Eviction, EphemeralContainers, Resize subresources │ ├── test_exec.py # E2E tests for Pod exec subresource (run + stream against K3S) │ ├── test_attach.py # E2E tests for Pod attach subresource (stream against K3S) -│ └── test_portforward.py # E2E tests for Pod portforward subresource (forward() + listen() against K3S) +│ ├── test_portforward.py # E2E tests for Pod portforward subresource (forward() + listen() against K3S) +│ ├── test_bearer_token_auth.py # E2E tests for bearer-token auth (no proxy): valid SA token succeeds, invalid token returns 401 +│ └── proxy/ # E2E tests for trust_env proxy support (Squid on shared Docker network) +│ ├── _constants.py # SQUID_PROXY_USER / SQUID_PROXY_PASSWORD shared by conftest and tests +│ ├── conftest.py # Fixtures: Docker network, K3S-in-network, Squid, proxy_url, proxy config, +│ # kubernetes_token_config_via_proxy (view SA), kubernetes_admin_token_config_via_proxy (admin SA — needed for pods/exec), +│ # tmp_namespace_via_proxy / tmp_namespace_name_via_proxy (direct-access provisioning, bypasses Squid), proxy_netrc, +│ # clean_proxy_env (autouse — clears all proxy env vars before each test) +│ └── test_trust_env.py # REST proxy tests: URL-embedded creds, netrc creds, missing-creds-fails, NO_PROXY bypass, bearer-token via proxy. +│ # WS exec proxy tests: cert-auth exec via proxy, bearer-token exec via proxy, +│ # missing-proxy-creds WS handshake fails (KubexClientException), NO_PROXY bypass WS connect fails (KubexClientException) ├── test_configuration/ # Unit tests for configuration and auth │ ├── test_file_config.py # Kubeconfig file parsing tests │ ├── test_incluster_config.py # In-cluster configuration tests @@ -180,7 +192,7 @@ test/ # Test suite ├── test_attach/ # Unit tests for attach subresource (AttachOptions, AttachAccessor) ├── test_portforward/ # Unit tests for portforward subresource (PortForwardOptions, channels, PortForwardSession, PortForwardStream, PortforwardAccessor, listen()) ├── test_stream/ # Unit tests for _BaseChannelSession lifecycle + StreamSession channel multiplexer (shared by exec and attach) -├── test_client/ # Unit tests for client WebSocket layer (BaseClient ABC, AioHttpClient, HttpxClient) +├── test_client/ # Unit tests for BaseClient ABC, AioHttpClient, HttpxClient, and trust_env env-proxy + netrc resolver (test_trust_env.py) ├── test_subresource_descriptors/ # Unit tests for descriptor-based subresource APIs └── test_timeout/ # Unit tests for HTTP timeout settings @@ -198,7 +210,7 @@ examples/ # Usage examples ├── exec_pod.py # Pod exec subresource — api.exec.run() + api.exec.stream() interactive shell ├── attach_pod.py # Pod attach subresource — api.attach.stream() with stdin/stdout ├── portforward_pod.py # Pod portforward subresource — api.portforward.forward() (low-level ByteStream) + api.portforward.listen() (local TCP listener) -├── client_options.py # ClientOptions knobs — proxy, keep_alive, buffer_size, pool_size, ws_max_message_size +├── client_options.py # ClientOptions knobs — proxy, keep_alive, buffer_size, pool_size, ws_max_message_size, trust_env ├── custom_resource.py # Define CRD models (Widget, ClusterWidget) + create/get/list/patch/status/delete └── delete_collection.py # Bulk delete with label_selector @@ -263,7 +275,7 @@ Each resource model declares a `__RESOURCE_CONFIG__` class variable (a `Resource All models inherit from `BaseK8sModel` which uses `alias_generator=to_camel` and `populate_by_name=True`. This means Python code uses `snake_case` while JSON serialization uses `camelCase` to match the Kubernetes API. ### Pluggable HTTP clients -`BaseClient` is an ABC. Implementations (`HttpxClient`, `AioHttpClient`) are lazily imported. The `create_client()` factory auto-detects which library is installed (prefers aiohttp, falls back to httpx). `BaseClient` also exposes `connect_websocket(request, subprotocols)` returning a `WebSocketConnection` (defined in `kubex/client/websocket.py`); the default raises `NotImplementedError`. `HttpxClient` implements it via `httpx-ws` (lazy import — raises `ConfgiurationError` if missing); `AioHttpClient` uses aiohttp's built-in `ws_connect`. Both adapters prefer `Request.query_param_pairs` over `Request.query_params` when building the upgrade URL so exec's repeated `command=` entries are preserved. `create_client()` also accepts `options: ClientOptions | None` — a `ClientOptions` instance carrying the client-level `timeout` default, `log_api_warnings` flag, and the following connection-pool / proxy / WebSocket knobs: `proxy` (str or per-scheme dict), `keep_alive` (bool), `keep_alive_timeout` (float|None|...), `buffer_size` (int|None|...), `ws_max_message_size` (int|None|...), `pool_size` (int|None|...), `pool_size_per_host` (int|None|...). When `None`, a `ClientOptions()` with library defaults is used. Both `HttpxClient` and `AioHttpClient` expose an `options` property. `HttpxClient._create_inner_client()` maps these into `httpx.Limits(...)`, `proxy=`, and `mounts=`; `AioHttpClient._create_inner_client()` maps them into `TCPConnector(limit=, limit_per_host=, keepalive_timeout=, force_close=)` and `ClientSession(read_bufsize=, proxy=)`. Backend asymmetries (httpx ignores `buffer_size` and `pool_size_per_host`; aiohttp warns on `keep_alive_timeout=None` and `proxy=dict` with non-matching schemes) emit a `UserWarning` at client construction time. +`BaseClient` is an ABC. Implementations (`HttpxClient`, `AioHttpClient`) are lazily imported. The `create_client()` factory auto-detects which library is installed (prefers aiohttp, falls back to httpx). `BaseClient` also exposes `connect_websocket(request, subprotocols)` returning a `WebSocketConnection` (defined in `kubex/client/websocket.py`); the default raises `NotImplementedError`. `HttpxClient` implements it via `httpx-ws` (lazy import — raises `ConfgiurationError` if missing); `AioHttpClient` uses aiohttp's built-in `ws_connect`. Both adapters prefer `Request.query_param_pairs` over `Request.query_params` when building the upgrade URL so exec's repeated `command=` entries are preserved. `create_client()` also accepts `options: ClientOptions | None` — a `ClientOptions` instance carrying the client-level `timeout` default, `log_api_warnings` flag, and the following connection-pool / proxy / WebSocket knobs: `proxy` (str or per-scheme dict), `keep_alive` (bool), `keep_alive_timeout` (float|None|...), `buffer_size` (int|None|...), `ws_max_message_size` (int|None|...), `pool_size` (int|None|...), `pool_size_per_host` (int|None|...), `trust_env` (bool, default `False` — opts into HTTP_PROXY/HTTPS_PROXY/NO_PROXY env vars and ~/.netrc proxy-credential lookup on both backends; setting `trust_env=True` alongside an explicit `proxy` emits a `UserWarning` and force-disables env reads). When `None`, a `ClientOptions()` with library defaults is used. Both `HttpxClient` and `AioHttpClient` expose an `options` property. `HttpxClient._create_inner_client()` maps these into `httpx.Limits(...)`, `proxy=`, and `mounts=`; `AioHttpClient._create_inner_client()` maps them into `TCPConnector(limit=, limit_per_host=, keepalive_timeout=, force_close=)` and `ClientSession(read_bufsize=, proxy=)`. Backend asymmetries (httpx ignores `buffer_size` and `pool_size_per_host`; aiohttp warns on `keep_alive_timeout=None` and `proxy=dict` with non-matching schemes; `trust_env=True` on httpx: when a proxy env var is found at construction, kubex materialises it into a concrete `httpx.Proxy(auth=...)` (snapshot — subsequent env mutations have no effect); when no proxy env var is found at construction, httpx receives `trust_env=True` directly and re-reads env vars per-request. aiohttp always retains its native per-request env lookup) emit a `UserWarning` at client construction time where relevant. ### Configuration auto-loading `create_client()` → tries kubeconfig file first → falls back to in-cluster pod environment. @@ -312,6 +324,7 @@ Resources declare capabilities via multiple inheritance from marker classes: `Na - E2E tests are parameterized over both HTTP clients (`httpx`, `aiohttp`) and async backends (`asyncio`, `trio` — trio only with httpx) - Mark async tests with `@pytest.mark.anyio` - The `conftest.py` provides session-scoped K3S cluster, per-test client fixtures, and a temporary namespace fixture that creates/cleans up namespaces +- The `test/e2e/proxy/` sub-suite spins up a **separate** K3S cluster (`k3s_in_network`) on a shared Docker network alongside a Squid forward proxy. Namespace and pod provisioning fixtures connect to `k3s_in_network` directly (bypassing the proxy) so that test infrastructure does not depend on the code under test. For exec/attach/portforward tests via bearer token, use `kubernetes_admin_token_config_via_proxy` (admin ClusterRole); the standard `kubernetes_token_config_via_proxy` uses `view`, which lacks `pods/exec` permission. ## CI/CD diff --git a/README.md b/README.md index c919cd5..d16800e 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Kubex works with both **asyncio** and **trio** (via httpx), with no framework lo * `asyncio` and `trio` async runtime support (only `httpx` client is supported for `trio`). * Comprehensive, fully-typed Kubernetes resource models (1.32–1.37) generated from the OpenAPI spec via a built-in code generator. * Custom Resource Definitions (CRDs) — define a Pydantic model inheriting from `NamespaceScopedEntity` or `ClusterScopedEntity` with `__RESOURCE_CONFIG__` and use `Api[T]` for full CRUD, watch, and subresource access. No code generation required. See `examples/custom_resource.py` and the [Custom Resources docs](https://kubex.codemageddon.me/advanced/custom-resources/). -* `ClientOptions` — per-client HTTP configuration: `proxy` (single URL string or per-scheme `{"http": …, "https": …}` dict), `keep_alive` / `keep_alive_timeout`, `buffer_size` (aiohttp read buffer), `ws_max_message_size` (WebSocket frame cap for exec/attach/portforward), `pool_size` (total connection pool), and `pool_size_per_host`. Pass `options=ClientOptions(…)` to `create_client()`. See `examples/client_options.py`. +* `ClientOptions` — per-client HTTP configuration: `proxy` (single URL string or per-scheme `{"http": …, "https": …}` dict), `keep_alive` / `keep_alive_timeout`, `buffer_size` (aiohttp read buffer), `ws_max_message_size` (WebSocket frame cap for exec/attach/portforward), `pool_size` (total connection pool), `pool_size_per_host`, and `trust_env` (opt-in to `HTTP_PROXY`/`HTTPS_PROXY`/`NO_PROXY` env vars and `~/.netrc` proxy credentials on both backends). Pass `options=ClientOptions(…)` to `create_client()`. See `examples/client_options.py`. > **Experimental — WebSocket subresources.** The `exec`, `attach`, and > `portforward` APIs described below are still under active development. diff --git a/docs/concepts/clients.md b/docs/concepts/clients.md index 3cd433b..57d020f 100644 --- a/docs/concepts/clients.md +++ b/docs/concepts/clients.md @@ -202,6 +202,59 @@ options = ClientOptions(pool_size_per_host=5) httpx has no per-host pool limit. Setting `pool_size_per_host` to anything other than `...` on an httpx-backed client emits a `UserWarning` and is otherwise ignored. +### `trust_env` + +When `True`, kubex enables environment-driven proxy configuration on both backends: + +| Env var | Effect | +|---|---| +| `HTTP_PROXY` / `HTTPS_PROXY` / `ALL_PROXY` | Select an outbound proxy for matching requests | +| `NO_PROXY` | Exclude specific hosts from proxy routing | +| `~/.netrc` (or `$NETRC`) | Supply basic-auth credentials for the **proxy host** when the env proxy URL carries no embedded `user:pass` | + +The default is `False`. This matches aiohttp's library default and overrides httpx's library default of `True`, making behavior symmetric across both backends out of the box. + +```python +# Read HTTPS_PROXY / HTTP_PROXY / NO_PROXY from the environment +options = ClientOptions(trust_env=True) +``` + +**Canonical flow — HTTPS proxy with netrc credentials:** + +```shell +export HTTPS_PROXY=http://proxy.corp.example.com:3128 +# ~/.netrc +# machine proxy.corp.example.com +# login alice +# password s3cret +``` + +```python +options = ClientOptions(trust_env=True) +# Kubex reads HTTPS_PROXY, finds no user:pass in the URL, looks up +# proxy.corp.example.com in ~/.netrc, and configures the proxy with +# auth=("alice", "s3cret"). +async with await create_client(options=options) as client: + ... +``` + +**Netrc scope:** netrc is used for **proxy credentials only**. Kubex's per-request `Authorization: Bearer …` header always takes priority over any netrc-derived target-host Basic auth. + +**Conflict with `options.proxy`:** when both `trust_env=True` and `options.proxy` are set, the explicit `options.proxy` wins and a `UserWarning` is emitted at client construction. The effective behavior is `trust_env=False` passed to the backend, so `NO_PROXY` cannot silently exempt the explicit proxy. + +!!! note "Backend asymmetries" + - `WS_PROXY` / `WSS_PROXY` env vars are read by **aiohttp only**. On httpx, `wss://` upgrades use `HTTPS_PROXY`. + - `SSL_CERT_FILE` / `SSL_CERT_DIR` env vars are read by **httpx only** (when no custom CA is configured). aiohttp does not read these. + +!!! note "Snapshot semantics (httpx only)" + When a proxy env var (`HTTPS_PROXY`, `HTTP_PROXY`, `ALL_PROXY`, etc.) **is set** at construction time, Kubex's proxy-netrc resolver materializes it into an explicit `httpx.Proxy(auth=...)`. Mutations to those env vars after `create_client()` returns are **not** reflected on the httpx backend. + + When **no** proxy env var is set at construction time, httpx receives `trust_env=True` directly and re-reads env vars on every request (httpx's own default behavior). + + The aiohttp backend always retains its native per-request env lookup regardless of whether a proxy was found at construction. + + If snapshot behavior is unwanted on httpx, use the [Custom underlying HTTP client](#custom-underlying-http-client) escape hatch. + ## Backend asymmetries Some `ClientOptions` fields behave differently (or are unsupported) depending on which HTTP backend is in use. A `UserWarning` is emitted on first use when a field has no effect. @@ -216,6 +269,7 @@ Some `ClientOptions` fields behave differently (or are unsupported) depending on | `ws_max_message_size` | `aconnect_ws(max_message_size_bytes=int)` | `ws_connect(max_msg_size=int)` | | `pool_size` | `Limits(max_connections=int\|None)` | `TCPConnector(limit=int)` — `None` maps to `0` (unlimited) | | `pool_size_per_host` | **Ignored** — warning emitted | `TCPConnector(limit_per_host=int)` — `None` maps to `0` (unlimited) | +| `trust_env=True` | If a proxy env var is found at construction, it is materialized into `httpx.Proxy(auth=...)` (snapshot). If none is found, httpx receives `trust_env=True` and re-reads env vars per-request. | Env vars read per-request (aiohttp native behavior) | Cross-reference: see [Timeouts](../operations/timeouts.md) for the note on `Timeout.write` and `Timeout.pool` being httpx-only fields. @@ -244,3 +298,53 @@ The httpx client is the only client that supports **trio** (in addition to async | Auto-detection priority | 1st | 2nd | For detailed guidance on choosing a client and runtime, see [Clients & Runtimes](../advanced/clients-runtimes.md). + +## Custom underlying HTTP client + +Kubex prioritizes predictable defaults over knob coverage. When the built-in `ClientOptions` wiring is insufficient — for example, you need target-host Basic auth from netrc, a custom retry policy, a custom TLS transport, or per-request env reads on httpx — you can subclass `HttpxClient` or `AioHttpClient` and override `_create_inner_client()`. + +When this escape hatch is used, the subclass is **fully responsible** for mapping all options and configuration. `ClientOptions` proxy, timeout, pool, and `trust_env` fields are **no longer applied automatically**. + +**httpx example:** + +```python +import httpx +from kubex.client.httpx import HttpxClient +from kubex.client import ClientOptions + +class MyHttpxClient(HttpxClient): + def _create_inner_client(self) -> httpx.AsyncClient: + # Full control: target-host NetRCAuth, custom transport, retry policy, etc. + return httpx.AsyncClient( + base_url=str(self.configuration.base_url), + verify=True, + trust_env=True, # per-request env reads + auth=httpx.NetRCAuth(file=None), # target-host Basic auth from netrc + ) + +client = MyHttpxClient(configuration=cfg, options=ClientOptions()) +``` + +**aiohttp example:** + +```python +import ssl +import aiohttp +from kubex.client.aiohttp import AioHttpClient +from kubex.client import ClientOptions + +class MyAioHttpClient(AioHttpClient): + def _create_inner_client(self) -> aiohttp.ClientSession: + ssl_context = ssl.create_default_context() + connector = aiohttp.TCPConnector(ssl=ssl_context) + return aiohttp.ClientSession( + base_url=str(self.configuration.base_url), + connector=connector, + trust_env=True, + ) + +client = MyAioHttpClient(configuration=cfg, options=ClientOptions()) +``` + +!!! warning + Overriding `_create_inner_client()` means `ClientOptions` proxy, timeout, pool, and `trust_env` fields are no longer applied automatically. The subclass is fully responsible for all HTTP client configuration. diff --git a/examples/client_options.py b/examples/client_options.py index 736954a..0dff8c6 100644 --- a/examples/client_options.py +++ b/examples/client_options.py @@ -41,6 +41,10 @@ async def main() -> None: # Per-host connection limit: 10 # (ignored on httpx — UserWarning emitted; aiohttp default is 0 = no limit) pool_size_per_host=10, + # Honor HTTP_PROXY / HTTPS_PROXY / NO_PROXY and ~/.netrc for proxy creds. + # NOTE: using trust_env=True together with proxy= emits a UserWarning; + # in practice use one or the other — not both. + trust_env=True, ) async with await create_client(options=options) as client: diff --git a/kubex/__version__.py b/kubex/__version__.py index 6d80cda..6345ce4 100644 --- a/kubex/__version__.py +++ b/kubex/__version__.py @@ -1 +1 @@ -VERSION = "0.1.0-beta.1" +VERSION = "0.1.0-beta.2" diff --git a/kubex/client/aiohttp.py b/kubex/client/aiohttp.py index 3533b01..964907a 100644 --- a/kubex/client/aiohttp.py +++ b/kubex/client/aiohttp.py @@ -166,6 +166,19 @@ def _create_inner_client(self) -> ClientSession: ) self._resolved_proxy = cast("str | None", kwargs.get("proxy")) + if self.options.trust_env and self._resolved_proxy is not None: + warnings.warn( + "ClientOptions.proxy is set; env-based proxy variables " + "(HTTP_PROXY/HTTPS_PROXY/NO_PROXY) are ignored when " + "trust_env=True coexists with an explicit proxy.", + UserWarning, + stacklevel=3, + ) + effective_trust_env = False + else: + effective_trust_env = self.options.trust_env + kwargs["trust_env"] = effective_trust_env + buffer_size = self.options.buffer_size if isinstance(buffer_size, EllipsisType): kwargs["read_bufsize"] = 2**21 @@ -196,6 +209,7 @@ def _session_kwargs(self) -> dict[str, Any]: } if self._resolved_proxy is not None: kwargs["proxy"] = self._resolved_proxy + kwargs["trust_env"] = self.options.trust_env and self._resolved_proxy is None return kwargs async def request(self, request: Request) -> Response: diff --git a/kubex/client/httpx.py b/kubex/client/httpx.py index 99f3b4e..53cc740 100644 --- a/kubex/client/httpx.py +++ b/kubex/client/httpx.py @@ -1,10 +1,14 @@ from __future__ import annotations +import logging +import netrc +import os import ssl import warnings from contextlib import AbstractAsyncContextManager from types import EllipsisType from typing import TYPE_CHECKING, Any, AsyncGenerator, Sequence, cast +from urllib.parse import urlparse import httpx @@ -96,6 +100,154 @@ def _build_httpx_proxy_kwargs( } +_logger = logging.getLogger("kubex.client.httpx") + + +def _getenv_icase(name: str) -> tuple[str | None, str]: + """Case-insensitive env var lookup: lowercase > uppercase > mixed-case. + + Returns (matched_key, value) or (None, "") if not present at any case variant. + A higher-priority set-but-empty value suppresses lower-priority non-empty values, + preserving the curl/urllib convention that ``https_proxy=""`` suppresses + ``HTTPS_PROXY``. + + Always returns the *actual* key stored in ``os.environ``, not the normalised + lookup form. This is critical on Windows where ``os.environ`` is + case-insensitive: a containment check like ``"http_proxy" in os.environ`` + returns ``True`` when only ``HTTP_PROXY`` is stored, and the subsequent + ``os.environ["http_proxy"]`` call returns the value but the caller would + record the synthetic lowercase key — breaking the HTTPOXY guard which + compares ``found_key == "HTTP_PROXY"``. + """ + lc = name.lower() + uc = name.upper() + lc_match: tuple[str, str] | None = None + uc_match: tuple[str, str] | None = None + mc_match: tuple[str, str] | None = None + for k, v in os.environ.items(): + if k == lc: + lc_match = (k, v) + elif k == uc: + uc_match = (k, v) + elif k.lower() == lc: + mc_match = (k, v) + if lc_match is not None: + return lc_match + if uc_match is not None: + return uc_match + if mc_match is not None: + return mc_match + return None, "" + + +def _host_matches_no_proxy(host: str, no_proxy: str) -> bool: + """NO_PROXY host matching compatible with Python stdlib / aiohttp. + + Entries are comma-separated and whitespace-trimmed. Matching rules: + + - ``*`` matches every host. + - A leading-dot entry (e.g. ``.example.com``) matches the exact domain + AND any subdomain — ``.example.com`` matches both ``example.com`` and + ``api.example.com``, consistent with curl and Python's urllib. + - A bare-domain entry (e.g. ``example.com``) matches the exact host and + any subdomain. + - Port qualifiers in entries (e.g. ``example.com:6443``) are NOT + supported and will never match — consistent with Python stdlib and + aiohttp behavior. Use a bare hostname entry (``example.com``) instead. + - Bracketed IPv6 entries (e.g. ``[::1]``) are NOT supported — use the + bare IPv6 address (``::1``) instead. Consistent with Python stdlib and + aiohttp, which do not strip brackets before matching. + - An IP literal matches only the exact IP; CIDR notation is NOT supported. + - Comparison is case-insensitive. + """ + host = host.lower() + for raw_entry in (e.strip().lower() for e in no_proxy.split(",")): + if not raw_entry: + continue + if raw_entry == "*": + return True + # Strip leading dot so that .example.com and example.com both match the + # domain itself and its subdomains (curl / Python stdlib semantics). + bare = raw_entry.lstrip(".") + if host == bare or host.endswith("." + bare): + return True + return False + + +def _resolve_env_proxy_with_netrc(base_url: str) -> dict[str, Any]: + """Resolve env-based proxy URL and netrc credentials for the httpx backend. + + Returns a kwargs dict to be ``.update()``'d into ``httpx.AsyncClient`` + constructor kwargs: + + - ``{}`` if no env proxy var is set, or NO_PROXY matches the base url host. + - ``{"proxy": ""}`` if the env proxy URL has embedded ``user:pass`` + or netrc has no matching entry. + - ``{"proxy": httpx.Proxy(url=..., auth=(login, password))}`` if the env + proxy URL has no creds AND netrc has a ``machine`` entry for the proxy + host. + + Netrc read failures are caught and silently downgraded to "no creds" with + the proxy URL still applied. Failures are logged at DEBUG level. + """ + parsed_base = urlparse(base_url) + base_host = (parsed_base.hostname or "").lower() + scheme = (parsed_base.scheme or "").lower() + + if scheme == "https": + candidates = ["https_proxy", "all_proxy"] + else: + candidates = ["http_proxy", "all_proxy"] + + proxy_url: str | None = None + for target in candidates: + found_key, found_val = _getenv_icase(target) + if found_key is None: + continue + # CGI safeguard (CVE-2016-1000110 / HTTPOXY): skip exact uppercase + # HTTP_PROXY in CGI context to prevent header injection. Only the + # exact uppercase form can be injected by a CGI server (RFC 3875 + # uppercases header names); lowercase and mixed-case variants are + # user-set and are allowed. Applied on all platforms (including + # Windows) to match stdlib / aiohttp behavior. + if found_key == "HTTP_PROXY" and "REQUEST_METHOD" in os.environ: + continue + if found_val: + proxy_url = found_val + break + # Empty value at the winning priority level suppresses this candidate + # but allows fallback to the next entry (e.g. ALL_PROXY). + + if proxy_url is None: + return {} + + # Case-insensitive NO_PROXY lookup with the same priority rule. + no_proxy_key, no_proxy_val = _getenv_icase("no_proxy") + no_proxy = no_proxy_val if no_proxy_key is not None else "" + if no_proxy and _host_matches_no_proxy(base_host, no_proxy): + return {} + + parsed_proxy = urlparse(proxy_url) + if parsed_proxy.username or parsed_proxy.password: + return {"proxy": proxy_url} + + proxy_host = parsed_proxy.hostname or "" + netrc_path = os.environ.get("NETRC") or None + try: + rc = netrc.netrc(netrc_path) + creds = rc.authenticators(proxy_host) + if creds is not None: + login, _, password = creds + if login is not None: + return { + "proxy": httpx.Proxy(url=proxy_url, auth=(login, password or "")) + } + except (FileNotFoundError, netrc.NetrcParseError, OSError) as exc: + _logger.debug("Failed to read netrc for proxy credentials: %s", exc) + + return {"proxy": proxy_url} + + class HttpxClient(BaseClient): def __init__( self, @@ -157,6 +309,36 @@ def _create_inner_client(self) -> httpx.AsyncClient: if limits is not None: kwargs["limits"] = limits + if self.options.trust_env and self.options.proxy is not None: + warnings.warn( + "ClientOptions.proxy is set; env-based proxy variables " + "(HTTP_PROXY/HTTPS_PROXY/NO_PROXY) are ignored when " + "trust_env=True coexists with an explicit proxy.", + UserWarning, + stacklevel=3, + ) + kwargs["trust_env"] = False + # Passing trust_env=False to httpx also suppresses SSL_CERT_FILE / + # SSL_CERT_DIR. When no custom CA is configured via kubeconfig, + # apply these env vars manually so env-provided CA bundles are not + # silently downgraded to certifi. + if not needs_custom_ssl: + ssl_cert_file = os.environ.get("SSL_CERT_FILE") + ssl_cert_dir = os.environ.get("SSL_CERT_DIR") + if ssl_cert_file or ssl_cert_dir: + env_ssl_ctx = ssl.create_default_context( + cafile=ssl_cert_file, capath=ssl_cert_dir + ) + _verify = env_ssl_ctx + kwargs["verify"] = env_ssl_ctx + elif self.options.trust_env: + kwargs["trust_env"] = True + kwargs.update( + _resolve_env_proxy_with_netrc(str(self.configuration.base_url)) + ) + else: + kwargs["trust_env"] = False + kwargs.update(_build_httpx_proxy_kwargs(self.options.proxy, _verify, limits)) if not isinstance(self.options.buffer_size, EllipsisType): diff --git a/kubex/client/options.py b/kubex/client/options.py index a93550c..e1a6046 100644 --- a/kubex/client/options.py +++ b/kubex/client/options.py @@ -48,6 +48,19 @@ class ClientOptions(BaseModel): The entry whose key matches the API server's URL scheme is used; all other entries are dropped with a warning. httpx applies all dict entries via ``mounts=``. + - ``WS_PROXY`` / ``WSS_PROXY``: these env vars are read by aiohttp only. + On httpx, ``wss://`` upgrades use ``HTTPS_PROXY``. + - ``SSL_CERT_FILE`` / ``SSL_CERT_DIR``: read by httpx only (when no custom + CA is configured). aiohttp does not read these. + - netrc for the target API server host: when ``trust_env=True`` on the aiohttp + backend, aiohttp looks up the target request URL in ``~/.netrc`` and, if + found, treats the result as Basic auth. If the K8s API server hostname + appears in ``~/.netrc`` AND bearer-token auth is active, aiohttp raises + :class:`ValueError` (conflicting ``Authorization`` header and netrc-derived + auth). Avoid adding K8s API server credentials to ``~/.netrc`` when + ``trust_env=True`` is used on the aiohttp backend. This restriction does + not apply to the httpx backend, which resolves netrc only for the proxy + host. See ``trust_env`` for details. """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") @@ -172,6 +185,53 @@ class ClientOptions(BaseModel): ignored on httpx and a :class:`UserWarning` is emitted if it is not ``...``. """ + trust_env: bool = False + """Honor standard HTTP environment variables (and ~/.netrc for proxy creds) for outbound requests. + + When ``True``, kubex enables environment-driven configuration on both backends: + + - **Proxy URL**: ``HTTP_PROXY`` / ``HTTPS_PROXY`` / ``ALL_PROXY`` (lowercase + variants too) select an outbound proxy. + - **Proxy exclusions**: ``NO_PROXY`` excludes hosts from proxy routing. + - **Proxy credentials**: ``~/.netrc`` (or ``$NETRC``) supplies basic-auth + credentials for the **proxy host** when the env proxy URL has no embedded + ``user:pass``. + + Default is ``False``. This matches aiohttp's library default and overrides + httpx's library default of ``True`` so behavior is symmetric across both + backends out of the box. + + **Auth precedence**: kubex's per-request bearer ``Authorization`` header + always wins over any netrc-derived target Basic auth. Netrc support here is + for **proxy credentials only**. + + **Conflict with options.proxy**: when both ``trust_env=True`` and + ``options.proxy`` are set, the explicit ``options.proxy`` wins and a + :class:`UserWarning` is emitted at client construction. + + **Backend asymmetries**: + + - ``WS_PROXY`` / ``WSS_PROXY`` env vars are read by aiohttp only. On httpx, + ``wss://`` upgrades use ``HTTPS_PROXY``. + - ``SSL_CERT_FILE`` / ``SSL_CERT_DIR`` env vars are read by httpx only + (when no custom CA is configured). aiohttp does not read these. + + **Snapshot semantics (httpx only)**: when a proxy URL is found in the + environment at construction time, kubex materialises it into a concrete + ``httpx.Proxy`` object so subsequent env-var mutations do NOT affect the + httpx backend. When no proxy env var is set at construction time, httpx + receives ``trust_env=True`` directly and re-reads env vars on every request + (same as httpx's own default behaviour). The aiohttp backend always retains + aiohttp's native per-request env lookup. + """ + + @field_validator("trust_env", mode="before") + @classmethod + def _normalize_trust_env(cls, value: object) -> object: + if not isinstance(value, bool): + raise ValueError(f"trust_env must be a bool; got {type(value).__name__!r}") + return value + @field_validator("timeout", mode="before") @classmethod def _normalize_timeout(cls, value: object) -> object: diff --git a/mkdocs.yml b/mkdocs.yml index 19340f1..99a3338 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -65,6 +65,9 @@ markdown_extensions: - pymdownx.tabbed: alternate_style: true - pymdownx.details + - pymdownx.emoji: + emoji_index: !!python/name:material.extensions.emoji.twemoji + emoji_generator: !!python/name:material.extensions.emoji.to_svg extra: version: diff --git a/packages/kubex-core/pyproject.toml b/packages/kubex-core/pyproject.toml index 51a3d6d..51b4b4c 100644 --- a/packages/kubex-core/pyproject.toml +++ b/packages/kubex-core/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-core" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Core model types shared by Kubex and its generated resource packages" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-32/pyproject.toml b/packages/kubex-k8s-1-32/pyproject.toml index a1ad165..8243f06 100644 --- a/packages/kubex-k8s-1-32/pyproject.toml +++ b/packages/kubex-k8s-1-32/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-32" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Pydantic v2 Kubernetes 1.32 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-33/pyproject.toml b/packages/kubex-k8s-1-33/pyproject.toml index c892dec..edfaa7b 100644 --- a/packages/kubex-k8s-1-33/pyproject.toml +++ b/packages/kubex-k8s-1-33/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-33" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Pydantic v2 Kubernetes 1.33 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-34/pyproject.toml b/packages/kubex-k8s-1-34/pyproject.toml index 618fd1a..4609f40 100644 --- a/packages/kubex-k8s-1-34/pyproject.toml +++ b/packages/kubex-k8s-1-34/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-34" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Pydantic v2 Kubernetes 1.34 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-35/pyproject.toml b/packages/kubex-k8s-1-35/pyproject.toml index c309cff..28adafc 100644 --- a/packages/kubex-k8s-1-35/pyproject.toml +++ b/packages/kubex-k8s-1-35/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-35" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Pydantic v2 Kubernetes 1.35 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-36/pyproject.toml b/packages/kubex-k8s-1-36/pyproject.toml index 1fa6234..1f5352e 100644 --- a/packages/kubex-k8s-1-36/pyproject.toml +++ b/packages/kubex-k8s-1-36/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-36" -version = "0.1.0-beta.1" +version = "0.1.0-beta.2" description = "Pydantic v2 Kubernetes 1.36 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/packages/kubex-k8s-1-37/pyproject.toml b/packages/kubex-k8s-1-37/pyproject.toml index fe6e599..15caf9a 100644 --- a/packages/kubex-k8s-1-37/pyproject.toml +++ b/packages/kubex-k8s-1-37/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kubex-k8s-1-37" -version = "0.1.0-alpha.2" +version = "0.1.0-alpha.3" description = "Pydantic v2 Kubernetes 1.37 resource models for Kubex" readme = "README.md" requires-python = ">=3.10" diff --git a/pyproject.toml b/pyproject.toml index 8e561a4..8c07eca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ classifiers = [ ] [project.urls] -Homepage = "https://github.com/codemageddon/kubex" +Homepage = "https://kubex.codemageddon.me/" Repository = "https://github.com/codemageddon/kubex.git" Issues = "https://github.com/codemageddon/kubex/issues" Changelog = "https://github.com/codemageddon/kubex/blob/main/CHANGELOG.md" diff --git a/test/e2e/_helpers.py b/test/e2e/_helpers.py new file mode 100644 index 0000000..26b177e --- /dev/null +++ b/test/e2e/_helpers.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from testcontainers.k3s import K3SContainer # type: ignore[import-untyped] + +from kubex.api import Api +from kubex.k8s.v1_35.core.v1.container import Container +from kubex.k8s.v1_35.core.v1.pod import Pod +from kubex.k8s.v1_35.core.v1.pod_spec import PodSpec +from kubex_core.models.metadata import ObjectMetadata + +_BUSYBOX_IMAGE = "busybox:1.36" + +_K3S_KUBECONFIG = "/etc/rancher/k3s/k3s.yaml" + + +def _kubectl(k3s: K3SContainer, *args: str, allow_already_exists: bool = False) -> str: + result = k3s.get_wrapped_container().exec_run( + ["kubectl", *args], + environment={"KUBECONFIG": _K3S_KUBECONFIG}, + ) + output: str = result.output.decode() + exit_code: int = result.exit_code + if exit_code != 0: + if allow_already_exists and "AlreadyExists" in output: + return output + raise RuntimeError( + f"kubectl {' '.join(args)} failed (exit {exit_code}): {output}" + ) + return output + + +async def create_busybox_pod(api: Api[Pod], name: str, namespace: str) -> Pod: + return await api.create( + Pod( + metadata=ObjectMetadata(name=name, namespace=namespace), + spec=PodSpec( + containers=[ + Container( + name="main", + image=_BUSYBOX_IMAGE, + command=["sleep", "3600"], + ) + ] + ), + ), + namespace=namespace, + ) + + +async def wait_for_pod_running( + api: Api[Pod], + name: str, + namespace: str, + *, + timeout: int = 300, +) -> Pod: + pod = await api.get(name, namespace=namespace) + if pod.status is not None and pod.status.phase == "Running": + return pod + + resource_version = pod.metadata.resource_version if pod.metadata else None + async for event in api.watch( + field_selector=f"metadata.name={name}", + namespace=namespace, + resource_version=resource_version, + timeout_seconds=timeout, + request_timeout=timeout, + ): + obj = event.object + if ( + isinstance(obj, Pod) + and obj.status is not None + and obj.status.phase == "Running" + ): + return obj + + raise TimeoutError(f"Pod {name} did not reach Running within {timeout}s") + + +def mint_sa_token( + k3s: K3SContainer, + *, + sa_name: str = "kubex-test-sa", + crb_name: str = "kubex-test-sa-binding", + clusterrole: str = "view", +) -> str: + _kubectl( + k3s, + "create", + "serviceaccount", + sa_name, + "-n", + "default", + allow_already_exists=True, + ) + _kubectl( + k3s, + "create", + "clusterrolebinding", + crb_name, + f"--clusterrole={clusterrole}", + f"--serviceaccount=default:{sa_name}", + allow_already_exists=True, + ) + return _kubectl(k3s, "create", "token", sa_name, "-n", "default").strip() diff --git a/test/e2e/conftest.py b/test/e2e/conftest.py index 64f10d0..74dedf2 100644 --- a/test/e2e/conftest.py +++ b/test/e2e/conftest.py @@ -12,6 +12,7 @@ from kubex.configuration.file_config import configure_from_kubeconfig from kubex.k8s.v1_35.core.v1.namespace import Namespace from kubex_core.models.metadata import ObjectMetadata +from test.e2e._helpers import mint_sa_token @pytest.fixture(scope="session") @@ -84,3 +85,40 @@ def tmp_namespace_name( ) -> Generator[str, None, None]: assert tmp_namespace.metadata.name is not None yield tmp_namespace.metadata.name + + +@pytest.fixture(scope="session") +def sa_token(kubernetes: K3SContainer) -> str: + return mint_sa_token(kubernetes) + + +@pytest.fixture +async def kubernetes_token_config( + kubernetes_config: ClientConfiguration, + sa_token: str, +) -> ClientConfiguration: + kubernetes_config.client_cert_file = None + kubernetes_config.client_key_file = None + kubernetes_config._token = sa_token + return kubernetes_config + + +@pytest.fixture(params=[ClientChoise.HTTPX, ClientChoise.AIOHTTP]) +def client_choice( + request: pytest.FixtureRequest, + anyio_backend: str, +) -> ClientChoise: + if anyio_backend == "trio" and request.param != ClientChoise.HTTPX: + pytest.skip("Skipping AIOHTTP client for trio backend") + param: ClientChoise = request.param + return param + + +@pytest.fixture +async def token_client( + kubernetes_token_config: ClientConfiguration, + client_choice: ClientChoise, +) -> AsyncGenerator[BaseClient, None]: + client = await create_client(kubernetes_token_config, client_class=client_choice) + async with client as c: + yield c diff --git a/test/e2e/proxy/__init__.py b/test/e2e/proxy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/e2e/proxy/_constants.py b/test/e2e/proxy/_constants.py new file mode 100644 index 0000000..f53b1b9 --- /dev/null +++ b/test/e2e/proxy/_constants.py @@ -0,0 +1,4 @@ +from __future__ import annotations + +SQUID_PROXY_USER = "alice" +SQUID_PROXY_PASSWORD = "s3cret" diff --git a/test/e2e/proxy/conftest.py b/test/e2e/proxy/conftest.py new file mode 100644 index 0000000..0fedd2e --- /dev/null +++ b/test/e2e/proxy/conftest.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import shlex +import stat +from pathlib import Path +from typing import AsyncGenerator, Generator +from urllib.parse import urlparse + +import pytest +from testcontainers.core.container import DockerContainer # type: ignore[import-untyped] +from testcontainers.core.network import Network # type: ignore[import-untyped] +from testcontainers.core.waiting_utils import wait_for_logs # type: ignore[import-untyped] +from testcontainers.k3s import K3SContainer # type: ignore[import-untyped] +from yaml import safe_load + +from kubex.api import Api +from kubex.client.httpx import HttpxClient +from kubex.configuration.configuration import ClientConfiguration, KubeConfig +from kubex.configuration.file_config import configure_from_kubeconfig +from kubex.k8s.v1_35.core.v1.namespace import Namespace +from kubex_core.models.metadata import ObjectMetadata +from test.e2e._helpers import mint_sa_token +from test.e2e.proxy._constants import SQUID_PROXY_PASSWORD, SQUID_PROXY_USER + +# Heredoc delimiter chosen to be unlikely to appear in the config body. +_SQUID_CONF_HEREDOC = "KUBEX_SQUID_CONF_EOF" + +_SQUID_CONF = """\ +auth_param basic program /usr/lib/squid/basic_ncsa_auth /etc/squid/passwords +auth_param basic realm kubex-proxy-test +acl authenticated proxy_auth REQUIRED +acl k3s_port port 6443 +acl CONNECT method CONNECT +http_access allow CONNECT k3s_port authenticated +http_access deny all +http_port 3128 +access_log none +cache deny all +""" + + +@pytest.fixture(scope="session") +def proxy_network() -> Generator[Network, None, None]: + with Network() as network: + yield network + + +@pytest.fixture(scope="session") +def k3s_in_network(proxy_network: Network) -> Generator[K3SContainer, None, None]: + container = K3SContainer(enable_cgroup_mount=False) + host_ip = container.get_container_host_ip() + # Rebuild command to add k3s docker-network alias in the TLS SANs. + # with_command replaces the command set by K3SContainer.__init__. + container.with_command( + f"server --disable traefik --tls-san={host_ip} --tls-san=k3s" + ) + container.with_network(proxy_network) + container.with_network_aliases("k3s") + with container as k3s: + yield k3s + + +@pytest.fixture(scope="session") +def squid(proxy_network: Network) -> Generator[DockerContainer, None, None]: + container = DockerContainer("ubuntu/squid:edge") + container.with_network(proxy_network) + container.with_exposed_ports(3128) + # Write the config inside the container instead of bind-mounting it from the host. + # A host bind mount fails on Docker daemons running in a VM (e.g. Colima) when the + # host path is not shared into the VM — Docker then creates an empty directory at + # the missing path and tries to mount it onto the file in the image, which fails + # with "trying to mount a directory onto a file". + # ubuntu/squid:edge does not ship apache2-utils, so generate the htpasswd-format + # entry with openssl (already in the image). basic_ncsa_auth understands the + # $apr1$ Apache MD5 format. -d 1 routes startup messages to stderr so the + # wait_for_logs() predicate can match "Accepting HTTP Socket connections". + startup_script = ( + f"cat > /etc/squid/squid.conf <<'{_SQUID_CONF_HEREDOC}'\n" + f"{_SQUID_CONF}" + f"{_SQUID_CONF_HEREDOC}\n" + f'printf "{SQUID_PROXY_USER}:%s\\n" "$(openssl passwd -apr1 -salt kubexpwd {shlex.quote(SQUID_PROXY_PASSWORD)})"' + " > /etc/squid/passwords\n" + "exec squid -N -d 1 -f /etc/squid/squid.conf\n" + ) + # Pass the script as a single-element list so docker-py does not shlex-split it + # into separate tokens (which would only leave "cat" as the -ec argument). + container.with_kwargs(entrypoint=["sh", "-ec"]) + container.with_command([startup_script]) + with container: + wait_for_logs(container, "Accepting HTTP Socket connections") + yield container + + +@pytest.fixture(scope="session") +def proxy_url(squid: DockerContainer) -> str: + host = squid.get_container_host_ip() + port = squid.get_exposed_port(3128) + return f"http://{host}:{port}" + + +@pytest.fixture +async def kubernetes_config_via_proxy( + k3s_in_network: K3SContainer, +) -> AsyncGenerator[ClientConfiguration, None]: + conf = safe_load(k3s_in_network.config_yaml()) + config = KubeConfig.model_validate(conf) + client_config = await configure_from_kubeconfig(config) + # Route through the in-network alias that squid can resolve and K3S has in SANs; + # the --tls-san=k3s flag added in k3s_in_network keeps TLS validation working. + client_config.base_url = "https://k3s:6443" + yield client_config + + +@pytest.fixture +def proxy_netrc(tmp_path: Path, proxy_url: str) -> Path: + host = urlparse(proxy_url).hostname or "" + netrc_file = tmp_path / "netrc" + netrc_file.write_text( + f"machine {host}\nlogin {SQUID_PROXY_USER}\npassword {SQUID_PROXY_PASSWORD}\n" + ) + netrc_file.chmod(stat.S_IRUSR | stat.S_IWUSR) + return netrc_file + + +@pytest.fixture(scope="session") +def sa_token_via_proxy(k3s_in_network: K3SContainer) -> str: + return mint_sa_token(k3s_in_network) + + +@pytest.fixture(scope="session") +def sa_admin_token_via_proxy(k3s_in_network: K3SContainer) -> str: + return mint_sa_token( + k3s_in_network, + sa_name="kubex-test-admin-sa", + crb_name="kubex-test-admin-sa-binding", + clusterrole="admin", + ) + + +@pytest.fixture +async def kubernetes_token_config_via_proxy( + kubernetes_config_via_proxy: ClientConfiguration, + sa_token_via_proxy: str, +) -> ClientConfiguration: + kubernetes_config_via_proxy.client_cert_file = None + kubernetes_config_via_proxy.client_key_file = None + kubernetes_config_via_proxy._token = sa_token_via_proxy + return kubernetes_config_via_proxy + + +@pytest.fixture +async def kubernetes_admin_token_config_via_proxy( + kubernetes_config_via_proxy: ClientConfiguration, + sa_admin_token_via_proxy: str, +) -> ClientConfiguration: + kubernetes_config_via_proxy.client_cert_file = None + kubernetes_config_via_proxy.client_key_file = None + kubernetes_config_via_proxy._token = sa_admin_token_via_proxy + return kubernetes_config_via_proxy + + +@pytest.fixture +async def tmp_namespace_via_proxy( + k3s_in_network: K3SContainer, +) -> AsyncGenerator[Namespace, None]: + conf = safe_load(k3s_in_network.config_yaml()) + config = KubeConfig.model_validate(conf) + # Build a direct-access config (no base_url override) so namespace provisioning + # bypasses Squid and hits the host-port mapping directly. + direct_config = await configure_from_kubeconfig(config) + namespace_template = Namespace( + metadata=ObjectMetadata(generate_name="test-ws-proxy-") + ) + async with HttpxClient(direct_config) as client: + api: Api[Namespace] = Api(Namespace, client=client) + namespace = await api.create(namespace_template) + assert namespace.metadata.name is not None + yield namespace + await api.delete(namespace.metadata.name) + + +@pytest.fixture +def tmp_namespace_name_via_proxy( + tmp_namespace_via_proxy: Namespace, +) -> Generator[str, None, None]: + assert tmp_namespace_via_proxy.metadata.name is not None + yield tmp_namespace_via_proxy.metadata.name + + +@pytest.fixture(autouse=True) +def clean_proxy_env(monkeypatch: pytest.MonkeyPatch) -> None: + for var in ( + "http_proxy", + "HTTP_PROXY", + "https_proxy", + "HTTPS_PROXY", + "ws_proxy", + "WS_PROXY", + "wss_proxy", + "WSS_PROXY", + "all_proxy", + "ALL_PROXY", + "NO_PROXY", + "no_proxy", + "NETRC", + ): + monkeypatch.delenv(var, raising=False) diff --git a/test/e2e/proxy/test_trust_env.py b/test/e2e/proxy/test_trust_env.py new file mode 100644 index 0000000..a305916 --- /dev/null +++ b/test/e2e/proxy/test_trust_env.py @@ -0,0 +1,299 @@ +"""E2E tests for trust_env with a Squid forward proxy and K3S. + +These tests validate the canonical HTTPS_PROXY + ~/.netrc flow on both backends +using a Squid forward proxy on a shared Docker network with K3S. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from kubex.api import Api +from kubex.client.client import ClientChoise, create_client +from kubex.client.options import ClientOptions +from kubex.configuration.configuration import ClientConfiguration +from kubex.core.exceptions import KubexClientException, Unauthorized +from kubex.k8s.v1_35.core.v1.namespace import Namespace +from kubex.k8s.v1_35.core.v1.pod import Pod +from test.e2e._helpers import create_busybox_pod, wait_for_pod_running +from test.e2e.proxy._constants import SQUID_PROXY_PASSWORD, SQUID_PROXY_USER + +pytestmark = pytest.mark.anyio + + +async def test_trust_env_proxy_url_with_embedded_creds_succeeds( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """HTTPS_PROXY with embedded user:pass succeeds — no netrc lookup needed.""" + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Namespace] = Api(Namespace, client=client) + result = await api.list() + + assert result is not None + names = [ns.metadata.name for ns in result.items if ns.metadata] + assert "default" in names + + +async def test_trust_env_proxy_creds_from_netrc_succeeds( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + proxy_netrc: Path, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """HTTPS_PROXY without creds + netrc with creds succeeds. + + Exercises _resolve_env_proxy_with_netrc on httpx and aiohttp's native netrc. + """ + monkeypatch.setenv("HTTPS_PROXY", proxy_url) + monkeypatch.setenv("NETRC", str(proxy_netrc)) + + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Namespace] = Api(Namespace, client=client) + result = await api.list() + + assert result is not None + names = [ns.metadata.name for ns in result.items if ns.metadata] + assert "default" in names + + +async def test_trust_env_proxy_missing_creds_fails( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """HTTPS_PROXY without creds and no netrc -> proxy auth fails. + + Proves proxy authentication actually flows through the wire. Transport-level + 407 surfaces as httpx.ProxyError or aiohttp.ClientHttpProxyError (neither is + KubexClientException) because BaseClient.request() does not wrap + transport-level exceptions from the CONNECT tunnel. + """ + monkeypatch.setenv("HTTPS_PROXY", proxy_url) + # Point NETRC at a guaranteed-nonexistent path so the resolver cannot fall + # back to ~/.netrc on the runner's machine (FileNotFoundError is caught and + # treated as "no creds", which is the isolation we want here). + monkeypatch.setenv("NETRC", str(tmp_path / "kubex_no_netrc")) + + with pytest.raises(Exception): + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Namespace] = Api(Namespace, client=client) + await api.list() + + +async def test_no_proxy_bypasses_proxy( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """NO_PROXY=k3s bypasses the proxy -> direct connection fails with DNS error. + + The docker-network alias "k3s" is not resolvable from the host, so the + direct connection attempt raises a DNS/connection error. httpx raises + httpx.ConnectError; aiohttp raises aiohttp.ClientConnectorError. Neither is + KubexClientException because BaseClient.request() does not wrap + transport-level exceptions. + """ + monkeypatch.setenv("HTTPS_PROXY", proxy_url) + monkeypatch.setenv("NO_PROXY", "k3s") + + with pytest.raises(Exception): + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Namespace] = Api(Namespace, client=client) + await api.list() + + +async def test_trust_env_bearer_token_via_proxy_succeeds( + kubernetes_token_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """SA bearer token + proxy basic auth: api.list() succeeds end-to-end. + + The Authorization: Bearer header travels inside the CONNECT tunnel and is + invisible to the proxy. + """ + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + + async with await create_client( + kubernetes_token_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + result = await Api(Namespace, client=client).list() + + assert result is not None + names = [ns.metadata.name for ns in result.items if ns.metadata] + assert "default" in names + + +async def test_trust_env_bearer_invalid_token_returns_unauthorized( + kubernetes_token_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A garbage token survives the proxy tunnel and is rejected by K8s as 401.""" + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + kubernetes_token_config_via_proxy._token = "not.a.valid.jwt" + + async with await create_client( + kubernetes_token_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + with pytest.raises(Unauthorized): + await Api(Namespace, client=client).list() + + +async def test_trust_env_exec_via_proxy_succeeds( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + tmp_namespace_name_via_proxy: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """WebSocket exec via Squid with URL-embedded proxy creds succeeds.""" + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name_via_proxy) + await create_busybox_pod(api, "ws-exec-cert", tmp_namespace_name_via_proxy) + await wait_for_pod_running(api, "ws-exec-cert", tmp_namespace_name_via_proxy) + result = await api.exec.run("ws-exec-cert", command=["echo", "ws-proxy"]) + + assert result.stdout == b"ws-proxy\n" + assert result.exit_code == 0 + + +async def test_trust_env_exec_bearer_token_via_proxy_succeeds( + kubernetes_admin_token_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + tmp_namespace_name_via_proxy: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """WebSocket exec with SA bearer token via Squid proxy succeeds. + + Uses an admin-role SA token so the bearer token has permission to create pods + and run exec. The Authorization: Bearer header travels inside the CONNECT + tunnel and is invisible to the proxy; the proxy sees only Proxy-Authorization + on the outer CONNECT line. + """ + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + + async with await create_client( + kubernetes_admin_token_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name_via_proxy) + await create_busybox_pod(api, "ws-exec-bearer", tmp_namespace_name_via_proxy) + await wait_for_pod_running(api, "ws-exec-bearer", tmp_namespace_name_via_proxy) + result = await api.exec.run( + "ws-exec-bearer", command=["echo", "ws-proxy-bearer"] + ) + + assert result.stdout == b"ws-proxy-bearer\n" + assert result.exit_code == 0 + + +async def test_trust_env_exec_missing_proxy_creds_fails( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """HTTPS_PROXY without creds and no netrc -> proxy auth fails for WS exec. + + The proxy rejects the CONNECT tunnel with 407 before the WS handshake reaches + the kubelet; no pod needs to exist. Both httpx and aiohttp backends wrap the + transport failure as KubexClientException. + """ + monkeypatch.setenv("HTTPS_PROXY", proxy_url) + monkeypatch.setenv("NETRC", str(tmp_path / "kubex_no_netrc")) + + with pytest.raises(KubexClientException, match="407"): + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Pod] = Api(Pod, client=client, namespace="default") + await api.exec.run("no-such-pod", command=["echo", "hi"]) + + +async def test_trust_env_exec_no_proxy_bypasses_proxy( + kubernetes_config_via_proxy: ClientConfiguration, + proxy_url: str, + client_choice: ClientChoise, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """NO_PROXY=k3s bypasses the proxy -> direct WS connect to k3s alias fails. + + The docker-network alias "k3s" is not resolvable from the host, so the + direct connection attempt raises a DNS/connection error before the WS + handshake reaches the kubelet. No pod needs to exist. Both httpx and aiohttp + backends wrap the transport failure as KubexClientException. + """ + host_port = proxy_url.removeprefix("http://") + monkeypatch.setenv( + "HTTPS_PROXY", f"http://{SQUID_PROXY_USER}:{SQUID_PROXY_PASSWORD}@{host_port}" + ) + monkeypatch.setenv("NO_PROXY", "k3s") + + with pytest.raises(KubexClientException, match="connection failed"): + async with await create_client( + kubernetes_config_via_proxy, + client_class=client_choice, + options=ClientOptions(trust_env=True), + ) as client: + api: Api[Pod] = Api(Pod, client=client, namespace="default") + await api.exec.run("no-such-pod", command=["echo", "hi"]) diff --git a/test/e2e/test_bearer_token_auth.py b/test/e2e/test_bearer_token_auth.py new file mode 100644 index 0000000..78b6a50 --- /dev/null +++ b/test/e2e/test_bearer_token_auth.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import pytest + +from kubex.api import Api +from kubex.client.client import BaseClient, ClientChoise, create_client +from kubex.configuration.configuration import ClientConfiguration +from kubex.core.exceptions import Unauthorized +from kubex.k8s.v1_35.core.v1.namespace import Namespace + +pytestmark = pytest.mark.anyio + + +async def test_bearer_token_succeeds(token_client: BaseClient) -> None: + result = await Api(Namespace, client=token_client).list() + assert result is not None + names = [ns.metadata.name for ns in result.items if ns.metadata] + assert "default" in names + + +async def test_bearer_invalid_token_returns_unauthorized( + kubernetes_token_config: ClientConfiguration, + client_choice: ClientChoise, +) -> None: + kubernetes_token_config._token = "not.a.valid.jwt" + async with await create_client( + kubernetes_token_config, client_class=client_choice + ) as client: + with pytest.raises(Unauthorized): + await Api(Namespace, client=client).list() diff --git a/test/e2e/test_exec.py b/test/e2e/test_exec.py index daac8f7..d90f05c 100644 --- a/test/e2e/test_exec.py +++ b/test/e2e/test_exec.py @@ -6,60 +6,8 @@ from kubex.api import Api from kubex.client import BaseClient from kubex.core.exceptions import KubexClientException -from kubex.k8s.v1_35.core.v1.container import Container from kubex.k8s.v1_35.core.v1.pod import Pod -from kubex.k8s.v1_35.core.v1.pod_spec import PodSpec -from kubex_core.models.metadata import ObjectMetadata - - -_BUSYBOX_IMAGE = "busybox:1.36" - - -async def _create_busybox_pod(api: Api[Pod], name: str, namespace: str) -> Pod: - return await api.create( - Pod( - metadata=ObjectMetadata(name=name, namespace=namespace), - spec=PodSpec( - containers=[ - Container( - name="main", - image=_BUSYBOX_IMAGE, - command=["sleep", "3600"], - ) - ] - ), - ), - namespace=namespace, - ) - - -async def _wait_for_running( - api: Api[Pod], - name: str, - namespace: str, - timeout: int = 300, -) -> Pod: - pod = await api.get(name, namespace=namespace) - if pod.status is not None and pod.status.phase == "Running": - return pod - - resource_version = pod.metadata.resource_version if pod.metadata else None - async for event in api.watch( - field_selector=f"metadata.name={name}", - namespace=namespace, - resource_version=resource_version, - timeout_seconds=timeout, - request_timeout=timeout, - ): - obj = event.object - if ( - isinstance(obj, Pod) - and obj.status is not None - and obj.status.phase == "Running" - ): - return obj - - raise TimeoutError(f"Pod {name} did not reach Running within {timeout}s") +from test.e2e._helpers import create_busybox_pod, wait_for_pod_running async def _read_until( @@ -82,8 +30,8 @@ async def test_exec_run_echo_returns_stdout_and_zero_exit( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-echo", tmp_namespace_name) - await _wait_for_running(api, "exec-echo", tmp_namespace_name) + await create_busybox_pod(api, "exec-echo", tmp_namespace_name) + await wait_for_pod_running(api, "exec-echo", tmp_namespace_name) result = await api.exec.run("exec-echo", command=["echo", "hello"]) assert result.stdout == b"hello\n" @@ -95,8 +43,8 @@ async def test_exec_run_non_zero_exit_code_is_reported( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-exit-code", tmp_namespace_name) - await _wait_for_running(api, "exec-exit-code", tmp_namespace_name) + await create_busybox_pod(api, "exec-exit-code", tmp_namespace_name) + await wait_for_pod_running(api, "exec-exit-code", tmp_namespace_name) result = await api.exec.run("exec-exit-code", command=["sh", "-c", "exit 7"]) assert result.exit_code == 7 @@ -109,8 +57,8 @@ async def test_exec_run_pipes_stdin_to_command( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-stdin", tmp_namespace_name) - await _wait_for_running(api, "exec-stdin", tmp_namespace_name) + await create_busybox_pod(api, "exec-stdin", tmp_namespace_name) + await wait_for_pod_running(api, "exec-stdin", tmp_namespace_name) result = await api.exec.run("exec-stdin", command=["cat"], stdin=b"piped\n") assert result.stdout == b"piped\n" @@ -121,8 +69,8 @@ async def test_exec_stream_interactive_shell( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-stream-sh", tmp_namespace_name) - await _wait_for_running(api, "exec-stream-sh", tmp_namespace_name) + await create_busybox_pod(api, "exec-stream-sh", tmp_namespace_name) + await wait_for_pod_running(api, "exec-stream-sh", tmp_namespace_name) async with api.exec.stream( "exec-stream-sh", @@ -152,8 +100,8 @@ async def test_exec_stream_resize_changes_terminal_size( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-resize", tmp_namespace_name) - await _wait_for_running(api, "exec-resize", tmp_namespace_name) + await create_busybox_pod(api, "exec-resize", tmp_namespace_name) + await wait_for_pod_running(api, "exec-resize", tmp_namespace_name) async with api.exec.stream( "exec-resize", @@ -181,8 +129,8 @@ async def test_exec_against_missing_container_reports_failure( client: BaseClient, tmp_namespace_name: str ) -> None: api: Api[Pod] = Api(Pod, client=client, namespace=tmp_namespace_name) - await _create_busybox_pod(api, "exec-missing-container", tmp_namespace_name) - await _wait_for_running(api, "exec-missing-container", tmp_namespace_name) + await create_busybox_pod(api, "exec-missing-container", tmp_namespace_name) + await wait_for_pod_running(api, "exec-missing-container", tmp_namespace_name) try: result = await api.exec.run( diff --git a/test/test_client/test_aiohttp_options.py b/test/test_client/test_aiohttp_options.py index 90117e6..086ae20 100644 --- a/test/test_client/test_aiohttp_options.py +++ b/test/test_client/test_aiohttp_options.py @@ -2,7 +2,7 @@ import warnings from typing import Any -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -30,22 +30,30 @@ def _config() -> ClientConfiguration: ) -# --- resolve_ws_max_message_size --- - - -def test_resolve_ws_max_message_size_ellipsis_returns_kubex_default() -> None: - assert resolve_ws_max_message_size(...) == 2**21 - - -def test_resolve_ws_max_message_size_none_returns_zero() -> None: - assert resolve_ws_max_message_size(None) == 0 - - -def test_resolve_ws_max_message_size_explicit_int() -> None: - assert resolve_ws_max_message_size(8 * 1024 * 1024) == 8 * 1024 * 1024 - - -# --- _apply_aiohttp_proxy --- +def _patched_session_client( + options: ClientOptions | None = None, +) -> tuple[AioHttpClient, MagicMock]: + """Create AioHttpClient with TCPConnector and ClientSession patched; return (client, session_mock).""" + with ( + patch("kubex.client.aiohttp.TCPConnector") as mock_connector, + patch("kubex.client.aiohttp.ClientSession") as mock_session, + ): + mock_connector.return_value = MagicMock() + mock_session.return_value = MagicMock() + client = AioHttpClient(_config(), options) + return client, mock_session + + +@pytest.mark.parametrize( + "val,expected", + [ + pytest.param(..., 2**21, id="ellipsis_kubex_default"), + pytest.param(None, 0, id="none_no_cap"), + pytest.param(8 * 1024 * 1024, 8 * 1024 * 1024, id="explicit_int"), + ], +) +def test_resolve_ws_max_message_size(val: Any, expected: int) -> None: + assert resolve_ws_max_message_size(val) == expected def test_apply_aiohttp_proxy_none_no_op() -> None: @@ -104,9 +112,6 @@ def test_apply_aiohttp_proxy_dict_no_matching_scheme_warns_no_proxy() -> None: ) -# --- connector: pool_size --- - - @pytest.mark.anyio async def test_aiohttp_pool_size_default_uses_library_default() -> None: # Ellipsis (default) must NOT pass limit at all — let aiohttp decide. @@ -119,18 +124,16 @@ async def test_aiohttp_pool_size_default_uses_library_default() -> None: @pytest.mark.anyio -async def test_aiohttp_pool_size_none_sets_unlimited() -> None: - client = AioHttpClient(_config(), ClientOptions(pool_size=None)) - assert client._inner_client.connector.limit == 0 - - -@pytest.mark.anyio -async def test_aiohttp_pool_size_explicit_int() -> None: - client = AioHttpClient(_config(), ClientOptions(pool_size=50)) - assert client._inner_client.connector.limit == 50 - - -# --- connector: pool_size_per_host --- +@pytest.mark.parametrize( + "pool_size,expected_limit", + [ + pytest.param(None, 0, id="none_unlimited"), + pytest.param(50, 50, id="explicit_int"), + ], +) +async def test_aiohttp_pool_size(pool_size: int | None, expected_limit: int) -> None: + client = AioHttpClient(_config(), ClientOptions(pool_size=pool_size)) + assert client._inner_client.connector.limit == expected_limit @pytest.mark.anyio @@ -165,24 +168,20 @@ async def test_aiohttp_pool_size_per_host_explicit_int() -> None: assert client._inner_client.connector.limit_per_host == 5 -# --- connector: keep_alive --- - - -@pytest.mark.anyio -async def test_aiohttp_keep_alive_true_default_no_force_close() -> None: - client = AioHttpClient(_config(), ClientOptions()) - force_close = getattr(client._inner_client.connector, "_force_close", False) - assert force_close is False - - @pytest.mark.anyio -async def test_aiohttp_keep_alive_false_sets_force_close() -> None: - client = AioHttpClient(_config(), ClientOptions(keep_alive=False)) +@pytest.mark.parametrize( + "options,expected_force_close", + [ + pytest.param(ClientOptions(), False, id="default_keep_alive"), + pytest.param(ClientOptions(keep_alive=False), True, id="keep_alive_false"), + ], +) +async def test_aiohttp_keep_alive_maps_to_force_close( + options: ClientOptions, expected_force_close: bool +) -> None: + client = AioHttpClient(_config(), options) force_close = getattr(client._inner_client.connector, "_force_close", None) - assert force_close is True - - -# --- connector: keep_alive_timeout --- + assert force_close is expected_force_close @pytest.mark.anyio @@ -225,61 +224,34 @@ async def test_aiohttp_keep_alive_false_with_timeout_does_not_raise() -> None: assert force_close is True -# --- session: buffer_size --- - - -@pytest.mark.anyio -async def test_aiohttp_buffer_size_default_is_kubex_default() -> None: - client = AioHttpClient(_config(), ClientOptions()) - assert client._inner_client._read_bufsize == 2**21 - - -@pytest.mark.anyio -async def test_aiohttp_buffer_size_none_uses_library_default() -> None: - client = AioHttpClient(_config(), ClientOptions(buffer_size=None)) - # aiohttp library default is 2**16 - assert client._inner_client._read_bufsize == 2**16 - - -@pytest.mark.anyio -async def test_aiohttp_buffer_size_explicit_int() -> None: - client = AioHttpClient(_config(), ClientOptions(buffer_size=4096)) - assert client._inner_client._read_bufsize == 4096 - - -# --- ws_max_message_size --- - - -@pytest.mark.anyio -async def test_aiohttp_ws_max_message_size_default() -> None: - client = AioHttpClient(_config(), ClientOptions()) - mock_ws = AsyncMock() - mock_ws.protocol = None - mock_ws.closed = False - mock_ws_connect = AsyncMock(return_value=mock_ws) - with patch.object(client._inner_client, "ws_connect", new=mock_ws_connect): - request = Request(method="GET", url="/exec") - await client.connect_websocket(request, []) - assert mock_ws_connect.call_args.kwargs["max_msg_size"] == 2**21 - - @pytest.mark.anyio -async def test_aiohttp_ws_max_message_size_none_no_cap() -> None: - client = AioHttpClient(_config(), ClientOptions(ws_max_message_size=None)) - mock_ws = AsyncMock() - mock_ws.protocol = None - mock_ws.closed = False - mock_ws_connect = AsyncMock(return_value=mock_ws) - with patch.object(client._inner_client, "ws_connect", new=mock_ws_connect): - request = Request(method="GET", url="/exec") - await client.connect_websocket(request, []) - assert mock_ws_connect.call_args.kwargs["max_msg_size"] == 0 +@pytest.mark.parametrize( + "buffer_size,expected_bufsize", + [ + pytest.param(..., 2**21, id="default_kubex_default"), + pytest.param(None, 2**16, id="none_aiohttp_default"), + pytest.param(4096, 4096, id="explicit_int"), + ], +) +async def test_aiohttp_buffer_size(buffer_size: Any, expected_bufsize: int) -> None: + client = AioHttpClient(_config(), ClientOptions(buffer_size=buffer_size)) + assert client._inner_client._read_bufsize == expected_bufsize @pytest.mark.anyio -async def test_aiohttp_ws_max_message_size_explicit() -> None: +@pytest.mark.parametrize( + "ws_max_message_size,expected_max_msg_size", + [ + pytest.param(..., 2**21, id="default_kubex_default"), + pytest.param(None, 0, id="none_no_cap"), + pytest.param(8 * 1024 * 1024, 8 * 1024 * 1024, id="explicit_int"), + ], +) +async def test_aiohttp_ws_max_message_size( + ws_max_message_size: Any, expected_max_msg_size: int +) -> None: client = AioHttpClient( - _config(), ClientOptions(ws_max_message_size=8 * 1024 * 1024) + _config(), ClientOptions(ws_max_message_size=ws_max_message_size) ) mock_ws = AsyncMock() mock_ws.protocol = None @@ -288,36 +260,30 @@ async def test_aiohttp_ws_max_message_size_explicit() -> None: with patch.object(client._inner_client, "ws_connect", new=mock_ws_connect): request = Request(method="GET", url="/exec") await client.connect_websocket(request, []) - assert mock_ws_connect.call_args.kwargs["max_msg_size"] == 8 * 1024 * 1024 - - -# --- proxy integration: propagated through _session_kwargs --- + assert mock_ws_connect.call_args.kwargs["max_msg_size"] == expected_max_msg_size @pytest.mark.anyio -async def test_aiohttp_session_kwargs_includes_proxy_str() -> None: - client = AioHttpClient(_config(), ClientOptions(proxy="http://proxy.corp:8080")) - session_kw = client._session_kwargs() - assert session_kw.get("proxy") == "http://proxy.corp:8080" - - -@pytest.mark.anyio -async def test_aiohttp_session_kwargs_no_proxy_by_default() -> None: - client = AioHttpClient(_config(), ClientOptions()) - session_kw = client._session_kwargs() - assert "proxy" not in session_kw - - -@pytest.mark.anyio -async def test_aiohttp_session_kwargs_includes_proxy_dict_matching_scheme() -> None: - client = AioHttpClient( - _config(), ClientOptions(proxy={"https": "http://proxy.corp:8080"}) - ) +@pytest.mark.parametrize( + "proxy,expected_proxy", + [ + pytest.param(None, None, id="no_proxy"), + pytest.param( + "http://proxy.corp:8080", "http://proxy.corp:8080", id="proxy_str" + ), + pytest.param( + {"https": "http://proxy.corp:8080"}, + "http://proxy.corp:8080", + id="proxy_dict_matching_scheme", + ), + ], +) +async def test_aiohttp_session_kwargs_proxy( + proxy: Any, expected_proxy: str | None +) -> None: + client = AioHttpClient(_config(), ClientOptions(proxy=proxy)) session_kw = client._session_kwargs() - assert session_kw.get("proxy") == "http://proxy.corp:8080" - - -# --- regression: defaults preserve prior behavior --- + assert session_kw.get("proxy") == expected_proxy @pytest.mark.anyio @@ -342,3 +308,96 @@ async def test_aiohttp_default_options_regression() -> None: # no proxy session_kw = client._session_kwargs() assert "proxy" not in session_kw + + +@pytest.mark.parametrize( + "options,expected", + [ + pytest.param(ClientOptions(), False, id="default_false"), + pytest.param(ClientOptions(trust_env=True), True, id="explicit_true"), + ], +) +def test_trust_env_passed_to_session(options: ClientOptions, expected: bool) -> None: + _, mock_session = _patched_session_client(options) + _, kwargs = mock_session.call_args + assert kwargs.get("trust_env") is expected + + +@pytest.mark.parametrize( + "options,expected", + [ + pytest.param(ClientOptions(), False, id="default_false"), + pytest.param(ClientOptions(trust_env=True), True, id="explicit_true"), + ], +) +def test_trust_env_propagated_to_session_kwargs( + options: ClientOptions, expected: bool +) -> None: + client, _ = _patched_session_client(options) + session_kw = client._session_kwargs() + assert session_kw.get("trust_env") is expected + + +def test_trust_env_false_default_with_env_set_aiohttp( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", "http://leak.example.com:3128") + _, mock_session = _patched_session_client(ClientOptions()) + _, kwargs = mock_session.call_args + assert kwargs.get("trust_env") is False + assert "proxy" not in kwargs + + +def test_trust_env_true_explicit_proxy_emits_warning_aiohttp() -> None: + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _, mock_session = _patched_session_client( + ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + _, kwargs = mock_session.call_args + assert any( + issubclass(w.category, UserWarning) + and "proxy" in str(w.message) + and "trust_env" in str(w.message) + for w in caught + ) + assert kwargs.get("trust_env") is False + + +def test_trust_env_true_explicit_proxy_session_kwargs_force_false_aiohttp() -> None: + with warnings.catch_warnings(record=True): + warnings.simplefilter("always") + client, _ = _patched_session_client( + ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + session_kw = client._session_kwargs() + assert session_kw.get("trust_env") is False + + +def test_trust_env_true_proxy_dict_no_matching_scheme_does_not_disable_trust_env() -> ( + None +): + # proxy={"http": ...} against an https:// API server — aiohttp applies no + # effective proxy (wrong scheme). trust_env must NOT be force-disabled just + # because the raw options.proxy dict is non-None. + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _, mock_session = _patched_session_client( + ClientOptions(proxy={"http": "http://corp.proxy:8080"}, trust_env=True) + ) + _, kwargs = mock_session.call_args + # "no entry for URL scheme" warning must fire (from _apply_aiohttp_proxy) + assert any( + issubclass(w.category, UserWarning) + and "no entry for URL scheme" in str(w.message) + for w in caught + ) + # conflict warning must NOT fire — no proxy was actually applied + assert not any( + issubclass(w.category, UserWarning) + and "trust_env" in str(w.message) + and "proxy" in str(w.message) + and "ignored" in str(w.message) + for w in caught + ) + assert kwargs.get("trust_env") is True diff --git a/test/test_client/test_aiohttp_websocket.py b/test/test_client/test_aiohttp_websocket.py index 3005c4f..3603004 100644 --- a/test/test_client/test_aiohttp_websocket.py +++ b/test/test_client/test_aiohttp_websocket.py @@ -1,7 +1,9 @@ from __future__ import annotations from typing import Any, AsyncGenerator +from unittest.mock import MagicMock +import aiohttp import pytest from aiohttp import WSMsgType, web from aiohttp.test_utils import TestServer @@ -570,6 +572,25 @@ async def close(self) -> None: await conn.close() +@pytest.mark.anyio +async def test_trust_env_propagated_to_temp_ws_session( + ws_server: TestServer, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """trust_env is forwarded to the temp ClientSession created for WS upgrade with timeout override.""" + config = _client_config(ws_server) + async with AioHttpClient(config, ClientOptions(trust_env=True)) as client: + mock_cls = MagicMock(wraps=aiohttp.ClientSession) + monkeypatch.setattr("kubex.client.aiohttp.ClientSession", mock_cls) + request = Request(method="GET", url="/ws", timeout=None) + conn = await client.connect_websocket(request, ["v5.channel.k8s.io"]) + await conn.close() + + assert mock_cls.call_count == 1 + _, kwargs = mock_cls.call_args + assert kwargs.get("trust_env") is True + + @pytest.mark.anyio async def test_send_bytes_wraps_client_error_as_kubex_exception() -> None: """Generic ``ClientError`` during send must surface as diff --git a/test/test_client/test_httpx_options.py b/test/test_client/test_httpx_options.py index 2c761eb..00f9f12 100644 --- a/test/test_client/test_httpx_options.py +++ b/test/test_client/test_httpx_options.py @@ -1,5 +1,6 @@ from __future__ import annotations +import ssl import warnings from typing import Any from unittest.mock import AsyncMock, MagicMock, patch @@ -22,6 +23,16 @@ from kubex.core.request import Request # noqa: E402 +def _patched_client( + options: ClientOptions | None = None, +) -> tuple[HttpxClient, MagicMock]: + """Create an HttpxClient with httpx.AsyncClient patched; return (client, mock).""" + with patch("kubex.client.httpx.httpx.AsyncClient") as mock_ac: + mock_ac.return_value = MagicMock() + client = HttpxClient(_config(), options) + return client, mock_ac + + def _config() -> ClientConfiguration: return ClientConfiguration( url="https://example.invalid", insecure_skip_tls_verify=True @@ -32,11 +43,6 @@ def _client(options: ClientOptions | None = None) -> HttpxClient: return HttpxClient(_config(), options) -# --------------------------------------------------------------------------- -# resolve_ws_max_message_size -# --------------------------------------------------------------------------- - - @pytest.mark.parametrize( "val,expected", [ @@ -49,57 +55,54 @@ def test_resolve_ws_max_message_size(val: Any, expected: int) -> None: assert resolve_ws_max_message_size(val) == expected -# --------------------------------------------------------------------------- -# _build_httpx_limits -# --------------------------------------------------------------------------- - - def test_build_httpx_limits_all_defaults_returns_empty() -> None: - opts = ClientOptions() - assert _build_httpx_limits(opts) == {} - - -def test_build_httpx_limits_keep_alive_false() -> None: - opts = ClientOptions(keep_alive=False) - kw = _build_httpx_limits(opts) - assert kw["max_keepalive_connections"] == 0 - - -def test_build_httpx_limits_pool_size_int() -> None: - opts = ClientOptions(pool_size=50) - kw = _build_httpx_limits(opts) - assert kw["max_connections"] == 50 + assert _build_httpx_limits(ClientOptions()) == {} -def test_build_httpx_limits_pool_size_none_unlimited() -> None: - opts = ClientOptions(pool_size=None) - kw = _build_httpx_limits(opts) - assert kw["max_connections"] is None - - -def test_build_httpx_limits_keep_alive_timeout_float() -> None: - opts = ClientOptions(keep_alive_timeout=30.0) - kw = _build_httpx_limits(opts) - assert kw["keepalive_expiry"] == 30.0 - - -def test_build_httpx_limits_keep_alive_timeout_none() -> None: - opts = ClientOptions(keep_alive_timeout=None) - kw = _build_httpx_limits(opts) - assert kw["keepalive_expiry"] is None - - -def test_build_httpx_limits_combined() -> None: - opts = ClientOptions(pool_size=20, keep_alive=False, keep_alive_timeout=60.0) - kw = _build_httpx_limits(opts) - assert kw["max_connections"] == 20 - assert kw["max_keepalive_connections"] == 0 - assert kw["keepalive_expiry"] == 60.0 - - -# --------------------------------------------------------------------------- -# _build_httpx_proxy_kwargs -# --------------------------------------------------------------------------- +@pytest.mark.parametrize( + "options,expected_subset", + [ + pytest.param( + ClientOptions(keep_alive=False), + {"max_keepalive_connections": 0}, + id="keep_alive_false", + ), + pytest.param( + ClientOptions(pool_size=50), + {"max_connections": 50}, + id="pool_size_int", + ), + pytest.param( + ClientOptions(pool_size=None), + {"max_connections": None}, + id="pool_size_none_unlimited", + ), + pytest.param( + ClientOptions(keep_alive_timeout=30.0), + {"keepalive_expiry": 30.0}, + id="keep_alive_timeout_float", + ), + pytest.param( + ClientOptions(keep_alive_timeout=None), + {"keepalive_expiry": None}, + id="keep_alive_timeout_none", + ), + pytest.param( + ClientOptions(pool_size=20, keep_alive=False, keep_alive_timeout=60.0), + { + "max_connections": 20, + "max_keepalive_connections": 0, + "keepalive_expiry": 60.0, + }, + id="combined", + ), + ], +) +def test_build_httpx_limits( + options: ClientOptions, expected_subset: dict[str, Any] +) -> None: + kw = _build_httpx_limits(options) + assert expected_subset.items() <= kw.items() def test_build_httpx_proxy_kwargs_none_returns_empty() -> None: @@ -144,11 +147,6 @@ def test_build_httpx_proxy_kwargs_dict_both_schemes() -> None: assert "https://" in mounts -# --------------------------------------------------------------------------- -# HttpxClient._create_inner_client — option wiring -# --------------------------------------------------------------------------- - - def _pool(client: HttpxClient) -> Any: """Navigate httpx.AsyncClient -> AsyncHTTPTransport -> httpcore pool.""" return client._inner_client._transport._pool @@ -217,81 +215,40 @@ def test_create_inner_client_no_proxy_by_default() -> None: assert isinstance(client._inner_client, httpx.AsyncClient) -def test_create_inner_client_buffer_size_not_ellipsis_warns() -> None: - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _client(ClientOptions(buffer_size=65536)) - assert any( - issubclass(w.category, UserWarning) and "buffer_size" in str(w.message) - for w in caught - ) - - -def test_create_inner_client_buffer_size_none_warns() -> None: - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _client(ClientOptions(buffer_size=None)) - assert any( - issubclass(w.category, UserWarning) and "buffer_size" in str(w.message) - for w in caught - ) - - -def test_create_inner_client_buffer_size_ellipsis_no_warn() -> None: - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _client(ClientOptions(buffer_size=...)) - buffer_warns = [ - w - for w in caught - if issubclass(w.category, UserWarning) and "buffer_size" in str(w.message) - ] - assert not buffer_warns - - -def test_create_inner_client_pool_size_per_host_not_ellipsis_warns() -> None: - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _client(ClientOptions(pool_size_per_host=10)) - assert any( - issubclass(w.category, UserWarning) and "pool_size_per_host" in str(w.message) - for w in caught - ) - - -def test_create_inner_client_pool_size_per_host_none_warns() -> None: - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _client(ClientOptions(pool_size_per_host=None)) - assert any( - issubclass(w.category, UserWarning) and "pool_size_per_host" in str(w.message) - for w in caught - ) - - -def test_create_inner_client_pool_size_per_host_ellipsis_no_warn() -> None: +# httpx has no equivalent for buffer_size / pool_size_per_host. Setting either +# to anything other than ``...`` must emit a UserWarning mentioning the field; +# the default (``...``) must not. +@pytest.mark.parametrize( + "field,value,expect_warn", + [ + pytest.param("buffer_size", 65536, True, id="buffer_size_int_warns"), + pytest.param("buffer_size", None, True, id="buffer_size_none_warns"), + pytest.param("buffer_size", ..., False, id="buffer_size_ellipsis_no_warn"), + pytest.param("pool_size_per_host", 10, True, id="pool_size_per_host_int_warns"), + pytest.param( + "pool_size_per_host", None, True, id="pool_size_per_host_none_warns" + ), + pytest.param( + "pool_size_per_host", ..., False, id="pool_size_per_host_ellipsis_no_warn" + ), + ], +) +def test_create_inner_client_unsupported_field_warns( + field: str, value: Any, expect_warn: bool +) -> None: with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always") - _client(ClientOptions(pool_size_per_host=...)) - per_host_warns = [ + _client(ClientOptions(**{field: value})) + matching = [ w for w in caught - if issubclass(w.category, UserWarning) - and "pool_size_per_host" in str(w.message) + if issubclass(w.category, UserWarning) and field in str(w.message) ] - assert not per_host_warns - - -# --------------------------------------------------------------------------- -# connect_websocket — ws_max_message_size -# --------------------------------------------------------------------------- + assert bool(matching) is expect_warn -@pytest.mark.anyio -async def test_connect_websocket_default_max_message_size() -> None: - client = _client(ClientOptions()) +async def _connect_ws_and_get_kwargs(client: HttpxClient) -> dict[str, Any]: request = Request(method="GET", url="/ws") - with patch("httpx_ws.aconnect_ws") as mock_connect: mock_session = MagicMock() mock_session.subprotocol = "v5.channel.k8s.io" @@ -299,12 +256,26 @@ async def test_connect_websocket_default_max_message_size() -> None: mock_cm.__aenter__ = AsyncMock(return_value=mock_session) mock_cm.__aexit__ = AsyncMock(return_value=None) mock_connect.return_value = mock_cm - conn = await client.connect_websocket(request, ["v5.channel.k8s.io"]) await conn.close() - _, kwargs = mock_connect.call_args - assert kwargs.get("max_message_size_bytes") == 2**21 + return dict(kwargs) + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "ws_max_message_size,expected", + [ + pytest.param(..., 2**21, id="default_kubex_default"), + pytest.param(8 * 1024 * 1024, 8 * 1024 * 1024, id="explicit_int"), + ], +) +async def test_connect_websocket_forwards_max_message_size( + ws_max_message_size: Any, expected: int +) -> None: + client = _client(ClientOptions(ws_max_message_size=ws_max_message_size)) + kwargs = await _connect_ws_and_get_kwargs(client) + assert kwargs.get("max_message_size_bytes") == expected @pytest.mark.anyio @@ -318,49 +289,10 @@ async def test_connect_websocket_none_max_message_size_warns_and_skips_param() - for w in caught ) - request = Request(method="GET", url="/ws") - - with patch("httpx_ws.aconnect_ws") as mock_connect: - mock_session = MagicMock() - mock_session.subprotocol = "v5.channel.k8s.io" - mock_cm = AsyncMock() - mock_cm.__aenter__ = AsyncMock(return_value=mock_session) - mock_cm.__aexit__ = AsyncMock(return_value=None) - mock_connect.return_value = mock_cm - - conn = await client.connect_websocket(request, ["v5.channel.k8s.io"]) - await conn.close() - - _, kwargs = mock_connect.call_args + kwargs = await _connect_ws_and_get_kwargs(client) assert "max_message_size_bytes" not in kwargs -@pytest.mark.anyio -async def test_connect_websocket_explicit_max_message_size() -> None: - explicit_size = 8 * 1024 * 1024 - client = _client(ClientOptions(ws_max_message_size=explicit_size)) - request = Request(method="GET", url="/ws") - - with patch("httpx_ws.aconnect_ws") as mock_connect: - mock_session = MagicMock() - mock_session.subprotocol = "v5.channel.k8s.io" - mock_cm = AsyncMock() - mock_cm.__aenter__ = AsyncMock(return_value=mock_session) - mock_cm.__aexit__ = AsyncMock(return_value=None) - mock_connect.return_value = mock_cm - - conn = await client.connect_websocket(request, ["v5.channel.k8s.io"]) - await conn.close() - - _, kwargs = mock_connect.call_args - assert kwargs.get("max_message_size_bytes") == explicit_size - - -# --------------------------------------------------------------------------- -# Regression: ClientOptions() defaults produce identical behavior to pre-option code -# --------------------------------------------------------------------------- - - def test_regression_defaults_no_limits_no_proxy() -> None: with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always") @@ -379,3 +311,177 @@ def test_regression_defaults_no_limits_no_proxy() -> None: assert pool._max_connections == 100 assert pool._max_keepalive_connections == 20 assert pool._keepalive_expiry == 5.0 + + +def test_trust_env_false_default_passed_to_async_client() -> None: + _, mock_ac = _patched_client(ClientOptions()) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is False + + +def test_trust_env_false_default_overrides_httpx_library_default_with_env_set( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", "http://leak.example.com:3128") + _, mock_ac = _patched_client(ClientOptions()) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is False + assert "proxy" not in kwargs + assert "mounts" not in kwargs + + +def test_trust_env_true_passed_to_async_client() -> None: + _, mock_ac = _patched_client(ClientOptions(trust_env=True)) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is True + assert "auth" not in kwargs + + +def test_trust_env_true_explicit_proxy_emits_warning() -> None: + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _, mock_ac = _patched_client( + ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert any( + issubclass(w.category, UserWarning) + and "proxy" in str(w.message) + and "trust_env" in str(w.message) + for w in caught + ) + assert kwargs.get("proxy") == "http://corp.proxy:8080" + assert kwargs.get("trust_env") is False + + +def test_trust_env_true_explicit_proxy_preserves_ssl_cert_file( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # When trust_env=True + explicit proxy conflict, trust_env=False is passed + # to httpx which also suppresses SSL_CERT_FILE. kubex must apply SSL_CERT_FILE + # manually so callers relying on env-provided CA bundles are not silently + # downgraded to certifi. Use a config without insecure_skip_tls_verify so + # needs_custom_ssl=False and the SSL_CERT_FILE branch is exercised (not + # masked by the pre-existing custom-ssl path). + monkeypatch.setenv("SSL_CERT_FILE", "/path/to/custom_ca.pem") + monkeypatch.delenv("SSL_CERT_DIR", raising=False) + fake_ssl_ctx = MagicMock(spec=ssl.SSLContext) + cfg = ClientConfiguration(url="https://example.invalid") + with warnings.catch_warnings(record=True): + warnings.simplefilter("always") + with ( + patch("kubex.client.httpx.httpx.AsyncClient") as mock_ac, + patch( + "kubex.client.httpx.ssl.create_default_context", + return_value=fake_ssl_ctx, + ) as mock_ctx, + ): + mock_ac.return_value = MagicMock() + HttpxClient( + cfg, ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is False + assert kwargs.get("verify") is fake_ssl_ctx + mock_ctx.assert_called_once_with(cafile="/path/to/custom_ca.pem", capath=None) + + +def test_trust_env_true_explicit_proxy_preserves_ssl_cert_dir( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # SSL_CERT_DIR set (no SSL_CERT_FILE) — capath must be forwarded to + # ssl.create_default_context when trust_env=True + explicit proxy conflict. + monkeypatch.delenv("SSL_CERT_FILE", raising=False) + monkeypatch.setenv("SSL_CERT_DIR", "/path/to/cert/dir") + fake_ssl_ctx = MagicMock(spec=ssl.SSLContext) + cfg = ClientConfiguration(url="https://example.invalid") + with warnings.catch_warnings(record=True): + warnings.simplefilter("always") + with ( + patch("kubex.client.httpx.httpx.AsyncClient") as mock_ac, + patch( + "kubex.client.httpx.ssl.create_default_context", + return_value=fake_ssl_ctx, + ) as mock_ctx, + ): + mock_ac.return_value = MagicMock() + HttpxClient( + cfg, ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is False + assert kwargs.get("verify") is fake_ssl_ctx + mock_ctx.assert_called_once_with(cafile=None, capath="/path/to/cert/dir") + + +def test_trust_env_true_explicit_proxy_no_ssl_cert_file_keeps_bool_verify( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # When trust_env=True + explicit proxy but no SSL_CERT_FILE env var set, + # our new SSL_CERT_FILE path is not triggered — verify stays as the bool + # True that was set before the conflict branch runs. Use a config without + # insecure_skip_tls_verify so needs_custom_ssl=False and _verify=True. + monkeypatch.delenv("SSL_CERT_FILE", raising=False) + monkeypatch.delenv("SSL_CERT_DIR", raising=False) + cfg = ClientConfiguration(url="https://example.invalid") + with warnings.catch_warnings(record=True): + warnings.simplefilter("always") + with patch("kubex.client.httpx.httpx.AsyncClient") as mock_ac: + mock_ac.return_value = MagicMock() + HttpxClient( + cfg, ClientOptions(proxy="http://corp.proxy:8080", trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert kwargs.get("trust_env") is False + assert kwargs.get("verify") is True + + +def test_trust_env_true_no_explicit_proxy_invokes_env_resolver() -> None: + fake_proxy = httpx.Proxy(url="http://proxy.corp:3128", auth=("alice", "s3cret")) + resolver_result: dict[str, Any] = {"proxy": fake_proxy} + with patch( + "kubex.client.httpx._resolve_env_proxy_with_netrc", + return_value=resolver_result, + ) as mock_resolver: + _, mock_ac = _patched_client(ClientOptions(trust_env=True)) + mock_resolver.assert_called_once_with("https://example.invalid") + _, kwargs = mock_ac.call_args + assert kwargs.get("proxy") == fake_proxy + assert kwargs.get("trust_env") is True + + +def test_trust_env_true_explicit_proxy_dict_matching_scheme_emits_warning() -> None: + # Dict proxy with a key matching the base URL scheme (https) conflicts with + # trust_env=True → warning emitted and trust_env forced False (mirrors aiohttp). + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _, mock_ac = _patched_client( + ClientOptions(proxy={"https": "http://corp.proxy:8080"}, trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert any( + issubclass(w.category, UserWarning) + and "proxy" in str(w.message) + and "trust_env" in str(w.message) + for w in caught + ) + assert kwargs.get("trust_env") is False + + +def test_trust_env_true_proxy_dict_no_matching_scheme_emits_warning() -> None: + # Dict proxy only for http:// while the base URL is https:// — any explicit + # proxy conflicts with trust_env=True regardless of scheme, so a warning is + # emitted and trust_env is forced False. Mirrors the aiohttp backend. + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _, mock_ac = _patched_client( + ClientOptions(proxy={"http": "http://corp.proxy:8080"}, trust_env=True) + ) + _, kwargs = mock_ac.call_args + assert any( + issubclass(w.category, UserWarning) + and "proxy" in str(w.message) + and "trust_env" in str(w.message) + for w in caught + ) + assert kwargs.get("trust_env") is False diff --git a/test/test_client/test_options.py b/test/test_client/test_options.py index 92651f5..37bb03c 100644 --- a/test/test_client/test_options.py +++ b/test/test_client/test_options.py @@ -18,19 +18,60 @@ def test_defaults() -> None: assert opts.ws_max_message_size is Ellipsis assert opts.pool_size is Ellipsis assert opts.pool_size_per_host is Ellipsis + assert opts.trust_env is False -def test_explicit_none_preserved() -> None: - opts = ClientOptions(timeout=None) - assert opts.timeout is None +def test_log_api_warnings_false() -> None: + assert ClientOptions(log_api_warnings=False).log_api_warnings is False + + +# Fields that share the ``... | None | `` shape: ``...`` (default) +# means "use library default", ``None`` means "explicitly disable / unlimited", +# and a positive value is the explicit override. +_OPTIONAL_OVERRIDE_FIELDS = [ + "timeout", + "keep_alive_timeout", + "buffer_size", + "ws_max_message_size", + "pool_size", + "pool_size_per_host", +] -def test_log_api_warnings_false() -> None: - opts = ClientOptions(log_api_warnings=False) - assert opts.log_api_warnings is False +@pytest.mark.parametrize("field", _OPTIONAL_OVERRIDE_FIELDS) +def test_optional_override_field_defaults_to_ellipsis(field: str) -> None: + assert getattr(ClientOptions(), field) is Ellipsis + + +@pytest.mark.parametrize("field", _OPTIONAL_OVERRIDE_FIELDS) +def test_optional_override_field_explicit_ellipsis_roundtrip(field: str) -> None: + assert getattr(ClientOptions(**{field: ...}), field) is Ellipsis + + +@pytest.mark.parametrize("field", _OPTIONAL_OVERRIDE_FIELDS) +def test_optional_override_field_none_preserved(field: str) -> None: + assert getattr(ClientOptions(**{field: None}), field) is None + +@pytest.mark.parametrize( + "field,value,expected", + [ + pytest.param("keep_alive_timeout", 30, 30.0, id="keep_alive_timeout_int"), + pytest.param("keep_alive_timeout", 60.5, 60.5, id="keep_alive_timeout_float"), + pytest.param("keep_alive_timeout", 0, 0.0, id="keep_alive_timeout_zero"), + pytest.param("buffer_size", 4096, 4096, id="buffer_size"), + pytest.param("ws_max_message_size", 2**21, 2**21, id="ws_max_message_size"), + pytest.param("pool_size", 50, 50, id="pool_size"), + pytest.param("pool_size_per_host", 10, 10, id="pool_size_per_host"), + ], +) +def test_optional_override_field_positive_value_preserved( + field: str, value: object, expected: object +) -> None: + assert getattr(ClientOptions(**{field: value}), field) == expected -_NORMALIZE_CASES = [ + +_TIMEOUT_NORMALIZE_CASES = [ pytest.param(3, Timeout(total=3.0), id="int"), pytest.param(2.5, Timeout(total=2.5), id="float"), pytest.param( @@ -39,7 +80,7 @@ def test_log_api_warnings_false() -> None: ] -@pytest.mark.parametrize("input_val,expected", _NORMALIZE_CASES) +@pytest.mark.parametrize("input_val,expected", _TIMEOUT_NORMALIZE_CASES) def test_timeout_normalized(input_val: object, expected: Timeout) -> None: opts = ClientOptions(timeout=input_val) if isinstance(input_val, Timeout): @@ -48,59 +89,35 @@ def test_timeout_normalized(input_val: object, expected: Timeout) -> None: assert opts.timeout == expected -def test_ellipsis_sentinel_roundtrip() -> None: - opts = ClientOptions(timeout=...) - assert opts.timeout is Ellipsis - - -def test_bogus_timeout_raises() -> None: - with pytest.raises(ValidationError): - ClientOptions(timeout="bogus") - - -@pytest.mark.parametrize("value", [True, False]) -def test_bool_timeout_raises(value: bool) -> None: +@pytest.mark.parametrize( + "bad_val", + [ + pytest.param("bogus", id="non_numeric_string"), + pytest.param(True, id="bool_true"), + pytest.param(False, id="bool_false"), + ], +) +def test_timeout_invalid_raises(bad_val: object) -> None: with pytest.raises(ValidationError): - ClientOptions(timeout=value) - - -# --------------------------------------------------------------------------- -# proxy field -# --------------------------------------------------------------------------- - - -def test_proxy_default_none() -> None: - assert ClientOptions().proxy is None - - -def test_proxy_none_preserved() -> None: - assert ClientOptions(proxy=None).proxy is None - - -def test_proxy_str_preserved() -> None: - url = "http://proxy.example.com:8080" - assert ClientOptions(proxy=url).proxy == url - - -def test_proxy_str_with_userinfo() -> None: - url = "http://user:pass@proxy.example.com:8080" - assert ClientOptions(proxy=url).proxy == url + ClientOptions(timeout=bad_val) @pytest.mark.parametrize( - "proxy_dict", + "proxy", [ - pytest.param({"http": "http://p.example.com"}, id="http_only"), - pytest.param({"https": "http://p.example.com"}, id="https_only"), + pytest.param(None, id="none"), + pytest.param("http://proxy.example.com:8080", id="str_no_userinfo"), + pytest.param("http://user:pass@proxy.example.com:8080", id="str_with_userinfo"), + pytest.param({"http": "http://p.example.com"}, id="dict_http_only"), + pytest.param({"https": "http://p.example.com"}, id="dict_https_only"), pytest.param( {"http": "http://p.example.com", "https": "http://p.example.com"}, - id="both_schemes", + id="dict_both_schemes", ), ], ) -def test_proxy_dict_valid(proxy_dict: dict[str, str]) -> None: - opts = ClientOptions(proxy=proxy_dict) - assert opts.proxy == proxy_dict +def test_proxy_valid(proxy: object) -> None: + assert ClientOptions(proxy=proxy).proxy == proxy @pytest.mark.parametrize( @@ -123,48 +140,9 @@ def test_proxy_invalid_raises(bad_proxy: object) -> None: ClientOptions(proxy=bad_proxy) -# --------------------------------------------------------------------------- -# keep_alive field -# --------------------------------------------------------------------------- - - -def test_keep_alive_default_true() -> None: - assert ClientOptions().keep_alive is True - - -def test_keep_alive_false() -> None: - assert ClientOptions(keep_alive=False).keep_alive is False - - -# --------------------------------------------------------------------------- -# keep_alive_timeout field -# --------------------------------------------------------------------------- - - -def test_keep_alive_timeout_default_ellipsis() -> None: - assert ClientOptions().keep_alive_timeout is Ellipsis - - -def test_keep_alive_timeout_ellipsis_roundtrip() -> None: - assert ClientOptions(keep_alive_timeout=...).keep_alive_timeout is Ellipsis - - -def test_keep_alive_timeout_none_preserved() -> None: - assert ClientOptions(keep_alive_timeout=None).keep_alive_timeout is None - - -@pytest.mark.parametrize( - "val,expected", - [ - pytest.param(30, 30.0, id="int_coerced_to_float"), - pytest.param(60.5, 60.5, id="float_preserved"), - pytest.param(0.0, 0.0, id="zero_allowed"), - pytest.param(0, 0.0, id="zero_int_allowed"), - ], -) -def test_keep_alive_timeout_valid(val: object, expected: float) -> None: - opts = ClientOptions(keep_alive_timeout=val) - assert opts.keep_alive_timeout == expected +@pytest.mark.parametrize("value", [True, False]) +def test_keep_alive_valid(value: bool) -> None: + assert ClientOptions(keep_alive=value).keep_alive is value @pytest.mark.parametrize( @@ -182,149 +160,44 @@ def test_keep_alive_timeout_invalid_raises(bad_val: object) -> None: ClientOptions(keep_alive_timeout=bad_val) -# --------------------------------------------------------------------------- -# buffer_size field -# --------------------------------------------------------------------------- - - -def test_buffer_size_default_ellipsis() -> None: - assert ClientOptions().buffer_size is Ellipsis - - -def test_buffer_size_ellipsis_roundtrip() -> None: - assert ClientOptions(buffer_size=...).buffer_size is Ellipsis - - -def test_buffer_size_none_preserved() -> None: - assert ClientOptions(buffer_size=None).buffer_size is None - - -def test_buffer_size_positive_int_preserved() -> None: - assert ClientOptions(buffer_size=4096).buffer_size == 4096 - - -@pytest.mark.parametrize( - "bad_val", - [ - pytest.param(True, id="bool_true"), - pytest.param(False, id="bool_false"), - pytest.param(0, id="zero"), - pytest.param(-1, id="negative"), - pytest.param(3.14, id="float"), - pytest.param("4096", id="string"), - ], -) -def test_buffer_size_invalid_raises(bad_val: object) -> None: - with pytest.raises(ValidationError): - ClientOptions(buffer_size=bad_val) - - -# --------------------------------------------------------------------------- -# ws_max_message_size field -# --------------------------------------------------------------------------- - - -def test_ws_max_message_size_default_ellipsis() -> None: - assert ClientOptions().ws_max_message_size is Ellipsis - - -def test_ws_max_message_size_ellipsis_roundtrip() -> None: - assert ClientOptions(ws_max_message_size=...).ws_max_message_size is Ellipsis - - -def test_ws_max_message_size_none_preserved() -> None: - assert ClientOptions(ws_max_message_size=None).ws_max_message_size is None - - -def test_ws_max_message_size_positive_int_preserved() -> None: - assert ClientOptions(ws_max_message_size=2**21).ws_max_message_size == 2**21 - - -@pytest.mark.parametrize( - "bad_val", - [ - pytest.param(True, id="bool_true"), - pytest.param(False, id="bool_false"), - pytest.param(0, id="zero"), - pytest.param(-1, id="negative"), - pytest.param(1.5, id="float"), - pytest.param("big", id="string"), - ], -) -def test_ws_max_message_size_invalid_raises(bad_val: object) -> None: - with pytest.raises(ValidationError): - ClientOptions(ws_max_message_size=bad_val) - - -# --------------------------------------------------------------------------- -# pool_size field -# --------------------------------------------------------------------------- - - -def test_pool_size_default_ellipsis() -> None: - assert ClientOptions().pool_size is Ellipsis - - -def test_pool_size_ellipsis_roundtrip() -> None: - assert ClientOptions(pool_size=...).pool_size is Ellipsis - - -def test_pool_size_none_preserved() -> None: - assert ClientOptions(pool_size=None).pool_size is None - - -def test_pool_size_positive_int_preserved() -> None: - assert ClientOptions(pool_size=50).pool_size == 50 +# ``buffer_size``, ``ws_max_message_size``, ``pool_size`` and +# ``pool_size_per_host`` share the same positive-int-or-sentinel validator. +_POSITIVE_INT_FIELDS = [ + "buffer_size", + "ws_max_message_size", + "pool_size", + "pool_size_per_host", +] +_POSITIVE_INT_INVALID = [ + pytest.param(True, id="bool_true"), + pytest.param(False, id="bool_false"), + pytest.param(0, id="zero"), + pytest.param(-1, id="negative"), + pytest.param(3.14, id="float"), + pytest.param("4096", id="string"), +] -@pytest.mark.parametrize( - "bad_val", - [ - pytest.param(True, id="bool_true"), - pytest.param(False, id="bool_false"), - pytest.param(0, id="zero"), - pytest.param(-5, id="negative"), - pytest.param(2.0, id="float"), - pytest.param("100", id="string"), - ], -) -def test_pool_size_invalid_raises(bad_val: object) -> None: +@pytest.mark.parametrize("field", _POSITIVE_INT_FIELDS) +@pytest.mark.parametrize("bad_val", _POSITIVE_INT_INVALID) +def test_positive_int_field_rejects_invalid(field: str, bad_val: object) -> None: with pytest.raises(ValidationError): - ClientOptions(pool_size=bad_val) - - -# --------------------------------------------------------------------------- -# pool_size_per_host field -# --------------------------------------------------------------------------- - - -def test_pool_size_per_host_default_ellipsis() -> None: - assert ClientOptions().pool_size_per_host is Ellipsis + ClientOptions(**{field: bad_val}) -def test_pool_size_per_host_ellipsis_roundtrip() -> None: - assert ClientOptions(pool_size_per_host=...).pool_size_per_host is Ellipsis - - -def test_pool_size_per_host_none_preserved() -> None: - assert ClientOptions(pool_size_per_host=None).pool_size_per_host is None - - -def test_pool_size_per_host_positive_int_preserved() -> None: - assert ClientOptions(pool_size_per_host=10).pool_size_per_host == 10 +@pytest.mark.parametrize("value", [True, False]) +def test_trust_env_valid(value: bool) -> None: + assert ClientOptions(trust_env=value).trust_env is value @pytest.mark.parametrize( "bad_val", [ - pytest.param(True, id="bool_true"), - pytest.param(False, id="bool_false"), - pytest.param(0, id="zero_rejected_use_none_instead"), - pytest.param(-1, id="negative"), - pytest.param(2.5, id="float"), - pytest.param("10", id="string"), + pytest.param("true", id="str_true"), + pytest.param(1, id="int_one"), + pytest.param(1.0, id="float_one"), ], ) -def test_pool_size_per_host_invalid_raises(bad_val: object) -> None: +def test_trust_env_rejects_non_bool(bad_val: object) -> None: with pytest.raises(ValidationError): - ClientOptions(pool_size_per_host=bad_val) + ClientOptions(trust_env=bad_val) diff --git a/test/test_client/test_trust_env.py b/test/test_client/test_trust_env.py new file mode 100644 index 0000000..7526d3d --- /dev/null +++ b/test/test_client/test_trust_env.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import os +import sys +from pathlib import Path + +import pytest + +pytest.importorskip("httpx") + +import httpx # noqa: E402 + +from kubex.client.httpx import _getenv_icase, _resolve_env_proxy_with_netrc # noqa: E402 + +_BASE_HTTPS = "https://apiserver.example.invalid:6443" +_BASE_HTTP = "http://apiserver.example.invalid:6443" +_PROXY_URL = "http://proxy.corp:3128" + + +@pytest.fixture(autouse=True) +def clean_proxy_env(monkeypatch: pytest.MonkeyPatch) -> None: + for var in [ + "http_proxy", + "HTTP_PROXY", + "https_proxy", + "HTTPS_PROXY", + "all_proxy", + "ALL_PROXY", + "no_proxy", + "NO_PROXY", + "NETRC", + ]: + monkeypatch.delenv(var, raising=False) + + +def test_resolve_env_proxy_with_netrc_no_env_var() -> None: + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {} + + +def test_resolve_env_proxy_with_netrc_url_has_creds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + proxy = "http://user:pw@host:3128" + monkeypatch.setenv("HTTPS_PROXY", proxy) + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": proxy} + + +def test_resolve_env_proxy_with_netrc_lookup( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine proxy.corp login alice password s3cret\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + + proxy = result.get("proxy") + assert isinstance(proxy, httpx.Proxy) + assert proxy.auth == ("alice", "s3cret") + assert proxy.url.host == "proxy.corp" + assert proxy.url.port == 3128 + + +def test_resolve_env_proxy_with_netrc_ipv6_host( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("HTTPS_PROXY", "http://[::1]:3128") + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine ::1 login alice password s3cret\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + + proxy = result.get("proxy") + assert isinstance(proxy, httpx.Proxy) + assert proxy.auth == ("alice", "s3cret") + + +def test_resolve_env_proxy_with_netrc_null_password( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + # netrc.authenticators() returns password=None when the password field is + # absent. The code coerces it to "" via `password or ""` so httpx.Proxy + # receives a valid (login, "") tuple rather than (login, None). + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine proxy.corp login alice\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + + proxy = result.get("proxy") + assert isinstance(proxy, httpx.Proxy) + assert proxy.auth == ("alice", "") + + +def test_resolve_env_proxy_with_netrc_no_matching_entry( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine other.host login user password pass\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_scheme_selection(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("HTTP_PROXY", "http://http-proxy.corp:3128") + monkeypatch.setenv("HTTPS_PROXY", "http://https-proxy.corp:3128") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": "http://https-proxy.corp:3128"} + + +def test_resolve_env_proxy_lowercase_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("https_proxy", "http://proxy.corp:3128") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": "http://proxy.corp:3128"} + + +def test_resolve_env_proxy_lowercase_wins_over_uppercase( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("https_proxy", "http://low.proxy:3128") + monkeypatch.setenv("HTTPS_PROXY", "http://up.proxy:3128") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": "http://low.proxy:3128"} + + +def test_resolve_env_proxy_all_proxy_fallback(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ALL_PROXY", "http://all-proxy.corp:3128") + + https_result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + http_result = _resolve_env_proxy_with_netrc(_BASE_HTTP) + assert https_result == {"proxy": "http://all-proxy.corp:3128"} + assert http_result == {"proxy": "http://all-proxy.corp:3128"} + + +def test_resolve_env_proxy_no_proxy_exact_host(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "apiserver.test") + + result = _resolve_env_proxy_with_netrc("https://apiserver.test") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_suffix_match( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", ".example.com") + + result = _resolve_env_proxy_with_netrc("https://api.example.com") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_bare_domain_matches_subdomains( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "example.com") + + result = _resolve_env_proxy_with_netrc("https://api.example.com") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_no_match(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", ".example.com") + + result = _resolve_env_proxy_with_netrc("https://api.other.com") + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_no_proxy_ip_literal( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "10.0.0.5") + + result = _resolve_env_proxy_with_netrc("https://10.0.0.5:6443") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_ip_no_cidr_support( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "10.0.0.0/24") + + result = _resolve_env_proxy_with_netrc("https://10.0.0.5:6443") + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_no_proxy_comma_separated( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "foo.com, .example.com, 10.0.0.5") + + result = _resolve_env_proxy_with_netrc("https://api.example.com") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_wildcard_all( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "*") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {} + + +def test_resolve_env_proxy_no_proxy_leading_dot_matches_domain_itself( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", ".example.com") + + result = _resolve_env_proxy_with_netrc("https://example.com:6443") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_entry_with_port_not_supported( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Port-qualified NO_PROXY entries (e.g. "example.com:6443") are NOT + # supported — consistent with Python stdlib / aiohttp behavior. + # Use a bare hostname entry ("example.com") instead. + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "example.com:6443") + + result = _resolve_env_proxy_with_netrc("https://example.com:6443") + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_no_proxy_bare_ipv6_matches( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "::1") + + result = _resolve_env_proxy_with_netrc("https://[::1]:6443") + assert result == {} + + +def test_resolve_env_proxy_no_proxy_bracketed_ipv6_not_supported( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Bracketed IPv6 (e.g. [::1]) in NO_PROXY is NOT supported — consistent + # with Python stdlib and aiohttp which do not strip brackets. + # Use the bare form (::1) instead. + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "[::1]") + + result = _resolve_env_proxy_with_netrc("https://[::1]:6443") + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_no_proxy_bracketed_ipv6_with_port_not_supported( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # [::1]:6443 in NO_PROXY is not supported (port qualifier + bracketed IPv6). + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("NO_PROXY", "[::1]:6443") + + result = _resolve_env_proxy_with_netrc("https://[::1]:6443") + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_netrc_parse_error_falls_back( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("not a valid netrc file content\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": _PROXY_URL} + + +@pytest.mark.skipif( + sys.platform == "win32" or (hasattr(os, "getuid") and os.getuid() == 0), + reason="chmod not reliable on Windows or as root", +) +def test_resolve_env_proxy_netrc_unreadable_falls_back( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine proxy.corp login alice password s3cret\n") + netrc_file.chmod(0o000) + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_empty_lowercase_suppresses_uppercase( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("https_proxy", "") + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {} + + +def test_resolve_env_proxy_empty_lowercase_does_not_suppress_all_proxy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Empty https_proxy suppresses HTTPS_PROXY but should still fall through + # to ALL_PROXY, matching urllib.request.getproxies_environment() semantics. + monkeypatch.setenv("https_proxy", "") + monkeypatch.setenv("ALL_PROXY", "http://all-proxy.corp:3128") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": "http://all-proxy.corp:3128"} + + +def test_resolve_env_proxy_cgi_safeguard_skips_uppercase_http_proxy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("REQUEST_METHOD", "GET") + monkeypatch.setenv("HTTP_PROXY", _PROXY_URL) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTP) + assert result == {} + + +def test_resolve_env_proxy_cgi_safeguard_allows_lowercase_http_proxy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("REQUEST_METHOD", "GET") + monkeypatch.setenv("http_proxy", _PROXY_URL) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTP) + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_no_proxy_empty_lowercase_suppresses_uppercase( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("no_proxy", "") + monkeypatch.setenv("NO_PROXY", "apiserver.example.invalid") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {"proxy": _PROXY_URL} + + +def test_resolve_env_proxy_mixed_case_proxy_var_picks_up_netrc( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + # Mixed-case env var (e.g. Https_Proxy) must be picked up so kubex can + # attach netrc creds — without this, httpx uses the proxy natively but + # skips netrc enrichment, causing 407 auth failures. + monkeypatch.setenv("Https_Proxy", _PROXY_URL) + netrc_file = tmp_path / "netrc" + netrc_file.write_text("machine proxy.corp login alice password s3cret\n") + monkeypatch.setenv("NETRC", str(netrc_file)) + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + + proxy = result.get("proxy") + assert isinstance(proxy, httpx.Proxy) + assert proxy.auth == ("alice", "s3cret") + + +def test_resolve_env_proxy_mixed_case_no_proxy_bypasses( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Mixed-case No_Proxy must still bypass the proxy. Without case-insensitive + # lookup, the resolver materialises an explicit proxy= kwarg and httpx never + # gets a chance to apply the bypass. + monkeypatch.setenv("HTTPS_PROXY", _PROXY_URL) + monkeypatch.setenv("No_Proxy", "apiserver.example.invalid") + + result = _resolve_env_proxy_with_netrc(_BASE_HTTPS) + assert result == {} + + +def test_getenv_icase_returns_actual_stored_key_not_normalized_lookup_form( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # _getenv_icase must return the key exactly as stored in os.environ, + # not the lowercase form used for the lookup. On Windows, os.environ is + # case-insensitive, so ``"http_proxy" in os.environ`` matches a stored + # ``HTTP_PROXY`` entry and would cause _getenv_icase to return the + # synthetic lowercase key — breaking the HTTPOXY guard which compares + # ``found_key == "HTTP_PROXY"``. + monkeypatch.setenv("HTTP_PROXY", _PROXY_URL) + key, val = _getenv_icase("http_proxy") + assert key == "HTTP_PROXY" + assert val == _PROXY_URL + + +def test_getenv_icase_lowercase_key_returned_as_is( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("https_proxy", _PROXY_URL) + key, val = _getenv_icase("https_proxy") + assert key == "https_proxy" + assert val == _PROXY_URL + + +def test_getenv_icase_missing_returns_none_key( + monkeypatch: pytest.MonkeyPatch, +) -> None: + key, val = _getenv_icase("https_proxy") + assert key is None + assert val == ""