From d0947e6983302510e3434158cdbfb51bee3e963c Mon Sep 17 00:00:00 2001 From: yaoge123 Date: Sun, 31 May 2026 10:14:09 +0800 Subject: [PATCH] refactor: unify upstream label to use config name Previously, two metric families used the upstream label with conflicting semantics: - Per-connection metrics (introduced in #30) used the backend address (IP:port from target.Addr): active_connections_by_module, connection_sent_bytes, connection_received_bytes, connection_connected_timestamp_seconds, connection_duration_seconds. - Queue metrics (introduced in #34) used the upstream config name (from upstreamConfig.Name): queued_connections, queue_active_max, queue_queued_max, queue_full_rejected_total, upstream_dial_errors_total. Same label name, different value space, so they could not be joined with on(upstream) in PromQL. The config name is also the more stable identifier - backend addresses can change when an upstream is reconfigured, but the config name is fixed. Switch the per-connection metrics to use the upstream config name so the upstream label has a single, consistent meaning across all metrics. This is technically a breaking change for any dashboard or alert that relied on the old IP:port values, but #30 only landed recently and has not seen widespread use yet, so it is the right time to fix this. Renames: - ConnInfo.UpstreamAddr -> ConnInfo.Upstream - ConnInfo.SetUpstreamAddr -> ConnInfo.SetUpstream - connInfoSnapshot.UpstreamAddr -> connInfoSnapshot.Upstream (JSON tag "upstream" preserved for /connections API compatibility) The /connections JSON value now contains the upstream name. The rsync-proxyctl connections table column header ("Upstream") already fits this semantics. --- cmd/cmd.go | 6 +++--- pkg/server/metrics.go | 10 +++++----- pkg/server/server.go | 12 ++++++------ pkg/server/server_test.go | 15 ++++++++------- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index c3dd94e..1cb8be0 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -90,7 +90,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error { Index int `json:"index"` RemoteAddr string `json:"remote"` Module string `json:"module"` - UpstreamAddr string `json:"upstream"` + Upstream string `json:"upstream"` ConnectedAt time.Time `json:"connected"` ReceivedBytes int64 `json:"receivedBytes"` SentBytes int64 `json:"sentBytes"` @@ -125,7 +125,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error { tw.AlignRight, // Index tw.AlignRight, // RemoteAddr tw.AlignDefault, // Module - tw.AlignRight, // UpstreamAddr + tw.AlignDefault, // Upstream tw.AlignDefault, // ConnectedAt tw.AlignRight, // ReceivedBytes tw.AlignRight, // SentBytes @@ -137,7 +137,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error { strconv.Itoa(conn.Index), conn.RemoteAddr, conn.Module, - conn.UpstreamAddr, + conn.Upstream, conn.ConnectedAt.Format(time.DateTime), strconv.FormatInt(conn.ReceivedBytes, 10), strconv.FormatInt(conn.SentBytes, 10), diff --git a/pkg/server/metrics.go b/pkg/server/metrics.go index fedfeb1..990c240 100644 --- a/pkg/server/metrics.go +++ b/pkg/server/metrics.go @@ -110,7 +110,7 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { snapshot := conn.snapshot() key := prometheusConnectionGroup{ module: prometheusLabelValueOrUnknown(snapshot.Module), - upstream: prometheusLabelValueOrUnknown(snapshot.UpstreamAddr), + upstream: prometheusLabelValueOrUnknown(snapshot.Upstream), } connectionCounts[key]++ } @@ -138,21 +138,21 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_sent_bytes gauge") for _, conn := range connections { snapshot := conn.snapshot() - _, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.SentBytes) + _, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.SentBytes) } _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_received_bytes Bytes received from clients for active connections.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_received_bytes gauge") for _, conn := range connections { snapshot := conn.snapshot() - _, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.ReceivedBytes) + _, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.ReceivedBytes) } _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_connected_timestamp_seconds Unix timestamp when active connections were established.") _, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_connected_timestamp_seconds gauge") for _, conn := range connections { snapshot := conn.snapshot() - _, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.ConnectedAt.Unix()) + _, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.ConnectedAt.Unix()) } _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_duration_seconds Current duration of active connections.") @@ -163,7 +163,7 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) { if duration < 0 { duration = 0 } - _, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.3f\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), duration) + _, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.3f\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), duration) } _, _ = fmt.Fprintln(w, "# HELP rsync_proxy_accepted_connections_total Total accepted connections since start.") diff --git a/pkg/server/server.go b/pkg/server/server.go index be46f45..d0c5287 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -59,7 +59,7 @@ type ConnInfo struct { RemoteAddr string ConnectedAt time.Time Module string - UpstreamAddr string + Upstream string SentBytes atomic.Int64 ReceivedBytes atomic.Int64 } @@ -70,7 +70,7 @@ type connInfoSnapshot struct { RemoteAddr string `json:"remote"` ConnectedAt time.Time `json:"connected"` Module string `json:"module"` - UpstreamAddr string `json:"upstream"` + Upstream string `json:"upstream"` SentBytes int64 `json:"sentBytes"` ReceivedBytes int64 `json:"receivedBytes"` } @@ -81,10 +81,10 @@ func (c *ConnInfo) SetModule(module string) { c.Module = module } -func (c *ConnInfo) SetUpstreamAddr(upstreamAddr string) { +func (c *ConnInfo) SetUpstream(upstream string) { c.mu.Lock() defer c.mu.Unlock() - c.UpstreamAddr = upstreamAddr + c.Upstream = upstream } func (c *ConnInfo) snapshot() connInfoSnapshot { @@ -96,7 +96,7 @@ func (c *ConnInfo) snapshot() connInfoSnapshot { RemoteAddr: c.RemoteAddr, ConnectedAt: c.ConnectedAt, Module: c.Module, - UpstreamAddr: c.UpstreamAddr, + Upstream: c.Upstream, SentBytes: c.SentBytes.Load(), ReceivedBytes: c.ReceivedBytes.Load(), } @@ -606,7 +606,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err target := targets[chooseTargetByClientIP(net.ParseIP(ip), len(targets))] upstreamAddr := target.Addr useProxyProtocol := target.UseProxyProtocol - info.SetUpstreamAddr(upstreamAddr) + info.SetUpstream(target.Upstream) upstreamQueue, ok := s.getQueueForUpstream(target.Upstream) if !ok { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 9b374aa..23508b9 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -375,7 +375,7 @@ func TestStatusIncludesSelectedUpstream(t *testing.T) { if len(infos) != 1 { return false } - return infos[0].snapshot().UpstreamAddr == upstreamAddr + return infos[0].snapshot().Upstream == "u1" }, time.Second, 10*time.Millisecond) wg.Done() @@ -445,7 +445,7 @@ func TestMetricsIncludesActiveConnections(t *testing.T) { if len(infos) != 1 { return false } - return infos[0].snapshot().UpstreamAddr == upstreamAddr + return infos[0].snapshot().Upstream == "u1" }, time.Second, 10*time.Millisecond) resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics") @@ -458,14 +458,15 @@ func TestMetricsIncludesActiveConnections(t *testing.T) { assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Contains(t, text, "rsync_proxy_active_connections 1\n") - assert.Contains(t, text, fmt.Sprintf("rsync_proxy_active_connections_by_module{module=\"fake\",upstream=%q} 1\n", upstreamAddr)) + assert.Contains(t, text, "rsync_proxy_active_connections_by_module{module=\"fake\",upstream=\"u1\"} 1\n") assert.Contains(t, text, "rsync_proxy_connection_sent_bytes{index=\"") assert.Contains(t, text, "module=\"fake\"") - assert.Contains(t, text, fmt.Sprintf("upstream=%q", upstreamAddr)) + assert.Contains(t, text, "upstream=\"u1\"") assert.Contains(t, text, "rsync_proxy_connection_received_bytes{index=\"") assert.Contains(t, text, "rsync_proxy_connection_connected_timestamp_seconds{index=\"") assert.Contains(t, text, "rsync_proxy_connection_duration_seconds{index=\"") assert.NotContains(t, text, rawConn.LocalAddr().String()) + assert.NotContains(t, text, upstreamAddr) wg.Done() } @@ -655,12 +656,12 @@ func TestPrometheusConnectionGroupingUsesStructuredKey(t *testing.T) { first := &ConnInfo{Index: 1, ConnectedAt: time.Unix(100, 0)} first.Module = "a\xffb" - first.UpstreamAddr = "c" + first.Upstream = "c" srv.connInfo.Store(first.Index, first) second := &ConnInfo{Index: 2, ConnectedAt: time.Unix(100, 0)} second.Module = "a" - second.UpstreamAddr = "b\xffc" + second.Upstream = "b\xffc" srv.connInfo.Store(second.Index, second) var buf bytes.Buffer @@ -676,7 +677,7 @@ func TestPrometheusDurationIncludesFractionalSeconds(t *testing.T) { srv := New() conn := &ConnInfo{Index: 1, ConnectedAt: time.Unix(100, 0)} conn.Module = "fake" - conn.UpstreamAddr = "127.0.0.1:873" + conn.Upstream = "127.0.0.1:873" srv.connInfo.Store(conn.Index, conn) var buf bytes.Buffer