diff --git a/examples/organization_token.py b/examples/organization_token.py new file mode 100644 index 0000000..202a255 --- /dev/null +++ b/examples/organization_token.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +""" +Organization Token Operations Example + +Demonstrates usage of all 6 organization token operations: +1. create() - Create a new organization token, replacing any existing token +2. create_with_options() - Create with options like expiration date and token type +3. read() - Read the organization token +4. read_with_options() - Read with options like token type +5. delete() - Delete the organization token +6. delete_with_options() - Delete with options like token type + +Usage: +- Modify organization names as needed for your environment +- Ensure you have proper TFE credentials and organization access +- Organization tokens are used for organization-level API access + +Prerequisites: +- Set TFE_TOKEN and TFE_ADDRESS environment variables +- You need an existing organization or admin permissions to create one +- Appropriate permissions to manage organization tokens +""" + +from datetime import datetime, timedelta + +# Add the src directory to the path +##sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) +from pytfe import TFEClient, TFEConfig +from pytfe.models import ( + OrganizationTokenCreateOptions, + OrganizationTokenDeleteOptions, + OrganizationTokenReadOptions, + TokenType, +) + + +def redact_token(token_value: str | None) -> str: + """Redact token value for safe display.""" + if not token_value: + return "None" + if len(token_value) <= 8: + return f"{'*' * len(token_value)}" + # Show first 3 and last 3 characters + return f"{token_value[:3]}...{token_value[-3:]}".replace( + token_value[3:-3], "*" * (len(token_value) - 6) + ) + + +def redact_id(id_value: str | None) -> str: + """Redact ID for safe display.""" + if not id_value: + return "None" + if len(id_value) <= 6: + return f"{'*' * len(id_value)}" + # Show first 3 and last 3 characters + return f"{id_value[:3]}...{id_value[-3:]}" + + +def main(): + """Execute organization token operation examples.""" + + print("=" * 80) + print("ORGANIZATION TOKEN OPERATIONS") + print("=" * 80) + + # Initialize the TFE client + client = TFEClient(TFEConfig.from_env()) + organization_name = "prab-sandbox01" + # ===================================================== + # 1. CREATE ORGANIZATION TOKEN (BASIC) + # ===================================================== + print("\n1. create() - Create a new organization token:") + print("-" * 40) + try: + print(f"Creating token for organization: {organization_name}") + token = client.organization_tokens.create(organization_name) + + print("Token created successfully!") + print(f" Token ID: {redact_id(token.id)}") + print(f" Created At: {token.created_at}") + print(f" Description: {token.description}") + print(f" Token Value: {redact_token(token.token)}") + if token.expired_at: + print(f" Expires At: {token.expired_at}") + print() + + except Exception as e: + print(f" Error: {e}") + print() + + # ===================================================== + # 2. CREATE WITH OPTIONS (WITH EXPIRATION) + # ===================================================== + print("2. create_with_options() - Create token with expiration date:") + print("-" * 40) + try: + # Create a token that expires in 30 days + expiry_date = datetime.utcnow() + timedelta(days=30) + options = OrganizationTokenCreateOptions(expired_at=expiry_date) + + print(f"Creating organization token with expiration date: {expiry_date}") + token = client.organization_tokens.create_with_options( + organization_name, options + ) + + print("Token created with options successfully!") + print(f" Token ID: {redact_id(token.id)}") + print(f" Created At: {token.created_at}") + if token.expired_at: + print(f" Expires At: {token.expired_at}") + print() + + except Exception as e: + print(f" Error: {e}") + print() + + # ===================================================== + print("3. create_with_options() - Create audit-trails token:") + print("-" * 40) + try: + options = OrganizationTokenCreateOptions(token_type=TokenType.AUDIT_TRAILS) + + print(f"Creating audit-trails token for organization: {organization_name}") + token = client.organization_tokens.create_with_options( + organization_name, options + ) + + print(" Audit-trails token created successfully!") + print(f" Token ID: {redact_id(token.id)}") + print(f" Token Value: {redact_token(token.token)}") + print() + + except Exception as e: + print(f"Error: {e}") + print() + + # ===================================================== + print("4. read() - Read the organization token:") + print("-" * 40) + try: + print(f"Reading organization token for organization: {organization_name}") + token = client.organization_tokens.read(organization_name) + + print("Token read successfully!") + print(f" Token ID: {redact_id(token.id)}") + print(f" Created At: {token.created_at}") + print(f" Description: {token.description}") + if token.last_used_at: + print(f" Last Used At: {token.last_used_at}") + if token.expired_at: + print(f" Expires At: {token.expired_at}") + print() + + except Exception as e: + print(f" Error: {e}") + print() + + # ===================================================== + print("5. read_with_options() - Read audit-trails token:") + print("-" * 40) + try: + options = OrganizationTokenReadOptions(token_type=TokenType.AUDIT_TRAILS) + + print(f"Reading audit-trails token for organization: {organization_name}") + token = client.organization_tokens.read_with_options(organization_name, options) + + print(" Audit-trails token read successfully!") + print(f" Token ID: {redact_id(token.id)}") + print(f" Token Value: {redact_token(token.token)}") + print() + + except Exception as e: + print(f" Error: {e}") + print() + + # ===================================================== + print("6. delete() - Delete the organization token:") + print("-" * 40) + try: + print(f"Deleting organization token for organization: {organization_name}") + client.organization_tokens.delete(organization_name) + + print(" Token deleted successfully!") + print() + + except Exception as e: + print(f" Error: {e}") + print() + + # ===================================================== + print("7. delete_with_options() - Delete audit-trails token:") + print("-" * 40) + try: + options = OrganizationTokenDeleteOptions(token_type=TokenType.AUDIT_TRAILS) + + print(f"Deleting audit-trails token for organization: {organization_name}") + client.organization_tokens.delete_with_options(organization_name, options) + + print(" Audit-trails token deleted successfully!") + print() + + except Exception as e: + print(f"Error: {e}") + print() + + print("=" * 80) + print("ORGANIZATION TOKEN OPERATIONS COMPLETED") + print("=" * 80) + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index d1c8337..fcb8dc0 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -10,6 +10,7 @@ from .resources.oauth_client import OAuthClients from .resources.oauth_token import OAuthTokens from .resources.organization_membership import OrganizationMemberships +from .resources.organization_token import OrganizationTokens from .resources.organizations import Organizations from .resources.plan import Plans from .resources.policy import Policies @@ -69,6 +70,7 @@ def __init__(self, config: TFEConfig | None = None): self.plans = Plans(self._transport) self.organizations = Organizations(self._transport) self.organization_memberships = OrganizationMemberships(self._transport) + self.organization_tokens = OrganizationTokens(self._transport) self.projects = Projects(self._transport) self.variables = Variables(self._transport) self.variable_sets = VariableSets(self._transport) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index 8524e6b..20f6b2d 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -93,6 +93,15 @@ OrganizationMembershipStatus, OrgMembershipIncludeOpt, ) + +# ── Organization Token ──────────────────────────────────────────────────────── +from .organization_token import ( + OrganizationToken, + OrganizationTokenCreateOptions, + OrganizationTokenDeleteOptions, + OrganizationTokenReadOptions, + TokenType, +) from .policy import ( Policy, PolicyCreateOptions, @@ -287,7 +296,6 @@ from .ssh_key import ( SSHKey, SSHKeyCreateOptions, - SSHKeyList, SSHKeyListOptions, SSHKeyUpdateOptions, ) @@ -382,7 +390,6 @@ # SSH keys "SSHKey", "SSHKeyCreateOptions", - "SSHKeyList", "SSHKeyListOptions", "SSHKeyUpdateOptions", # Reserved tag keys @@ -486,6 +493,12 @@ "OrganizationMembershipReadOptions", "OrganizationMembershipStatus", "OrgMembershipIncludeOpt", + # Organization tokens + "OrganizationToken", + "OrganizationTokenCreateOptions", + "OrganizationTokenDeleteOptions", + "OrganizationTokenReadOptions", + "TokenType", "OrganizationAccess", "Team", "TeamPermissions", diff --git a/src/pytfe/models/organization_token.py b/src/pytfe/models/organization_token.py new file mode 100644 index 0000000..24f1cb0 --- /dev/null +++ b/src/pytfe/models/organization_token.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING, Any + +from pydantic import BaseModel, ConfigDict, Field + +if TYPE_CHECKING: + pass + + +class TokenType(str, Enum): + """Token type enumeration.""" + + AUDIT_TRAILS = "audit-trails" + + +class OrganizationToken(BaseModel): + """Organization token represents a Terraform Enterprise organization token.""" + + model_config = ConfigDict(extra="forbid") + + id: str = Field(..., description="Organization token ID") + created_at: datetime = Field(..., description="Creation timestamp") + description: str | None = Field(None, description="Token description") + last_used_at: datetime | None = Field(None, description="Last usage timestamp") + token: str | None = Field(None, description="The actual token value") + expired_at: datetime | None = Field(None, description="Token expiration timestamp") + created_by: Any | None = Field( + None, description="The entity that created this token" + ) + + +class OrganizationTokenCreateOptions(BaseModel): + """Options for creating an organization token.""" + + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + expired_at: datetime | None = Field( + None, + description="The token's expiration date. Available in TFE release v202305-1 and later", + ) + token_type: TokenType | None = Field( + None, + alias="token", + description="What type of token to create. Only applicable to HCP Terraform", + ) + + +class OrganizationTokenReadOptions(BaseModel): + """Options for reading an organization token.""" + + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + token_type: TokenType | None = Field( + None, + alias="token", + description="What type of token to read. Only applicable to HCP Terraform", + ) + + +class OrganizationTokenDeleteOptions(BaseModel): + """Options for deleting an organization token.""" + + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + token_type: TokenType | None = Field( + None, + alias="token", + description="What type of token to delete. Only applicable to HCP Terraform", + ) diff --git a/src/pytfe/resources/organization_token.py b/src/pytfe/resources/organization_token.py new file mode 100644 index 0000000..dcbcfb2 --- /dev/null +++ b/src/pytfe/resources/organization_token.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any +from urllib.parse import quote + +from ..errors import ERR_INVALID_ORG +from ..models.organization_token import ( + OrganizationToken, + OrganizationTokenCreateOptions, + OrganizationTokenDeleteOptions, + OrganizationTokenReadOptions, +) +from ..utils import valid_string_id +from ._base import _Service + + +class OrganizationTokens(_Service): + """Organization tokens service for managing TFE organization tokens.""" + + def create(self, organization: str) -> OrganizationToken: + """Create a new organization token, replacing any existing token. + + Args: + organization: The organization name or ID + + Returns: + OrganizationToken: The created organization token + + Raises: + ValueError: If the organization name is invalid + """ + return self.create_with_options(organization) + + def create_with_options( + self, + organization: str, + options: OrganizationTokenCreateOptions | None = None, + ) -> OrganizationToken: + """Create a new organization token with options, replacing any existing token. + + Args: + organization: The organization name or ID + options: Options for creating the token + + Returns: + OrganizationToken: The created organization token + + Raises: + ValueError: If the organization name is invalid + """ + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/authentication-token" + + # Build request body + body: dict[str, Any] = { + "data": { + "type": "authentication-token", + "attributes": {}, + } + } + + # Add optional attributes + if options and options.expired_at is not None: + body["data"]["attributes"]["expired-at"] = options.expired_at.isoformat() + + # Add query parameters for token type if specified + params = {} + if options and options.token_type is not None: + params["token"] = options.token_type.value + + if params: + response = self.t.request("POST", path, json_body=body, params=params) + else: + response = self.t.request("POST", path, json_body=body) + + data = response.json() + + if "data" in data: + return self._parse_organization_token(data["data"]) + + raise ValueError("Invalid response format") + + def read(self, organization: str) -> OrganizationToken: + """Read an organization token. + + Args: + organization: The organization name or ID + + Returns: + OrganizationToken: The organization token + + Raises: + ValueError: If the organization name is invalid + """ + return self.read_with_options(organization, None) + + def read_with_options( + self, + organization: str, + options: OrganizationTokenReadOptions | None = None, + ) -> OrganizationToken: + """Read an organization token with options. + + Args: + organization: The organization name or ID + options: Options for reading the token + + Returns: + OrganizationToken: The organization token + + Raises: + ValueError: If the organization name is invalid + """ + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/authentication-token" + + # Add query parameters for token type if specified + params = {} + if options and options.token_type is not None: + params["token"] = options.token_type.value + + response = self.t.request("GET", path, params=params if params else None) + data = response.json() + + if "data" in data: + return self._parse_organization_token(data["data"]) + + raise ValueError("Invalid response format") + + def delete(self, organization: str) -> None: + """Delete an organization token. + + Args: + organization: The organization name or ID + + Raises: + ValueError: If the organization name is invalid + """ + return self.delete_with_options(organization, None) + + def delete_with_options( + self, + organization: str, + options: OrganizationTokenDeleteOptions | None = None, + ) -> None: + """Delete an organization token with options. + + Args: + organization: The organization name or ID + options: Options for deleting the token + + Raises: + ValueError: If the organization name is invalid + """ + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/authentication-token" + + # Add query parameters for token type if specified + params = {} + if options and options.token_type is not None: + params["token"] = options.token_type.value + + if params: + self.t.request("DELETE", path, params=params) + else: + self.t.request("DELETE", path) + + def _parse_organization_token(self, data: dict[str, Any]) -> OrganizationToken: + """Parse organization token data from API response. + + Args: + data: The token data from the API response + + Returns: + OrganizationToken: The parsed organization token + """ + attributes = data.get("attributes", {}) + + # Parse timestamps + created_at_str = attributes.get("created-at") + created_at = ( + datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + if created_at_str + else datetime.now() + ) + + last_used_at_str = attributes.get("last-used-at") + last_used_at = ( + datetime.fromisoformat(last_used_at_str.replace("Z", "+00:00")) + if last_used_at_str + else None + ) + + expired_at_str = attributes.get("expired-at") + expired_at = ( + datetime.fromisoformat(expired_at_str.replace("Z", "+00:00")) + if expired_at_str + else None + ) + + # Parse created-by relationship + created_by = None + # For now, just set to None since it's mainly for display + + return OrganizationToken( + id=data.get("id", ""), + created_at=created_at, + description=attributes.get("description", ""), + last_used_at=last_used_at, + token=attributes.get("token", ""), + expired_at=expired_at, + created_by=created_by, + ) diff --git a/tests/units/test_organization_token.py b/tests/units/test_organization_token.py new file mode 100644 index 0000000..826f223 --- /dev/null +++ b/tests/units/test_organization_token.py @@ -0,0 +1,313 @@ +"""Unit tests for the organization token module.""" + +from datetime import datetime +from unittest.mock import Mock, patch + +import pytest + +from pytfe._http import HTTPTransport +from pytfe.errors import ERR_INVALID_ORG +from pytfe.models.organization_token import ( + OrganizationToken, + OrganizationTokenCreateOptions, + OrganizationTokenDeleteOptions, + OrganizationTokenReadOptions, + TokenType, +) +from pytfe.resources.organization_token import OrganizationTokens + + +class TestOrganizationTokens: + """Test the OrganizationTokens service class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def org_tokens_service(self, mock_transport): + """Create an OrganizationTokens service with mocked transport.""" + return OrganizationTokens(mock_transport) + + def test_create_success(self, org_tokens_service): + """Test successful create operation.""" + mock_response_data = { + "data": { + "id": "at-test123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "description": "Test token", + "token": "test-token-value", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + result = org_tokens_service.create("test-org") + + mock_t.request.assert_called_once() + call_args = mock_t.request.call_args + + assert call_args[0][0] == "POST" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + assert "json_body" in call_args[1] + assert "data" in call_args[1]["json_body"] + assert "attributes" in call_args[1]["json_body"]["data"] + assert isinstance(result, OrganizationToken) + assert result.id == "at-test123" + assert result.description == "Test token" + + def test_create_validation_errors(self, org_tokens_service): + """Test create with invalid organization name.""" + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.create("") + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.create(None) + + def test_create_with_options_expiration_success(self, org_tokens_service): + """Test create with options including expiration date.""" + mock_response_data = { + "data": { + "id": "at-exp-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "token": "token-value", + "expired-at": "2024-01-01T00:00:00Z", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + expiry = datetime(2024, 1, 1, 0, 0, 0) + options = OrganizationTokenCreateOptions(expired_at=expiry) + + result = org_tokens_service.create_with_options("test-org", options) + + assert isinstance(result, OrganizationToken) + assert result.expired_at is not None + + call_args = mock_t.request.call_args + assert call_args[0][0] == "POST" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + body = call_args[1]["json_body"] + assert "expired-at" in body["data"]["attributes"] + assert body["data"]["attributes"]["expired-at"] == "2024-01-01T00:00:00" + + def test_create_with_options_token_type_success(self, org_tokens_service): + """Test create with options including token type.""" + mock_response_data = { + "data": { + "id": "at-audit-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "token": "audit-token-value", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + options = OrganizationTokenCreateOptions(token_type=TokenType.AUDIT_TRAILS) + result = org_tokens_service.create_with_options("test-org", options) + + assert isinstance(result, OrganizationToken) + call_args = mock_t.request.call_args + assert call_args[0][0] == "POST" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + assert "params" in call_args[1] + assert call_args[1]["params"]["token"] == "audit-trails" + assert "json_body" in call_args[1] + + def test_read_success(self, org_tokens_service): + """Test successful read operation.""" + mock_response_data = { + "data": { + "id": "at-read-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "description": "Read token", + "token": "read-token-value", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + result = org_tokens_service.read("test-org") + + assert isinstance(result, OrganizationToken) + assert result.id == "at-read-123" + + call_args = mock_t.request.call_args + assert call_args[0][0] == "GET" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + + def test_read_validation_errors(self, org_tokens_service): + """Test read with invalid organization name.""" + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.read("") + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.read(None) + + def test_read_with_options_token_type_success(self, org_tokens_service): + """Test read with options including token type.""" + mock_response_data = { + "data": { + "id": "at-audit-read-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "token": "audit-read-value", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + options = OrganizationTokenReadOptions(token_type=TokenType.AUDIT_TRAILS) + result = org_tokens_service.read_with_options("test-org", options) + + assert isinstance(result, OrganizationToken) + call_args = mock_t.request.call_args + assert call_args[0][0] == "GET" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + assert call_args[1]["params"]["token"] == "audit-trails" + + def test_delete_success(self, org_tokens_service): + """Test successful delete operation.""" + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = Mock() + + result = org_tokens_service.delete("test-org") + + assert result is None + call_args = mock_t.request.call_args + assert call_args[0][0] == "DELETE" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + + def test_delete_validation_errors(self, org_tokens_service): + """Test delete with invalid organization name.""" + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.delete("") + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + org_tokens_service.delete(None) + + def test_delete_with_options_token_type_success(self, org_tokens_service): + """Test delete with options including token type.""" + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = Mock() + + options = OrganizationTokenDeleteOptions(token_type=TokenType.AUDIT_TRAILS) + result = org_tokens_service.delete_with_options("test-org", options) + + assert result is None + call_args = mock_t.request.call_args + assert call_args[0][0] == "DELETE" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/authentication-token" + ) + assert call_args[1]["params"]["token"] == "audit-trails" + + def test_parse_token_minimal(self, org_tokens_service): + """Test parsing token with minimal data.""" + data = { + "id": "at-minimal-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "description": "Minimal token", + "token": "minimal-value", + }, + "relationships": {}, + } + + result = org_tokens_service._parse_organization_token(data) + + assert result.id == "at-minimal-123" + assert isinstance(result.created_at, datetime) + assert result.description == "Minimal token" + assert result.token == "minimal-value" + assert result.last_used_at is None + assert result.expired_at is None + + def test_parse_token_all_fields(self, org_tokens_service): + """Test parsing token with all fields populated.""" + data = { + "id": "at-full-123", + "attributes": { + "created-at": "2023-01-01T00:00:00Z", + "description": "Full token", + "token": "full-value", + "last-used-at": "2023-01-15T12:30:00Z", + "expired-at": "2024-01-01T00:00:00Z", + }, + "relationships": {}, + } + + result = org_tokens_service._parse_organization_token(data) + + assert result.id == "at-full-123" + assert result.description == "Full token" + assert result.token == "full-value" + assert result.last_used_at is not None + assert result.expired_at is not None + assert isinstance(result.last_used_at, datetime) + assert isinstance(result.expired_at, datetime) + + def test_invalid_response_format_on_create(self, org_tokens_service): + """Test handling of invalid response format when creating.""" + mock_response = Mock() + mock_response.json.return_value = {"error": "Invalid"} + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + with pytest.raises(ValueError, match="Invalid response format"): + org_tokens_service.create("test-org") + + def test_invalid_response_format_on_read(self, org_tokens_service): + """Test handling of invalid response format when reading.""" + mock_response = Mock() + mock_response.json.return_value = {"error": "Invalid"} + + with patch.object(org_tokens_service, "t") as mock_t: + mock_t.request.return_value = mock_response + + with pytest.raises(ValueError, match="Invalid response format"): + org_tokens_service.read("test-org")