feat: add converge package for push-based metric stabilization#25
Conversation
|
This change is part of the following stack: Change managed by git-spice. |
bda4e5c to
1b94432
Compare
1b94432 to
d1a111e
Compare
There was a problem hiding this comment.
Pull request overview
Adds a new converge/ Go package that implements a push-oriented metric stabilization engine (with a thin runner loop) intended to support migrating time-bucketed Cloudflare analytics metrics from scrape-based Prometheus collection to timestamp-correct push-based ingestion.
Changes:
- Introduces a pure(ish) state-machine-style
Enginethat ingestsObservations and emits stabilizedSamples, with TTL-based window expiry and flush semantics. - Adds a
Run()convenience loop to drive the engine with aFetcherandSink, supporting live polling plus rate-limited backfill. - Updates module dependencies (
go.mod/go.sum) and ignore rules for.gograph/.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
converge/engine.go |
New core engine + config/defaults + ingest/expire/flush/stats. |
converge/window.go |
Per-bucket window state and forced-flush sampling. |
converge/tracker.go |
Per-series stabilization tracker (consecutive-identical threshold). |
converge/runner.go |
Loop wiring Fetcher→Engine→Sink with live/backfill lanes. |
converge/types.go |
Public types and interfaces (Observation/Sample/Fetcher/Sink/Stats). |
converge/engine_test.go |
Unit tests for engine behavior (stabilization, TTL expiry, flush). |
converge/tracker_test.go |
Unit tests for tracker state transitions and edge cases. |
go.mod |
Adds new indirect requirements (notably github.com/prometheus/prometheus). |
go.sum |
Corresponding checksum additions for new/updated transitive modules. |
.gitignore |
Ignores .gograph/ at repo root. |
converge/.gitignore |
Ignores .gograph/ under converge/. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| for _, o := range obs { | ||
| w := e.windows[o.Bucket] | ||
| if w == nil { | ||
| w = newWindow(o.Bucket, time.Now()) | ||
| e.windows[o.Bucket] = w | ||
| } |
There was a problem hiding this comment.
will fix this up in the next PR
| github.com/prometheus/common v0.53.0 // indirect | ||
| github.com/prometheus/procfs v0.14.0 // indirect | ||
| github.com/prometheus/prometheus v0.50.0 // indirect |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| // Feed observations. Returns samples for any tracker that just stabilized. | ||
| samples := eng.Ingest(observations) | ||
|
|
||
| // Check TTLs. Returns forced-push samples from expiring windows. | ||
| // Removes closed windows. | ||
| samples = eng.Expire(time.Now()) |
There was a problem hiding this comment.
why are these two different calls? any downsides from doing both in a single call so we only ever need to process a single set of samples?
There was a problem hiding this comment.
The separation make sense semantically to me.
- expire check runs even when fetch fails, otherwise we'd have to pass in a nil set of samples or something
- testability is easier since expire can be tested without having to construct observations so we can make targeted assertions with less setup
Pull Request Template
Description
Adds
converge/, a pure state machine engine for push-based metric stabilization. It watches a stream of timestamped observations, detects when values stop changing, and emits push-ready samples with correct source timestamps.This is the core building block for migrating zone traffic metrics from Prometheus scrape (with 4+ minute
scrape_delay) to Victoria Metrics push with accurate timestamps.Type of Change
Testing
Code Quality
Additional Notes
The package separates into a functional core and an imperative shell:
Ingest([]Observation) → []Sample,Expire(now) → []Sample,Flush() → []SampleFetcherandSinkto the engine on a tick, handles live/backfill priority and graceful shutdownInternally the engine manages windows (one per source time bucket), each containing trackers (one per metric series). A tracker counts consecutive identical
uint64values and signals when the configured threshold is crossed.Key decisions
Threshold=1pushes on first observation.Threshold=3waits for convergence. Config knob, not a code change.Ingestpath. It stabilizes faster because the source has already finished aggregating.What's NOT included
main.go/ mode flag