From 48f1d64b421191d39ccaac49825b7fe1f3778a25 Mon Sep 17 00:00:00 2001 From: Guilherme de Amorim Date: Fri, 20 Mar 2026 21:38:54 -0300 Subject: [PATCH 1/4] :sparkles: support usage of credentials --- gcpde/bq.py | 3 ++- gcpde/sheets.py | 65 ++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/gcpde/bq.py b/gcpde/bq.py index b7c61cf..7a559c2 100644 --- a/gcpde/bq.py +++ b/gcpde/bq.py @@ -8,6 +8,7 @@ import tenacity from bigquery_schema_generator.generate_schema import SchemaGenerator from google.api_core.exceptions import BadRequest, NotFound +from google.auth.credentials import Credentials as GoogleCredentials from google.cloud import bigquery from google.cloud.exceptions import Conflict from google.oauth2.service_account import Credentials @@ -29,7 +30,7 @@ class BigQueryClient: def __init__( self, json_key: dict[str, str] | None = None, - credentials: Credentials | None = None, + credentials: GoogleCredentials | None = None, client: bigquery.Client | None = None, ): self.client = client or bigquery.Client( diff --git a/gcpde/sheets.py b/gcpde/sheets.py index 8ee55da..518ea4a 100644 --- a/gcpde/sheets.py +++ b/gcpde/sheets.py @@ -3,31 +3,53 @@ from typing import Optional import gspread +from google.auth.credentials import Credentials as GoogleCredentials from gspread import Spreadsheet, Worksheet from loguru import logger from gcpde.types import ListJsonType -def _open_document(document_id: str, json_key: dict[str, str]) -> Spreadsheet: - gc = gspread.service_account_from_dict(json_key) +def _open_document( + document_id: str, + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, +) -> Spreadsheet: + if json_key is None and credentials is None: + raise ValueError( + "You must provide either a json_key or credentials to connect to sheets." 
+ ) + gc = ( + gspread.authorize(credentials) + if credentials + else gspread.service_account_from_dict(json_key) # type: ignore[arg-type] + ) return gc.open_by_key(document_id) def _open_sheet( - sheet_name: str, document_id: str, json_key: dict[str, str] + sheet_name: str, + document_id: str, + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, ) -> Worksheet: """Open a sheet from a document.""" - spreadsheet = _open_document(document_id=document_id, json_key=json_key) + spreadsheet = _open_document( + document_id=document_id, json_key=json_key, credentials=credentials + ) worksheet: Worksheet = spreadsheet.worksheet(sheet_name) return worksheet -def _get_worksheets(document_id: str, json_key: dict[str, str]) -> list[str]: +def _get_worksheets( + document_id: str, + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, +) -> list[str]: return [ ws.title for ws in _open_document( - document_id=document_id, json_key=json_key + document_id=document_id, json_key=json_key, credentials=credentials ).worksheets() ] @@ -37,7 +59,8 @@ def replace_from_records( sheet_name: str, records: ListJsonType, columns: list[str], - json_key: dict[str, str], + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, ) -> None: """Replace a target document content with new data. @@ -49,11 +72,15 @@ def replace_from_records( serialize. columns: name of the columns expected in the given records. json_key: json key with gcp credentials. + credentials: google-auth credentials to connect to gcp. 
""" logger.info(f"Document '{document_id}' update started ...") sheet = _open_sheet( - sheet_name=sheet_name, document_id=document_id, json_key=json_key + sheet_name=sheet_name, + document_id=document_id, + json_key=json_key, + credentials=credentials, ) sheet.clear() records_as_row = [[r[c] for c in columns] for r in records] @@ -64,7 +91,8 @@ def replace_from_records( def read_sheet( document_id: str, sheet_name: str, - json_key: dict[str, str], + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, head: int = 1, expected_headers: Optional[list[str]] = None, ) -> list[dict[str, str | None]]: @@ -74,6 +102,7 @@ def read_sheet( document_id: id for the document (can be retrieved from the url). sheet_name: name of the sheet to read in the document. json_key: json key with gcp credentials. + credentials: google-auth credentials to connect to gcp. head: (optional) Determines which row to use as keys, starting from 1 following the numeration of the spreadsheet. expected_headers: (optional) List of expected headers, they must be unique. @@ -84,7 +113,10 @@ def read_sheet( """ logger.info(f"Reading sheet '{sheet_name}' from document '{document_id}' ...") sheet = _open_sheet( - sheet_name=sheet_name, document_id=document_id, json_key=json_key + sheet_name=sheet_name, + document_id=document_id, + json_key=json_key, + credentials=credentials, ) records: ListJsonType = sheet.get_all_records( head=head, expected_headers=expected_headers @@ -98,15 +130,17 @@ def read_sheet( def read_sheets( document_id: str, - json_key: dict[str, str], + json_key: Optional[dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, sheet_names: list[str] | None = None, ) -> dict[str, list[dict[str, str | None]]]: """Get all data from all sheets in a document. Args: document_id: id for the document (can be retrieved from the url). - sheet_names: list of sheet names to read in the document. json_key: json key with gcp credentials. 
+ credentials: google-auth credentials to connect to gcp. + sheet_names: list of sheet names to read in the document. Returns: Dict with each key being the sheet_name and the values as a list of records as @@ -114,11 +148,14 @@ def read_sheets( """ sheet_names = sheet_names or _get_worksheets( - document_id=document_id, json_key=json_key + document_id=document_id, json_key=json_key, credentials=credentials ) return { sheet_name: read_sheet( - document_id=document_id, sheet_name=sheet_name, json_key=json_key + document_id=document_id, + sheet_name=sheet_name, + json_key=json_key, + credentials=credentials, ) for sheet_name in sheet_names } From 9c0536924e1d0d5fc4ad8dc53ecad537c0508765 Mon Sep 17 00:00:00 2001 From: Guilherme de Amorim Date: Fri, 20 Mar 2026 21:40:00 -0300 Subject: [PATCH 2/4] :sparkles: allow usage of credentials - wrapper --- gcpde/gcs.py | 94 +++++++++++++++++++++++++++++++---------- tests/unit/test_gcs.py | 95 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 23 deletions(-) diff --git a/gcpde/gcs.py b/gcpde/gcs.py index 3c4e353..66e68d0 100644 --- a/gcpde/gcs.py +++ b/gcpde/gcs.py @@ -7,9 +7,11 @@ from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Protocol, Tuple +import google.auth.transport.requests import tenacity from aiohttp import ClientResponseError, ClientSession, ClientTimeout from gcloud.aio.storage import Storage as AsyncStorageClient +from google.auth.credentials import Credentials as GoogleCredentials from google.cloud.storage import Client as StorageClient from google.oauth2 import service_account from loguru import logger @@ -81,11 +83,31 @@ def _build_file_name(dataset: str, datetime_partition: DateTimePartitions) -> st return f"{dataset}__{datetime_partition}.jsonl" +class _CredentialsToken: + """Adapter wrapping google-auth credentials for gcloud-aio-storage.""" + + def __init__(self, credentials: GoogleCredentials) -> None: + self._credentials = credentials + + 
async def get(self) -> str: + """Return a valid access token, refreshing if needed.""" + if not self._credentials.valid: + self._credentials.refresh(google.auth.transport.requests.Request()) # type: ignore[no-untyped-call] + return self._credentials.token # type: ignore[return-value] + + async def close(self) -> None: + """No-op — nothing to close.""" + + def _get_gcs_client( - json_key: Dict[str, str], client: Optional[StorageClient] = None + json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, + client: Optional[StorageClient] = None, ) -> StorageClient: - credentials = service_account.Credentials.from_service_account_info(info=json_key) # type: ignore[no-untyped-call] - storage_client = (client or StorageClient)(credentials=credentials) + gcs_creds = credentials or service_account.Credentials.from_service_account_info( # type: ignore[no-untyped-call] + info=json_key + ) + storage_client = (client or StorageClient)(credentials=gcs_creds) return storage_client @@ -106,6 +128,7 @@ def upload_file( bucket_name: str, file_name: str, json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, client: Optional[StorageClient] = None, content_type: str = "application/json", ) -> None: @@ -116,16 +139,17 @@ def upload_file( bucket_name: the name of the dataset. file_name: the name of the file with full path. json_key: auth to connect to gcp. + credentials: google-auth credentials to connect to gcp. client: client to user to make the API requests. content_type: content type of the file. 
""" - _check_auth_args(json_key=json_key, client=client) + _check_auth_args(json_key=json_key, credentials=credentials, client=client) _upload_file( content=content, bucket_name=bucket_name, file_name=file_name, - client=client or _get_gcs_client(json_key=json_key or {}), + client=client or _get_gcs_client(json_key=json_key, credentials=credentials), content_type=content_type, ) @@ -137,6 +161,7 @@ def add_records_to_dataset( version: str = "1", datetime_partition: Optional[DateTimePartitions] = None, json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, client: Optional[StorageClient] = None, build_file_name: Optional[BuildFileNameProtocol] = None, ) -> None: @@ -149,12 +174,13 @@ def add_records_to_dataset( datetime_partition: datetime partition values for gcs path. bucket_name: temporal partitioning for the object path. json_key: json key with gcp credentials. + credentials: google-auth credentials to connect to gcp. client: google storage client to use for connecting to the API. build_file_name: optional callable that generates the file name. If not provided, defaults to internal _build_file_name method. """ - _check_auth_args(json_key=json_key, client=client) + _check_auth_args(json_key=json_key, credentials=credentials, client=client) if not json_str_records: logger.warning("No records to add! 
(empty collection given)") return @@ -182,7 +208,7 @@ def add_records_to_dataset( content=jsonl_file_str, bucket_name=bucket_name, file_name=file_path + file_name, - client=client or _get_gcs_client(json_key=json_key or {}), + client=client or _get_gcs_client(json_key=json_key, credentials=credentials), ) logger.info(f"File {file_name} created successfully!") @@ -269,15 +295,15 @@ async def _async_download_files_handling_auth( bucket_name: str, file_paths: List[str], json_key: Optional[Dict[str, str]], + credentials: Optional[GoogleCredentials], client: Optional[AsyncStorageClient], timeout: int, ) -> list[GCSDownloadedFile]: - _check_auth_args(json_key=json_key, client=client) + _check_auth_args(json_key=json_key, credentials=credentials, client=client) session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout) async with ClientSession(timeout=session_timeout) as session: - client = client or AsyncStorageClient( - service_file=io.StringIO(json.dumps(json_key)), - session=session, + client = client or _get_async_gcs_client( + json_key=json_key, credentials=credentials, session=session ) return await _async_download_files( bucket_name=bucket_name, @@ -291,6 +317,7 @@ def download_files( bucket_name: str, file_paths: List[str], json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, client: Optional[AsyncStorageClient] = None, timeout: int = 300, ) -> list[GCSDownloadedFile]: @@ -300,6 +327,7 @@ def download_files( bucket_name=bucket_name, file_paths=file_paths, json_key=json_key, + credentials=credentials, client=client, timeout=timeout, ) @@ -315,14 +343,30 @@ def _deserialize_jsonl_files(files: list[GCSDownloadedFile]) -> ListJsonType: def _check_auth_args( - json_key: Optional[Dict[str, str]], client: Optional[AsyncStorageClient] + json_key: Optional[Dict[str, str]], + credentials: Optional[GoogleCredentials], + client: Optional[object], ) -> None: - if not json_key and not client: + if json_key is 
None and credentials is None and client is None: raise ValueError( - "You must provide either a json_key or a client to connect to gcs." + "You must provide either a json_key, credentials, " + "or a client to connect to gcs." ) +def _get_async_gcs_client( + json_key: Optional[Dict[str, str]], + credentials: Optional[GoogleCredentials], + session: ClientSession, +) -> AsyncStorageClient: + if credentials: + return AsyncStorageClient(token=_CredentialsToken(credentials), session=session) # type: ignore[arg-type] + return AsyncStorageClient( + service_file=io.StringIO(json.dumps(json_key)), + session=session, + ) + + async def _async_list_files( bucket_name: str, prefix: str, @@ -372,18 +416,18 @@ async def _async_list_files_handling_auth( bucket_name: str, prefix: str, json_key: Optional[Dict[str, str]], + credentials: Optional[GoogleCredentials], client: Optional[AsyncStorageClient], timeout: int, api_params: Optional[Dict[str, Any]] = None, updated_after: Optional[datetime] = None, updated_before: Optional[datetime] = None, ) -> List[str]: - _check_auth_args(json_key=json_key, client=client) + _check_auth_args(json_key=json_key, credentials=credentials, client=client) session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout) async with ClientSession(timeout=session_timeout) as session: - client = client or AsyncStorageClient( - service_file=io.StringIO(json.dumps(json_key)), - session=session, + client = client or _get_async_gcs_client( + json_key=json_key, credentials=credentials, session=session ) return await _async_list_files( bucket_name=bucket_name, @@ -399,6 +443,7 @@ def list_files( bucket_name: str, prefix: str, json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, client: Optional[AsyncStorageClient] = None, timeout: int = 300, api_params: Optional[Dict[str, Any]] = None, @@ -411,6 +456,7 @@ def list_files( bucket_name: the name of the dataset. prefix: prefix to filter the files. 
json_key: auth to connect to gcp. + credentials: google-auth credentials to connect to gcp. client: client to user to make the API requests. timeout: timeout for the API requests. api_params: parameters for the API request (ref. https://cloud.google.com/storage/docs/json_api/v1/objects/list). @@ -423,6 +469,7 @@ def list_files( bucket_name=bucket_name, prefix=prefix, json_key=json_key, + credentials=credentials, client=client, timeout=timeout, api_params=api_params, @@ -464,15 +511,15 @@ async def _async_get_dataset( bucket_name: str, latest_partition_only: bool, json_key: Optional[Dict[str, str]], + credentials: Optional[GoogleCredentials], client: Optional[AsyncStorageClient], timeout: int, ) -> ListJsonType: - _check_auth_args(json_key=json_key, client=client) + _check_auth_args(json_key=json_key, credentials=credentials, client=client) session_timeout = ClientTimeout(total=None, sock_connect=timeout, sock_read=timeout) async with ClientSession(timeout=session_timeout) as session: - client = client or AsyncStorageClient( - service_file=io.StringIO(json.dumps(json_key)), - session=session, + client = client or _get_async_gcs_client( + json_key=json_key, credentials=credentials, session=session ) file_paths = await _async_list_files( bucket_name=bucket_name, @@ -501,6 +548,7 @@ def get_dataset( version: str = "1", latest_partition_only: bool = False, json_key: Optional[Dict[str, str]] = None, + credentials: Optional[GoogleCredentials] = None, client: Optional[AsyncStorageClient] = None, timeout: int = 300, ) -> ListJsonType: @@ -512,6 +560,7 @@ def get_dataset( bucket_name: the name of the dataset. latest_partition_only: if True, return only the latest partition data. json_key: auth to connect to gcp. + credentials: google-auth credentials to connect to gcp. client: client to user to make the API requests. timeout: timeout for the API requests. 
@@ -524,6 +573,7 @@ def get_dataset( bucket_name=bucket_name, latest_partition_only=latest_partition_only, json_key=json_key, + credentials=credentials, client=client, timeout=timeout, ) diff --git a/tests/unit/test_gcs.py b/tests/unit/test_gcs.py index 87ba5ed..6b413ca 100644 --- a/tests/unit/test_gcs.py +++ b/tests/unit/test_gcs.py @@ -5,6 +5,7 @@ import pytest import time_machine from gcloud.aio.storage import Storage as AsyncStorageClient +from google.auth.credentials import Credentials as GoogleCredentials from google.cloud.storage import Blob, Bucket, Client from gcpde import gcs @@ -302,10 +303,102 @@ def test__check_auth_args_exception(): # arrange json_key = None client = None + credentials = None # act and assert with pytest.raises(ValueError): - gcs._check_auth_args(json_key=json_key, client=client) + gcs._check_auth_args(json_key=json_key, credentials=credentials, client=client) + + +@pytest.mark.asyncio +async def test__credentials_token_get_valid(): + # arrange + mock_creds = mock.Mock(spec=GoogleCredentials) + mock_creds.valid = True + mock_creds.token = "my-access-token" + token = gcs._CredentialsToken(mock_creds) + + # act + result = await token.get() + + # assert + assert result == "my-access-token" + mock_creds.refresh.assert_not_called() + + +@pytest.mark.asyncio +@mock.patch("google.auth.transport.requests.Request", autospec=True) +async def test__credentials_token_get_expired_refreshes(mock_request_cls): + # arrange + mock_creds = mock.Mock(spec=GoogleCredentials) + mock_creds.valid = False + mock_creds.token = "refreshed-token" + token = gcs._CredentialsToken(mock_creds) + + # act + result = await token.get() + + # assert + assert result == "refreshed-token" + mock_creds.refresh.assert_called_once_with(mock_request_cls.return_value) + + +@pytest.mark.asyncio +async def test__credentials_token_close_is_noop(): + mock_creds = mock.Mock(spec=GoogleCredentials) + token = gcs._CredentialsToken(mock_creds) + await token.close() # should not raise + 
+ 

+@mock.patch("google.cloud.storage.client.Client", autospec=True) +def test__get_gcs_client_with_credentials(mock_storage_client_cls): + # arrange + mock_creds = mock.Mock(spec=GoogleCredentials) + + # act + gcs._get_gcs_client(credentials=mock_creds, client=mock_storage_client_cls) + + # assert — credentials passed directly, no service account lookup + mock_storage_client_cls.assert_called_with(credentials=mock_creds) + + +@mock.patch("gcpde.gcs.AsyncStorageClient", autospec=True) +def test__get_async_gcs_client_with_credentials(mock_async_storage_cls): + # arrange + mock_creds = mock.Mock(spec=GoogleCredentials) + mock_session = mock.Mock() + + # act + gcs._get_async_gcs_client( + json_key=None, credentials=mock_creds, session=mock_session + ) + + # assert — token is a _CredentialsToken wrapping the credentials + call_kwargs = mock_async_storage_cls.call_args.kwargs + assert isinstance(call_kwargs["token"], gcs._CredentialsToken) + assert call_kwargs["session"] is mock_session + + +@mock.patch("gcpde.gcs.AsyncStorageClient", autospec=True) +def test__get_async_gcs_client_with_json_key(mock_async_storage_cls): + # arrange + mock_session = mock.Mock() + + # act + gcs._get_async_gcs_client( + json_key={"auth": "key"}, credentials=None, session=mock_session + ) + + # assert — service_file is used instead of token + call_kwargs = mock_async_storage_cls.call_args.kwargs + assert "service_file" in call_kwargs + assert call_kwargs["session"] is mock_session + + +def test__check_auth_args_passes_with_credentials(): + mock_creds = mock.Mock(spec=GoogleCredentials) + # should not raise + gcs._check_auth_args(json_key=None, credentials=mock_creds, client=None) def test_add_records_to_dataset_no_records(): From 8232d6e2c6283b2bd14a9d36eee25bd2141a6a0c Mon Sep 17 00:00:00 2001 From: Guilherme de Amorim Date: Fri, 20 Mar 2026 21:45:53 -0300 Subject: [PATCH 3/4] :bug: do not block event loop during token refresh --- gcpde/gcs.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 
deletion(-) diff --git a/gcpde/gcs.py b/gcpde/gcs.py index 66e68d0..7815243 100644 --- a/gcpde/gcs.py +++ b/gcpde/gcs.py @@ -92,7 +92,12 @@ def __init__(self, credentials: GoogleCredentials) -> None: async def get(self) -> str: """Return a valid access token, refreshing if needed.""" if not self._credentials.valid: - self._credentials.refresh(google.auth.transport.requests.Request()) # type: ignore[no-untyped-call] + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + self._credentials.refresh, + google.auth.transport.requests.Request(), # type: ignore[no-untyped-call] + ) return self._credentials.token # type: ignore[return-value] async def close(self) -> None: From e416fdb6a7a2b0b7a76ed4fb5496db401b2b8841 Mon Sep 17 00:00:00 2001 From: Guilherme de Amorim Date: Fri, 20 Mar 2026 21:46:08 -0300 Subject: [PATCH 4/4] :memo: detail google-auth credentials --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bc88fb2..9a37fe9 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ A Python library that provides an opinionated interface for working with Google ## Features - typing for GCS and BigQuery API -- standardized auth using service account credentials +- standardized auth using service account credentials or any `google-auth` credentials (ADC, impersonated, etc.) ### Google Cloud Storage (GCS) - upload and download files with retry logic and async capabilities