diff --git a/.gitignore b/.gitignore index caa3e08..24939dc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ __pycache__/ #Environments .env .venv +.venv-*/ env/ #Session Cache diff --git a/README.md b/README.md index ff5e46c..d0cefb7 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,10 @@ async def callback(request: Request): return RedirectResponse(url="/") ``` +#### Organizations + +The SDK supports [Auth0 Organizations](https://auth0.com/docs/organizations) with first-class `organization` and `invitation` parameters on `ServerClient` and `StartInteractiveLoginOptions`. Token claim validation is enforced automatically at callback. For setup, invitation flows, error handling, and reading org data from the session, see [examples/InteractiveLogin.md](examples/InteractiveLogin.md#8-organizations). + ### 4. Login with Custom Token Exchange If you're migrating from a legacy authentication system or integrating with a custom identity provider, you can exchange external tokens for Auth0 tokens using the OAuth 2.0 Token Exchange specification (RFC 8693): diff --git a/examples/InteractiveLogin.md b/examples/InteractiveLogin.md index ba8cb9e..9635647 100644 --- a/examples/InteractiveLogin.md +++ b/examples/InteractiveLogin.md @@ -1,8 +1,8 @@ # Interactive Login -Interactive login in `auth0‑server‑python` is a two‑step process. First, you start the login flow by obtaining an authorization URL; then, after the user authenticates at Auth0 and is redirected back, you complete the login flow to exchange the authorization code for tokens. +Interactive login in `auth0‑server‑python` is a two‑step process. First, you start the login flow by obtaining an authorization URL; then, after the user authenticates at Auth0 and is redirected back, you complete the login flow to exchange the authorization code for tokens. -This guide covers how to customize the authorization parameters, pass custom app state, enable **Pushed Authorization Requests (PAR)** and **Rich Authorization Requests (RAR)**, and supply store options. +This guide covers how to customize the authorization parameters, pass custom app state, enable **Pushed Authorization Requests (PAR)** and **Rich Authorization Requests (RAR)**, supply store options, and log in to an organization. ## 1. Starting Interactive Login @@ -28,7 +28,7 @@ Now call `start_interactive_login()` to obtain the authorization URL and redirec authorization_url = await server_client.start_interactive_login() ``` ## 2. Passing Authorization Params -You can customize the parameters sent to Auth0’s `/authorize` endpoint in two ways: +You can customize the parameters sent to Auth0's `/authorize` endpoint in two ways: ### Global Configuration @@ -76,7 +76,7 @@ result = await server_client.complete_interactive_login(callback_url) print(result.get("app_state").get("returnTo")) # Should output: http://localhost:3000/dashboard ``` > [!NOTE] ->- `authorize_url` is the URL for Auth0’s /authorize endpoint (or a URL built from PAR, if enabled). +>- `authorize_url` is the URL for Auth0's /authorize endpoint (or a URL built from PAR, if enabled). >- `callback_url` is the URL Auth0 redirects back to after authentication. ## 4. Using Pushed Authorization Requests (PAR) @@ -89,7 +89,7 @@ authorization_url = await server_client.start_interactive_login({ }) ``` >[!IMPORTANT] -> Using PAR requires that your Auth0 tenant is configured to support it. Refer to Auth0’s documentation for details. +> Using PAR requires that your Auth0 tenant is configured to support it. Refer to Auth0's documentation for details. ## 5. Using Pushed Authorization Requests and Rich Authorization Requests (RAR) @@ -137,4 +137,99 @@ print(result.get("authorization_details")) # Rich Authorization Re ``` >[!NOTE] ->The `callback_url` must include the necessary parameters (`state` and `code`) that Auth0 sends upon successful authentication. \ No newline at end of file +>The `callback_url` must include the necessary parameters (`state` and `code`) that Auth0 sends upon successful authentication. + +## 8. Organizations + +[Auth0 Organizations](https://auth0.com/docs/organizations) lets you manage teams, business customers, and partner companies as distinct entities with their own login flows and membership. + +### Logging in to an organization + +Set `organization` on `ServerClient` to enforce it for every login (dedicated-org), or pass it per login via `StartInteractiveLoginOptions` (multi-org): + +```python +from auth0_server_python.auth_server.server_client import ServerClient +from auth0_server_python.auth_types import StartInteractiveLoginOptions + +# Dedicated-org: every login enforces this organization +auth0 = ServerClient( + domain="YOUR_AUTH0_DOMAIN", + client_id="YOUR_CLIENT_ID", + client_secret="YOUR_CLIENT_SECRET", + secret="YOUR_SECRET", + organization="org_abc123", + authorization_params={"redirect_uri": "http://localhost:3000/auth/callback"} +) + +# Multi-org: pass organization per login +authorization_url = await auth0.start_interactive_login( + StartInteractiveLoginOptions(organization="org_xyz789"), + store_options={"request": request, "response": response} +) +``` + +`organization` accepts either an org ID (`org_` prefix) or an org name. The SDK validates the corresponding `org_id` or `org_name` claim in the returned token automatically at callback. + +> [!IMPORTANT] +> In the multi-org pattern, validate that the `organization` value comes from a trusted source — never pass it unvalidated directly from user input. + +### Accepting an invitation + +When a user follows an invitation link, extract `organization` and `invitation` from the URL and pass them as typed fields: + +```python +@app.get("/auth/login") +async def login(request: Request, response: Response): + authorization_url = await auth0.start_interactive_login( + StartInteractiveLoginOptions( + organization=request.query_params.get("organization"), + invitation=request.query_params.get("invitation"), + ), + store_options={"request": request, "response": response} + ) + return RedirectResponse(url=authorization_url) +``` + +### Handling organization errors + +Auth0 returns organization errors as standard OAuth error responses (`error` + `error_description`). The SDK surfaces these as `ApiError`, preserving the raw values so you can branch on `error.code`: + +```python +from auth0_server_python.error import ApiError, OrganizationTokenValidationError + +@app.get("/auth/callback") +async def callback(request: Request, response: Response): + try: + result = await auth0.complete_interactive_login( + str(request.url), + store_options={"request": request, "response": response} + ) + return RedirectResponse(url="/dashboard") + except OrganizationTokenValidationError: + return RedirectResponse(url="/error?reason=org_mismatch") + except ApiError as e: + return RedirectResponse(url=f"/error?reason={e.code}") +``` + +| Exception | When raised | +|-----------|-------------| +| `OrganizationTokenValidationError` | `org_id` / `org_name` in the returned token does not match what was requested | +| `ApiError` | Auth0 rejected the authorization request — inspect `error.code` and `error.message` for the raw OAuth error and description | + +Common `ApiError.code` values for org flows: + +| `error.code` | Typical cause | +|---|---| +| `access_denied` | User not a member, connection not enabled for org, member quota exceeded | +| `invalid_request` | Invalid org format, feature disabled, client not configured for orgs, expired or invalid invitation ticket | + +### Reading organization data from the session + +After a successful org login, `org_id` is always present in the token. `org_name` is also present when the organization has the org name feature enabled: + +```python +user = await auth0.get_user(store_options={"request": request, "response": response}) +if user: + print(user.get("org_id")) # always present; use as stable identifier + print(user.get("org_name")) # present when org name is enabled +``` diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 91de45d..68fbccf 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -54,6 +54,7 @@ MfaRequiredError, MissingRequiredArgumentError, MissingTransactionError, + OrganizationTokenValidationError, PollingApiError, StartLinkUserError, ) @@ -61,6 +62,7 @@ from auth0_server_python.utils import PKCE, URL, State from auth0_server_python.utils.helpers import ( build_domain_resolver_context, + validate_org_claims, validate_resolved_domain_value, ) @@ -96,6 +98,7 @@ def __init__( state_identifier: str = "_a0_session", authorization_params: Optional[dict[str, Any]] = None, pushed_authorization_requests: bool = False, + organization: Optional[str] = None, ): """ Initialize the Auth0 server client. @@ -112,6 +115,9 @@ def __init__( state_identifier: Identifier for state data authorization_params: Default parameters for authorization requests pushed_authorization_requests: Whether to use Pushed Authorization Requests + organization: Default organization for all login flows from this client. + Can be an org ID (e.g. 'org_abc123') or an org name (e.g. 'acme-corp'). + Per-login values passed in StartInteractiveLoginOptions always override this. """ if not secret: raise MissingRequiredArgumentError("secret") @@ -146,6 +152,7 @@ def __init__( self._secret = secret self._default_authorization_params = authorization_params or {} self._pushed_authorization_requests = pushed_authorization_requests # store the flag + self._organization = organization # Initialize stores self._transaction_store = transaction_store @@ -207,6 +214,7 @@ def _normalize_url(self, value: str) -> str: return value.rstrip('/') + async def _resolve_current_domain(self, store_options=None) -> str: """Resolve domain from resolver function or return static domain.""" if self._domain_resolver: @@ -502,6 +510,16 @@ async def start_interactive_login( merged_scope = self._merge_scope_with_defaults(requested_scope, audience) auth_params["scope"] = merged_scope + # Typed org/invitation fields win over anything already in auth_params from authorization_params. + resolved_org = options.organization or self._organization + if resolved_org and not resolved_org.strip(): + raise InvalidArgumentError("organization", "organization must not be blank") + if resolved_org: + auth_params["organization"] = resolved_org + + if options.invitation: + auth_params["invitation"] = options.invitation + # Build the transaction data to store with domain transaction_data = TransactionData( code_verifier=code_verifier, @@ -509,6 +527,7 @@ async def start_interactive_login( audience=audience, domain=origin_domain, redirect_uri=auth_params.get("redirect_uri"), + organization=resolved_org, ) # Store the transaction data @@ -638,8 +657,26 @@ async def complete_interactive_login( user_info = token_response.get("userinfo") user_claims = None id_token = token_response.get("id_token") + expected_org = transaction_data.organization + + if not user_info and not id_token and expected_org: + raise OrganizationTokenValidationError( + "Organization was requested but the token response included neither an ID token nor userinfo; " + "cannot verify organization membership" + ) if user_info: + if not isinstance(user_info, dict): + if expected_org: + raise OrganizationTokenValidationError( + "Userinfo response is not a valid claims dictionary; cannot verify organization membership" + ) + raise ApiError( + "invalid_response", + "Userinfo response is not a valid claims dictionary" + ) + if expected_org: + validate_org_claims(user_info, expected_org) user_claims = UserClaims.parse_obj(user_info) elif id_token: # Fetch JWKS for signature verification @@ -656,6 +693,10 @@ async def complete_interactive_login( if self._normalize_url(token_issuer) != self._normalize_url(origin_issuer): raise IssuerValidationError("ID token issuer mismatch. Ensure your Auth0 domain is configured correctly.") + # Organization claim validation — mandatory when org was requested. + if expected_org: + validate_org_claims(claims, expected_org) + user_claims = UserClaims.parse_obj(claims) except ValueError as e: raise ApiError("jwks_key_not_found", str(e)) @@ -1283,7 +1324,10 @@ async def backchannel_authentication( while time.time() < end_time: # Make token request try: - token_response = await self.backchannel_authentication_grant(auth_req_id, store_options=store_options) + token_response = await self.backchannel_authentication_grant( + auth_req_id, + store_options=store_options, + ) return token_response except Exception as e: @@ -2386,6 +2430,7 @@ async def login_with_custom_token_exchange( # Extract user claims from ID token if present user_claims = None sid = PKCE.generate_random_string(32) # Default sid + if token_response.id_token: # Fetch JWKS and verify ID token signature jwks = await self._get_jwks_cached(domain, metadata) @@ -2467,7 +2512,7 @@ async def login_with_custom_token_exchange( return result except Exception as e: - if isinstance(e, (CustomTokenExchangeError, ApiError)): + if isinstance(e, (CustomTokenExchangeError, ApiError, IssuerValidationError)): raise raise CustomTokenExchangeError( CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED, diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 055103a..0cb77cd 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -22,6 +22,7 @@ class UserClaims(BaseModel): email: Optional[str] = None email_verified: Optional[bool] = None org_id: Optional[str] = None + org_name: Optional[str] = None class Config: extra = "allow" # Allow additional fields not defined in the model @@ -91,6 +92,7 @@ class TransactionData(BaseModel): auth_session: Optional[str] = None redirect_uri: Optional[str] = None domain: Optional[str] = None + organization: Optional[str] = None class Config: extra = "allow" # Allow additional fields not defined in the model @@ -128,6 +130,7 @@ class ServerClientOptionsBase(BaseModel): transaction_identifier: Optional[str] = "_a0_tx" state_identifier: Optional[str] = "_a0_session" custom_fetch: Optional[Any] = None # Function type hint would be more complex + organization: Optional[str] = None class ServerClientOptionsWithSecret(ServerClientOptionsBase): @@ -147,6 +150,8 @@ class StartInteractiveLoginOptions(BaseModel): pushed_authorization_requests: Optional[bool] = False app_state: Optional[Any] = None authorization_params: Optional[dict[str, Any]] = None + organization: Optional[str] = None + invitation: Optional[str] = None class LogoutOptions(BaseModel): diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index db4f28e..600d7e4 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -200,6 +200,18 @@ class AccessTokenErrorCode: DOMAIN_MISMATCH = "domain_mismatch" +class OrganizationTokenValidationError(Auth0Error): + """ + Raised when org_id or org_name claim in the ID token fails validation + against the organization value that was requested at login. + """ + code = "organization_token_validation_error" + + def __init__(self, message: str): + super().__init__(message) + self.name = "OrganizationTokenValidationError" + + class AccessTokenForConnectionErrorCode: """Error codes for connection-specific token operations.""" MISSING_REFRESH_TOKEN = "missing_refresh_token" diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 47ba774..1c76d49 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -1,5 +1,6 @@ import json import time +import unicodedata from unittest.mock import ANY, AsyncMock, MagicMock, patch from urllib.parse import parse_qs, urlparse @@ -24,6 +25,7 @@ LoginWithCustomTokenExchangeOptions, LogoutOptions, MfaRequirements, + StartInteractiveLoginOptions, StateData, TransactionData, ) @@ -43,6 +45,7 @@ MfaRequiredError, MissingRequiredArgumentError, MissingTransactionError, + OrganizationTokenValidationError, PollingApiError, StartLinkUserError, ) @@ -4816,3 +4819,945 @@ async def _fake_fetch(self, domain): assert exc.value.mfa_requirements is not None finally: ServerClient._fetch_oidc_metadata = original_fetch + + +# ORGANIZATIONS SUPPORT TESTS + +def _make_org_client(mocker, transaction_data: TransactionData, **extra): + """Helper: build a ServerClient with mocked stores and standard JWT mocks.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = transaction_data + mock_state_store = AsyncMock() + + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + **extra + ) + + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + } + ) + mocker.patch.object( + client, + "_get_jwks_cached", + return_value={"keys": [{"kty": "RSA", "kid": "test-key"}]} + ) + async_fetch_token = AsyncMock(return_value={ + "access_token": "at123", + "id_token": "id_token_jwt", + "scope": "openid profile", + }) + mocker.patch.object(client._oauth, "fetch_token", async_fetch_token) + mocker.patch("jwt.get_unverified_header", return_value={"kid": "test-key"}) + mock_signing_key = mocker.MagicMock() + mock_signing_key.key = "mock_pem_key" + mocker.patch("jwt.PyJWK.from_dict", return_value=mock_signing_key) + return client + + + +@pytest.mark.asyncio +async def test_org_by_id_matching_claim_succeeds(mocker): + """Token with matching org_id passes validation.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", + "aud": "test_client", "org_id": "org_abc123", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result["state_data"]["user"]["org_id"] == "org_abc123" + + + +@pytest.mark.asyncio +async def test_org_by_id_missing_claim_raises(mocker): + """Token missing org_id raises OrganizationTokenValidationError.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + # no org_id + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "org_id" in str(exc.value) + assert "must be a string present" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_by_id_wrong_claim_raises(mocker): + """Token with wrong org_id raises OrganizationTokenValidationError with mismatch detail.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": "org_attacker", + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "mismatch" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_by_id_null_claim_raises(mocker): + """Token with null org_id raises OrganizationTokenValidationError.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": None, + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "org_id" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_by_name_exact_match_succeeds(mocker): + """Token with matching org_name (exact case) passes validation.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="acme-corp"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_name": "acme-corp", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result is not None + + + +@pytest.mark.asyncio +async def test_org_by_name_case_insensitive_match_succeeds(mocker): + """Token with org_name differing only in case passes validation.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="ACME-CORP"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_name": "acme-corp", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result is not None + + + +@pytest.mark.asyncio +async def test_org_by_name_missing_claim_raises(mocker): + """Token missing org_name raises OrganizationTokenValidationError.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="acme-corp"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + # no org_name + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "org_name" in str(exc.value) + assert "must be a string present" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_by_name_wrong_claim_raises(mocker): + """Token with wrong org_name raises OrganizationTokenValidationError with detail.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="acme-corp"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_name": "evil-corp", + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "mismatch" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_no_org_requested_token_with_org_id_passes(mocker): + """When no org was requested, tokens carrying org_id must not be rejected.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com"), # no organization + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": "org_abc123", "org_name": "acme", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result["state_data"]["user"]["org_id"] == "org_abc123" + assert result["state_data"]["user"]["org_name"] == "acme" + + + +@pytest.mark.asyncio +async def test_no_org_requested_plain_token_passes(mocker): + """When no org was requested, a token without org claims passes normally.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result is not None + + + +@pytest.mark.asyncio +async def test_invitation_and_org_forwarded_to_authorize(mocker): + """organization and invitation appear in the authorization URL.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + organization="org_abc123", + invitation="inv_token_xyz", + ) + ) + + assert "organization=org_abc123" in url + assert "invitation=inv_token_xyz" in url + + # Confirm transaction stores the organization + stored = mock_tx_store.set.call_args[0][1] + assert stored.organization == "org_abc123" + + +@pytest.mark.asyncio +async def test_invitation_without_org_forwarded_to_authorize(mocker): + """invitation alone appears in the URL; no organization param.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + invitation="inv_token_xyz", + ) + ) + + assert "invitation=inv_token_xyz" in url + assert "organization=" not in url + + +@pytest.mark.asyncio +async def test_per_login_org_overrides_client_org(mocker): + """Per-login organization overrides the client-level default.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + organization="org_default", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions(organization="org_override") + ) + + assert "organization=org_override" in url + assert "org_default" not in url + + stored = mock_tx_store.set.call_args[0][1] + assert stored.organization == "org_override" + + + +@pytest.mark.asyncio +async def test_client_level_org_used_when_no_per_login_org(mocker): + """Client-level organization is used when no per-login org is set.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + organization="org_default", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login(StartInteractiveLoginOptions()) + + assert "organization=org_default" in url + stored = mock_tx_store.set.call_args[0][1] + assert stored.organization == "org_default" + + + +@pytest.mark.asyncio +async def test_org_name_present_in_user_claims_after_org_login(mocker): + """org_id and org_name both surface in session user claims.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": "org_abc123", "org_name": "acme-corp", + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + user = result["state_data"]["user"] + assert user["org_id"] == "org_abc123" + assert user["org_name"] == "acme-corp" + + +# Adversarial tests + +@pytest.mark.asyncio +async def test_adv_org_id_is_integer_not_string_raises(mocker): + """org_id claim as integer (not string) raises OrganizationTokenValidationError.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": 12345, + }) + with pytest.raises(OrganizationTokenValidationError): + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + + +@pytest.mark.asyncio +async def test_adv_org_name_is_array_raises(mocker): + """org_name claim as array raises OrganizationTokenValidationError.""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="acme"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_name": ["acme", "other"], + }) + with pytest.raises(OrganizationTokenValidationError): + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + + +@pytest.mark.asyncio +async def test_adv_empty_string_org_id_raises(mocker): + """Empty string org_id claim raises OrganizationTokenValidationError (mismatch).""" + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123"), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_id": "", + }) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "mismatch" in str(exc.value) + + +@pytest.mark.asyncio +async def test_adv_org_in_authorization_params_is_forwarded(mocker): + """organization passed via authorization_params is forwarded to /authorize.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + authorization_params={"organization": "org_via_dict"}, + ) + ) + + parsed = parse_qs(urlparse(url).query) + assert parsed["organization"] == ["org_via_dict"] + + +@pytest.mark.asyncio +async def test_adv_invitation_in_authorization_params_is_forwarded(mocker): + """invitation passed via authorization_params is forwarded to /authorize.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + authorization_params={"invitation": "inv_via_dict"}, + ) + ) + + parsed = parse_qs(urlparse(url).query) + assert parsed["invitation"] == ["inv_via_dict"] + + +@pytest.mark.asyncio +async def test_adv_typed_invitation_wins_over_dict(mocker): + """Typed invitation field wins when both typed and dict values are present.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + invitation="inv_typed", + authorization_params={"invitation": "inv_via_dict"}, + ) + ) + + parsed = parse_qs(urlparse(url).query) + assert parsed["invitation"] == ["inv_typed"] + + +@pytest.mark.asyncio +async def test_adv_typed_org_wins_over_dict_injection(mocker): + """Typed organization field wins when both typed and dict values are present.""" + mock_tx_store = AsyncMock() + mock_state_store = AsyncMock() + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + url = await client.start_interactive_login( + StartInteractiveLoginOptions( + organization="org_legitimate", + authorization_params={"organization": "org_attacker"}, + ) + ) + + # TransactionData stores the typed value + stored = mock_tx_store.set.call_args[0][1] + assert stored.organization == "org_legitimate" + + # URL must contain only the typed value + parsed = parse_qs(urlparse(url).query) + org_values = parsed.get("organization", []) + assert org_values == ["org_legitimate"] + + +@pytest.mark.asyncio +async def test_adv_unicode_nfc_nfd_org_name_matches(mocker): + """ADV-005: NFC and NFD representations of the same org name are treated as equal.""" + # "café" NFC: é is U+00E9 (single precomposed codepoint) + # "café" NFD: é is U+0065 U+0301 (base letter + combining accent) + nfc_name = unicodedata.normalize("NFC", "café") + nfd_name = unicodedata.normalize("NFD", "café") + assert nfc_name != nfd_name, "precondition: NFC and NFD byte sequences differ" + + client = _make_org_client( + mocker, + TransactionData(code_verifier="cv", domain="tenant.auth0.com", organization=nfd_name), + ) + mocker.patch("jwt.decode", return_value={ + "sub": "u1", "iss": "https://tenant.auth0.com/", "aud": "test_client", + "org_name": nfc_name, + }) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result is not None + + +# Error class properties + +def test_organization_token_validation_error_code(): + """OrganizationTokenValidationError has the correct code and message.""" + err = OrganizationTokenValidationError("test message") + assert err.code == "organization_token_validation_error" + assert err.name == "OrganizationTokenValidationError" + assert str(err) == "test message" + + + +@pytest.mark.asyncio +async def test_org_userinfo_path_matching_org_id_succeeds(mocker): + """userinfo response with matching org_id passes validation.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123" + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + # Token response returns userinfo (no id_token) + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + "userinfo": {"sub": "u1", "org_id": "org_abc123"}, + })) + result = await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert result["state_data"]["user"]["org_id"] == "org_abc123" + + +@pytest.mark.asyncio +async def test_org_userinfo_path_wrong_org_id_raises(mocker): + """userinfo response with wrong org_id raises OrganizationTokenValidationError.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123" + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + "userinfo": {"sub": "u1", "org_id": "org_different"}, + })) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "mismatch" in str(exc.value) + + +@pytest.mark.asyncio +async def test_org_userinfo_path_missing_org_id_raises(mocker): + """userinfo response missing org_id raises OrganizationTokenValidationError.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123" + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + "userinfo": {"sub": "u1"}, # no org_id + })) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "must be a string present" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_requested_no_userinfo_no_id_token_fails_closed(mocker): + """org was requested but token response has neither user_info nor id_token — fails closed.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123" + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + # neither user_info nor id_token + })) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "neither" in str(exc.value) + + + +@pytest.mark.asyncio +async def test_org_userinfo_non_dict_raises_organization_error(mocker): + """Non-dict truthy userinfo raises OrganizationTokenValidationError when org is requested.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc123" + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + # Return a string (truthy, but not a dict) as userinfo + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + "userinfo": "not-a-dict", + })) + with pytest.raises(OrganizationTokenValidationError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert "valid claims dictionary" in str(exc.value) + + +@pytest.mark.asyncio +async def test_userinfo_non_dict_no_org_raises_api_error(mocker): + """Non-dict truthy userinfo without org requested raises ApiError (not OrganizationTokenValidationError).""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", + ) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = None + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + mocker.patch.object( + client, "_get_oidc_metadata_cached", + return_value={ + "issuer": "https://tenant.auth0.com/", + "token_endpoint": "https://tenant.auth0.com/token", + } + ) + mocker.patch.object(client._oauth, "fetch_token", AsyncMock(return_value={ + "access_token": "at123", + "userinfo": "not-a-dict", + })) + with pytest.raises(ApiError) as exc: + await client.complete_interactive_login("http://localhost/cb?code=abc&state=xyz") + assert exc.value.code == "invalid_response" + assert "valid claims dictionary" in str(exc.value) + + +# --------------------------------------------------------------------------- +# complete_interactive_login — org errors raised as ApiError +# --------------------------------------------------------------------------- + +def _make_org_callback_client(mock_tx_store, mock_state_store, org=None): + return ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + secret="test_secret_key_32_chars_long!!", + organization=org, + transaction_store=mock_tx_store, + state_store=mock_state_store, + ) + + +@pytest.mark.asyncio +async def test_callback_org_access_denied_raises_api_error(): + """access_denied from org membership check → ApiError with original code and description.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc" + ) + client = _make_org_callback_client(mock_tx_store, AsyncMock(), org="org_abc") + desc = "user u1 is not part of the org_abc organization" + url = f"http://localhost/cb?state=xyz&error=access_denied&error_description={desc}" + with pytest.raises(ApiError) as exc: + await client.complete_interactive_login(url) + assert exc.value.code == "access_denied" + assert desc in exc.value.message + + +@pytest.mark.asyncio +async def test_callback_org_invalid_format_raises_api_error(): + """invalid_request with bad org format → ApiError with original code and description.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="my-org" + ) + client = _make_org_callback_client(mock_tx_store, AsyncMock()) + desc = "authorization request parameter organization must be an organization id" + url = f"http://localhost/cb?state=xyz&error=invalid_request&error_description={desc}" + with pytest.raises(ApiError) as exc: + await client.complete_interactive_login(url) + assert exc.value.code == "invalid_request" + assert desc in exc.value.message + + +@pytest.mark.asyncio +async def test_callback_invitation_error_raises_api_error(): + """Expired/invalid invitation ticket → ApiError with original code and description.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", organization="org_abc" + ) + client = _make_org_callback_client(mock_tx_store, AsyncMock(), org="org_abc") + desc = "invalid_user_invitation_ticket: ticket has already been used" + url = f"http://localhost/cb?state=xyz&error=invalid_request&error_description={desc}" + with pytest.raises(ApiError) as exc: + await client.complete_interactive_login(url) + assert exc.value.code == "invalid_request" + assert desc in exc.value.message + + +@pytest.mark.asyncio +async def test_callback_error_raises_api_error(): + """Any auth error → ApiError preserving the raw error code and description.""" + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="cv", domain="tenant.auth0.com", + ) + client = _make_org_callback_client(mock_tx_store, AsyncMock()) + url = "http://localhost/cb?state=xyz&error=access_denied&error_description=User+cancelled" + with pytest.raises(ApiError) as exc: + await client.complete_interactive_login(url) + assert type(exc.value) is ApiError + assert exc.value.code == "access_denied" + + +# --------------------------------------------------------------------------- +# Org resolution — per-login vs client-level precedence +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_per_login_org_overrides_client_default(mocker): + """ + A per-login org value overrides the client-level default. + Both paths end up in TransactionData — this is the multi-org scenario regression guard. + """ + mock_tx_store = AsyncMock() + stored_tx = None + + async def capture_set(key, value, options=None): + nonlocal stored_tx + stored_tx = value + + mock_tx_store.set.side_effect = capture_set + + client = ServerClient( + domain="tenant.auth0.com", + client_id="cid", + client_secret="csec", + secret="test_secret_key_32_chars_long!!", + organization="org_default", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=AsyncMock(), + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + mocker.patch.object(client._oauth, "create_authorization_url", + return_value=("https://tenant.auth0.com/authorize?state=x", "x")) + + await client.start_interactive_login( + StartInteractiveLoginOptions(organization="org_override") + ) + + assert stored_tx.organization == "org_override" + + +@pytest.mark.asyncio +async def test_blank_org_raises_invalid_argument_error(mocker): + """Whitespace-only organization value is rejected with InvalidArgumentError.""" + client = ServerClient( + domain="tenant.auth0.com", + client_id="cid", + client_secret="csec", + secret="test_secret_key_32_chars_long!!", + redirect_uri="https://app.example.com/callback", + transaction_store=AsyncMock(), + state_store=AsyncMock(), + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + + with pytest.raises(InvalidArgumentError) as exc: + await client.start_interactive_login( + StartInteractiveLoginOptions(organization=" ") + ) + assert "organization" in exc.value.argument + + +@pytest.mark.asyncio +async def test_client_level_org_used_when_options_org_is_none_not_set(mocker): + """ + When StartInteractiveLoginOptions does not set organization (defaults to None), + the client-level default is used — same as before the fix. + """ + mock_tx_store = AsyncMock() + stored_tx = None + + async def capture_set(key, value, options=None): + nonlocal stored_tx + stored_tx = value + + mock_tx_store.set.side_effect = capture_set + + client = ServerClient( + domain="tenant.auth0.com", + client_id="cid", + client_secret="csec", + secret="test_secret_key_32_chars_long!!", + organization="org_default", + redirect_uri="https://app.example.com/callback", + transaction_store=mock_tx_store, + state_store=AsyncMock(), + ) + mocker.patch.object(client, "_get_oidc_metadata_cached", return_value={ + "issuer": "https://tenant.auth0.com/", + "authorization_endpoint": "https://tenant.auth0.com/authorize", + }) + mocker.patch.object(client._oauth, "create_authorization_url", + return_value=("https://tenant.auth0.com/authorize?state=x", "x")) + + await client.start_interactive_login(StartInteractiveLoginOptions()) + + assert stored_tx.organization == "org_default" diff --git a/src/auth0_server_python/utils/helpers.py b/src/auth0_server_python/utils/helpers.py index 05cb0f8..1fff76a 100644 --- a/src/auth0_server_python/utils/helpers.py +++ b/src/auth0_server_python/utils/helpers.py @@ -3,11 +3,12 @@ import secrets import string import time +import unicodedata from typing import Any, Optional from urllib.parse import parse_qs, urlencode, urlparse from auth0_server_python.auth_types import DomainResolverContext -from auth0_server_python.error import DomainResolverError +from auth0_server_python.error import DomainResolverError, OrganizationTokenValidationError class PKCE: @@ -293,3 +294,43 @@ def validate_resolved_domain_value(domain_value: Any) -> str: ) return domain_value + + +# ============================================================================= +# Claim Validation Utilities +# ============================================================================= + +def validate_org_claims(claims: dict, expected_org: str) -> None: + """ + Validate org_id or org_name in token claims against the requested organization. + + Uses expected_org prefix to determine which claim to check: + - 'org_' prefix → validate claims['org_id'] exact match (case-sensitive) + - no prefix → validate claims['org_name'] case-insensitive match (NFC-normalized) + + Raises: + OrganizationTokenValidationError: if the claim is missing, not a string, or mismatched. + """ + if expected_org.startswith("org_"): + actual = claims.get("org_id") + if not isinstance(actual, str): + raise OrganizationTokenValidationError( + "Organization Id (org_id) claim must be a string present in the ID token" + ) + if actual != expected_org: + raise OrganizationTokenValidationError( + "Organization Id (org_id) claim value mismatch in the ID token" + ) + else: + actual = claims.get("org_name") + if not isinstance(actual, str): + raise OrganizationTokenValidationError( + "Organization Name (org_name) claim must be a string present in the ID token" + ) + # NFC-normalize before comparison: the same visual character (e.g. é) can have + # multiple byte representations in Unicode. Normalizing both sides prevents + # false rejections without risk of false matches. + if unicodedata.normalize("NFC", actual).lower() != unicodedata.normalize("NFC", expected_org).lower(): + raise OrganizationTokenValidationError( + "Organization Name (org_name) claim value mismatch in the ID token" + )