From 1cf71eb1c99fe1af9bf0da9f9881d62664b6b57e Mon Sep 17 00:00:00 2001 From: Phil Bracikowski Date: Thu, 12 Feb 2026 14:25:25 -0800 Subject: [PATCH] feat: add v3 native write endpoint support Adds options to use the v3 native write endpoint for advanced control of write behavior such as the no-sync and accept-partial options. Updates token and auth handling to support v3 alongside v2. * closes #46 --- cmd/inch/main.go | 5 ++++- inch.go | 25 +++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/cmd/inch/main.go b/cmd/inch/main.go index f593051..ddb8f8a 100644 --- a/cmd/inch/main.go +++ b/cmd/inch/main.go @@ -61,7 +61,7 @@ func (m *Main) ParseFlags(args []string) error { fs := flag.NewFlagSet("inch", flag.ContinueOnError) fs.BoolVar(&m.inch.Verbose, "v", false, "Verbose") fs.BoolVar(&m.inch.V2, "v2", false, "Writing into InfluxDB 2.0") - fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 Authorization token") + fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 or 3 Authorization token") fs.StringVar(&m.inch.ReportHost, "report-host", "", "Host to send metrics") fs.StringVar(&m.inch.ReportUser, "report-user", "", "User for Host to send metrics") fs.StringVar(&m.inch.ReportPassword, "report-password", "", "Password Host to send metrics") @@ -92,6 +92,9 @@ func (m *Main) ParseFlags(args []string) error { fs.BoolVar(&m.inch.Gzip, "gzip", false, "Use gzip compression") fs.StringVar(&m.inch.Precision, "precision", "ns", "Precision of writes") noSetup := fs.Bool("no-setup", false, "Don't ping or set up tables/buckets on run (this is useful for load testing kapacitor)") + fs.BoolVar(&m.inch.V3, "v3", false, "Use v3 write endpoint (only compatible with v3 write endpoint)") + fs.BoolVar(&m.inch.V3NoSync, "v3-no-sync", false, "Disable waiting for durability before ack") + fs.BoolVar(&m.inch.V3AcceptPartial, "v3-accept-partial", false, "Accept lines in batch successfully even if subsequent lines error") if err := fs.Parse(args); err != nil { return err diff --git a/inch.go b/inch.go index ac9ad8b..0fbe532 100644 --- a/inch.go +++ b/inch.go @@ -102,6 +102,9 @@ type Simulator struct { TargetMaxLatency time.Duration Gzip bool Precision string + V3 bool // enables the v3 native write endpoint which has additional semantics; db and precision are required + V3NoSync bool // v3 supports a "no-sync" option, when true will ACK write as soon as possible without waiting for wal durability + V3AcceptPartial bool // allow partial write success when some lines in a batch fail to write Database string RetentionPolicy string // Write to a specific retention policy @@ -183,6 +186,10 @@ func (s *Simulator) Validate() error { el = append(el, fmt.Errorf("invalid precision: %s", s.Precision)) } + if !s.V3 && (s.V3NoSync || s.V3AcceptPartial) { + fmt.Fprintf(s.Stdout, "Warning: InfluxDB 3 flag(s) set to true, but V3 write endpoint not being used; flags will have no effect.\n") + } + if len(el) > 0 { return el } @@ -218,11 +225,14 @@ func (s *Simulator) Run(ctx context.Context) error { fmt.Fprintf(s.Stdout, "Retention Policy: %s\n", s.RetentionPolicy) fmt.Fprintf(s.Stdout, "Write Consistency: %s\n", s.Consistency) fmt.Fprintf(s.Stdout, "Writing into InfluxDB 2.0: %t\n", s.V2) - fmt.Fprintf(s.Stdout, "InfluxDB 2.0 Authorization Token: %s\n", s.Token) + fmt.Fprintf(s.Stdout, "InfluxDB 2 or 3 Authorization Token: %s\n", s.Token) fmt.Fprintf(s.Stdout, "Precision: %s\n", s.Precision) + fmt.Fprintf(s.Stdout, "Writing into InfluxDB 3: %t\n", s.V3) + fmt.Fprintf(s.Stdout, "InfluxDB 3 no-sync: %t\n", s.V3NoSync) + fmt.Fprintf(s.Stdout, "InfluxDB 3 accept partial writes: %t\n", s.V3AcceptPartial) - if s.V2 == true && s.Token == "" { - fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2.0") + if (s.V2 || s.V3) && s.Token == "" { + fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2 or 3") return err } @@ -719,7 +729,7 @@ var defaultSetupFn = func(s *Simulator) error { } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - if s.V2 == true { + if s.V2 || s.V3 { req.Header.Set("Authorization", "Token "+s.Token) } @@ -744,7 +754,10 @@ var defaultSetupFn = func(s *Simulator) error { // It's the caller's responsibility to close the response body. var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.ReadCloser, err error) { var url string - if s.RetentionPolicy == "" { + + if s.V3 { + url = fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v", s.Host, s.Database, s.Precision, s.V3NoSync, s.V3AcceptPartial) + } else if s.RetentionPolicy == "" { url = fmt.Sprintf("%s/write?db=%s&precision=%s&consistency=%s", s.Host, s.Database, s.Precision, s.Consistency) } else { url = fmt.Sprintf("%s/write?db=%s&rp=%s&precision=%s&consistency=%s", s.Host, s.Database, s.RetentionPolicy, s.Precision, s.Consistency) @@ -755,7 +768,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io. return 0, nil, err } - if s.V2 == true { + if s.V2 || s.V3 { req.Header.Set("Authorization", "Token "+s.Token) }