diff --git a/go.mod b/go.mod index bca333d..0af3084 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/influxdata/inch -go 1.17 +go 1.24 require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d diff --git a/inch.go b/inch.go index ac9ad8b..10b41e7 100644 --- a/inch.go +++ b/inch.go @@ -74,7 +74,7 @@ type Simulator struct { // Decay factor used when weighting average latency returned by server. alpha float64 - V2 bool + V2 bool // even with the V2 flag, the v1 write endpoint of v2 is used Token string Verbose bool ReportHost string @@ -159,7 +159,7 @@ func (s *Simulator) Validate() error { el = append(el, errors.New("number of fields must be > 0")) } - // validate reporting client is accessable + // validate reporting client is accessible if s.ReportHost != "" { var err error s.clt, err = client.NewHTTPClient(client.HTTPConfig{ @@ -183,6 +183,15 @@ func (s *Simulator) Validate() error { el = append(el, fmt.Errorf("invalid precision: %s", s.Precision)) } + if s.Concurrency <= 0 { + fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive; resetting to 1: your setting %v\n", s.Concurrency) + s.Concurrency = 1 + } + + if s.Concurrency >= 50 { + fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency) + } + if len(el) > 0 { return el } @@ -275,7 +284,7 @@ func (s *Simulator) Run(ctx context.Context) error { } // Stream batches from a separate goroutine. - ch := s.generateBatches() + ch := s.generateBatches(s.Concurrency) // Start clients. var wg sync.WaitGroup @@ -350,8 +359,11 @@ func (s *Simulator) makeField(val int) []string { } // generateBatches returns a channel for streaming batches. -func (s *Simulator) generateBatches() <-chan []byte { - ch := make(chan []byte, 10) +func (s *Simulator) generateBatches(concurrency int) <-chan []byte { + // use concurrency setting to adjust internal buffering. We should attempt to make as many batches + // as there are concurrent writers up to some limit ~50. + pendingBatches := max(concurrency, 9) + 1 // use 10 (9+1) to keep close to previous behavior for default concurrency of 1 + ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant go func() { values := make([]int, len(s.Tags)) @@ -440,7 +452,7 @@ func (s *Simulator) generateBatches() <-chan []byte { return ch } -var space []byte = []byte(" ") +var space = []byte(" ") func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) { buf.Write(measurement) // Write measurement @@ -623,8 +635,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration { // runClient executes a client to send points in a separate goroutine. func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) { - b := bytes.NewBuffer(make([]byte, 0, 1024)) - g := gzip.NewWriter(b) + gzipBackingBuffer := make([]byte, 0, 1024) + g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer)) for { select { @@ -636,35 +648,31 @@ func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) { return } - // Keep trying batch until successful. - // Stop client if it cannot connect. - for { - b.Reset() + b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b' + b.Reset() // resets backing buffer to zero length - if s.Gzip { - g.Reset(b) + if s.Gzip { + g.Reset(b) - if _, err := g.Write(buf); err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } + if _, err := g.Write(buf); err != nil { + fmt.Fprintln(s.Stderr, err) + fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) + os.Exit(1) + } - if err := g.Close(); err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } - } else { - _, err := io.Copy(b, bytes.NewReader(buf)) - if err != nil { - fmt.Fprintln(s.Stderr, err) - fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) - os.Exit(1) - } + if err := g.Close(); err != nil { + fmt.Fprintln(s.Stderr, err) + fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err) + os.Exit(1) } + } else { + b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer + } - if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused { + // Keep trying batch until successful. + // Stop client if it cannot connect. + for { + if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) { return } else if err != nil { fmt.Fprintln(s.Stderr, err) @@ -787,7 +795,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io. // sendBatch writes a batch to the server. Continually retries until successful. func (s *Simulator) sendBatch(buf []byte) error { - // Don't send the batch anywhere.. + // Don't send the batch anywhere. if s.DryRun { return nil } @@ -854,17 +862,17 @@ func (s *Simulator) sendBatch(buf []byte) error { // slower than the desired maximum latency. We use a weighted moving average // to determine that, favouring recent latencies over historic ones. // - // The implementation is pretty ghetto at the moment, it has the following + // The implementation is pretty primitive at the moment, it has the following // rules: // - // - wma reponse time faster than desired latency and currentDelay > 0? + // - wma response time faster than desired latency and currentDelay > 0? // * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency). // - response time slower than desired latency? // * increase currentDelay by 1/n * 0.25 * (desired latency - wma response). // - currentDelay < 100ms? // * set currentDelay to 0 // - // n is the number of concurent writers. The general rule then, is that + // n is the number of concurrent writers. The general rule then, is that // we look at how far away from the desired latency and move a quarter of the // way there in total (over all writers). If we're coming un under the max // latency and our writers are using a delay (currentDelay > 0) then we will