Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 99 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ To install from pip:
pip install pydo
```

For async support, install with the `aio` extra:

```shell
pip install pydo[aio]
```

## **`pydo` Quickstart**

> A quick guide to getting started with the client.
Expand All @@ -36,6 +42,22 @@ from pydo import Client
client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))
```

For asynchronous operations, use the `AsyncClient`:

```python
import os
import asyncio
from pydo import AsyncClient

async def main():
client = AsyncClient(token=os.getenv("DIGITALOCEAN_TOKEN"))
# Use await for async operations
result = await client.ssh_keys.list()
print(result)

asyncio.run(main())
```

#### Example of Using `pydo` to Access DO Resources

Find below a working example for GETting a ssh_key ([per this http request](https://docs.digitalocean.com/reference/api/api-reference/#operation/sshKeys_list)) and printing the ID associated with the ssh key. If you'd like to try out this quick example, you can follow [these instructions](https://docs.digitalocean.com/products/droplets/how-to/add-ssh-keys/) to add ssh keys to your DO account.
Expand All @@ -62,10 +84,67 @@ ID: 123457, NAME: my_prod_ssh_key, FINGERPRINT: eb:76:c7:2a:d3:3e:80:5d:ef:2e:ca

**Note**: More working examples can be found [here](https://github.com/digitalocean/pydo/tree/main/examples).

#### Type Hints and Models

PyDo includes comprehensive type hints for better IDE support and type checking:

```python
from pydo import Client
from pydo.types import Droplet, SSHKey, DropletsResponse

client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))

# Type hints help with autocomplete and validation
droplets: DropletsResponse = client.droplets.list()
for droplet in droplets["droplets"]:
# droplet is properly typed as Droplet
print(f"ID: {droplet['id']}, Name: {droplet['name']}")

# Use specific types for better type safety
def process_droplet(droplet: Droplet) -> None:
print(f"Processing {droplet['name']} in {droplet['region']['slug']}")

# Available types: Droplet, SSHKey, Region, Size, Image, Volume, etc.
# Response types: DropletsResponse, SSHKeysResponse, etc.
```

#### Custom Exceptions for Better Error Handling

PyDo includes custom exceptions for better error handling and debugging:

```python
from pydo import Client
from pydo.exceptions import AuthenticationError, ResourceNotFoundError, RateLimitError

client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))

try:
# This will raise AuthenticationError if token is invalid
droplets = client.droplets.list()
except AuthenticationError as e:
print(f"Authentication failed: {e.message}")
except RateLimitError as e:
print(f"Rate limit exceeded: {e.message}")
except ResourceNotFoundError as e:
print(f"Resource not found: {e.message}")
except Exception as e:
print(f"Other error: {e}")

# Available exceptions:
# - AuthenticationError (401)
# - PermissionDeniedError (403)
# - ResourceNotFoundError (404)
# - ValidationError (400)
# - ConflictError (409)
# - RateLimitError (429)
# - ServerError (5xx)
# - ServiceUnavailableError (503)
```

#### Pagination Example

Below is an example on handling pagination. One must parse the URL to find the
next page.
##### Manual Pagination (Traditional Approach)
Below is an example of handling pagination manually by parsing URLs:

```python
import os
Expand All @@ -91,6 +170,24 @@ while paginated:
paginated = False
```

##### Automatic Pagination (New Helper Method)
The client now includes a `paginate()` helper method that automatically handles pagination:

```python
import os
from pydo import Client

client = Client(token=os.getenv("DIGITALOCEAN_TOKEN"))

# Automatically paginate through all SSH keys
for key in client.paginate(client.ssh_keys.list, per_page=50):
print(f"ID: {key['id']}, NAME: {key['name']}, FINGERPRINT: {key['fingerprint']}")

# Works with any paginated endpoint
for droplet in client.paginate(client.droplets.list):
print(f"Droplet: {droplet['name']} - {droplet['status']}")
```

#### Retries and Backoff

By default the client uses the same retry policy as the [Azure SDK for Python](https://learn.microsoft.com/en-us/python/api/azure-core/azure.core.pipeline.policies.retrypolicy?view=azure-python).
Expand Down
118 changes: 115 additions & 3 deletions src/pydo/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@

Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Generator, Any, Dict, Callable, Optional

from azure.core.credentials import AccessToken
from azure.core.exceptions import HttpResponseError

from pydo.custom_policies import CustomHttpLoggingPolicy
from pydo import GeneratedClient, _version
from pydo.aio import AsyncClient
from pydo import types
from pydo import exceptions

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
Expand All @@ -38,7 +42,115 @@ class Client(GeneratedClient): # type: ignore
:paramtype endpoint: str
"""

def __init__(self, token: str, *, timeout: int = 120, **kwargs):
def paginate(self, method: Callable[..., Dict[str, Any]], *args, **kwargs) -> Generator[Dict[str, Any], None, None]:
"""Automatically paginate through all results from a method that returns paginated data.

:param method: The method to call (e.g., self.droplets.list)
:param args: Positional arguments to pass to the method
:param kwargs: Keyword arguments to pass to the method
:return: Generator yielding all items from all pages
:rtype: Generator[Dict[str, Any], None, None]
"""
page = 1
per_page = kwargs.get('per_page', 20) # Default per_page if not specified

while True:
# Set the current page
kwargs['page'] = page
kwargs['per_page'] = per_page

# Call the method
result = method(*args, **kwargs)

# Yield items from this page
items_key = None
if hasattr(result, 'keys') and callable(getattr(result, 'keys')):
# Find the key that contains the list of items
for key in result.keys():
if key.endswith('s') and isinstance(result[key], list): # e.g., 'droplets', 'ssh_keys'
items_key = key
break

if items_key and items_key in result:
yield from result[items_key]
else:
# If we can't find the items key, yield the whole result once
yield result
break

# Check if there's a next page
links = result.get('links', {})
pages = links.get('pages', {})
if 'next' not in pages:
break

page += 1

@staticmethod
def _handle_http_error(error: HttpResponseError) -> exceptions.DigitalOceanError:
"""Convert HTTP errors to appropriate DigitalOcean custom exceptions.

:param error: The HttpResponseError from azure
:return: Appropriate DigitalOcean exception
:rtype: exceptions.DigitalOceanError
"""
status_code = getattr(error, 'status', None) or getattr(error.response, 'status_code', None)

if status_code == 401:
return exceptions.AuthenticationError(
"Authentication failed. Please check your API token.",
status_code=status_code,
response=error.response
)
elif status_code == 403:
return exceptions.PermissionDeniedError(
"Access forbidden. You don't have permission to perform this action.",
status_code=status_code,
response=error.response
)
elif status_code == 404:
return exceptions.ResourceNotFoundError(
"Resource not found. The requested resource does not exist.",
status_code=status_code,
response=error.response
)
elif status_code == 400:
return exceptions.ValidationError(
"Bad request. Please check your request parameters.",
status_code=status_code,
response=error.response
)
elif status_code == 409:
return exceptions.ConflictError(
"Conflict. The resource is in a state that conflicts with the request.",
status_code=status_code,
response=error.response
)
elif status_code == 429:
return exceptions.RateLimitError(
"Rate limit exceeded. Please wait before making more requests.",
status_code=status_code,
response=error.response
)
elif status_code and status_code >= 500:
return exceptions.ServerError(
"Server error. Please try again later.",
status_code=status_code,
response=error.response
)
elif status_code == 503:
return exceptions.ServiceUnavailableError(
"Service temporarily unavailable. Please try again later.",
status_code=status_code,
response=error.response
)
else:
# Fallback to generic DigitalOcean error
return exceptions.DigitalOceanError(
f"API request failed: {str(error)}",
status_code=status_code,
response=error.response
)
logger = kwargs.get("logger")
if logger is not None and kwargs.get("http_logging_policy") == "":
kwargs["http_logging_policy"] = CustomHttpLoggingPolicy(logger=logger)
Expand All @@ -49,7 +161,7 @@ def __init__(self, token: str, *, timeout: int = 120, **kwargs):
)


__all__ = ["Client"]
__all__ = ["Client", "AsyncClient", "types", "exceptions"]


def patch_sdk():
Expand Down
4 changes: 4 additions & 0 deletions src/pydo/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
_patch_all = []
from ._patch import patch_sdk as _patch_sdk

# Alias Client as AsyncClient for easier access
AsyncClient = Client

__all__ = [
"GeneratedClient",
"AsyncClient",
]
__all__.extend([p for p in _patch_all if p not in __all__])

Expand Down
50 changes: 50 additions & 0 deletions src/pydo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,53 @@
# Importing exceptions this way makes them accessible through this module.
# Therefore, obscuring azure packages from the end user
from azure.core.exceptions import HttpResponseError


class DigitalOceanError(Exception):
"""Base exception for all DigitalOcean API errors."""

def __init__(self, message: str, status_code: int = None, response=None):
super().__init__(message)
self.message = message
self.status_code = status_code
self.response = response


class AuthenticationError(DigitalOceanError):
"""Raised when authentication fails (401 Unauthorized)."""
pass


class PermissionDeniedError(DigitalOceanError):
"""Raised when access is forbidden (403 Forbidden)."""
pass


class ResourceNotFoundError(DigitalOceanError):
"""Raised when a requested resource is not found (404 Not Found)."""
pass


class RateLimitError(DigitalOceanError):
"""Raised when API rate limit is exceeded (429 Too Many Requests)."""
pass


class ServerError(DigitalOceanError):
"""Raised when the server encounters an error (5xx status codes)."""
pass


class ValidationError(DigitalOceanError):
"""Raised when request validation fails (400 Bad Request)."""
pass


class ConflictError(DigitalOceanError):
"""Raised when there's a conflict with the current state (409 Conflict)."""
pass


class ServiceUnavailableError(DigitalOceanError):
"""Raised when the service is temporarily unavailable (503 Service Unavailable)."""
pass
26 changes: 26 additions & 0 deletions src/pydo/operations/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from typing import TYPE_CHECKING

from ._operations import DropletsOperations as Droplets
from ._operations import KubernetesOperations as Kubernetes
from ._operations import InvoicesOperations as Invoices

if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
Expand All @@ -25,3 +27,27 @@ def patch_sdk():
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

# Fix kubernetes.get_kubeconfig to return raw YAML content instead of trying to parse as JSON
def _get_kubeconfig(self, cluster_id, **kwargs):
"""Get a Kubernetes config file for the specified cluster."""
# Call the original method but with raw response
response = self._client.get(
f"/v2/kubernetes/clusters/{cluster_id}/kubeconfig",
**kwargs
)
return response.content

Kubernetes.get_kubeconfig = _get_kubeconfig

# Fix invoices.get_pdf_by_uuid to return raw PDF content instead of trying to parse as JSON
def _get_pdf_by_uuid(self, invoice_uuid, **kwargs):
"""Get a PDF invoice by UUID."""
# Call the original method but with raw response
response = self._client.get(
f"/v2/customers/my/invoices/{invoice_uuid}/pdf",
**kwargs
)
return response.content

Invoices.get_pdf_by_uuid = _get_pdf_by_uuid
5 changes: 2 additions & 3 deletions tests/mocked/test_billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,11 @@ def test_get_invoice_pdf_by_uuid(mock_client: Client, mock_client_url):
responses.add(
responses.GET,
f"{mock_client_url}/v2/customers/my/invoices/1/pdf",
json=expected,
body=expected,
)
invoices = mock_client.invoices.get_pdf_by_uuid(invoice_uuid=1)
list_in = list(invoices)

assert "group_description" in str(list_in)
assert "group_description" in str(invoices)


@responses.activate
Expand Down
Loading