Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ The SDK handles per-domain OIDC discovery, JWKS fetching, issuer validation, and

For more details and examples, see [examples/MultipleCustomDomains.md](examples/MultipleCustomDomains.md).

### 6. Session Expiry from the Upstream IdP

For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. If the asserted ceiling is already in the past at login, `complete_interactive_login()` raises a `SessionExpiredError` instead of persisting an already-expired session.

For more details and examples, see [examples/RetrievingData.md](examples/RetrievingData.md#session-expiry-from-the-upstream-idp).

## Feedback

### Contributing
Expand Down
59 changes: 59 additions & 0 deletions examples/RetrievingData.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,65 @@ access_token = await server_client.get_access_token(store_options=store_options)

Read more above in [Configuring the Store](./ConfigureStore.md).

## Session Expiry from the Upstream IdP

For enterprise connections, the upstream identity provider can impose a ceiling on how long the user's session may live. This ceiling is delivered to the SDK as a `session_expiry` claim (an absolute Unix timestamp, **in seconds**) on the ID token. The SDK reads this value at login, stores it with the session, and enforces it on every subsequent read.

### Emitting the claim

The `session_expiry` claim is set by a Post-Login Action on your tenant, and **must** be an absolute Unix timestamp in **seconds**, not milliseconds. For the canonical Action setup, see the [Auth0 documentation](https://auth0.com/docs) (will be adding the link to the session_expiry Action guide once published).

> [!WARNING]
> `session_expiry` is interpreted as **seconds** since the Unix epoch (per RFC 7519 `NumericDate`). A common mistake is emitting milliseconds (e.g. `getTime()` without `/ 1000`). The SDK rejects implausibly large values (anything at or above `10_000_000_000`, β‰ˆ year 2286) as malformed and treats them as **no ceiling**, so a milliseconds value will silently disable enforcement rather than expiring the session ~55,000 years from now. Always divide by 1000.
>
> Because the claim is authored by your Action (untrusted input), the SDK **fails open** on any malformed value β€” a non-numeric, zero, negative, boolean, or millisecond value is treated as "no ceiling" and login proceeds normally. Only a clean, future, seconds timestamp is enforced.

Once the ceiling is reached, the read methods behave as follows:

- `get_user()` returns `None`, as if no session exists.
- `get_session()` returns `None`, as if no session exists.
- `get_access_token()` raises an `AccessTokenError` with code `session_expired`.

`get_access_token_for_connection()` (Token Vault) is **not** gated by the session ceiling β€” connection tokens follow the upstream IdP's own `expires_in`, so they remain retrievable from cache even after the session ceiling has passed.

```python
from auth0_server_python.error import AccessTokenError, AccessTokenErrorCode

try:
access_token = await server_client.get_access_token(store_options=store_options)
except AccessTokenError as error:
if error.code == AccessTokenErrorCode.SESSION_EXPIRED:
# The upstream session ceiling has been reached; start a new login.
...
```

When the ceiling is reached, the SDK deletes the stored session before returning, so the next request starts clean.

If the upstream IdP asserts a ceiling that is already in the past at login time, `complete_interactive_login()` raises a `SessionExpiredError` rather than persisting an already-expired session:

```python
from auth0_server_python.error import SessionExpiredError

try:
await server_client.complete_interactive_login(url, store_options=store_options)
except SessionExpiredError:
# The session was already past its ceiling on arrival; start a new login.
...
```

> [!NOTE]
> **Upgrading:** with this feature enabled, `get_user()` and `get_session()` can return `None` for a user who was previously logged in, once the upstream ceiling passes. Applications that assumed these always return a value after login should add a null check and route the user back through login.

The `session_expiry` value is also surfaced through the user claims, so you can read it without triggering enforcement:

```python
user = await server_client.get_user(store_options=store_options)
session_expires_at = (user or {}).get("session_expiry")
```

> [!NOTE]
> Enforcement applies a small negative leeway (30 seconds) to account for clock skew, so a session is treated as expired slightly before the exact `session_expiry` timestamp. The refresh-token grant preserves the original ceiling - refreshing an access token does not extend the upstream session.

## Multi-Resource Refresh Tokens (MRRT)

Multi-Resource Refresh Tokens allow using a single refresh token to obtain access tokens for multiple audiences, simplifying token management in applications that interact with multiple backend services.
Expand Down
47 changes: 46 additions & 1 deletion src/auth0_server_python/auth_server/server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
MissingTransactionError,
OrganizationTokenValidationError,
PollingApiError,
SessionExpiredError,
StartLinkUserError,
)
from auth0_server_python.telemetry import Telemetry
Expand Down Expand Up @@ -656,7 +657,12 @@ async def complete_interactive_login(
# Use the userinfo field from the token_response for user claims
user_info = token_response.get("userinfo")
user_claims = None
# IPSIE session_expiry ceiling, read from the verified ID token claims.
session_expires_at = None
# ID token `iat`, used to detect a ceiling that is already past at login.
issued_at = None
id_token = token_response.get("id_token")

expected_org = transaction_data.organization

if not user_info and not id_token and expected_org:
Expand Down Expand Up @@ -698,6 +704,8 @@ async def complete_interactive_login(
validate_org_claims(claims, expected_org)

user_claims = UserClaims.parse_obj(claims)
session_expires_at = user_claims.session_expiry
issued_at = claims.get("iat")
except ValueError as e:
raise ApiError("jwks_key_not_found", str(e))
except jwt.InvalidSignatureError as e:
Expand Down Expand Up @@ -726,6 +734,11 @@ async def complete_interactive_login(
)


# Refuse to persist a session whose ceiling is already in the past.
if State.is_session_ceiling_in_past(session_expires_at, issued_at):
await self._transaction_store.delete(transaction_identifier, options=store_options)
raise SessionExpiredError()

# Build a token set using the token response data
token_set = TokenSet(
audience=transaction_data.audience or self.DEFAULT_AUDIENCE_STATE_KEY,
Expand All @@ -749,7 +762,8 @@ async def complete_interactive_login(
domain=origin_domain,
internal={
"sid": sid,
"created_at": int(time.time())
"created_at": int(time.time()),
"session_expires_at": session_expires_at
}
)

Expand All @@ -775,6 +789,23 @@ async def complete_interactive_login(
# Methods for retrieving user information, session data, and logout operations.
# ============================================================================

async def _is_session_expired_by_ceiling(
self, state_data_dict: dict, store_options: Optional[dict[str, Any]] = None
) -> bool:
"""
Enforce the IPSIE session_expiry ceiling on a session read.

Returns True (and deletes the stored session) when the upstream
IdP-asserted ceiling has been reached. Sessions without a
session_expires_at value are never expired on this basis.
"""
internal = state_data_dict.get("internal") or {}
session_expires_at = internal.get("session_expires_at")
if State.is_session_ceiling_reached(session_expires_at):
await self._state_store.delete(self._state_identifier, options=store_options)
return True
return False

async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Optional[dict[str, Any]]:
"""
Retrieves the user from the store, or None if no user found.
Expand All @@ -801,6 +832,10 @@ async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Opti
if self._normalize_url(session_domain) != self._normalize_url(current_domain):
return None

# IPSIE: force re-auth once the upstream IdP session ceiling passes.
if await self._is_session_expired_by_ceiling(state_data, store_options):
return None

return state_data.get("user")
return None

Expand Down Expand Up @@ -830,6 +865,10 @@ async def get_session(self, store_options: Optional[dict[str, Any]] = None) -> O
if self._normalize_url(session_domain) != self._normalize_url(current_domain):
return None

# IPSIE: force re-auth once the upstream IdP session ceiling passes.
if await self._is_session_expired_by_ceiling(state_data, store_options):
return None

session_data = {k: v for k, v in state_data.items()
if k != "internal"}
return session_data
Expand Down Expand Up @@ -1013,6 +1052,12 @@ async def get_access_token(

merged_scope = self._merge_scope_with_defaults(scope, audience)

# Once the session ceiling has passed, fail instead of serving or refreshing a token.
internal = (state_data_dict or {}).get("internal") or {}
if State.is_session_ceiling_reached(internal.get("session_expires_at")):
await self._state_store.delete(self._state_identifier, options=store_options)
raise SessionExpiredError()

# Find matching token set
token_set = None
if state_data_dict and "token_sets" in state_data_dict:
Expand Down
20 changes: 19 additions & 1 deletion src/auth0_server_python/auth_types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

from typing import Any, Literal, Optional, Union

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator

# Upper bound (Unix seconds) for a plausible session_expiry
SESSION_EXPIRY_MAX_PLAUSIBLE = 10_000_000_000


class UserClaims(BaseModel):
Expand All @@ -23,10 +26,21 @@ class UserClaims(BaseModel):
email_verified: Optional[bool] = None
org_id: Optional[str] = None
org_name: Optional[str] = None
# IPSIE SL1 claim: upstream IdP-asserted RP session ceiling (Unix seconds).
session_expiry: Optional[int] = None

class Config:
extra = "allow" # Allow additional fields not defined in the model

@field_validator('session_expiry', mode='before')
@classmethod
def _sanitize_session_expiry(cls, value: Any) -> Optional[int]:
if isinstance(value, bool) or not isinstance(value, int):
return None
if value <= 0 or value >= SESSION_EXPIRY_MAX_PLAUSIBLE:
return None
return value


class TokenSet(BaseModel):
"""
Expand Down Expand Up @@ -55,6 +69,10 @@ class InternalStateData(BaseModel):
"""
sid: str
created_at: int
# IPSIE session_expiry ceiling (Unix seconds), stamped at session creation
# from the ID token's session_expiry claim. None when the upstream IdP did
# not assert one β€” in which case existing session behavior is unchanged.
session_expires_at: Optional[int] = None


class SessionData(BaseModel):
Expand Down
14 changes: 14 additions & 0 deletions src/auth0_server_python/error/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class AccessTokenErrorCode:
INCORRECT_AUDIENCE = "incorrect_audience"
MISSING_SESSION_DOMAIN = "missing_session_domain"
DOMAIN_MISMATCH = "domain_mismatch"
SESSION_EXPIRED = "session_expired"


class OrganizationTokenValidationError(Auth0Error):
Expand All @@ -222,6 +223,19 @@ class AccessTokenForConnectionErrorCode:
DOMAIN_MISMATCH = "domain_mismatch"


class SessionExpiredError(Auth0Error):
"""
Error raised when a session is rejected at login because its
session_expiry ceiling is already in the past.
"""
code = AccessTokenErrorCode.SESSION_EXPIRED

def __init__(self, message: Optional[str] = None, cause=None):
super().__init__(message or "The session has expired and the user must re-authenticate.")
self.name = "SessionExpiredError"
self.cause = cause


class CustomTokenExchangeError(Auth0Error):
"""
Error raised during custom token exchange operations.
Expand Down
Loading
Loading