Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.25'
go-version: '1.26'

- uses: actions/checkout@v4

Expand Down Expand Up @@ -87,4 +87,3 @@ jobs:
export PATH="$(pwd)/lib:$PATH"
cd example
go run main.go

197 changes: 197 additions & 0 deletions arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package lbug

// #include "lbug.h"
// #include <stdlib.h>
import "C"

import (
"fmt"
"unsafe"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/cdata"
)

func exportArrowBatches(batches []arrow.RecordBatch) (*C.struct_ArrowSchema, []C.struct_ArrowArray, error) {
if len(batches) == 0 {
return nil, nil, fmt.Errorf("at least one Arrow record batch is required")
}
schema := batches[0].Schema()
for i, batch := range batches {
if !batch.Schema().Equal(schema) {
return nil, nil, fmt.Errorf("Arrow record batch %d has a different schema", i)
}
}

cSchema := new(C.struct_ArrowSchema)
cArrays := make([]C.struct_ArrowArray, len(batches))
cdata.ExportArrowSchema(schema, cdata.SchemaFromPtr(uintptr(unsafe.Pointer(cSchema))))
for i, batch := range batches {
cdata.ExportArrowRecordBatch(batch, cdata.ArrayFromPtr(uintptr(unsafe.Pointer(&cArrays[i]))), nil)
}
return cSchema, cArrays, nil
}

func releaseExportedArrow(schema *C.struct_ArrowSchema, arrays []C.struct_ArrowArray) {
if schema != nil {
cdata.ReleaseCArrowSchema(cdata.SchemaFromPtr(uintptr(unsafe.Pointer(schema))))
}
for i := range arrays {
cdata.ReleaseCArrowArray(cdata.ArrayFromPtr(uintptr(unsafe.Pointer(&arrays[i]))))
}
}

func lastCAPIError(fallback string) error {
cErr := C.lbug_get_last_error()
if cErr == nil {
return fmt.Errorf("%s", fallback)
}
defer C.lbug_destroy_string(cErr)
return fmt.Errorf("%s", C.GoString(cErr))
}

func queryResultFromArrowCall(conn *Connection, queryResult *QueryResult, status C.lbug_state, fallback string) (*QueryResult, error) {
queryResult.connection = conn
if status != C.LbugSuccess {
queryResult.Close()
return nil, lastCAPIError(fallback)
}
if !C.lbug_query_result_is_success(&queryResult.cQueryResult) {
cErrMsg := C.lbug_query_result_get_error_message(&queryResult.cQueryResult)
defer C.lbug_destroy_string(cErrMsg)
queryResult.Close()
return nil, fmt.Errorf("%s", C.GoString(cErrMsg))
}
return queryResult, nil
}

// CreateArrowTable registers Arrow memory as a node table.
// The first column is used as the table primary key.
// The registered table may outlive this call, so batches should be built with
// memory that is safe to hold through the Arrow C Data Interface.
func (conn *Connection) CreateArrowTable(tableName string, batches []arrow.RecordBatch) (*QueryResult, error) {
cSchema, cArrays, err := exportArrowBatches(batches)
if err != nil {
return nil, err
}
cTableName := C.CString(tableName)
defer C.free(unsafe.Pointer(cTableName))

queryResult := &QueryResult{}
status := C.lbug_connection_create_arrow_table(&conn.cConnection, cTableName, cSchema,
(*C.struct_ArrowArray)(unsafe.Pointer(&cArrays[0])), C.uint64_t(len(cArrays)),
&queryResult.cQueryResult)
if status == C.LbugSuccess {
cSchema = nil
cArrays = nil
}
defer releaseExportedArrow(cSchema, cArrays)
return queryResultFromArrowCall(conn, queryResult, status, "failed to create Arrow table")
}

// CreateArrowRelTable registers Arrow memory as a relationship table.
// The Arrow schema must include endpoint columns named "from" and "to".
// The registered table may outlive this call, so batches should be built with
// memory that is safe to hold through the Arrow C Data Interface.
func (conn *Connection) CreateArrowRelTable(tableName string, batches []arrow.RecordBatch, srcTableName string, dstTableName string) (*QueryResult, error) {
cSchema, cArrays, err := exportArrowBatches(batches)
if err != nil {
return nil, err
}
cTableName := C.CString(tableName)
cSrcTableName := C.CString(srcTableName)
cDstTableName := C.CString(dstTableName)
defer C.free(unsafe.Pointer(cTableName))
defer C.free(unsafe.Pointer(cSrcTableName))
defer C.free(unsafe.Pointer(cDstTableName))

queryResult := &QueryResult{}
status := C.lbug_connection_create_arrow_rel_table(&conn.cConnection, cTableName,
cSrcTableName, cDstTableName, cSchema, (*C.struct_ArrowArray)(unsafe.Pointer(&cArrays[0])),
C.uint64_t(len(cArrays)), &queryResult.cQueryResult)
if status == C.LbugSuccess {
cSchema = nil
cArrays = nil
}
defer releaseExportedArrow(cSchema, cArrays)
return queryResultFromArrowCall(conn, queryResult, status, "failed to create Arrow relationship table")
}

