diff --git a/gravity/endpoint_independence_test.go b/gravity/endpoint_independence_test.go index 20a4d11..4a8d35b 100644 --- a/gravity/endpoint_independence_test.go +++ b/gravity/endpoint_independence_test.go @@ -800,11 +800,10 @@ func TestSelectStreamForPacket_UnhealthyTunnelsTriggersReconnect(t *testing.T) { } } -// TestTunnelOffsetOutOfBounds reproduces the bug where reconnectSingleEndpoint -// computes tunnelOffset = endpointIndex * streamsPerGravity, but the -// tunnelStreams array was sized for fewer endpoints. Endpoints with high -// indices silently skip tunnel creation. -func TestTunnelOffsetOutOfBounds(t *testing.T) { +// TestEnsureTunnelStreamSlotsForAddedEndpoint verifies that reconnecting an +// endpoint discovered after startup has tunnel slots even when the initial +// tunnelStreams slice was sized for fewer endpoints. +func TestEnsureTunnelStreamSlotsForAddedEndpoint(t *testing.T) { t.Parallel() // Simulate: 6 endpoints discovered from DNS, but only 3 healthy at @@ -825,18 +824,27 @@ func TestTunnelOffsetOutOfBounds(t *testing.T) { } } - // Endpoint 5 would need tunnelOffset = 5*2 = 10, but array only has 6 slots. streamsPerGravity := g.poolConfig.StreamsPerGravity - tunnelOffset := 5 * streamsPerGravity // = 10 - outOfBounds := tunnelOffset >= len(g.streamManager.tunnelStreams) + g.ensureTunnelStreamSlots(5, streamsPerGravity) - if !outOfBounds { - t.Fatal("expected tunnel offset 10 to be out of bounds for 6-element array") + requiredSlots := (5 + 1) * streamsPerGravity + if got := len(g.streamManager.tunnelStreams); got < requiredSlots { + t.Fatalf("expected at least %d tunnel stream slots, got %d", requiredSlots, got) } - // This proves the bug: endpoint 5 can never create tunnels with the - // current sizing. The fix should either resize the array or map - // endpoints to tunnel indices dynamically. + // Existing stream entries must be preserved while new slots are appended. + for i := 0; i < 6; i++ { + if g.streamManager.tunnelStreams[i] == nil || g.streamManager.tunnelStreams[i].streamID != fmt.Sprintf("s%d", i) { + t.Fatalf("existing tunnel stream slot %d was not preserved", i) + } + } + + // Endpoint 5 now has addressable slots for reconnectSingleEndpoint to fill. + tunnelOffset := 5 * streamsPerGravity // = 10 + outOfBounds := tunnelOffset >= len(g.streamManager.tunnelStreams) + if outOfBounds { + t.Fatal("expected tunnel offset 10 to be in bounds after ensuring slots") + } } // TestTriggerEndpointReconnectByURL verifies that the URL-based reconnect diff --git a/gravity/grpc_client.go b/gravity/grpc_client.go index acd1897..6274a28 100644 --- a/gravity/grpc_client.go +++ b/gravity/grpc_client.go @@ -2027,6 +2027,21 @@ func (g *GravityClient) rebuildEndpointStreamIndices() { g.mu.Unlock() } +func (g *GravityClient) ensureTunnelStreamSlots(endpointIndex, streamsPerGravity int) { + if endpointIndex < 0 { + return + } + if streamsPerGravity <= 0 { + streamsPerGravity = DefaultStreamsPerGravity + } + required := (endpointIndex + 1) * streamsPerGravity + g.streamManager.tunnelMu.Lock() + for len(g.streamManager.tunnelStreams) < required { + g.streamManager.tunnelStreams = append(g.streamManager.tunnelStreams, nil) + } + g.streamManager.tunnelMu.Unlock() +} + func (g *GravityClient) refreshEndpointHealth() { g.endpointsMu.RLock() endpoints := make([]*GravityEndpoint, len(g.endpoints)) @@ -4770,6 +4785,7 @@ accepted: if streamsPerGravity <= 0 { streamsPerGravity = DefaultStreamsPerGravity } + g.ensureTunnelStreamSlots(endpointIndex, streamsPerGravity) tunnelOffset := endpointIndex * streamsPerGravity streamsToStart := make([]struct {