diff --git a/gravity/grpc_client.go b/gravity/grpc_client.go index d4b647f..acd1897 100644 --- a/gravity/grpc_client.go +++ b/gravity/grpc_client.go @@ -841,9 +841,17 @@ func (g *GravityClient) startMultiEndpoint() error { g.mu.Unlock() return ErrNoGravityFound } - g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(urls) + primaryURL := g.url + hasDiscoveryResolver := g.discoveryResolveFunc != nil g.mu.Unlock() + var discoveryURLs []string + discoveryResolved := false + if shouldResolveForPeerDiscoveryDisableCheck(urls, primaryURL, hasDiscoveryResolver) { + discoveryURLs, discoveryResolved = g.resolvePeerDiscoveryURLs() + } + g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(urls, primaryURL, hasDiscoveryResolver, discoveryURLs, discoveryResolved) + g.logger.Info("multi-endpoint mode: connecting to %d Gravity servers: %v", len(urls), urls) g.stopPeerDiscovery() @@ -1212,6 +1220,61 @@ func allGravityURLsAreDirectIPs(urls []string) bool { return hasURL } +func sameGravityURLSet(a, b []string) bool { + normalize := func(in []string) map[string]struct{} { + out := make(map[string]struct{}, len(in)) + for _, raw := range in { + u := strings.TrimSpace(raw) + if u == "" { + continue + } + out[u] = struct{}{} + } + return out + } + + aa := normalize(a) + bb := normalize(b) + if len(aa) == 0 || len(aa) != len(bb) { + return false + } + for u := range aa { + if _, ok := bb[u]; !ok { + return false + } + } + return true +} + +func shouldResolveForPeerDiscoveryDisableCheck(urls []string, primaryURL string, hasDiscoveryResolver bool) bool { + return allGravityURLsAreDirectIPs(urls) && !isDirectIPGravityURL(primaryURL) && hasDiscoveryResolver +} + +func shouldDisablePeerDiscoveryForResolvedURLs(urls []string, primaryURL string, hasDiscoveryResolver bool, discoveryURLs []string, discoveryResolved bool) bool { + if !allGravityURLsAreDirectIPs(urls) { + return false + } + + // A direct IP primary URL is an explicit operator override. Do not discover + // away from it. + if isDirectIPGravityURL(primaryURL) { + return true + } + + if !hasDiscoveryResolver { + return true + } + + if !discoveryResolved { + return true + } + + // Hadron may resolve DNS before constructing the Gravity client, which + // means DNS-derived peers arrive here as direct IP URLs. Keep peer discovery + // enabled when the resolver confirms this URL set came from discovery. + return !sameGravityURLSet(urls, discoveryURLs) +} + func isDirectIPGravityURL(raw string) bool { raw = strings.TrimSpace(raw) if raw == "" { @@ -5149,6 +5212,10 @@ func (g *GravityClient) resolvePeerDiscoveryURLs() ([]string, bool) { timeout := g.discoveryResolveTimeout g.discoveryMu.Unlock() + if g.discoveryResolveFunc == nil { + return nil, false + } + if ctx == nil { ctx = g.ctx } diff --git a/gravity/peer_discovery_test.go b/gravity/peer_discovery_test.go index c73a0f9..3943ff2 100644 --- a/gravity/peer_discovery_test.go +++ b/gravity/peer_discovery_test.go @@ -250,6 +250,124 @@ func TestAllGravityURLsAreDirectIPs(t *testing.T) { } } +func TestShouldDisablePeerDiscoveryForResolvedURLs(t *testing.T) { + tests := []struct { + name string + url string + urls []string + hasResolver bool + discoveryURLs []string + discoveryResolved bool + want bool + }{ + { + name: "explicit primary IP disables discovery", + url: "grpc://10.0.0.1:443", + urls: []string{"grpc://10.0.0.1:443"}, + hasResolver: true, + discoveryURLs: []string{"grpc://10.0.0.1:443"}, + discoveryResolved: true, + want: true, + }, + { + name: "static direct IP URLs without resolver disable discovery", + url: "grpc://gravity.example.com:443", + urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, + want: true, + }, + { + name: "static direct IP URLs with different resolver output disable discovery", + url: "grpc://gravity.example.com:443", + urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, + hasResolver: true, + discoveryURLs: []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"}, + discoveryResolved: true, + want: true, + }, + { + name: "DNS discovered direct IP URLs keep discovery enabled", + url: "grpc://gravity.example.com:443", + urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, + hasResolver: true, + discoveryURLs: []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"}, + discoveryResolved: true, + want: false, + }, + { + name: "resolver timeout disables discovery conservatively", + url: "grpc://gravity.example.com:443", + urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, + hasResolver: true, + discoveryResolved: false, + want: true, + }, + { + name: "hostnames keep discovery enabled", + url: "grpc://gravity.example.com:443", + urls: []string{"grpc://gravity-a.example.com:443", "grpc://gravity-b.example.com:443"}, + hasResolver: true, + discoveryURLs: []string{"grpc://gravity-a.example.com:443", "grpc://gravity-b.example.com:443"}, + discoveryResolved: true, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := shouldDisablePeerDiscoveryForResolvedURLs(tt.urls, tt.url, tt.hasResolver, tt.discoveryURLs, tt.discoveryResolved); got != tt.want { + t.Fatalf("shouldDisablePeerDiscoveryForResolvedURLs() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestShouldResolveForPeerDiscoveryDisableCheck(t *testing.T) { + tests := []struct { + name string + urls []string + primaryURL string + hasResolver bool + want bool + }{ + { + name: "DNS-derived direct IP candidate needs bounded resolve", + urls: []string{"grpc://10.0.0.1:443"}, + primaryURL: "grpc://gravity.example.com:443", + hasResolver: true, + want: true, + }, + { + name: "explicit primary direct IP skips resolve", + urls: []string{"grpc://10.0.0.1:443"}, + primaryURL: "grpc://10.0.0.1:443", + hasResolver: true, + want: false, + }, + { + name: "hostnames skip resolve", + urls: []string{"grpc://gravity-a.example.com:443"}, + primaryURL: "grpc://gravity.example.com:443", + hasResolver: true, + want: false, + }, + { + name: "missing resolver skips resolve", + urls: []string{"grpc://10.0.0.1:443"}, + primaryURL: "grpc://gravity.example.com:443", + hasResolver: false, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := shouldResolveForPeerDiscoveryDisableCheck(tt.urls, tt.primaryURL, tt.hasResolver); got != tt.want { + t.Fatalf("shouldResolveForPeerDiscoveryDisableCheck() = %v, want %v", got, tt.want) + } + }) + } +} + // ---------- pickRandomURL ---------- func TestPickRandomURL_SingleElement(t *testing.T) { @@ -441,6 +559,39 @@ func TestCheckPeerDiscovery_StaleURLReplacedWhenResolvedCountMatchesConnectedCou } } +func TestCheckPeerDiscovery_DNSDiscoveredDirectIPAddsEndpointBelowCapacity(t *testing.T) { + connected := []string{ + "grpc://10.0.0.1:443", + "grpc://10.0.0.2:443", + } + g := newTestGravityClient(nil, connected) + defer g.cancel() + g.discoveryResolveFunc = func() []string { + return []string{ + "grpc://10.0.0.1:443", + "grpc://10.0.0.2:443", + "grpc://10.0.0.3:443", + } + } + g.poolConfig.MaxGravityPeers = 3 + + g.checkPeerDiscovery(2 * time.Hour) + + g.endpointsMu.RLock() + urls := make(map[string]bool) + for _, ep := range g.endpoints { + urls[ep.URL] = true + } + g.endpointsMu.RUnlock() + + if !urls["grpc://10.0.0.3:443"] { + t.Fatalf("expected DNS-discovered direct IP endpoint to be added, got %v", urls) + } + if len(urls) != 3 { + t.Fatalf("expected 3 endpoints after discovery, got %d", len(urls)) + } +} + func TestCheckPeerDiscovery_NewURLDiscovered(t *testing.T) { // Connected to g1, g2, g3. DNS returns g1, g2, g3, g4. // More URLs available than connected -- should cycle one endpoint. @@ -871,16 +1022,17 @@ func TestCleanup_CancelsPeerDiscoveryLoop(t *testing.T) { } } -func TestStartPeerDiscovery_DisabledForDirectIPGravityURLs(t *testing.T) { +func TestStartPeerDiscovery_DisabledForExplicitDirectIPGravityURLs(t *testing.T) { g := newTestGravityClient([]string{"grpc://10.0.0.1:443"}, []string{"grpc://10.0.0.1:443"}) defer g.cancel() + g.url = "grpc://10.0.0.1:443" g.peerDiscoveryWake = make(chan struct{}, 1) g.discoveryResolveFunc = func() []string { t.Fatal("discovery resolver should not be called for direct IP Gravity URLs") return nil } - g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(g.gravityURLs) + g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, nil, false) g.startPeerDiscovery() @@ -894,6 +1046,54 @@ func TestStartPeerDiscovery_DisabledForDirectIPGravityURLs(t *testing.T) { } } +func TestStartPeerDiscovery_DisabledForStaticDirectIPGravityURLs(t *testing.T) { + g := newTestGravityClient([]string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, []string{"grpc://10.0.0.1:443"}) + defer g.cancel() + + g.url = "grpc://gravity.example.com:443" + g.peerDiscoveryWake = make(chan struct{}, 1) + g.discoveryResolveFunc = func() []string { + return []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"} + } + g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"}, true) + + g.startPeerDiscovery() + + g.discoveryMu.Lock() + discoveryCtx := g.discoveryCtx + discoveryDone := g.discoveryDone + g.discoveryMu.Unlock() + + if discoveryCtx != nil || discoveryDone != nil { + t.Fatal("expected peer discovery loop not to start for static direct IP Gravity URLs") + } +} + +func TestStartPeerDiscovery_EnabledForDNSDiscoveredDirectIPGravityURLs(t *testing.T) { + urls := []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"} + g := newTestGravityClient(urls, urls) + defer g.cancel() + + g.url = "grpc://gravity.example.com:443" + g.peerDiscoveryWake = make(chan struct{}, 1) + g.discoveryResolveFunc = func() []string { + return []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"} + } + g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"}, true) + + g.startPeerDiscovery() + defer g.stopPeerDiscovery() + + g.discoveryMu.Lock() + discoveryCtx := g.discoveryCtx + discoveryDone := g.discoveryDone + g.discoveryMu.Unlock() + + if discoveryCtx == nil || discoveryDone == nil { + t.Fatal("expected peer discovery loop to start for DNS-discovered direct IP Gravity URLs") + } +} + func TestCleanup_CancelsBlockedPeerDiscoveryResolve(t *testing.T) { g := newTestGravityClient([]string{ "grpc://g1.example.com",