From 8c6b1c85a5fecdf705def909708a3cbb3baccc98 Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Thu, 28 May 2026 10:03:03 +0800 Subject: [PATCH 1/4] feat: add Prometheus observability metrics and Go runtime exposure - Add queue depth, queue capacity, and queue full rejection counters - Add upstream dial error and unknown module request counters - Expose Go runtime metrics (goroutines, memory, GC) via promhttp - Expose per-upstream queue configuration (max_active, max_queued) - Add Len(), QueuedLen(), ActiveLen(), GetMaxQueued() to queue.Queue --- go.mod | 13 +++++++++-- go.sum | 49 +++++++++++++++++++++++++++++---------- pkg/queue/queue.go | 21 +++++++++++++++++ pkg/server/metrics.go | 48 ++++++++++++++++++++++++++++++++++++++ pkg/server/server.go | 11 +++++++++ pkg/server/server_test.go | 2 +- 6 files changed, 129 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index e6992f2..ee48a1f 100644 --- a/go.mod +++ b/go.mod @@ -5,25 +5,34 @@ go 1.26 require ( github.com/olekukonko/tablewriter v1.1.4 github.com/pelletier/go-toml v1.9.5 + github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.2 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.11.1 ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.10.0 // indirect github.com/clipperhouse/uax29/v2 v2.6.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.18.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.19 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6 // indirect github.com/olekukonko/errors v1.2.0 // indirect github.com/olekukonko/ll v0.1.6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/spf13/pflag v1.0.10 // indirect - golang.org/x/sys v0.30.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f56071f..d09b71d 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clipperhouse/displaywidth v0.10.0 h1:GhBG8WuerxjFQQYeuZAeVTuyxuX+UraiZGD4HJQ3Y8g= @@ -5,19 +7,31 @@ github.com/clipperhouse/displaywidth v0.10.0/go.mod h1:XqJajYsaiEwkxOj4bowCTMcT1 github.com/clipperhouse/uax29/v2 v2.6.0 h1:z0cDbUV+aPASdFb2/ndFnS9ts/WNXgTNNGFoKXuhpos= github.com/clipperhouse/uax29/v2 v2.6.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw= github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6 h1:zrbMGy9YXpIeTnGj4EljqMiZsIcE09mmF8XsD5AYOJc= github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6/go.mod h1:rEKTHC9roVVicUIfZK7DYrdIoM0EOr8mK1Hj5s3JjH0= github.com/olekukonko/errors v1.2.0 h1:10Zcn4GeV59t/EGqJc8fUjtFT/FuUh5bTMzZ1XwmCRo= @@ -30,25 +44,36 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 92ef967..429b0dc 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -47,6 +47,27 @@ func (q *Queue) GetMax() int { return ret } +func (q *Queue) GetMaxQueued() int { + q.mu.Lock() + ret := q.maxQueued + q.mu.Unlock() + return ret +} + +func (q *Queue) ActiveLen() int { + q.mu.Lock() + ret := len(q.active) + q.mu.Unlock() + return ret +} + +func (q *Queue) QueuedLen() int { + q.mu.Lock() + ret := len(q.queued) + q.mu.Unlock() + return ret +} + func (q *Queue) SetMax(max, maxQueued int) { q.mu.Lock() q.max, q.maxQueued = max, maxQueued diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index 06d586c..784fe8a 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -39,6 +39,54 @@ type prometheusConnectionGroup struct { func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { connections := s.ListConnectionInfo() + s.reloadLock.RLock() + upstreams := s.upstreams + queues := s.upstreamQueues + s.reloadLock.RUnlock() + + sort.Slice(upstreams, func(i, j int) bool { + return upstreams[i].Name < upstreams[j].Name + }) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queued_connections Current queued rsync proxy connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queued_connections gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queued_connections{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.QueuedLen()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_active_max Configured max active connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_active_max gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_active_max{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.GetMax()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_queued_max Configured max queued connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_queued_max gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_queued_max{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.GetMaxQueued()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_full_rejected_total Total connections rejected due to queue full.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_full_rejected_total counter") + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_full_rejected_total %d\n", s.queueFullConnCount.Load()) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_upstream_dial_errors_total Total upstream dial failures.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_upstream_dial_errors_total counter") + _, _ = fmt.Fprintf(w, "rsync_proxy_upstream_dial_errors_total %d\n", s.upstreamDialErrorCount.Load()) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_unknown_module_requests_total Total requests for unknown modules.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_unknown_module_requests_total counter") + _, _ = fmt.Fprintf(w, "rsync_proxy_unknown_module_requests_total %d\n", s.unknownModuleCount.Load()) + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_active_connections gauge") _, _ = fmt.Fprintf(w, "rsync_proxy_active_connections %d\n", s.GetActiveConnectionCount()) diff --git a/pkg/server/server.go b/pkg/server/server.go index 857a676..a77b9c0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -24,6 +24,9 @@ import ( "github.com/ustclug/rsync-proxy/pkg/logging" "github.com/ustclug/rsync-proxy/pkg/queue" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -152,6 +155,10 @@ type Server struct { sentBytesTotal atomic.Uint64 recvBytesTotal atomic.Uint64 + queueFullConnCount atomic.Uint64 + upstreamDialErrorCount atomic.Uint64 + unknownModuleCount atomic.Uint64 + TCPListener net.Listener TLSListener net.Listener HTTPListener net.Listener @@ -573,6 +580,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err // ...\n" followed by "@RSYNCD: EXIT" caused the client to // exit 0, which masked the failure for downstream tools such // as tunasync (which then marked the job as success). + s.unknownModuleCount.Add(1) _, _ = writeWithTimeout(downConn, fmt.Appendf(nil, "@ERROR: Unknown module '%s'\n", moduleName), writeTimeout) s.accessLog.F("client %s requests non-existing module %s", ip, moduleName) return nil @@ -592,6 +600,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err defer handle.Release() status := <-handle.C if status.Full { + s.queueFullConnCount.Add(1) s.accessLog.F("client %s queue full for module %s", ip, moduleName) _, _ = writeWithTimeout(downConn, []byte("Server queue is full for this upstream. Please retry later.\n"), writeTimeout) _, _ = writeWithTimeout(downConn, RsyncdExit, writeTimeout) @@ -627,6 +636,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err upConn, err := dialContextTCPOrUnix(ctx, s.dialer, upstreamAddr) if err != nil { + s.upstreamDialErrorCount.Add(1) return fmt.Errorf("dial to upstream: %s: %w", upstreamAddr, err) } defer upConn.Close() @@ -848,6 +858,7 @@ func (s *Server) runHTTPServer() error { } w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{DisableCompression: true}).ServeHTTP(w, r) s.writePrometheusMetrics(w, time.Now()) }) diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 755674a..5cd9873 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -394,7 +394,7 @@ func TestMetricsEndpointNoConnections(t *testing.T) { text := string(body) assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "text/plain; version=0.0.4; charset=utf-8", resp.Header.Get("Content-Type")) + assert.Contains(t, resp.Header.Get("Content-Type"), "text/plain; version=0.0.4") assert.Contains(t, text, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.") assert.Contains(t, text, "# TYPE rsync_proxy_active_connections gauge") assert.Contains(t, text, "rsync_proxy_active_connections 0\n") From ed963e12366845caf136b12fa598017ca0d3b702 Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Thu, 28 May 2026 10:12:04 +0800 Subject: [PATCH 2/4] fix: address Copilot review feedback - Deep copy upstreams slice and queues map before releasing reloadLock to prevent data races during sort and iteration - Disable OpenMetrics negotiation in promhttp to prevent # EOF marker from breaking appended custom metrics --- pkg/server/metrics.go | 10 ++++++++-- pkg/server/server.go | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index 784fe8a..b9d7a3e 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -6,6 +6,8 @@ import ( "sort" "strings" "time" + + "github.com/ustclug/rsync-proxy/pkg/queue" ) func prometheusEscapeLabelValue(s string) string { @@ -40,8 +42,12 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { connections := s.ListConnectionInfo() s.reloadLock.RLock() - upstreams := s.upstreams - queues := s.upstreamQueues + upstreams := make([]upstreamConfig, len(s.upstreams)) + copy(upstreams, s.upstreams) + queues := make(map[string]*queue.Queue, len(s.upstreamQueues)) + for k, v := range s.upstreamQueues { + queues[k] = v + } s.reloadLock.RUnlock() sort.Slice(upstreams, func(i, j int) bool { diff --git a/pkg/server/server.go b/pkg/server/server.go index a77b9c0..afd12a5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -858,7 +858,7 @@ func (s *Server) runHTTPServer() error { } w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") - promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{DisableCompression: true}).ServeHTTP(w, r) + promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{DisableCompression: true, EnableOpenMetrics: false}).ServeHTTP(w, r) s.writePrometheusMetrics(w, time.Now()) }) From d2aa7d435d650794392df35d1b7343a30f981f71 Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Thu, 28 May 2026 10:58:30 +0800 Subject: [PATCH 3/4] refactor: label per-upstream failure counters, add metrics tests Address self-review on PR #34. * Labelize `rsync_proxy_queue_full_rejected_total` and `rsync_proxy_upstream_dial_errors_total` with `upstream`. Failures are now attributable to a specific upstream instead of being a single global counter, which is essential for alerting and debugging. * Replace the two global atomic counters on `Server` with a `sync.Map` of per-upstream `upstreamCounters` (lazy-initialized on first reference). `unknownModuleCount` stays global since it has no upstream. * Drop the explicit `Content-Type` header from the /metrics handler. `promhttp.HandlerFor` already sets it via content negotiation, so the previous pre-set value was overwritten. Add a comment explaining why `EnableOpenMetrics` stays disabled (no '# EOF' terminator so the legacy text-format metrics can still be appended). * Add tests for the new metrics: * `TestMetricsIncludesQueueGauges` covers `rsync_proxy_queued_connections`, `queue_active_max`, `queue_queued_max`, plus zero-state failure counters. * `TestMetricsCountsQueueFullRejection` triggers a real queue-full rejection and asserts the labeled counter increments. * `TestMetricsCountsUnknownModule` exercises the unknown-module code path and asserts the counter increments. * `TestMetricsIncludesGoRuntime` verifies promhttp's runtime metrics (`go_goroutines`, `go_gc_duration_seconds`) appear and that legacy text-format metrics still follow them. --- pkg/server/metrics.go | 16 +++- pkg/server/server.go | 33 +++++-- pkg/server/server_test.go | 180 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 219 insertions(+), 10 deletions(-) diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index b9d7a3e..fedfeb1 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -81,13 +81,21 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { } } - _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_full_rejected_total Total connections rejected due to queue full.") + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_full_rejected_total Total connections rejected due to queue full per upstream.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_full_rejected_total counter") - _, _ = fmt.Fprintf(w, "rsync_proxy_queue_full_rejected_total %d\n", s.queueFullConnCount.Load()) + for _, u := range upstreams { + c := s.getUpstreamCounters(u.Name) + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_full_rejected_total{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), c.queueFull.Load()) + } - _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_upstream_dial_errors_total Total upstream dial failures.") + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_upstream_dial_errors_total Total upstream dial failures per upstream.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_upstream_dial_errors_total counter") - _, _ = fmt.Fprintf(w, "rsync_proxy_upstream_dial_errors_total %d\n", s.upstreamDialErrorCount.Load()) + for _, u := range upstreams { + c := s.getUpstreamCounters(u.Name) + _, _ = fmt.Fprintf(w, "rsync_proxy_upstream_dial_errors_total{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), c.dialError.Load()) + } _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_unknown_module_requests_total Total requests for unknown modules.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_unknown_module_requests_total counter") diff --git a/pkg/server/server.go b/pkg/server/server.go index afd12a5..f36368d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -121,6 +121,12 @@ type upstreamConfig struct { MaxQueuedConns int } +// upstreamCounters holds per-upstream failure counters. +type upstreamCounters struct { + queueFull atomic.Uint64 + dialError atomic.Uint64 +} + type Server struct { // --- Options section // Listen Address @@ -155,9 +161,10 @@ type Server struct { sentBytesTotal atomic.Uint64 recvBytesTotal atomic.Uint64 - queueFullConnCount atomic.Uint64 - upstreamDialErrorCount atomic.Uint64 - unknownModuleCount atomic.Uint64 + // Per-upstream failure counters. Lazy-initialized via getUpstreamCounters. + // map key is upstream name. Value is *upstreamCounters. + upstreamCounters sync.Map + unknownModuleCount atomic.Uint64 TCPListener net.Listener TLSListener net.Listener @@ -329,6 +336,16 @@ func (s *Server) getQueueForUpstream(name string) (*queue.Queue, bool) { return q, ok } +// getUpstreamCounters returns the per-upstream counters, creating them lazily +// on first reference. Safe for concurrent use. +func (s *Server) getUpstreamCounters(name string) *upstreamCounters { + if v, ok := s.upstreamCounters.Load(name); ok { + return v.(*upstreamCounters) + } + v, _ := s.upstreamCounters.LoadOrStore(name, &upstreamCounters{}) + return v.(*upstreamCounters) +} + func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target { modules := map[string][]Target{} for _, upstream := range upstreams { @@ -600,7 +617,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err defer handle.Release() status := <-handle.C if status.Full { - s.queueFullConnCount.Add(1) + s.getUpstreamCounters(target.Upstream).queueFull.Add(1) s.accessLog.F("client %s queue full for module %s", ip, moduleName) _, _ = writeWithTimeout(downConn, []byte("Server queue is full for this upstream. Please retry later.\n"), writeTimeout) _, _ = writeWithTimeout(downConn, RsyncdExit, writeTimeout) @@ -636,7 +653,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err upConn, err := dialContextTCPOrUnix(ctx, s.dialer, upstreamAddr) if err != nil { - s.upstreamDialErrorCount.Add(1) + s.getUpstreamCounters(target.Upstream).dialError.Add(1) return fmt.Errorf("dial to upstream: %s: %w", upstreamAddr, err) } defer upConn.Close() @@ -857,7 +874,11 @@ func (s *Server) runHTTPServer() error { return } - w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + // promhttp.HandlerFor sets the Content-Type itself based on + // content negotiation; do not pre-set it here. + // EnableOpenMetrics is disabled so that no "# EOF" terminator is + // emitted, allowing us to append our own legacy text-format + // metrics after the runtime/process metrics. promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{DisableCompression: true, EnableOpenMetrics: false}).ServeHTTP(w, r) s.writePrometheusMetrics(w, time.Now()) }) diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 5cd9873..9b374aa 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -470,6 +470,186 @@ func TestMetricsIncludesActiveConnections(t *testing.T) { wg.Done() } +func TestMetricsIncludesQueueGauges(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + // Configure two upstreams with different queue capacities to verify + // the gauges are emitted per upstream and reflect configuration. + srv.reloadLock.Lock() + srv.upstreams = []upstreamConfig{ + {Name: "u1", MaxActiveConns: 1, MaxQueuedConns: 2}, + {Name: "u2", MaxActiveConns: 0, MaxQueuedConns: 0}, + } + srv.upstreamQueues = map[string]*queue.Queue{ + "u1": queue.New(1, 2), + "u2": queue.New(0, 0), + } + srv.reloadLock.Unlock() + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + // queued_connections gauge: nothing queued yet. + assert.Contains(t, text, "# HELP rsync_proxy_queued_connections") + assert.Contains(t, text, "# TYPE rsync_proxy_queued_connections gauge") + assert.Contains(t, text, "rsync_proxy_queued_connections{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_queued_connections{upstream=\"u2\"} 0\n") + + // queue_active_max gauge. + assert.Contains(t, text, "# HELP rsync_proxy_queue_active_max") + assert.Contains(t, text, "# TYPE rsync_proxy_queue_active_max gauge") + assert.Contains(t, text, "rsync_proxy_queue_active_max{upstream=\"u1\"} 1\n") + assert.Contains(t, text, "rsync_proxy_queue_active_max{upstream=\"u2\"} 0\n") + + // queue_queued_max gauge. + assert.Contains(t, text, "# HELP rsync_proxy_queue_queued_max") + assert.Contains(t, text, "# TYPE rsync_proxy_queue_queued_max gauge") + assert.Contains(t, text, "rsync_proxy_queue_queued_max{upstream=\"u1\"} 2\n") + assert.Contains(t, text, "rsync_proxy_queue_queued_max{upstream=\"u2\"} 0\n") + + // per-upstream failure counters initialized at zero. + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u2\"} 0\n") + assert.Contains(t, text, "rsync_proxy_upstream_dial_errors_total{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_upstream_dial_errors_total{upstream=\"u2\"} 0\n") + + // unknown module counter (no label). + assert.Contains(t, text, "rsync_proxy_unknown_module_requests_total 0\n") +} + +func TestMetricsCountsQueueFullRejection(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + var release sync.WaitGroup + release.Add(1) + + upstream := rsync.NewServer(func(conn *rsync.Conn) { + defer conn.Close() + _, _, err := doServerHandshake(conn, RsyncdServerVersion) + require.NoError(t, err) + release.Wait() + }) + upstream.Start() + defer upstream.Close() + + srv.reloadLock.Lock() + srv.upstreams = []upstreamConfig{ + {Name: "u1", MaxActiveConns: 1, MaxQueuedConns: 1}, + } + srv.modules = map[string][]Target{ + "fake": {{Upstream: "u1", Addr: upstream.Listener.Addr().String()}}, + } + srv.upstreamQueues = map[string]*queue.Queue{"u1": queue.New(1, 1)} + srv.reloadLock.Unlock() + + // First connection occupies the active slot. + c1Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c1 := rsync.NewConn(c1Raw) + defer c1.Close() + _, err = doClientHandshake(c1, RsyncdServerVersion, "fake") + require.NoError(t, err) + + // Second connection fills the queued slot. + c2Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c2 := rsync.NewConn(c2Raw) + defer c2.Close() + _, err = doClientHandshake(c2, RsyncdServerVersion, "fake") + require.NoError(t, err) + _, err = c2.ReadLine() + require.NoError(t, err) + _, err = c2.ReadLine() + require.NoError(t, err) + + // Third connection should be rejected with queue-full. + c3Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c3 := rsync.NewConn(c3Raw) + defer c3.Close() + _, err = doClientHandshake(c3, RsyncdServerVersion, "fake") + require.NoError(t, err) + line, err := c3.ReadLine() + require.NoError(t, err) + require.Contains(t, line, "Server queue is full") + + require.Eventually(t, func() bool { + return srv.getUpstreamCounters("u1").queueFull.Load() == 1 + }, time.Second, 10*time.Millisecond) + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u1\"} 1\n") + + release.Done() +} + +func TestMetricsCountsUnknownModule(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + srv.reloadLock.Lock() + srv.modules = map[string][]Target{} + srv.upstreamQueues = map[string]*queue.Queue{} + srv.upstreams = nil + srv.reloadLock.Unlock() + + rawConn, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + conn := rsync.NewConn(rawConn) + defer conn.Close() + + _, err = doClientHandshake(conn, RsyncdServerVersion, "does-not-exist") + require.NoError(t, err) + _, err = io.ReadAll(conn) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return srv.unknownModuleCount.Load() == 1 + }, time.Second, 10*time.Millisecond) + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + assert.Contains(t, text, "rsync_proxy_unknown_module_requests_total 1\n") +} + +func TestMetricsIncludesGoRuntime(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + + // promhttp's default gatherer exposes Go runtime and process metrics. + assert.Contains(t, text, "go_goroutines") + assert.Contains(t, text, "go_gc_duration_seconds") + // Our legacy text-format metrics should still be present after the + // promhttp output (no "# EOF" terminator from OpenMetrics). + assert.Contains(t, text, "rsync_proxy_active_connections") +} + func TestPrometheusConnectionGroupingUsesStructuredKey(t *testing.T) { srv := New() From 8e58b63bd09948b0e66e41ed2f26c9cce04bdc41 Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Sat, 30 May 2026 22:36:52 +0800 Subject: [PATCH 4/4] fix: order imports for gci linter Place third-party (prometheus/client_golang) before local (github.com/ustclug/rsync-proxy/...) imports as required by the project's gci configuration. --- pkg/server/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index f36368d..be46f45 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,11 +22,11 @@ import ( "sync/atomic" "time" - "github.com/ustclug/rsync-proxy/pkg/logging" - "github.com/ustclug/rsync-proxy/pkg/queue" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/ustclug/rsync-proxy/pkg/logging" + "github.com/ustclug/rsync-proxy/pkg/queue" ) const (