From 44f5f0965394d45131a8b371f2de0ec394e7a6dc Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Sun, 31 May 2026 10:40:41 +0800 Subject: [PATCH 1/2] feat: add per-module lifetime counters for connections and bytes Introduce three new Prometheus counters tracked at relay completion, labeled by module and upstream: - rsync_proxy_module_completed_connections_total{module,upstream} - rsync_proxy_module_sent_bytes_total{module,upstream} - rsync_proxy_module_received_bytes_total{module,upstream} Until now, lifetime totals (completed connections, sent/received bytes) were only exposed without labels. The per-connection gauges (rsync_proxy_connection_sent_bytes etc.) cannot be used to compute historical traffic per module because the series disappear when the connection ends. The counters are increased at the same point as the existing global totals, so they only count successful relays (listing-all-modules, unknown-module, queue-full and dial-error paths are intentionally excluded, matching the existing rsync_proxy_completed_connections_total semantics). 'accepted' is not split per module because the module name is not yet known at accept time. Storage uses sync.Map keyed by (module, upstream); entries are created lazily on first finished relay. Output is sorted by (module, upstream) to keep scrapes stable. The existing TestMetricsIncludesLifetimeCounters has been extended to assert the new counters. --- pkg/server/metrics.go | 54 +++++++++++++++++++++++++++++++++++++++ pkg/server/server.go | 36 ++++++++++++++++++++++++++ pkg/server/server_test.go | 11 ++++++++ 3 files changed, 101 insertions(+) diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index fedfeb1..0cf2200 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -181,4 +181,58 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_received_bytes_total Total bytes received from clients since start.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_received_bytes_total counter") _, _ = fmt.Fprintf(w, "rsync_proxy_received_bytes_total %d\n", s.recvBytesTotal.Load()) + + // Per-(module, upstream) lifetime counters. We collect a sorted slice of + // keys so that the text output is stable across scrapes. + type moduleStat struct { + key moduleUpstreamKey + completed uint64 + sentBytes uint64 + recvBytes uint64 + } + var moduleStats []moduleStat + s.moduleCounters.Range(func(k, v any) bool { + key := k.(moduleUpstreamKey) + c := v.(*moduleCounters) + moduleStats = append(moduleStats, moduleStat{ + key: key, + completed: c.completed.Load(), + sentBytes: c.sentBytes.Load(), + recvBytes: c.recvBytes.Load(), + }) + return true + }) + sort.Slice(moduleStats, func(i, j int) bool { + if moduleStats[i].key.module != moduleStats[j].key.module { + return moduleStats[i].key.module < moduleStats[j].key.module + } + return moduleStats[i].key.upstream < moduleStats[j].key.upstream + }) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_completed_connections_total Total completed connections by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_completed_connections_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_completed_connections_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + m.completed) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_sent_bytes_total Total bytes sent to clients by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_sent_bytes_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_sent_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + m.sentBytes) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_received_bytes_total Total bytes received from clients by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_received_bytes_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_received_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), + prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + m.recvBytes) + } } diff --git a/pkg/server/server.go b/pkg/server/server.go index be46f45..0cb2bad 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -127,6 +127,21 @@ type upstreamCounters struct { dialError atomic.Uint64 } +// moduleUpstreamKey identifies a (module, upstream) pair for per-module +// lifetime counters. +type moduleUpstreamKey struct { + module string + upstream string +} + +// moduleCounters holds per-(module, upstream) lifetime counters that are +// updated when a relay finishes successfully. +type moduleCounters struct { + completed atomic.Uint64 + sentBytes atomic.Uint64 + recvBytes atomic.Uint64 +} + type Server struct { // --- Options section // Listen Address @@ -166,6 +181,11 @@ type Server struct { upstreamCounters sync.Map unknownModuleCount atomic.Uint64 + // Per-(module, upstream) counters tracked when a relay finishes + // successfully. Lazy-initialized via getModuleCounters. + // map key is moduleUpstreamKey. Value is *moduleCounters. + moduleCounters sync.Map + TCPListener net.Listener TLSListener net.Listener HTTPListener net.Listener @@ -346,6 +366,17 @@ func (s *Server) getUpstreamCounters(name string) *upstreamCounters { return v.(*upstreamCounters) } +// getModuleCounters returns the per-(module, upstream) counters, creating +// them lazily on first reference. Safe for concurrent use. +func (s *Server) getModuleCounters(module, upstream string) *moduleCounters { + key := moduleUpstreamKey{module: module, upstream: upstream} + if v, ok := s.moduleCounters.Load(key); ok { + return v.(*moduleCounters) + } + v, _ := s.moduleCounters.LoadOrStore(key, &moduleCounters{}) + return v.(*moduleCounters) +} + func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target { modules := map[string][]Target{} for _, upstream := range upstreams { @@ -744,6 +775,11 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err s.sentBytesTotal.Add(uint64(sentBytes)) s.recvBytesTotal.Add(uint64(receivedBytes)) + mc := s.getModuleCounters(moduleName, target.Upstream) + mc.completed.Add(1) + mc.sentBytes.Add(uint64(sentBytes)) + mc.recvBytes.Add(uint64(receivedBytes)) + duration := time.Since(info.ConnectedAt) s.accessLog.F("client %s finishes module %s (sent: %d, received: %d, duration: %s)", ip, moduleName, sentBytes, receivedBytes, duration) return nil diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 9b374aa..273d72d 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -742,6 +742,17 @@ func TestMetricsIncludesLifetimeCounters(t *testing.T) { assert.Contains(t, text, fmt.Sprintf("rsync_proxy_sent_bytes_total %d\n", len(payload))) assert.Contains(t, text, "# HELP rsync_proxy_received_bytes_total") assert.Contains(t, text, "# TYPE rsync_proxy_received_bytes_total counter") + + // Per-(module, upstream) lifetime counters. + assert.Contains(t, text, "# HELP rsync_proxy_module_completed_connections_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_completed_connections_total counter") + assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"fake\",upstream=\"u1\"} 1\n") + assert.Contains(t, text, "# HELP rsync_proxy_module_sent_bytes_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_sent_bytes_total counter") + assert.Contains(t, text, fmt.Sprintf("rsync_proxy_module_sent_bytes_total{module=\"fake\",upstream=\"u1\"} %d\n", len(payload))) + assert.Contains(t, text, "# HELP rsync_proxy_module_received_bytes_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_received_bytes_total counter") + assert.Contains(t, text, "rsync_proxy_module_received_bytes_total{module=\"fake\",upstream=\"u1\"} 0\n") } func TestPrometheusLabelValueEscaping(t *testing.T) { From 3d3b8a75c8601464594161c0d48bf862d9447bd8 Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Tue, 2 Jun 2026 20:51:26 +0800 Subject: [PATCH 2/2] fix: normalize empty module/upstream key to "unknown" The module/upstream pair stored in moduleCounters was using the raw string, while metrics.go applied prometheusLabelValueOrUnknown at scrape time. An empty string and the literal "unknown" therefore became two distinct map entries that rendered to the same Prometheus label set, producing duplicate output lines (rejected by Prometheus as ambiguous). Normalize at key creation in getModuleCounters so the internal key matches what is rendered. metrics.go now only escapes the (already normalized) key. Adds a regression test that checks getModuleCounters("","") and getModuleCounters("unknown","unknown") return the same counter, and that the rendered output emits exactly one line for that label set. --- pkg/server/metrics.go | 12 ++++++------ pkg/server/server.go | 11 ++++++++++- pkg/server/server_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index 0cf2200..f8f4737 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -213,8 +213,8 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_completed_connections_total counter") for _, m := range moduleStats { _, _ = fmt.Fprintf(w, "rsync_proxy_module_completed_connections_total{module=\"%s\",upstream=\"%s\"} %d\n", - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), m.completed) } @@ -222,8 +222,8 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_sent_bytes_total counter") for _, m := range moduleStats { _, _ = fmt.Fprintf(w, "rsync_proxy_module_sent_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), m.sentBytes) } @@ -231,8 +231,8 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_received_bytes_total counter") for _, m := range moduleStats { _, _ = fmt.Fprintf(w, "rsync_proxy_module_received_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.module)), - prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(m.key.upstream)), + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), m.recvBytes) } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 0cb2bad..67efef9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -368,8 +368,17 @@ func (s *Server) getUpstreamCounters(name string) *upstreamCounters { // getModuleCounters returns the per-(module, upstream) counters, creating // them lazily on first reference. Safe for concurrent use. +// +// Empty module/upstream values are normalized to "unknown" so that the +// internal sync.Map key matches what prometheusLabelValueOrUnknown emits at +// scrape time. Otherwise an empty string and the literal "unknown" would +// produce two distinct map entries that render to the same Prometheus label +// set, leading to duplicate output lines. func (s *Server) getModuleCounters(module, upstream string) *moduleCounters { - key := moduleUpstreamKey{module: module, upstream: upstream} + key := moduleUpstreamKey{ + module: prometheusLabelValueOrUnknown(module), + upstream: prometheusLabelValueOrUnknown(upstream), + } if v, ok := s.moduleCounters.Load(key); ok { return v.(*moduleCounters) } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 273d72d..fe19717 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -1112,3 +1112,27 @@ func TestDiscoverModulesFromTrailingModuleBlock(t *testing.T) { require.NoError(t, err) assert.Equal(t, []string{"bar", "foo"}, modules) } + +func TestModuleCountersNormalizeEmptyKeyToUnknown(t *testing.T) { + srv := New() + + // Both empty and "unknown" inputs must point at the same internal + // counter, so a scrape cannot emit two lines that share the same + // rendered Prometheus label set. + c1 := srv.getModuleCounters("", "") + c2 := srv.getModuleCounters("unknown", "unknown") + assert.Same(t, c1, c2) + + c1.completed.Add(7) + + // metrics.go uses prometheusEscapeLabelValue on the stored key only, + // so the rendered output must show the normalized "unknown" value. + var buf bytes.Buffer + srv.writePrometheusMetrics(&buf, time.Now()) + text := buf.String() + assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"unknown\",upstream=\"unknown\"} 7\n") + + // And there should be exactly one such line — i.e. no second line with + // an empty-string label rendered separately. + assert.Equal(t, 1, strings.Count(text, "rsync_proxy_module_completed_connections_total{")) +}