538 changes: 248 additions & 290 deletions METRICS.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -320,7 +320,7 @@ with TaskHandler(configuration=config, metrics_settings=metrics, scan_for_annota
curl http://localhost:8000/metrics
```

See [examples/metrics_example.py](examples/metrics_example.py) and [METRICS.md](METRICS.md) for details on all tracked metrics.
Legacy metrics are emitted by default. Set `WORKER_CANONICAL_METRICS=true` before starting workers to use the canonical metric catalog. See [examples/metrics_example.py](examples/metrics_example.py) and [METRICS.md](METRICS.md) for the full legacy and canonical reference.
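
For example, a minimal sketch in Python of opting in to the canonical catalog (equivalent to exporting the variable in the shell before launch; it must be set before `TaskHandler` spawns worker processes):

```python
# Sketch: opt in to the canonical metric catalog (legacy names are the default).
import os

os.environ["WORKER_CANONICAL_METRICS"] = "true"  # set before TaskHandler starts
```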

### Managing Workflow Executions

7 changes: 1 addition & 6 deletions WORKER_CONFIGURATION.md
@@ -330,7 +330,6 @@ export conductor.worker.process_order.paused=true
When a worker is paused:
- It stops polling for new tasks
- Already-executing tasks complete normally
- The `task_paused_total` metric is incremented for each skipped poll
- No code changes or process restarts required

**Use cases:**
@@ -346,11 +345,7 @@ unset conductor.worker.all.paused
export conductor.worker.all.paused=false
```

**Monitor paused workers** using the `task_paused_total` metric:
```promql
# Check how many times workers were paused
task_paused_total{taskType="process_order"}
```
See [METRICS.md](METRICS.md) for the current Python SDK metrics catalog.

### Multi-Region Deployment

115 changes: 7 additions & 108 deletions docs/design/WORKER_DESIGN.md
@@ -767,114 +767,13 @@ Workers package - all worker modules auto-discovered

## Metrics & Monitoring

The SDK provides comprehensive Prometheus metrics collection with two deployment modes:
This design document describes how worker events flow through the SDK. The
current user-facing metrics setup, legacy and canonical metric catalogs,
`WORKER_CANONICAL_METRICS` behavior, Prometheus examples, and migration guidance
are maintained in [`../../METRICS.md`](../../METRICS.md).

### Configuration

**HTTP Mode (Recommended - Metrics served from memory):**
```python
from conductor.client.configuration.settings.metrics_settings import MetricsSettings

metrics_settings = MetricsSettings(
directory="/tmp/conductor-metrics", # .db files for multiprocess coordination
update_interval=0.1, # Update every 100ms
http_port=8000 # Expose metrics via HTTP
)

with TaskHandler(
configuration=config,
metrics_settings=metrics_settings
) as handler:
handler.start_processes()
```

**File Mode (Metrics written to file):**
```python
metrics_settings = MetricsSettings(
directory="/tmp/conductor-metrics",
file_name="metrics.prom",
update_interval=1.0,
http_port=None # No HTTP server - write to file instead
)
```

### Modes

| Mode | HTTP Server | File Writes | Use Case |
|------|-------------|-------------|----------|
| HTTP (`http_port` set) | ✅ Built-in | ❌ Disabled | Prometheus scraping, production |
| File (`http_port=None`) | ❌ Disabled | ✅ Enabled | File-based monitoring, testing |

**HTTP Mode Benefits:**
- Metrics served directly from memory (no file I/O)
- Built-in HTTP server with `/metrics` and `/health` endpoints
- Automatic aggregation across worker processes (no PID labels)
- Ready for Prometheus scraping out-of-the-box

### Key Metrics

**Task Metrics:**
- `task_poll_time_seconds{taskType,quantile}` - Poll latency (includes batch polling)
- `task_execute_time_seconds{taskType,quantile}` - Actual execution time (async tasks: from submission to completion)
- `task_execute_error_total{taskType,exception}` - Execution errors by type
- `task_poll_total{taskType}` - Total poll count
- `task_result_size_bytes{taskType,quantile}` - Task output size

**API Metrics:**
- `http_api_client_request{method,uri,status,quantile}` - API request latency
- `http_api_client_request_count{method,uri,status}` - Request count by endpoint
- `http_api_client_request_sum{method,uri,status}` - Total request time

**Labels:**
- `taskType`: Task definition name
- `method`: HTTP method (GET, POST, PUT)
- `uri`: API endpoint path
- `status`: HTTP status code
- `exception`: Exception type (for errors)
- `quantile`: 0.5, 0.75, 0.9, 0.95, 0.99

**Important Notes:**
- **No PID labels**: Metrics are automatically aggregated across processes
- **Async execution time**: Includes actual execution time, not just coroutine submission time
- **Multiprocess safe**: Uses SQLite .db files in `directory` for coordination

### Prometheus Integration

**Scrape Config:**
```yaml
scrape_configs:
- job_name: 'conductor-workers'
static_configs:
- targets: ['localhost:8000']
scrape_interval: 15s
```

**Accessing Metrics:**
```bash
# Metrics endpoint
curl http://localhost:8000/metrics

# Health check
curl http://localhost:8000/health

# Watch specific metric
watch -n 1 'curl -s http://localhost:8000/metrics | grep task_execute_time_seconds'
```

**PromQL Examples:**
```promql
# Average execution time
rate(task_execute_time_seconds_sum[5m]) / rate(task_execute_time_seconds_count[5m])

# Success rate (task_execute_time_seconds carries no status label; treat non-error executions as successes)
1 - sum(rate(task_execute_error_total[5m])) / sum(rate(task_execute_time_seconds_count[5m]))

# p95 latency
task_execute_time_seconds{quantile="0.95"}

# Error rate
sum(rate(task_execute_error_total[5m])) by (taskType)
```
Keep metric names and PromQL examples out of this design document so the SDK has
one source of truth for legacy and canonical metrics.

---

@@ -1264,8 +1163,8 @@ class CostTracker(TaskRunnerEventsListener):
```python
handler = TaskHandler(
configuration=config,
metrics_settings=metrics_settings,
event_listeners=[
PrometheusMetricsCollector(),
SLAMonitor(threshold_ms=5000),
CostTracker(cost_per_second={'ml_task': 0.05}),
CustomAuditLogger()
43 changes: 7 additions & 36 deletions docs/design/WORKER_SDK_IMPLEMENTATION_GUIDE.md
@@ -1789,44 +1789,15 @@ Response: void

## 15. Metrics & Monitoring

### 15.1 Required Metrics
The Python SDK's current metrics behavior is documented in
[`../../METRICS.md`](../../METRICS.md). That file is the source of truth for:

**Via Event System (Recommended):**
- Enabling metrics with `MetricsSettings`
- Selecting canonical metrics with `WORKER_CANONICAL_METRICS`
- The complete legacy and canonical Prometheus catalogs
- Migration guidance from legacy quantile gauges to canonical histograms
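
As a minimal sketch of the first two items (this assumes the `MetricsSettings` and `TaskHandler` APIs documented in METRICS.md; the environment variable must be set before worker processes start):

```python
# Sketch only: enable Prometheus metrics and opt in to the canonical catalog.
import os

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings

# Read at worker startup, so it must be set before processes are spawned.
os.environ["WORKER_CANONICAL_METRICS"] = "true"

metrics_settings = MetricsSettings(
    directory="/tmp/conductor-metrics",  # multiprocess coordination files
    http_port=8000,                      # serve metrics over HTTP
)

with TaskHandler(configuration=Configuration(), metrics_settings=metrics_settings) as handler:
    handler.start_processes()
```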

Implement MetricsCollector as EventListener:

```
class MetricsCollector implements TaskRunnerEventsListener {
on_poll_started(event):
increment_counter("task_poll_total", labels={taskType: event.taskType})

on_poll_completed(event):
record_histogram("task_poll_time_seconds", event.durationMs / 1000)
increment_counter("task_poll_total", labels={taskType: event.taskType})

on_task_execution_completed(event):
record_histogram("task_execute_time_seconds", event.durationMs / 1000)
record_histogram("task_result_size_bytes", event.outputSizeBytes)

on_task_execution_failure(event):
increment_counter("task_execute_error_total",
labels={taskType: event.taskType, exception: event.cause.type})

on_task_update_failure(event):
increment_counter("task_update_failed_total",
labels={taskType: event.taskType})
// CRITICAL: Alert operations team
}
```

**Metric Names (Prometheus format):**
- `task_poll_total{taskType}`
- `task_poll_time_seconds{taskType,quantile}`
- `task_execute_time_seconds{taskType,quantile}`
- `task_execute_error_total{taskType,exception}`
- `task_result_size_bytes{taskType,quantile}`
- `task_update_error_total{taskType,exception}`
- `task_update_failed_total{taskType}` ← CRITICAL metric
Do not duplicate metric names or PromQL examples in this implementation guide.

---
