Skip to content

Commit bf08166

Browse files
feat: Byte-sized cap instead of hard one for ArrowWriter (#443)
* [TASK-431] Byte-sized cap instead of hard one for ArrowWriter * Address feedback * rebase fix * Remove unused as_builder_mut * Remove unused finish_cloned --------- Co-authored-by: Keith Lee <leekei@apache.org>
1 parent 4ced419 commit bf08166

File tree

8 files changed

+614
-45
lines changed

8 files changed

+614
-45
lines changed

crates/fluss/src/client/table/log_fetch_buffer.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -831,7 +831,8 @@ mod tests {
831831
use super::*;
832832
use crate::client::WriteRecord;
833833
use crate::compression::{
834-
ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
834+
ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType,
835+
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
835836
};
836837
use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath};
837838
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema};
@@ -908,6 +909,8 @@ mod tests {
908909
compression_type: ArrowCompressionType::None,
909910
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
910911
},
912+
usize::MAX,
913+
Arc::new(ArrowCompressionRatioEstimator::default()),
911914
)?;
912915

913916
let mut row = GenericRow::new(2);

crates/fluss/src/client/table/scanner.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1700,7 +1700,8 @@ mod tests {
17001700
use crate::client::WriteRecord;
17011701
use crate::client::metadata::Metadata;
17021702
use crate::compression::{
1703-
ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
1703+
ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType,
1704+
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
17041705
};
17051706
use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
17061707
use crate::record::MemoryLogRecordsArrowBuilder;
@@ -1717,6 +1718,8 @@ mod tests {
17171718
compression_type: ArrowCompressionType::None,
17181719
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
17191720
},
1721+
usize::MAX,
1722+
Arc::new(ArrowCompressionRatioEstimator::default()),
17201723
)?;
17211724
let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
17221725
let row = GenericRow {

crates/fluss/src/client/write/accumulator.rs

Lines changed: 39 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ use crate::client::write::batch::WriteBatch::{ArrowLog, Kv};
2121
use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch};
2222
use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord};
2323
use crate::cluster::{BucketLocation, Cluster, ServerNode};
24+
use crate::compression::ArrowCompressionRatioEstimator;
2425
use crate::config::Config;
2526
use crate::error::{Error, Result};
2627
use crate::metadata::{PhysicalTablePath, TableBucket};
@@ -235,6 +236,7 @@ impl RecordAccumulator {
235236
dq: &mut VecDeque<WriteBatch>,
236237
permit: MemoryPermit,
237238
alloc_size: usize,
239+
compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
238240
) -> Result<RecordAppendResult> {
239241
let physical_table_path = &record.physical_table_path;
240242
let table_path = physical_table_path.get_table_path();
@@ -253,6 +255,8 @@ impl RecordAccumulator {
253255
row_type,
254256
current_time_ms(),
255257
matches!(&record.record, Record::Log(LogWriteRecord::RecordBatch(_))),
258+
alloc_size,
259+
compression_ratio_estimator,
256260
)?),
257261
Record::Kv(kv_record) => Kv(KvWriteBatch::new(
258262
self.batch_id.fetch_add(1, Ordering::Relaxed),
@@ -303,22 +307,29 @@ impl RecordAccumulator {
303307
None
304308
};
305309

306-
let dq = {
307-
let mut binding = self
308-
.write_batches
309-
.entry(Arc::clone(physical_table_path))
310-
.or_insert_with(|| BucketAndWriteBatches {
311-
table_id: table_info.table_id,
312-
is_partitioned_table,
313-
partition_id,
314-
batches: Default::default(),
315-
});
310+
let (dq, compression_ratio_estimator) = {
311+
let mut binding =
312+
self.write_batches
313+
.entry(Arc::clone(physical_table_path))
314+
.or_insert_with(|| BucketAndWriteBatches {
315+
table_id: table_info.table_id,
316+
is_partitioned_table,
317+
partition_id,
318+
batches: Default::default(),
319+
compression_ratio_estimator: Arc::new(
320+
ArrowCompressionRatioEstimator::default(),
321+
),
322+
});
316323
let bucket_and_batches = binding.value_mut();
317-
bucket_and_batches
324+
let dq = bucket_and_batches
318325
.batches
319326
.entry(bucket_id)
320327
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
321-
.clone()
328+
.clone();
329+
(
330+
dq,
331+
Arc::clone(&bucket_and_batches.compression_ratio_estimator),
332+
)
322333
};
323334

324335
let mut dq_guard = dq.lock();
@@ -336,6 +347,11 @@ impl RecordAccumulator {
336347
// producer holds dq + blocks on memory, while sender needs dq to drain.
337348
drop(dq_guard);
338349

350+
// TODO: Implement DynamicWriteBatchSizeEstimator matching Java's
351+
// client.writer.dynamic-batch-size-enabled. Adjusts the batch size target
352+
// per table based on observed actual batch sizes (grow 10% when >80% full,
353+
// shrink 5% when <50% full, clamped to [2*pageSize, maxBatchSize]).
354+
// This would improve memory limiter utilization for tables with small rows.
339355
let batch_size = self.config.writer_batch_size as usize;
340356
let record_size = record.estimated_record_size();
341357
let alloc_size = batch_size.max(record_size);
@@ -348,7 +364,14 @@ impl RecordAccumulator {
348364
return Ok(append_result); // permit drops here, memory released
349365
}
350366

351-
self.append_new_batch(cluster, record, &mut dq_guard, permit, alloc_size)
367+
self.append_new_batch(
368+
cluster,
369+
record,
370+
&mut dq_guard,
371+
permit,
372+
alloc_size,
373+
compression_ratio_estimator,
374+
)
352375
}
353376

354377
pub fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
@@ -767,6 +790,7 @@ impl RecordAccumulator {
767790
is_partitioned_table,
768791
partition_id,
769792
batches: Default::default(),
793+
compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()),
770794
});
771795
let bucket_and_batches = binding.value_mut();
772796
bucket_and_batches
@@ -912,6 +936,8 @@ struct BucketAndWriteBatches {
912936
is_partitioned_table: bool,
913937
partition_id: Option<PartitionId>,
914938
batches: HashMap<BucketId, Arc<Mutex<VecDeque<WriteBatch>>>>,
939+
/// Compression ratio estimator shared across Arrow log batches for this table.
940+
compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
915941
}
916942

917943
pub struct RecordAppendResult {

0 commit comments

Comments (0)