From 75b629ac657c40a055c5c3e46ae42a0e2e72d8f6 Mon Sep 17 00:00:00 2001 From: Andrew Hu Date: Fri, 6 Feb 2026 21:50:45 -0500 Subject: [PATCH] =?UTF-8?q?feat(manifest):=20=E2=9C=A8=20add=20per-file=20?= =?UTF-8?q?column=20statistics=20for=20codec-agnostic=20pruning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add FileStats and ColumnStats types to FileRef for per-file column statistics. Codecs opt in via StatisticalCodec interface; Parquet is the first implementation, reporting min/max for orderable types and null count for all columns. JSONL and raw blob writes produce no stats. - New types: FileStats, ColumnStats (api.go) - New interfaces: StatisticalCodec, StatisticalStreamEncoder (api.go) - Stats field on FileRef with omitempty for backward compat - Parquet accumulates stats during Encode via table-driven comparison - Write path wired in writeDataFile and StreamWriteRecords - 14 new tests covering codec stats, write-path wiring, and JSON round-trip - Contracts updated: CONTRACT_CORE, CONTRACT_PARQUET, CONTRACT_WRITE_API - PUBLIC_API.md updated with new interfaces, types, and usage example Co-Authored-By: Claude Opus 4.6 --- PUBLIC_API.md | 32 +++ docs/contracts/CONTRACT_CORE.md | 13 ++ docs/contracts/CONTRACT_PARQUET.md | 22 +- docs/contracts/CONTRACT_WRITE_API.md | 4 + lode/api.go | 65 ++++++ lode/codec_parquet.go | 173 ++++++++++++++- lode/codec_parquet_test.go | 302 +++++++++++++++++++++++++++ lode/dataset.go | 14 +- lode/dataset_test.go | 182 ++++++++++++++++ 9 files changed, 797 insertions(+), 10 deletions(-) diff --git a/PUBLIC_API.md b/PUBLIC_API.md index 54e5b9c..9bd8851 100644 --- a/PUBLIC_API.md +++ b/PUBLIC_API.md @@ -163,6 +163,12 @@ construction. **Interfaces:** - `Timestamped` - Optional interface for records with timestamps (see below) +- `StatisticalCodec` - Optional codec interface for per-file column statistics +- `StatisticalStreamEncoder` - Optional stream encoder interface for per-file column statistics + +**Types (per-file statistics):** +- `FileStats` - Per-file row count and column statistics +- `ColumnStats` - Per-column min, max, null count, and distinct count **Range read support:** - `Store.ReadRange(ctx, path, offset, length)` - Read byte range from object @@ -289,6 +295,32 @@ _, err := ds.StreamWriteRecords(ctx, iter, metadata) *Contract reference: [`CONTRACT_PARQUET.md`](docs/contracts/CONTRACT_PARQUET.md)* +### Per-File Statistics + +The Parquet codec implements `StatisticalCodec` and automatically populates +per-file column statistics on each `FileRef` during `Write`: + + +```go +snap, _ := ds.Write(ctx, records, metadata) +for _, f := range snap.Manifest.Files { + if f.Stats != nil { + fmt.Println("rows:", f.Stats.RowCount) + for _, col := range f.Stats.Columns { + fmt.Printf(" %s: min=%v max=%v nulls=%d\n", + col.Name, col.Min, col.Max, col.NullCount) + } + } +} +``` + +Statistics are computed from Go record values during encoding. Orderable types +(int32, int64, float32, float64, string, timestamp) get min/max. All columns +get null count. Boolean and bytes columns have no min/max. + +Codecs that do not implement `StatisticalCodec` (e.g., JSONL) produce no stats — +`FileRef.Stats` is nil and omitted from the manifest JSON. 
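+
+For reference, a `FileRef` carrying statistics serializes into the manifest
+along these lines (illustrative values; the field names follow the JSON tags
+on `FileStats` and `ColumnStats`, and `distinct_count` is omitted when zero):
+
+```json
+{
+  "path": "data/00000001.parquet",
+  "size_bytes": 2048,
+  "stats": {
+    "row_count": 3,
+    "columns": [
+      {"name": "id", "min": 1, "max": 10, "null_count": 0},
+      {"name": "name", "min": "alice", "max": "carol", "null_count": 0}
+    ]
+  }
+}
+```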
+ --- ## Metadata diff --git a/docs/contracts/CONTRACT_CORE.md b/docs/contracts/CONTRACT_CORE.md index 898d6d7..8258684 100644 --- a/docs/contracts/CONTRACT_CORE.md +++ b/docs/contracts/CONTRACT_CORE.md @@ -55,6 +55,19 @@ Minimum required fields: - min/max timestamp (when data units implement `Timestamped`; omit if not applicable) Optional fields: - codec name (omit when no codec is configured) +- per-file statistics (when the codec reports them via `StatisticalCodec`; omit when not available) + +### Per-File Statistics + +FileRef MAY contain per-file statistics reported by the codec. + +- Statistics are codec-agnostic: any codec may report them via the `StatisticalCodec` interface. +- When a codec reports statistics, they MUST be persisted on the FileRef. +- When a codec does not report statistics, the stats field MUST be omitted. +- Statistics values MUST be JSON-serializable. +- Statistics MUST NOT be inferred; they are reported by the codec from observed data. +- Per-file statistics include: row count, and per-column min, max, null count, and distinct count. +- Distinct count is optional; zero means not computed. ### Checksum Rules diff --git a/docs/contracts/CONTRACT_PARQUET.md b/docs/contracts/CONTRACT_PARQUET.md index 2552422..409782c 100644 --- a/docs/contracts/CONTRACT_PARQUET.md +++ b/docs/contracts/CONTRACT_PARQUET.md @@ -240,14 +240,20 @@ The following statistics MAY be extracted and recorded in manifests: - `RowCount`: Total rows in the file (from Parquet metadata). - `MinTimestamp` / `MaxTimestamp`: When a timestamp column is designated. -### Future: Extended Manifest Stats - -Future versions MAY add: -- Column-level min/max statistics. -- Null counts per column. -- Byte size per column. - -These extensions are additive and do not affect this contract. +### Per-File Statistics + +The Parquet codec implements `StatisticalCodec` and reports per-file statistics +via `FileStats()` after each `Encode` call: + +- `RowCount`: Total rows encoded in the file. +- Per-column `Min` and `Max` for orderable types: int32, int64, float32, float64, string, timestamp. +- Per-column `NullCount`: Number of null values for nullable columns. +- `DistinctCount`: Reserved for future use (reported as 0). +- Boolean and bytes columns report `NullCount` only (no min/max). +- Statistics are computed during `Encode` from the Go record values, not from Parquet internal metadata. +- Min/Max values use the same Go types as the input records. When serialized to + JSON, `time.Time` becomes an RFC3339Nano string; consumers must interpret based + on schema context. --- diff --git a/docs/contracts/CONTRACT_WRITE_API.md b/docs/contracts/CONTRACT_WRITE_API.md index 6d7425a..2b9c3f8 100644 --- a/docs/contracts/CONTRACT_WRITE_API.md +++ b/docs/contracts/CONTRACT_WRITE_API.md @@ -33,6 +33,8 @@ It is authoritative for any `Dataset` implementation. - Writes MUST NOT mutate existing snapshots or manifests. - The manifest MUST include all required fields defined in `CONTRACT_CORE.md` (including row/event count and min/max timestamp when applicable). +- When the codec implements `StatisticalCodec`, per-file statistics MUST be + collected after encoding and recorded on the FileRef. - When no codec is configured, each write represents a single data unit and the row/event count MUST be `1`. @@ -68,6 +70,8 @@ It is authoritative for any `Dataset` implementation. - Row/event count MUST equal the total number of records consumed. 
- When a checksum component is configured, the checksum MUST be computed during streaming and recorded in the manifest for each file written. +- When the stream encoder implements `StatisticalStreamEncoder`, per-file + statistics MUST be collected after stream finalization and recorded on the FileRef. ### Timestamp computation diff --git a/lode/api.go b/lode/api.go index 89ee681..fe212e1 100644 --- a/lode/api.go +++ b/lode/api.go @@ -116,6 +116,40 @@ type FileRef struct { // Checksum is an optional integrity hash. Checksum string `json:"checksum,omitempty"` + + // Stats contains per-file column statistics reported by the codec. + // Omitted when the codec does not report statistics. + Stats *FileStats `json:"stats,omitempty"` +} + +// FileStats holds per-file statistics reported by a codec after encoding. +type FileStats struct { + // RowCount is the number of rows in this file. + RowCount int64 `json:"row_count"` + + // Columns contains per-column statistics. May be empty if the codec + // does not report column-level stats. + Columns []ColumnStats `json:"columns,omitempty"` +} + +// ColumnStats holds per-column statistics for a single data file. +type ColumnStats struct { + // Name is the column/field name. + Name string `json:"name"` + + // Min is the minimum value observed. nil if no non-null values exist. + // Values are JSON-serializable Go types matching the codec's decoded types. + Min any `json:"min,omitempty"` + + // Max is the maximum value observed. nil if no non-null values exist. + Max any `json:"max,omitempty"` + + // NullCount is the number of null values observed for this column. + NullCount int64 `json:"null_count"` + + // DistinctCount is the approximate number of distinct values. + // Zero means not computed or not available. + DistinctCount int64 `json:"distinct_count,omitempty"` } // ----------------------------------------------------------------------------- @@ -211,6 +245,37 @@ type RecordStreamEncoder interface { Close() error } +// ----------------------------------------------------------------------------- +// Statistical codec interfaces +// ----------------------------------------------------------------------------- + +// StatisticalCodec is implemented by codecs that report per-file statistics +// after encoding. This is an optional extension to the Codec interface. +// +// The codec accumulates statistics during Encode and returns them via FileStats. +// FileStats must be called after a successful Encode call and before the next +// Encode call; behavior is undefined otherwise. +type StatisticalCodec interface { + Codec + + // FileStats returns statistics accumulated during the most recent Encode call. + // Returns nil if no statistics are available. + FileStats() *FileStats +} + +// StatisticalStreamEncoder is implemented by stream encoders that report +// per-file statistics after the stream is finalized. This is an optional +// extension to the RecordStreamEncoder interface. +// +// FileStats must be called after Close returns successfully. +type StatisticalStreamEncoder interface { + RecordStreamEncoder + + // FileStats returns statistics accumulated during stream encoding. + // Returns nil if no statistics are available. 
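+	//
+	// As with StatisticalCodec.FileStats, calling it at any other time
+	// (before Close, or after a Close that returned an error) has
+	// undefined behavior.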
+ FileStats() *FileStats +} + // ----------------------------------------------------------------------------- // Compressor interface // ----------------------------------------------------------------------------- diff --git a/lode/codec_parquet.go b/lode/codec_parquet.go index 74af911..edf913e 100644 --- a/lode/codec_parquet.go +++ b/lode/codec_parquet.go @@ -76,13 +76,14 @@ func WithParquetCompression(codec ParquetCompression) ParquetOption { // Error sentinels ErrSchemaViolation and ErrInvalidFormat are defined in api.go. -// parquetCodec implements Codec for Apache Parquet format. +// parquetCodec implements Codec and StatisticalCodec for Apache Parquet format. type parquetCodec struct { schema ParquetSchema compression ParquetCompression pqSchema *parquet.Schema fieldOrder []string // ordered field names matching schema columns fieldsByName map[string]ParquetField // field lookup by name + lastStats *FileStats // stats from most recent Encode call } // validateSchema validates a ParquetSchema and builds a field lookup map. @@ -147,7 +148,13 @@ func (c *parquetCodec) Name() string { return "parquet" } +// FileStats returns statistics accumulated during the most recent Encode call. +func (c *parquetCodec) FileStats() *FileStats { + return c.lastStats +} + func (c *parquetCodec) Encode(w io.Writer, records []any) error { + c.lastStats = nil // reset before encoding // Buffer to collect complete parquet file var buf bytes.Buffer @@ -178,6 +185,9 @@ func (c *parquetCodec) Encode(w io.Writer, records []any) error { return fmt.Errorf("parquet: close writer: %w", err) } + // Compute per-column statistics from the input records + c.lastStats = computeFileStats(c.schema, records) + // Write buffered content to output _, err := io.Copy(w, &buf) return err @@ -485,3 +495,164 @@ func buildFieldNode(field ParquetField) parquet.Node { return node } + +// ----------------------------------------------------------------------------- +// Per-file statistics +// ----------------------------------------------------------------------------- + +// orderable maps ParquetType to whether min/max comparisons are meaningful. +var orderable = map[ParquetType]bool{ + ParquetInt32: true, + ParquetInt64: true, + ParquetFloat32: true, + ParquetFloat64: true, + ParquetString: true, + ParquetTimestamp: true, + ParquetBool: false, + ParquetBytes: false, +} + +// computeFileStats computes per-column statistics from the input records. +func computeFileStats(schema ParquetSchema, records []any) *FileStats { + columns := make([]ColumnStats, len(schema.Fields)) + for i, field := range schema.Fields { + columns[i] = computeColumnStats(field, records) + } + return &FileStats{ + RowCount: int64(len(records)), + Columns: columns, + } +} + +// columnAccumulator tracks min/max/nullCount for a single column. +type columnAccumulator struct { + field ParquetField + min any + max any + nullCount int64 + hasValue bool +} + +// observe records a single value for this column. +func (a *columnAccumulator) observe(val any) { + if val == nil { + a.nullCount++ + return + } + + if !orderable[a.field.Type] { + return + } + + if !a.hasValue { + a.min = val + a.max = val + a.hasValue = true + return + } + + if lessThan(a.field.Type, val, a.min) { + a.min = val + } + if lessThan(a.field.Type, a.max, val) { + a.max = val + } +} + +// computeColumnStats computes statistics for a single column across all records. 
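+// A missing key and an explicit nil value are both counted as nulls;
+// records that are not map[string]any are skipped entirely.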
+func computeColumnStats(field ParquetField, records []any) ColumnStats { + acc := columnAccumulator{field: field} + + for _, record := range records { + m, ok := record.(map[string]any) + if !ok { + continue + } + val, exists := m[field.Name] + if !exists { + acc.observe(nil) + continue + } + acc.observe(val) + } + + cs := ColumnStats{ + Name: field.Name, + NullCount: acc.nullCount, + } + if acc.hasValue { + cs.Min = acc.min + cs.Max = acc.max + } + return cs +} + +// lessThan returns true if a < b for the given orderable ParquetType. +// Both a and b must be non-nil. Handles type coercion for numeric types +// that may arrive as different Go types (int, int32, int64, float64). +// +//nolint:gocyclo // Type dispatch for each orderable Parquet type. +func lessThan(ptype ParquetType, a, b any) bool { + switch ptype { + case ParquetInt32: + return toInt64(a) < toInt64(b) + case ParquetInt64: + return toInt64(a) < toInt64(b) + case ParquetFloat32: + return toFloat64(a) < toFloat64(b) + case ParquetFloat64: + return toFloat64(a) < toFloat64(b) + case ParquetString: + as, _ := a.(string) + bs, _ := b.(string) + return as < bs + case ParquetTimestamp: + return toTimestamp(a).Before(toTimestamp(b)) + default: + return false + } +} + +// toInt64 coerces numeric Go values to int64 for comparison. +func toInt64(v any) int64 { + switch n := v.(type) { + case int: + return int64(n) + case int32: + return int64(n) + case int64: + return n + case float64: + return int64(n) + default: + return 0 + } +} + +// toFloat64 coerces numeric Go values to float64 for comparison. +func toFloat64(v any) float64 { + switch n := v.(type) { + case float32: + return float64(n) + case float64: + return n + default: + return 0 + } +} + +// toTimestamp coerces time values for comparison. 
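+// Strings are parsed as RFC3339Nano; unparseable strings and non-time
+// types fall back to the zero time.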
+func toTimestamp(v any) time.Time { + switch t := v.(type) { + case time.Time: + return t + case string: + parsed, err := time.Parse(time.RFC3339Nano, t) + if err != nil { + return time.Time{} + } + return parsed + default: + return time.Time{} + } +} diff --git a/lode/codec_parquet_test.go b/lode/codec_parquet_test.go index 7dde20e..4448772 100644 --- a/lode/codec_parquet_test.go +++ b/lode/codec_parquet_test.go @@ -924,3 +924,305 @@ func (s *parquetTestIterator) Record() any { func (s *parquetTestIterator) Err() error { return s.err } + +// ----------------------------------------------------------------------------- +// FileStats tests +// ----------------------------------------------------------------------------- + +func TestParquetCodec_FileStats_BasicTypes(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "id", Type: ParquetInt64}, + {Name: "name", Type: ParquetString}, + {Name: "score", Type: ParquetFloat64}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"id": int64(10), "name": "alice", "score": 95.5}, + map[string]any{"id": int64(1), "name": "bob", "score": 87.3}, + map[string]any{"id": int64(5), "name": "carol", "score": 99.0}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + sc, ok := codec.(StatisticalCodec) + if !ok { + t.Fatal("parquet codec does not implement StatisticalCodec") + } + + stats := sc.FileStats() + if stats == nil { + t.Fatal("FileStats() returned nil") + } + if stats.RowCount != 3 { + t.Errorf("RowCount = %d, want 3", stats.RowCount) + } + if len(stats.Columns) != 3 { + t.Fatalf("len(Columns) = %d, want 3", len(stats.Columns)) + } + + // id: min=1, max=10, nulls=0 + idCol := stats.Columns[0] + if idCol.Name != "id" { + t.Errorf("Columns[0].Name = %q, want %q", idCol.Name, "id") + } + if idCol.Min != int64(1) { + t.Errorf("id.Min = %v, want 1", idCol.Min) + } + if idCol.Max != int64(10) { + t.Errorf("id.Max = %v, want 10", idCol.Max) + } + if idCol.NullCount != 0 { + t.Errorf("id.NullCount = %d, want 0", idCol.NullCount) + } + + // name: min="alice", max="carol" + nameCol := stats.Columns[1] + if nameCol.Min != "alice" { + t.Errorf("name.Min = %v, want %q", nameCol.Min, "alice") + } + if nameCol.Max != "carol" { + t.Errorf("name.Max = %v, want %q", nameCol.Max, "carol") + } + + // score: min=87.3, max=99.0 + scoreCol := stats.Columns[2] + if scoreCol.Min != 87.3 { + t.Errorf("score.Min = %v, want 87.3", scoreCol.Min) + } + if scoreCol.Max != 99.0 { + t.Errorf("score.Max = %v, want 99.0", scoreCol.Max) + } +} + +func TestParquetCodec_FileStats_NullableFields(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "id", Type: ParquetInt64}, + {Name: "tag", Type: ParquetString, Nullable: true}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"id": int64(1), "tag": "alpha"}, + map[string]any{"id": int64(2), "tag": nil}, + map[string]any{"id": int64(3), "tag": "gamma"}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + tagCol := stats.Columns[1] + if tagCol.NullCount != 1 { + t.Errorf("tag.NullCount = %d, want 1", tagCol.NullCount) + } + if tagCol.Min != "alpha" { + t.Errorf("tag.Min = %v, want %q", tagCol.Min, "alpha") + } + if tagCol.Max != "gamma" { + t.Errorf("tag.Max = %v, want %q", tagCol.Max, 
"gamma") + } +} + +func TestParquetCodec_FileStats_AllNulls(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "val", Type: ParquetInt64, Nullable: true}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"val": nil}, + map[string]any{"val": nil}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + col := stats.Columns[0] + if col.NullCount != 2 { + t.Errorf("NullCount = %d, want 2", col.NullCount) + } + if col.Min != nil { + t.Errorf("Min = %v, want nil", col.Min) + } + if col.Max != nil { + t.Errorf("Max = %v, want nil", col.Max) + } +} + +func TestParquetCodec_FileStats_SingleRecord(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "x", Type: ParquetFloat64}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"x": 42.0}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + col := stats.Columns[0] + if col.Min != 42.0 { + t.Errorf("Min = %v, want 42.0", col.Min) + } + if col.Max != 42.0 { + t.Errorf("Max = %v, want 42.0", col.Max) + } + if stats.RowCount != 1 { + t.Errorf("RowCount = %d, want 1", stats.RowCount) + } +} + +func TestParquetCodec_FileStats_BoolAndBytes_NoMinMax(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "flag", Type: ParquetBool}, + {Name: "data", Type: ParquetBytes}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"flag": true, "data": []byte("hello")}, + map[string]any{"flag": false, "data": []byte("world")}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + + flagCol := stats.Columns[0] + if flagCol.Min != nil { + t.Errorf("flag.Min = %v, want nil", flagCol.Min) + } + if flagCol.Max != nil { + t.Errorf("flag.Max = %v, want nil", flagCol.Max) + } + if flagCol.NullCount != 0 { + t.Errorf("flag.NullCount = %d, want 0", flagCol.NullCount) + } + + dataCol := stats.Columns[1] + if dataCol.Min != nil { + t.Errorf("data.Min = %v, want nil", dataCol.Min) + } + if dataCol.Max != nil { + t.Errorf("data.Max = %v, want nil", dataCol.Max) + } +} + +func TestParquetCodec_FileStats_Timestamps(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "ts", Type: ParquetTimestamp}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + t1 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + t2 := time.Date(2024, 6, 15, 12, 0, 0, 0, time.UTC) + t3 := time.Date(2024, 3, 10, 8, 0, 0, 0, time.UTC) + + records := []any{ + map[string]any{"ts": t2}, + map[string]any{"ts": t1}, + map[string]any{"ts": t3}, + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, records); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + col := stats.Columns[0] + + minTs, ok := col.Min.(time.Time) + if !ok { + t.Fatalf("ts.Min is %T, want time.Time", col.Min) + } + if !minTs.Equal(t1) { + t.Errorf("ts.Min = %v, want %v", minTs, t1) + } + + maxTs, ok := col.Max.(time.Time) + if !ok { + t.Fatalf("ts.Max is %T, want time.Time", col.Max) + } + if !maxTs.Equal(t2) { + t.Errorf("ts.Max = %v, want %v", maxTs, t2) + } +} + 
+func TestParquetCodec_FileStats_EmptyRecords(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "id", Type: ParquetInt64}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := codec.Encode(&buf, []any{}); err != nil { + t.Fatal(err) + } + + stats := codec.(StatisticalCodec).FileStats() + if stats == nil { + t.Fatal("FileStats() returned nil for empty records") + } + if stats.RowCount != 0 { + t.Errorf("RowCount = %d, want 0", stats.RowCount) + } + if len(stats.Columns) != 1 { + t.Fatalf("len(Columns) = %d, want 1", len(stats.Columns)) + } + if stats.Columns[0].Min != nil { + t.Errorf("Min = %v, want nil", stats.Columns[0].Min) + } +} diff --git a/lode/dataset.go b/lode/dataset.go index 4d058ba..3a81348 100644 --- a/lode/dataset.go +++ b/lode/dataset.go @@ -630,6 +630,12 @@ func (d *dataset) StreamWriteRecords(ctx context.Context, records RecordIterator return nil, fmt.Errorf("lode: failed to close encoder: %w", err) } + // Collect per-file stats if the stream encoder supports it + var fileStats *FileStats + if se, ok := encoder.(StatisticalStreamEncoder); ok { + fileStats = se.FileStats() + } + // Close compression (flushes final data) if err := compWriter.Close(); err != nil { _ = pw.CloseWithError(err) @@ -651,10 +657,11 @@ func (d *dataset) StreamWriteRecords(ctx context.Context, records RecordIterator return nil, fmt.Errorf("lode: failed to write data: %w", err) } - // Build file reference with optional checksum + // Build file reference with optional checksum and stats fileRef := FileRef{ Path: filePath, SizeBytes: cw.n, + Stats: fileStats, } if hasher != nil { fileRef.Checksum = hasher.Sum() @@ -782,6 +789,11 @@ func (d *dataset) writeDataFile(ctx context.Context, snapshotID DatasetSnapshotI fileRef.Checksum = hasher.Sum() } + // Collect per-file stats if the codec supports it + if sc, ok := d.codec.(StatisticalCodec); ok { + fileRef.Stats = sc.FileStats() + } + return fileRef, nil } diff --git a/lode/dataset_test.go b/lode/dataset_test.go index 9fed71d..dbc7253 100644 --- a/lode/dataset_test.go +++ b/lode/dataset_test.go @@ -1,6 +1,7 @@ package lode import ( + "encoding/json" "errors" "io" "strings" @@ -1929,6 +1930,187 @@ func TestDataset_Write_SingleTimestampedRecord_SameMinMax(t *testing.T) { } } +// ----------------------------------------------------------------------------- +// FileStats write-path tests +// ----------------------------------------------------------------------------- + +func TestDataset_Write_ParquetCodec_StatsPopulated(t *testing.T) { + schema := ParquetSchema{ + Fields: []ParquetField{ + {Name: "id", Type: ParquetInt64}, + {Name: "name", Type: ParquetString}, + }, + } + codec, err := NewParquetCodec(schema) + if err != nil { + t.Fatal(err) + } + + ds, err := NewDataset("stats-ds", NewMemoryFactory(), WithCodec(codec)) + if err != nil { + t.Fatal(err) + } + + records := []any{ + map[string]any{"id": int64(1), "name": "alice"}, + map[string]any{"id": int64(2), "name": "bob"}, + } + + snap, err := ds.Write(t.Context(), records, Metadata{}) + if err != nil { + t.Fatal(err) + } + + if len(snap.Manifest.Files) != 1 { + t.Fatalf("expected 1 file, got %d", len(snap.Manifest.Files)) + } + + stats := snap.Manifest.Files[0].Stats + if stats == nil { + t.Fatal("expected Stats on FileRef, got nil") + } + if stats.RowCount != 2 { + t.Errorf("RowCount = %d, want 2", stats.RowCount) + } + if len(stats.Columns) != 2 { + t.Fatalf("len(Columns) = %d, want 2", len(stats.Columns)) 
+ } + if stats.Columns[0].Name != "id" { + t.Errorf("Columns[0].Name = %q, want %q", stats.Columns[0].Name, "id") + } +} + +func TestDataset_Write_JSONLCodec_StatsNil(t *testing.T) { + ds, err := NewDataset("jsonl-ds", NewMemoryFactory(), WithCodec(NewJSONLCodec())) + if err != nil { + t.Fatal(err) + } + + records := []any{map[string]any{"key": "value"}} + snap, err := ds.Write(t.Context(), records, Metadata{}) + if err != nil { + t.Fatal(err) + } + + if snap.Manifest.Files[0].Stats != nil { + t.Errorf("expected nil Stats for JSONL codec, got %+v", snap.Manifest.Files[0].Stats) + } +} + +func TestDataset_Write_RawBlob_StatsNil(t *testing.T) { + ds, err := NewDataset("blob-ds", NewMemoryFactory()) + if err != nil { + t.Fatal(err) + } + + snap, err := ds.Write(t.Context(), []any{[]byte("raw data")}, Metadata{}) + if err != nil { + t.Fatal(err) + } + + if snap.Manifest.Files[0].Stats != nil { + t.Errorf("expected nil Stats for raw blob, got %+v", snap.Manifest.Files[0].Stats) + } +} + +func TestDataset_StreamWriteRecords_StatsNil(t *testing.T) { + ds, err := NewDataset("stream-ds", NewMemoryFactory(), WithCodec(NewJSONLCodec())) + if err != nil { + t.Fatal(err) + } + + iter := &sliceIterator{records: []any{map[string]any{"key": "value"}}} + snap, err := ds.StreamWriteRecords(t.Context(), iter, Metadata{}) + if err != nil { + t.Fatal(err) + } + + if snap.Manifest.Files[0].Stats != nil { + t.Errorf("expected nil Stats for JSONL stream, got %+v", snap.Manifest.Files[0].Stats) + } +} + +// ----------------------------------------------------------------------------- +// FileStats serialization tests +// ----------------------------------------------------------------------------- + +func TestFileRef_Stats_JSONRoundTrip(t *testing.T) { + ref := FileRef{ + Path: "data/test.parquet", + SizeBytes: 1024, + Stats: &FileStats{ + RowCount: 100, + Columns: []ColumnStats{ + {Name: "id", Min: float64(1), Max: float64(100), NullCount: 0}, + {Name: "name", Min: "alice", Max: "zara", NullCount: 5}, + }, + }, + } + + data, err := json.Marshal(ref) + if err != nil { + t.Fatal(err) + } + + var decoded FileRef + if err := json.Unmarshal(data, &decoded); err != nil { + t.Fatal(err) + } + + if decoded.Stats == nil { + t.Fatal("expected Stats after round-trip, got nil") + } + if decoded.Stats.RowCount != 100 { + t.Errorf("RowCount = %d, want 100", decoded.Stats.RowCount) + } + if len(decoded.Stats.Columns) != 2 { + t.Fatalf("len(Columns) = %d, want 2", len(decoded.Stats.Columns)) + } + if decoded.Stats.Columns[0].Name != "id" { + t.Errorf("Columns[0].Name = %q, want %q", decoded.Stats.Columns[0].Name, "id") + } + // JSON round-trips numbers as float64 + if decoded.Stats.Columns[0].Min != float64(1) { + t.Errorf("Columns[0].Min = %v, want 1", decoded.Stats.Columns[0].Min) + } + if decoded.Stats.Columns[1].NullCount != 5 { + t.Errorf("Columns[1].NullCount = %d, want 5", decoded.Stats.Columns[1].NullCount) + } +} + +func TestFileRef_Stats_BackwardCompat(t *testing.T) { + // JSON without stats field should decode cleanly + jsonData := `{"path":"data/test.gz","size_bytes":512}` + + var ref FileRef + if err := json.Unmarshal([]byte(jsonData), &ref); err != nil { + t.Fatal(err) + } + + if ref.Stats != nil { + t.Errorf("expected nil Stats for JSON without stats field, got %+v", ref.Stats) + } + if ref.Path != "data/test.gz" { + t.Errorf("Path = %q, want %q", ref.Path, "data/test.gz") + } +} + +func TestFileRef_Stats_OmittedWhenNil(t *testing.T) { + ref := FileRef{ + Path: "data/test.gz", + SizeBytes: 256, + } + + data, err 
:= json.Marshal(ref) + if err != nil { + t.Fatal(err) + } + + if strings.Contains(string(data), "stats") { + t.Errorf("expected no stats key in JSON when Stats is nil, got: %s", data) + } +} + // ----------------------------------------------------------------------------- // Test helpers // -----------------------------------------------------------------------------