diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index a76f053..4a8f7ab 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -25,26 +25,37 @@ type sourceConn struct { writeMu sync.Mutex } +// writeTimeout prevents tunnel writes from blocking forever on congested TCP. +const sourceWriteTimeout = 10 * time.Second + func (sc *sourceConn) WriteFrame(f *tunnel.Frame) error { sc.writeMu.Lock() defer sc.writeMu.Unlock() - return tunnel.WriteFrame(sc.conn, f) + sc.conn.SetWriteDeadline(time.Now().Add(sourceWriteTimeout)) + err := tunnel.WriteFrame(sc.conn, f) + sc.conn.SetWriteDeadline(time.Time{}) + return err } // connState tracks a single reassembled connection. type connState struct { - mu sync.Mutex - target net.Conn // connection to the SOCKS upstream - reorderer *tunnel.Reorderer - txSeq uint32 // next sequence number for reverse data - cancel context.CancelFunc - created time.Time + mu sync.Mutex + target net.Conn // connection to the SOCKS upstream + reorderer *tunnel.Reorderer + txSeq uint32 // next sequence number for reverse data + cancel context.CancelFunc + created time.Time lastActive time.Time // last time data was sent or received // Sources: all tunnel connections that can carry reverse data. // We round-robin responses across them (not broadcast). sources []*sourceConn sourceIdx int + + // Async write queue: handleData sends chunks here instead of writing + // to target synchronously (which would block the frame dispatch loop). + // The upstreamWriter goroutine drains this channel and writes to target. + writeCh chan []byte } // centralServer manages all active connections. @@ -55,7 +66,7 @@ type centralServer struct { conns map[uint32]*connState // ConnID → state // sourceMu protects the sources map (net.Conn → *sourceConn). - sourceMu sync.Mutex + sourceMu sync.Mutex sourceMap map[net.Conn]*sourceConn } @@ -300,6 +311,10 @@ func (cs *centralServer) parseHeader(hdr [tunnel.HeaderSize]byte, conn net.Conn, } func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source *sourceConn) { + // ConnID 0 is a keepalive from TunnelPool — ignore silently + if frame.ConnID == 0 { + return + } if frame.IsSYN() { cs.handleSYN(frame, source) return @@ -428,25 +443,35 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta io.ReadFull(upConn, make([]byte, 6)) } + // Create async write channel and set target under lock + writeCh := make(chan []byte, 256) + state.mu.Lock() state.target = upConn + state.writeCh = writeCh - // Flush any data that arrived before upstream was ready + // Drain any data that arrived before upstream was ready + var flushChunks [][]byte for { data := state.reorderer.Next() if data == nil { break } - if _, err := upConn.Write(data); err != nil { - state.mu.Unlock() - log.Printf("[central] conn=%d: flush failed: %v", connID, err) - upConn.Close() - cs.removeConn(connID) - return - } + flushChunks = append(flushChunks, data) } state.mu.Unlock() + // Start async writer goroutine — all writes to upstream go through writeCh + go cs.upstreamWriter(ctx, connID, upConn, writeCh) + + // Send flush data through the channel (channel is empty, won't block) + for _, data := range flushChunks { + select { + case writeCh <- data: + default: + } + } + log.Printf("[central] conn=%d: upstream connected to %s", connID, targetAddr) // Read upstream data and send back through tunnel (NO broadcast — round-robin) @@ -501,9 +526,49 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 } } +// upstreamWriter is a dedicated goroutine that drains writeCh and writes +// to the upstream (Xray) connection. This decouples upstream write speed +// from frame dispatch speed — handleData never blocks the frame loop. +func (cs *centralServer) upstreamWriter(ctx context.Context, connID uint32, upstream net.Conn, writeCh chan []byte) { + for { + select { + case <-ctx.Done(): + // Context cancelled (removeConn or cleanup) — drain any remaining data best-effort + for { + select { + case data := <-writeCh: + upstream.SetWriteDeadline(time.Now().Add(2 * time.Second)) + upstream.Write(data) + default: + return + } + } + case data, ok := <-writeCh: + if !ok { + return + } + upstream.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout)) + if _, err := upstream.Write(data); err != nil { + log.Printf("[central] conn=%d: upstream write failed: %v", connID, err) + upstream.SetWriteDeadline(time.Time{}) + // Drain channel to unblock any senders, then exit + for { + select { + case <-writeCh: + default: + return + } + } + } + upstream.SetWriteDeadline(time.Time{}) + } + } +} + // sendFrame picks ONE source via round-robin and writes the frame. -// If that source fails, tries the next one. Uses sourceConn.WriteFrame -// which is mutex-protected per TCP connection, preventing interleaved writes. +// CRITICAL: state.mu is only held briefly to pick the source and advance +// the index — the actual TCP write happens OUTSIDE the lock to prevent +// cascading lock contention that freezes frame dispatch for other ConnIDs. func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { cs.mu.RLock() state, ok := cs.conns[connID] @@ -512,25 +577,34 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { return } + // Snapshot sources under lock, then write outside lock state.mu.Lock() - defer state.mu.Unlock() - - if len(state.sources) == 0 { + n := len(state.sources) + if n == 0 { + state.mu.Unlock() return } + // Build ordered list starting from current sourceIdx + sources := make([]*sourceConn, n) + startIdx := state.sourceIdx % n + state.sourceIdx++ + for i := 0; i < n; i++ { + sources[i] = state.sources[(startIdx+i)%n] + } + state.mu.Unlock() - // Try each source once, starting from current index - for tries := 0; tries < len(state.sources); tries++ { - idx := state.sourceIdx % len(state.sources) - state.sourceIdx++ - sc := state.sources[idx] - + // Try each source — write happens outside state.mu + for _, sc := range sources { if err := sc.WriteFrame(frame); err != nil { - // Remove dead source - state.sources = append(state.sources[:idx], state.sources[idx+1:]...) - if state.sourceIdx > 0 { - state.sourceIdx-- + // Remove dead source under lock + state.mu.Lock() + for i, s := range state.sources { + if s == sc { + state.sources = append(state.sources[:i], state.sources[i+1:]...) + break + } } + state.mu.Unlock() continue } return // success @@ -538,6 +612,9 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { log.Printf("[central] conn=%d: all sources failed", connID) } +// upstreamWriteTimeout prevents writes to Xray upstream from blocking forever. +const upstreamWriteTimeout = 10 * time.Second + func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { cs.mu.RLock() state, ok := cs.conns[frame.ConnID] @@ -546,11 +623,10 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { return } + // Insert frame and collect ready data under lock (fast, no I/O) state.mu.Lock() - defer state.mu.Unlock() // If this source isn't known yet (e.g., after tunnel recycling), add it. - // This ensures responses can flow back through the new connection. found := false for _, s := range state.sources { if s == source { @@ -564,18 +640,31 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) { state.lastActive = time.Now() state.reorderer.Insert(frame.SeqNum, frame.Payload) - if state.target == nil { - return // buffered, flushed when upstream connects + + // If upstream not connected yet, data stays in reorderer for later flush + writeCh := state.writeCh + if writeCh == nil { + state.mu.Unlock() + return } + // Drain all ready data from reorderer + var chunks [][]byte for { data := state.reorderer.Next() if data == nil { break } - if _, err := state.target.Write(data); err != nil { - log.Printf("[central] conn=%d: write to upstream failed: %v", frame.ConnID, err) - return + chunks = append(chunks, data) + } + state.mu.Unlock() + + // Send to async writer (non-blocking — MUST NOT stall the frame dispatch loop) + for _, data := range chunks { + select { + case writeCh <- data: + default: + log.Printf("[central] conn=%d: write queue full, dropping %d bytes", frame.ConnID, len(data)) } } } @@ -587,20 +676,34 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) { if !ok { return } + + // Drain remaining data and send to async writer (non-blocking) state.mu.Lock() - if state.target != nil { + var chunks [][]byte + writeCh := state.writeCh + if writeCh != nil { for { data := state.reorderer.Next() if data == nil { break } - state.target.Write(data) + chunks = append(chunks, data) } - state.target.Close() } + state.writeCh = nil // prevent further sends from handleData state.mu.Unlock() - // Remove from map and cancel context so relayUpstreamToTunnel exits cleanly + // Send remaining data to writer (non-blocking, best-effort) + if writeCh != nil { + for _, data := range chunks { + select { + case writeCh <- data: + default: + } + } + } + + // removeConn cancels ctx → upstreamWriter exits → upstream closed by relayUpstreamToTunnel cs.removeConn(frame.ConnID) log.Printf("[central] conn=%d: FIN received, cleaned up", frame.ConnID) } diff --git a/cmd/slipstreamplus/main.go b/cmd/slipstreamplus/main.go index 780fca9..1490131 100644 --- a/cmd/slipstreamplus/main.go +++ b/cmd/slipstreamplus/main.go @@ -99,6 +99,9 @@ func main() { if isPacketSplit { tunnelPool = tunnel.NewTunnelPool(mgr) tunnelPool.Start() + // Tell health checker to use TunnelPool for probes instead of + // creating separate TCP connections that interfere with the tunnel. + checker.SetTunnelPool(tunnelPool) log.Printf("Packet-split mode: central_server=%s, chunk_size=%d", cfg.CentralServer.Address, cfg.CentralServer.ChunkSize) } diff --git a/internal/health/checker.go b/internal/health/checker.go index 4afbbb2..ef8f834 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -25,12 +25,18 @@ import ( // Latency is only set from successful tunnel probes (real RTT). const maxConsecutiveFailures = 3 +// tunnelHealthChecker is implemented by TunnelPool to report active tunnels. +type tunnelHealthChecker interface { + HasActiveTunnel(instID int) bool +} + type Checker struct { manager *engine.Manager interval time.Duration timeout time.Duration target string // health_check.target (e.g. "google.com") packetSplit bool // true when strategy=packet_split + tunnelPool tunnelHealthChecker // set in packet_split mode ctx context.Context cancel context.CancelFunc @@ -75,6 +81,14 @@ func (c *Checker) SetPacketSplit(enabled bool) { c.packetSplit = enabled } +// SetTunnelPool provides the TunnelPool so the health checker can skip +// creating separate TCP connections when the TunnelPool already has an +// active connection to an instance. This prevents interference with the +// persistent tunnel connections used in packet_split mode. +func (c *Checker) SetTunnelPool(pool tunnelHealthChecker) { + c.tunnelPool = pool +} + func (c *Checker) Stop() { c.cancel() } @@ -176,13 +190,22 @@ func (c *Checker) checkOne(inst *engine.Instance) { } // Step 3: End-to-end probe. - // In packet_split mode: test if instance's upstream speaks our framing protocol. + // In packet_split mode: if TunnelPool has an active connection, skip the + // separate framing probe entirely — the TunnelPool's keepalive + stale + // detection provides continuous health monitoring. Opening a separate TCP + // connection can interfere with the DNS tunnel's persistent connection. + // Only fall back to probeFramingProtocol for initial validation (before + // TunnelPool has connected). // In normal mode: full SOCKS5 CONNECT + HTTP through the tunnel. if c.target != "" && inst.Config.Mode != "ssh" { var e2eRtt time.Duration var e2eErr error - if c.packetSplit { + if c.packetSplit && c.tunnelPool != nil && c.tunnelPool.HasActiveTunnel(inst.ID()) { + // TunnelPool is connected — tunnel is working, skip separate probe + e2eRtt = rtt + } else if c.packetSplit { + // TunnelPool not yet connected — need separate probe for initial health check e2eRtt, e2eErr = c.probeFramingProtocol(inst) } else { e2eRtt, e2eErr = c.probeEndToEnd(inst) diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index 4fbef24..ce44d11 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -20,15 +20,23 @@ const writeTimeout = 10 * time.Second // staleThreshold: if we've sent data but haven't received anything // in this long, the connection is considered half-dead and will be // force-closed by refreshConnections (which triggers reconnect). -const staleThreshold = 20 * time.Second +const staleThreshold = 15 * time.Second + +// maxTunnelAge: force-reconnect tunnels older than this, even if they +// appear healthy. Prevents long-lived connection degradation in DNS tunnels. +const maxTunnelAge = 3 * time.Minute + +// keepaliveInterval: how often to send keepalive frames to detect dead tunnels. +const keepaliveInterval = 10 * time.Second // TunnelConn wraps a persistent TCP connection to a single instance. type TunnelConn struct { - inst *engine.Instance - mu sync.Mutex - conn net.Conn - writeMu sync.Mutex - closed bool + inst *engine.Instance + mu sync.Mutex + conn net.Conn + writeMu sync.Mutex + closed bool + createdAt time.Time lastRead atomic.Int64 // unix millis of last successful read lastWrite atomic.Int64 // unix millis of last successful write @@ -36,13 +44,13 @@ type TunnelConn struct { // TunnelPool manages ONE persistent connection per healthy instance. type TunnelPool struct { - mgr *engine.Manager - mu sync.RWMutex - tunnels map[int]*TunnelConn - handlers sync.Map // ConnID (uint32) → chan *Frame - stopCh chan struct{} - wg sync.WaitGroup - ready chan struct{} // closed when at least one tunnel is connected + mgr *engine.Manager + mu sync.RWMutex + tunnels map[int]*TunnelConn + handlers sync.Map // ConnID (uint32) → chan *Frame + stopCh chan struct{} + wg sync.WaitGroup + ready chan struct{} // closed when at least one tunnel is connected readyOnce sync.Once } @@ -73,25 +81,38 @@ func (p *TunnelPool) HasTunnels() bool { return n > 0 } +// HasActiveTunnel returns true if the given instance has a non-closed tunnel. +func (p *TunnelPool) HasActiveTunnel(instID int) bool { + p.mu.RLock() + tc, ok := p.tunnels[instID] + p.mu.RUnlock() + return ok && !tc.closed +} + func (p *TunnelPool) Start() { p.refreshConnections() p.wg.Add(1) go func() { defer p.wg.Done() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + refreshTicker := time.NewTicker(5 * time.Second) + keepaliveTicker := time.NewTicker(keepaliveInterval) + defer refreshTicker.Stop() + defer keepaliveTicker.Stop() for { select { case <-p.stopCh: return - case <-ticker.C: + case <-refreshTicker.C: p.refreshConnections() + case <-keepaliveTicker.C: + p.sendKeepalives() } } }() - log.Printf("[tunnel-pool] started (stale_threshold=%s)", staleThreshold) + log.Printf("[tunnel-pool] started (stale_threshold=%s, max_age=%s, keepalive=%s)", + staleThreshold, maxTunnelAge, keepaliveInterval) } func (p *TunnelPool) Stop() { @@ -131,10 +152,37 @@ func (p *TunnelPool) SendFrame(instID int, f *Frame) error { return tc.writeFrame(f) } -// refreshConnections reconnects dead/stale tunnels and adds new ones. +// sendKeepalives writes a small keepalive frame through each tunnel. +// This detects dead tunnels faster than waiting for stale detection, +// and keeps DNS tunnel sessions alive. +func (p *TunnelPool) sendKeepalives() { + p.mu.RLock() + tunnels := make([]*TunnelConn, 0, len(p.tunnels)) + for _, tc := range p.tunnels { + tunnels = append(tunnels, tc) + } + p.mu.RUnlock() + + keepalive := &Frame{ + ConnID: 0, // reserved — CentralServer ignores ConnID 0 + SeqNum: 0, + Flags: FlagData, + Payload: nil, + } + + for _, tc := range tunnels { + if err := tc.writeFrame(keepalive); err != nil { + log.Printf("[tunnel-pool] instance %d: keepalive failed: %v", tc.inst.ID(), err) + tc.close() + } + } +} + +// refreshConnections reconnects dead/stale/old tunnels and adds new ones. func (p *TunnelPool) refreshConnections() { healthy := p.mgr.HealthyInstances() - nowMs := time.Now().UnixMilli() + now := time.Now() + nowMs := now.UnixMilli() p.mu.Lock() defer p.mu.Unlock() @@ -146,24 +194,30 @@ func (p *TunnelPool) refreshConnections() { for id, tc := range p.tunnels { shouldRemove := false + reason := "" - if !activeIDs[id] || tc.closed { + if !activeIDs[id] { + shouldRemove = true + reason = "instance unhealthy" + } else if tc.closed { shouldRemove = true + reason = "connection closed" + } else if now.Sub(tc.createdAt) > maxTunnelAge { + // Force-reconnect old connections to prevent DNS tunnel degradation + shouldRemove = true + reason = fmt.Sprintf("max age exceeded (%s)", now.Sub(tc.createdAt).Round(time.Second)) } else { - // Detect half-dead connections: - // If we wrote recently but haven't read in staleThreshold, - // the QUIC tunnel is likely dead but local TCP is still open. - // Force-close it so readLoop exits and we can reconnect. + // Detect half-dead connections lastW := tc.lastWrite.Load() lastR := tc.lastRead.Load() if lastW > 0 && (nowMs-lastR) > staleThreshold.Milliseconds() { - log.Printf("[tunnel-pool] instance %d: stale (last_read=%dms ago, last_write=%dms ago), force-closing", - id, nowMs-lastR, nowMs-lastW) shouldRemove = true + reason = fmt.Sprintf("stale (last_read=%dms ago, last_write=%dms ago)", nowMs-lastR, nowMs-lastW) } } if shouldRemove { + log.Printf("[tunnel-pool] instance %d: removing (%s)", id, reason) tc.close() delete(p.tunnels, id) } @@ -206,12 +260,13 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) tc.SetNoDelay(true) } - now := time.Now().UnixMilli() + now := time.Now() tunnel := &TunnelConn{ - inst: inst, - conn: conn, + inst: inst, + conn: conn, + createdAt: now, } - tunnel.lastRead.Store(now) + tunnel.lastRead.Store(now.UnixMilli()) tunnel.lastWrite.Store(0) p.wg.Add(1) @@ -236,9 +291,6 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { default: } - // NO read deadline here! ReadFrame blocks until a complete frame - // arrives or the connection closes. read deadline would cause - // partial header reads → stream corruption. frame, err := ReadFrame(tc.conn) if err != nil { if err != io.EOF && !isClosedErr(err) { @@ -250,10 +302,9 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { tc.lastRead.Store(time.Now().UnixMilli()) - // Diagnostic: log reverse frames (data coming back from instances) - if frame.IsReverse() || frame.IsFIN() || frame.IsRST() { - log.Printf("[tunnel-pool] instance %d: recv frame conn=%d seq=%d flags=0x%02x len=%d", - tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload)) + // Skip keepalive responses (ConnID 0) + if frame.ConnID == 0 { + continue } if v, ok := p.handlers.Load(frame.ConnID); ok { @@ -261,11 +312,8 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { select { case ch <- frame: default: - // Drop immediately — MUST NOT block readLoop because it serves - // ALL ConnIDs on this tunnel. Blocking here stalls every other - // connection sharing this tunnel. - log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d seq=%d (buffer full)", - tc.inst.ID(), frame.ConnID, frame.SeqNum) + log.Printf("[tunnel-pool] instance %d: dropping frame conn=%d (buffer full)", + tc.inst.ID(), frame.ConnID) } } }