-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadmin_auth.py
More file actions
115 lines (88 loc) · 3.2 KB
/
admin_auth.py
File metadata and controls
115 lines (88 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# SPDX-License-Identifier: GPL-3.0-or-later
#
# Toolify: Empower any LLM with function calling capabilities.
# Copyright (C) 2025 FunnyCups (https://github.com/funnycups)
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
# Security scheme: HTTP Bearer credentials extractor. It is injected into
# verify_admin_token through Depends(security).
security = HTTPBearer()
class LoginRequest(BaseModel):
    """Credentials submitted with a login request."""

    # Account name supplied by the caller.
    username: str
    # Password supplied by the caller.
    # NOTE(review): presumably the plain-text value later checked with
    # verify_password -- confirm against the login route.
    password: str
class LoginResponse(BaseModel):
    """Payload returned after a login: the issued token and its type."""

    # The issued access token.
    access_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
class TokenData(BaseModel):
    """Data associated with a token: its owner and its expiry time."""

    # Name of the user the token refers to.
    username: str
    # Moment at which the token expires.
    exp: datetime
def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password`` as text.

    The password is UTF-8 encoded, hashed with a salt freshly produced by
    ``bcrypt.gensalt()``, and the resulting bytes are decoded as UTF-8.
    """
    raw_password = password.encode('utf-8')
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(raw_password, salt)
    return hashed.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against a stored bcrypt hash.

    Returns the result of ``bcrypt.checkpw``. Any exception raised while
    encoding or checking (for example a malformed hash) is treated as a
    failed verification, so the function returns ``False`` instead of raising.
    """
    try:
        candidate = plain_password.encode('utf-8')
        stored = hashed_password.encode('utf-8')
        is_match = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Fail closed: anything unexpected counts as "password does not match".
        return False
    return is_match
def generate_jwt_secret() -> str:
    """Return a new random secret suitable for signing JWTs.

    The secret is URL-safe text produced by ``secrets.token_urlsafe`` from
    48 random bytes.
    """
    entropy_bytes = 48
    return secrets.token_urlsafe(entropy_bytes)
def create_access_token(username: str, secret_key: str, expires_delta: timedelta = timedelta(hours=24)) -> str:
    """Create a signed JWT access token for ``username``.

    Args:
        username: Stored in the token's ``sub`` claim.
        secret_key: Key used to sign the token (HS256).
        expires_delta: Token lifetime counted from the current time.
            Defaults to 24 hours.

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive value that can be mistaken for
    # local time. The resulting "exp" instant is the same.
    expire = datetime.now(timezone.utc) + expires_delta
    claims = {
        "sub": username,
        "exp": expire,
    }
    return jwt.encode(claims, secret_key, algorithm="HS256")
def verify_token(token: str, secret_key: str) -> Optional[str]:
    """Verify a JWT and return the username stored in its ``sub`` claim.

    Args:
        token: Encoded JWT to validate.
        secret_key: Key the token is expected to be signed with (HS256).

    Returns:
        The value of the ``sub`` claim, or ``None`` when the token decodes
        successfully but has no ``sub`` claim.

    Raises:
        HTTPException: 401 with detail "Token has expired" when the token is
            expired, or 401 with detail "Invalid token" for any other
            token validation error.
    """
    try:
        # Only the decode call can raise the JWT errors handled below.
        payload = jwt.decode(token, secret_key, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        # Handled separately from the generic case so the client receives a
        # distinct "expired" message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            # A 401 response must carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # dict.get yields None when the claim is absent, which is the documented
    # "no username" result.
    return payload.get("sub")
def get_admin_credentials():
    """Dependency returning the admin authentication settings from config.

    Returns:
        The ``admin_authentication`` value of the loaded configuration.

    Raises:
        HTTPException: 503 when ``admin_authentication`` is missing or falsy.
    """
    # Imported at call time rather than at module import time.
    from config_loader import config_loader

    admin_auth = config_loader.config.admin_authentication
    if admin_auth:
        return admin_auth

    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="Admin authentication is not configured"
    )
async def verify_admin_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    admin_config = Depends(get_admin_credentials)
) -> str:
    """Dependency that authenticates the caller as the configured admin.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        admin_config: Admin settings from ``get_admin_credentials``; its
            ``jwt_secret`` and ``username`` attributes are read here.

    Returns:
        The admin username carried by the token.

    Raises:
        HTTPException: 401 when the token is expired or invalid (raised by
            ``verify_token``), or 401 with detail "Invalid credentials" when
            the token's username is missing or differs from the configured
            admin username.
    """
    username = verify_token(credentials.credentials, admin_config.jwt_secret)

    # verify_token returns None for a token without a "sub" claim; that case
    # is rejected here too because None never equals the configured username.
    if username != admin_config.username:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            # A 401 response must carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return username