diff --git a/README.md b/README.md index 67d04dea..d88ca1e5 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ The RocketMQSourceFunction is based on RocketMQ pull consumer mode, and provides Otherwise, the source doesn't provide any reliability guarantees. ### KeyValueDeserializationSchema + The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema` interface. `rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` implementations called `SimpleKeyValueDeserializationSchema`. @@ -23,7 +24,16 @@ public interface KeyValueDeserializationSchema extends ResultTypeQueryable } ``` +## RocketMQSource + +RocketMQSource implements Flink's new source interface, which provides the capability of stream-batch integration. Now you can construct an instance via `RocketMQSourceBuilder.build()`. + +### RocketMQDeserializationSchema + +The main API for deserializing topic and tags is the `org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema` interface. `rocketmq-flink` includes general purpose `RocketMQDeserializationSchema` implementations called `RocketMQRowDeserializationSchema` and `SimpleStringSchema`. If you only focus on the value of the message, you can use the wrapper class `RocketMQValueOnlyDeserializationSchemaWrapper` to expand it. + ## RocketMQSink + To use the `RocketMQSink`, you construct an instance of it by specifying KeyValueSerializationSchema & TopicSelector instances and a Properties instance which including rocketmq configs. `RocketMQSink(KeyValueSerializationSchema schema, TopicSelector topicSelector, Properties props)` The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set. 
@@ -57,6 +67,9 @@ public interface TopicSelector extends Serializable { ``` ## Examples + +You can find more examples in the directory `org.apache.rocketmq.flink.legacy.example`. + The following is an example which receive messages from RocketMQ brokers and send messages to broker after processing. ```java @@ -119,7 +132,36 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm } ``` +The following is an example which uses the new source function to fetch records and deserialize them to simple strings. + +```java + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(30000L); + + RocketMQSource source = + RocketMQSource.builder() + .setNameServerAddress(nameServerAddress) + .setTopic(topic) + .setConsumerGroup(consumerGroup) + .setStartFromEarliest() + .setDeserializer( + new RocketMQValueOnlyDeserializationSchemaWrapper<>( + new SimpleStringSchema())) + .build(); + + DataStreamSource newSource = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "new source") + .setParallelism(4); + + newSource.print().setParallelism(1); + + env.execute(); +``` + + + ## Configurations + The following configurations are all from the class `org.apache.rocketmq.flink.legacy.RocketMQConfig`. 
### Producer Configurations diff --git a/pom.xml b/pom.xml index 8617bd2a..79e2393e 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,24 @@ + + org.slf4j + slf4j-log4j12 + 1.7.10 + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + provided + org.apache.flink flink-java @@ -114,11 +132,27 @@ org.apache.rocketmq rocketmq-namesrv ${rocketmq.version} + + + logback-core + ch.qos.logback + + + logback-classic + ch.qos.logback + + org.apache.rocketmq rocketmq-broker ${rocketmq.version} + + + logback-classic + ch.qos.logback + + org.apache.rocketmq @@ -142,7 +176,7 @@ junit junit test - 4.12 + 4.13.2 org.powermock @@ -179,6 +213,7 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA + log4j.properties @@ -194,18 +229,6 @@ - - - maven-compiler-plugin - 3.5.1 - - ${maven.compiler.source} - ${maven.compiler.target} - ${maven.compiler.source} - true - true - - org.apache.maven.plugins maven-surefire-plugin diff --git a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java index 22903c5c..045e3f3f 100644 --- a/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java +++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.flink.common; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET; - /** Includes config options of RocketMQ connector type. 
*/ public class RocketMQOptions { @@ -44,25 +45,11 @@ public class RocketMQOptions { public static final ConfigOption OPTIONAL_SQL = ConfigOptions.key("sql").stringType().noDefaultValue(); - public static final ConfigOption OPTIONAL_START_MESSAGE_OFFSET = - ConfigOptions.key("startMessageOffset") - .longType() - .defaultValue(DEFAULT_START_MESSAGE_OFFSET); - - public static final ConfigOption OPTIONAL_START_TIME_MILLS = - ConfigOptions.key("startTimeMs").longType().defaultValue(-1L); - - public static final ConfigOption OPTIONAL_START_TIME = - ConfigOptions.key("startTime").stringType().noDefaultValue(); - - public static final ConfigOption OPTIONAL_END_TIME = - ConfigOptions.key("endTime").stringType().noDefaultValue(); - - public static final ConfigOption OPTIONAL_TIME_ZONE = - ConfigOptions.key("timeZone").stringType().noDefaultValue(); + public static final ConfigOption OPTIONAL_END_TIME_STAMP = + ConfigOptions.key("endTimestamp").longType().defaultValue(Long.MAX_VALUE); public static final ConfigOption OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS = - ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L); + ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(-1L); public static final ConfigOption OPTIONAL_USE_NEW_API = ConfigOptions.key("useNewApi").booleanType().defaultValue(true); @@ -109,9 +96,42 @@ public class RocketMQOptions { public static final ConfigOption OPTIONAL_SECRET_KEY = ConfigOptions.key("secretKey").stringType().noDefaultValue(); - public static final ConfigOption OPTIONAL_SCAN_STARTUP_MODE = - ConfigOptions.key("scanStartupMode").stringType().defaultValue("latest"); - - public static final ConfigOption OPTIONAL_OFFSET_FROM_TIMESTAMP = - ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue(); + // -------------------------------------------------------------------------------------------- + // Scan specific options + // 
-------------------------------------------------------------------------------------------- + + public static final ConfigOption OPTIONAL_SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .enumType(StartupMode.class) + .defaultValue(StartupMode.GROUP_OFFSETS) + .withDescription("Startup mode for RocketMQ consumer."); + + public static final ConfigOption OPTIONAL_SCAN_OFFSET_RESET_STRATEGY = + ConfigOptions.key("scan.offsetReset.strategy") + .enumType(OffsetResetStrategy.class) + .defaultValue(OffsetResetStrategy.LATEST) + .withDescription( + "The offsetReset strategy only be used if group offsets is not found"); + + public static final ConfigOption OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS = + ConfigOptions.key("scan.startup.specific-offsets") + .stringType() + .noDefaultValue() + .withDescription( + "Optional offsets used in case of \"specific-offsets\" startup mode"); + + public static final ConfigOption OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.startup.timestamp-millis") + .longType() + .defaultValue(-1L) + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption OPTIONAL_COMMIT_OFFSET_AUTO = + ConfigOptions.key("commit.offset.auto") + .booleanType() + .defaultValue(false) + .withDescription( + "Commit offset immediately when each message is fetched." 
+ + "If you don't enable the flink checkpoint, make sure this option is set to true."); } diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java index b078056b..6e6b5b4c 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java @@ -166,9 +166,6 @@ public void open(Configuration parameters) throws Exception { this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); - if (offsetTable == null) { - offsetTable = new ConcurrentHashMap<>(); - } if (restoredOffsets == null) { restoredOffsets = new ConcurrentHashMap<>(); } @@ -247,7 +244,16 @@ public void open(Configuration parameters) throws Exception { // If the job recovers from the state, the state has already contained the offsets of last // commit. if (!restored) { - initOffsets(messageQueues); + this.offsetTable = + RocketMQUtils.initOffsets( + messageQueues, + consumer, + startMode, + offsetResetStrategy, + specificTimeStamp, + specificStartupOffsets); + } else { + this.offsetTable = new ConcurrentHashMap<>(); } } @@ -394,76 +400,6 @@ private void awaitTermination() throws InterruptedException { } } - /** - * only flink job start with no state can init offsets from broker - * - * @param messageQueues - * @throws MQClientException - */ - private void initOffsets(List messageQueues) throws MQClientException { - for (MessageQueue mq : messageQueues) { - long offset; - switch (startMode) { - case LATEST: - offset = consumer.maxOffset(mq); - break; - case EARLIEST: - offset = consumer.minOffset(mq); - break; - case GROUP_OFFSETS: - offset = consumer.fetchConsumeOffset(mq, false); - // the min offset return if consumer group first join,return a negative number - // if - // catch exception when fetch from broker. 
- // If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST - if (offset <= 0) { - switch (offsetResetStrategy) { - case LATEST: - offset = consumer.maxOffset(mq); - log.info( - "current consumer thread:{} has no committed offset,use Strategy:{} instead", - mq, - offsetResetStrategy); - break; - case EARLIEST: - log.info( - "current consumer thread:{} has no committed offset,use Strategy:{} instead", - mq, - offsetResetStrategy); - offset = consumer.minOffset(mq); - break; - default: - break; - } - } - break; - case TIMESTAMP: - offset = consumer.searchOffset(mq, specificTimeStamp); - break; - case SPECIFIC_OFFSETS: - if (specificStartupOffsets == null) { - throw new RuntimeException( - "StartMode is specific_offsets.But none offsets has been specified"); - } - Long specificOffset = specificStartupOffsets.get(mq); - if (specificOffset != null) { - offset = specificOffset; - } else { - offset = consumer.fetchConsumeOffset(mq, false); - } - break; - default: - throw new IllegalArgumentException( - "current startMode is not supported" + startMode); - } - log.info( - "current consumer queue:{} start from offset of: {}", - mq.getBrokerName() + "-" + mq.getQueueId(), - offset); - offsetTable.put(mq, offset); - } - } - /** consume from the min offset at every restart with no state */ public RocketMQSourceFunction setStartFromEarliest() { this.startMode = StartupMode.EARLIEST; diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java index 163dae46..2e9067af 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/config/StartupMode.java @@ -19,9 +19,26 @@ /** RocketMQ startup mode. 
*/ public enum StartupMode { - EARLIEST, - LATEST, - GROUP_OFFSETS, - TIMESTAMP, - SPECIFIC_OFFSETS + EARLIEST("earliest-offset", "Start from the earliest offset possible."), + LATEST("latest-offset", "Start from the latest offset."), + GROUP_OFFSETS( + "group-offsets", + "Start from committed offsets in brokers of a specific consumer group."), + TIMESTAMP("timestamp", "Start from user-supplied timestamp for each message queue."), + SPECIFIC_OFFSETS( + "specific-offsets", + "Start from user-supplied specific offsets for each message queue."); + + private final String value; + private final String description; + + StartupMode(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } } diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java index 1f084a85..a203f4df 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java @@ -17,16 +17,26 @@ package org.apache.rocketmq.flink.legacy.common.util; import org.apache.rocketmq.client.AccessChannel; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; public final class RocketMQUtils { + private static final 
Logger log = LoggerFactory.getLogger(RocketMQUtils.class); public static int getInteger(Properties props, String key, int defaultValue) { return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue))); @@ -78,4 +88,76 @@ public static List allocate( } return result; } + + public static Map initOffsets( + List messageQueues, + DefaultMQPullConsumer consumer, + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + long specificTimeStamp, + Map specificStartupOffsets) + throws MQClientException { + Map offsetTable = new ConcurrentHashMap<>(); + for (MessageQueue mq : messageQueues) { + long offset; + switch (startMode) { + case LATEST: + offset = consumer.maxOffset(mq); + break; + case EARLIEST: + offset = consumer.minOffset(mq); + break; + case GROUP_OFFSETS: + offset = consumer.fetchConsumeOffset(mq, false); + // the min offset return if consumer group first join,return a negative number + // if catch exception when fetch from broker. + // If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST + if (offset <= 0) { + switch (offsetResetStrategy) { + case LATEST: + offset = consumer.maxOffset(mq); + log.info( + "current consumer thread:{} has no committed offset,use Strategy:{} instead", + mq, + offsetResetStrategy); + break; + case EARLIEST: + log.info( + "current consumer thread:{} has no committed offset,use Strategy:{} instead", + mq, + offsetResetStrategy); + offset = consumer.minOffset(mq); + break; + default: + break; + } + } + break; + case TIMESTAMP: + offset = consumer.searchOffset(mq, specificTimeStamp); + break; + case SPECIFIC_OFFSETS: + if (specificStartupOffsets == null) { + throw new RuntimeException( + "StartMode is specific_offsets.But none offsets has been specified"); + } + Long specificOffset = specificStartupOffsets.get(mq); + if (specificOffset != null) { + offset = specificOffset; + } else { + offset = consumer.fetchConsumeOffset(mq, false); + } + break; + default: + throw new 
IllegalArgumentException( + "current startMode is not supported" + startMode); + } + log.info( + "current consumer queue:{} start from offset of: {}", + mq.getBrokerName() + "-" + mq.getQueueId(), + offset); + offsetTable.put(mq, offset); + } + return offsetTable; + } } diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSourceExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSourceExample.java new file mode 100644 index 00000000..da795659 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSourceExample.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.legacy.example; + +import org.apache.rocketmq.flink.source.RocketMQSource; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQValueOnlyDeserializationSchemaWrapper; +import org.apache.rocketmq.flink.source.reader.deserializer.SimpleStringSchema; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class RocketMQSourceExample { + private static String topic = "tp_test"; + private static String consumerGroup = "cg_test"; + private static String nameServerAddress = "10.13.66.140:9876"; + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(30000L); + + RocketMQSource source = + RocketMQSource.builder() + .setNameServerAddress(nameServerAddress) + .setTopic(topic) + .setConsumerGroup(consumerGroup) + .setStartFromEarliest() + .setDeserializer( + new RocketMQValueOnlyDeserializationSchemaWrapper<>( + new SimpleStringSchema())) + .build(); + + DataStreamSource newSource = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "new source") + .setParallelism(4); + + newSource.print().setParallelism(1); + + env.execute(); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSqlExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSqlExample.java new file mode 100644 index 00000000..2e9d68f0 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQSqlExample.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.legacy.example; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +public class RocketMQSqlExample { + private static String topic = "tp_test"; + private static String consumerGroup = "cg_test"; + private static String nameServerAddress = "10.13.66.140:9876"; + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + String sqlCreate = + String.format( + "CREATE TABLE rocketmq_source (\n" + + " `user_id` BIGINT,\n" + + " `behavior` STRING\n" + + ") WITH (\n" + + " 'connector' = 'rocketmq',\n" + + " 'topic' = '%s',\n" + + " 'consumerGroup' = '%s',\n" + + " 'nameServerAddress' = '%s',\n" + + " 'fieldDelimiter' = ' ',\n" + + " 'scan.startup.mode' = 'specific-offsets',\n" + + " 'commit.offset.auto' = 'true',\n" + + " 'scan.startup.specific-offsets' = 'broker:broker-a,queue:0,offset:21;broker:broker-a,queue:1,offset:14'\n" + + ");", + topic, consumerGroup, nameServerAddress); + tableEnv.executeSql(sqlCreate); + Table table = tableEnv.sqlQuery("select * from rocketmq_source"); + table.execute().print(); + env.execute(); + } +} diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java index 27c69f10..1fb16b90 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java @@ -18,7 +18,11 @@ package org.apache.rocketmq.flink.source; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.RocketMQOptions; import org.apache.rocketmq.flink.legacy.RocketMQConfig; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState; import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer; import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator; @@ -26,9 +30,11 @@ import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter; import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader; import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; +import org.apache.rocketmq.flink.source.reader.fetcher.RocketMQSourceFetcherManager; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; @@ -50,6 +56,9 @@ import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.Supplier; /** The Source implementation of RocketMQ. 
*/ @@ -58,8 +67,6 @@ public class RocketMQSource ResultTypeQueryable { private static final long serialVersionUID = -1L; - private final String consumerOffsetMode; - private final long consumerOffsetTimestamp; private final String topic; private final String consumerGroup; private final String nameServerAddress; @@ -70,14 +77,20 @@ public class RocketMQSource private final String secretKey; private final long stopInMs; - private final long startTime; - private final long startOffset; private final long partitionDiscoveryIntervalMs; // Boundedness private final Boundedness boundedness; private final RocketMQDeserializationSchema deserializationSchema; + // consumer strategy + private StartupMode startMode = StartupMode.GROUP_OFFSETS; + private OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.LATEST; + private final Map specificOffsets; + private long consumerOffsetTimestamp; + + private boolean commitOffsetAuto; + public RocketMQSource( String topic, String consumerGroup, @@ -87,16 +100,22 @@ public RocketMQSource( String tag, String sql, long stopInMs, - long startTime, - long startOffset, long partitionDiscoveryIntervalMs, Boundedness boundedness, RocketMQDeserializationSchema deserializationSchema, - String cosumerOffsetMode, - long consumerOffsetTimestamp) { + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + Map specificOffsets, + long consumerOffsetTimestamp, + boolean commitOffsetAuto) { Validate.isTrue( !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)), "Consumer tag and sql can not set value at the same time"); + Validate.isTrue( + !(boundedness == Boundedness.BOUNDED && partitionDiscoveryIntervalMs > 0), + "Bounded stream didn't support partitionDiscovery." 
+ + "PartitionDiscovery will hold the split reader and keep the thread running " + + "even though all of subtasks has emitted the last record."); this.topic = topic; this.consumerGroup = consumerGroup; this.nameServerAddress = nameServerAddress; @@ -105,13 +124,18 @@ public RocketMQSource( this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag; this.sql = sql; this.stopInMs = stopInMs; - this.startTime = startTime; - this.startOffset = startOffset > 0 ? startOffset : startTime; this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; this.boundedness = boundedness; this.deserializationSchema = deserializationSchema; - this.consumerOffsetMode = cosumerOffsetMode; + if (null != startMode) { + this.startMode = startMode; + } + if (null != offsetResetStrategy) { + this.offsetResetStrategy = offsetResetStrategy; + } + this.specificOffsets = specificOffsets == null ? new HashMap<>() : specificOffsets; this.consumerOffsetTimestamp = consumerOffsetTimestamp; + this.commitOffsetAuto = commitOffsetAuto; } @Override @@ -120,8 +144,8 @@ public Boundedness getBoundedness() { } @Override - public SourceReader createReader( - SourceReaderContext readerContext) { + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { FutureCompletingBlockingQueue>> elementsQueue = new FutureCompletingBlockingQueue<>(); deserializationSchema.open( @@ -147,15 +171,15 @@ public UserCodeClassLoader getUserCodeClassLoader() { secretKey, tag, sql, - stopInMs, - startTime, - startOffset, - deserializationSchema); + deserializationSchema, + readerContext, + commitOffsetAuto); RocketMQRecordEmitter recordEmitter = new RocketMQRecordEmitter<>(); return new RocketMQSourceReader<>( elementsQueue, - splitReaderSupplier, + new RocketMQSourceFetcherManager<>( + elementsQueue, splitReaderSupplier, (ignore) -> {}), recordEmitter, new Configuration(), readerContext); @@ -172,11 +196,12 @@ public SplitEnumerator createEn accessKey, secretKey, 
stopInMs, - startOffset, partitionDiscoveryIntervalMs, boundedness, enumContext, - consumerOffsetMode, + startMode, + offsetResetStrategy, + specificOffsets, consumerOffsetTimestamp); } @@ -184,6 +209,8 @@ public SplitEnumerator createEn public SplitEnumerator restoreEnumerator( SplitEnumeratorContext enumContext, RocketMQSourceEnumState checkpoint) { + Map> listMap = checkpoint.getCurrentAssignment(); + listMap.forEach((a, b) -> System.out.println("xixixixixix" + b)); return new RocketMQSourceEnumerator( topic, @@ -192,12 +219,13 @@ public SplitEnumerator restoreE accessKey, secretKey, stopInMs, - startOffset, partitionDiscoveryIntervalMs, boundedness, enumContext, checkpoint.getCurrentAssignment(), - consumerOffsetMode, + startMode, + offsetResetStrategy, + specificOffsets, consumerOffsetTimestamp); } @@ -215,4 +243,26 @@ public SimpleVersionedSerializer getEnumeratorCheckpoin public TypeInformation getProducedType() { return deserializationSchema.getProducedType(); } + + /** + * Builder to build a {@link RocketMQSource} + * + * @param + * @return + */ + public static RocketMQSourceBuilder builder() { + return new RocketMQSourceBuilder<>(); + } + + @VisibleForTesting + Configuration getConfiguration() { + Configuration conf = new Configuration(); + conf.set(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE, startMode); + conf.set(RocketMQOptions.OPTIONAL_SCAN_OFFSET_RESET_STRATEGY, offsetResetStrategy); + conf.set(RocketMQOptions.OPTIONAL_END_TIME_STAMP, stopInMs); + conf.set(RocketMQOptions.OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS, consumerOffsetTimestamp); + conf.set( + RocketMQOptions.OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS, specificOffsets.toString()); + return conf; + } } diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilder.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilder.java new file mode 100644 index 00000000..876a641e --- /dev/null +++ 
b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilder.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.RocketMQOptions; +import org.apache.rocketmq.flink.legacy.RocketMQConfig; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; + +import org.apache.flink.api.connector.source.Boundedness; + +import org.apache.commons.lang.Validate; + +import java.util.Map; +import java.util.Properties; + +/** + * The @builder class for {@link RocketMQSource}to make it easier for the users to construct a + * {@link RocketMQSource}. 
+ */ +public class RocketMQSourceBuilder { + private static final String[] REQUIRED_CONFIGS = { + RocketMQConfig.NAME_SERVER_ADDR, + RocketMQConfig.CONSUMER_GROUP, + RocketMQConfig.CONSUMER_TOPIC + }; + protected Properties props; + private RocketMQDeserializationSchema deserializationSchema; + private Boundedness boundedness; + private long consumerOffsetTimestamp; + private long stopInMs; + private long partitionDiscoveryIntervalMs; + private boolean commitOffsetAuto; + + /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ + private StartupMode startMode = StartupMode.GROUP_OFFSETS; + + /** + * If StartupMode#GROUP_OFFSETS has no commit offset.OffsetResetStrategy would offer init + * strategy. + */ + private OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.LATEST; + + /** + * Specific startup offsets; only relevant when startup mode is {@link + * StartupMode#SPECIFIC_OFFSETS}. + */ + private Map specificStartupOffsets; + + public RocketMQSourceBuilder() { + this.props = new Properties(); + this.deserializationSchema = null; + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + this.stopInMs = RocketMQOptions.OPTIONAL_END_TIME_STAMP.defaultValue(); + this.partitionDiscoveryIntervalMs = + RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS.defaultValue(); + this.commitOffsetAuto = RocketMQOptions.OPTIONAL_COMMIT_OFFSET_AUTO.defaultValue(); + } + + public RocketMQSourceBuilder setTopic(String topic) { + return setProperty(RocketMQConfig.CONSUMER_TOPIC, topic); + } + + public RocketMQSourceBuilder setConsumerGroup(String consumerGroup) { + return setProperty(RocketMQConfig.CONSUMER_GROUP, consumerGroup); + } + + public RocketMQSourceBuilder setNameServerAddress(String nameServerAddress) { + return setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress); + } + + public RocketMQSourceBuilder setTag(String tag) { + return setProperty(RocketMQConfig.CONSUMER_TAG, tag); + } + + public RocketMQSourceBuilder 
setSql(String sql) { + return setProperty(RocketMQConfig.CONSUMER_SQL, sql); + } + + public RocketMQSourceBuilder setAccessKey(String accessKey) { + return setProperty(RocketMQConfig.ACCESS_KEY, accessKey); + } + + public RocketMQSourceBuilder setSecretKey(String secretKey) { + return setProperty(RocketMQConfig.SECRET_KEY, secretKey); + } + + public RocketMQSourceBuilder setDeserializer( + RocketMQDeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + return this; + } + + public RocketMQSourceBuilder setStopInMs(long stopInMs) { + this.boundedness = Boundedness.BOUNDED; + this.stopInMs = stopInMs; + return this; + } + + public RocketMQSourceBuilder setPartitionDiscoveryIntervalMs( + long partitionDiscoveryIntervalMs) { + this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; + return this; + } + + public RocketMQSourceBuilder setProperty(String key, String value) { + props.setProperty(key, value); + return this; + } + + public RocketMQSourceBuilder setProperties(Properties props) { + this.props.putAll(props); + return this; + } + + /** consume from the min offset at every restart with no state */ + public RocketMQSourceBuilder setStartFromEarliest() { + this.startMode = StartupMode.EARLIEST; + return this; + } + + /** consume from the max offset of each broker's queue at every restart with no state */ + public RocketMQSourceBuilder setStartFromLatest() { + this.startMode = StartupMode.LATEST; + return this; + } + + /** consume from the closest offset */ + public RocketMQSourceBuilder setStartFromTimeStamp(long consumerOffsetTimestamp) { + this.startMode = StartupMode.TIMESTAMP; + this.consumerOffsetTimestamp = consumerOffsetTimestamp; + return this; + } + + /** consume from the group offsets those was stored in brokers. 
*/ + public RocketMQSourceBuilder setStartFromGroupOffsets() { + this.startMode = StartupMode.GROUP_OFFSETS; + return this; + } + + /** + * consume from the group offsets those was stored in brokers. If there is no committed + * offset,#{@link OffsetResetStrategy} would provide initialization policy. + */ + public RocketMQSourceBuilder setStartFromGroupOffsets( + OffsetResetStrategy offsetResetStrategy) { + this.startMode = StartupMode.GROUP_OFFSETS; + this.offsetResetStrategy = offsetResetStrategy; + return this; + } + + /** + * consume from the specific offset. Group offsets is enable while the broker didn't specify + * offset. + */ + public RocketMQSourceBuilder setStartFromSpecificOffsets( + Map specificOffsets) { + this.specificStartupOffsets = specificOffsets; + this.startMode = StartupMode.SPECIFIC_OFFSETS; + return this; + } + + /** + * If checkpoint is disabled,this option must be set to true. + * + * @param commitOffsetAuto + */ + public RocketMQSourceBuilder setCommitOffsetAuto(boolean commitOffsetAuto) { + this.commitOffsetAuto = commitOffsetAuto; + return this; + } + + public RocketMQSource build() { + sanityCheck(); + return new RocketMQSource( + props.getProperty(RocketMQConfig.CONSUMER_TOPIC), + props.getProperty(RocketMQConfig.CONSUMER_GROUP), + props.getProperty(RocketMQConfig.NAME_SERVER_ADDR), + props.getProperty(RocketMQConfig.ACCESS_KEY), + props.getProperty(RocketMQConfig.SECRET_KEY), + props.getProperty(RocketMQConfig.CONSUMER_TAG), + props.getProperty(RocketMQConfig.CONSUMER_SQL), + stopInMs, + partitionDiscoveryIntervalMs, + boundedness, + deserializationSchema, + startMode, + offsetResetStrategy, + specificStartupOffsets, + consumerOffsetTimestamp, + commitOffsetAuto); + } + + private void sanityCheck() { + for (String requiredConfig : REQUIRED_CONFIGS) { + Validate.notNull( + props.getProperty(requiredConfig), + String.format("Property %s is required but not provided", requiredConfig)); + } + } +} diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java index 38aa132a..20379a05 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java @@ -23,6 +23,9 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; +import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.flink.annotation.Internal; @@ -50,10 +53,6 @@ import java.util.Map; import java.util.Set; -import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST; -import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST; -import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP; - /** The enumerator class for RocketMQ source. */ @Internal public class RocketMQSourceEnumerator @@ -61,7 +60,6 @@ public class RocketMQSourceEnumerator private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class); private final Map offsetTable = new HashMap<>(); - private final String consumerOffsetMode; private final long consumerOffsetTimestamp; /** The topic used for this RocketMQSource. */ private final String topic; @@ -71,8 +69,6 @@ public class RocketMQSourceEnumerator private final String nameServerAddress; /** The stop timestamp for this RocketMQSource. */ private final long stopInMs; - /** The start offset for this RocketMQSource. 
*/ - private final long startOffset; /** The partition discovery interval for this RocketMQSource. */ private final long partitionDiscoveryIntervalMs; /** The boundedness of this RocketMQSource. */ @@ -103,6 +99,11 @@ public class RocketMQSourceEnumerator private DefaultMQPullConsumer consumer; private boolean noMoreNewPartitionSplits = false; + // consumer strategy + private StartupMode startMode; + private OffsetResetStrategy offsetResetStrategy; + private Map specificStartupOffsets; + public RocketMQSourceEnumerator( String topic, String consumerGroup, @@ -110,11 +111,12 @@ public RocketMQSourceEnumerator( String accessKey, String secretKey, long stopInMs, - long startOffset, long partitionDiscoveryIntervalMs, Boundedness boundedness, SplitEnumeratorContext context, - String consumerOffsetMode, + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + Map specificStartupOffsets, long consumerOffsetTimestamp) { this( topic, @@ -123,12 +125,13 @@ public RocketMQSourceEnumerator( accessKey, secretKey, stopInMs, - startOffset, partitionDiscoveryIntervalMs, boundedness, context, new HashMap<>(), - consumerOffsetMode, + startMode, + offsetResetStrategy, + specificStartupOffsets, consumerOffsetTimestamp); } @@ -139,12 +142,13 @@ public RocketMQSourceEnumerator( String accessKey, String secretKey, long stopInMs, - long startOffset, long partitionDiscoveryIntervalMs, Boundedness boundedness, SplitEnumeratorContext context, Map> currentSplitsAssignments, - String consumerOffsetMode, + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + Map specificStartupOffsets, long consumerOffsetTimestamp) { this.topic = topic; this.consumerGroup = consumerGroup; @@ -152,40 +156,54 @@ public RocketMQSourceEnumerator( this.accessKey = accessKey; this.secretKey = secretKey; this.stopInMs = stopInMs; - this.startOffset = startOffset; this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; this.boundedness = boundedness; this.context = context; - 
this.discoveredPartitions = new HashSet<>(); this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments); this.readerIdToSplitAssignments.forEach( (reader, splits) -> splits.forEach( - s -> - discoveredPartitions.add( - new Tuple3<>( - s.getTopic(), - s.getBroker(), - s.getPartition())))); + s -> { + // Recover offset from checkpoint + offsetTable.put( + new MessageQueue( + s.getTopic(), s.getBroker(), s.getPartition()), + s.getStartingOffset()); + // Recover discovered partitions from checkpoint + discoveredPartitions.add( + new Tuple3<>( + s.getTopic(), s.getBroker(), s.getPartition())); + })); this.pendingPartitionSplitAssignment = new HashMap<>(); - this.consumerOffsetMode = consumerOffsetMode; this.consumerOffsetTimestamp = consumerOffsetTimestamp; + this.startMode = startMode; + this.offsetResetStrategy = offsetResetStrategy; + this.specificStartupOffsets = specificStartupOffsets; } @Override public void start() { initialRocketMQConsumer(); - LOG.info( - "Starting the RocketMQSourceEnumerator for consumer group {} " - + "with partition discovery interval of {} ms.", - consumerGroup, - partitionDiscoveryIntervalMs); - context.callAsync( - this::discoverAndInitializePartitionSplit, - this::handlePartitionSplitChanges, - 0, - partitionDiscoveryIntervalMs); + if (partitionDiscoveryIntervalMs > 0) { + LOG.info( + "Starting the RocketMQSourceEnumerator for consumer group {} " + + "with partition discovery interval of {} ms.", + consumerGroup, + partitionDiscoveryIntervalMs); + context.callAsync( + this::discoverAndInitializePartitionSplit, + this::handlePartitionSplitChanges, + 0, + partitionDiscoveryIntervalMs); + } else { + LOG.info( + "Starting the RocketMQSourceEnumerator for consumer group {} " + + "without periodic partition discovery.", + consumerGroup); + context.callAsync( + this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges); + } } @Override @@ -196,7 +214,7 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname @Override public void addSplitsBack(List splits, int subtaskId) { addPartitionSplitChangeToPendingAssignments(splits); - assignPendingPartitionSplits(); + assignPendingPartitionSplits(Collections.singleton(subtaskId)); } @Override @@ -205,13 +223,7 @@ public void addReader(int subtaskId) { "Adding reader {} to RocketMQSourceEnumerator for consumer group {}.", subtaskId, consumerGroup); - assignPendingPartitionSplits(); - if (boundedness == Boundedness.BOUNDED) { - // for RocketMQ bounded source, send this signal to ensure the task can end after all - // the - // splits assigned are completed. - context.signalNoMoreSplits(subtaskId); - } + assignPendingPartitionSplits(Collections.singleton(subtaskId)); } @Override @@ -234,6 +246,10 @@ private Set discoverAndInitializePartitionSplit() Set> removedPartitions = new HashSet<>(Collections.unmodifiableSet(discoveredPartitions)); Set messageQueues = consumer.fetchSubscribeMessageQueues(topic); + Set storedMQs = offsetTable.keySet(); + if (messageQueues.removeAll(storedMQs) || storedMQs.isEmpty()) { + initOffsets(messageQueues); + } Set result = new HashSet<>(); for (MessageQueue messageQueue : messageQueues) { Tuple3 topicPartition = @@ -248,7 +264,7 @@ private Set discoverAndInitializePartitionSplit() topicPartition.f0, topicPartition.f1, topicPartition.f2, - getOffsetByMessageQueue(messageQueue), + offsetTable.get(messageQueue), stopInMs)); } } @@ -263,11 +279,11 @@ private void handlePartitionSplitChanges( throw new FlinkRuntimeException("Failed to handle partition splits change due to ", t); } if (partitionDiscoveryIntervalMs < 0) { - LOG.debug(""); + LOG.debug("Partition discovery is disabled."); noMoreNewPartitionSplits = true; } addPartitionSplitChangeToPendingAssignments(partitionSplits); - assignPendingPartitionSplits(); + assignPendingPartitionSplits(context.registeredReaders().keySet()); } // This method should only be invoked in the coordinator executor thread. 
@@ -290,7 +306,7 @@ private void addPartitionSplitChangeToPendingAssignments( } // This method should only be invoked in the coordinator executor thread. - private void assignPendingPartitionSplits() { + private void assignPendingPartitionSplits(Set pendingReaders) { Map> incrementalAssignment = new HashMap<>(); pendingPartitionSplitAssignment.forEach( (ownerReader, pendingSplits) -> { @@ -317,45 +333,16 @@ private void assignPendingPartitionSplits() { .addAll(newPartitionSplits); // Clear the pending splits for the reader owner. pendingPartitionSplitAssignment.remove(readerOwner); - // Sends NoMoreSplitsEvent to the readers if there is no more partition splits - // to be assigned. - if (noMoreNewPartitionSplits) { - LOG.debug( - "No more RocketMQPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers " - + "in consumer group {}.", - consumerGroup); - context.signalNoMoreSplits(readerOwner); - } }); - } - - private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException { - Long offset = offsetTable.get(mq); - if (offset == null) { - if (startOffset > 0) { - offset = startOffset; - } else { - switch (consumerOffsetMode) { - case CONSUMER_OFFSET_EARLIEST: - offset = consumer.minOffset(mq); - break; - case CONSUMER_OFFSET_LATEST: - offset = consumer.maxOffset(mq); - break; - case CONSUMER_OFFSET_TIMESTAMP: - offset = consumer.searchOffset(mq, consumerOffsetTimestamp); - break; - default: - offset = consumer.fetchConsumeOffset(mq, false); - if (offset < 0) { - throw new IllegalArgumentException( - "Unknown value for CONSUMER_OFFSET_RESET_TO."); - } - } - } + // Sends NoMoreSplitsEvent to the readers if there is no more partition splits + // to be assigned.Whether bounded or unbounded flow,the idle subtask will be released. + if (noMoreNewPartitionSplits) { + LOG.debug( + "No more RocketMQPartitionSplits to assign. 
Sending NoMoreSplitsEvent to the readers " + + "in consumer group {}.", + consumerGroup); + pendingReaders.forEach(context::signalNoMoreSplits); } - offsetTable.put(mq, offset); - return offsetTable.get(mq); } private void initialRocketMQConsumer() { @@ -384,6 +371,22 @@ private void initialRocketMQConsumer() { } } + private void initOffsets(Set mqSets) { + try { + Map map = + RocketMQUtils.initOffsets( + new ArrayList<>(mqSets), + consumer, + startMode, + offsetResetStrategy, + consumerOffsetTimestamp, + specificStartupOffsets); + offsetTable.putAll(map); + } catch (MQClientException e) { + LOG.error("Failed to initial consumer offsets", e); + } + } + /** * Returns the index of the target subtask that a specific RocketMQ partition should be assigned * to. diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/OffsetCommitCallback.java b/src/main/java/org/apache/rocketmq/flink/source/reader/OffsetCommitCallback.java new file mode 100644 index 00000000..3d6bc9a6 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/OffsetCommitCallback.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.reader; + +/** + * A callback interface that users can implement to trigger custom actions when a commit request + * completes. + */ +public interface OffsetCommitCallback { + + /** This method will be called when the commit request has been acknowledged. */ + void onComplete(); +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java index ca9c3f1f..7a96272e 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java @@ -27,10 +27,12 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.legacy.common.util.MetricUtils; import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -71,12 +73,7 @@ public class RocketMQPartitionSplitReader private final String topic; private final String tag; private final String sql; - private final long stopInMs; - private final long startTime; - private final long startOffset; - - private final String accessKey; - private final String secretKey; + private final boolean commitOffsetAuto; private final RocketMQDeserializationSchema deserializationSchema; private final Map, Long> startingOffsets; @@ -89,6 +86,8 @@ public class 
RocketMQPartitionSplitReader private static final int MAX_MESSAGE_NUMBER_PER_BLOCK = 64; + private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge(); + public RocketMQPartitionSplitReader( String topic, String consumerGroup, @@ -97,29 +96,26 @@ public RocketMQPartitionSplitReader( String secretKey, String tag, String sql, - long stopInMs, - long startTime, - long startOffset, - RocketMQDeserializationSchema deserializationSchema) { + RocketMQDeserializationSchema deserializationSchema, + SourceReaderContext readerContext, + boolean commitOffsetAuto) { this.topic = topic; this.tag = tag; this.sql = sql; - this.accessKey = accessKey; - this.secretKey = secretKey; - this.stopInMs = stopInMs; - this.startTime = startTime; - this.startOffset = startOffset; this.deserializationSchema = deserializationSchema; this.startingOffsets = new HashMap<>(); this.stoppingTimestamps = new HashMap<>(); this.collector = new SimpleCollector<>(); + this.commitOffsetAuto = commitOffsetAuto; initialRocketMQConsumer(consumerGroup, nameServerAddress, accessKey, secretKey); + readerContext.metricGroup().gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay); } @Override public RecordsWithSplitIds> fetch() throws IOException { RocketMQPartitionSplitRecords> recordsBySplits = new RocketMQPartitionSplitRecords<>(); + long fetchTime = 0L; Set messageQueues; try { messageQueues = consumer.fetchSubscribeMessageQueues(topic); @@ -140,23 +136,6 @@ public RecordsWithSplitIds> fetch() throws IOException { messageQueue.getQueueId()); if (startingOffsets.containsKey(topicPartition)) { long messageOffset = startingOffsets.get(topicPartition); - if (messageOffset == 0) { - try { - messageOffset = - startTime > 0 - ? 
consumer.searchOffset(messageQueue, startTime) - : startOffset; - } catch (MQClientException e) { - LOG.warn( - String.format( - "Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.", - messageQueue.getTopic(), - messageQueue.getBrokerName(), - messageQueue.getQueueId()), - e); - } - messageOffset = messageOffset > -1 ? messageOffset : 0; - } PullResult pullResult = null; try { if (wakeup) { @@ -188,6 +167,7 @@ public RecordsWithSplitIds> fetch() throws IOException { messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK); } + fetchTime = System.currentTimeMillis(); } catch (MQClientException | RemotingException | MQBrokerException @@ -239,6 +219,13 @@ public RecordsWithSplitIds> fetch() throws IOException { messageExt.getQueueOffset(), messageExt .getStoreTimestamp()))); + if (commitOffsetAuto) { + consumer.updateConsumeOffset( + messageQueue, startingOffsets.get(topicPartition)); + consumer.getOffsetStore() + .persist(consumer.queueWithNamespace(messageQueue)); + } + fetchDelay.report(Math.abs(fetchTime - storeTimestamp)); } catch (Exception e) { throw new IOException( "Failed to deserialize consumer record due to", e); @@ -308,7 +295,21 @@ private void finishSplitAtRecord( } private long getStoppingTimestamp(Tuple3 topicPartition) { - return stoppingTimestamps.getOrDefault(topicPartition, stopInMs); + return stoppingTimestamps.getOrDefault(topicPartition, Long.MAX_VALUE); + } + + public void notifyCheckpointComplete( + Map committedOffsets, OffsetCommitCallback callback) + throws MQClientException { + if (commitOffsetAuto) { + return; + } + for (Map.Entry entry : committedOffsets.entrySet()) { + consumer.updateConsumeOffset(entry.getKey(), entry.getValue()); + consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey())); + LOG.info("Offset commit success.{},offset:{}", entry.getKey(), entry.getValue()); + } + callback.onComplete(); } // --------------- private helper method ---------------------- diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java index 0257e34a..54948958 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQSourceReader.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.flink.source.reader; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.source.reader.fetcher.RocketMQSourceFetcherManager; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitState; @@ -27,29 +29,56 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; -import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.function.Supplier; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** The source reader for RocketMQ partitions. */ public class RocketMQSourceReader extends SingleThreadMultiplexSourceReaderBase< Tuple3, T, RocketMQPartitionSplit, RocketMQPartitionSplitState> { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceReader.class); + // These maps need to be concurrent because it will be accessed by both the main thread + // and the split fetcher thread in the callback. 
+ private final SortedMap> offsetsToCommit; + private final ConcurrentMap offsetsOfFinishedSplits; + public RocketMQSourceReader( FutureCompletingBlockingQueue>> elementsQueue, - Supplier, RocketMQPartitionSplit>> - splitReaderSupplier, + RocketMQSourceFetcherManager rocketMQSourceFetcherManager, RecordEmitter, T, RocketMQPartitionSplitState> recordEmitter, Configuration config, SourceReaderContext context) { - super(elementsQueue, splitReaderSupplier, recordEmitter, config, context); + super(elementsQueue, rocketMQSourceFetcherManager, recordEmitter, config, context); + this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); + this.offsetsOfFinishedSplits = new ConcurrentHashMap<>(); } @Override - protected void onSplitFinished(Map map) {} + protected void onSplitFinished(Map map) { + map.forEach( + (ignored, splitState) -> { + if (splitState.getCurrentOffset() >= 0) { + offsetsOfFinishedSplits.put( + new MessageQueue( + splitState.getTopic(), + splitState.getBroker(), + splitState.getPartition()), + splitState.getCurrentOffset()); + } + }); + } @Override protected RocketMQPartitionSplitState initializedState(RocketMQPartitionSplit partitionSplit) { @@ -61,4 +90,52 @@ protected RocketMQPartitionSplit toSplitType( String splitId, RocketMQPartitionSplitState splitState) { return splitState.toRocketMQPartitionSplit(); } + + @Override + public List snapshotState(long checkpointId) { + List splits = super.snapshotState(checkpointId); + + if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) { + offsetsToCommit.put(checkpointId, Collections.emptyMap()); + } else { + Map offsetMap = + offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); + for (RocketMQPartitionSplit split : splits) { + if (split.getStartingOffset() >= 0) { + offsetMap.put( + new MessageQueue( + split.getTopic(), split.getBroker(), split.getPartition()), + split.getStartingOffset()); + } + } + // Put offsets of all the finished splits. 
+ offsetMap.putAll(offsetsOfFinishedSplits); + } + return splits; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + LOG.debug("Committing offsets for checkpoint {}", checkpointId); + Map committedOffsets = offsetsToCommit.get(checkpointId); + if (committedOffsets == null || committedOffsets.isEmpty()) { + LOG.debug( + "Offsets for checkpoint {} either do not exist or have already been committed.", + checkpointId); + return; + } + ((RocketMQSourceFetcherManager) splitFetcherManager) + .commitOffsets( + committedOffsets, + () -> { + offsetsOfFinishedSplits + .entrySet() + .removeIf( + entry -> committedOffsets.containsKey(entry.getKey())); + while (!offsetsToCommit.isEmpty() + && offsetsToCommit.firstKey() <= checkpointId) { + offsetsToCommit.remove(offsetsToCommit.firstKey()); + } + }); + } } diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java index e50b7026..83533103 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java @@ -42,7 +42,7 @@ public interface RocketMQDeserializationSchema */ @Override @PublicEvolving - default void open(InitializationContext context) {} + default void open(InitializationContext context) throws Exception {} /** * Deserializes the byte message. 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQValueOnlyDeserializationSchemaWrapper.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQValueOnlyDeserializationSchemaWrapper.java new file mode 100644 index 00000000..0cd42b9c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQValueOnlyDeserializationSchemaWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.List; + +/** DeserializeWrapper to wrap DeserializationSchema */ +public class RocketMQValueOnlyDeserializationSchemaWrapper + implements RocketMQDeserializationSchema { + private static final long serialVersionUID = 1L; + private final DeserializationSchema, T> deserializationSchema; + + public RocketMQValueOnlyDeserializationSchemaWrapper( + DeserializationSchema, T> deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + @Override + public void open(InitializationContext context) throws Exception { + deserializationSchema.open(context); + } + + @Override + public void deserialize(List record, Collector out) throws IOException { + deserializationSchema.deserialize(record, out); + } + + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/SimpleStringSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/SimpleStringSchema.java new file mode 100644 index 00000000..efe48eed --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/SimpleStringSchema.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.reader.deserializer; + +import org.apache.rocketmq.common.message.MessageExt; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Deserialize value to string */ +public class SimpleStringSchema implements DeserializationSchema, String> { + + private static final long serialVersionUID = 1L; + + private String charName; + + /** + * The charset to use to convert between strings and bytes. The field is transient because we + * serialize a different delegate object instead + */ + private transient Charset charset; + + /** Creates a new SimpleStringSchema that uses "UTF-8" as the encoding. */ + public SimpleStringSchema() { + this(StandardCharsets.UTF_8); + } + + /** + * Creates a new SimpleStringSchema that uses the given charset to convert between strings and + * bytes. + * + * @param charset The charset to use to convert between strings and bytes. 
+ */ + public SimpleStringSchema(Charset charset) { + this.charset = checkNotNull(charset); + this.charName = charset.name(); + } + + @Override + public void open( + org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext + context) + throws Exception { + DeserializationSchema.super.open(context); + if (charset == null) { + charset = Charset.forName(charName); + } + } + + @Override + public void deserialize(List record, Collector out) throws IOException { + for (MessageExt messageExt : record) { + byte[] body = messageExt.getBody(); + String value = new String(body, charset); + out.collect(value); + } + } + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/fetcher/RocketMQSourceFetcherManager.java b/src/main/java/org/apache/rocketmq/flink/source/reader/fetcher/RocketMQSourceFetcherManager.java new file mode 100644 index 00000000..17dba5c6 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/fetcher/RocketMQSourceFetcherManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.reader.fetcher; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.source.reader.OffsetCommitCallback; +import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader; +import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderBase; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +import lombok.SneakyThrows; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * The SplitFetcherManager for RocketMQ source. This class is needed to help commit the offsets to + * RocketMQ using the consumer inside the {@link RocketMQPartitionSplitReader}. + */ +public class RocketMQSourceFetcherManager + extends SingleThreadFetcherManager, RocketMQPartitionSplit> { + + /** + * Creates a new SplitFetcherManager with a single I/O threads. + * + * @param elementsQueue The queue that is used to hand over data from the I/O thread (the + * fetchers) to the reader (which emits the records and book-keeps the state. This must be + * the same queue instance that is also passed to the {@link SourceReaderBase}. + * @param splitReaderSupplier The factory for the split reader that connects to the source + * system. + * @param splitFinishedHook Hook for handling finished splits in split fetchers. 
+ */ + public RocketMQSourceFetcherManager( + FutureCompletingBlockingQueue>> elementsQueue, + Supplier, RocketMQPartitionSplit>> + splitReaderSupplier, + Consumer> splitFinishedHook) { + super(elementsQueue, splitReaderSupplier, splitFinishedHook); + } + + public void commitOffsets( + Map committedOffsets, OffsetCommitCallback callback) { + if (committedOffsets.isEmpty()) { + return; + } + + SplitFetcher, RocketMQPartitionSplit> splitFetcher = fetchers.get(0); + if (splitFetcher != null) { + commit(splitFetcher, committedOffsets, callback); + } else { + splitFetcher = createSplitFetcher(); + commit(splitFetcher, committedOffsets, callback); + startFetcher(splitFetcher); + } + } + + private void commit( + SplitFetcher, RocketMQPartitionSplit> splitFetcher, + Map committedOffsets, + OffsetCommitCallback callback) { + RocketMQPartitionSplitReader rocketMQReader = + (RocketMQPartitionSplitReader) splitFetcher.getSplitReader(); + + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @SneakyThrows + @Override + public boolean run() { + rocketMQReader.notifyCheckpointComplete(committedOffsets, callback); + return true; + } + + @Override + public void wakeUp() {} + }); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java index 6db5075b..bb72f9ba 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java @@ -18,7 +18,10 @@ package org.apache.rocketmq.flink.source.table; -import org.apache.rocketmq.flink.common.RocketMQOptions; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; +import 
org.apache.rocketmq.flink.source.util.RocketMQOptionsUtil; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -28,39 +31,34 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; - -import java.text.ParseException; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.TimeZone; import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; import static org.apache.rocketmq.flink.common.RocketMQOptions.CONSUMER_GROUP; import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COMMIT_OFFSET_AUTO; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING; -import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME_STAMP; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_OFFSET_RESET_STRATEGY; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE; +import 
static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SECRET_KEY; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SQL; -import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET; -import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME; -import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG; -import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE; import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API; import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC; -import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST; +import static org.apache.rocketmq.flink.source.util.RocketMQOptionsUtil.validateStartUpMode; /** * Defines the {@link DynamicTableSourceFactory} implementation to create {@link @@ -68,8 +66,6 @@ */ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFactory { - private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - @Override public String factoryIdentifier() { return "rocketmq"; @@ -89,11 +85,7 @@ public Set> optionalOptions() { Set> optionalOptions = new HashSet<>(); optionalOptions.add(OPTIONAL_TAG); optionalOptions.add(OPTIONAL_SQL); - optionalOptions.add(OPTIONAL_START_MESSAGE_OFFSET); - optionalOptions.add(OPTIONAL_START_TIME_MILLS); - optionalOptions.add(OPTIONAL_START_TIME); - optionalOptions.add(OPTIONAL_END_TIME); - optionalOptions.add(OPTIONAL_TIME_ZONE); + optionalOptions.add(OPTIONAL_END_TIME_STAMP); optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); 
optionalOptions.add(OPTIONAL_USE_NEW_API); optionalOptions.add(OPTIONAL_ENCODING); @@ -104,6 +96,10 @@ public Set> optionalOptions() { optionalOptions.add(OPTIONAL_ACCESS_KEY); optionalOptions.add(OPTIONAL_SECRET_KEY); optionalOptions.add(OPTIONAL_SCAN_STARTUP_MODE); + optionalOptions.add(OPTIONAL_SCAN_OFFSET_RESET_STRATEGY); + optionalOptions.add(OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS); + optionalOptions.add(OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS); + optionalOptions.add(OPTIONAL_COMMIT_OFFSET_AUTO); return optionalOptions; } @@ -118,55 +114,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS); String tag = configuration.getString(OPTIONAL_TAG); String sql = configuration.getString(OPTIONAL_SQL); - if (configuration.contains(OPTIONAL_SCAN_STARTUP_MODE) - && (configuration.contains(OPTIONAL_START_MESSAGE_OFFSET) - || configuration.contains(OPTIONAL_START_TIME_MILLS) - || configuration.contains(OPTIONAL_START_TIME))) { - throw new IllegalArgumentException( - String.format( - "cannot support these configs when %s has been set: [%s, %s, %s] !", - OPTIONAL_SCAN_STARTUP_MODE.key(), - OPTIONAL_START_MESSAGE_OFFSET.key(), - OPTIONAL_START_TIME.key(), - OPTIONAL_START_TIME_MILLS.key())); - } - long startMessageOffset = configuration.getLong(OPTIONAL_START_MESSAGE_OFFSET); - long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS); - String startDateTime = configuration.getString(OPTIONAL_START_TIME); - String timeZone = configuration.getString(OPTIONAL_TIME_ZONE); + validateStartUpMode(configuration); String accessKey = configuration.getString(OPTIONAL_ACCESS_KEY); String secretKey = configuration.getString(OPTIONAL_SECRET_KEY); - long startTime = startTimeMs; - if (startTime == -1) { - if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) { - try { - startTime = parseDateString(startDateTime, timeZone); - } catch (ParseException e) { - throw new RuntimeException( - 
String.format( - "Incorrect datetime format: %s, pls use ISO-8601 " - + "complete date plus hours, minutes and seconds format:%s.", - startDateTime, DATE_FORMAT), - e); - } - } - } - long stopInMs = Long.MAX_VALUE; - String endDateTime = configuration.getString(OPTIONAL_END_TIME); - if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) { - try { - stopInMs = parseDateString(endDateTime, timeZone); - } catch (ParseException e) { - throw new RuntimeException( - String.format( - "Incorrect datetime format: %s, pls use ISO-8601 " - + "complete date plus hours, minutes and seconds format:%s.", - endDateTime, DATE_FORMAT), - e); - } - Preconditions.checkArgument( - stopInMs >= startTime, "Start time should be less than stop time."); - } + long stopInMs = configuration.getLong(OPTIONAL_END_TIME_STAMP); long partitionDiscoveryIntervalMs = configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS); boolean useNewApi = configuration.getBoolean(OPTIONAL_USE_NEW_API); @@ -175,12 +126,18 @@ public DynamicTableSource createDynamicTableSource(Context context) { TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); descriptorProperties.putTableSchema("schema", physicalSchema); - String consumerOffsetMode = - configuration.getString( - RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE, CONSUMER_OFFSET_LATEST); + OffsetResetStrategy offsetResetStrategy = + configuration.get(OPTIONAL_SCAN_OFFSET_RESET_STRATEGY); long consumerOffsetTimestamp = - configuration.getLong( - RocketMQOptions.OPTIONAL_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()); + configuration.getLong(OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS); + Map specificOffsets = new HashMap<>(); + StartupMode startupMode = configuration.get(OPTIONAL_SCAN_STARTUP_MODE); + if (startupMode.equals(StartupMode.SPECIFIC_OFFSETS)) { + String specificOffsetsStr = + configuration.getString(OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS); + specificOffsets = 
RocketMQOptionsUtil.parseSpecificOffsets(specificOffsetsStr, topic); + } + boolean commitOffsetAuto = configuration.getBoolean(OPTIONAL_COMMIT_OFFSET_AUTO); return new RocketMQScanTableSource( descriptorProperties, physicalSchema, @@ -192,17 +149,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { tag, sql, stopInMs, - startMessageOffset, - startMessageOffset < 0 ? startTime : -1L, + useNewApi, partitionDiscoveryIntervalMs, - consumerOffsetMode, + startupMode, + offsetResetStrategy, + specificOffsets, consumerOffsetTimestamp, - useNewApi); - } - - private Long parseDateString(String dateString, String timeZone) throws ParseException { - FastDateFormat simpleDateFormat = - FastDateFormat.getInstance(DATE_FORMAT, TimeZone.getTimeZone(timeZone)); - return simpleDateFormat.parse(dateString).getTime(); + commitOffsetAuto); } } diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java index dc92a478..be316308 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java +++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java @@ -17,8 +17,11 @@ package org.apache.rocketmq.flink.source.table; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.flink.legacy.RocketMQConfig; import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema; import org.apache.rocketmq.flink.legacy.common.serialization.RowKeyValueDeserializationSchema; import org.apache.rocketmq.flink.source.RocketMQSource; @@ -56,9 +59,6 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading private final 
DescriptorProperties properties; private final TableSchema schema; - private final String consumerOffsetMode; - private final long consumerOffsetTimestamp; - private final String topic; private final String consumerGroup; private final String nameServerAddress; @@ -70,9 +70,14 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading private final long stopInMs; private final long partitionDiscoveryIntervalMs; - private final long startMessageOffset; - private final long startTime; private final boolean useNewApi; + private final boolean commitOffsetAuto; + + // consumer strategy + private final StartupMode startMode; + private final OffsetResetStrategy offsetResetStrategy; + private final Map specificOffsets; + private final long consumerOffsetTimestamp; private List metadataKeys; @@ -87,12 +92,13 @@ public RocketMQScanTableSource( String tag, String sql, long stopInMs, - long startMessageOffset, - long startTime, + boolean useNewApi, long partitionDiscoveryIntervalMs, - String consumerOffsetMode, + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + Map specificOffsets, long consumerOffsetTimestamp, - boolean useNewApi) { + boolean commitOffsetAuto) { this.properties = properties; this.schema = schema; this.topic = topic; @@ -103,13 +109,14 @@ public RocketMQScanTableSource( this.tag = tag; this.sql = sql; this.stopInMs = stopInMs; - this.startMessageOffset = startMessageOffset; - this.startTime = startTime; this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; this.useNewApi = useNewApi; this.metadataKeys = Collections.emptyList(); - this.consumerOffsetMode = consumerOffsetMode; + this.startMode = startMode; + this.offsetResetStrategy = offsetResetStrategy; + this.specificOffsets = specificOffsets; this.consumerOffsetTimestamp = consumerOffsetTimestamp; + this.commitOffsetAuto = commitOffsetAuto; } @Override @@ -130,18 +137,25 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { 
tag, sql, stopInMs, - startTime, - startMessageOffset < 0 ? 0 : startMessageOffset, partitionDiscoveryIntervalMs, isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED, createRocketMQDeserializationSchema(), - consumerOffsetMode, - consumerOffsetTimestamp)); + startMode, + offsetResetStrategy, + specificOffsets, + consumerOffsetTimestamp, + commitOffsetAuto)); } else { - return SourceFunctionProvider.of( + RocketMQSourceFunction rocketMQSource = new RocketMQSourceFunction<>( - createKeyValueDeserializationSchema(), getConsumerProps()), - isBounded()); + createKeyValueDeserializationSchema(), getConsumerProps()); + setStartupMode( + rocketMQSource, + startMode, + offsetResetStrategy, + specificOffsets, + consumerOffsetTimestamp); + return SourceFunctionProvider.of(rocketMQSource, isBounded()); } } @@ -172,12 +186,13 @@ public DynamicTableSource copy() { tag, sql, stopInMs, - startMessageOffset, - startTime, + useNewApi, partitionDiscoveryIntervalMs, - consumerOffsetMode, + startMode, + offsetResetStrategy, + specificOffsets, consumerOffsetTimestamp, - useNewApi); + commitOffsetAuto); tableSource.metadataKeys = metadataKeys; return tableSource; } @@ -220,13 +235,38 @@ private Properties getConsumerProps() { consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress); consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag); consumerProps.setProperty(RocketMQConfig.CONSUMER_SQL, sql); - consumerProps.setProperty( - RocketMQConfig.CONSUMER_START_MESSAGE_OFFSET, String.valueOf(startMessageOffset)); consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey); consumerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey); return consumerProps; } + private void setStartupMode( + RocketMQSourceFunction rocketMQSource, + StartupMode startMode, + OffsetResetStrategy offsetResetStrategy, + Map specificOffsets, + long consumerOffsetTimestamp) { + switch (startMode) { + case LATEST: + rocketMQSource.setStartFromLatest(); + break; + case EARLIEST: + 
rocketMQSource.setStartFromEarliest(); + break; + case GROUP_OFFSETS: + rocketMQSource.setStartFromGroupOffsets(offsetResetStrategy); + break; + case SPECIFIC_OFFSETS: + rocketMQSource.setStartFromSpecificOffsets(specificOffsets); + break; + case TIMESTAMP: + rocketMQSource.setStartFromTimeStamp(consumerOffsetTimestamp); + break; + default: + break; + } + } + // -------------------------------------------------------------------------------------------- // Metadata handling // -------------------------------------------------------------------------------------------- diff --git a/src/main/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtil.java b/src/main/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtil.java new file mode 100644 index 00000000..16ad0bd4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtil.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.source.util; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.RocketMQOptions; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC; + +/** util for {@link RocketMQOptions} */ +public class RocketMQOptionsUtil { + + private static final String BROKER = "broker"; + private static final String QUEUE = "queue"; + private static final String OFFSET = "offset"; + + /** + * Parse SpecificOffsets String to Map + * + * @param specificOffsetsStr scan.startup.specific-offsets = + * broker:a,queue:0,offset:123;broker:a,queue:1,offset:38 + * @param topicName topic name + * @return SpecificOffsets with Map format, key is messageQueue, and value is offset + */ + public static Map parseSpecificOffsets( + String specificOffsetsStr, String topicName) { + final Map offsetMap = new HashMap<>(); + final String[] pairs = specificOffsetsStr.split(";"); + final String validationExceptionMessage = + String.format( + "Invalid properties '%s' should follow the format " + + "'broker:a,queue:0,offset:123;broker:a,queue:1,offset:38', but is '%s'.", + OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS.key(), specificOffsetsStr); + + if (pairs.length == 0) { + throw new ValidationException(validationExceptionMessage); + } + + for (String pair : pairs) { + if (StringUtils.isBlank(pair)) { + throw new 
ValidationException(validationExceptionMessage); + } + + final String[] kv = pair.split(","); + if (kv.length != 3 + || !kv[0].startsWith(BROKER + ':') + || !kv[1].startsWith(QUEUE + ':') + || !kv[2].startsWith(OFFSET + ':')) { + throw new ValidationException(validationExceptionMessage); + } + + String brokerValue = kv[0].substring(kv[0].indexOf(":") + 1); + String queueValue = kv[1].substring(kv[1].indexOf(":") + 1); + String offsetValue = kv[2].substring(kv[2].indexOf(":") + 1); + try { + final Integer queue = Integer.valueOf(queueValue); + final Long offset = Long.valueOf(offsetValue); + MessageQueue messageQueue = new MessageQueue(topicName, brokerValue, queue); + offsetMap.put(messageQueue, offset); + } catch (NumberFormatException e) { + throw new ValidationException(validationExceptionMessage, e); + } + } + return offsetMap; + } + + /** validate properties when startUpMode is set */ + public static void validateStartUpMode(Configuration configuration) { + configuration + .getOptional(OPTIONAL_SCAN_STARTUP_MODE) + .ifPresent( + mode -> { + switch (mode) { + case SPECIFIC_OFFSETS: + if (!configuration + .getOptional(OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' startup mode" + + " but missing.", + OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS + .key(), + StartupMode.SPECIFIC_OFFSETS)); + } + String specificOffsets = + configuration.getString( + OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS); + String topic = configuration.getString(TOPIC); + RocketMQOptionsUtil.parseSpecificOffsets( + specificOffsets, topic); + break; + case TIMESTAMP: + if (!configuration + .getOptional(OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' startup mode" + + " but missing.", + OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS + .key(), + StartupMode.TIMESTAMP)); + } + break; + default: + break; + } + }); + } +} diff --git 
a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 00000000..7ecf1481 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %C: %m%n + diff --git a/src/test/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilderTest.java b/src/test/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilderTest.java new file mode 100644 index 00000000..ea01c397 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/source/RocketMQSourceBuilderTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.RocketMQOptions; +import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; +import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQValueOnlyDeserializationSchemaWrapper; +import org.apache.rocketmq.flink.source.reader.deserializer.SimpleStringSchema; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +/** Tests for {@link RocketMQSourceBuilder}. 
*/ +public class RocketMQSourceBuilderTest { + + private RocketMQSourceBuilder builder; + + @Before + public void open() { + builder = + new RocketMQSourceBuilder() + .setNameServerAddress("localhost:5789") + .setTopic("tp_test") + .setConsumerGroup("group_test") + .setDeserializer( + new RocketMQValueOnlyDeserializationSchemaWrapper<>( + new SimpleStringSchema())); + } + + @Test + public void testPartitionDiscoverOnBoundedness() { + final IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> + builder.setStopInMs(3000L) + .setPartitionDiscoveryIntervalMs(50000L) + .build()); + MatcherAssert.assertThat( + exception.getMessage(), + CoreMatchers.containsString("Bounded stream didn't support partitionDiscovery.")); + } + + @Test + public void testStartFromEarliest() { + RocketMQSource source = builder.setStartFromEarliest().build(); + Assert.assertEquals( + StartupMode.EARLIEST, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE)); + Assert.assertEquals( + Long.MAX_VALUE, + source.getConfiguration().getLong(RocketMQOptions.OPTIONAL_END_TIME_STAMP)); + } + + @Test + public void testStartFromLatest() { + RocketMQSource source = builder.setStartFromLatest().build(); + Assert.assertEquals( + StartupMode.LATEST, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE)); + Assert.assertEquals( + Long.MAX_VALUE, + source.getConfiguration().getLong(RocketMQOptions.OPTIONAL_END_TIME_STAMP)); + } + + @Test + public void testStartFromTimeStamp() { + long startFlag = 1666794040000L; + RocketMQSource source = builder.setStartFromTimeStamp(startFlag).build(); + Assert.assertEquals( + StartupMode.TIMESTAMP, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE)); + Assert.assertEquals( + startFlag, + source.getConfiguration() + .getLong(RocketMQOptions.OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS)); + Assert.assertEquals( + Long.MAX_VALUE, + 
source.getConfiguration().getLong(RocketMQOptions.OPTIONAL_END_TIME_STAMP)); + } + + @Test + public void testStartFromGroupOffsets() { + RocketMQSource source = builder.setStartFromGroupOffsets().build(); + Assert.assertEquals( + StartupMode.GROUP_OFFSETS, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE)); + Assert.assertEquals( + OffsetResetStrategy.LATEST, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_OFFSET_RESET_STRATEGY)); + Assert.assertEquals( + OffsetResetStrategy.EARLIEST, + builder.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST) + .build() + .getConfiguration() + .get(RocketMQOptions.OPTIONAL_SCAN_OFFSET_RESET_STRATEGY)); + } + + @Test + public void StartFromSpecificOffsets() { + HashMap specificOffsets = new HashMap<>(); + specificOffsets.put(new MessageQueue("topic", "broker", 0), 1L); + specificOffsets.put(new MessageQueue("topic", "broker", 1), 2L); + RocketMQSource source = + builder.setStartFromSpecificOffsets(specificOffsets) + .setCommitOffsetAuto(true) + .build(); + Assert.assertEquals( + StartupMode.SPECIFIC_OFFSETS, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE)); + Assert.assertEquals( + OffsetResetStrategy.LATEST, + source.getConfiguration().get(RocketMQOptions.OPTIONAL_SCAN_OFFSET_RESET_STRATEGY)); + Assert.assertEquals( + specificOffsets.toString(), + source.getConfiguration() + .get(RocketMQOptions.OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS)); + } +} diff --git a/src/test/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtilTest.java b/src/test/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtilTest.java new file mode 100644 index 00000000..ac60a94f --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/source/util/RocketMQOptionsUtilTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.source.util; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.legacy.common.config.StartupMode; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_MODE; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS; + +/** Tests for {@link RocketMQOptionsUtil}. 
*/ +public class RocketMQOptionsUtilTest { + + @Test + public void testParseSpecificOffsets() { + String topic = "tp_test"; + Map expect = new HashMap<>(); + expect.put(new MessageQueue(topic, "a", 0), 123L); + expect.put(new MessageQueue(topic, "a", 1), 38L); + String specificOffsetsStr = "broker:a,queue:0,offset:123;broker:a,queue:1,offset:38"; + Map actual = + RocketMQOptionsUtil.parseSpecificOffsets(specificOffsetsStr, topic); + Assert.assertEquals(expect, actual); + } + + @Test(expected = ValidationException.class) + public void testException() { + String topic = "tp_test"; + String specificOffsetsStr = ""; + RocketMQOptionsUtil.parseSpecificOffsets(specificOffsetsStr, topic); + } + + @Test(expected = ValidationException.class) + public void testException2() { + String topic = "tp_test"; + String specificOffsetsStr = "broker:a,queue:0,offset:123;broker:a,queue:1"; + RocketMQOptionsUtil.parseSpecificOffsets(specificOffsetsStr, topic); + } + + @Test + public void testValidateStartUpMode() { + // specific-offsets + Configuration conf = new Configuration(); + conf.set(OPTIONAL_SCAN_STARTUP_MODE, StartupMode.SPECIFIC_OFFSETS); + try { + RocketMQOptionsUtil.validateStartUpMode(conf); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.assertTrue(e instanceof ValidationException); + } + conf.set( + OPTIONAL_SCAN_STARTUP_SPECIFIC_OFFSETS, + "broker:a,queue:0,offset:123;broker:a,queue:1,offset:38"); + RocketMQOptionsUtil.validateStartUpMode(conf); + // timestamp + conf.set(OPTIONAL_SCAN_STARTUP_MODE, StartupMode.TIMESTAMP); + try { + RocketMQOptionsUtil.validateStartUpMode(conf); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.assertTrue(e instanceof ValidationException); + } + conf.set(OPTIONAL_SCAN_STARTUP_TIMESTAMP_MILLIS, 200L); + RocketMQOptionsUtil.validateStartUpMode(conf); + } +}