Skip to content
Merged

Dev #22

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 145 additions & 42 deletions cmd/centralserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,37 @@ type sourceConn struct {
writeMu sync.Mutex
}

// writeTimeout prevents tunnel writes from blocking forever on congested TCP.
const sourceWriteTimeout = 10 * time.Second

func (sc *sourceConn) WriteFrame(f *tunnel.Frame) error {
sc.writeMu.Lock()
defer sc.writeMu.Unlock()
return tunnel.WriteFrame(sc.conn, f)
sc.conn.SetWriteDeadline(time.Now().Add(sourceWriteTimeout))
err := tunnel.WriteFrame(sc.conn, f)
sc.conn.SetWriteDeadline(time.Time{})
return err
}

// connState tracks a single reassembled connection.
type connState struct {
mu sync.Mutex
target net.Conn // connection to the SOCKS upstream
reorderer *tunnel.Reorderer
txSeq uint32 // next sequence number for reverse data
cancel context.CancelFunc
created time.Time
mu sync.Mutex
target net.Conn // connection to the SOCKS upstream
reorderer *tunnel.Reorderer
txSeq uint32 // next sequence number for reverse data
cancel context.CancelFunc
created time.Time
lastActive time.Time // last time data was sent or received

// Sources: all tunnel connections that can carry reverse data.
// We round-robin responses across them (not broadcast).
sources []*sourceConn
sourceIdx int

// Async write queue: handleData sends chunks here instead of writing
// to target synchronously (which would block the frame dispatch loop).
// The upstreamWriter goroutine drains this channel and writes to target.
writeCh chan []byte
}

// centralServer manages all active connections.
Expand All @@ -55,7 +66,7 @@ type centralServer struct {
conns map[uint32]*connState // ConnID → state

// sourceMu protects the sources map (net.Conn → *sourceConn).
sourceMu sync.Mutex
sourceMu sync.Mutex
sourceMap map[net.Conn]*sourceConn
}

Expand Down Expand Up @@ -300,6 +311,10 @@ func (cs *centralServer) parseHeader(hdr [tunnel.HeaderSize]byte, conn net.Conn,
}

func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source *sourceConn) {
// ConnID 0 is a keepalive from TunnelPool — ignore silently
if frame.ConnID == 0 {
return
}
if frame.IsSYN() {
cs.handleSYN(frame, source)
return
Expand Down Expand Up @@ -428,25 +443,35 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta
io.ReadFull(upConn, make([]byte, 6))
}

// Create async write channel and set target under lock
writeCh := make(chan []byte, 256)

state.mu.Lock()
state.target = upConn
state.writeCh = writeCh

// Flush any data that arrived before upstream was ready
// Drain any data that arrived before upstream was ready
var flushChunks [][]byte
for {
data := state.reorderer.Next()
if data == nil {
break
}
if _, err := upConn.Write(data); err != nil {
state.mu.Unlock()
log.Printf("[central] conn=%d: flush failed: %v", connID, err)
upConn.Close()
cs.removeConn(connID)
return
}
flushChunks = append(flushChunks, data)
}
state.mu.Unlock()

// Start async writer goroutine — all writes to upstream go through writeCh
go cs.upstreamWriter(ctx, connID, upConn, writeCh)

// Send flush data through the channel (channel is empty, won't block)
for _, data := range flushChunks {
select {
case writeCh <- data:
default:
}
}

log.Printf("[central] conn=%d: upstream connected to %s", connID, targetAddr)

// Read upstream data and send back through tunnel (NO broadcast — round-robin)
Expand Down Expand Up @@ -501,9 +526,49 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3
}
}

