diff --git a/go.mod b/go.mod index e6992f2..ee48a1f 100644 --- a/go.mod +++ b/go.mod @@ -5,25 +5,34 @@ go 1.26 require ( github.com/olekukonko/tablewriter v1.1.4 github.com/pelletier/go-toml v1.9.5 + github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.2 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.11.1 ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.10.0 // indirect github.com/clipperhouse/uax29/v2 v2.6.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.18.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.19 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6 // indirect github.com/olekukonko/errors v1.2.0 // indirect github.com/olekukonko/ll v0.1.6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/spf13/pflag v1.0.10 // indirect - golang.org/x/sys v0.30.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f56071f..d09b71d 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clipperhouse/displaywidth v0.10.0 h1:GhBG8WuerxjFQQYeuZAeVTuyxuX+UraiZGD4HJQ3Y8g= @@ -5,19 +7,31 @@ github.com/clipperhouse/displaywidth v0.10.0/go.mod h1:XqJajYsaiEwkxOj4bowCTMcT1 github.com/clipperhouse/uax29/v2 v2.6.0 h1:z0cDbUV+aPASdFb2/ndFnS9ts/WNXgTNNGFoKXuhpos= github.com/clipperhouse/uax29/v2 v2.6.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw= github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6 h1:zrbMGy9YXpIeTnGj4EljqMiZsIcE09mmF8XsD5AYOJc= github.com/olekukonko/cat v0.0.0-20250911104152-50322a0618f6/go.mod h1:rEKTHC9roVVicUIfZK7DYrdIoM0EOr8mK1Hj5s3JjH0= github.com/olekukonko/errors v1.2.0 h1:10Zcn4GeV59t/EGqJc8fUjtFT/FuUh5bTMzZ1XwmCRo= @@ -30,25 +44,36 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 92ef967..429b0dc 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -47,6 +47,27 @@ func (q *Queue) GetMax() int { return ret } +func (q *Queue) GetMaxQueued() int { + q.mu.Lock() + ret := q.maxQueued + q.mu.Unlock() + return ret +} + +func (q *Queue) ActiveLen() int { + q.mu.Lock() + ret := len(q.active) + q.mu.Unlock() + return ret +} + +func (q *Queue) QueuedLen() int { + q.mu.Lock() + ret := len(q.queued) + q.mu.Unlock() + return ret +} + func (q *Queue) SetMax(max, maxQueued int) { q.mu.Lock() q.max, q.maxQueued = max, maxQueued diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index 06d586c..fedfeb1 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -6,6 +6,8 @@ import ( "sort" "strings" "time" + + "github.com/ustclug/rsync-proxy/pkg/queue" ) func prometheusEscapeLabelValue(s string) string { @@ -39,6 +41,66 @@ type prometheusConnectionGroup struct { func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { connections := s.ListConnectionInfo() + s.reloadLock.RLock() + upstreams := make([]upstreamConfig, len(s.upstreams)) + copy(upstreams, s.upstreams) + queues := make(map[string]*queue.Queue, len(s.upstreamQueues)) + for k, v := range s.upstreamQueues { + queues[k] = v + } + s.reloadLock.RUnlock() + + sort.Slice(upstreams, func(i, j int) bool { + return upstreams[i].Name < upstreams[j].Name + }) + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queued_connections Current queued rsync proxy connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queued_connections gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queued_connections{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.QueuedLen()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_active_max Configured max active connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_active_max gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_active_max{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.GetMax()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_queued_max Configured max queued connections per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_queued_max gauge") + for _, u := range upstreams { + if q, ok := queues[u.Name]; ok { + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_queued_max{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), q.GetMaxQueued()) + } + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_queue_full_rejected_total Total connections rejected due to queue full per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_queue_full_rejected_total counter") + for _, u := range upstreams { + c := s.getUpstreamCounters(u.Name) + _, _ = fmt.Fprintf(w, "rsync_proxy_queue_full_rejected_total{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), c.queueFull.Load()) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_upstream_dial_errors_total Total upstream dial failures per upstream.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_upstream_dial_errors_total counter") + for _, u := range upstreams { + c := s.getUpstreamCounters(u.Name) + _, _ = fmt.Fprintf(w, "rsync_proxy_upstream_dial_errors_total{upstream=\"%s\"} %d\n", + prometheusEscapeLabelValue(u.Name), c.dialError.Load()) + } + + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_unknown_module_requests_total Total requests for unknown modules.") + _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_unknown_module_requests_total counter") + _, _ = fmt.Fprintf(w, "rsync_proxy_unknown_module_requests_total %d\n", s.unknownModuleCount.Load()) + _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_active_connections gauge") _, _ = fmt.Fprintf(w, "rsync_proxy_active_connections %d\n", s.GetActiveConnectionCount()) diff --git a/pkg/server/server.go b/pkg/server/server.go index 857a676..be46f45 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,6 +22,9 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/ustclug/rsync-proxy/pkg/logging" "github.com/ustclug/rsync-proxy/pkg/queue" ) @@ -118,6 +121,12 @@ type upstreamConfig struct { MaxQueuedConns int } +// upstreamCounters holds per-upstream failure counters. +type upstreamCounters struct { + queueFull atomic.Uint64 + dialError atomic.Uint64 +} + type Server struct { // --- Options section // Listen Address @@ -152,6 +161,11 @@ type Server struct { sentBytesTotal atomic.Uint64 recvBytesTotal atomic.Uint64 + // Per-upstream failure counters. Lazy-initialized via getUpstreamCounters. + // map key is upstream name. Value is *upstreamCounters. + upstreamCounters sync.Map + unknownModuleCount atomic.Uint64 + TCPListener net.Listener TLSListener net.Listener HTTPListener net.Listener @@ -322,6 +336,16 @@ func (s *Server) getQueueForUpstream(name string) (*queue.Queue, bool) { return q, ok } +// getUpstreamCounters returns the per-upstream counters, creating them lazily +// on first reference. Safe for concurrent use. +func (s *Server) getUpstreamCounters(name string) *upstreamCounters { + if v, ok := s.upstreamCounters.Load(name); ok { + return v.(*upstreamCounters) + } + v, _ := s.upstreamCounters.LoadOrStore(name, &upstreamCounters{}) + return v.(*upstreamCounters) +} + func buildModuleTargets(upstreams []upstreamConfig) map[string][]Target { modules := map[string][]Target{} for _, upstream := range upstreams { @@ -573,6 +597,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err // ...\n" followed by "@RSYNCD: EXIT" caused the client to // exit 0, which masked the failure for downstream tools such // as tunasync (which then marked the job as success). + s.unknownModuleCount.Add(1) _, _ = writeWithTimeout(downConn, fmt.Appendf(nil, "@ERROR: Unknown module '%s'\n", moduleName), writeTimeout) s.accessLog.F("client %s requests non-existing module %s", ip, moduleName) return nil @@ -592,6 +617,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err defer handle.Release() status := <-handle.C if status.Full { + s.getUpstreamCounters(target.Upstream).queueFull.Add(1) s.accessLog.F("client %s queue full for module %s", ip, moduleName) _, _ = writeWithTimeout(downConn, []byte("Server queue is full for this upstream. Please retry later.\n"), writeTimeout) _, _ = writeWithTimeout(downConn, RsyncdExit, writeTimeout) @@ -627,6 +653,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err upConn, err := dialContextTCPOrUnix(ctx, s.dialer, upstreamAddr) if err != nil { + s.getUpstreamCounters(target.Upstream).dialError.Add(1) return fmt.Errorf("dial to upstream: %s: %w", upstreamAddr, err) } defer upConn.Close() @@ -847,7 +874,12 @@ func (s *Server) runHTTPServer() error { return } - w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + // promhttp.HandlerFor sets the Content-Type itself based on + // content negotiation; do not pre-set it here. + // EnableOpenMetrics is disabled so that no "# EOF" terminator is + // emitted, allowing us to append our own legacy text-format + // metrics after the runtime/process metrics. + promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{DisableCompression: true, EnableOpenMetrics: false}).ServeHTTP(w, r) s.writePrometheusMetrics(w, time.Now()) }) diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 755674a..9b374aa 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -394,7 +394,7 @@ func TestMetricsEndpointNoConnections(t *testing.T) { text := string(body) assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "text/plain; version=0.0.4; charset=utf-8", resp.Header.Get("Content-Type")) + assert.Contains(t, resp.Header.Get("Content-Type"), "text/plain; version=0.0.4") assert.Contains(t, text, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.") assert.Contains(t, text, "# TYPE rsync_proxy_active_connections gauge") assert.Contains(t, text, "rsync_proxy_active_connections 0\n") @@ -470,6 +470,186 @@ func TestMetricsIncludesActiveConnections(t *testing.T) { wg.Done() } +func TestMetricsIncludesQueueGauges(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + // Configure two upstreams with different queue capacities to verify + // the gauges are emitted per upstream and reflect configuration. + srv.reloadLock.Lock() + srv.upstreams = []upstreamConfig{ + {Name: "u1", MaxActiveConns: 1, MaxQueuedConns: 2}, + {Name: "u2", MaxActiveConns: 0, MaxQueuedConns: 0}, + } + srv.upstreamQueues = map[string]*queue.Queue{ + "u1": queue.New(1, 2), + "u2": queue.New(0, 0), + } + srv.reloadLock.Unlock() + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + + // queued_connections gauge: nothing queued yet. + assert.Contains(t, text, "# HELP rsync_proxy_queued_connections") + assert.Contains(t, text, "# TYPE rsync_proxy_queued_connections gauge") + assert.Contains(t, text, "rsync_proxy_queued_connections{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_queued_connections{upstream=\"u2\"} 0\n") + + // queue_active_max gauge. + assert.Contains(t, text, "# HELP rsync_proxy_queue_active_max") + assert.Contains(t, text, "# TYPE rsync_proxy_queue_active_max gauge") + assert.Contains(t, text, "rsync_proxy_queue_active_max{upstream=\"u1\"} 1\n") + assert.Contains(t, text, "rsync_proxy_queue_active_max{upstream=\"u2\"} 0\n") + + // queue_queued_max gauge. + assert.Contains(t, text, "# HELP rsync_proxy_queue_queued_max") + assert.Contains(t, text, "# TYPE rsync_proxy_queue_queued_max gauge") + assert.Contains(t, text, "rsync_proxy_queue_queued_max{upstream=\"u1\"} 2\n") + assert.Contains(t, text, "rsync_proxy_queue_queued_max{upstream=\"u2\"} 0\n") + + // per-upstream failure counters initialized at zero. + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u2\"} 0\n") + assert.Contains(t, text, "rsync_proxy_upstream_dial_errors_total{upstream=\"u1\"} 0\n") + assert.Contains(t, text, "rsync_proxy_upstream_dial_errors_total{upstream=\"u2\"} 0\n") + + // unknown module counter (no label). + assert.Contains(t, text, "rsync_proxy_unknown_module_requests_total 0\n") +} + +func TestMetricsCountsQueueFullRejection(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + var release sync.WaitGroup + release.Add(1) + + upstream := rsync.NewServer(func(conn *rsync.Conn) { + defer conn.Close() + _, _, err := doServerHandshake(conn, RsyncdServerVersion) + require.NoError(t, err) + release.Wait() + }) + upstream.Start() + defer upstream.Close() + + srv.reloadLock.Lock() + srv.upstreams = []upstreamConfig{ + {Name: "u1", MaxActiveConns: 1, MaxQueuedConns: 1}, + } + srv.modules = map[string][]Target{ + "fake": {{Upstream: "u1", Addr: upstream.Listener.Addr().String()}}, + } + srv.upstreamQueues = map[string]*queue.Queue{"u1": queue.New(1, 1)} + srv.reloadLock.Unlock() + + // First connection occupies the active slot. + c1Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c1 := rsync.NewConn(c1Raw) + defer c1.Close() + _, err = doClientHandshake(c1, RsyncdServerVersion, "fake") + require.NoError(t, err) + + // Second connection fills the queued slot. + c2Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c2 := rsync.NewConn(c2Raw) + defer c2.Close() + _, err = doClientHandshake(c2, RsyncdServerVersion, "fake") + require.NoError(t, err) + _, err = c2.ReadLine() + require.NoError(t, err) + _, err = c2.ReadLine() + require.NoError(t, err) + + // Third connection should be rejected with queue-full. + c3Raw, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + c3 := rsync.NewConn(c3Raw) + defer c3.Close() + _, err = doClientHandshake(c3, RsyncdServerVersion, "fake") + require.NoError(t, err) + line, err := c3.ReadLine() + require.NoError(t, err) + require.Contains(t, line, "Server queue is full") + + require.Eventually(t, func() bool { + return srv.getUpstreamCounters("u1").queueFull.Load() == 1 + }, time.Second, 10*time.Millisecond) + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + assert.Contains(t, text, "rsync_proxy_queue_full_rejected_total{upstream=\"u1\"} 1\n") + + release.Done() +} + +func TestMetricsCountsUnknownModule(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + srv.reloadLock.Lock() + srv.modules = map[string][]Target{} + srv.upstreamQueues = map[string]*queue.Queue{} + srv.upstreams = nil + srv.reloadLock.Unlock() + + rawConn, err := net.Dial("tcp", srv.TCPListener.Addr().String()) + require.NoError(t, err) + conn := rsync.NewConn(rawConn) + defer conn.Close() + + _, err = doClientHandshake(conn, RsyncdServerVersion, "does-not-exist") + require.NoError(t, err) + _, err = io.ReadAll(conn) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return srv.unknownModuleCount.Load() == 1 + }, time.Second, 10*time.Millisecond) + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + assert.Contains(t, text, "rsync_proxy_unknown_module_requests_total 1\n") +} + +func TestMetricsIncludesGoRuntime(t *testing.T) { + srv := startServer(t) + defer srv.Close() + + resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + text := string(body) + + // promhttp's default gatherer exposes Go runtime and process metrics. + assert.Contains(t, text, "go_goroutines") + assert.Contains(t, text, "go_gc_duration_seconds") + // Our legacy text-format metrics should still be present after the + // promhttp output (no "# EOF" terminator from OpenMetrics). + assert.Contains(t, text, "rsync_proxy_active_connections") +} + func TestPrometheusConnectionGroupingUsesStructuredKey(t *testing.T) { srv := New()