msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected message #" + (i + 1));
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received, sent, "every encrypted message must decrypt to its original value");
+ }
+
+ /**
+ * Consumer with {@link ConsumerCryptoFailureAction#CONSUME} and no
+ * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} configured
+ * sees the still-encrypted payload, demonstrating the "I don't decrypt; just
+ * give me the bytes" mode.
+ *
+ * Batching disabled on the producer: v4 drops batched encrypted messages
+ * even under CONSUME because it can't reframe a batch envelope it can't open.
+ */
+ @Test
+ public void testConsumerWithoutProviderAndConsumeAction() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .batchingPolicy(BatchingPolicy.ofDisabled())
+ .encryptionPolicy(producerPolicy())
+ .create();
+
+ @Cleanup
+ QueueConsumer consumer = v5Client.newQueueConsumer(Schema.bytes())
+ .topic(topic)
+ .subscriptionName("crypto-consume-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+ .failureAction(ConsumerCryptoFailureAction.CONSUME)
+ .build())
+ .subscribe();
+
+ producer.newMessage().value("plaintext-marker").send();
+
+ Message msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "CONSUME must deliver the message even without a private key");
+ // Payload is still encrypted — must not contain the plaintext marker.
+ String body = new String(msg.value());
+ assertTrue(!body.contains("plaintext-marker"),
+ "payload should still be encrypted, got: " + body);
+ consumer.acknowledge(msg.id());
+ }
+
+ /**
+ * Consumer with {@link ConsumerCryptoFailureAction#DISCARD} and no provider
+ * silently drops undecryptable messages (cursor advances) — the application
+ * never sees them.
+ */
+ @Test
+ public void testConsumerWithoutProviderAndDiscardAction() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .batchingPolicy(BatchingPolicy.ofDisabled())
+ .encryptionPolicy(producerPolicy())
+ .create();
+
+ @Cleanup
+ QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("crypto-discard-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+ .failureAction(ConsumerCryptoFailureAction.DISCARD)
+ .build())
+ .subscribe();
+
+ producer.newMessage().value("classified").send();
+
+ Message msg = consumer.receive(Duration.ofMillis(500));
+ assertNull(msg, "DISCARD must drop the undecryptable message before delivery");
+ }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index e9c6153fec5c0..1a67dbcfe84db 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -20,7 +20,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
/**
* Builder for configuring and creating a {@link CheckpointConsumer}.
@@ -111,9 +111,9 @@ public interface CheckpointConsumerBuilder {
*
* @param policy the encryption policy to use
* @return this builder instance for chaining
- * @see EncryptionPolicy#forConsumer
+ * @see ConsumerEncryptionPolicy#builder()
*/
- CheckpointConsumerBuilder encryptionPolicy(EncryptionPolicy policy);
+ CheckpointConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy);
// --- Metadata ---
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
index 65800756c19f0..207686e88d172 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
@@ -24,8 +24,8 @@
import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
/**
* Builder for configuring and creating a {@link Producer}.
@@ -133,11 +133,11 @@ public interface ProducerBuilder {
/**
* Configure end-to-end message encryption.
*
- * @param policy the encryption policy for producing encrypted messages
+ * @param policy the producer-side encryption policy
* @return this builder instance for chaining
- * @see EncryptionPolicy#forProducer(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader, String...)
+ * @see ProducerEncryptionPolicy#builder()
*/
- ProducerBuilder encryptionPolicy(EncryptionPolicy policy);
+ ProducerBuilder encryptionPolicy(ProducerEncryptionPolicy policy);
/**
* Set the initial sequence ID for producer message deduplication. Subsequent messages
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index db03f3c546913..f616f79beee95 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -22,8 +22,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
@@ -194,9 +194,9 @@ public interface QueueConsumerBuilder {
*
* @param policy the encryption policy to use
* @return this builder instance for chaining
- * @see EncryptionPolicy#forConsumer
+ * @see ConsumerEncryptionPolicy#builder()
*/
- QueueConsumerBuilder encryptionPolicy(EncryptionPolicy policy);
+ QueueConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy);
// --- Misc ---
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index 0963f79203523..ab813842b4f5a 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -22,7 +22,7 @@
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
/**
@@ -164,9 +164,9 @@ public interface StreamConsumerBuilder {
*
* @param policy the encryption policy to use
* @return this builder instance for chaining
- * @see EncryptionPolicy#forConsumer
+ * @see ConsumerEncryptionPolicy#builder()
*/
- StreamConsumerBuilder encryptionPolicy(EncryptionPolicy policy);
+ StreamConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy);
// --- Metadata ---
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
similarity index 53%
rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
index bec3eeb6054b5..f4b1b5d3fe172 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
@@ -18,28 +18,29 @@
*/
package org.apache.pulsar.client.api.v5.auth;
-import java.util.Map;
-
/**
- * Interface for loading encryption and decryption keys for end-to-end message encryption.
+ * Action a consumer takes when message decryption fails (e.g. the
+ * {@link PrivateKeyProvider} cannot be reached, returns no key, or the
+ * ciphertext is malformed).
*/
-public interface CryptoKeyReader {
+public enum ConsumerCryptoFailureAction {
+
+ /**
+ * Fail the {@code receive} call. The application sees the decryption error
+ * and the message stays unacknowledged so it will be redelivered.
+ */
+ FAIL,
/**
- * Get the public key for encrypting messages.
- *
- * @param keyName the name of the key
- * @param metadata additional metadata associated with the key
- * @return the encryption key info containing the public key data
+ * Silently acknowledge and skip the message. Useful when the consumer
+ * legitimately cannot read some encrypted streams (e.g. a side channel)
+ * but should keep moving forward through the rest.
*/
- EncryptionKeyInfo getPublicKey(String keyName, Map metadata);
+ DISCARD,
/**
- * Get the private key for decrypting messages.
- *
- * @param keyName the name of the key
- * @param metadata additional metadata associated with the key
- * @return the encryption key info containing the private key data
+ * Deliver the message to the application as-is, with the still-encrypted
+ * payload. The application can then handle decryption out-of-band.
*/
- EncryptionKeyInfo getPrivateKey(String keyName, Map metadata);
+ CONSUME
}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
similarity index 60%
rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java
rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
index cc429bbc8c935..6360a1590d3a5 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
@@ -18,20 +18,15 @@
*/
package org.apache.pulsar.client.api.v5.auth;
-import java.util.Map;
-import java.util.Objects;
-
/**
- * Holds an encryption key and associated metadata.
+ * Convenience interface for implementations that serve both public keys (for
+ * producer-side encryption) and private keys (for consumer-side decryption) — for
+ * example, a single PEM-file-backed key store used by both sides of an in-process
+ * round trip.
*
- * @param key the raw key bytes
- * @param metadata key-value metadata associated with the key
+ * Producer-only or consumer-only implementations should implement
+ * {@link PublicKeyProvider} or {@link PrivateKeyProvider} directly instead — that
+ * makes the role explicit and avoids stub methods that throw.
*/
-public record EncryptionKeyInfo(byte[] key, Map metadata) {
- public EncryptionKeyInfo {
- Objects.requireNonNull(key, "key must not be null");
- if (metadata == null) {
- metadata = Map.of();
- }
- }
+public interface CryptoKeyProvider extends PublicKeyProvider, PrivateKeyProvider {
}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java
new file mode 100644
index 0000000000000..4d44235d06d77
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single encryption / decryption key returned by a {@link PublicKeyProvider} or
+ * {@link PrivateKeyProvider}.
+ *
+ * The producer-side flow returns just the key bytes. The consumer-side flow may
+ * include {@link #metadata()} that the producer attached when the message was
+ * encrypted (e.g. a key version) — the {@link PrivateKeyProvider} can use it to
+ * pick the right private key when keys have been rotated.
+ *
+ *
Use {@link #of(byte[])} when there's no metadata, or {@link #of(byte[], Map)}
+ * to attach producer-side hints alongside the bytes.
+ */
+public final class EncryptionKey {
+
+ private final byte[] key;
+ private final Map metadata;
+
+ private EncryptionKey(byte[] key, Map metadata) {
+ this.key = Objects.requireNonNull(key, "key must not be null");
+ this.metadata = metadata == null ? Map.of() : Map.copyOf(metadata);
+ }
+
+ /**
+ * @return the raw key bytes
+ */
+ public byte[] key() {
+ return key;
+ }
+
+ /**
+ * @return key-value metadata associated with the key (never {@code null};
+ * empty when the producer did not attach any)
+ */
+ public Map metadata() {
+ return metadata;
+ }
+
+ /**
+ * Create an {@link EncryptionKey} with no metadata.
+ *
+ * @param key the raw key bytes
+ * @return a new {@link EncryptionKey}
+ */
+ public static EncryptionKey of(byte[] key) {
+ return new EncryptionKey(key, Map.of());
+ }
+
+ /**
+ * Create an {@link EncryptionKey} with associated metadata.
+ *
+ * @param key the raw key bytes
+ * @param metadata key-value metadata to attach to the key
+ * @return a new {@link EncryptionKey}
+ */
+ public static EncryptionKey of(byte[] key, Map metadata) {
+ return new EncryptionKey(key, metadata);
+ }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java
new file mode 100644
index 0000000000000..7670d5dfecc38
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Batteries-included key provider that loads PEM-encoded keys from local files.
+ *
+ * The same instance can serve as both {@link PublicKeyProvider} and
+ * {@link PrivateKeyProvider} — register public keys for the producer side and
+ * private keys for the consumer side. In typical setups each side instantiates
+ * its own provider, configured only with the keys it actually needs.
+ *
+ *
For more complex sources (KMS, Vault, ...) implement {@link PublicKeyProvider}
+ * or {@link PrivateKeyProvider} directly.
+ *
+ *
{@code
+ * var keys = PemFileKeyProvider.builder()
+ * .publicKey("orders-v1", Path.of("/etc/keys/orders-pub.pem"))
+ * .privateKey("orders-v1", Path.of("/etc/keys/orders-priv.pem"))
+ * .build();
+ *
+ * client.newProducer(Schema.string())
+ * .topic("orders")
+ * .encryptionPolicy(ProducerEncryptionPolicy.builder()
+ * .publicKeyProvider(keys)
+ * .keyName("orders-v1")
+ * .build())
+ * .create();
+ * }
+ */
+public final class PemFileKeyProvider implements CryptoKeyProvider {
+
+ private final Map publicKeys;
+ private final Map privateKeys;
+
+ private PemFileKeyProvider(Map publicKeys, Map privateKeys) {
+ this.publicKeys = Map.copyOf(publicKeys);
+ this.privateKeys = Map.copyOf(privateKeys);
+ }
+
+ @Override
+ public CompletableFuture getPublicKey(String keyName) {
+ return loadKey(keyName, publicKeys, "public");
+ }
+
+ @Override
+ public CompletableFuture getPrivateKey(String keyName, Map metadata) {
+ return loadKey(keyName, privateKeys, "private");
+ }
+
+ private static CompletableFuture loadKey(String keyName,
+ Map keys,
+ String role) {
+ Path path = keys.get(keyName);
+ if (path == null) {
+ return CompletableFuture.failedFuture(new IllegalArgumentException(
+ "no " + role + " key registered for name: " + keyName));
+ }
+ try {
+ byte[] bytes = Files.readAllBytes(path);
+ return CompletableFuture.completedFuture(EncryptionKey.of(bytes));
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(new IOException(
+ "failed to read " + role + " key '" + keyName + "' from " + path, e));
+ }
+ }
+
+ /**
+ * @return a new builder for constructing a {@link PemFileKeyProvider}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link PemFileKeyProvider}.
+ */
+ public static final class Builder {
+ private final Map publicKeys = new HashMap<>();
+ private final Map privateKeys = new HashMap<>();
+
+ private Builder() {
+ }
+
+ /**
+ * Register a public key file under the given name. Producer-side use.
+ *
+ * @param keyName the key identifier the producer will reference
+ * @param path path to the PEM-encoded public key file
+ * @return this builder
+ */
+ public Builder publicKey(String keyName, Path path) {
+ publicKeys.put(keyName, path);
+ return this;
+ }
+
+ /**
+ * Register a private key file under the given name. Consumer-side use.
+ *
+ * @param keyName the key identifier the producer used to encrypt
+ * @param path path to the PEM-encoded private key file
+ * @return this builder
+ */
+ public Builder privateKey(String keyName, Path path) {
+ privateKeys.put(keyName, path);
+ return this;
+ }
+
+ /**
+ * @return a new {@link PemFileKeyProvider} instance
+ */
+ public PemFileKeyProvider build() {
+ return new PemFileKeyProvider(publicKeys, privateKeys);
+ }
+ }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java
new file mode 100644
index 0000000000000..acdfe60d649f5
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consumer-side SPI: load a private key by name for end-to-end message decryption.
+ *
+ * The provider is consulted on every encrypted message the consumer receives.
+ * The signature is asynchronous so an implementation backed by a remote KMS
+ * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's IO
+ * thread. For local key stores, return a completed future.
+ *
+ *
For a simple file-based provider, see {@link PemFileKeyProvider}.
+ */
+public interface PrivateKeyProvider {
+
+ /**
+ * Look up the private key for the given name.
+ *
+ *
{@code metadata} carries any hints the producer attached when the message
+ * was encrypted — typically a key version or rotation marker. Implementations
+ * that don't rotate keys can ignore it.
+ *
+ * @param keyName the key identifier the producer used to encrypt
+ * @param metadata producer-supplied hints about the key (never {@code null};
+ * empty when the producer didn't attach any)
+ * @return a future completing with the private key, or completing exceptionally
+ * if the key cannot be found or loaded
+ */
+ CompletableFuture getPrivateKey(String keyName, Map metadata);
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
similarity index 66%
rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java
rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
index 3b48c2a46f796..0ef6e71c2a912 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
@@ -19,23 +19,20 @@
package org.apache.pulsar.client.api.v5.auth;
/**
- * Action to take when a message encryption or decryption operation fails.
+ * Action a producer takes when message encryption fails (e.g. the
+ * {@link PublicKeyProvider} cannot be reached or returns no key).
*/
-public enum CryptoFailureAction {
+public enum ProducerCryptoFailureAction {
/**
- * Fail the operation and return an error to the caller.
+ * Fail the {@code send} call. The send future completes exceptionally and
+ * the application sees the error.
*/
FAIL,
/**
- * Silently discard the message (consumer side only).
+ * Send the message unencrypted instead of failing. Useful when encryption
+ * is opportunistic — for example, during a key-rollout migration.
*/
- DISCARD,
-
- /**
- * Deliver the message to the consumer without decrypting (consumer side only).
- * The message will contain encrypted payload.
- */
- CONSUME
+ SEND_UNENCRYPTED
}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java
new file mode 100644
index 0000000000000..af2b3172c8b33
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Producer-side SPI: load a public key by name for end-to-end message encryption.
+ *
+ * The provider is consulted at producer creation time and on every key rotation.
+ * The signature is asynchronous so an implementation backed by a remote KMS
+ * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's IO
+ * thread. For local key stores, return a completed future.
+ *
+ *
For a simple file-based provider, see {@link PemFileKeyProvider}.
+ */
+public interface PublicKeyProvider {
+
+ /**
+ * Look up the public key with the given name.
+ *
+ * @param keyName the key identifier as configured on the producer
+ * @return a future completing with the public key, or completing exceptionally
+ * if the key cannot be found or loaded
+ */
+ CompletableFuture getPublicKey(String keyName);
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
index 7d8141ad558ba..f7f22f13a3719 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
@@ -23,6 +23,8 @@
* Provides pluggable authentication via {@link org.apache.pulsar.client.api.v5.auth.Authentication}
* and convenience factories in {@link org.apache.pulsar.client.api.v5.auth.AuthenticationFactory},
* as well as end-to-end encryption support via
- * {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader}.
+ * {@link org.apache.pulsar.client.api.v5.auth.PublicKeyProvider} (producer side) and
+ * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} (consumer side).
+ * For local PEM-file-backed setups, use {@link org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider}.
*/
package org.apache.pulsar.client.api.v5.auth;
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java
new file mode 100644
index 0000000000000..08dc86b57accf
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+
+/**
+ * Consumer-side end-to-end decryption configuration.
+ *
+ *
Construct via {@link #builder()}. The {@link PrivateKeyProvider} is required
+ * when {@link #failureAction()} is {@link ConsumerCryptoFailureAction#FAIL} (the
+ * default — strict mode); for {@link ConsumerCryptoFailureAction#DISCARD} or
+ * {@link ConsumerCryptoFailureAction#CONSUME} the provider may be omitted, in
+ * which case the consumer just relies on the failure action to decide what to do
+ * with encrypted messages it can't decrypt.
+ */
+@EqualsAndHashCode
+@ToString
+public final class ConsumerEncryptionPolicy {
+
+ private final PrivateKeyProvider privateKeyProvider;
+ private final ConsumerCryptoFailureAction failureAction;
+
+ private ConsumerEncryptionPolicy(PrivateKeyProvider privateKeyProvider,
+ ConsumerCryptoFailureAction failureAction) {
+ Objects.requireNonNull(failureAction, "failureAction must not be null");
+ if (failureAction == ConsumerCryptoFailureAction.FAIL && privateKeyProvider == null) {
+ throw new IllegalArgumentException(
+ "privateKeyProvider must be set when failureAction is FAIL");
+ }
+ this.privateKeyProvider = privateKeyProvider;
+ this.failureAction = failureAction;
+ }
+
+ /**
+ * @return the provider used to load private keys for decryption, or {@code null}
+ * when the consumer doesn't decrypt and falls back to the failure action
+ * (DISCARD or CONSUME)
+ */
+ public PrivateKeyProvider privateKeyProvider() {
+ return privateKeyProvider;
+ }
+
+ /**
+ * @return the action the consumer takes when decryption fails
+ */
+ public ConsumerCryptoFailureAction failureAction() {
+ return failureAction;
+ }
+
+ /**
+ * @return a new builder for constructing a {@link ConsumerEncryptionPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link ConsumerEncryptionPolicy}.
+ */
+ public static final class Builder {
+ private PrivateKeyProvider privateKeyProvider;
+ private ConsumerCryptoFailureAction failureAction = ConsumerCryptoFailureAction.FAIL;
+
+ private Builder() {
+ }
+
+ /**
+ * Provider used to load private keys. Required.
+ *
+ * @param privateKeyProvider the private-key provider
+ * @return this builder
+ */
+ public Builder privateKeyProvider(PrivateKeyProvider privateKeyProvider) {
+ this.privateKeyProvider = privateKeyProvider;
+ return this;
+ }
+
+ /**
+ * Action to take when decryption fails. Default: {@link ConsumerCryptoFailureAction#FAIL}.
+ *
+ * @param failureAction the failure action
+ * @return this builder
+ */
+ public Builder failureAction(ConsumerCryptoFailureAction failureAction) {
+ this.failureAction = failureAction;
+ return this;
+ }
+
+ /**
+ * @return a new {@link ConsumerEncryptionPolicy} instance
+ */
+ public ConsumerEncryptionPolicy build() {
+ return new ConsumerEncryptionPolicy(privateKeyProvider, failureAction);
+ }
+ }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
deleted file mode 100644
index c196a8f20fb94..0000000000000
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api.v5.config;
-
-import java.util.List;
-import java.util.Objects;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import org.apache.pulsar.client.api.v5.auth.CryptoFailureAction;
-import org.apache.pulsar.client.api.v5.auth.CryptoKeyReader;
-
-/**
- * End-to-end encryption configuration for producers and consumers.
- *
- *
For producers, supply a {@link CryptoKeyReader} and one or more encryption key names —
- * use {@link #forProducer(CryptoKeyReader, String...)} as the typical entry point.
- * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link CryptoFailureAction} —
- * use {@link #forConsumer(CryptoKeyReader, CryptoFailureAction)}.
- *
- *
{@link #builder()} is available for callers that need to tune both sides explicitly.
- */
-@EqualsAndHashCode
-@ToString
-public final class EncryptionPolicy {
-
- private final CryptoKeyReader keyReader;
- private final List keyNames;
- private final CryptoFailureAction failureAction;
-
- private EncryptionPolicy(CryptoKeyReader keyReader, List keyNames,
- CryptoFailureAction failureAction) {
- Objects.requireNonNull(keyReader, "keyReader must not be null");
- if (keyNames == null) {
- keyNames = List.of();
- }
- if (failureAction == null) {
- failureAction = CryptoFailureAction.FAIL;
- }
- this.keyReader = keyReader;
- this.keyNames = List.copyOf(keyNames);
- this.failureAction = failureAction;
- }
-
- /**
- * @return the crypto key reader for loading encryption/decryption keys
- */
- public CryptoKeyReader keyReader() {
- return keyReader;
- }
-
- /**
- * @return the producer-side encryption key names (empty list for consumer/reader)
- */
- public List keyNames() {
- return keyNames;
- }
-
- /**
- * @return the action to take when encryption or decryption fails
- */
- public CryptoFailureAction failureAction() {
- return failureAction;
- }
-
- /**
- * Create an encryption policy for producers.
- *
- * @param keyReader the crypto key reader for loading encryption keys
- * @param keyNames one or more encryption key names to use
- * @return an {@link EncryptionPolicy} configured for producer-side encryption
- */
- public static EncryptionPolicy forProducer(CryptoKeyReader keyReader, String... keyNames) {
- return new EncryptionPolicy(keyReader, List.of(keyNames), CryptoFailureAction.FAIL);
- }
-
- /**
- * Create an encryption policy for consumers/readers.
- *
- * @param keyReader the crypto key reader for loading decryption keys
- * @param failureAction the action to take when decryption fails
- * @return an {@link EncryptionPolicy} configured for consumer-side decryption
- */
- public static EncryptionPolicy forConsumer(CryptoKeyReader keyReader, CryptoFailureAction failureAction) {
- return new EncryptionPolicy(keyReader, List.of(), failureAction);
- }
-
- /**
- * @return a new builder for constructing an {@link EncryptionPolicy}
- */
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * Builder for {@link EncryptionPolicy}.
- */
- public static final class Builder {
- private CryptoKeyReader keyReader;
- private List keyNames = List.of();
- private CryptoFailureAction failureAction = CryptoFailureAction.FAIL;
-
- private Builder() {
- }
-
- /**
- * Crypto key reader used to load encryption/decryption keys. Required.
- *
- * @param keyReader the key reader
- * @return this builder
- */
- public Builder keyReader(CryptoKeyReader keyReader) {
- this.keyReader = keyReader;
- return this;
- }
-
- /**
- * Producer-side encryption key names. Leave empty (default) for consumer-side use.
- *
- * @param keyNames the key names
- * @return this builder
- */
- public Builder keyNames(String... keyNames) {
- this.keyNames = List.of(keyNames);
- return this;
- }
-
- /**
- * Action to take when encryption (producer) or decryption (consumer) fails.
- * Default is {@link CryptoFailureAction#FAIL}.
- *
- * @param failureAction the failure action
- * @return this builder
- */
- public Builder failureAction(CryptoFailureAction failureAction) {
- this.failureAction = failureAction;
- return this;
- }
-
- /**
- * @return a new {@link EncryptionPolicy} instance
- */
- public EncryptionPolicy build() {
- return new EncryptionPolicy(keyReader, keyNames, failureAction);
- }
- }
-}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java
new file mode 100644
index 0000000000000..0e793b3737e31
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.util.List;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
+
+/**
+ * Producer-side end-to-end encryption configuration.
+ *
+ * Construct via {@link #builder()}. Required: a {@link PublicKeyProvider} and at
+ * least one key name.
+ */
+@EqualsAndHashCode
+@ToString
+public final class ProducerEncryptionPolicy {
+
+ private final PublicKeyProvider publicKeyProvider;
+ private final List keyNames;
+ private final ProducerCryptoFailureAction failureAction;
+
+ private ProducerEncryptionPolicy(PublicKeyProvider publicKeyProvider,
+ List keyNames,
+ ProducerCryptoFailureAction failureAction) {
+ Objects.requireNonNull(publicKeyProvider, "publicKeyProvider must not be null");
+ Objects.requireNonNull(keyNames, "keyNames must not be null");
+ if (keyNames.isEmpty()) {
+ throw new IllegalArgumentException("at least one key name must be configured");
+ }
+ Objects.requireNonNull(failureAction, "failureAction must not be null");
+ this.publicKeyProvider = publicKeyProvider;
+ this.keyNames = List.copyOf(keyNames);
+ this.failureAction = failureAction;
+ }
+
+ /**
+ * @return the provider used to load public keys for encryption
+ */
+ public PublicKeyProvider publicKeyProvider() {
+ return publicKeyProvider;
+ }
+
+ /**
+ * @return the configured key names; the producer encrypts each message's data
+ * key with every public key listed here
+ */
+ public List keyNames() {
+ return keyNames;
+ }
+
+ /**
+ * @return the action the producer takes when encryption fails
+ */
+ public ProducerCryptoFailureAction failureAction() {
+ return failureAction;
+ }
+
+ /**
+ * @return a new builder for constructing a {@link ProducerEncryptionPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link ProducerEncryptionPolicy}.
+ */
+ public static final class Builder {
+ private PublicKeyProvider publicKeyProvider;
+ private List keyNames = List.of();
+ private ProducerCryptoFailureAction failureAction = ProducerCryptoFailureAction.FAIL;
+
+ private Builder() {
+ }
+
+ /**
+ * Provider used to load public keys. Required.
+ *
+ * @param publicKeyProvider the public-key provider
+ * @return this builder
+ */
+ public Builder publicKeyProvider(PublicKeyProvider publicKeyProvider) {
+ this.publicKeyProvider = publicKeyProvider;
+ return this;
+ }
+
+ /**
+ * Single key name shortcut — equivalent to {@code keyNames(List.of(name))}.
+ *
+ * @param keyName the key name
+ * @return this builder
+ */
+ public Builder keyName(String keyName) {
+ this.keyNames = List.of(keyName);
+ return this;
+ }
+
+ /**
+ * Multiple key names. The producer encrypts each message's data key with
+ * every public key listed here, so any consumer with one of the matching
+ * private keys can decrypt.
+ *
+ * @param keyNames one or more key names
+ * @return this builder
+ */
+ public Builder keyNames(String... keyNames) {
+ this.keyNames = List.of(keyNames);
+ return this;
+ }
+
+ /**
+ * Action to take when encryption fails. Default: {@link ProducerCryptoFailureAction#FAIL}.
+ *
+ * @param failureAction the failure action
+ * @return this builder
+ */
+ public Builder failureAction(ProducerCryptoFailureAction failureAction) {
+ this.failureAction = failureAction;
+ return this;
+ }
+
+ /**
+ * @return a new {@link ProducerEncryptionPolicy} instance
+ */
+ public ProducerEncryptionPolicy build() {
+ return new ProducerEncryptionPolicy(publicKeyProvider, keyNames, failureAction);
+ }
+ }
+}
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index f05cfbd4a21ce..b3439d31c0a46 100644
--- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api.v5;
+import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
@@ -26,14 +27,18 @@
import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
import org.apache.pulsar.client.api.v5.auth.AuthenticationFactory;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
import org.apache.pulsar.client.api.v5.config.CompressionType;
import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.config.MemorySize;
import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.config.TlsPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -300,6 +305,42 @@ void queueConsumerWithDLQ(PulsarClient client) throws Exception {
}
}
+ /**
+ * End-to-end encryption — producer encrypts with a public key, consumer decrypts
+ * with the matching private key. The {@link PemFileKeyProvider} is the
+ * batteries-included reader for local PEM files; for remote key stores
+ * (KMS, Vault, ...) implement {@link org.apache.pulsar.client.api.v5.auth.PublicKeyProvider}
+ * or {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} directly.
+ */
+ void encryptedProducerAndConsumer(PulsarClient client) throws Exception {
+ try (var producer = client.newProducer(Schema.string())
+ .topic("orders")
+ .encryptionPolicy(ProducerEncryptionPolicy.builder()
+ .publicKeyProvider(PemFileKeyProvider.builder()
+ .publicKey("orders-v1", Path.of("/etc/keys/orders-pub.pem"))
+ .build())
+ .keyName("orders-v1")
+ .build())
+ .create()) {
+ producer.newMessage().value("classified payload").send();
+ }
+
+ try (var consumer = client.newQueueConsumer(Schema.string())
+ .topic("orders")
+ .subscriptionName("trusted")
+ .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(PemFileKeyProvider.builder()
+ .privateKey("orders-v1", Path.of("/etc/keys/orders-priv.pem"))
+ .build())
+ .failureAction(ConsumerCryptoFailureAction.FAIL)
+ .build())
+ .subscribe()) {
+ Message msg = consumer.receive(Duration.ofSeconds(5));
+ // ... use msg.value()
+ consumer.acknowledge(msg.id());
+ }
+ }
+
/** Async queue consumer — high-throughput parallel processing. */
void asyncQueueConsumer(PulsarClient client) throws Exception {
try (var consumer = client.newQueueConsumer(Schema.json(Order.class))
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java
new file mode 100644
index 0000000000000..74fbc5128fac6
--- /dev/null
+++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit coverage for {@link PemFileKeyProvider}: registering a key file → reading it
+ * back as bytes; missing key → failed future; missing file on disk → failed future.
+ */
+public class PemFileKeyProviderTest {
+
+ private Path tempDir;
+
+ @BeforeMethod
+ public void setUp() throws IOException {
+ tempDir = Files.createTempDirectory("pem-file-key-provider-test");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws IOException {
+ if (tempDir != null) {
+ try (var stream = Files.walk(tempDir)) {
+ stream.sorted(java.util.Comparator.reverseOrder())
+ .forEach(p -> {
+ try {
+ Files.deleteIfExists(p);
+ } catch (IOException ignored) {
+ // best effort
+ }
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testReadRegisteredPublicKey() throws Exception {
+ Path keyFile = Files.writeString(tempDir.resolve("pub.pem"), "PUBLIC-KEY-BYTES");
+ var provider = PemFileKeyProvider.builder()
+ .publicKey("orders-v1", keyFile)
+ .build();
+
+ EncryptionKey key = provider.getPublicKey("orders-v1").get();
+ assertEquals(new String(key.key()), "PUBLIC-KEY-BYTES");
+ assertTrue(key.metadata().isEmpty());
+ }
+
+ @Test
+ public void testReadRegisteredPrivateKey() throws Exception {
+ Path keyFile = Files.writeString(tempDir.resolve("priv.pem"), "PRIVATE-KEY-BYTES");
+ var provider = PemFileKeyProvider.builder()
+ .privateKey("orders-v1", keyFile)
+ .build();
+
+ EncryptionKey key = provider.getPrivateKey("orders-v1", Map.of()).get();
+ assertEquals(new String(key.key()), "PRIVATE-KEY-BYTES");
+ }
+
+ @Test
+ public void testUnknownKeyNameFailsFuture() {
+ var provider = PemFileKeyProvider.builder().build();
+
+ var ex = expectThrows(CompletionException.class,
+ () -> provider.getPublicKey("missing").join());
+ assertTrue(ex.getCause() instanceof IllegalArgumentException,
+ "expected IllegalArgumentException, got: " + ex.getCause());
+ }
+
+ @Test
+ public void testMissingFileOnDiskFailsFuture() {
+ Path missing = tempDir.resolve("does-not-exist.pem");
+ var provider = PemFileKeyProvider.builder()
+ .publicKey("orders-v1", missing)
+ .build();
+
+ var ex = expectThrows(CompletionException.class,
+ () -> provider.getPublicKey("orders-v1").join());
+ assertTrue(ex.getCause() instanceof IOException,
+ "expected IOException, got: " + ex.getCause());
+ }
+
+ @Test
+ public void testProviderUsableAsBothSides() throws Exception {
+ Path pub = Files.writeString(tempDir.resolve("pub.pem"), "PUB");
+ Path priv = Files.writeString(tempDir.resolve("priv.pem"), "PRIV");
+ var provider = PemFileKeyProvider.builder()
+ .publicKey("k", pub)
+ .privateKey("k", priv)
+ .build();
+
+ assertEquals(new String(provider.getPublicKey("k").get().key()), "PUB");
+ assertEquals(new String(provider.getPrivateKey("k", Map.of()).get().key()), "PRIV");
+ }
+}
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java
new file mode 100644
index 0000000000000..079b5b2962e6e
--- /dev/null
+++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.expectThrows;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Builder validation for {@link ProducerEncryptionPolicy} and
+ * {@link ConsumerEncryptionPolicy}.
+ */
+public class EncryptionPolicyTest {
+
+ private static final PublicKeyProvider STUB_PUB =
+ keyName -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0]));
+ private static final PrivateKeyProvider STUB_PRIV =
+ (keyName, metadata) -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0]));
+
+ // --- Producer side ---
+
+ @Test
+ public void testProducerPolicyMissingProviderRejected() {
+ expectThrows(NullPointerException.class, () ->
+ ProducerEncryptionPolicy.builder()
+ .keyName("k1")
+ .build());
+ }
+
+ @Test
+ public void testProducerPolicyMissingKeyNameRejected() {
+ expectThrows(IllegalArgumentException.class, () ->
+ ProducerEncryptionPolicy.builder()
+ .publicKeyProvider(STUB_PUB)
+ .build());
+ }
+
+ @Test
+ public void testProducerPolicyDefaultsFailureActionToFail() {
+ ProducerEncryptionPolicy p = ProducerEncryptionPolicy.builder()
+ .publicKeyProvider(STUB_PUB)
+ .keyName("k1")
+ .build();
+ assertSame(p.failureAction(), ProducerCryptoFailureAction.FAIL);
+ assertSame(p.publicKeyProvider(), STUB_PUB);
+ assertEquals(p.keyNames(), List.of("k1"));
+ }
+
+ @Test
+ public void testProducerPolicyMultipleKeyNames() {
+ ProducerEncryptionPolicy p = ProducerEncryptionPolicy.builder()
+ .publicKeyProvider(STUB_PUB)
+ .keyNames("k1", "k2", "k3")
+ .failureAction(ProducerCryptoFailureAction.SEND_UNENCRYPTED)
+ .build();
+ assertEquals(p.keyNames(), List.of("k1", "k2", "k3"));
+ assertSame(p.failureAction(), ProducerCryptoFailureAction.SEND_UNENCRYPTED);
+ }
+
+ // --- Consumer side ---
+
+ @Test
+ public void testConsumerPolicyFailModeRequiresProvider() {
+ // FAIL is the default failure action and requires a provider.
+ expectThrows(IllegalArgumentException.class, () ->
+ ConsumerEncryptionPolicy.builder().build());
+ }
+
+ @Test
+ public void testConsumerPolicyDiscardModeWithoutProviderAllowed() {
+ // DISCARD / CONSUME accept a null provider (consumer doesn't decrypt).
+ ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder()
+ .failureAction(ConsumerCryptoFailureAction.DISCARD)
+ .build();
+ assertSame(p.failureAction(), ConsumerCryptoFailureAction.DISCARD);
+ org.testng.Assert.assertNull(p.privateKeyProvider());
+ }
+
+ @Test
+ public void testConsumerPolicyDefaultsFailureActionToFail() {
+ ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(STUB_PRIV)
+ .build();
+ assertSame(p.failureAction(), ConsumerCryptoFailureAction.FAIL);
+ assertSame(p.privateKeyProvider(), STUB_PRIV);
+ }
+
+ @Test
+ public void testConsumerPolicyExplicitFailureAction() {
+ ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(STUB_PRIV)
+ .failureAction(ConsumerCryptoFailureAction.DISCARD)
+ .build();
+ assertSame(p.failureAction(), ConsumerCryptoFailureAction.DISCARD);
+ }
+
+ // --- EncryptionKey factories ---
+
+ @Test
+ public void testEncryptionKeyFactories() {
+ byte[] bytes = "key-material".getBytes();
+ EncryptionKey k1 = EncryptionKey.of(bytes);
+ assertSame(k1.key(), bytes);
+ assertEquals(k1.metadata(), Map.of());
+
+ EncryptionKey k2 = EncryptionKey.of(bytes, Map.of("version", "v1"));
+ assertEquals(k2.metadata(), Map.of("version", "v1"));
+ }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index c65e16d0fc51e..119702e949748 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -24,7 +24,7 @@
import org.apache.pulsar.client.api.v5.CheckpointConsumer;
import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
import org.apache.pulsar.client.api.v5.PulsarClientException;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
@@ -40,7 +40,7 @@ final class CheckpointConsumerBuilderV5 implements CheckpointConsumerBuilder<
private Checkpoint startPosition = CheckpointV5.LATEST;
private String consumerName;
private String consumerGroup;
- private EncryptionPolicy encryptionPolicy;
+ private ConsumerEncryptionPolicy encryptionPolicy;
CheckpointConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) {
this.client = client;
@@ -116,7 +116,7 @@ public CheckpointConsumerBuilderV5 consumerGroup(String group) {
}
@Override
- public CheckpointConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) {
+ public CheckpointConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) {
this.encryptionPolicy = policy;
return this;
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
index d9d2f7e82581d..e5161d848f045 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
@@ -21,32 +21,63 @@
import java.util.Map;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
/**
- * Adapts a V5 {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader} to the V4
- * {@link CryptoKeyReader} interface used by the underlying client implementation.
+ * Bridges the V5 split-by-role key SPIs ({@link PublicKeyProvider},
+ * {@link PrivateKeyProvider}) to the v4 {@link CryptoKeyReader}, which has both
+ * methods on a single interface.
+ *
+ * v4's {@code CryptoKeyReader} is synchronous, so the adapter blocks on the V5
+ * provider's {@link java.util.concurrent.CompletableFuture future} via {@code join()}.
+ * For local providers (e.g. {@code PemFileKeyProvider}) the future is already
+ * complete; for remote providers (e.g. KMS-backed) this blocks the v4 thread that
+ * called {@code getXxxKey} — same constraint v4 already imposes today. Async
+ * end-to-end requires deeper plumbing into {@code MessageCrypto}; out of scope here.
*/
final class CryptoKeyReaderAdapter implements CryptoKeyReader {
- private final org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader;
+ private final PublicKeyProvider publicKeyProvider;
+ private final PrivateKeyProvider privateKeyProvider;
+
+ private CryptoKeyReaderAdapter(PublicKeyProvider publicKeyProvider,
+ PrivateKeyProvider privateKeyProvider) {
+ this.publicKeyProvider = publicKeyProvider;
+ this.privateKeyProvider = privateKeyProvider;
+ }
- private CryptoKeyReaderAdapter(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader) {
- this.v5Reader = v5Reader;
+ /**
+ * Producer-side adapter: only {@link CryptoKeyReader#getPublicKey} is supported.
+ */
+ static CryptoKeyReader forProducer(PublicKeyProvider provider) {
+ return new CryptoKeyReaderAdapter(provider, null);
}
- static CryptoKeyReader wrap(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader) {
- return new CryptoKeyReaderAdapter(v5Reader);
+ /**
+ * Consumer-side adapter: only {@link CryptoKeyReader#getPrivateKey} is supported.
+ */
+ static CryptoKeyReader forConsumer(PrivateKeyProvider provider) {
+ return new CryptoKeyReaderAdapter(null, provider);
}
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map metadata) {
- var v5Key = v5Reader.getPublicKey(keyName, metadata);
+ if (publicKeyProvider == null) {
+ throw new UnsupportedOperationException(
+ "getPublicKey called on a consumer-side CryptoKeyReaderAdapter");
+ }
+ var v5Key = publicKeyProvider.getPublicKey(keyName).join();
return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata());
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) {
- var v5Key = v5Reader.getPrivateKey(keyName, metadata);
+ if (privateKeyProvider == null) {
+ throw new UnsupportedOperationException(
+ "getPrivateKey called on a producer-side CryptoKeyReaderAdapter");
+ }
+ var v5Key = privateKeyProvider.getPrivateKey(keyName, metadata).join();
return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata());
}
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index d5c261ede8950..922eb95885c52 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -29,8 +29,8 @@
import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
@@ -156,12 +156,13 @@ public ProducerBuilderV5 chunkingPolicy(ChunkingPolicy policy) {
}
@Override
- public ProducerBuilderV5 encryptionPolicy(EncryptionPolicy policy) {
- conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+ public ProducerBuilderV5 encryptionPolicy(ProducerEncryptionPolicy policy) {
+ conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forProducer(policy.publicKeyProvider()));
conf.setEncryptionKeys(new HashSet<>(policy.keyNames()));
- conf.setCryptoFailureAction(
- org.apache.pulsar.client.api.ProducerCryptoFailureAction.valueOf(
- policy.failureAction().name()));
+ conf.setCryptoFailureAction(switch (policy.failureAction()) {
+ case FAIL -> org.apache.pulsar.client.api.ProducerCryptoFailureAction.FAIL;
+ case SEND_UNENCRYPTED -> org.apache.pulsar.client.api.ProducerCryptoFailureAction.SEND;
+ });
return this;
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index 7f9e32963be84..d153a77b0f4ff 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -26,8 +26,8 @@
import org.apache.pulsar.client.api.v5.QueueConsumer;
import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -205,8 +205,10 @@ public QueueConsumerBuilderV5 deadLetterPolicy(DeadLetterPolicy policy) {
}
@Override
- public QueueConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) {
- conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+ public QueueConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) {
+ if (policy.privateKeyProvider() != null) {
+ conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider()));
+ }
conf.setCryptoFailureAction(
org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf(
policy.failureAction().name()));
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index 6b9f6b67e0736..24cf63dacbc1e 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -27,7 +27,7 @@
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.StreamConsumer;
import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -178,8 +178,10 @@ public StreamConsumerBuilderV5 replicateSubscriptionState(boolean replicate)
}
@Override
- public StreamConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) {
- conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+ public StreamConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) {
+ if (policy.privateKeyProvider() != null) {
+ conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider()));
+ }
conf.setCryptoFailureAction(
org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf(
policy.failureAction().name()));