// upstreamWriter is a dedicated goroutine that drains writeCh and writes
// to the upstream (Xray) connection. This decouples upstream write speed
// from frame dispatch speed — handleData never blocks the frame loop.
//
// Lifetime: runs until ctx is cancelled, writeCh is closed, or a write to
// upstream fails. It does not close upstream itself — presumably the
// connection's owner (relayUpstreamToTunnel / removeConn) does; confirm.
func (cs *centralServer) upstreamWriter(ctx context.Context, connID uint32, upstream net.Conn, writeCh chan []byte) {
	for {
		select {
		case <-ctx.Done():
			// Context cancelled (removeConn or cleanup) — drain any remaining data best-effort
			// with a short per-chunk deadline. Write errors are deliberately
			// ignored here: the connection is being torn down anyway.
			for {
				select {
				case data := <-writeCh:
					upstream.SetWriteDeadline(time.Now().Add(2 * time.Second))
					upstream.Write(data)
				default:
					// Channel empty — nothing left to flush.
					return
				}
			}
		case data, ok := <-writeCh:
			if !ok {
				// writeCh was closed by the producer — no more data will arrive.
				return
			}
			// Bound each write so a stalled upstream cannot wedge this
			// goroutine forever (upstreamWriteTimeout is 10s).
			upstream.SetWriteDeadline(time.Now().Add(upstreamWriteTimeout))
			if _, err := upstream.Write(data); err != nil {
				log.Printf("[central] conn=%d: upstream write failed: %v", connID, err)
				upstream.SetWriteDeadline(time.Time{})
				// Drain channel to unblock any senders, then exit.
				// (Producers appear to use non-blocking sends, so this mostly
				// discards chunks that can no longer be delivered.)
				for {
					select {
					case <-writeCh:
					default:
						return
					}
				}
			}
			// Clear the deadline after a successful write so an idle channel
			// doesn't trip a stale timeout on the next write.
			upstream.SetWriteDeadline(time.Time{})
		}
	}
}

// sendFrame picks ONE source via round-robin and writes the frame.
// If that source fails, tries the next one. Uses sourceConn.WriteFrame
// which is mutex-protected per TCP connection, preventing interleaved writes.
// CRITICAL: state.mu is only held briefly to pick the source and advance
// the index — the actual TCP write happens OUTSIDE the lock to prevent
// cascading lock contention that freezes frame dispatch for other ConnIDs.
func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
cs.mu.RLock()
state, ok := cs.conns[connID]
Expand All @@ -512,32 +577,44 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
return
}

// Snapshot sources under lock, then write outside lock
state.mu.Lock()
defer state.mu.Unlock()

