From b9643dcf38b35c0a7b833597b26f96c29305352f Mon Sep 17 00:00:00 2001 From: Volodymyr Kolesnykov Date: Fri, 5 Jun 2026 17:13:39 +0300 Subject: [PATCH 1/2] fix(fpm): guard response reads and strip bridge headers --- .../fpm-cron-runner.php | 1 + main.go | 17 +++-- performer/cli.go | 69 +++++++++++++++---- performer/performer_test.go | 25 +++++++ readme.md | 2 + 5 files changed, 95 insertions(+), 19 deletions(-) diff --git a/.devcontainer/local-features/cron-runner-environment/fpm-cron-runner.php b/.devcontainer/local-features/cron-runner-environment/fpm-cron-runner.php index f3d7dbf..7d0cb7a 100644 --- a/.devcontainer/local-features/cron-runner-environment/fpm-cron-runner.php +++ b/.devcontainer/local-features/cron-runner-environment/fpm-cron-runner.php @@ -64,6 +64,7 @@ fseek( $errfh, 0 ); $result['stderr'] = stream_get_contents( $errfh ); fclose( $errfh ); + header_remove(); header( 'Status: 200 OK' ); header( 'Content-Type: application/json' ); echo json_encode( $result ); diff --git a/main.go b/main.go index fbe7e4c..c053cff 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ type options struct { wpCLIPath string wpPath string fpmURL string + fpmResponseTimeout time.Duration orchestratorConfig orchestrator.Config remoteToken string useWebsockets bool @@ -61,7 +62,7 @@ func main() { logger.Infof("Using Mock Performer") perf = &performer.Mock{UseSleeps: true, LogCommands: false, RotateSites: true} } else { - perf = performer.NewCLI(options.wpCLIPath, options.wpPath, options.fpmURL, metricsManager, logger) + perf = performer.NewCLI(options.wpCLIPath, options.wpPath, options.fpmURL, options.fpmResponseTimeout, metricsManager, logger) } // Setup the locker, if enabled. 
@@ -92,12 +93,13 @@ func main() { func getCliOptions() options { // Set defaults options := options{ - metricsAddress: "", - useMockData: false, - debug: false, - wpCLIPath: "/usr/local/bin/wp", - wpPath: "/var/www/html", - fpmURL: "", + metricsAddress: "", + useMockData: false, + debug: false, + wpCLIPath: "/usr/local/bin/wp", + wpPath: "/var/www/html", + fpmURL: "", + fpmResponseTimeout: 0, orchestratorConfig: orchestrator.Config{ GetSitesInterval: 60 * time.Second, GetEventsInterval: 30 * time.Second, @@ -126,6 +128,7 @@ func getCliOptions() options { flag.StringVar(&(options.wpCLIPath), "wp-cli-path", options.wpCLIPath, "path to WP-CLI binary") flag.StringVar(&(options.wpPath), "wp-path", options.wpPath, "path to the WordPress installation") flag.StringVar(&(options.fpmURL), "fpm-url", options.fpmURL, "URL for the php-fpm server or socket (e.g. unix:///var/run/fastcgi.sock)") + flag.DurationVar(&(options.fpmResponseTimeout), "fpm-response-timeout", options.fpmResponseTimeout, "maximum time to wait while reading an FPM response; 0 disables timeout") // Used for the Orchestrator flag.DurationVar(&(options.orchestratorConfig.GetSitesInterval), "get-sites-interval", options.orchestratorConfig.GetSitesInterval, "get-sites interval") diff --git a/performer/cli.go b/performer/cli.go index bd103f5..cab6835 100644 --- a/performer/cli.go +++ b/performer/cli.go @@ -1,6 +1,7 @@ package performer import ( + "context" "encoding/json" "errors" "fmt" @@ -24,11 +25,12 @@ var _ Performer = &CLI{} // CLI uses the CLI interface for site interactions. 
// Don't initialize directly, use NewCLI() type CLI struct { - wpCLIPath string - wpPath string - metrics metrics.Manager - logger logger.Logger - fpm gofast.ClientFactory + wpCLIPath string + wpPath string + metrics metrics.Manager + logger logger.Logger + fpm gofast.ClientFactory + fpmResponseTimeout time.Duration } func (perf *CLI) IsReady() bool { @@ -50,12 +52,13 @@ type siteInfo struct { } // NewCLI sets up the CLI Performer w/ special initializations. -func NewCLI(wpCLIPath string, wpPath string, fpmURL string, metrics metrics.Manager, logger logger.Logger) *CLI { +func NewCLI(wpCLIPath string, wpPath string, fpmURL string, fpmResponseTimeout time.Duration, metrics metrics.Manager, logger logger.Logger) *CLI { performer := &CLI{ - wpCLIPath: wpCLIPath, - wpPath: wpPath, - metrics: metrics, - logger: logger, + wpCLIPath: wpCLIPath, + wpPath: wpPath, + metrics: metrics, + logger: logger, + fpmResponseTimeout: fpmResponseTimeout, } if fpmURL != "" { @@ -252,7 +255,12 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) { return "", err } - fcgiReq := gofast.NewRequest(nil) + fcgiReq, cancel, err := perf.newFpmRequest() + if err != nil { + return "", err + } + defer cancel() + fcgiReq.Params = map[string]string{ "REQUEST_METHOD": "GET", "SCRIPT_FILENAME": "/var/wpvip/fpm-cron-runner.php", @@ -270,7 +278,7 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) { stdOut := &strings.Builder{} hrw := &fakeHTTPResponseWriter{Dest: stdOut} - if err = fcgiResp.WriteTo(hrw, stdErr); err != nil { + if err = perf.writeFpmResponse(fcgiResp, hrw, stdErr); err != nil { return "", err } @@ -310,6 +318,43 @@ func (perf *CLI) processCommandWithFPM(subcommand []string) (string, error) { return res.Buf, err } +func (perf *CLI) newFpmRequest() (*gofast.Request, context.CancelFunc, error) { + if perf.fpmResponseTimeout <= 0 { + return gofast.NewRequest(nil), func() {}, nil + } + + ctx, cancel := 
context.WithTimeout(context.Background(), perf.fpmResponseTimeout) + rawReq, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://cron-control-runner.local/", nil) + if err != nil { + cancel() + return nil, nil, err + } + + return gofast.NewRequest(rawReq), cancel, nil +} + +func (perf *CLI) writeFpmResponse(fcgiResp *gofast.ResponsePipe, responseWriter http.ResponseWriter, stdErr io.Writer) error { + if perf.fpmResponseTimeout <= 0 { + return fcgiResp.WriteTo(responseWriter, stdErr) + } + + errCh := make(chan error, 1) + go func() { + errCh <- fcgiResp.WriteTo(responseWriter, stdErr) + }() + + timer := time.NewTimer(perf.fpmResponseTimeout) + defer timer.Stop() + + select { + case err := <-errCh: + return err + case <-timer.C: + fcgiResp.Close() + return fmt.Errorf("fpm error: response write timed out after %s", perf.fpmResponseTimeout) + } +} + func trimJSONPreamble(input string) string { return strings.TrimLeftFunc(input, func(r rune) bool { return r == '\uFEFF' || unicode.IsSpace(r) diff --git a/performer/performer_test.go b/performer/performer_test.go index fd81d15..7533da3 100644 --- a/performer/performer_test.go +++ b/performer/performer_test.go @@ -7,9 +7,11 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/Automattic/cron-control-runner/logger" "github.com/Automattic/cron-control-runner/metrics" + "github.com/yookoala/gofast" ) func TestEvent_LockKey(t *testing.T) { @@ -50,6 +52,29 @@ func TestEvent_LockKey(t *testing.T) { } } +func TestProcessCommandWithFPM_ResponseTimeout(t *testing.T) { + perf := &CLI{ + wpPath: t.TempDir(), + metrics: metrics.Mock{}, + logger: logger.Logger{Logger: log.New(io.Discard, "", 0)}, + fpmResponseTimeout: 10 * time.Millisecond, + fpm: func() (gofast.Client, error) { + return gofast.ClientFunc(func(req *gofast.Request) (*gofast.ResponsePipe, error) { + return gofast.NewResponsePipe(), nil + }), nil + }, + } + + _, err := perf.processCommandWithFPM([]string{"cron-control", "orchestrate", 
"runner-only", "get-info", "--format=json"}) + if err == nil { + t.Fatal("expected timeout error, got nil") + } + + if !strings.Contains(err.Error(), "response write timed out") { + t.Fatalf("expected response write timeout error, got: %v", err) + } +} + func TestSanitizeJSONInput(t *testing.T) { tests := []struct { name string diff --git a/readme.md b/readme.md index 44c1059..928d6c5 100644 --- a/readme.md +++ b/readme.md @@ -54,6 +54,8 @@ It's helpful to specify some environment variables (e.g. in an `.env` file): - depth of events channel, 0 (default) is unbuffered - `-fpm-url` string - URL for the php-fpm server or socket (e.g. unix:///var/run/fastcgi.sock) +- `-fpm-response-timeout` duration + - maximum time to wait while reading an FPM response; `0` disables timeout - `-prom-metrics-address` string - Listen address for prometheus metrics (e.g. :4444); if set, can scrape http://:4444/metrics. - `-use-mock-data` From ba7edacbb1ced3470c823148cd39d6bce1791c01 Mon Sep 17 00:00:00 2001 From: Volodymyr Kolesnykov Date: Fri, 5 Jun 2026 17:44:10 +0300 Subject: [PATCH 2/2] test(performer): stabilize FPM response timeout test and align error wording Addresses Copilot review comments on PR #62. ## What changed - Bump the FPM response timeout test from 10ms to 100ms to avoid CI flake from goroutine scheduling jitter on shared runners. - Rename the timeout error wording from "response write timed out" to "response read timed out" so it reflects that the bound is on reading the FastCGI response stream (the in-memory `fakeHTTPResponseWriter` cannot block on writes), not on writing to a client. - Update the test assertion to match the new wording. ## Why - The 10ms threshold is below typical CI scheduler quanta; the test was fundamentally racy. - "Response write timed out" misled the on-call about which leg of the FPM bridge was stuck during the incident referenced by this PR. 
--- performer/cli.go | 2 +- performer/performer_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/performer/cli.go b/performer/cli.go index cab6835..550bcfa 100644 --- a/performer/cli.go +++ b/performer/cli.go @@ -351,7 +351,7 @@ func (perf *CLI) writeFpmResponse(fcgiResp *gofast.ResponsePipe, responseWriter return err case <-timer.C: fcgiResp.Close() - return fmt.Errorf("fpm error: response write timed out after %s", perf.fpmResponseTimeout) + return fmt.Errorf("fpm error: response read timed out after %s", perf.fpmResponseTimeout) } } diff --git a/performer/performer_test.go b/performer/performer_test.go index 7533da3..d864f87 100644 --- a/performer/performer_test.go +++ b/performer/performer_test.go @@ -57,7 +57,7 @@ func TestProcessCommandWithFPM_ResponseTimeout(t *testing.T) { wpPath: t.TempDir(), metrics: metrics.Mock{}, logger: logger.Logger{Logger: log.New(io.Discard, "", 0)}, - fpmResponseTimeout: 10 * time.Millisecond, + fpmResponseTimeout: 100 * time.Millisecond, fpm: func() (gofast.Client, error) { return gofast.ClientFunc(func(req *gofast.Request) (*gofast.ResponsePipe, error) { return gofast.NewResponsePipe(), nil @@ -70,8 +70,8 @@ func TestProcessCommandWithFPM_ResponseTimeout(t *testing.T) { t.Fatal("expected timeout error, got nil") } - if !strings.Contains(err.Error(), "response write timed out") { - t.Fatalf("expected response write timeout error, got: %v", err) + if !strings.Contains(err.Error(), "response read timed out") { + t.Fatalf("expected response read timeout error, got: %v", err) } }