Skip to content

Commit e65212b

Browse files
committed
[client] Fix corruption in zero-copy lazy parse ByteBuf
1 parent 17f5400 commit e65212b

3 files changed

Lines changed: 59 additions & 40 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ private static LogRecordReadContext createArrowReadContext(
116116
SchemaGetter schemaGetter) {
117117
// TODO: use a more reasonable memory limit
118118
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
119-
FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields);
119+
FieldGetter[] fieldGetters =
120+
buildProjectedFieldGetters(dataRowType, selectedFields, LogFormat.ARROW);
120121
return new LogRecordReadContext(
121122
LogFormat.ARROW,
122123
dataRowType,
@@ -191,7 +192,8 @@ public static LogRecordReadContext createCompactedRowReadContext(
191192
*/
192193
public static LogRecordReadContext createIndexedReadContext(
193194
RowType rowType, int schemaId, int[] selectedFields, SchemaGetter schemaGetter) {
194-
FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields);
195+
FieldGetter[] fieldGetters =
196+
buildProjectedFieldGetters(rowType, selectedFields, LogFormat.INDEXED);
195197
// for INDEXED log format, the projection is NEVER push downed to the server side
196198
return new LogRecordReadContext(
197199
LogFormat.INDEXED, rowType, schemaId, null, fieldGetters, false, schemaGetter);
@@ -222,7 +224,8 @@ public static LogRecordReadContext createCompactedRowReadContext(
222224
int schemaId,
223225
int[] selectedFields,
224226
@Nullable SchemaGetter schemaGetter) {
225-
FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType, selectedFields);
227+
FieldGetter[] fieldGetters =
228+
buildProjectedFieldGetters(rowType, selectedFields, LogFormat.COMPACTED);
226229
// for COMPACTED log format, the projection is NEVER push downed to the server side
227230
return new LogRecordReadContext(
228231
LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters, false, schemaGetter);
@@ -325,14 +328,17 @@ private boolean isSameRowType(int schemaId) {
325328
return targetSchemaId == schemaId || isProjectionPushDowned();
326329
}
327330

328-
private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] selectedFields) {
331+
private static FieldGetter[] buildProjectedFieldGetters(
332+
RowType rowType, int[] selectedFields, LogFormat logFormat) {
333+
// ARROW already copies strings during deserialization;
334+
// other formats reference pooled network buffers and need explicit copying.
335+
boolean copyStrings = logFormat != LogFormat.ARROW;
329336
List<DataType> dataTypeList = rowType.getChildren();
330337
FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length];
331338
for (int i = 0; i < fieldGetters.length; i++) {
332-
// build deep field getter to support nested types
333339
fieldGetters[i] =
334340
InternalRow.createDeepFieldGetter(
335-
dataTypeList.get(selectedFields[i]), selectedFields[i]);
341+
dataTypeList.get(selectedFields[i]), selectedFields[i], copyStrings);
336342
}
337343
return fieldGetters;
338344
}

fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.fluss.row;
2020

2121
import org.apache.fluss.annotation.PublicEvolving;
22-
import org.apache.fluss.row.columnar.ColumnarRow;
23-
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
2422
import org.apache.fluss.types.ArrayType;
2523
import org.apache.fluss.types.DataType;
2624
import org.apache.fluss.types.MapType;
@@ -161,23 +159,26 @@ static ElementGetter createElementGetter(DataType fieldType) {
161159
};
162160
}
163161

