From 03447d16dd1e179d77685670bc5c26798358d806 Mon Sep 17 00:00:00 2001 From: Phil Bracikowski Date: Thu, 12 Feb 2026 14:26:29 -0800 Subject: [PATCH] fix: improve write pipeline concurrency and cleanup Fixes channel buffering for concurrent writers by sizing the batch channel to match the concurrency setting. Moves gzip compression and buffer preparation outside the retry loop to avoid needless recompression. Reduces allocations by reusing buffers and taking ownership of batch data. Updates golang to 1.24. Fixes typos and wording in comments. --- go.mod | 2 +- inch.go | 80 +++++++++++++++++++++++++++++++-------------------------- 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index bca333d..0af3084 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/influxdata/inch -go 1.17 +go 1.24 require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d diff --git a/inch.go b/inch.go index ac9ad8b..10b41e7 100644 --- a/inch.go +++ b/inch.go @@ -74,7 +74,7 @@ type Simulator struct { // Decay factor used when weighting average latency returned by server. 
alpha float64 - V2 bool + V2 bool // even with the V2 flag, the v1 write endpoint of v2 is used Token string Verbose bool ReportHost string @@ -159,7 +159,7 @@ func (s *Simulator) Validate() error { el = append(el, errors.New("number of fields must be > 0")) } - // validate reporting client is accessable + // validate reporting client is accessible if s.ReportHost != "" { var err error s.clt, err = client.NewHTTPClient(client.HTTPConfig{ @@ -183,6 +183,15 @@ func (s *Simulator) Validate() error { el = append(el, fmt.Errorf("invalid precision: %s", s.Precision)) } + if s.Concurrency <= 0 { + fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive; resetting to 1: your setting %v\n", s.Concurrency) + s.Concurrency = 1 + } + + if s.Concurrency >= 50 { + fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency) + } + if len(el) > 0 { return el } @@ -275,7 +284,7 @@ func (s *Simulator) Run(ctx context.Context) error { } // Stream batches from a separate goroutine. - ch := s.generateBatches() + ch := s.generateBatches(s.Concurrency) // Start clients. var wg sync.WaitGroup @@ -350,8 +359,11 @@ func (s *Simulator) makeField(val int) []string { } // generateBatches returns a channel for streaming batches. -func (s *Simulator) generateBatches() <-chan []byte { - ch := make(chan []byte, 10) +func (s *Simulator) generateBatches(concurrency int) <-chan []byte { + // use concurrency setting to adjust internal buffering. We should attempt to make as many batches + // as there are concurrent writers up to some limit ~50. 
+ pendingBatches := max(concurrency, 9) + 1 // use 10 (9+1) to keep close to previous behavior for default concurrency of 1 + ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant go func() { values := make([]int, len(s.Tags)) @@ -440,7 +452,7 @@ func (s *Simulator) generateBatches() <-chan []byte { return ch } -var space []byte = []byte(" ") +var space = []byte(" ") func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) { buf.Write(measurement) // Write measurement @@ -623,8 +635,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration { // runClient executes a client to send points in a separate goroutine. func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) { - b := bytes.NewBuffer(make([]byte, 0, 1024)) - g := gzip.NewWriter(b) + gzipBackingBuffer := make([]byte, 0, 1024) + g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer)) for { select { @@ -636,35 +648,31 @@ func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) { return } - // Keep trying batch until successful. - // Stop client if it cannot connect. 
- for { - b.Reset() + b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b' + b.Reset() // resets backing buffer to zero length - if s.Gzip { - g.Reset(b) + if s.Gzip { + g.Reset(b) - if _, err := g.Write(buf); err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } + if _, err := g.Write(buf); err != nil { + fmt.Fprintln(s.Stderr, err) + fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) + os.Exit(1) + } - if err := g.Close(); err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } - } else { - _, err := io.Copy(b, bytes.NewReader(buf)) - if err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } + if err := g.Close(); err != nil { + fmt.Fprintln(s.Stderr, err) + fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) + os.Exit(1) } + } else { + b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer + } - if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused { + // Keep trying batch until successful. + // Stop client if it cannot connect. + for { + if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) { return } else if err != nil { fmt.Fprintln(s.Stderr, err) @@ -787,7 +795,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io. // sendBatch writes a batch to the server. Continually retries until successful. func (s *Simulator) sendBatch(buf []byte) error { - // Don't send the batch anywhere.. + // Don't send the batch anywhere. if s.DryRun { return nil } @@ -854,17 +862,17 @@ func (s *Simulator) sendBatch(buf []byte) error { // slower than the desired maximum latency. We use a weighted moving average // to determine that, favouring recent latencies over historic ones. 
// - // The implementation is pretty ghetto at the moment, it has the following + // The implementation is pretty primitive at the moment, it has the following // rules: // - // - wma reponse time faster than desired latency and currentDelay > 0? + // - wma response time faster than desired latency and currentDelay > 0? // * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency). // - response time slower than desired latency? // * increase currentDelay by 1/n * 0.25 * (desired latency - wma response). // - currentDelay < 100ms? // * set currentDelay to 0 // - // n is the number of concurent writers. The general rule then, is that + // n is the number of concurrent writers. The general rule then, is that // we look at how far away from the desired latency and move a quarter of the - // way there in total (over all writers). If we're coming un under the max + // way there in total (over all writers). If we're coming in under the max // latency and our writers are using a delay (currentDelay > 0) then we will