Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions gravity/endpoint_independence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,11 +800,10 @@ func TestSelectStreamForPacket_UnhealthyTunnelsTriggersReconnect(t *testing.T) {
}
}

// TestTunnelOffsetOutOfBounds reproduces the bug where reconnectSingleEndpoint
// computes tunnelOffset = endpointIndex * streamsPerGravity, but the
// tunnelStreams array was sized for fewer endpoints. Endpoints with high
// indices silently skip tunnel creation.
func TestTunnelOffsetOutOfBounds(t *testing.T) {
// TestEnsureTunnelStreamSlotsForAddedEndpoint verifies that reconnecting an
// endpoint discovered after startup has tunnel slots even when the initial
// tunnelStreams slice was sized for fewer endpoints.
func TestEnsureTunnelStreamSlotsForAddedEndpoint(t *testing.T) {
t.Parallel()

// Simulate: 6 endpoints discovered from DNS, but only 3 healthy at
Expand All @@ -825,18 +824,27 @@ func TestTunnelOffsetOutOfBounds(t *testing.T) {
}
}

// Endpoint 5 would need tunnelOffset = 5*2 = 10, but array only has 6 slots.
streamsPerGravity := g.poolConfig.StreamsPerGravity
tunnelOffset := 5 * streamsPerGravity // = 10
outOfBounds := tunnelOffset >= len(g.streamManager.tunnelStreams)
g.ensureTunnelStreamSlots(5, streamsPerGravity)

if !outOfBounds {
t.Fatal("expected tunnel offset 10 to be out of bounds for 6-element array")
requiredSlots := (5 + 1) * streamsPerGravity
if got := len(g.streamManager.tunnelStreams); got < requiredSlots {
t.Fatalf("expected at least %d tunnel stream slots, got %d", requiredSlots, got)
}

// This proves the bug: endpoint 5 can never create tunnels with the
// current sizing. The fix should either resize the array or map
// endpoints to tunnel indices dynamically.
// Existing stream entries must be preserved while new slots are appended.
for i := 0; i < 6; i++ {
if g.streamManager.tunnelStreams[i] == nil || g.streamManager.tunnelStreams[i].streamID != fmt.Sprintf("s%d", i) {
t.Fatalf("existing tunnel stream slot %d was not preserved", i)
}
}

// Endpoint 5 now has addressable slots for reconnectSingleEndpoint to fill.
tunnelOffset := 5 * streamsPerGravity // = 10
outOfBounds := tunnelOffset >= len(g.streamManager.tunnelStreams)
if outOfBounds {
t.Fatal("expected tunnel offset 10 to be in bounds after ensuring slots")
}
}

// TestTriggerEndpointReconnectByURL verifies that the URL-based reconnect
Expand Down
16 changes: 16 additions & 0 deletions gravity/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,21 @@ func (g *GravityClient) rebuildEndpointStreamIndices() {
g.mu.Unlock()
}

// ensureTunnelStreamSlots grows streamManager.tunnelStreams (with nil
// placeholders) until it has at least (endpointIndex+1)*streamsPerGravity
// entries, so tunnel offsets for late-discovered endpoints stay in bounds.
// A negative endpointIndex is a no-op; a non-positive streamsPerGravity
// falls back to DefaultStreamsPerGravity. Existing entries are untouched.
func (g *GravityClient) ensureTunnelStreamSlots(endpointIndex, streamsPerGravity int) {
	if endpointIndex < 0 {
		return
	}
	if streamsPerGravity <= 0 {
		streamsPerGravity = DefaultStreamsPerGravity
	}
	// Highest slot this endpoint can address is endpointIndex*streamsPerGravity
	// + (streamsPerGravity-1), so the slice must reach this length.
	needed := (endpointIndex + 1) * streamsPerGravity

	g.streamManager.tunnelMu.Lock()
	defer g.streamManager.tunnelMu.Unlock()
	for len(g.streamManager.tunnelStreams) < needed {
		g.streamManager.tunnelStreams = append(g.streamManager.tunnelStreams, nil)
	}
}

func (g *GravityClient) refreshEndpointHealth() {
g.endpointsMu.RLock()
endpoints := make([]*GravityEndpoint, len(g.endpoints))
Expand Down Expand Up @@ -4770,6 +4785,7 @@ accepted:
if streamsPerGravity <= 0 {
streamsPerGravity = DefaultStreamsPerGravity
}
g.ensureTunnelStreamSlots(endpointIndex, streamsPerGravity)
tunnelOffset := endpointIndex * streamsPerGravity

streamsToStart := make([]struct {
Expand Down
Loading