diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index beb33df..ef4e192 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -17,7 +17,7 @@ jobs: name: Build and test strategy: matrix: - runner: [ubuntu-24.04, ubuntu-24.04-arm, macos-14] + runner: [ubuntu-24.04, ubuntu-24.04-arm, macos-latest] runs-on: ${{ matrix.runner }} steps: - uses: actions/checkout@v4 @@ -35,10 +35,7 @@ jobs: - name: Prepare test datasets run: | - git clone --depth 1 https://github.com/LadybugDB/go-ladybug ../go-ladybug - mkdir -p ../../dataset/tinysnb ../../dataset/tinysnb-serial - cp -R ../go-ladybug/dataset/tinysnb/. ../../dataset/tinysnb/ - cp ../go-ladybug/dataset/tinysnb/vMoviesSerial.csv ../../dataset/tinysnb-serial/vMovies.csv + git clone --depth 1 https://github.com/ladybugdb/dataset.git ./dataset - name: Build env: diff --git a/build.gradle b/build.gradle index 4386ccd..a5a7e11 100644 --- a/build.gradle +++ b/build.gradle @@ -36,8 +36,12 @@ repositories { } dependencies { + implementation 'org.apache.arrow:arrow-c-data:18.2.0' + implementation 'org.apache.arrow:arrow-vector:18.2.0' + implementation 'org.apache.arrow:arrow-memory-core:18.2.0' testImplementation platform('org.junit:junit-bom:5.10.0') testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.apache.arrow:arrow-memory-netty:18.2.0' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } diff --git a/src/jni/lbug_java.cpp b/src/jni/lbug_java.cpp index 8f1cc6c..0a200e6 100644 --- a/src/jni/lbug_java.cpp +++ b/src/jni/lbug_java.cpp @@ -524,6 +524,19 @@ void bindJavaParamsToPreparedStatement(JNIEnv* env, lbug_prepared_statement* pre } } +void throwLastError(JNIEnv* env, const char* fallback) { + if (auto* errorMessage = lbug_get_last_error()) { + throwJNIException(env, errorMessage); + free(errorMessage); + } else { + throwJNIException(env, fallback); + } +} + +jobject createQueryResultObject(JNIEnv* env, lbug_query_result* queryResult) { + return createJavaObject(env, queryResult, J_C_QueryResult, J_C_QueryResult_F_qr_ref); +} + /** * All Database native functions */ @@ -550,16 +563,20 @@ JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugNativeReloadLibrary(JNIEnv* } } -JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugDatabaseInit(JNIEnv* env, jclass, - jstring databasePath, jlong bufferPoolSize, jboolean enableCompression, jboolean readOnly, - jlong maxDbSize, jboolean autoCheckpoint, jlong checkpointThreshold, - jboolean throwOnWalReplayFailure, jboolean enableChecksums) { +JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugDatabaseInitExtended(JNIEnv* env, jclass, + jstring databasePath, jlong bufferPoolSize, jlong maxNumThreads, jboolean enableCompression, + jboolean readOnly, jlong maxDbSize, jboolean autoCheckpoint, jlong checkpointThreshold, + jboolean throwOnWalReplayFailure, jboolean enableChecksums, jboolean enableMultiWrites, + jboolean enableDefaultHashIndex) { try { const char* path = env->GetStringUTFChars(databasePath, JNI_FALSE); auto systemConfig = lbug_default_system_config(); if (bufferPoolSize != 0) { systemConfig.buffer_pool_size = static_cast(bufferPoolSize); } + if (maxNumThreads != 0) { + systemConfig.max_num_threads = static_cast(maxNumThreads); + } systemConfig.enable_compression = enableCompression; systemConfig.read_only = readOnly; if (maxDbSize != 0) { @@ -578,6 +595,8 @@ JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugDatabaseInit(JNIEnv* env, } systemConfig.throw_on_wal_replay_failure = throwOnWalReplayFailure; systemConfig.enable_checksums = enableChecksums; + systemConfig.enable_multi_writes = enableMultiWrites; + systemConfig.enable_default_hash_index = enableDefaultHashIndex; try { auto* db = new lbug_database(); auto state = lbug_database_init(path, systemConfig, db); @@ -605,6 +624,15 @@ JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugDatabaseInit(JNIEnv* env, return 0; } +JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugDatabaseInit(JNIEnv* env, jclass clazz, + jstring databasePath, jlong bufferPoolSize, jboolean enableCompression, jboolean readOnly, + jlong maxDbSize, jboolean autoCheckpoint, jlong checkpointThreshold, + jboolean throwOnWalReplayFailure, jboolean enableChecksums) { + return Java_com_ladybugdb_Native_lbugDatabaseInitExtended(env, clazz, databasePath, + bufferPoolSize, 0, enableCompression, readOnly, maxDbSize, autoCheckpoint, + checkpointThreshold, throwOnWalReplayFailure, enableChecksums, false, true); +} + JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugDatabaseDestroy(JNIEnv* env, jclass, jobject thisDB) { try { @@ -792,6 +820,118 @@ JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugConnectionSetQueryTimeout(J } } +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugConnectionCreateArrowTable(JNIEnv* env, + jclass, jobject thisConn, jstring tableName, jlong arrowSchemaAddress, jlong arrowArraysAddress, + jlong numArrays) { + try { + auto* conn = getConnection(env, thisConn); + std::string table = jstringToUtf8String(env, tableName); + auto* schema = reinterpret_cast(static_cast(arrowSchemaAddress)); + auto* arrays = reinterpret_cast(static_cast(arrowArraysAddress)); + auto* queryResult = new lbug_query_result(); + auto state = lbug_connection_create_arrow_table(conn, table.c_str(), schema, arrays, + static_cast(numArrays), queryResult); + if (state != LbugSuccess) { + delete queryResult; + throwLastError(env, "Failed to create Arrow table"); + return jobject(); + } + return createQueryResultObject(env, queryResult); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugConnectionCreateArrowRelTable(JNIEnv* env, + jclass, jobject thisConn, jstring tableName, jstring srcTableName, jstring dstTableName, + jlong arrowSchemaAddress, jlong arrowArraysAddress, jlong numArrays) { + try { + auto* conn = getConnection(env, thisConn); + std::string table = jstringToUtf8String(env, tableName); + std::string srcTable = jstringToUtf8String(env, srcTableName); + std::string dstTable = jstringToUtf8String(env, dstTableName); + auto* schema = reinterpret_cast(static_cast(arrowSchemaAddress)); + auto* arrays = reinterpret_cast(static_cast(arrowArraysAddress)); + auto* queryResult = new lbug_query_result(); + auto state = lbug_connection_create_arrow_rel_table(conn, table.c_str(), srcTable.c_str(), + dstTable.c_str(), schema, arrays, static_cast(numArrays), queryResult); + if (state != LbugSuccess) { + delete queryResult; + throwLastError(env, "Failed to create Arrow relationship table"); + return jobject(); + } + return createQueryResultObject(env, queryResult); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugConnectionCreateArrowRelTableCSR( + JNIEnv* env, jclass, jobject thisConn, jstring tableName, jstring srcTableName, + jstring dstTableName, jlong indicesSchemaAddress, jlong indicesArraysAddress, + jlong numIndicesArrays, jlong indptrSchemaAddress, jlong indptrArraysAddress, + jlong numIndptrArrays, jstring dstColumnName) { + try { + auto* conn = getConnection(env, thisConn); + std::string table = jstringToUtf8String(env, tableName); + std::string srcTable = jstringToUtf8String(env, srcTableName); + std::string dstTable = jstringToUtf8String(env, dstTableName); + std::string dstColumn = jstringToUtf8String(env, dstColumnName); + auto* indicesSchema = + reinterpret_cast(static_cast(indicesSchemaAddress)); + auto* indicesArrays = + reinterpret_cast(static_cast(indicesArraysAddress)); + auto* indptrSchema = + reinterpret_cast(static_cast(indptrSchemaAddress)); + auto* indptrArrays = + reinterpret_cast(static_cast(indptrArraysAddress)); + auto* queryResult = new lbug_query_result(); + auto* dstColumnPtr = dstColumn.empty() ? nullptr : dstColumn.c_str(); + auto state = lbug_connection_create_arrow_rel_table_csr(conn, table.c_str(), + srcTable.c_str(), dstTable.c_str(), indicesSchema, indicesArrays, + static_cast(numIndicesArrays), indptrSchema, indptrArrays, + static_cast(numIndptrArrays), dstColumnPtr, queryResult); + if (state != LbugSuccess) { + delete queryResult; + throwLastError(env, "Failed to create Arrow CSR relationship table"); + return jobject(); + } + return createQueryResultObject(env, queryResult); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugConnectionDropArrowTable(JNIEnv* env, + jclass, jobject thisConn, jstring tableName) { + try { + auto* conn = getConnection(env, thisConn); + std::string table = jstringToUtf8String(env, tableName); + auto* queryResult = new lbug_query_result(); + auto state = lbug_connection_drop_arrow_table(conn, table.c_str(), queryResult); + if (state != LbugSuccess) { + delete queryResult; + throwLastError(env, "Failed to drop Arrow table"); + return jobject(); + } + return createQueryResultObject(env, queryResult); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + /** * All PreparedStatement native functions */ @@ -822,6 +962,19 @@ JNIEXPORT jboolean JNICALL Java_com_ladybugdb_Native_lbugPreparedStatementIsSucc return jboolean(); } +JNIEXPORT jboolean JNICALL Java_com_ladybugdb_Native_lbugPreparedStatementIsReadOnly(JNIEnv* env, + jclass, jobject thisPS) { + try { + auto* ps = getPreparedStatement(env, thisPS); + return static_cast(lbug_prepared_statement_is_read_only(ps)); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jboolean(); +} + JNIEXPORT jstring JNICALL Java_com_ladybugdb_Native_lbugPreparedStatementGetErrorMessage( JNIEnv* env, jclass, jobject thisPS) { try { @@ -1061,6 +1214,35 @@ JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugQueryResultResetIterator(JN } } +JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugQueryResultGetArrowSchema(JNIEnv* env, jclass, + jobject thisQR, jlong arrowSchemaAddress) { + try { + auto* qr = getQueryResult(env, thisQR); + auto* schema = reinterpret_cast(static_cast(arrowSchemaAddress)); + throwIfError(lbug_query_result_get_arrow_schema(qr, schema), + "Failed to get Arrow schema"); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } +} + +JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugQueryResultGetNextArrowChunk(JNIEnv* env, + jclass, jobject thisQR, jlong chunkSize, jlong arrowArrayAddress) { + try { + auto* qr = getQueryResult(env, thisQR); + auto* array = reinterpret_cast(static_cast(arrowArrayAddress)); + throwIfError(lbug_query_result_get_next_arrow_chunk(qr, static_cast(chunkSize), + array), + "Failed to get next Arrow chunk"); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } +} + /** * All FlatTuple native functions */ @@ -1605,6 +1787,46 @@ JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugValueGetMapValue(JNIEnv* return jobject(); } +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugValueGetRecursiveRelNodeList(JNIEnv* env, + jclass, jobject thisValue) { + try { + auto* v = getValue(env, thisValue); + auto* nodes = new lbug_value(); + if (lbug_value_get_recursive_rel_node_list(v, nodes) != LbugSuccess) { + delete nodes; + return nullptr; + } + jobject result = createJavaObject(env, nodes, J_C_Value, J_C_Value_F_v_ref); + env->SetBooleanField(result, J_C_Value_F_isOwnedByCPP, static_cast(true)); + return result; + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + +JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugValueGetRecursiveRelRelList(JNIEnv* env, + jclass, jobject thisValue) { + try { + auto* v = getValue(env, thisValue); + auto* rels = new lbug_value(); + if (lbug_value_get_recursive_rel_rel_list(v, rels) != LbugSuccess) { + delete rels; + return nullptr; + } + jobject result = createJavaObject(env, rels, J_C_Value, J_C_Value_F_v_ref); + env->SetBooleanField(result, J_C_Value_F_isOwnedByCPP, static_cast(true)); + return result; + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return jobject(); +} + JNIEXPORT jobject JNICALL Java_com_ladybugdb_Native_lbugValueGetDataType(JNIEnv* env, jclass, jobject thisValue) { try { @@ -2193,6 +2415,50 @@ JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugValueGetStructIndex(JNIEnv return jlong(); } +JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugArrowArrayAllocate(JNIEnv* env, jclass, + jlong numArrays) { + try { + if (numArrays <= 0) { + throw NotImplementedException("Number of Arrow arrays must be positive"); + } + auto* arrays = new ArrowArray[static_cast(numArrays)](); + return static_cast(reinterpret_cast(arrays)); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return 0; +} + +JNIEXPORT jlong JNICALL Java_com_ladybugdb_Native_lbugArrowArrayGetAddress(JNIEnv* env, jclass, + jlong arrowArraysAddress, jlong index) { + try { + if (index < 0) { + throw NotImplementedException("Arrow array index must be non-negative"); + } + auto* arrays = reinterpret_cast(static_cast(arrowArraysAddress)); + return static_cast(reinterpret_cast(&arrays[index])); + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } + return 0; +} + +JNIEXPORT void JNICALL Java_com_ladybugdb_Native_lbugArrowArrayFree(JNIEnv* env, jclass, + jlong arrowArraysAddress) { + try { + auto* arrays = reinterpret_cast(static_cast(arrowArraysAddress)); + delete[] arrays; + } catch (const Exception& e) { + throwJNIException(env, e.what()); + } catch (...) { + throwJNIException(env, "Unknown Error"); + } +} + JNIEXPORT jstring JNICALL Java_com_ladybugdb_Native_lbugGetVersion(JNIEnv* env, jclass) { try { return takeOwnedCStringAsJString(env, lbug_get_version()); diff --git a/src/main/java/com/lbugdb/ArrowArrays.java b/src/main/java/com/lbugdb/ArrowArrays.java new file mode 100644 index 0000000..d33485b --- /dev/null +++ b/src/main/java/com/lbugdb/ArrowArrays.java @@ -0,0 +1,41 @@ +package com.ladybugdb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.c.ArrowArray; + +final class ArrowArrays implements AutoCloseable { + private long address; + private final List wrappers = new ArrayList<>(); + + ArrowArrays(long numArrays) { + address = Native.lbugArrowArrayAllocate(numArrays); + } + + long address() { + return address; + } + + long elementAddress(long index) { + return Native.lbugArrowArrayGetAddress(address, index); + } + + ArrowArray wrap(long index) { + ArrowArray array = ArrowArray.wrap(elementAddress(index)); + wrappers.add(array); + return array; + } + + @Override + public void close() { + for (ArrowArray wrapper : wrappers) { + wrapper.close(); + } + wrappers.clear(); + if (address != 0) { + Native.lbugArrowArrayFree(address); + address = 0; + } + } +} diff --git a/src/main/java/com/lbugdb/ArrowUtil.java b/src/main/java/com/lbugdb/ArrowUtil.java new file mode 100644 index 0000000..388e14f --- /dev/null +++ b/src/main/java/com/lbugdb/ArrowUtil.java @@ -0,0 +1,50 @@ +package com.ladybugdb; + +import java.util.List; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.CDataDictionaryProvider; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; + +final class ArrowUtil { + private ArrowUtil() { + } + + static ArrowArrays exportRoots(BufferAllocator allocator, List roots, + ArrowSchema schema) { + if (roots == null || roots.isEmpty()) { + throw new IllegalArgumentException("At least one Arrow VectorSchemaRoot is required."); + } + ArrowArrays arrays = new ArrowArrays(roots.size()); + boolean success = false; + try { + for (int i = 0; i < roots.size(); i++) { + VectorSchemaRoot root = roots.get(i); + if (!root.getSchema().equals(roots.get(0).getSchema())) { + throw new IllegalArgumentException("All Arrow VectorSchemaRoot schemas must match."); + } + ArrowArray array = arrays.wrap(i); + if (i == 0) { + Data.exportVectorSchemaRoot(allocator, root, null, array, schema); + } else { + Data.exportVectorSchemaRoot(allocator, root, null, array); + } + } + success = true; + return arrays; + } finally { + if (!success) { + arrays.close(); + } + } + } + + static VectorSchemaRoot importRoot(BufferAllocator allocator, ArrowSchema schema, + ArrowArray array) { + return Data.importVectorSchemaRoot(allocator, array, schema, + (CDataDictionaryProvider) null); + } +} diff --git a/src/main/java/com/lbugdb/Connection.java b/src/main/java/com/lbugdb/Connection.java index 1285f2a..caeb6fa 100644 --- a/src/main/java/com/lbugdb/Connection.java +++ b/src/main/java/com/lbugdb/Connection.java @@ -1,6 +1,11 @@ package com.ladybugdb; import java.util.Map; +import java.util.List; + +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; /** * Connection is used to interact with a Database instance. Each Connection is thread-safe. Multiple @@ -133,4 +138,58 @@ public void setQueryTimeout(long timeoutInMs) { checkNotDestroyed(); Native.lbugConnectionSetQueryTimeout(this, timeoutInMs); } + + /** + * Registers Arrow memory as a node table. The first column is used as the table primary key. + */ + public QueryResult createArrowTable(String tableName, List roots, + BufferAllocator allocator) { + checkNotDestroyed(); + try (ArrowSchema schema = ArrowSchema.allocateNew(allocator); + ArrowArrays arrays = ArrowUtil.exportRoots(allocator, roots, schema)) { + return Native.lbugConnectionCreateArrowTable(this, tableName, schema.memoryAddress(), + arrays.address(), roots.size()); + } + } + + /** + * Registers Arrow memory as a relationship table with endpoint columns named "from" and "to". + */ + public QueryResult createArrowRelTable(String tableName, List roots, + String srcTableName, String dstTableName, BufferAllocator allocator) { + checkNotDestroyed(); + try (ArrowSchema schema = ArrowSchema.allocateNew(allocator); + ArrowArrays arrays = ArrowUtil.exportRoots(allocator, roots, schema)) { + return Native.lbugConnectionCreateArrowRelTable(this, tableName, srcTableName, + dstTableName, schema.memoryAddress(), arrays.address(), roots.size()); + } + } + + /** + * Registers Arrow memory in CSR form as a relationship table. + */ + public QueryResult createArrowRelTableCSR(String tableName, List indicesRoots, + List indptrRoots, String srcTableName, String dstTableName, + String dstColumnName, BufferAllocator allocator) { + checkNotDestroyed(); + try (ArrowSchema indicesSchema = ArrowSchema.allocateNew(allocator); + ArrowArrays indicesArrays = ArrowUtil.exportRoots(allocator, indicesRoots, + indicesSchema); + ArrowSchema indptrSchema = ArrowSchema.allocateNew(allocator); + ArrowArrays indptrArrays = ArrowUtil.exportRoots(allocator, indptrRoots, + indptrSchema)) { + return Native.lbugConnectionCreateArrowRelTableCSR(this, tableName, srcTableName, + dstTableName, indicesSchema.memoryAddress(), indicesArrays.address(), + indicesRoots.size(), indptrSchema.memoryAddress(), indptrArrays.address(), + indptrRoots.size(), dstColumnName); + } + } + + /** + * Drops an Arrow memory-backed table registered on this connection. + */ + public QueryResult dropArrowTable(String tableName) { + checkNotDestroyed(); + return Native.lbugConnectionDropArrowTable(this, tableName); + } } diff --git a/src/main/java/com/lbugdb/Database.java b/src/main/java/com/lbugdb/Database.java index 447972d..e316d31 100644 --- a/src/main/java/com/lbugdb/Database.java +++ b/src/main/java/com/lbugdb/Database.java @@ -15,6 +15,9 @@ public class Database implements AutoCloseable { boolean autoCheckpoint = true; boolean throwOnWalReplayFailure = true; boolean enableChecksums = true; + boolean enableMultiWrites = false; + boolean enableDefaultHashIndex = true; + long max_num_threads; long checkpointThreshold; /** @@ -34,10 +37,26 @@ public Database() { public Database(String databasePath) { this.db_path = databasePath; this.buffer_size = 0; + this.max_num_threads = 0; this.max_db_size = 0; this.checkpointThreshold = -1; - db_ref = Native.lbugDatabaseInit(databasePath, 0, true, false, max_db_size, autoCheckpoint, - checkpointThreshold, throwOnWalReplayFailure, enableChecksums); + db_ref = Native.lbugDatabaseInit(databasePath, 0, true, false, max_db_size, + autoCheckpoint, checkpointThreshold, throwOnWalReplayFailure, enableChecksums); + } + + /** + * Creates a database object. + * + * @param databasePath: Database path. If the path is empty, or equal to + * `:memory:`, the database will be created in memory. + * @param systemConfig: Runtime database configuration. + */ + public Database(String databasePath, SystemConfig systemConfig) { + this(databasePath, systemConfig.bufferPoolSize, systemConfig.maxNumThreads, + systemConfig.enableCompression, systemConfig.readOnly, systemConfig.maxDBSize, + systemConfig.autoCheckpoint, systemConfig.checkpointThreshold, + systemConfig.throwOnWalReplayFailure, systemConfig.enableChecksums, + systemConfig.enableMultiWrites, systemConfig.enableDefaultHashIndex); } /** @@ -71,6 +90,7 @@ public Database(String databasePath, long bufferPoolSize, boolean enableCompress long maxDBSize, boolean autoCheckpoint, long checkpointThreshold, boolean throwOnWalReplayFailure, boolean enableChecksums) { this.db_path = databasePath; this.buffer_size = bufferPoolSize; + this.max_num_threads = 0; this.enableCompression = enableCompression; this.readOnly = readOnly; this.max_db_size = maxDBSize; @@ -78,8 +98,48 @@ public Database(String databasePath, long bufferPoolSize, boolean enableCompress this.checkpointThreshold = checkpointThreshold; this.throwOnWalReplayFailure = throwOnWalReplayFailure; this.enableChecksums = enableChecksums; - db_ref = Native.lbugDatabaseInit(databasePath, bufferPoolSize, enableCompression, readOnly, maxDBSize, - autoCheckpoint, checkpointThreshold, throwOnWalReplayFailure, enableChecksums); + db_ref = Native.lbugDatabaseInit(databasePath, bufferPoolSize, enableCompression, readOnly, + maxDBSize, autoCheckpoint, checkpointThreshold, throwOnWalReplayFailure, + enableChecksums); + } + + /** + * Creates a database object. + * + * @param databasePath Database path. If the path is empty, or equal to + * `:memory:`, the database will be created in memory. + * @param bufferPoolSize Max size of the buffer pool in bytes. + * @param maxNumThreads The maximum number of threads available to the database. + * @param enableCompression Enable compression in storage. + * @param readOnly Open the database in READ_ONLY mode. + * @param maxDBSize The maximum size of the database in bytes. + * @param autoCheckpoint If true, the database will automatically checkpoint. + * @param checkpointThreshold The threshold of the WAL file size in bytes. + * @param throwOnWalReplayFailure If true, WAL replay failures raise an error. + * @param enableChecksums If true, WAL and storage page checksums are enabled. + * @param enableMultiWrites If true, multiple concurrent write transactions are allowed. + * @param enableDefaultHashIndex If true, node tables create the default primary-key hash index. + */ + public Database(String databasePath, long bufferPoolSize, long maxNumThreads, + boolean enableCompression, boolean readOnly, long maxDBSize, boolean autoCheckpoint, + long checkpointThreshold, boolean throwOnWalReplayFailure, boolean enableChecksums, + boolean enableMultiWrites, boolean enableDefaultHashIndex) { + this.db_path = databasePath; + this.buffer_size = bufferPoolSize; + this.max_num_threads = maxNumThreads; + this.enableCompression = enableCompression; + this.readOnly = readOnly; + this.max_db_size = maxDBSize; + this.autoCheckpoint = autoCheckpoint; + this.checkpointThreshold = checkpointThreshold; + this.throwOnWalReplayFailure = throwOnWalReplayFailure; + this.enableChecksums = enableChecksums; + this.enableMultiWrites = enableMultiWrites; + this.enableDefaultHashIndex = enableDefaultHashIndex; + db_ref = Native.lbugDatabaseInitExtended(databasePath, bufferPoolSize, maxNumThreads, + enableCompression, readOnly, maxDBSize, autoCheckpoint, checkpointThreshold, + throwOnWalReplayFailure, enableChecksums, enableMultiWrites, + enableDefaultHashIndex); } /** diff --git a/src/main/java/com/lbugdb/Native.java b/src/main/java/com/lbugdb/Native.java index 884511c..9502b5a 100644 --- a/src/main/java/com/lbugdb/Native.java +++ b/src/main/java/com/lbugdb/Native.java @@ -75,7 +75,15 @@ else if (os_name_detect.startsWith("windows")) { // Database protected static native long lbugDatabaseInit(String databasePath, long bufferPoolSize, boolean enableCompression, boolean readOnly, long maxDbSize, boolean autoCheckpoint, - long checkpointThreshold,boolean throwOnWalReplayFailure, boolean enableChecksums); + long checkpointThreshold, boolean throwOnWalReplayFailure, boolean enableChecksums); + + // TODO: Collapse this back into lbugDatabaseInit after published main-repo + // Java native artifacts have been rebuilt with the extended system_config fields. + protected static native long lbugDatabaseInitExtended(String databasePath, long bufferPoolSize, + long maxNumThreads, + boolean enableCompression, boolean readOnly, long maxDbSize, boolean autoCheckpoint, + long checkpointThreshold, boolean throwOnWalReplayFailure, boolean enableChecksums, + boolean enableMultiWrites, boolean enableDefaultHashIndex); protected static native void lbugDatabaseDestroy(Database db); @@ -104,11 +112,28 @@ protected static native QueryResult lbugConnectionExecute( protected static native void lbugConnectionSetQueryTimeout( Connection connection, long timeoutInMs); + protected static native QueryResult lbugConnectionCreateArrowTable(Connection connection, + String tableName, long arrowSchemaAddress, long arrowArraysAddress, long numArrays); + + protected static native QueryResult lbugConnectionCreateArrowRelTable(Connection connection, + String tableName, String srcTableName, String dstTableName, long arrowSchemaAddress, + long arrowArraysAddress, long numArrays); + + protected static native QueryResult lbugConnectionCreateArrowRelTableCSR(Connection connection, + String tableName, String srcTableName, String dstTableName, long indicesSchemaAddress, + long indicesArraysAddress, long numIndicesArrays, long indptrSchemaAddress, + long indptrArraysAddress, long numIndptrArrays, String dstColumnName); + + protected static native QueryResult lbugConnectionDropArrowTable(Connection connection, + String tableName); + // PreparedStatement protected static native void lbugPreparedStatementDestroy(PreparedStatement preparedStatement); protected static native boolean lbugPreparedStatementIsSuccess(PreparedStatement preparedStatement); + protected static native boolean lbugPreparedStatementIsReadOnly(PreparedStatement preparedStatement); + protected static native String lbugPreparedStatementGetErrorMessage( PreparedStatement preparedStatement); @@ -142,6 +167,12 @@ protected static native DataType lbugQueryResultGetColumnDataType( protected static native void lbugQueryResultResetIterator(QueryResult queryResult); + protected static native void lbugQueryResultGetArrowSchema(QueryResult queryResult, + long arrowSchemaAddress); + + protected static native void lbugQueryResultGetNextArrowChunk(QueryResult queryResult, + long chunkSize, long arrowArrayAddress); + // FlatTuple protected static native void lbugFlatTupleDestroy(FlatTuple flatTuple); @@ -204,6 +235,10 @@ protected static native long lbugDataTypeCreate( protected static native Value lbugValueGetMapValue(Value value, long index); + protected static native Value lbugValueGetRecursiveRelNodeList(Value value); + + protected static native Value lbugValueGetRecursiveRelRelList(Value value); + protected static native DataType lbugValueGetDataType(Value value); protected static native T lbugValueGetValue(Value value); @@ -244,6 +279,12 @@ protected static native long lbugDataTypeCreate( protected static native long lbugValueGetStructIndex(Value structVal, String fieldName); + protected static native long lbugArrowArrayAllocate(long numArrays); + + protected static native long lbugArrowArrayGetAddress(long arrowArraysAddress, long index); + + protected static native void lbugArrowArrayFree(long arrowArraysAddress); + protected static native String lbugGetVersion(); protected static native long lbugGetStorageVersion(); diff --git a/src/main/java/com/lbugdb/PreparedStatement.java b/src/main/java/com/lbugdb/PreparedStatement.java index 1aab3b9..696074d 100644 --- a/src/main/java/com/lbugdb/PreparedStatement.java +++ b/src/main/java/com/lbugdb/PreparedStatement.java @@ -49,6 +49,17 @@ public boolean isSuccess() { return Native.lbugPreparedStatementIsSuccess(this); } + /** + * Check if the query is read-only. + * + * @return The query is read-only or not. + * @throws RuntimeException If the prepared statement has been destroyed. + */ + public boolean isReadOnly() { + checkNotDestroyed(); + return Native.lbugPreparedStatementIsReadOnly(this); + } + /** * Get the error message if the query is not prepared successfully. * diff --git a/src/main/java/com/lbugdb/QueryResult.java b/src/main/java/com/lbugdb/QueryResult.java index 95bba52..f6da8dc 100644 --- a/src/main/java/com/lbugdb/QueryResult.java +++ b/src/main/java/com/lbugdb/QueryResult.java @@ -1,5 +1,10 @@ package com.ladybugdb; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; + /** * QueryResult stores the result of a query execution. */ @@ -194,4 +199,28 @@ public void resetIterator() { checkNotDestroyed(); Native.lbugQueryResultResetIterator(this); } + + /** + * Returns the query result schema as an Arrow VectorSchemaRoot with no loaded rows. + */ + public VectorSchemaRoot getArrowSchema(BufferAllocator allocator) { + checkNotDestroyed(); + try (ArrowSchema schema = ArrowSchema.allocateNew(allocator)) { + Native.lbugQueryResultGetArrowSchema(this, schema.memoryAddress()); + return ArrowUtil.importRoot(allocator, schema, null); + } + } + + /** + * Returns the next chunk of the query result as an Arrow VectorSchemaRoot. + */ + public VectorSchemaRoot getNextArrowChunk(long chunkSize, BufferAllocator allocator) { + checkNotDestroyed(); + try (ArrowSchema schema = ArrowSchema.allocateNew(allocator); + ArrowArray array = ArrowArray.allocateNew(allocator)) { + Native.lbugQueryResultGetArrowSchema(this, schema.memoryAddress()); + Native.lbugQueryResultGetNextArrowChunk(this, chunkSize, array.memoryAddress()); + return ArrowUtil.importRoot(allocator, schema, array); + } + } } diff --git a/src/main/java/com/lbugdb/SystemConfig.java b/src/main/java/com/lbugdb/SystemConfig.java new file mode 100644 index 0000000..3bce7fb --- /dev/null +++ b/src/main/java/com/lbugdb/SystemConfig.java @@ -0,0 +1,18 @@ +package com.ladybugdb; + +/** + * Runtime configuration for creating or opening a Database. + */ +public class SystemConfig { + public long bufferPoolSize = 0; + public long maxNumThreads = 0; + public boolean enableCompression = true; + public boolean readOnly = false; + public long maxDBSize = 0; + public boolean autoCheckpoint = true; + public long checkpointThreshold = -1; + public boolean throwOnWalReplayFailure = true; + public boolean enableChecksums = true; + public boolean enableMultiWrites = false; + public boolean enableDefaultHashIndex = true; +} diff --git a/src/main/java/com/lbugdb/ValueRecursiveRelUtil.java b/src/main/java/com/lbugdb/ValueRecursiveRelUtil.java index 1380a3b..8c4c2a1 100644 --- a/src/main/java/com/lbugdb/ValueRecursiveRelUtil.java +++ b/src/main/java/com/lbugdb/ValueRecursiveRelUtil.java @@ -13,7 +13,13 @@ public class ValueRecursiveRelUtil { * destroyed. */ public static Value getNodeList(Value value) { - return new LbugStruct(value).getValueByIndex(0); + value.checkNotDestroyed(); + try { + return Native.lbugValueGetRecursiveRelNodeList(value); + } catch (UnsatisfiedLinkError e) { + // TODO: Remove after published native artifacts include recursive-rel accessors. + return new LbugStruct(value).getValueByIndex(0); + } } /** @@ -25,6 +31,12 @@ public static Value getNodeList(Value value) { * destroyed. */ public static Value getRelList(Value value) { - return new LbugStruct(value).getValueByIndex(1); + value.checkNotDestroyed(); + try { + return Native.lbugValueGetRecursiveRelRelList(value); + } catch (UnsatisfiedLinkError e) { + // TODO: Remove after published native artifacts include recursive-rel accessors. + return new LbugStruct(value).getValueByIndex(1); + } } } diff --git a/src/test/java/com/lbugdb/TestHelper.java b/src/test/java/com/lbugdb/TestHelper.java index 2ab6f3e..f1a3710 100644 --- a/src/test/java/com/lbugdb/TestHelper.java +++ b/src/test/java/com/lbugdb/TestHelper.java @@ -12,11 +12,34 @@ public class TestHelper { private static Database db; private static Connection conn; private static final Path projectDir = Paths.get(System.getProperty("ladybug.projectDir", ".")); - private static final Path tinysnbDir = projectDir.resolve("../../dataset/tinysnb").normalize(); - private static final Path tinysnbSerialDir = projectDir.resolve("../../dataset/tinysnb-serial").normalize(); + private static final Path datasetDir = findDatasetDir(); + private static final Path tinysnbDir = datasetDir.resolve("tinysnb"); + private static final Path tinysnbSerialDir = findTinysnbSerialDir(datasetDir); private final static String extensions = "csv|parquet|npy|ttl|nq|json|lbug_extension"; private final static Pattern dataFileRegex = Pattern.compile("\"([^\"]+\\.(" + extensions + "))\"", Pattern.CASE_INSENSITIVE); + private static Path findDatasetDir() { + Path[] candidates = { + projectDir.resolve("../../dataset").normalize(), + projectDir.resolve("dataset").normalize(), + projectDir.resolve("../go-ladybug/dataset").normalize(), + }; + for (Path candidate : candidates) { + if (candidate.resolve("tinysnb/schema.cypher").toFile().exists()) { + return candidate; + } + } + return candidates[0]; + } + + private static Path findTinysnbSerialDir(Path datasetDir) { + Path serialDir = datasetDir.resolve("tinysnb-serial"); + if (serialDir.resolve("vMovies.csv").toFile().exists()) { + return serialDir; + } + return datasetDir.resolve("tinysnb"); + } + public static Database getDatabase() { return db; } @@ -39,6 +62,7 @@ public static void loadData(String dbPath) throws IOException { continue; } try (QueryResult result = conn.query(line)) { + assertQuerySucceeded(result); } } reader.close(); @@ -53,9 +77,11 @@ public static void loadData(String dbPath) throws IOException { // handle multiple data files in one statement Matcher matcher = dataFileRegex.matcher(line); - String statement = matcher.replaceAll("\"" + tinysnbDir.resolve("$1").normalize() + "\""); + String statement = matcher.replaceAll(match -> "\"" + resolveDataFile(match.group(1)) + "\""); - try (QueryResult result = conn.query(statement)) {} + try (QueryResult result = conn.query(statement)) { + assertQuerySucceeded(result); + } } reader.close(); @@ -65,6 +91,21 @@ public static void loadData(String dbPath) throws IOException { try (QueryResult result = conn.query("copy moviesSerial from \"" + tinysnbSerialDir.resolve("vMovies.csv").normalize() + "\"")) { + assertQuerySucceeded(result); + } + } + + private static Path resolveDataFile(String fileName) { + Path path = Paths.get(fileName); + if (path.getNameCount() > 0 && "dataset".equals(path.getName(0).toString())) { + return datasetDir.getParent().resolve(path).normalize(); + } + return tinysnbDir.resolve(path).normalize(); + } + + private static void assertQuerySucceeded(QueryResult result) { + if (!result.isSuccess()) { + throw new RuntimeException(result.getErrorMessage()); } } }