From cfbfbd7bf678a8a7a2cdc333da14632aa54eacc4 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Tue, 5 May 2026 16:26:08 -0400 Subject: [PATCH 1/2] bgpstatus: replace netlink collector with gNMI Get Replace the Linux-specific netlink-based BGP session collector with a gNMI Get call to Arista's local gNMI server (/var/run/gnmiServer.sock). - Remove NamespaceCollector, vrfNamespaces(), and namespace switching logic; add BGPCollector interface (no namespace parameter) - Add GNMIClient interface and GNMICollector() that queries all BGP neighbors across all network-instances in a single Get request - Simplify per-user peer IP matching: check both IPs in the /31 tunnel net instead of finding the local tunnel interface first - Remove //go:build linux build tag; code is now platform-agnostic - Consolidate submitter_linux_test.go into submitter_test.go; add tests for parseEstablished, neighborAddress, and tick() with gNMI collector --- controlplane/telemetry/cmd/telemetry/main.go | 28 +- .../telemetry/internal/bgpstatus/bgpstatus.go | 103 +--- .../telemetry/internal/bgpstatus/submitter.go | 199 ++++---- .../bgpstatus/submitter_linux_test.go | 288 ----------- .../internal/bgpstatus/submitter_test.go | 446 +++++++++++++----- 5 files changed, 496 insertions(+), 568 deletions(-) delete mode 100644 controlplane/telemetry/internal/bgpstatus/submitter_linux_test.go diff --git a/controlplane/telemetry/cmd/telemetry/main.go b/controlplane/telemetry/cmd/telemetry/main.go index 58e03dfef..2eb71c6dc 100644 --- a/controlplane/telemetry/cmd/telemetry/main.go +++ b/controlplane/telemetry/cmd/telemetry/main.go @@ -17,6 +17,8 @@ import ( "github.com/malbeclabs/doublezero/config" "github.com/malbeclabs/doublezero/controlplane/agent/pkg/arista" aristapb "github.com/malbeclabs/doublezero/controlplane/proto/arista/gen/pb-go/arista/EosSdkRpc" + gpb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/bgpstatus" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/gnmitunnel" @@ -379,7 +381,7 @@ func main() { var bgpStatusErrCh <-chan error if *bgpStatusEnable { bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK, - serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient) + serviceabilityProgramID, cachedSvcClient, rpcClient) } // Wait for the context to be done or an error to be returned. @@ -412,20 +414,36 @@ func startBGPStatusSubmitter( keypair solana.PrivateKey, localDevicePK solana.PublicKey, serviceabilityProgramID solana.PublicKey, - localNet netutil.LocalNet, - bgpNamespace string, svcClient telemetrysvc.ProgramDataProvider, rpcClient *solanarpc.Client, ) <-chan error { executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID) + const gnmiSocketPath = "/var/run/gnmiServer.sock" + gnmiConn, err := grpc.NewClient( + "passthrough:///"+gnmiSocketPath, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(dialCtx context.Context, _ string) (net.Conn, error) { + if *managementNamespace != "" { + return netns.RunInNamespace(*managementNamespace, func() (net.Conn, error) { + return (&net.Dialer{}).DialContext(dialCtx, "unix", gnmiSocketPath) + }) + } + return (&net.Dialer{}).DialContext(dialCtx, "unix", gnmiSocketPath) + }), + ) + if err != nil { + log.Error("failed to create gNMI client connection for BGP status", "error", err) + os.Exit(1) + } + gnmiClient := gpb.NewGNMIClient(gnmiConn) + sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{ Log: log, Executor: executor, ServiceabilityClient: svcClient, - Collector: bgpstatus.DefaultCollector(localNet), + Collector: bgpstatus.GNMICollector(gnmiClient), LocalDevicePK: localDevicePK, - BGPNamespace: bgpNamespace, Interval: *bgpStatusInterval, PeriodicRefreshInterval: *bgpStatusRefreshInterval, DownGracePeriod: *bgpStatusDownGracePeriod, diff --git a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go index 6dabf34e7..f66f9c04a 100644 --- a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go +++ b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go @@ -6,14 +6,11 @@ import ( "fmt" "log/slog" "net" - "strconv" - "strings" "sync" "time" "github.com/gagliardetto/solana-go" "github.com/jonboulle/clockwork" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" ) @@ -35,20 +32,18 @@ type ServiceabilityClient interface { GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) } -// NamespaceCollector collects BGP session state and local network interfaces -// from a single Linux VRF network namespace. It returns the set of remote IP -// strings with ESTABLISHED BGP sessions, the local interfaces, and any error. -// Implement with DefaultCollector for production; use a mock in tests. -type NamespaceCollector func(ctx context.Context, namespace string) (established map[string]struct{}, ifaces []netutil.Interface, err error) +// BGPCollector returns the set of remote IP strings with ESTABLISHED BGP +// sessions, across all network instances visible to the device. It is called +// once per tick. Implement with GNMICollector for production; use a mock in tests. +type BGPCollector func(ctx context.Context) (established map[string]struct{}, err error) // Config holds all parameters for the BGP status submitter. type Config struct { Log *slog.Logger Executor BGPStatusExecutor ServiceabilityClient ServiceabilityClient - Collector NamespaceCollector + Collector BGPCollector LocalDevicePK solana.PublicKey - BGPNamespace string Interval time.Duration // default: 60s PeriodicRefreshInterval time.Duration // default: 6h DownGracePeriod time.Duration // default: 0 @@ -71,9 +66,6 @@ func (c *Config) validate() error { if c.LocalDevicePK.IsZero() { return errors.New("local device pubkey is required") } - if c.BGPNamespace == "" { - return errors.New("bgp namespace is required") - } if c.Interval <= 0 { c.Interval = defaultInterval } @@ -99,7 +91,7 @@ type submitTask struct { status serviceability.BGPStatus } -// Submitter collects BGP socket state on each tick, determines per-user BGP +// Submitter collects BGP session state on each tick, determines per-user BGP // status, and submits SetUserBGPStatus onchain via a non-blocking worker. type Submitter struct { cfg Config @@ -125,8 +117,7 @@ func NewSubmitter(cfg Config) (*Submitter, error) { } // Start launches the submitter in the background and returns a channel that -// receives a fatal error (or is closed on clean shutdown). It mirrors the -// state.Collector.Start pattern. +// receives a fatal error (or is closed on clean shutdown). func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error { errCh := make(chan error, 1) go func() { @@ -141,8 +132,8 @@ func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan } // userStateFor returns or creates the per-user tracking entry (caller must hold s.mu). -// initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so -// that a restarted submitter correctly handles users whose onchain state is already Up. +// initialStatus seeds lastOnchainStatus so a restarted submitter correctly handles +// users whose onchain state is already Up. func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState { us, ok := s.userState[key] if !ok { @@ -152,35 +143,27 @@ func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPSta return us } -// bgpSocket is the minimal BGP socket representation used by the pure helpers. -// The Linux-specific submitter.go converts state.BGPSocketState to this type. -type bgpSocket struct { - RemoteIP string - State string -} - -// --- Pure helpers (no Linux syscalls; fully testable on all platforms) --- - -// buildEstablishedIPSet returns a set of remote IP strings for BGP sessions -// that are currently in the ESTABLISHED state. -func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} { - m := make(map[string]struct{}, len(sockets)) - for _, sock := range sockets { - if sock.State == "ESTABLISHED" { - m[sock.RemoteIP] = struct{}{} - } - } - return m -} +// --- Pure helpers (no platform-specific code; fully testable on all platforms) --- // tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a -// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. +// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. func tunnelNetToIPNet(b [5]byte) *net.IPNet { ip := net.IPv4(b[0], b[1], b[2], b[3]) mask := net.CIDRMask(int(b[4]), 32) return &net.IPNet{IP: ip.To4(), Mask: mask} } +// peerIPsFor31 returns both host IPs in a /31 network. Since tunnel IPs are +// globally unique (onchain-allocated), exactly one of the two will be the +// BGP neighbor address for a given user on this device. +func peerIPsFor31(tunnelNet *net.IPNet) (net.IP, net.IP) { + ip0 := tunnelNet.IP.To4() + ip1 := make(net.IP, 4) + copy(ip1, ip0) + ip1[3] ^= 1 + return ip0, ip1 +} + // computeEffectiveStatus derives the BGP status to report, applying the down // grace period: if observedUp is false but the user was last seen Up within // gracePeriod, we still report Up to avoid transient flaps. @@ -202,48 +185,6 @@ func computeEffectiveStatus( return serviceability.BGPStatusDown } -// rootNamespace is the sentinel passed to DefaultCollector / RunInNamespace -// to indicate the root (global) Linux network namespace. Arista EOS places -// the default VRF in the root namespace rather than a named namespace under -// /var/run/netns/, so there is no file to open with netns.GetFromName. -// RunInNamespace treats "" as "execute in the current namespace" (no switching). -const rootNamespace = "" - -// vrfNamespaces builds the list of Linux network namespaces to check for BGP -// sockets and tunnel interfaces. The base namespace is always included first. -// Additional namespaces are derived from two sources: -// - Tenant VRF IDs (non-zero): replaces the trailing numeric suffix of base -// (e.g. "ns-vrf1") with each tenant's VrfId, giving e.g. "ns-vrf2". -// - Multicast users: GRE tunnels for multicast users live in the global VRF -// (the root network namespace), not in a per-tenant namespace. rootNamespace -// is appended if any user in the provided slice has UserTypeMulticast. -func vrfNamespaces(base string, tenants []serviceability.Tenant, users []serviceability.User) []string { - prefix := strings.TrimRight(base, "0123456789") - seen := map[string]struct{}{base: {}} - nss := []string{base} - for _, t := range tenants { - if t.VrfId == 0 { - continue - } - ns := prefix + strconv.FormatUint(uint64(t.VrfId), 10) - if _, ok := seen[ns]; !ok { - seen[ns] = struct{}{} - nss = append(nss, ns) - } - } - // Multicast users' GRE tunnels live in the root network namespace (global VRF). - if _, ok := seen[rootNamespace]; !ok { - for _, u := range users { - if u.UserType == serviceability.UserTypeMulticast { - seen[rootNamespace] = struct{}{} - nss = append(nss, rootNamespace) - break - } - } - } - return nss -} - // shouldSubmit returns true when a submission is warranted: either the status // has changed from what was last confirmed onchain, or it is time for a // periodic keepalive write. diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go index a90f76e08..a982b4cd1 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -1,20 +1,106 @@ -//go:build linux - package bgpstatus import ( "context" - "errors" + "encoding/json" "fmt" "time" "github.com/gagliardetto/solana-go" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" + gpb "github.com/openconfig/gnmi/proto/gnmi" + "google.golang.org/grpc" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" ) +// GNMIClient is a minimal interface satisfied by gpb.GNMIClient (the generated +// gNMI gRPC client). Using the interface keeps GNMICollector testable without a +// live gRPC server. +type GNMIClient interface { + Get(ctx context.Context, req *gpb.GetRequest, opts ...grpc.CallOption) (*gpb.GetResponse, error) +} + +// bgpNeighborsGetRequest fetches all BGP neighbor state across all network instances. +var bgpNeighborsGetRequest = &gpb.GetRequest{ + Path: []*gpb.Path{ + { + Elem: []*gpb.PathElem{ + {Name: "network-instances"}, + {Name: "network-instance", Key: map[string]string{"name": "*"}}, + {Name: "protocols"}, + {Name: "protocol", Key: map[string]string{"identifier": "BGP", "name": "BGP"}}, + {Name: "bgp"}, + {Name: "neighbors"}, + {Name: "neighbor", Key: map[string]string{"neighbor-address": "*"}}, + {Name: "state"}, + }, + }, + }, + Type: gpb.GetRequest_STATE, + Encoding: gpb.Encoding_JSON_IETF, +} + +// GNMICollector returns a BGPCollector that reads BGP neighbor session state via +// a gNMI Get to the Arista device's local gNMI server. It reports all neighbors +// whose session-state is ESTABLISHED across every network instance. +func GNMICollector(client GNMIClient) BGPCollector { + return func(ctx context.Context) (map[string]struct{}, error) { + resp, err := client.Get(ctx, bgpNeighborsGetRequest) + if err != nil { + return nil, fmt.Errorf("gNMI Get BGP neighbors: %w", err) + } + return parseEstablished(resp), nil + } +} + +// bgpStateJSON extracts session-state from a gNMI JSON IETF update value. +type bgpStateJSON struct { + SessionState string `json:"openconfig-network-instance:session-state"` +} + +// parseEstablished returns the set of neighbor-address strings whose +// session-state is ESTABLISHED in the gNMI GetResponse. +func parseEstablished(resp *gpb.GetResponse) map[string]struct{} { + established := make(map[string]struct{}) + for _, notif := range resp.GetNotification() { + prefix := notif.GetPrefix() + for _, update := range notif.GetUpdate() { + addr := neighborAddress(prefix, update.GetPath()) + if addr == "" { + continue + } + jsonVal := update.GetVal().GetJsonIetfVal() + if len(jsonVal) == 0 { + continue + } + var state bgpStateJSON + if err := json.Unmarshal(jsonVal, &state); err != nil { + continue + } + if state.SessionState == "ESTABLISHED" { + established[addr] = struct{}{} + } + } + } + return established +} + +// neighborAddress extracts the neighbor-address key from the gNMI path elements, +// checking both the notification prefix and the update path. +func neighborAddress(prefix, path *gpb.Path) string { + for _, elem := range prefix.GetElem() { + if elem.GetName() == "neighbor" { + return elem.GetKey()["neighbor-address"] + } + } + for _, elem := range path.GetElem() { + if elem.GetName() == "neighbor" { + return elem.GetKey()["neighbor-address"] + } + } + return "" +} + // run starts the background worker goroutine, then drives the tick loop, // running an immediate first tick before waiting for the ticker. func (s *Submitter) run(ctx context.Context) error { @@ -35,34 +121,9 @@ func (s *Submitter) run(ctx context.Context) error { } } -// DefaultCollector returns a NamespaceCollector that collects BGP socket stats -// via netlink and local interfaces via Linux namespace switching. -func DefaultCollector(localNet netutil.LocalNet) NamespaceCollector { - return func(ctx context.Context, namespace string) (map[string]struct{}, []netutil.Interface, error) { - rawSockets, err := state.GetBGPSocketStatsInNamespace(ctx, namespace) - if err != nil { - return nil, nil, fmt.Errorf("bgp sockets in %s: %w", namespace, err) - } - socks := make([]bgpSocket, len(rawSockets)) - for i, rs := range rawSockets { - socks[i] = bgpSocket{RemoteIP: rs.RemoteIP, State: rs.State} - } - established := buildEstablishedIPSet(socks) - - ifaces, err := netns.RunInNamespace(namespace, func() ([]netutil.Interface, error) { - return localNet.Interfaces() - }) - if err != nil { - return nil, nil, fmt.Errorf("interfaces in %s: %w", namespace, err) - } - return established, ifaces, nil - } -} - -// tick collects BGP socket state, fetches activated users for this device, -// maps each user to their tunnel peer IP, determines Up/Down status (with -// grace period), and enqueues submission tasks for users whose status needs -// updating. +// tick fetches activated users for this device, calls the BGPCollector once to +// get all ESTABLISHED sessions, maps each user's /31 tunnel net to their peer +// IP, and enqueues submission tasks for users whose status needs updating. func (s *Submitter) tick(ctx context.Context) { programData, err := s.cfg.ServiceabilityClient.GetProgramData(ctx) if err != nil { @@ -70,9 +131,6 @@ func (s *Submitter) tick(ctx context.Context) { return } - // Pre-collect activated users for this device. This is needed both to - // derive the full namespace set (multicast users require the root namespace) and to - // drive the per-user status loop below. var deviceUsers []serviceability.User for _, u := range programData.Users { if u.Status == serviceability.UserStatusActivated && @@ -81,32 +139,9 @@ func (s *Submitter) tick(ctx context.Context) { } } - // User tunnel interfaces live in a per-tenant VRF namespace (e.g. ns-vrf1, - // ns-vrf2). Multicast users are an exception: their tunnels live in the - // global VRF (root network namespace). Collect state from all relevant namespaces so that - // all user types are handled correctly. - // Tunnel IPs are globally unique (onchain-allocated), so merging is safe. - namespaces := vrfNamespaces(s.cfg.BGPNamespace, programData.Tenants, deviceUsers) - - establishedIPs := make(map[string]struct{}) - var interfaces []netutil.Interface - successCount := 0 - - for _, ns := range namespaces { - established, ifaces, err := s.cfg.Collector(ctx, ns) - if err != nil { - s.log.Warn("bgpstatus: failed to collect namespace state", "namespace", ns, "error", err) - continue - } - for ip := range established { - establishedIPs[ip] = struct{}{} - } - interfaces = append(interfaces, ifaces...) - successCount++ - } - - if successCount == 0 { - s.log.Error("bgpstatus: failed to collect state from all namespaces", "namespaces", namespaces) + establishedIPs, err := s.cfg.Collector(ctx) + if err != nil { + s.log.Error("bgpstatus: failed to collect BGP state", "error", err) return } @@ -122,33 +157,21 @@ func (s *Submitter) tick(ctx context.Context) { activeUserKeys[userPK] = struct{}{} // Seed lastOnchainStatus from the ledger on first observation (e.g. after - // a daemon restart) so a disappeared tunnel correctly transitions to Down + // a daemon restart) so a disappeared session correctly transitions to Down // rather than being skipped because Unknown != Up. us := s.userStateFor(userPK, serviceability.BGPStatus(user.BgpStatus)) - // Resolve the BGP peer IP for this user's /31 tunnel net. + // A /31 tunnel net has two host IPs: the device-side and the peer-side. + // We check both because we don't know which end the device holds; only + // one can appear as a BGP neighbor on this device. tunnelNet := tunnelNetToIPNet(user.TunnelNet) - var observedUp bool - tunnel, err := netutil.FindLocalTunnel(interfaces, tunnelNet) - if err != nil { - if !errors.Is(err, netutil.ErrLocalTunnelNotFound) { - s.log.Warn("bgpstatus: unexpected error finding tunnel", "user", userPK, "error", err) - continue - } - s.log.Debug("bgpstatus: tunnel not found for user", "user", userPK) - // Without a tunnel, the BGP session cannot be established. - // If the last known onchain status was already Down (or never written), - // there is nothing to update — skip this user. - if us.lastOnchainStatus != serviceability.BGPStatusUp { - continue - } - // The tunnel is gone but the last known onchain status is Up. - // Fall through with observedUp=false so we submit Down. - } else { - _, observedUp = establishedIPs[tunnel.TargetIP.String()] - if observedUp { - us.lastUpObservedAt = now - } + ip0, ip1 := peerIPsFor31(tunnelNet) + _, up0 := establishedIPs[ip0.String()] + _, up1 := establishedIPs[ip1.String()] + observedUp := up0 || up1 + + if observedUp { + us.lastUpObservedAt = now } effectiveStatus := computeEffectiveStatus(observedUp, us, now, s.cfg.DownGracePeriod) @@ -157,7 +180,6 @@ func (s *Submitter) tick(ctx context.Context) { continue } - // Skip if a submission for this user is already in-flight. if s.pending[userPK] { s.log.Debug("bgpstatus: submission already in-flight, skipping", "user", userPK) continue @@ -174,7 +196,6 @@ func (s *Submitter) tick(ctx context.Context) { // Prune userState entries for users no longer activated on this device to // prevent unbounded memory growth as users come and go. - // Also clear pending flags so a reactivated user is not permanently blocked. for pk := range s.userState { if _, active := activeUserKeys[pk]; !active { delete(s.userState, pk) @@ -217,7 +238,7 @@ func (s *Submitter) worker(ctx context.Context) { } // submitWithRetry attempts the onchain write up to submitMaxRetries times with -// exponential backoff. It returns early if the context is cancelled. +// exponential backoff. It returns early if the context is cancelled. func (s *Submitter) submitWithRetry(ctx context.Context, task submitTask) (solana.Signature, error) { update := serviceability.UserBGPStatusUpdate{ UserPubkey: solana.PublicKeyFromBytes(task.user.PubKey[:]), diff --git a/controlplane/telemetry/internal/bgpstatus/submitter_linux_test.go b/controlplane/telemetry/internal/bgpstatus/submitter_linux_test.go deleted file mode 100644 index 9b7f2b341..000000000 --- a/controlplane/telemetry/internal/bgpstatus/submitter_linux_test.go +++ /dev/null @@ -1,288 +0,0 @@ -//go:build linux - -package bgpstatus - -import ( - "context" - "errors" - "net" - "testing" - "time" - - "github.com/gagliardetto/solana-go" - "github.com/jonboulle/clockwork" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" - "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" -) - -// staticCollector returns a NamespaceCollector that serves fixed data per namespace. -func staticCollector( - established map[string]map[string]struct{}, - ifaces map[string][]netutil.Interface, -) NamespaceCollector { - return func(_ context.Context, ns string) (map[string]struct{}, []netutil.Interface, error) { - return established[ns], ifaces[ns], nil - } -} - -// errCollector returns a NamespaceCollector that always fails. -func errCollector(err error) NamespaceCollector { - return func(_ context.Context, _ string) (map[string]struct{}, []netutil.Interface, error) { - return nil, nil, err - } -} - -// makeInterface builds a netutil.Interface with one IPv4 /31 address, so that -// FindLocalTunnel can locate it and derive the peer IP. -// cidr must be in host form, e.g. "10.0.2.0/31". -func makeInterface(name, cidr string) netutil.Interface { - ip, ipnet, err := net.ParseCIDR(cidr) - if err != nil { - panic(err) - } - ipnet.IP = ip.To4() // host IP, not network address - return netutil.Interface{Name: name, Addrs: []net.Addr{ipnet}} -} - -// makeActivatedUser returns a User activated on devicePK with the given /31 tunnelNet. -func makeActivatedUser(devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User { - u := serviceability.User{} - copy(u.DevicePubKey[:], devicePK[:]) - userPK := solana.NewWallet().PublicKey() - copy(u.PubKey[:], userPK[:]) - u.TunnelNet = tunnelNet - u.Status = serviceability.UserStatusActivated - return u -} - -// makeMulticastUser returns a multicast User activated on devicePK with the given /31 tunnelNet. -func makeMulticastUser(devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User { - u := makeActivatedUser(devicePK, tunnelNet) - u.UserType = serviceability.UserTypeMulticast - return u -} - -// ============================================================ -// tick() – multi-namespace collection -// ============================================================ - -// TestTick_SingleNamespace_SubmitsDown verifies baseline: when the collector -// returns no interfaces (tunnel not found) and status was previously Up, -// tick enqueues a Down submission. -func TestTick_SingleNamespace_SubmitsDown(t *testing.T) { - devicePK := solana.NewWallet().PublicKey() - // tunnelNet 10.0.0.0/31 - tunnelNet := [5]byte{10, 0, 0, 0, 31} - user := makeActivatedUser(devicePK, tunnelNet) - userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - - exec := &mockExecutor{} - clk := clockwork.NewFakeClock() - svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} - - // No tunnel interface → BGP is Down. - col := staticCollector( - map[string]map[string]struct{}{"ns-vrf1": {}}, - map[string][]netutil.Interface{"ns-vrf1": nil}, - ) - - s := newTestSubmitter(t, clk, exec, svc, col, devicePK, 0, 6*time.Hour) - - // Seed the user's state as Up so a Down transition is warranted. - s.mu.Lock() - s.userState[userPK] = &userState{lastOnchainStatus: serviceability.BGPStatusUp} - s.mu.Unlock() - - ctx := context.Background() - s.tick(ctx) - - s.mu.Lock() - enqueued := len(s.taskCh) - s.mu.Unlock() - - if enqueued != 1 { - t.Fatalf("expected 1 task enqueued, got %d", enqueued) - } - task := <-s.taskCh - if task.status != serviceability.BGPStatusDown { - t.Errorf("expected Down task, got %v", task.status) - } -} - -// TestTick_MultiNamespace_UserInSecondVrf verifies that a user whose tunnel -// lives in ns-vrf2 is found when the collector serves it there. -func TestTick_MultiNamespace_UserInSecondVrf(t *testing.T) { - devicePK := solana.NewWallet().PublicKey() - tunnelNet := [5]byte{10, 0, 2, 0, 31} // 10.0.2.0/31 - - user := makeActivatedUser(devicePK, tunnelNet) - userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - - exec := &mockExecutor{} - clk := clockwork.NewFakeClock() - - // Tenant with VrfId=2 causes vrfNamespaces to include ns-vrf2. - tenant := serviceability.Tenant{} - tenant.VrfId = 2 - svc := &mockSvcClient{data: &serviceability.ProgramData{ - Users: []serviceability.User{user}, - Tenants: []serviceability.Tenant{tenant}, - }} - - // The tunnel (10.0.2.0/31) lives in ns-vrf2 with an ESTABLISHED BGP session. - iface := makeInterface("tu500", "10.0.2.0/31") - col := staticCollector( - map[string]map[string]struct{}{ - "ns-vrf1": {}, - "ns-vrf2": {"10.0.2.1": {}}, // peer IP is ESTABLISHED - }, - map[string][]netutil.Interface{ - "ns-vrf1": nil, - "ns-vrf2": {iface}, - }, - ) - - s := newTestSubmitter(t, clk, exec, svc, col, devicePK, 0, 6*time.Hour) - s.tick(context.Background()) - - s.mu.Lock() - enqueued := len(s.taskCh) - s.mu.Unlock() - - if enqueued != 1 { - t.Fatalf("expected 1 task enqueued, got %d", enqueued) - } - task := <-s.taskCh - if task.status != serviceability.BGPStatusUp { - t.Errorf("expected Up task, got %v", task.status) - } - if solana.PublicKeyFromBytes(task.user.PubKey[:]).String() != userPK { - t.Errorf("unexpected user in task") - } -} - -// TestTick_MultiNamespace_PartialFailure verifies that when one namespace -// collection fails, tick continues and processes users in the remaining -// namespaces instead of aborting. -func TestTick_MultiNamespace_PartialFailure(t *testing.T) { - devicePK := solana.NewWallet().PublicKey() - // User whose tunnel is in ns-vrf1 (the working namespace). - tunnelNet := [5]byte{10, 0, 1, 0, 31} - user := makeActivatedUser(devicePK, tunnelNet) - userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - - exec := &mockExecutor{} - clk := clockwork.NewFakeClock() - - tenant := serviceability.Tenant{} - tenant.VrfId = 2 - svc := &mockSvcClient{data: &serviceability.ProgramData{ - Users: []serviceability.User{user}, - Tenants: []serviceability.Tenant{tenant}, - }} - - // ns-vrf2 fails; ns-vrf1 has the tunnel and an established session. - iface := makeInterface("tu501", "10.0.1.0/31") - col := func(_ context.Context, ns string) (map[string]struct{}, []netutil.Interface, error) { - if ns == "ns-vrf2" { - return nil, nil, errors.New("namespace unreachable") - } - return map[string]struct{}{"10.0.1.1": {}}, []netutil.Interface{iface}, nil - } - - s := newTestSubmitter(t, clk, exec, svc, col, devicePK, 0, 6*time.Hour) - s.tick(context.Background()) - - s.mu.Lock() - enqueued := len(s.taskCh) - s.mu.Unlock() - - if enqueued != 1 { - t.Fatalf("expected 1 task enqueued, got %d", enqueued) - } - task := <-s.taskCh - if task.status != serviceability.BGPStatusUp { - t.Errorf("expected Up task, got %v", task.status) - } - if solana.PublicKeyFromBytes(task.user.PubKey[:]).String() != userPK { - t.Errorf("unexpected user in task") - } -} - -// TestTick_MulticastUser_UsesRootNamespace verifies that a multicast user whose -// tunnel lives in the root network namespace (global VRF) is found and reported -// Up. No tenant VRF is needed; the multicast user type alone causes the root -// namespace collector to be invoked. -func TestTick_MulticastUser_UsesRootNamespace(t *testing.T) { - devicePK := solana.NewWallet().PublicKey() - tunnelNet := [5]byte{10, 0, 3, 0, 31} // 10.0.3.0/31 - - user := makeMulticastUser(devicePK, tunnelNet) - userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - - exec := &mockExecutor{} - clk := clockwork.NewFakeClock() - svc := &mockSvcClient{data: &serviceability.ProgramData{ - Users: []serviceability.User{user}, - }} - - // The tunnel (10.0.3.0/31) lives in the root namespace (global VRF) with an ESTABLISHED BGP session. - iface := makeInterface("tu500", "10.0.3.0/31") - col := staticCollector( - map[string]map[string]struct{}{ - "ns-vrf1": {}, - rootNamespace: {"10.0.3.1": {}}, // peer IP is ESTABLISHED in the global VRF - }, - map[string][]netutil.Interface{ - "ns-vrf1": nil, - rootNamespace: {iface}, - }, - ) - - s := newTestSubmitter(t, clk, exec, svc, col, devicePK, 0, 6*time.Hour) - s.tick(context.Background()) - - s.mu.Lock() - enqueued := len(s.taskCh) - s.mu.Unlock() - - if enqueued != 1 { - t.Fatalf("expected 1 task enqueued, got %d", enqueued) - } - task := <-s.taskCh - if task.status != serviceability.BGPStatusUp { - t.Errorf("expected Up task, got %v", task.status) - } - if solana.PublicKeyFromBytes(task.user.PubKey[:]).String() != userPK { - t.Errorf("unexpected user in task") - } -} - -// TestTick_MultiNamespace_AllFail verifies that when every namespace fails, -// tick aborts and enqueues no tasks. -func TestTick_MultiNamespace_AllFail(t *testing.T) { - devicePK := solana.NewWallet().PublicKey() - user := makeActivatedUser(devicePK, [5]byte{10, 0, 0, 0, 31}) - userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - - exec := &mockExecutor{} - clk := clockwork.NewFakeClock() - svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} - - s := newTestSubmitter(t, clk, exec, svc, errCollector(errors.New("all broken")), devicePK, 0, 6*time.Hour) - - // Pre-seed as Up so we know a Down transition would be attempted if tick ran. - s.mu.Lock() - s.userState[userPK] = &userState{lastOnchainStatus: serviceability.BGPStatusUp} - s.mu.Unlock() - - s.tick(context.Background()) - - s.mu.Lock() - enqueued := len(s.taskCh) - s.mu.Unlock() - - if enqueued != 0 { - t.Errorf("expected no tasks when all namespaces fail, got %d", enqueued) - } -} diff --git a/controlplane/telemetry/internal/bgpstatus/submitter_test.go b/controlplane/telemetry/internal/bgpstatus/submitter_test.go index 461b15af4..b77f5e674 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter_test.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter_test.go @@ -2,8 +2,10 @@ package bgpstatus import ( "context" + "encoding/json" "errors" "log/slog" + "net" "sync" "sync/atomic" "testing" @@ -11,8 +13,8 @@ import ( "github.com/gagliardetto/solana-go" "github.com/jonboulle/clockwork" - "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + gpb "github.com/openconfig/gnmi/proto/gnmi" ) // --- mock executor --- @@ -64,11 +66,11 @@ func makeUser(pubkey solana.PublicKey, devicePK solana.PublicKey, tunnelNet [5]b return u } -// noopCollector returns a NamespaceCollector that always succeeds with empty results. +// noopCollector returns a BGPCollector that always succeeds with empty results. // Used by tests that never call tick() and only exercise the worker. -func noopCollector() NamespaceCollector { - return func(_ context.Context, _ string) (map[string]struct{}, []netutil.Interface, error) { - return nil, nil, nil +func noopCollector() BGPCollector { + return func(_ context.Context) (map[string]struct{}, error) { + return nil, nil } } @@ -78,7 +80,7 @@ func newTestSubmitter( clk clockwork.Clock, exec BGPStatusExecutor, svcClient ServiceabilityClient, - collector NamespaceCollector, + collector BGPCollector, devicePK solana.PublicKey, gracePeriod time.Duration, refreshInterval time.Duration, @@ -93,7 +95,6 @@ func newTestSubmitter( ServiceabilityClient: svcClient, Collector: collector, LocalDevicePK: devicePK, - BGPNamespace: "ns-vrf1", Interval: time.Hour, // irrelevant; tests call tick() directly PeriodicRefreshInterval: refreshInterval, DownGracePeriod: gracePeriod, @@ -118,35 +119,6 @@ func (tw testWriter) Write(p []byte) (int, error) { return len(p), nil } -// ============================================================ -// buildEstablishedIPSet -// ============================================================ - -func TestBuildEstablishedIPSet_OnlyEstablished(t *testing.T) { - sockets := []bgpSocket{ - {RemoteIP: "10.0.0.1", State: "ESTABLISHED"}, - {RemoteIP: "10.0.0.2", State: "TIME_WAIT"}, - {RemoteIP: "10.0.0.3", State: "ESTABLISHED"}, - } - got := buildEstablishedIPSet(sockets) - if _, ok := got["10.0.0.1"]; !ok { - t.Error("expected 10.0.0.1 in set") - } - if _, ok := got["10.0.0.3"]; !ok { - t.Error("expected 10.0.0.3 in set") - } - if _, ok := got["10.0.0.2"]; ok { - t.Error("did not expect 10.0.0.2 (TIME_WAIT) in set") - } -} - -func TestBuildEstablishedIPSet_Empty(t *testing.T) { - got := buildEstablishedIPSet(nil /* []bgpSocket */) - if len(got) != 0 { - t.Errorf("expected empty set, got %d entries", len(got)) - } -} - // ============================================================ // tunnelNetToIPNet // ============================================================ @@ -258,75 +230,37 @@ func TestShouldSubmit_PeriodicRefresh(t *testing.T) { } // ============================================================ -// vrfNamespaces +// peerIPsFor31 // ============================================================ -func TestVrfNamespaces_NoTenants(t *testing.T) { - nss := vrfNamespaces("ns-vrf1", nil, nil) - if len(nss) != 1 || nss[0] != "ns-vrf1" { - t.Errorf("expected [ns-vrf1], got %v", nss) - } -} - -func TestVrfNamespaces_SameVrf(t *testing.T) { - tenants := []serviceability.Tenant{{VrfId: 1}} - nss := vrfNamespaces("ns-vrf1", tenants, nil) - if len(nss) != 1 || nss[0] != "ns-vrf1" { - t.Errorf("expected [ns-vrf1], got %v", nss) - } -} - -func TestVrfNamespaces_AdditionalVrf(t *testing.T) { - tenants := []serviceability.Tenant{{VrfId: 1}, {VrfId: 2}} - nss := vrfNamespaces("ns-vrf1", tenants, nil) - if len(nss) != 2 || nss[0] != "ns-vrf1" || nss[1] != "ns-vrf2" { - t.Errorf("expected [ns-vrf1 ns-vrf2], got %v", nss) - } -} - -func TestVrfNamespaces_Deduplication(t *testing.T) { - tenants := []serviceability.Tenant{{VrfId: 2}, {VrfId: 2}, {VrfId: 2}} - nss := vrfNamespaces("ns-vrf1", tenants, nil) - if len(nss) != 2 { - t.Errorf("expected 2 unique namespaces, got %v", nss) - } -} - -func TestVrfNamespaces_SkipsZeroVrfId(t *testing.T) { - tenants := []serviceability.Tenant{{VrfId: 0}, {VrfId: 3}} - nss := vrfNamespaces("ns-vrf1", tenants, nil) - if len(nss) != 2 || nss[0] != "ns-vrf1" || nss[1] != "ns-vrf3" { - t.Errorf("expected [ns-vrf1 ns-vrf3], got %v", nss) - } -} - -func TestVrfNamespaces_MulticastUserAddsRootNamespace(t *testing.T) { - users := []serviceability.User{{UserType: serviceability.UserTypeMulticast}} - nss := vrfNamespaces("ns-vrf1", nil, users) - if len(nss) != 2 || nss[0] != "ns-vrf1" || nss[1] != rootNamespace { - t.Errorf("expected [ns-vrf1 %q], got %v", rootNamespace, nss) - } -} - -func TestVrfNamespaces_NonMulticastUserNoRootNamespace(t *testing.T) { - users := []serviceability.User{{UserType: serviceability.UserTypeIBRL}} - nss := vrfNamespaces("ns-vrf1", nil, users) - if len(nss) != 1 || nss[0] != "ns-vrf1" { - t.Errorf("expected [ns-vrf1], got %v", nss) +func TestPeerIPsFor31(t *testing.T) { + cases := []struct { + cidr string + ip0 string + ip1 string + }{ + {"10.0.0.0/31", "10.0.0.0", "10.0.0.1"}, + {"10.0.0.1/31", "10.0.0.1", "10.0.0.0"}, + {"192.168.1.10/31", "192.168.1.10", "192.168.1.11"}, } -} - -func TestVrfNamespaces_MulticastAndTenantVrfs(t *testing.T) { - tenants := []serviceability.Tenant{{VrfId: 2}} - users := []serviceability.User{{UserType: serviceability.UserTypeMulticast}} - nss := vrfNamespaces("ns-vrf1", tenants, users) - if len(nss) != 3 || nss[0] != "ns-vrf1" || nss[1] != "ns-vrf2" || nss[2] != rootNamespace { - t.Errorf("expected [ns-vrf1 ns-vrf2 %q], got %v", rootNamespace, nss) + for _, tc := range cases { + ip, ipnet, err := net.ParseCIDR(tc.cidr) + if err != nil { + t.Fatalf("ParseCIDR(%q): %v", tc.cidr, err) + } + ipnet.IP = ip.To4() + a, b := peerIPsFor31(ipnet) + got := map[string]struct{}{a.String(): {}, b.String(): {}} + for _, want := range []string{tc.ip0, tc.ip1} { + if _, ok := got[want]; !ok { + t.Errorf("cidr=%s: expected %s in result, got %s and %s", tc.cidr, want, a, b) + } + } } } // ============================================================ -// Worker retry behaviour (integration-style, no Linux syscalls) +// Worker retry behaviour (integration-style, no syscalls) // ============================================================ // workerTestSetup creates a submitter and pre-populates it with a task @@ -539,12 +473,11 @@ func TestNewSubmitter_MissingFields(t *testing.T) { name string cfg Config }{ - {"no log", Config{Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, - {"no executor", Config{Log: slog.Default(), ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, - {"no svc client", Config{Log: slog.Default(), Executor: &mockExecutor{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, - {"no collector", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, - {"zero device pk", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), BGPNamespace: "ns-vrf1"}}, - {"no namespace", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey()}}, + {"no log", Config{Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey()}}, + {"no executor", Config{Log: slog.Default(), ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey()}}, + {"no svc client", Config{Log: slog.Default(), Executor: &mockExecutor{}, Collector: noopCollector(), LocalDevicePK: solana.NewWallet().PublicKey()}}, + {"no collector", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalDevicePK: solana.NewWallet().PublicKey()}}, + {"zero device pk", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, Collector: noopCollector()}}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { @@ -569,7 +502,6 @@ func TestTaskChannel_DropWhenFull(t *testing.T) { ServiceabilityClient: &mockSvcClient{data: &serviceability.ProgramData{}}, Collector: noopCollector(), LocalDevicePK: devicePK, - BGPNamespace: "ns-vrf1", Interval: time.Hour, PeriodicRefreshInterval: 6 * time.Hour, Clock: clockwork.NewFakeClock(), @@ -605,3 +537,307 @@ func TestTaskChannel_DropWhenFull(t *testing.T) { t.Error("expected drop when channel full") } } + +// ============================================================ +// Helpers for tick() and parseEstablished tests +// ============================================================ + +// fixedCollector returns a BGPCollector that serves a fixed established set. +func fixedCollector(established map[string]struct{}) BGPCollector { + return func(_ context.Context) (map[string]struct{}, error) { + return established, nil + } +} + +// makeActivatedUser returns a User activated on devicePK with the given /31 tunnelNet. +func makeActivatedUser(devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User { + u := serviceability.User{} + copy(u.DevicePubKey[:], devicePK[:]) + userPK := solana.NewWallet().PublicKey() + copy(u.PubKey[:], userPK[:]) + u.TunnelNet = tunnelNet + u.Status = serviceability.UserStatusActivated + return u +} + +// makeMulticastUser returns a multicast User activated on devicePK with the given /31 tunnelNet. +func makeMulticastUser(devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User { + u := makeActivatedUser(devicePK, tunnelNet) + u.UserType = serviceability.UserTypeMulticast + return u +} + +// ============================================================ +// tick() – BGP session detection via BGPCollector +// ============================================================ + +func TestTick_BGPUp(t *testing.T) { + devicePK := solana.NewWallet().PublicKey() + user := makeActivatedUser(devicePK, [5]byte{10, 0, 0, 0, 31}) + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + exec := &mockExecutor{} + clk := clockwork.NewFakeClock() + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + + col := fixedCollector(map[string]struct{}{"10.0.0.1": {}}) + s := newTestSubmitter(t, clk, exec, svc, col, devicePK, 0, 6*time.Hour) + s.tick(context.Background()) + + s.mu.Lock() + enqueued := len(s.taskCh) + s.mu.Unlock() + + if enqueued != 1 { + t.Fatalf("expected 1 task enqueued, got %d", enqueued) + } + task := <-s.taskCh + if task.status != serviceability.BGPStatusUp { + t.Errorf("expected Up task, got %v", task.status) + } + if solana.PublicKeyFromBytes(task.user.PubKey[:]).String() != userPK { + t.Errorf("unexpected user in task") + } +} + +// TestTick_BGPUp_FirstIPInSlash31 verifies that the first IP in the /31 also +// triggers an Up submission (not only the second IP). +func TestTick_BGPUp_FirstIPInSlash31(t *testing.T) { + devicePK := solana.NewWallet().PublicKey() + user := makeActivatedUser(devicePK, [5]byte{10, 0, 0, 0, 31}) + + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + col := fixedCollector(map[string]struct{}{"10.0.0.0": {}}) + s := newTestSubmitter(t, clockwork.NewFakeClock(), &mockExecutor{}, svc, col, devicePK, 0, 6*time.Hour) + s.tick(context.Background()) + + s.mu.Lock() + enqueued := len(s.taskCh) + s.mu.Unlock() + + if enqueued != 1 { + t.Fatalf("expected 1 task enqueued, got %d", enqueued) + } + task := <-s.taskCh + if task.status != serviceability.BGPStatusUp { + t.Errorf("expected Up task, got %v", task.status) + } +} + +// TestTick_BGPDown_PreviouslyUp verifies that an empty established set when the +// user was previously Up results in a Down submission. +func TestTick_BGPDown_PreviouslyUp(t *testing.T) { + devicePK := solana.NewWallet().PublicKey() + user := makeActivatedUser(devicePK, [5]byte{10, 0, 0, 0, 31}) + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + clk := clockwork.NewFakeClock() + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + col := fixedCollector(map[string]struct{}{}) + s := newTestSubmitter(t, clk, &mockExecutor{}, svc, col, devicePK, 0, 6*time.Hour) + + s.mu.Lock() + s.userState[userPK] = &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: clk.Now().Add(-1 * time.Minute), + } + s.mu.Unlock() + + s.tick(context.Background()) + + s.mu.Lock() + enqueued := len(s.taskCh) + s.mu.Unlock() + + if enqueued != 1 { + t.Fatalf("expected 1 task enqueued, got %d", enqueued) + } + task := <-s.taskCh + if task.status != serviceability.BGPStatusDown { + t.Errorf("expected Down task, got %v", task.status) + } +} + +// TestTick_CollectorError_NoTask verifies that a collector error causes tick to +// abort without enqueueing any tasks. +func TestTick_CollectorError_NoTask(t *testing.T) { + devicePK := solana.NewWallet().PublicKey() + user := makeActivatedUser(devicePK, [5]byte{10, 0, 0, 0, 31}) + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + s := newTestSubmitter(t, clockwork.NewFakeClock(), &mockExecutor{}, svc, + func(_ context.Context) (map[string]struct{}, error) { + return nil, errors.New("gNMI unavailable") + }, devicePK, 0, 6*time.Hour) + + s.mu.Lock() + s.userState[userPK] = &userState{lastOnchainStatus: serviceability.BGPStatusUp} + s.mu.Unlock() + + s.tick(context.Background()) + + s.mu.Lock() + enqueued := len(s.taskCh) + s.mu.Unlock() + + if enqueued != 0 { + t.Errorf("expected no tasks when collector fails, got %d", enqueued) + } +} + +// TestTick_MulticastUser verifies that multicast users are handled identically +// to regular users — the peer IP check is type-agnostic. +func TestTick_MulticastUser(t *testing.T) { + devicePK := solana.NewWallet().PublicKey() + user := makeMulticastUser(devicePK, [5]byte{10, 0, 3, 0, 31}) + + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + col := fixedCollector(map[string]struct{}{"10.0.3.1": {}}) + s := newTestSubmitter(t, clockwork.NewFakeClock(), &mockExecutor{}, svc, col, devicePK, 0, 6*time.Hour) + s.tick(context.Background()) + + s.mu.Lock() + enqueued := len(s.taskCh) + s.mu.Unlock() + + if enqueued != 1 { + t.Fatalf("expected 1 task enqueued, got %d", enqueued) + } + task := <-s.taskCh + if task.status != serviceability.BGPStatusUp { + t.Errorf("expected Up task, got %v", task.status) + } +} + +// ============================================================ +// parseEstablished +// ============================================================ + +func gnmiStateJSON(sessionState string) []byte { + b, _ := json.Marshal(map[string]string{ + "openconfig-network-instance:session-state": sessionState, + }) + return b +} + +func buildGetResponse(neighborAddr, sessionState string) *gpb.GetResponse { + return &gpb.GetResponse{ + Notification: []*gpb.Notification{ + { + Update: []*gpb.Update{ + { + Path: &gpb.Path{ + Elem: []*gpb.PathElem{ + {Name: "network-instances"}, + {Name: "network-instance", Key: map[string]string{"name": "default"}}, + {Name: "bgp"}, + {Name: "neighbors"}, + {Name: "neighbor", Key: map[string]string{"neighbor-address": neighborAddr}}, + {Name: "state"}, + }, + }, + Val: &gpb.TypedValue{ + Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: gnmiStateJSON(sessionState)}, + }, + }, + }, + }, + }, + } +} + +func TestParseEstablished_ESTABLISHED(t *testing.T) { + resp := buildGetResponse("10.0.0.1", "ESTABLISHED") + got := parseEstablished(resp) + if _, ok := got["10.0.0.1"]; !ok { + t.Error("expected 10.0.0.1 in established set") + } + if len(got) != 1 { + t.Errorf("expected 1 entry, got %d", len(got)) + } +} + +func TestParseEstablished_NonEstablished(t *testing.T) { + for _, state := range []string{"IDLE", "ACTIVE", "CONNECT", "OPENSENT", "OPENCONFIRM"} { + resp := buildGetResponse("10.0.0.1", state) + got := parseEstablished(resp) + if _, ok := got["10.0.0.1"]; ok { + t.Errorf("state=%s: did not expect 10.0.0.1 in established set", state) + } + } +} + +func TestParseEstablished_Multiple(t *testing.T) { + resp := &gpb.GetResponse{ + Notification: []*gpb.Notification{ + { + Update: []*gpb.Update{ + { + Path: &gpb.Path{Elem: []*gpb.PathElem{ + {Name: "neighbor", Key: map[string]string{"neighbor-address": "10.0.0.1"}}, + {Name: "state"}, + }}, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: gnmiStateJSON("ESTABLISHED")}}, + }, + { + Path: &gpb.Path{Elem: []*gpb.PathElem{ + {Name: "neighbor", Key: map[string]string{"neighbor-address": "10.0.0.3"}}, + {Name: "state"}, + }}, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: gnmiStateJSON("ACTIVE")}}, + }, + { + Path: &gpb.Path{Elem: []*gpb.PathElem{ + {Name: "neighbor", Key: map[string]string{"neighbor-address": "10.0.0.5"}}, + {Name: "state"}, + }}, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: gnmiStateJSON("ESTABLISHED")}}, + }, + }, + }, + }, + } + got := parseEstablished(resp) + if _, ok := got["10.0.0.1"]; !ok { + t.Error("expected 10.0.0.1 in established set") + } + if _, ok := got["10.0.0.5"]; !ok { + t.Error("expected 10.0.0.5 in established set") + } + if _, ok := got["10.0.0.3"]; ok { + t.Error("did not expect 10.0.0.3 (ACTIVE) in established set") + } +} + +func TestParseEstablished_NeighborAddressInPrefix(t *testing.T) { + // Some Arista responses place the neighbor key in the notification prefix. + resp := &gpb.GetResponse{ + Notification: []*gpb.Notification{ + { + Prefix: &gpb.Path{ + Elem: []*gpb.PathElem{ + {Name: "neighbor", Key: map[string]string{"neighbor-address": "10.0.1.0"}}, + }, + }, + Update: []*gpb.Update{ + { + Path: &gpb.Path{Elem: []*gpb.PathElem{{Name: "state"}}}, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: gnmiStateJSON("ESTABLISHED")}}, + }, + }, + }, + }, + } + got := parseEstablished(resp) + if _, ok := got["10.0.1.0"]; !ok { + t.Error("expected 10.0.1.0 in established set (address was in prefix)") + } +} + +func TestParseEstablished_EmptyResponse(t *testing.T) { + got := parseEstablished(&gpb.GetResponse{}) + if len(got) != 0 { + t.Errorf("expected empty set for empty response, got %d entries", len(got)) + } +} From c29d24b0f64f55a3a02e31769ed1be4103bed7a3 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Tue, 5 May 2026 16:52:07 -0400 Subject: [PATCH 2/2] bgpstatus: add changelog entry for gNMI collector --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ebeef449..83f8817ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ All notable changes to this project will be documented in this file. - Activator - Delete the `activator/` crate from the workspace; onchain allocation (RFC-11) supersedes it. The deployed activator was frozen in Phase 1 ([#3608](https://github.com/malbeclabs/doublezero/pull/3608), [#3628](https://github.com/malbeclabs/doublezero/pull/3628)) and removed from e2e in Phase 2 ([#3609](https://github.com/malbeclabs/doublezero/pull/3609), [#3610](https://github.com/malbeclabs/doublezero/pull/3610), [#3611](https://github.com/malbeclabs/doublezero/pull/3611), [#3629](https://github.com/malbeclabs/doublezero/pull/3629)). The `*/activate`, `*/reject`, and `*/closeaccount` onchain instructions and their SDK command modules remain in place for older CLIs until the min-version gate ([#3612](https://github.com/malbeclabs/doublezero/issues/3612)) +- Telemetry + - Replace the Linux netlink BGP session collector in `bgpstatus` with a gNMI Get to the Arista local gNMI socket ([#3674](https://github.com/malbeclabs/doublezero/pull/3674)) - Smartcontract - Stop writing `InterfaceV3` from `CreateDeviceInterface` and `UpdateDeviceInterface`; `CurrentInterfaceVersion` is now `InterfaceV2`. `MigrateDeviceInterfaces` and `BackfillTopology` continue to write `InterfaceV3` since they are admin-controlled and need the `flex_algo_node_segments` field - Add forward-compatible `NewInterface` struct in `state/interface.rs` with a `size: u16` + `version: u8` on-disk prefix, V3-shaped body, and `flex_algo_node_segments`. Older readers can use the size prefix to skip past unknown future versions in constant time. Additive only — no callers, processors, or SDKs change in this PR ([#3666](https://github.com/malbeclabs/doublezero/pull/3666))