From b69fb21dc82e7922f7b7e65c94c154d56e442e5e Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 20 Jun 2018 14:53:06 -0700 Subject: [PATCH 1/2] Add a simple pulsar source connector. --- .../flink-connector-pulsar/pom.xml | 102 ++++ .../streaming/connectors/pulsar/Defaults.java | 29 + .../streaming/connectors/pulsar/Metrics.java | 22 + .../pulsar/PulsarConsumerSource.java | 203 +++++++ .../connectors/pulsar/PulsarSourceBase.java | 30 + .../pulsar/PulsarSourceBuilder.java | 113 ++++ .../pulsar/PulsarConsumerSourceTests.java | 513 ++++++++++++++++++ flink-connectors/pom.xml | 1 + .../flink-examples-streaming/pom.xml | 41 ++ 9 files changed, 1054 insertions(+) create mode 100644 flink-connectors/flink-connector-pulsar/pom.xml create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Metrics.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java create mode 100644 flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml new file mode 100644 index 0000000000000..70711dc6389c7 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -0,0 +1,102 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.6-SNAPSHOT + .. + + + flink-connector-pulsar_${scala.binary.version} + flink-connector-pulsar + + 2.0.0-rc1-incubating + + + jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + org.apache.flink + flink-table_${scala.binary.version} + ${project.version} + provided + + true + + + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-tests_${scala.binary.version} + ${project.version} + test + test-jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${project.version} + test + + + + org.javassist + javassist + 3.20.0-GA + test + + + diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java new file mode 100644 index 0000000000000..f12d74d2dd380 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Defaults.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +/** + * Default values for Pulsar connectors. + */ +public class Defaults { + + public static final String SERVICE_URL = "pulsar://localhost:6650"; + public static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100; + + private Defaults() {} +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Metrics.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Metrics.java new file mode 100644 index 0000000000000..f0a5d39a26e18 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/Metrics.java @@ -0,0 +1,22 @@ +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.Internal; + +/** + * A collection of Pulsar consumer metrics related constant strings. + * + *

The names must not be changed, as that would break backward compatibility for the consumer's metrics. + */ +@Internal +public class Metrics { + + // ------------------------------------------------------------------------ + // Per-subtask metrics + // ------------------------------------------------------------------------ + + public static final String COMMITS_SUCCEEDED_METRICS_COUNTER = "commitsSucceeded"; + public static final String COMMITS_FAILED_METRICS_COUNTER = "commitsFailed"; + + private Metrics() { + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java new file mode 100644 index 0000000000000..d66971bfbc800 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + *

When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. + */ +class PulsarConsumerSource extends MessageAcknowledgingSourceBase implements PulsarSourceBase { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema deserializer; + + private PulsarClient client; + private Consumer consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); + throw new RuntimeException("null pulsar consumer unable to acknowledge messages"); + } + + if (messageIds.isEmpty()) { + LOG.info("no message ids to acknowledge"); + return; + } + + Map> futures = new HashMap<>(messageIds.size()); + for (MessageId id : messageIds) { + futures.put(id.toString(), consumer.acknowledgeAsync(id)); + } + + futures.forEach((k, f) -> { + try { + f.get(); + } catch (Exception e) { + LOG.error("failed to acknowledge messageId " + k, e); + throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e); + } + }); + } + + @Override + public void run(SourceContext context) throws Exception { + Message message; + while (isRunning) { + message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS); + if (message == null) { + LOG.info("unexpected null message"); + continue; + } + + if (isCheckpointingEnabled) { + emitCheckpointing(context, message); + } else { + emitAutoAcking(context, message); + } + } + } + + private void emitCheckpointing(SourceContext context, Message message) throws IOException { + synchronized (context.getCheckpointLock()) { + if (!addId(message.getMessageId())) { + if (LOG.isDebugEnabled()) { + LOG.debug("messageId=" + message.getMessageId().toString() + " already processed."); + } + return; + } + context.collect(deserialize(message)); + totalMessageCount++; + } + } + + private void emitAutoAcking(SourceContext context, Message message) throws IOException { + context.collect(deserialize(message)); + batchCount++; + totalMessageCount++; + if (batchCount >= acknowledgementBatchSize) { + LOG.info("processed {} messages acknowledging messageId {}", batchCount, message.getMessageId()); + consumer.acknowledgeCumulative(message.getMessageId()); + batchCount = 0; + } + } + + private T deserialize(Message message) throws IOException { + return deserializer.deserialize(message.getData()); + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void close() throws Exception { + super.close(); + IOUtils.cleanup(LOG, consumer); + IOUtils.cleanup(LOG, client); + } + + @Override + public TypeInformation getProducedType() { + return deserializer.getProducedType(); + } + + boolean isCheckpointingEnabled() { + return isCheckpointingEnabled; + } + + PulsarClient createClient() throws PulsarClientException { + return PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + } + + Consumer createConsumer(PulsarClient client) throws PulsarClientException { + return client.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java new file mode 100644 index 0000000000000..bdfff350caf77 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBase.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; + +/** + * Base class for pulsar sources. + * @param + */ +@PublicEvolving +interface PulsarSourceBase extends ParallelSourceFunction, ResultTypeQueryable { +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java new file mode 100644 index 0000000000000..54baed06838d1 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Preconditions; + +/** + * A class for building a pulsar source. + */ +@PublicEvolving +public class PulsarSourceBuilder { + + final DeserializationSchema deserializationSchema; + String serviceUrl = Defaults.SERVICE_URL; + String topic; + String subscriptionName = "flink-sub"; + long acknowledgementBatchSize = Defaults.ACKNOWLEDGEMENT_BATCH_SIZE; + + private PulsarSourceBuilder(DeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + /** + * Sets the pulsar service url to connect to. Defaults to pulsar://localhost:6650. + * + * @param serviceUrl service url to connect to + * @return this builder + */ + public PulsarSourceBuilder serviceUrl(String serviceUrl) { + Preconditions.checkNotNull(serviceUrl); + this.serviceUrl = serviceUrl; + return this; + } + + /** + * Sets the topic to consumer from. This is required. + * + *

Topic names (https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics) + * are in the following format: + * {persistent|non-persistent}://tenant/namespace/topic + * + * @param topic the topic to consumer from + * @return this builder + */ + public PulsarSourceBuilder topic(String topic) { + Preconditions.checkNotNull(topic); + this.topic = topic; + return this; + } + + /** + * Sets the subscription name for the topic consumer. Defaults to flink-sub. + * + * @param subscriptionName the subscription name for the topic consumer + * @return this builder + */ + public PulsarSourceBuilder subscriptionName(String subscriptionName) { + Preconditions.checkNotNull(subscriptionName); + this.subscriptionName = subscriptionName; + return this; + } + + /** + * Sets the number of messages to receive before acknowledging. This defaults to 100. This + * value is only used when checkpointing is disabled. + * + * @param size number of messages to receive before acknowledging + * @return this builder + */ + public PulsarSourceBuilder acknowledgementBatchSize(long size) { + if (size > 0) { + acknowledgementBatchSize = size; + } + return this; + } + + public SourceFunction build() { + Preconditions.checkNotNull(serviceUrl, "a service url is required"); + Preconditions.checkNotNull(topic, "a topic is required"); + Preconditions.checkNotNull(subscriptionName, "a subscription name is required"); + + return new PulsarConsumerSource<>(this); + } + + /** + * Creates a PulsarSourceBuilder. + * + * @param deserializationSchema the deserializer used to convert between Pulsar's byte messages and Flink's objects. + * @return a builder + */ + public static PulsarSourceBuilder builder(DeserializationSchema deserializationSchema) { + Preconditions.checkNotNull(deserializationSchema); + return new PulsarSourceBuilder<>(deserializationSchema); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java new file mode 100644 index 0000000000000..33dea1cf62530 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; + +/** + * Tests for the PulsarConsumerSource. The source supports two operation modes. + * 1) At-least-once (when checkpointed) with Pulsar message acknowledgements and the deduplication mechanism in + * {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.. + * 3) No strong delivery guarantees (without checkpointing) with Pulsar acknowledging messages after + * after it receives x number of messages. + * + *

This tests assumes that the MessageIds are increasing monotonously. That doesn't have to be the + * case. The MessageId is used to uniquely identify messages. + */ +public class PulsarConsumerSourceTests { + + private PulsarConsumerSource source; + + private TestConsumer consumer; + + private TestSourceContext context; + + private Thread sourceThread; + + private Exception exception; + + @Before + public void before() { + context = new TestSourceContext(); + + sourceThread = new Thread(() -> { + try { + source.run(context); + } catch (Exception e) { + exception = e; + } + }); + } + + @After + public void after() throws Exception { + if (source != null) { + source.cancel(); + } + if (sourceThread != null) { + sourceThread.join(); + } + } + + @Test + public void testCheckpointing() throws Exception { + final int numMessages = 5; + consumer = new TestConsumer(numMessages); + + source = createSource(consumer, 1, true); + source.open(new Configuration()); + + final StreamSource> src = new StreamSource<>(source); + final AbstractStreamOperatorTestHarness testHarness = + new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + + testHarness.open(); + + sourceThread.start(); + + final Random random = new Random(System.currentTimeMillis()); + for (int i = 0; i < 3; ++i) { + + // wait and receive messages from the test consumer + receiveMessages(); + + final long snapshotId = random.nextLong(); + OperatorSubtaskState data; + synchronized (context.getCheckpointLock()) { + data = testHarness.snapshot(snapshotId, System.currentTimeMillis()); + } + + final TestPulsarConsumerSource sourceCopy = + createSource(Mockito.mock(Consumer.class), 1, true); + final StreamSource srcCopy = new StreamSource<>(sourceCopy); + final AbstractStreamOperatorTestHarness testHarnessCopy = + new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0); + + testHarnessCopy.setup(); + testHarnessCopy.initializeState(data); + testHarnessCopy.open(); + + final ArrayDeque>> deque = sourceCopy.getRestoredState(); + final Set messageIds = deque.getLast().f1; + + final int start = consumer.currentMessage.get() - numMessages; + for (int mi = start; mi < (start + numMessages); ++mi) { + Assert.assertTrue(messageIds.contains(consumer.messages.get(mi).getMessageId())); + } + + // check if the messages are being acknowledged + synchronized (context.getCheckpointLock()) { + source.notifyCheckpointComplete(snapshotId); + + Assert.assertEquals(consumer.acknowledgedIds.keySet(), messageIds); + // clear acknowledgements for the next snapshot comparison + consumer.acknowledgedIds.clear(); + } + + final int lastMessageIndex = consumer.currentMessage.get(); + consumer.addMessages(createMessages(lastMessageIndex, 5)); + } + } + + @Test + public void testCheckpointingDuplicatedIds() throws Exception { + consumer = new TestConsumer(5); + + source = createSource(consumer, 1, true); + source.open(new Configuration()); + + sourceThread.start(); + + receiveMessages(); + + Assert.assertEquals(5, context.elements.size()); + + // try to reprocess the messages we should not collect any more elements + consumer.reset(); + + receiveMessages(); + + Assert.assertEquals(5, context.elements.size()); + } + + @Test + public void testCheckpointingDisabledMessagesEqualBatchSize() throws Exception { + + consumer = new TestConsumer(5); + + source = createSource(consumer, 5, false); + source.open(new Configuration()); + + sourceThread.start(); + + receiveMessages(); + + Assert.assertEquals(1, consumer.acknowledgedIds.size()); + } + + @Test + public void testCheckpointingDisabledMoreMessagesThanBatchSize() throws Exception { + + consumer = new TestConsumer(6); + + source = createSource(consumer, 5, false); + source.open(new Configuration()); + + sourceThread.start(); + + receiveMessages(); + + Assert.assertEquals(1, consumer.acknowledgedIds.size()); + } + + @Test + public void testCheckpointingDisabledLessMessagesThanBatchSize() throws Exception { + + consumer = new TestConsumer(4); + + source = createSource(consumer, 5, false); + source.open(new Configuration()); + + sourceThread.start(); + + receiveMessages(); + + Assert.assertEquals(0, consumer.acknowledgedIds.size()); + } + + @Test + public void testCheckpointingDisabledMessages2XBatchSize() throws Exception { + + consumer = new TestConsumer(10); + + source = createSource(consumer, 5, false); + source.open(new Configuration()); + + sourceThread.start(); + + receiveMessages(); + + Assert.assertEquals(2, consumer.acknowledgedIds.size()); + } + + private void receiveMessages() throws InterruptedException { + while (consumer.currentMessage.get() < consumer.messages.size()) { + Thread.sleep(5); + } + } + + private TestPulsarConsumerSource createSource(Consumer testConsumer, + long batchSize, boolean isCheckpointingEnabled) throws Exception { + PulsarSourceBuilder builder = + PulsarSourceBuilder.builder(new SimpleStringSchema()) + .acknowledgementBatchSize(batchSize); + TestPulsarConsumerSource source = new TestPulsarConsumerSource(builder, testConsumer, isCheckpointingEnabled); + + OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class); + FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class); + Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore); + Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null); + + source.initializeState(mockContext); + + return source; + } + + private static class TestPulsarConsumerSource extends PulsarConsumerSource { + + private ArrayDeque>> restoredState; + + private Consumer testConsumer; + private boolean isCheckpointingEnabled; + + TestPulsarConsumerSource(PulsarSourceBuilder builder, + Consumer testConsumer, boolean isCheckpointingEnabled) { + super(builder); + this.testConsumer = testConsumer; + this.isCheckpointingEnabled = isCheckpointingEnabled; + } + + @Override + protected boolean addId(MessageId messageId) { + Assert.assertEquals(true, isCheckpointingEnabled()); + return super.addId(messageId); + } + + @Override + public RuntimeContext getRuntimeContext() { + StreamingRuntimeContext context = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(context.isCheckpointingEnabled()).thenReturn(isCheckpointingEnabled); + return context; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + super.initializeState(context); + this.restoredState = this.pendingCheckpoints; + } + + public ArrayDeque>> getRestoredState() { + return this.restoredState; + } + + @Override + PulsarClient createClient() { + return Mockito.mock(PulsarClient.class); + } + + @Override + Consumer createConsumer(PulsarClient client) { + return testConsumer; + } + } + + private static class TestSourceContext implements SourceFunction.SourceContext { + + private static final Object lock = new Object(); + + private final List elements = new ArrayList<>(); + + @Override + public void collect(String element) { + elements.add(element); + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public void markAsTemporarilyIdle() { + + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + + } + } + + private static class TestConsumer implements Consumer { + + private final List messages = new ArrayList<>(); + + private AtomicInteger currentMessage = new AtomicInteger(); + + private final Map acknowledgedIds = new ConcurrentHashMap<>(); + + private TestConsumer(int numMessages) { + messages.addAll(createMessages(0, numMessages)); + } + + private void reset() { + currentMessage.set(0); + } + + @Override + public String getTopic() { + return null; + } + + @Override + public String getSubscription() { + return null; + } + + @Override + public void unsubscribe() throws PulsarClientException { + + } + + @Override + public CompletableFuture unsubscribeAsync() { + return null; + } + + @Override + public Message receive() throws PulsarClientException { + return null; + } + + public synchronized void addMessages(List messages) { + this.messages.addAll(messages); + } + + @Override + public CompletableFuture> receiveAsync() { + return null; + } + + @Override + public Message receive(int i, TimeUnit timeUnit) throws PulsarClientException { + synchronized (this) { + if (currentMessage.get() == messages.size()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + System.out.println("no more messages sleeping index: " + currentMessage.get()); + } + return null; + } + return messages.get(currentMessage.getAndIncrement()); + } + } + + @Override + public void acknowledge(Message message) throws PulsarClientException { + + } + + @Override + public void acknowledge(MessageId messageId) throws PulsarClientException { + + } + + @Override + public void acknowledgeCumulative(Message message) throws PulsarClientException { + + } + + @Override + public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException { + acknowledgedIds.put(messageId, messageId); + } + + @Override + public CompletableFuture acknowledgeAsync(Message message) { + return null; + } + + @Override + public CompletableFuture acknowledgeAsync(MessageId messageId) { + acknowledgedIds.put(messageId, messageId); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture acknowledgeCumulativeAsync(Message message) { + return null; + } + + @Override + public CompletableFuture acknowledgeCumulativeAsync(MessageId messageId) { + return null; + } + + @Override + public ConsumerStats getStats() { + return null; + } + + @Override + public void close() throws PulsarClientException { + + } + + @Override + public CompletableFuture closeAsync() { + return null; + } + + @Override + public boolean hasReachedEndOfTopic() { + return false; + } + + @Override + public void redeliverUnacknowledgedMessages() { + + } + + @Override + public void seek(MessageId messageId) throws PulsarClientException { + + } + + @Override + public CompletableFuture seekAsync(MessageId messageId) { + return null; + } + } + + private static List createMessages(int startIndex, int numMessages) { + final List messages = new ArrayList<>(); + for (int i = startIndex; i < (startIndex + numMessages); ++i) { + String content = "message-" + i; + messages.add(createMessage(content, createMessageId(1, i + 1, 1))); + } + return messages; + } + + private static Message createMessage(String content, String messageId) { + return new MessageImpl<>(messageId, Collections.emptyMap(), content.getBytes(), Schema.BYTES); + } + + private static String createMessageId(long ledgerId, long entryId, long partitionIndex) { + return String.format("%d:%d:%d", ledgerId, entryId, partitionIndex); + } +} diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 782d2beb66c46..8d9063ec3ae4f 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -55,6 +55,7 @@ under the License. flink-connector-nifi flink-connector-cassandra flink-connector-filesystem + flink-connector-pulsar