Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error {
Index int `json:"index"`
RemoteAddr string `json:"remote"`
Module string `json:"module"`
UpstreamAddr string `json:"upstream"`
Upstream string `json:"upstream"`
ConnectedAt time.Time `json:"connected"`
ReceivedBytes int64 `json:"receivedBytes"`
SentBytes int64 `json:"sentBytes"`
Expand Down Expand Up @@ -125,7 +125,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error {
tw.AlignRight, // Index
tw.AlignRight, // RemoteAddr
tw.AlignDefault, // Module
tw.AlignRight, // UpstreamAddr
tw.AlignDefault, // Upstream
tw.AlignDefault, // ConnectedAt
tw.AlignRight, // ReceivedBytes
tw.AlignRight, // SentBytes
Expand All @@ -137,7 +137,7 @@ func SendConnectionsRequest(addr string, stdout, stderr io.Writer) error {
strconv.Itoa(conn.Index),
conn.RemoteAddr,
conn.Module,
conn.UpstreamAddr,
conn.Upstream,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个看起来会改变 rsync-proxy connections 输出的表格内容,虽然我觉得问题不大

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个其实挺好的,我一直都想这么做

conn.ConnectedAt.Format(time.DateTime),
strconv.FormatInt(conn.ReceivedBytes, 10),
strconv.FormatInt(conn.SentBytes, 10),
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
snapshot := conn.snapshot()
key := prometheusConnectionGroup{
module: prometheusLabelValueOrUnknown(snapshot.Module),
upstream: prometheusLabelValueOrUnknown(snapshot.UpstreamAddr),
upstream: prometheusLabelValueOrUnknown(snapshot.Upstream),
}
Comment on lines 110 to 114
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented in the PR description: this is intentionally a breaking change to the per-connection metrics (active_connections_by_module, connection_*). The rationale is that the conflicting upstream semantics introduced in #30 vs #34 cannot be joined in PromQL, and #30 only landed in v0.1.5 with no broad public uptake yet, so cleaning it up now has a low blast radius. An additive migration with a parallel upstream_id label would freeze the inconsistency and double the metric footprint in every existing dashboard query — keeping the simpler unified label seems the better trade-off here. Happy to revisit if maintainers prefer the additive approach.

connectionCounts[key]++
}
Expand Down Expand Up @@ -138,21 +138,21 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_sent_bytes gauge")
for _, conn := range connections {
snapshot := conn.snapshot()
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.SentBytes)
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.SentBytes)
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_received_bytes Bytes received from clients for active connections.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_received_bytes gauge")
for _, conn := range connections {
snapshot := conn.snapshot()
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.ReceivedBytes)
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.ReceivedBytes)
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_connected_timestamp_seconds Unix timestamp when active connections were established.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_connected_timestamp_seconds gauge")
for _, conn := range connections {
snapshot := conn.snapshot()
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), snapshot.ConnectedAt.Unix())
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), snapshot.ConnectedAt.Unix())
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_duration_seconds Current duration of active connections.")
Expand All @@ -163,7 +163,7 @@ func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
if duration < 0 {
duration = 0
}
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.3f\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.UpstreamAddr), duration)
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.3f\n", prometheusLabels(snapshot.Index, snapshot.Module, snapshot.Upstream), duration)
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_accepted_connections_total Total accepted connections since start.")
Expand Down
12 changes: 6 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ConnInfo struct {
RemoteAddr string
ConnectedAt time.Time
Module string
UpstreamAddr string
Upstream string
SentBytes atomic.Int64
Comment on lines 59 to 63
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field name Upstream is deliberately aligned with the existing Target.Upstream field on the same struct, the [upstreams.<NAME>] config table, and the upstream Prometheus label / JSON tag. Renaming the Go field to UpstreamID while keeping the JSON tag and metric label as upstream would create a Go-vs-wire mismatch, which seems worse than the current naming. The struct already exposes the resolved address separately as Target.Addr for callers that need it.

ReceivedBytes atomic.Int64
}
Expand All @@ -70,7 +70,7 @@ type connInfoSnapshot struct {
RemoteAddr string `json:"remote"`
ConnectedAt time.Time `json:"connected"`
Module string `json:"module"`
UpstreamAddr string `json:"upstream"`
Upstream string `json:"upstream"`
SentBytes int64 `json:"sentBytes"`
ReceivedBytes int64 `json:"receivedBytes"`
}
Expand All @@ -81,10 +81,10 @@ func (c *ConnInfo) SetModule(module string) {
c.Module = module
}