164-
/**
165-
* Creates a deep accessor for getting elements in an internal array data structure at the given
166-
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
167-
* array/map/row types.
168-
*
169-
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
170-
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
171-
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
172-
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
173-
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
174-
*/
175-
static ElementGetter createDeepElementGetter(DataType fieldType) {
162+
/** Same as InternalRow.createDeepFieldGetter but for array elements. */
163+
static ElementGetter createDeepElementGetter(DataType fieldType, boolean copyStrings) {
176164
final ElementGetter elementGetter;
177165
switch (fieldType.getTypeRoot()) {
166+
case CHAR:
167+
final int charLen = getLength(fieldType);
168+
elementGetter =
169+
copyStrings
170+
? (array, pos) -> array.getChar(pos, charLen).copy()
171+
: (array, pos) -> array.getChar(pos, charLen);
172+
break;
173+
case STRING:
174+
elementGetter =
175+
copyStrings
176+
? (array, pos) -> array.getString(pos).copy()
177+
: InternalArray::getString;
178+
break;
178179
case ARRAY:
179180
DataType nestedType = ((ArrayType) fieldType).getElementType();
180-
ElementGetter nestedGetter = createDeepElementGetter(nestedType);
181+
ElementGetter nestedGetter = createDeepElementGetter(nestedType, copyStrings);
181182
elementGetter =
182183
(array, pos) -> {
183184
InternalArray inner = array.getArray(pos);
@@ -191,8 +192,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) {
191192
case MAP:
192193
DataType keyType = ((MapType) fieldType).getKeyType();
193194
DataType valueType = ((MapType) fieldType).getValueType();
194-
ElementGetter keyGetter = createDeepElementGetter(keyType);
195-
ElementGetter valueGetter = createDeepElementGetter(valueType);
195+
ElementGetter keyGetter = createDeepElementGetter(keyType, copyStrings);
196+
ElementGetter valueGetter = createDeepElementGetter(valueType, copyStrings);
196197
elementGetter =
197198
(array, pos) -> {
198199
InternalMap inner = array.getMap(pos);
@@ -212,7 +213,8 @@ static ElementGetter createDeepElementGetter(DataType fieldType) {
212213
int numFields = rowType.getFieldCount();
213214
InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[numFields];
214215
for (int i = 0; i < numFields; i++) {
215-
fieldGetters[i] = InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i);
216+
fieldGetters[i] =
217+
InternalRow.createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings);
216218
}
217219
elementGetter =
218220
(array, pos) -> {

fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.record.ChangeType;
22-
import org.apache.fluss.row.columnar.ColumnarRow;
23-
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
2422
import org.apache.fluss.types.ArrayType;
2523
import org.apache.fluss.types.DataType;
2624
import org.apache.fluss.types.MapType;
@@ -32,7 +30,6 @@
3230
import java.util.HashMap;
3331
import java.util.Map;
3432

35-
import static org.apache.fluss.row.InternalArray.createDeepElementGetter;
3633
import static org.apache.fluss.types.DataTypeChecks.getLength;
3734
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
3835
import static org.apache.fluss.types.DataTypeChecks.getScale;
@@ -239,22 +236,35 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
239236
}
240237

241238
/**
242-
* Creates a deep accessor for getting elements in an internal array data structure at the given
243-
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
244-
* array/map/row types.
239+
* Creates a deep accessor for getting elements in an internal row data structure at the given
240+
* position. Returns new objects (GenericArray/GenericMap/GenericRow) for nested array/map/row
241+
* types to prevent use-after-free when the underlying buffer is released.
245242
*
246-
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
247-
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
248-
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
249-
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
250-
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
243+
* <p>ARROW already deep copies strings in VectorizedColumnBatch, so copyStrings should be
244+
* false. INDEXED and COMPACTED rows reference pooled network buffers, so copyStrings should be
245+
* true to copy STRING/CHAR via BinaryString.copy().
251246
*/
252-
static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
247+
static FieldGetter createDeepFieldGetter(
248+
DataType fieldType, int fieldPos, boolean copyStrings) {
253249
final FieldGetter fieldGetter;
254250
switch (fieldType.getTypeRoot()) {
251+
case CHAR:
252+
final int charLen = getLength(fieldType);
253+
fieldGetter =
254+
copyStrings
255+
? row -> row.getChar(fieldPos, charLen).copy()
256+
: row -> row.getChar(fieldPos, charLen);
257+
break;
258+
case STRING:
259+
fieldGetter =
260+
copyStrings
261+
? row -> row.getString(fieldPos).copy()
262+
: row -> row.getString(fieldPos);
263+
break;
255264
case ARRAY:
256265
DataType elementType = ((ArrayType) fieldType).getElementType();
257-
InternalArray.ElementGetter nestedGetter = createDeepElementGetter(elementType);
266+
InternalArray.ElementGetter nestedGetter =
267+
InternalArray.createDeepElementGetter(elementType, copyStrings);
258268
fieldGetter =
259269
row -> {
260270
InternalArray array = row.getArray(fieldPos);
@@ -268,9 +278,9 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
268278
case MAP:
269279
MapType mapType = (MapType) fieldType;
270280
InternalArray.ElementGetter keyGetter =
271-
createDeepElementGetter(mapType.getKeyType());
281+
InternalArray.createDeepElementGetter(mapType.getKeyType(), copyStrings);
272282
InternalArray.ElementGetter valueGetter =
273-
createDeepElementGetter(mapType.getValueType());
283+
InternalArray.createDeepElementGetter(mapType.getValueType(), copyStrings);
274284
fieldGetter =
275285
row -> {
276286
InternalMap map = row.getMap(fieldPos);
@@ -288,7 +298,8 @@ static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
288298
int numFields = rowType.getFieldCount();
289299
FieldGetter[] nestedFieldGetters = new FieldGetter[numFields];
290300
for (int i = 0; i < numFields; i++) {
291-
nestedFieldGetters[i] = createDeepFieldGetter(rowType.getTypeAt(i), i);
301+
nestedFieldGetters[i] =
302+
createDeepFieldGetter(rowType.getTypeAt(i), i, copyStrings);
292303
}
293304
fieldGetter =
294305
row -> {

0 commit comments

Comments
 (0)