Conversation

@x-tong commented Jan 19, 2026

Summary

  • Implement Flink RowData to Arrow format conversion for Auron-Flink integration
  • Add FlinkArrowUtils, FlinkArrowWriter, FlinkArrowFieldWriter, and FlinkArrowFFIExporter
  • Support all common Flink types including primitives, temporal, and complex types
  • Add comprehensive unit tests

Test plan

  • Unit tests for FlinkArrowUtils type conversion
  • Unit tests for FlinkArrowWriter data writing
  • Unit tests for FlinkArrowFFIExporter (skipped when native libs unavailable)
  • Build passes with ./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -DskipBuildNative
  • Code formatted with ./dev/reformat

Closes #1850

Implement Flink RowData to Arrow format conversion for Auron-Flink integration.

Key components:
- FlinkArrowUtils: Type conversion between Flink LogicalType and Arrow types
- FlinkArrowWriter: Converts Flink RowData to Arrow VectorSchemaRoot (see the usage sketch after the type list below)
- FlinkArrowFieldWriter: Field-level writers for all supported types
- FlinkArrowFFIExporter: Exports Arrow data via FFI for native consumption

Supported types:
- Primitive: Boolean, TinyInt, SmallInt, Int, BigInt, Float, Double
- String/Binary: VarChar, Char, VarBinary, Binary
- Temporal: Date, Time, Timestamp, LocalZonedTimestamp
- Complex: Array, Map, Row/Struct
- Decimal (128-bit)
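
For orientation, a minimal usage sketch assembled from the API surface visible in this PR (create, write, finish, getRoot); exact signatures may differ from the merged code:

import java.util.Arrays;
import java.util.List;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Describe the row shape: (INT, VARCHAR)
RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
FlinkArrowWriter writer = FlinkArrowWriter.create(rowType);

List<RowData> rows = Arrays.asList(
        GenericRowData.of(1, StringData.fromString("a")),
        GenericRowData.of(2, StringData.fromString("b")));
for (RowData row : rows) {
    writer.write(row);
}
writer.finish();

// The root created by create(RowType) is owned by the caller and must be closed
try (VectorSchemaRoot root = writer.getRoot()) {
    System.out.println(root.contentToTSVString());
}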
Copilot AI left a comment


Pull request overview

This PR implements Flink RowData to Arrow format conversion for the Auron-Flink integration. It introduces new utilities and writers to convert Flink's table data structures to Apache Arrow format, enabling efficient data exchange between Flink and native code via the Arrow C Data Interface.

Changes:

  • Added FlinkArrowUtils for type conversion between Flink LogicalType and Arrow types
  • Implemented FlinkArrowWriter and FlinkArrowFieldWriter for converting RowData to Arrow vectors
  • Added FlinkArrowFFIExporter for asynchronous FFI-based data export with a producer-consumer pattern (sketched after this list)
  • Included comprehensive unit tests for all conversion and export functionality
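
For context, a minimal sketch of the double-queue handoff (the queue and method names mirror the review excerpts below; the class structure itself is illustrative, not the PR's actual code):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DoubleQueueHandoffSketch {
    // Producer hands finished batches over outputQueue; the consumer signals
    // back over processingQueue when it is done with the shared buffers.
    private final BlockingQueue<Object> outputQueue = new ArrayBlockingQueue<>(1);
    private final BlockingQueue<Object> processingQueue = new ArrayBlockingQueue<>(1);

    void producerLoop() throws InterruptedException {
        while (true) {
            Object batch = new Object(); // stands in for filling a VectorSchemaRoot
            outputQueue.put(batch);      // hand the batch to the consumer
            processingQueue.take();      // block until the consumer has exported it
        }
    }

    boolean exportNextBatch() throws InterruptedException {
        Object batch = outputQueue.take();  // wait for the producer's next batch
        // ... export `batch` over the Arrow C Data Interface here ...
        processingQueue.put(new Object());  // let the producer reuse the buffers
        return true;
    }
}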

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 11 comments.

Summary per file:

  FlinkArrowUtils.java: Provides utilities for converting Flink types to Arrow types and creating Arrow schemas
  FlinkArrowFieldWriter.java: Implements field writers for all supported Flink types with recursive handling for complex types
  FlinkArrowWriter.java: Main writer class that orchestrates conversion of RowData to VectorSchemaRoot
  FlinkArrowFFIExporter.java: Asynchronous exporter using a double-queue pattern for safe FFI data export
  FlinkArrowUtilsTest.java: Tests type conversion logic for all supported types
  FlinkArrowWriterTest.java: Tests data writing for basic, complex, and edge cases
  FlinkArrowFFIExporterTest.java: Tests FFI export functionality with native library availability checks
  pom.xml: Adds required Arrow and Flink dependencies
  .gitignore: Adds IDE/LSP configuration patterns


Comment on lines +57 to +61
public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);

static {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> ROOT_ALLOCATOR.close()));
}

Copilot AI Jan 20, 2026


Using a global RootAllocator with Long.MAX_VALUE limit could lead to excessive memory consumption. The shutdown hook at line 60 may not execute in all termination scenarios (e.g., kill -9, OutOfMemoryError). Consider documenting the memory implications and whether users should be aware of memory limits, or provide a configuration option for the allocator limit.
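
A minimal sketch of a configurable limit, assuming a hypothetical system property name:

// "auron.flink.arrow.allocator.limit" is a hypothetical property, not one the PR defines
static final long ALLOCATOR_LIMIT =
        Long.getLong("auron.flink.arrow.allocator.limit", Long.MAX_VALUE);
public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(ALLOCATOR_LIMIT);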


// Create timestamp with microsecond precision
long millis = 1705622400000L; // 2024-01-19 00:00:00.000
int nanos = 123456; // 123.456 microseconds

Copilot AI Jan 20, 2026


The comment states "123.456 microseconds" but the value 123456 represents nanoseconds, not microseconds. In the context of TimestampData.fromEpochMillis(), the second parameter is the nanosecond-of-millisecond value (0-999999). The comment should be corrected to "123456 nanoseconds" or "123.456 microseconds worth of nanoseconds".

Suggested change
- int nanos = 123456; // 123.456 microseconds
+ int nanos = 123456; // 123456 nanoseconds (123.456 microseconds)
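
For reference, a short sketch of how the two arguments compose in Flink's TimestampData API:

import org.apache.flink.table.data.TimestampData;

// fromEpochMillis(millis, nanosOfMillisecond): the second argument is the
// sub-millisecond remainder in nanoseconds (0-999999), not microseconds
TimestampData ts = TimestampData.fromEpochMillis(1705622400000L, 123456);
assert ts.getMillisecond() == 1705622400000L;
assert ts.getNanoOfMillisecond() == 123456; // i.e. 123.456 microseconds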

Comment on lines +240 to +250
try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator);
        ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) {
    // Re-export schema for each batch since ArrowSchema is consumed
    exporter.exportSchema(batchSchema.memoryAddress());

    if (!exporter.exportNextBatch(arrowArray.memoryAddress())) {
        break;
    }

    try (VectorSchemaRoot root =
            Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) {

Copilot AI Jan 20, 2026


The comment at line 242 states "Re-export schema for each batch since ArrowSchema is consumed". However, calling exportSchema() multiple times is not necessary - the schema only needs to be exported once at the beginning. The ArrowSchema used for importVectorSchemaRoot can be the same arrowSchema from line 234. Re-exporting the schema for each batch is inefficient and could potentially cause issues. The test should be updated to reuse the single schema export.

Suggested change
- try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator);
-         ArrowSchema batchSchema = ArrowSchema.allocateNew(testAllocator)) {
-     // Re-export schema for each batch since ArrowSchema is consumed
-     exporter.exportSchema(batchSchema.memoryAddress());
-     if (!exporter.exportNextBatch(arrowArray.memoryAddress())) {
-         break;
-     }
-     try (VectorSchemaRoot root =
-             Data.importVectorSchemaRoot(testAllocator, arrowArray, batchSchema, null)) {
+ try (ArrowArray arrowArray = ArrowArray.allocateNew(testAllocator)) {
+     if (!exporter.exportNextBatch(arrowArray.memoryAddress())) {
+         break;
+     }
+     try (VectorSchemaRoot root =
+             Data.importVectorSchemaRoot(testAllocator, arrowArray, arrowSchema, null)) {

Comment on lines +208 to +219
if (closed) {
    return false;
}

// Wait for producer to provide next batch
Object state;
try {
    state = outputQueue.take();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return false;
}

Copilot AI Jan 20, 2026


There's a potential race condition between checking the closed flag at line 208 and taking from outputQueue at line 215. If close() is called between these two operations, the thread could block indefinitely on outputQueue.take(). Consider checking the closed flag again after the InterruptedException catch, or use a timeout-based poll operation instead of blocking take.
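
A sketch of the timeout-based alternative (the poll interval is an arbitrary illustration):

import java.util.concurrent.TimeUnit;

Object state = null;
try {
    // Re-check the closed flag periodically instead of blocking forever
    while (!closed && state == null) {
        state = outputQueue.poll(100, TimeUnit.MILLISECONDS);
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return false;
}
if (state == null) {
    return false; // closed while waiting
}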


try {
    processingQueue.put(new Object());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();

Copilot AI Jan 20, 2026


If an InterruptedException occurs while trying to signal the producer (lines 243-246), the method swallows the exception after setting the interrupt flag but still returns true, indicating success. This could leave the producer thread blocked indefinitely on processingQueue.take(). Consider returning false or handling the interrupt more explicitly to avoid a potential deadlock.

Suggested change
  Thread.currentThread().interrupt();
+ // Failed to signal producer; indicate failure to avoid potential deadlock
+ return false;

Comment on lines +135 to +147

// Fill the batch with data
while (!closed
        && rowIterator.hasNext()
        && allocator.getAllocatedMemory() < maxBatchMemorySize
        && writer.currentCount() < maxBatchNumRows) {
    writer.write(rowIterator.next());
}
writer.finish();

// Put batch in output queue for consumer
outputQueue.put(new NextBatch(root, allocator));


Copilot AI Jan 20, 2026


If an exception occurs after a batch is created (lines 131-134) but before it is successfully added to the queue, or if the thread is interrupted during the fill operation (lines 137-143), the VectorSchemaRoot and BufferAllocator resources may leak. Consider wrapping the batch creation and queue insertion in a try-catch block that ensures cleanup of the current batch's resources on failure.

Suggested change
- // Fill the batch with data
- while (!closed
-         && rowIterator.hasNext()
-         && allocator.getAllocatedMemory() < maxBatchMemorySize
-         && writer.currentCount() < maxBatchNumRows) {
-     writer.write(rowIterator.next());
- }
- writer.finish();
- // Put batch in output queue for consumer
- outputQueue.put(new NextBatch(root, allocator));
+ boolean batchEnqueued = false;
+ try {
+     // Fill the batch with data
+     while (!closed
+             && rowIterator.hasNext()
+             && allocator.getAllocatedMemory() < maxBatchMemorySize
+             && writer.currentCount() < maxBatchNumRows) {
+         writer.write(rowIterator.next());
+     }
+     writer.finish();
+     // Put batch in output queue for consumer
+     outputQueue.put(new NextBatch(root, allocator));
+     batchEnqueued = true;
+ } finally {
+     // If the batch was not successfully enqueued, we must release resources here
+     if (!batchEnqueued) {
+         try {
+             root.close();
+         } finally {
+             allocator.close();
+         }
+     }
+ }

Comment on lines +68 to +72
public static FlinkArrowWriter create(RowType rowType) {
    Schema arrowSchema = FlinkArrowUtils.toArrowSchema(rowType);
    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, FlinkArrowUtils.ROOT_ALLOCATOR);
    return create(root, rowType);
}

Copilot AI Jan 20, 2026


The FlinkArrowWriter class lacks a close() or cleanup method to manage the lifecycle of the VectorSchemaRoot it creates in the create(RowType) factory method. Users who call create(RowType) have no way to properly close the created VectorSchemaRoot, potentially leading to resource leaks. Consider either making FlinkArrowWriter implement AutoCloseable or documenting that users are responsible for closing the VectorSchemaRoot obtained via getRoot().
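
One possible shape, assuming an ownership flag that only the create(RowType) path sets (a sketch, not the PR's code):

import org.apache.arrow.vector.VectorSchemaRoot;

public class FlinkArrowWriter implements AutoCloseable {
    private final VectorSchemaRoot root;
    private final boolean ownsRoot; // true only when create(RowType) allocated the root

    private FlinkArrowWriter(VectorSchemaRoot root, boolean ownsRoot) {
        this.root = root;
        this.ownsRoot = ownsRoot;
    }

    // ... existing writer logic ...

    @Override
    public void close() {
        if (ownsRoot) {
            root.close(); // release only the root this writer created itself
        }
    }
}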

Comment on lines +265 to +283
@Override
public void close() {
    if (closed) {
        return;
    }
    closed = true;
    producerThread.interrupt();
    cleanupPreviousBatch();

    // Drain any remaining batches in the queue to prevent resource leaks
    Object state;
    while ((state = outputQueue.poll()) != null) {
        if (state instanceof NextBatch) {
            NextBatch batch = (NextBatch) state;
            batch.root.close();
            batch.allocator.close();
        }
    }
}

Copilot AI Jan 20, 2026


In the close method, after draining the queue, consider also draining the processingQueue. If the producer thread is blocked on processingQueue.take() at line 150, the thread interrupt alone may not be sufficient to guarantee timely termination. Adding processingQueue.clear() and/or placing a sentinel value could help ensure the producer thread can exit cleanly.
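
A sketch of the sentinel idea, assuming a hypothetical POISON object that the producer loop checks for after take():

// Hypothetical sentinel; the producer loop would treat it as a stop signal
private static final Object POISON = new Object();

@Override
public void close() {
    if (closed) {
        return;
    }
    closed = true;
    processingQueue.clear();       // drop any stale "continue" signals
    processingQueue.offer(POISON); // wake a producer blocked on take()
    producerThread.interrupt();
    cleanupPreviousBatch();
    // ... drain outputQueue as in the current implementation ...
}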

Comment on lines +170 to +174
try {
    outputQueue.put(new Finished(e));
} catch (InterruptedException ignored) {
    // Ignore
}

Copilot AI Jan 20, 2026


The uncaught exception handler at lines 168-175 attempts to put into outputQueue after clearing it. However, if the queue is full or if another thread is waiting on the queue, this could cause issues. Additionally, this handler may be redundant since the producer thread's try-catch block already handles exceptions. Consider whether this handler is necessary or if it should be simplified.

Suggested change
- try {
-     outputQueue.put(new Finished(e));
- } catch (InterruptedException ignored) {
-     // Ignore
- }
+ // Use non-blocking offer here to avoid potential deadlock in an exceptional state.
+ outputQueue.offer(new Finished(e));
