This system is a distributed peer-to-peer publish/subscribe network built on libp2p, designed for secure, decentralized messaging in DePIN (Decentralized Physical Infrastructure Network) applications. The system provides permissionless node participation through Solana smart contract registry integration, where nodes are authorized via on-chain wallet verification.
- Distributed Pub/Sub: No central server; all nodes are equal peers
- DePIN-Optimized: Designed for decentralized physical infrastructure networks
- Permissionless Access: Nodes automatically authorized via Solana smart contract registry
- Security-First: Solana wallet-based identity with smart contract gating
- Bootstrap Discovery: New nodes join using DHT and registry-based peer discovery
- Namespace Isolation: Single database name acts as namespace for topic keys
- Real-time Messaging: Publish and subscribe to arbitrary topics
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Go Library Interface │ │
│ │ (Subscribe/Publish) │ │
│ └─────────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ P2P Pub/Sub Layer │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ Topic Manager │ │ Event Routing │ │
│ │ (Pub/Sub) │ │ (Message Distribution) │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Security Layer │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │Registry Gater │ │ Solana Identity │ │
│ │(Smart Contract) │ │ (Ed25519 Keys) │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ libp2p Networking Layer │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
│ │ GossipSub │ │ Kademlia DHT │ │ Connection │ │
│ │ (Pub/Sub) │ │ (Discovery) │ │ Manager │ │
│ └─────────────────┘ └─────────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Transport Layer │
│ ┌─────────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ QUIC │ │ TCP │ │ Circuit │ │
│ │ (Primary) │ │ (Fallback) │ │ (Relay) │ │
│ └─────────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
- Identity Source: Solana wallet keypairs (Ed25519)
- Key Conversion: Solana private key → Ed25519 libp2p key
- Peer ID Generation: Deterministic from public key
- Format: Base58-encoded private keys for external storage
type Config struct {
WalletPrivateKey string // Base58-encoded Solana private key
DatabaseName string // Namespace for topics
GetAuthorizedWallets GetAuthorizedWalletsFunc // Function to get authorized wallets
GetBootstrapNodes GetBootstrapNodesFunc // Function to get bootstrap nodes
Logger Logger // Logger for all internal operations
ListenPorts ListenPorts // Ports for different transports
}
// Function provided by node software to get authorized wallets
type GetAuthorizedWalletsFunc func(ctx context.Context) ([]solana.PublicKey, error)
// Function provided by node software to get bootstrap nodes
type GetBootstrapNodesFunc func(ctx context.Context) ([]BootstrapNode, error)
// Bootstrap node information
type BootstrapNode struct {
PublicKey solana.PublicKey // Node's Solana public key
IP string // IP address
QUICPort int // QUIC port
TCPPort int // TCP port
}
// Listen ports for different transports
type ListenPorts struct {
QUIC int // QUIC listen port (default: 4001)
TCP int // TCP listen port (default: 4002)
}type SolanaRegistryGater struct {
getAuthorizedWallets GetAuthorizedWalletsFunc // Function provided by node software
logger Logger // Logger instance
cache *AuthorizationCache // Authorized wallets cache
refreshInterval time.Duration // How often to refresh authorized wallets list
ctx context.Context // Context for the refresh goroutine
cancel context.CancelFunc // Cancel function for the refresh goroutine
wg sync.WaitGroup // Wait group for the refresh goroutine
}
// Authorization cache stores only authorized wallets for performance
type AuthorizationCache struct {
authorizedWallets sync.Map // solana.PublicKey -> bool (only true values stored)
mutex sync.RWMutex // Protects cache operations
}Network-Level Security Functions:
InterceptPeerDial(): Validates outgoing connections against cache/registryInterceptAddrDial(): Validates specific address connectionsInterceptSecured(): Post-handshake authorization via cache/registry lookupcheckWalletInCache(): Core validation with cache fallback to registryrefreshAuthorizedWallets(): Background process that refreshes cache every 30 seconds
Connection Authorization Flow:
- Node Attempts Connection → Gater checks wallet against cache first
- If in Cache → Connection allowed immediately (performance optimization)
- If Not in Cache → Check registry directly (fallback for new wallets)
- If Authorized in Registry → Add to cache and allow connection
- If Unauthorized → Connection blocked at transport level
- Background Refresh → Cache updated every 30 seconds (configurable)
Cache Strategy:
- Performance: Cache stores only authorized wallets (no TTL needed)
- Freshness: Background refresh every 30 seconds ensures up-to-date authorization
- Fallback: Direct registry check for wallets not in cache (handles new additions)
- Simplicity: No complex TTL or eviction logic needed
Registry Integration (Handled by Node Software):
// Example: Node software provides this function to pubsub library
func (node *DePINNode) GetAuthorizedWallets(ctx context.Context) ([]solana.PublicKey, error) {
// Node software handles:
// - Smart contract RPC calls
// - Caching and refresh logic
// - Error handling and retries
// - Multiple RPC endpoint failover
node.registryMutex.RLock()
defer node.registryMutex.RUnlock()
// Return cached authorized wallets from node's internal registry
return node.cachedAuthorizedWallets, nil
}
// Pubsub gater checks cache first, then falls back to registry
func (g *SolanaRegistryGater) checkWalletInCache(ctx context.Context, wallet solana.PublicKey) (bool, error) {
// Check cache first for performance
if _, found := g.cache.authorizedWallets.Load(wallet); found {
return true, nil
}
// Fallback to registry for new wallets
authorizedWallets, err := g.getAuthorizedWallets(ctx)
if err != nil {
return false, err
}
// Check if wallet is authorized
for _, authorized := range authorizedWallets {
if wallet.Equals(authorized) {
// Add to cache for future lookups
g.cache.authorizedWallets.Store(wallet, true)
return true, nil
}
}
return false, nil
}- Multiple Transports: QUIC (primary), TCP (fallback), Circuit Relay (NAT assistance)
- Connection Manager: Automatic with connection limits and peer tagging
- Datacenter Optimization: Connection limits optimized for datacenter infrastructure
- NAT Traversal: QUIC hole punching and Circuit Relay
Three-Transport Architecture for Universal Connectivity:
// Transport priority for DePIN nodes (enabled by default)
transports := []string{
"QUIC", // Primary: UDP-based, built-in encryption, connection migration
"TCP", // Fallback: Reliable for stable connections
"Circuit", // NAT assistance: Relay through other nodes when direct connection fails
}1. QUIC Transport (Primary)
- Multiaddr Format:
/ip4/<ip>/udp/<port>/quic-v1 - Benefits for Datacenter DePIN:
- Built-in TLS 1.3 encryption
- Reduced connection setup time
- No head-of-line blocking
- NAT hole punching capabilities
2. TCP Transport (Fallback)
- Multiaddr Format:
/ip4/<ip>/tcp/<port> - Benefits for Datacenter DePIN:
- Universal compatibility
- Reliable for stable connections
- Excellent for datacenter infrastructure
- Works through most firewalls
3. Circuit Relay (NAT Assistance)
- Multiaddr Format:
/ip4/<relay-ip>/tcp/<port>/p2p/<relay-peer-id>/p2p-circuit - Benefits for Datacenter DePIN:
- Connects nodes that can't establish direct connections
- Helps with network bootstrapping
- Provides connectivity fallback for complex network topologies
- Registry-Based Discovery: Query smart contract for active nodes
- DHT Discovery: Kademlia-based peer finding with rendezvous
- Gossip Discovery: Peer discovery through topic participation
- Bootstrap Redundancy: Multiple fallback discovery mechanisms
// DePIN discovery process
rendezvous := DiscoveryTag + "_" + db.Name // "p2p-database-discovery_<dbname>"
routingDiscovery := routing.NewRoutingDiscovery(globalDHT)
// Advertise this node to the network
util.Advertise(ctx, routingDiscovery, rendezvous)
// Find peers through DHT discovery
util.FindPeers(ctx, routingDiscovery, rendezvous)
// Note: Authorization happens at connection time via gater- Protocol: libp2p GossipSub for reliable message propagation
- Topic Format:
<database_name>_<user_topic> - Message Structure: JSON-encoded Event objects
- Deduplication: Built-in message ID tracking
- DePIN Optimizations: Bandwidth-aware routing, adaptive mesh overlay
type Event struct {
ID string // UUID for deduplication
FromPeerId string // Sender's peer ID
Message interface{} // Arbitrary message payload
Timestamp int64 // Unix timestamp
}type TopicSubscription struct {
subscription *pubsub.Subscription // libp2p subscription
topic *pubsub.Topic // Topic handle
handler PubSubHandler // User callback function
}Topic Lifecycle:
- Join:
pubSub.Join(db.Name + "_" + topic) - Subscribe:
topic.Subscribe()+ event listener goroutine - Publish: JSON marshal +
topic.Publish() - Leave:
subscription.Cancel()+topic.Close()
// Shared libp2p infrastructure managed by process-level manager
type P2PInfrastructure struct {
host host.Host // Single libp2p host for all databases
dht *dual.DHT // Kademlia DHT
gossipSub *pubsub.PubSub // GossipSub instance
connManager connmgr.ConnManager // Connection manager
// Database-specific resources
databases map[string]*DatabaseInstance // Active database instances
mutex sync.RWMutex // Protects databases map
}
// Per-database instance with isolated topics
type DatabaseInstance struct {
name string // Database name
gater *SolanaRegistryGater // Registry-based connection gater
topics map[string]*pubsub.Topic // Joined topics
subscriptions map[string]*TopicSubscription // Active subscriptions
mutex sync.RWMutex // Protects topics/subscriptions
}Process-Level Initialization (Once):
func initializeP2PInfrastructure(config Config) (*P2PInfrastructure, error) {
// 1. Create libp2p host with all transports
host, err := libp2p.New(
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.ListenPorts.QUIC),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", config.ListenPorts.TCP),
),
libp2p.Transport(quic.NewTransport),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.EnableCircuitRelay(),
libp2p.ConnectionManager(connManager),
libp2p.NATPortMap(),
)
// 2. Initialize DHT for peer discovery
dht, err := dual.New(ctx, host)
// 3. Initialize GossipSub for pub/sub messaging
gossipSub, err := pubsub.NewGossipSub(ctx, host)
// 4. Bootstrap connection to network
return connectToNetwork(host, dht, config.GetBootstrapNodes)
}Database-Level Initialization (Per Database):
func createDatabaseInstance(infra *P2PInfrastructure, config Config) (*DB, error) {
// 1. Create registry-based connection gater
gater := &SolanaRegistryGater{
getAuthorizedWallets: config.GetAuthorizedWallets,
logger: config.Logger,
cache: newAuthorizationCache(),
}
// 2. Apply connection gating to host
infra.host.Network().Notify(gater)
// 3. Create database instance
dbInstance := &DatabaseInstance{
name: config.DatabaseName,
gater: gater,
topics: make(map[string]*pubsub.Topic),
subscriptions: make(map[string]*TopicSubscription),
}
return &DB{infrastructure: infra, instance: dbInstance}, nil
}┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │ │ Topic │ │ GossipSub │ │ Network │
│ │ │ Manager │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
│ Publish(topic, │ │ │
│ message) │ │ │
│──────────────────▶│ │ │
│ │ │ │
│ │ Join topic if │ │
│ │ not exists │ │
│ │──────────────────▶│ │
│ │ │ │
│ │ Create Event{ │ │
│ │ ID: uuid, │ │
│ │ FromPeerId, │ │
│ │ Message, │ │
│ │ Timestamp } │ │
│ │ │ │
│ │ JSON.Marshal │ │
│ │ (Event) │ │
│ │ │ │
│ │ topic.Publish() │ │
│ │──────────────────▶│ │
│ │ │ │
│ │ │ Gossip to │
│ │ │ authorized peers │
│ │ │──────────────────▶│
│ │ │ │
│ Return Event{ID} │ │ │
│◀──────────────────│ │ │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │ │ Topic │ │ Event │ │ Message │
│ │ │ Manager │ │ Listener │ │ Handler │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
│ Subscribe(topic, │ │ │
│ handler) │ │ │
│──────────────────▶│ │ │
│ │ │ │
│ │ Join topic │ │
│ │ topic.Subscribe() │ │
│ │ │ │
│ │ Start event │ │
│ │ listener goroutine│ │
│ │──────────────────▶│ │
│ │ │ │
│ │ │ Listen loop: │
│ │ │ subscription. │
│ │ │ Next() │
│ │ │ │
│ │ │ JSON.Unmarshal │
│ │ │ → Event │
│ │ │ │
│ │ │ handler(Event) │
│ │ │──────────────────▶│
│ │ │ │
│ Registration │ │ │
│ successful │ │ │
│◀──────────────────│ │ │
Note: Authorization happens at connection time via the connection gater. Once nodes are connected to the network, they can freely subscribe to and publish on any topics.
- Smart Contract Level: On-chain wallet registry provides Sybil resistance
- Network Level: Connection gater blocks unauthorized peers at connection time
- Namespace Level: Database name provides network isolation (different DePIN networks)
- Application Level: Custom message validation (if needed)
Important: Once a node passes connection gating (wallet in registry), it can access all topics within that database namespace. Topic-level access control is not implemented at the pub/sub layer.
- Economic Attacks: Smart contract registry prevents low-cost identity creation
- Malicious Hardware: Wallet-based identity tied to physical device ownership
- Network Partitioning: DHT and GossipSub provide resilience
- Resource Exhaustion: Connection limits protect against DoS
- Identity Spoofing: Cryptographic peer IDs prevent impersonation
- Infrastructure Attacks: Network topology monitoring for datacenter infrastructure
- Permissionless Operation: New nodes automatically authorized via smart contract
- Dynamic Authorization: Real-time registry updates without node restarts
- Decentralized Control: No single point of failure for node authorization
- Audit Trail: On-chain record of all authorization changes
- Authorization Point: Connection gater validates wallet during connection handshake
- One-Time Check: Once connected, nodes have access to all topics in database namespace
- No Per-Topic Gating: Topic subscription/publishing is unrestricted for connected nodes
- Network Isolation: Different database names create separate networks with separate authorization
// Connection flow
1. Node A attempts to connect to Node B
2. Connection gater checks Node A's wallet against registry
3. If authorized: Connection established, Node A can use any topics
4. If unauthorized: Connection rejected at transport level- Bandwidth Adaptation: Message routing based on connection quality
- Datacenter Distribution: Latency-based peer prioritization across datacenters
The system implements robust bootstrap retry mechanisms to ensure network connectivity in dynamic DePIN environments where nodes may start in any order and network conditions may be unstable.
type P2PInfrastructure struct {
// Readiness and bootstrap retry state
isReady bool // True when node has at least 1 peer connected
readinessMutex sync.RWMutex // Protects isReady
getBootstrapNodes common.GetBootstrapNodesFunc // Bootstrap function for retry attempts
retryCancel context.CancelFunc // Cancel function for bootstrap retry goroutine
retryMutex sync.Mutex // Protects retryCancel
// Shutdown handling
shutdownCtx context.Context // Context that gets cancelled during shutdown
shutdownCancel context.CancelFunc // Cancel function for shutdown
}- Initial State:
isReady = false(no peers connected) - Peer Connected:
isReady = true(at least 1 peer connected) - All Peers Lost:
isReady = false→ triggers bootstrap retry - Peer Reconnected:
isReady = true→ stops bootstrap retry
func bootstrapNetworkWithRetry(ctx context.Context, infra *P2PInfrastructure) error {
// Check bootstrap nodes availability first
bootstrapNodes, err := infra.getBootstrapNodes(ctx)
if err != nil {
return fmt.Errorf("failed to get bootstrap nodes: %w", err)
}
if len(bootstrapNodes) == 0 {
// No bootstrap nodes available, start retry process
infra.logger.Info("No bootstrap nodes available, starting retry process")
infra.startBootstrapRetry(infra.shutdownCtx)
return nil // Don't return error, we'll retry
}
// Bootstrap nodes are available, try normal bootstrap
err = bootstrapNetwork(ctx, infra.host, infra.dht, infra.getBootstrapNodes, infra.logger)
if err != nil {
// Bootstrap failed, start retry process to keep trying
infra.logger.Warn("Initial bootstrap failed, starting retry process", "error", err.Error())
infra.startBootstrapRetry(infra.shutdownCtx)
return nil // Don't return error, we'll retry
}
// Bootstrap succeeded
infra.logger.Info("Initial bootstrap successful")
return nil
}func (infra *P2PInfrastructure) startBootstrapRetry(parentCtx context.Context) {
// Check if shutdown is in progress
select {
case <-infra.shutdownCtx.Done():
infra.logger.Debug("Shutdown in progress, not starting bootstrap retry")
return
default:
}
// Check if retry is already running (with proper synchronization)
infra.retryMutex.Lock()
if infra.retryCancel != nil {
infra.retryMutex.Unlock()
infra.logger.Debug("Bootstrap retry already running, not starting another")
return
}
// Create cancelable context for the retry goroutine
retryCtx, cancel := context.WithCancel(parentCtx)
infra.retryCancel = cancel
infra.retryMutex.Unlock()
go func() {
defer func() {
cancel()
// Clear the cancel function when goroutine exits
infra.retryMutex.Lock()
infra.retryCancel = nil
infra.retryMutex.Unlock()
}()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
infra.logger.Info("Starting bootstrap retry loop (every 5 seconds)")
for {
select {
case <-retryCtx.Done():
infra.logger.Debug("Bootstrap retry loop cancelled")
return
case <-infra.shutdownCtx.Done():
infra.logger.Debug("Bootstrap retry loop cancelled due to shutdown")
return
case <-ticker.C:
// Check if node is already ready (has peers)
if infra.IsReady() {
infra.logger.Info("Node is ready, stopping bootstrap retry")
return
}
// Try to get bootstrap nodes
bootstrapNodes, err := infra.getBootstrapNodes(retryCtx)
if err != nil {
infra.logger.Warn("Failed to get bootstrap nodes during retry", "error", err.Error())
continue
}
if len(bootstrapNodes) == 0 {
infra.logger.Debug("Still no bootstrap nodes available, will retry")
continue
}
// Found bootstrap nodes, try to connect
infra.logger.Info("Found bootstrap nodes during retry", "count", len(bootstrapNodes))
if err := bootstrapNetwork(retryCtx, infra.host, infra.dht, infra.getBootstrapNodes, infra.logger); err != nil {
infra.logger.Warn("Bootstrap retry failed", "error", err.Error())
continue
}
// Bootstrap succeeded
infra.logger.Info("Bootstrap retry successful")
time.Sleep(1 * time.Second) // Give DHT a moment to settle
return
}
}
}()
}Time 0s: Client starts, tries to connect to bootstrap → FAILS
Time 0s: Client starts bootstrap retry loop
Time 5s: Client retries bootstrap → FAILS (bootstrap not running)
Time 10s: Bootstrap node starts
Time 15s: Client retries bootstrap → SUCCESS
Time 15s: Client becomes ready, stops retry loop
Time 0s: Node0 (bootstrap) + Node1 start → SUCCESS
Time 30s: Node0 goes down → Node1 loses all peers
Time 30s: Node1 becomes not ready, starts bootstrap retry
Time 35s: Node2 starts as new bootstrap
Time 40s: Node1 retries, connects to Node2 → SUCCESS
Time 40s: Node1 becomes ready, stops retry loop
Time 0s: All nodes connected and communicating
Time 30s: Network partition occurs → all nodes lose peers
Time 30s: All nodes become not ready, start bootstrap retry
Time 35s: Network partition resolves
Time 40s: Nodes retry bootstrap → SUCCESS
Time 40s: All nodes become ready, stop retry loops
- Retry Goroutine: Started when node becomes not ready
- Automatic Cleanup: Goroutine stops when node becomes ready
- Shutdown Safety: All retry operations cancelled during shutdown
- Race Condition Protection: Mutex-protected retry state management
// Shutdown context for proper cleanup
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
// Retry context combines parent and shutdown contexts
retryCtx, cancel := context.WithCancel(parentCtx)
// Cleanup on disconnect
func (db *DB) Disconnect(ctx context.Context) error {
// Cancel shutdown context first to stop all retry operations immediately
if db.infrastructure.shutdownCancel != nil {
db.infrastructure.shutdownCancel()
}
// Stop bootstrap retry
db.infrastructure.stopBootstrapRetry()
// ... rest of cleanup
}- Initial Retry Delay: 2 seconds (avoid immediate reconnection attempts)
- Retry Interval: 5 seconds (balance between responsiveness and resource usage)
- Connection Timeout: 30 seconds per bootstrap attempt
- DHT Settlement Time: 1 second after successful bootstrap
- Memory: Minimal (single goroutine per node)
- CPU: Low (5-second intervals)
- Network: Only during retry attempts
- Concurrent Retries: Prevented (only one retry goroutine per node)
// Registry can return different bootstrap nodes over time
func (node *DePINNode) GetBootstrapNodes(ctx context.Context) ([]common.BootstrapNode, error) {
// Query smart contract for current bootstrap nodes
// This can change as nodes join/leave the network
return node.registry.GetBootstrapNodes(ctx)
}// Multiple bootstrap nodes for redundancy
bootstrapNodes := []common.BootstrapNode{
{PublicKey: node0Key, IP: "primary.example.com", QUICPort: 4001, TCPPort: 4002},
{PublicKey: node1Key, IP: "backup.example.com", QUICPort: 4001, TCPPort: 4002},
{PublicKey: node2Key, IP: "secondary.example.com", QUICPort: 4001, TCPPort: 4002},
}- Node Readiness:
isReadyboolean state - Peer Count: Number of connected peers
- Bootstrap Retries: Count of retry attempts
- Retry Duration: Time from not ready to ready
- Bootstrap Success Rate: Successful vs failed attempts
// Readiness state changes
infra.logger.Info("Node became ready", "peer_count", peerCount)
infra.logger.Info("Node no longer ready", "peer_count", peerCount)
// Retry operations
infra.logger.Info("Starting bootstrap retry loop (every 5 seconds)")
infra.logger.Info("Found bootstrap nodes during retry", "count", len(bootstrapNodes))
infra.logger.Info("Bootstrap retry successful")
infra.logger.Info("Node is ready, stopping bootstrap retry")// go.mod - Go 1.23.11
module github.com/dTelecom/p2p-database
go 1.23.11
require (
github.com/libp2p/go-libp2p v0.41.1 // Latest libp2p networking
github.com/libp2p/go-libp2p-pubsub v0.12.0 // Latest GossipSub
github.com/libp2p/go-libp2p-kad-dht v0.27.0 // Latest Kademlia DHT
github.com/libp2p/go-libp2p-quic-transport v0.10.0 // QUIC transport (primary)
github.com/libp2p/go-libp2p-circuit v0.20.0 // Circuit relay transport
github.com/gagliardetto/solana-go v1.12.0 // Solana integration
github.com/multiformats/go-multiaddr v0.12.0 // Multiaddress support
github.com/google/uuid v1.5.0 // Event ID generation
)- Startup Performance: Fast initialization optimized for DePIN deployment
- Network Efficiency: Efficient pub/sub routing with multi-transport optimization
The pubsub library uses a simple logging interface to remain logging-library agnostic:
package common
import "log/slog"
// Logger interface for structured logging
type Logger interface {
Debug(msg string, keysAndValues ...interface{})
Info(msg string, keysAndValues ...interface{})
Warn(msg string, keysAndValues ...interface{})
Error(msg string, keysAndValues ...interface{})
}
// NewSlogLogger creates a logger using Go's standard slog (recommended for Go 1.23.11+)
func NewSlogLogger(logger *slog.Logger) Logger {
return &slogLogger{logger: logger}
}
type slogLogger struct {
logger *slog.Logger
}
func (l *slogLogger) Debug(msg string, keysAndValues ...interface{}) {
l.logger.Debug(msg, keysAndValues...)
}
func (l *slogLogger) Info(msg string, keysAndValues ...interface{}) {
l.logger.Info(msg, keysAndValues...)
}
func (l *slogLogger) Warn(msg string, keysAndValues ...interface{}) {
l.logger.Warn(msg, keysAndValues...)
}
func (l *slogLogger) Error(msg string, keysAndValues ...interface{}) {
l.logger.Error(msg, keysAndValues...)
}import (
"log/slog"
"os"
)
// Create structured JSON logger
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
AddSource: true, // Add source file information
}))
// Create pubsub logger wrapper
pubsubLogger := common.NewSlogLogger(logger)
// Connect with logger in config
config := Config{
WalletPrivateKey: "your-wallet-key",
DatabaseName: "depin-network",
GetAuthorizedWallets: node.GetAuthorizedWallets,
GetBootstrapNodes: node.GetBootstrapNodes,
Logger: pubsubLogger,
ListenPorts: ListenPorts{
QUIC: 4001,
TCP: 4002,
},
}
db, err := Connect(context.Background(), config)The interface can be implemented with other popular logging libraries:
// Using zap
import "go.uber.org/zap"
type ZapLogger struct {
logger *zap.SugaredLogger
}
func (l *ZapLogger) Info(msg string, keysAndValues ...interface{}) {
l.logger.Infow(msg, keysAndValues...)
}
// Using logrus
import "github.com/sirupsen/logrus"
type LogrusLogger struct {
logger *logrus.Logger
}
func (l *LogrusLogger) Info(msg string, keysAndValues ...interface{}) {
l.logger.WithFields(logrus.Fields{
// Convert keysAndValues to fields
}).Info(msg)
}For projects using LiveKit loggers with methods like Debugw, Infow, Warnw, Errorw, use the built-in adapter:
import "github.com/livekit/protocol/logger"
// Assuming you have a LiveKit logger instance
livekitLogger := logger.GetLogger() // or your LiveKit logger instance
// Create adapter
pubsubLogger := common.NewLivekitLoggerAdapter(livekitLogger)
// Use in config
config := Config{
WalletPrivateKey: "your-wallet-key",
DatabaseName: "depin-network",
GetAuthorizedWallets: node.GetAuthorizedWallets,
GetBootstrapNodes: node.GetBootstrapNodes,
Logger: pubsubLogger,
ListenPorts: ListenPorts{
QUIC: 4001,
TCP: 4002,
},
}The LiveKitLogger interface required by NewLivekitLoggerAdapter is:
type LiveKitLogger interface {
Debugw(msg string, keysAndValues ...interface{})
Infow(msg string, keysAndValues ...interface{})
Warnw(msg string, err error, keysAndValues ...interface{})
Errorw(msg string, err error, keysAndValues ...interface{})
}Recommendation: Use Go's standard slog package for new projects as it provides excellent performance, structured logging, and is part of the standard library since Go 1.21.
Note: Smart contract resilience (RPC failover, caching, retry logic) is handled by the node software, not the pubsub library.
// Test with mock smart contract registry
func TestDePINNetworkWithRegistry(t *testing.T) {
// Deploy mock registry smart contract
mockRegistry := NewMockRegistryContract()
// Add test wallets to registry
testWallets := generateTestWallets(10)
for _, wallet := range testWallets {
mockRegistry.AddAuthorizedWallet(wallet)
}
// Start test nodes with registry integration
nodes := startTestNodes(testWallets, mockRegistry.RPCEndpoint())
// Test pub/sub functionality
testPubSubAcrossNodes(nodes)
// Test unauthorized wallet rejection
testUnauthorizedWalletBlocking(nodes, generateTestWallets(1))
}This updated P2P distributed pub/sub system is specifically optimized for DePIN (Decentralized Physical Infrastructure Network) applications. Key improvements include:
- Clean Separation of Concerns: Pubsub library focuses on messaging; node software handles registry and bootstrap management
- Function-Based Integration: Simple function interfaces for authorization and bootstrap discovery without RPC complexity
- Three-Transport Architecture: QUIC (primary), TCP (fallback), and Circuit Relay (NAT assistance) for universal connectivity
- Process-Level Resource Sharing: Efficient infrastructure sharing across multiple database instances
- Connection-Level Security: Registry-based authorization at connection time, not per-topic
- Latest Technology Stack: Go 1.23.11, libp2p v0.41.1, solana-go v1.12.0
- Permissionless Participation: Nodes automatically authorized via smart contract (handled by node software)
- Simple Integration: Pubsub library receives authorization via function call, no RPC complexity
- Universal Connectivity: Multi-transport strategy (QUIC + TCP + Circuit Relay) ensures nodes can connect regardless of NAT/firewall restrictions
- Economic Integration: Registry tied to DePIN tokenomics and staking mechanisms
- Infrastructure Resilience: Multi-region datacenter support with adaptive transport selection
- Scalability: Support for thousands of datacenter nodes with optimized connectivity
- Sybil Resistance: Smart contract registry prevents cheap identity creation
- Datacenter Distribution: Built-in monitoring and optimization across datacenter infrastructure
- Economic Security: Integration with DePIN reward/penalty mechanisms
- Infrastructure Protection: Resource limits and reputation-based peer management
This architecture provides a robust foundation for DePIN applications requiring secure, scalable, and economically-integrated peer-to-peer communication infrastructure. The clean separation of concerns ensures the pubsub library remains focused on messaging performance while the node software handles all smart contract complexity, enabling truly permissionless and decentralized operation.