Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
fseek( $errfh, 0 );
$result['stderr'] = stream_get_contents( $errfh );
fclose( $errfh );
header_remove();
header( 'Status: 200 OK' );
header( 'Content-Type: application/json' );
echo json_encode( $result );
Expand Down
17 changes: 10 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type options struct {
wpCLIPath string
wpPath string
fpmURL string
fpmResponseTimeout time.Duration
orchestratorConfig orchestrator.Config
remoteToken string
useWebsockets bool
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
logger.Infof("Using Mock Performer")
perf = &performer.Mock{UseSleeps: true, LogCommands: false, RotateSites: true}
} else {
perf = performer.NewCLI(options.wpCLIPath, options.wpPath, options.fpmURL, metricsManager, logger)
perf = performer.NewCLI(options.wpCLIPath, options.wpPath, options.fpmURL, options.fpmResponseTimeout, metricsManager, logger)
}

// Setup the locker, if enabled.
Expand Down Expand Up @@ -92,12 +93,13 @@ func main() {
func getCliOptions() options {
// Set defaults
options := options{
metricsAddress: "",
useMockData: false,
debug: false,
wpCLIPath: "/usr/local/bin/wp",
wpPath: "/var/www/html",
fpmURL: "",
metricsAddress: "",
useMockData: false,
debug: false,
wpCLIPath: "/usr/local/bin/wp",
wpPath: "/var/www/html",
fpmURL: "",
fpmResponseTimeout: 0,
orchestratorConfig: orchestrator.Config{
GetSitesInterval: 60 * time.Second,
GetEventsInterval: 30 * time.Second,
Expand Down Expand Up @@ -126,6 +128,7 @@ func getCliOptions() options {
flag.StringVar(&(options.wpCLIPath), "wp-cli-path", options.wpCLIPath, "path to WP-CLI binary")
flag.StringVar(&(options.wpPath), "wp-path", options.wpPath, "path to the WordPress installation")
flag.StringVar(&(options.fpmURL), "fpm-url", options.fpmURL, "URL for the php-fpm server or socket (e.g. unix:///var/run/fastcgi.sock)")
flag.DurationVar(&(options.fpmResponseTimeout), "fpm-response-timeout", options.fpmResponseTimeout, "maximum time to wait while reading an FPM response; 0 disables timeout")

// Used for the Orchestrator
flag.DurationVar(&(options.orchestratorConfig.GetSitesInterval), "get-sites-interval", options.orchestratorConfig.GetSitesInterval, "get-sites interval")
Expand Down
69 changes: 57 additions & 12 deletions performer/cli.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package performer

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -24,11 +25,12 @@ var _ Performer = &CLI{}
// CLI uses the CLI interface for site interactions.
// Don't initialize directly, use NewCLI()
type CLI struct {
wpCLIPath string
wpPath string
metrics metrics.Manager
logger logger.Logger
fpm gofast.ClientFactory
wpCLIPath string
wpPath string
metrics metrics.Manager
logger logger.Logger
fpm gofast.ClientFactory
fpmResponseTimeout time.Duration
}

func (perf *CLI) IsReady() bool {
Expand All @@ -50,12 +52,13 @@ type siteInfo struct {
}

// NewCLI sets up the CLI Performer w/ special initializations.
func NewCLI(wpCLIPath string, wpPath string, fpmURL string, metrics metrics.Manager, logger logger.Logger) *CLI {
func NewCLI(wpCLIPath string, wpPath string, fpmURL string, fpmResponseTimeout time.Duration, metrics metrics.Manager, logger logger.Logger) *CLI {
performer := &CLI{
wpCLIPath: wpCLIPath,
wpPath: wpPath,
metrics: metrics,
logger: logger,
wpCLIPath: wpCLIPath,
wpPath: wpPath,
metrics: metrics,
logger: logger,
fpmResponseTimeout: fpmResponseTimeout,
}

if fpmURL != "" {
Expand Down Expand Up @@ -252,7 +255,12 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) {
return "", err
}

fcgiReq := gofast.NewRequest(nil)
fcgiReq, cancel, err := perf.newFpmRequest()
if err != nil {
return "", err
}
defer cancel()

fcgiReq.Params = map[string]string{
"REQUEST_METHOD": "GET",
"SCRIPT_FILENAME": "/var/wpvip/fpm-cron-runner.php",
Expand All @@ -270,7 +278,7 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) {
stdOut := &strings.Builder{}
hrw := &fakeHTTPResponseWriter{Dest: stdOut}

if err = fcgiResp.WriteTo(hrw, stdErr); err != nil {
if err = perf.writeFpmResponse(fcgiResp, hrw, stdErr); err != nil {
return "", err
}

Expand Down Expand Up @@ -310,6 +318,43 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) {
return res.Buf, err
}

func (perf *CLI) newFpmRequest() (*gofast.Request, context.CancelFunc, error) {
if perf.fpmResponseTimeout <= 0 {
return gofast.NewRequest(nil), func() {}, nil
}

ctx, cancel := context.WithTimeout(context.Background(), perf.fpmResponseTimeout)
rawReq, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://cron-control-runner.local/", nil)
if err != nil {
cancel()
return nil, nil, err
}

return gofast.NewRequest(rawReq), cancel, nil
}

func (perf *CLI) writeFpmResponse(fcgiResp *gofast.ResponsePipe, responseWriter http.ResponseWriter, stdErr io.Writer) error {
if perf.fpmResponseTimeout <= 0 {
return fcgiResp.WriteTo(responseWriter, stdErr)
}

errCh := make(chan error, 1)
go func() {
errCh <- fcgiResp.WriteTo(responseWriter, stdErr)
}()

timer := time.NewTimer(perf.fpmResponseTimeout)
defer timer.Stop()

select {
case err := <-errCh:
return err
case <-timer.C:
fcgiResp.Close()
return fmt.Errorf("fpm error: response read timed out after %s", perf.fpmResponseTimeout)
}
}

func trimJSONPreamble(input string) string {
return strings.TrimLeftFunc(input, func(r rune) bool {
return r == '\uFEFF' || unicode.IsSpace(r)
Expand Down
25 changes: 25 additions & 0 deletions performer/performer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/Automattic/cron-control-runner/logger"
"github.com/Automattic/cron-control-runner/metrics"
"github.com/yookoala/gofast"
)

func TestEvent_LockKey(t *testing.T) {
Expand Down Expand Up @@ -50,6 +52,29 @@ func TestEvent_LockKey(t *testing.T) {
}
}

func TestProcessCommandWithFPM_ResponseTimeout(t *testing.T) {
perf := &CLI{
wpPath: t.TempDir(),
metrics: metrics.Mock{},
logger: logger.Logger{Logger: log.New(io.Discard, "", 0)},
fpmResponseTimeout: 100 * time.Millisecond,
fpm: func() (gofast.Client, error) {
Comment thread
sjinks marked this conversation as resolved.
return gofast.ClientFunc(func(req *gofast.Request) (*gofast.ResponsePipe, error) {
return gofast.NewResponsePipe(), nil
}), nil
},
}

_, err := perf.processCommandWithFPM([]string{"cron-control", "orchestrate", "runner-only", "get-info", "--format=json"})
if err == nil {
t.Fatal("expected timeout error, got nil")
}

if !strings.Contains(err.Error(), "response read timed out") {
t.Fatalf("expected response read timeout error, got: %v", err)
}
}

func TestSanitizeJSONInput(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 2 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ It's helpful to specify some environment variables (e.g. in an `.env` file):
- depth of events channel, 0 (default) is unbuffered
- `-fpm-url` string
- URL for the php-fpm server or socket (e.g. unix:///var/run/fastcgi.sock)
- `-fpm-response-timeout` duration
- maximum time to wait while reading an FPM response; `0` disables timeout
- `-prom-metrics-address` string
- Listen address for prometheus metrics (e.g. :4444); if set, can scrape http://:4444/metrics.
- `-use-mock-data`
Expand Down
Loading