Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9f91d7b
[fix][test] Fix ClientTlsTest license (#18204)
nodece Oct 26, 2022
29461bd
[doc] [client] [go] Add chunking to go-client doc (#17789)
Gleiphir2769 Oct 26, 2022
6657fe4
[fix][sql] Fix jline version to 3.21.0 (#18207)
tisonkun Oct 27, 2022
a48bc8b
[improve][doc] Improve the authentication enablement workflow across …
momo-jun Oct 27, 2022
59e00ba
[improve][doc] cherry-pick cpp client docs installation section (#18188)
tisonkun Oct 27, 2022
5c1f8af
[improve][ci] Add schedule trigger to get master branch code coverage…
yaalsn Oct 27, 2022
3d7f9e5
[fix][doc] Optimize URLs for CLI tools page (#18101)
Anonymitaet Oct 27, 2022
1c78e0a
[improve][broker] Support setting forceDeleteTenantAllowed dynamicall…
Technoboy- Oct 27, 2022
5b7c5c6
[improve][broker] Add UncaughtExceptionHandler for every thread pool …
315157973 Oct 27, 2022
2c9e729
[improve][ml] Remove the redundant judgment logic of ManagedCursorImp…
Pomelongan Oct 27, 2022
c7990b9
[fix][test] Fix flaky test `testDoNotGetOffloadPoliciesMultipleTimesW…
Technoboy- Oct 27, 2022
b061c6a
[improve][broker] Remove locallyAcquiredLock when removeOwnership (#1…
leizhiyuan Oct 27, 2022
5e3f8ba
[improve][client] support aggregate metrics for partition topic stats…
rdhabalia Oct 27, 2022
fad3ccc
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser…
Oct 27, 2022
b193051
Add HTTP Sink (#17581)
cbornet Oct 27, 2022
0bfbda8
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 19, 2022
6b3d606
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 25, 2022
d51fc1f
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 25, 2022
31ce1e5
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 25, 2022
eb3c6e1
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 25, 2022
99cbd4a
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 27, 2022
396be61
[improve][pulsar-broker] Add option to  unloadNamespaceBundle with bu…
Oct 27, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ on:
pull_request:
branches:
- master
schedule:
- cron: '0 12 * * *'
workflow_dispatch:

concurrency:
Expand Down
8 changes: 6 additions & 2 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ set -e
set -o pipefail
set -o errexit

MVN_TEST_OPTIONS='mvn -Pcoverage -B -ntp -DskipSourceReleaseAssembly=true -DskipBuildDistribution=true -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true'
MVN_TEST_OPTIONS='mvn -B -ntp -DskipSourceReleaseAssembly=true -DskipBuildDistribution=true -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true'

function mvn_test() {
(
Expand All @@ -33,7 +33,11 @@ function mvn_test() {
clean_arg="clean"
shift
fi
TARGET=verify
if echo "${FUNCNAME[@]}" | grep "flaky"; then
TARGET="verify"
else
TARGET="verify -Pcoverage"
fi
if [[ "$1" == "--install" ]]; then
TARGET="install"
shift
Expand Down
1 change: 1 addition & 0 deletions deployment/terraform-ansible/deploy-pulsar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
# - jdbc-mariadb
# - jdbc-postgres
# - jdbc-sqlite
# - http
- kafka
# - kafka-connect-adaptor
# - kinesis
Expand Down
1 change: 1 addition & 0 deletions distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/rabbitmq/target/pulsar-io-rabbitmq-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/nsq/target/pulsar-io-nsq-${project.version}.nar</source></file>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
if (config.isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
Expand Down Expand Up @@ -1227,7 +1227,7 @@ public void operationComplete() {
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null, null);
individualDeletedMessages.clear();
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newPosition.ackSet;
Expand Down Expand Up @@ -1866,7 +1866,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie

PositionImpl newPosition = (PositionImpl) position;

if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
if (newPosition.ackSet != null) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
Expand Down Expand Up @@ -2049,7 +2049,7 @@ public void operationComplete() {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
false, PositionImpl.get(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
Expand Down Expand Up @@ -2178,7 +2178,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
Expand All @@ -2190,7 +2190,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
continue;
}
if (position.ackSet == null) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
Expand All @@ -2207,7 +2207,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
} else if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
if (givenBitSet != bitSet) {
Expand Down Expand Up @@ -2862,8 +2862,7 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null
|| batchDeletedIndexes.isEmpty()) {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
Expand Down Expand Up @@ -3314,7 +3313,7 @@ private ManagedCursorImpl cursorImpl() {

@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3626,10 +3626,13 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc
LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
when(ledgerOffloader.getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()).thenReturn(-1L);
when(ledgerOffloader.getOffloadPolicies().getManagedLedgerOffloadThresholdInSeconds()).thenReturn(-1L);
when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
config.setLedgerOffloader(ledgerOffloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
"testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);
ManagedLedgerImpl ledger = spy((ManagedLedgerImpl)factory.open(
"testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config));
doNothing().when(ledger).trimConsumedLedgersInBackground(any(CompletableFuture.class));

// Retain the data.
ledger.openCursor("test-cursor");
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ flexible messaging model and an intuitive client API.</description>
<awaitility.version>4.2.0</awaitility.version>
<reload4j.version>1.2.22</reload4j.version>
<jettison.version>1.5.1</jettison.version>
<wiremock.version>2.33.2</wiremock.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down Expand Up @@ -290,6 +291,12 @@ flexible messaging model and an intuitive client API.</description>
<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.jline</groupId>
<artifactId>jline</artifactId>
<version>${jline3.version}</version>
</dependency>

<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Allow forced deletion of tenants. Default is false."
)
private boolean forceDeleteTenantAllowed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private void startBookieWithMetadataStore() throws Exception {
log.info("Starting BK with RocksDb metadata store");
metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath();
} else {
log.info("Starting BK with metadata store:", metadataStoreUrl);
log.info("Starting BK with metadata store: {}", metadataStoreUrl);
}

ServerConfiguration bkServerConf = new ServerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ public PulsarService(ServiceConfiguration config,
this.config = config;
this.processTerminator = processTerminator;
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
this.workerConfig = workerConfig;
this.functionWorkerService = functionWorkerService;
this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
new DefaultThreadFactory("pulsar"));
new ExecutorProvider.ExtendedThreadFactory("pulsar"));
this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
new DefaultThreadFactory("zk-cache-callback"));
new ExecutorProvider.ExtendedThreadFactory("zk-cache-callback"));

if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration()
Expand Down Expand Up @@ -615,7 +615,7 @@ private synchronized void resetMetricsServlet() {

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
Expand Down Expand Up @@ -1425,7 +1425,8 @@ public BookKeeperClientFactory getBookKeeperClientFactory() {

protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
compactorExecutor = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("compaction"));
}
return this.compactorExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -93,7 +93,7 @@ public class TransactionMetadataStoreService {
private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;

private final ThreadFactory threadFactory =
new DefaultThreadFactory("transaction-coordinator-thread-factory");
new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");


public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -852,6 +853,29 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
}
}

public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
if (destinationBroker != null) {
if (!this.isLeaderBroker()) {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerUrl = leaderBroker.getServiceUrl();
try {
URL redirectUrl = new URL(leaderBrokerUrl);
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(redirectUrl.getHost())
.port(redirectUrl.getPort()).replaceQueryParam("authoritative",
false).build();

// Redirect
log.debug("Redirecting the rest call to leader - {}, bundleRange - {}", redirect, bundleRange);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
} catch (MalformedURLException exception) {
log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
throw new RestException(exception);
}
}
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
}
}

public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
Expand Down Expand Up @@ -898,8 +922,9 @@ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleR
}
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle));
.thenCompose(nsBundle -> {
return pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle);
});
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception {

CompletableFuture<Set<String>> getAvailableBrokersAsync();

void setNamespaceBundleAffinity(String bundle, String broker);

String removeNamespaceBundleAffinity(String bundle);

void stop() throws PulsarServerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,8 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
* @return bundle data
*/
BundleData getBundleDataOrDefault(String bundle);

void setNamespaceBundleAffinity(String bundle, String broker);

String removeNamespaceBundleAffinity(String bundle);
}
Loading