Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion gravity/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,9 +841,17 @@ func (g *GravityClient) startMultiEndpoint() error {
g.mu.Unlock()
return ErrNoGravityFound
}
g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(urls)
primaryURL := g.url
hasDiscoveryResolver := g.discoveryResolveFunc != nil
g.mu.Unlock()

var discoveryURLs []string
discoveryResolved := false
if shouldResolveForPeerDiscoveryDisableCheck(urls, primaryURL, hasDiscoveryResolver) {
discoveryURLs, discoveryResolved = g.resolvePeerDiscoveryURLs()
}
g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(urls, primaryURL, hasDiscoveryResolver, discoveryURLs, discoveryResolved)

g.logger.Info("multi-endpoint mode: connecting to %d Gravity servers: %v", len(urls), urls)

g.stopPeerDiscovery()
Expand Down Expand Up @@ -1212,6 +1220,61 @@ func allGravityURLsAreDirectIPs(urls []string) bool {
return hasURL
}

func sameGravityURLSet(a, b []string) bool {
normalize := func(in []string) map[string]struct{} {
out := make(map[string]struct{}, len(in))
for _, raw := range in {
u := strings.TrimSpace(raw)
if u == "" {
continue
}
out[u] = struct{}{}
}
return out
}

aa := normalize(a)
bb := normalize(b)
if len(aa) == 0 || len(aa) != len(bb) {
return false
}
for u := range aa {
if _, ok := bb[u]; !ok {
return false
}
}
return true
}

func shouldResolveForPeerDiscoveryDisableCheck(urls []string, primaryURL string, hasDiscoveryResolver bool) bool {
return allGravityURLsAreDirectIPs(urls) && !isDirectIPGravityURL(primaryURL) && hasDiscoveryResolver
}

func shouldDisablePeerDiscoveryForResolvedURLs(urls []string, primaryURL string, hasDiscoveryResolver bool, discoveryURLs []string, discoveryResolved bool) bool {
if !allGravityURLsAreDirectIPs(urls) {
return false
}

// A direct IP primary URL is an explicit operator override. Do not discover
// away from it.
if isDirectIPGravityURL(primaryURL) {
return true
}

if !hasDiscoveryResolver {
return true
}

if !discoveryResolved {
return true
}

// Hadron may resolve DNS before constructing the Gravity client, which
// means DNS-derived peers arrive here as direct IP URLs. Keep peer discovery
// enabled when the resolver confirms this URL set came from discovery.
return !sameGravityURLSet(urls, discoveryURLs)
}

func isDirectIPGravityURL(raw string) bool {
raw = strings.TrimSpace(raw)
if raw == "" {
Expand Down Expand Up @@ -5149,6 +5212,10 @@ func (g *GravityClient) resolvePeerDiscoveryURLs() ([]string, bool) {
timeout := g.discoveryResolveTimeout
g.discoveryMu.Unlock()

if g.discoveryResolveFunc == nil {
return nil, false
}

if ctx == nil {
ctx = g.ctx
}
Expand Down
204 changes: 202 additions & 2 deletions gravity/peer_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,124 @@ func TestAllGravityURLsAreDirectIPs(t *testing.T) {
}
}

func TestShouldDisablePeerDiscoveryForResolvedURLs(t *testing.T) {
tests := []struct {
name string
url string
urls []string
hasResolver bool
discoveryURLs []string
discoveryResolved bool
want bool
}{
{
name: "explicit primary IP disables discovery",
url: "grpc://10.0.0.1:443",
urls: []string{"grpc://10.0.0.1:443"},
hasResolver: true,
discoveryURLs: []string{"grpc://10.0.0.1:443"},
discoveryResolved: true,
want: true,
},
{
name: "static direct IP URLs without resolver disable discovery",
url: "grpc://gravity.example.com:443",
urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"},
want: true,
},
{
name: "static direct IP URLs with different resolver output disable discovery",
url: "grpc://gravity.example.com:443",
urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"},
hasResolver: true,
discoveryURLs: []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"},
discoveryResolved: true,
want: true,
},
{
name: "DNS discovered direct IP URLs keep discovery enabled",
url: "grpc://gravity.example.com:443",
urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"},
hasResolver: true,
discoveryURLs: []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"},
discoveryResolved: true,
want: false,
},
{
name: "resolver timeout disables discovery conservatively",
url: "grpc://gravity.example.com:443",
urls: []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"},
hasResolver: true,
discoveryResolved: false,
want: true,
},
{
name: "hostnames keep discovery enabled",
url: "grpc://gravity.example.com:443",
urls: []string{"grpc://gravity-a.example.com:443", "grpc://gravity-b.example.com:443"},
hasResolver: true,
discoveryURLs: []string{"grpc://gravity-a.example.com:443", "grpc://gravity-b.example.com:443"},
discoveryResolved: true,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := shouldDisablePeerDiscoveryForResolvedURLs(tt.urls, tt.url, tt.hasResolver, tt.discoveryURLs, tt.discoveryResolved); got != tt.want {
t.Fatalf("shouldDisablePeerDiscoveryForResolvedURLs() = %v, want %v", got, tt.want)
}
})
}
}

func TestShouldResolveForPeerDiscoveryDisableCheck(t *testing.T) {
tests := []struct {
name string
urls []string
primaryURL string
hasResolver bool
want bool
}{
{
name: "DNS-derived direct IP candidate needs bounded resolve",
urls: []string{"grpc://10.0.0.1:443"},
primaryURL: "grpc://gravity.example.com:443",
hasResolver: true,
want: true,
},
{
name: "explicit primary direct IP skips resolve",
urls: []string{"grpc://10.0.0.1:443"},
primaryURL: "grpc://10.0.0.1:443",
hasResolver: true,
want: false,
},
{
name: "hostnames skip resolve",
urls: []string{"grpc://gravity-a.example.com:443"},
primaryURL: "grpc://gravity.example.com:443",
hasResolver: true,
want: false,
},
{
name: "missing resolver skips resolve",
urls: []string{"grpc://10.0.0.1:443"},
primaryURL: "grpc://gravity.example.com:443",
hasResolver: false,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := shouldResolveForPeerDiscoveryDisableCheck(tt.urls, tt.primaryURL, tt.hasResolver); got != tt.want {
t.Fatalf("shouldResolveForPeerDiscoveryDisableCheck() = %v, want %v", got, tt.want)
}
})
}
}

