diff --git a/docs/jwt_middleware.md b/docs/jwt_middleware.md new file mode 100644 index 0000000..bca9ed9 --- /dev/null +++ b/docs/jwt_middleware.md @@ -0,0 +1,321 @@ +# JWT Middleware & Passive Authentication + +This documentation covers the JWT middleware system and passive authentication classes for Django applications. + +## Overview + +The JWT middleware system provides stateless token authentication for Django applications using `ninja-jwt`. It supports: + +- **Header-based JWT authentication** (`Authorization: Bearer `) +- **Cookie-based JWT authentication** with CSRF protection +- **Development-only basic auth** (no password required) +- **Passive authentication** for Ninja API views + +## Middleware Classes + +### JWTAuthBaseMiddleware + +Abstract base class for JWT authentication middleware. + +```python +from oxutils.jwt.middleware import JWTAuthBaseMiddleware +``` + +**Features:** +- Stateless authentication (no database lookup) +- Token validation using `ninja-jwt` +- Automatic user assignment to `request.user` +- Structured logging with `structlog` +- Skips authentication if user is already authenticated (middleware chaining) + +**Abstract Methods:** +- `get_token_from_request(request)` - Must be implemented by subclasses + +### JWTHeaderAuthMiddleware + +Extracts and validates JWT tokens from the `Authorization` header. + +```python +# settings.py +MIDDLEWARE = [ + # ... other middlewares + 'oxutils.jwt.middleware.JWTHeaderAuthMiddleware', +] +``` + +**Configuration:** +- `openapi_scheme`: `"bearer"` (default) +- `header`: `"Authorization"` (default) + +**Usage:** +```http +Authorization: Bearer +``` + +**Validation:** +- Checks for `Bearer` scheme (case-insensitive) +- Validates JWT format (3 parts separated by dots) +- Returns `None` if header is missing or invalid + +### JWTCookieAuthMiddleware + +Extracts and validates JWT tokens from cookies with CSRF protection. + +```python +# settings.py +MIDDLEWARE = [ + # ... other middlewares + 'oxutils.jwt.middleware.JWTCookieAuthMiddleware', +] +``` + +**Configuration:** +- `param_name`: Cookie name (default: `access_token` from `ACCESS_TOKEN_COOKIE` constant) + +**Features:** +- CSRF validation via `check_csrf()` +- Raises `PermissionDenied` if CSRF check fails +- Sets `AnonymousUser` on CSRF failure (does not block request) + +**Cookie Structure:** +```python +# Cookie name: "access_token" (configurable) +# Value: +``` + +### BasicNoPasswordAuthMiddleware + +⚠️ **DEVELOPMENT ONLY** - Basic authentication without password verification. + +```python +# settings.py - DEBUG mode only! +MIDDLEWARE = [ + # ... other middlewares + 'oxutils.jwt.middleware.BasicNoPasswordAuthMiddleware', +] +``` + +**Security:** +- Automatically disabled when `settings.DEBUG = False` +- Logs warning on initialization in DEBUG mode +- Requires only username/email (password is ignored) + +**Usage:** +```http +Authorization: Basic base64(username:) +# or +Authorization: Basic base64(username:anything) +``` + +**Example:** +```bash +# For user "admin@example.com" +# Base64 encode: "admin@example.com:" +echo -n "admin@example.com:" | base64 +# Output: YWRtaW5AZXhhbXBsZS5jb206 + +# Request +curl -H "Authorization: Basic YWRtaW5AZXhhbXBsZS5jb206" http://localhost:8000/api/ +``` + +## Middleware Chaining + +Multiple JWT middlewares can be chained together. The first successful authentication sets `request.user`, and subsequent middlewares skip authentication. + +```python +# settings.py +MIDDLEWARE = [ + # ... other middlewares + 'oxutils.jwt.middleware.JWTHeaderAuthMiddleware', # Try header first + 'oxutils.jwt.middleware.JWTCookieAuthMiddleware', # Fallback to cookie + 'oxutils.jwt.middleware.BasicNoPasswordAuthMiddleware', # Dev fallback +] +``` + +**Behavior:** +1. If `JWTHeaderAuthMiddleware` authenticates → `request.user` is set +2. `JWTCookieAuthMiddleware` sees `is_authenticated=True` → skips +3. `BasicNoPasswordAuthMiddleware` sees `is_authenticated=True` → skips + +## Passive Authentication Classes + +For Ninja API views that need to accept authentication but don't require it (optional auth). + +### JWTPassiveAuth + +Checks if user was already authenticated by middleware, no active token validation. + +```python +from ninja import Router +from oxutils.jwt.auth import jwt_passive_auth + +router = Router() + +@router.get("/profile", auth=jwt_passive_auth) +def get_profile(request): + if request.user.is_authenticated: + return {"user": request.user.email} + return {"user": None} +``` + +### JWTCookiePassiveAuth + +Cookie-based passive authentication. + +```python +from ninja import Router +from oxutils.jwt.auth import jwt_cookie_passive_auth + +router = Router() + +@router.get("/dashboard", auth=jwt_cookie_passive_auth) +def get_dashboard(request): + # User may or may not be authenticated + pass +``` + +## Auth Handler Functions + +### get_passive_auth_handlers() + +Returns appropriate auth handlers based on `settings.DEBUG`. + +```python +from oxutils.jwt.auth import get_passive_auth_handlers +from ninja import Router + +router = Router() + +@router.get("/api/data", auth=get_passive_auth_handlers()) +def get_data(request): + """ + In DEBUG: No auth required (empty list) + In PRODUCTION: Passive auth handlers + """ + pass +``` + +**Behavior:** +- `DEBUG=True`: Returns provided `auths` (default: empty list - no auth) +- `DEBUG=False`: Returns `[jwt_passive_auth, jwt_cookie_passive_auth]` + +### get_auth_handlers() + +Returns active auth handlers for protected endpoints. + +```python +from oxutils.jwt.auth import get_auth_handlers + +@router.get("/api/protected", auth=get_auth_handlers()) +def protected_endpoint(request): + """ + In DEBUG: Basic auth without password + In PRODUCTION: JWT header and cookie auth + """ + pass +``` + +## Complete Configuration Example + +```python +# settings.py + +MIDDLEWARE = [ + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + + # JWT Middleware (order matters) + 'oxutils.jwt.middleware.JWTHeaderAuthMiddleware', + 'oxutils.jwt.middleware.JWTCookieAuthMiddleware', + + # Dev-only (safe - auto-disabled in production) + 'oxutils.jwt.middleware.BasicNoPasswordAuthMiddleware', +] + +# Ninja JWT settings +NINJA_JWT = { + 'ACCESS_TOKEN_LIFETIME': timedelta(minutes=30), + 'REFRESH_TOKEN_LIFETIME': timedelta(days=7), + 'AUTH_HEADER_TYPES': ('Bearer',), + 'AUTH_COOKIE': 'access_token', +} +``` + +## API Usage Examples + +### Protected Endpoint (Required Auth) + +```python +from ninja import Router +from oxutils.jwt.auth import get_auth_handlers + +router = Router() + +@router.get("/users", auth=get_auth_handlers()) +def list_users(request): + """ + Requires valid JWT token in header or cookie. + """ + return {"users": []} +``` + +### Optional Auth Endpoint + +```python +from ninja import Router +from oxutils.jwt.auth import get_passive_auth_handlers + +router = Router() + +@router.get("/public", auth=get_passive_auth_handlers()) +def public_data(request): + """ + Works with or without authentication. + """ + if request.user.is_authenticated: + return {"message": f"Hello {request.user.email}"} + return {"message": "Hello anonymous"} +``` + +### Manual Middleware Usage + +```python +from django.http import JsonResponse + +def my_view(request): + # Middleware has already set request.user + if request.user.is_authenticated: + return JsonResponse({"email": request.user.email}) + return JsonResponse({"error": "Not authenticated"}, status=401) +``` + +## Logging + +All middleware uses `structlog` for structured logging: + +```python +# Successful authentication +logger.info("dev_auth_success", user_id=1, username="user@example.com") + +# Failed authentication +logger.debug("jwt_validation_failed", path="/api/users") + +# Security warnings +logger.warning("csrf_check_failed", path="/api/data") +logger.warning("insecure_middleware_loaded") # Dev middleware +``` + +## Testing + +See `/tests/oxiliere/test_middleware.py` for comprehensive test examples. + +## Security Considerations + +1. **Always use HTTPS** in production for cookie-based auth +2. **CSRF protection** is mandatory for cookie auth +3. **Remove `BasicNoPasswordAuthMiddleware`** before production (it auto-disables but best to remove) +4. **Token expiration** is handled by `ninja-jwt` validation +5. **No database lookup** - middleware uses stateless `TokenUser` \ No newline at end of file diff --git a/docs/jwt_security.md b/docs/jwt_security.md new file mode 100644 index 0000000..675c73c --- /dev/null +++ b/docs/jwt_security.md @@ -0,0 +1,335 @@ +# JWT Security Best Practices + +## Overview + +This document outlines the security features and best practices for using JWT authentication in OxUtils. + +## Security Features + +### 1. Token Blacklisting + +Tokens can be revoked before expiration using the JTI (JWT ID) claim: + +```python +from oxutils.jwt.utils import blacklist_token +from ninja_jwt.tokens import AccessToken + +# Blacklist a token (e.g., on logout) +token = AccessToken(raw_token) +blacklist_token(token, reason="user_logout") +``` + +**How it works:** +- Tokens must include a `jti` claim (JWT ID) +- Blacklisted tokens are stored in cache with TTL matching token expiration +- Middleware automatically checks blacklist on each request + +### 2. Rate Limiting + +Built-in rate limiting protects against brute force attacks: + +- **Limit**: 10 failed authentication attempts per minute per IP +- **Scope**: Per IP address (supports proxy headers) +- **Reset**: Counter resets on successful authentication + +**Configuration:** + +Rate limiting is automatic. To customize: + +```python +# In your middleware subclass +class CustomJWTMiddleware(JWTAuthBaseMiddleware): + def process_request(self, request): + # Customize rate limit + rate_limit_key = f'jwt_auth_attempts:{ip_address}' + max_attempts = 5 # Custom limit + window = 300 # 5 minutes +``` + +### 3. Token Validation + +Multiple layers of validation: + +1. **Format validation**: JWT must have 3 parts (header.payload.signature) +2. **Signature verification**: Cryptographic signature check +3. **Expiration check**: Token must not be expired +4. **Blacklist check**: Token must not be revoked +5. **User ID validation**: + - Must be present in token + - Must be int or string + - String max length: 255 characters +6. **User status check**: User must be active (for DB-backed auth) + +### 4. Secure Logging + +Security events are logged without exposing sensitive data: + +```python +# ✅ Good - logs IP and error type +logger.warning( + f"Invalid JWT token from {ip_address}: InvalidToken", + extra={'path': request.path, 'ip': ip_address} +) + +# ❌ Bad - never log the actual token +logger.error(f"Invalid token: {token}") # NEVER DO THIS +``` + +**Logged events:** +- Invalid token format +- Authentication failures +- Rate limit exceeded +- Blacklisted token attempts + +### 5. IP Address Detection + +Supports proxy headers for accurate IP detection: + +```python +# Checks in order: +# 1. HTTP_X_FORWARDED_FOR (first IP in chain) +# 2. REMOTE_ADDR +``` + +## Middleware Options + +### JWTHeaderAuthMiddleware (Recommended) + +Stateless authentication from Authorization header: + +```python +# settings.py +MIDDLEWARE = [ + # ... + 'oxutils.jwt.middleware.JWTHeaderAuthMiddleware', + # ... +] +``` + +**Features:** +- Stateless (no DB lookup) +- Rate limiting +- Token blacklisting +- Secure logging + +### JWTCookieAuthMiddleware + +Stateless authentication from cookies: + +```python +# settings.py +MIDDLEWARE = [ + # ... + 'oxutils.jwt.middleware.JWTCookieAuthMiddleware', + # ... +] +``` + +**Cookie requirements:** +```python +# settings.py +SESSION_COOKIE_SECURE = True # HTTPS only +SESSION_COOKIE_HTTPONLY = True # No JavaScript access +SESSION_COOKIE_SAMESITE = 'Strict' # CSRF protection +``` + +## Configuration + +### Required Settings + +```python +# settings.py +from datetime import timedelta + +# JWT Settings +SIMPLE_JWT = { + 'ACCESS_TOKEN_LIFETIME': timedelta(minutes=15), + 'REFRESH_TOKEN_LIFETIME': timedelta(days=1), + 'ROTATE_REFRESH_TOKENS': True, + 'BLACKLIST_AFTER_ROTATION': True, + 'UPDATE_LAST_LOGIN': False, + + # Security + 'ALGORITHM': 'RS256', # Use asymmetric encryption + 'SIGNING_KEY': settings.SECRET_KEY, + 'VERIFYING_KEY': None, + + # Include JTI for blacklisting + 'JTI_CLAIM': 'jti', + + # User identification + 'USER_ID_FIELD': 'id', + 'USER_ID_CLAIM': 'user_id', +} + +# Cache (required for blacklisting and rate limiting) +CACHES = { + 'default': { + 'BACKEND': 'django_redis.cache.RedisCache', + 'LOCATION': 'redis://127.0.0.1:6379/1', + 'OPTIONS': { + 'CLIENT_CLASS': 'django_redis.client.DefaultClient', + } + } +} +``` + +### Cookie Configuration + +```python +# settings.py +from oxutils.constants import ACCESS_TOKEN_COOKIE + +# Cookie settings +SESSION_COOKIE_SECURE = True +SESSION_COOKIE_HTTPONLY = True +SESSION_COOKIE_SAMESITE = 'Strict' +SESSION_COOKIE_AGE = 900 # 15 minutes + +# Custom cookie name (optional) +# Default is 'access_token' from ACCESS_TOKEN_COOKIE +``` + +## Usage Examples + +### User Logout (Blacklist Token) + +```python +from ninja import Router +from ninja_jwt.tokens import AccessToken +from oxutils.jwt.utils import blacklist_token + +router = Router() + +@router.post("/logout") +def logout(request): + # Get token from request + auth_header = request.META.get('HTTP_AUTHORIZATION', '') + if auth_header.startswith('Bearer '): + raw_token = auth_header[7:] + token = AccessToken(raw_token) + blacklist_token(token, reason="user_logout") + + return {"message": "Logged out successfully"} +``` + +### Password Reset (Invalidate All Tokens) + +```python +from oxutils.jwt.utils import clear_user_tokens + +def reset_password(user, new_password): + user.set_password(new_password) + user.save() + + # Clear rate limits (optional) + clear_user_tokens(user.id) + + # Note: To invalidate all existing tokens, you need to: + # 1. Track active tokens per user (custom implementation) + # 2. Blacklist them all + # OR + # 3. Change user's password salt/secret (forces re-authentication) +``` + +### Custom Rate Limiting + +```python +from oxutils.jwt.middleware import JWTHeaderAuthMiddleware + +class StrictJWTMiddleware(JWTHeaderAuthMiddleware): + """Custom middleware with stricter rate limiting.""" + + def process_request(self, request): + # Custom rate limit for specific paths + if request.path.startswith('/api/admin/'): + ip_address = self._get_client_ip(request) + rate_limit_key = f'jwt_admin_attempts:{ip_address}' + attempts = cache.get(rate_limit_key, 0) + + # Stricter limit for admin endpoints + if attempts >= 3: + logger.warning(f"Admin rate limit exceeded for {ip_address}") + request.user = AnonymousUser() + return + + # Call parent implementation + super().process_request(request) +``` + +## Security Checklist + +### Production Deployment + +- [ ] Use HTTPS only (`SESSION_COOKIE_SECURE = True`) +- [ ] Set `HttpOnly` cookies (`SESSION_COOKIE_HTTPONLY = True`) +- [ ] Configure `SameSite` (`SESSION_COOKIE_SAMESITE = 'Strict'`) +- [ ] Use Redis for cache backend (not in-memory) +- [ ] Configure proper CORS settings +- [ ] Use asymmetric encryption (RS256) for tokens +- [ ] Set short token lifetimes (15 min for access, 1 day for refresh) +- [ ] Enable token rotation +- [ ] Monitor failed authentication attempts +- [ ] Set up security alerts for rate limit violations + +### Code Review + +- [ ] Never log tokens or sensitive data +- [ ] Always validate user input +- [ ] Use parameterized queries (ORM handles this) +- [ ] Implement proper error handling +- [ ] Don't expose internal error details to clients +- [ ] Use environment variables for secrets +- [ ] Rotate secrets regularly + +## Monitoring + +### Key Metrics to Track + +1. **Failed authentication rate** + - Alert if > 100 failures/minute + +2. **Rate limit violations** + - Track IPs hitting rate limits + +3. **Blacklisted token attempts** + - May indicate token theft + +4. **Token validation errors** + - Spike may indicate attack + +### Log Analysis + +```python +# Example: Find IPs with most failed attempts +grep "JWT authentication failed" /var/log/django.log | \ + awk '{print $NF}' | sort | uniq -c | sort -rn | head -10 +``` + +## Troubleshooting + +### "Token has been revoked" + +**Cause**: Token is blacklisted +**Solution**: User needs to re-authenticate + +### "Rate limit exceeded" + +**Cause**: Too many failed attempts from IP +**Solution**: Wait 1 minute or contact admin + +### "Invalid user identification format" + +**Cause**: Token contains invalid user_id claim +**Solution**: Token may be tampered, re-authenticate + +### "User not found" + +**Cause**: User deleted or user_id invalid +**Solution**: Re-authenticate or check user status + +## Related Documentation + +- [JWT Authentication](jwt.md) +- [Settings](settings.md) +- [Audit System](audit.md) diff --git a/docs/oxiliere.md b/docs/oxiliere.md index a67318e..c451a16 100644 --- a/docs/oxiliere.md +++ b/docs/oxiliere.md @@ -94,7 +94,8 @@ INSTALLED_APPS = [ # Middleware (IMPORTANT: Order matters!) MIDDLEWARE = [ - 'oxutils.oxiliere.middleware.TenantMainMiddleware', # Must be first! + 'oxutils.jwt.middleware.JWTCookieAuthMiddleware', # Must be first! + 'oxutils.oxiliere.middleware.TenantMainMiddleware', 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', @@ -338,30 +339,54 @@ DEFAULT_NOT_FOUND_TENANT_VIEW = 'myapp.views.tenant_not_found' ## Permissions -### TenantPermission +The permission system uses `TokenTenant` from the middleware to verify user access rights. All tenant permissions check that `request.tenant` is a valid `TokenTenant` instance with the appropriate user relationship. -Verifies user has access to the current tenant. +### Available Permission Classes + +| Permission | Description | Use Case | +|------------|-------------|----------| +| `TenantUserPermission` / `IsTenantUser` | User is a member of the current tenant | General tenant access | +| `TenantOwnerPermission` / `IsTenantOwner` | User is the owner of the tenant | Sensitive operations | +| `TenantAdminPermission` / `IsTenantAdmin` | User is admin or owner of the tenant | Administrative functions | +| `OxiliereServicePermission` / `IsOxiliereService` | Request from internal Oxiliere service | Internal APIs | + +### TenantUserPermission (IsTenantUser) + +Verifies the user is an active member of the current tenant. ```python from ninja_extra import api_controller, http_get -from oxutils.oxiliere.permissions import TenantPermission +from oxutils.oxiliere.permissions import TenantUserPermission, IsTenantUser -@api_controller('/orders', permissions=[TenantPermission()]) +# Using class +@api_controller('/orders', permissions=[TenantUserPermission()]) class OrderController: @http_get('/') def list_orders(self, request): # User must be a member of the tenant return Order.objects.all() + +# Using singleton instance (recommended) +@api_controller('/products', permissions=[IsTenantUser]) +class ProductController: + @http_get('/') + def list_products(self, request): + pass ``` -### TenantOwnerPermission +**Requirements:** +- `request.user` must be authenticated +- `request.tenant` must be a `TokenTenant` instance +- `request.tenant.user` must exist and be active (`is_tenant_user` property) + +### TenantOwnerPermission (IsTenantOwner) -Verifies user is the owner of the current tenant. +Verifies the user is the owner of the current tenant. ```python -from oxutils.oxiliere.permissions import TenantOwnerPermission +from oxutils.oxiliere.permissions import TenantOwnerPermission, IsTenantOwner -@api_controller('/settings', permissions=[TenantOwnerPermission()]) +@api_controller('/settings', permissions=[IsTenantOwner]) class SettingsController: @http_post('/update') def update_settings(self, request, payload): @@ -369,14 +394,18 @@ class SettingsController: pass ``` -### TenantAdminPermission +**Requirements:** +- All `TenantUserPermission` requirements +- `request.tenant.user.is_owner` must be `True` -Verifies user is an admin or owner of the current tenant. +### TenantAdminPermission (IsTenantAdmin) + +Verifies the user is an admin or owner of the current tenant. ```python -from oxutils.oxiliere.permissions import TenantAdminPermission +from oxutils.oxiliere.permissions import TenantAdminPermission, IsTenantAdmin -@api_controller('/users', permissions=[TenantAdminPermission()]) +@api_controller('/users', permissions=[IsTenantAdmin]) class UserManagementController: @http_post('/invite') def invite_user(self, request, email: str): @@ -384,14 +413,18 @@ class UserManagementController: pass ``` -### OxiliereServicePermission +**Requirements:** +- All `TenantUserPermission` requirements +- `request.tenant.user.is_admin` must be `True` + +### OxiliereServicePermission (IsOxiliereService) Verifies request comes from an internal Oxiliere service. ```python -from oxutils.oxiliere.permissions import OxiliereServicePermission +from oxutils.oxiliere.permissions import OxiliereServicePermission, IsOxiliereService -@api_controller('/setup', permissions=[OxiliereServicePermission()]) +@api_controller('/setup', permissions=[IsOxiliereService]) class SetupController: @http_post('/init') def init_tenant(self, payload): @@ -400,6 +433,70 @@ class SetupController: pass ``` +**Header Required:** +```http +X-Oxiliere-Service-Token: +``` + +### Custom Tenant Permissions + +Extend `TenantBasePermission` to create custom tenant-based permissions: + +```python +from oxutils.oxiliere.permissions import TenantBasePermission + +class TenantBillingPermission(TenantBasePermission): + """Custom permission for billing operations.""" + + def check_tenant_permission(self, request) -> bool: + tenant = request.tenant + # Custom logic here + return tenant.subscription_plan in ['premium', 'enterprise'] + +# Usage +@api_controller('/billing', permissions=[TenantBillingPermission()]) +class BillingController: + pass +``` + +### How Permissions Work + +1. **Base Check** (`TenantBasePermission.has_permission`): + - Verifies `request.user.is_authenticated` + - Verifies `request.tenant` exists and is a `TokenTenant` + - Calls `check_tenant_permission()` for specific logic + +2. **TokenTenant Properties**: + - `is_tenant_user`: True if user exists and is active + - `is_owner_user`: True if user is owner + - `is_admin_user`: True if user is admin + +3. **Logging**: All permission checks are logged with structured logging: + ```python + logger.info('tenant_permission', + type="tenant_user_access_permission", + tenant=tenant, + user=request.user, + passed=True) + ``` + +### Combining Permissions + +```python +from ninja_extra.permissions import AND, OR, NOT +from oxutils.oxiliere.permissions import IsTenantAdmin, IsTenantOwner + +# Admin OR Owner +@api_controller('/admin', permissions=[OR(IsTenantAdmin, IsTenantOwner)]) +class AdminController: + pass + +# Owner AND specific condition +@api_controller('/delete', permissions=[AND(IsTenantOwner, CustomPermission())]) +class DeleteController: + pass +``` + ## Utilities ### oxid_to_schema_name() diff --git a/pyproject.toml b/pyproject.toml index 11ac903..85fc581 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oxutils" -version = "0.1.14" +version = "0.1.15" description = "Production-ready utilities for Django applications in the Oxiliere ecosystem" readme = "README.md" license = "Apache-2.0" diff --git a/src/oxutils/__init__.py b/src/oxutils/__init__.py index a1160a7..2c67d3e 100644 --- a/src/oxutils/__init__.py +++ b/src/oxutils/__init__.py @@ -10,7 +10,7 @@ - Permission management """ -__version__ = "0.1.14" +__version__ = "0.1.15" from oxutils.settings import oxi_settings from oxutils.conf import UTILS_APPS, AUDIT_MIDDLEWARE diff --git a/src/oxutils/jwt/auth.py b/src/oxutils/jwt/auth.py index f3d241c..30e5377 100644 --- a/src/oxutils/jwt/auth.py +++ b/src/oxutils/jwt/auth.py @@ -17,6 +17,7 @@ JWTStatelessUserAuthentication ) from ninja.security import ( + HttpBearer, APIKeyCookie, HttpBasicAuth, ) @@ -176,6 +177,11 @@ def authenticate(self, request: HttpRequest, username: str, password: str) -> Op class BasicNoPasswordAuth(HttpBasicAuth): def authenticate(self, request: HttpRequest, username: str, password: str) -> Optional[Any]: + + # check if the middleware have already authenticated the user + if request.user.is_authenticated: + return request.user + try: user = get_user_model().objects.get(email=username) if user and user.is_active: @@ -185,12 +191,34 @@ def authenticate(self, request: HttpRequest, username: str, password: str) -> Op except Exception as e: return None -x_session_token_auth = XSessionTokenAuth() + +class JWTPassiveAuth(HttpBearer): + def authenticate(self, request: HttpRequest, token: str) -> Optional[Any]: + if request.user.is_authenticated: + return request.user + return None + + +class JWTCookiePassiveAuth(APIKeyCookie): + def authenticate(self, request: HttpRequest, key: Optional[str]) -> Optional[Any]: + if request.user.is_authenticated: + return request.user + return None + + +# for development Purpose only basic_auth = BasicAuth() basic_no_password_auth = BasicNoPasswordAuth() + +# used on oxiliere authentication service +x_session_token_auth = XSessionTokenAuth() jwt_auth = JWTAuth() jwt_cookie_auth = JWTCookieAuth() +# Production in all oxiliere services +jwt_passive_auth = JWTPassiveAuth() +jwt_cookie_passive_auth = JWTCookiePassiveAuth() + @@ -202,3 +230,13 @@ def get_auth_handlers(auths: List[AuthBase] = []) -> List[AuthBase]: return auths return [jwt_auth, jwt_cookie_auth] + + +def get_passive_auth_handlers(auths: List[AuthBase] = []) -> List[AuthBase]: + """Passive auth handler switcher based on settings.DEBUG""" + from django.conf import settings + + if settings.DEBUG: + return auths + + return [jwt_passive_auth, jwt_cookie_passive_auth] diff --git a/src/oxutils/jwt/middleware.py b/src/oxutils/jwt/middleware.py new file mode 100644 index 0000000..11cf9d1 --- /dev/null +++ b/src/oxutils/jwt/middleware.py @@ -0,0 +1,404 @@ +from typing import Type, Optional +import structlog + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.auth.models import AbstractBaseUser, AnonymousUser +from django.core.exceptions import PermissionDenied +from django.http import HttpRequest +from django.utils.translation import gettext_lazy as _ + +from ninja_jwt.exceptions import AuthenticationFailed, InvalidToken, TokenError +from ninja_jwt.settings import api_settings +from ninja_jwt.tokens import Token +from oxutils.constants import ACCESS_TOKEN_COOKIE +from ninja.utils import check_csrf + + +logger = structlog.get_logger(__name__) + + + + +class JWTAuthBaseMiddleware: + """ + Base middleware for JWT authentication. + Handles token validation and user authentication. + """ + + def __init__(self, get_response): + self.get_response = get_response + self.user_model = get_user_model() + + def __call__(self, request: HttpRequest): + """ + Process the request through the middleware. + """ + # Process request before view + self.process_request(request) + + # Get response from next middleware/view + response = self.get_response(request) + + return response + + def get_token_from_request(self, request: HttpRequest) -> Optional[str]: + """ + Extract JWT token from request. + Must be implemented by subclasses. + + Returns: + Token string if found, None otherwise + """ + raise NotImplementedError( + "Subclasses must implement get_token_from_request() method" + ) + + def process_request(self, request: HttpRequest): + """ + Process request and validate JWT token if present. + Authentication is handled by another service - this only validates token signature and claims. + + Skips authentication if user is already authenticated by a previous middleware. + """ + # Skip if user is already authenticated by another middleware + if hasattr(request, 'user') and request.user.is_authenticated: + if settings.DEBUG: + logger.debug( + "jwt_auth_skipped", + description="User already authenticated, skipping JWT middleware", + user_id=getattr(request.user, 'id', None), + path=request.path + ) + return + + try: + token = self.get_token_from_request(request) + except PermissionDenied as e: + # CSRF or other security check failed + logger.warning( + "security_check_failed", + description="Security check failed during token extraction", + exception=type(e).__name__, + error=str(e), + path=request.path, + remote_addr=request.META.get('REMOTE_ADDR') + ) + request.user = AnonymousUser() + return + + if token: + try: + # Only validate token and extract user info (no DB lookup) + self.jwt_authenticate(request, token) + except (InvalidToken, AuthenticationFailed) as e: + # Token invalid - set anonymous user + if settings.DEBUG: + logger.debug( + "jwt_validation_failed", + description="JWT validation failed", + exception=type(e).__name__, + path=request.path + ) + request.user = AnonymousUser() + else: + # No token provided - anonymous user + request.user = AnonymousUser() + + @classmethod + def get_validated_token(cls, raw_token) -> Type[Token]: + """ + Validates an encoded JSON web token and returns a validated token wrapper object. + Only validates signature and claims - authentication is handled by external service. + """ + messages = [] + for AuthToken in api_settings.AUTH_TOKEN_CLASSES: + try: + return AuthToken(raw_token) + except TokenError as e: + messages.append( + { + "token_class": AuthToken.__name__, + "token_type": AuthToken.token_type, + "message": e.args[0], + } + ) + + raise InvalidToken( + { + "detail": _("Given token not valid for any token type"), + "messages": messages, + } + ) + + def get_user(self, validated_token) -> AbstractBaseUser: + """ + Returns a stateless user object from the validated token. + No database lookup - authentication is handled by external service. + """ + try: + user_id = validated_token[api_settings.USER_ID_CLAIM] + except KeyError as e: + raise InvalidToken( + _("Token contained no recognizable user identification") + ) from e + + # Validate user_id type and value + if not isinstance(user_id, (int, str)) or not user_id: + raise InvalidToken(_("Invalid user identification format")) + + # Additional validation for string user_id + if isinstance(user_id, str) and len(user_id) > 255: + raise InvalidToken(_("User identification too long")) + + # Return stateless TokenUser (no DB lookup) + return api_settings.TOKEN_USER_CLASS(validated_token) + + def jwt_authenticate(self, request: HttpRequest, token: str) -> AbstractBaseUser: + """ + Authenticate user from JWT token and attach to request. + """ + validated_token = self.get_validated_token(token) + user = self.get_user(validated_token) + request.user = user + request.token_user = user # For backward compatibility and request.user can be overridden by other middlewares + return user + + +class JWTHeaderAuthMiddleware(JWTAuthBaseMiddleware): + """ + JWT authentication middleware that extracts token from Authorization header. + Stateless authentication without database lookup. + """ + openapi_scheme: str = "bearer" + header: str = "Authorization" + + + def get_token_from_request(self, request: HttpRequest) -> str: + """ + Extract JWT token from Authorization header. + Validates scheme and format without logging sensitive data. + """ + headers = request.headers + auth_value = headers.get(self.header) + if not auth_value: + return None + + parts = auth_value.split(" ") + + # Validate minimum parts + if len(parts) < 2: + if settings.DEBUG: + logger.warning( + "invalid_authorization_header", + description="Invalid Authorization header format", + remote_addr=request.META.get('REMOTE_ADDR') + ) + return None + + # Validate scheme + if parts[0].lower() != self.openapi_scheme: + if settings.DEBUG: + logger.warning( + "unexpected_auth_scheme", + description="Unexpected auth scheme", + expected_scheme=self.openapi_scheme, + actual_scheme=parts[0], + remote_addr=request.META.get('REMOTE_ADDR') + ) + return None + + token = " ".join(parts[1:]) + + # Basic token format validation (JWT has 3 parts separated by dots) + if token.count('.') != 2: + if settings.DEBUG: + logger.warning( + "invalid_jwt_format", + description="Invalid JWT format", + remote_addr=request.META.get('REMOTE_ADDR') + ) + return None + + return token + + +class JWTCookieAuthMiddleware(JWTAuthBaseMiddleware): + """ + JWT authentication middleware that extracts token from cookies. + Stateless authentication without database lookup. + """ + param_name = ACCESS_TOKEN_COOKIE + + def get_token_from_request(self, request: HttpRequest) -> Optional[str]: + """ + Extract JWT token from cookies with CSRF protection. + + CSRF check is required for cookie-based authentication to prevent + cross-site request forgery attacks. + + Override to customize cookie name or CSRF behavior. + + Raises: + PermissionDenied: If CSRF check fails + """ + # CSRF protection for cookie-based auth + error_response = check_csrf(request) + if error_response: + logger.warning( + "csrf_check_failed", + description="CSRF validation failed for cookie-based JWT auth", + path=request.path, + remote_addr=request.META.get('REMOTE_ADDR'), + cookie_name=self.param_name + ) + raise PermissionDenied("CSRF check failed") + + return request.COOKIES.get(self.param_name, None) + + +class BasicNoPasswordAuthMiddleware: + """ + DEVELOPMENT ONLY: Basic authentication middleware without password verification. + + WARNING: This middleware bypasses password authentication and should ONLY be used + in development environments. It allows authentication by providing only a username/email + in the Authorization header using Basic auth format. + + This middleware automatically disables itself when settings.DEBUG is False. + + Usage: + Authorization: Basic base64(username:) + or + Authorization: Basic base64(username:anything) + + Example: + # For user "admin@example.com" + # Base64 encode: "admin@example.com:" + # Header: Authorization: Basic YWRtaW5AZXhhbXBsZS5jb206 + + IMPORTANT: Remove this middleware before deploying to production! + """ + + header = "Authorization" + scheme = "basic" + + def __init__(self, get_response): + self.get_response = get_response + self.user_model = get_user_model() + + # Check if in debug mode - disable completely if not + self._enabled = settings.DEBUG + + if self._enabled: + # Warning log on initialization + logger.warning( + "insecure_middleware_loaded", + description="BasicNoPasswordAuthMiddleware loaded - THIS IS INSECURE AND FOR DEVELOPMENT ONLY", + middleware=self.__class__.__name__ + ) + else: + # Log that middleware is disabled + logger.info( + "insecure_middleware_disabled", + description="BasicNoPasswordAuthMiddleware disabled in non-DEBUG mode", + middleware=self.__class__.__name__ + ) + + def __call__(self, request: HttpRequest): + """ + Process the request through the middleware. + If not in DEBUG mode, simply passes through without any processing. + """ + if not self._enabled: + return self.get_response(request) + + self.process_request(request) + return self.get_response(request) + + def process_request(self, request: HttpRequest): + """ + Process request and authenticate user if Basic auth header is present. + Skips if user is already authenticated. + Note: This method is only called when DEBUG is True. + """ + # Skip if already authenticated + if hasattr(request, 'user') and request.user.is_authenticated: + return + + auth_header = request.headers.get(self.header) + if not auth_header: + return + + try: + user = self._authenticate(auth_header) + if user: + request.user = user + logger.info( + "dev_auth_success", + description="Development authentication successful", + user_id=user.id, + username=user.email, + path=request.path + ) + except Exception as e: + logger.debug( + "dev_auth_failed", + description="Development authentication failed", + error=str(e), + path=request.path + ) + + def _authenticate(self, auth_header: str) -> Optional[AbstractBaseUser]: + """ + Extract credentials from Basic auth header and authenticate user without password. + + Args: + auth_header: The Authorization header value + + Returns: + User object if found and active, None otherwise + """ + parts = auth_header.split(" ") + + # Validate scheme + if len(parts) != 2 or parts[0].lower() != self.scheme: + return None + + # Decode base64 credentials + import base64 + try: + encoded_credentials = parts[1] + decoded_bytes = base64.b64decode(encoded_credentials) + decoded_credentials = decoded_bytes.decode('utf-8') + except Exception: + logger.debug("Failed to decode base64 credentials") + return None + + # Split username:password (password is ignored) + if ':' in decoded_credentials: + username, _ = decoded_credentials.split(':', 1) + else: + username = decoded_credentials + + if not username: + return None + + # Find user by email or username + try: + user = self.user_model.objects.get(email=username) + except self.user_model.DoesNotExist: + try: + # Try by username field if different from email + user = self.user_model.objects.get(**{self.user_model.USERNAME_FIELD: username}) + except (self.user_model.DoesNotExist, AttributeError): + logger.debug(f"User not found: {username}") + return None + + # Check if user is active + if not user.is_active: + logger.debug(f"User {username} is inactive") + return None + + return user + diff --git a/src/oxutils/jwt/models.py b/src/oxutils/jwt/models.py index 7b69a46..6e391c6 100644 --- a/src/oxutils/jwt/models.py +++ b/src/oxutils/jwt/models.py @@ -11,23 +11,49 @@ logger = structlog.get_logger(__name__) +class TenantUser: + def __init__( + self, + oxi_id: str | None = None, + id: str | None = None, + is_owner: bool = False, + is_admin: bool = False, + status: str | None = None, + ): + self.oxi_id = oxi_id + self.id = id + self.is_owner = is_owner + self.is_admin = is_admin + self.status = status + + def __bool__(self): + return self.status == 'active' + + def is_active(self): + return self.status == 'active' + + class TokenTenant: def __init__( self, schema_name: str, - tenant_id: int, + tenant_id: str, oxi_id: str, subscription_plan: str, subscription_status: str, - status: str, + subscription_end_date: str | None = None, + status: str = 'active', + user: TenantUser | None = None, ): self.schema_name = schema_name self.id = tenant_id self.oxi_id = oxi_id self.subscription_plan = subscription_plan self.subscription_status = subscription_status + self.subscription_end_date = subscription_end_date self.status = status + self.user = user def __str__(self): return f"{self.schema_name} - {self.oxi_id}" @@ -44,23 +70,93 @@ def is_active(self): def is_deleted(self): return self.status == 'deleted' + def __bool__(self): + return self.status == 'active' + + @property + def is_admin_user(self): + if self.user: + return self.user.is_admin + return False + + @property + def is_owner_user(self): + if self.user: + return self.user.is_owner + return False + + @property + def is_tenant_user(self): + if self.user: + return self.user.is_active() + return False + + def get_tenant_type(self): + return self.subscription_status + @classmethod def for_token(cls, token): try: token_obj = OrganizationAccessToken(token=token) + + # set the tenant user + if 'tenant_user_id' in token_obj and token_obj.get('tenant_user_id'): + user = TenantUser( + oxi_id=token_obj.get('tenant_user_oxi_id'), + id=token_obj.get('tenant_user_id'), + is_owner=token_obj.get('tenant_user_is_owner'), + is_admin=token_obj.get('tenant_user_is_admin'), + status=token_obj.get('tenant_user_status'), + ) + else: + user = TenantUser() + tenant = cls( schema_name=token_obj['schema_name'], tenant_id=token_obj['tenant_id'], oxi_id=token_obj['oxi_id'], subscription_plan=token_obj['subscription_plan'], subscription_status=token_obj['subscription_status'], + subscription_end_date=token_obj.get('subscription_end_date'), status=token_obj['status'], + user=user, ) + return tenant except Exception: logger.exception('Failed to create TokenTenant from token', token=token) return None + @classmethod + def from_db(cls, tenant) -> 'TokenTenant': + if not tenant: + raise ValueError('Tenant is required') + + if hasattr(tenant, 'user') and tenant.user: + user = TenantUser( + oxi_id=tenant.user.user.oxi_id, + id=tenant.user.id, + is_owner=tenant.user.is_owner, + is_admin=tenant.user.is_admin, + status=tenant.user.status, + ) + else: + user = TenantUser() + + return cls( + schema_name=tenant.schema_name, + tenant_id=tenant.id, + oxi_id=tenant.oxi_id, + subscription_plan=tenant.subscription_plan, + subscription_status=tenant.subscription_status, + subscription_end_date=tenant.subscription_end_date, + status=tenant.status, + user=user, + ) + + def __repr__(self): + return f"TokenTenant(schema_name='{self.schema_name}', oxi_id='{self.oxi_id}', status='{self.status}')" + class TokenUser(DefaultTonkenUser): @cached_property diff --git a/src/oxutils/jwt/tokens.py b/src/oxutils/jwt/tokens.py index 8980236..e724fd6 100644 --- a/src/oxutils/jwt/tokens.py +++ b/src/oxutils/jwt/tokens.py @@ -58,6 +58,14 @@ def for_tenant(cls, tenant) -> Token: token.payload['subscription_end_date'] = str(tenant.subscription_end_date) token.payload['status'] = str(tenant.status) + # Add tenant user info if available + if hasattr(tenant, 'user'): + token.payload['tenant_user_oxi_id'] = str(tenant.user.user.oxi_id) + token.payload['tenant_user_id'] = str(tenant.user.id) + token.payload['tenant_user_is_owner'] = tenant.user.is_owner + token.payload['tenant_user_is_admin'] = tenant.user.is_admin + token.payload['tenant_user_status'] = str(tenant.user.status) + return token @property diff --git a/src/oxutils/oxiliere/middleware.py b/src/oxutils/oxiliere/middleware.py index 9c0ba8f..0662f0e 100644 --- a/src/oxutils/oxiliere/middleware.py +++ b/src/oxutils/oxiliere/middleware.py @@ -1,10 +1,13 @@ from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist from django.db import connection from django.http import Http404 from django.urls import set_urlconf from django.utils.module_loading import import_string from django.utils.deprecation import MiddlewareMixin +import structlog + from django_tenants.utils import ( get_public_schema_name, get_public_schema_urlconf, @@ -23,6 +26,9 @@ +logger = structlog.get_logger(__name__) + + class TenantMainMiddleware(MiddlewareMixin): TENANT_NOT_FOUND_EXCEPTION = Http404 @@ -44,6 +50,22 @@ def get_tenant(self, tenant_model, oxi_id): """ return tenant_model.objects.get(oxi_id=oxi_id) + def get_tenant_user(self, tenant, user, raise_exception=False): + """ Get tenant user by tenant and user. + """ + if not tenant or not user: + if raise_exception: + raise ObjectDoesNotExist("tenant_user_not_found, tenant or user is None") + return None + + try: + return tenant.users.select_related('user').get(user__pk=user.id) + except ObjectDoesNotExist: + logger.error("tenant_user_not_found", tenant_id=tenant.id, user_id=user.id) + if raise_exception: + raise ObjectDoesNotExist("tenant_user_not_found") + return None + def process_request(self, request): # Connection needs first to be at the public schema, as this is where # the tenant metadata is stored. @@ -51,42 +73,61 @@ def process_request(self, request): connection.set_schema_to_public() oxi_id = self.get_org_id_from_request(request) + tenant_model = connection.tenant_model # Try to get tenant from cookie token first tenant_token = request.COOKIES.get(ORGANIZATION_TOKEN_COOKIE_KEY) tenant = None + old_tenant = None request._should_set_tenant_cookie = False if tenant_token: tenant = TokenTenant.for_token(tenant_token) # Verify the token's oxi_id matches the request if not is_system_tenant(tenant) and tenant.oxi_id != oxi_id: + logger.info("tenant_token_oxi_id_doesnt_match_request_oxi_id", tenant_oxi_id=tenant.oxi_id, request_oxi_id=oxi_id) + old_tenant = tenant tenant = None # If no valid token, fetch from database if not tenant: if oxi_id: # fetch with oxi_id on tenant - tenant_model = connection.tenant_model try: tenant = self.get_tenant(tenant_model, oxi_id) + tenant.user = self.get_tenant_user(tenant, request.user, raise_exception=True) + # Mark that we need to set the cookie in the response request._should_set_tenant_cookie = True - except tenant_model.DoesNotExist: + + if old_tenant: + logger.info("tenant_changed", old_tenant=old_tenant.oxi_id, new_tenant=tenant.oxi_id) + + except ObjectDoesNotExist as ex: + logger.error("tenant_not_found", oxi_id=oxi_id, error=str(ex)) default_tenant = self.no_tenant_found(request, oxi_id) return default_tenant else: # try to return the system tenant try: from oxutils.oxiliere.caches import get_system_tenant tenant = get_system_tenant() + tenant.user = self.get_tenant_user(tenant, request.user, raise_exception=False) request._should_set_tenant_cookie = True except Exception as e: + logger.error("system_tenant_not_found", error=str(e)) from django.http import HttpResponseBadRequest return HttpResponseBadRequest('Missing X-Organization-ID header') if tenant.is_deleted or not tenant.is_active: + logger.error("tenant_is_deleted_or_inactive", oxi_id=oxi_id) return self.no_tenant_found(request, oxi_id) - request.tenant = tenant + if tenant and not isinstance(tenant, TokenTenant): + request.db_tenant = tenant + else: + request.db_tenant = None + + request.tenant = TokenTenant.from_db(tenant) + set_current_tenant_schema_name(tenant.schema_name) connection.set_tenant(request.tenant) self.setup_url_routing(request) @@ -94,9 +135,9 @@ def process_request(self, request): def process_response(self, request, response): """Set the tenant token cookie if needed.""" if hasattr(request, '_should_set_tenant_cookie') and request._should_set_tenant_cookie: - if hasattr(request, 'tenant') and not isinstance(request.tenant, TokenTenant): + if hasattr(request, 'db_tenant') and isinstance(request.db_tenant, connection.tenant_model): # Generate token from DB tenant - token = OrganizationAccessToken.for_tenant(request.tenant) + token = OrganizationAccessToken.for_tenant(request.db_tenant) response.set_cookie( key=ORGANIZATION_TOKEN_COOKIE_KEY, value=str(token), diff --git a/src/oxutils/oxiliere/permissions.py b/src/oxutils/oxiliere/permissions.py index a48211c..e77cfbc 100644 --- a/src/oxutils/oxiliere/permissions.py +++ b/src/oxutils/oxiliere/permissions.py @@ -1,81 +1,95 @@ +import structlog + from ninja_extra.permissions import BasePermission -from oxutils.oxiliere.utils import get_tenant_user_model from oxutils.constants import OXILIERE_SERVICE_TOKEN from oxutils.jwt.tokens import OxilierServiceToken +from oxutils.jwt.models import TokenTenant + + +logger = structlog.get_logger(__name__) -class TenantPermission(BasePermission): + + +class TenantBasePermission(BasePermission): """ Vérifie que l'utilisateur a accès au tenant actuel. L'utilisateur doit être authentifié et avoir un lien avec le tenant. """ + def check_tenant_permission(self, request) -> bool: + raise NotImplementedError("Subclasses must implement this method") + def has_permission(self, request, **kwargs): if not request.user or not request.user.is_authenticated: return False if not hasattr(request, 'tenant'): + logger.warning('tenant_permission', type="tenant_not_found", user=request.user) + return False + + if not isinstance(request.tenant, TokenTenant): + logger.warning( + 'tenant_permission', + type="tenant_is_not_token_tenant", + tenant=request.tenant, + user=request.user + ) return False - - # Vérifier que l'utilisateur a accès à ce tenant - return get_tenant_user_model().objects.filter( - tenant__pk=request.tenant.pk, - user__pk=request.user.pk - ).exists() + return self.check_tenant_permission(request) -class TenantOwnerPermission(BasePermission): + +class TenantUserPermission(TenantBasePermission): """ - Vérifie que l'utilisateur est propriétaire (owner) du tenant actuel. + Vérifie que l'utilisateur est un membre du tenant actuel. + Alias de TenantPermission pour plus de clarté sémantique. """ - def has_permission(self, request, **kwargs): - if not request.user or not request.user.is_authenticated: - return False - - if not hasattr(request, 'tenant'): - return False + def check_tenant_permission(self, request) -> bool: + tenant: TokenTenant = request.tenant + + logger.info( + 'tenant_permission', + type="tenant_user_access_permission", + tenant=tenant, user=request.user, + passed=tenant.is_tenant_user + ) - return get_tenant_user_model().objects.filter( - tenant__pk=request.tenant.pk, - user__pk=request.user.pk, - is_owner=True - ).exists() + return tenant.is_tenant_user -class TenantAdminPermission(BasePermission): +class TenantOwnerPermission(TenantBasePermission): """ - Vérifie que l'utilisateur est admin ou owner du tenant actuel. + Vérifie que l'utilisateur est propriétaire (owner) du tenant actuel. """ - def has_permission(self, request, **kwargs): - if not request.user or not request.user.is_authenticated: - return False - - if not hasattr(request, 'tenant'): - return False + def check_tenant_permission(self, request) -> bool: + tenant: TokenTenant = request.tenant + + logger.info( + 'tenant_permission', + type="tenant_user_access_permission", + tenant=tenant, user=request.user, + passed=tenant.is_owner_user + ) - return get_tenant_user_model().objects.filter( - tenant__pk=request.tenant.pk, - user__pk=request.user.pk, - is_admin=True - ).exists() + return tenant.is_owner_user -class TenantUserPermission(BasePermission): +class TenantAdminPermission(TenantBasePermission): """ - Vérifie que l'utilisateur est un membre du tenant actuel. - Alias de TenantPermission pour plus de clarté sémantique. + Vérifie que l'utilisateur est admin ou owner du tenant actuel. """ - def has_permission(self, request, **kwargs): - if not request.user or not request.user.is_authenticated: - return False - - if not hasattr(request, 'tenant'): - return False + def check_tenant_permission(self, request) -> bool: + tenant: TokenTenant = request.tenant + + logger.info( + 'tenant_permission', + type="tenant_user_access_permission", + tenant=tenant, user=request.user, + passed=tenant.is_admin_user + ) - return get_tenant_user_model().objects.filter( - tenant__pk=request.tenant.pk, - user__pk=request.user.pk - ).exists() + return tenant.is_admin_user class OxiliereServicePermission(BasePermission): @@ -95,3 +109,10 @@ def has_permission(self, request, **kwargs): return True except Exception: return False + + + +IsTenantUser = TenantUserPermission() +IsTenantOwner = TenantOwnerPermission() +IsTenantAdmin = TenantAdminPermission() +IsOxiliereService = OxiliereServicePermission() diff --git a/src/oxutils/oxiliere/schemas.py b/src/oxutils/oxiliere/schemas.py index 8a6c693..709671b 100644 --- a/src/oxutils/oxiliere/schemas.py +++ b/src/oxutils/oxiliere/schemas.py @@ -1,6 +1,9 @@ from typing import Optional from uuid import UUID from ninja import Schema +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from django.utils.module_loading import import_string from django.db import transaction from django.contrib.auth import get_user_model from django_tenants.utils import get_tenant_model @@ -13,6 +16,18 @@ logger = structlog.get_logger(__name__) + +def get_tenant_schema() -> 'TenantSchema': + if hasattr(settings, 'OX_TENANT_SCHEMA'): + try: + return import_string(settings.OX_TENANT_SCHEMA) + except ImportError as e: + raise ImproperlyConfigured( + f"Error: OX_TENANT_SCHEMA import error: {settings.OX_TENANT_SCHEMA}, please check your settings" + ) from e + return TenantSchema + + class TenantSchema(Schema): name: str oxi_id: str @@ -29,6 +44,22 @@ class TenantOwnerSchema(Schema): email: str +class UserSchema(Schema): + oxi_id: UUID + first_name: Optional[str] = None + last_name: Optional[str] = None + email: str + is_active: bool + photo: Optional[str] = None + + +class TenantUser(Schema): + user: UserSchema + is_owner: bool + is_admin: bool + status: str + + class CreateTenantSchema(Schema): tenant: TenantSchema owner: TenantOwnerSchema diff --git a/tests/common/test_jwt_tokens.py b/tests/common/test_jwt_tokens.py index 14ef707..0bd8349 100644 --- a/tests/common/test_jwt_tokens.py +++ b/tests/common/test_jwt_tokens.py @@ -6,7 +6,13 @@ from unittest.mock import Mock, patch from django.conf import settings from oxutils.jwt.tokens import OxilierServiceToken, OrganizationAccessToken -from oxutils.jwt.models import TokenTenant +from oxutils.jwt.models import TokenTenant, TenantUser + + + +user = TenantUser() +user.user = Mock() +user.user.oxi_id = 'test-user' class TestOxilierServiceToken: @@ -80,7 +86,6 @@ def test_org_token_for_tenant(self): mock_tenant.schema_name = 'tenant_schema' mock_tenant.subscription_plan = 'premium' mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' mock_tenant.status = 'active' token = OrganizationAccessToken.for_tenant(mock_tenant) @@ -100,7 +105,6 @@ def test_org_token_type(self): mock_tenant.schema_name = 'test' mock_tenant.subscription_plan = 'basic' mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' mock_tenant.status = 'active' token = OrganizationAccessToken.for_tenant(mock_tenant) @@ -109,16 +113,17 @@ def test_org_token_type(self): def test_org_token_encoding(self): """Test encoding organization token.""" - mock_tenant = Mock() - mock_tenant.id = 1 - mock_tenant.oxi_id = 'test-org' - mock_tenant.schema_name = 'test_schema' - mock_tenant.subscription_plan = 'basic' - mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' - mock_tenant.status = 'active' + tenant = TokenTenant( + tenant_id=1, + oxi_id='test-org', + schema_name='test_schema', + subscription_plan='basic', + subscription_status='active', + status='active', + user=user + ) - token = OrganizationAccessToken.for_tenant(mock_tenant) + token = OrganizationAccessToken.for_tenant(tenant) token_str = str(token) assert isinstance(token_str, str) @@ -127,16 +132,17 @@ def test_org_token_encoding(self): def test_org_token_decoding(self): """Test decoding organization token.""" - mock_tenant = Mock() - mock_tenant.id = 456 - mock_tenant.oxi_id = 'org-456' - mock_tenant.schema_name = 'org_schema' - mock_tenant.subscription_plan = 'enterprise' - mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' - mock_tenant.status = 'active' + tenant = TokenTenant( + tenant_id=456, + oxi_id='org-456', + schema_name='org_schema', + subscription_plan='enterprise', + subscription_status='active', + status='active', + user=user + ) - original_token = OrganizationAccessToken.for_tenant(mock_tenant) + original_token = OrganizationAccessToken.for_tenant(tenant) token_str = str(original_token) decoded_token = OrganizationAccessToken(token=token_str) @@ -153,7 +159,6 @@ def test_org_token_expiration(self): mock_tenant.schema_name = 'test' mock_tenant.subscription_plan = 'basic' mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' mock_tenant.status = 'active' token = OrganizationAccessToken.for_tenant(mock_tenant) @@ -177,7 +182,8 @@ def test_token_tenant_creation(self): oxi_id='org-123', subscription_plan='premium', subscription_status='active', - status='active' + status='active', + user=user ) assert tenant.schema_name == 'test_schema' @@ -195,7 +201,8 @@ def test_token_tenant_pk_property(self): oxi_id='org-456', subscription_plan='basic', subscription_status='active', - status='active' + status='active', + user=user ) assert tenant.pk == 456 @@ -209,34 +216,36 @@ def test_token_tenant_str(self): oxi_id='org-uuid', subscription_plan='basic', subscription_status='active', - status='active' + status='active', + user=user ) assert str(tenant) == 'my_schema - org-uuid' def test_token_tenant_for_token_success(self): """Test creating TokenTenant from valid token.""" - mock_tenant = Mock() - mock_tenant.id = 789 - mock_tenant.oxi_id = 'org-789' - mock_tenant.schema_name = 'tenant_789' - mock_tenant.subscription_plan = 'enterprise' - mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' - mock_tenant.status = 'active' + tenant = TokenTenant( + tenant_id=789, + oxi_id='org-789', + schema_name='tenant_789', + subscription_plan='enterprise', + subscription_status='active', + status='active', + user=user + ) # Create token - token = OrganizationAccessToken.for_tenant(mock_tenant) + token = OrganizationAccessToken.for_tenant(tenant) token_str = str(token) # Create TokenTenant from token - tenant = TokenTenant.for_token(token_str) + result_tenant = TokenTenant.for_token(token_str) - assert tenant is not None - assert tenant.id == '789' - assert tenant.oxi_id == 'org-789' - assert tenant.schema_name == 'tenant_789' - assert tenant.subscription_plan == 'enterprise' + assert result_tenant is not None + assert result_tenant.id == '789' + assert result_tenant.oxi_id == 'org-789' + assert result_tenant.schema_name == 'tenant_789' + assert result_tenant.subscription_plan == 'enterprise' def test_token_tenant_for_token_invalid(self): """Test creating TokenTenant from invalid token returns None.""" @@ -246,24 +255,25 @@ def test_token_tenant_for_token_invalid(self): def test_token_tenant_for_token_expired(self): """Test creating TokenTenant from expired token returns None.""" - mock_tenant = Mock() - mock_tenant.id = 1 - mock_tenant.oxi_id = 'test' - mock_tenant.schema_name = 'test' - mock_tenant.subscription_plan = 'basic' - mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' - mock_tenant.status = 'active' + tenant = TokenTenant( + tenant_id=1, + oxi_id='test', + schema_name='test', + subscription_plan='basic', + subscription_status='active', + status='active', + user=user + ) # Create token and manually expire it - token = OrganizationAccessToken.for_tenant(mock_tenant) + token = OrganizationAccessToken.for_tenant(tenant) token.payload['exp'] = datetime.now() - timedelta(hours=1) token_str = str(token) # Should return None for expired token - tenant = TokenTenant.for_token(token_str) + result_tenant = TokenTenant.for_token(token_str) - assert tenant is None + assert result_tenant is None class TestTokenIntegration: @@ -285,49 +295,52 @@ def test_service_token_roundtrip(self): def test_org_token_to_tenant_model(self): """Test converting organization token to TokenTenant.""" - mock_tenant = Mock() - mock_tenant.id = 999 - mock_tenant.oxi_id = 'org-999' - mock_tenant.schema_name = 'org_999_schema' - mock_tenant.subscription_plan = 'premium' - mock_tenant.subscription_status = 'active' - mock_tenant.subscription_end_date = '2025-12-31' - mock_tenant.status = 'active' + tenant = TokenTenant( + tenant_id=999, + oxi_id='org-999', + schema_name='org_999_schema', + subscription_plan='premium', + subscription_status='active', + status='active', + user=user + ) # Create token from tenant - token = OrganizationAccessToken.for_tenant(mock_tenant) + token = OrganizationAccessToken.for_tenant(tenant) token_str = str(token) # Recreate tenant from token - tenant = TokenTenant.for_token(token_str) + result_tenant = TokenTenant.for_token(token_str) - assert tenant.id == '999' - assert tenant.oxi_id == 'org-999' - assert tenant.schema_name == 'org_999_schema' - assert tenant.subscription_plan == 'premium' + assert result_tenant.id == '999' + assert result_tenant.oxi_id == 'org-999' + assert result_tenant.schema_name == 'org_999_schema' + assert result_tenant.subscription_plan == 'premium' def test_multiple_tokens_different_data(self): """Test creating multiple tokens with different data.""" - mock_tenant1 = Mock() - mock_tenant1.id = 1 - mock_tenant1.oxi_id = 'org-1' - mock_tenant1.schema_name = 'schema_1' - mock_tenant1.subscription_plan = 'basic' - mock_tenant1.subscription_status = 'active' - mock_tenant1.subscription_end_date = '2025-12-31' - mock_tenant1.status = 'active' - - mock_tenant2 = Mock() - mock_tenant2.id = 2 - mock_tenant2.oxi_id = 'org-2' - mock_tenant2.schema_name = 'schema_2' - mock_tenant2.subscription_plan = 'premium' - mock_tenant2.subscription_status = 'active' - mock_tenant2.subscription_end_date = '2025-12-31' - mock_tenant2.status = 'active' - - token1 = OrganizationAccessToken.for_tenant(mock_tenant1) - token2 = OrganizationAccessToken.for_tenant(mock_tenant2) + tenant1 = TokenTenant( + tenant_id=1, + oxi_id='org-1', + schema_name='schema_1', + subscription_plan='basic', + subscription_status='active', + status='active', + user=user + ) + + tenant2 = TokenTenant( + tenant_id=2, + oxi_id='org-2', + schema_name='schema_2', + subscription_plan='premium', + subscription_status='active', + status='active', + user=user + ) + + token1 = OrganizationAccessToken.for_tenant(tenant1) + token2 = OrganizationAccessToken.for_tenant(tenant2) assert token1['tenant_id'] != token2['tenant_id'] assert token1['oxi_id'] != token2['oxi_id'] diff --git a/tests/oxiliere/test_middleware.py b/tests/oxiliere/test_middleware.py new file mode 100644 index 0000000..fcb6b02 --- /dev/null +++ b/tests/oxiliere/test_middleware.py @@ -0,0 +1,530 @@ +""" +Tests for JWT middleware authentication. + +Tests the following middlewares: +- JWTHeaderAuthMiddleware: Token extraction from Authorization header +- JWTCookieAuthMiddleware: Token extraction from cookies with CSRF protection +- BasicNoPasswordAuthMiddleware: Development-only basic auth without password +""" + +import base64 +import pytest +from unittest.mock import Mock, patch + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.auth.models import AnonymousUser +from django.core.exceptions import PermissionDenied +from django.http import HttpRequest + +from oxutils.jwt.middleware import ( + JWTAuthBaseMiddleware, + JWTHeaderAuthMiddleware, + JWTCookieAuthMiddleware, + BasicNoPasswordAuthMiddleware, +) +from ninja_jwt.exceptions import InvalidToken +from ninja_jwt.tokens import AccessToken + + +User = get_user_model() + + +@pytest.fixture +def mock_get_response(): + """Mock get_response function for middleware initialization.""" + return Mock(return_value=Mock(status_code=200)) + + +@pytest.fixture +def create_request(): + """Factory to create HTTP requests with various configurations.""" + def _create_request( + method="GET", + auth_header=None, + cookies=None, + remote_addr="127.0.0.1", + user=None, + ): + request = HttpRequest() + request.method = method + request.META = { + "REMOTE_ADDR": remote_addr, + "HTTP_USER_AGENT": "TestAgent", + } + + if auth_header: + request.META["HTTP_AUTHORIZATION"] = auth_header + + if cookies: + request.COOKIES = cookies + else: + request.COOKIES = {} + + # Initialize with AnonymousUser by default + request.user = user if user else AnonymousUser() + + return request + + return _create_request + + +@pytest.fixture +def active_user(db): + """Create an active user for testing.""" + return User.objects.create_user( + username="active_user", + email="test@example.com", + password="testpass123", + is_active=True, + ) + + +@pytest.fixture +def inactive_user(db): + """Create an inactive user for testing.""" + return User.objects.create_user( + username="inactive_user", + email="inactive@example.com", + password="testpass123", + is_active=False, + ) + + +class TestJWTAuthBaseMiddleware: + """Tests for the base JWT authentication middleware.""" + + def test_middleware_initialization(self, mock_get_response): + """Test middleware initializes correctly.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + + assert middleware.get_response == mock_get_response + assert middleware.user_model == User + + def test_get_token_from_request_not_implemented(self, mock_get_response, create_request): + """Test that base middleware requires subclass implementation.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + request = create_request() + + with pytest.raises(NotImplementedError, match="Subclasses must implement"): + middleware.get_token_from_request(request) + + def test_process_request_skips_if_already_authenticated( + self, mock_get_response, create_request, active_user + ): + """Test that middleware skips if user is already authenticated.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + request = create_request(user=active_user) + + # Should not raise NotImplementedError because it skips early + middleware.process_request(request) + + assert request.user == active_user + + def test_process_request_anonymous_user(self, mock_get_response, create_request): + """Test that middleware attempts auth for anonymous users.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + request = create_request() + + # Will fail because get_token_from_request is not implemented + with pytest.raises(NotImplementedError): + middleware.process_request(request) + + +class TestJWTHeaderAuthMiddleware: + """Tests for JWT header authentication middleware.""" + + def test_extract_valid_token(self, mock_get_response, create_request): + """Test extracting a valid JWT token from header.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + token = "valid.jwt.token" + request = create_request(auth_header=f"Bearer {token}") + + extracted = middleware.get_token_from_request(request) + + assert extracted == token + + def test_extract_token_without_bearer_prefix(self, mock_get_response, create_request): + """Test that token without Bearer prefix is rejected.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + request = create_request(auth_header="valid.jwt.token") + + extracted = middleware.get_token_from_request(request) + + assert extracted is None + + def test_extract_token_wrong_scheme(self, mock_get_response, create_request): + """Test that wrong scheme (Basic instead of Bearer) is rejected.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + request = create_request(auth_header="Basic dXNlcjpwYXNz") + + extracted = middleware.get_token_from_request(request) + + assert extracted is None + + def test_extract_token_missing_header(self, mock_get_response, create_request): + """Test that missing Authorization header returns None.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + request = create_request() + + extracted = middleware.get_token_from_request(request) + + assert extracted is None + + def test_extract_token_invalid_format(self, mock_get_response, create_request): + """Test that invalid JWT format (not 3 parts) is rejected.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + request = create_request(auth_header="Bearer invalid.token") + + extracted = middleware.get_token_from_request(request) + + assert extracted is None + + def test_extract_token_case_insensitive_bearer(self, mock_get_response, create_request): + """Test that bearer scheme is case-insensitive.""" + middleware = JWTHeaderAuthMiddleware(mock_get_response) + token = "valid.jwt.token" + request = create_request(auth_header=f"bearer {token}") + + extracted = middleware.get_token_from_request(request) + + assert extracted == token + + def test_process_request_valid_token( + self, mock_get_response, create_request, active_user, settings + ): + """Test full authentication flow with valid token.""" + settings.DEBUG = True + + middleware = JWTHeaderAuthMiddleware(mock_get_response) + token = str(AccessToken.for_user(active_user)) + request = create_request(auth_header=f"Bearer {token}") + + middleware.process_request(request) + + assert request.user.is_authenticated + assert request.user.id == active_user.id + assert hasattr(request, 'token_user') + + def test_process_request_invalid_token(self, mock_get_response, create_request, settings): + """Test that invalid token results in anonymous user.""" + settings.DEBUG = True + + middleware = JWTHeaderAuthMiddleware(mock_get_response) + request = create_request(auth_header="Bearer invalid.jwt.token") + + middleware.process_request(request) + + assert not request.user.is_authenticated + assert isinstance(request.user, AnonymousUser) + + +class TestJWTCookieAuthMiddleware: + """Tests for JWT cookie authentication middleware.""" + + def test_extract_valid_token_from_cookie(self, mock_get_response, create_request): + """Test extracting token from cookie.""" + middleware = JWTCookieAuthMiddleware(mock_get_response) + token = "valid.jwt.token" + request = create_request(cookies={"access_token": token}) + + with patch("oxutils.jwt.middleware.check_csrf", return_value=None): + extracted = middleware.get_token_from_request(request) + + assert extracted == token + + def test_extract_missing_cookie(self, mock_get_response, create_request): + """Test that missing cookie returns None.""" + middleware = JWTCookieAuthMiddleware(mock_get_response) + request = create_request() + + with patch("oxutils.jwt.middleware.check_csrf", return_value=None): + extracted = middleware.get_token_from_request(request) + + assert extracted is None + + def test_csrf_check_fails(self, mock_get_response, create_request): + """Test that CSRF failure raises PermissionDenied.""" + middleware = JWTCookieAuthMiddleware(mock_get_response) + request = create_request(cookies={"access_token": "valid.token"}) + + with patch("oxutils.jwt.middleware.check_csrf", return_value=Mock()): + with pytest.raises(PermissionDenied, match="CSRF check failed"): + middleware.get_token_from_request(request) + + def test_process_request_handles_csrf_failure( + self, mock_get_response, create_request, settings + ): + """Test that CSRF failure in process_request sets anonymous user.""" + settings.DEBUG = True + + middleware = JWTCookieAuthMiddleware(mock_get_response) + request = create_request(cookies={"access_token": "valid.token"}) + + with patch("oxutils.jwt.middleware.check_csrf", return_value=Mock()): + middleware.process_request(request) + + assert not request.user.is_authenticated + assert isinstance(request.user, AnonymousUser) + + def test_custom_cookie_name(self, mock_get_response, create_request): + """Test that custom cookie name can be used.""" + class CustomCookieMiddleware(JWTCookieAuthMiddleware): + param_name = "custom_token" + + middleware = CustomCookieMiddleware(mock_get_response) + token = "valid.jwt.token" + request = create_request(cookies={"custom_token": token}) + + with patch("oxutils.jwt.middleware.check_csrf", return_value=None): + extracted = middleware.get_token_from_request(request) + + assert extracted == token + + +class TestBasicNoPasswordAuthMiddleware: + """Tests for development-only basic auth middleware.""" + + def test_middleware_disabled_in_production(self, mock_get_response, settings): + """Test that middleware is disabled when DEBUG is False.""" + settings.DEBUG = False + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + + assert not middleware._enabled + + def test_middleware_enabled_in_debug(self, mock_get_response, settings): + """Test that middleware is enabled when DEBUG is True.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + + assert middleware._enabled + + def test_bypass_in_production_mode(self, mock_get_response, create_request, settings): + """Test that middleware bypasses completely in production.""" + settings.DEBUG = False + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + request = create_request() + + # Should not process the request at all + response = middleware(request) + + assert middleware.get_response.called + assert not hasattr(request.user, 'is_authenticated') or not request.user.is_authenticated + + def test_valid_basic_auth_header(self, mock_get_response, create_request, active_user, settings): + """Test authentication with valid Basic auth header.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + credentials = base64.b64encode(f"{active_user.email}:anything".encode()).decode() + request = create_request(auth_header=f"Basic {credentials}") + + middleware.process_request(request) + + assert request.user.is_authenticated + assert request.user.id == active_user.id + + def test_basic_auth_without_password(self, mock_get_response, create_request, active_user, settings): + """Test authentication with username only (no password).""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + credentials = base64.b64encode(f"{active_user.email}:".encode()).decode() + request = create_request(auth_header=f"Basic {credentials}") + + middleware.process_request(request) + + assert request.user.is_authenticated + assert request.user.id == active_user.id + + def test_basic_auth_inactive_user(self, mock_get_response, create_request, inactive_user, settings): + """Test that inactive users cannot authenticate.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + credentials = base64.b64encode(f"{inactive_user.email}:anything".encode()).decode() + request = create_request(auth_header=f"Basic {inactive_user.email}") + + middleware.process_request(request) + + assert not request.user.is_authenticated + + def test_basic_auth_nonexistent_user(self, mock_get_response, create_request, settings): + """Test that non-existent users cannot authenticate.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + credentials = base64.b64encode(b"nonexistent@example.com:anything").decode() + request = create_request(auth_header=f"Basic {credentials}") + + middleware.process_request(request) + + assert not request.user.is_authenticated + + def test_basic_auth_missing_header(self, mock_get_response, create_request, settings): + """Test that missing header results in anonymous user.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + request = create_request() + + middleware.process_request(request) + + assert not request.user.is_authenticated + + def test_basic_auth_wrong_scheme(self, mock_get_response, create_request, active_user, settings): + """Test that wrong scheme (Bearer instead of Basic) is rejected.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + request = create_request(auth_header=f"Bearer {active_user.email}") + + middleware.process_request(request) + + assert not request.user.is_authenticated + + def test_basic_auth_invalid_base64(self, mock_get_response, create_request, settings): + """Test that invalid base64 is handled gracefully.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + request = create_request(auth_header="Basic invalid!!!base64") + + middleware.process_request(request) + + assert not request.user.is_authenticated + + def test_skips_if_already_authenticated( + self, mock_get_response, create_request, active_user, settings + ): + """Test that middleware skips if user is already authenticated.""" + settings.DEBUG = True + + middleware = BasicNoPasswordAuthMiddleware(mock_get_response) + request = create_request(user=active_user) + + middleware.process_request(request) + + # Should not have changed the user + assert request.user == active_user + + +class TestMiddlewareChaining: + """Tests for chaining multiple JWT middlewares together.""" + + def test_header_middleware_authenticates_first( + self, mock_get_response, create_request, active_user, settings + ): + """Test that first successful middleware auth prevents second attempt.""" + settings.DEBUG = True + + token = str(AccessToken.for_user(active_user)) + request = create_request( + auth_header=f"Bearer {token}", + cookies={"access_token": "different.token"} + ) + + # First middleware (header) authenticates + header_middleware = JWTHeaderAuthMiddleware(mock_get_response) + header_middleware.process_request(request) + + assert request.user.is_authenticated + assert request.user.id == active_user.id + + # Second middleware (cookie) should skip + original_user = request.user + with patch("oxutils.jwt.middleware.check_csrf", return_value=None): + cookie_middleware = JWTCookieAuthMiddleware(mock_get_response) + cookie_middleware.process_request(request) + + # User should not have changed + assert request.user == original_user + + def test_cookie_fallback_when_header_fails( + self, mock_get_response, create_request, active_user, settings + ): + """Test that cookie middleware can authenticate when header fails.""" + settings.DEBUG = True + + token = str(AccessToken.for_user(active_user)) + request = create_request( + auth_header="Bearer invalid.token", + cookies={"access_token": token} + ) + + # First middleware (header) fails + header_middleware = JWTHeaderAuthMiddleware(mock_get_response) + header_middleware.process_request(request) + + # User should be anonymous after header failure + assert not request.user.is_authenticated + + # Second middleware (cookie) should authenticate + with patch("oxutils.jwt.middleware.check_csrf", return_value=None): + cookie_middleware = JWTCookieAuthMiddleware(mock_get_response) + cookie_middleware.process_request(request) + + # Now user should be authenticated + assert request.user.is_authenticated + assert request.user.id == active_user.id + + +class TestJWTTokenValidation: + """Tests for JWT token validation.""" + + def test_get_validated_token_success(self, active_user): + """Test successful token validation.""" + token = AccessToken.for_user(active_user) + raw_token = str(token) + + validated = JWTAuthBaseMiddleware.get_validated_token(raw_token) + + assert validated is not None + from ninja_jwt.settings import api_settings + assert validated[api_settings.USER_ID_CLAIM] == active_user.id + + def test_get_validated_token_invalid(self): + """Test that invalid token raises exception.""" + with pytest.raises(InvalidToken): + JWTAuthBaseMiddleware.get_validated_token("invalid.token") + + def test_get_user_with_valid_token(self, mock_get_response, active_user): + """Test extracting user from valid token.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + token = AccessToken.for_user(active_user) + + user = middleware.get_user(token) + + assert user is not None + assert user.id == active_user.id + + def test_get_user_missing_user_id_claim(self, mock_get_response): + """Test that missing user_id claim raises InvalidToken.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + token = AccessToken() + + with pytest.raises(InvalidToken, match="no recognizable user identification"): + middleware.get_user(token) + + def test_get_user_invalid_user_id_type(self, mock_get_response): + """Test that invalid user_id type raises InvalidToken.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + token = AccessToken() + from ninja_jwt.settings import api_settings + token[api_settings.USER_ID_CLAIM] = None # Invalid type + + with pytest.raises(InvalidToken, match="Invalid user identification format"): + middleware.get_user(token) + + def test_get_user_user_id_too_long(self, mock_get_response): + """Test that too long user_id raises InvalidToken.""" + middleware = JWTAuthBaseMiddleware(mock_get_response) + token = AccessToken() + from ninja_jwt.settings import api_settings + token[api_settings.USER_ID_CLAIM] = "x" * 256 # Too long + + with pytest.raises(InvalidToken, match="User identification too long"): + middleware.get_user(token) diff --git a/tests/oxiliere/test_oxiliere.py b/tests/oxiliere/test_oxiliere.py index 01d892f..5172690 100644 --- a/tests/oxiliere/test_oxiliere.py +++ b/tests/oxiliere/test_oxiliere.py @@ -129,16 +129,17 @@ def test_missing_org_id_returns_bad_request(self, mock_connection): @patch('oxutils.oxiliere.middleware.connection') def test_tenant_not_found_raises_404(self, mock_connection): """Test that non-existent tenant raises 404.""" - from django_tenants.utils import get_tenant_model + from django.http import Http404 mock_connection.set_schema_to_public = Mock() mock_connection.tenant_model = Mock() - mock_connection.tenant_model.DoesNotExist = Exception + mock_connection.tenant_model.DoesNotExist = Http404 request = self.factory.get('/', HTTP_X_ORGANIZATION_ID='nonexistent') + request.user = Mock() # Add user attribute with patch.object(self.middleware, 'get_tenant') as mock_get_tenant: - mock_get_tenant.side_effect = mock_connection.tenant_model.DoesNotExist + mock_get_tenant.side_effect = Http404 with pytest.raises(Http404): self.middleware.process_request(request) @@ -146,25 +147,44 @@ def test_tenant_not_found_raises_404(self, mock_connection): @patch('oxutils.oxiliere.middleware.connection') def test_successful_tenant_switch(self, mock_connection): """Test successful tenant schema switch.""" - mock_tenant = Mock() + from django.contrib.auth.models import AnonymousUser + from oxutils.jwt.models import TokenTenant + + mock_connection.set_schema_to_public = Mock() + + # Create a real class for tenant_model and mock_tenant so isinstance works + TenantModel = type('TenantModel', (), {}) + mock_connection.tenant_model = TenantModel + mock_connection.set_tenant = Mock() + + # Create mock_tenant as instance of TenantModel so isinstance works + mock_tenant = Mock(spec=TenantModel) + mock_tenant.id = "test-id" mock_tenant.oxi_id = 'acme-corp' mock_tenant.schema_name = 'tenant_acmecorp' mock_tenant.is_deleted = False mock_tenant.is_active = True - - mock_connection.set_schema_to_public = Mock() - mock_connection.tenant_model = Mock() - mock_connection.set_tenant = Mock() + mock_tenant.subscription_plan = 'basic' + mock_tenant.subscription_status = 'active' + mock_tenant.subscription_end_date = '2025-12-31' + mock_tenant.status = 'active' request = self.factory.get('/', HTTP_X_ORGANIZATION_ID='acme-corp') + request.user = AnonymousUser() # Add user attribute with patch.object(self.middleware, 'get_tenant', return_value=mock_tenant): - with patch.object(self.middleware, 'setup_url_routing'): - with patch('oxutils.oxiliere.middleware.set_current_tenant_schema_name'): - self.middleware.process_request(request) - - assert request.tenant == mock_tenant - mock_connection.set_tenant.assert_called_once_with(mock_tenant) + with patch.object(self.middleware, 'get_tenant_user', return_value=Mock()): + with patch.object(self.middleware, 'setup_url_routing'): + with patch('oxutils.oxiliere.middleware.set_current_tenant_schema_name'): + self.middleware.process_request(request) + + # Check that request.tenant is now a TokenTenant (not the original mock) + assert isinstance(request.tenant, TokenTenant) + assert request.tenant.oxi_id == 'acme-corp' + assert request.tenant.schema_name == 'tenant_acmecorp' + + # Check that request.db_tenant is the original DB tenant + assert request.db_tenant == mock_tenant @pytest.mark.django_db diff --git a/tests/oxiliere/test_permissions.py b/tests/oxiliere/test_permissions.py index 9099492..3014b9c 100644 --- a/tests/oxiliere/test_permissions.py +++ b/tests/oxiliere/test_permissions.py @@ -5,72 +5,31 @@ from unittest.mock import Mock, patch, MagicMock from django.contrib.auth.models import AnonymousUser from oxutils.oxiliere.permissions import ( - TenantPermission, + TenantBasePermission, + TenantUserPermission, TenantOwnerPermission, TenantAdminPermission, - TenantUserPermission, OxiliereServicePermission, + IsTenantUser, + IsTenantOwner, + IsTenantAdmin, + IsOxiliereService, ) +from oxutils.jwt.models import TokenTenant, TenantUser from oxutils.jwt.tokens import OxilierServiceToken from oxutils.constants import OXILIERE_SERVICE_TOKEN -class TestTenantPermission: - """Test TenantPermission class.""" - - def test_permission_authenticated_user_with_access(self): - """Test permission granted for authenticated user with tenant access.""" - permission = TenantPermission() - - mock_user = Mock() - mock_user.is_authenticated = True - mock_user.pk = 1 - - mock_tenant = Mock() - mock_tenant.pk = 100 - - mock_request = Mock() - mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = True - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is True - mock_tenant_user_model.objects.filter.assert_called_once() +class TestTenantBasePermission: + """Test TenantBasePermission abstract base class.""" - def test_permission_authenticated_user_without_access(self): - """Test permission denied for authenticated user without tenant access.""" - permission = TenantPermission() - - mock_user = Mock() - mock_user.is_authenticated = True - mock_user.pk = 1 - - mock_tenant = Mock() - mock_tenant.pk = 100 - - mock_request = Mock() - mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = False - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is False - - def test_permission_unauthenticated_user(self): + def test_unauthenticated_user_denied(self): """Test permission denied for unauthenticated user.""" - permission = TenantPermission() + class TestPermission(TenantBasePermission): + def check_tenant_permission(self, request): + return True + permission = TestPermission() mock_request = Mock() mock_request.user = AnonymousUser() @@ -78,10 +37,13 @@ def test_permission_unauthenticated_user(self): assert result is False - def test_permission_no_user(self): + def test_no_user_denied(self): """Test permission denied when no user in request.""" - permission = TenantPermission() + class TestPermission(TenantBasePermission): + def check_tenant_permission(self, request): + return True + permission = TestPermission() mock_request = Mock() mock_request.user = None @@ -89,10 +51,13 @@ def test_permission_no_user(self): assert result is False - def test_permission_no_tenant(self): + def test_no_tenant_denied(self): """Test permission denied when no tenant in request.""" - permission = TenantPermission() + class TestPermission(TenantBasePermission): + def check_tenant_permission(self, request): + return True + permission = TestPermission() mock_user = Mock() mock_user.is_authenticated = True @@ -102,179 +67,295 @@ def test_permission_no_tenant(self): result = permission.has_permission(mock_request) assert result is False + + def test_tenant_not_token_tenant_denied(self): + """Test permission denied when tenant is not a TokenTenant.""" + class TestPermission(TenantBasePermission): + def check_tenant_permission(self, request): + return True + + permission = TestPermission() + mock_user = Mock() + mock_user.is_authenticated = True + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = Mock() # Not a TokenTenant + + result = permission.has_permission(mock_request) + + assert result is False -class TestTenantOwnerPermission: - """Test TenantOwnerPermission class.""" +class TestTenantUserPermission: + """Test TenantUserPermission / IsTenantUser.""" - def test_permission_owner_user(self): - """Test permission granted for owner user.""" - permission = TenantOwnerPermission() - + def _create_tenant(self, tenant_user=None): + """Helper to create TokenTenant.""" + return TokenTenant( + schema_name='test_schema', + tenant_id='100', + oxi_id='test-org', + subscription_plan='basic', + subscription_status='active', + subscription_end_date='2025-12-31', + status='active', + user=tenant_user + ) + + def test_permission_granted_for_active_tenant_user(self): + """Test permission granted for active tenant user.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=False, + status='active' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = True - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is True - # Verify is_owner=True was in filter - call_kwargs = mock_tenant_user_model.objects.filter.call_args[1] - assert call_kwargs['is_owner'] is True - - def test_permission_non_owner_user(self): - """Test permission denied for non-owner user.""" - permission = TenantOwnerPermission() + mock_request.tenant = tenant + + permission = TenantUserPermission() + result = permission.has_permission(mock_request) + assert result is True + + def test_permission_denied_for_inactive_tenant_user(self): + """Test permission denied for inactive tenant user.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=False, + status='inactive' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = False - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is False + mock_request.tenant = tenant + + permission = TenantUserPermission() + result = permission.has_permission(mock_request) + + assert result is False - def test_permission_unauthenticated_user(self): - """Test permission denied for unauthenticated user.""" - permission = TenantOwnerPermission() + def test_permission_denied_for_no_tenant_user(self): + """Test permission denied when tenant has no user.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant = self._create_tenant(None) # No tenant user mock_request = Mock() - mock_request.user = AnonymousUser() + mock_request.user = mock_user + mock_request.tenant = tenant + permission = TenantUserPermission() result = permission.has_permission(mock_request) assert result is False + + def test_singleton_instance_works(self): + """Test IsTenantUser singleton instance.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=False, + status='active' + ) + tenant = self._create_tenant(tenant_user) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + + result = IsTenantUser.has_permission(mock_request) + + assert result is True class TestTenantAdminPermission: - """Test TenantAdminPermission class.""" + """Test TenantAdminPermission / IsTenantAdmin.""" + + def _create_tenant(self, tenant_user=None): + """Helper to create TokenTenant.""" + return TokenTenant( + schema_name='test_schema', + tenant_id='100', + oxi_id='test-org', + subscription_plan='basic', + subscription_status='active', + subscription_end_date='2025-12-31', + status='active', + user=tenant_user + ) - def test_permission_admin_user(self): - """Test permission granted for admin user.""" + def test_permission_granted_for_admin(self): + """Test permission granted for admin.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=True, + status='active' + ) + tenant = self._create_tenant(tenant_user) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + permission = TenantAdminPermission() + result = permission.has_permission(mock_request) + assert result is True + + def test_permission_denied_for_non_admin(self): + """Test permission denied for regular user.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=False, + status='active' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = True - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is True - # Verify is_admin=True was in filter - call_kwargs = mock_tenant_user_model.objects.filter.call_args[1] - assert call_kwargs['is_admin'] is True - - def test_permission_non_admin_user(self): - """Test permission denied for non-admin user.""" + mock_request.tenant = tenant + permission = TenantAdminPermission() + result = permission.has_permission(mock_request) + assert result is False + + def test_singleton_instance_works(self): + """Test IsTenantAdmin singleton instance.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=True, + status='active' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = False - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is False + mock_request.tenant = tenant + + result = IsTenantAdmin.has_permission(mock_request) + + assert result is True -class TestTenantUserPermission: - """Test TenantUserPermission class.""" +class TestTenantOwnerPermission: + """Test TenantOwnerPermission / IsTenantOwner.""" - def test_permission_tenant_member(self): - """Test permission granted for tenant member.""" - permission = TenantUserPermission() - + def _create_tenant(self, tenant_user=None): + """Helper to create TokenTenant.""" + return TokenTenant( + schema_name='test_schema', + tenant_id='100', + oxi_id='test-org', + subscription_plan='basic', + subscription_status='active', + subscription_end_date='2025-12-31', + status='active', + user=tenant_user + ) + + def test_permission_granted_for_owner(self): + """Test permission granted for owner.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=True, + is_admin=False, + status='active' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = True - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is True + mock_request.tenant = tenant + + permission = TenantOwnerPermission() + result = permission.has_permission(mock_request) + + assert result is True - def test_permission_non_member(self): - """Test permission denied for non-member.""" - permission = TenantUserPermission() + def test_permission_denied_for_admin_non_owner(self): + """Test permission denied for admin who is not owner.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=True, + status='active' + ) + tenant = self._create_tenant(tenant_user) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + + permission = TenantOwnerPermission() + result = permission.has_permission(mock_request) + assert result is False + + def test_singleton_instance_works(self): + """Test IsTenantOwner singleton instance.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=True, + is_admin=False, + status='active' + ) + tenant = self._create_tenant(tenant_user) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = False - mock_get_model.return_value = mock_tenant_user_model - - result = permission.has_permission(mock_request) - - assert result is False + mock_request.tenant = tenant + + result = IsTenantOwner.has_permission(mock_request) + + assert result is True class TestOxiliereServicePermission: @@ -376,52 +457,105 @@ def test_permission_header_name_conversion(self): result = permission.has_permission(mock_request) assert result is True + + def test_singleton_instance_works(self): + """Test IsOxiliereService singleton instance.""" + token = OxilierServiceToken.for_service({'service': 'test'}) + + mock_request = Mock() + mock_request.headers.get.return_value = str(token) + mock_request.META.get.return_value = None + + result = IsOxiliereService.has_permission(mock_request) + + assert result is True class TestPermissionIntegration: """Test permission integration scenarios.""" - def test_multiple_permissions_hierarchy(self): - """Test that owner permission is more restrictive than tenant permission.""" + def _create_tenant(self, is_owner=False, is_admin=False, status='active'): + """Helper to create TokenTenant with user.""" + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=is_owner, + is_admin=is_admin, + status=status + ) + return TokenTenant( + schema_name='test_schema', + tenant_id='100', + oxi_id='test-org', + subscription_plan='basic', + subscription_status='active', + subscription_end_date='2025-12-31', + status='active', + user=tenant_user + ) + + def test_owner_has_all_permissions(self): + """Test owner passes all permission checks.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant = self._create_tenant(is_owner=True, is_admin=True) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + + assert IsTenantUser.has_permission(mock_request) is True + assert IsTenantOwner.has_permission(mock_request) is True + assert IsTenantAdmin.has_permission(mock_request) is True + + def test_admin_has_user_and_admin_permissions(self): + """Test admin passes user and admin checks but not owner.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant = self._create_tenant(is_owner=False, is_admin=True) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + + assert IsTenantUser.has_permission(mock_request) is True + assert IsTenantOwner.has_permission(mock_request) is False + assert IsTenantAdmin.has_permission(mock_request) is True + + def test_regular_user_has_only_user_permission(self): + """Test regular user only passes basic user check.""" + mock_user = Mock() + mock_user.is_authenticated = True + + tenant = self._create_tenant(is_owner=False, is_admin=False) + + mock_request = Mock() + mock_request.user = mock_user + mock_request.tenant = tenant + + assert IsTenantUser.has_permission(mock_request) is True + assert IsTenantOwner.has_permission(mock_request) is False + assert IsTenantAdmin.has_permission(mock_request) is False + + def test_inactive_user_has_no_permissions(self): + """Test inactive user fails all permission checks.""" mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant = self._create_tenant(is_owner=True, is_admin=True, status='inactive') mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - tenant_perm = TenantPermission() - owner_perm = TenantOwnerPermission() - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - - # User is member but not owner - def filter_side_effect(**kwargs): - mock_qs = Mock() - if 'is_owner' in kwargs: - mock_qs.exists.return_value = False - else: - mock_qs.exists.return_value = True - return mock_qs - - mock_tenant_user_model.objects.filter.side_effect = filter_side_effect - mock_get_model.return_value = mock_tenant_user_model - - # Should pass tenant permission - assert tenant_perm.has_permission(mock_request) is True - - # Should fail owner permission - assert owner_perm.has_permission(mock_request) is False + mock_request.tenant = tenant + + assert IsTenantUser.has_permission(mock_request) is False + assert IsTenantOwner.has_permission(mock_request) is False + assert IsTenantAdmin.has_permission(mock_request) is False - def test_service_permission_vs_user_permission(self): + def test_service_permission_independent_of_user(self): """Test service permission works independently of user permission.""" - # Service permission with valid token - service_perm = OxiliereServicePermission() token = OxilierServiceToken.for_service({'service': 'api'}) mock_request = Mock() @@ -430,33 +564,39 @@ def test_service_permission_vs_user_permission(self): mock_request.user = AnonymousUser() # No user # Service permission should pass even without user - assert service_perm.has_permission(mock_request) is True + assert IsOxiliereService.has_permission(mock_request) is True # But tenant permission should fail - tenant_perm = TenantPermission() - assert tenant_perm.has_permission(mock_request) is False + assert IsTenantUser.has_permission(mock_request) is False def test_kwargs_passed_to_permission(self): """Test that kwargs are properly handled by permissions.""" - permission = TenantPermission() - mock_user = Mock() mock_user.is_authenticated = True - mock_user.pk = 1 - mock_tenant = Mock() - mock_tenant.pk = 100 + tenant_user = TenantUser( + oxi_id='user-123', + id='1', + is_owner=False, + is_admin=False, + status='active' + ) + tenant = TokenTenant( + schema_name='test_schema', + tenant_id='100', + oxi_id='test-org', + subscription_plan='basic', + subscription_status='active', + subscription_end_date='2025-12-31', + status='active', + user=tenant_user + ) mock_request = Mock() mock_request.user = mock_user - mock_request.tenant = mock_tenant - - with patch('oxutils.oxiliere.permissions.get_tenant_user_model') as mock_get_model: - mock_tenant_user_model = Mock() - mock_tenant_user_model.objects.filter.return_value.exists.return_value = True - mock_get_model.return_value = mock_tenant_user_model - - # Should work with kwargs - result = permission.has_permission(mock_request, view=Mock(), extra_param='test') - - assert result is True + mock_request.tenant = tenant + + # Should work with kwargs + result = IsTenantUser.has_permission(mock_request, view=Mock(), extra_param='test') + + assert result is True diff --git a/uv.lock b/uv.lock index 5cfc8b7..a892235 100644 --- a/uv.lock +++ b/uv.lock @@ -1127,7 +1127,7 @@ wheels = [ [[package]] name = "oxutils" -version = "0.1.14" +version = "0.1.15" source = { editable = "." } dependencies = [ { name = "boto3" },