if len(state.sources) == 0 {
n := len(state.sources)
if n == 0 {
state.mu.Unlock()
return
}
// Build ordered list starting from current sourceIdx
sources := make([]*sourceConn, n)
startIdx := state.sourceIdx % n
state.sourceIdx++
for i := 0; i < n; i++ {
sources[i] = state.sources[(startIdx+i)%n]
}
state.mu.Unlock()

// Try each source once, starting from current index
for tries := 0; tries < len(state.sources); tries++ {
idx := state.sourceIdx % len(state.sources)
state.sourceIdx++
sc := state.sources[idx]

// Try each source — write happens outside state.mu
for _, sc := range sources {
if err := sc.WriteFrame(frame); err != nil {
// Remove dead source
state.sources = append(state.sources[:idx], state.sources[idx+1:]...)
if state.sourceIdx > 0 {
state.sourceIdx--
// Remove dead source under lock
state.mu.Lock()
for i, s := range state.sources {
if s == sc {
state.sources = append(state.sources[:i], state.sources[i+1:]...)
break
}
}
state.mu.Unlock()
continue
}
return // success
}
log.Printf("[central] conn=%d: all sources failed", connID)
}

// upstreamWriteTimeout prevents writes to Xray upstream from blocking forever.
const upstreamWriteTimeout = 10 * time.Second

func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) {
cs.mu.RLock()
state, ok := cs.conns[frame.ConnID]
Expand All @@ -546,11 +623,10 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) {
return
}

// Insert frame and collect ready data under lock (fast, no I/O)
state.mu.Lock()
defer state.mu.Unlock()

// If this source isn't known yet (e.g., after tunnel recycling), add it.
// This ensures responses can flow back through the new connection.
found := false
for _, s := range state.sources {
if s == source {
Expand All @@ -564,18 +640,31 @@ func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) {

state.lastActive = time.Now()
state.reorderer.Insert(frame.SeqNum, frame.Payload)
if state.target == nil {
return // buffered, flushed when upstream connects

// If upstream not connected yet, data stays in reorderer for later flush
writeCh := state.writeCh
if writeCh == nil {
state.mu.Unlock()
return
}

// Drain all ready data from reorderer
var chunks [][]byte
for {
data := state.reorderer.Next()
if data == nil {
break
}
if _, err := state.target.Write(data); err != nil {
log.Printf("[central] conn=%d: write to upstream failed: %v", frame.ConnID, err)
return
chunks = append(chunks, data)
}
state.mu.Unlock()

// Send to async writer (non-blocking — MUST NOT stall the frame dispatch loop)
for _, data := range chunks {
select {
case writeCh <- data:
default:
log.Printf("[central] conn=%d: write queue full, dropping %d bytes", frame.ConnID, len(data))
}
}
}
Expand All @@ -587,20 +676,34 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) {
if !ok {
return
}

// Drain remaining data and send to async writer (non-blocking)
state.mu.Lock()
if state.target != nil {
var chunks [][]byte
writeCh := state.writeCh
if writeCh != nil {
for {
data := state.reorderer.Next()
if data == nil {
break
}
state.target.Write(data)
chunks = append(chunks, data)
}
state.target.Close()
}
state.writeCh = nil // prevent further sends from handleData
state.mu.Unlock()

// Remove from map and cancel context so relayUpstreamToTunnel exits cleanly
// Send remaining data to writer (non-blocking, best-effort)
if writeCh != nil {
for _, data := range chunks {
select {
case writeCh <- data:
default:
}
}
}

// removeConn cancels ctx → upstreamWriter exits → upstream closed by relayUpstreamToTunnel
cs.removeConn(frame.ConnID)
log.Printf("[central] conn=%d: FIN received, cleaned up", frame.ConnID)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/slipstreamplus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func main() {
if isPacketSplit {
tunnelPool = tunnel.NewTunnelPool(mgr)
tunnelPool.Start()
// Tell health checker to use TunnelPool for probes instead of
// creating separate TCP connections that interfere with the tunnel.
checker.SetTunnelPool(tunnelPool)
log.Printf("Packet-split mode: central_server=%s, chunk_size=%d",
cfg.CentralServer.Address, cfg.CentralServer.ChunkSize)
}
Expand Down
27 changes: 25 additions & 2 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ import (
// Latency is only set from successful tunnel probes (real RTT).
const maxConsecutiveFailures = 3

// tunnelHealthChecker is implemented by TunnelPool to report active tunnels.
type tunnelHealthChecker interface {
HasActiveTunnel(instID int) bool
}

type Checker struct {
manager *engine.Manager
interval time.Duration
timeout time.Duration
target string // health_check.target (e.g. "google.com")
packetSplit bool // true when strategy=packet_split
tunnelPool tunnelHealthChecker // set in packet_split mode
ctx context.Context
cancel context.CancelFunc

Expand Down Expand Up @@ -75,6 +81,14 @@ func (c *Checker) SetPacketSplit(enabled bool) {
c.packetSplit = enabled
}

// SetTunnelPool provides the TunnelPool so the health checker can skip
// creating separate TCP connections when the TunnelPool already has an
// active connection to an instance. This prevents interference with the
// persistent tunnel connections used in packet_split mode.
func (c *Checker) SetTunnelPool(pool tunnelHealthChecker) {
c.tunnelPool = pool
}

// Stop cancels the checker's context, ending in-flight and future checks.
func (c *Checker) Stop() {
	c.cancel()
}
Expand Down Expand Up @@ -176,13 +190,22 @@ func (c *Checker) checkOne(inst *engine.Instance) {
}

// Step 3: End-to-end probe.
// In packet_split mode: test if instance's upstream speaks our framing protocol.
// In packet_split mode: if TunnelPool has an active connection, skip the
// separate framing probe entirely — the TunnelPool's keepalive + stale
// detection provides continuous health monitoring. Opening a separate TCP
// connection can interfere with the DNS tunnel's persistent connection.
// Only fall back to probeFramingProtocol for initial validation (before
// TunnelPool has connected).
// In normal mode: full SOCKS5 CONNECT + HTTP through the tunnel.
if c.target != "" && inst.Config.Mode != "ssh" {
var e2eRtt time.Duration
var e2eErr error

if c.packetSplit {
if c.packetSplit && c.tunnelPool != nil && c.tunnelPool.HasActiveTunnel(inst.ID()) {
// TunnelPool is connected — tunnel is working, skip separate probe
e2eRtt = rtt
} else if c.packetSplit {
// TunnelPool not yet connected — need separate probe for initial health check
e2eRtt, e2eErr = c.probeFramingProtocol(inst)
} else {
e2eRtt, e2eErr = c.probeEndToEnd(inst)
Expand Down
Loading
Loading