[GLUTEN-12298][FLINK] Support nexmark source multi-parallelism via ParallelSplit #12304
|
Is there any test to cover this case? |
|
You need to update the GitHub workflow config to refer to the right velox4j branch/commit. |
subtaskSplits.add(
    new NexmarkConnectorSplit(
        "connector-nexmark", toVeloxGeneratorConfig(generatorConfig), null));
For safety, subtask splits should use a non-ParallelSplit class.
Addressed. Split into two types:
- NexmarkConnectorSplit: leaf split, extends ConnectorSplit, only carries NexmarkGeneratorConfig config. Serialized across the Java/C++ boundary as before.
- NexmarkParallelSplit: container, extends ParallelSplit, holds List<NexmarkConnectorSplit> subtaskSplits. Only this type implements getSubtaskSplit, so a leaf split can no longer be mistaken for a parallel container.
NexmarkSourceFactory now builds a NexmarkParallelSplit wrapping the
per-subtask leaf splits, and the instanceof ParallelSplit check in
GlutenSourceFunction cleanly distinguishes container vs leaf.
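A minimal sketch of the container/leaf separation described in this comment, for readers who have not opened the diff. The base classes here are simplified stand-ins for the velox4j types, the String config is a placeholder for NexmarkGeneratorConfig, and SplitSelector is a hypothetical helper that only mirrors the instanceof check; none of this is the actual code in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the velox4j base type (assumption: the real class carries more state).
abstract class ConnectorSplit {
  final String connectorId;

  ConnectorSplit(String connectorId) {
    this.connectorId = connectorId;
  }
}

// Simplified stand-in for the ParallelSplit abstraction from the companion velox4j PR.
abstract class ParallelSplit extends ConnectorSplit {
  ParallelSplit(String connectorId) {
    super(connectorId);
  }

  abstract ConnectorSplit getSubtaskSplit(int subtaskIndex, int totalParallelism);
}

// Leaf split: carries only the generator config (a String placeholder here).
final class NexmarkConnectorSplit extends ConnectorSplit {
  final String config;

  NexmarkConnectorSplit(String connectorId, String config) {
    super(connectorId);
    this.config = config;
  }
}

// Container split: the only type that knows about per-subtask splits.
final class NexmarkParallelSplit extends ParallelSplit {
  private final List<NexmarkConnectorSplit> subtaskSplits;

  NexmarkParallelSplit(String connectorId, List<NexmarkConnectorSplit> subtaskSplits) {
    super(connectorId);
    this.subtaskSplits = new ArrayList<>(subtaskSplits);
  }

  @Override
  ConnectorSplit getSubtaskSplit(int subtaskIndex, int totalParallelism) {
    if (totalParallelism != subtaskSplits.size()
        || subtaskIndex < 0
        || subtaskIndex >= subtaskSplits.size()) {
      throw new IllegalArgumentException(
          "subtask " + subtaskIndex + " of " + totalParallelism + " does not match the packed splits");
    }
    return subtaskSplits.get(subtaskIndex);
  }
}

// Hypothetical helper mirroring the instanceof check made on the source-function side.
final class SplitSelector {
  static ConnectorSplit select(ConnectorSplit split, int subtaskIndex, int totalParallelism) {
    if (split instanceof ParallelSplit && totalParallelism > 1) {
      return ((ParallelSplit) split).getSubtaskSplit(subtaskIndex, totalParallelism);
    }
    return split; // leaf split or single parallelism: pass through unchanged
  }
}
```

Because the leaf type does not extend ParallelSplit, a leaf can never satisfy the instanceof check, which is the safety property the review asked for.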
|
cc @lgbo-ustc UTs were added and the velox4j branch was set. |
|
@ggjh-159 update velox4j reference in ci |
@lgbo-ustc done. |
|
@lgbo-ustc The tests for this PR might also fail. I'll fix it as soon as possible. |
|
@ggjh-159 update the code again |
What changes are proposed in this pull request?
fix: #12298
depends: bigo-sg/velox4j#36, bigo-sg/velox#45
This PR consumes the ParallelSplit abstraction introduced by the companion velox4j PR:
- NexmarkSourceFactory.buildVeloxSource iterates over all per-subtask splits from getSplits(parallelism) and packs them into a single NexmarkConnectorSplit whose subtaskSplits carries one entry per subtask.
- GlutenSourceFunction.initSession detects ParallelSplit with parallelism > 1 and selects the per-subtask split via getSubtaskSplit(subtaskIndex, totalParallelism); otherwise it behaves as before.
How was this patch tested?
Manual run on a standalone Flink cluster with parallelism.default = 2, nexmark events.num = 10000, tps = 2000, query q0. dateTimespan firstEventId=1, maxEvents=5000; subtask 1 firstEventId=5001, maxEvents=5000
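The values reported for the manual run (10000 events over two subtasks, 5000 events each, with IDs starting at 1 and 5001) are consistent with an even, contiguous partition of the event ID range. The sketch below shows that arithmetic only. EventRange and EventRangePartitioner are hypothetical names, and giving any remainder to the lowest-numbered subtasks is an assumption, not something the PR states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: the contiguous slice of event IDs owned by one subtask.
final class EventRange {
  final long firstEventId;
  final long maxEvents;

  EventRange(long firstEventId, long maxEvents) {
    this.firstEventId = firstEventId;
    this.maxEvents = maxEvents;
  }
}

// Hypothetical helper: splits totalEvents into `parallelism` contiguous ranges.
final class EventRangePartitioner {
  static List<EventRange> partition(long totalEvents, int parallelism, long firstEventId) {
    if (parallelism < 1 || totalEvents < 0) {
      throw new IllegalArgumentException("parallelism must be >= 1 and totalEvents >= 0");
    }
    long base = totalEvents / parallelism;
    long remainder = totalEvents % parallelism;
    List<EventRange> ranges = new ArrayList<>(parallelism);
    long next = firstEventId;
    for (int i = 0; i < parallelism; i++) {
      // Assumption: the first `remainder` subtasks each take one extra event.
      long count = base + (i < remainder ? 1 : 0);
      ranges.add(new EventRange(next, count));
      next += count;
    }
    return ranges;
  }
}
```

Calling EventRangePartitioner.partition(10000, 2, 1) yields the ranges (firstEventId=1, maxEvents=5000) and (firstEventId=5001, maxEvents=5000), matching the figures from the manual run.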