From ef7d6c24f75928af738977e41e522e7cce1beb0d Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Sun, 29 Mar 2026 06:05:03 +0330 Subject: [PATCH 1/4] Update main.go --- cmd/centralserver/main.go | 100 ++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index a76f053..3f8a116 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -25,10 +25,16 @@ type sourceConn struct { writeMu sync.Mutex } +// sourceWriteTimeout prevents tunnel writes from blocking forever on congested TCP. +const sourceWriteTimeout = 10 * time.Second + func (sc *sourceConn) WriteFrame(f *tunnel.Frame) error { sc.writeMu.Lock() defer sc.writeMu.Unlock() - return tunnel.WriteFrame(sc.conn, f) + sc.conn.SetWriteDeadline(time.Now().Add(sourceWriteTimeout)) + err := tunnel.WriteFrame(sc.conn, f) + sc.conn.SetWriteDeadline(time.Time{}) + return err } // connState tracks a single reassembled connection. @@ -431,21 +437,29 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta state.mu.Lock() state.target = upConn - // Flush any data that arrived before upstream was ready + // Drain buffered data under lock, then write outside + var flushChunks [][]byte for { data := state.reorderer.Next() if data == nil { break } + flushChunks = append(flushChunks, data) + } + state.mu.Unlock() + + // Flush outside lock + for _, data := range flushChunks { + upConn.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) if _, err := upConn.Write(data); err != nil { - state.mu.Unlock() + upConn.SetWriteDeadline(time.Time{}) log.Printf("[central] conn=%d: flush failed: %v", connID, err) upConn.Close() cs.removeConn(connID) return } } - state.mu.Unlock() + upConn.SetWriteDeadline(time.Time{}) log.Printf("[central] conn=%d: upstream connected to %s", connID, targetAddr) @@ -502,8 +516,9 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 } // sendFrame 
picks ONE source via round-robin and writes the frame. -// If that source fails, tries the next one. Uses sourceConn.WriteFrame -// which is mutex-protected per TCP connection, preventing interleaved writes. +// CRITICAL: state.mu is only held briefly to pick the source and advance +// the index — the actual TCP write happens OUTSIDE the lock to prevent +// cascading lock contention that freezes frame dispatch for other ConnIDs. func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { cs.mu.RLock() state, ok := cs.conns[connID] @@ -512,25 +527,34 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { return } + // Snapshot sources under lock, then write outside lock state.mu.Lock() - defer state.mu.Unlock() - - if len(state.sources) == 0 { + n := len(state.sources) + if n == 0 { + state.mu.Unlock() return } + // Build ordered list starting from current sourceIdx + sources := make([]*sourceConn, n) + startIdx := state.sourceIdx % n + state.sourceIdx++ + for i := 0; i < n; i++ { + sources[i] = state.sources[(startIdx+i)%n] + } + state.mu.Unlock() - // Try each source once, starting from current index - for tries := 0; tries < len(state.sources); tries++ { - idx := state.sourceIdx % len(state.sources) - state.sourceIdx++ - sc := state.sources[idx] - + // Try each source — write happens outside state.mu + for _, sc := range sources { if err := sc.WriteFrame(frame); err != nil { - // Remove dead source - state.sources = append(state.sources[:idx], state.sources[idx+1:]...) - if state.sourceIdx > 0 { - state.sourceIdx-- + // Remove dead source under lock + state.mu.Lock() + for i, s := range state.sources { + if s == sc { + state.sources = append(state.sources[:i], state.sources[i+1:]...) 
+ break + } } + state.mu.Unlock() continue } return // success @@ -538,6 +562,9 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { log.Printf("[central] conn=%d: all sources failed", connID) } +// upstreamWriteTimeout prevents writes to Xray upstream from blocking forever. +const upstreamWriteTimeout = 10 * time.Second + func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { cs.mu.RLock() state, ok := cs.conns[frame.ConnID] @@ -546,11 +573,10 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { return } + // Insert frame and collect ready data under lock (fast) state.mu.Lock() - defer state.mu.Unlock() // If this source isn't known yet (e.g., after tunnel recycling), add it. - // This ensures responses can flow back through the new connection. found := false for _, s := range state.sources { if s == source { @@ -564,20 +590,34 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { state.lastActive = time.Now() state.reorderer.Insert(frame.SeqNum, frame.Payload) + if state.target == nil { + state.mu.Unlock() return // buffered, flushed when upstream connects } + // Drain all ready data into a local slice, then release the lock + var chunks [][]byte for { data := state.reorderer.Next() if data == nil { break } - if _, err := state.target.Write(data); err != nil { + chunks = append(chunks, data) + } + target := state.target + state.mu.Unlock() + + // Write to upstream OUTSIDE state.mu — prevents blocking frame dispatch + for _, data := range chunks { + target.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) + if _, err := target.Write(data); err != nil { + target.SetWriteDeadline(time.Time{}) log.Printf("[central] conn=%d: write to upstream failed: %v", frame.ConnID, err) return } } + target.SetWriteDeadline(time.Time{}) } func (cs *centralServer) handleFIN(frame *tunnel.Frame) { @@ -587,20 +627,30 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) { 
if !ok { return } + + // Drain remaining data and grab target under lock, then close outside state.mu.Lock() + var chunks [][]byte if state.target != nil { for { data := state.reorderer.Next() if data == nil { break } - state.target.Write(data) + chunks = append(chunks, data) } - state.target.Close() } + target := state.target state.mu.Unlock() - // Remove from map and cancel context so relayUpstreamToTunnel exits cleanly + // Write and close outside the lock + if target != nil { + for _, data := range chunks { + target.Write(data) + } + target.Close() + } + cs.removeConn(frame.ConnID) log.Printf("[central] conn=%d: FIN received, cleaned up", frame.ConnID) } From 630a6f181c2108c79663a351fa543857fcf03110 Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Sun, 29 Mar 2026 06:05:47 +0330 Subject: [PATCH 2/4] fix: resolve cascading lock contention that freezes new connections after minutes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sendFrame held state.mu during sourceConn.WriteFrame (no timeout), and handleData held state.mu during target.Write. When tunnel TCP writes got slow, all frame dispatch for that tunnel froze — SYN frames couldn't be processed, so new connections failed while existing ones continued. - Add 10s write timeout to sourceConn.WriteFrame - Refactor sendFrame to pick source under lock, write outside lock - Refactor handleData/handleFIN to drain reorderer under lock, write outside - Add 10s write timeout for upstream (Xray) writes --- cmd/centralserver/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index 3f8a116..eced89f 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -39,12 +39,12 @@ func (sc *sourceConn) WriteFrame(f *tunnel.Frame) error { // connState tracks a single reassembled connection. 
type connState struct { - mu sync.Mutex - target net.Conn // connection to the SOCKS upstream - reorderer *tunnel.Reorderer - txSeq uint32 // next sequence number for reverse data - cancel context.CancelFunc - created time.Time + mu sync.Mutex + target net.Conn // connection to the SOCKS upstream + reorderer *tunnel.Reorderer + txSeq uint32 // next sequence number for reverse data + cancel context.CancelFunc + created time.Time lastActive time.Time // last time data was sent or received // Sources: all tunnel connections that can carry reverse data. @@ -61,7 +61,7 @@ type centralServer struct { conns map[uint32]*connState // ConnID → state // sourceMu protects the sources map (net.Conn → *sourceConn). - sourceMu sync.Mutex + sourceMu sync.Mutex sourceMap map[net.Conn]*sourceConn } @@ -696,7 +696,7 @@ func (cs *centralServer) cleanupLoop() { for id, state := range cs.conns { state.mu.Lock() shouldClean := false - + //cc // No upstream established after 60 seconds = stuck if state.target == nil && now.Sub(state.created) > 60*time.Second { shouldClean = true From ad7dc808600e73c2241deb7dd965b8d10ccd9973 Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Sun, 29 Mar 2026 06:31:26 +0330 Subject: [PATCH 3/4] fix: async upstream writes to prevent frame dispatch loop stall MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handleData wrote to upstream (Xray) synchronously inside the sequential frame dispatch loop. A slow upstream write blocked ALL frame processing on that tunnel — including SYN frames for new connections, causing new connections to fail while existing ones continued. Each connState now has a writeCh + upstreamWriter goroutine. handleData inserts into the reorderer and sends chunks to writeCh non-blocking, then returns immediately so the frame loop can process the next frame. 
--- cmd/centralserver/main.go | 109 +++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 30 deletions(-) diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index eced89f..99a82cc 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -51,6 +51,11 @@ type connState struct { // We round-robin responses across them (not broadcast). sources []*sourceConn sourceIdx int + + // Async write queue: handleData sends chunks here instead of writing + // to target synchronously (which would block the frame dispatch loop). + // The upstreamWriter goroutine drains this channel and writes to target. + writeCh chan []byte } // centralServer manages all active connections. @@ -434,10 +439,14 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta io.ReadFull(upConn, make([]byte, 6)) } + // Create async write channel and set target under lock + writeCh := make(chan []byte, 256) + state.mu.Lock() state.target = upConn + state.writeCh = writeCh - // Drain buffered data under lock, then write outside + // Drain any data that arrived before upstream was ready var flushChunks [][]byte for { data := state.reorderer.Next() @@ -448,18 +457,16 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta } state.mu.Unlock() - // Flush outside lock + // Start async writer goroutine — all writes to upstream go through writeCh + go cs.upstreamWriter(ctx, connID, upConn, writeCh) + + // Send flush data through the channel (channel is empty, won't block) for _, data := range flushChunks { - upConn.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) - if _, err := upConn.Write(data); err != nil { - upConn.SetWriteDeadline(time.Time{}) - log.Printf("[central] conn=%d: flush failed: %v", connID, err) - upConn.Close() - cs.removeConn(connID) - return + select { + case writeCh <- data: + default: } } - upConn.SetWriteDeadline(time.Time{}) log.Printf("[central] conn=%d: upstream connected to %s", 
connID, targetAddr) @@ -515,6 +522,45 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 } } +// upstreamWriter is a dedicated goroutine that drains writeCh and writes +// to the upstream (Xray) connection. This decouples upstream write speed +// from frame dispatch speed — handleData never blocks the frame loop. +func (cs *centralServer) upstreamWriter(ctx context.Context, connID uint32, upstream net.Conn, writeCh chan []byte) { + for { + select { + case <-ctx.Done(): + // Context cancelled (removeConn or cleanup) — drain any remaining data best-effort + for { + select { + case data := <-writeCh: + upstream.SetWriteDeadline(time.Now().Add(2 * time.Second)) + upstream.Write(data) + default: + return + } + } + case data, ok := <-writeCh: + if !ok { + return + } + upstream.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) + if _, err := upstream.Write(data); err != nil { + log.Printf("[central] conn=%d: upstream write failed: %v", connID, err) + upstream.SetWriteDeadline(time.Time{}) + // Drain channel to unblock any senders, then exit + for { + select { + case <-writeCh: + default: + return + } + } + } + upstream.SetWriteDeadline(time.Time{}) + } + } +} + // sendFrame picks ONE source via round-robin and writes the frame. // CRITICAL: state.mu is only held briefly to pick the source and advance // the index — the actual TCP write happens OUTSIDE the lock to prevent @@ -573,7 +619,7 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { return } - // Insert frame and collect ready data under lock (fast) + // Insert frame and collect ready data under lock (fast, no I/O) state.mu.Lock() // If this source isn't known yet (e.g., after tunnel recycling), add it. 
@@ -591,12 +637,14 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { state.lastActive = time.Now() state.reorderer.Insert(frame.SeqNum, frame.Payload) - if state.target == nil { + // If upstream not connected yet, data stays in reorderer for later flush + writeCh := state.writeCh + if writeCh == nil { state.mu.Unlock() - return // buffered, flushed when upstream connects + return } - // Drain all ready data into a local slice, then release the lock + // Drain all ready data from reorderer var chunks [][]byte for { data := state.reorderer.Next() @@ -605,19 +653,16 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { } chunks = append(chunks, data) } - target := state.target state.mu.Unlock() - // Write to upstream OUTSIDE state.mu — prevents blocking frame dispatch + // Send to async writer (non-blocking — MUST NOT stall the frame dispatch loop) for _, data := range chunks { - target.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) - if _, err := target.Write(data); err != nil { - target.SetWriteDeadline(time.Time{}) - log.Printf("[central] conn=%d: write to upstream failed: %v", frame.ConnID, err) - return + select { + case writeCh <- data: + default: + log.Printf("[central] conn=%d: write queue full, dropping %d bytes", frame.ConnID, len(data)) } } - target.SetWriteDeadline(time.Time{}) } func (cs *centralServer) handleFIN(frame *tunnel.Frame) { @@ -628,10 +673,11 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) { return } - // Drain remaining data and grab target under lock, then close outside + // Drain remaining data and send to async writer (non-blocking) state.mu.Lock() var chunks [][]byte - if state.target != nil { + writeCh := state.writeCh + if writeCh != nil { for { data := state.reorderer.Next() if data == nil { @@ -640,17 +686,20 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) { chunks = append(chunks, data) } } - target := state.target + state.writeCh = nil // 
prevent further sends from handleData state.mu.Unlock() - // Write and close outside the lock - if target != nil { + // Send remaining data to writer (non-blocking, best-effort) + if writeCh != nil { for _, data := range chunks { - target.Write(data) + select { + case writeCh <- data: + default: + } } - target.Close() } + // removeConn cancels ctx → upstreamWriter exits → upstream closed by relayUpstreamToTunnel cs.removeConn(frame.ConnID) log.Printf("[central] conn=%d: FIN received, cleaned up", frame.ConnID) } @@ -696,7 +745,7 @@ func (cs *centralServer) cleanupLoop() { for id, state := range cs.conns { state.mu.Lock() shouldClean := false - //cc + // No upstream established after 60 seconds = stuck if state.target == nil && now.Sub(state.created) > 60*time.Second { shouldClean = true From 1ef48c73265642307c404f05f69b7903a04405cc Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Sun, 29 Mar 2026 07:05:01 +0330 Subject: [PATCH 4/4] fix: prevent health probe interference with TunnelPool persistent connections Root cause: health checker created SEPARATE TCP connections to instances for framing protocol probes, concurrent with TunnelPool's persistent connection. This likely disrupted the DNS tunnel, breaking the shared persistent connection that all packet_split traffic flows through. 
- Skip probeFramingProtocol when TunnelPool has active connection to instance - Add keepalive frames (10s interval) to detect dead tunnels and keep DNS sessions alive - Add max-age (3min) forced reconnect to prevent long-lived connection degradation - Reduce stale threshold from 20s to 15s for faster dead tunnel detection - Ignore ConnID 0 (keepalive) on both client and CentralServer --- cmd/centralserver/main.go | 4 ++ cmd/slipstreamplus/main.go | 3 + internal/health/checker.go | 27 +++++++- internal/tunnel/pool.go | 132 +++++++++++++++++++++++++------------ 4 files changed, 122 insertions(+), 44 deletions(-) diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index 99a82cc..4a8f7ab 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -311,6 +311,10 @@ func (cs *centralServer) parseHeader(hdr [tunnel.HeaderSize]byte, conn net.Conn, } func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source *sourceConn) { + // ConnID 0 is a keepalive from TunnelPool — ignore silently + if frame.ConnID == 0 { + return + } if frame.IsSYN() { cs.handleSYN(frame, source) return diff --git a/cmd/slipstreamplus/main.go b/cmd/slipstreamplus/main.go index 780fca9..1490131 100644 --- a/cmd/slipstreamplus/main.go +++ b/cmd/slipstreamplus/main.go @@ -99,6 +99,9 @@ func main() { if isPacketSplit { tunnelPool = tunnel.NewTunnelPool(mgr) tunnelPool.Start() + // Tell health checker to use TunnelPool for probes instead of + // creating separate TCP connections that interfere with the tunnel. + checker.SetTunnelPool(tunnelPool) log.Printf("Packet-split mode: central_server=%s, chunk_size=%d", cfg.CentralServer.Address, cfg.CentralServer.ChunkSize) } diff --git a/internal/health/checker.go b/internal/health/checker.go index 4afbbb2..ef8f834 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -25,12 +25,18 @@ import ( // Latency is only set from successful tunnel probes (real RTT). 
const maxConsecutiveFailures = 3 +// tunnelHealthChecker is implemented by TunnelPool to report active tunnels. +type tunnelHealthChecker interface { + HasActiveTunnel(instID int) bool +} + type Checker struct { manager *engine.Manager interval time.Duration timeout time.Duration target string // health_check.target (e.g. "google.com") packetSplit bool // true when strategy=packet_split + tunnelPool tunnelHealthChecker // set in packet_split mode ctx context.Context cancel context.CancelFunc @@ -75,6 +81,14 @@ func (c *Checker) SetPacketSplit(enabled bool) { c.packetSplit = enabled } +// SetTunnelPool provides the TunnelPool so the health checker can skip +// creating separate TCP connections when the TunnelPool already has an +// active connection to an instance. This prevents interference with the +// persistent tunnel connections used in packet_split mode. +func (c *Checker) SetTunnelPool(pool tunnelHealthChecker) { + c.tunnelPool = pool +} + func (c *Checker) Stop() { c.cancel() } @@ -176,13 +190,22 @@ func (c *Checker) checkOne(inst *engine.Instance) { } // Step 3: End-to-end probe. - // In packet_split mode: test if instance's upstream speaks our framing protocol. + // In packet_split mode: if TunnelPool has an active connection, skip the + // separate framing probe entirely — the TunnelPool's keepalive + stale + // detection provides continuous health monitoring. Opening a separate TCP + // connection can interfere with the DNS tunnel's persistent connection. + // Only fall back to probeFramingProtocol for initial validation (before + // TunnelPool has connected). // In normal mode: full SOCKS5 CONNECT + HTTP through the tunnel. 
if c.target != "" && inst.Config.Mode != "ssh" { var e2eRtt time.Duration var e2eErr error - if c.packetSplit { + if c.packetSplit && c.tunnelPool != nil && c.tunnelPool.HasActiveTunnel(inst.ID()) { + // TunnelPool is connected — tunnel is working, skip separate probe + e2eRtt = rtt + } else if c.packetSplit { + // TunnelPool not yet connected — need separate probe for initial health check e2eRtt, e2eErr = c.probeFramingProtocol(inst) } else { e2eRtt, e2eErr = c.probeEndToEnd(inst) diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index 4fbef24..ce44d11 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -20,15 +20,23 @@ const writeTimeout = 10 * time.Second // staleThreshold: if we've sent data but haven't received anything // in this long, the connection is considered half-dead and will be // force-closed by refreshConnections (which triggers reconnect). -const staleThreshold = 20 * time.Second +const staleThreshold = 15 * time.Second + +// maxTunnelAge: force-reconnect tunnels older than this, even if they +// appear healthy. Prevents long-lived connection degradation in DNS tunnels. +const maxTunnelAge = 3 * time.Minute + +// keepaliveInterval: how often to send keepalive frames to detect dead tunnels. +const keepaliveInterval = 10 * time.Second // TunnelConn wraps a persistent TCP connection to a single instance. type TunnelConn struct { - inst *engine.Instance - mu sync.Mutex - conn net.Conn - writeMu sync.Mutex - closed bool + inst *engine.Instance + mu sync.Mutex + conn net.Conn + writeMu sync.Mutex + closed bool + createdAt time.Time lastRead atomic.Int64 // unix millis of last successful read lastWrite atomic.Int64 // unix millis of last successful write @@ -36,13 +44,13 @@ type TunnelConn struct { // TunnelPool manages ONE persistent connection per healthy instance. 
type TunnelPool struct { - mgr *engine.Manager - mu sync.RWMutex - tunnels map[int]*TunnelConn - handlers sync.Map // ConnID (uint32) → chan *Frame - stopCh chan struct{} - wg sync.WaitGroup - ready chan struct{} // closed when at least one tunnel is connected + mgr *engine.Manager + mu sync.RWMutex + tunnels map[int]*TunnelConn + handlers sync.Map // ConnID (uint32) → chan *Frame + stopCh chan struct{} + wg sync.WaitGroup + ready chan struct{} // closed when at least one tunnel is connected readyOnce sync.Once } @@ -73,25 +81,38 @@ func (p *TunnelPool) HasTunnels() bool { return n > 0 } +// HasActiveTunnel returns true if the given instance has a non-closed tunnel. +func (p *TunnelPool) HasActiveTunnel(instID int) bool { + p.mu.RLock() + tc, ok := p.tunnels[instID] + p.mu.RUnlock() + return ok && !tc.closed +} + func (p *TunnelPool) Start() { p.refreshConnections() p.wg.Add(1) go func() { defer p.wg.Done() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + refreshTicker := time.NewTicker(5 * time.Second) + keepaliveTicker := time.NewTicker(keepaliveInterval) + defer refreshTicker.Stop() + defer keepaliveTicker.Stop() for { select { case <-p.stopCh: return - case <-ticker.C: + case <-refreshTicker.C: p.refreshConnections() + case <-keepaliveTicker.C: + p.sendKeepalives() } } }() - log.Printf("[tunnel-pool] started (stale_threshold=%s)", staleThreshold) + log.Printf("[tunnel-pool] started (stale_threshold=%s, max_age=%s, keepalive=%s)", + staleThreshold, maxTunnelAge, keepaliveInterval) } func (p *TunnelPool) Stop() { @@ -131,10 +152,37 @@ func (p *TunnelPool) SendFrame(instID int, f *Frame) error { return tc.writeFrame(f) } -// refreshConnections reconnects dead/stale tunnels and adds new ones. +// sendKeepalives writes a small keepalive frame through each tunnel. +// This detects dead tunnels faster than waiting for stale detection, +// and keeps DNS tunnel sessions alive. 
+func (p *TunnelPool) sendKeepalives() { + p.mu.RLock() + tunnels := make([]*TunnelConn, 0, len(p.tunnels)) + for _, tc := range p.tunnels { + tunnels = append(tunnels, tc) + } + p.mu.RUnlock() + + keepalive := &Frame{ + ConnID: 0, // reserved — CentralServer ignores ConnID 0 + SeqNum: 0, + Flags: FlagData, + Payload: nil, + } + + for _, tc := range tunnels { + if err := tc.writeFrame(keepalive); err != nil { + log.Printf("[tunnel-pool] instance %d: keepalive failed: %v", tc.inst.ID(), err) + tc.close() + } + } +} + +// refreshConnections reconnects dead/stale/old tunnels and adds new ones. func (p *TunnelPool) refreshConnections() { healthy := p.mgr.HealthyInstances() - nowMs := time.Now().UnixMilli() + now := time.Now() + nowMs := now.UnixMilli() p.mu.Lock() defer p.mu.Unlock() @@ -146,24 +194,30 @@ func (p *TunnelPool) refreshConnections() { for id, tc := range p.tunnels { shouldRemove := false + reason := "" - if !activeIDs[id] || tc.closed { + if !activeIDs[id] { + shouldRemove = true + reason = "instance unhealthy" + } else if tc.closed { shouldRemove = true + reason = "connection closed" + } else if now.Sub(tc.createdAt) > maxTunnelAge { + // Force-reconnect old connections to prevent DNS tunnel degradation + shouldRemove = true + reason = fmt.Sprintf("max age exceeded (%s)", now.Sub(tc.createdAt).Round(time.Second)) } else { - // Detect half-dead connections: - // If we wrote recently but haven't read in staleThreshold, - // the QUIC tunnel is likely dead but local TCP is still open. - // Force-close it so readLoop exits and we can reconnect. 
+ // Detect half-dead connections lastW := tc.lastWrite.Load() lastR := tc.lastRead.Load() if lastW > 0 && (nowMs-lastR) > staleThreshold.Milliseconds() { - log.Printf("[tunnel-pool] instance %d: stale (last_read=%dms ago, last_write=%dms ago), force-closing", - id, nowMs-lastR, nowMs-lastW) shouldRemove = true + reason = fmt.Sprintf("stale (last_read=%dms ago, last_write=%dms ago)", nowMs-lastR, nowMs-lastW) } } if shouldRemove { + log.Printf("[tunnel-pool] instance %d: removing (%s)", id, reason) tc.close() delete(p.tunnels, id) } @@ -206,12 +260,13 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) tc.SetNoDelay(true) } - now := time.Now().UnixMilli() + now := time.Now() tunnel := &TunnelConn{ - inst: inst, - conn: conn, + inst: inst, + conn: conn, + createdAt: now, } - tunnel.lastRead.Store(now) + tunnel.lastRead.Store(now.UnixMilli()) tunnel.lastWrite.Store(0) p.wg.Add(1) @@ -236,9 +291,6 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { default: } - // NO read deadline here! ReadFrame blocks until a complete frame - // arrives or the connection closes. read deadline would cause - // partial header reads → stream corruption. 
frame, err := ReadFrame(tc.conn) if err != nil { if err != io.EOF && !isClosedErr(err) { @@ -250,10 +302,9 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { tc.lastRead.Store(time.Now().UnixMilli()) - // Diagnostic: log reverse frames (data coming back from instances) - if frame.IsReverse() || frame.IsFIN() || frame.IsRST() { - log.Printf("[tunnel-pool] instance %d: recv frame conn=%d seq=%d flags=0x%02x len=%d", - tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload)) + // Skip keepalive responses (ConnID 0) + if frame.ConnID == 0 { + continue } if v, ok := p.handlers.Load(frame.ConnID); ok { @@ -261,11 +312,8 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { select { case ch <- frame: default: - // Drop immediately — MUST NOT block readLoop because it serves - // ALL ConnIDs on this tunnel. Blocking here stalls every other - // connection sharing this tunnel. - log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d seq=%d (buffer full)", - tc.inst.ID(), frame.ConnID, frame.SeqNum) + log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d (buffer full)", + tc.inst.ID(), frame.ConnID) } } }