// ---------- pickRandomURL ----------

func TestPickRandomURL_SingleElement(t *testing.T) {
Expand Down Expand Up @@ -441,6 +559,39 @@ func TestCheckPeerDiscovery_StaleURLReplacedWhenResolvedCountMatchesConnectedCou
}
}

func TestCheckPeerDiscovery_DNSDiscoveredDirectIPAddsEndpointBelowCapacity(t *testing.T) {
connected := []string{
"grpc://10.0.0.1:443",
"grpc://10.0.0.2:443",
}
g := newTestGravityClient(nil, connected)
defer g.cancel()
g.discoveryResolveFunc = func() []string {
return []string{
"grpc://10.0.0.1:443",
"grpc://10.0.0.2:443",
"grpc://10.0.0.3:443",
}
}
g.poolConfig.MaxGravityPeers = 3

g.checkPeerDiscovery(2 * time.Hour)

g.endpointsMu.RLock()
urls := make(map[string]bool)
for _, ep := range g.endpoints {
urls[ep.URL] = true
}
g.endpointsMu.RUnlock()

if !urls["grpc://10.0.0.3:443"] {
t.Fatalf("expected DNS-discovered direct IP endpoint to be added, got %v", urls)
}
if len(urls) != 3 {
t.Fatalf("expected 3 endpoints after discovery, got %d", len(urls))
}
}

func TestCheckPeerDiscovery_NewURLDiscovered(t *testing.T) {
// Connected to g1, g2, g3. DNS returns g1, g2, g3, g4.
// More URLs available than connected -- should cycle one endpoint.
Expand Down Expand Up @@ -871,16 +1022,17 @@ func TestCleanup_CancelsPeerDiscoveryLoop(t *testing.T) {
}
}

func TestStartPeerDiscovery_DisabledForDirectIPGravityURLs(t *testing.T) {
func TestStartPeerDiscovery_DisabledForExplicitDirectIPGravityURLs(t *testing.T) {
g := newTestGravityClient([]string{"grpc://10.0.0.1:443"}, []string{"grpc://10.0.0.1:443"})
defer g.cancel()

g.url = "grpc://10.0.0.1:443"
g.peerDiscoveryWake = make(chan struct{}, 1)
g.discoveryResolveFunc = func() []string {
t.Fatal("discovery resolver should not be called for direct IP Gravity URLs")
return nil
}
g.peerDiscoveryDisabled = allGravityURLsAreDirectIPs(g.gravityURLs)
g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, nil, false)

g.startPeerDiscovery()

Expand All @@ -894,6 +1046,54 @@ func TestStartPeerDiscovery_DisabledForDirectIPGravityURLs(t *testing.T) {
}
}

func TestStartPeerDiscovery_DisabledForStaticDirectIPGravityURLs(t *testing.T) {
g := newTestGravityClient([]string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}, []string{"grpc://10.0.0.1:443"})
defer g.cancel()

g.url = "grpc://gravity.example.com:443"
g.peerDiscoveryWake = make(chan struct{}, 1)
g.discoveryResolveFunc = func() []string {
return []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"}
}
g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, []string{"grpc://10.0.0.3:443", "grpc://10.0.0.4:443"}, true)

g.startPeerDiscovery()

g.discoveryMu.Lock()
discoveryCtx := g.discoveryCtx
discoveryDone := g.discoveryDone
g.discoveryMu.Unlock()

if discoveryCtx != nil || discoveryDone != nil {
t.Fatal("expected peer discovery loop not to start for static direct IP Gravity URLs")
}
}

func TestStartPeerDiscovery_EnabledForDNSDiscoveredDirectIPGravityURLs(t *testing.T) {
urls := []string{"grpc://10.0.0.1:443", "grpc://10.0.0.2:443"}
g := newTestGravityClient(urls, urls)
defer g.cancel()

g.url = "grpc://gravity.example.com:443"
g.peerDiscoveryWake = make(chan struct{}, 1)
g.discoveryResolveFunc = func() []string {
return []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"}
}
g.peerDiscoveryDisabled = shouldDisablePeerDiscoveryForResolvedURLs(g.gravityURLs, g.url, g.discoveryResolveFunc != nil, []string{"grpc://10.0.0.2:443", "grpc://10.0.0.1:443"}, true)

g.startPeerDiscovery()
defer g.stopPeerDiscovery()

g.discoveryMu.Lock()
discoveryCtx := g.discoveryCtx
discoveryDone := g.discoveryDone
g.discoveryMu.Unlock()

if discoveryCtx == nil || discoveryDone == nil {
t.Fatal("expected peer discovery loop to start for DNS-discovered direct IP Gravity URLs")
}
}

func TestCleanup_CancelsBlockedPeerDiscoveryResolve(t *testing.T) {
g := newTestGravityClient([]string{
"grpc://g1.example.com",
Expand Down
Loading