From 3260b8a85a48606d04346900fc3223799db0e286 Mon Sep 17 00:00:00 2001 From: Andrew Starr-Bochicchio Date: Wed, 4 Mar 2026 18:42:00 -0500 Subject: [PATCH] k8s: patch get_kubeconfig to work around deserialization issue --- README.md | 25 ------- src/pydo/aio/operations/_patch.py | 111 ++++++++++++++++++++++++++++-- src/pydo/operations/_patch.py | 107 ++++++++++++++++++++++++++-- tests/mocked/test_kubernetes.py | 101 +++++++++++++++++++++++---- 4 files changed, 296 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 5fdb4b4f..a620ee43 100644 --- a/README.md +++ b/README.md @@ -216,31 +216,6 @@ docker run -it --rm --name pydo -v $PWD/tests:/tests pydo:dev pytest tests/mocke > This selection lists the known issues of the client generator. -#### `kubernetes.get_kubeconfig` Does not serialize response content - -In the generated Python client, calling client.kubernetes.get_kubeconfig(cluster_id) raises a deserialization error when the response content-type is application/yaml. This occurs because the generator does not correctly handle YAML responses. We should investigate whether the OpenAPI spec or generator configuration can be adjusted to support this content-type. If not, the issue should be reported upstream to improve YAML support in client generation. - -Workaround (with std lib httplib): - -```python -from http.client import HTTPSConnection - -conn = HTTPSConnection('api.digitalocean.com') -conn.request( - 'GET', - f'/v2/kubernetes/clusters/{cluster_id}/kubeconfig', - headers={'Authorization': f'Bearer {os.environ["DIGITALOCEAN_TOKEN"]}'} -) -response = conn.getresponse() - -if response.getcode() > 400: - msg = 'Unable to get kubeconfig' - raise RuntimeError(msg) - -kube_config = response.read().decode('utf-8') -conn.close() -``` - #### `invoices.get_pdf_by_uuid(invoice_uuid=invoice_uuid_param)` Does not return PDF In the generated python client, when calling `invoices.get_pdf_by_uuid`, the response returns a Iterator[bytes] that does not format correctly into a PDF. diff --git a/src/pydo/aio/operations/_patch.py b/src/pydo/aio/operations/_patch.py index 39ea63f6..577ae7c4 100644 --- a/src/pydo/aio/operations/_patch.py +++ b/src/pydo/aio/operations/_patch.py @@ -6,15 +6,116 @@ Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize """ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast + +from azure.core.exceptions import ( + ClientAuthenticationError, + HttpResponseError, + ResourceExistsError, + ResourceNotFoundError, + ResourceNotModifiedError, + map_error, +) +from azure.core.pipeline import PipelineResponse +from azure.core.tracing.decorator_async import distributed_trace_async + +from ._operations import ( + KubernetesOperations as _KubernetesOperations, + build_kubernetes_get_kubeconfig_request, +) if TYPE_CHECKING: # pylint: disable=unused-import,ungrouped-imports - from typing import List + from typing import MutableMapping, Type + +__all__ = ("KubernetesOperations",) + + +# Override: generated client expects JSON but this endpoint returns application/yaml; +# we return the response body as str instead of deserializing. +class KubernetesOperations(_KubernetesOperations): + """Kubernetes operations.""" + + @distributed_trace_async + async def get_kubeconfig( + self, cluster_id: str, *, expiry_seconds: int = 0, **kwargs: Any + ) -> str: + """Retrieve the kubeconfig for a Kubernetes Cluster. + + This endpoint returns a kubeconfig file in YAML format. It can be used to + connect to and administer the cluster using the Kubernetes command line tool, + ``kubectl``, or other programs supporting kubeconfig files (e.g., client libraries). + + The resulting kubeconfig file uses token-based authentication for clusters + supporting it, and certificate-based authentication otherwise. For a list of + supported versions and more information, see "How to Connect to a DigitalOcean + Kubernetes Cluster" + https://docs.digitalocean.com/products/kubernetes/how-to/connect-to-cluster/ + + Clusters supporting token-based authentication may define an expiration by + passing a duration in seconds as a query parameter (expiry_seconds). + If not set or 0, then the token will have a 7 day expiry. The query parameter + has no impact in certificate-based authentication. + + Kubernetes Roles granted to a user with a token-based kubeconfig are derived from that user's + DigitalOcean role. Custom roles require additional configuration by a cluster administrator. + + :param cluster_id: A unique ID that can be used to reference a Kubernetes cluster. Required. + :type cluster_id: str + :keyword expiry_seconds: The duration in seconds that the returned Kubernetes credentials will + be valid. If not set or 0, the credentials will have a 7 day expiry. Default value is 0. + :paramtype expiry_seconds: int + :return: The kubeconfig file contents as a string (YAML). + :rtype: str + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: "MutableMapping[int, Type[HttpResponseError]]" = { + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + 401: cast( + "Type[HttpResponseError]", + lambda response: ClientAuthenticationError(response=response), + ), + 429: HttpResponseError, + 500: HttpResponseError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + _request = build_kubernetes_get_kubeconfig_request( + cluster_id=cluster_id, + expiry_seconds=expiry_seconds, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + # stream=True so the pipeline's content policy skips deserialization (API returns YAML, not JSON) + pipeline_response: PipelineResponse = ( + await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=True, **kwargs + ) + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + await response.read() + map_error( + status_code=response.status_code, + response=response, + error_map=error_map, + ) + raise HttpResponseError(response=response) -__all__ = ( - [] -) # type: List[str] # Add all objects you want publicly available to users at this package level + if hasattr(response, "read"): + body = await response.read() + else: + body = response.content + return body.decode("utf-8") if body else "" def patch_sdk(): diff --git a/src/pydo/operations/_patch.py b/src/pydo/operations/_patch.py index 8a843f7c..0f3b5480 100644 --- a/src/pydo/operations/_patch.py +++ b/src/pydo/operations/_patch.py @@ -6,16 +6,115 @@ Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize """ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast -from ._operations import DropletsOperations as Droplets +from azure.core.exceptions import ( + ClientAuthenticationError, + HttpResponseError, + ResourceExistsError, + ResourceNotFoundError, + ResourceNotModifiedError, + map_error, +) +from azure.core.pipeline import PipelineResponse +from azure.core.tracing.decorator import distributed_trace + +from ._operations import ( + KubernetesOperations as _KubernetesOperations, + build_kubernetes_get_kubeconfig_request, +) if TYPE_CHECKING: # pylint: disable=unused-import,ungrouped-imports - pass + from typing import MutableMapping, Type + + +__all__ = ["KubernetesOperations"] + + +# Override: generated client expects JSON but this endpoint returns application/yaml; +# we return the response body as str instead of deserializing. +class KubernetesOperations(_KubernetesOperations): + """Kubernetes operations.""" + + @distributed_trace + def get_kubeconfig( + self, cluster_id: str, *, expiry_seconds: int = 0, **kwargs: Any + ) -> str: + """Retrieve the kubeconfig for a Kubernetes Cluster. + + This endpoint returns a kubeconfig file in YAML format. It can be used to + connect to and administer the cluster using the Kubernetes command line tool, + ``kubectl``, or other programs supporting kubeconfig files (e.g., client libraries). + + The resulting kubeconfig file uses token-based authentication for clusters + supporting it, and certificate-based authentication otherwise. For a list of + supported versions and more information, see "How to Connect to a DigitalOcean + Kubernetes Cluster" + https://docs.digitalocean.com/products/kubernetes/how-to/connect-to-cluster/ + + Clusters supporting token-based authentication may define an expiration by + passing a duration in seconds as a query parameter (expiry_seconds). + If not set or 0, then the token will have a 7 day expiry. The query parameter + has no impact in certificate-based authentication. + + Kubernetes Roles granted to a user with a token-based kubeconfig are derived from that user's + DigitalOcean role. Custom roles require additional configuration by a cluster administrator. + + :param cluster_id: A unique ID that can be used to reference a Kubernetes cluster. Required. + :type cluster_id: str + :keyword expiry_seconds: The duration in seconds that the returned Kubernetes credentials will + be valid. If not set or 0, the credentials will have a 7 day expiry. Default value is 0. + :paramtype expiry_seconds: int + :return: The kubeconfig file contents as a string (YAML). + :rtype: str + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: "MutableMapping[int, Type[HttpResponseError]]" = { + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + 401: cast( + "Type[HttpResponseError]", + lambda response: ClientAuthenticationError(response=response), + ), + 429: HttpResponseError, + 500: HttpResponseError, + } + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + _request = build_kubernetes_get_kubeconfig_request( + cluster_id=cluster_id, + expiry_seconds=expiry_seconds, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + # stream=True so the pipeline's content policy skips deserialization (API returns YAML, not JSON). + # Without this, the policy raises DecodeError for application/yaml. See test_kubernetes_get_kubeconfig. + pipeline_response: PipelineResponse = ( + self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=True, **kwargs + ) + ) + + response = pipeline_response.http_response + if response.status_code not in [200]: + response.read() + map_error( + status_code=response.status_code, + response=response, + error_map=error_map, + ) + raise HttpResponseError(response=response) -__all__ = [] + body = response.read() if hasattr(response, "read") else response.content + return body.decode("utf-8") if body else "" def patch_sdk(): diff --git a/tests/mocked/test_kubernetes.py b/tests/mocked/test_kubernetes.py index 703ebb7c..b99a43e5 100644 --- a/tests/mocked/test_kubernetes.py +++ b/tests/mocked/test_kubernetes.py @@ -1,15 +1,28 @@ """Mocked tests for kubernetes resources.""" +import re import uuid +from unittest.mock import patch + import pytest import responses +from azure.core.exceptions import DecodeError, ResourceNotFoundError + from pydo import Client +from pydo.operations._operations import ( + KubernetesOperations as _GeneratedKubernetesOperations, +) from tests.mocked.data import kubernetes_data as data BASE_PATH = "v2/kubernetes/clusters" +# URL pattern for kubeconfig so the mock matches the actual request (with query string) +KUBECONFIG_URL_PATTERN = re.compile( + r"https://testing\.local/v2/kubernetes/clusters/[^/]+/kubeconfig\?expiry_seconds=0" +) + @responses.activate def test_kubernetes_list_clusters(mock_client: Client, mock_client_url): @@ -192,29 +205,89 @@ def test_kubernetes_destroy_associated_resources_dangerous( assert des_resp is None +def _kubeconfig_yaml_callback(_request): + """Return 200 with body as YAML and Content-Type: application/yaml only. + + The responses library can merge headers and yield e.g. "text/plain, + application/yaml", which the pipeline treats as text/plain and does not + raise DecodeError. A callback ensures the response has exactly + application/yaml so the content policy fails (matching real API behaviour). + """ + return (200, {"Content-Type": "application/yaml"}, data.KUBECONFIG) + + +@responses.activate +def test_kubernetes_get_kubeconfig_generated_raises_decode_error(mock_client: Client): + """Without our override, get_kubeconfig with application/yaml raises DecodeError. + + The mock uses a callback so Content-Type is exactly application/yaml (no + text/plain). That makes the pipeline's content policy try to deserialize and + raise DecodeError, matching the real API. Our override (stream=True + return + body as text) fixes this. If this test fails, the issue was likely fixed + upstream and the override can be removed. + """ + cluster_id = str(uuid.uuid4()) + # Regex so the mock matches the actual request URL including ?expiry_seconds=0 + responses.add_callback( + responses.GET, + KUBECONFIG_URL_PATTERN, + callback=_kubeconfig_yaml_callback, + ) + # Use generated (unpatched) operations so the pipeline tries to deserialize YAML + # pylint: disable=protected-access + generated_ops = _GeneratedKubernetesOperations( + mock_client.kubernetes._client, + mock_client._config, + mock_client._serialize, + mock_client._deserialize, + ) + with pytest.raises(DecodeError): + generated_ops.get_kubeconfig(cluster_id) + + @responses.activate -def test_kubernetes_get_kubeconfig(mock_client: Client, mock_client_url): - """Mock kubernetes get_kubeconfig operation.""" +def test_kubernetes_get_kubeconfig(mock_client: Client): + """Mock kubernetes get_kubeconfig operation. + The patched client returns the response body as a string (YAML). The mock uses + the same callback as test_kubernetes_get_kubeconfig_generated_raises_decode_error + (Content-Type: application/yaml only) so behaviour matches the real API. + """ cluster_id = str(uuid.uuid4()) expected = data.KUBECONFIG + responses.add_callback( + responses.GET, + KUBECONFIG_URL_PATTERN, + callback=_kubeconfig_yaml_callback, + ) + + # Regression: override must use stream=True or pipeline deserializes and fails + # pylint: disable=protected-access + pipeline = mock_client.kubernetes._client._pipeline + with patch.object(pipeline, "run", wraps=pipeline.run) as run_mock: + config_resp = mock_client.kubernetes.get_kubeconfig(cluster_id) + run_mock.assert_called_once() + assert run_mock.call_args[1]["stream"] is True, ( + "get_kubeconfig must call pipeline.run(..., stream=True) so the " + "content policy skips deserialization (API returns application/yaml)." + ) + assert isinstance(config_resp, str) + assert config_resp == expected + + +@responses.activate +def test_kubernetes_get_kubeconfig_404_raises(mock_client: Client, mock_client_url): + """get_kubeconfig raises ResourceNotFoundError on 404.""" + cluster_id = str(uuid.uuid4()) responses.add( responses.GET, f"{mock_client_url}/{BASE_PATH}/{cluster_id}/kubeconfig", - headers={"content-type": "application/yaml"}, - match=[ - responses.matchers.query_param_matcher({"expiry_seconds": 0}), - ], - status=200, - body=expected, + json={"id": "not_found", "message": "Cluster not found"}, + status=404, ) - - config_resp = mock_client.kubernetes.get_kubeconfig(cluster_id) - pytest.skip("The operation currently fails to return content.") - # TODO: investigate why the generated client doesn't return the response content - # It seems to be something to do with the yaml content type. - assert config_resp.decode("utf-8") == expected + with pytest.raises(ResourceNotFoundError): + mock_client.kubernetes.get_kubeconfig(cluster_id) @responses.activate