[AURON #1850] Introduce Flink RowData to Arrow conversion #1930
Conversation
Implement Flink RowData to Arrow format conversion for the Auron-Flink integration.

Key components:
- FlinkArrowUtils: Type conversion between Flink LogicalType and Arrow types
- FlinkArrowWriter: Converts Flink RowData to Arrow VectorSchemaRoot
- FlinkArrowFieldWriter: Field-level writers for all supported types
- FlinkArrowFFIExporter: Exports Arrow data via FFI for native consumption

Supported types:
- Primitive: Boolean, TinyInt, SmallInt, Int, BigInt, Float, Double
- String/Binary: VarChar, Char, VarBinary, Binary
- Temporal: Date, Time, Timestamp, LocalZonedTimestamp
- Complex: Array, Map, Row/Struct
- Decimal (128-bit)
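As a rough sketch of the intended write path (the names `FlinkArrowWriter.create`, `write`, `finish`, and `getRoot` are taken from code quoted later in this review; the wrapper method, `rowType`, and `rows` are illustrative assumptions, not the PR's exact API):

```java
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Illustrative only: convert a Flink RowType to an Arrow schema, then
// write RowData rows into the backing Arrow vectors.
static VectorSchemaRoot toArrowBatch(RowType rowType, Iterable<RowData> rows) {
  FlinkArrowWriter writer = FlinkArrowWriter.create(rowType);
  for (RowData row : rows) {
    writer.write(row);
  }
  writer.finish();
  // Caller owns the returned root (see the lifecycle comment later in this review).
  return writer.getRoot();
}
```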
Pull request overview
This PR implements Flink RowData to Arrow format conversion for the Auron-Flink integration. It introduces new utilities and writers to convert Flink's table data structures to Apache Arrow format, enabling efficient data exchange between Flink and native code via the Arrow C Data Interface.
Changes:
- Added FlinkArrowUtils for type conversion between Flink LogicalType and Arrow types
- Implemented FlinkArrowWriter and FlinkArrowFieldWriter for converting RowData to Arrow vectors
- Added FlinkArrowFFIExporter for asynchronous FFI-based data export with producer-consumer pattern
- Included comprehensive unit tests for all conversion and export functionality
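A rough usage sketch of the FFI export path, assembled from the test code quoted below (`exportSchema` and `exportNextBatch` are the PR's methods; the `exporter` construction and `allocator` are assumed):

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;

// Illustrative only: hand the schema and each batch to native code via the
// Arrow C Data Interface, using raw memory addresses.
try (ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
  exporter.exportSchema(schema.memoryAddress());
  while (true) {
    try (ArrowArray array = ArrowArray.allocateNew(allocator)) {
      if (!exporter.exportNextBatch(array.memoryAddress())) {
        break; // producer side reports no more batches
      }
      // Native consumer imports the batch from array.memoryAddress() here.
    }
  }
}
```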
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| FlinkArrowUtils.java | Provides utilities for converting Flink types to Arrow types and creating Arrow schemas |
| FlinkArrowFieldWriter.java | Implements field writers for all supported Flink types with recursive handling for complex types |
| FlinkArrowWriter.java | Main writer class that orchestrates conversion of RowData to VectorSchemaRoot |
| FlinkArrowFFIExporter.java | Asynchronous exporter using double-queue pattern for safe FFI data export |
| FlinkArrowUtilsTest.java | Tests type conversion logic for all supported types |
| FlinkArrowWriterTest.java | Tests data writing for basic, complex, and edge cases |
| FlinkArrowFFIExporterTest.java | Tests FFI export functionality with native library availability checks |
| pom.xml | Adds required Arrow and Flink dependencies |
| .gitignore | Adds IDE/LSP configuration patterns |
```java
public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);

static {
  Runtime.getRuntime().addShutdownHook(new Thread(() -> ROOT_ALLOCATOR.close()));
}
```
Copilot AI · Jan 20, 2026
Using a global RootAllocator with Long.MAX_VALUE limit could lead to excessive memory consumption. The shutdown hook at line 60 may not execute in all termination scenarios (e.g., kill -9, OutOfMemoryError). Consider documenting the memory implications and whether users should be aware of memory limits, or provide a configuration option for the allocator limit.
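A minimal sketch of making the limit configurable, assuming a system property (the property name `auron.flink.arrow.allocator.limit` is hypothetical):

```java
import org.apache.arrow.memory.RootAllocator;

// Hypothetical: read the allocator limit from a system property, defaulting
// to unbounded (the current behavior).
private static final long ALLOCATOR_LIMIT =
    Long.getLong("auron.flink.arrow.allocator.limit", Long.MAX_VALUE);

public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(ALLOCATOR_LIMIT);
```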
```java
// Create timestamp with microsecond precision
long millis = 1705622400000L; // 2024-01-19 00:00:00.000
int nanos = 123456; // 123.456 microseconds
```
Copilot AI · Jan 20, 2026
The comment states "123.456 microseconds" but the value 123456 represents nanoseconds, not microseconds. In the context of TimestampData.fromEpochMillis(), the second parameter is the nanosecond-of-millisecond value (0-999999). The comment should be corrected to "123456 nanoseconds" or "123.456 microseconds worth of nanoseconds".
Suggested change:

```diff
-int nanos = 123456; // 123.456 microseconds
+int nanos = 123456; // 123456 nanoseconds (123.456 microseconds)
```
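For reference, a small worked example of the semantics (this assumes Flink's `TimestampData.fromEpochMillis(long, int)`, whose second argument is the nanos-of-millisecond):

```java
import org.apache.flink.table.data.TimestampData;

// 1705622400000 ms = 2024-01-19T00:00:00Z; 123456 ns = 123.456 µs within
// that millisecond, so the full instant is 2024-01-19T00:00:00.000123456.
TimestampData ts = TimestampData.fromEpochMillis(1705622400000L, 123456);
```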
```java
try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator);
    ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) {
  // Re-export schema for each batch since ArrowSchema is consumed
  exporter.exportSchema(batchSchema.memoryAddress());

  if (!exporter.exportNextBatch(arrowArray.memoryAddress())) {
    break;
  }

  try (VectorSchemaRoot root =
      Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) {
```
Copilot AI · Jan 20, 2026
The comment at line 242 states "Re-export schema for each batch since ArrowSchema is consumed". However, calling exportSchema() multiple times is not necessary - the schema only needs to be exported once at the beginning. The ArrowSchema used for importVectorSchemaRoot can be the same arrowSchema from line 234. Re-exporting the schema for each batch is inefficient and could potentially cause issues. The test should be updated to reuse the single schema export.
Suggested change:

```diff
-try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator);
-    ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) {
-  // Re-export schema for each batch since ArrowSchema is consumed
-  exporter.exportSchema(batchSchema.memoryAddress());
+try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator)) {
   if (!exporter.exportNextBatch(arrowArray.memoryAddress())) {
     break;
   }
   try (VectorSchemaRoot root =
-      Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) {
+      Data.importVectorSchemaRoot(testAllocator, arrowArray, arrowSchema, null)) {
```
```java
if (closed) {
  return false;
}

// Wait for producer to provide next batch
Object state;
try {
  state = outputQueue.take();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  return false;
}
```
Copilot AI · Jan 20, 2026
There's a potential race condition between checking the closed flag at line 208 and taking from outputQueue at line 215. If close() is called between these two operations, the thread could block indefinitely on outputQueue.take(). Consider checking the closed flag again after the InterruptedException catch, or use a timeout-based poll operation instead of blocking take.
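One way to avoid the unbounded block, sketched against the snippet above (the 100 ms timeout is an arbitrary choice, not from the PR):

```java
import java.util.concurrent.TimeUnit;

// Re-check the closed flag between bounded polls so a concurrent close()
// cannot leave this thread parked forever on an empty queue.
Object state = null;
while (state == null) {
  if (closed) {
    return false;
  }
  try {
    state = outputQueue.poll(100, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return false;
  }
}
```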
```java
// Create timestamp with microsecond precision
long millis = 1705622400000L; // 2024-01-19 00:00:00.000
int nanos = 123456; // 123.456 microseconds
```
Copilot AI · Jan 20, 2026
The comment states "123.456 microseconds" but the value 123456 represents nanoseconds, not microseconds. In the context of TimestampData.fromEpochMillis(), the second parameter is the nanosecond-of-millisecond value (0-999999). The comment should be corrected to "123456 nanoseconds" or "123.456 microseconds worth of nanoseconds".
Suggested change:

```diff
-int nanos = 123456; // 123.456 microseconds
+int nanos = 123456; // 123456 nanoseconds (123.456 microseconds)
```
```java
try {
  processingQueue.put(new Object());
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
```
Copilot AI · Jan 20, 2026
If an InterruptedException occurs while trying to signal the producer at line 243-246, the method swallows the exception after setting the interrupt flag but still returns true, indicating success. This could lead to the producer thread remaining blocked indefinitely on processingQueue.take(). Consider returning false or handling this more explicitly to avoid potential deadlock scenarios.
Suggested change:

```diff
 Thread.currentThread().interrupt();
+// Failed to signal producer; indicate failure to avoid potential deadlock
+return false;
```
```java
// Fill the batch with data
while (!closed
    && rowIterator.hasNext()
    && allocator.getAllocatedMemory() < maxBatchMemorySize
    && writer.currentCount() < maxBatchNumRows) {
  writer.write(rowIterator.next());
}
writer.finish();

// Put batch in output queue for consumer
outputQueue.put(new NextBatch(root, allocator));
```
Copilot AI · Jan 20, 2026
If an exception occurs after a batch is created (lines 131-134) but before it's successfully added to the queue or if the thread is interrupted during the fill operation (lines 137-143), the VectorSchemaRoot and BufferAllocator resources may leak. Consider wrapping the batch creation and queue insertion in a try-catch block that ensures cleanup of the current batch resources on failure.
Suggested change:

```diff
-// Fill the batch with data
-while (!closed
-    && rowIterator.hasNext()
-    && allocator.getAllocatedMemory() < maxBatchMemorySize
-    && writer.currentCount() < maxBatchNumRows) {
-  writer.write(rowIterator.next());
-}
-writer.finish();
-// Put batch in output queue for consumer
-outputQueue.put(new NextBatch(root, allocator));
+boolean batchEnqueued = false;
+try {
+  // Fill the batch with data
+  while (!closed
+      && rowIterator.hasNext()
+      && allocator.getAllocatedMemory() < maxBatchMemorySize
+      && writer.currentCount() < maxBatchNumRows) {
+    writer.write(rowIterator.next());
+  }
+  writer.finish();
+  // Put batch in output queue for consumer
+  outputQueue.put(new NextBatch(root, allocator));
+  batchEnqueued = true;
+} finally {
+  // If the batch was not successfully enqueued, we must release resources here
+  if (!batchEnqueued) {
+    try {
+      root.close();
+    } finally {
+      allocator.close();
+    }
+  }
+}
```
```java
public static FlinkArrowWriter create(RowType rowType) {
  Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType);
  VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, FlinkArrowUtils.ROOT_ALLOCATOR);
  return create(root, rowType);
}
```
Copilot AI · Jan 20, 2026
The FlinkArrowWriter class lacks a close() or cleanup method to manage the lifecycle of the VectorSchemaRoot it creates in the create(RowType) factory method. Users who call create(RowType) have no way to properly close the created VectorSchemaRoot, potentially leading to resource leaks. Consider either making FlinkArrowWriter implement AutoCloseable or documenting that users are responsible for closing the VectorSchemaRoot obtained via getRoot().
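A sketch of the AutoCloseable option (the `ownsRoot` flag is hypothetical; it distinguishes roots created internally by `create(RowType)` from roots supplied by the caller via `create(root, rowType)`):

```java
public class FlinkArrowWriter implements AutoCloseable {
  private final VectorSchemaRoot root;
  private final boolean ownsRoot; // hypothetical: true only for create(RowType)

  // ... existing constructor and write/finish logic elided ...

  @Override
  public void close() {
    // Only release roots this writer created itself; caller-supplied roots
    // stay under the caller's control.
    if (ownsRoot) {
      root.close();
    }
  }
}
```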
```java
@Override
public void close() {
  if (closed) {
    return;
  }
  closed = true;
  producerThread.interrupt();
  cleanupPreviousBatch();

  // Drain any remaining batches in the queue to prevent resource leaks
  Object state;
  while ((state = outputQueue.poll()) != null) {
    if (state instanceof NextBatch) {
      NextBatch batch = (NextBatch) state;
      batch.root.close();
      batch.allocator.close();
    }
  }
}
```
Copilot AI · Jan 20, 2026
In the close method, after draining the queue, consider also draining the processingQueue. If the producer thread is blocked on processingQueue.take() at line 150, the thread interrupt alone may not be sufficient to guarantee timely termination. Adding processingQueue.clear() and/or placing a sentinel value could help ensure the producer thread can exit cleanly.
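A sketch of the sentinel approach inside close(), assuming `processingQueue` is a `BlockingQueue<Object>` as the snippets above suggest:

```java
// Unblock a producer parked on processingQueue.take(): drop stale signals,
// then offer a sentinel so take() returns even if the interrupt is
// swallowed somewhere in the producer loop.
processingQueue.clear();
processingQueue.offer(new Object());
producerThread.interrupt();
```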
```java
try {
  outputQueue.put(new Finished(e));
} catch (InterruptedException ignored) {
  // Ignore
}
```
Copilot AI · Jan 20, 2026
The uncaught exception handler at lines 168-175 attempts to put into outputQueue after clearing it. However, if the queue is full or if another thread is waiting on the queue, this could cause issues. Additionally, this handler may be redundant since the producer thread's try-catch block already handles exceptions. Consider whether this handler is necessary or if it should be simplified.
Suggested change:

```diff
-try {
-  outputQueue.put(new Finished(e));
-} catch (InterruptedException ignored) {
-  // Ignore
-}
+// Use non-blocking offer here to avoid potential deadlock in an exceptional state.
+outputQueue.offer(new Finished(e));
```
Summary

Test plan

```bash
./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -DskipBuildNative
./dev/reformat
```

Closes #1850