refactor: trim typed API to edge-wrapper shape only #179
Closed
caldempsey wants to merge 21 commits into apache:master
Conversation
…atabricks/Spark signal done processing rows)
…-go (#1) Declares the module as github.com/caldempsey/spark-connect-go, drops the /v40 suffix, and updates every import in the tree to match. Consumers that want the fork's in-flight fixes no longer need a replace directive in their go.mod — go get against the caldempsey path resolves directly. Rebases against upstream stay mechanical: merge upstream in, sed module paths, resolve in the usual spots.
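For consumers the practical effect is that the fork resolves like any other module. A minimal sketch, assuming the upstream quick-start builder surface (NewSessionBuilder / Remote / Build / Stop); the point here is only the import prefix:

// go get github.com/caldempsey/spark-connect-go@main  -- no replace directive required
package main

import (
    "context"
    "log"

    "github.com/caldempsey/spark-connect-go/spark/sql" // fork path, no /v40 suffix
)

func main() {
    ctx := context.Background()
    spark, err := sql.NewSessionBuilder().Remote("sc://localhost:15002").Build(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer spark.Stop()
}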
The fork's work lives on main, but the build workflow only triggers on push to master. Point the push filter at main so merges to the fork's default branch actually get CI signal. The pull_request trigger already accepts any base branch and doesn't need to change.
* fix/8: fall back to archive.apache.org when dlcdn 404s

dlcdn.apache.org only mirrors current releases and rotates older ones off without warning, which blocks every CI run behind a transient 404. archive.apache.org is the canonical Apache mirror and never rotates. Try dlcdn first for speed, fall back to archive on failure. The cache is keyed on Spark + Hadoop version, so this penalty is paid at most once per cache eviction. Closes #8.

* fix/8: run gofumpt on files whose imports shifted in the rename

golangci-lint's gofumpt check flagged three files where the import block got out of shape: stdlib imports mixed with third-party, missing blank-line group separator. Let gofumpt produce the canonical layout.
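The fallback shape, sketched in Go purely for illustration (the actual change lives in the CI workflow's download step); the helper name and URL joining are assumptions:

package sparkdl

import (
    "fmt"
    "io"
    "net/http"
    "os"
)

// fetchSparkTarball is an illustrative helper, not the CI script itself:
// try the fast mirror first, fall back to the canonical archive on any failure.
func fetchSparkTarball(path, dst string) error {
    mirrors := []string{
        "https://dlcdn.apache.org/" + path,        // current releases only; older ones rotate off
        "https://archive.apache.org/dist/" + path, // canonical mirror, never rotates
    }
    var lastErr error
    for _, url := range mirrors {
        resp, err := http.Get(url)
        if err != nil {
            lastErr = err
            continue
        }
        if resp.StatusCode != http.StatusOK {
            resp.Body.Close()
            lastErr = fmt.Errorf("%s: %s", url, resp.Status)
            continue
        }
        out, err := os.Create(dst)
        if err != nil {
            resp.Body.Close()
            return err
        }
        _, err = io.Copy(out, resp.Body)
        resp.Body.Close()
        out.Close()
        return err
    }
    return lastErr
}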
Four fmt.Print calls sat in the response-stream receive loop printing raw protocol chatter (response type, EOF flag, ResultComplete) to the host process's stdout on every query. For a library, stdout belongs to the host — downstream services should decide what lands there. Delete the prints; add a test that captures stdout during a complete ExecutePlan round-trip and asserts nothing was written to pin the regression. The fmt.Println in dataFrame.Show is intentional (user-requested console output via ResultCollector) and stays put. Closes #2.
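The capture pattern that test relies on is the standard os.Pipe swap; runQuery below is a placeholder for the real ExecutePlan round trip, not the fork's test code:

package sqltest

import (
    "io"
    "os"
    "testing"
)

// runQuery stands in for the ExecutePlan round trip under test.
func runQuery() {}

func TestNoStdoutChatter(t *testing.T) {
    orig := os.Stdout
    r, w, err := os.Pipe()
    if err != nil {
        t.Fatal(err)
    }
    os.Stdout = w

    runQuery()

    w.Close()
    os.Stdout = orig
    out, _ := io.ReadAll(r)
    if len(out) != 0 {
        t.Fatalf("library wrote to stdout: %q", out)
    }
}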
…rs (#11) BaseBuilder.WithDialOptions lets callers append arbitrary grpc.DialOption values — per-call message ceilings, keepalive profiles, interceptors — anything Spark Connect doesn't expose as a server conf. SparkSessionBuilder.WithDialOptions forwards the same knob through the default builder path. Also bumps the default per-call send/receive ceiling from gRPC's 4 MiB to 1 GiB to match the server's typical upper bound (spark.connect.grpc.maxInboundMessageSize). Without that bump, a single oversized Arrow batch fails on non-trivial queries, surfacing only as a ResourceExhausted error buried in Collect() or StreamRows(). Caller-supplied DialOptions are appended after the builder's defaults so a tighter MaxCallRecvMsgSize will override the default. Closes #3.
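Caller-side shape of the new knob, assuming the upstream NewSessionBuilder / Remote / Build chain and that WithDialOptions participates in the same fluent chain; the keepalive profile and 64 MiB ceiling are arbitrary examples:

package main

import (
    "context"
    "log"
    "time"

    "github.com/caldempsey/spark-connect-go/spark/sql"
    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func main() {
    // Caller-supplied options are appended after the builder defaults, so the
    // tighter receive ceiling below overrides the 1 GiB default.
    spark, err := sql.NewSessionBuilder().
        Remote("sc://localhost:15002").
        WithDialOptions(
            grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 30 * time.Second}),
            grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64<<20)), // 64 MiB
        ).
        Build(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    defer spark.Stop()
}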
DataFrameOf[T] wraps a DataFrame with a cached, reflected row plan
and decodes each row directly into T on Collect. Users tag fields
with `spark:"colname"` or leave them bare to fall back on
snake_case of the Go field name; schema drift (struct field not in
the result projection) surfaces at first Collect, not per row.
Two entry points:
SqlTyped[User](ctx, session, "SELECT id, email FROM users") *DataFrameOf[User]
TypedDataFrame[User](df) *DataFrameOf[User]
Collect returns []T. DataFrame() drops back to the untyped surface
for operations the typed layer doesn't cover — GroupBy, joins,
window functions.
Streaming (iter.Seq2 over T) is intentionally out of scope here —
needs dedicated ExecutePlanClient plumbing and a matching test
matrix, lands in a follow-up. Users with large result sets drop to
DataFrame().ToLocalIterator() or the streaming primitive in the
untyped DataFrame.
Closes #4.
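A usage sketch, assuming SqlTyped lives beside DataFrame in the spark/sql package and the typed Collect takes a context; the User struct and query are illustrative:

package sketch

import (
    "context"

    "github.com/caldempsey/spark-connect-go/spark/sql"
)

// User is illustrative. ID is tagged explicitly; Email has no tag, so it falls
// back to snake_case of the field name ("email").
type User struct {
    ID    int64 `spark:"id"`
    Email string
}

func loadUsers(ctx context.Context, session sql.SparkSession) ([]User, error) {
    users, err := sql.SqlTyped[User](ctx, session, "SELECT id, email FROM users")
    if err != nil {
        return nil, err
    }
    // Schema drift (a struct field missing from the projection) surfaces here,
    // at the first Collect, not per row.
    return users.Collect(ctx)
}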
* feat: top-level typed helpers Collect/Stream/First/As/Into + Dataset[T] alias

Adds the five top-level typed helpers the bootstrap spec names:
- Collect[T](ctx, df) ([]T, error)
- Stream[T](ctx, df) iter.Seq2[T, error]
- First[T](ctx, df) (*T, error) / ErrNotFound sentinel
- As[T](df) (*DataFrameOf[T], error)
- Into(ctx, df, dst any) error // non-generic, slice or struct dst

Also a `Dataset[T]` type alias for DataFrameOf[T] to match the Scala / Java naming and Apache Spark's Dataset[T] precedent. DataFrameOf[T] stays as the original name and is fully interchangeable via the alias.

Stream[T] is the real new capability: the existing DataFrameOf[T] deliberately omits streaming (documented in dataframe_typed.go). The new helper wraps the untyped DataFrame.All iterator and decodes each row in place, yielding constant memory regardless of result size.

Into covers the non-generic path where T isn't known at compile time — typical of code-gen consumers or reflection-heavy DSLs. Uses a non-generic decodeRowReflect sibling to decodeRow so already-typed slice slots populate via reflect.Value rather than forcing a T instantiation.

Tests cover the guard paths (Into rejects non-pointer / nil pointer / non-slice-non-struct / slice-of-non-struct; As rejects non-struct T; Collect rejects non-struct T; Dataset[T] alias identity at compile time; ErrNotFound sentinel propagates through errors.Is). Full DataFrame-backed coverage for Collect / Stream / First lands with the integration suite (no mockDataFrame exists today; the thin wrappers call already-tested primitives).

* chore: bump go.mod to 1.24 for generic type aliases

The new Dataset[T] = DataFrameOf[T] alias is a generic type alias, which requires Go 1.24 (GA) or GOEXPERIMENT=aliastypeparams on older toolchains. CI was pinning 1.23.2 via the go-version-file in .github/workflows/build.yml and failing on the typed-helpers compile step. Bump the module's declared Go version to 1.24. The downstream datalakeorm/dorm module already uses go 1.24.9, so nothing downstream regresses; CI workers on 1.24+ pick it up automatically.

---------

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
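A consumption sketch of the helpers named above, assuming they live beside DataFrame in the spark/sql package and that ErrNotFound is exported there; Event and handle are illustrative:

package sketch

import (
    "context"
    "errors"
    "fmt"

    "github.com/caldempsey/spark-connect-go/spark/sql"
)

// Event and handle are stand-ins; only the sql.* helpers come from the commit above.
type Event struct {
    ID   int64  `spark:"id"`
    Kind string `spark:"kind"`
}

func handle(e Event) { fmt.Println(e.ID, e.Kind) }

func consume(ctx context.Context, df sql.DataFrame) error {
    // Constant-memory path: each row is decoded into Event as it arrives.
    for ev, err := range sql.Stream[Event](ctx, df) {
        if err != nil {
            return err
        }
        handle(ev)
    }

    // Non-generic path for callers that only know the destination at runtime.
    var dst []Event
    if err := sql.Into(ctx, df, &dst); err != nil {
        return err
    }

    first, err := sql.First[Event](ctx, df)
    if errors.Is(err, sql.ErrNotFound) {
        return nil // empty result is fine here
    }
    if err != nil {
        return err
    }
    handle(*first)
    return nil
}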
The org rename moved the fork from caldempsey/spark-connect-go to datalakego/spark-connect-go. Update the module declaration in go.mod and sweep all import paths across the tree. 43 files touched (Go sources, tests, mocks, Makefile-less module references, CI).

Sanity-checked afterwards: grep -rn "caldempsey/spark-connect-go" . returns zero hits. Tests unchanged; they pass green under the new module path:

ok github.com/datalakego/spark-connect-go/spark/sql 1.702s

Consumers who depend on this module update their go.mod lines to the new path; a redirect from the old GitHub URL handles clone and checkout automatically.

Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
Adds the five chainable methods the Scala / Java Dataset[T] API
advertises but DataFrameOf[T] didn't expose. Completes the
"typed DataFrame is a first-class surface" story the fork promised.
Surface:
func (d *DataFrameOf[T]) Where(sql string, args ...any) *DataFrameOf[T]
func (d *DataFrameOf[T]) Limit(n int) *DataFrameOf[T]
func (d *DataFrameOf[T]) OrderBy(columns ...string) *DataFrameOf[T]
func (d *DataFrameOf[T]) First(ctx context.Context) (*T, error)
func (d *DataFrameOf[T]) Stream(ctx context.Context) iter.Seq2[T, error]
Where / Limit / OrderBy are lazy: each queues a datasetOp on a
cloned DataFrameOf so chains don't mutate shared state. Ops are
applied in declaration order by resolveDataFrame when Collect /
Stream / First materialises.
args on Where is accepted for compatibility with dorm.Query's
signature but currently forwarded nowhere — the underlying
DataFrame.Where takes a bare string. Callers interpolate with
fmt.Sprintf or build predicates via the functions package.
First returns ErrNotFound on empty (sentinel already in
typed_helpers.go from the previous cut). Stream uses DataFrame.All
under the hood, honouring any queued ops via resolveDataFrame.
Collect rewired through resolveDataFrame so queued ops apply
correctly there too. Existing tests still pass (no behavioural
change for a DataFrameOf constructed without chained ops).
Tests: chainable shape (Where / Limit / OrderBy each queue one op;
chain of three queues three), clone isolation (parent.ops unchanged
when child adds more), resolve-with-no-ops returns the underlying
DataFrame untouched via a sentinel fake.
Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
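The clone-and-queue shape described in the message above, sketched against a stand-in frame interface rather than the real DataFrame (whose Where / Limit signatures aren't restated here); datasetOp, clone, and resolveDataFrame follow the names in the commit, the bodies are illustrative:

package sketch

// frame is a stand-in for the untyped DataFrame, just enough to show the pattern.
type frame interface {
    Where(condition string) frame
    Limit(n int) frame
}

type datasetOp func(frame) frame

type DataFrameOf[T any] struct {
    df  frame
    ops []datasetOp
}

func (d *DataFrameOf[T]) clone() *DataFrameOf[T] {
    child := &DataFrameOf[T]{df: d.df}
    child.ops = append(child.ops, d.ops...) // copy; never share the backing array with the parent
    return child
}

func (d *DataFrameOf[T]) Where(condition string) *DataFrameOf[T] {
    child := d.clone()
    child.ops = append(child.ops, func(f frame) frame { return f.Where(condition) })
    return child
}

func (d *DataFrameOf[T]) Limit(n int) *DataFrameOf[T] {
    child := d.clone()
    child.ops = append(child.ops, func(f frame) frame { return f.Limit(n) })
    return child
}

// resolveDataFrame applies queued ops in declaration order when Collect /
// Stream / First materialises.
func (d *DataFrameOf[T]) resolveDataFrame() frame {
    f := d.df
    for _, op := range d.ops {
        f = op(f)
    }
    return f
}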
Cluster warm-up is a generic Spark Connect concern — any cluster
on any platform can be in a Pending state during cold start. This
was sitting in datalakego/dorm; move the detection + sentinel to
the fork where every Spark Connect consumer benefits, not just
dorm.
Surface (package sparkerrors):
var ErrClusterNotReady = errors.New("sparkerrors: cluster not ready")
type ClusterNotReady struct {
    State     string
    RequestID string
    Message   string
    Cause     error
}
func (e *ClusterNotReady) Error() string
func (e *ClusterNotReady) Unwrap() error
func (e *ClusterNotReady) Is(target error) bool // matches ErrClusterNotReady
func (e *ClusterNotReady) IsRetryable() bool // always true
func IsClusterNotReady(err error) bool // errors.As convenience
func NewClusterNotReady(err error) *ClusterNotReady
NewClusterNotReady inspects err for the canonical
[FailedPrecondition] + "state Pending" pattern and returns a typed
ClusterNotReady if it matches, else nil. The string-matching looks
fragile but is the exact detection pattern that's held across
multiple Databricks runtime versions and is the shape self-managed
Spark clusters emit too.
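A consumer-side sketch of how the types are meant to be inspected; the retry helper and fixed backoff are illustrative (the fork's real retry wiring lands in the follow-up noted below), and the sparkerrors import path is assumed:

package sketch

import (
    "context"
    "errors"
    "time"

    "github.com/datalakego/spark-connect-go/spark/sparkerrors" // path assumed
)

func retryUntilReady(ctx context.Context, run func() error) error {
    for {
        err := run()
        if err == nil {
            return nil
        }
        // Typed match via errors.As; sparkerrors.IsClusterNotReady(err) or
        // errors.Is(err, sparkerrors.ErrClusterNotReady) work equally well.
        var cnr *sparkerrors.ClusterNotReady
        if !errors.As(err, &cnr) {
            return err // not a warm-up condition; don't retry
        }
        select {
        case <-ctx.Done():
            return err
        case <-time.After(10 * time.Second):
            // cluster still in cnr.State (e.g. "Pending"); wait and try again
        }
    }
}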
Retry-loop wiring (spark/client/retry.go — the fork's existing
retry code) lands in a follow-up PR alongside OpenSession +
SessionOption (Feature 1c). This PR just lands the types so the
retry code has something typed to inspect.
Tests cover: canonical Databricks pattern detection (State and
RequestID extracted correctly; Cause preserved); rejection on nil /
unrelated errors / "[FailedPrecondition]" without state Pending;
IsClusterNotReady matches typed, %w-wrapped typed, and %w-wrapped
sentinel; typed.Is(ErrClusterNotReady) holds for errors.Is callers;
Unwrap exposes the original cause.
Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
* feat: SqlAs[T] + TableAs[T] free functions; SqlTyped deprecated
Rename SqlTyped -> SqlAs to match the Scala / Java precedent (and
the bootstrap spec's nomenclature) and add TableAs as the table-
addressed sibling.
func SqlAs[T any](ctx, session, query) (*DataFrameOf[T], error)
func TableAs[T any](ctx, session, name) (*DataFrameOf[T], error)
Both are free functions rather than methods on SparkSession because
Go doesn't allow type parameters on interface methods; the session
is passed as an explicit second argument. Both wrap the untyped
path (session.Sql / session.Table) and pass the result through
TypedDataFrame[T] for plan-building, so the row plan is computed
exactly once per call.
SqlTyped is retained as a deprecated alias that simply delegates to
SqlAs, with a // Deprecated: comment. No external caller exists today
— the fork just landed — but keeping the alias costs nothing and
makes the rename zero-friction for any in-flight private usage.
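A hedged usage sketch, assuming the functions live in the spark/sql package and the typed Collect takes a context; Order is illustrative:

package sketch

import (
    "context"

    "github.com/datalakego/spark-connect-go/spark/sql"
)

// Order is illustrative; only SqlAs / TableAs come from the commit above.
type Order struct {
    ID     int64   `spark:"id"`
    Amount float64 `spark:"amount"`
}

func recentOrders(ctx context.Context, session sql.SparkSession) ([]Order, error) {
    ds, err := sql.SqlAs[Order](ctx, session, "SELECT id, amount FROM orders ORDER BY id DESC LIMIT 100")
    if err != nil {
        return nil, err
    }
    return ds.Collect(ctx) // the row plan was computed once inside SqlAs
}

func allOrders(ctx context.Context, session sql.SparkSession) ([]Order, error) {
    ds, err := sql.TableAs[Order](ctx, session, "orders")
    if err != nil {
        return nil, err
    }
    return ds.Collect(ctx)
}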
Tests: SqlAs forwards the query verbatim and returns a populated
Dataset with a cached plan; SqlAs propagates errors from session.Sql
unchanged; SqlAs rejects non-struct T with the established
"must be a struct" message; TableAs mirrors these for session.Table;
SqlTyped forwards identically to SqlAs.
* chore: gofumpt sqlas_test.go stubSession alignment
---------
Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
* feat: database/sql driver over Spark Connect (spark/sql/driver)
Every Go tool that speaks database/sql — goose, sqlc, pgx consumers,
ad-hoc test harnesses — can now target a Spark-backed lakehouse
without learning the native client API. Strictly additive to the
fork; zero changes to existing DataFrame surface.
Usage:
import (
"database/sql"
_ "github.com/datalakego/spark-connect-go/spark/sql/driver"
)
db, err := sql.Open("spark", "sc://localhost:15002?format=iceberg")
DSN grammar:
sc://host:port plain
sc://host:port?token=<bearer> bearer-token auth
sc://host:port?format=iceberg|delta format passthrough for
dialect-aware consumers
(goose-spark reads this)
Driver surface (all stdlib database/sql/driver interfaces):
driver.Driver Open(dsn) / OpenConnector (modern ctx path)
driver.Connector Connect(ctx) / Driver()
driver.Conn Prepare / PrepareContext / Close / Begin /
BeginTx / ExecContext / QueryContext /
Ping
driver.Stmt Close / NumInput / Exec / Query /
ExecContext / QueryContext
driver.Rows Columns / Close / Next (iterates a
pre-Collect'd DataFrame)
driver.Result RowsAffected=-1 (unknown),
LastInsertId errors (no auto-increment
in lakehouse)
driver.Tx Commit = no-op, Rollback = error
(lakehouse commit semantics are per-
statement inside the table format; a SQL-
layer Tx doesn't buy atomicity here)
v0 scope is what goose needs: CREATE / INSERT / SELECT against a
Spark Connect endpoint. Specifically:
- NumInput() = 0. Parameter binding is not supported; callers that
pass args see errArgsUnsupported. Migrations author literal SQL
so this limitation is not felt; v1+ wires parameter binding via
the Spark Connect parameterised-query proto.
- No prepared-statement caching; every Exec/Query re-sends the
statement. Right-sized for goose's request rate; v1+ adds a
server-side plan cache if latency-sensitive callers appear.
- Rows wraps a materialised DataFrame.Collect slice. Suits small
reads (the SELECTs goose fires against the version table).
Larger scans should bypass database/sql entirely and use the
native DataFrame / iter.Seq2 path.
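A hedged end-to-end sketch of the goose-style path this v0 targets: literal SQL only (no parameter binding), small reads collected eagerly; the DSN, catalog, and table names are illustrative:

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/datalakego/spark-connect-go/spark/sql/driver"
)

func main() {
    db, err := sql.Open("spark", "sc://localhost:15002?format=iceberg")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()
    // Literal DDL/DML only in v0; passing args would return errArgsUnsupported.
    if _, err := db.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS demo.versions (version BIGINT)"); err != nil {
        log.Fatal(err)
    }

    rows, err := db.QueryContext(ctx, "SELECT version FROM demo.versions")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
    for rows.Next() {
        var v int64
        if err := rows.Scan(&v); err != nil {
            log.Fatal(err)
        }
        log.Println("version:", v)
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}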
Tests cover: DSN parsing (plain, with token, with format, case
normalisation, missing scheme, empty, unknown format); sql.Register
side-effect lands under the name "spark"; OpenConnector rejects bad
DSNs; Stmt rejects args in both Exec + Query paths; Rows advances
through a fake result and returns io.EOF on exhaustion; Result
returns -1 for RowsAffected and errors for LastInsertId; Tx commits
silently and errors on Rollback.
ok github.com/datalakego/spark-connect-go/spark/sql/driver 0.606s
Strictly additive: zero changes to existing files under spark/sql/.
Diff against upstream apache/spark-connect-go master is additions
only.
* chore: gofumpt alignment on rowFake methods in driver_test.go
---------
Co-authored-by: caldempsey <8885269+caldempsey@users.noreply.github.com>
The typed surface on the fork drifted past its charter. DataFrameOf[T]
had Where / Limit / OrderBy transformation methods, and the package
exposed SqlAs / TableAs / SqlTyped / TypedDataFrame / Into on top of
As. The design contract (lakeorm/TYPING.md) is that typed wrappers
are edge-only — transformations stay on the untyped DataFrame; the
type parameter participates only at materialisation.
Trim to exactly the promised surface:
- Type: DataFrameOf[T any]
- New: As[T](df) (*DataFrameOf[T], error) -- constructor
- Keep: Collect[T](ctx, df) ([]T, error) -- free fn
- Keep: Stream[T](ctx, df) iter.Seq2[T, error] -- free fn
- Keep: First[T](ctx, df) (*T, error) -- free fn
- Methods: (*DataFrameOf[T]).{DataFrame, Collect, Stream, First}
Removed: .Where / .Limit / .OrderBy methods, Dataset[T] alias,
SqlAs / TableAs / SqlTyped / TypedDataFrame free functions, Into
non-generic scanner, and the internal ops/datasetOp/resolveDataFrame/
clone machinery that existed only to support the removed
transformation methods.
Callers that were using SqlAs / TableAs move to session.Sql /
session.Table + As[T]. Callers that were using .Where / .Limit /
.OrderBy move to untyped DataFrame.Where / Limit / Sort + re-typing
via As[T] at the materialisation edge.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
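A hedged sketch of the migration path described above, assuming session.Sql's (ctx, query) signature and a context-taking Collect on the typed wrapper; the User struct is illustrative, and chained transforms would go through the untyped DataFrame.Where / Limit / Sort before re-typing:

package sketch

import (
    "context"

    "github.com/datalakego/spark-connect-go/spark/sql"
)

// User is illustrative; the tag syntax is the existing spark:"col" convention.
type User struct {
    ID    int64  `spark:"id"`
    Email string `spark:"email"`
}

func activeUsers(ctx context.Context, session sql.SparkSession) ([]User, error) {
    // Transformations stay on the untyped DataFrame (or, as here, in the SQL itself).
    df, err := session.Sql(ctx, "SELECT id, email FROM users WHERE active = true LIMIT 100")
    if err != nil {
        return nil, err
    }
    // The type parameter participates only at the materialisation edge.
    typed, err := sql.As[User](df)
    if err != nil {
        return nil, err
    }
    return typed.Collect(ctx)
}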
caldempsey (Author):
Sorry, opened against wrong remote — this is for the datalakego fork, not upstream. Closing and moving.
Summary
The typed surface on the fork had drifted past its charter. This PR trims `DataFrameOf[T]` back to the edge-typing wrapper role the design contract pins (lakeorm/TYPING.md): transformations stay on the untyped DataFrame; the type parameter participates only at materialisation.
Final typed API
Removed
Migration
Test plan
🤖 Generated with Claude Code