diff --git a/.gitignore b/.gitignore index dd0390c..046ca06 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,6 @@ bin/ # Environment files .env .env.* - +.DS_Store # XML files (repomix output) *.xml diff --git a/Dockerfile b/Dockerfile index 354c136..3404b69 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.24.5-alpine AS builder +FROM golang:1.25-alpine AS builder WORKDIR /app diff --git a/README.md b/README.md index bbbcd72..943e090 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,46 @@ INFO[2025-07-10T17:16:04+05:30] Listening on addresses: [/ip4/127.0.0.1/tcp/4001 From the example above, a full multiaddress to use for other nodes would be: `/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWCNsSau1o9MeMVpHudvHaZRLESRcaGVK9FPKhdLU36BtF` +## Debugging Memory / Performance + +### Periodic Status Logs + +Every 60 seconds the node logs a status line with key metrics: + +``` +Status: connected=150 peerstore=152 dht_rt=20 goroutines=45 heap_alloc=28MB heap_inuse=32MB sys=55MB +``` + +| Metric | What to watch for | +|---|---| +| `peerstore` growing >> `connected` | Peerstore GC not cleaning fast enough | +| `dht_rt` growing unbounded | DHT routing table accumulating entries | +| `goroutines` growing | Goroutine leak | +| `heap_alloc` growing while others stable | Leak in libp2p internals (gossipsub, relay, etc.) | + +### pprof Endpoint + +Set `PPROF_PORT=6060` in your `.env` file to enable the Go pprof debug server. The port is already wired in `docker-compose.yaml`. + +```bash +# Heap profile — what's using memory right now +go tool pprof http://localhost:6060/debug/pprof/heap + +# Allocations — what's been allocating the most over time +go tool pprof -alloc_space http://localhost:6060/debug/pprof/heap + +# Compare two snapshots to find what grew (most useful) +curl -o heap1.pb.gz http://localhost:6060/debug/pprof/heap +# ... wait 30 min ... +curl -o heap2.pb.gz http://localhost:6060/debug/pprof/heap +go tool pprof -base heap1.pb.gz heap2.pb.gz + +# Goroutine dump +curl http://localhost:6060/debug/pprof/goroutine?debug=2 +``` + +The pprof diff (`-base`) is the most powerful — it shows exactly which allocations grew in the window, narrowing down whether the source is peerstore, DHT, gossipsub, relay, or something else. + ## Usage with Other Nodes To configure other libp2p nodes (like the `snapshotter-lite-local-collector` or `submission-topic-watcher`) to use this bootstrap node, you typically pass its full multiaddress via a command-line flag or environment variable (e.g., `--bootstrap` flag for the watcher, or `BOOTSTRAP_NODE_ADDR` environment variable for the collector). \ No newline at end of file diff --git a/build-docker.sh b/build-docker.sh index f07c34f..7e19b0d 100755 --- a/build-docker.sh +++ b/build-docker.sh @@ -1,4 +1,14 @@ #!/bin/bash +# Detect docker compose command (docker compose plugin vs docker-compose standalone) +if docker compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker compose" +elif docker-compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker-compose" +else + echo "Error: Neither 'docker compose' nor 'docker-compose' found. Please install Docker Compose." + exit 1 +fi + # Build the Docker image using docker-compose -docker-compose build \ No newline at end of file +$DOCKER_COMPOSE build \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 995decb..30c9e25 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,8 +6,11 @@ import ( "encoding/hex" "flag" "fmt" + "net/http" + _ "net/http/pprof" "os" "os/signal" + "runtime" "strconv" "submissions-bootstrap-node/pkg/config" "submissions-bootstrap-node/pkg/service" @@ -117,9 +120,20 @@ func main() { log.Infof("🚀 Bootstrap node started. ID: %s", node.Host.ID().String()) log.Infof("🌍 Listening on addresses: %s", node.Host.Addrs()) - // Start periodic peer logging + // Start pprof debug server if PPROF_PORT is set + if pprofPort := os.Getenv("PPROF_PORT"); pprofPort != "" { + go func() { + addr := ":" + pprofPort + log.Infof("Starting pprof server on %s", addr) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Errorf("pprof server failed: %v", err) + } + }() + } + + // Start periodic peer and memory logging go func() { - ticker := time.NewTicker(60 * time.Second) // Log every 10 seconds + ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { select { @@ -127,7 +141,15 @@ func main() { return case <-ticker.C: peers := node.Host.Network().Peers() - log.Infof("Connected peers: %d", len(peers)) + peerstoreSize := len(node.Host.Peerstore().Peers()) + dhtSize := node.DHT.RoutingTable().Size() + + var m runtime.MemStats + runtime.ReadMemStats(&m) + + log.Infof("Status: connected=%d peerstore=%d dht_rt=%d goroutines=%d heap_alloc=%dMB heap_inuse=%dMB sys=%dMB", + len(peers), peerstoreSize, dhtSize, runtime.NumGoroutine(), + m.HeapAlloc/1024/1024, m.HeapInuse/1024/1024, m.Sys/1024/1024) for _, p := range peers { log.Debugf(" - %s", p.String()) } @@ -139,8 +161,11 @@ func main() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs + signal.Stop(sigs) fmt.Println() log.Info("Shutting down bootstrap node...") - node.Host.Close() + if err := node.Close(); err != nil { + log.Errorf("Error during shutdown: %v", err) + } } diff --git a/docker-compose.yaml b/docker-compose.yaml index 4878e75..50643c9 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,5 +1,3 @@ -version: '3.8' - services: bootstrap-node: build: @@ -7,6 +5,7 @@ services: dockerfile: Dockerfile ports: - "${BOOTSTRAP_PORT:-4001}:${BOOTSTRAP_PORT:-4001}" + - "${PPROF_PORT:-6060}:${PPROF_PORT:-6060}" env_file: - ./.env restart: unless-stopped @@ -17,3 +16,10 @@ services: environment: - LOG_FILE=/app/logs/bootstrap-node.log - LOG_LEVEL=${LOG_LEVEL:-info} + - PPROF_PORT=${PPROF_PORT:-6060} + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "5" + compress: "true" diff --git a/pkg/service/bootstrap.go b/pkg/service/bootstrap.go index 977b05c..46ef82b 100644 --- a/pkg/service/bootstrap.go +++ b/pkg/service/bootstrap.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/net/connmgr" @@ -28,51 +29,64 @@ import ( // BootstrapNode struct holds the libp2p host and the DHT type BootstrapNode struct { - Host host.Host - DHT *dht.IpfsDHT + Host host.Host + Pubsub *pubsub.PubSub + DHT *dht.IpfsDHT + notificationBundle *network.NotifyBundle + ctx context.Context + cancel context.CancelFunc } // NewBootstrapNode creates and initializes a new libp2p host configured as a bootstrap node func NewBootstrapNode(ctx context.Context, port int, cfg config.Config) (*BootstrapNode, error) { + // Create cancelable context for this bootstrap node + hostCtx, cancel := context.WithCancel(ctx) var priv crypto.PrivKey var err error if cfg.PrivateKey != "" { privBytes, err := hex.DecodeString(cfg.PrivateKey) if err != nil { + cancel() return nil, fmt.Errorf("failed to decode private key: %w", err) } priv, err = crypto.UnmarshalEd25519PrivateKey(privBytes) if err != nil { + cancel() return nil, fmt.Errorf("failed to unmarshal private key: %w", err) } } else { priv, _, err = crypto.GenerateEd25519Key(rand.Reader) if err != nil { + cancel() return nil, fmt.Errorf("failed to generate private key: %w", err) } } // 1. Create a new resource manager with custom limits. + // BOUND connections to match connection manager limits to prevent unbounded peer scoring + // Peer scoring tracks state per connection, so bounding connections bounds peer score memory scalingLimits := rcmgr.DefaultLimits limitsCfg := rcmgr.PartialLimitConfig{ System: rcmgr.ResourceLimits{ StreamsOutbound: rcmgr.Unlimited, StreamsInbound: rcmgr.Unlimited, Streams: rcmgr.Unlimited, - Conns: rcmgr.Unlimited, - ConnsOutbound: rcmgr.Unlimited, - ConnsInbound: rcmgr.Unlimited, - FD: rcmgr.Unlimited, - Memory: rcmgr.LimitVal64(rcmgr.Unlimited), + // Bound connections to connection manager high water mark + buffer + // This ensures peer scoring state is bounded + Conns: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), // Small buffer for transient connections + ConnsOutbound: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), + ConnsInbound: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), + FD: rcmgr.Unlimited, + Memory: rcmgr.LimitVal64(rcmgr.Unlimited), }, Transient: rcmgr.ResourceLimits{ StreamsOutbound: rcmgr.Unlimited, StreamsInbound: rcmgr.Unlimited, Streams: rcmgr.Unlimited, - Conns: rcmgr.Unlimited, - ConnsOutbound: rcmgr.Unlimited, - ConnsInbound: rcmgr.Unlimited, + Conns: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), + ConnsOutbound: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), + ConnsInbound: rcmgr.LimitVal(cfg.ConnManagerHighWater + 100), FD: rcmgr.Unlimited, Memory: rcmgr.LimitVal64(rcmgr.Unlimited), }, @@ -80,6 +94,7 @@ func NewBootstrapNode(ctx context.Context, port int, cfg config.Config) (*Bootst limiter := rcmgr.NewFixedLimiter(limitsCfg.Build(scalingLimits.AutoScale())) rscMgr, err := rcmgr.NewResourceManager(limiter, rcmgr.WithMetricsDisabled()) if err != nil { + cancel() return nil, fmt.Errorf("failed to create resource manager: %w", err) } @@ -90,10 +105,12 @@ func NewBootstrapNode(ctx context.Context, port int, cfg config.Config) (*Bootst connmgr.WithGracePeriod(time.Minute), ) if err != nil { + cancel() return nil, fmt.Errorf("failed to create connection manager: %w", err) } var kadDHT *dht.IpfsDHT + var notificationBundle *network.NotifyBundle // Create the libp2p host options opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)), @@ -101,7 +118,7 @@ func NewBootstrapNode(ctx context.Context, port int, cfg config.Config) (*Bootst libp2p.ResourceManager(rscMgr), libp2p.ConnectionManager(connMgr), libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - kadDHT, err = dht.New(ctx, h, dht.Mode(dht.ModeServer)) + kadDHT, err = dht.New(hostCtx, h, dht.Mode(dht.ModeServer)) return kadDHT, err }), libp2p.EnableRelayService(), @@ -126,40 +143,127 @@ func NewBootstrapNode(ctx context.Context, port int, cfg config.Config) (*Bootst // Create the libp2p host with the DHT in server mode. h, err := libp2p.New(opts...) if err != nil { + cancel() return nil, fmt.Errorf("failed to create libp2p host: %w", err) } // Get standardized gossipsub parameters for consistency across network gossipParams, peerScoreParams, peerScoreThresholds, paramHash := gossipconfig.ConfigureSnapshotSubmissionsMesh(h.ID()) - - // Create a new GossipSub instance with standardized parameters - _, err = pubsub.NewGossipSub(ctx, h, + + // Create GossipSub with peer scoring for DDoS protection + // Peer scoring is bounded by resource manager connection limits (matching connection manager) + // Connection limits ensure peer score state cannot grow unbounded + gs, err := pubsub.NewGossipSub(hostCtx, h, pubsub.WithGossipSubParams(*gossipParams), pubsub.WithPeerScore(peerScoreParams, peerScoreThresholds), pubsub.WithFloodPublish(true), pubsub.WithMessageSignaturePolicy(pubsub.StrictSign), ) if err != nil { + cancel() return nil, fmt.Errorf("failed to create pubsub: %w", err) } log.Infof("🔑 Gossipsub parameter hash: %s (bootstrap node)", paramHash) - log.Infof("Libp2p host created with ID: %s, listening on: %v", h.ID(), h.Addrs()) log.Infof("Bootstrap node DHT routing table size: %d", kadDHT.RoutingTable().Size()) log.Infof("Bootstrap node created with ID: %s, listening on: %v", h.ID(), h.Addrs()) - h.Network().Notify(&network.NotifyBundle{ + // Store notification bundle for cleanup + notificationBundle = &network.NotifyBundle{ ConnectedF: func(_ network.Network, conn network.Conn) { log.Infof("Bootstrap Peer connected: %s, Addr: %s", conn.RemotePeer(), conn.RemoteMultiaddr()) }, DisconnectedF: func(_ network.Network, conn network.Conn) { log.Infof("Bootstrap Peer disconnected: %s, Addr: %s", conn.RemotePeer(), conn.RemoteMultiaddr()) }, - }) + } + h.Network().Notify(notificationBundle) + + node := &BootstrapNode{ + Host: h, + Pubsub: gs, + DHT: kadDHT, + notificationBundle: notificationBundle, + ctx: hostCtx, + cancel: cancel, + } + + go node.startPeerstoreGC() + + return node, nil +} - return &BootstrapNode{ - Host: h, - DHT: kadDHT, - }, nil +// Close closes the bootstrap node and releases all resources +func (n *BootstrapNode) Close() error { + var errs []error + + // Cancel context first to stop all background operations + if n.cancel != nil { + n.cancel() + } + + // Unregister notification bundle to prevent memory leaks + if n.notificationBundle != nil && n.Host != nil { + n.Host.Network().StopNotify(n.notificationBundle) + } + + // Close DHT to release routing table and provider storage + if n.DHT != nil { + if err := n.DHT.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close DHT: %w", err)) + } + } + + // Close the host (this will close all connections and clean up resources) + if n.Host != nil { + if err := n.Host.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close host: %w", err)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("errors during shutdown: %v", errs) + } + return nil +} + +// startPeerstoreGC periodically removes stale peers from the peerstore. +// Disconnected peers are cleaned immediately: ClearAddrs is called first +// (RemovePeer does not clear addresses per the libp2p interface contract), +// then RemovePeer removes keybook/protobook/metadata entries. +func (n *BootstrapNode) startPeerstoreGC() { + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + for { + select { + case <-n.ctx.Done(): + return + case <-ticker.C: + peers := n.Host.Peerstore().Peers() + connectedPeers := n.Host.Network().Peers() + connectedSet := make(map[peer.ID]struct{}, len(connectedPeers)) + for _, p := range connectedPeers { + connectedSet[p] = struct{}{} + } + removed := 0 + for _, p := range peers { + if p == n.Host.ID() { + continue + } + if _, connected := connectedSet[p]; connected { + continue + } + n.Host.Peerstore().ClearAddrs(p) + n.Host.Peerstore().RemovePeer(p) + removed++ + } + remaining := len(n.Host.Peerstore().Peers()) + if removed > 0 { + log.Infof("Peerstore GC: removed %d stale peers, %d remaining (connected: %d)", removed, remaining, len(connectedPeers)) + } else { + log.Debugf("Peerstore GC: no stale peers removed, %d in store (connected: %d)", remaining, len(connectedPeers)) + } + } + } } diff --git a/start.sh b/start.sh index 85e97d9..1ab162d 100755 --- a/start.sh +++ b/start.sh @@ -1,11 +1,50 @@ #!/bin/bash -# If a port is supplied as the first argument, export it so that it overrides -# the value in .env. Otherwise, rely entirely on Docker Compose’s .env handling. -if [ -n "$1" ]; then - export BOOTSTRAP_PORT="$1" +# Detect docker compose command (docker compose plugin vs docker-compose standalone) +if docker compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker compose" +elif docker-compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker-compose" +else + echo "Error: Neither 'docker compose' nor 'docker-compose' found. Please install Docker Compose." + exit 1 fi -echo "Starting bootstrap node (port: ${BOOTSTRAP_PORT:-})" +# Parse arguments +BUILD_IMAGE=false +PORT_ARG="" -docker-compose up +while [ $# -gt 0 ]; do + case "$1" in + --build) + BUILD_IMAGE=true + shift + ;; + *) + # Treat any non-flag argument as a port number + PORT_ARG="$1" + shift + ;; + esac +done + +# Build image if requested +if [ "$BUILD_IMAGE" = true ]; then + echo "Building Docker image..." + $DOCKER_COMPOSE build + if [ $? -ne 0 ]; then + echo "Error: Docker build failed" + exit 1 + fi + echo "Build complete" +fi + +# Trap SIGINT and SIGTERM to ensure proper cleanup +trap "echo 'Stopping bootstrap node...'; $DOCKER_COMPOSE down; exit 0" SIGINT SIGTERM + +# If a port is supplied, export it so that it overrides the value in .env +if [ -n "$PORT_ARG" ]; then + export BOOTSTRAP_PORT="$PORT_ARG" +fi + +$DOCKER_COMPOSE up diff --git a/stop.sh b/stop.sh index f163d8d..250b830 100755 --- a/stop.sh +++ b/stop.sh @@ -1,3 +1,13 @@ #!/bin/bash -docker-compose down +# Detect docker compose command (docker compose plugin vs docker-compose standalone) +if docker compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker compose" +elif docker-compose version >/dev/null 2>&1; then + DOCKER_COMPOSE="docker-compose" +else + echo "Error: Neither 'docker compose' nor 'docker-compose' found. Please install Docker Compose." + exit 1 +fi + +$DOCKER_COMPOSE down