Skip to content
Merged

Dev #20

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 77 additions & 30 deletions cmd/centralserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@ import (

"github.com/ParsaKSH/SlipStream-Plus/internal/tunnel"
)
//cc

// sourceConn pairs a tunnel connection with a mutex that serializes
// frame writes, so that frames for different ConnIDs sharing this TCP
// connection are never interleaved byte-wise on the wire.
type sourceConn struct {
	conn    net.Conn
	writeMu sync.Mutex
}

// WriteFrame writes f onto the underlying connection while holding the
// write lock, guaranteeing each frame goes out as one contiguous unit.
func (sc *sourceConn) WriteFrame(f *tunnel.Frame) error {
	sc.writeMu.Lock()
	err := tunnel.WriteFrame(sc.conn, f)
	sc.writeMu.Unlock()
	return err
}

// connState tracks a single reassembled connection.
type connState struct {
mu sync.Mutex
Expand All @@ -29,7 +42,7 @@ type connState struct {

// Sources: all tunnel connections that can carry reverse data.
// We round-robin responses across them (not broadcast).
sources []io.Writer
sources []*sourceConn
sourceIdx int
}

Expand All @@ -39,6 +52,10 @@ type centralServer struct {

mu sync.RWMutex
conns map[uint32]*connState // ConnID → state

// sourceMu protects the sources map (net.Conn → *sourceConn).
sourceMu sync.Mutex
sourceMap map[net.Conn]*sourceConn
}

func main() {
Expand All @@ -54,6 +71,7 @@ func main() {
cs := &centralServer{
socksUpstream: *socksUpstream,
conns: make(map[uint32]*connState),
sourceMap: make(map[net.Conn]*sourceConn),
}

sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -147,12 +165,15 @@ func (cs *centralServer) handleSOCKS5Passthrough(clientConn net.Conn, firstByte
func (cs *centralServer) handleFrameConn(conn net.Conn, firstByte byte, remoteAddr string) {
log.Printf("[central] frame connection from %s", remoteAddr)

sc := cs.getSourceConn(conn)

// Track which ConnIDs this source served
servedIDs := make(map[uint32]bool)

defer func() {
// Source TCP died — clean up connStates that only had this source
cs.cleanupSource(conn, servedIDs, remoteAddr)
// Source TCP died — clean up sourceConn and connStates
cs.removeSourceConn(conn)
cs.cleanupSource(sc, servedIDs, remoteAddr)
}()

// Read remaining header bytes (we already read 1)
Expand All @@ -168,7 +189,7 @@ func (cs *centralServer) handleFrameConn(conn net.Conn, firstByte byte, remoteAd
firstFrame := cs.parseHeader(fullHdr, conn, remoteAddr)
if firstFrame != nil {
servedIDs[firstFrame.ConnID] = true
cs.dispatchFrame(firstFrame, conn)
cs.dispatchFrame(firstFrame, sc)
}

for {
Expand All @@ -180,13 +201,13 @@ func (cs *centralServer) handleFrameConn(conn net.Conn, firstByte byte, remoteAd
return
}
servedIDs[frame.ConnID] = true
cs.dispatchFrame(frame, conn)
cs.dispatchFrame(frame, sc)
}
}

// cleanupSource removes a dead source connection from all connStates.
// If a connState has no remaining sources, it is fully cleaned up.
func (cs *centralServer) cleanupSource(deadSource net.Conn, servedIDs map[uint32]bool, remoteAddr string) {
func (cs *centralServer) cleanupSource(deadSource *sourceConn, servedIDs map[uint32]bool, remoteAddr string) {
cs.mu.Lock()
defer cs.mu.Unlock()

Expand Down Expand Up @@ -227,6 +248,26 @@ func (cs *centralServer) cleanupSource(deadSource net.Conn, servedIDs map[uint32
}
}

// getSourceConn returns the sourceConn wrapper for a raw net.Conn,
// lazily creating and registering one on first use.
func (cs *centralServer) getSourceConn(conn net.Conn) *sourceConn {
	cs.sourceMu.Lock()
	defer cs.sourceMu.Unlock()
	if sc, ok := cs.sourceMap[conn]; ok {
		return sc
	}
	sc := &sourceConn{conn: conn}
	cs.sourceMap[conn] = sc
	return sc
}

// removeSourceConn drops the sourceConn wrapper once the raw conn has died.
func (cs *centralServer) removeSourceConn(conn net.Conn) {
	cs.sourceMu.Lock()
	defer cs.sourceMu.Unlock()
	delete(cs.sourceMap, conn)
}

func isClosedConnErr(err error) bool {
if err == nil {
return false
Expand Down Expand Up @@ -257,7 +298,7 @@ func (cs *centralServer) parseHeader(hdr [tunnel.HeaderSize]byte, conn net.Conn,
}
}

func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source net.Conn) {
func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source *sourceConn) {
if frame.IsSYN() {
cs.handleSYN(frame, source)
return
Expand All @@ -273,7 +314,7 @@ func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source net.Conn) {
cs.handleData(frame, source)
}

func (cs *centralServer) handleSYN(frame *tunnel.Frame, source net.Conn) {
func (cs *centralServer) handleSYN(frame *tunnel.Frame, source *sourceConn) {
connID := frame.ConnID

cs.mu.Lock()
Expand Down Expand Up @@ -307,19 +348,19 @@ func (cs *centralServer) handleSYN(frame *tunnel.Frame, source net.Conn) {
ctx, cancel := context.WithCancel(context.Background())
state := &connState{
reorderer: tunnel.NewReordererAt(frame.SeqNum + 1), // skip SYN's SeqNum
sources: []io.Writer{source},
sources: []*sourceConn{source},
cancel: cancel,
created: time.Now(),
}
cs.conns[connID] = state
cs.mu.Unlock()

log.Printf("[central] conn=%d: SYN → target=%s", connID, targetAddr)
go cs.connectUpstream(ctx, connID, state, atyp, addr, port, targetAddr, source)
go cs.connectUpstream(ctx, connID, state, atyp, addr, port, targetAddr)
}

func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, state *connState,
atyp byte, addr, port []byte, targetAddr string, source net.Conn) {
atyp byte, addr, port []byte, targetAddr string) {

upConn, err := net.DialTimeout("tcp", cs.socksUpstream, 10*time.Second)
if err != nil {
Expand Down Expand Up @@ -349,6 +390,7 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta
return
}

// Read greeting response (2 bytes) + CONNECT response header (4 bytes)
resp := make([]byte, 6)
if _, err := io.ReadFull(upConn, resp); err != nil {
log.Printf("[central] conn=%d: upstream response read failed: %v", connID, err)
Expand All @@ -358,7 +400,18 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta
return
}

// Drain bind address
// Check CONNECT result BEFORE draining bind address.
// resp[3] = REP field (0x00 = success). If non-zero, upstream may close
// without sending bind address, so don't try to drain it.
if resp[3] != 0x00 {
log.Printf("[central] conn=%d: upstream CONNECT rejected: 0x%02x", connID, resp[3])
upConn.Close()
cs.sendFrame(connID, &tunnel.Frame{ConnID: connID, Flags: tunnel.FlagRST | tunnel.FlagReverse})
cs.removeConn(connID)
return
}

// Drain bind address (only on success)
switch resp[5] {
case 0x01:
io.ReadFull(upConn, make([]byte, 6))
Expand All @@ -372,14 +425,6 @@ func (cs *centralServer) connectUpstream(ctx context.Context, connID uint32, sta
io.ReadFull(upConn, make([]byte, 6))
}

if resp[3] != 0x00 {
log.Printf("[central] conn=%d: upstream CONNECT rejected: 0x%02x", connID, resp[3])
upConn.Close()
cs.sendFrame(connID, &tunnel.Frame{ConnID: connID, Flags: tunnel.FlagRST | tunnel.FlagReverse})
cs.removeConn(connID)
return
}

state.mu.Lock()
state.target = upConn

Expand Down Expand Up @@ -453,7 +498,8 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3
}

// sendFrame picks ONE source via round-robin and writes the frame.
// If that source fails, tries the next one. Much better than broadcasting.
// If that source fails, tries the next one. Uses sourceConn.WriteFrame
// which is mutex-protected per TCP connection, preventing interleaved writes.
func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
cs.mu.RLock()
state, ok := cs.conns[connID]
Expand All @@ -473,9 +519,9 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
for tries := 0; tries < len(state.sources); tries++ {
idx := state.sourceIdx % len(state.sources)
state.sourceIdx++
w := state.sources[idx]
sc := state.sources[idx]

if err := tunnel.WriteFrame(w, frame); err != nil {
if err := sc.WriteFrame(frame); err != nil {
// Remove dead source
state.sources = append(state.sources[:idx], state.sources[idx+1:]...)
if state.sourceIdx > 0 {
Expand All @@ -488,7 +534,7 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
log.Printf("[central] conn=%d: all sources failed", connID)
}

func (cs *centralServer) handleData(frame *tunnel.Frame, source net.Conn) {
func (cs *centralServer) handleData(frame *tunnel.Frame, source *sourceConn) {
cs.mu.RLock()
state, ok := cs.conns[frame.ConnID]
cs.mu.RUnlock()
Expand Down Expand Up @@ -548,7 +594,10 @@ func (cs *centralServer) handleFIN(frame *tunnel.Frame) {
state.target.Close()
}
state.mu.Unlock()
log.Printf("[central] conn=%d: FIN received", frame.ConnID)

// Remove from map and cancel context so relayUpstreamToTunnel exits cleanly
cs.removeConn(frame.ConnID)
log.Printf("[central] conn=%d: FIN received, cleaned up", frame.ConnID)
}

func (cs *centralServer) handleRST(frame *tunnel.Frame) {
Expand Down Expand Up @@ -601,10 +650,8 @@ func (cs *centralServer) cleanupLoop() {
if len(state.sources) == 0 && now.Sub(state.created) > 30*time.Second {
shouldClean = true
}
// Connection too old (5 min max lifetime)
if now.Sub(state.created) > 5*time.Minute {
shouldClean = true
}
// No max lifetime — long-lived connections (downloads, streams)
// are valid. Cleanup only based on actual broken state above.

state.mu.Unlock()
if shouldClean {
Expand Down
4 changes: 2 additions & 2 deletions internal/gui/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ canvas{width:100%;height:200px;border-radius:var(--rs);background:var(--bg2);bor
<div class="cs-title">Central Server Settings</div>
<div class="form-row">
<div class="form-group">
<label>Address <span class="tip-btn" data-tip="Host:port of your centralserver binary&#10;(e.g. 45.89.223.100:9500).&#10;All slipstream-server instances must point&#10;their --target-address here.">?</span></label>
<input id="cfg-cs-addr" placeholder="45.89.223.100:9500">
<label>Address <span class="tip-btn" data-tip="Host:port of your centralserver binary&#10;(e.g. 192.168.1.1:9500).&#10;All slipstream-server instances must point&#10;their --target-address here.">?</span></label>
<input id="cfg-cs-addr" placeholder="192.168.1.1:9500">
</div>
<div class="form-group">
<label>Chunk Size <span class="tip-btn" data-tip="Max bytes per frame sent to centralserver.&#10;Default: 8192. Larger = fewer frames,&#10;smaller = lower latency. Range: 1024–65536.">?</span></label>
Expand Down
14 changes: 12 additions & 2 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package health

import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"sync"
"sync/atomic"
"time"

"github.com/ParsaKSH/SlipStream-Plus/internal/config"
Expand All @@ -34,6 +36,8 @@ type Checker struct {

mu sync.Mutex
failures map[int]int

probeSeq atomic.Uint32 // unique probe counter to avoid ConnID collisions
}

func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker {
Expand Down Expand Up @@ -371,8 +375,14 @@ func (c *Checker) probeFramingProtocol(inst *engine.Instance) (time.Duration, er
synPayload = append(synPayload, []byte(domain)...) // domain
synPayload = append(synPayload, 0x00, 0x50) // port 80

// Use a unique probe ConnID (high range to avoid collision)
probeConnID := uint32(0xFFFF0000) + uint32(inst.ID())
// Use a unique probe ConnID combining high-range prefix, instance ID,
// monotonic counter, and random bits to avoid collisions with real connections
// and previous probes that haven't been cleaned up yet.
seq := c.probeSeq.Add(1)
var rndBuf [2]byte
rand.Read(rndBuf[:])
rnd := uint32(binary.BigEndian.Uint16(rndBuf[:]))
probeConnID := uint32(0xFE000000) | (uint32(inst.ID())&0xFF)<<16 | (seq&0xFF)<<8 | (rnd & 0xFF)

synFrame := &tunnel.Frame{
ConnID: probeConnID,
Expand Down
10 changes: 9 additions & 1 deletion internal/proxy/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Server struct {
// Packet-split mode fields
packetSplit bool
tunnelPool *tunnel.TunnelPool
connIDGen tunnel.ConnIDGenerator
connIDGen *tunnel.ConnIDGenerator
chunkSize int
}

Expand All @@ -53,6 +53,7 @@ func NewServer(listenAddr string, bufferSize int, maxConns int, mgr *engine.Mana
func (s *Server) EnablePacketSplit(pool *tunnel.TunnelPool, chunkSize int) {
s.packetSplit = true
s.tunnelPool = pool
s.connIDGen = tunnel.NewConnIDGenerator()
s.chunkSize = chunkSize
log.Printf("[proxy] packet-split mode enabled (chunk_size=%d)", chunkSize)
}
Expand Down Expand Up @@ -281,6 +282,13 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// When context is cancelled (either direction finished), close the client
// connection so that blocking Read/Write calls unblock immediately.
go func() {
<-ctx.Done()
clientConn.Close()
}()

var txN, rxN int64
var wg sync.WaitGroup

Expand Down
8 changes: 6 additions & 2 deletions internal/tunnel/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,12 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {
ch := v.(chan *Frame)
select {
case ch <- frame:
default:
// Buffer full — drop silently
case <-time.After(5 * time.Second):
// Buffer full for too long — connection is stuck, log and drop
log.Printf("[tunnel-pool] instance %d: frame buffer full for conn=%d, dropping frame seq=%d",
tc.inst.ID(), frame.ConnID, frame.SeqNum)
case <-p.stopCh:
return
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions internal/tunnel/protocol.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tunnel

import (
"crypto/rand"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -104,10 +105,27 @@ func ReadFrame(r io.Reader) (*Frame, error) {
}

// ConnIDGenerator produces unique connection IDs.
// It begins counting from a random offset so that a restarted process is
// unlikely to reuse IDs the CentralServer still tracks from the previous
// session.
type ConnIDGenerator struct {
	counter atomic.Uint32
}

// NewConnIDGenerator creates a generator seeded at a random offset in
// [1, 0x7FFFFFFF], avoiding 0 and leaving headroom before the counter
// wraps. If reading random bytes fails, the counter simply starts at 0.
func NewConnIDGenerator() *ConnIDGenerator {
	gen := &ConnIDGenerator{}
	var seed [4]byte
	if _, err := rand.Read(seed[:]); err == nil {
		start := binary.BigEndian.Uint32(seed[:]) & 0x7FFFFFFF
		if start == 0 {
			start = 1
		}
		gen.counter.Store(start)
	}
	return gen
}

// Next returns the next unique connection ID.
func (g *ConnIDGenerator) Next() uint32 {
return g.counter.Add(1)
Expand Down
Loading
Loading