Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 73 additions & 50 deletions infrastructure/qa/enterprise_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import numpy as np
import requests

# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("/home/vivi/pixelated/ai/logs/enterprise_validation.log"),
logging.FileHandler("enterprise_validation.log"),
logging.StreamHandler(),
],
)
Expand Down Expand Up @@ -208,22 +208,25 @@ async def run_authentication_test(self) -> ValidationResult:
start_time = time.time()

try:
# Test unauthenticated request
response = requests.get(
"http://localhost:8000/api/v1/protected", timeout=10
)
if response.status_code != 401:
raise Exception(f"Expected 401, got {response.status_code}")

# Test with invalid token
headers = {"Authorization": "Bearer invalid_token"}
response = requests.get(
"http://localhost:8000/api/v1/protected", headers=headers, timeout=10
)
if response.status_code != 401:
raise Exception(
f"Expected 401 for invalid token, got {response.status_code}"
async with httpx.AsyncClient() as client:
# Test unauthenticated request
response = await client.get(
"http://localhost:8000/api/v1/protected", timeout=10.0
)
if response.status_code != 401:
raise Exception(f"Expected 401, got {response.status_code}")

# Test with invalid token
headers = {"Authorization": "Bearer invalid_token"}
response = await client.get(
"http://localhost:8000/api/v1/protected",
headers=headers,
timeout=10.0,
)
if response.status_code != 401:
raise Exception(
f"Expected 401 for invalid token, got {response.status_code}"
)

# Test with valid token (simulated)
valid_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.test"
Expand Down Expand Up @@ -267,18 +270,23 @@ async def run_rate_limiting_test(self) -> ValidationResult:
request_count = 0
blocked_count = 0

for i in range(20): # Send 20 rapid requests
async def make_request(client):
nonlocal request_count, blocked_count
try:
response = requests.get(
"http://localhost:8000/api/v1/test", timeout=5
response = await client.get(
"http://localhost:8000/api/v1/test", timeout=5.0
)
request_count += 1
if response.status_code == 429: # Too Many Requests
blocked_count += 1
except requests.exceptions.RequestException:
except httpx.RequestError:
# Connection refused or timeout indicates rate limiting
blocked_count += 1

async with httpx.AsyncClient() as client:
tasks = [make_request(client) for _ in range(20)]
await asyncio.gather(*tasks)

execution_time = (time.time() - start_time) * 1000

# Rate limiting should block some requests
Expand All @@ -288,9 +296,11 @@ async def run_rate_limiting_test(self) -> ValidationResult:
validation_id=f"val_{int(time.time())}",
suite_id="security_suite",
test_id=test.test_id,
status=ValidationStatus.PASSED
if rate_limiting_active
else ValidationStatus.WARNING,
status=(
ValidationStatus.PASSED
if rate_limiting_active
else ValidationStatus.WARNING
),
execution_time_ms=execution_time,
result_data={
"total_requests": request_count + blocked_count,
Expand Down Expand Up @@ -336,9 +346,11 @@ async def run_vulnerability_scan(self) -> ValidationResult:
validation_id=f"val_{int(time.time())}",
suite_id="security_suite",
test_id=test.test_id,
status=ValidationStatus.PASSED
if critical_issues == 0
else ValidationStatus.FAILED,
status=(
ValidationStatus.PASSED
if critical_issues == 0
else ValidationStatus.FAILED
),
execution_time_ms=execution_time,
result_data={
"vulnerabilities": vulnerabilities,
Expand Down Expand Up @@ -441,9 +453,11 @@ async def run_hipaa_validation(self) -> ValidationResult:
validation_id=f"val_{int(time.time())}",
suite_id="compliance_suite",
test_id=test.test_id,
status=ValidationStatus.PASSED
if compliance_score >= 95
else ValidationStatus.WARNING,
status=(
ValidationStatus.PASSED
if compliance_score >= 95
else ValidationStatus.WARNING
),
execution_time_ms=execution_time,
result_data={
"compliance_checks": compliance_checks,
Expand Down Expand Up @@ -522,20 +536,23 @@ async def run_response_time_test(self) -> ValidationResult:
try:
response_times = []

# Make multiple requests to measure response times
for i in range(50):
async def measure_request(client):
request_start = time.time()
try:
response = requests.get(
"http://localhost:8000/api/v1/health", timeout=10
response = await client.get(
"http://localhost:8000/api/v1/health", timeout=10.0
)
request_time = (time.time() - request_start) * 1000
if response.status_code == 200:
response_times.append(request_time)
except requests.exceptions.RequestException:
except httpx.RequestError:
# Skip failed requests for response time calculation
pass

async with httpx.AsyncClient() as client:
tasks = [measure_request(client) for _ in range(50)]
await asyncio.gather(*tasks)

if not response_times:
raise Exception("No successful requests completed")

Expand All @@ -554,9 +571,11 @@ async def run_response_time_test(self) -> ValidationResult:
validation_id=f"val_{int(time.time())}",
suite_id="performance_suite",
test_id=test.test_id,
status=ValidationStatus.PASSED
if sla_compliant
else ValidationStatus.WARNING,
status=(
ValidationStatus.PASSED
if sla_compliant
else ValidationStatus.WARNING
),
execution_time_ms=execution_time,
result_data={
"response_times": {
Expand Down Expand Up @@ -589,9 +608,7 @@ class EnterpriseValidator:
"""Main enterprise validation system"""

def __init__(self):
self.validation_path = Path(
"/home/vivi/pixelated/ai/infrastructure/qa/validation_results"
)
self.validation_path = Path("validation_results")
self.validation_path.mkdir(parents=True, exist_ok=True)

# Initialize validators
Expand Down Expand Up @@ -724,15 +741,21 @@ async def _generate_validation_report(

# Compliance status
compliance_status = {
"HIPAA": "COMPLIANT"
if category_scores.get("compliance", 0) >= 95
else "NON_COMPLIANT",
"SOC2": "COMPLIANT"
if category_scores.get("security", 0) >= 95
else "NON_COMPLIANT",
"GDPR": "COMPLIANT"
if category_scores.get("compliance", 0) >= 95
else "NON_COMPLIANT",
"HIPAA": (
"COMPLIANT"
if category_scores.get("compliance", 0) >= 95
else "NON_COMPLIANT"
),
"SOC2": (
"COMPLIANT"
if category_scores.get("security", 0) >= 95
else "NON_COMPLIANT"
),
"GDPR": (
"COMPLIANT"
if category_scores.get("compliance", 0) >= 95
else "NON_COMPLIANT"
),
}

# Generate recommendations
Expand Down