feat: add API middleware for CORS, security, rate limiting - PR-6#154
feat: add API middleware for CORS, security, rate limiting - PR-6#154uelkerd wants to merge 2 commits into
Conversation
…initial implementation
Reviewer's GuideIntegrates CORS support, JWT-based authentication, and per-client rate limiting into the unified API server by defining reusable helper modules and registering them as middleware. Class diagram for new authentication and rate limiting modulesclassDiagram
class RateLimiter {
+max_requests: int
+window_seconds: int
+requests: defaultdict
+__init__(max_requests: int = 100, window_seconds: int = 3600)
+is_allowed(identifier: str) bool
+get_remaining_requests(identifier: str) int
}
class Auth {
+verify_password(plain_password, hashed_password)
+get_password_hash(password)
+create_access_token(data: dict, expires_delta: Optional[timedelta] = None)
+get_current_user(credentials: HTTPAuthorizationCredentials)
}
RateLimiter <.. UnifiedAPIServer : used by
Auth <.. UnifiedAPIServer : used by
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughIntroduces Flask API key auth and an in-memory rate limiter, adds a minimal Flask server with a protected endpoint using both, and implements separate FastAPI-compatible password/JWT utilities and a generic sliding-window rate limiter class. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant F as Flask Router
participant AK as require_api_key (decorator)
participant RL as rate_limit (decorator)
participant H as protected handler
C->>F: POST /api/protected
F->>AK: Validate X-API-Key
alt Missing/Invalid API key
AK-->>C: 401 {"error":"API key required"}
else Valid API key
AK->>RL: Enforce sliding-window limit
alt Over limit
RL-->>C: 429 Rate limit exceeded
else Allowed
RL->>H: Invoke handler
H-->>C: 200 {"message":"Protected endpoint"}
end
end
sequenceDiagram
autonumber
participant C as Client
participant FA as FastAPI Route
participant HB as HTTPBearer
participant AU as get_current_user
participant JWT as JWT Decoder
C->>FA: Request with Authorization: Bearer <token>
FA->>HB: Extract bearer credentials
HB-->>FA: HTTPAuthorizationCredentials
FA->>AU: Depends(credentials)
AU->>JWT: Decode token (SECRET_KEY, ALGORITHM)
alt Invalid/expired token or missing sub
AU-->>C: 401 Could not validate credentials
else Valid token
AU-->>FA: username (sub)
FA-->>C: 200 Response
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Pre-merge checks (1 passed, 2 warnings)❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Poem
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Summary of Changes
Hello @uelkerd, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request establishes foundational API middleware to enhance the application's security and stability. It provides essential mechanisms for user authentication and implements a basic rate limiting system, laying the groundwork for more robust and controlled API interactions.
Highlights
- Authentication Middleware: Introduces a new module
src/security/auth.pyto handle API authentication. This includes functions for password hashing using bcrypt, JWT token creation and validation, and a dependency for retrieving the current authenticated user. - Rate Limiting Middleware: Adds a new module
src/security/rate_limiter.pythat implements a basic in-memory rate limiting mechanism. It allows controlling the number of requests an identifier can make within a specified time window. - New Security Modules: Creates a dedicated
src/securitydirectory to house the new authentication and rate limiting functionalities, promoting modularity and organization for security-related components.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
|
Here's the code health analysis summary for commits Analysis Summary
|
There was a problem hiding this comment.
Pull Request Overview
This PR introduces essential security middleware for the API including CORS configuration, authentication handling, and rate limiting capabilities. The implementation provides a foundation for secure API access control and request throttling.
Key changes:
- JWT-based authentication system with password hashing
- Rate limiting mechanism using sliding window approach
- Security middleware components for API protection
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/security/auth.py | Implements JWT authentication with password hashing and token validation |
| src/security/rate_limiter.py | Provides sliding window rate limiting functionality |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
|
|
||
| # Security settings | ||
| SECRET_KEY = "your-secret-key" # Should be loaded from config |
There was a problem hiding this comment.
Hardcoded secret key poses a security risk. This should be loaded from environment variables or a secure configuration file to prevent exposure in version control.
| # Security settings | |
| SECRET_KEY = "your-secret-key" # Should be loaded from config | |
| import os | |
| # Security settings | |
| SECRET_KEY = os.environ.get("SECRET_KEY") | |
| if not SECRET_KEY: | |
| raise RuntimeError("SECRET_KEY environment variable not set") |
| def get_remaining_requests(self, identifier: str) -> int: | ||
| now = time.time() | ||
| window_start = now - self.window_seconds | ||
| self.requests[identifier] = [ | ||
| timestamp for timestamp in self.requests[identifier] | ||
| if timestamp > window_start | ||
| ] | ||
| return max(0, self.max_requests - len(self.requests[identifier])) No newline at end of file |
There was a problem hiding this comment.
The window cleanup logic is duplicated between is_allowed and get_remaining_requests methods. Consider extracting this into a private helper method to reduce code duplication.
| @@ -0,0 +1,31 @@ | |||
| from collections import defaultdict | |||
| from datetime import datetime, timedelta | |||
There was a problem hiding this comment.
The datetime and timedelta imports are unused. Only time.time() is used for timestamp operations.
| from datetime import datetime, timedelta |
There was a problem hiding this comment.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- Hardcoded secret key should be loaded securely from configuration. (link)
General comments:
- Move SECRET_KEY and related settings into environment variables or a config file to avoid hardcoding sensitive values in auth.py.
- Consider using a distributed or persistent store (e.g., Redis) for rate limiting instead of in-memory lists to handle multi-instance deployments and avoid unbounded memory growth.
- Extract the timestamp filtering logic in rate_limiter.py into a shared helper to avoid duplicating it in both is_allowed and get_remaining_requests.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Move SECRET_KEY and related settings into environment variables or a config file to avoid hardcoding sensitive values in auth.py.
- Consider using a distributed or persistent store (e.g., Redis) for rate limiting instead of in-memory lists to handle multi-instance deployments and avoid unbounded memory growth.
- Extract the timestamp filtering logic in rate_limiter.py into a shared helper to avoid duplicating it in both is_allowed and get_remaining_requests.
## Individual Comments
### Comment 1
<location> `src/security/auth.py:9` </location>
<code_context>
+from typing import Optional
+
+# Security settings
+SECRET_KEY = "your-secret-key" # Should be loaded from config
+ALGORITHM = "HS256"
+ACCESS_TOKEN_EXPIRE_MINUTES = 30
</code_context>
<issue_to_address>
Hardcoded secret key should be loaded securely from configuration.
Use environment variables or a secure config system to prevent exposure of sensitive keys.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| from typing import Optional | ||
|
|
||
| # Security settings | ||
| SECRET_KEY = "your-secret-key" # Should be loaded from config |
There was a problem hiding this comment.
🚨 issue (security): Hardcoded secret key should be loaded securely from configuration.
Use environment variables or a secure config system to prevent exposure of sensitive keys.
| to_encode.update({"exp": expire}) | ||
| encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | ||
| return encoded_jwt |
There was a problem hiding this comment.
suggestion (code-quality): We've found these issues:
- Add single value to dictionary directly rather than using update() (
simplify-dictionary-update) - Inline variable that is immediately returned (
inline-immediately-returned-variable)
| to_encode.update({"exp": expire}) | |
| encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| return encoded_jwt | |
| to_encode["exp"] = expire | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) |
| except JWTError: | ||
| raise credentials_exception |
There was a problem hiding this comment.
suggestion (code-quality): Explicitly raise from a previous error (raise-from-previous-error)
| except JWTError: | |
| raise credentials_exception | |
| except JWTError as e: | |
| raise credentials_exception from e |
There was a problem hiding this comment.
Code Review
This pull request introduces important middleware for authentication and rate limiting. However, there are significant issues that need to be addressed. The authentication module (auth.py) contains a critical security vulnerability due to a hardcoded SECRET_KEY and also has an inconsistency in token expiration logic. The rate limiter (rate_limiter.py) is not thread-safe, which can lead to race conditions in a concurrent environment, and it also contains duplicated code. My review provides suggestions to fix these critical and high-severity issues.
| from typing import Optional | ||
|
|
||
| # Security settings | ||
| SECRET_KEY = "your-secret-key" # Should be loaded from config |
There was a problem hiding this comment.
Hardcoding secrets like SECRET_KEY is a critical security vulnerability. The comment indicates it should be loaded from config, but the implementation uses a hardcoded value. This secret must be loaded from a secure source, such as environment variables or a secret management service, to prevent unauthorized access and token forgery in production environments. You should import os and use os.getenv('SECRET_KEY'). The application should fail to start if the key is not set in production.
| SECRET_KEY = "your-secret-key" # Should be loaded from config | |
| SECRET_KEY = os.getenv("SECRET_KEY") # Should be loaded from config |
| class RateLimiter: | ||
| def __init__(self, max_requests: int = 100, window_seconds: int = 3600): | ||
| self.max_requests = max_requests | ||
| self.window_seconds = window_seconds | ||
| self.requests = defaultdict(list) | ||
|
|
||
| def is_allowed(self, identifier: str) -> bool: | ||
| now = time.time() | ||
| window_start = now - self.window_seconds | ||
| self.requests[identifier] = [ | ||
| timestamp for timestamp in self.requests[identifier] | ||
| if timestamp > window_start | ||
| ] | ||
| if len(self.requests[identifier]) < self.max_requests: | ||
| self.requests[identifier].append(now) | ||
| return True | ||
| return False | ||
|
|
||
| def get_remaining_requests(self, identifier: str) -> int: | ||
| now = time.time() | ||
| window_start = now - self.window_seconds | ||
| self.requests[identifier] = [ | ||
| timestamp for timestamp in self.requests[identifier] | ||
| if timestamp > window_start | ||
| ] | ||
| return max(0, self.max_requests - len(self.requests[identifier])) No newline at end of file |
There was a problem hiding this comment.
This RateLimiter implementation has two issues:
- It is not thread-safe. In a concurrent environment, multiple requests for the same
identifiercan cause a race condition, potentially allowing more requests thanmax_requests. - Code is duplicated. The logic to clean up old request timestamps is repeated in
is_allowedandget_remaining_requests.
Both issues can be solved by introducing a lock (from the threading module) for thread safety and refactoring the cleanup logic into a private method. You will need to add import threading at the top of the file.
class RateLimiter:
def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
self._lock = threading.Lock()
def _cleanup_requests(self, identifier: str):
"""Removes timestamps outside the current window. Must be called within a lock."""
now = time.time()
window_start = now - self.window_seconds
self.requests[identifier] = [
timestamp for timestamp in self.requests[identifier]
if timestamp > window_start
]
def is_allowed(self, identifier: str) -> bool:
with self._lock:
self._cleanup_requests(identifier)
if len(self.requests[identifier]) < self.max_requests:
self.requests[identifier].append(time.time())
return True
return False
def get_remaining_requests(self, identifier: str) -> int:
with self._lock:
self._cleanup_requests(identifier)
return max(0, self.max_requests - len(self.requests[identifier]))| if expires_delta: | ||
| expire = datetime.utcnow() + expires_delta | ||
| else: | ||
| expire = datetime.utcnow() + timedelta(minutes=15) |
There was a problem hiding this comment.
The ACCESS_TOKEN_EXPIRE_MINUTES constant is defined as 30, but the default expiration time for the token is hardcoded to 15 minutes. This is inconsistent and can be misleading. The default expiration should use the defined constant to ensure consistency.
| expire = datetime.utcnow() + timedelta(minutes=15) | |
| expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
There was a problem hiding this comment.
Actionable comments posted: 8
♻️ Duplicate comments (3)
src/security/rate_limiter.py (2)
2-3: Remove unused imports.
datetime,timedelta, andOptionalaren’t used.-from datetime import datetime, timedelta -from typing import Optional +# (removed unused imports)
6-11: Make limiter thread-safe and deduplicate cleanup (race condition).Concurrent calls can interleave on
self.requests[identifier], allowing bursts beyondmax_requests. Also, the window-trim logic is duplicated. Add a lock and extract a_cleanuphelper.@@ -from collections import defaultdict +from collections import defaultdict +import threading @@ class RateLimiter: def __init__(self, max_requests: int = 100, window_seconds: int = 3600): self.max_requests = max_requests self.window_seconds = window_seconds self.requests = defaultdict(list) + self._lock = threading.Lock() + def _cleanup(self, identifier: str, now: float) -> None: + window_start = now - self.window_seconds + self.requests[identifier] = [ + ts for ts in self.requests[identifier] if ts > window_start + ] + def is_allowed(self, identifier: str) -> bool: - now = time.time() - window_start = now - self.window_seconds - self.requests[identifier] = [ - timestamp for timestamp in self.requests[identifier] - if timestamp > window_start - ] - if len(self.requests[identifier]) < self.max_requests: - self.requests[identifier].append(now) - return True - return False + now = time.time() + with self._lock: + self._cleanup(identifier, now) + if len(self.requests[identifier]) < self.max_requests: + self.requests[identifier].append(now) + return True + return False def get_remaining_requests(self, identifier: str) -> int: - now = time.time() - window_start = now - self.window_seconds - self.requests[identifier] = [ - timestamp for timestamp in self.requests[identifier] - if timestamp > window_start - ] - return max(0, self.max_requests - len(self.requests[identifier])) + now = time.time() + with self._lock: + self._cleanup(identifier, now) + return max(0, self.max_requests - len(self.requests[identifier]))Also applies to: 12-22, 24-31
src/security/auth.py (1)
8-11: Do not hardcode SECRET_KEY. Load from env and fail fast if missing.Hardcoded secrets are a critical risk.
-from fastapi import Depends, HTTPException, status +from fastapi import Depends, HTTPException, status +import os @@ -# Security settings -SECRET_KEY = "your-secret-key" # Should be loaded from config +# Security settings +SECRET_KEY = os.environ.get("SECRET_KEY") +if not SECRET_KEY: + raise RuntimeError("SECRET_KEY environment variable not set")
🧹 Nitpick comments (12)
src/rate_limiter.py (3)
12-13: Forward-proxy headers for IP — optional hardening.Behind load balancers,
remote_addrmay be the proxy. The patch above readsX-Forwarded-For; also consider enabling ProxyFix at the app layer.
16-17: Return JSON for 429 to keep API responses consistent.Flask’s
abortyields HTML by default. Either register a 429 error handler, or return a JSON response here.Example:
-from flask import abort, request +from flask import abort, request, jsonify, make_response ... - if len(REQUEST_LOG[client_ip]) >= max_requests: - abort(429, description="Rate limit exceeded") + if len(REQUEST_LOG[client_ip]) >= max_requests: + abort(make_response(jsonify(error="Rate limit exceeded"), 429))
5-9: Thread-safety and memory growth notes.Module-level lists aren’t synchronized and can grow unbounded by client key count. For anything beyond dev, move to Redis with atomic ops and TTL.
src/unified_api_server.py (1)
19-20: Bind host/port from env and avoid 0.0.0.0 by default.Keep 0.0.0.0 only when explicitly requested.
Apply:
-if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000) +if __name__ == '__main__': + app.run(host=os.getenv('HOST', '127.0.0.1'), port=int(os.getenv('PORT', '5000')))src/security/rate_limiter.py (4)
1-11: Optional: switch to deque for O(1) evictions under load.Lists reallocate on comprehension; deques let you popleft() old entries efficiently.
-from collections import defaultdict +from collections import defaultdict, deque @@ - self.requests = defaultdict(list) + self.requests = defaultdict(deque) @@ - def _cleanup(self, identifier: str, now: float) -> None: - window_start = now - self.window_seconds - self.requests[identifier] = [ - ts for ts in self.requests[identifier] if ts > window_start - ] + def _cleanup(self, identifier: str, now: float) -> None: + window_start = now - self.window_seconds + dq = self.requests[identifier] + while dq and dq[0] <= window_start: + dq.popleft()Also applies to: 12-22, 24-31
13-13: Optional: use monotonic clock to avoid NTP jumps.
time.monotonic()is safer for window math.- now = time.time() + now = time.monotonic()Also applies to: 25-25
6-11: Note: potential unbounded key growth.
defaultdictautovivifies identifiers and never drops empty lists/deques. Consider pruning empty buckets after_cleanupor using an LRU/TTL cache.
6-11: Consolidate rate limiter implementations
Duplicated in-memory rate limiting logic exists in src/security/rate_limiter.py and src/rate_limiter.py; merge into a single core implementation with adapters for framework-specific use.src/security/auth.py (4)
22-30: Align expiration with configured constant (avoid 15-min magic number).Use
ACCESS_TOKEN_EXPIRE_MINUTESwhen no custom delta is provided.-def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): @@ - if expires_delta: + if expires_delta: expire = datetime.utcnow() + expires_delta else: - expire = datetime.utcnow() + timedelta(minutes=15) + expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
32-44: Chain JWT errors for easier debugging.Preserve original exception context.
- except JWTError: - raise credentials_exception + except JWTError as e: + raise credentials_exception from e
32-32: Ruff B008 on Depends is a known FastAPI false positive.Either ignore for this line or configure per-file ignore.
-async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): +async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): # noqa: B008
16-21: Add explicit return types.Improves readability and tooling.
-def verify_password(plain_password, hashed_password): +def verify_password(plain_password: str, hashed_password: str) -> bool: @@ -def get_password_hash(password): +def get_password_hash(password: str) -> str: @@ -def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: @@ -async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): # noqa: B008 +async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: # noqa: B008Also applies to: 22-22, 32-32
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/auth.py(1 hunks)src/rate_limiter.py(1 hunks)src/security/auth.py(1 hunks)src/security/rate_limiter.py(1 hunks)src/unified_api_server.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/auth.py (1)
src/rate_limiter.py (1)
decorated_function(11-19)
src/security/auth.py (1)
tests/unit/test_jwt_manager_extra.py (1)
utcnow(56-58)
src/unified_api_server.py (2)
src/auth.py (1)
require_api_key(4-11)src/rate_limiter.py (1)
rate_limit(8-21)
src/rate_limiter.py (1)
src/auth.py (1)
decorated_function(6-10)
🪛 ast-grep (0.38.6)
src/security/auth.py
[warning] 28-28: Hardcoded JWT secret or private key is used. This is a Insufficiently Protected Credentials weakness: https://cwe.mitre.org/data/definitions/522.html Consider using an appropriate security mechanism to protect the credentials (e.g. keeping secrets in environment variables).
Context: encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Note: [CWE-522] Insufficiently Protected Credentials. [REFERENCES]
- https://semgrep.dev/blog/2020/hardcoded-secrets-unverified-tokens-and-other-common-jwt-mistakes/
(jwt-python-hardcoded-secret-python)
🪛 Ruff (0.12.2)
src/security/auth.py
9-9: Possible hardcoded password assigned to: "SECRET_KEY"
(S105)
32-32: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
44-44: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
src/unified_api_server.py
20-20: Possible binding to all interfaces
(S104)
src/rate_limiter.py
8-8: Redefinition of unused rate_limit from line 6
(F811)
10-10: Undefined name wraps
(F821)
12-12: Undefined name request
(F821)
🔇 Additional comments (1)
src/unified_api_server.py (1)
9-12: Health endpoint looks good.Simple, correct, and returns JSON 200.
| from functools import wraps | ||
| from flask import request, jsonify | ||
|
|
||
| def require_api_key(f): | ||
| @wraps(f) | ||
| def decorated_function(*args, **kwargs): | ||
| api_key = request.headers.get('X-API-Key') | ||
| if api_key != 'your-secret-key': # Replace with actual key or env var | ||
| return jsonify({'error': 'API key required'}), 401 | ||
| return f(*args, **kwargs) | ||
| return decorated_function |
There was a problem hiding this comment.
Hard-coded API key and non-constant-time compare — use env var + compare_digest.
Current code ships a plaintext secret and uses direct string equality. Replace with an env-driven secret and constant‑time comparison; also return a proper 401 with WWW-Authenticate.
Apply:
+import os
+import hmac
from functools import wraps
from flask import request, jsonify
def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
- api_key = request.headers.get('X-API-Key')
- if api_key != 'your-secret-key': # Replace with actual key or env var
- return jsonify({'error': 'API key required'}), 401
+ API_KEY = os.getenv('API_KEY', '')
+ # Prefer X-API-Key; optionally support "Authorization: Api-Key <key>"
+ header_key = request.headers.get('X-API-Key') or (
+ request.headers.get('Authorization').split(' ', 1)[1]
+ if request.headers.get('Authorization', '').startswith('Api-Key ')
+ else None
+ )
+ if not API_KEY or not header_key or not hmac.compare_digest(header_key, API_KEY):
+ resp = jsonify({'error': 'Unauthorized'})
+ resp.status_code = 401
+ resp.headers['WWW-Authenticate'] = 'ApiKey realm="api"'
+ return resp
return f(*args, **kwargs)
return decorated_function📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from functools import wraps | |
| from flask import request, jsonify | |
| def require_api_key(f): | |
| @wraps(f) | |
| def decorated_function(*args, **kwargs): | |
| api_key = request.headers.get('X-API-Key') | |
| if api_key != 'your-secret-key': # Replace with actual key or env var | |
| return jsonify({'error': 'API key required'}), 401 | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| import os | |
| import hmac | |
| from functools import wraps | |
| from flask import request, jsonify | |
| def require_api_key(f): | |
| @wraps(f) | |
| def decorated_function(*args, **kwargs): | |
| # Load secret from environment | |
| API_KEY = os.getenv('API_KEY', '') | |
| # Prefer X-API-Key; optionally support "Authorization: Api-Key <key>" | |
| header_key = request.headers.get('X-API-Key') or ( | |
| request.headers.get('Authorization').split(' ', 1)[1] | |
| if request.headers.get('Authorization', '').startswith('Api-Key ') | |
| else None | |
| ) | |
| # Constant-time compare and ensure key is set | |
| if not API_KEY or not header_key or not hmac.compare_digest(header_key, API_KEY): | |
| resp = jsonify({'error': 'Unauthorized'}) | |
| resp.status_code = 401 | |
| resp.headers['WWW-Authenticate'] = 'ApiKey realm="api"' | |
| return resp | |
| return f(*args, **kwargs) | |
| return decorated_function |
🤖 Prompt for AI Agents
In src/auth.py around lines 1 to 11, the code currently hard-codes the API key
and uses direct string equality; change it to read the secret from an
environment variable (e.g., os.environ.get('API_KEY', '')) and use
hmac.compare_digest() for constant-time comparison to avoid timing attacks;
update imports to include os and hmac, retrieve the provided key from
request.headers.get('X-API-Key', '') (defaulting to empty string), compare with
compare_digest, and on failure return a 401 response that includes the
WWW-Authenticate header (e.g., {'error':'API key required'}, 401,
{'WWW-Authenticate':'API key'}) rather than just a JSON body.
| def require_api_key(f): | ||
| @wraps(f) | ||
| def decorated_function(*args, **kwargs): | ||
| api_key = request.headers.get('X-API-Key') | ||
| if api_key != 'your-secret-key': # Replace with actual key or env var | ||
| return jsonify({'error': 'API key required'}), 401 | ||
| return f(*args, **kwargs) | ||
| return decorated_function |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Consider rate-limiting failed auth attempts.
If the rate limiter runs only after successful auth, attackers can spray invalid keys. Either reorder decorators at callsites (rate limit outermost) or add a lightweight per-IP failure counter here.
🤖 Prompt for AI Agents
In src/auth.py around lines 4 to 11, the decorator allows unlimited failed API
key attempts which enables key-spraying; change callsite decorator order so a
rate-limiter is the outermost decorator or add a lightweight per-IP failure
counter here: on a failed API key lookup read client IP (respect
X-Forwarded-For), increment a short-lived counter in a shared store (Redis or an
in-process TTL cache), if the counter exceeds a configured threshold return a
429 or small backoff delay, and reset/decay the counter on successful auth;
ensure counters use a configurable window and avoid blocking legitimate proxied
IPs by correctly handling forwarded headers and make the API key compare use a
secure env var rather than a hardcoded string.
| from collections import defaultdict | ||
| from datetime import datetime, timedelta | ||
| from flask import abort, current_app | ||
|
|
There was a problem hiding this comment.
Missing imports and unused import break runtime (F821) and linting.
wraps and request are used but not imported; current_app is unused.
Apply:
from collections import defaultdict
from datetime import datetime, timedelta
-from flask import abort, current_app
+from flask import abort, request
+from functools import wraps📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| from flask import abort, current_app | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| from flask import abort, request | |
| from functools import wraps |
🤖 Prompt for AI Agents
In src/rate_limiter.py lines 1-4, the module currently imports current_app but
uses wraps and request without importing them; remove the unused current_app
import and add the missing imports from functools and flask by importing wraps
from functools and request from flask so the decorator and request references
resolve and lint/runtime errors are eliminated.
| # Simple rate limiter using memory (use Redis for production) | ||
| rate_limit = defaultdict(list) | ||
|
|
||
| def rate_limit(max_requests=100, window_minutes=1): | ||
| def decorator(f): |
There was a problem hiding this comment.
Name collision: rate_limit dict overshadowed by rate_limit function → TypeError at runtime.
After defining the function, rate_limit refers to the function, so subscripting it (rate_limit[client_ip]) will blow up.
Apply:
# Simple rate limiter using memory (use Redis for production)
-rate_limit = defaultdict(list)
+REQUEST_LOG = defaultdict(list)
def rate_limit(max_requests=100, window_minutes=1):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
- client_ip = request.remote_addr
+ # Be proxy-aware
+ forwarded_for = request.headers.get('X-Forwarded-For')
+ client_ip = forwarded_for.split(',')[0].strip() if forwarded_for else (request.remote_addr or 'unknown')
now = datetime.utcnow()
window_start = now - timedelta(minutes=window_minutes)
- rate_limit[client_ip] = [req_time for req_time in rate_limit[client_ip] if req_time > window_start]
- if len(rate_limit[client_ip]) >= max_requests:
+ REQUEST_LOG[client_ip] = [t for t in REQUEST_LOG[client_ip] if t > window_start]
+ if len(REQUEST_LOG[client_ip]) >= max_requests:
abort(429, description="Rate limit exceeded")
- rate_limit[client_ip].append(now)
+ REQUEST_LOG[client_ip].append(now)
return f(*args, **kwargs)Also applies to: 15-19
🧰 Tools
🪛 Ruff (0.12.2)
8-8: Redefinition of unused rate_limit from line 6
(F811)
🤖 Prompt for AI Agents
In src/rate_limiter.py around lines 5-9 (and similarly lines 15-19), the
top-level dict named rate_limit shadows the decorator function of the same name
causing TypeError when the code later subscripts the dict; rename the storage
variable (e.g., rate_limit_store or requests_by_ip) and update all references
inside the module and inside the decorator to use that new name, leaving the
decorator function named rate_limit; ensure imports or other modules using the
storage variable are updated if referenced elsewhere.
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | ||
| security = HTTPBearer() | ||
|
|
There was a problem hiding this comment.
💡 Verification agent
🧩 Analysis chain
Verify dependencies are declared.
Requires fastapi, python-jose, passlib[bcrypt]. Ensure they’re in requirements/pyproject.
🏁 Script executed:
#!/bin/bash
# Check for required deps
rg -nP '^(fastapi|python-jose|passlib(\[bcrypt\])?)' requirements* pyproject.toml setup.cfg setup.py || trueLength of output: 152
🏁 Script executed:
#!/bin/bash
# Locate dependency declaration files and verify required packages
dep_files=$(find . -maxdepth 2 -type f \( -name 'pyproject.toml' -o -name 'requirements.txt' \))
echo "Found files:"
echo "$dep_files"
for f in $dep_files; do
echo "---- Checking $f ----"
grep -nE 'fastapi|python-jose|passlib(\[bcrypt\])?' "$f" || echo "No required deps in $f"
doneLength of output: 265
Declare missing dependencies
python-jose and passlib[bcrypt] aren’t listed in your dependency files—add them to pyproject.toml (under [tool.poetry.dependencies]) or dependencies/requirements.txt alongside fastapi to ensure the authentication code will install correctly.
🤖 Prompt for AI Agents
In src/security/auth.py around lines 13 to 15, the code uses CryptContext and
likely jose for JWT handling but the required packages are not declared; add
"passlib[bcrypt]" and "python-jose" to your project's dependencies (either under
[tool.poetry.dependencies] in pyproject.toml or in dependencies/requirements.txt
alongside fastapi), pin appropriate versions or use compatible ranges, and then
run your package manager (poetry install or pip install -r requirements.txt) to
ensure the authentication code can import and run.
| from flask import Flask, jsonify | ||
| from flask_cors import CORS | ||
| from auth import require_api_key | ||
| from rate_limiter import rate_limit | ||
|
|
||
| app = Flask(__name__) | ||
| CORS(app) # Enable CORS for all routes | ||
|
|
There was a problem hiding this comment.
🛠️ Refactor suggestion
CORS: restrict origins via env; default can remain permissive for dev.
Avoid blanket “allow-all” in prod. Load allowed origins from CORS_ORIGINS.
Apply:
-from flask import Flask, jsonify
+import os
+from flask import Flask, jsonify
from flask_cors import CORS
from auth import require_api_key
from rate_limiter import rate_limit
app = Flask(__name__)
-CORS(app) # Enable CORS for all routes
+# Restrict in prod via CORS_ORIGINS="https://app.example.com,https://admin.example.com"
+origins_env = os.getenv('CORS_ORIGINS', '').strip()
+if origins_env:
+ origins = [o.strip() for o in origins_env.split(',') if o.strip()]
+ CORS(app, resources={r"/api/*": {"origins": origins}})
+else:
+ CORS(app) # permissive default for dev📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from flask import Flask, jsonify | |
| from flask_cors import CORS | |
| from auth import require_api_key | |
| from rate_limiter import rate_limit | |
| app = Flask(__name__) | |
| CORS(app) # Enable CORS for all routes | |
| import os | |
| from flask import Flask, jsonify | |
| from flask_cors import CORS | |
| from auth import require_api_key | |
| from rate_limiter import rate_limit | |
| app = Flask(__name__) | |
| # Restrict CORS origins in prod via CORS_ORIGINS="https://app.example.com,https://admin.example.com" | |
| origins_env = os.getenv('CORS_ORIGINS', '').strip() | |
| if origins_env: | |
| origins = [o.strip() for o in origins_env.split(',') if o.strip()] | |
| CORS(app, resources={r"/api/*": {"origins": origins}}) | |
| else: | |
| CORS(app) # permissive default for development |
🤖 Prompt for AI Agents
In src/unified_api_server.py lines 1-8, CORS is currently enabled permissively
for all origins; change it to read allowed origins from the CORS_ORIGINS
environment variable and use that list when initializing CORS (fall back to the
existing permissive behavior only when CORS_ORIGINS is not set or when running
in a development environment). Concretely: import os, read
os.getenv("CORS_ORIGINS"), split on commas into a list, trim whitespace, and
pass that list as the origins configuration to CORS when creating the app; keep
a safe default that allows all origins only if no env var is provided (or when
FLASK_ENV indicates development). Ensure the code handles a single origin and
multiple origins uniformly and does not hardcode "*" in production.
| from auth import require_api_key | ||
| from rate_limiter import rate_limit |
There was a problem hiding this comment.
💡 Verification agent
🧩 Analysis chain
Verify import style based on packaging layout.
from auth import ... assumes auth.py is importable from sys.path root. If src is a package, use from src.auth import ... or relative imports.
Run:
Expected:
- If no
src/__init__.py, top-level imports may fail when running as a module; considerPYTHONPATH=srcor packaging. - If
src/__init__.pyexists, preferfrom src.auth import require_api_key.
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Is src a package?
fd -a '^__init__\.py$' src || true
# Where are direct imports used?
rg -nP 'from (auth|rate_limiter) import 'Length of output: 637
Use package-relative imports in src
In src/unified_api_server.py (lines 3–4), update to:
- from auth import require_api_key
- from rate_limiter import rate_limit
+ from .auth import require_api_key
+ from .rate_limiter import rate_limitApply the same convention across all modules under src/ and adjust imports in deployment/cloud-run/ (e.g. secure_api_server.py:20, debug_api_import.py:85).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from auth import require_api_key | |
| from rate_limiter import rate_limit | |
| from .auth import require_api_key | |
| from .rate_limiter import rate_limit |
🤖 Prompt for AI Agents
In src/unified_api_server.py around lines 3-4, change the top-level imports to
use package-qualified imports (e.g., import from src.auth and src.rate_limiter
instead of bare module names) and ensure you apply the same package-relative
(package-qualified) convention across all modules under src/; also update any
imports in deployment/cloud-run (for example secure_api_server.py line ~20 and
debug_api_import.py line ~85) to import from the src package so all references
resolve consistently when the project is installed/run as a package.
| @app.route('/api/protected', methods=['POST']) | ||
| @require_api_key | ||
| @rate_limit(max_requests=10, window_minutes=1) | ||
| def protected(): |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Decorator order: rate-limit should wrap auth to throttle invalid-key hammering.
Current order runs auth first. Swap them so the limiter executes before auth.
Apply:
@app.route('/api/protected', methods=['POST'])
-@require_api_key
-@rate_limit(max_requests=10, window_minutes=1)
+@rate_limit(max_requests=10, window_minutes=1)
+@require_api_key
def protected():
return jsonify({'message': 'Protected endpoint'})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @app.route('/api/protected', methods=['POST']) | |
| @require_api_key | |
| @rate_limit(max_requests=10, window_minutes=1) | |
| def protected(): | |
| @app.route('/api/protected', methods=['POST']) | |
| @rate_limit(max_requests=10, window_minutes=1) | |
| @require_api_key | |
| def protected(): | |
| return jsonify({'message': 'Protected endpoint'}) |
🤖 Prompt for AI Agents
In src/unified_api_server.py around lines 13 to 16, the decorators are ordered
so authentication runs before rate limiting, allowing repeated invalid-key
attempts to bypass throttling; swap the decorator order so rate_limit is the
outermost decorator (appears above require_api_key) so the rate limiter executes
before authentication, i.e., place @rate_limit(max_requests=10,
window_minutes=1) on the line immediately above @require_api_key.
|
Closing redundant micro-PR: API middleware functionality is now covered by the merged Phase 3A fortress-protected rate limiting system (PR #180). This approach maintains quality gates and follows fortress development principles. |
Implements API middleware including:
Files changed: 3
Lines: ~150
Closes micro-PR-6 from tracking PR #147
Summary by Sourcery
Implement middleware for API security, request throttling, and CORS, and integrate them into the unified API server
New Features:
Enhancements:
Summary by CodeRabbit
New Features
Security
Infrastructure