From a4201edeb62f12a92df7913b3b9a4e7e28ed56dc Mon Sep 17 00:00:00 2001 From: coanor Date: Tue, 30 Apr 2024 18:07:11 +0800 Subject: [PATCH 1/2] refactor: refactor checker into cfg --- point/check.go | 155 ++++++++++++++++++++++-------------- point/check_test.go | 52 +++--------- point/decode.go | 41 ++-------- point/decode_test.go | 15 +++- point/err.go | 1 + point/json.go | 4 +- point/json_test.go | 12 +-- point/lp.go | 12 +-- point/lp_test.go | 9 +-- point/metrics.go | 41 ++++++++++ point/new_point.go | 28 ++----- point/option.go | 47 ++++++----- point/point.go | 10 ++- point/point_test.go | 19 ++--- point/ptpool.go | 11 ++- point/set.go | 14 +++- point/shared_buf.go | 64 +++++++++++++++ point/shared_buf_test.go | 166 +++++++++++++++++++++++++++++++++++++++ 18 files changed, 477 insertions(+), 224 deletions(-) create mode 100644 point/metrics.go create mode 100644 point/shared_buf.go create mode 100644 point/shared_buf_test.go diff --git a/point/check.go b/point/check.go index 02291db7..3e5f1752 100644 --- a/point/check.go +++ b/point/check.go @@ -10,69 +10,103 @@ import ( "math" "reflect" "strings" + "time" ) -type checker struct { - *cfg - warns []*Warn -} - -func (c *checker) reset() { +func (c *cfg) check(pt *Point) *Point { + // reset warns: make the point can check multiple times and the warns(if any) + // not duplicated. c.warns = c.warns[:0] -} -func (c *checker) check(pt *Point) *Point { pt.pt.Name = c.checkMeasurement(pt.pt.Name) pt.pt.Fields = c.checkKVs(pt.pt.Fields) + c.checkTime(pt) + pt.pt.Warns = append(pt.pt.Warns, c.warns...) - // Add more checkings... + pt.SetFlag(Pcheck) + return pt } -func (c *checker) addWarn(t, msg string) { +func (c *cfg) addWarn(t, msg string) { c.warns = append(c.warns, &Warn{ Type: t, Msg: msg, }) } -func (c *checker) checkMeasurement(m string) string { +func (c *cfg) checkTime(pt *Point) { + x := pt.pt.Time + + if c.timestamp != -1 && x == 0 { // apply time in cfg + x = c.timestamp + } + + if x < 0 && !c.enableNagativeTimestamp { // Set to current time. + c.addWarn(WarnNagativeTimestamp, fmt.Sprintf("got nagative timestamp %d, reset to current time", x)) + x = time.Now().Round(0).UnixNano() // trim monotonic clock + } + + switch c.precision { + case PrecUS: + x *= int64(time.Microsecond) + case PrecMS: + x *= int64(time.Millisecond) + case PrecS: + x *= int64(time.Second) + case PrecM: + x *= int64(time.Minute) + case PrecH: + x *= int64(time.Hour) + case PrecNS: // pass + case PrecD: + x *= (24 * int64(time.Hour)) + case PrecW: + x *= (7 * 24 * int64(time.Hour)) + default: + // pass + } + + pt.pt.Time = x +} + +func (c *cfg) checkMeasurement(m string) string { if len(m) == 0 { c.addWarn(WarnInvalidMeasurement, fmt.Sprintf("empty measurement, use %s", DefaultMeasurementName)) m = DefaultMeasurementName } - if c.cfg.maxMeasurementLen > 0 && len(m) > c.cfg.maxMeasurementLen { + if c.maxMeasurementLen > 0 && len(m) > c.maxMeasurementLen { c.addWarn(WarnInvalidMeasurement, fmt.Sprintf("exceed max measurement length(%d), got length %d, trimmed", - c.cfg.maxMeasurementLen, len(m))) - return m[:c.cfg.maxMeasurementLen] + c.maxMeasurementLen, len(m))) + return m[:c.maxMeasurementLen] } else { return m } } -func (c *checker) checkKVs(kvs KVs) KVs { +func (c *cfg) checkKVs(kvs KVs) KVs { tcnt := kvs.TagCount() fcnt := kvs.FieldCount() // delete extra fields - if c.cfg.maxFields > 0 && fcnt > c.cfg.maxFields { + if c.maxFields > 0 && fcnt > c.maxFields { c.addWarn(WarnMaxFields, fmt.Sprintf("exceed max field count(%d), got %d fields, extra fields deleted", - c.cfg.maxFields, fcnt)) + c.maxFields, fcnt)) - kvs = kvs.TrimFields(c.cfg.maxFields) + kvs = kvs.TrimFields(c.maxFields) } // delete extra tags - if c.cfg.maxTags > 0 && tcnt > c.cfg.maxTags { + if c.maxTags > 0 && tcnt > c.maxTags { c.addWarn(WarnMaxFields, fmt.Sprintf("exceed max tag count(%d), got %d tags, extra tags deleted", - c.cfg.maxTags, tcnt)) + c.maxTags, tcnt)) - kvs = kvs.TrimTags(c.cfg.maxTags) + kvs = kvs.TrimTags(c.maxTags) } // check each kv valid @@ -113,7 +147,7 @@ func adjustKV(x string) string { return x } -func (c *checker) checkKV(f *Field, kvs KVs) (*Field, bool) { +func (c *cfg) checkKV(f *Field, kvs KVs) (*Field, bool) { if f.IsTag { return c.checkTag(f, kvs) } else { @@ -121,7 +155,7 @@ func (c *checker) checkKV(f *Field, kvs KVs) (*Field, bool) { } } -func (c *checker) keyConflict(key string, kvs KVs) bool { +func (c *cfg) keyConflict(key string, kvs KVs) bool { if kvs.Get(key) != nil { // key exist c.addWarn(WarnKeyNameConflict, fmt.Sprintf("same key after rename(%q), kv dropped", key)) @@ -133,13 +167,13 @@ func (c *checker) keyConflict(key string, kvs KVs) bool { // checkTag try to auto modify the f. If we need to drop // f, we return false. -func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { - if c.cfg.maxTagKeyLen > 0 && len(f.Key) > c.cfg.maxTagKeyLen { +func (c *cfg) checkTag(f *Field, kvs KVs) (*Field, bool) { + if c.maxTagKeyLen > 0 && len(f.Key) > c.maxTagKeyLen { c.addWarn(WarnMaxTagKeyLen, fmt.Sprintf("exceed max tag key length(%d), got %d, key truncated", - c.cfg.maxTagKeyLen, len(f.Key))) + c.maxTagKeyLen, len(f.Key))) - newKey := f.Key[:c.cfg.maxTagKeyLen] + newKey := f.Key[:c.maxTagKeyLen] if c.keyConflict(newKey, kvs) { return f, false } else { @@ -149,12 +183,12 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { x := f.Val.(*Field_S) - if c.cfg.maxTagValLen > 0 && len(x.S) > c.cfg.maxTagValLen { + if c.maxTagValLen > 0 && len(x.S) > c.maxTagValLen { c.addWarn(WarnMaxTagValueLen, fmt.Sprintf("exceed max tag value length(%d), got %d, value truncated", - c.cfg.maxTagValLen, len(x.S))) + c.maxTagValLen, len(x.S))) - x.S = x.S[:c.cfg.maxTagValLen] + x.S = x.S[:c.maxTagValLen] f.Val = x } @@ -179,7 +213,7 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { } // replace `.' with `_' in tag keys - if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { + if strings.Contains(f.Key, ".") && !c.enableDotInKey { c.addWarn(WarnInvalidTagKey, fmt.Sprintf("invalid tag key `%s': found `.'", f.Key)) newKey := strings.ReplaceAll(f.Key, ".", "_") @@ -200,14 +234,14 @@ func (c *checker) checkTag(f *Field, kvs KVs) (*Field, bool) { // checkField try to auto modify the f. If we need to drop // f, we return false. -func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { +func (c *cfg) checkField(f *Field, kvs KVs) (*Field, bool) { // trim key - if c.cfg.maxFieldKeyLen > 0 && len(f.Key) > c.cfg.maxFieldKeyLen { + if c.maxFieldKeyLen > 0 && len(f.Key) > c.maxFieldKeyLen { c.addWarn(WarnMaxFieldKeyLen, fmt.Sprintf("exceed max field key length(%d), got %d, key truncated to %s", - c.cfg.maxFieldKeyLen, len(f.Key), f.Key)) + c.maxFieldKeyLen, len(f.Key), f.Key)) - newKey := f.Key[:c.cfg.maxFieldKeyLen] + newKey := f.Key[:c.maxFieldKeyLen] if c.keyConflict(newKey, kvs) { return f, false @@ -216,7 +250,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { } } - if strings.Contains(f.Key, ".") && !c.cfg.enableDotInKey { + if strings.Contains(f.Key, ".") && !c.enableDotInKey { c.addWarn(WarnDotInkey, fmt.Sprintf("invalid field key `%s': found `.'", f.Key)) @@ -236,7 +270,7 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { switch x := f.Val.(type) { case *Field_U: - if !c.cfg.enableU64Field { + if !c.enableU64Field { if x.U > uint64(math.MaxInt64) { c.addWarn(WarnMaxFieldValueInt, fmt.Sprintf("too large int field: key=%s, value=%d(> %d)", f.Key, x.U, uint64(math.MaxInt64))) @@ -263,35 +297,35 @@ func (c *checker) checkField(f *Field, kvs KVs) (*Field, bool) { case *Field_D: // same as []uint8 - if !c.cfg.enableStrField { + if !c.enableStrField { c.addWarn(WarnInvalidFieldValueType, fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) return f, false } - if c.cfg.maxFieldValLen > 0 && len(x.D) > c.cfg.maxFieldValLen { + if c.maxFieldValLen > 0 && len(x.D) > c.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.D))) + f.Key, c.maxFieldValLen, len(x.D))) - x.D = x.D[:c.cfg.maxFieldValLen] + x.D = x.D[:c.maxFieldValLen] f.Val = x } case *Field_S: // same as Field_D - if !c.cfg.enableStrField { + if !c.enableStrField { c.addWarn(WarnInvalidFieldValueType, fmt.Sprintf("field(%s) dropped with string value, when [DisableStringField] enabled", f.Key)) return f, false } - if c.cfg.maxFieldValLen > 0 && len(x.S) > c.cfg.maxFieldValLen { + if c.maxFieldValLen > 0 && len(x.S) > c.maxFieldValLen { c.addWarn(WarnMaxFieldValueLen, fmt.Sprintf("field (%s) exceed max field value length(%d), got %d, value truncated", - f.Key, c.cfg.maxFieldValLen, len(x.S))) + f.Key, c.maxFieldValLen, len(x.S))) - x.S = x.S[:c.cfg.maxFieldValLen] + x.S = x.S[:c.maxFieldValLen] f.Val = x } @@ -317,16 +351,16 @@ func trimSuffixAll(s, sfx string) string { return x } -func (c *checker) keyDisabled(k string) bool { +func (c *cfg) keyDisabled(k string) bool { if k == "" { return true } - if c.cfg.disabledKeys == nil { + if c.disabledKeys == nil { return false } - for _, item := range c.cfg.disabledKeys { + for _, item := range c.disabledKeys { if k == item.key { return true } @@ -335,12 +369,12 @@ func (c *checker) keyDisabled(k string) bool { return false } -func (c *checker) keyMiss(kvs KVs) KVs { - if c.cfg.requiredKeys == nil { +func (c *cfg) keyMiss(kvs KVs) KVs { + if c.requiredKeys == nil { return kvs } - for _, rk := range c.cfg.requiredKeys { + for _, rk := range c.requiredKeys { if !kvs.Has(rk.Key()) { if def := rk.Default(); def != nil { kvs = kvs.MustAddKV(NewKV(rk.Key(), def)) @@ -356,11 +390,6 @@ func (c *checker) keyMiss(kvs KVs) KVs { // CheckPoints used to check pts on various opts. func CheckPoints(pts []*Point, opts ...Option) (arr []*Point) { - c := GetCfg(opts...) - defer PutCfg(c) - - chk := checker{cfg: c} - arr = pts[:0] for _, pt := range pts { @@ -368,11 +397,17 @@ func CheckPoints(pts []*Point, opts ...Option) (arr []*Point) { continue } - pt = chk.check(pt) - pt.SetFlag(Pcheck) - pt.pt.Warns = chk.warns + for _, opt := range opts { + if opt == nil { + continue + } + + opt(pt.cfg) + } + + pt = pt.cfg.check(pt) + arr = append(arr, pt) - chk.reset() } return arr diff --git a/point/check_test.go b/point/check_test.go index a3852d07..26711ce1 100644 --- a/point/check_test.go +++ b/point/check_test.go @@ -72,14 +72,9 @@ func TestCheckMeasurement(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *T.T) { - cfg := GetCfg() - defer PutCfg(cfg) - for _, opt := range tc.opts { - opt(cfg) - } + cfg := getCfg(tc.opts...) - c := checker{cfg: cfg} - m := c.checkMeasurement(tc.measurement) + m := cfg.checkMeasurement(tc.measurement) assert.Equal(t, tc.expect, m) }) } @@ -189,17 +184,11 @@ func TestCheckTags(t *T.T) { for _, tc := range cases { t.Run(tc.name, func(t *T.T) { - cfg := GetCfg() - defer PutCfg(cfg) - - for _, opt := range tc.opts { - opt(cfg) - } + cfg := getCfg(tc.opts...) - c := checker{cfg: cfg} - kvs := c.checkKVs(NewTags(tc.t)) + kvs := cfg.checkKVs(NewTags(tc.t)) - assert.Equal(t, tc.warns, len(c.warns), "got warns: %v", c.warns) + assert.Equal(t, tc.warns, len(cfg.warns), "got warns: %v", cfg.warns) if tc.expect != nil { eq, r := kvsEq(tc.expect, kvs) @@ -457,16 +446,7 @@ func TestCheckFields(t *T.T) { for _, tc := range cases { t.Run(tc.name, func(t *T.T) { - cfg := GetCfg() - defer PutCfg(cfg) - - for _, opt := range tc.opts { - opt(cfg) - } - - t.Logf("cfg: %+#v", cfg) - - c := checker{cfg: cfg} + cfg := getCfg(tc.opts...) kvs := NewKVs(tc.f) expect := NewKVs(tc.expect) @@ -476,8 +456,8 @@ func TestCheckFields(t *T.T) { sort.Sort(expect) } - kvs = c.checkKVs(kvs) - require.Equal(t, tc.warns, len(c.warns)) + kvs = cfg.checkKVs(kvs) + require.Equal(t, tc.warns, len(cfg.warns)) if tc.expect != nil { eq, _ := kvsEq(expect, kvs) @@ -616,24 +596,16 @@ func BenchmarkCheck(b *T.B) { }, } - for _, tc := range cases { - pt, err := NewPoint(tc.m, tc.t, tc.f, tc.opts...) + for _, bc := range cases { + pt, err := NewPoint(bc.m, bc.t, bc.f, bc.opts...) assert.NoError(b, err) b.Logf("pt with warns: %d", len(pt.pt.Warns)) - cfg := GetCfg() - defer PutCfg(cfg) - - for _, opt := range tc.opts { - opt(cfg) - } - c := checker{cfg: cfg} - b.ResetTimer() - b.Run(tc.name, func(b *T.B) { + b.Run(bc.name, func(b *T.B) { for i := 0; i < b.N; i++ { - c.check(pt) + pt.cfg.check(pt) } }) } diff --git a/point/decode.go b/point/decode.go index 98204b6b..fa002bc0 100644 --- a/point/decode.go +++ b/point/decode.go @@ -8,7 +8,6 @@ package point import ( "encoding/json" sync "sync" - "time" ) var decPool sync.Pool @@ -79,10 +78,6 @@ func (d *Decoder) Decode(data []byte, opts ...Option) ([]*Point, error) { err error ) - // point options - cfg := GetCfg(opts...) - defer PutCfg(cfg) - switch d.enc { case JSON: var arr []JSONPoint @@ -91,29 +86,6 @@ func (d *Decoder) Decode(data []byte, opts ...Option) ([]*Point, error) { } for _, x := range arr { - if x.Time > 0 { // check if precision attached - switch cfg.precision { - case PrecUS: - x.Time *= int64(time.Microsecond) - case PrecMS: - x.Time *= int64(time.Millisecond) - case PrecS: - x.Time *= int64(time.Second) - case PrecM: - x.Time *= int64(time.Minute) - case PrecH: - x.Time *= int64(time.Hour) - - case PrecNS: - // pass - - case PrecW, PrecD: // not used - - default: - // pass - } - } - if pt, err := x.Point(opts...); err != nil { return nil, err } else { @@ -138,17 +110,16 @@ func (d *Decoder) Decode(data []byte, opts ...Option) ([]*Point, error) { } } - var chk *checker - if cfg.precheck { - chk = &checker{cfg: cfg} - for idx, pt := range pts { - pts[idx] = chk.check(pt) + // the opts not applied to pts, apply again. + if len(opts) > 0 { + for i, pt := range pts { + pt.cfg = applyCfgOptions(pt.cfg, opts...) + pts[i] = pt.cfg.check(pt) } } case LineProtocol: - - pts, err = parseLPPoints(data, cfg) + pts, err = parseLPPoints(data, opts...) if err != nil { d.detailedError = err return nil, simplifyLPError(err) diff --git a/point/decode_test.go b/point/decode_test.go index f56224a3..37b9ec6f 100644 --- a/point/decode_test.go +++ b/point/decode_test.go @@ -168,16 +168,27 @@ func TestDecode(t *T.T) { }, { - name: "decode-json-with-precision-x", + name: "decode-json-with-precision-w", data: []byte(`[ { "measurement": "abc", "tags": {"t1": "val1"}, "fields": {"f1": 123, "f2": 3.14}, "time":123} ]`), expectLP: []string{ - `abc,t1=val1 f1=123,f2=3.14 123`, + fmt.Sprintf(`abc,t1=val1 f1=123,f2=3.14 %d`, 123*7*24*time.Hour), }, opts: []DecoderOption{WithDecEncoding(JSON)}, ptsOpts: []Option{WithPrecision(PrecW)}, }, + { + name: "decode-json-with-precision-d", + data: []byte(`[ { "measurement": "abc", "tags": {"t1": "val1"}, "fields": {"f1": 123, "f2": 3.14}, "time":123} ]`), + expectLP: []string{ + fmt.Sprintf(`abc,t1=val1 f1=123,f2=3.14 %d`, 123*24*time.Hour), + }, + + opts: []DecoderOption{WithDecEncoding(JSON)}, + ptsOpts: []Option{WithPrecision(PrecD)}, + }, + { name: "decode-metric-json", data: []byte(`[ { "measurement": "abc", "tags": {"t1": "val1"}, "fields": {"f1": 123, "f2": 3.14, "f-str": "hello"}, "time":123} ]`), diff --git a/point/err.go b/point/err.go index f411ae9c..65be5994 100644 --- a/point/err.go +++ b/point/err.go @@ -28,6 +28,7 @@ const ( WarnInvalidTagKey = "invalid_tag_key" WarnInvalidTagValue = "invalid_tag_value" WarnInvalidMeasurement = "invalid_measurement" + WarnNagativeTimestamp = "nagative_timestamp" WarnInvalidFieldValueType = "invalid_field_value_type" WarnAddRequiredKV = "add_required_kv" diff --git a/point/json.go b/point/json.go index 912a6314..9b18e2ad 100644 --- a/point/json.go +++ b/point/json.go @@ -29,10 +29,10 @@ type JSONPoint struct { } func (jp *JSONPoint) Point(opts ...Option) (*Point, error) { - // NOTE: preferred in-point time - if jp.Time != 0 { + if jp.Time != 0 { // Preferred in-point time opts = append(opts, WithTime(time.Unix(0, jp.Time))) } + return NewPoint(jp.Measurement, jp.Tags, jp.Fields, opts...) } diff --git a/point/json_test.go b/point/json_test.go index cbad26b3..4e9ea41b 100644 --- a/point/json_test.go +++ b/point/json_test.go @@ -154,20 +154,22 @@ func TestJSONPoint2Point(t *T.T) { }, { - name: "minus-time", // it's ok! + name: "nagative-time", // it's ok! p: &JSONPoint{ - Measurement: "minus-time", + Measurement: "nagative-time", Tags: nil, Fields: map[string]interface{}{"f1": 123, "f2": false}, }, - opts: []Option{WithTime(time.Unix(0, -123))}, - expect: "minus-time f1=123i,f2=false -123", + opts: []Option{WithTimestamp(-123), WithNagativeTimestamp(true)}, + expect: "nagative-time f1=123i,f2=false -123", }, } for _, tc := range cases { t.Run(tc.name, func(t *T.T) { + pt, err := tc.p.Point(tc.opts...) + if tc.fail { assert.Error(t, err) t.Logf("expect err: %s", err) @@ -176,7 +178,7 @@ func TestJSONPoint2Point(t *T.T) { assert.NoError(t, err) } - assert.Equal(t, tc.expect, pt.LineProto()) + assert.Equalf(t, tc.expect, pt.LineProto(), "cfg: %+#v, pt: %s", pt.cfg, pt.Pretty()) }) } } diff --git a/point/lp.go b/point/lp.go index 6aabff29..34fcc946 100644 --- a/point/lp.go +++ b/point/lp.go @@ -84,15 +84,12 @@ func simplifyLPError(err error) error { } // parseLPPoints parse line-protocol payload to Point. -func parseLPPoints(data []byte, c *cfg) ([]*Point, error) { +func parseLPPoints(data []byte, opts ...Option) ([]*Point, error) { if len(data) == 0 { return nil, fmt.Errorf("empty data") } - if c == nil { - c = GetCfg() - defer PutCfg(c) - } + c := getCfg(opts...) ptTime := c.t if c.t.IsZero() { @@ -105,7 +102,6 @@ func parseLPPoints(data []byte, c *cfg) ([]*Point, error) { } res := []*Point{} - chk := checker{cfg: c} for _, x := range lppts { if x == nil { @@ -142,9 +138,7 @@ func parseLPPoints(data []byte, c *cfg) ([]*Point, error) { } } - pt = chk.check(pt) - pt.pt.Warns = chk.warns - chk.reset() + pt = c.check(pt) // re-sort again: check may update pt.pt.Fields if c.keySorted { diff --git a/point/lp_test.go b/point/lp_test.go index ac327d7f..9e7dc16f 100644 --- a/point/lp_test.go +++ b/point/lp_test.go @@ -1002,14 +1002,7 @@ line`}, time.Unix(0, 123)), } t.Run(tc.name, func(t *testing.T) { - c := GetCfg() - defer PutCfg(c) - - for _, opt := range tc.opts { - opt(c) - } - - pts, err := parseLPPoints(tc.data, c) + pts, err := parseLPPoints(tc.data, tc.opts...) if tc.fail { if len(pts) > 0 { assert.Error(t, err, "got point[0]: %s", pts[0].Pretty()) diff --git a/point/metrics.go b/point/metrics.go new file mode 100644 index 00000000..ed7e9c78 --- /dev/null +++ b/point/metrics.go @@ -0,0 +1,41 @@ +package point + +import ( + p8s "github.com/prometheus/client_golang/prometheus" +) + +var ( + pointSize = p8s.NewSummary( + p8s.SummaryOpts{ + Namespace: "pointpool", + Name: "point_size", + Help: "Byte size of point", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, + ) + + pointBufCap = p8s.NewSummary( + p8s.SummaryOpts{ + Namespace: "pointpool", + Name: "point_bytes_buf_cap", + Help: "Capacity of point's bytes buffer", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, + ) +) + +// Metrics return all exported prometheus metrics of point package. +func Metrics() []p8s.Collector { + return []p8s.Collector{ + pointBufCap, + pointSize, + } +} diff --git a/point/new_point.go b/point/new_point.go index f87edb3f..dcf57ce5 100644 --- a/point/new_point.go +++ b/point/new_point.go @@ -7,12 +7,10 @@ package point import ( "sort" - "time" ) func NewPointV2(name string, kvs KVs, opts ...Option) *Point { - c := GetCfg(opts...) - defer PutCfg(c) + c := getCfg(opts...) return doNewPoint(name, kvs, c) } @@ -35,8 +33,7 @@ func NewPoint(name string, tags map[string]string, fields map[string]any, opts . kvs = kvs.MustAddTag(k, v) // force add these tags } - c := GetCfg(opts...) - defer PutCfg(c) + c := getCfg(opts...) return doNewPoint(name, kvs, c), nil } @@ -51,14 +48,13 @@ func doNewPoint(name string, kvs KVs, c *cfg) *Point { pt.pt.Fields = kvs pt.SetFlag(Ppooled) } else { - pt = &Point{ - pt: &PBPoint{ - Name: name, - Fields: kvs, - }, - } + pt = emptyPoint() + pt.pt.Name = name + pt.pt.Fields = kvs } + pt.cfg = c + // add extra tags if len(c.extraTags) > 0 { for _, kv := range c.extraTags { @@ -77,9 +73,7 @@ func doNewPoint(name string, kvs KVs, c *cfg) *Point { } if c.precheck { - chk := checker{cfg: c} - pt = chk.check(pt) - pt.SetFlag(Pcheck) + pt = pt.cfg.check(pt) } // sort again: during check, kv maybe update @@ -87,11 +81,5 @@ func doNewPoint(name string, kvs KVs, c *cfg) *Point { sort.Sort(KVs(pt.pt.Fields)) } - if c.timestamp >= 0 { - pt.pt.Time = c.timestamp - } else { // not set, use current time - pt.pt.Time = time.Now().Round(0).UnixNano() // trim monotonic clock - } - return pt } diff --git a/point/option.go b/point/option.go index 49f2e8f8..964c7cd7 100644 --- a/point/option.go +++ b/point/option.go @@ -6,7 +6,6 @@ package point import ( - sync "sync" "time" ) @@ -22,27 +21,17 @@ const ( type Option func(*cfg) -var cfgPool sync.Pool - -func GetCfg(opts ...Option) *cfg { - v := cfgPool.Get() - if v == nil { - v = newCfg() - } - - x := v.(*cfg) +func applyCfgOptions(c *cfg, opts ...Option) *cfg { for _, opt := range opts { if opt != nil { - opt(x) + opt(c) } } - - return x + return c } -func PutCfg(c *cfg) { - c.reset() - cfgPool.Put(c) +func getCfg(opts ...Option) *cfg { + return applyCfgOptions(newCfg(), opts...) } type cfg struct { @@ -62,6 +51,7 @@ type cfg struct { keySorted bool + enableNagativeTimestamp, enableDotInKey, // enable dot(.) in tag/field key name enableStrField, // enable string field value // For uint64 field, if value < maxint64,convert to int64, or dropped if value > maxint64 @@ -83,6 +73,8 @@ type cfg struct { disabledKeys, requiredKeys []*Key + + warns []*Warn } func newCfg() *cfg { @@ -110,11 +102,16 @@ func newCfg() *cfg { } func (c *cfg) reset() { + if c == nil { + return + } + c.callback = nil c.disabledKeys = nil c.enableDotInKey = true c.enableStrField = true c.enableU64Field = true + c.enableNagativeTimestamp = false c.enc = DefaultEncoding c.extraTags = nil c.maxFieldKeyLen = defaultKeyLen @@ -130,16 +127,18 @@ func (c *cfg) reset() { c.requiredKeys = nil c.t = time.Time{} c.timestamp = -1 + c.warns = c.warns[:0] } -func WithMaxKVComposeLen(n int) Option { return func(c *cfg) { c.maxTagKeyValComposeLen = n } } -func WithMaxMeasurementLen(n int) Option { return func(c *cfg) { c.maxMeasurementLen = n } } -func WithCallback(fn Callback) Option { return func(c *cfg) { c.callback = fn } } -func WithU64Field(on bool) Option { return func(c *cfg) { c.enableU64Field = on } } -func WithStrField(on bool) Option { return func(c *cfg) { c.enableStrField = on } } -func WithDotInKey(on bool) Option { return func(c *cfg) { c.enableDotInKey = on } } -func WithPrecheck(on bool) Option { return func(c *cfg) { c.precheck = on } } -func WithKeySorted(on bool) Option { return func(c *cfg) { c.keySorted = on } } +func WithMaxKVComposeLen(n int) Option { return func(c *cfg) { c.maxTagKeyValComposeLen = n } } +func WithMaxMeasurementLen(n int) Option { return func(c *cfg) { c.maxMeasurementLen = n } } +func WithCallback(fn Callback) Option { return func(c *cfg) { c.callback = fn } } +func WithU64Field(on bool) Option { return func(c *cfg) { c.enableU64Field = on } } +func WithStrField(on bool) Option { return func(c *cfg) { c.enableStrField = on } } +func WithDotInKey(on bool) Option { return func(c *cfg) { c.enableDotInKey = on } } +func WithPrecheck(on bool) Option { return func(c *cfg) { c.precheck = on } } +func WithNagativeTimestamp(on bool) Option { return func(c *cfg) { c.enableNagativeTimestamp = on } } +func WithKeySorted(on bool) Option { return func(c *cfg) { c.keySorted = on } } func WithTime(t time.Time) Option { return func(c *cfg) { diff --git a/point/point.go b/point/point.go index 50c74b25..9f58b879 100644 --- a/point/point.go +++ b/point/point.go @@ -30,7 +30,12 @@ type Callback func(*Point) (*Point, error) type Point struct { flags uint64 - pt *PBPoint + + buf []byte + bufShrinked int + + cfg *cfg + pt *PBPoint } // ClearFlag clear specific bit. @@ -145,6 +150,9 @@ func (p *Point) Pretty() string { arr = append(arr, fmt.Sprintf("[D] %s", d.Info)) } + arr = append(arr, "-----------") + arr = append(arr, fmt.Sprintf("Buf len/cap: %d/%d", len(p.buf), cap(p.buf))) + return strings.Join(arr, "\n") } diff --git a/point/point_test.go b/point/point_test.go index 87770119..e5d72f9e 100644 --- a/point/point_test.go +++ b/point/point_test.go @@ -113,16 +113,14 @@ func BenchmarkFromModelsLP(b *T.B) { assert.NoError(b, err) assert.Len(b, lppts, 1) - c := GetCfg() - chk := checker{cfg: c} + c := getCfg() b.ResetTimer() for i := 0; i < b.N; i++ { pt := FromModelsLP(lppts[0]) - pt = chk.check(pt) - pt.pt.Warns = chk.warns - chk.reset() + pt = c.check(pt) + c.reset() // re-sort again: check may update pt.kvs if c.keySorted { @@ -274,7 +272,7 @@ func TestFlags(t *T.T) { }) t.Run("test-flag-set-clear", func(t *T.T) { - pt := &Point{} + pt := emptyPoint() pt.SetFlag(Psent) assert.True(t, pt.HasFlag(Psent)) @@ -695,14 +693,7 @@ func TestPointPB(t *T.T) { sort.Sort(kvs) expect.pt.Fields = kvs - cfg := GetCfg() - defer PutCfg(cfg) - chk := checker{cfg: cfg} - expect = chk.check(expect) - expect.SetFlag(Pcheck) - expect.pt.Warns = chk.warns - - assert.Equal(t, expect.Pretty(), pt.Pretty(), "got\n%s\nexpect\n%s", expect.Pretty(), pt.Pretty()) + assert.Equal(t, expect.Pretty(), pt.Pretty(), "got\n%s\nexpect\n%s", pt.Pretty(), expect.Pretty()) t.Logf("pt: %s", pt.Pretty()) }) diff --git a/point/ptpool.go b/point/ptpool.go index 3daca83d..8825039c 100644 --- a/point/ptpool.go +++ b/point/ptpool.go @@ -130,6 +130,14 @@ func (p *Point) clear() { } func (p *Point) Reset() { + if cap(p.buf) > (1 << 12) { // larger than 4KB + p.buf = nil + p.bufShrinked++ // do not reset the field + } + + p.cfg.reset() + + p.buf = p.buf[:0] p.flags = 0 p.clear() } @@ -656,6 +664,8 @@ func (cpp *ReservedCapPointPool) Get() *Point { } func (cpp *ReservedCapPointPool) Put(p *Point) { + pointSize.Observe(float64(p.Size())) + for _, f := range p.KVs() { cpp.PutKV(f) } @@ -845,7 +855,6 @@ func (cpp *ReservedCapPointPool) Collect(ch chan<- p8s.Metric) { ch <- p8s.MustNewConstMetric(chanPutDesc, p8s.CounterValue, float64(cpp.chanPut())) ch <- p8s.MustNewConstMetric(poolGetDesc, p8s.CounterValue, float64(cpp.poolGet())) ch <- p8s.MustNewConstMetric(poolPutDesc, p8s.CounterValue, float64(cpp.poolPut())) - ch <- p8s.MustNewConstMetric(reservedCapacityDesc, p8s.CounterValue, float64(cpp.capacity)) ch <- p8s.MustNewConstMetric(poolMallocDesc, p8s.CounterValue, float64(cpp.malloc.Load())) } diff --git a/point/set.go b/point/set.go index 81a0da4d..2bf51cfd 100644 --- a/point/set.go +++ b/point/set.go @@ -7,10 +7,18 @@ package point import "time" -func (p *Point) SetName(name string) { - p.pt.Name = name +func (p *Point) SetName(name string) *Point { + bufName := p.bufStr(name) + p.pt.Name = bufName + return p } -func (p *Point) SetTime(t time.Time) { +func (p *Point) SetTime(t time.Time) *Point { p.pt.Time = t.UnixNano() + return p +} + +func (p *Point) SetTimestamp(nanoTimestamp int64) *Point { + p.pt.Time = nanoTimestamp + return p } diff --git a/point/shared_buf.go b/point/shared_buf.go new file mode 100644 index 00000000..1064f820 --- /dev/null +++ b/point/shared_buf.go @@ -0,0 +1,64 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package point + +import "unsafe" + +func (p *Point) bufStr(src string) string { + i := len(p.buf) + j := i + len(src) + + p.buf = append(p.buf, src[0:]...) + + pointBufCap.Observe(float64(cap(p.buf))) + + offset := p.buf[i:j] + return *(*string)(unsafe.Pointer(&offset)) +} + +// New create a new empty *Point. +func New(opts ...Option) (pt *Point) { + if defaultPTPool != nil { + pt = defaultPTPool.Get() + } else { + pt = emptyPoint() + } + + pt.cfg = getCfg(opts...) + return pt +} + +func (p *Point) AddKV(k string, v any, force bool, opts ...KVOption) *Point { + kStr := p.bufStr(k) + + switch x := v.(type) { + case string: + vStr := p.bufStr(x) + old := KVs(p.pt.Fields) + p.pt.Fields = old.AddV2(kStr, vStr, force, opts...) + return p + default: + old := KVs(p.pt.Fields) + p.pt.Fields = old.AddV2(kStr, v, force, opts...) + return p + } +} + +func (p *Point) Check(opts ...Option) *Point { + if p.cfg == nil { + return p + } + + for _, opt := range opts { + if opt == nil { + continue + } + + opt(p.cfg) + } + + return p.cfg.check(p) +} diff --git a/point/shared_buf_test.go b/point/shared_buf_test.go new file mode 100644 index 00000000..4b74d472 --- /dev/null +++ b/point/shared_buf_test.go @@ -0,0 +1,166 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package point + +import ( + "math" + T "testing" + "time" + + "github.com/GuanceCloud/cliutils" + "github.com/GuanceCloud/cliutils/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +func Test_bufStr(t *T.T) { + t.Run("basic", func(t *T.T) { + pt := New() + + assert.Equal(t, "abc", pt.bufStr("abc")) + t.Logf("len(buf): %d, cap(buf): %d", len(pt.buf), cap(pt.buf)) + }) + + t.Run("loop", func(t *T.T) { + pt := emptyPoint() + + for i := 0; i < 100; i++ { + randStr := cliutils.CreateRandomString(i) + assert.Equal(t, randStr, pt.bufStr(randStr)) + } + t.Logf("len(buf): %d, cap(buf): %d", len(pt.buf), cap(pt.buf)) + }) +} + +func TestAddKV(t *T.T) { + t.Run("basic", func(t *T.T) { + pt := New() + + pt.AddKV("f", 1.0, true) + pt.AddKV("s", "hello", true, WithKVTagSet(true)) + pt.AddKV("i", 1, true) + pt.AddKV("u", uint64(math.MaxUint64), true) + pt.AddKV("b", false, true) + pt.AddKV("d", []byte("world"), true) + + t.Logf("pt: %s", pt.Pretty()) + }) + + t.Run("with-point-pool", func(t *T.T) { + + pp := NewReservedCapPointPool(100) + SetPointPool(pp) + t.Cleanup(func() { + ClearPointPool() + }) + + pt := pp.Get() + + pt.AddKV("f", 1.0, true) + pt.AddKV("s", "hello", true, WithKVTagSet(true)) + pt.AddKV("i", 1, true) + pt.AddKV("u", uint64(math.MaxUint64), true) + pt.AddKV("b", false, true) + pt.AddKV("d", []byte("world"), true) + + t.Logf("pt: %s", pt.Pretty()) + }) +} + +func Test_pointSize(t *T.T) { + t.Run(`basic`, func(t *T.T) { + pp := NewReservedCapPointPool(100) + + SetPointPool(pp) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + + t.Cleanup(func() { + reg.Unregister(pp) + ClearPointPool() + }) + + pt := pp.Get() + + pt.AddKV("f", 1.0, true) + pt.AddKV("s", "hello", true, WithKVTagSet(true)) + pt.AddKV("i", 1, true) + pt.AddKV("u", uint64(math.MaxUint64), true) + pt.AddKV("b", false, true) + pt.AddKV("d", []byte("world"), true) + + pp.Put(pt) + + mfs, err := reg.Gather() + assert.NoError(t, err) + + t.Logf("mfs: %s", metrics.MetricFamily2Text(mfs)) + }) +} + +func BenchmarkBufPool(b *T.B) { + largeStr := cliutils.CreateRandomString((1 << 8)) + + reg := prometheus.NewRegistry() + + pp := NewReservedCapPointPool(100) + reg.MustRegister(Metrics()...) + + SetPointPool(pp) + b.Cleanup(func() { + reg.Unregister(pp) + ClearPointPool() + }) + + b.ResetTimer() + b.Run("with-buf-pool", func(b *T.B) { + for i := 0; i < b.N; i++ { + pt := New(). + AddKV("s", "hello", true, WithKVTagSet(true)). + AddKV("large-s", largeStr, true). + SetName("with-buf-pool"). + SetTimestamp(time.Now().UnixNano()). + Check() + pp.Put(pt) + } + }) + + b.ResetTimer() + b.Run("without-buf-pool", func(b *T.B) { + for i := 0; i < b.N; i++ { + var kvs KVs + + kvs = kvs.AddV2("s", "hello", true, WithKVTagSet(true)) + kvs = kvs.AddV2("large-s", largeStr, true) + pt := NewPointV2("without-buf-pool", kvs, WithPrecheck(false)) + + pp.Put(pt) + } + }) + + mfs, err := reg.Gather() + assert.NoError(b, err) + + b.Logf("mfs: %s", metrics.MetricFamily2Text(mfs)) +} + +func TestCheckWithOptions(t *T.T) { + t.Run(`multiple-check`, func(t *T.T) { + pt := New(). + AddKV("nil", nil, true). + AddKV("f1", 1.23, true).Check() + + t.Logf("pt: %s", pt.Pretty()) + + pt = pt.cfg.check(pt) + + // the nil-value field still in point, should we remove the field after check? + assert.Len(t, pt.pt.Warns, 3) + + t.Logf("pt: %s", pt.Pretty()) + }) +} From 3bf701cc2bf2f85d9b2166d487739938c6a92937 Mon Sep 17 00:00:00 2001 From: coanor Date: Tue, 30 Apr 2024 18:31:01 +0800 Subject: [PATCH 2/2] update cfg applied to point --- point/new_point.go | 24 ++++++++++-------------- point/ptpool.go | 3 ++- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/point/new_point.go b/point/new_point.go index dcf57ce5..5a8dc8ac 100644 --- a/point/new_point.go +++ b/point/new_point.go @@ -10,9 +10,7 @@ import ( ) func NewPointV2(name string, kvs KVs, opts ...Option) *Point { - c := getCfg(opts...) - - return doNewPoint(name, kvs, c) + return doNewPoint(name, kvs, opts...) } // NewPoint returns a new Point given name(measurement), tags, fields and optional options. @@ -33,12 +31,10 @@ func NewPoint(name string, tags map[string]string, fields map[string]any, opts . kvs = kvs.MustAddTag(k, v) // force add these tags } - c := getCfg(opts...) - - return doNewPoint(name, kvs, c), nil + return doNewPoint(name, kvs, opts...), nil } -func doNewPoint(name string, kvs KVs, c *cfg) *Point { +func doNewPoint(name string, kvs KVs, opts ...Option) *Point { var pt *Point if defaultPTPool != nil { @@ -53,31 +49,31 @@ func doNewPoint(name string, kvs KVs, c *cfg) *Point { pt.pt.Fields = kvs } - pt.cfg = c + applyCfgOptions(pt.cfg, opts...) // add extra tags - if len(c.extraTags) > 0 { - for _, kv := range c.extraTags { + if len(pt.cfg.extraTags) > 0 { + for _, kv := range pt.cfg.extraTags { pt.AddTag(kv.Key, kv.GetS()) // NOTE: do-not-override exist keys } } - if c.enc == Protobuf { + if pt.cfg.enc == Protobuf { pt.SetFlag(Ppb) } - if c.keySorted { + if pt.cfg.keySorted { kvs := KVs(pt.pt.Fields) sort.Sort(kvs) pt.pt.Fields = kvs } - if c.precheck { + if pt.cfg.precheck { pt = pt.cfg.check(pt) } // sort again: during check, kv maybe update - if c.keySorted { + if pt.cfg.keySorted { sort.Sort(KVs(pt.pt.Fields)) } diff --git a/point/ptpool.go b/point/ptpool.go index 8825039c..1931a95f 100644 --- a/point/ptpool.go +++ b/point/ptpool.go @@ -144,7 +144,8 @@ func (p *Point) Reset() { func emptyPoint() *Point { return &Point{ - pt: &PBPoint{}, + pt: &PBPoint{}, + cfg: getCfg(), } }