Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ A Python library that provides an opinionated interface for working with Google

## Features
- typing for GCS and BigQuery API
- standardized auth using service account credentials
- standardized auth using service account credentials or any `google-auth` credentials (ADC, impersonated, etc.)

### Google Cloud Storage (GCS)
- upload and download files with retry logic and async capabilities
Expand Down
3 changes: 2 additions & 1 deletion gcpde/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tenacity
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.api_core.exceptions import BadRequest, NotFound
from google.auth.credentials import Credentials as GoogleCredentials
from google.cloud import bigquery
from google.cloud.exceptions import Conflict
from google.oauth2.service_account import Credentials
Expand All @@ -29,7 +30,7 @@ class BigQueryClient:
def __init__(
self,
json_key: dict[str, str] | None = None,
credentials: Credentials | None = None,
credentials: GoogleCredentials | None = None,
client: bigquery.Client | None = None,
):
self.client = client or bigquery.Client(
Expand Down
99 changes: 77 additions & 22 deletions gcpde/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Protocol, Tuple

import google.auth.transport.requests
import tenacity
from aiohttp import ClientResponseError, ClientSession, ClientTimeout
from gcloud.aio.storage import Storage as AsyncStorageClient
from google.auth.credentials import Credentials as GoogleCredentials
from google.cloud.storage import Client as StorageClient
from google.oauth2 import service_account
from loguru import logger
Expand Down Expand Up @@ -81,11 +83,36 @@ def _build_file_name(dataset: str, datetime_partition: DateTimePartitions) -> st
return f"{dataset}__{datetime_partition}.jsonl"


class _CredentialsToken:
    """Adapter wrapping google-auth credentials for gcloud-aio-storage.

    gcloud-aio-storage expects a token object exposing async ``get`` /
    ``close`` methods; this adapter satisfies that protocol on top of any
    google-auth ``Credentials`` instance (service account, ADC,
    impersonated, ...).
    """

    def __init__(self, credentials: GoogleCredentials) -> None:
        self._credentials = credentials

    async def get(self) -> str:
        """Return a valid access token, refreshing if needed.

        The blocking google-auth refresh is run in the default executor so
        the event loop is not stalled during the token exchange.
        """
        if not self._credentials.valid:
            # get_event_loop() is deprecated inside coroutines (3.10+);
            # get_running_loop() is the correct call here.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                self._credentials.refresh,
                google.auth.transport.requests.Request(),  # type: ignore[no-untyped-call]
            )
        return self._credentials.token  # type: ignore[return-value]

    async def close(self) -> None:
        """No-op — nothing to close."""


def _get_gcs_client(
    json_key: Optional[Dict[str, str]] = None,
    credentials: Optional[GoogleCredentials] = None,
    client: Optional[StorageClient] = None,
) -> StorageClient:
    """Build a synchronous GCS client.

    Args:
        json_key: service-account key info, used only when ``credentials``
            is not provided.
        credentials: pre-built google-auth credentials; takes precedence
            over ``json_key``.
        client: alternative client class/factory to construct instead of
            ``StorageClient`` (mainly for testing).

    Returns:
        A configured ``StorageClient`` instance.
    """
    gcs_creds = credentials or service_account.Credentials.from_service_account_info(  # type: ignore[no-untyped-call]
        info=json_key
    )
    storage_client = (client or StorageClient)(credentials=gcs_creds)
    return storage_client


