fix: improve write pipeline concurrency and cleanup #49
**go.mod**

```diff
@@ -1,5 +1,5 @@
 module github.com/influxdata/inch
 
-go 1.17
+go 1.24
 
 require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
```
```diff
@@ -74,7 +74,7 @@ type Simulator struct {
 
 	// Decay factor used when weighting average latency returned by server.
 	alpha float64
-	V2 bool
+	V2 bool // even with the V2 flag, the v1 write endpoint of v2 is used
 	Token string
 	Verbose bool
 	ReportHost string
@@ -159,7 +159,7 @@ func (s *Simulator) Validate() error {
 		el = append(el, errors.New("number of fields must be > 0"))
 	}
 
-	// validate reporting client is accessable
+	// validate reporting client is accessible
 	if s.ReportHost != "" {
 		var err error
 		s.clt, err = client.NewHTTPClient(client.HTTPConfig{
@@ -183,6 +183,15 @@ func (s *Simulator) Validate() error {
 		el = append(el, fmt.Errorf("invalid precision: %s", s.Precision))
 	}
 
+	if s.Concurrency <= 0 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive; resetting to 1: your setting %v\n", s.Concurrency)
+		s.Concurrency = 1
+	}
+
+	if s.Concurrency >= 50 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
```
Suggested change:

```diff
-	fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
+	fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; 50 or more concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
```
A channel of size 10 means 10 batches get buffered, but if the concurrency is higher, the channel can drain unevenly, creating delays for the concurrent writers because a batch isn't available yet: writing can stutter on inch's data-generation side.

If the channel buffering batches is slightly larger than the concurrency, each writer should see a smoother pattern where the next batch is already available by the time it finishes the last one and is ready to write again (without waiting on generation).

This PR increases CPU usage by increasing concurrent batch generation and writing to the database under test. It also increases memory usage when concurrency is high, but really high concurrency might be better simulated with multiple inch instances running.
Copilot AI, Feb 12, 2026:
`gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))` creates a new buffer that's immediately discarded on the first `g.Reset(b)` call. Consider initializing the gzip writer with a long-lived `bytes.Buffer` (or a dummy writer like `io.Discard`) so there isn't an unnecessary allocation at startup.
Suggested change:

```diff
-	g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
+	g := gzip.NewWriter(io.Discard)
```
Copilot AI, Feb 12, 2026:
Creating a new `bytes.Buffer` for every batch (`bytes.NewBuffer(gzipBackingBuffer)`) defeats the intended buffer reuse (and also doesn't retain any grown capacity, because `gzipBackingBuffer` is never updated). To actually reduce allocations, keep a per-client `bytes.Buffer` outside the loop and `Reset()` it per batch (and/or update the retained backing slice from the buffer after use).
Copilot AI, Feb 12, 2026:
In the non-gzip path, `bytes.NewBuffer(buf)` allocates a new `bytes.Buffer` per batch even though you already have the final `[]byte`. Consider avoiding the buffer wrapper and keeping the `[]byte` directly for the retry loop, to reduce per-batch allocations.
Bumping the module `go` version to 1.24 reduces build compatibility significantly. This change appears primarily needed for the `min`/`max` builtins (Go 1.21+); if no other 1.24-specific features are required, consider setting the `go` directive to the minimum version actually needed (or using local helper functions) to keep the supported Go range wider.