From 5877195f58deffa36bca0aa80d2c11da9427fcbd Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Mon, 15 Jun 2026 17:25:18 +0800 Subject: [PATCH 1/6] fix(nexmark): support multi-parallelism via ParallelSplit --- .../gluten/velox/NexmarkSourceFactory.java | 54 ++++++++++++++----- .../operators/GlutenSourceFunction.java | 11 +++- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index ae0d94fb3cc..a49381a6a7a 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -22,6 +22,7 @@ import org.apache.gluten.util.PlanNodeIdGenerator; import org.apache.gluten.util.ReflectUtils; +import io.github.zhztheplayer.velox4j.connector.GeneratorConfig; import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; @@ -32,17 +33,26 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.List; import java.util.Map; public class NexmarkSourceFactory implements VeloxSourceSinkFactory { - private static final Logger LOG = LoggerFactory.getLogger(NexmarkSourceFactory.class); + private static final ObjectMapper MAPPER = + new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) + .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE); @SuppressWarnings("rawtypes") @Override @@ -72,26 +82,32 @@ public Transformation buildVeloxSource( "getSplits", new Class[] {int.class}, new Object[] {transformation.getParallelism()}); - Object nexmarkSourceSplit = nexmarkSourceSplits.get(0); - Object generatorConfig = - ReflectUtils.getObjectField( - nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); - Long maxEvents = - (Long) - ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents"); + + // Convert each subtask's GeneratorConfig to velox4j + List subtaskSplits = new ArrayList<>(); + for (Object nexmarkSourceSplit : nexmarkSourceSplits) { + Object generatorConfig = + ReflectUtils.getObjectField( + nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); + subtaskSplits.add( + new NexmarkConnectorSplit( + "connector-nexmark", toVeloxGeneratorConfig(generatorConfig), null)); + } + + // Base split uses first subtask's config (for parallelism = 1 case) + GeneratorConfig baseConfig = subtaskSplits.get(0).getConfig(); PlanNode tableScan = new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); + NexmarkConnectorSplit split = + new NexmarkConnectorSplit("connector-nexmark", baseConfig, subtaskSplits); GlutenStreamSource sourceOp = new GlutenStreamSource( new GlutenSourceFunction( new StatefulPlanNode(tableScan.getId(), tableScan), Map.of(id, outputType), id, - new NexmarkConnectorSplit( - "connector-nexmark", - maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : maxEvents.intValue()), + split, RowData.class)); - return new LegacySourceTransformation( transformation.getName(), sourceOp, @@ -106,4 +122,14 @@ public Transformation buildVeloxSink( Transformation transformation, Map parameters) { throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); } + + /** Convert Flink nexmark GeneratorConfig to velox4j GeneratorConfig via Jackson. */ + private static GeneratorConfig toVeloxGeneratorConfig(Object javaConfig) { + try { + String json = MAPPER.writeValueAsString(javaConfig); + return MAPPER.readValue(json, GeneratorConfig.class); + } catch (JsonProcessingException e) { + throw new TableException("Failed to convert nexmark GeneratorConfig to velox4j", e); + } + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 53f36fcf67c..29b0f37ee9b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -22,6 +22,7 @@ import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.ParallelSplit; import io.github.zhztheplayer.velox4j.iterator.UpIterator; import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; import io.github.zhztheplayer.velox4j.query.Query; @@ -230,6 +231,14 @@ private void initSession() { if (sessionResource != null) { return; } + + ConnectorSplit activeSplit = split; + int totalParallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (split instanceof ParallelSplit && totalParallelism > 1) { + activeSplit = ((ParallelSplit) split).getSubtaskSplit(subtaskIndex, totalParallelism); + } + sessionResource = new GlutenSessionResource(); GlutenSessionResources.getInstance().addSessionResource(id, sessionResource); Session session = sessionResource.getSession(); @@ -239,7 +248,7 @@ private void initSession() { VeloxQueryConfig.getConfig(getRuntimeContext()), VeloxConnectorConfig.getConfig(getRuntimeContext())); task = session.queryOps().execute(query); - task.addSplit(id, split); + task.addSplit(id, activeSplit); task.noMoreSplits(id); taskMetrics = new SourceTaskMetrics(getRuntimeContext().getMetricGroup()); } From 8196b8b3f97347c7fad8ea4fdaae35f61305aa3a Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Thu, 18 Jun 2026 15:39:46 +0800 Subject: [PATCH 2/6] rename GeneratorConfig to NexmarkGeneratorConfig --- .../gluten/velox/NexmarkSourceFactory.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index a49381a6a7a..a4967cd7408 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -22,8 +22,8 @@ import org.apache.gluten.util.PlanNodeIdGenerator; import org.apache.gluten.util.ReflectUtils; -import io.github.zhztheplayer.velox4j.connector.GeneratorConfig; import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; @@ -83,7 +83,7 @@ public Transformation buildVeloxSource( new Class[] {int.class}, new Object[] {transformation.getParallelism()}); - // Convert each subtask's GeneratorConfig to velox4j + // Convert each subtask's NexmarkGeneratorConfig to velox4j List subtaskSplits = new ArrayList<>(); for (Object nexmarkSourceSplit : nexmarkSourceSplits) { Object generatorConfig = @@ -91,11 +91,11 @@ public Transformation buildVeloxSource( nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); subtaskSplits.add( new NexmarkConnectorSplit( - "connector-nexmark", toVeloxGeneratorConfig(generatorConfig), null)); + "connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig), null)); } // Base split uses first subtask's config (for parallelism = 1 case) - GeneratorConfig baseConfig = subtaskSplits.get(0).getConfig(); + NexmarkGeneratorConfig baseConfig = subtaskSplits.get(0).getConfig(); PlanNode tableScan = new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); NexmarkConnectorSplit split = @@ -123,13 +123,13 @@ public Transformation buildVeloxSink( throw new UnsupportedOperationException("Unimplemented method 'buildSink'"); } - /** Convert Flink nexmark GeneratorConfig to velox4j GeneratorConfig via Jackson. */ - private static GeneratorConfig toVeloxGeneratorConfig(Object javaConfig) { + /** Convert Flink nexmark NexmarkGeneratorConfig to velox4j NexmarkGeneratorConfig via Jackson. */ + private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object javaConfig) { try { String json = MAPPER.writeValueAsString(javaConfig); - return MAPPER.readValue(json, GeneratorConfig.class); + return MAPPER.readValue(json, NexmarkGeneratorConfig.class); } catch (JsonProcessingException e) { - throw new TableException("Failed to convert nexmark GeneratorConfig to velox4j", e); + throw new TableException("Failed to convert nexmark NexmarkGeneratorConfig to velox4j", e); } } } From d15daaeb34a0af2556ace27948ae8635f9c393bd Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Thu, 18 Jun 2026 16:10:21 +0800 Subject: [PATCH 3/6] use NexmarkParallelSplit for nexmark subtask dispatch --- .../org/apache/gluten/velox/NexmarkSourceFactory.java | 8 +++----- .../table/runtime/operators/GlutenSourceFunction.java | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java index a4967cd7408..0767cd38299 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java @@ -24,6 +24,7 @@ import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig; +import io.github.zhztheplayer.velox4j.connector.NexmarkParallelSplit; import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle; import io.github.zhztheplayer.velox4j.plan.PlanNode; import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; @@ -91,15 +92,12 @@ public Transformation buildVeloxSource( nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig"); subtaskSplits.add( new NexmarkConnectorSplit( - "connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig), null)); + "connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig))); } - // Base split uses first subtask's config (for parallelism = 1 case) - NexmarkGeneratorConfig baseConfig = subtaskSplits.get(0).getConfig(); PlanNode tableScan = new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of()); - NexmarkConnectorSplit split = - new NexmarkConnectorSplit("connector-nexmark", baseConfig, subtaskSplits); + NexmarkParallelSplit split = new NexmarkParallelSplit("connector-nexmark", subtaskSplits); GlutenStreamSource sourceOp = new GlutenStreamSource( new GlutenSourceFunction( diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 29b0f37ee9b..76cced93f15 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -235,7 +235,7 @@ private void initSession() { ConnectorSplit activeSplit = split; int totalParallelism = getRuntimeContext().getNumberOfParallelSubtasks(); int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - if (split instanceof ParallelSplit && totalParallelism > 1) { + if (split instanceof ParallelSplit) { activeSplit = ((ParallelSplit) split).getSubtaskSplit(subtaskIndex, totalParallelism); } From 7d3b45c456a3e6062a747c9f8f272dac0016d0f1 Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Thu, 18 Jun 2026 16:59:00 +0800 Subject: [PATCH 4/6] build: point flink CI at nexmark velox4j branch --- .github/workflows/flink.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..ab4758a66bd 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -69,8 +69,8 @@ jobs: export VELOX_DEPENDENCY_SOURCE=BUNDLED export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED - git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + git clone -b fix/nexmark-source-multi-parallelism https://github.com/ggjh-159/velox4j.git + cd velox4j git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From b488083358490a952095c924fea397ddf8362d09 Mon Sep 17 00:00:00 2001 From: ggjh-159 Date: Thu, 18 Jun 2026 17:38:03 +0800 Subject: [PATCH 5/6] test(nexmark): add NexmarkSourceFactoryTest for ParallelSplit wiring --- .../velox/NexmarkSourceFactoryTest.java | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java new file mode 100644 index 00000000000..8f149cfb73d --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.velox; + +import org.apache.gluten.streaming.api.operators.GlutenStreamSource; + +import io.github.zhztheplayer.velox4j.connector.ConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit; +import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig; +import io.github.zhztheplayer.velox4j.connector.NexmarkParallelSplit; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Constructor; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class NexmarkSourceFactoryTest { + + private static final String NEXMARK_SOURCE_CN = "com.github.nexmark.flink.source.NexmarkSource"; + private static final String NEXMARK_CONFIG_CN = "com.github.nexmark.flink.NexmarkConfiguration"; + private static final String GENERATOR_CONFIG_CN = + "com.github.nexmark.flink.generator.GeneratorConfig"; + + @SuppressWarnings("rawtypes") + @Test + void testBuildVeloxSourceWrapsSplitsInNexmarkParallelSplit() throws Exception { + SourceTransformation tx = newSourceTransformation(/* parallelism= */ 2); + + NexmarkSourceFactory factory = new NexmarkSourceFactory(); + Transformation result = factory.buildVeloxSource(tx, Collections.emptyMap()); + + LegacySourceTransformation legacy = + assertInstanceOf(LegacySourceTransformation.class, result); + GlutenStreamSource streamSource = + assertInstanceOf(GlutenStreamSource.class, legacy.getOperator()); + + ConnectorSplit split = streamSource.getConnectorSplit(); + NexmarkParallelSplit parallel = assertInstanceOf(NexmarkParallelSplit.class, split); + + NexmarkConnectorSplit s0 = + assertInstanceOf(NexmarkConnectorSplit.class, parallel.getSubtaskSplit(0, 2)); + NexmarkConnectorSplit s1 = + assertInstanceOf(NexmarkConnectorSplit.class, parallel.getSubtaskSplit(1, 2)); + + NexmarkGeneratorConfig c0 = s0.getConfig(); + NexmarkGeneratorConfig c1 = s1.getConfig(); + assertEquals(0L, c0.getFirstEventId()); + assertEquals(500L, c0.getMaxEventsOrZero()); + assertEquals(500L, c1.getFirstEventId()); + assertEquals(500L, c1.getMaxEventsOrZero()); + } + + @SuppressWarnings("rawtypes") + @Test + void testBuildVeloxSourceAtParallelismOneStillProducesParallelSplit() throws Exception { + SourceTransformation tx = newSourceTransformation(/* parallelism= */ 1); + + NexmarkSourceFactory factory = new NexmarkSourceFactory(); + Transformation result = factory.buildVeloxSource(tx, Collections.emptyMap()); + + LegacySourceTransformation legacy = + assertInstanceOf(LegacySourceTransformation.class, result); + GlutenStreamSource streamSource = + assertInstanceOf(GlutenStreamSource.class, legacy.getOperator()); + + NexmarkParallelSplit parallel = + assertInstanceOf(NexmarkParallelSplit.class, streamSource.getConnectorSplit()); + NexmarkConnectorSplit s0 = + assertInstanceOf(NexmarkConnectorSplit.class, parallel.getSubtaskSplit(0, 1)); + + assertEquals(0L, s0.getConfig().getFirstEventId()); + assertEquals(1000L, s0.getConfig().getMaxEventsOrZero()); + } + + @Test + void testBuildVeloxSourceRejectsNonSourceTransformation() { + NexmarkSourceFactory factory = new NexmarkSourceFactory(); + assertThrows( + ClassCastException.class, + () -> factory.buildVeloxSource(new StubTransformation(), Collections.emptyMap())); + } + + @SuppressWarnings("rawtypes") + private static SourceTransformation newSourceTransformation(int parallelism) throws Exception { + Object nexmarkSource = newNexmarkSource(1000L); + Constructor ctor = + SourceTransformation.class.getDeclaredConstructor( + String.class, + org.apache.flink.api.connector.source.Source.class, + org.apache.flink.api.common.eventtime.WatermarkStrategy.class, + org.apache.flink.api.common.typeinfo.TypeInformation.class, + int.class); + return (SourceTransformation) + ctor.newInstance( + "nexmark-source", + nexmarkSource, + org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), + InternalTypeInfo.of(RowType.of(new IntType())), + parallelism); + } + + private static Object newNexmarkSource(long maxEvents) throws Exception { + Object nexmarkConfig = Class.forName(NEXMARK_CONFIG_CN).getDeclaredConstructor().newInstance(); + java.lang.reflect.Field numEvents = nexmarkConfig.getClass().getDeclaredField("numEvents"); + numEvents.setAccessible(true); + numEvents.setLong(nexmarkConfig, maxEvents); + + Class generatorConfigCls = Class.forName(GENERATOR_CONFIG_CN); + Constructor generatorConfigCtor = + generatorConfigCls.getDeclaredConstructor( + Class.forName(NEXMARK_CONFIG_CN), + long.class, + long.class, + long.class, + long.class, + long.class); + Object generatorConfig = + generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents, maxEvents, 0L); + + Class nexmarkSourceCls = Class.forName(NEXMARK_SOURCE_CN); + Constructor nexmarkSourceCtor = + nexmarkSourceCls.getDeclaredConstructor( + generatorConfigCls, org.apache.flink.api.common.typeinfo.TypeInformation.class); + nexmarkSourceCtor.setAccessible(true); + return nexmarkSourceCtor.newInstance( + generatorConfig, InternalTypeInfo.of(RowType.of(new IntType()))); + } + + private static final class StubTransformation extends Transformation { + StubTransformation() { + super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1); + } + + @Override + public java.util.List> getInputs() { + return Collections.emptyList(); + } + + @Override + protected java.util.List> getTransitivePredecessorsInternal() { + return Collections.emptyList(); + } + } +} From 9dcd5406cb5a532e0fa4efc43a44bb177ffd19ae Mon Sep 17 00:00:00 2001 From: GGboom Date: Wed, 24 Jun 2026 11:37:50 +0800 Subject: [PATCH 6/6] update velox4j reference for ci --- .github/workflows/flink.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index ab4758a66bd..4ade2658b0b 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -69,8 +69,8 @@ jobs: export VELOX_DEPENDENCY_SOURCE=BUNDLED export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED - git clone -b fix/nexmark-source-multi-parallelism https://github.com/ggjh-159/velox4j.git - cd velox4j + git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git + cd velox4j && git reset --hard 219cbeae1107df872c0b142c784c750c6432e55e git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd ..