fix: improve write pipeline concurrency and cleanup #49
Conversation
Fixes channel buffering for concurrent writers by sizing the batch channel to match the concurrency setting. Moves gzip compression and buffer preparation outside the retry loop to avoid needless recompression. Reduces allocations by reusing buffers and taking ownership of batch data. Updates Go to 1.24. Fixes typos and wording in comments.
```go
// use concurrency setting to adjust internal buffering. We should attempt to make as many batches
// as there are concurrent writers, up to some limit ~50.
pendingBatches := max(concurrency, 9) + 1        // use 10 (9+1) to keep close to previous behavior for default concurrency of 1
ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant
```
A channel of size 10 means 10 batches get buffered (naturally), but if the concurrency is higher, they might drain unevenly, creating delays for the concurrent writers because a batch isn't available yet: the writing might stutter on inch's data generation speed.

If the channel for buffering batches is slightly bigger than the concurrency, the individual writers should see a smoother pattern where the next batch is always available when each one finishes its last batch and is ready to write the next (without waiting on generation).

This PR increases CPU usage by increasing the concurrent work of batch generation and writing to the database under test. It also increases memory usage if concurrency is high, but really high concurrency might be better simulated with multiple inch instances running.
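The sizing rule from the diff can be checked in isolation. This sketch copies the logic into an illustrative function (the name `pendingBatchChan` is hypothetical) and prints the resulting buffer sizes for a few concurrency settings; note the `min`/`max` builtins require Go 1.21+.

```go
package main

import "fmt"

// pendingBatchChan returns the batch-channel buffer size for a given
// concurrency, per the diff: at least one batch more than the writer count,
// a floor preserving the old default of 10, and a cap of 51 to bound memory.
// (Function name is illustrative, not from the PR.)
func pendingBatchChan(concurrency int) int {
	pendingBatches := max(concurrency, 9) + 1
	return min(pendingBatches, 51)
}

func main() {
	for _, c := range []int{1, 25, 100} {
		// prints 10, 26, 51: default behavior preserved, then concurrency+1, then the cap
		fmt.Printf("concurrency=%d -> buffer=%d\n", c, pendingBatchChan(c))
	}
}
```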
Pull request overview
Improves the write pipeline’s behavior under concurrent writers by adjusting internal batch buffering, reducing repeated work in the client retry loop, and updating minor comments/typos; also bumps the module’s Go version.
Changes:
- Size the batch generation channel buffer based on configured concurrency (with a cap).
- Move gzip/buffer preparation outside the retry loop in `runClient` and adjust retry error handling.
- Update the Go version directive and fix typos/wording in comments.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `inch.go` | Concurrency-aware batch buffering; refactors client batch handling; comment/wording updates. |
| `go.mod` | Updates the module `go` version directive. |
```go
if s.Concurrency >= 50 {
	fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
}
```
The warning condition uses s.Concurrency >= 50 but the message says "over 50 concurrent writers". Either change the condition to > 50 or adjust the warning text so they match.
```diff
- fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
+ fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; 50 or more concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
```
```diff
- b := bytes.NewBuffer(make([]byte, 0, 1024))
- g := gzip.NewWriter(b)
+ gzipBackingBuffer := make([]byte, 0, 1024)
+ g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
```
gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer)) creates a new buffer that's immediately discarded on the first g.Reset(b) call. Consider initializing the gzip writer with a long-lived bytes.Buffer (or a dummy writer like io.Discard) so there isn't an unnecessary allocation at startup.
```diff
- g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
+ g := gzip.NewWriter(io.Discard)
```
```go
b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b'
b.Reset()                               // resets backing buffer to zero length
```
Creating a new bytes.Buffer for every batch (bytes.NewBuffer(gzipBackingBuffer)) defeats the intended buffer reuse (and also doesn’t retain any grown capacity because gzipBackingBuffer is never updated). To actually reduce allocations, keep a per-client bytes.Buffer outside the loop and Reset() it per batch (and/or update the retained backing slice from the buffer after use).
```go
	b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer
}
```
In the non-gzip path, bytes.NewBuffer(buf) allocates a new bytes.Buffer per batch even though you already have the final []byte. Consider avoiding the buffer wrapper and keep the []byte directly for the retry loop to reduce per-batch allocations.
```diff
  module github.com/influxdata/inch

- go 1.17
+ go 1.24
```
Bumping the module go version to 1.24 reduces build compatibility significantly. This change appears primarily needed for min/max builtins (Go 1.21+); if no other 1.24-specific features are required, consider setting the go directive to the minimum version actually needed (or using local helper functions) to keep the supported Go range wider.
```diff
- go 1.24
+ go 1.21
```
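If keeping an even older `go` directive were preferred, plain helper functions can stand in for the `min`/`max` builtins (added in Go 1.21). These helpers are illustrative, not part of the PR.

```go
package main

import "fmt"

// minInt/maxInt replicate the min/max builtins for ints so the sizing logic
// could compile under a pre-1.21 go directive. (Helper names are hypothetical.)
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	pendingBatches := maxInt(1, 9) + 1       // same floor as the diff for concurrency 1
	fmt.Println(minInt(pendingBatches, 51)) // prints 10
}
```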