func (c *ConnInfo) SetUpstreamAddr(upstreamAddr string) {
func (c *ConnInfo) SetUpstream(upstream string) {
c.mu.Lock()
defer c.mu.Unlock()
c.UpstreamAddr = upstreamAddr
c.Upstream = upstream
}

func (c *ConnInfo) snapshot() connInfoSnapshot {
Expand All @@ -96,7 +96,7 @@ func (c *ConnInfo) snapshot() connInfoSnapshot {
RemoteAddr: c.RemoteAddr,
ConnectedAt: c.ConnectedAt,
Module: c.Module,
UpstreamAddr: c.UpstreamAddr,
Upstream: c.Upstream,
SentBytes: c.SentBytes.Load(),
ReceivedBytes: c.ReceivedBytes.Load(),
}
Expand Down Expand Up @@ -606,7 +606,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err
target := targets[chooseTargetByClientIP(net.ParseIP(ip), len(targets))]
upstreamAddr := target.Addr
useProxyProtocol := target.UseProxyProtocol
info.SetUpstreamAddr(upstreamAddr)
info.SetUpstream(target.Upstream)

upstreamQueue, ok := s.getQueueForUpstream(target.Upstream)
if !ok {
Expand Down
15 changes: 8 additions & 7 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestStatusIncludesSelectedUpstream(t *testing.T) {
if len(infos) != 1 {
return false
}
return infos[0].snapshot().UpstreamAddr == upstreamAddr
return infos[0].snapshot().Upstream == "u1"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The literal "u1" is the established convention across the existing test suite (see e.g. TestPerUpstreamQueueIsolation, TestQueueFullRejectsConnection, TestMetricsCountsQueueFullRejection); the new assertions just match it. Centralizing it in a single constant could make sense but is a separate, repo-wide refactor that would change a lot of unrelated tests, so leaving it consistent with the surrounding tests in this PR.

}, time.Second, 10*time.Millisecond)

wg.Done()
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestMetricsIncludesActiveConnections(t *testing.T) {
if len(infos) != 1 {
return false
}
return infos[0].snapshot().UpstreamAddr == upstreamAddr
return infos[0].snapshot().Upstream == "u1"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment on line 378 — sticking with the existing "u1" convention used across the existing test file.

}, time.Second, 10*time.Millisecond)

resp, err := testHTTPClient().Get("http://" + srv.HTTPListener.Addr().String() + "/metrics")
Expand All @@ -458,14 +458,15 @@ func TestMetricsIncludesActiveConnections(t *testing.T) {

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Contains(t, text, "rsync_proxy_active_connections 1\n")
assert.Contains(t, text, fmt.Sprintf("rsync_proxy_active_connections_by_module{module=\"fake\",upstream=%q} 1\n", upstreamAddr))
assert.Contains(t, text, "rsync_proxy_active_connections_by_module{module=\"fake\",upstream=\"u1\"} 1\n")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment on line 378 — sticking with the existing "u1" convention used across the existing test file.

assert.Contains(t, text, "rsync_proxy_connection_sent_bytes{index=\"")
assert.Contains(t, text, "module=\"fake\"")
assert.Contains(t, text, fmt.Sprintf("upstream=%q", upstreamAddr))
assert.Contains(t, text, "upstream=\"u1\"")
assert.Contains(t, text, "rsync_proxy_connection_received_bytes{index=\"")
assert.Contains(t, text, "rsync_proxy_connection_connected_timestamp_seconds{index=\"")
assert.Contains(t, text, "rsync_proxy_connection_duration_seconds{index=\"")
assert.NotContains(t, text, rawConn.LocalAddr().String())
assert.NotContains(t, text, upstreamAddr)

wg.Done()
}
Expand Down Expand Up @@ -655,12 +656,12 @@ func TestPrometheusConnectionGroupingUsesStructuredKey(t *testing.T) {

first := &ConnInfo{Index: 1, ConnectedAt: time.Unix(100, 0)}
first.Module = "a\xffb"
first.UpstreamAddr = "c"
first.Upstream = "c"
srv.connInfo.Store(first.Index, first)

second := &ConnInfo{Index: 2, ConnectedAt: time.Unix(100, 0)}
second.Module = "a"
second.UpstreamAddr = "b\xffc"
second.Upstream = "b\xffc"
srv.connInfo.Store(second.Index, second)

var buf bytes.Buffer
Expand All @@ -676,7 +677,7 @@ func TestPrometheusDurationIncludesFractionalSeconds(t *testing.T) {
srv := New()
conn := &ConnInfo{Index: 1, ConnectedAt: time.Unix(100, 0)}
conn.Module = "fake"
conn.UpstreamAddr = "127.0.0.1:873"
conn.Upstream = "127.0.0.1:873"
srv.connInfo.Store(conn.Index, conn)

var buf bytes.Buffer
Expand Down
Loading