Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ All notable changes to this project will be documented in this file.

- Activator
- Delete the `activator/` crate from the workspace; onchain allocation (RFC-11) supersedes it. The deployed activator was frozen in Phase 1 ([#3608](https://github.com/malbeclabs/doublezero/pull/3608), [#3628](https://github.com/malbeclabs/doublezero/pull/3628)) and removed from e2e in Phase 2 ([#3609](https://github.com/malbeclabs/doublezero/pull/3609), [#3610](https://github.com/malbeclabs/doublezero/pull/3610), [#3611](https://github.com/malbeclabs/doublezero/pull/3611), [#3629](https://github.com/malbeclabs/doublezero/pull/3629)). The `*/activate`, `*/reject`, and `*/closeaccount` onchain instructions and their SDK command modules remain in place for older CLIs until the min-version gate ([#3612](https://github.com/malbeclabs/doublezero/issues/3612))
- Telemetry
- Replace the Linux netlink BGP session collector in `bgpstatus` with a gNMI Get to the Arista local gNMI socket ([#3674](https://github.com/malbeclabs/doublezero/pull/3674))
- Smartcontract
- Stop writing `InterfaceV3` from `CreateDeviceInterface` and `UpdateDeviceInterface`; `CurrentInterfaceVersion` is now `InterfaceV2`. `MigrateDeviceInterfaces` and `BackfillTopology` continue to write `InterfaceV3` since they are admin-controlled and need the `flex_algo_node_segments` field
- Add forward-compatible `NewInterface` struct in `state/interface.rs` with a `size: u16` + `version: u8` on-disk prefix, V3-shaped body, and `flex_algo_node_segments`. Older readers can use the size prefix to skip past unknown future versions in constant time. Additive only — no callers, processors, or SDKs change in this PR ([#3666](https://github.com/malbeclabs/doublezero/pull/3666))
Expand Down
28 changes: 23 additions & 5 deletions controlplane/telemetry/cmd/telemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/malbeclabs/doublezero/config"
"github.com/malbeclabs/doublezero/controlplane/agent/pkg/arista"
aristapb "github.com/malbeclabs/doublezero/controlplane/proto/arista/gen/pb-go/arista/EosSdkRpc"
gpb "github.com/openconfig/gnmi/proto/gnmi"

"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/bgpstatus"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/gnmitunnel"
Expand Down Expand Up @@ -379,7 +381,7 @@ func main() {
var bgpStatusErrCh <-chan error
if *bgpStatusEnable {
bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK,
serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient)
serviceabilityProgramID, cachedSvcClient, rpcClient)
}

// Wait for the context to be done or an error to be returned.
Expand Down Expand Up @@ -412,20 +414,36 @@ func startBGPStatusSubmitter(
keypair solana.PrivateKey,
localDevicePK solana.PublicKey,
serviceabilityProgramID solana.PublicKey,
localNet netutil.LocalNet,
bgpNamespace string,
svcClient telemetrysvc.ProgramDataProvider,
rpcClient *solanarpc.Client,
) <-chan error {
executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID)

const gnmiSocketPath = "/var/run/gnmiServer.sock"
gnmiConn, err := grpc.NewClient(
"passthrough:///"+gnmiSocketPath,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(dialCtx context.Context, _ string) (net.Conn, error) {
if *managementNamespace != "" {
return netns.RunInNamespace(*managementNamespace, func() (net.Conn, error) {
return (&net.Dialer{}).DialContext(dialCtx, "unix", gnmiSocketPath)
})
}
return (&net.Dialer{}).DialContext(dialCtx, "unix", gnmiSocketPath)
}),
)
if err != nil {
log.Error("failed to create gNMI client connection for BGP status", "error", err)
os.Exit(1)
}
gnmiClient := gpb.NewGNMIClient(gnmiConn)

sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{
Log: log,
Executor: executor,
ServiceabilityClient: svcClient,
Collector: bgpstatus.DefaultCollector(localNet),
Collector: bgpstatus.GNMICollector(gnmiClient),
LocalDevicePK: localDevicePK,
BGPNamespace: bgpNamespace,
Interval: *bgpStatusInterval,
PeriodicRefreshInterval: *bgpStatusRefreshInterval,
DownGracePeriod: *bgpStatusDownGracePeriod,
Expand Down
103 changes: 22 additions & 81 deletions controlplane/telemetry/internal/bgpstatus/bgpstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (
"fmt"
"log/slog"
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/gagliardetto/solana-go"
"github.com/jonboulle/clockwork"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
)

Expand All @@ -35,20 +32,18 @@ type ServiceabilityClient interface {
GetProgramData(ctx context.Context) (*serviceability.ProgramData, error)
}

// NamespaceCollector collects BGP session state and local network interfaces
// from a single Linux VRF network namespace.
//
// Given the namespace name, it returns:
//   - established: the set of remote IP strings with ESTABLISHED BGP sessions
//   - ifaces: the local network interfaces
//   - err: any error encountered
//
// Implement with DefaultCollector for production; use a mock in tests.
type NamespaceCollector func(ctx context.Context, namespace string) (established map[string]struct{}, ifaces []netutil.Interface, err error)
// BGPCollector returns the set of remote IP strings with ESTABLISHED BGP
// sessions, across all network instances visible to the device. It is called
// once per tick.
//
// The established result is a set keyed by remote IP string (the struct{}
// values carry no information); err reports a collection error.
//
// Implement with GNMICollector for production; use a mock in tests.
type BGPCollector func(ctx context.Context) (established map[string]struct{}, err error)

// Config holds all parameters for the BGP status submitter.
type Config struct {
Log *slog.Logger
Executor BGPStatusExecutor
ServiceabilityClient ServiceabilityClient
Collector NamespaceCollector
Collector BGPCollector
LocalDevicePK solana.PublicKey
BGPNamespace string
Interval time.Duration // default: 60s
PeriodicRefreshInterval time.Duration // default: 6h
DownGracePeriod time.Duration // default: 0
Expand All @@ -71,9 +66,6 @@ func (c *Config) validate() error {
if c.LocalDevicePK.IsZero() {
return errors.New("local device pubkey is required")
}
if c.BGPNamespace == "" {
return errors.New("bgp namespace is required")
}
if c.Interval <= 0 {
c.Interval = defaultInterval
}
Expand All @@ -99,7 +91,7 @@ type submitTask struct {
status serviceability.BGPStatus
}

// Submitter collects BGP socket state on each tick, determines per-user BGP
// Submitter collects BGP session state on each tick, determines per-user BGP
// status, and submits SetUserBGPStatus onchain via a non-blocking worker.
type Submitter struct {
cfg Config
Expand All @@ -125,8 +117,7 @@ func NewSubmitter(cfg Config) (*Submitter, error) {
}

// Start launches the submitter in the background and returns a channel that
// receives a fatal error (or is closed on clean shutdown). It mirrors the
// state.Collector.Start pattern.
// receives a fatal error (or is closed on clean shutdown).
func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error {
errCh := make(chan error, 1)
go func() {
Expand All @@ -141,8 +132,8 @@ func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan
}

// userStateFor returns or creates the per-user tracking entry (caller must hold s.mu).
// initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so
// that a restarted submitter correctly handles users whose onchain state is already Up.
// initialStatus seeds lastOnchainStatus so a restarted submitter correctly handles
// users whose onchain state is already Up.
func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState {
us, ok := s.userState[key]
if !ok {
Expand All @@ -152,35 +143,27 @@ func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPSta
return us
}

// bgpSocket is the minimal BGP socket representation used by the pure helpers.
// The Linux-specific submitter.go converts state.BGPSocketState to this type.
type bgpSocket struct {
// RemoteIP is the peer address in string form; buildEstablishedIPSet uses it
// as the key of the set it builds.
RemoteIP string
// State is the socket state name; buildEstablishedIPSet keeps only sockets
// whose State equals "ESTABLISHED".
State string
}

// --- Pure helpers (no Linux syscalls; fully testable on all platforms) ---

// buildEstablishedIPSet gathers the RemoteIP of every socket whose State is
// exactly "ESTABLISHED" into a set. Sockets in any other state are ignored,
// duplicates collapse into one entry, and the returned map is never nil.
func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} {
	const establishedState = "ESTABLISHED"

	established := make(map[string]struct{}, len(sockets))
	for i := range sockets {
		if sockets[i].State != establishedState {
			continue
		}
		established[sockets[i].RemoteIP] = struct{}{}
	}
	return established
}
// --- Pure helpers (no platform-specific code; fully testable on all platforms) ---

// tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a
// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length].
// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length].
func tunnelNetToIPNet(b [5]byte) *net.IPNet {
	// Bytes 0-3 carry the IPv4 prefix (kept in 4-byte form); byte 4 is the
	// prefix length in bits.
	prefix := net.IP{b[0], b[1], b[2], b[3]}
	prefixLen := int(b[4])
	return &net.IPNet{
		IP:   prefix,
		Mask: net.CIDRMask(prefixLen, 8*net.IPv4len),
	}
}

// peerIPsFor31 returns both host IPs in a /31 network: the address stored in
// tunnelNet and its sibling, obtained by flipping the lowest bit of the last
// octet. Since tunnel IPs are globally unique (onchain-allocated), exactly one
// of the two will be the BGP neighbor address for a given user on this device.
//
// Both results are 4-byte IPv4 addresses. The second is a fresh copy, so
// tunnelNet is never modified. If tunnelNet is nil or does not hold an IPv4
// address, (nil, nil) is returned rather than panicking or fabricating a
// 0.0.0.1 sibling from an empty address.
func peerIPsFor31(tunnelNet *net.IPNet) (net.IP, net.IP) {
	if tunnelNet == nil {
		return nil, nil
	}
	ip0 := tunnelNet.IP.To4()
	if ip0 == nil {
		// Not an IPv4 address: there is no meaningful /31 sibling.
		return nil, nil
	}
	ip1 := make(net.IP, net.IPv4len)
	copy(ip1, ip0)
	ip1[net.IPv4len-1] ^= 1 // flip the low bit to get the other /31 host
	return ip0, ip1
}

// computeEffectiveStatus derives the BGP status to report, applying the down
// grace period: if observedUp is false but the user was last seen Up within
// gracePeriod, we still report Up to avoid transient flaps.
Expand All @@ -202,48 +185,6 @@ func computeEffectiveStatus(
return serviceability.BGPStatusDown
}

// rootNamespace is the sentinel passed to DefaultCollector / RunInNamespace
// to indicate the root (global) Linux network namespace. Arista EOS places
// the default VRF in the root namespace rather than a named namespace under
// /var/run/netns/, so there is no file to open with netns.GetFromName.
// RunInNamespace treats "" as "execute in the current namespace" (no switching).
//
// vrfNamespaces appends this value to its result when at least one user is a
// multicast user.
const rootNamespace = ""

// vrfNamespaces returns the ordered, de-duplicated list of Linux network
// namespaces to inspect for BGP sockets and tunnel interfaces.
//
// The result always starts with base. It is then extended, in order, with:
//   - one namespace per tenant with a non-zero VrfId, formed by stripping the
//     trailing decimal digits from base and appending the tenant's VrfId
//     (e.g. base "ns-vrf1" and VrfId 2 give "ns-vrf2");
//   - rootNamespace, if at least one user has UserTypeMulticast, because GRE
//     tunnels for multicast users live in the global VRF (the root network
//     namespace) rather than in a per-tenant namespace.
//
// A namespace that is already in the list is never added twice.
func vrfNamespaces(base string, tenants []serviceability.Tenant, users []serviceability.User) []string {
	namespaces := []string{base}
	known := map[string]struct{}{base: {}}

	// add appends ns unless it is already present, preserving first-seen order.
	add := func(ns string) {
		if _, dup := known[ns]; dup {
			return
		}
		known[ns] = struct{}{}
		namespaces = append(namespaces, ns)
	}

	stem := strings.TrimRight(base, "0123456789")
	for i := range tenants {
		if id := tenants[i].VrfId; id != 0 {
			add(stem + strconv.FormatUint(uint64(id), 10))
		}
	}

	// Multicast users' GRE tunnels live in the root network namespace (global VRF).
	for i := range users {
		if users[i].UserType == serviceability.UserTypeMulticast {
			add(rootNamespace)
			break
		}
	}
	return namespaces
}

// shouldSubmit returns true when a submission is warranted: either the status
// has changed from what was last confirmed onchain, or it is time for a
// periodic keepalive write.
Expand Down
Loading
Loading