This layer ensures that only authorized users can create containers and that each container has a cryptographically-bound identity. JSON Web Tokens (JWTs) encode container permissions and are signed to prevent tampering.
JWTs are used to link containers to their allowed network destinations and to authenticate the container management API. Each token contains claims about the container's identity, allowed hosts, and expiration time.
Header:
{
"typ": "JWT",
"alg": "ES256",
"kid": "your-ecdsa-key-id"
}Payload:
{
"iss": "secure-egress-control",
"organization_uuid": "00000000-0000-0000-0000-000000000000",
"iat": 1700000000,
"exp": 1700014400,
"allowed_hosts": "api.example.com,github.com,...",
"container_id": "sandbox_session_xyz123",
"enforce_container_binding": "true",
"use_egress_gateway": "true"
}The following steps create an ES256 key pair (Elliptic Curve P-256 with ECDSA) which provides strong security with smaller key sizes compared to RSA. The private key signs tokens; the public key validates them.
1. Key Generation (ES256)
The following commands generate an ES256 (ECDSA with P-256 curve) key pair. ES256 provides equivalent security to RSA 2048 but with much smaller key sizes (256 bits vs 2048 bits), making JWTs more compact.
# Generate ES256 key pair
openssl ecparam -genkey -name prime256v1 -noout -out private-key.pem
openssl ec -in private-key.pem -pubout -out public-key.pem
# Extract public key in JWK format
python3 << 'PYTHON'
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import base64
import json
with open('public-key.pem', 'rb') as f:
public_key = serialization.load_pem_public_key(f.read(), backend=default_backend())
numbers = public_key.public_numbers()
x = base64.urlsafe_b64encode(numbers.x.to_bytes(32, byteorder='big')).decode('utf-8').rstrip('=')
y = base64.urlsafe_b64encode(numbers.y.to_bytes(32, byteorder='big')).decode('utf-8').rstrip('=')
jwk = {
"kty": "EC",
"crv": "P-256",
"x": x,
"y": y,
"use": "sig",
"kid": "your-key-id"
}
print(json.dumps(jwk, indent=2))
PYTHON2. Token Generation Service
File: auth-service.py
#!/usr/bin/env python3
import jwt
import time
import secrets
from typing import List, Optional
from datetime import datetime, timedelta
class AuthService:
def __init__(self, private_key_path: str, key_id: str):
with open(private_key_path, 'r') as f:
self.private_key = f.read()
self.key_id = key_id
def generate_container_token(self,
container_id: str,
user_id: str,
allowed_hosts: List[str],
duration_hours: int = 4,
enforce_binding: bool = True) -> str:
"""
Generate JWT for container access
CRITICAL: Set enforce_container_binding=true for production
"""
now = int(time.time())
payload = {
# Standard claims
"iss": "your-service-egress-control",
"iat": now,
"exp": now + (duration_hours * 3600),
"jti": secrets.token_urlsafe(32), # Unique token ID
# Custom claims
"container_id": container_id,
"organization_uuid": user_id,
"allowed_hosts": ",".join(allowed_hosts),
# Security flags
"enforce_container_binding": "true" if enforce_binding else "false",
"use_egress_gateway": "true",
"enforce_centralized_egress": "true",
# Compliance flags (customize per user)
"is_hipaa_regulated": "false",
"is_pci_compliant": "false"
}
token = jwt.encode(
payload,
self.private_key,
algorithm="ES256",
headers={"kid": self.key_id}
)
return token
def validate_token(self, token: str, public_key: str) -> Optional[dict]:
"""Validate JWT token"""
try:
payload = jwt.decode(
token,
public_key,
algorithms=["ES256"],
options={"verify_exp": True}
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
# Usage
if __name__ == '__main__':
auth = AuthService('private-key.pem', 'your-key-id')
token = auth.generate_container_token(
container_id="sandbox_session_def456",
user_id="user_789",
allowed_hosts=["api.example.com", "github.com"],
enforce_binding=True
)
print(f"Token: {token}")3. Token Validation Middleware
This Flask decorator protects API endpoints by validating JWT tokens from incoming requests. It extracts the container ID from the token and can optionally enforce container binding (matching the token to the requesting container).
File: auth-middleware.py
from functools import wraps
from flask import request, jsonify
import jwt
def require_valid_token(public_key: str):
"""Decorator for endpoint authentication"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract token from header
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Missing token'}), 401
token = auth_header.split(' ')[1]
try:
# Validate token
payload = jwt.decode(
token,
public_key,
algorithms=["ES256"]
)
# Check container binding if enforced
if payload.get('enforce_container_binding') == 'true':
container_id = request.headers.get('X-Container-ID')
if container_id != payload.get('container_id'):
return jsonify({'error': 'Container ID mismatch'}), 403
# Add payload to request context
request.jwt_payload = payload
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
return f(*args, **kwargs)
return decorated_function
return decoratorThis class manages the lifecycle of sandbox containers. It creates containers with proper security settings, generates JWT tokens for network access, and handles cleanup when containers are destroyed.
File: container-manager.py
#!/usr/bin/env python3
import docker
import secrets
from typing import Dict
from auth_service import AuthService
class ContainerManager:
def __init__(self):
self.client = docker.from_env()
self.auth_service = AuthService('private-key.pem', 'key-id')
self.active_containers: Dict[str, str] = {}
def create_sandbox(self, user_id: str, allowed_hosts: list) -> Dict:
"""Create isolated sandbox container"""
# Generate unique container ID
container_id = f"container_{secrets.token_urlsafe(16)}"
# Generate JWT
token = self.auth_service.generate_container_token(
container_id=container_id,
user_id=user_id,
allowed_hosts=allowed_hosts,
enforce_binding=True
)
# Create container with security settings
container = self.client.containers.run(
image='sandbox-image:latest',
name=container_id,
runtime='runsc',
detach=True,
# Network isolation
network_mode='none',
# Environment variables
environment={
'CONTAINER_ID': container_id,
'HTTP_PROXY': f'http://container_{container_id}:jwt_{token}@proxy:8443',
'HTTPS_PROXY': f'http://container_{container_id}:jwt_{token}@proxy:8443',
'NO_PROXY': 'localhost,127.0.0.1'
},
# Resource limits
mem_limit='4g',
cpu_quota=200000,
pids_limit=100,
# Security options
cap_drop=['ALL'],
cap_add=['CHOWN', 'SETUID', 'SETGID', 'NET_BIND_SERVICE'],
security_opt=['no-new-privileges:true'],
# Read-only root filesystem
read_only=True,
tmpfs={'/tmp': 'size=1G,mode=1777'},
# Volumes
volumes={
f'/srv/sandbox/users/{user_id}/uploads': {
'bind': '/mnt/uploads',
'mode': 'ro'
},
f'/srv/sandbox/users/{user_id}/outputs': {
'bind': '/mnt/outputs',
'mode': 'rw'
},
'/srv/sandbox/skills': {
'bind': '/mnt/skills',
'mode': 'ro'
}
}
)
self.active_containers[container_id] = container.id
return {
'container_id': container_id,
'token': token,
'status': 'created'
}
def destroy_sandbox(self, container_id: str):
"""Destroy sandbox and cleanup"""
if container_id in self.active_containers:
container = self.client.containers.get(self.active_containers[container_id])
container.stop(timeout=5)
container.remove(force=True)
del self.active_containers[container_id]
# Usage
if __name__ == '__main__':
manager = ContainerManager()
sandbox = manager.create_sandbox(
user_id='user_123',
allowed_hosts=['api.example.com', 'github.com']
)
print(f"Created sandbox: {sandbox['container_id']}")
print(f"Token: {sandbox['token']}")Redis stores active session data for audit and management purposes. Sessions automatically expire when tokens expire, ensuring stale sessions don't accumulate.
File: session-store.py
import redis
import json
from datetime import timedelta
class SessionStore:
def __init__(self, redis_host='localhost', redis_port=6379):
self.redis = redis.Redis(host=redis_host, port=redis_port)
def create_session(self, container_id: str, user_id: str,
token: str, ttl_hours: int = 4):
"""Store session info"""
session_data = {
'container_id': container_id,
'user_id': user_id,
'token': token,
'created_at': str(datetime.now())
}
# Store with expiration
self.redis.setex(
f"session:{container_id}",
timedelta(hours=ttl_hours),
json.dumps(session_data)
)
def get_session(self, container_id: str):
"""Retrieve session"""
data = self.redis.get(f"session:{container_id}")
return json.loads(data) if data else None
def delete_session(self, container_id: str):
"""Delete session"""
self.redis.delete(f"session:{container_id}")