// CreateArrowRelTableCSR registers Arrow memory in CSR form as a relationship table.
// If dstColName is omitted, the destination offset column defaults to "to".
// The registered table may outlive this call, so batches should be built with
// memory that is safe to hold through the Arrow C Data Interface.
func (conn *Connection) CreateArrowRelTableCSR(tableName string, indicesBatches []arrow.RecordBatch, indptrBatches []arrow.RecordBatch, srcTableName string, dstTableName string, dstColName ...string) (*QueryResult, error) {
cIndicesSchema, cIndicesArrays, err := exportArrowBatches(indicesBatches)
if err != nil {
return nil, err
}
cIndptrSchema, cIndptrArrays, err := exportArrowBatches(indptrBatches)
if err != nil {
releaseExportedArrow(cIndicesSchema, cIndicesArrays)
return nil, err
}

cTableName := C.CString(tableName)
cSrcTableName := C.CString(srcTableName)
cDstTableName := C.CString(dstTableName)
var cDstColName *C.char
if len(dstColName) > 0 && dstColName[0] != "" {
cDstColName = C.CString(dstColName[0])
defer C.free(unsafe.Pointer(cDstColName))
}
defer C.free(unsafe.Pointer(cTableName))
defer C.free(unsafe.Pointer(cSrcTableName))
defer C.free(unsafe.Pointer(cDstTableName))

queryResult := &QueryResult{}
status := C.lbug_connection_create_arrow_rel_table_csr(&conn.cConnection, cTableName,
cSrcTableName, cDstTableName, cIndicesSchema,
(*C.struct_ArrowArray)(unsafe.Pointer(&cIndicesArrays[0])),
C.uint64_t(len(cIndicesArrays)), cIndptrSchema,
(*C.struct_ArrowArray)(unsafe.Pointer(&cIndptrArrays[0])),
C.uint64_t(len(cIndptrArrays)), cDstColName, &queryResult.cQueryResult)
if status == C.LbugSuccess {
cIndicesSchema = nil
cIndicesArrays = nil
cIndptrSchema = nil
cIndptrArrays = nil
}
defer releaseExportedArrow(cIndicesSchema, cIndicesArrays)
defer releaseExportedArrow(cIndptrSchema, cIndptrArrays)
return queryResultFromArrowCall(conn, queryResult, status, "failed to create Arrow CSR relationship table")
}

// DropArrowTable drops an Arrow memory-backed table registered on this connection.
func (conn *Connection) DropArrowTable(tableName string) (*QueryResult, error) {
cTableName := C.CString(tableName)
defer C.free(unsafe.Pointer(cTableName))
queryResult := &QueryResult{}
status := C.lbug_connection_drop_arrow_table(&conn.cConnection, cTableName, &queryResult.cQueryResult)
return queryResultFromArrowCall(conn, queryResult, status, "failed to drop Arrow table")
}

// GetArrowSchema returns the query result schema as an Arrow schema.
func (queryResult *QueryResult) GetArrowSchema() (*arrow.Schema, error) {
var cSchema C.struct_ArrowSchema
status := C.lbug_query_result_get_arrow_schema(&queryResult.cQueryResult, &cSchema)
if status != C.LbugSuccess {
return nil, lastCAPIError("failed to get Arrow schema")
}
return cdata.ImportCArrowSchema(cdata.SchemaFromPtr(uintptr(unsafe.Pointer(&cSchema))))
}

// GetNextArrowChunk returns the next chunk of the query result as an Arrow record batch.
func (queryResult *QueryResult) GetNextArrowChunk(chunkSize int64) (arrow.RecordBatch, error) {
var cArray C.struct_ArrowArray
status := C.lbug_query_result_get_next_arrow_chunk(&queryResult.cQueryResult, C.int64_t(chunkSize), &cArray)
if status != C.LbugSuccess {
return nil, lastCAPIError("failed to get next Arrow chunk")
}
schema, err := queryResult.GetArrowSchema()
if err != nil {
cdata.ReleaseCArrowArray(cdata.ArrayFromPtr(uintptr(unsafe.Pointer(&cArray))))
return nil, err
}
return cdata.ImportCRecordBatchWithSchema(cdata.ArrayFromPtr(uintptr(unsafe.Pointer(&cArray))), schema)
}
124 changes: 124 additions & 0 deletions arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package lbug

