Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
*~
/bin/cron-control-runner
.env
kb/
26 changes: 25 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type options struct {
remoteToken string
useWebsockets bool
eventsWebhookURL string
maxHandshakeBytes int
handshakeInitialTTL time.Duration
handshakeIdleTTL time.Duration
maxHandshakeSessions int
Comment on lines 31 to +35
useLocker bool
dataConfigPath string
lockerRefreshInterval time.Duration
Expand Down Expand Up @@ -78,7 +82,19 @@ func main() {
// Setup the remote CLI module if enabled.
if 0 < len(options.remoteToken) {
// TODO: This module could definitely use some general refactoring, but namely a graceful shutdown would be good.
remote.Setup(options.remoteToken, options.useWebsockets, options.wpCLIPath, options.wpPath, options.eventsWebhookURL)
remote.SetupWithOptions(
options.remoteToken,
options.useWebsockets,
options.wpCLIPath,
options.wpPath,
options.eventsWebhookURL,
remote.SetupOptions{
MaxHandshakeBytes: options.maxHandshakeBytes,
HandshakeInitialTimeout: options.handshakeInitialTTL,
HandshakeIdleTimeout: options.handshakeIdleTTL,
MaxConcurrentHandshakes: options.maxHandshakeSessions,
},
)
go remote.ListenForConnections()
}

Expand Down Expand Up @@ -107,6 +123,10 @@ func getCliOptions() options {
},
remoteToken: "",
useWebsockets: false,
maxHandshakeBytes: 64 * 1024,
handshakeInitialTTL: 15 * time.Second,
handshakeIdleTTL: 200 * time.Millisecond,
maxHandshakeSessions: 256,
useLocker: false,
dataConfigPath: "/etc/wpvip-data-config/config.json",
lockerRefreshInterval: 10 * time.Second,
Expand Down Expand Up @@ -138,6 +158,10 @@ func getCliOptions() options {
flag.StringVar(&(options.remoteToken), "token", options.remoteToken, "Token to authenticate remote WP CLI requests")
flag.BoolVar(&(options.useWebsockets), "use-websockets", options.useWebsockets, "Use the websocket listener instead of raw tcp for remote WP CLI requests")
flag.StringVar(&(options.eventsWebhookURL), "events-webhook-url", options.eventsWebhookURL, "Webhook URL used to send WP CLI events")
flag.IntVar(&(options.maxHandshakeBytes), "max-handshake-bytes", options.maxHandshakeBytes, "Maximum number of bytes accepted during remote handshake")
flag.DurationVar(&(options.handshakeInitialTTL), "handshake-initial-timeout", options.handshakeInitialTTL, "Absolute timeout for finishing remote handshake")
flag.DurationVar(&(options.handshakeIdleTTL), "handshake-idle-timeout", options.handshakeIdleTTL, "Idle timeout between handshake packets")
flag.IntVar(&(options.maxHandshakeSessions), "max-concurrent-handshakes", options.maxHandshakeSessions, "Maximum number of concurrent in-progress remote handshakes")

// NOTE: this will exit if options are invalid or if help is requested, etc.
flag.Parse()
Expand Down
24 changes: 24 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,30 @@ It's helpful to specify some environment variables (e.g. in an `.env` file):
- `-use-mock-data`
- use the mock performer for testing

## Remote WP CLI Options
- `-token` string
- Token to authenticate remote WP CLI requests.
- `-use-websockets`
- Use the websocket listener instead of raw tcp for remote WP CLI requests.
- `-events-webhook-url` string
- Webhook URL used to send WP CLI events.
- `-max-handshake-bytes` int
- Maximum number of bytes accepted during remote handshake (default 65536).
- `-handshake-initial-timeout` duration
- Absolute timeout to finish the handshake, regardless of trickle traffic (default 15s).
- `-handshake-idle-timeout` duration
- Maximum idle time between handshake packets (default 200ms).
- `-max-concurrent-handshakes` int
- Maximum number of concurrent in-progress handshakes (default 256).

### Recommended Production Baseline
- Keep `-max-handshake-bytes` at `65536` unless clients require larger metadata.
- Keep `-handshake-initial-timeout` at `15s`; increase only if legitimate clients regularly exceed it.
- Keep `-handshake-idle-timeout` at `200ms`; this blocks slow-loris style packet trickling.
- Set `-max-concurrent-handshakes` to match host capacity and expected peak auth bursts.
- If memory pressure is observed during spikes, lower this value.
- If legitimate clients are rejected during deploy bursts, raise this value gradually.

## Architecture

![runner diagram](https://d.pr/i/1THmhI+)
Expand Down
230 changes: 190 additions & 40 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ import (

const (
shutdownErrorCode = 4001 // WebSocket close code when a shutdown signal is detected

defaultMaxHandshakeBytes = 64 * 1024
defaultHandshakeInitialTimeout = 15 * time.Second
defaultHandshakeIdleTimeout = 200 * time.Millisecond
defaultMaxConcurrentHandshakes = 256
)

var nonUTF8Replacement = []byte(string(unicode.ReplacementChar))
Expand All @@ -54,29 +59,69 @@ type wpCLIProcess struct {
}

var (
gGUIDLength = 36
gGUIDttys map[string]*wpCLIProcess
padlock *sync.Mutex
guidRegex *regexp.Regexp
gGUIDLength = 36
gGUIDttys map[string]*wpCLIProcess
padlock *sync.Mutex
guidRegex *regexp.Regexp
handshakeSem chan struct{}
)

type config struct {
remoteToken string
useWebsockets bool
wpCLIPath string
wpPath string
remoteToken string
useWebsockets bool
wpCLIPath string
wpPath string
maxHandshakeBytes int
handshakeInitialTimeout time.Duration
handshakeIdleTimeout time.Duration
maxConcurrentHandshakes int
}

type SetupOptions struct {
MaxHandshakeBytes int
HandshakeInitialTimeout time.Duration
HandshakeIdleTimeout time.Duration
MaxConcurrentHandshakes int
}

var remoteConfig config
var wpCliEventSender EventSender

// Setup configures the module (not super ideal, but this module needs some reworking to make it better)
func Setup(remoteToken string, useWebsockets bool, wpCLIPath string, wpPath string, eventsWebhookURL string) {
SetupWithOptions(remoteToken, useWebsockets, wpCLIPath, wpPath, eventsWebhookURL, SetupOptions{})
}

func SetupWithOptions(remoteToken string, useWebsockets bool, wpCLIPath string, wpPath string, eventsWebhookURL string, options SetupOptions) {
maxHandshakeBytes := options.MaxHandshakeBytes
if maxHandshakeBytes <= 0 {
maxHandshakeBytes = defaultMaxHandshakeBytes
}

handshakeInitialTimeout := options.HandshakeInitialTimeout
if handshakeInitialTimeout <= 0 {
handshakeInitialTimeout = defaultHandshakeInitialTimeout
}

handshakeIdleTimeout := options.HandshakeIdleTimeout
if handshakeIdleTimeout <= 0 {
handshakeIdleTimeout = defaultHandshakeIdleTimeout
}

maxConcurrentHandshakes := options.MaxConcurrentHandshakes
if maxConcurrentHandshakes <= 0 {
maxConcurrentHandshakes = defaultMaxConcurrentHandshakes
}

remoteConfig = config{
remoteToken: remoteToken,
useWebsockets: useWebsockets,
wpCLIPath: wpCLIPath,
wpPath: wpPath,
remoteToken: remoteToken,
useWebsockets: useWebsockets,
wpCLIPath: wpCLIPath,
wpPath: wpPath,
maxHandshakeBytes: maxHandshakeBytes,
handshakeInitialTimeout: handshakeInitialTimeout,
handshakeIdleTimeout: handshakeIdleTimeout,
maxConcurrentHandshakes: maxConcurrentHandshakes,
}

wpCliEventSender = setupWebhookSender(
Expand Down Expand Up @@ -105,6 +150,7 @@ func setupWebhookSender(remoteToken string, eventsWebhookURL string) EventSender
func ListenForConnections() {
gGUIDttys = make(map[string]*wpCLIProcess)
padlock = &sync.Mutex{}
handshakeSem = make(chan struct{}, effectiveMaxConcurrentHandshakes())

guidRegex = regexp.MustCompile("^[a-fA-F0-9\\-]+$")
if nil == guidRegex {
Expand Down Expand Up @@ -152,53 +198,155 @@ func ListenForConnections() {
for {
log.Println("listening...")
conn, err := listener.AcceptTCP()
log.Printf("connection from %s\n", conn.RemoteAddr().String())
if err != nil {
log.Printf("error accepting connection: %s\n", err.Error())
continue
}
log.Printf("connection from %s\n", conn.RemoteAddr().String())

go authConn(conn)
}
}

func authConn(conn net.Conn) {
var rows, cols uint16
var offset int64
var token, GUID, cmd string
var read int
var err error
var data []byte
buf := make([]byte, 65535)
func tryAcquireHandshakeSlot() bool {
if handshakeSem == nil {
return true
}

log.Println("waiting for auth data")
select {
case handshakeSem <- struct{}{}:
return true
default:
return false
}
}

conn.SetReadDeadline(time.Now().Add(time.Duration(5000 * time.Millisecond.Nanoseconds())))
bufReader := bufio.NewReader(conn)
func releaseHandshakeSlot() {
if handshakeSem == nil {
return
}

select {
case <-handshakeSem:
default:
}
}

func minTime(a, b time.Time) time.Time {
if a.Before(b) {
return a
}

return b
}

func effectiveMaxHandshakeBytes() int {
if remoteConfig.maxHandshakeBytes <= 0 {
return defaultMaxHandshakeBytes
}

return remoteConfig.maxHandshakeBytes
}

func effectiveHandshakeInitialTimeout() time.Duration {
if remoteConfig.handshakeInitialTimeout <= 0 {
return defaultHandshakeInitialTimeout
}

return remoteConfig.handshakeInitialTimeout
}

func effectiveHandshakeIdleTimeout() time.Duration {
if remoteConfig.handshakeIdleTimeout <= 0 {
return defaultHandshakeIdleTimeout
}

return remoteConfig.handshakeIdleTimeout
}

func effectiveMaxConcurrentHandshakes() int {
if remoteConfig.maxConcurrentHandshakes <= 0 {
return defaultMaxConcurrentHandshakes
}

return remoteConfig.maxConcurrentHandshakes
}

func readHandshakeData(conn net.Conn, bufReader *bufio.Reader) ([]byte, error) {
data := make([]byte, 0, 1024)
buf := make([]byte, 4096)
handshakeDeadline := time.Now().Add(effectiveHandshakeInitialTimeout())
handshakeIdleTimeout := effectiveHandshakeIdleTimeout()
maxHandshakeBytes := effectiveMaxHandshakeBytes()

if err := conn.SetReadDeadline(minTime(handshakeDeadline, time.Now().Add(handshakeIdleTimeout))); err != nil {
return nil, err
}

for {
read, err = bufReader.Read(buf)
read, err := bufReader.Read(buf)

if nil != err && !strings.Contains(err.Error(), "i/o timeout") {
conn.Write([]byte("error during handshaking\n"))
log.Printf("error handshaking: %s\n", err.Error())
conn.Close()
return
if read > 0 {
if len(data)+read > maxHandshakeBytes {
return nil, fmt.Errorf("error handshake exceeds maximum size of %d bytes", maxHandshakeBytes)
}

data = append(data, buf[:read]...)
if data[len(data)-1] == '\n' {
break
}
Comment on lines +289 to +297
}

if 0 != read {
if nil == data {
data = make([]byte, read)
copy(data, buf[:read])
} else {
data = append(data, buf[:read]...)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, errors.New("error handshake terminated before delimiter")
}
} else if 0 == bufReader.Buffered() {
break

if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return nil, errors.New("error handshake timed out")
}

return nil, err
}

if err := conn.SetReadDeadline(minTime(handshakeDeadline, time.Now().Add(handshakeIdleTimeout))); err != nil {
return nil, err
}
}

return data, nil
}

func authConn(conn net.Conn) {
var rows, cols uint16
var offset int64
var token, GUID, cmd string
var err error
var data []byte
handshakeSlotHeld := false

conn.SetReadDeadline(time.Now().Add(time.Duration(200 * time.Millisecond.Nanoseconds())))
if !tryAcquireHandshakeSlot() {
conn.Write([]byte("server busy, try again"))
conn.Close()
return
}
handshakeSlotHeld = true
defer func() {
if handshakeSlotHeld {
releaseHandshakeSlot()
}
}()

log.Println("waiting for auth data")

bufReader := bufio.NewReader(conn)
data, err = readHandshakeData(conn, bufReader)
if nil != err {
Comment on lines +340 to +344
conn.Write([]byte("error during handshaking\n"))
log.Printf("error handshaking: %s\n", err.Error())
conn.Close()
return
}
buf = nil

size := len(data)
log.Printf("size of handshake %d\n", size)
Expand Down Expand Up @@ -238,6 +386,8 @@ func authConn(conn net.Conn) {
return
}

handshakeSlotHeld = false
releaseHandshakeSlot()
log.Println("handshake complete!")

conn.SetReadDeadline(time.Time{})
Expand Down
Loading
Loading