32 changes: 32 additions & 0 deletions PUBLIC_API.md
@@ -163,6 +163,12 @@ construction.

**Interfaces:**
- `Timestamped` - Optional interface for records with timestamps (see below)
- `StatisticalCodec` - Optional codec interface for per-file column statistics
- `StatisticalStreamEncoder` - Optional stream encoder interface for per-file column statistics

**Types (per-file statistics):**
- `FileStats` - Per-file row count and column statistics
- `ColumnStats` - Per-column min, max, null count, and distinct count

**Range read support:**
- `Store.ReadRange(ctx, path, offset, length)` - Read byte range from object
@@ -289,6 +295,32 @@ _, err := ds.StreamWriteRecords(ctx, iter, metadata)

*Contract reference: [`CONTRACT_PARQUET.md`](docs/contracts/CONTRACT_PARQUET.md)*

### Per-File Statistics

The Parquet codec implements `StatisticalCodec` and automatically populates
per-file column statistics on each `FileRef` during `Write`:

<!-- illustrative -->
```go
snap, _ := ds.Write(ctx, records, metadata)
for _, f := range snap.Manifest.Files {
if f.Stats != nil {
fmt.Println("rows:", f.Stats.RowCount)
for _, col := range f.Stats.Columns {
fmt.Printf(" %s: min=%v max=%v nulls=%d\n",
col.Name, col.Min, col.Max, col.NullCount)
}
}
}
```

Statistics are computed from the Go record values during encoding. Orderable
types (int32, int64, float32, float64, string, timestamp) record min and max;
every column records a null count; boolean and bytes columns carry a null
count only, with no min/max.

Codecs that do not implement `StatisticalCodec` (e.g., JSONL) produce no stats —
`FileRef.Stats` is nil and omitted from the manifest JSON.
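
Whether the configured codec reports statistics can be checked with a type
assertion. A minimal sketch, where `codec` stands in for whichever codec the
dataset is configured with:

<!-- illustrative -->
```go
if sc, ok := codec.(StatisticalCodec); ok {
	// Stats from the most recent Encode call; nil when unavailable.
	stats := sc.FileStats()
	_ = stats
}
```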

---

## Metadata
13 changes: 13 additions & 0 deletions docs/contracts/CONTRACT_CORE.md
@@ -55,6 +55,19 @@ Minimum required fields:
- min/max timestamp (when data units implement `Timestamped`; omit if not applicable)
Optional fields:
- codec name (omit when no codec is configured)
- per-file statistics (when the codec reports them via `StatisticalCodec`; omit when not available)

### Per-File Statistics

FileRef MAY contain per-file statistics reported by the codec.

- Statistics are codec-agnostic: any codec may report them via the `StatisticalCodec` interface.
- When a codec reports statistics, they MUST be persisted on the FileRef.
- When a codec does not report statistics, the stats field MUST be omitted.
- Statistics values MUST be JSON-serializable.
- Statistics MUST NOT be inferred; they are reported by the codec from observed data.
- Per-file statistics comprise a row count plus per-column min, max, null count, and distinct count (see the example below).
- Distinct count is optional; zero means not computed.
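
For illustration, a hypothetical FileRef entry carrying statistics. Field names
match the public API JSON tags; the `path` field and all values are invented:

```json
{
  "path": "part-00000.parquet",
  "stats": {
    "row_count": 3,
    "columns": [
      {"name": "id", "min": 1, "max": 42, "null_count": 0},
      {"name": "note", "min": "a", "max": "q", "null_count": 1}
    ]
  }
}
```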

### Checksum Rules

22 changes: 14 additions & 8 deletions docs/contracts/CONTRACT_PARQUET.md
@@ -240,14 +240,20 @@ The following statistics MAY be extracted and recorded in manifests:
- `RowCount`: Total rows in the file (from Parquet metadata).
- `MinTimestamp` / `MaxTimestamp`: When a timestamp column is designated.

### Future: Extended Manifest Stats

Future versions MAY add:
- Column-level min/max statistics.
- Null counts per column.
- Byte size per column.

These extensions are additive and do not affect this contract.
### Per-File Statistics

The Parquet codec implements `StatisticalCodec` and reports per-file statistics
via `FileStats()` after each `Encode` call (a usage sketch follows this list):

- `RowCount`: Total rows encoded in the file.
- Per-column `Min` and `Max` for orderable types: int32, int64, float32, float64, string, timestamp.
- Per-column `NullCount`: Number of null values for nullable columns.
- `DistinctCount`: Reserved for future use (reported as 0).
- Boolean and bytes columns report `NullCount` only (no min/max).
- Statistics are computed during `Encode` from the Go record values, not from Parquet internal metadata.
- Min/Max values use the same Go types as the input records. When serialized to
JSON, `time.Time` becomes an RFC3339Nano string; consumers must interpret based
on schema context.
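
A minimal sequencing sketch; codec construction is elided, and only the
`StatisticalCodec` assertion and the Encode-then-`FileStats` ordering matter
here:

```go
var buf bytes.Buffer
if err := codec.Encode(&buf, records); err != nil {
	return err
}
// FileStats reflects the most recent successful Encode call only.
if sc, ok := codec.(StatisticalCodec); ok {
	stats := sc.FileStats()
	_ = stats
}
```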

---

4 changes: 4 additions & 0 deletions docs/contracts/CONTRACT_WRITE_API.md
@@ -33,6 +33,8 @@ It is authoritative for any `Dataset` implementation.
- Writes MUST NOT mutate existing snapshots or manifests.
- The manifest MUST include all required fields defined in `CONTRACT_CORE.md`
(including row/event count and min/max timestamp when applicable).
- When the codec implements `StatisticalCodec`, per-file statistics MUST be
  collected after encoding and recorded on the FileRef (see the sketch below).
- When no codec is configured, each write represents a single data unit and
the row/event count MUST be `1`.
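
A minimal sketch of the statistics rule inside a write path; `codec`, `w`, and
the FileRef construction are illustrative placeholders:

```go
if err := codec.Encode(w, records); err != nil {
	return err
}
ref := FileRef{ /* required fields per CONTRACT_CORE.md */ }
if sc, ok := codec.(StatisticalCodec); ok {
	ref.Stats = sc.FileStats() // nil when the codec reports no stats
}
```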

@@ -68,6 +70,8 @@ It is authoritative for any `Dataset` implementation.
- Row/event count MUST equal the total number of records consumed.
- When a checksum component is configured, the checksum MUST be computed during
streaming and recorded in the manifest for each file written.
- When the stream encoder implements `StatisticalStreamEncoder`, per-file
  statistics MUST be collected after stream finalization and recorded on the
  FileRef (see the sketch below).
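
The streaming analogue, as a sketch; `enc` and `ref` are illustrative names for
the stream encoder and the FileRef under construction:

```go
if err := enc.Close(); err != nil {
	return err
}
// Stats are defined only after Close returns successfully.
if sse, ok := enc.(StatisticalStreamEncoder); ok {
	ref.Stats = sse.FileStats()
}
```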

### Timestamp computation

65 changes: 65 additions & 0 deletions lode/api.go
@@ -116,6 +116,40 @@ type FileRef struct {

// Checksum is an optional integrity hash.
Checksum string `json:"checksum,omitempty"`

// Stats contains per-file column statistics reported by the codec.
// Omitted when the codec does not report statistics.
Stats *FileStats `json:"stats,omitempty"`
}

// FileStats holds per-file statistics reported by a codec after encoding.
type FileStats struct {
// RowCount is the number of rows in this file.
RowCount int64 `json:"row_count"`

// Columns contains per-column statistics. May be empty if the codec
// does not report column-level stats.
Columns []ColumnStats `json:"columns,omitempty"`
}

// ColumnStats holds per-column statistics for a single data file.
type ColumnStats struct {
// Name is the column/field name.
Name string `json:"name"`

// Min is the minimum value observed. nil if no non-null values exist.
// Values are JSON-serializable Go types matching the codec's decoded types.
Min any `json:"min,omitempty"`

// Max is the maximum value observed. nil if no non-null values exist.
Max any `json:"max,omitempty"`

// NullCount is the number of null values observed for this column.
NullCount int64 `json:"null_count"`

// DistinctCount is the approximate number of distinct values.
// Zero means not computed or not available.
DistinctCount int64 `json:"distinct_count,omitempty"`
}

// -----------------------------------------------------------------------------
@@ -211,6 +245,37 @@ type RecordStreamEncoder interface {
Close() error
}

// -----------------------------------------------------------------------------
// Statistical codec interfaces
// -----------------------------------------------------------------------------

// StatisticalCodec is implemented by codecs that report per-file statistics
// after encoding. This is an optional extension to the Codec interface.
//
// The codec accumulates statistics during Encode and returns them via FileStats.
// FileStats must be called after a successful Encode call and before the next
// Encode call; behavior is undefined otherwise.
type StatisticalCodec interface {
Codec

// FileStats returns statistics accumulated during the most recent Encode call.
// Returns nil if no statistics are available.
FileStats() *FileStats
}

// StatisticalStreamEncoder is implemented by stream encoders that report
// per-file statistics after the stream is finalized. This is an optional
// extension to the RecordStreamEncoder interface.
//
// FileStats must be called after Close returns successfully.
type StatisticalStreamEncoder interface {
RecordStreamEncoder

// FileStats returns statistics accumulated during stream encoding.
// Returns nil if no statistics are available.
FileStats() *FileStats
}

// -----------------------------------------------------------------------------
// Compressor interface
// -----------------------------------------------------------------------------
173 changes: 172 additions & 1 deletion lode/codec_parquet.go
@@ -76,13 +76,14 @@ func WithParquetCompression(codec ParquetCompression) ParquetOption {

// Error sentinels ErrSchemaViolation and ErrInvalidFormat are defined in api.go.

// parquetCodec implements Codec for Apache Parquet format.
// parquetCodec implements Codec and StatisticalCodec for Apache Parquet format.
type parquetCodec struct {
schema ParquetSchema
compression ParquetCompression
pqSchema *parquet.Schema
fieldOrder []string // ordered field names matching schema columns
fieldsByName map[string]ParquetField // field lookup by name
lastStats *FileStats // stats from most recent Encode call
}

// validateSchema validates a ParquetSchema and builds a field lookup map.
@@ -147,7 +148,13 @@ func (c *parquetCodec) Name() string {
return "parquet"
}

// FileStats returns statistics accumulated during the most recent Encode call.
func (c *parquetCodec) FileStats() *FileStats {
return c.lastStats
}

func (c *parquetCodec) Encode(w io.Writer, records []any) error {
c.lastStats = nil // reset before encoding
// Buffer to collect complete parquet file
var buf bytes.Buffer

@@ -178,6 +185,9 @@ func (c *parquetCodec) Encode(w io.Writer, records []any) error {
return fmt.Errorf("parquet: close writer: %w", err)
}

// Compute per-column statistics from the input records
c.lastStats = computeFileStats(c.schema, records)

// Write buffered content to output
_, err := io.Copy(w, &buf)
return err
@@ -485,3 +495,164 @@ func buildFieldNode(field ParquetField) parquet.Node {

return node
}

// -----------------------------------------------------------------------------
// Per-file statistics
// -----------------------------------------------------------------------------

// orderable maps ParquetType to whether min/max comparisons are meaningful.
var orderable = map[ParquetType]bool{
ParquetInt32: true,
ParquetInt64: true,
ParquetFloat32: true,
ParquetFloat64: true,
ParquetString: true,
ParquetTimestamp: true,
ParquetBool: false,
ParquetBytes: false,
}

// computeFileStats computes per-column statistics from the input records.
func computeFileStats(schema ParquetSchema, records []any) *FileStats {
columns := make([]ColumnStats, len(schema.Fields))
for i, field := range schema.Fields {
columns[i] = computeColumnStats(field, records)
}
return &FileStats{
RowCount: int64(len(records)),
Columns: columns,
}
}

// columnAccumulator tracks min/max/nullCount for a single column.
type columnAccumulator struct {
field ParquetField
min any
max any
nullCount int64
hasValue bool
}

// observe records a single value for this column.
func (a *columnAccumulator) observe(val any) {
if val == nil {
a.nullCount++
return
}

if !orderable[a.field.Type] {
return
}

if !a.hasValue {
a.min = val
a.max = val
a.hasValue = true
return
}

if lessThan(a.field.Type, val, a.min) {
a.min = val
}
if lessThan(a.field.Type, a.max, val) {
a.max = val
}
}

// computeColumnStats computes statistics for a single column across all records.
func computeColumnStats(field ParquetField, records []any) ColumnStats {
acc := columnAccumulator{field: field}

for _, record := range records {
m, ok := record.(map[string]any)
if !ok {
continue
}
val, exists := m[field.Name]
if !exists {
acc.observe(nil)
continue
}
acc.observe(val)
}

cs := ColumnStats{
Name: field.Name,
NullCount: acc.nullCount,
}
if acc.hasValue {
cs.Min = acc.min
cs.Max = acc.max
}
return cs
}

// lessThan returns true if a < b for the given orderable ParquetType.
// Both a and b must be non-nil. Handles type coercion for numeric types
// that may arrive as different Go types (int, int32, int64, float64).
//
//nolint:gocyclo // Type dispatch for each orderable Parquet type.
func lessThan(ptype ParquetType, a, b any) bool {
switch ptype {
case ParquetInt32:
return toInt64(a) < toInt64(b)
case ParquetInt64:
return toInt64(a) < toInt64(b)
case ParquetFloat32:
return toFloat64(a) < toFloat64(b)
case ParquetFloat64:
return toFloat64(a) < toFloat64(b)
case ParquetString:
as, _ := a.(string)
bs, _ := b.(string)
return as < bs
case ParquetTimestamp:
return toTimestamp(a).Before(toTimestamp(b))
default:
return false
}
}

// toInt64 coerces numeric Go values to int64 for comparison.
func toInt64(v any) int64 {
switch n := v.(type) {
case int:
return int64(n)
case int32:
return int64(n)
case int64:
return n
	case float64:
		// JSON-decoded numbers arrive as float64; truncate for integer ordering.
		return int64(n)
default:
return 0
}
}

// toFloat64 coerces numeric Go values to float64 for comparison.
func toFloat64(v any) float64 {
switch n := v.(type) {
case float32:
return float64(n)
case float64:
return n
default:
return 0
}
}

// toTimestamp coerces time values for comparison.
func toTimestamp(v any) time.Time {
switch t := v.(type) {
case time.Time:
return t
case string:
parsed, err := time.Parse(time.RFC3339Nano, t)
if err != nil {
return time.Time{}
}
return parsed
default:
return time.Time{}
}
}