Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (a UserPassAuthenticator) Authenticate(reader io.Reader, writer io.Writer)
// authenticate is used to handle connection authentication
func (s *Server) authenticate(conn io.Writer, bufConn io.Reader) (*AuthContext, error) {
// Get the methods
methods, err := readMethods(bufConn)
methods, err := ReadMethods(bufConn)
if err != nil {
return nil, fmt.Errorf("failed to get auth methods: %v", err)
}
Expand All @@ -190,9 +190,9 @@ func noAcceptableAuth(conn io.Writer) error {
return ErrNoSupportedAuth
}

// readMethods is used to read the number of methods
// ReadMethods is used to read the number of methods
// and proceeding auth methods
func readMethods(r io.Reader) ([]byte, error) {
func ReadMethods(r io.Reader) ([]byte, error) {
header := []byte{0}
if _, err := r.Read(header); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/haxii/socks5

go 1.12
go 1.22
171 changes: 171 additions & 0 deletions proxy_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package socks5

import (
"errors"
"fmt"
"io"
"sync"
)

type closeWriter interface {
CloseWrite() error
}

type closeReader interface {
CloseRead() error
}

// Ring buffer size. Must be a power of two so that `x & mask` is equivalent
// to `x % proxyBufSize` (see ProxyStream for the indexing scheme).
const (
proxyBufSize uint64 = 64 << 10
proxyBufMask = proxyBufSize - 1
)

// proxyBufPool recycles the per-connection 64 KiB ring buffers. The buffer
// dominates ProxyStream's allocation footprint, and under sustained load
// each connection would otherwise force a fresh 64 KiB heap allocation.
// Storing a pointer to a fixed-size array (rather than a slice) avoids the
// extra slice-header allocation that sync.Pool would otherwise introduce.
var proxyBufPool = sync.Pool{
New: func() any {
return new([proxyBufSize]byte)
},
}

// ProxyStream forwards data from src to dst, similar to io.Copy, but with improved performance.
// Unlike io.Copy’s sequential read/write model, it allows reads to continue while writes are in progress,
// using a single 64 KiB ring buffer shared between the reader and writer.
// ProxyStream closes both the read and write sides when the transfer completes.
func ProxyStream(src io.Reader, dst io.Writer) error {
// head and tail are monotonically increasing byte counters, not positions
// inside buf. Indexing into buf is done via `& proxyBufMask`, which is
// equivalent to `% proxyBufSize` but cheaper — this requires proxyBufSize
// to be a power of two. With this scheme, `head - tail` directly yields
// the number of occupied bytes regardless of wrap, and we never need to
// reset the counters.
bufArr := proxyBufPool.Get().(*[proxyBufSize]byte)
defer proxyBufPool.Put(bufArr)
buf := bufArr[:]
var (
head uint64 // total bytes written into buf by reader
tail uint64 // total bytes consumed from buf by writer
mu sync.Mutex
readDone bool
writeErr error
)
cond := sync.NewCond(&mu)

writerDone := make(chan struct{})

// Writer goroutine: drains [tail, head) from the ring buffer into dst.
go func() {
defer close(writerDone)
for {
mu.Lock()
for head == tail && !readDone {
cond.Wait()
}
if head == tail {
// Reader finished and buffer drained.
mu.Unlock()
return
}

// Contiguous readable region starting at rIdx. When the occupied
// range wraps past proxyBufSize, only the first segment
// [rIdx, proxyBufSize) is taken here; the remainder [0, hIdx) is
// handled on the next iteration once tail has advanced past the
// wrap point.
rIdx := tail & proxyBufMask
hIdx := head & proxyBufMask
var data []byte
if hIdx > rIdx {
data = buf[rIdx:hIdx]
} else {
data = buf[rIdx:proxyBufSize]
}
mu.Unlock()

n, err := dst.Write(data)

mu.Lock()
tail += uint64(n)
cond.Signal()
if err != nil {
writeErr = err
mu.Unlock()
// Unblock reader if it’s stuck on src.Read.
if cr, ok := src.(closeReader); ok {
_ = cr.CloseRead()
}
return
}
mu.Unlock()
}
}()

// Reader loop: fills [head, tail+proxyBufSize) in the ring buffer from src.
var readErr error
for {
mu.Lock()
for head-tail == proxyBufSize && writeErr == nil {
cond.Wait()
}
if writeErr != nil {
mu.Unlock()
break
}

// Contiguous writable region starting at wIdx. Mirrors the writer
// side: if the free range wraps, only [wIdx, proxyBufSize) is taken
// here and [0, tIdx) is picked up on the next iteration after head
// wraps.
wIdx := head & proxyBufMask
tIdx := tail & proxyBufMask
var space []byte
if tIdx > wIdx {
space = buf[wIdx:tIdx]
} else {
space = buf[wIdx:proxyBufSize]
}
mu.Unlock()

n, err := src.Read(space)

mu.Lock()
head += uint64(n)
if err != nil {
readDone = true
if !errors.Is(err, io.EOF) {
readErr = err
}
cond.Signal()
mu.Unlock()
break
}
if n > 0 {
cond.Signal()
}
mu.Unlock()
}

<-writerDone

// Close both sides.
if cr, ok := src.(closeReader); ok {
_ = cr.CloseRead()
}
if cw, ok := dst.(closeWriter); ok {
_ = cw.CloseWrite()
}

if readErr != nil {
readErr = fmt.Errorf("read error: %v", readErr)
}
if writeErr != nil {
writeErr = fmt.Errorf("write error: %v", writeErr)
}

return errors.Join(writeErr, readErr)
}
Loading