Skip to content

Latest commit

 

History

History
368 lines (293 loc) · 11.4 KB

File metadata and controls

368 lines (293 loc) · 11.4 KB

Layer 6: Authentication and Authorization

This layer ensures that only authorized users can create containers and that each container has a cryptographically-bound identity. JSON Web Tokens (JWTs) encode container permissions and are signed to prevent tampering.

JWT Authentication

JWTs are used to link containers to their allowed network destinations and to authenticate the container management API. Each token contains claims about the container's identity, allowed hosts, and expiration time.

JWT Structure

Header:

{
  "typ": "JWT",
  "alg": "ES256",
  "kid": "your-ecdsa-key-id"
}

Payload:

{
  "iss": "secure-egress-control",
  "organization_uuid": "00000000-0000-0000-0000-000000000000",
  "iat": 1700000000,
  "exp": 1700014400,
  "allowed_hosts": "api.example.com,github.com,...",
  "container_id": "sandbox_session_xyz123",
  "enforce_container_binding": "true",
  "use_egress_gateway": "true"
}

Implementation Steps

The following steps create an ES256 key pair (Elliptic Curve P-256 with ECDSA) which provides strong security with smaller key sizes compared to RSA. The private key signs tokens; the public key validates them.

1. Key Generation (ES256)

The following commands generate an ES256 (ECDSA with P-256 curve) key pair. ES256 provides equivalent security to RSA 2048 but with much smaller key sizes (256 bits vs 2048 bits), making JWTs more compact.

# Generate ES256 key pair
openssl ecparam -genkey -name prime256v1 -noout -out private-key.pem
openssl ec -in private-key.pem -pubout -out public-key.pem

# Extract public key in JWK format
python3 << 'PYTHON'
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import base64
import json

with open('public-key.pem', 'rb') as f:
    public_key = serialization.load_pem_public_key(f.read(), backend=default_backend())

numbers = public_key.public_numbers()
x = base64.urlsafe_b64encode(numbers.x.to_bytes(32, byteorder='big')).decode('utf-8').rstrip('=')
y = base64.urlsafe_b64encode(numbers.y.to_bytes(32, byteorder='big')).decode('utf-8').rstrip('=')

jwk = {
    "kty": "EC",
    "crv": "P-256",
    "x": x,
    "y": y,
    "use": "sig",
    "kid": "your-key-id"
}

print(json.dumps(jwk, indent=2))
PYTHON

2. Token Generation Service

File: auth-service.py

#!/usr/bin/env python3
import jwt
import time
import secrets
from typing import List, Optional
from datetime import datetime, timedelta

class AuthService:
    def __init__(self, private_key_path: str, key_id: str):
        with open(private_key_path, 'r') as f:
            self.private_key = f.read()
        self.key_id = key_id

    def generate_container_token(self,
                                 container_id: str,
                                 user_id: str,
                                 allowed_hosts: List[str],
                                 duration_hours: int = 4,
                                 enforce_binding: bool = True) -> str:
        """
        Generate JWT for container access

        CRITICAL: Set enforce_container_binding=true for production
        """
        now = int(time.time())

        payload = {
            # Standard claims
            "iss": "your-service-egress-control",
            "iat": now,
            "exp": now + (duration_hours * 3600),
            "jti": secrets.token_urlsafe(32),  # Unique token ID

            # Custom claims
            "container_id": container_id,
            "organization_uuid": user_id,
            "allowed_hosts": ",".join(allowed_hosts),

            # Security flags
            "enforce_container_binding": "true" if enforce_binding else "false",
            "use_egress_gateway": "true",
            "enforce_centralized_egress": "true",

            # Compliance flags (customize per user)
            "is_hipaa_regulated": "false",
            "is_pci_compliant": "false"
        }

        token = jwt.encode(
            payload,
            self.private_key,
            algorithm="ES256",
            headers={"kid": self.key_id}
        )

        return token

    def validate_token(self, token: str, public_key: str) -> Optional[dict]:
        """Validate JWT token"""
        try:
            payload = jwt.decode(
                token,
                public_key,
                algorithms=["ES256"],
                options={"verify_exp": True}
            )
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

# Usage
if __name__ == '__main__':
    auth = AuthService('private-key.pem', 'your-key-id')

    token = auth.generate_container_token(
        container_id="sandbox_session_def456",
        user_id="user_789",
        allowed_hosts=["api.example.com", "github.com"],
        enforce_binding=True
    )

    print(f"Token: {token}")

3. Token Validation Middleware

This Flask decorator protects API endpoints by validating JWT tokens from incoming requests. It extracts the container ID from the token and can optionally enforce container binding (matching the token to the requesting container).

File: auth-middleware.py

from functools import wraps
from flask import request, jsonify
import jwt

