diff --git a/CHANGELOG.md b/CHANGELOG.md index 836d04ac64..52ddc31152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ All notable changes to this project will be documented in this file. - Smartcontract - Migrate read callers in the CLI, sentinel, client, controlplane admin, and Rust SDK topology helper to read interfaces from `Device::new_interfaces` instead of the legacy `interfaces` enum vec, and adopt the `Device::find_interface` signature that returns `&NewInterface`. The legacy `interfaces` slot is still written on-disk via the per-write V2 projection from #3667; this PR only migrates reads. The temporary `Device::find_interface_legacy` helper is retained for the smartcontract program processors, which migrate in a later issue. Activator is intentionally excluded — it is deprecated ([#3659](https://github.com/malbeclabs/doublezero/issues/3659)) +- Client + - Add periodic kernel route reconciliation to `doublezerod` that detects and reinstalls missing routes, with a metric tracking install failures ([#3669](https://github.com/malbeclabs/doublezero/issues/3669)) - Activator - Delete the `activator/` crate from the workspace; onchain allocation (RFC-11) supersedes it. The deployed activator was frozen in Phase 1 ([#3608](https://github.com/malbeclabs/doublezero/pull/3608), [#3628](https://github.com/malbeclabs/doublezero/pull/3628)) and removed from e2e in Phase 2 ([#3609](https://github.com/malbeclabs/doublezero/pull/3609), [#3610](https://github.com/malbeclabs/doublezero/pull/3610), [#3611](https://github.com/malbeclabs/doublezero/pull/3611), [#3629](https://github.com/malbeclabs/doublezero/pull/3629)). 
The `*/activate`, `*/reject`, and `*/closeaccount` onchain instructions and their SDK command modules remain in place for older CLIs until the min-version gate ([#3612](https://github.com/malbeclabs/doublezero/issues/3612)) - Smartcontract diff --git a/client/doublezerod/cmd/doublezerod/main.go b/client/doublezerod/cmd/doublezerod/main.go index c0453969cb..f2d4b8dde3 100644 --- a/client/doublezerod/cmd/doublezerod/main.go +++ b/client/doublezerod/cmd/doublezerod/main.go @@ -45,13 +45,14 @@ var ( stateDir = flag.String("state-dir", "/var/lib/doublezerod", "directory for persistent state files") // Route liveness configuration flags. - routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") - routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") - routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") - routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") - routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") - routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") - routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") + routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") + routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") + routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") + routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min 
tx floor") + routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") + routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", defaultRouteLivenessReconcileInterval, "interval for periodic kernel route reconciliation; 0 disables") + routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") + routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") // TODO(snormore): These flags are temporary for initial rollout testing. // They will be superceded by a single `route-liveness-enable` flag, where false means @@ -66,12 +67,13 @@ var ( ) const ( - defaultOnchainRPCTimeout = 30 * time.Second - defaultRouteLivenessTxMin = 1 * time.Second - defaultRouteLivenessRxMin = 1 * time.Second - defaultRouteLivenessDetectMult = 3 - defaultRouteLivenessMinTxFloor = 50 * time.Millisecond - defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultOnchainRPCTimeout = 30 * time.Second + defaultRouteLivenessTxMin = 1 * time.Second + defaultRouteLivenessRxMin = 1 * time.Second + defaultRouteLivenessDetectMult = 3 + defaultRouteLivenessMinTxFloor = 50 * time.Millisecond + defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultRouteLivenessReconcileInterval = 30 * time.Second defaultRouteLivenessBindIP = "0.0.0.0" ) @@ -179,6 +181,8 @@ func main() { EnablePeerMetrics: *routeLivenessPeerMetrics, + RouteReconcileInterval: *routeLivenessReconcileInterval, + // Default to treating peers that advertise passive mode as passive. That is, we will // install their routes immediately and never uninstall them on down events. 
HonorPeerAdvertisedPassive: true, diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 32556c282b..0bfd72d3b8 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -13,6 +13,7 @@ import ( "github.com/malbeclabs/doublezero/client/doublezerod/internal/routing" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" ) const ( @@ -25,6 +26,9 @@ const ( defaultBackoffMax = 1 * time.Minute defaultMaxEvents = 10240 + + // Default interval for periodic kernel route reconciliation. + defaultRouteReconcileInterval = 30 * time.Second ) // Peer identifies a remote endpoint and the local interface context used to reach it. @@ -92,6 +96,10 @@ type ManagerConfig struct { // Client version to advertise to peers in control packets. ClientVersion string + + // RouteReconcileInterval controls how often the manager scans the kernel + // routing table for missing routes and reinstalls them. Zero disables. + RouteReconcileInterval time.Duration } // Validate fills defaults and enforces constraints for ManagerConfig. @@ -151,6 +159,12 @@ func (c *ManagerConfig) Validate() error { if c.ClientVersion == "" { return errors.New("clientVersion is required") } + if c.RouteReconcileInterval < 0 { + return errors.New("routeReconcileInterval must be non-negative") + } + // Zero means disabled: NewManager only starts the reconcile goroutine + // when the interval is > 0, and main.go supplies the 30s default via its + // flag, so zero is deliberately not coerced to a default here. return nil } @@ -291,6 +305,26 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR } }() + // Route reconciliation goroutine: periodically scans the kernel routing + // table for missing routes and reinstalls them.
+ if cfg.RouteReconcileInterval > 0 { + log.Info("liveness: route reconciliation enabled", "interval", cfg.RouteReconcileInterval.String()) + m.wg.Add(1) + go func() { + defer m.wg.Done() + ticker := time.NewTicker(cfg.RouteReconcileInterval) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.reconcileRoutes() + } + } + }() + } + // If any routes are configured to be excluded, mark then as AdminDown immediately. if m.cr != nil { for ip := range m.cr.GetExcluded() { @@ -834,7 +868,7 @@ func (m *manager) onSessionDown(sess *Session) { } if m.cfg.PassiveMode { - m.log.Debug("liveness: session down (global passive; keeping route)", + m.log.Info("liveness: session down (global passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -845,7 +879,7 @@ func (m *manager) onSessionDown(sess *Session) { } if effectivelyPassive { - m.log.Debug("liveness: session down (peer passive; keeping route)", + m.log.Info("liveness: session down (peer passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -875,6 +909,86 @@ func (m *manager) onSessionDown(sess *Session) { ) } +// reconcileRoutes scans the kernel routing table for routes that should be +// installed but are missing, and reinstalls them. This mitigates routes being +// removed by external processes. +func (m *manager) reconcileRoutes() { + // Snapshot installed and desired under lock. 
+ type installedRoute struct { + rk RouteKey + route *Route + } + m.mu.Lock() + toCheck := make([]installedRoute, 0, len(m.installed)) + for rk, ok := range m.installed { + if !ok { + continue + } + if r, exists := m.desired[rk]; exists { + toCheck = append(toCheck, installedRoute{rk: rk, route: r}) + } + } + m.mu.Unlock() + + if len(toCheck) == 0 { + return + } + + kernelRoutes, err := m.cfg.Netlinker.RouteByProtocol(unix.RTPROT_BGP) + if err != nil { + m.log.Error("liveness: error fetching kernel routes for reconciliation", "error", err) + return + } + + // Build a lookup set keyed by (table, dst, nexthop, src) for fast matching. NOTE(review): dst here is the bare IPv4 address with no prefix length — confirm RouteKey.DstPrefix uses the same form (else no entry ever matches and every route is reinstalled each tick), and note that desired routes differing only in mask collide on the same key. + type kernelKey struct { + Table int + DstIP string + NextHop string + SrcIP string + } + kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes)) + for _, kr := range kernelRoutes { + var dstIP, nhIP, srcIP string + if kr.Dst != nil && kr.Dst.IP != nil && kr.Dst.IP.To4() != nil { + dstIP = kr.Dst.IP.To4().String() + } + if kr.NextHop != nil && kr.NextHop.To4() != nil { + nhIP = kr.NextHop.To4().String() + } + if kr.Src != nil && kr.Src.To4() != nil { + srcIP = kr.Src.To4().String() + } + kernelSet[kernelKey{Table: kr.Table, DstIP: dstIP, NextHop: nhIP, SrcIP: srcIP}] = struct{}{} + } + + for _, ir := range toCheck { + kk := kernelKey{Table: ir.route.Table, DstIP: ir.rk.DstPrefix, NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP} + if _, present := kernelSet[kk]; present { + continue + } + // Re-check under lock: the route may have been intentionally withdrawn + // between our snapshot and now (e.g. by onSessionDown).
+ m.mu.Lock() + stillInstalled := m.installed[ir.rk] + m.mu.Unlock() + if !stillInstalled { + continue + } + m.log.Warn("liveness: reinstalling missing route", + "route", ir.route.String(), + "iface", ir.rk.Interface, + ) + if err := m.cfg.Netlinker.RouteAdd(&ir.route.Route); err != nil { + m.log.Error("liveness: error reinstalling route", + "error", err, "route", ir.route.String()) + m.metrics.RouteInstallFailures.WithLabelValues(ir.rk.Interface, ir.rk.SrcIP).Inc() + } else { + m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) + } + } +} + // isPeerEffectivelyPassive returns true when this session should not have its // dataplane (kernel route) managed due to peer-advertised passive mode. // diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index c015ef4a1d..0c8a3a607e 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -1650,6 +1650,130 @@ func metricHasLabels(m *prom.Metric, labels prometheus.Labels) bool { return true } +func TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return empty — no routes in kernel. + return nil, nil + }, + } + + m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour // disable ticker; we call manually + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(nil) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // RegisterRoute in PassiveMode calls RouteAdd once. 
+ mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 1, addCalls, "expected one RouteAdd call to reinstall the missing route") + mock.mu.Unlock() + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(1), reinstalls) +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(rr *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return the route as present in kernel. + return []*routing.Route{&r.Route}, nil + }, + } + + m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Reset after RegisterRoute's install. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that is present in the kernel") + mock.mu.Unlock() +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + return nil, nil + }, + } + + // Active mode: route is registered but not installed until session goes Up. 
+ m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = false + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(func(r *Route) { + r.Src = net.IPv4(127, 0, 0, 1) + r.Dst = &net.IPNet{IP: net.IPv4(127, 0, 0, 2), Mask: net.CIDRMask(32, 32)} + }) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // In active mode, installed[rk] is false until session goes Up. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that was never installed") + mock.mu.Unlock() +} + func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 { t.Helper() diff --git a/client/doublezerod/internal/liveness/metrics.go b/client/doublezerod/internal/liveness/metrics.go index d1e9e31842..4aff9a6a1e 100644 --- a/client/doublezerod/internal/liveness/metrics.go +++ b/client/doublezerod/internal/liveness/metrics.go @@ -42,6 +42,7 @@ type Metrics struct { DesiredMapSize prometheus.Gauge PeerSessions *prometheus.GaugeVec PeerDetectTime *prometheus.GaugeVec + RouteReinstalls *prometheus.CounterVec } var ( @@ -212,6 +213,13 @@ func newMetrics() *Metrics { Help: "Size of the desired map", }, ), + RouteReinstalls: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_liveness_route_reinstalls_total", + Help: "Count of routes reinstalled after being removed from the kernel by an external process", + }, + serviceLabels, + ), } } @@ -241,6 +249,7 @@ func (m *Metrics) Register(r prometheus.Registerer) { m.DesiredMapSize, m.PeerSessions, m.PeerDetectTime, + m.RouteReinstalls, ) } @@ -296,6 +305,10 @@ func (m *Metrics) convergenceToUp(peer Peer, convergence time.Duration) { m.ConvergenceToUp.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) } +func 
(m *Metrics) routeReinstall(iface, localIP string) { + m.RouteReinstalls.WithLabelValues(iface, localIP).Inc() +} + func (m *Metrics) convergenceToDown(peer Peer, convergence time.Duration) { m.ConvergenceToDown.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) }