-
Notifications
You must be signed in to change notification settings - Fork 3
PoC: enforce upstream IdP session_expiry ceiling #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
aaad5b1
ec4c45b
a1c5d5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,6 +55,7 @@ | |
| MissingRequiredArgumentError, | ||
| MissingTransactionError, | ||
| PollingApiError, | ||
| SessionExpiredError, | ||
| StartLinkUserError, | ||
| ) | ||
| from auth0_server_python.telemetry import Telemetry | ||
|
|
@@ -638,9 +639,17 @@ async def complete_interactive_login( | |
| user_info = token_response.get("userinfo") | ||
| user_claims = None | ||
| id_token = token_response.get("id_token") | ||
| # IPSIE session_expiry ceiling, read from the verified ID token claims. | ||
| session_expires_at = None | ||
| # ID token `iat`, used to detect a ceiling that is already past at login. | ||
| issued_at = None | ||
|
|
||
| if user_info: | ||
| user_claims = UserClaims.parse_obj(user_info) | ||
| # authlib populates `userinfo` from parsed ID token claims, so the | ||
| # IPSIE session_expiry claim may surface here. | ||
| session_expires_at = user_info.get("session_expiry") | ||
| issued_at = user_info.get("iat") | ||
| elif id_token: | ||
| # Fetch JWKS for signature verification | ||
| jwks = await self._get_jwks_cached(origin_domain, metadata) | ||
|
|
@@ -657,6 +666,9 @@ async def complete_interactive_login( | |
| raise IssuerValidationError("ID token issuer mismatch. Ensure your Auth0 domain is configured correctly.") | ||
|
|
||
| user_claims = UserClaims.parse_obj(claims) | ||
| # IPSIE session_expiry ceiling from the verified ID token. | ||
| session_expires_at = claims.get("session_expiry") | ||
| issued_at = claims.get("iat") | ||
| except ValueError as e: | ||
| raise ApiError("jwks_key_not_found", str(e)) | ||
| except jwt.InvalidSignatureError as e: | ||
|
|
@@ -685,6 +697,10 @@ async def complete_interactive_login( | |
| ) | ||
|
|
||
|
|
||
| # Refuse to persist a session whose ceiling is already in the past. | ||
| if State.is_session_ceiling_in_past(session_expires_at, issued_at): | ||
| raise SessionExpiredError() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we raise here we skip the transaction cleanup that happens later in the method, so the transaction record is left behind. The auth code has already been spent at |
||
|
|
||
| # Build a token set using the token response data | ||
| token_set = TokenSet( | ||
| audience=transaction_data.audience or self.DEFAULT_AUDIENCE_STATE_KEY, | ||
|
|
@@ -708,7 +724,8 @@ async def complete_interactive_login( | |
| domain=origin_domain, | ||
| internal={ | ||
| "sid": sid, | ||
| "created_at": int(time.time()) | ||
| "created_at": int(time.time()), | ||
| "session_expires_at": session_expires_at | ||
| } | ||
| ) | ||
|
|
||
|
|
@@ -734,6 +751,23 @@ async def complete_interactive_login( | |
| # Methods for retrieving user information, session data, and logout operations. | ||
| # ============================================================================ | ||
|
|
||
| async def _is_session_expired_by_ceiling( | ||
| self, state_data_dict: dict, store_options: Optional[dict[str, Any]] = None | ||
| ) -> bool: | ||
| """ | ||
| Enforce the IPSIE session_expiry ceiling on a session read. | ||
|
|
||
| Returns True (and deletes the stored session) when the upstream | ||
| IdP-asserted ceiling has been reached. Sessions without a | ||
| session_expires_at value are never expired on this basis. | ||
| """ | ||
| internal = state_data_dict.get("internal") or {} | ||
| session_expires_at = internal.get("session_expires_at") | ||
| if State.is_session_ceiling_reached(session_expires_at): | ||
| await self._state_store.delete(self._state_identifier, options=store_options) | ||
| return True | ||
| return False | ||
|
|
||
| async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Optional[dict[str, Any]]: | ||
| """ | ||
| Retrieves the user from the store, or None if no user found. | ||
|
|
@@ -760,6 +794,10 @@ async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Opti | |
| if self._normalize_url(session_domain) != self._normalize_url(current_domain): | ||
| return None | ||
|
|
||
| # IPSIE: force re-auth once the upstream IdP session ceiling passes. | ||
| if await self._is_session_expired_by_ceiling(state_data, store_options): | ||
| return None | ||
|
|
||
| return state_data.get("user") | ||
| return None | ||
|
|
||
|
|
@@ -789,6 +827,10 @@ async def get_session(self, store_options: Optional[dict[str, Any]] = None) -> O | |
| if self._normalize_url(session_domain) != self._normalize_url(current_domain): | ||
| return None | ||
|
|
||
| # IPSIE: force re-auth once the upstream IdP session ceiling passes. | ||
| if await self._is_session_expired_by_ceiling(state_data, store_options): | ||
| return None | ||
|
|
||
| session_data = {k: v for k, v in state_data.items() | ||
| if k != "internal"} | ||
| return session_data | ||
|
|
@@ -972,6 +1014,15 @@ async def get_access_token( | |
|
|
||
| merged_scope = self._merge_scope_with_defaults(scope, audience) | ||
|
|
||
| # Once the session ceiling has passed, fail instead of serving or refreshing a token. | ||
| internal = (state_data_dict or {}).get("internal") or {} | ||
| if State.is_session_ceiling_reached(internal.get("session_expires_at")): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same ceiling check plus session delete that |
||
| await self._state_store.delete(self._state_identifier, options=store_options) | ||
| raise AccessTokenError( | ||
| AccessTokenErrorCode.SESSION_EXPIRED, | ||
| "The session has expired and the user must re-authenticate." | ||
| ) | ||
|
|
||
| # Find matching token set | ||
| token_set = None | ||
| if state_data_dict and "token_sets" in state_data_dict: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to make sure we are reading
session_expiryfrom a trusted source here. In theid_tokenbranch below we verify the signature and run the normalized issuer check before we trust any claim. On thisuserinfobranch we readsession_expiryandiatstraight off the dict without either of those checks. Since session_expiry is the thing that caps the session, trusting it from an unverified source feels risky. The comment saysauthlibfillsuserinfofrom the parsed ID token, so doesauthlibactually verify that token's signature for us in every flow we support ? If we are not sure, I would rather pullsession_expiryfrom the verified claims path so both branches are equally safe.