import (
"path/filepath"
"strings"
"testing"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func setupArrowTestConnection(t *testing.T) (*Database, *Connection) {
t.Helper()
dbPath := filepath.Join(t.TempDir(), "testdb")
dbPath = strings.ReplaceAll(dbPath, "\\", "/")
db, err := OpenDatabase(dbPath, DefaultSystemConfig())
require.NoError(t, err)
conn, err := OpenConnection(db)
require.NoError(t, err)
t.Cleanup(func() {
conn.Close()
db.Close()
})
return db, conn
}

func newPersonBatch(t *testing.T, alloc memory.Allocator) arrow.RecordBatch {
t.Helper()
schema := arrow.NewSchema([]arrow.Field{
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
{Name: "name", Type: arrow.BinaryTypes.String},
}, nil)
builder := array.NewRecordBuilder(alloc, schema)
defer builder.Release()
builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
builder.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob"}, nil)
batch := builder.NewRecordBatch()
t.Cleanup(batch.Release)
return batch
}

func TestCreateArrowTableAndDrop(t *testing.T) {
_, conn := setupArrowTestConnection(t)
alloc := memory.DefaultAllocator
batch := newPersonBatch(t, alloc)

result, err := conn.CreateArrowTable("Person", []arrow.RecordBatch{batch})
require.NoError(t, err)
result.Close()

result, err = conn.Query("MATCH (p:Person) RETURN p.name ORDER BY p.id;")
require.NoError(t, err)
assert.Equal(t, "p.name\nAlice\nBob\n", result.ToString())
result.Close()

result, err = conn.DropArrowTable("Person")
require.NoError(t, err)
result.Close()

_, err = conn.Query("MATCH (p:Person) RETURN p.name;")
require.Error(t, err)
assert.Contains(t, err.Error(), "Table Person does not exist")
}

func TestCreateArrowRelTableCSR(t *testing.T) {
_, conn := setupArrowTestConnection(t)
alloc := memory.DefaultAllocator
nodes := newPersonBatch(t, alloc)
result, err := conn.CreateArrowTable("Person", []arrow.RecordBatch{nodes})
require.NoError(t, err)
result.Close()

indicesSchema := arrow.NewSchema([]arrow.Field{
{Name: "to", Type: arrow.PrimitiveTypes.Uint64},
{Name: "weight", Type: arrow.PrimitiveTypes.Int64},
}, nil)
indicesBuilder := array.NewRecordBuilder(alloc, indicesSchema)
defer indicesBuilder.Release()
indicesBuilder.Field(0).(*array.Uint64Builder).AppendValues([]uint64{1, 0}, nil)
indicesBuilder.Field(1).(*array.Int64Builder).AppendValues([]int64{7, 9}, nil)
indices := indicesBuilder.NewRecordBatch()
defer indices.Release()

indptrSchema := arrow.NewSchema([]arrow.Field{
{Name: "indptr", Type: arrow.PrimitiveTypes.Uint64},
}, nil)
indptrBuilder := array.NewRecordBuilder(alloc, indptrSchema)
defer indptrBuilder.Release()
indptrBuilder.Field(0).(*array.Uint64Builder).AppendValues([]uint64{0, 1, 2}, nil)
indptr := indptrBuilder.NewRecordBatch()
defer indptr.Release()

result, err = conn.CreateArrowRelTableCSR("Knows", []arrow.RecordBatch{indices},
[]arrow.RecordBatch{indptr}, "Person", "Person")
require.NoError(t, err)
result.Close()

result, err = conn.Query("MATCH (a:Person)-[r:Knows]->(b:Person) RETURN a.id, r.weight, b.id ORDER BY a.id, b.id;")
require.NoError(t, err)
assert.Equal(t, "a.id|r.weight|b.id\n1|7|2\n2|9|1\n", result.ToString())
result.Close()
}

func TestQueryResultArrowChunk(t *testing.T) {
_, conn := setupArrowTestConnection(t)
result, err := conn.Query("RETURN CAST(1, \"INT64\") AS one;")
require.NoError(t, err)
defer result.Close()

schema, err := result.GetArrowSchema()
require.NoError(t, err)
require.Len(t, schema.Fields(), 1)
assert.Equal(t, "one", schema.Field(0).Name)

batch, err := result.GetNextArrowChunk(8)
require.NoError(t, err)
defer batch.Release()
require.Equal(t, int64(1), batch.NumRows())
values := batch.Column(0).(*array.Int64)
assert.Equal(t, int64(1), values.Value(0))
}
15 changes: 13 additions & 2 deletions example/go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
module github.com/LadybugDB/go-ladybug/examples

go 1.25
go 1.26

replace github.com/LadybugDB/go-ladybug => ../

require github.com/LadybugDB/go-ladybug v0.0.0

require github.com/google/uuid v1.6.0 // indirect

require github.com/shopspring/decimal v1.4.0 // indirect
require (
github.com/apache/arrow-go/v18 v18.6.0 // indirect
github.com/goccy/go-json v0.10.6 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/sys v0.43.0 // indirect
)
Loading
Loading