-
Notifications
You must be signed in to change notification settings - Fork 4
feat: add per-module lifetime counters for connections and bytes #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -127,6 +127,21 @@ type upstreamCounters struct { | |
| dialError atomic.Uint64 | ||
| } | ||
|
|
||
| // moduleUpstreamKey identifies a (module, upstream) pair for per-module | ||
| // lifetime counters. | ||
| type moduleUpstreamKey struct { | ||
| module string | ||
| upstream string | ||
| } | ||
|
|
||
| // moduleCounters holds per-(module, upstream) lifetime counters that are | ||
| // updated when a relay finishes successfully. | ||
| type moduleCounters struct { | ||
| completed atomic.Uint64 | ||
| sentBytes atomic.Uint64 | ||
| recvBytes atomic.Uint64 | ||
| } | ||
|
|
||
| type Server struct { | ||
| // --- Options section | ||
| // Listen Address | ||
|
|
@@ -166,6 +181,11 @@ type Server struct { | |
| upstreamCounters sync.Map | ||
| unknownModuleCount atomic.Uint64 | ||
|
|
||
| // Per-(module, upstream) counters tracked when a relay finishes | ||
| // successfully. Lazy-initialized via getModuleCounters. | ||
| // map key is moduleUpstreamKey. Value is *moduleCounters. | ||
| moduleCounters sync.Map | ||
|
|
||
| TCPListener net.Listener | ||
| TLSListener net.Listener | ||
| HTTPListener net.Listener | ||
|
|
@@ -346,6 +366,26 @@ func (s *Server) getUpstreamCounters(name string) *upstreamCounters { | |
| return v.(*upstreamCounters) | ||
| } | ||
|
|
||
| // getModuleCounters returns the per-(module, upstream) counters, creating | ||
| // them lazily on first reference. Safe for concurrent use. | ||
| // | ||
| // Empty module/upstream values are normalized to "unknown" so that the | ||
| // internal sync.Map key matches what prometheusLabelValueOrUnknown emits at | ||
| // scrape time. Otherwise an empty string and the literal "unknown" would | ||
| // produce two distinct map entries that render to the same Prometheus label | ||
| // set, leading to duplicate output lines. | ||
| func (s *Server) getModuleCounters(module, upstream string) *moduleCounters { | ||
| key := moduleUpstreamKey{ | ||
| module: prometheusLabelValueOrUnknown(module), | ||
| upstream: prometheusLabelValueOrUnknown(upstream), | ||
| } | ||
| if v, ok := s.moduleCounters.Load(key); ok { | ||
| return v.(*moduleCounters) | ||
| } | ||
| v, _ := s.moduleCounters.LoadOrStore(key, &moduleCounters{}) | ||
| return v.(*moduleCounters) | ||
| } | ||
|
Comment on lines
+377
to
+387
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, fixed in 3d3b8a7.
Comment on lines
+377
to
+387
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sticking with the load-then-LoadOrStore pattern: it matches the same shape used in |
||
|
|
||
| func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target { | ||
| modules := map[string][]Target{} | ||
| for _, upstream := range upstreams { | ||
|
|
@@ -744,6 +784,11 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err | |
| s.sentBytesTotal.Add(uint64(sentBytes)) | ||
| s.recvBytesTotal.Add(uint64(receivedBytes)) | ||
|
|
||
| mc := s.getModuleCounters(moduleName, target.Upstream) | ||
| mc.completed.Add(1) | ||
| mc.sentBytes.Add(uint64(sentBytes)) | ||
| mc.recvBytes.Add(uint64(receivedBytes)) | ||
|
|
||
| duration := time.Since(info.ConnectedAt) | ||
| s.accessLog.F("client %s finishes module %s (sent: %d, received: %d, duration: %s)", ip, moduleName, sentBytes, receivedBytes, duration) | ||
| return nil | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -742,6 +742,17 @@ func TestMetricsIncludesLifetimeCounters(t *testing.T) { | |
| assert.Contains(t, text, fmt.Sprintf("rsync_proxy_sent_bytes_total %d\n", len(payload))) | ||
| assert.Contains(t, text, "# HELP rsync_proxy_received_bytes_total") | ||
| assert.Contains(t, text, "# TYPE rsync_proxy_received_bytes_total counter") | ||
|
|
||
| // Per-(module, upstream) lifetime counters. | ||
| assert.Contains(t, text, "# HELP rsync_proxy_module_completed_connections_total") | ||
| assert.Contains(t, text, "# TYPE rsync_proxy_module_completed_connections_total counter") | ||
| assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"fake\",upstream=\"u1\"} 1\n") | ||
| assert.Contains(t, text, "# HELP rsync_proxy_module_sent_bytes_total") | ||
| assert.Contains(t, text, "# TYPE rsync_proxy_module_sent_bytes_total counter") | ||
| assert.Contains(t, text, fmt.Sprintf("rsync_proxy_module_sent_bytes_total{module=\"fake\",upstream=\"u1\"} %d\n", len(payload))) | ||
| assert.Contains(t, text, "# HELP rsync_proxy_module_received_bytes_total") | ||
| assert.Contains(t, text, "# TYPE rsync_proxy_module_received_bytes_total counter") | ||
| assert.Contains(t, text, "rsync_proxy_module_received_bytes_total{module=\"fake\",upstream=\"u1\"} 0\n") | ||
|
Comment on lines
+746
to
+755
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Empty-vs- |
||
| } | ||
|
|
||
| func TestPrometheusLabelValueEscaping(t *testing.T) { | ||
|
|
@@ -1101,3 +1112,27 @@ func TestDiscoverModulesFromTrailingModuleBlock(t *testing.T) { | |
| require.NoError(t, err) | ||
| assert.Equal(t, []string{"bar", "foo"}, modules) | ||
| } | ||
|
|
||
| func TestModuleCountersNormalizeEmptyKeyToUnknown(t *testing.T) { | ||
| srv := New() | ||
|
|
||
| // Both empty and "unknown" inputs must point at the same internal | ||
| // counter, so a scrape cannot emit two lines that share the same | ||
| // rendered Prometheus label set. | ||
| c1 := srv.getModuleCounters("", "") | ||
| c2 := srv.getModuleCounters("unknown", "unknown") | ||
| assert.Same(t, c1, c2) | ||
|
|
||
| c1.completed.Add(7) | ||
|
|
||
| // metrics.go uses prometheusEscapeLabelValue on the stored key only, | ||
| // so the rendered output must show the normalized "unknown" value. | ||
| var buf bytes.Buffer | ||
| srv.writePrometheusMetrics(&buf, time.Now()) | ||
| text := buf.String() | ||
| assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"unknown\",upstream=\"unknown\"} 7\n") | ||
|
|
||
| // And there should be exactly one such line — i.e. no second line with | ||
| // an empty-string label rendered separately. | ||
| assert.Equal(t, 1, strings.Count(text, "rsync_proxy_module_completed_connections_total{")) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In practice this is bounded.
target.Upstreamis set by the proxy itself toupstreamConfig.Name(always a configured value), and themodulevalue at the increment site only reaches the lifetime accumulator after the request has resolved againsts.modules— the listing-all-modules / unknown-module / queue-full / dial-error paths return earlier without touchinggetModuleCounters. So the upper bound isΣ(len(upstream.Modules)), identical in cardinality to the existing per-connection gauges. Happy to add an explicit doc comment about that bound if helpful.