Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Python bytecode cache files
__pycache__/
*.py[cod]
*$py.class

# Virtual Environment
venv/
testvenv/

# Database files
*.db
test.db

# Log files
ans_audit.log
ans_server.log
*.log

# Backups
*.bak

# Environment-specific files
.env
.env.*

# IDE files
.vscode/
.idea/

# Mac OS-specific files
.DS_Store
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file removed ans.db
Binary file not shown.
266 changes: 138 additions & 128 deletions ans/api/logging.py
Original file line number Diff line number Diff line change
@@ -1,143 +1,153 @@
"""
Security audit logging for the ANS API.
Logging configuration for the Agent Name Service.
"""
import json
import os
import logging
import json
from datetime import datetime
import os
from typing import Dict, Any, Optional
from fastapi import Request, Response

# Configure logging
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_LEVEL = os.environ.get("ANS_LOG_LEVEL", "INFO")
LOG_FILE = os.environ.get("ANS_LOG_FILE", "ans_audit.log")

# Create logger
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format=LOG_FORMAT,
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)

logger = logging.getLogger("ans-audit")
# Logger instances
server_logger = logging.getLogger("ans.server")
audit_logger = logging.getLogger("ans.audit")

def get_client_info(request: Request) -> Dict[str, Any]:
"""Get client information from the request."""
return {
"ip": request.client.host if request.client else None,
"user_agent": request.headers.get("user-agent"),
"referer": request.headers.get("referer"),
"method": request.method,
"url": str(request.url),
"path": request.url.path,
}

def log_request(request: Request, username: Optional[str] = None) -> None:
"""Log an incoming request."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "request",
"username": username or "anonymous",
"client": client_info,
"headers": dict(request.headers),
}

# Sanitize sensitive headers
if "authorization" in log_data["headers"]:
log_data["headers"]["authorization"] = "REDACTED"

logger.info(f"REQUEST: {json.dumps(log_data)}")

def log_response(request: Request, response: Response, username: Optional[str] = None, execution_time: Optional[float] = None) -> None:
"""Log a response."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "response",
"username": username or "anonymous",
"client": client_info,
"status_code": response.status_code,
"execution_time_ms": execution_time,
}

logger.info(f"RESPONSE: {json.dumps(log_data)}")
def setup_logging(server_log_path: str = None, audit_log_path: str = None) -> None:
"""
Set up logging for the ANS service.

Args:
server_log_path: Path to the server log file (defaults to tests/logs/ans_server.log)
audit_log_path: Path to the audit log file (defaults to tests/logs/ans_audit.log)
"""
# Set default log paths if not provided
if server_log_path is None:
# Use tests/logs directory by default
log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", "logs")
os.makedirs(log_dir, exist_ok=True)
server_log_path = os.path.join(log_dir, "ans_server.log")

if audit_log_path is None:
# Use tests/logs directory by default
log_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "tests", "logs")
os.makedirs(log_dir, exist_ok=True)
audit_log_path = os.path.join(log_dir, "ans_audit.log")

# Configure server logger
server_logger.setLevel(logging.INFO)
server_handler = logging.FileHandler(server_log_path)
server_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
server_handler.setFormatter(server_formatter)
server_logger.addHandler(server_handler)

# Configure console handler for server logger
console_handler = logging.StreamHandler()
console_handler.setFormatter(server_formatter)
server_logger.addHandler(console_handler)

# Configure audit logger
audit_logger.setLevel(logging.INFO)
audit_handler = logging.FileHandler(audit_log_path)
audit_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
audit_handler.setFormatter(audit_formatter)
audit_logger.addHandler(audit_handler)

def log_auth_success(username: str, request: Request) -> None:
"""Log a successful authentication."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "auth_success",
"username": username,
"client": client_info,
}

logger.info(f"AUTH SUCCESS: {json.dumps(log_data)}")
def log_request(request: Request) -> None:
"""
Log an incoming request.

Args:
request: The FastAPI request object
"""
server_logger.info(
f"Request: {request.method} {request.url.path} from {request.client.host} "
f"- User-Agent: {request.headers.get('user-agent', 'Unknown')}"
)

def log_auth_failure(username: str, request: Request, reason: str) -> None:
"""Log a failed authentication attempt."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "auth_failure",
"username": username,
"reason": reason,
"client": client_info,
}

logger.warning(f"AUTH FAILURE: {json.dumps(log_data)}")
def log_response(request: Request, response: Response, execution_time: float = None) -> None:
"""
Log a response to a request.

Args:
request: The FastAPI request object
response: The FastAPI response object
execution_time: The time taken to process the request (in ms)
"""
log_message = (
f"Response: {request.method} {request.url.path} from {request.client.host} "
f"- Status: {response.status_code}"
)

if execution_time is not None:
log_message += f" - Time: {execution_time:.2f}ms"

server_logger.info(log_message)

def log_security_event(event_type: str, details: Dict[str, Any], username: Optional[str] = None, request: Optional[Request] = None) -> None:
"""Log a security event."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": event_type,
"username": username or "system",
"details": details,
}

if request:
log_data["client"] = get_client_info(request)

logger.warning(f"SECURITY EVENT: {json.dumps(log_data)}")
def log_security_event(
event_type: str,
details: Dict[str, Any],
source: str,
request: Optional[Request] = None
) -> None:
"""
Log a security-related event.

Args:
event_type: Type of security event (e.g., "access_denied", "invalid_token")
details: Details of the event
source: Source of the event (e.g., "auth_service", "public_api")
request: Associated request object (if any)
"""
client_ip = request.client.host if request else "unknown"

log_message = (
f"Security event: {event_type} from {source} - IP: {client_ip} - "
f"Details: {json.dumps(details)}"
)

audit_logger.warning(log_message)
server_logger.warning(log_message)

def log_certificate_event(event_type: str, agent_id: str, details: Dict[str, Any], username: Optional[str] = None) -> None:
"""Log a certificate-related event."""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": f"certificate_{event_type}",
"username": username or "system",
"agent_id": agent_id,
"details": details,
}

logger.info(f"CERTIFICATE EVENT: {json.dumps(log_data)}")
def log_certificate_event(
event_type: str,
agent_id: str,
details: Dict[str, Any],
source: str
) -> None:
"""
Log a certificate-related event.

Args:
event_type: Type of certificate event (e.g., "issued", "revoked")
agent_id: ID of the agent the certificate belongs to
details: Details of the event
source: Source of the event (e.g., "ca_service", "public_api")
"""
log_message = (
f"Certificate event: {event_type} for agent {agent_id} from {source} - "
f"Details: {json.dumps(details)}"
)

audit_logger.info(log_message)
server_logger.info(log_message)

def log_rate_limit_exceeded(request: Request) -> None:
"""Log a rate limit exceeded event."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "rate_limit_exceeded",
"client": client_info,
}

logger.warning(f"RATE LIMIT EXCEEDED: {json.dumps(log_data)}")
"""
Log a rate limit exceeded event.

Args:
request: The FastAPI request object
"""
log_message = (
f"Rate limit exceeded: {request.method} {request.url.path} from {request.client.host}"
)

audit_logger.warning(log_message)
server_logger.warning(log_message)

def log_access_denied(request: Request, username: str, required_permission: str) -> None:
"""Log an access denied event."""
client_info = get_client_info(request)
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": "access_denied",
"username": username,
"required_permission": required_permission,
"client": client_info,
}

logger.warning(f"ACCESS DENIED: {json.dumps(log_data)}")
# Set up logging on import
setup_logging()
Loading