Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ RUN pip install --no-cache-dir \
opentelemetry-instrumentation-httpx==0.50b0 \
opentelemetry-instrumentation-redis==0.50b0 \
python-magic==0.4.27 \
pyotp==2.9.0 \
twilio==9.10.5 \
"audioop-lts; python_version >= '3.13'"

Expand Down
1 change: 1 addition & 0 deletions docs/memory/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ Open-core seam: enterprise backend code lives in the private `trinity-enterprise
| `audit` | (#941) | Entitlement only — flips the OSS audit-log dashboard route visible; `/api/audit-log/*` stays OSS |
| `user_management` | `enterprise/backend/user_management/` (#995) | Org lifecycle: invite (whitelist + email), deactivate/reactivate (over the OSS `users.suspended_at` primitive), per-user activity view (reads OSS `audit_log`). `/api/enterprise/user-management/*`; Settings → User Management UI |
| `siem` | `enterprise/backend/siem/` (#997) | SIEM log export — ships OSS `audit_log` to a customer SIEM over HTTP/JSON webhook. Private `enterprise_siem_config` (destination + AES-encrypted token + export cursor); Redis-lock-serialised background pusher; at-least-once (cursor advances only on successful POST). `/api/enterprise/siem/*`; no OSS/UI surface |
| `2fa` | `enterprise/backend/two_factor/` (#5) | Two-factor auth via **TOTP** (RFC 6238; Google Authenticator is one compatible app — chosen over Google-OIDC to avoid a runtime IdP dependency). Private `enterprise_user_2fa` (AES-256-GCM secret + monotonic `last_used_step` replay guard), `enterprise_2fa_recovery_codes` (single-use, SHA-256-hashed), `enterprise_2fa_config` (per-role policy). `/api/enterprise/2fa/*` — self-service enroll/confirm/disable/recovery + admin policy + the `login/*` challenge-completion endpoints. Login seam is edition-agnostic: `services/mfa_gate.py` (no provider in OSS → no-op, fail-open) + `create_mfa_challenge_token`/`decode_mfa_challenge` in `dependencies.py`; on a required second factor the OSS auth routers (`/token`, `/api/auth/email/verify`) return a short-lived challenge token instead of an access token. Settings → Security UI |

---

Expand Down
51 changes: 51 additions & 0 deletions src/backend/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,47 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None, m
return encoded_jwt


# Scope marker for the short-lived token issued between password/email
# verification and second-factor completion (enterprise 2FA, #5). A token
# carrying this scope is NOT a valid access token — it only authorizes the
# /api/enterprise/2fa/login/* endpoints.
MFA_CHALLENGE_SCOPE = "mfa_challenge"
MFA_CHALLENGE_EXPIRE_MINUTES = 5


def create_mfa_challenge_token(username: str, mode: str = "prod") -> str:
"""Mint a short-lived challenge token binding a half-authenticated session
to its eventual login ``mode``. Generic (OSS) — the enterprise module
decides *whether* to require it; this only encodes it. The carried ``mode``
is replayed into the final access token so admin/email tokens keep their
original mode after the second factor."""
return create_access_token(
data={"sub": username, "scope": MFA_CHALLENGE_SCOPE},
expires_delta=timedelta(minutes=MFA_CHALLENGE_EXPIRE_MINUTES),
mode=mode,
)


def decode_mfa_challenge(token: str) -> Optional[dict]:
"""Validate a challenge token. Returns ``{"username", "mode"}`` if the
token is a non-expired challenge token for an existing, non-suspended
user; ``None`` otherwise. Used by the enterprise 2FA login endpoints to
resolve the half-authenticated identity before minting the real token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
return None
if payload.get("scope") != MFA_CHALLENGE_SCOPE:
return None
username = payload.get("sub")
if not username:
return None
user = db.get_user_by_username(username)
if not user or user.get("suspended_at"):
return None
return {"username": username, "mode": payload.get("mode", "prod")}


def decode_token(token: str) -> Optional[dict]:
"""
Decode a JWT token without FastAPI dependency.
Expand All @@ -82,6 +123,10 @@ def decode_token(token: str) -> Optional[dict]:
if not username:
return None

# #5 — a half-authenticated 2FA challenge token is not a session token.
if payload.get("scope") == MFA_CHALLENGE_SCOPE:
return None

# Get full user record from database
user = db.get_user_by_username(username)
if not user:
Expand Down Expand Up @@ -117,6 +162,12 @@ async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)
if username is None:
raise credentials_exception

# #5 — reject a 2FA challenge token used as a session token. It only
# authorizes /api/enterprise/2fa/login/*; the second factor must be
# completed there to obtain a real access token.
if payload.get("scope") == MFA_CHALLENGE_SCOPE:
raise credentials_exception

user = db.get_user_by_username(username)
if user is None:
raise credentials_exception
Expand Down
17 changes: 14 additions & 3 deletions src/backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,20 @@ class User(BaseModel):


class Token(BaseModel):
"""JWT token response."""
access_token: str
token_type: str
"""JWT token response.

Normally carries ``access_token``. When enterprise 2FA (#5) requires a
second factor, the login endpoint instead returns ``mfa_required`` +
``challenge_token`` and no ``access_token`` — the client completes the
flow at ``/api/enterprise/2fa/login/*`` to obtain the real token. The 2FA
fields are always absent in OSS-only builds.
"""
access_token: Optional[str] = None
token_type: str = "bearer"
mfa_required: Optional[bool] = None
mfa_enrolled: Optional[bool] = None
enrollment_required: Optional[bool] = None
challenge_token: Optional[str] = None


class ChatMessageRequest(BaseModel):
Expand Down
39 changes: 39 additions & 0 deletions src/backend/routers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,26 @@ async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends
# Update last login timestamp
db.update_last_login(user["username"])

# #5 — enterprise 2FA gate. Password is the first factor; if a second
# factor is required (user enrolled OR policy mandates it for the role)
# return a challenge instead of an access token. OSS-only builds have no
# provider registered → returns None → unchanged behaviour.
from services import mfa_gate
challenge = mfa_gate.gate_login(user, mode="admin")
if challenge:
await platform_audit_service.log(
event_type=AuditEventType.AUTHENTICATION,
event_action="mfa_challenge_issued",
source="api",
actor_ip=client_ip,
target_type="user",
target_id=user["username"],
endpoint=str(request.url.path),
request_id=getattr(request.state, "request_id", None),
details={"method": "admin"},
)
return challenge

access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["username"]},
Expand Down Expand Up @@ -506,6 +526,25 @@ async def verify_email_login_code(request: Request):
# Update last login
db.update_last_login(user["username"])

# #5 — enterprise 2FA gate. The verified email code is the first factor;
# if a second factor is required, return a challenge instead of a token.
# OSS-only builds have no provider → returns None → unchanged behaviour.
from services import mfa_gate
challenge = mfa_gate.gate_login(user, mode="email")
if challenge:
await platform_audit_service.log(
event_type=AuditEventType.AUTHENTICATION,
event_action="mfa_challenge_issued",
source="api",
actor_ip=client_ip,
target_type="user",
target_id=user["username"],
endpoint=str(request.url.path),
request_id=getattr(request.state, "request_id", None),
details={"method": "email", "email": email},
)
return challenge

# Create JWT token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
Expand Down
90 changes: 90 additions & 0 deletions src/backend/services/mfa_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""MFA gate — the OSS seam the enterprise 2FA module (#5) plugs into.

The login path (``routers/auth.py``) must stay edition-agnostic: OSS-only
builds have no second factor and behave exactly as before. This module is the
single hook point. An enterprise module registers a *provider* at startup; the
auth routers call :func:`gate_login` after primary credentials are verified.

* OSS-only build → no provider registered → :func:`gate_login` returns ``None``
→ the router issues the JWT normally. Zero behavioural change.
* Enterprise build with 2FA entitled → the provider decides whether this user
must complete a second factor; if so, :func:`gate_login` returns a *challenge
response* (a short-lived challenge token + flags) and the router returns that
instead of an access token. The frontend then completes the flow against
``/api/enterprise/2fa/login/*``, which mints the real access token.

The provider holds all the IP (TOTP verify, policy). This module only knows
the *protocol*:

provider.gate_decision(user: dict) -> {"enrolled": bool, "required": bool}

``user`` is the OSS user row dict (``id``, ``username``, ``role``, ``email``).
"""
from __future__ import annotations

import logging
from typing import Any, Optional, Protocol

logger = logging.getLogger(__name__)


class MfaProvider(Protocol):
def gate_decision(self, user: dict) -> dict: # {"enrolled": bool, "required": bool}
...


_provider: Optional[MfaProvider] = None


def register_provider(provider: MfaProvider) -> None:
"""Register the enterprise MFA provider. Idempotent (last wins)."""
global _provider
_provider = provider
logger.info("[mfa_gate] provider registered: %s", type(provider).__name__)


def get_provider() -> Optional[MfaProvider]:
return _provider


def clear_provider() -> None:
"""Drop the provider — used by tests to restore the OSS no-op path."""
global _provider
_provider = None


def gate_login(user: dict, mode: str) -> Optional[dict[str, Any]]:
"""Decide whether ``user`` must complete a second factor before a token
is issued. Returns ``None`` to proceed normally, or a challenge response.

Fail-open by design (consistent with Trinity's availability bias for
cross-cutting gates): if the provider errors we log and let the password
factor stand rather than locking everyone out. This is a deliberate
tradeoff documented for #5 — a hard fail-closed would turn any 2FA bug
into a full-platform login outage.
"""
provider = _provider
if provider is None:
return None # OSS-only build — no second factor
try:
decision = provider.gate_decision(user) or {}
except Exception: # noqa: BLE001 — never let a 2FA bug block all logins
logger.exception("[mfa_gate] provider.gate_decision failed; failing open")
return None

enrolled = bool(decision.get("enrolled"))
required = bool(decision.get("required"))
if not enrolled and not required:
return None # user has no 2FA and policy doesn't force it

# Late import: keeps this module importable without the full auth stack
# (e.g. isolated unit tests of the registry).
from dependencies import create_mfa_challenge_token

challenge = create_mfa_challenge_token(user["username"], mode)
return {
"mfa_required": True,
"mfa_enrolled": enrolled,
"enrollment_required": required and not enrolled,
"challenge_token": challenge,
}
1 change: 1 addition & 0 deletions src/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"mermaid": "^11.15.0",
"monaco-editor": "^0.55.1",
"pinia": "^3.0.4",
"qrcode": "^1.5.4",
"uplot": "^1.6.32",
"vue": "^3.5.35",
"vue-chartjs": "^5.3.2",
Expand Down
50 changes: 50 additions & 0 deletions src/frontend/src/components/QrCode.vue
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<template>
<div class="flex flex-col items-center">
<img
v-if="dataUrl"
:src="dataUrl"
:alt="alt"
class="rounded-lg border border-gray-200 dark:border-gray-700 bg-white p-2"
width="200"
height="200"
/>
<div
v-else
class="w-[200px] h-[200px] flex items-center justify-center rounded-lg border border-dashed border-gray-300 dark:border-gray-600 text-xs text-gray-500 dark:text-gray-400 text-center px-3"
>
{{ error ? 'QR unavailable — use the manual code below' : 'Generating QR…' }}
</div>
</div>
</template>

<script setup>
// Renders a QR for a TOTP otpauth:// URI (#5). The `qrcode` package is
// imported dynamically and best-effort: if it isn't installed yet the
// component degrades to a hint and the caller's manual-entry secret carries
// the enrollment. The secret never leaves the browser — no external QR service.
import { ref, watch, onMounted } from 'vue'

const props = defineProps({
value: { type: String, required: true },
alt: { type: String, default: 'Authenticator QR code' },
})

const dataUrl = ref('')
const error = ref(false)

const render = async () => {
dataUrl.value = ''
error.value = false
if (!props.value) return
try {
const QR = await import('qrcode')
dataUrl.value = await (QR.default || QR).toDataURL(props.value, { width: 200, margin: 1 })
} catch (e) {
console.warn('[QrCode] render failed:', e?.message || e)
error.value = true
}
}

onMounted(render)
watch(() => props.value, render)
</script>
Loading
Loading