39 commits
78c7091
[SPARK-52780] Add ToLocalIterator and Arrow Record Streaming
caldempsey Jul 13, 2025
5e0a589
[debug] a case where context cancellations result in a panic
caldempsey Jul 13, 2025
c277f5b
[SPARK-52780] fix test compilation
caldempsey Jul 13, 2025
7ce5d47
[SPARK-52780] TestRowIterator_BothChannelsClosedCleanly should EOF (D…
caldempsey Jul 13, 2025
2b6044a
[SPARK-52780] fix linting error
caldempsey Jul 13, 2025
1a897ef
[SPARK-52780] rowiterator.go channel closing should deterministically…
caldempsey Jul 13, 2025
8c18703
[SPARK-52780] lint errors
caldempsey Jul 13, 2025
3dcab75
fix: merge
caldempsey Sep 3, 2025
485067e
Merge branch 'master' into callum/SPARK-52780
caldempsey Sep 3, 2025
f285079
feat: update the client base to provide lazy fetch
caldempsey Sep 3, 2025
917ce9f
feat: rename ToLocalIterator to StreamRows, establish RowIterator as …
caldempsey Sep 3, 2025
ad7e935
fix: golint-ci
caldempsey Sep 3, 2025
d38170b
fix: improve test doc-comments
caldempsey Sep 3, 2025
a18468f
feat: add tests for streaming rows in DataFrame operations including:
caldempsey Oct 22, 2025
434a579
fix: update Spark version to 4.0.1 in build workflow
caldempsey Oct 22, 2025
0432bde
fix: remove debug print lines from ToTable()
caldempsey Mar 1, 2026
928e9b3
fix: remove c.done race condition in ToRecordSequence
caldempsey Mar 1, 2026
146e423
fix: remove NewRowPull2, fold EOF handling into NewRowSequence
caldempsey Mar 1, 2026
aa4b293
fix: extract rowIterFromRecord to simplify NewRowSequence
caldempsey Mar 1, 2026
fb2a9aa
fix: prefer explicit error yield
caldempsey Mar 1, 2026
b29e5ef
fix: address feedback
caldempsey Mar 5, 2026
8cc399e
refactor/7: rename module path to github.com/caldempsey/spark-connect…
caldempsey Apr 19, 2026
6799238
fix/5: run build workflow on push to main (#6)
caldempsey Apr 19, 2026
55b4050
fix/8: fall back to archive.apache.org when dlcdn 404s (#9)
caldempsey Apr 19, 2026
8ad8d7d
feat/3: expose gRPC transport options on the channel + session builde…
caldempsey Apr 19, 2026
f340a1e
feat/4: add typed DataFrame[T] (#12)
caldempsey Apr 19, 2026
5a8a35d
docs/13: drop upstream "not for production" notice from fork README (…
caldempsey Apr 19, 2026
ccfe0de
feat: top-level typed helpers + Dataset[T] alias (#15)
caldempsey Apr 19, 2026
00c6971
chore: rename module path to github.com/datalakego/spark-connect-go (…
caldempsey Apr 19, 2026
abc5e8d
feat: Dataset[T].Where / Limit / OrderBy / First / Stream methods (#17)
caldempsey Apr 19, 2026
55f88a6
feat: ErrClusterNotReady + IsClusterNotReady + NewClusterNotReady (#19)
caldempsey Apr 19, 2026
8d64dd5
feat: SqlAs[T] + TableAs[T] free functions; SqlTyped deprecated (#18)
caldempsey Apr 19, 2026
0c8bf05
feat: database/sql driver over Spark Connect (#20)
caldempsey Apr 19, 2026
d36ed13
refactor: trim typed API to edge-wrapper shape only (#21)
caldempsey Apr 19, 2026
5c4c4a3
refactor: rename org github.com/datalakego → github.com/datalake-go (…
caldempsey Apr 19, 2026
6c4fc25
feat: parameter binding + drop format DSN param from database/sql dri…
caldempsey Apr 19, 2026
b7342d7
fix: dedupe rowiterator_test.go imports after fork reset
caldempsey Apr 20, 2026
6812fad
docs: refresh README for datalake-go fork
caldempsey Apr 20, 2026
b1b5edd
docs(readme): refocus on the maintained-fork framing, restructure usage
caldempsey Apr 20, 2026
11 changes: 8 additions & 3 deletions .github/workflows/build.yml
@@ -29,10 +29,10 @@ on:
pull_request:
push:
branches:
- master
- main

env:
SPARK_VERSION: '4.0.0'
SPARK_VERSION: '4.0.1'
HADOOP_VERSION: '3'

permissions:
@@ -84,7 +84,12 @@ jobs:
echo "Apache Spark is not installed"
# Access the directory.
mkdir -p ~/deps/
wget -q https://dlcdn.apache.org/spark/spark-${{ env.SPARK_VERSION }}/spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}.tgz
# dlcdn.apache.org only keeps current releases on its mirrors and
# occasionally 404s on older ones. archive.apache.org is the
# canonical mirror and never rotates — use it as a fallback.
ARCHIVE=spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}.tgz
wget -q https://dlcdn.apache.org/spark/spark-${{ env.SPARK_VERSION }}/$ARCHIVE || \
wget -q https://archive.apache.org/dist/spark/spark-${{ env.SPARK_VERSION }}/$ARCHIVE
tar -xzf spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}.tgz -C ~/deps/
# Delete the old file
rm spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}.tgz
155 changes: 115 additions & 40 deletions README.md
@@ -1,76 +1,151 @@
# Apache Spark Connect Client for Golang
# spark-connect-go

This project houses the **experimental** client for [Spark
Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html) for
[Apache Spark](https://spark.apache.org/) written in [Golang](https://go.dev/).
> A maintained fork of [`apache/spark-connect-go`](https://github.com/apache/spark-connect-go) with a `database/sql` driver, edge-typed DataFrames, exposed gRPC dial options, and a typed `ClusterNotReady` error. Tracks upstream; deltas are queued to upstream.

## Current State of the Project
Spark Connect is Spark's [language-neutral gRPC protocol](https://spark.apache.org/docs/latest/spark-connect-overview.html). The upstream Go client is the official reference implementation. This fork carries the deltas needed for production usage while those patches work their way upstream — drop in by swapping the import path; the session API, DataFrame surface, and protobuf stubs are unchanged.

Currently, the Spark Connect client for Golang is highly experimental and should
not be used in any production setting. In addition, the PMC of the Apache Spark
project reserves the right to withdraw and abandon the development of this project
if it is not sustainable.
## What's added

## Getting started
- **`database/sql` driver.** `sql.Open("spark", "sc://host:port")` works with goose, sqlc-generated code, pgx-style consumers — anything that speaks `database/sql`. Registered under the name `spark` in `spark/sql/driver`. `$N` positional placeholders are rendered client-side into Spark SQL literals (the native parameter proto isn't reliable across every supported Spark version).
- **Edge-typed DataFrames.** `As[T](df) → *DataFrameOf[T]` caches a reflected row plan once; `Collect`, `Stream`, `First` materialise into struct types at the point you know the result shape. Top-level `Collect[T] / Stream[T] / First[T]` helpers do the `As[T]` plus the call in one shot.
- **`SparkSessionBuilder.WithDialOptions`.** gRPC dial options exposed on the builder — auth interceptors, TLS, and observability handlers wire in without wrapping the client (see the sketch after this list).
- **`sparkerrors.IsClusterNotReady(err)`.** Typed error for cluster cold-start states. Databricks serverless clusters take 30-90s to warm; retry logic upstack needs a reliable signal instead of string-matching on error messages.
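
A minimal sketch of wiring dial options through the builder, assuming `WithDialOptions` accepts standard variadic `grpc.DialOption` values; `authInterceptor` is a hypothetical `grpc.UnaryClientInterceptor` of your own:

```go
import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	sparksql "github.com/datalake-go/spark-connect-go/spark/sql"
)

// TLS against your CA bundle plus a custom auth interceptor.
creds, err := credentials.NewClientTLSFromFile("ca.pem", "")
if err != nil { /* ... */ }

session, err := sparksql.NewSessionBuilder().
	Remote("sc://spark.internal:15002").
	WithDialOptions(
		grpc.WithTransportCredentials(creds),
		grpc.WithUnaryInterceptor(authInterceptor), // hypothetical token-injecting interceptor
	).
	Build(ctx)
```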

This section explains how to run Spark Connect Go locally.
Every delta is tracked as a PR queued for `apache/spark-connect-go`. When a delta lands upstream we drop it from the fork. Long-term goal is zero deltas.

Step 1: Install Golang: https://go.dev/doc/install.
## Install

Step 2: Ensure you have the `buf CLI` installed, [more info here](https://buf.build/docs/installation/)
```bash
go get github.com/datalake-go/spark-connect-go
```

Step 3: Run the following commands to setup the Spark Connect client.
Requires a Spark Connect server (Spark 3.4+).

Building with Spark in case you need to re-generate the source files from the proto sources.
## Quick start

```
git clone https://github.com/apache/spark-connect-go.git
git submodule update --init --recursive
```go
import (
sparksql "github.com/datalake-go/spark-connect-go/spark/sql"
)

make gen && make test
session, err := sparksql.NewSessionBuilder().
Remote("sc://spark.internal:15002").
Build(ctx)
if err != nil { /* ... */ }
defer session.Stop()

df, _ := session.Sql(ctx, "SELECT id, email FROM users WHERE tier = 'gold'")
_ = df.Show(ctx, 20, false)
```

Building without Spark
The `sparksql` alias avoids collision with stdlib `database/sql` — the actual package name is `sql`.

### Using DataFrames

The untyped `DataFrame` is the building block — same surface as upstream. Transformations (`Where`, `Limit`, `OrderBy`, `Select`, `Join`, `GroupBy`) compose lazily and execute on the Spark side; materialisers (`Show`, `Collect`, `First`, `Count`) round-trip and return `[]types.Row`.

```go
df, _ := session.Sql(ctx, "SELECT id, email, created_at FROM users")

filtered, _ := df.Where(ctx, "tier = 'gold'")
top, _ := filtered.OrderBy(ctx, "created_at DESC").Limit(ctx, 100)

rows, _ := top.Collect(ctx)
for _, r := range rows {
// r is types.Row — positional access by index or by name
}
```
git clone https://github.com/apache/spark-connect-go.git
make && make test
```

Step 4: Setup the Spark Driver on localhost.
Use this when the result shape is dynamic, or as the composition surface that you eventually re-type at the edge.

### Using Typed DataFrames

`As[T](df) → *DataFrameOf[T]` is the typed surface. It binds a result shape to a struct, caches the reflected row plan once, and materialises into `[]T` / `*T` without re-validating on every call.

1. [Download Spark distribution](https://spark.apache.org/downloads.html) (4.0.0+), unzip the package.
```go
type User struct {
ID string `spark:"id"`
Email string `spark:"email"`
Created time.Time `spark:"created_at"`
}

df, _ := session.Sql(ctx, "SELECT id, email, created_at FROM users WHERE tier = 'gold'")
typed, _ := sparksql.As[User](df)

users, _ := typed.Collect(ctx)
alice, err := typed.First(ctx)
if errors.Is(err, sparksql.ErrNotFound) { /* zero rows */ }
```

2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):
If you only need the result once, `Collect[T] / First[T] / Stream[T]` are top-level helpers that fold `As[T]` into the call:

```go
users, _ := sparksql.Collect[User](ctx, df)
```
sbin/start-connect-server.sh

Untagged fields map by snake_case'd field name, so plain Go structs work without tags. `spark:"-"` skips a field. `*DataFrameOf[T]` deliberately has no transformation methods — `Where` / `Limit` / `Select` / `Join` change the row shape and would make `T` lie. Compose on the untyped `DataFrame`, then re-type at the edge:

```go
typed, _ := sparksql.As[User](df)
narrower, _ := typed.DataFrame().Select(ctx, "id", "email") // back to untyped
ids, _ := sparksql.Collect[struct{ ID string `spark:"id"` }](ctx, narrower)
```

Step 5: Run the example Go application.
### Streaming Results

`Stream[T]` returns a Go 1.23 [`iter.Seq2[T, error]`](https://pkg.go.dev/iter#Seq2). One of the things Go gives us over the Python / Scala clients is a real pull-based iterator — rows decode one at a time as the gRPC stream resolves them, with constant memory regardless of result size. No need to buffer the whole result, no callback API: just `range`.

```go
for row, err := range sparksql.Stream[User](ctx, df) {
if err != nil { break }
// use row — decoded from the next Arrow batch as it lands
}
```
go run cmd/spark-connect-example-spark-session/main.go

Schema binding happens on the first row; if a later row's schema diverges from the first, the error surfaces through the iterator (no per-row panics).

Use `Stream[T]` when result sets are large, when you want to short-circuit early without dragging the rest of the rows over the wire, or when you're piping into another `iter.Seq2` consumer.
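
As a sketch of the short-circuit case (building on the `User` and `df` snippets above; `process` is a placeholder for your own handling):

```go
// Take the first 10 matching rows, then stop ranging; the remaining
// Arrow batches are never pulled over the wire.
seen := 0
for row, err := range sparksql.Stream[User](ctx, df) {
	if err != nil {
		log.Fatal(err)
	}
	process(row)
	if seen++; seen == 10 {
		break
	}
}
```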

### `database/sql` driver

```go
import (
"database/sql"
_ "github.com/datalake-go/spark-connect-go/spark/sql/driver"
)

db, _ := sql.Open("spark", "sc://spark.internal:15002")
rows, _ := db.QueryContext(ctx, "SELECT id FROM users WHERE tier = $1", "gold")
```

## Running Spark Connect Go Application in a Spark Cluster
`$N` placeholders render with type-aware quoting (strings, numbers, bools, `[]byte`, `time.Time`). `?` placeholders aren't supported — most `database/sql`-adjacent codegen (sqlc, goose dialects, pgx patterns) emits `$N`, so the narrower grammar keeps the renderer simple.
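
A sketch of binding mixed parameter types; the table and column names are illustrative:

```go
since := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
rows, err := db.QueryContext(ctx,
	"SELECT id FROM users WHERE active = $1 AND created_at > $2 AND tier = $3",
	true, since, "gold",
)
// Each argument is rendered as a Spark SQL literal of the matching type
// before the statement is sent to the server.
```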

To run the Spark Connect Go application in a Spark Cluster, you need to build the Go application and submit it to the Spark Cluster. You can find a more detailed example runner and wrapper script in the `java` directory.
### Cluster cold-start

See the guide here: [Sample Spark-Submit Wrapper](java/README.md).
```go
import "github.com/datalake-go/spark-connect-go/spark/sparkerrors"

## How to write Spark Connect Go Application in your own project
df, err := session.Sql(ctx, query)
if sparkerrors.IsClusterNotReady(err) {
// retry with backoff — Databricks serverless usually warms in 30-90s
}
```
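
A minimal retry sketch on top of that check, assuming `session.Sql` returns the `sparksql.DataFrame` shown in the quick start; the attempt count and delay are illustrative:

```go
var df sparksql.DataFrame
var err error
for attempt := 1; attempt <= 5; attempt++ {
	df, err = session.Sql(ctx, query)
	if !sparkerrors.IsClusterNotReady(err) {
		break // ready, or an unrelated error worth surfacing as-is
	}
	time.Sleep(time.Duration(attempt) * 20 * time.Second)
}
```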

## Building from source

See [Quick Start Guide](quick-start.md)
```bash
git clone https://github.com/datalake-go/spark-connect-go.git
cd spark-connect-go
make && make test
```

## High Level Design
Regenerating protobuf stubs from the Spark submodule:

The overall goal of the design is to strike a good balance between the principle of least surprise for
developers who are familiar with the APIs of Apache Spark and idiomatic Go usage. The high-level
structure of the packages roughly follows the PySpark guidance, but with Go idioms.
```bash
git submodule update --init --recursive
make gen && make test
```

## Contributing

Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
for information on how to get started contributing to the project.
Feature work that could land upstream should be proposed against [`apache/spark-connect-go`](https://github.com/apache/spark-connect-go) first. Fork-only changes (anything that wouldn't be accepted upstream) stay on this tree. See [CONTRIBUTING.md](CONTRIBUTING.md).
2 changes: 1 addition & 1 deletion cmd/spark-connect-example-raw-grpc-client/main.go
@@ -22,7 +22,7 @@ import (
"log"
"time"

proto "github.com/apache/spark-connect-go/internal/generated"
proto "github.com/datalake-go/spark-connect-go/internal/generated"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
8 changes: 4 additions & 4 deletions cmd/spark-connect-example-spark-session/main.go
@@ -22,12 +22,12 @@ import (
"fmt"
"log"

"github.com/apache/spark-connect-go/spark/sql/types"
"github.com/datalake-go/spark-connect-go/spark/sql/types"

"github.com/apache/spark-connect-go/spark/sql/functions"
"github.com/datalake-go/spark-connect-go/spark/sql/functions"

"github.com/apache/spark-connect-go/spark/sql"
"github.com/apache/spark-connect-go/spark/sql/utils"
"github.com/datalake-go/spark-connect-go/spark/sql"
"github.com/datalake-go/spark-connect-go/spark/sql/utils"
)

var (
4 changes: 2 additions & 2 deletions go.mod
@@ -13,9 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

module github.com/apache/spark-connect-go
module github.com/datalake-go/spark-connect-go

go 1.23.2
go 1.24

require (
github.com/apache/arrow-go/v18 v18.4.0