def require_valid_token(public_key: str):
    """Decorator for endpoint authentication"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Extract token from header
            auth_header = request.headers.get('Authorization')
            if not auth_header or not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Missing token'}), 401

            token = auth_header.split(' ')[1]

            try:
                # Validate token
                payload = jwt.decode(
                    token,
                    public_key,
                    algorithms=["ES256"]
                )

                # Check container binding if enforced
                if payload.get('enforce_container_binding') == 'true':
                    container_id = request.headers.get('X-Container-ID')
                    if container_id != payload.get('container_id'):
                        return jsonify({'error': 'Container ID mismatch'}), 403

                # Add payload to request context
                request.jwt_payload = payload

            except jwt.ExpiredSignatureError:
                return jsonify({'error': 'Token expired'}), 401
            except jwt.InvalidTokenError:
                return jsonify({'error': 'Invalid token'}), 401

            return f(*args, **kwargs)
        return decorated_function
    return decorator

Container Management

This class manages the lifecycle of sandbox containers. It creates containers with proper security settings, generates JWT tokens for network access, and handles cleanup when containers are destroyed.

File: container-manager.py

#!/usr/bin/env python3
import docker
import secrets
from typing import Dict
from auth_service import AuthService

class ContainerManager:
    def __init__(self):
        self.client = docker.from_env()
        self.auth_service = AuthService('private-key.pem', 'key-id')
        self.active_containers: Dict[str, str] = {}

    def create_sandbox(self, user_id: str, allowed_hosts: list) -> Dict:
        """Create isolated sandbox container"""

        # Generate unique container ID
        container_id = f"container_{secrets.token_urlsafe(16)}"

        # Generate JWT
        token = self.auth_service.generate_container_token(
            container_id=container_id,
            user_id=user_id,
            allowed_hosts=allowed_hosts,
            enforce_binding=True
        )

        # Create container with security settings
        container = self.client.containers.run(
            image='sandbox-image:latest',
            name=container_id,
            runtime='runsc',
            detach=True,

            # Network isolation
            network_mode='none',

            # Environment variables
            environment={
                'CONTAINER_ID': container_id,
                'HTTP_PROXY': f'http://container_{container_id}:jwt_{token}@proxy:8443',
                'HTTPS_PROXY': f'http://container_{container_id}:jwt_{token}@proxy:8443',
                'NO_PROXY': 'localhost,127.0.0.1'
            },

            # Resource limits
            mem_limit='4g',
            cpu_quota=200000,
            pids_limit=100,

            # Security options
            cap_drop=['ALL'],
            cap_add=['CHOWN', 'SETUID', 'SETGID', 'NET_BIND_SERVICE'],
            security_opt=['no-new-privileges:true'],

            # Read-only root filesystem
            read_only=True,
            tmpfs={'/tmp': 'size=1G,mode=1777'},

            # Volumes
            volumes={
                f'/srv/sandbox/users/{user_id}/uploads': {
                    'bind': '/mnt/uploads',
                    'mode': 'ro'
                },
                f'/srv/sandbox/users/{user_id}/outputs': {
                    'bind': '/mnt/outputs',
                    'mode': 'rw'
                },
                '/srv/sandbox/skills': {
                    'bind': '/mnt/skills',
                    'mode': 'ro'
                }
            }
        )

        self.active_containers[container_id] = container.id

        return {
            'container_id': container_id,
            'token': token,
            'status': 'created'
        }

    def destroy_sandbox(self, container_id: str):
        """Destroy sandbox and cleanup"""
        if container_id in self.active_containers:
            container = self.client.containers.get(self.active_containers[container_id])
            container.stop(timeout=5)
            container.remove(force=True)
            del self.active_containers[container_id]

# Usage
if __name__ == '__main__':
    manager = ContainerManager()

    sandbox = manager.create_sandbox(
        user_id='user_123',
        allowed_hosts=['api.example.com', 'github.com']
    )

    print(f"Created sandbox: {sandbox['container_id']}")
    print(f"Token: {sandbox['token']}")

Session Store

Redis stores active session data for audit and management purposes. Sessions automatically expire when tokens expire, ensuring stale sessions don't accumulate.

File: session-store.py

import redis
import json
from datetime import timedelta

class SessionStore:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis = redis.Redis(host=redis_host, port=redis_port)

    def create_session(self, container_id: str, user_id: str,
                      token: str, ttl_hours: int = 4):
        """Store session info"""
        session_data = {
            'container_id': container_id,
            'user_id': user_id,
            'token': token,
            'created_at': str(datetime.now())
        }

        # Store with expiration
        self.redis.setex(
            f"session:{container_id}",
            timedelta(hours=ttl_hours),
            json.dumps(session_data)
        )

    def get_session(self, container_id: str):
        """Retrieve session"""
        data = self.redis.get(f"session:{container_id}")
        return json.loads(data) if data else None

    def delete_session(self, container_id: str):
        """Delete session"""
        self.redis.delete(f"session:{container_id}")