refactor: trim typed API to edge-wrapper shape only #179

Closed

caldempsey wants to merge 21 commits into apache:master from datalake-go:refactor/typed-api-audit

Conversation

@caldempsey

Summary

The typed surface on the fork had drifted past its charter. This PR trims `DataFrameOf[T]` back to the edge-typing wrapper role pinned by the design contract (lakeorm/TYPING.md): transformations stay on the untyped `DataFrame`; the type parameter participates only at materialisation.

Final typed API

  • Type: `DataFrameOf[T any]`
  • Constructor: `As[T](df) (*DataFrameOf[T], error)`
  • Free fns: `Collect[T](ctx, df)`, `Stream[T](ctx, df)`, `First[T](ctx, df)`
  • Methods on `*DataFrameOf[T]`: `.DataFrame()` (unwrap), `.Collect(ctx)`, `.Stream(ctx)`, `.First(ctx)`
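
A minimal usage sketch of this surface (the `User` struct and query are illustrative, not part of the API; the module path follows the rename commits below):

```go
import (
	"context"

	sql "github.com/datalakego/spark-connect-go/spark/sql"
)

type User struct {
	ID    int64  `spark:"id"`
	Email string `spark:"email"`
}

func loadUsers(ctx context.Context, session sql.SparkSession) ([]User, error) {
	// Transformations stay on the untyped DataFrame.
	df, err := session.Sql(ctx, "SELECT id, email FROM users")
	if err != nil {
		return nil, err
	}
	// The type parameter participates only at materialisation.
	return sql.Collect[User](ctx, df)
}
```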

Removed

| Removed | Was |
| --- | --- |
| `(*DataFrameOf[T]).Where / Limit / OrderBy` | typed transformations; rule violation |
| `Dataset[T]` | type alias redundant with `DataFrameOf[T]` |
| `SqlAs[T]`, `TableAs[T]` | session construction helpers; use `session.Sql`/`Table` + `As[T]` |
| `SqlTyped[T]` | deprecated alias for the removed `SqlAs` |
| `TypedDataFrame[T]` | merged into `As[T]` |
| `Into` | non-generic scanner; off-charter |
| `datasetOp`, `ops`, `resolveDataFrame`, `clone` | internals that existed only to support the removed transformation methods |

Migration

  • `SqlAs[User](ctx, session, q)` → `df, _ := session.Sql(ctx, q); As[User](df)`
  • `TableAs[User](ctx, session, "users")` → `df, _ := session.Table("users"); As[User](df)`
  • `ds.Where(...).Collect(ctx)` → `df2, _ := df.Where(ctx, ...); Collect[T](ctx, df2)` (see the sketch below)
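
Spelled out as code (hypothetical `User` struct and predicate; unqualified names assume the `spark/sql` package):

```go
// Before: typed construction and typed chaining (both removed).
//   ds, _ := SqlAs[User](ctx, session, "SELECT * FROM users")
//   users, _ := ds.Where("age > 21").Collect(ctx)

// After: transform untyped, re-type at the materialisation edge.
df, _ := session.Sql(ctx, "SELECT * FROM users")
df2, _ := df.Where(ctx, "age > 21")
users, err := Collect[User](ctx, df2)
```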

Test plan

  • `go build ./...` clean
  • `go test ./spark/sql/...` green; existing `dataframe_typed_test.go` tests unaffected; new `TestAs_RejectsNonStruct` + `TestFirst_RejectsNonStructT` cover the refactored constructor paths
  • `gofumpt` clean on all touched files

🤖 Generated with Claude Code

caldempsey and others added 21 commits July 13, 2025 01:12
…atabricks/Spark signal done processing rows)
…-go (#1)

Declares the module as github.com/caldempsey/spark-connect-go,
drops the /v40 suffix, and updates every import in the tree to
match. Consumers that want the fork's in-flight fixes no longer
need a replace directive in their go.mod — go get against the
caldempsey path resolves directly.

Rebases against upstream stay mechanical: merge upstream in, sed
module paths, resolve in the usual spots.
The fork's work lives on main, but the build workflow only triggers
on push to master. Point the push filter at main so merges to the
fork's default branch actually get CI signal. The pull_request
trigger already accepts any base branch and doesn't need to change.
* fix/8: fall back to archive.apache.org when dlcdn 404s

dlcdn.apache.org only mirrors current releases and rotates older
ones off without warning, which blocks every CI run behind a
transient 404. archive.apache.org is the canonical Apache mirror
and never rotates. Try dlcdn first for speed, fall back to archive
on failure.

The cache is keyed on Spark + Hadoop version, so this penalty is
paid at most once per cache eviction.

Closes #8.

* fix/8: run gofumpt on files whose imports shifted in the rename

golangci-lint's gofumpt check flagged three files where the import
block got out of shape: stdlib imports mixed with third-party,
missing blank-line group separator. Let gofumpt produce the
canonical layout.
Four fmt.Print calls sat in the response-stream receive loop printing
raw protocol chatter (response type, EOF flag, ResultComplete) to
the host process's stdout on every query. For a library, stdout
belongs to the host — downstream services should decide what lands
there. Delete the prints; add a test that captures stdout during a
complete ExecutePlan round-trip and asserts nothing was written to
pin the regression.

The fmt.Println in dataFrame.Show is intentional (user-requested
console output via ResultCollector) and stays put.

Closes #2.
…rs (#11)

BaseBuilder.WithDialOptions lets callers append arbitrary
grpc.DialOption values — per-call message ceilings, keepalive
profiles, interceptors — anything Spark Connect doesn't expose as a
server conf. SparkSessionBuilder.WithDialOptions forwards the same
knob through the default builder path.

Also bumps the default per-call send/receive ceiling from gRPC's
4 MiB to 1 GiB to match the server's typical upper bound
(spark.connect.grpc.maxInboundMessageSize). Without that bump, a
single Arrow batch truncates silently on non-trivial queries with
a ResourceExhausted error buried in Collect() or StreamRows().

Caller-supplied DialOptions are appended after the builder's
defaults so a tighter MaxCallRecvMsgSize will override the default.

Closes #3.
DataFrameOf[T] wraps a DataFrame with a cached, reflected row plan
and decodes each row directly into T on Collect. Users tag fields
with `spark:"colname"` or leave them bare to fall back on
snake_case of the Go field name; schema drift (struct field not in
the result projection) surfaces at first Collect, not per row.
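
For illustration, two hypothetical structs that decode the same two-column projection:

    type Event struct {
        ID        int64  `spark:"id"`         // explicit column tag
        CreatedAt string `spark:"created_at"` // explicit column tag
    }

    type EventBare struct {
        ID        int64  // bare field: snake_case fallback -> "id"
        CreatedAt string // bare field: snake_case fallback -> "created_at"
    }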

Two entry points:

    SqlTyped[User](ctx, session, "SELECT id, email FROM users") *DataFrameOf[User]
    TypedDataFrame[User](df)                                     *DataFrameOf[User]

Collect returns []T. DataFrame() drops back to the untyped surface
for operations the typed layer doesn't cover — GroupBy, joins,
window functions.

Streaming (iter.Seq2 over T) is intentionally out of scope here —
needs dedicated ExecutePlanClient plumbing and a matching test
matrix, lands in a follow-up. Users with large result sets drop to
DataFrame().ToLocalIterator() or the streaming primitive in the
untyped DataFrame.

Closes #4.
)

The paragraph described the Apache PMC's stance on the upstream client,
not this fork's. Carrying it on a fork that downstream projects ship
against was misleading on both provenance and intent.
* feat: top-level typed helpers Collect/Stream/First/As/Into + Dataset[T] alias

Adds the five top-level typed helpers the bootstrap spec names:

- Collect[T](ctx, df) ([]T, error)
- Stream[T](ctx, df) iter.Seq2[T, error]
- First[T](ctx, df) (*T, error) / ErrNotFound sentinel
- As[T](df) (*DataFrameOf[T], error)
- Into(ctx, df, dst any) error   // non-generic, slice or struct dst

Also a `Dataset[T]` type alias for DataFrameOf[T] to match the
Scala / Java naming and Apache Spark's Dataset[T] precedent.
DataFrameOf[T] stays as the original name and is fully
interchangeable via the alias.

Stream[T] is the real new capability: the existing DataFrameOf[T]
deliberately omits streaming (documented in dataframe_typed.go). The
new helper wraps the untyped DataFrame.All iterator and decodes each
row in place, yielding constant memory regardless of result size.
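
Consumption shape, as a sketch (hypothetical `User` and loop body):

    for u, err := range Stream[User](ctx, df) {
        if err != nil {
            return err
        }
        handle(u) // hypothetical per-row consumer; memory stays flat
    }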

Into covers the non-generic path where T isn't known at compile time
— typical of code-gen consumers or reflection-heavy DSLs. Uses a
non-generic decodeRowReflect sibling to decodeRow so already-typed
slice slots populate via reflect.Value rather than forcing a T
instantiation.
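
Illustrative call (hypothetical `User`; per the guard paths below, dst must be a non-nil pointer to a struct or a slice of structs):

    var users []User // element type discovered via reflection, no T needed
    if err := Into(ctx, df, &users); err != nil {
        return err
    }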

Tests cover the guard paths (Into rejects non-pointer / nil pointer
/ non-slice-non-struct / slice-of-non-struct; As rejects non-struct
T; Collect rejects non-struct T; Dataset[T] alias identity at
compile time; ErrNotFound sentinel propagates through errors.Is).
Full DataFrame-backed coverage for Collect / Stream / First lands
with the integration suite (no mockDataFrame exists today; the thin
wrappers call already-tested primitives).

* chore: bump go.mod to 1.24 for generic type aliases

The new Dataset[T] = DataFrameOf[T] alias is a generic type alias,
which requires Go 1.24 (GA) or GOEXPERIMENT=aliastypeparams on
older toolchains. CI was pinning 1.23.2 via the go-version-file in
.github/workflows/build.yml and failing on the typed-helpers
compile step.

Bump the module's declared Go version to 1.24. The downstream
datalakeorm/dorm module already uses go 1.24.9, so nothing
downstream regresses; CI workers on 1.24+ pick it up automatically.
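
For reference, the construct forcing the bump:

    // Generic type alias; legal from Go 1.24.
    type Dataset[T any] = DataFrameOf[T]

    // Pre-1.24 toolchains only accept the non-parameterised form:
    // type UserSet = DataFrameOf[User]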

---------

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
)

The org rename moved the fork from caldempsey/spark-connect-go to
datalakego/spark-connect-go. Update the module declaration in go.mod
and sweep all import paths across the tree.

43 files touched (Go sources, tests, mocks, Makefile-less module
references, CI). Sanity-checked afterwards:
  grep -rn "caldempsey/spark-connect-go" .
returns zero hits.

Tests unchanged; they pass green under the new module path:
  ok  github.com/datalakego/spark-connect-go/spark/sql   1.702s

Consumers who depend on this module update their go.mod lines to
the new path; a redirect from the old GitHub URL handles clone and
checkout automatically.

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
Adds the five chainable methods the Scala / Java Dataset[T] API
advertises but DataFrameOf[T] didn't expose. Completes the
"typed DataFrame is a first-class surface" story the fork promised.

Surface:

    func (d *DataFrameOf[T]) Where(sql string, args ...any) *DataFrameOf[T]
    func (d *DataFrameOf[T]) Limit(n int) *DataFrameOf[T]
    func (d *DataFrameOf[T]) OrderBy(columns ...string) *DataFrameOf[T]
    func (d *DataFrameOf[T]) First(ctx context.Context) (*T, error)
    func (d *DataFrameOf[T]) Stream(ctx context.Context) iter.Seq2[T, error]

Where / Limit / OrderBy are lazy: each queues a datasetOp on a
cloned DataFrameOf so chains don't mutate shared state. Ops are
applied in declaration order by resolveDataFrame when Collect /
Stream / First materialises.
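
A sketch of that pattern (names from this commit; bodies illustrative):

    func (d *DataFrameOf[T]) Where(sql string, _ ...any) *DataFrameOf[T] {
        c := d.clone() // fresh ops slice; the parent chain stays untouched
        c.ops = append(c.ops, func(ctx context.Context, df DataFrame) (DataFrame, error) {
            return df.Where(ctx, sql) // queued, not executed yet
        })
        return c
    }

    func (d *DataFrameOf[T]) resolveDataFrame(ctx context.Context) (DataFrame, error) {
        df := d.df
        for _, op := range d.ops { // applied in declaration order
            var err error
            if df, err = op(ctx, df); err != nil {
                return nil, err
            }
        }
        return df, nil
    }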

args on Where is accepted for compatibility with dorm.Query's
signature but currently forwarded nowhere — the underlying
DataFrame.Where takes a bare string. Callers interpolate with
fmt.Sprintf or build predicates via the functions package.

First returns ErrNotFound on empty (sentinel already in
typed_helpers.go from the previous cut). Stream uses DataFrame.All
under the hood, honouring any queued ops via resolveDataFrame.

Collect rewired through resolveDataFrame so queued ops apply
correctly there too. Existing tests still pass (no behavioural
change for a DataFrameOf constructed without chained ops).

Tests: chainable shape (Where / Limit / OrderBy each queue one op;
chain of three queues three), clone isolation (parent.ops unchanged
when child adds more), resolve-with-no-ops returns the underlying
DataFrame untouched via a sentinel fake.

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
Cluster warm-up is a generic Spark Connect concern — any cluster
on any platform can be in a Pending state during cold start. This
was sitting in datalakego/dorm; move the detection + sentinel to
the fork where every Spark Connect consumer benefits, not just
dorm.

Surface (package sparkerrors):

    var ErrClusterNotReady = errors.New("sparkerrors: cluster not ready")

    type ClusterNotReady struct {
        State     string
        RequestID string
        Message   string
        Cause     error
    }

    func (e *ClusterNotReady) Error() string
    func (e *ClusterNotReady) Unwrap() error
    func (e *ClusterNotReady) Is(target error) bool    // matches ErrClusterNotReady
    func (e *ClusterNotReady) IsRetryable() bool       // always true

    func IsClusterNotReady(err error) bool             // errors.As convenience
    func NewClusterNotReady(err error) *ClusterNotReady

NewClusterNotReady inspects err for the canonical
[FailedPrecondition] + "state Pending" pattern and returns a typed
ClusterNotReady if it matches, else nil. The string-matching looks
fragile but is the exact detection pattern that's held across
multiple Databricks runtime versions and is the shape self-managed
Spark clusters emit too.
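
Caller-side shape this enables (hypothetical retry wiring):

    _, err := session.Sql(ctx, query)
    if cnr := sparkerrors.NewClusterNotReady(err); cnr != nil {
        log.Printf("cluster pending (state=%s, request=%s); backing off", cnr.State, cnr.RequestID)
        // sleep and retry here
    }

    // Or match anywhere in a wrapped chain:
    if errors.Is(err, sparkerrors.ErrClusterNotReady) {
        // retryable
    }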

Retry-loop wiring (spark/client/retry.go — the fork's existing
retry code) lands in a follow-up PR alongside OpenSession +
SessionOption (Feature 1c). This PR just lands the types so the
retry code has something typed to inspect.

Tests cover: canonical Databricks pattern detection (State and
RequestID extracted correctly; Cause preserved); rejection on nil /
unrelated errors / "[FailedPrecondition]" without state Pending;
IsClusterNotReady matches typed, %w-wrapped typed, and %w-wrapped
sentinel; typed.Is(ErrClusterNotReady) holds for errors.Is callers;
Unwrap exposes the original cause.

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
* feat: SqlAs[T] + TableAs[T] free functions; SqlTyped deprecated

Rename SqlTyped -> SqlAs to match the Scala / Java precedent (and
the bootstrap spec's nomenclature) and add TableAs as the table-
addressed sibling.

    func SqlAs[T any](ctx, session, query) (*DataFrameOf[T], error)
    func TableAs[T any](ctx, session, name) (*DataFrameOf[T], error)

Both are free functions rather than methods on SparkSession because
Go doesn't allow type parameters on interface methods; the session
is passed as an explicit second argument. Both wrap the untyped
path (session.Sql / session.Table) and pass the result through
TypedDataFrame[T] for plan-building, so the row plan is computed
exactly once per call.
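
Concretely (illustrative `User`):

    // Not legal Go: interface methods cannot declare type parameters.
    // type SparkSession interface {
    //     SqlAs[T any](ctx context.Context, query string) (*DataFrameOf[T], error)
    // }

    // Hence the free function, session passed explicitly:
    ds, err := SqlAs[User](ctx, session, "SELECT id, email FROM users")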

SqlTyped is retained as a deprecated alias that simply delegates to
SqlAs, with a `// Deprecated:` comment. No external caller exists today
— the fork just landed — but keeping the alias costs nothing and
makes the rename zero-friction for any in-flight private usage.

Tests: SqlAs forwards the query verbatim and returns a populated
Dataset with a cached plan; SqlAs propagates errors from session.Sql
unchanged; SqlAs rejects non-struct T with the established
"must be a struct" message; TableAs mirrors these for session.Table;
SqlTyped forwards identically to SqlAs.

* chore: gofumpt sqlas_test.go stubSession alignment

---------

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
* feat: database/sql driver over Spark Connect (spark/sql/driver)

Every Go tool that speaks database/sql — goose, sqlc, pgx consumers,
ad-hoc test harnesses — can now target a Spark-backed lakehouse
without learning the native client API. Strictly additive to the
fork; zero changes to existing DataFrame surface.

Usage:

    import (
        "database/sql"
        _ "github.com/datalakego/spark-connect-go/spark/sql/driver"
    )

    db, err := sql.Open("spark", "sc://localhost:15002?format=iceberg")

DSN grammar:
  sc://host:port                            plain
  sc://host:port?token=<bearer>             bearer-token auth
  sc://host:port?format=iceberg|delta       format passthrough for
                                            dialect-aware consumers
                                            (goose-spark reads this)

Driver surface (all stdlib database/sql/driver interfaces):

  driver.Driver            Open(dsn) / OpenConnector (modern ctx path)
  driver.Connector         Connect(ctx) / Driver()
  driver.Conn              Prepare / PrepareContext / Close / Begin /
                           BeginTx / ExecContext / QueryContext /
                           Ping
  driver.Stmt              Close / NumInput / Exec / Query /
                           ExecContext / QueryContext
  driver.Rows              Columns / Close / Next (iterates a
                           pre-Collect'd DataFrame)
  driver.Result            RowsAffected=-1 (unknown),
                           LastInsertId errors (no auto-increment
                           in lakehouse)
  driver.Tx                Commit = no-op, Rollback = error
                           (lakehouse commit semantics are per-
                           statement inside the table format; a SQL-
                           layer Tx doesn't buy atomicity here)

v0 scope is what goose needs: CREATE / INSERT / SELECT against a
Spark Connect endpoint. Specifically:

  - NumInput() = 0. Parameter binding is not supported; callers that
    pass args see errArgsUnsupported. Migrations author literal SQL
    so this limitation is not felt; v1+ wires parameter binding via
    the Spark Connect parameterised-query proto.
  - No prepared-statement caching; every Exec/Query re-sends the
    statement. Right-sized for goose's request rate; v1+ adds a
    server-side plan cache if latency-sensitive callers appear.
  - Rows wraps a materialised DataFrame.Collect slice. Suits small
    reads (the SELECTs goose fires against the version table).
    Larger scans should bypass database/sql entirely and use the
    native DataFrame / iter.Seq2 path.
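
Putting the v0 scope together (hypothetical table; literal SQL, no bound args):

    db, err := sql.Open("spark", "sc://localhost:15002?format=iceberg")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Passing args here would surface errArgsUnsupported; author literal SQL.
    if _, err := db.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS demo (id BIGINT)"); err != nil {
        log.Fatal(err)
    }
    rows, err := db.QueryContext(ctx, "SELECT id FROM demo")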

Tests cover: DSN parsing (plain, with token, with format, case
normalisation, missing scheme, empty, unknown format); sql.Register
side-effect lands under the name "spark"; OpenConnector rejects bad
DSNs; Stmt rejects args in both Exec + Query paths; Rows advances
through a fake result and returns io.EOF on exhaustion; Result
returns -1 for RowsAffected and errors for LastInsertId; Tx commits
silently and errors on Rollback.

    ok  github.com/datalakego/spark-connect-go/spark/sql/driver  0.606s

Strictly additive: zero changes to existing files under spark/sql/.
Diff against upstream apache/spark-connect-go master is additions
only.

* chore: gofumpt alignment on rowFake methods in driver_test.go

---------

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
The typed surface on the fork drifted past its charter. DataFrameOf[T]
had Where / Limit / OrderBy transformation methods, and the package
exposed SqlAs / TableAs / SqlTyped / TypedDataFrame / Into on top of
As. The design contract (lakeorm/TYPING.md) is that typed wrappers
are edge-only — transformations stay on the untyped DataFrame; the
type parameter participates only at materialisation.

Trim to exactly the promised surface:

  - Type:     DataFrameOf[T any]
  - New:      As[T](df) (*DataFrameOf[T], error)       -- constructor
  - Keep:     Collect[T](ctx, df) ([]T, error)         -- free fn
  - Keep:     Stream[T](ctx, df) iter.Seq2[T, error]   -- free fn
  - Keep:     First[T](ctx, df) (*T, error)            -- free fn
  - Methods:  (*DataFrameOf[T]).{DataFrame, Collect, Stream, First}

Removed: .Where / .Limit / .OrderBy methods, Dataset[T] alias,
SqlAs / TableAs / SqlTyped / TypedDataFrame free functions, Into
non-generic scanner, and the internal ops/datasetOp/resolveDataFrame/
clone machinery that existed only to support the removed
transformation methods.

Callers that were using SqlAs / TableAs move to session.Sql /
session.Table + As[T]. Callers that were using .Where / .Limit /
.OrderBy move to untyped DataFrame.Where / Limit / Sort + re-typing
via As[T] at the materialisation edge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@caldempsey
Author

Sorry, opened against wrong remote — this is for the datalakego fork, not upstream. Closing and moving.

@caldempsey caldempsey closed this Apr 19, 2026
@caldempsey caldempsey deleted the refactor/typed-api-audit branch April 19, 2026 21:47