diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index fedfeb1..f8f4737 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -181,4 +181,58 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_received_bytes_total Total bytes received from clients since start.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_received_bytes_total counter") _, _ = fmt.Fprintf(w, "rsync_proxy_received_bytes_total %d\n", s.recvBytesTotal.Load()) + + // Per-(module, upstream) lifetime counters. We collect a sorted slice of + // keys so that the text output is stable across scrapes. + type moduleStat struct { + key moduleUpstreamKey + completed uint64 + sentBytes uint64 + recvBytes uint64 + } + var moduleStats []moduleStat + s.moduleCounters.Range(func(k, v any) bool { + key := k.(moduleUpstreamKey) + c := v.(*moduleCounters) + moduleStats = append(moduleStats, moduleStat{ + key: key, + completed: c.completed.Load(), + sentBytes: c.sentBytes.Load(), + recvBytes: c.recvBytes.Load(), + }) + return true + }) + sort.Slice(moduleStats, func(i, j int) bool { + if moduleStats[i].key.module != moduleStats[j].key.module { + return moduleStats[i].key.module < moduleStats[j].key.module + } + return moduleStats[i].key.upstream < moduleStats[j].key.upstream + }) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_completed_connections_total Total completed connections by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_completed_connections_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_completed_connections_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), + m.completed) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_sent_bytes_total Total bytes sent to clients by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_sent_bytes_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_sent_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), + m.sentBytes) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_received_bytes_total Total bytes received from clients by module and upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_received_bytes_total counter") + for _, m := range moduleStats { + _, _ = fmt.Fprintf(w, "rsync_proxy_module_received_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(m.key.module), + prometheusEscapeLabelValue(m.key.upstream), + m.recvBytes) + } } diff --git a/pkg/server/server.go b/pkg/server/server.go index be46f45..67efef9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -127,6 +127,21 @@ type upstreamCounters struct { dialError atomic.Uint64 } +// moduleUpstreamKey identifies a (module, upstream) pair for per-module +// lifetime counters. +type moduleUpstreamKey struct { + module string + upstream string +} + +// moduleCounters holds per-(module, upstream) lifetime counters that are +// updated when a relay finishes successfully. +type moduleCounters struct { + completed atomic.Uint64 + sentBytes atomic.Uint64 + recvBytes atomic.Uint64 +} + type Server struct { // --- Options section // Listen Address @@ -166,6 +181,11 @@ type Server struct { upstreamCounters sync.Map unknownModuleCount atomic.Uint64 + // Per-(module, upstream) counters tracked when a relay finishes + // successfully. Lazy-initialized via getModuleCounters. + // map key is moduleUpstreamKey. Value is *moduleCounters. + moduleCounters sync.Map + TCPListener net.Listener TLSListener net.Listener HTTPListener net.Listener @@ -346,6 +366,26 @@ func (s *Server) getUpstreamCounters(name string) *upstreamCounters { return v.(*upstreamCounters) } +// getModuleCounters returns the per-(module, upstream) counters, creating +// them lazily on first reference. Safe for concurrent use. +// +// Empty module/upstream values are normalized to "unknown" so that the +// internal sync.Map key matches what prometheusLabelValueOrUnknown emits at +// scrape time. Otherwise an empty string and the literal "unknown" would +// produce two distinct map entries that render to the same Prometheus label +// set, leading to duplicate output lines. +func (s *Server) getModuleCounters(module, upstream string) *moduleCounters { + key := moduleUpstreamKey{ + module: prometheusLabelValueOrUnknown(module), + upstream: prometheusLabelValueOrUnknown(upstream), + } + if v, ok := s.moduleCounters.Load(key); ok { + return v.(*moduleCounters) + } + v, _ := s.moduleCounters.LoadOrStore(key, &moduleCounters{}) + return v.(*moduleCounters) +} + func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target { modules := map[string][]Target{} for _, upstream := range upstreams { @@ -744,6 +784,11 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err s.sentBytesTotal.Add(uint64(sentBytes)) s.recvBytesTotal.Add(uint64(receivedBytes)) + mc := s.getModuleCounters(moduleName, target.Upstream) + mc.completed.Add(1) + mc.sentBytes.Add(uint64(sentBytes)) + mc.recvBytes.Add(uint64(receivedBytes)) + duration := time.Since(info.ConnectedAt) s.accessLog.F("client %s finishes module %s (sent: %d, received: %d, duration: %s)", ip, moduleName, sentBytes, receivedBytes, duration) return nil diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 9b374aa..fe19717 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -742,6 +742,17 @@ func TestMetricsIncludesLifetimeCounters(t *testing.T) { assert.Contains(t, text, fmt.Sprintf("rsync_proxy_sent_bytes_total %d\n", len(payload))) assert.Contains(t, text, "# HELP rsync_proxy_received_bytes_total") assert.Contains(t, text, "# TYPE rsync_proxy_received_bytes_total counter") + + // Per-(module, upstream) lifetime counters. + assert.Contains(t, text, "# HELP rsync_proxy_module_completed_connections_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_completed_connections_total counter") + assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"fake\",upstream=\"u1\"} 1\n") + assert.Contains(t, text, "# HELP rsync_proxy_module_sent_bytes_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_sent_bytes_total counter") + assert.Contains(t, text, fmt.Sprintf("rsync_proxy_module_sent_bytes_total{module=\"fake\",upstream=\"u1\"} %d\n", len(payload))) + assert.Contains(t, text, "# HELP rsync_proxy_module_received_bytes_total") + assert.Contains(t, text, "# TYPE rsync_proxy_module_received_bytes_total counter") + assert.Contains(t, text, "rsync_proxy_module_received_bytes_total{module=\"fake\",upstream=\"u1\"} 0\n") } func TestPrometheusLabelValueEscaping(t *testing.T) { @@ -1101,3 +1112,27 @@ func TestDiscoverModulesFromTrailingModuleBlock(t *testing.T) { require.NoError(t, err) assert.Equal(t, []string{"bar", "foo"}, modules) } + +func TestModuleCountersNormalizeEmptyKeyToUnknown(t *testing.T) { + srv := New() + + // Both empty and "unknown" inputs must point at the same internal + // counter, so a scrape cannot emit two lines that share the same + // rendered Prometheus label set. + c1 := srv.getModuleCounters("", "") + c2 := srv.getModuleCounters("unknown", "unknown") + assert.Same(t, c1, c2) + + c1.completed.Add(7) + + // metrics.go uses prometheusEscapeLabelValue on the stored key only, + // so the rendered output must show the normalized "unknown" value. + var buf bytes.Buffer + srv.writePrometheusMetrics(&buf, time.Now()) + text := buf.String() + assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"unknown\",upstream=\"unknown\"} 7\n") + + // And there should be exactly one such line — i.e. no second line with + // an empty-string label rendered separately. + assert.Equal(t, 1, strings.Count(text, "rsync_proxy_module_completed_connections_total{")) +}