From 069b511a799635710a545c0ffaad46076fe2e5d9 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 16 Jun 2026 13:53:57 +0930 Subject: [PATCH] lib: add streaming decode, stream producers, and emit macro Add three composable features for processing large payloads in CEL programs without materialising all records in memory: - Opaque stream type (streamVal) wrapping io.Reader, with stream_gzip and stream_zip producers that wrap decompression readers around in-memory bytes. - Lazy JSON stream decoder (decode_json_stream_lazy) that accepts stream, bytes, or string and returns a traits.Iterable decoding values on demand via json.Decoder. - Emit macro that iterates a list or iterable, calling an Emitter callback for each element. Supports optional per-element cursor tracking. In the mito CLI, emitted events are printed to stderr. --- lib/csv.go | 248 ++++++++++++++++++ lib/csv_test.go | 254 ++++++++++++++++++ lib/emit.go | 439 ++++++++++++++++++++++++++++++++ lib/emit_test.go | 271 ++++++++++++++++++++ lib/json.go | 170 +++++++++++++ lib/lines.go | 183 +++++++++++++ lib/lines_test.go | 238 +++++++++++++++++ lib/stream.go | 153 +++++++++++ lib/stream_test.go | 209 +++++++++++++++ lib/types.go | 1 + mito.go | 16 ++ testdata/csv_lazy.txt | 21 ++ testdata/csv_lazy_no_header.txt | 20 ++ testdata/emit.txt | 33 +++ testdata/lines.txt | 21 ++ 15 files changed, 2277 insertions(+) create mode 100644 lib/csv.go create mode 100644 lib/csv_test.go create mode 100644 lib/emit.go create mode 100644 lib/emit_test.go create mode 100644 lib/lines.go create mode 100644 lib/lines_test.go create mode 100644 lib/stream.go create mode 100644 lib/stream_test.go create mode 100644 testdata/csv_lazy.txt create mode 100644 testdata/csv_lazy_no_header.txt create mode 100644 testdata/emit.txt create mode 100644 testdata/lines.txt diff --git a/lib/csv.go b/lib/csv.go new file mode 100644 index 0000000..750ed5f --- /dev/null +++ b/lib/csv.go @@ -0,0 +1,248 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "bytes" + "encoding/csv" + "fmt" + "io" + "reflect" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" +) + +// CSV returns a cel.EnvOption to configure lazy CSV stream decode functions. +// +// # Decode CSV Stream Lazy +// +// decode_csv_stream_lazy returns a lazy iterable that decodes CSV rows on +// demand from the receiver. The first row is treated as a header; each +// subsequent row is returned as a map keyed by the header +// values. The receiver may be bytes, string, or a stream value. +// +// .decode_csv_stream_lazy() -> >> +// .decode_csv_stream_lazy() -> >> +// .decode_csv_stream_lazy() -> >> +// +// # Decode CSV Stream Lazy No Header +// +// decode_csv_stream_lazy_no_header returns a lazy iterable that decodes CSV +// rows on demand. Each row is returned as a list. No header row is +// consumed. +// +// .decode_csv_stream_lazy_no_header() -> >> +// .decode_csv_stream_lazy_no_header() -> >> +// .decode_csv_stream_lazy_no_header() -> >> +func CSV() cel.EnvOption { + return cel.Lib(csvLib{}) +} + +type csvLib struct{} + +func (csvLib) CompileOptions() []cel.EnvOption { + return []cel.EnvOption{ + cel.Function("decode_csv_stream_lazy", + cel.MemberOverload( + "stream_decode_csv_stream_lazy", + []*cel.Type{streamCELType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazy)), + ), + cel.MemberOverload( + "bytes_decode_csv_stream_lazy", + []*cel.Type{cel.BytesType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazy)), + ), + cel.MemberOverload( + "string_decode_csv_stream_lazy", + []*cel.Type{cel.StringType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazy)), + ), + ), + + cel.Function("decode_csv_stream_lazy_no_header", + cel.MemberOverload( + "stream_decode_csv_stream_lazy_no_header", + []*cel.Type{streamCELType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazyNoHeader)), + ), + cel.MemberOverload( + "bytes_decode_csv_stream_lazy_no_header", + []*cel.Type{cel.BytesType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazyNoHeader)), + ), + cel.MemberOverload( + "string_decode_csv_stream_lazy_no_header", + []*cel.Type{cel.StringType}, + cel.DynType, + cel.UnaryBinding(catch(decodeCSVStreamLazyNoHeader)), + ), + ), + } +} + +func (csvLib) ProgramOptions() []cel.ProgramOption { return nil } + +func decodeCSVStreamLazy(val ref.Val) ref.Val { + r := csvReader(val) + if r == nil { + return types.NoSuchOverloadErr() + } + cr := csv.NewReader(r) + header, err := cr.Read() + if err != nil { + return types.NewErr("decode_csv_stream_lazy: reading header: %v", err) + } + return &lazyCSVStream{cr: cr, header: header} +} + +func decodeCSVStreamLazyNoHeader(val ref.Val) ref.Val { + r := csvReader(val) + if r == nil { + return types.NoSuchOverloadErr() + } + return &lazyCSVStream{cr: csv.NewReader(r)} +} + +func csvReader(val ref.Val) io.Reader { + switch v := val.(type) { + case *streamVal: + return v.reader + case types.Bytes: + return bytes.NewReader(v) + case types.String: + return bytes.NewReader([]byte(v)) + default: + return nil + } +} + +var ( + _ ref.Val = (*lazyCSVStream)(nil) + _ traits.Iterable = (*lazyCSVStream)(nil) + _ traits.Iterator = (*csvStreamIterator)(nil) +) + +var lazyCSVStreamRefType = types.NewObjectType("lazy_csv_stream", traits.IterableType) + +// lazyCSVStream is a ref.Val implementing traits.Iterable that decodes CSV +// rows on demand from a csv.Reader. If header is non-nil, each row is +// returned as map[string]string; otherwise as []string. +type lazyCSVStream struct { + cr *csv.Reader + header []string +} + +func (s *lazyCSVStream) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("lazy CSV streams cannot be converted to %v", typeDesc) +} + +func (s *lazyCSVStream) ConvertToType(typeVal ref.Type) ref.Val { + if typeVal == types.TypeType { + return types.NewTypeValue("lazy_csv_stream") + } + return types.NewErr("type conversion error from 'lazy_csv_stream' to '%s'", typeVal.TypeName()) +} + +func (s *lazyCSVStream) Equal(other ref.Val) ref.Val { + return types.NewErr("lazy CSV streams are not comparable") +} + +func (s *lazyCSVStream) Type() ref.Type { return lazyCSVStreamRefType } + +func (s *lazyCSVStream) Value() any { return s.cr } + +func (s *lazyCSVStream) Iterator() traits.Iterator { + return &csvStreamIterator{cr: s.cr, header: s.header} +} + +// csvStreamIterator wraps a csv.Reader as a traits.Iterator. +type csvStreamIterator struct { + cr *csv.Reader + header []string + + // next holds a prefetched row; hasNext reads ahead one row + // because csv.Reader has no equivalent of json.Decoder.More(). + next []string + done bool + err error +} + +func (it *csvStreamIterator) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("CSV stream iterators cannot be converted to %v", typeDesc) +} + +func (it *csvStreamIterator) ConvertToType(typeVal ref.Type) ref.Val { + return types.NewErr("type conversion error from 'csv_stream_iterator' to '%s'", typeVal.TypeName()) +} + +func (it *csvStreamIterator) Equal(other ref.Val) ref.Val { + return types.NewErr("CSV stream iterators are not comparable") +} + +func (it *csvStreamIterator) Type() ref.Type { return types.IteratorType } + +func (it *csvStreamIterator) Value() any { return it.cr } + +func (it *csvStreamIterator) HasNext() ref.Val { + if it.done { + return types.False + } + if it.next != nil { + return types.True + } + rec, err := it.cr.Read() + if err != nil { + it.done = true + if err != io.EOF { + it.err = err + } + return types.False + } + it.next = rec + return types.True +} + +func (it *csvStreamIterator) Next() ref.Val { + rec := it.next + it.next = nil + if rec == nil { + if it.err != nil { + return types.NewErr("csv: %v", it.err) + } + return types.NewErr("csv: no more rows") + } + if it.header != nil { + m := make(map[string]string, len(it.header)) + for j, name := range it.header { + if j < len(rec) { + m[name] = rec[j] + } + } + return types.DefaultTypeAdapter.NativeToValue(m) + } + return types.DefaultTypeAdapter.NativeToValue(rec) +} diff --git a/lib/csv_test.go b/lib/csv_test.go new file mode 100644 index 0000000..88971e2 --- /dev/null +++ b/lib/csv_test.go @@ -0,0 +1,254 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "testing" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" +) + +func TestDecodeCSVStreamLazy_Header(t *testing.T) { + env, err := cel.NewEnv( + CSV(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_csv_stream_lazy()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + csv := []byte("name,age\nalice,30\nbob,25\n") + out, _, err := prg.Eval(map[string]any{"data": csv}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + + want := []map[string]string{ + {"name": "alice", "age": "30"}, + {"name": "bob", "age": "25"}, + } + for i, w := range want { + if it.HasNext() != types.True { + t.Fatalf("row %d: HasNext() = false, want true", i) + } + row := it.Next() + m, ok := row.(traits.Mapper) + if !ok { + t.Fatalf("row %d: got %T, want traits.Mapper", i, row) + } + for k, v := range w { + got := m.Get(types.String(k)) + if got != types.String(v) { + t.Errorf("row %d: %s = %v, want %q", i, k, got, v) + } + } + } + if it.HasNext() != types.False { + t.Errorf("expected no more rows") + } +} + +func TestDecodeCSVStreamLazy_NoHeader(t *testing.T) { + env, err := cel.NewEnv( + CSV(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_csv_stream_lazy_no_header()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + csv := []byte("alice,30\nbob,25\n") + out, _, err := prg.Eval(map[string]any{"data": csv}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + + want := [][]string{ + {"alice", "30"}, + {"bob", "25"}, + } + for i, w := range want { + if it.HasNext() != types.True { + t.Fatalf("row %d: HasNext() = false, want true", i) + } + row := it.Next() + l, ok := row.(traits.Lister) + if !ok { + t.Fatalf("row %d: got %T, want traits.Lister", i, row) + } + for j, v := range w { + got := l.Get(types.Int(j)) + if got != types.String(v) { + t.Errorf("row %d col %d: got %v, want %q", i, j, got, v) + } + } + } + if it.HasNext() != types.False { + t.Errorf("expected no more rows") + } +} + +func TestDecodeCSVStreamLazy_WithEmit(t *testing.T) { + emitter := newTestEmitter() + env, err := cel.NewEnv( + CSV(), + Emit(func() Emitter { return emitter }), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_csv_stream_lazy().emit(row, row)`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + csv := []byte("name,value\nfoo,1\nbar,2\nbaz,3\n") + out, _, err := prg.Eval(map[string]any{"data": csv}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected traits.Mapper, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(3) { + t.Errorf("published = %v, want 3", published) + } + if len(emitter.values) != 3 { + t.Fatalf("emitter received %d values, want 3", len(emitter.values)) + } + row0, ok := emitter.values[0].(map[string]any) + if !ok { + t.Fatalf("row 0: got %T, want map[string]any", emitter.values[0]) + } + if row0["name"] != "foo" || row0["value"] != "1" { + t.Errorf("row 0 = %v, want {name:foo value:1}", row0) + } +} + +func TestDecodeCSVStreamLazy_Stream(t *testing.T) { + env, err := cel.NewEnv( + CSV(), + Stream(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.stream_gzip().decode_csv_stream_lazy()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + csvData := []byte("name,age\nalice,30\n") + gz := gzipBytes(t, csvData) + + out, _, err := prg.Eval(map[string]any{"data": []byte(gz)}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + if it.HasNext() != types.True { + t.Fatal("expected at least one row") + } + row := it.Next() + m, ok := row.(traits.Mapper) + if !ok { + t.Fatalf("got %T, want traits.Mapper", row) + } + if got := m.Get(types.String("name")); got != types.String("alice") { + t.Errorf("name = %v, want alice", got) + } +} + +func TestDecodeCSVStreamLazy_EmptyHeader(t *testing.T) { + env, err := cel.NewEnv( + CSV(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_csv_stream_lazy()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + _, _, err = prg.Eval(map[string]any{"data": []byte("")}) + if err == nil { + t.Error("expected error for empty input, got nil") + } +} + +// gzipBytes is defined in stream_test.go but we need it here too. +// Since both files are in the same package, it's shared. +func csvCheckRef(v ref.Val) { + // Compile-time interface checks for test coverage. + var _ ref.Val = (*lazyCSVStream)(nil) + var _ traits.Iterable = (*lazyCSVStream)(nil) + var _ traits.Iterator = (*csvStreamIterator)(nil) +} diff --git a/lib/emit.go b/lib/emit.go new file mode 100644 index 0000000..74a882d --- /dev/null +++ b/lib/emit.go @@ -0,0 +1,439 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "fmt" + "sync" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common" + "github.com/google/cel-go/common/ast" + "github.com/google/cel-go/common/operators" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" + "github.com/google/cel-go/interpreter" + "github.com/google/cel-go/parser" +) + +// Emitter is the callback interface used by the emit macro to publish events +// during CEL evaluation. +type Emitter interface { + Emit(value, cursor any) error +} + +const ( + emitSentinel = "@emit" + emitBodySentinel = "@emit_body" + emitCursorSentinel = "@emit_cursor" + emitAccuVar = "@emit_result" +) + +// Emit returns a cel.EnvOption that configures the emit macro. The factory +// function is called at eval time to obtain the session-scoped Emitter; +// this indirection is necessary because the CEL program is compiled before +// the publish session exists. +// +// # Call forms +// +// // Two-arg: publish value with no cursor. +// .emit(, ) -> map(string, dyn) +// +// // Three-arg: publish value with per-element cursor. +// .emit(, , ) -> map(string, dyn) +// +// The range must be traits.Lister or traits.Iterable. For each element, +// the macro evaluates the value expression, optionally the cursor expression, +// then calls Emitter.Emit. Iteration is sequential: cursor ordering is +// preserved. +// +// # Return value +// +// The macro returns {"published": }. If a cursor expression was +// present and at least one event was published, the result also contains +// {"cursor": }. If zero events were published, "cursor" is +// absent. +// +// # Error handling +// +// If iteration, expression evaluation, or Emit returns an error, +// iteration stops and the result includes {"error": }. The +// "published" count and "cursor" reflect the state at the point of +// failure. Programs can check for the presence of the "error" key to +// distinguish incomplete processing from a clean completion. +func Emit(factory func() Emitter) cel.EnvOption { + return cel.Lib(&emitLib{factory: factory}) +} + +type emitLib struct { + factory func() Emitter + + mu sync.Mutex + + // bodies maps each @emit sentinel call's AST node ID to compile-time + // metadata recorded during macro expansion. + bodies map[int64]emitBody + + // bodyImpls maps @emit sentinel call ID → planned body Interpretable, + // stored by the @emit_body decorator. + bodyImpls map[int64]interpreter.Interpretable + + // cursorImpls maps @emit sentinel call ID → planned cursor Interpretable, + // stored by the @emit_cursor decorator. Only populated for three-arg form. + cursorImpls map[int64]interpreter.Interpretable +} + +type emitBody struct { + iterVar string + hasCursor bool + bodyCallID int64 + curCallID int64 +} + +func (l *emitLib) CompileOptions() []cel.EnvOption { + return []cel.EnvOption{ + cel.Variable(emitAccuVar, cel.ListType(cel.DynType)), + + // @emit(dyn, dyn) -> map(string, dyn) + cel.Function(emitSentinel, + cel.Overload(emitSentinel+"_impl", + []*cel.Type{cel.DynType, cel.DynType}, + mapStringDyn, + ), + ), + // @emit_body(dyn) -> dyn + cel.Function(emitBodySentinel, + cel.Overload(emitBodySentinel+"_impl", + []*cel.Type{cel.DynType}, + cel.DynType, + ), + ), + // @emit_cursor(dyn) -> dyn + cel.Function(emitCursorSentinel, + cel.Overload(emitCursorSentinel+"_impl", + []*cel.Type{cel.DynType}, + cel.DynType, + ), + ), + + cel.Macros( + cel.ReceiverMacro("emit", 2, l.makeEmit2), + cel.ReceiverMacro("emit", 3, l.makeEmit3), + ), + } +} + +func (l *emitLib) ProgramOptions() []cel.ProgramOption { + return []cel.ProgramOption{ + cel.CustomDecorator(l.emitBodyDecorator()), + cel.CustomDecorator(l.emitCursorDecorator()), + cel.CustomDecorator(l.emitDecorator()), + } +} + +// makeEmit2 handles range.emit(iterVar, valueExpr) +func (l *emitLib) makeEmit2(mef cel.MacroExprFactory, target ast.Expr, args []ast.Expr) (ast.Expr, *common.Error) { + iterVar, err := emitRequireIdent(mef, args[0]) + if err != nil { + return nil, err + } + bodyCall := mef.NewCall(emitBodySentinel, mef.Copy(args[1])) + eb := emitBody{iterVar: iterVar, bodyCallID: bodyCall.ID()} + return l.emitSentinelAST(mef, target, eb, func(rangeExpr ast.Expr) ast.Expr { + accu := mef.NewIdent(emitAccuVar) + return mef.NewComprehension( + rangeExpr, iterVar, emitAccuVar, + mef.NewList(), + mef.NewLiteral(types.True), + mef.NewCall(operators.Add, accu, mef.NewList(bodyCall)), + mef.NewIdent(emitAccuVar), + ) + }) +} + +// makeEmit3 handles range.emit(iterVar, valueExpr, cursorExpr) +func (l *emitLib) makeEmit3(mef cel.MacroExprFactory, target ast.Expr, args []ast.Expr) (ast.Expr, *common.Error) { + iterVar, err := emitRequireIdent(mef, args[0]) + if err != nil { + return nil, err + } + bodyCall := mef.NewCall(emitBodySentinel, mef.Copy(args[1])) + cursorCall := mef.NewCall(emitCursorSentinel, mef.Copy(args[2])) + eb := emitBody{ + iterVar: iterVar, + hasCursor: true, + bodyCallID: bodyCall.ID(), + curCallID: cursorCall.ID(), + } + return l.emitSentinelAST(mef, target, eb, func(rangeExpr ast.Expr) ast.Expr { + accu := mef.NewIdent(emitAccuVar) + return mef.NewComprehension( + rangeExpr, iterVar, emitAccuVar, + mef.NewList(), + mef.NewLiteral(types.True), + mef.NewCall(operators.Add, accu, mef.NewList(bodyCall, cursorCall)), + mef.NewIdent(emitAccuVar), + ) + }) +} + +func (l *emitLib) emitSentinelAST(mef cel.MacroExprFactory, target ast.Expr, eb emitBody, foldBuilder func(rangeExpr ast.Expr) ast.Expr) (ast.Expr, *common.Error) { + fold := foldBuilder(mef.Copy(target)) + sentinel := mef.NewCall(emitSentinel, mef.Copy(target), fold) + l.recordBody(sentinel.ID(), eb) + return sentinel, nil +} + +func (l *emitLib) recordBody(id int64, eb emitBody) { + l.mu.Lock() + defer l.mu.Unlock() + if l.bodies == nil { + l.bodies = make(map[int64]emitBody) + } + l.bodies[id] = eb +} + +func emitRequireIdent(mef cel.MacroExprFactory, e ast.Expr) (string, *common.Error) { + if e.Kind() != ast.IdentKind { + return "", mef.NewError(e.ID(), "emit: iteration variable must be a simple identifier") + } + name := e.AsIdent() + if name == emitAccuVar || name == parser.AccumulatorName || name == parser.HiddenAccumulatorName { + return "", mef.NewError(e.ID(), "emit: iteration variable conflicts with internal accumulator") + } + return name, nil +} + +// emitBodyDecorator intercepts @emit_body calls, stores the body +// Interpretable, and returns the body to make the sentinel transparent. +func (l *emitLib) emitBodyDecorator() interpreter.InterpretableDecorator { + return func(i interpreter.Interpretable) (interpreter.Interpretable, error) { + call, ok := i.(interpreter.InterpretableCall) + if !ok || call.Function() != emitBodySentinel { + return i, nil + } + args := call.Args() + if len(args) != 1 { + return nil, fmt.Errorf("emit: @emit_body has %d args, expected 1", len(args)) + } + l.mu.Lock() + for emitID, eb := range l.bodies { + if eb.bodyCallID == call.ID() { + if l.bodyImpls == nil { + l.bodyImpls = make(map[int64]interpreter.Interpretable) + } + l.bodyImpls[emitID] = args[0] + break + } + } + l.mu.Unlock() + return args[0], nil + } +} + +// emitCursorDecorator intercepts @emit_cursor calls and stores the cursor +// Interpretable. +func (l *emitLib) emitCursorDecorator() interpreter.InterpretableDecorator { + return func(i interpreter.Interpretable) (interpreter.Interpretable, error) { + call, ok := i.(interpreter.InterpretableCall) + if !ok || call.Function() != emitCursorSentinel { + return i, nil + } + args := call.Args() + if len(args) != 1 { + return nil, fmt.Errorf("emit: @emit_cursor has %d args, expected 1", len(args)) + } + l.mu.Lock() + for emitID, eb := range l.bodies { + if eb.curCallID == call.ID() { + if l.cursorImpls == nil { + l.cursorImpls = make(map[int64]interpreter.Interpretable) + } + l.cursorImpls[emitID] = args[0] + break + } + } + l.mu.Unlock() + return args[0], nil + } +} + +// emitDecorator intercepts @emit calls and replaces them with emitFold +// Interpretables. +func (l *emitLib) emitDecorator() interpreter.InterpretableDecorator { + return func(i interpreter.Interpretable) (interpreter.Interpretable, error) { + call, ok := i.(interpreter.InterpretableCall) + if !ok || call.Function() != emitSentinel { + return i, nil + } + args := call.Args() + if len(args) != 2 { + return nil, fmt.Errorf("emit: sentinel call has %d args, expected 2", len(args)) + } + iterRange := args[0] + + l.mu.Lock() + eb, found := l.bodies[call.ID()] + body := l.bodyImpls[call.ID()] + cursor := l.cursorImpls[call.ID()] + l.mu.Unlock() + + if !found { + return nil, fmt.Errorf( + "emit: @emit call %d has no registered metadata; "+ + "ensure the same Emit() lib instance is passed to both "+ + "cel.NewEnv and env.Program", call.ID()) + } + if body == nil { + return nil, fmt.Errorf( + "emit: no body Interpretable for call %d; "+ + "ensure the same Emit() lib instance is passed to both "+ + "cel.NewEnv and env.Program", call.ID()) + } + if eb.hasCursor && cursor == nil { + return nil, fmt.Errorf( + "emit: no cursor Interpretable for call %d; "+ + "ensure the same Emit() lib instance is passed to both "+ + "cel.NewEnv and env.Program", call.ID()) + } + + return &emitFold{ + id: call.ID(), + iterVar: eb.iterVar, + hasCursor: eb.hasCursor, + iterRange: iterRange, + body: body, + cursor: cursor, + factory: l.factory, + }, nil + } +} + +// emitFold is the Interpretable that replaces each @emit sentinel at runtime. +// It iterates sequentially, evaluating the body and optional cursor for each +// element, and calls Emitter.Emit for each. +type emitFold struct { + id int64 + iterVar string + hasCursor bool + iterRange interpreter.Interpretable + body interpreter.Interpretable + cursor interpreter.Interpretable // nil when !hasCursor + factory func() Emitter +} + +var _ interpreter.Interpretable = (*emitFold)(nil) + +func (e *emitFold) ID() int64 { return e.id } + +func (e *emitFold) Eval(activation interpreter.Activation) ref.Val { + rangeVal := e.iterRange.Eval(activation) + if types.IsError(rangeVal) { + return rangeVal + } + + emitter := e.factory() + if emitter == nil { + return types.NewErr("emit: no emitter available") + } + + var iter traits.Iterator + switch rv := rangeVal.(type) { + case traits.Lister: + iter = rv.Iterator() + case traits.Iterable: + iter = rv.Iterator() + default: + return types.NewErr("emit: receiver must be a list or iterable, got %T", rangeVal) + } + + var ( + count int64 + lastCursor any + errMsg string + ) + for iter.HasNext() == types.True { + elem := iter.Next() + if types.IsError(elem) { + errMsg = elem.(*types.Err).Error() + break + } + + bindings := map[string]any{e.iterVar: elem} + child, err := interpreter.NewActivation(bindings) + if err != nil { + return types.WrapErr(err) + } + hier := interpreter.NewHierarchicalActivation(activation, child) + + bodyVal := e.body.Eval(hier) + if types.IsError(bodyVal) { + errMsg = bodyVal.(*types.Err).Error() + break + } + + var cursorVal any + if e.hasCursor { + cv := e.cursor.Eval(hier) + if types.IsError(cv) { + errMsg = cv.(*types.Err).Error() + break + } + cursorVal = nativeValue(cv) + } + + eventVal := nativeValue(bodyVal) + if err := emitter.Emit(eventVal, cursorVal); err != nil { + errMsg = err.Error() + break + } + count++ + if e.hasCursor { + lastCursor = cursorVal + } + } + + result := map[string]any{"published": count} + if e.hasCursor && count > 0 { + result["cursor"] = lastCursor + } + if errMsg != "" { + result["error"] = errMsg + } + return types.DefaultTypeAdapter.NativeToValue(result) +} + +// nativeValue converts a ref.Val to a Go-native value suitable for passing +// to the Emitter. Maps and lists are converted to their native representations; +// scalars are unwrapped via Value(). +func nativeValue(v ref.Val) any { + switch v := v.(type) { + case traits.Mapper: + m, err := v.ConvertToNative(reflectMapStringAnyType) + if err == nil { + return m + } + case traits.Lister: + l, err := v.ConvertToNative(reflectAnySliceType) + if err == nil { + return l + } + } + return v.Value() +} diff --git a/lib/emit_test.go b/lib/emit_test.go new file mode 100644 index 0000000..3f01257 --- /dev/null +++ b/lib/emit_test.go @@ -0,0 +1,271 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "errors" + "sync" + "testing" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" +) + +func TestEmit_TwoArg_Basic(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `items.emit(x, x)`) + out := emitEval(t, prg, map[string]any{"items": []any{1, 2, 3}}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(3) { + t.Errorf("published = %v, want 3", published) + } + if len(emitter.values) != 3 { + t.Errorf("emitter received %d values, want 3", len(emitter.values)) + } + // Two-arg form: no cursor field in result. + cursorField := m.Get(types.String("cursor")) + if !types.IsError(cursorField) { + t.Errorf("expected no cursor field in two-arg form, got %v", cursorField) + } +} + +func TestEmit_ThreeArg_WithCursor(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `items.emit(x, {"value": x}, {"offset": x})`) + out := emitEval(t, prg, map[string]any{"items": []any{10, 20, 30}}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(3) { + t.Errorf("published = %v, want 3", published) + } + cursor := m.Get(types.String("cursor")) + if types.IsError(cursor) { + t.Fatalf("expected cursor field, got error: %v", cursor) + } + // Last cursor should be {"offset": 30}. + cm, ok := cursor.(traits.Mapper) + if !ok { + t.Fatalf("expected cursor map, got %T", cursor) + } + offset := cm.Get(types.String("offset")) + if offset != types.Int(30) { + t.Errorf("cursor.offset = %v, want 30", offset) + } +} + +func TestEmit_EmptyRange(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `items.emit(x, x, {"n": x})`) + out := emitEval(t, prg, map[string]any{"items": []any{}}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(0) { + t.Errorf("published = %v, want 0", published) + } + // No cursor field when zero events published. + cursorField := m.Get(types.String("cursor")) + if !types.IsError(cursorField) { + t.Errorf("expected no cursor field for empty range, got %v", cursorField) + } +} + +func TestEmit_PartialFailure(t *testing.T) { + emitter := newTestEmitter() + emitter.failAt = 2 + + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `items.emit(x, x, {"n": x})`) + out := emitEval(t, prg, map[string]any{"items": []any{1, 2, 3, 4, 5}}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(2) { + t.Errorf("published = %v, want 2", published) + } + cv := m.Get(types.String("cursor")) + cm, ok := cv.(traits.Mapper) + if !ok { + t.Fatalf("cursor: got %T, want traits.Mapper", cv) + } + n := cm.Get(types.String("n")) + if n != types.Int(2) { + t.Errorf("cursor.n = %v, want 2", n) + } + if len(emitter.values) != 2 { + t.Errorf("emitter received %d values, want 2", len(emitter.values)) + } + errVal := m.Get(types.String("error")) + errStr, ok := errVal.(types.String) + if !ok { + t.Fatalf("error: got %T, want types.String", errVal) + } + if errStr != types.String("emit failed") { + t.Errorf("error = %v, want %q", errStr, "emit failed") + } +} + +func TestEmit_WithLazyStream(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `data.decode_json_stream_lazy().emit(x, x)`) + ndjson := []byte("{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n") + out := emitEval(t, prg, map[string]any{"data": ndjson}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(3) { + t.Errorf("published = %v, want 3", published) + } + if len(emitter.values) != 3 { + t.Errorf("emitter received %d values, want 3", len(emitter.values)) + } +} + +func TestEmit_WithGzipStream(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + prg := emitProgram(t, env, `data.stream_gzip().decode_json_stream_lazy().emit(x, x)`) + ndjson := "{\"a\":1}\n{\"b\":2}\n" + compressed := gzipBytes(t, []byte(ndjson)) + out := emitEval(t, prg, map[string]any{"data": compressed}) + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected map, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(2) { + t.Errorf("published = %v, want 2", published) + } +} + +func TestEmit_CompileError_NonIdentIterVar(t *testing.T) { + emitter := newTestEmitter() + env := emitTestEnv(t, emitter) + _, issues := env.Compile(`items.emit(1 + 1, x)`) + if issues == nil || issues.Err() == nil { + t.Fatal("expected compile error for non-identifier iterVar") + } +} + +func TestEmit_WrongLibInstance(t *testing.T) { + lib1 := &emitLib{factory: func() Emitter { return newTestEmitter() }} + lib2 := &emitLib{factory: func() Emitter { return newTestEmitter() }} + + env1, err := cel.NewEnv( + cel.Lib(lib1), + cel.Variable("items", cel.ListType(cel.DynType)), + ) + if err != nil { + t.Fatalf("cel.NewEnv lib1: %v", err) + } + ast, issues := env1.Compile(`items.emit(x, x)`) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile: %v", issues.Err()) + } + + env2, err := cel.NewEnv( + cel.Lib(lib2), + cel.Variable("items", cel.ListType(cel.DynType)), + ) + if err != nil { + t.Fatalf("cel.NewEnv lib2: %v", err) + } + _, progErr := env2.Program(ast) + if progErr == nil { + t.Error("expected error when programming AST compiled by different lib instance") + } +} + +type testEmitter struct { + mu sync.Mutex + values []any + cursors []any + failAt int // -1 to never fail +} + +func newTestEmitter() *testEmitter { + return &testEmitter{failAt: -1} +} + +func (e *testEmitter) Emit(value, cursor any) error { + e.mu.Lock() + defer e.mu.Unlock() + if e.failAt >= 0 && len(e.values) >= e.failAt { + return errors.New("emit failed") + } + e.values = append(e.values, value) + e.cursors = append(e.cursors, cursor) + return nil +} + +func emitTestEnv(t *testing.T, emitter *testEmitter, extra ...cel.EnvOption) *cel.Env { + t.Helper() + base := []cel.EnvOption{ + Emit(func() Emitter { return emitter }), + JSON(nil), + Stream(), + cel.Variable("items", cel.ListType(cel.DynType)), + cel.Variable("data", cel.BytesType), + } + env, err := cel.NewEnv(append(base, extra...)...) + if err != nil { + t.Fatalf("cel.NewEnv: %v", err) + } + return env +} + +func emitProgram(t *testing.T, env *cel.Env, src string) cel.Program { + t.Helper() + ast, issues := env.Compile(src) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile %q: %v", src, issues.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("env.Program %q: %v", src, err) + } + return prg +} + +func emitEval(t *testing.T, prg cel.Program, vars map[string]any) ref.Val { + t.Helper() + out, _, err := prg.Eval(vars) + if err != nil { + t.Fatalf("eval: %v", err) + } + return out +} diff --git a/lib/json.go b/lib/json.go index fa08b32..968f739 100644 --- a/lib/json.go +++ b/lib/json.go @@ -23,11 +23,13 @@ import ( "errors" "fmt" "io" + "reflect" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/google/cel-go/cel" "github.com/google/cel-go/common/types" "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" ) // JSON returns a cel.EnvOption to configure extended functions for JSON @@ -105,6 +107,25 @@ import ( // // '{"a":1}{"b":2}'.decode_json_stream_string_numbers() // return [{"a":"1"}, {"b":"2"}] // b'{"a":1}{"b":2}'.decode_json_stream_string_numbers() // return [{"a":"1"}, {"b":"2"}] +// +// # Decode JSON Stream (Lazy) +// +// decode_json_stream_lazy returns a lazy iterable that decodes concatenated +// JSON values on demand from the receiver. Unlike decode_json_stream, values +// are not materialised into a list; each value is decoded when the iterator +// advances. This is useful for streaming large payloads through a +// comprehension without holding all decoded records in memory. +// +// The receiver may be bytes, string, or a stream value (from stream_gzip or +// stream_zip). When backed by a stream, decompression and decoding happen +// together with no intermediate buffer. +// +// .decode_json_stream_lazy() -> > +// .decode_json_stream_lazy() -> > +// .decode_json_stream_lazy() -> > +// +// decode_json_stream_lazy_string_numbers is the same but uses UseNumber +// decoding for integer precision beyond 2^52. func JSON(adapter types.Adapter) cel.EnvOption { if adapter == nil { adapter = types.DefaultTypeAdapter @@ -240,6 +261,48 @@ func (l jsonLib) CompileOptions() []cel.EnvOption { cel.UnaryBinding(catch(l.decodeJSONStreamUseNumber)), ), ), + + cel.Function("decode_json_stream_lazy", + cel.MemberOverload( + "stream_decode_json_stream_lazy", + []*cel.Type{streamCELType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazy)), + ), + cel.MemberOverload( + "bytes_decode_json_stream_lazy", + []*cel.Type{cel.BytesType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazy)), + ), + cel.MemberOverload( + "string_decode_json_stream_lazy", + []*cel.Type{cel.StringType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazy)), + ), + ), + + cel.Function("decode_json_stream_lazy_string_numbers", + cel.MemberOverload( + "stream_decode_json_stream_lazy_string_numbers", + []*cel.Type{streamCELType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazyUseNumber)), + ), + cel.MemberOverload( + "bytes_decode_json_stream_lazy_string_numbers", + []*cel.Type{cel.BytesType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazyUseNumber)), + ), + cel.MemberOverload( + "string_decode_json_stream_lazy_string_numbers", + []*cel.Type{cel.StringType}, + cel.DynType, + cel.UnaryBinding(catch(l.decodeJSONStreamLazyUseNumber)), + ), + ), } } @@ -388,3 +451,110 @@ func (l jsonLib) decodeJSONStreamUseNumber(val ref.Val) ref.Val { } return l.adapter.NativeToValue(s) } + +func (l jsonLib) decodeJSONStreamLazy(val ref.Val) ref.Val { + return l.lazyStream(val, false) +} + +func (l jsonLib) decodeJSONStreamLazyUseNumber(val ref.Val) ref.Val { + return l.lazyStream(val, true) +} + +func (l jsonLib) lazyStream(val ref.Val, useNum bool) ref.Val { + var r io.Reader + switch v := val.(type) { + case *streamVal: + r = v.reader + case types.Bytes: + r = bytes.NewReader(v) + case types.String: + r = bytes.NewReader([]byte(v)) + default: + return types.NoSuchOverloadErr() + } + return &lazyJSONStream{reader: r, adapter: l.adapter, useNum: useNum} +} + +var ( + _ ref.Val = (*lazyJSONStream)(nil) + _ traits.Iterable = (*lazyJSONStream)(nil) + _ traits.Iterator = (*jsonStreamIterator)(nil) +) + +// lazyJSONStreamRefType is the runtime type for lazy JSON stream iterables. +var lazyJSONStreamRefType = types.NewObjectType("lazy_json_stream", traits.IterableType) + +// lazyJSONStream is a ref.Val implementing traits.Iterable that decodes +// concatenated JSON values on demand from an io.Reader. Each call to the +// iterator's Next() decodes one value; previously decoded values are not +// retained. This enables streaming decode of large NDJSON or concatenated +// JSON payloads without materialising the full list. +// +// The underlying reader is consumed once. A second Iterator() call returns +// an exhausted iterator. +type lazyJSONStream struct { + reader io.Reader + adapter types.Adapter + useNum bool +} + +func (s *lazyJSONStream) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("lazy JSON streams cannot be converted to %v", typeDesc) +} + +func (s *lazyJSONStream) ConvertToType(typeVal ref.Type) ref.Val { + if typeVal == types.TypeType { + return types.NewTypeValue("lazy_json_stream") + } + return types.NewErr("type conversion error from 'lazy_json_stream' to '%s'", typeVal.TypeName()) +} + +func (s *lazyJSONStream) Equal(other ref.Val) ref.Val { + return types.NewErr("lazy JSON streams are not comparable") +} + +func (s *lazyJSONStream) Type() ref.Type { return lazyJSONStreamRefType } + +func (s *lazyJSONStream) Value() any { return s.reader } + +func (s *lazyJSONStream) Iterator() traits.Iterator { + dec := json.NewDecoder(s.reader) + if s.useNum { + dec.UseNumber() + } + return &jsonStreamIterator{dec: dec, adapter: s.adapter} +} + +// jsonStreamIterator wraps a json.Decoder as a traits.Iterator. +type jsonStreamIterator struct { + dec *json.Decoder + adapter types.Adapter +} + +func (it *jsonStreamIterator) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("JSON stream iterators cannot be converted to %v", typeDesc) +} + +func (it *jsonStreamIterator) ConvertToType(typeVal ref.Type) ref.Val { + return types.NewErr("type conversion error from 'json_stream_iterator' to '%s'", typeVal.TypeName()) +} + +func (it *jsonStreamIterator) Equal(other ref.Val) ref.Val { + return types.NewErr("JSON stream iterators are not comparable") +} + +func (it *jsonStreamIterator) Type() ref.Type { return types.IteratorType } + +func (it *jsonStreamIterator) Value() any { return it.dec } + +func (it *jsonStreamIterator) HasNext() ref.Val { + return types.Bool(it.dec.More()) +} + +func (it *jsonStreamIterator) Next() ref.Val { + var v any + if err := it.dec.Decode(&v); err != nil { + return types.NewErr("decode: %v", err) + } + return it.adapter.NativeToValue(v) +} diff --git a/lib/lines.go b/lib/lines.go new file mode 100644 index 0000000..ffe14cb --- /dev/null +++ b/lib/lines.go @@ -0,0 +1,183 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "bufio" + "bytes" + "fmt" + "io" + "reflect" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" +) + +// Lines returns a cel.EnvOption to configure the decode_lines function. +// +// # Decode Lines +// +// decode_lines returns a lazy iterable that yields one string per line from +// the receiver. Lines are split by newline; the trailing newline is stripped. +// The receiver may be bytes, string, or a stream value (from stream_gzip or +// stream_zip). +// +// .decode_lines() -> > +// .decode_lines() -> > +// .decode_lines() -> > +// +// Combined with string split functions and emit, this provides a composable +// way to stream delimited text formats: +// +// // TSV +// data.stream_gzip().decode_lines().emit(line, line.split("\t")) +// +// // Pipe-delimited +// data.decode_lines().emit(line, line.split("|")) +func Lines() cel.EnvOption { + return cel.Lib(linesLib{}) +} + +type linesLib struct{} + +func (linesLib) CompileOptions() []cel.EnvOption { + return []cel.EnvOption{ + cel.Function("decode_lines", + cel.MemberOverload( + "stream_decode_lines", + []*cel.Type{streamCELType}, + cel.DynType, + cel.UnaryBinding(catch(decodeLines)), + ), + cel.MemberOverload( + "bytes_decode_lines", + []*cel.Type{cel.BytesType}, + cel.DynType, + cel.UnaryBinding(catch(decodeLines)), + ), + cel.MemberOverload( + "string_decode_lines", + []*cel.Type{cel.StringType}, + cel.DynType, + cel.UnaryBinding(catch(decodeLines)), + ), + ), + } +} + +func (linesLib) ProgramOptions() []cel.ProgramOption { return nil } + +func decodeLines(val ref.Val) ref.Val { + var r io.Reader + switch v := val.(type) { + case *streamVal: + r = v.reader + case types.Bytes: + r = bytes.NewReader(v) + case types.String: + r = bytes.NewReader([]byte(v)) + default: + return types.NoSuchOverloadErr() + } + return &lazyLineStream{scanner: bufio.NewScanner(r)} +} + +var ( + _ ref.Val = (*lazyLineStream)(nil) + _ traits.Iterable = (*lazyLineStream)(nil) + _ traits.Iterator = (*lineIterator)(nil) +) + +var lazyLineStreamRefType = types.NewObjectType("lazy_line_stream", traits.IterableType) + +// lazyLineStream is a ref.Val implementing traits.Iterable that yields +// one string per line from a bufio.Scanner. The scanner is consumed once; +// a second Iterator() call returns an exhausted iterator. +type lazyLineStream struct { + scanner *bufio.Scanner +} + +func (s *lazyLineStream) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("lazy line streams cannot be converted to %v", typeDesc) +} + +func (s *lazyLineStream) ConvertToType(typeVal ref.Type) ref.Val { + if typeVal == types.TypeType { + return types.NewTypeValue("lazy_line_stream") + } + return types.NewErr("type conversion error from 'lazy_line_stream' to '%s'", typeVal.TypeName()) +} + +func (s *lazyLineStream) Equal(other ref.Val) ref.Val { + return types.NewErr("lazy line streams are not comparable") +} + +func (s *lazyLineStream) Type() ref.Type { return lazyLineStreamRefType } + +func (s *lazyLineStream) Value() any { return s.scanner } + +func (s *lazyLineStream) Iterator() traits.Iterator { + return &lineIterator{scanner: s.scanner} +} + +// lineIterator wraps a bufio.Scanner as a traits.Iterator. +type lineIterator struct { + scanner *bufio.Scanner + scanned bool + hasMore bool +} + +func (it *lineIterator) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("line iterators cannot be converted to %v", typeDesc) +} + +func (it *lineIterator) ConvertToType(typeVal ref.Type) ref.Val { + return types.NewErr("type conversion error from 'line_iterator' to '%s'", typeVal.TypeName()) +} + +func (it *lineIterator) Equal(other ref.Val) ref.Val { + return types.NewErr("line iterators are not comparable") +} + +func (it *lineIterator) Type() ref.Type { return types.IteratorType } + +func (it *lineIterator) Value() any { return it.scanner } + +func (it *lineIterator) HasNext() ref.Val { + if !it.scanned { + it.hasMore = it.scanner.Scan() + it.scanned = true + } + return types.Bool(it.hasMore) +} + +func (it *lineIterator) Next() ref.Val { + if !it.scanned { + it.hasMore = it.scanner.Scan() + } + it.scanned = false + if !it.hasMore { + if err := it.scanner.Err(); err != nil { + return types.NewErr("decode_lines: %v", err) + } + return types.NewErr("decode_lines: no more lines") + } + return types.String(it.scanner.Text()) +} diff --git a/lib/lines_test.go b/lib/lines_test.go new file mode 100644 index 0000000..a2cf8cc --- /dev/null +++ b/lib/lines_test.go @@ -0,0 +1,238 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "testing" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/traits" +) + +func TestDecodeLines_Bytes(t *testing.T) { + env, err := cel.NewEnv( + Lines(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_lines()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + out, _, err := prg.Eval(map[string]any{"data": []byte("hello\nworld\nfoo\n")}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + + want := []string{"hello", "world", "foo"} + for i, w := range want { + if it.HasNext() != types.True { + t.Fatalf("line %d: HasNext() = false, want true", i) + } + got := it.Next() + if got != types.String(w) { + t.Errorf("line %d: got %v, want %q", i, got, w) + } + } + if it.HasNext() != types.False { + t.Errorf("expected no more lines") + } +} + +func TestDecodeLines_String(t *testing.T) { + env, err := cel.NewEnv( + Lines(), + cel.Variable("data", cel.StringType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_lines()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + out, _, err := prg.Eval(map[string]any{"data": "a\tb\nc\td\n"}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + + want := []string{"a\tb", "c\td"} + for i, w := range want { + if it.HasNext() != types.True { + t.Fatalf("line %d: HasNext() = false, want true", i) + } + got := it.Next() + if got != types.String(w) { + t.Errorf("line %d: got %v, want %q", i, got, w) + } + } + if it.HasNext() != types.False { + t.Errorf("expected no more lines") + } +} + +func TestDecodeLines_Empty(t *testing.T) { + env, err := cel.NewEnv( + Lines(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_lines()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + out, _, err := prg.Eval(map[string]any{"data": []byte("")}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + if it.HasNext() != types.False { + t.Errorf("expected no lines for empty input") + } +} + +func TestDecodeLines_WithStream(t *testing.T) { + env, err := cel.NewEnv( + Lines(), + Stream(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.stream_gzip().decode_lines()`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + gz := gzipBytes(t, []byte("alpha\nbeta\n")) + + out, _, err := prg.Eval(map[string]any{"data": []byte(gz)}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected traits.Iterable, got %T", out) + } + it := iter.Iterator() + + want := []string{"alpha", "beta"} + for i, w := range want { + if it.HasNext() != types.True { + t.Fatalf("line %d: HasNext() = false, want true", i) + } + got := it.Next() + if got != types.String(w) { + t.Errorf("line %d: got %v, want %q", i, got, w) + } + } + if it.HasNext() != types.False { + t.Errorf("expected no more lines") + } +} + +func TestDecodeLines_WithEmit(t *testing.T) { + emitter := newTestEmitter() + env, err := cel.NewEnv( + Lines(), + Strings(), + Emit(func() Emitter { return emitter }), + cel.Variable("data", cel.StringType), + ) + if err != nil { + t.Fatalf("failed to create env: %v", err) + } + ast, iss := env.Compile(`data.decode_lines().emit(line, {"parts": line.split("\t")})`) + if iss.Err() != nil { + t.Fatalf("failed to compile: %v", iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("failed to program: %v", err) + } + + out, _, err := prg.Eval(map[string]any{"data": "a\t1\nb\t2\n"}) + if err != nil { + t.Fatalf("failed to eval: %v", err) + } + m, ok := out.(traits.Mapper) + if !ok { + t.Fatalf("expected traits.Mapper, got %T", out) + } + published := m.Get(types.String("published")) + if published != types.Int(2) { + t.Errorf("published = %v, want 2", published) + } + if len(emitter.values) != 2 { + t.Fatalf("emitter received %d values, want 2", len(emitter.values)) + } + row0, ok := emitter.values[0].(map[string]any) + if !ok { + t.Fatalf("row 0: got %T, want map[string]any", emitter.values[0]) + } + parts, ok := row0["parts"].([]any) + if !ok { + t.Fatalf("row 0 parts: got %T, want []any", row0["parts"]) + } + if len(parts) != 2 || parts[0] != "a" || parts[1] != "1" { + t.Errorf("row 0 parts = %v, want [a 1]", parts) + } +} diff --git a/lib/stream.go b/lib/stream.go new file mode 100644 index 0000000..4f5edb3 --- /dev/null +++ b/lib/stream.go @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "archive/zip" + "bytes" + "compress/gzip" + "fmt" + "io" + "reflect" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" +) + +// Stream returns a cel.EnvOption to configure stream producer functions. +// Stream producers wrap decompression readers around in-memory bytes, +// returning an opaque streamVal that can be passed to lazy decode functions +// (decode_json_stream_lazy, etc.) for streaming decompression and decoding. +// +// # stream_gzip +// +// stream_gzip returns a stream wrapping a gzip reader over the receiver bytes. +// The compressed bytes remain in memory; decompression happens on demand +// through a ~32 KB internal buffer. +// +// .stream_gzip() -> +// +// # stream_zip +// +// stream_zip returns a stream wrapping the decompression reader for the +// specified entry (by index) in a zip archive held in the receiver bytes. +// The zip archive must fit in memory since archive/zip requires random access. +// +// .stream_zip() -> +func Stream() cel.EnvOption { + return cel.Lib(streamLib{}) +} + +type streamLib struct{} + +func (streamLib) CompileOptions() []cel.EnvOption { + return []cel.EnvOption{ + cel.Function("stream_gzip", + cel.MemberOverload( + "bytes_stream_gzip", + []*cel.Type{cel.BytesType}, + streamCELType, + cel.UnaryBinding(catch(streamGzip)), + ), + ), + + cel.Function("stream_zip", + cel.MemberOverload( + "bytes_stream_zip_int", + []*cel.Type{cel.BytesType, cel.IntType}, + streamCELType, + cel.BinaryBinding(catch(streamZip)), + ), + ), + } +} + +func (streamLib) ProgramOptions() []cel.ProgramOption { return nil } + +// streamCELType is the compile-time CEL type for stream values. +var streamCELType = cel.ObjectType("stream") + +// streamRefType is the runtime ref.Type for stream values. +var streamRefType = types.NewObjectType("stream") + +var _ ref.Val = (*streamVal)(nil) + +// streamVal is an opaque ref.Val wrapping an io.Reader. It advertises no +// traits: it is not iterable, indexable, sizable, or comparable. The only +// useful thing to do with a stream is pass it to a consuming function such +// as decode_json_stream_lazy. +type streamVal struct { + reader io.Reader +} + +func (s *streamVal) ConvertToNative(typeDesc reflect.Type) (any, error) { + return nil, fmt.Errorf("stream values cannot be converted to %v", typeDesc) +} + +func (s *streamVal) ConvertToType(typeVal ref.Type) ref.Val { + if typeVal == types.TypeType { + return types.NewTypeValue("stream") + } + return types.NewErr("type conversion error from 'stream' to '%s'", typeVal.TypeName()) +} + +func (s *streamVal) Equal(other ref.Val) ref.Val { + return types.NewErr("streams are not comparable") +} + +func (s *streamVal) Type() ref.Type { return streamRefType } + +func (s *streamVal) Value() any { return s.reader } + +func streamGzip(val ref.Val) ref.Val { + b, ok := val.(types.Bytes) + if !ok { + return types.NoSuchOverloadErr() + } + r, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return types.NewErr("stream_gzip: %v", err) + } + return &streamVal{reader: r} +} + +func streamZip(data, index ref.Val) ref.Val { + b, ok := data.(types.Bytes) + if !ok { + return types.NoSuchOverloadErr() + } + idx, ok := index.(types.Int) + if !ok { + return types.NoSuchOverloadErr() + } + br := bytes.NewReader(b) + zr, err := zip.NewReader(br, br.Size()) + if err != nil { + return types.NewErr("stream_zip: %v", err) + } + i := int(idx) + if i < 0 || i >= len(zr.File) { + return types.NewErr("stream_zip: index %d out of range [0, %d)", i, len(zr.File)) + } + rc, err := zr.File[i].Open() + if err != nil { + return types.NewErr("stream_zip: %v", err) + } + return &streamVal{reader: rc} +} diff --git a/lib/stream_test.go b/lib/stream_test.go new file mode 100644 index 0000000..d0f76f9 --- /dev/null +++ b/lib/stream_test.go @@ -0,0 +1,209 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lib + +import ( + "bytes" + "compress/gzip" + "testing" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/common/types/traits" +) + +func TestStreamGzip_LazyDecode(t *testing.T) { + ndjson := "{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n" + compressed := gzipBytes(t, []byte(ndjson)) + + env, err := cel.NewEnv( + JSON(nil), + Stream(), + cel.Variable("body", cel.BytesType), + ) + if err != nil { + t.Fatalf("cel.NewEnv: %v", err) + } + ast, issues := env.Compile(`body.stream_gzip().decode_json_stream_lazy()`) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile: %v", issues.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("env.Program: %v", err) + } + out, _, err := prg.Eval(map[string]any{"body": []byte(compressed)}) + if err != nil { + t.Fatalf("eval: %v", err) + } + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected Iterable, got %T", out) + } + it := iter.Iterator() + var results []ref.Val + for it.HasNext() == types.True { + results = append(results, it.Next()) + } + if len(results) != 3 { + t.Fatalf("got %d results, want 3", len(results)) + } +} + +func TestStreamGzip_TypeProperties(t *testing.T) { + data := gzipBytes(t, []byte("hello")) + s := streamGzip(types.Bytes(data)) + sv, ok := s.(*streamVal) + if !ok { + t.Fatalf("expected *streamVal, got %T: %v", s, s) + } + if sv.Type().TypeName() != "stream" { + t.Errorf("TypeName = %q, want %q", sv.Type().TypeName(), "stream") + } + if !types.IsError(sv.Equal(sv)) { + t.Error("Equal should return an error for streams") + } +} + +func TestStreamZip_OutOfRange(t *testing.T) { + result := streamZip(types.Bytes{0x50, 0x4b}, types.Int(0)) + if !types.IsError(result) { + t.Fatalf("expected error for invalid zip, got %T", result) + } +} + +func TestLazyJSONStream_BytesFallback(t *testing.T) { + env, err := cel.NewEnv( + JSON(nil), + Stream(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("cel.NewEnv: %v", err) + } + ast, issues := env.Compile(`data.decode_json_stream_lazy()`) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile: %v", issues.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("env.Program: %v", err) + } + ndjson := []byte("{\"x\":1}{\"y\":2}") + out, _, err := prg.Eval(map[string]any{"data": ndjson}) + if err != nil { + t.Fatalf("eval: %v", err) + } + iter, ok := out.(traits.Iterable) + if !ok { + t.Fatalf("expected Iterable, got %T", out) + } + it := iter.Iterator() + count := 0 + for it.HasNext() == types.True { + it.Next() + count++ + } + if count != 2 { + t.Errorf("got %d elements, want 2", count) + } +} + +func TestLazyJSONStream_InComprehension(t *testing.T) { + env, err := cel.NewEnv( + JSON(nil), + Stream(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("cel.NewEnv: %v", err) + } + ast, issues := env.Compile(`data.decode_json_stream_lazy().map(x, x)`) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile: %v", issues.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("env.Program: %v", err) + } + ndjson := []byte("{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n") + out, _, err := prg.Eval(map[string]any{"data": ndjson}) + if err != nil { + t.Fatalf("eval: %v", err) + } + list, ok := out.(traits.Lister) + if !ok { + t.Fatalf("expected Lister, got %T", out) + } + if list.Size() != types.Int(3) { + t.Errorf("list size = %v, want 3", list.Size()) + } +} + +func TestLazyJSONStream_UseNumber(t *testing.T) { + env, err := cel.NewEnv( + JSON(nil), + Stream(), + cel.Variable("data", cel.BytesType), + ) + if err != nil { + t.Fatalf("cel.NewEnv: %v", err) + } + ast, issues := env.Compile(`data.decode_json_stream_lazy_string_numbers().map(x, x)`) + if issues != nil && issues.Err() != nil { + t.Fatalf("compile: %v", issues.Err()) + } + prg, err := env.Program(ast) + if err != nil { + t.Fatalf("env.Program: %v", err) + } + out, _, err := prg.Eval(map[string]any{"data": []byte(`{"n":9007199254740993}`)}) + if err != nil { + t.Fatalf("eval: %v", err) + } + list, ok := out.(traits.Lister) + if !ok { + t.Fatalf("expected Lister, got %T", out) + } + if list.Size() != types.Int(1) { + t.Fatalf("list size = %v, want 1", list.Size()) + } + elem := list.Get(types.Int(0)) + m, ok := elem.(traits.Mapper) + if !ok { + t.Fatalf("expected Mapper, got %T", elem) + } + n := m.Get(types.String("n")) + if n.Value() != "9007199254740993" { + t.Errorf("number = %v (%T), want string %q", n.Value(), n.Value(), "9007199254740993") + } +} + +func gzipBytes(t *testing.T, data []byte) []byte { + t.Helper() + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + if _, err := w.Write(data); err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + return buf.Bytes() +} diff --git a/lib/types.go b/lib/types.go index 7959c98..36544f7 100644 --- a/lib/types.go +++ b/lib/types.go @@ -91,6 +91,7 @@ var ( reflectMapStringAnyType = reflect.TypeOf(map[string]interface{}(nil)) reflectMapStringStringSliceType = reflect.TypeOf(map[string][]string(nil)) reflectStringType = reflect.TypeOf("") + reflectAnySliceType = reflect.TypeOf([]any(nil)) reflectStringSliceType = reflect.TypeOf([]string(nil)) structpbValueType = reflect.TypeOf((*structpb.Value)(nil)) diff --git a/mito.go b/mito.go index 23331d7..00ce997 100644 --- a/mito.go +++ b/mito.go @@ -208,6 +208,7 @@ func Main() int { return 2 } } + libMap["emit"] = lib.Emit(func() lib.Emitter { return stderrEmitter{} }) if *use == "all" { for _, l := range libMap { libs = append(libs, l) @@ -512,6 +513,9 @@ var ( "strings": lib.Strings(), "printf": lib.Printf(), "xml": nil, // This will be populated by Main. + "stream": lib.Stream(), + "csv": lib.CSV(), + "lines": lib.Lines(), } mimetypes = map[string]interface{}{ @@ -750,3 +754,15 @@ func oAuth2Client(cfg OAuth2) (*http.Client, error) { return nil, errors.New("oauth2: unknown provider") } } + +// stderrEmitter prints emitted events to stderr for the mito CLI. +type stderrEmitter struct{} + +func (stderrEmitter) Emit(value, cursor any) error { + if cursor != nil { + fmt.Fprintf(os.Stderr, "EMIT: %v (cursor: %v)\n", value, cursor) + } else { + fmt.Fprintf(os.Stderr, "EMIT: %v\n", value) + } + return nil +} diff --git a/testdata/csv_lazy.txt b/testdata/csv_lazy.txt new file mode 100644 index 0000000..6b9dd67 --- /dev/null +++ b/testdata/csv_lazy.txt @@ -0,0 +1,21 @@ +mito -use csv src.cel +! stderr . +cmp stdout want.txt + +-- src.cel -- +b''' +name,age +alice,30 +bob,25 +'''.decode_csv_stream_lazy().map(row, row) +-- want.txt -- +[ + { + "age": "30", + "name": "alice" + }, + { + "age": "25", + "name": "bob" + } +] diff --git a/testdata/csv_lazy_no_header.txt b/testdata/csv_lazy_no_header.txt new file mode 100644 index 0000000..8f18df9 --- /dev/null +++ b/testdata/csv_lazy_no_header.txt @@ -0,0 +1,20 @@ +mito -use csv src.cel +! stderr . +cmp stdout want.txt + +-- src.cel -- +b''' +alice,30 +bob,25 +'''.decode_csv_stream_lazy_no_header().map(row, row) +-- want.txt -- +[ + [ + "alice", + "30" + ], + [ + "bob", + "25" + ] +] diff --git a/testdata/emit.txt b/testdata/emit.txt new file mode 100644 index 0000000..064b1b3 --- /dev/null +++ b/testdata/emit.txt @@ -0,0 +1,33 @@ +mito -use json,collections,emit src.cel +cmp stdout want.txt +cmp stderr want_stderr.txt + +-- src.cel -- +[ + // Two-arg form: no cursor. + [1, 2, 3].emit(x, {"value": x}), + + // Three-arg form: with per-element cursor. + [ + {"message": "hello", "id": 1}, + {"message": "world", "id": 2}, + ].emit(item, {"message": item.message}, {"id": item.id}), +] +-- want_stderr.txt -- +EMIT: map[value:1] +EMIT: map[value:2] +EMIT: map[value:3] +EMIT: map[message:hello] (cursor: map[id:1]) +EMIT: map[message:world] (cursor: map[id:2]) +-- want.txt -- +[ + { + "published": 3 + }, + { + "cursor": { + "id": 2 + }, + "published": 2 + } +] diff --git a/testdata/lines.txt b/testdata/lines.txt new file mode 100644 index 0000000..a386146 --- /dev/null +++ b/testdata/lines.txt @@ -0,0 +1,21 @@ +mito -use lines,strings src.cel +! stderr . +cmp stdout want.txt + +-- src.cel -- +"a\t1\nb\t2\nc\t3\n".decode_lines().map(line, line.split("\t")) +-- want.txt -- +[ + [ + "a", + "1" + ], + [ + "b", + "2" + ], + [ + "c", + "3" + ] +]