{$PWD}/
├── .github/
│ ├── workflows/
│ │ ├── security-scan.yml []
│ │ └── tests.yml []
│ └── dependabot.yml []
├── .pre-commit-config.yaml []
├── sdk_workflow/
│ ├── core/
│ │ ├── agent_client.py []
│ │ ├── cache.py []
│ │ ├── cache_manager.py []
│ │ ├── lru_cache.py []
│ │ ├── semantic_cache.py []
│ │ ├── metrics.py []
│ │ ├── performance.py []
│ │ ├── profiling.py []
│ │ └── logging.py []
│ ├── managers/
│ │ ├── token_manager.py []
│ │ ├── cost_manager.py []
│ │ ├── checkpoint_manager.py []
│ │ └── session_manager.py []
│ ├── cli/
│ │ ├── main.py []
│ │ └── arguments.py []
│ └── executors/
│ └── streaming_example.py []
├── tests/
│ ├── test_token_manager.py []
│ ├── test_cost_manager.py []
│ ├── test_checkpoint_manager.py []
│ ├── test_session_manager.py []
│ ├── test_lru_cache.py []
│ ├── test_semantic_cache.py []
│ ├── test_cache_manager.py []
│ ├── test_cache_integration.py []
│ ├── test_performance.py []
│ ├── test_profiling.py []
│ └── test_logging.py []
├── pyproject.toml []
├── requirements-lock.txt []
└── SDK_OPTIMIZATION_PLAN.md []
- This plan assumes a Python 3.10+ environment
- All code must follow PEP 8 style guidelines
- All new code must include type hints
- All public APIs must have comprehensive docstrings
- Security is prioritized throughout all phases
- Performance is monitored and validated at each phase
- The plan is iterative - adjustments may be made based on phase outcomes
This plan details the implementation of 5 major SDK optimizations in priority order, coordinated across implementer, reviewer, and tester subagents.
Project Structure:
- Language: Python 3.10+
- SDK: claude-agent-sdk
- Package: {$PWD} (located in C:\Users\Ray\.claude\{$PWD})
- Dependencies: anthropic, httpx, pydantic
Priority: HIGHEST
Estimated Effort: Medium
Status: Managers exist but need enhancement
The following managers already exist with basic functionality:
- TokenManager (40 lines) - Basic token tracking with deduplication
- CostManager (36 lines) - Basic cost calculation with model pricing
- CheckpointManager (41 lines) - Basic checkpoint persistence
- SessionManager (40 lines) - Basic session lifecycle
File: sdk_workflow/managers/token_manager.py
New Features:
- Add context window overflow detection with warnings
- Implement token usage history tracking (last N requests)
- Add token rate limiting per time window
- Implement token usage analytics (average per request, trends)
- Add export functionality (JSON/CSV)
- Thread-safety improvements with locks
- Integration with MetricsEngine for unified tracking
New Methods:
def get_usage_history(self, limit: int = 10) -> List[Dict]
def check_rate_limit(self, window_seconds: int, max_tokens: int) -> bool
def get_analytics(self) -> Dict[str, Any]
def export_metrics(self, format: str = "json") -> str
def predict_overflow(self, estimated_tokens: int) -> Tuple[bool, float]
File: sdk_workflow/managers/cost_manager.py
New Features:
- Add budget alerts (soft/hard limit warnings)
- Implement cost projection based on usage patterns
- Add cost breakdown by operation type
- Cache efficiency reporting (90% savings tracking)
- Multi-session cost aggregation
- Export cost reports (JSON/CSV/PDF)
- Integration with MetricsEngine
New Methods:
def check_budget_status(self, budget_limit: float) -> Dict[str, Any]
def project_session_cost(self, estimated_turns: int) -> float
def get_cost_breakdown(self) -> Dict[str, float]
def calculate_cache_efficiency(self) -> Dict[str, Any]
def export_cost_report(self, format: str = "json") -> str
def aggregate_costs(self, session_ids: List[str]) -> Dict[str, Any]
File: sdk_workflow/managers/checkpoint_manager.py
New Features:
- Add checkpoint versioning and history
- Implement incremental checkpoint saves (delta encoding)
- Add checkpoint compression (gzip)
- Implement checkpoint validation and integrity checks
- Add auto-cleanup of old checkpoints (retention policy)
- Support for checkpoint migration between versions
- Backup and restore functionality
New Methods:
def save_checkpoint_versioned(self, session_id: str, **data) -> Tuple[Path, int]
def load_checkpoint_version(self, session_id: str, version: int) -> Optional[Dict]
def list_checkpoint_versions(self, session_id: str) -> List[Dict]
def validate_checkpoint(self, session_id: str, version: int) -> bool
def cleanup_old_checkpoints(self, retention_days: int = 30) -> int
def backup_checkpoint(self, session_id: str, backup_dir: Path) -> Path
def restore_checkpoint(self, backup_path: Path) -> str
File: sdk_workflow/managers/session_manager.py
New Features:
- Add session state persistence to disk
- Implement session search and filtering
- Add session analytics (duration, success rate, etc.)
- Support session tagging and categorization
- Implement session lifecycle events (hooks)
- Add session archival and cleanup
- Support concurrent session management
New Methods:
def persist_session(self, session_id: str) -> Path
def load_persisted_session(self, session_id: str) -> Optional[Dict]
def search_sessions(self, filters: Dict[str, Any]) -> List[Dict]
def get_session_analytics(self) -> Dict[str, Any]
def tag_session(self, session_id: str, tags: List[str]) -> None
def archive_session(self, session_id: str) -> Path
def cleanup_sessions(self, older_than_days: int) -> int
- Implementer: Enhance each manager sequentially (TokenManager → CostManager → CheckpointManager → SessionManager)
- Reviewer: Review each manager for code quality, error handling, and API design
- Tester: Write comprehensive unit tests for each manager (aim for >90% coverage)
Validation Criteria:
- All new methods have docstrings
- All managers have comprehensive error handling
- All managers have unit tests with >90% coverage
- Integration tests verify manager interactions
- Performance benchmarks show <10ms overhead
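As an illustration of the Phase 1 enhancements, the sketch below shows one possible shape for the sliding-window rate limit and overflow prediction planned for TokenManager. The class, attribute names, and the 200k default are placeholders for this sketch, not the existing TokenManager API.

```python
import threading
import time
from collections import deque
from typing import Deque, Tuple

class TokenWindowSketch:
    """Hypothetical helper illustrating check_rate_limit() and predict_overflow()."""

    def __init__(self, context_window_limit: int = 200_000) -> None:
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, tokens)
        self._lock = threading.Lock()                      # thread-safety requirement from the plan
        self._context_window_limit = context_window_limit
        self._used_tokens = 0

    def record_usage(self, tokens: int) -> None:
        with self._lock:
            self._events.append((time.monotonic(), tokens))
            self._used_tokens += tokens

    def check_rate_limit(self, window_seconds: int, max_tokens: int) -> bool:
        """True if usage within the trailing window stays at or under max_tokens."""
        cutoff = time.monotonic() - window_seconds
        with self._lock:
            while self._events and self._events[0][0] < cutoff:
                self._events.popleft()                     # drop entries outside the window
            return sum(t for _, t in self._events) <= max_tokens

    def predict_overflow(self, estimated_tokens: int) -> Tuple[bool, float]:
        """Return (would_overflow, projected fraction of the context window used)."""
        with self._lock:
            projected = self._used_tokens + estimated_tokens
            return projected > self._context_window_limit, projected / self._context_window_limit
```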
Priority: HIGH
Estimated Effort: Medium
File: sdk_workflow/cli/main.py:189-413
Status: DEPRECATED - Already marked for removal
Action: Document deprecation, remove dead code after validation
File: sdk_workflow/cli/arguments.py:246-418
Status: DEPRECATED - Related to archived mailbox system
Action: Remove after confirming no dependencies
File: sdk_workflow/executors/streaming_example.py:186-330
Refactoring Strategy: Extract Method + Strategy Pattern
- Extract setup logic → _setup_agent_client()
- Extract streaming logic → _handle_streaming_response()
- Extract message processing → _process_assistant_message()
- Extract error handling → _handle_execution_error()
Target: Break into 5 functions, each <40 lines
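A minimal sketch of the target shape after the extraction: the helper names come from the list above, while the orchestrator name and the placeholder bodies are illustrative only.

```python
from typing import Any, Iterable, List

def run_streaming_example(prompt: str) -> List[Any]:
    """Thin orchestrator: each step is delegated to a small, named helper."""
    client = _setup_agent_client()
    try:
        stream = _handle_streaming_response(client, prompt)
        return [_process_assistant_message(message) for message in stream]
    except Exception as exc:
        _handle_execution_error(exc)
        return []

def _setup_agent_client() -> Any:
    # Placeholder: build and configure the SDK client here.
    return object()

def _handle_streaming_response(client: Any, prompt: str) -> Iterable[Any]:
    # Placeholder: issue the request and yield streamed messages.
    return iter(())

def _process_assistant_message(message: Any) -> Any:
    # Placeholder: extract text / tool calls from a single message.
    return message

def _handle_execution_error(exc: Exception) -> None:
    # Placeholder: classify, log, and surface the error.
    print(f"execution failed: {exc}")
```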
File: sdk_workflow/core/__init__.py:76-206
Refactoring Strategy: Strategy Pattern + Registry
- Extract import logic → _import_module_lazy()
- Create module registry → MODULE_REGISTRY constant
- Extract deprecation warnings → _handle_deprecated_import()
- Simplify main logic to <30 lines
File: sdk_workflow/cli/arguments.py:57-164
Refactoring Strategy: Builder Pattern
- Create ExecutionArgsBuilder class
- Extract model args → add_model_arguments()
- Extract execution args → add_execution_arguments()
- Extract config args → add_config_arguments()
Target: Each method <30 lines
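A sketch of the builder in terms of argparse argument groups; the flag names, defaults, and program name below are placeholders, not the current arguments.py contract.

```python
import argparse

class ExecutionArgsBuilder:
    """Assembles the CLI parser in small, single-purpose steps."""

    def __init__(self) -> None:
        self._parser = argparse.ArgumentParser(prog="sdk-workflow")

    def add_model_arguments(self) -> "ExecutionArgsBuilder":
        group = self._parser.add_argument_group("model")
        group.add_argument("--model", default=None, help="Model identifier")
        group.add_argument("--max-tokens", type=int, default=None)
        return self

    def add_execution_arguments(self) -> "ExecutionArgsBuilder":
        group = self._parser.add_argument_group("execution")
        group.add_argument("--max-turns", type=int, default=None)
        group.add_argument("--stream", action="store_true")
        return self

    def add_config_arguments(self) -> "ExecutionArgsBuilder":
        group = self._parser.add_argument_group("config")
        group.add_argument("--config", default=None, help="Path to a config file")
        return self

    def build(self) -> argparse.ArgumentParser:
        return self._parser

# Each add_* method stays well under the 30-line target.
parser = (
    ExecutionArgsBuilder()
    .add_model_arguments()
    .add_execution_arguments()
    .add_config_arguments()
    .build()
)
```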
- Single Responsibility: Each function has one clear purpose
- Extract Method: Break large functions into smaller, named helpers
- Strategy Pattern: Use when multiple algorithms/approaches exist
- Builder Pattern: Use for complex object construction
- DRY: Eliminate code duplication
- No function >100 lines (target <50 lines)
- Cyclomatic complexity <10 per function
- All tests still pass after refactoring
- Code coverage remains ≥90%
- No performance regression (benchmark key functions)
Priority: MEDIUM-HIGH
Estimated Effort: High
- Existing: PromptCacheOptimizer in sdk_workflow/core/cache.py (262 lines)
- Functionality: 4-tier prompt caching (system, tools, history, current)
- Limitation: Only handles API-level prompt caching, no semantic caching
New File: sdk_workflow/core/lru_cache.py
Features:
- In-memory LRU cache with configurable size limits
- TTL (time-to-live) support for cache entries
- Cache hit/miss tracking
- Thread-safe implementation
- Integration with existing PromptCacheOptimizer
Implementation:
class LRUCache:
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600)
def get(self, key: str) -> Optional[Any]
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None
def invalidate(self, key: str) -> bool
def clear(self) -> None
def get_stats(self) -> Dict[str, Any]
New File: sdk_workflow/core/semantic_cache.py
Features:
- Embedding-based similarity matching
- Configurable similarity threshold (default: 0.95)
- Integration with sentence-transformers or similar
- Fallback to exact match if embeddings unavailable
- Performance monitoring (latency tracking)
Implementation:
class SemanticCache:
def __init__(self, similarity_threshold: float = 0.95)
def find_similar(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]
def add(self, query: str, response: Any) -> None
def get_if_similar(self, query: str) -> Optional[Any]
New Dependency: Add to pyproject.toml:
dependencies = [
# ... existing ...
"sentence-transformers>=2.2.0", # For semantic similarity
"numpy>=1.24.0", # For vector operations
]
New File: sdk_workflow/core/cache_manager.py
Features:
- Orchestrates all cache layers (LRU + Semantic + Prompt)
- Cascade lookup strategy (LRU → Semantic → API)
- Cache warming and preloading
- Cache analytics and reporting
- Cache persistence (save/load to disk)
Implementation:
class CacheManager:
def __init__(self, lru_cache: LRUCache, semantic_cache: SemanticCache, prompt_optimizer: PromptCacheOptimizer)
def get_cached_response(self, request: Dict) -> Optional[Any]
def cache_response(self, request: Dict, response: Any) -> None
def warm_cache(self, common_requests: List[Dict]) -> None
def get_cache_report(self) -> Dict[str, Any]
def persist_to_disk(self, path: Path) -> None
def load_from_disk(self, path: Path) -> None
- Integrate with AgentClientManager in core/agent_client.py
- Add cache hooks to executor classes
- Update MetricsEngine to track cache performance
- Add CLI flags for cache control (--cache-mode, --no-cache)
Test Files:
- tests/test_lru_cache.py (NEW)
- tests/test_semantic_cache.py (NEW)
- tests/test_cache_manager.py (NEW)
- tests/test_cache_integration.py (NEW)
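To make the cascade lookup strategy (LRU → Semantic → API) concrete, here is a hedged sketch of get_cached_response(); the key derivation and the promotion of semantic hits into the LRU layer are assumptions made for this sketch, not decisions the plan has fixed.

```python
import hashlib
import json
from typing import Any, Dict, Optional

class CacheManagerSketch:
    def __init__(self, lru_cache: Any, semantic_cache: Any) -> None:
        self.lru = lru_cache            # exact-match layer (LRUCache)
        self.semantic = semantic_cache  # near-match layer (SemanticCache)

    @staticmethod
    def _key(request: Dict) -> str:
        # Stable key for exact-match lookups.
        return hashlib.sha256(json.dumps(request, sort_keys=True).encode()).hexdigest()

    def get_cached_response(self, request: Dict) -> Optional[Any]:
        key = self._key(request)
        hit = self.lru.get(key)                         # 1. cheapest: exact LRU hit
        if hit is not None:
            return hit
        prompt = request.get("prompt", "")
        similar = self.semantic.get_if_similar(prompt)  # 2. embedding similarity
        if similar is not None:
            self.lru.set(key, similar)                  # promote for future exact hits
            return similar
        return None                                     # 3. miss: caller falls through to the API

    def cache_response(self, request: Dict, response: Any) -> None:
        self.lru.set(self._key(request), response)
        self.semantic.add(request.get("prompt", ""), response)
```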
- LRU cache hit rate >60% for repeated requests
- Semantic cache finds similar queries with >0.90 accuracy
- Cache lookup latency <50ms (95th percentile)
- Cache reduces API costs by >50% in typical workflows
- All cache operations are thread-safe
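For the similarity matching itself, a minimal sketch with an injectable embedding function (so sentence-transformers can stay optional) and the exact-match fallback described above; the flat list scan and cosine computation are simplifications for illustration only.

```python
from typing import Any, Callable, List, Optional, Tuple
import numpy as np

class SemanticCacheSketch:
    def __init__(
        self,
        embed: Optional[Callable[[str], np.ndarray]] = None,
        similarity_threshold: float = 0.95,
    ) -> None:
        self._embed = embed                  # e.g. a wrapper around a sentence-transformers encoder
        self._threshold = similarity_threshold
        self._entries: List[Tuple[str, Optional[np.ndarray], Any]] = []

    def add(self, query: str, response: Any) -> None:
        vector = self._embed(query) if self._embed else None
        self._entries.append((query, vector, response))

    def get_if_similar(self, query: str) -> Optional[Any]:
        if self._embed is None:
            # Fallback: exact string match when embeddings are unavailable.
            return next((r for q, _, r in self._entries if q == query), None)
        query_vec = self._embed(query)
        best_score, best_response = 0.0, None
        for _, vector, response in self._entries:
            if vector is None:
                continue
            score = float(
                np.dot(query_vec, vector)
                / (np.linalg.norm(query_vec) * np.linalg.norm(vector))
            )
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self._threshold else None
```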
Priority: MEDIUM
Estimated Effort: Low-Medium
File: pyproject.toml
dependencies = [
"anthropic>=0.40.0",
"httpx>=0.27.0",
"pydantic>=2.0.0",
]
Issue: Unpinned versions allow breaking changes
Action: Update pyproject.toml with exact versions
Strategy:
- Run pip freeze to get current versions
- Pin major dependencies with the == operator
- Pin transitive dependencies in requirements-lock.txt
- Add requirements-dev.txt for development tools
New Files:
- requirements-lock.txt - Fully pinned transitive dependencies
- requirements-dev.txt - Development/testing dependencies
Updated pyproject.toml:
dependencies = [
"anthropic==0.40.0",
"httpx==0.27.0",
"pydantic==2.6.0",
"sentence-transformers==2.2.2", # NEW
"numpy==1.24.3", # NEW
]
[project.optional-dependencies]
dev = [
"pytest==7.4.3",
"pytest-cov==4.1.0",
"pytest-asyncio==0.21.1",
"black==23.12.0",
"ruff==0.1.9",
"mypy==1.7.1",
]
security = [
"bandit==1.7.6",
"safety==3.0.1",
]
New File: .github/workflows/security-scan.yml
Features:
- Run on every push and PR
- Scan dependencies with safety check
- Scan code with bandit
- Generate SBOM (Software Bill of Materials)
- Fail on HIGH severity issues
Workflow:
name: Security Scan
on: [push, pull_request]
jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install safety bandit
          pip install -e .
      - name: Safety check
        run: safety check --json
      - name: Bandit scan
        run: bandit -r sdk_workflow -f json -o bandit-report.json
      - name: Upload results
        uses: actions/upload-artifact@v3
        with:
          name: security-reports
          path: |
            bandit-report.json
New File: .pre-commit-config.yaml
Hooks:
- Run bandit on changed files
- Run black for code formatting
- Run ruff for linting
- Run mypy for type checking
repos:
  - repo: https://github.com/psf/black
    rev: 23.12.0
    hooks:
      - id: black
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.1.9
    hooks:
      - id: ruff
  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.6
    hooks:
      - id: bandit
        args: ['-c', 'pyproject.toml']
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.7.1
    hooks:
      - id: mypy
New File: .github/dependabot.yml
Features:
- Auto-update dependencies weekly
- Create PRs for security updates daily
- Group minor updates
version: 2
updates:
  - package-ecosystem: "pip"
    directory: "/"
    schedule:
      interval: "weekly"
    open-pull-requests-limit: 10
New File: SECURITY.md
Content:
- Security policy
- Vulnerability reporting process
- Supported versions
- Security best practices
Validation Criteria:
- All dependencies pinned to exact versions
- safety check passes with no HIGH/CRITICAL vulnerabilities
- bandit scan passes with no HIGH confidence issues
- Pre-commit hooks installed and working
- Dependabot configured and running
- Security policy documented
Priority: MEDIUM-LOW
Estimated Effort: Medium-High
- Existing: MetricsEngine in sdk_workflow/core/metrics.py (291 lines)
- Coverage: Token/cost tracking, budget enforcement
- Gap: No latency tracking, no distributed tracing, no profiling
New File: sdk_workflow/core/performance.py
Features:
- Distributed tracing with span context
- Automatic instrumentation for key functions
- Latency tracking (p50, p95, p99)
- Operation profiling (CPU, memory)
- Export to OpenTelemetry format
Implementation:
class PerformanceTracer:
def __init__(self, enable_tracing: bool = True)
def start_span(self, name: str, parent: Optional[str] = None) -> str
def end_span(self, span_id: str, metadata: Dict = None) -> None
def record_metric(self, name: str, value: float, unit: str) -> None
def get_span_summary(self, span_id: str) -> Dict
def export_traces(self, format: str = "json") -> str
Decorator for automatic instrumentation:
@trace_function
def my_function():
    # Automatically traced
    pass
File: sdk_workflow/core/metrics.py (ENHANCEMENT)
New Features:
- Latency tracking per request
- Throughput metrics (requests/sec)
- Error rate tracking
- Integration with PerformanceTracer
- Real-time metrics dashboard (CLI)
New Methods:
def track_latency(self, operation: str, latency_ms: float) -> None
def get_latency_percentiles(self, operation: str) -> Dict[str, float]
def get_throughput(self) -> float
def get_error_rate(self) -> float
def get_realtime_dashboard(self) -> str
New File: sdk_workflow/core/profiling.py
Features:
- CPU profiling with cProfile integration
- Memory profiling with memory_profiler
- Automatic hotspot detection
- Profile comparison tools
- Export flamegraphs
Implementation:
class Profiler:
def __init__(self, profile_cpu: bool = True, profile_memory: bool = True)
def start_profiling(self) -> None
def stop_profiling(self) -> Dict[str, Any]
def get_hotspots(self, top_n: int = 10) -> List[Dict]
def export_flamegraph(self, output_path: Path) -> None
def compare_profiles(self, other: 'Profiler') -> Dict
New File: sdk_workflow/core/logging.py
Features:
- Structured logging (JSON format)
- Log levels with dynamic control
- Correlation IDs for request tracking
- Log aggregation hooks
- Performance-aware logging (minimal overhead)
Implementation:
class StructuredLogger:
def __init__(self, name: str, level: str = "INFO")
def log(self, level: str, message: str, **context) -> None
def with_context(self, **context) -> 'StructuredLogger'
def set_correlation_id(self, correlation_id: str) -> None
- Instrument all executor classes
- Instrument all manager classes
- Add tracing to core/agent_client.py
- Add CLI flags for performance monitoring
- Create performance benchmarks
CLI Additions:
{$PWD} run --profile --trace-output traces.json
{$PWD} benchmark --operations 1000
{$PWD} monitor --realtime
Test Files:
- tests/test_performance.py (NEW)
- tests/test_profiling.py (NEW)
- tests/test_logging.py (NEW)
- benchmarks/ (NEW DIRECTORY)
  - benchmark_managers.py
  - benchmark_executors.py
  - benchmark_cache.py
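As a sketch of the @trace_function decorator shown earlier: the planned version would hand timings to PerformanceTracer, while this self-contained variant only measures the span and prints it, so the wiring shown here is an assumption.

```python
import functools
import time
from typing import Any, Callable

def trace_function(func: Callable[..., Any]) -> Callable[..., Any]:
    """Time the wrapped call and record it as a span (printed here for brevity)."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            # In the planned implementation this would call
            # PerformanceTracer.start_span()/end_span() instead of printing.
            print(f"span={func.__qualname__} latency_ms={elapsed_ms:.2f}")
    return wrapper

@trace_function
def my_function() -> None:
    time.sleep(0.01)  # stand-in for real work

my_function()  # prints something like: span=my_function latency_ms=10.12
```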
- Tracing overhead <5% (measured with profiler)
- All key operations instrumented (>50 trace points)
- Latency metrics accurate within ±5ms
- Performance dashboard updates <1s latency
- Profiler identifies top 10 hotspots correctly
- Structured logging adds <1ms per log entry
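A sketch of structured JSON logging with a correlation ID, built on the standard logging module; the StructuredLogger interface above is the target, and the class below is only one plausible shape with an assumed name.

```python
import json
import logging
import sys
from typing import Any

class StructuredLoggerSketch:
    def __init__(self, name: str, level: str = "INFO", **context: Any) -> None:
        self._logger = logging.getLogger(name)
        self._logger.setLevel(level)
        if not self._logger.handlers:
            self._logger.addHandler(logging.StreamHandler(sys.stdout))
        self._context = dict(context)

    def with_context(self, **context: Any) -> "StructuredLoggerSketch":
        # Return a logger that carries extra fields (e.g. a correlation ID) on every entry.
        merged = {**self._context, **context}
        level_name = logging.getLevelName(self._logger.level)
        return StructuredLoggerSketch(self._logger.name, level_name, **merged)

    def log(self, level: str, message: str, **fields: Any) -> None:
        record = {"level": level.upper(), "message": message, **self._context, **fields}
        self._logger.log(getattr(logging, level.upper(), logging.INFO), json.dumps(record))

log = StructuredLoggerSketch("sdk_workflow").with_context(correlation_id="req-123")
log.log("info", "request started", operation="query")
# -> {"level": "INFO", "message": "request started", "correlation_id": "req-123", "operation": "query"}
```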
Role: Write production code
Tasks:
- Implement manager enhancements (Phase 1)
- Refactor complex functions (Phase 2)
- Implement caching layers (Phase 3)
- Update dependency configs (Phase 4)
- Implement performance instrumentation (Phase 5)
Deliverables per Phase:
- Production code files
- Updated/new modules
- Integration code
- CLI enhancements
Role: Code quality assurance
Tasks:
- Review all code changes for:
- Code style compliance (PEP 8)
- Error handling completeness
- API design consistency
- Documentation quality
- Security concerns
- Performance implications
Deliverables per Phase:
- Code review reports
- Issue lists with severity ratings
- Approval/rejection recommendations
- Refactoring suggestions
Role: Quality validation
Tasks:
- Write unit tests for all new code
- Write integration tests for cross-module features
- Perform regression testing
- Validate performance benchmarks
- Test error scenarios
Deliverables per Phase:
- Test files (unit + integration)
- Test coverage reports (aim >90%)
- Benchmark results
- Validation reports
┌─────────────────────────────────────────────────────────────┐
│ PHASE WORKFLOW │
└─────────────────────────────────────────────────────────────┘
1. [IMPLEMENTER] → Write code for phase
↓
2. [REVIEWER] → Review code, provide feedback
↓
3. [IMPLEMENTER] → Address feedback (if needed)
↓
4. [TESTER] → Write tests, validate functionality
↓
5. [REVIEWER] → Review tests
↓
6. [TESTER] → Run full test suite + benchmarks
↓
7. [ALL] → Generate commit + validation report
↓
8. [ORCHESTRATOR]→ Create git commit with detailed message
Between Agents:
- Use structured JSON messages
- Include phase number and task ID
- Reference file paths and line numbers
- Provide clear action items
Format:
{
"from": "reviewer",
"to": "implementer",
"phase": 1,
"task": "token_manager_enhancement",
"status": "issues_found",
"issues": [
{
"file": "sdk_workflow/managers/token_manager.py",
"line": 42,
"severity": "high",
"message": "Missing error handling for division by zero",
"suggestion": "Add check for context_window_limit > 0"
}
]
}
- One commit per manager (Phase 1: 4 commits)
- One commit per refactored function (Phase 2: ~5 commits)
- One commit per cache layer (Phase 3: 4 commits)
- One commit for dependency updates (Phase 4: 1 commit)
- One commit for security setup (Phase 4: 1 commit)
- One commit per instrumentation layer (Phase 5: ~4 commits)
Total: ~19-20 commits
[Phase N] <Component>: <Summary>
<Detailed description>
Changes:
- <change 1>
- <change 2>
- <change 3>
Testing:
- <test coverage>
- <validation method>
Validation Report:
- <key metrics>
- <performance impact>
Co-authored-by: Implementer Agent <implementer@{$PWD}>
Co-authored-by: Reviewer Agent <reviewer@{$PWD}>
Co-authored-by: Tester Agent <tester@{$PWD}>
[Phase 1] TokenManager: Add full functionality with history and analytics
Enhances TokenManager with comprehensive tracking capabilities including:
- Token usage history for last N requests
- Rate limiting per time window
- Usage analytics and trend analysis
- Export functionality (JSON/CSV)
- Thread-safety improvements
- Integration with MetricsEngine
Changes:
- Added get_usage_history() method for historical tracking
- Added check_rate_limit() for rate limiting enforcement
- Added get_analytics() for usage trend analysis
- Added export_metrics() for data export
- Added predict_overflow() for proactive warnings
- Implemented thread locks for concurrent access
- Integrated with MetricsEngine for unified tracking
Testing:
- Unit tests: tests/test_token_manager.py (95% coverage)
- Integration tests: tests/test_managers_integration.py
- Performance benchmarks: <10ms overhead per operation
Validation Report:
- All 47 tests passing
- Code coverage: 95.2%
- Performance: avg 2.3ms per operation (target <10ms)
- Thread-safety validated with 100 concurrent operations
- Memory usage: +1.2MB for 1000 tracked requests
Co-authored-by: Implementer Agent <implementer@{$PWD}>
Co-authored-by: Reviewer Agent <reviewer@{$PWD}>
Co-authored-by: Tester Agent <tester@{$PWD}>
# Phase N Validation Report
## Summary
- **Phase:** N - <Phase Name>
- **Status:** PASSED / FAILED
- **Completion Date:** YYYY-MM-DD
- **Total Effort:** X hours
## Components Delivered
1. <Component 1> - Status: PASS/FAIL
2. <Component 2> - Status: PASS/FAIL
...
## Test Results
- **Total Tests:** X
- **Passed:** X (X%)
- **Failed:** X
- **Skipped:** X
- **Coverage:** X%
## Performance Metrics
| Metric | Target | Actual | Status |
|--------|--------|--------|--------|
| Function complexity | <10 | 8.2 | PASS |
| Test coverage | >90% | 93.5% | PASS |
| Operation latency | <10ms | 6.8ms | PASS |
## Issues Identified
1. [SEVERITY] Description - Status: RESOLVED/OPEN
## Code Review Summary
- **Reviewed by:** Reviewer Agent
- **Files Reviewed:** X
- **Issues Found:** X
- **Issues Resolved:** X
- **Approval Status:** APPROVED / CONDITIONAL / REJECTED
## Recommendations
1. <Recommendation 1>
2. <Recommendation 2>
## Next Steps
1. <Next step 1>
2. <Next step 2>
Risk: Manager enhancements break existing integrations
Mitigation: Comprehensive integration tests, backward compatibility
Risk: Thread-safety issues in concurrent scenarios
Mitigation: Use proven patterns (locks), stress testing with concurrent operations
Risk: Refactoring introduces bugs
Mitigation: Refactor with tests first, validate no behavior changes
Risk: Performance degradation from function calls
Mitigation: Benchmark before/after, inline critical paths if needed
Risk: Semantic cache has poor accuracy
Mitigation: Configurable threshold, extensive testing, fallback to exact match
Risk: Cache memory overhead too high
Mitigation: Configurable limits, LRU eviction, monitoring
Risk: New dependency (sentence-transformers) increases package size
Mitigation: Make it optional, graceful degradation without it
Risk: Pinned dependencies conflict with user environments
Mitigation: Use >= for minor versions, document upgrade path
Risk: Security scans fail CI/CD
Mitigation: Fix issues immediately, use issue tracking
Risk: Instrumentation overhead impacts performance
Mitigation: Make instrumentation optional, use sampling, benchmark overhead
Risk: Tracing data storage becomes too large
Mitigation: Configurable retention, compression, optional export
- All 5 phases completed
- All tests passing (>90% coverage)
- All code reviews approved
- All validation reports show PASS status
- No HIGH/CRITICAL security issues
- Performance targets met
- Documentation complete
- Phase 1: All 4 managers fully functional with tests
- Phase 2: All functions <100 lines, complexity <10
- Phase 3: Cache hit rate >60%, cost reduction >50%
- Phase 4: All deps pinned, security scans passing
- Phase 5: Instrumentation coverage >50 trace points, <5% overhead
| Phase | Estimated Effort | Dependencies |
|---|---|---|
| Phase 1 | 12-16 hours | None |
| Phase 2 | 8-12 hours | Phase 1 complete |
| Phase 3 | 16-20 hours | Phases 1-2 complete |
| Phase 4 | 4-6 hours | None (parallel with Phase 3) |
| Phase 5 | 12-16 hours | Phases 1-3 complete |
| Total Estimated Effort | 52-70 hours | |
Critical Path: Phase 1 → Phase 2 → Phase 3 → Phase 5
- Semantic Cache: Should we make sentence-transformers optional to reduce package size?
- Cache Persistence: Should the cache be persisted across sessions by default?
- Performance Overhead: What is the acceptable overhead for instrumentation? (current target: 5%)
- Dependency Pinning: Prefer exact pinning (==) or compatible-release pinning (~=)?
- Security Scan Failures: Should HIGH severity issues block commits or just warn?