Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions pkg/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,58 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_received_bytes_total Total bytes received from clients since start.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_received_bytes_total counter")
_, _ = fmt.Fprintf(w, "rsync_proxy_received_bytes_total %d\n", s.recvBytesTotal.Load())

// Per-(module, upstream) lifetime counters. We collect a sorted slice of
// keys so that the text output is stable across scrapes.
type moduleStat struct {
key moduleUpstreamKey
completed uint64
sentBytes uint64
recvBytes uint64
}
var moduleStats []moduleStat
s.moduleCounters.Range(func(k, v any) bool {
key := k.(moduleUpstreamKey)
c := v.(*moduleCounters)
moduleStats = append(moduleStats, moduleStat{
key: key,
completed: c.completed.Load(),
sentBytes: c.sentBytes.Load(),
recvBytes: c.recvBytes.Load(),
})
return true
})
sort.Slice(moduleStats, func(i, j int) bool {
if moduleStats[i].key.module != moduleStats[j].key.module {
return moduleStats[i].key.module < moduleStats[j].key.module
}
return moduleStats[i].key.upstream < moduleStats[j].key.upstream
})

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_completed_connections_total Total completed connections by module and upstream.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_completed_connections_total counter")
for _, m := range moduleStats {
_, _ = fmt.Fprintf(w, "rsync_proxy_module_completed_connections_total{module=\"%s\",upstream=\"%s\"} %d\n",
prometheusEscapeLabelValue(m.key.module),
prometheusEscapeLabelValue(m.key.upstream),
m.completed)
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_sent_bytes_total Total bytes sent to clients by module and upstream.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_sent_bytes_total counter")
for _, m := range moduleStats {
_, _ = fmt.Fprintf(w, "rsync_proxy_module_sent_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n",
prometheusEscapeLabelValue(m.key.module),
prometheusEscapeLabelValue(m.key.upstream),
m.sentBytes)
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_module_received_bytes_total Total bytes received from clients by module and upstream.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_module_received_bytes_total counter")
for _, m := range moduleStats {
_, _ = fmt.Fprintf(w, "rsync_proxy_module_received_bytes_total{module=\"%s\",upstream=\"%s\"} %d\n",
prometheusEscapeLabelValue(m.key.module),
prometheusEscapeLabelValue(m.key.upstream),
m.recvBytes)
}
}
45 changes: 45 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ type upstreamCounters struct {
dialError atomic.Uint64
}

// moduleUpstreamKey identifies a (module, upstream) pair for per-module
// lifetime counters.
type moduleUpstreamKey struct {
module string
upstream string
}

// moduleCounters holds per-(module, upstream) lifetime counters that are
// updated when a relay finishes successfully.
type moduleCounters struct {
completed atomic.Uint64
sentBytes atomic.Uint64
recvBytes atomic.Uint64
}

type Server struct {
// --- Options section
// Listen Address
Expand Down Expand Up @@ -166,6 +181,11 @@ type Server struct {
upstreamCounters sync.Map
unknownModuleCount atomic.Uint64

// Per-(module, upstream) counters tracked when a relay finishes
// successfully. Lazy-initialized via getModuleCounters.
// map key is moduleUpstreamKey. Value is *moduleCounters.
moduleCounters sync.Map
Comment on lines +184 to +187
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice this is bounded. target.Upstream is set by the proxy itself to upstreamConfig.Name (always a configured value), and the module value at the increment site only reaches the lifetime accumulator after the request has resolved against s.modules — the listing-all-modules / unknown-module / queue-full / dial-error paths return earlier without touching getModuleCounters. So the upper bound is Σ(len(upstream.Modules)), identical in cardinality to the existing per-connection gauges. Happy to add an explicit doc comment about that bound if helpful.


TCPListener net.Listener
TLSListener net.Listener
HTTPListener net.Listener
Expand Down Expand Up @@ -346,6 +366,26 @@ func (s *Server) getUpstreamCounters(name string) *upstreamCounters {
return v.(*upstreamCounters)
}

// getModuleCounters returns the per-(module, upstream) counters, creating
// them lazily on first reference. Safe for concurrent use.
//
// Empty module/upstream values are normalized to "unknown" so that the
// internal sync.Map key matches what prometheusLabelValueOrUnknown emits at
// scrape time. Otherwise an empty string and the literal "unknown" would
// produce two distinct map entries that render to the same Prometheus label
// set, leading to duplicate output lines.
func (s *Server) getModuleCounters(module, upstream string) *moduleCounters {
key := moduleUpstreamKey{
module: prometheusLabelValueOrUnknown(module),
upstream: prometheusLabelValueOrUnknown(upstream),
}
if v, ok := s.moduleCounters.Load(key); ok {
return v.(*moduleCounters)
}
v, _ := s.moduleCounters.LoadOrStore(key, &moduleCounters{})
return v.(*moduleCounters)
}
Comment on lines +377 to +387
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed in 3d3b8a7. getModuleCounters now applies prometheusLabelValueOrUnknown at key creation, so the internal sync.Map key matches the rendered Prometheus label set; metrics.go now only escapes the (already normalized) key. Added TestModuleCountersNormalizeEmptyKeyToUnknown as a regression test.

Comment on lines +377 to +387
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sticking with the load-then-LoadOrStore pattern: it matches the same shape used in getUpstreamCounters immediately above, and the Load fast-path avoids the LoadOrStore allocation/CAS on the steady-state hot path (every relay completion). Concurrency semantics are identical either way.


func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target {
modules := map[string][]Target{}
for _, upstream := range upstreams {
Expand Down Expand Up @@ -744,6 +784,11 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err
s.sentBytesTotal.Add(uint64(sentBytes))
s.recvBytesTotal.Add(uint64(receivedBytes))

mc := s.getModuleCounters(moduleName, target.Upstream)
mc.completed.Add(1)
mc.sentBytes.Add(uint64(sentBytes))
mc.recvBytes.Add(uint64(receivedBytes))

duration := time.Since(info.ConnectedAt)
s.accessLog.F("client %s finishes module %s (sent: %d, received: %d, duration: %s)", ip, moduleName, sentBytes, receivedBytes, duration)
return nil
Expand Down
35 changes: 35 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,17 @@ func TestMetricsIncludesLifetimeCounters(t *testing.T) {
assert.Contains(t, text, fmt.Sprintf("rsync_proxy_sent_bytes_total %d\n", len(payload)))
assert.Contains(t, text, "# HELP rsync_proxy_received_bytes_total")
assert.Contains(t, text, "# TYPE rsync_proxy_received_bytes_total counter")

// Per-(module, upstream) lifetime counters.
assert.Contains(t, text, "# HELP rsync_proxy_module_completed_connections_total")
assert.Contains(t, text, "# TYPE rsync_proxy_module_completed_connections_total counter")
assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"fake\",upstream=\"u1\"} 1\n")
assert.Contains(t, text, "# HELP rsync_proxy_module_sent_bytes_total")
assert.Contains(t, text, "# TYPE rsync_proxy_module_sent_bytes_total counter")
assert.Contains(t, text, fmt.Sprintf("rsync_proxy_module_sent_bytes_total{module=\"fake\",upstream=\"u1\"} %d\n", len(payload)))
assert.Contains(t, text, "# HELP rsync_proxy_module_received_bytes_total")
assert.Contains(t, text, "# TYPE rsync_proxy_module_received_bytes_total counter")
assert.Contains(t, text, "rsync_proxy_module_received_bytes_total{module=\"fake\",upstream=\"u1\"} 0\n")
Comment on lines +746 to +755
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty-vs-unknown collision is now covered by TestModuleCountersNormalizeEmptyKeyToUnknown (3d3b8a7). Escape-character coverage already exists at the helper level in TestPrometheusEscapeLabelValue (quotes, backslashes, newlines), and exercising the full render path with an exotic module name would mostly re-test that helper — leaving as-is to keep the test surface focused.

}

func TestPrometheusLabelValueEscaping(t *testing.T) {
Expand Down Expand Up @@ -1101,3 +1112,27 @@ func TestDiscoverModulesFromTrailingModuleBlock(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, []string{"bar", "foo"}, modules)
}

func TestModuleCountersNormalizeEmptyKeyToUnknown(t *testing.T) {
srv := New()

// Both empty and "unknown" inputs must point at the same internal
// counter, so a scrape cannot emit two lines that share the same
// rendered Prometheus label set.
c1 := srv.getModuleCounters("", "")
c2 := srv.getModuleCounters("unknown", "unknown")
assert.Same(t, c1, c2)

c1.completed.Add(7)

// metrics.go uses prometheusEscapeLabelValue on the stored key only,
// so the rendered output must show the normalized "unknown" value.
var buf bytes.Buffer
srv.writePrometheusMetrics(&buf, time.Now())
text := buf.String()
assert.Contains(t, text, "rsync_proxy_module_completed_connections_total{module=\"unknown\",upstream=\"unknown\"} 7\n")

// And there should be exactly one such line — i.e. no second line with
// an empty-string label rendered separately.
assert.Equal(t, 1, strings.Count(text, "rsync_proxy_module_completed_connections_total{"))
}
Loading