Expand All @@ -106,6 +133,7 @@ def upload_file(
bucket_name: str,
file_name: str,
json_key: Optional[Dict[str, str]] = None,
credentials: Optional[GoogleCredentials] = None,
client: Optional[StorageClient] = None,
content_type: str = "application/json",
) -> None:
Expand All @@ -116,16 +144,17 @@ def upload_file(
bucket_name: the name of the dataset.
file_name: the name of the file with full path.
json_key: auth to connect to gcp.
credentials: google-auth credentials to connect to gcp.
        client: client to use to make the API requests.
content_type: content type of the file.

"""
_check_auth_args(json_key=json_key, client=client)
_check_auth_args(json_key=json_key, credentials=credentials, client=client)
_upload_file(
content=content,
bucket_name=bucket_name,
file_name=file_name,
client=client or _get_gcs_client(json_key=json_key or {}),
client=client or _get_gcs_client(json_key=json_key, credentials=credentials),
content_type=content_type,
)

Expand All @@ -137,6 +166,7 @@ def add_records_to_dataset(
version: str = "1",
datetime_partition: Optional[DateTimePartitions] = None,
json_key: Optional[Dict[str, str]] = None,
credentials: Optional[GoogleCredentials] = None,
client: Optional[StorageClient] = None,
build_file_name: Optional[BuildFileNameProtocol] = None,
) -> None:
Expand All @@ -149,12 +179,13 @@ def add_records_to_dataset(
datetime_partition: datetime partition values for gcs path.
        bucket_name: the name of the gcs bucket.
json_key: json key with gcp credentials.
credentials: google-auth credentials to connect to gcp.
client: google storage client to use for connecting to the API.
build_file_name: optional callable that generates the file name.
If not provided, defaults to internal _build_file_name method.

"""
_check_auth_args(json_key=json_key, client=client)
_check_auth_args(json_key=json_key, credentials=credentials, client=client)
if not json_str_records:
logger.warning("No records to add! (empty collection given)")
return
Expand Down Expand Up @@ -182,7 +213,7 @@ def add_records_to_dataset(
content=jsonl_file_str,
bucket_name=bucket_name,
file_name=file_path + file_name,
client=client or _get_gcs_client(json_key=json_key or {}),
client=client or _get_gcs_client(json_key=json_key, credentials=credentials),
)
logger.info(f"File {file_name} created successfully!")

Expand Down Expand Up @@ -269,15 +300,15 @@ async def _async_download_files_handling_auth(
bucket_name: str,
file_paths: List[str],
json_key: Optional[Dict[str, str]],
credentials: Optional[GoogleCredentials],
client: Optional[AsyncStorageClient],
timeout: int,
) -> list[GCSDownloadedFile]:
_check_auth_args(json_key=json_key, client=client)
_check_auth_args(json_key=json_key, credentials=credentials, client=client)
session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout)
async with ClientSession(timeout=session_timeout) as session:
client = client or AsyncStorageClient(
service_file=io.StringIO(json.dumps(json_key)),
session=session,
client = client or _get_async_gcs_client(
json_key=json_key, credentials=credentials, session=session
)
return await _async_download_files(
bucket_name=bucket_name,
Expand All @@ -291,6 +322,7 @@ def download_files(
bucket_name: str,
file_paths: List[str],
json_key: Optional[Dict[str, str]] = None,
credentials: Optional[GoogleCredentials] = None,
client: Optional[AsyncStorageClient] = None,
timeout: int = 300,
) -> list[GCSDownloadedFile]:
Expand All @@ -300,6 +332,7 @@ def download_files(
bucket_name=bucket_name,
file_paths=file_paths,
json_key=json_key,
credentials=credentials,
client=client,
timeout=timeout,
)
Expand All @@ -315,14 +348,30 @@ def _deserialize_jsonl_files(files: list[GCSDownloadedFile]) -> ListJsonType:


def _check_auth_args(
    json_key: Optional[Dict[str, str]],
    credentials: Optional[GoogleCredentials],
    client: Optional[object],
) -> None:
    """Validate that at least one authentication mechanism was supplied.

    Args:
        json_key: service-account key info.
        credentials: google-auth credentials.
        client: a pre-built (sync or async) storage client.

    Raises:
        ValueError: if none of the three mechanisms is provided.
    """
    if json_key is None and credentials is None and client is None:
        raise ValueError(
            "You must provide either a json_key, credentials, "
            "or a client to connect to gcs."
        )


def _get_async_gcs_client(
    json_key: Optional[Dict[str, str]],
    credentials: Optional[GoogleCredentials],
    session: ClientSession,
) -> AsyncStorageClient:
    """Build an async GCS client from google-auth credentials or a JSON key.

    ``credentials`` wins when both are given; otherwise the service-account
    key is serialized and handed to the client as an in-memory service file.
    """
    if not credentials:
        key_file = io.StringIO(json.dumps(json_key))
        return AsyncStorageClient(service_file=key_file, session=session)
    token = _CredentialsToken(credentials)
    return AsyncStorageClient(token=token, session=session)  # type: ignore[arg-type]


async def _async_list_files(
bucket_name: str,
prefix: str,
Expand Down Expand Up @@ -372,18 +421,18 @@ async def _async_list_files_handling_auth(
bucket_name: str,
prefix: str,
json_key: Optional[Dict[str, str]],
credentials: Optional[GoogleCredentials],
client: Optional[AsyncStorageClient],
timeout: int,
api_params: Optional[Dict[str, Any]] = None,
updated_after: Optional[datetime] = None,
updated_before: Optional[datetime] = None,
) -> List[str]:
_check_auth_args(json_key=json_key, client=client)
_check_auth_args(json_key=json_key, credentials=credentials, client=client)
session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout)
async with ClientSession(timeout=session_timeout) as session:
client = client or AsyncStorageClient(
service_file=io.StringIO(json.dumps(json_key)),
session=session,
client = client or _get_async_gcs_client(
json_key=json_key, credentials=credentials, session=session
)
return await _async_list_files(
bucket_name=bucket_name,
Expand All @@ -399,6 +448,7 @@ def list_files(
bucket_name: str,
prefix: str,
json_key: Optional[Dict[str, str]] = None,
credentials: Optional[GoogleCredentials] = None,
client: Optional[AsyncStorageClient] = None,
timeout: int = 300,
api_params: Optional[Dict[str, Any]] = None,
Expand All @@ -411,6 +461,7 @@ def list_files(
bucket_name: the name of the dataset.
prefix: prefix to filter the files.
json_key: auth to connect to gcp.
credentials: google-auth credentials to connect to gcp.
        client: client to use to make the API requests.
timeout: timeout for the API requests.
api_params: parameters for the API request (ref. https://cloud.google.com/storage/docs/json_api/v1/objects/list).
Expand All @@ -423,6 +474,7 @@ def list_files(
bucket_name=bucket_name,
prefix=prefix,
json_key=json_key,
credentials=credentials,
client=client,
timeout=timeout,
api_params=api_params,
Expand Down Expand Up @@ -464,15 +516,15 @@ async def _async_get_dataset(
bucket_name: str,
latest_partition_only: bool,
json_key: Optional[Dict[str, str]],
credentials: Optional[GoogleCredentials],
client: Optional[AsyncStorageClient],
timeout: int,
) -> ListJsonType:
_check_auth_args(json_key=json_key, client=client)
_check_auth_args(json_key=json_key, credentials=credentials, client=client)
session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout)
async with ClientSession(timeout=session_timeout) as session:
client = client or AsyncStorageClient(
service_file=io.StringIO(json.dumps(json_key)),
session=session,
client = client or _get_async_gcs_client(
json_key=json_key, credentials=credentials, session=session
)
file_paths = await _async_list_files(
bucket_name=bucket_name,
Expand Down Expand Up @@ -501,6 +553,7 @@ def get_dataset(
version: str = "1",
latest_partition_only: bool = False,
json_key: Optional[Dict[str, str]] = None,
credentials: Optional[GoogleCredentials] = None,
client: Optional[AsyncStorageClient] = None,
timeout: int = 300,
) -> ListJsonType:
Expand All @@ -512,6 +565,7 @@ def get_dataset(
bucket_name: the name of the dataset.
latest_partition_only: if True, return only the latest partition data.
json_key: auth to connect to gcp.
credentials: google-auth credentials to connect to gcp.
        client: client to use to make the API requests.
timeout: timeout for the API requests.

Expand All @@ -524,6 +578,7 @@ def get_dataset(
bucket_name=bucket_name,
latest_partition_only=latest_partition_only,
json_key=json_key,
credentials=credentials,
client=client,
timeout=timeout,
)
Expand Down
Loading
Loading