2 changes: 1 addition & 1 deletion go.mod
@@ -1,5 +1,5 @@
module github.com/influxdata/inch

go 1.17
go 1.24
Copilot AI Feb 12, 2026
Bumping the module go version to 1.24 reduces build compatibility significantly. This change appears primarily needed for min/max builtins (Go 1.21+); if no other 1.24-specific features are required, consider setting the go directive to the minimum version actually needed (or using local helper functions) to keep the supported Go range wider.

Suggested change:
- go 1.24
+ go 1.21
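As a sketch of the helper-function alternative the comment mentions (the names `minInt`/`maxInt` are hypothetical, not from the PR), small local helpers can stand in for the Go 1.21 `min`/`max` builtins on older toolchains:

```go
package main

import "fmt"

// minInt and maxInt are hypothetical stand-ins for the min/max builtins
// (Go 1.21+), keeping the module buildable with an older go directive.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	fmt.Println(minInt(10, 51), maxInt(1, 9)) // prints "10 9"
}
```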


require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
80 changes: 44 additions & 36 deletions inch.go
@@ -74,7 +74,7 @@ type Simulator struct {

// Decay factor used when weighting average latency returned by server.
alpha float64
V2 bool
V2 bool // even with the V2 flag, the v1 write endpoint of v2 is used
Token string
Verbose bool
ReportHost string
@@ -159,7 +159,7 @@ func (s *Simulator) Validate() error {
el = append(el, errors.New("number of fields must be > 0"))
}

// validate reporting client is accessable
// validate reporting client is accessible
if s.ReportHost != "" {
var err error
s.clt, err = client.NewHTTPClient(client.HTTPConfig{
@@ -183,6 +183,15 @@ func (s *Simulator) Validate() error {
el = append(el, fmt.Errorf("invalid precision: %s", s.Precision))
}

if s.Concurrency <= 0 {
fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive; resetting to 1: your setting %v\n", s.Concurrency)
s.Concurrency = 1
}

if s.Concurrency >= 50 {
fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
Copilot AI Feb 12, 2026
The warning condition uses s.Concurrency >= 50 but the message says "over 50 concurrent writers". Either change the condition to > 50 or adjust the warning text so they match.

Suggested change:
- fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
+ fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; 50 or more concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)

}

if len(el) > 0 {
return el
}
@@ -275,7 +284,7 @@ func (s *Simulator) Run(ctx context.Context) error {
}

// Stream batches from a separate goroutine.
ch := s.generateBatches()
ch := s.generateBatches(s.Concurrency)

// Start clients.
var wg sync.WaitGroup
@@ -350,8 +359,11 @@ func (s *Simulator) makeField(val int) []string {
}

// generateBatches returns a channel for streaming batches.
func (s *Simulator) generateBatches() <-chan []byte {
ch := make(chan []byte, 10)
func (s *Simulator) generateBatches(concurrency int) <-chan []byte {
// use concurrency setting to adjust internal buffering. We should attempt to make as many batches
// as there are concurrent writers up to some limit ~50.
pendingBatches := max(concurrency, 9) + 1 // use 10 (9+1) to keep close to previous behavior for default concurrency of 1
ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant
Contributor Author
A channel of size 10 means 10 batches get buffered (naturally), but if the concurrency is higher they might drain unevenly, creating delays for the concurrent writers because a batch isn't available yet: the writing might stutter on inch's data-generation side.

If the channel for buffering batches is slightly bigger than the concurrency, each writer should see a smoother pattern where the next batch is always available as soon as it finishes the last one and is ready to write again (without waiting on generation).

This PR increases CPU usage by increasing the concurrency of batch generation and of writing to the db under test. It also increases memory usage if concurrency is high, but really high concurrency might be better simulated with multiple inch instances running.
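The sizing rule in the diff above (floor near the old default of 10, cap for memory) can be sketched as a small standalone helper (the name `pendingBatches` mirrors the diff; the helper itself is illustrative, written without the 1.21 `min`/`max` builtins):

```go
package main

import "fmt"

// pendingBatches mirrors the buffering rule from the diff: roughly one
// buffered batch per writer, floored near the old default of 10, and capped
// at ~51 to bound memory (channel size * batch size * line size).
func pendingBatches(concurrency int) int {
	n := concurrency
	if n < 9 {
		n = 9
	}
	n++ // 9+1 keeps the old default of 10 when concurrency is 1
	if n > 51 {
		n = 51
	}
	return n
}

func main() {
	fmt.Println(pendingBatches(1), pendingBatches(32), pendingBatches(100)) // prints "10 33 51"
}
```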


go func() {
values := make([]int, len(s.Tags))
@@ -440,7 +452,7 @@ func (s *Simulator) generateBatches() <-chan []byte {
return ch
}

var space []byte = []byte(" ")
var space = []byte(" ")

func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) {
buf.Write(measurement) // Write measurement
@@ -623,8 +635,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration {

// runClient executes a client to send points in a separate goroutine.
func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
b := bytes.NewBuffer(make([]byte, 0, 1024))
g := gzip.NewWriter(b)
gzipBackingBuffer := make([]byte, 0, 1024)
g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
Copilot AI Feb 12, 2026
gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer)) creates a new buffer that's immediately discarded on the first g.Reset(b) call. Consider initializing the gzip writer with a long-lived bytes.Buffer (or a dummy writer like io.Discard) so there isn't an unnecessary allocation at startup.

Suggested change:
- g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
+ g := gzip.NewWriter(io.Discard)


for {
select {
@@ -636,35 +648,31 @@
return
}

// Keep trying batch until successful.
// Stop client if it cannot connect.
for {
b.Reset()
b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b'
b.Reset() // resets backing buffer to zero length
Comment on lines +651 to +652
Copilot AI Feb 12, 2026
Creating a new bytes.Buffer for every batch (bytes.NewBuffer(gzipBackingBuffer)) defeats the intended buffer reuse (and also doesn’t retain any grown capacity because gzipBackingBuffer is never updated). To actually reduce allocations, keep a per-client bytes.Buffer outside the loop and Reset() it per batch (and/or update the retained backing slice from the buffer after use).
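The reuse pattern the comment describes can be sketched roughly like this (a hypothetical shape, not the actual inch code): one long-lived buffer per client, reset per batch, so the grown backing capacity is retained across batches:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// compressBatches sketches the per-client reuse pattern suggested above:
// a single bytes.Buffer is allocated once, Reset() per batch, and the gzip
// writer is retargeted at it, so grown capacity carries over between batches.
func compressBatches(batches [][]byte) (totalCompressed int) {
	var b bytes.Buffer      // per-client buffer, allocated once
	g := gzip.NewWriter(&b) // initial target; retargeted via Reset below
	for _, batch := range batches {
		b.Reset()    // keep the grown backing array, drop the contents
		g.Reset(&b)  // reuse the gzip writer against the same buffer
		g.Write(batch)
		g.Close()
		totalCompressed += b.Len()
	}
	return totalCompressed
}

func main() {
	n := compressBatches([][]byte{[]byte("cpu v=1"), []byte("cpu v=2")})
	fmt.Println(n > 0) // prints "true"
}
```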


if s.Gzip {
g.Reset(b)
if s.Gzip {
g.Reset(b)

if _, err := g.Write(buf); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
if _, err := g.Write(buf); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}

if err := g.Close(); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
} else {
_, err := io.Copy(b, bytes.NewReader(buf))
if err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
if err := g.Close(); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
} else {
b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer
}
Comment on lines +669 to +670
Copilot AI Feb 12, 2026
In the non-gzip path, bytes.NewBuffer(buf) allocates a new bytes.Buffer per batch even though you already have the final []byte. Consider avoiding the buffer wrapper and keep the []byte directly for the retry loop to reduce per-batch allocations.
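A minimal sketch of the alternative the comment suggests (the helper `preparePayload` is hypothetical, not part of inch): keep the payload as a plain `[]byte` and only involve a `bytes.Buffer` when compression is on.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// preparePayload is a hypothetical helper: in the non-gzip path the input
// slice is returned directly (no wrapper allocation, no copy); only the
// gzip path needs an intermediate buffer.
func preparePayload(buf []byte, useGzip bool) ([]byte, error) {
	if !useGzip {
		return buf, nil // take ownership of the batch as-is
	}
	var b bytes.Buffer
	g := gzip.NewWriter(&b)
	if _, err := g.Write(buf); err != nil {
		return nil, err
	}
	if err := g.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func main() {
	p, _ := preparePayload([]byte("cpu v=1"), false)
	fmt.Println(string(p)) // prints "cpu v=1"
}
```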


if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused {
// Keep trying batch until successful.
// Stop client if it cannot connect.
for {
if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) {
return
} else if err != nil {
fmt.Fprintln(s.Stderr, err)
@@ -787,7 +795,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.

// sendBatch writes a batch to the server. Continually retries until successful.
func (s *Simulator) sendBatch(buf []byte) error {
// Don't send the batch anywhere..
// Don't send the batch anywhere.
if s.DryRun {
return nil
}
@@ -854,17 +862,17 @@ func (s *Simulator) sendBatch(buf []byte) error {
// slower than the desired maximum latency. We use a weighted moving average
// to determine that, favouring recent latencies over historic ones.
//
// The implementation is pretty ghetto at the moment, it has the following
// The implementation is pretty primitive at the moment, it has the following
// rules:
//
// - wma reponse time faster than desired latency and currentDelay > 0?
// - wma response time faster than desired latency and currentDelay > 0?
// * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency).
// - response time slower than desired latency?
// * increase currentDelay by 1/n * 0.25 * (desired latency - wma response).
// - currentDelay < 100ms?
// * set currentDelay to 0
//
// n is the number of concurent writers. The general rule then, is that
// n is the number of concurrent writers. The general rule then, is that
// we look at how far away from the desired latency and move a quarter of the
// way there in total (over all writers). If we're coming un under the max
// latency and our writers are using a delay (currentDelay > 0) then we will