## Problem

The metrics collector recalculates all metrics from scratch every 15 seconds, wasting CPU cycles on unchanged historical data; collection time grows as data accumulates.
## Impact

- **CPU waste:** 90% of processing time is spent on unchanged data
- **Growing latency:** collection time increases linearly with data size
- **Resource usage:** unnecessary database load and memory allocation
## Current Behavior

```python
# Every 15 seconds:
def collect_metrics(self):
    # Processes the entire history
    cursor.execute('SELECT * FROM agent_invocations')  # ALL records
    # ...then recalculates every metric from scratch
```
## Proposed Solution

### 1. Track Last Processed State
```python
import json
from datetime import datetime

class MetricsCollector:
    def __init__(self, db_path):
        self.db_path = db_path
        self.checkpoint_file = db_path.parent / '.metrics_checkpoint'
        self.last_processed = self.load_checkpoint()
        self.metrics_cache = {}

    def load_checkpoint(self):
        """Load the last processed timestamp and ID."""
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file) as f:
                data = json.load(f)
            return data.get('last_timestamp'), data.get('last_id')
        return None, 0

    def save_checkpoint(self, timestamp, last_id):
        """Persist the checkpoint for crash recovery."""
        with open(self.checkpoint_file, 'w') as f:
            json.dump({
                'last_timestamp': timestamp,
                'last_id': last_id,
                'saved_at': datetime.now().isoformat()
            }, f)
```
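A standalone round-trip of this checkpoint format can be sketched as follows (the file path and checkpoint values here are hypothetical):

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path

# Hypothetical standalone round-trip of the checkpoint JSON format.
checkpoint_file = Path(tempfile.mkdtemp()) / '.metrics_checkpoint'

# Write a checkpoint the same way save_checkpoint() does.
with open(checkpoint_file, 'w') as f:
    json.dump({
        'last_timestamp': '2024-01-01T00:00:00',
        'last_id': 42,
        'saved_at': datetime.now().isoformat(),
    }, f)

# Read it back the same way load_checkpoint() does.
with open(checkpoint_file) as f:
    data = json.load(f)
last_timestamp, last_id = data.get('last_timestamp'), data.get('last_id')
print(last_timestamp, last_id)  # 2024-01-01T00:00:00 42
```

If a crash mid-write is a concern, one option is writing to a temporary file first and then `os.replace()`-ing it over the checkpoint, so the file is never half-written.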
### 2. Process Only New Records
```python
def collect_incremental(self):
    """Process only new/changed records."""
    last_timestamp, last_id = self.last_processed

    # Fetch invocations past the checkpoint (cursor.row_factory must be
    # sqlite3.Row for the record['timestamp'] access below)
    cursor.execute('''
        SELECT * FROM agent_invocations
        WHERE id > ? OR timestamp > ?
        ORDER BY id
        LIMIT 1000
    ''', (last_id, last_timestamp or '1970-01-01'))
    new_records = cursor.fetchall()

    if not new_records:
        logger.debug("No new records to process")
        return

    for record in new_records:
        self.update_metrics(record)

    # Advance the checkpoint past the batch just processed
    last_record = new_records[-1]
    self.save_checkpoint(last_record['timestamp'], last_record['id'])
```
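As a self-contained sanity check, the incremental query can be exercised against an in-memory SQLite table. The schema and column names below are assumptions reconstructed from the snippets in this proposal:

```python
import sqlite3

# Assumed minimal schema for agent_invocations.
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # enables record['id'] / record['timestamp'] access
cur = conn.cursor()
cur.execute('CREATE TABLE agent_invocations '
            '(id INTEGER PRIMARY KEY, timestamp TEXT, agent_name TEXT)')
cur.executemany(
    'INSERT INTO agent_invocations (timestamp, agent_name) VALUES (?, ?)',
    [('2024-01-01T00:00:0%d' % i, 'agent-a') for i in range(5)],
)
conn.commit()

# Pretend the checkpoint stopped after row 3.
last_id, last_timestamp = 3, '2024-01-01T00:00:02'
cur.execute('''
    SELECT * FROM agent_invocations
    WHERE id > ? OR timestamp > ?
    ORDER BY id
    LIMIT 1000
''', (last_id, last_timestamp or '1970-01-01'))
new_ids = [r['id'] for r in cur.fetchall()]
print(new_ids)  # [4, 5]
```

On a cold start (`last_timestamp` is `None`), the `'1970-01-01'` fallback makes the timestamp clause match every row, so the first pass processes the full history in `LIMIT`-sized batches.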
### 3. Maintain Running Totals
```python
from collections import defaultdict
from datetime import datetime

class IncrementalMetrics:
    def __init__(self):
        self.agent_counts = defaultdict(int)
        self.session_metrics = {}
        self.last_update = {}

    def update_agent_count(self, agent_name, delta=1):
        """Apply a delta to the running total."""
        self.agent_counts[agent_name] += delta
        self.last_update[agent_name] = datetime.now()

    def get_agent_total(self, agent_name):
        """Return the current total without recalculation."""
        return self.agent_counts[agent_name]
```
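The running-total idea reduces each new record to an O(1) counter update rather than a re-scan of history. A minimal standalone sketch (agent names are made up):

```python
from collections import defaultdict

# Running totals: each record costs one dict update, never a full re-scan.
agent_counts = defaultdict(int)

def update_agent_count(agent_name, delta=1):
    agent_counts[agent_name] += delta

for name in ['planner', 'planner', 'executor']:
    update_agent_count(name)
update_agent_count('planner', delta=-1)  # a negative delta handles corrections

print(agent_counts['planner'], agent_counts['executor'])  # 1 1
```

The `delta` parameter is what makes this incremental in both directions: a deleted or corrected historical record can be reflected by applying a negative delta instead of forcing a full recalculation.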
### 4. Handle Historical Data Changes
```python
def detect_historical_changes(self):
    """Check whether already-processed rows were modified."""
    cursor.execute('''
        SELECT COUNT(*) AS modified_count
        FROM agent_invocations
        WHERE id <= ? AND updated_at > ?
    ''', (self.last_id, self.last_checkpoint_time))
    if cursor.fetchone()['modified_count'] > 0:
        logger.warning("Historical data modified, full recalculation needed")
        return True
    return False
```
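This check presumes the table carries an `updated_at` column that is bumped on every modification. A small demonstration against an assumed minimal schema:

```python
import sqlite3

# Assumed schema with the updated_at column the query above relies on.
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute('CREATE TABLE agent_invocations (id INTEGER PRIMARY KEY, updated_at TEXT)')
cur.executemany('INSERT INTO agent_invocations (updated_at) VALUES (?)',
                [('2024-01-01T00:00:00',)] * 3)
# Simulate an edit to an already-processed row.
cur.execute("UPDATE agent_invocations SET updated_at = '2024-01-02T00:00:00' WHERE id = 2")

last_id, last_checkpoint_time = 3, '2024-01-01T12:00:00'
cur.execute('''
    SELECT COUNT(*) AS modified_count
    FROM agent_invocations
    WHERE id <= ? AND updated_at > ?
''', (last_id, last_checkpoint_time))
modified_count = cur.fetchone()['modified_count']
print(modified_count)  # 1
```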
### 5. Periodic Full Refresh
```python
import time
from datetime import timedelta

def should_full_refresh(self):
    """Determine whether a full recalculation is needed."""
    # Full refresh every hour to catch any drift
    if time.time() - self.last_full_refresh > 3600:
        return True
    # ...or if the checkpoint is too old
    if self.checkpoint_age() > timedelta(hours=6):
        return True
    return False
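The refresh decision is pure time arithmetic, so it can be tested in isolation. A function-style sketch (argument names are assumptions, not part of the class above):

```python
import time
from datetime import timedelta

FULL_REFRESH_INTERVAL = 3600  # seconds; mirrors the hourly refresh above

def should_full_refresh(last_full_refresh, checkpoint_age):
    """Pure-function version of the hourly/stale-checkpoint check."""
    if time.time() - last_full_refresh > FULL_REFRESH_INTERVAL:
        return True
    if checkpoint_age > timedelta(hours=6):
        return True
    return False

fresh = should_full_refresh(time.time(), timedelta(minutes=5))       # just refreshed
stale = should_full_refresh(time.time() - 7200, timedelta(minutes=5))  # 2 hours ago
print(fresh, stale)  # False True
```

Keeping this as a pure function makes the phase-3 behavior easy to unit-test without a database or a real clock offset.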
## Implementation Phases

### Phase 1: Basic Incremental (2 hours)

- Add checkpoint tracking
- Process only new records by ID
- Save/load checkpoint state

### Phase 2: Running Totals (1 hour)

- Maintain metric caches
- Update only changed values
- Implement delta calculations

### Phase 3: Advanced Features (1 hour)

- Historical change detection
- Periodic full refresh
- Checkpoint recovery
## Validation Criteria

### Performance Improvement

| Dataset Size | Current Time | With Incremental | Improvement |
|--------------|--------------|------------------|-------------|
| 10K records  | 100ms        | 10ms             | 10x         |
| 100K records | 850ms        | 15ms             | 57x         |
| 1M records   | 8s           | 20ms             | 400x        |
| 10M records  | 80s          | 25ms             | 3200x       |
### Testing

```python
def test_incremental_accuracy():
    # Insert 1000 records, then run a full collection
    full_metrics = collector.collect_full()

    # Insert 10 more records, then run an incremental collection
    incremental_metrics = collector.collect_incremental()

    # Incremental results must match a fresh full recalculation
    assert incremental_metrics == collector.collect_full()

def test_checkpoint_recovery():
    # Process some records, kill the collector,
    # then restart and verify it resumes from the checkpoint
    ...
```
## Effort Estimate

4 hours total:

- 2 hours: basic implementation
- 1 hour: running totals
- 1 hour: advanced features and testing
## Dependencies