diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java new file mode 100644 index 0000000000000..59ac067d3531b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider; +import org.apache.pulsar.client.api.v5.config.BatchingPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for V5 message encryption: produce → broker → consume + * round-trip, with payloads encrypted on the producer side and decrypted on the + * consumer side. Reuses the test PEM keys under {@code certificate/} that the v4 + * tests already use. + * + *

Wiring under test: + *

+ */ +public class V5EncryptionTest extends V5ClientBaseTest { + + private static final String KEY_NAME = "client-rsa"; + private static final Path PUB_KEY = + Path.of("./src/test/resources/certificate/public-key.client-rsa.pem"); + private static final Path PRIV_KEY = + Path.of("./src/test/resources/certificate/private-key.client-rsa.pem"); + + private static PemFileKeyProvider producerKeys() { + return PemFileKeyProvider.builder() + .publicKey(KEY_NAME, PUB_KEY) + .build(); + } + + private static PemFileKeyProvider consumerKeys() { + return PemFileKeyProvider.builder() + .privateKey(KEY_NAME, PRIV_KEY) + .build(); + } + + private static ProducerEncryptionPolicy producerPolicy() { + return ProducerEncryptionPolicy.builder() + .publicKeyProvider(producerKeys()) + .keyName(KEY_NAME) + .build(); + } + + private static ConsumerEncryptionPolicy consumerPolicy() { + return ConsumerEncryptionPolicy.builder() + .privateKeyProvider(consumerKeys()) + .failureAction(ConsumerCryptoFailureAction.FAIL) + .build(); + } + + /** Single-segment round trip: producer encrypts, consumer decrypts, payload matches. */ + @Test + public void testProducerConsumerRoundTrip() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .encryptionPolicy(producerPolicy()) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("crypto-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .encryptionPolicy(consumerPolicy()) + .subscribe(); + + producer.newMessage().value("hello-encrypted").send(); + + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "consumer must receive the encrypted-then-decrypted message"); + assertEquals(msg.value(), "hello-encrypted"); + consumer.acknowledge(msg.id()); + } + + /** + * Multi-segment scalable topic: messages spread across segments by key, each + * segment's per-segment v4 producer/consumer carries the same crypto config, + * so every message decrypts correctly regardless of which segment it landed on. + */ + @Test + public void testEncryptionAcrossMultipleSegments() throws Exception { + String topic = newScalableTopic(3); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .encryptionPolicy(producerPolicy()) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("crypto-multi-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .encryptionPolicy(consumerPolicy()) + .subscribe(); + + int n = 30; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String value = "msg-" + i; + producer.newMessage().key("k-" + i).value(value).send(); + sent.add(value); + } + + Set received = new HashSet<>(); + for (int i = 0; i < n; i++) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "expected message #" + (i + 1)); + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + assertEquals(received, sent, "every encrypted message must decrypt to its original value"); + } + + /** + * Consumer with {@link ConsumerCryptoFailureAction#CONSUME} and no + * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} configured + * sees the still-encrypted payload, demonstrating the "I don't decrypt; just + * give me the bytes" mode. + * + *

Batching disabled on the producer: v4 drops batched encrypted messages + * even under CONSUME because it can't reframe a batch envelope it can't open. + */ + @Test + public void testConsumerWithoutProviderAndConsumeAction() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .batchingPolicy(BatchingPolicy.ofDisabled()) + .encryptionPolicy(producerPolicy()) + .create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.bytes()) + .topic(topic) + .subscriptionName("crypto-consume-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .encryptionPolicy(ConsumerEncryptionPolicy.builder() + .failureAction(ConsumerCryptoFailureAction.CONSUME) + .build()) + .subscribe(); + + producer.newMessage().value("plaintext-marker").send(); + + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "CONSUME must deliver the message even without a private key"); + // Payload is still encrypted — must not contain the plaintext marker. + String body = new String(msg.value()); + assertTrue(!body.contains("plaintext-marker"), + "payload should still be encrypted, got: " + body); + consumer.acknowledge(msg.id()); + } + + /** + * Consumer with {@link ConsumerCryptoFailureAction#DISCARD} and no provider + * silently drops undecryptable messages (cursor advances) — the application + * never sees them. + */ + @Test + public void testConsumerWithoutProviderAndDiscardAction() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .batchingPolicy(BatchingPolicy.ofDisabled()) + .encryptionPolicy(producerPolicy()) + .create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("crypto-discard-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .encryptionPolicy(ConsumerEncryptionPolicy.builder() + .failureAction(ConsumerCryptoFailureAction.DISCARD) + .build()) + .subscribe(); + + producer.newMessage().value("classified").send(); + + Message msg = consumer.receive(Duration.ofMillis(500)); + assertNull(msg, "DISCARD must drop the undecryptable message before delivery"); + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java index e9c6153fec5c0..1a67dbcfe84db 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; /** * Builder for configuring and creating a {@link CheckpointConsumer}. @@ -111,9 +111,9 @@ public interface CheckpointConsumerBuilder { * * @param policy the encryption policy to use * @return this builder instance for chaining - * @see EncryptionPolicy#forConsumer + * @see ConsumerEncryptionPolicy#builder() */ - CheckpointConsumerBuilder encryptionPolicy(EncryptionPolicy policy); + CheckpointConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy); // --- Metadata --- diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java index 65800756c19f0..207686e88d172 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java @@ -24,8 +24,8 @@ import org.apache.pulsar.client.api.v5.config.BatchingPolicy; import org.apache.pulsar.client.api.v5.config.ChunkingPolicy; import org.apache.pulsar.client.api.v5.config.CompressionPolicy; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; import org.apache.pulsar.client.api.v5.config.ProducerAccessMode; +import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy; /** * Builder for configuring and creating a {@link Producer}. @@ -133,11 +133,11 @@ public interface ProducerBuilder { /** * Configure end-to-end message encryption. * - * @param policy the encryption policy for producing encrypted messages + * @param policy the producer-side encryption policy * @return this builder instance for chaining - * @see EncryptionPolicy#forProducer(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader, String...) + * @see ProducerEncryptionPolicy#builder() */ - ProducerBuilder encryptionPolicy(EncryptionPolicy policy); + ProducerBuilder encryptionPolicy(ProducerEncryptionPolicy policy); /** * Set the initial sequence ID for producer message deduplication. Subsequent messages diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java index db03f3c546913..f616f79beee95 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java @@ -22,8 +22,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.v5.config.BackoffPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy; import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; @@ -194,9 +194,9 @@ public interface QueueConsumerBuilder { * * @param policy the encryption policy to use * @return this builder instance for chaining - * @see EncryptionPolicy#forConsumer + * @see ConsumerEncryptionPolicy#builder() */ - QueueConsumerBuilder encryptionPolicy(EncryptionPolicy policy); + QueueConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy); // --- Misc --- diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java index 0963f79203523..ab813842b4f5a 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java @@ -22,7 +22,7 @@ import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; /** @@ -164,9 +164,9 @@ public interface StreamConsumerBuilder { * * @param policy the encryption policy to use * @return this builder instance for chaining - * @see EncryptionPolicy#forConsumer + * @see ConsumerEncryptionPolicy#builder() */ - StreamConsumerBuilder encryptionPolicy(EncryptionPolicy policy); + StreamConsumerBuilder encryptionPolicy(ConsumerEncryptionPolicy policy); // --- Metadata --- diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java similarity index 53% rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java index bec3eeb6054b5..f4b1b5d3fe172 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java @@ -18,28 +18,29 @@ */ package org.apache.pulsar.client.api.v5.auth; -import java.util.Map; - /** - * Interface for loading encryption and decryption keys for end-to-end message encryption. + * Action a consumer takes when message decryption fails (e.g. the + * {@link PrivateKeyProvider} cannot be reached, returns no key, or the + * ciphertext is malformed). */ -public interface CryptoKeyReader { +public enum ConsumerCryptoFailureAction { + + /** + * Fail the {@code receive} call. The application sees the decryption error + * and the message stays unacknowledged so it will be redelivered. + */ + FAIL, /** - * Get the public key for encrypting messages. - * - * @param keyName the name of the key - * @param metadata additional metadata associated with the key - * @return the encryption key info containing the public key data + * Silently acknowledge and skip the message. Useful when the consumer + * legitimately cannot read some encrypted streams (e.g. a side channel) + * but should keep moving forward through the rest. */ - EncryptionKeyInfo getPublicKey(String keyName, Map metadata); + DISCARD, /** - * Get the private key for decrypting messages. - * - * @param keyName the name of the key - * @param metadata additional metadata associated with the key - * @return the encryption key info containing the private key data + * Deliver the message to the application as-is, with the still-encrypted + * payload. The application can then handle decryption out-of-band. */ - EncryptionKeyInfo getPrivateKey(String keyName, Map metadata); + CONSUME } diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java similarity index 60% rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java index cc429bbc8c935..6360a1590d3a5 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java @@ -18,20 +18,15 @@ */ package org.apache.pulsar.client.api.v5.auth; -import java.util.Map; -import java.util.Objects; - /** - * Holds an encryption key and associated metadata. + * Convenience interface for implementations that serve both public keys (for + * producer-side encryption) and private keys (for consumer-side decryption) — for + * example, a single PEM-file-backed key store used by both sides of an in-process + * round trip. * - * @param key the raw key bytes - * @param metadata key-value metadata associated with the key + *

Producer-only or consumer-only implementations should implement + * {@link PublicKeyProvider} or {@link PrivateKeyProvider} directly instead — that + * makes the role explicit and avoids stub methods that throw. */ -public record EncryptionKeyInfo(byte[] key, Map metadata) { - public EncryptionKeyInfo { - Objects.requireNonNull(key, "key must not be null"); - if (metadata == null) { - metadata = Map.of(); - } - } +public interface CryptoKeyProvider extends PublicKeyProvider, PrivateKeyProvider { } diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java new file mode 100644 index 0000000000000..4d44235d06d77 --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.auth; + +import java.util.Map; +import java.util.Objects; + +/** + * A single encryption / decryption key returned by a {@link PublicKeyProvider} or + * {@link PrivateKeyProvider}. + * + *

The producer-side flow returns just the key bytes. The consumer-side flow may + * include {@link #metadata()} that the producer attached when the message was + * encrypted (e.g. a key version) — the {@link PrivateKeyProvider} can use it to + * pick the right private key when keys have been rotated. + * + *

Use {@link #of(byte[])} when there's no metadata, or {@link #of(byte[], Map)} + * to attach producer-side hints alongside the bytes. + */ +public final class EncryptionKey { + + private final byte[] key; + private final Map metadata; + + private EncryptionKey(byte[] key, Map metadata) { + this.key = Objects.requireNonNull(key, "key must not be null"); + this.metadata = metadata == null ? Map.of() : Map.copyOf(metadata); + } + + /** + * @return the raw key bytes + */ + public byte[] key() { + return key; + } + + /** + * @return key-value metadata associated with the key (never {@code null}; + * empty when the producer did not attach any) + */ + public Map metadata() { + return metadata; + } + + /** + * Create an {@link EncryptionKey} with no metadata. + * + * @param key the raw key bytes + * @return a new {@link EncryptionKey} + */ + public static EncryptionKey of(byte[] key) { + return new EncryptionKey(key, Map.of()); + } + + /** + * Create an {@link EncryptionKey} with associated metadata. + * + * @param key the raw key bytes + * @param metadata key-value metadata to attach to the key + * @return a new {@link EncryptionKey} + */ + public static EncryptionKey of(byte[] key, Map metadata) { + return new EncryptionKey(key, metadata); + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java new file mode 100644 index 0000000000000..7670d5dfecc38 --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.auth; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Batteries-included key provider that loads PEM-encoded keys from local files. + * + *

The same instance can serve as both {@link PublicKeyProvider} and + * {@link PrivateKeyProvider} — register public keys for the producer side and + * private keys for the consumer side. In typical setups each side instantiates + * its own provider, configured only with the keys it actually needs. + * + *

For more complex sources (KMS, Vault, ...) implement {@link PublicKeyProvider} + * or {@link PrivateKeyProvider} directly. + * + *

{@code
+ * var keys = PemFileKeyProvider.builder()
+ *         .publicKey("orders-v1", Path.of("/etc/keys/orders-pub.pem"))
+ *         .privateKey("orders-v1", Path.of("/etc/keys/orders-priv.pem"))
+ *         .build();
+ *
+ * client.newProducer(Schema.string())
+ *       .topic("orders")
+ *       .encryptionPolicy(ProducerEncryptionPolicy.builder()
+ *               .publicKeyProvider(keys)
+ *               .keyName("orders-v1")
+ *               .build())
+ *       .create();
+ * }
+ */ +public final class PemFileKeyProvider implements CryptoKeyProvider { + + private final Map publicKeys; + private final Map privateKeys; + + private PemFileKeyProvider(Map publicKeys, Map privateKeys) { + this.publicKeys = Map.copyOf(publicKeys); + this.privateKeys = Map.copyOf(privateKeys); + } + + @Override + public CompletableFuture getPublicKey(String keyName) { + return loadKey(keyName, publicKeys, "public"); + } + + @Override + public CompletableFuture getPrivateKey(String keyName, Map metadata) { + return loadKey(keyName, privateKeys, "private"); + } + + private static CompletableFuture loadKey(String keyName, + Map keys, + String role) { + Path path = keys.get(keyName); + if (path == null) { + return CompletableFuture.failedFuture(new IllegalArgumentException( + "no " + role + " key registered for name: " + keyName)); + } + try { + byte[] bytes = Files.readAllBytes(path); + return CompletableFuture.completedFuture(EncryptionKey.of(bytes)); + } catch (IOException e) { + return CompletableFuture.failedFuture(new IOException( + "failed to read " + role + " key '" + keyName + "' from " + path, e)); + } + } + + /** + * @return a new builder for constructing a {@link PemFileKeyProvider} + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link PemFileKeyProvider}. + */ + public static final class Builder { + private final Map publicKeys = new HashMap<>(); + private final Map privateKeys = new HashMap<>(); + + private Builder() { + } + + /** + * Register a public key file under the given name. Producer-side use. + * + * @param keyName the key identifier the producer will reference + * @param path path to the PEM-encoded public key file + * @return this builder + */ + public Builder publicKey(String keyName, Path path) { + publicKeys.put(keyName, path); + return this; + } + + /** + * Register a private key file under the given name. Consumer-side use. + * + * @param keyName the key identifier the producer used to encrypt + * @param path path to the PEM-encoded private key file + * @return this builder + */ + public Builder privateKey(String keyName, Path path) { + privateKeys.put(keyName, path); + return this; + } + + /** + * @return a new {@link PemFileKeyProvider} instance + */ + public PemFileKeyProvider build() { + return new PemFileKeyProvider(publicKeys, privateKeys); + } + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java new file mode 100644 index 0000000000000..acdfe60d649f5 --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.auth; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Consumer-side SPI: load a private key by name for end-to-end message decryption. + * + *

The provider is consulted on every encrypted message the consumer receives. + * The signature is asynchronous so an implementation backed by a remote KMS + * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's IO + * thread. For local key stores, return a completed future. + * + *

For a simple file-based provider, see {@link PemFileKeyProvider}. + */ +public interface PrivateKeyProvider { + + /** + * Look up the private key for the given name. + * + *

{@code metadata} carries any hints the producer attached when the message + * was encrypted — typically a key version or rotation marker. Implementations + * that don't rotate keys can ignore it. + * + * @param keyName the key identifier the producer used to encrypt + * @param metadata producer-supplied hints about the key (never {@code null}; + * empty when the producer didn't attach any) + * @return a future completing with the private key, or completing exceptionally + * if the key cannot be found or loaded + */ + CompletableFuture getPrivateKey(String keyName, Map metadata); +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java similarity index 66% rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java index 3b48c2a46f796..0ef6e71c2a912 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java @@ -19,23 +19,20 @@ package org.apache.pulsar.client.api.v5.auth; /** - * Action to take when a message encryption or decryption operation fails. + * Action a producer takes when message encryption fails (e.g. the + * {@link PublicKeyProvider} cannot be reached or returns no key). */ -public enum CryptoFailureAction { +public enum ProducerCryptoFailureAction { /** - * Fail the operation and return an error to the caller. + * Fail the {@code send} call. The send future completes exceptionally and + * the application sees the error. */ FAIL, /** - * Silently discard the message (consumer side only). + * Send the message unencrypted instead of failing. Useful when encryption + * is opportunistic — for example, during a key-rollout migration. */ - DISCARD, - - /** - * Deliver the message to the consumer without decrypting (consumer side only). - * The message will contain encrypted payload. - */ - CONSUME + SEND_UNENCRYPTED } diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java new file mode 100644 index 0000000000000..af2b3172c8b33 --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.auth; + +import java.util.concurrent.CompletableFuture; + +/** + * Producer-side SPI: load a public key by name for end-to-end message encryption. + * + *

The provider is consulted at producer creation time and on every key rotation. + * The signature is asynchronous so an implementation backed by a remote KMS + * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's IO + * thread. For local key stores, return a completed future. + * + *

For a simple file-based provider, see {@link PemFileKeyProvider}. + */ +public interface PublicKeyProvider { + + /** + * Look up the public key with the given name. + * + * @param keyName the key identifier as configured on the producer + * @return a future completing with the public key, or completing exceptionally + * if the key cannot be found or loaded + */ + CompletableFuture getPublicKey(String keyName); +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java index 7d8141ad558ba..f7f22f13a3719 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java @@ -23,6 +23,8 @@ *

Provides pluggable authentication via {@link org.apache.pulsar.client.api.v5.auth.Authentication} * and convenience factories in {@link org.apache.pulsar.client.api.v5.auth.AuthenticationFactory}, * as well as end-to-end encryption support via - * {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader}. + * {@link org.apache.pulsar.client.api.v5.auth.PublicKeyProvider} (producer side) and + * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} (consumer side). + * For local PEM-file-backed setups, use {@link org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider}. */ package org.apache.pulsar.client.api.v5.auth; diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java new file mode 100644 index 0000000000000..08dc86b57accf --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.config; + +import java.util.Objects; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider; + +/** + * Consumer-side end-to-end decryption configuration. + * + *

Construct via {@link #builder()}. The {@link PrivateKeyProvider} is required + * when {@link #failureAction()} is {@link ConsumerCryptoFailureAction#FAIL} (the + * default — strict mode); for {@link ConsumerCryptoFailureAction#DISCARD} or + * {@link ConsumerCryptoFailureAction#CONSUME} the provider may be omitted, in + * which case the consumer just relies on the failure action to decide what to do + * with encrypted messages it can't decrypt. + */ +@EqualsAndHashCode +@ToString +public final class ConsumerEncryptionPolicy { + + private final PrivateKeyProvider privateKeyProvider; + private final ConsumerCryptoFailureAction failureAction; + + private ConsumerEncryptionPolicy(PrivateKeyProvider privateKeyProvider, + ConsumerCryptoFailureAction failureAction) { + Objects.requireNonNull(failureAction, "failureAction must not be null"); + if (failureAction == ConsumerCryptoFailureAction.FAIL && privateKeyProvider == null) { + throw new IllegalArgumentException( + "privateKeyProvider must be set when failureAction is FAIL"); + } + this.privateKeyProvider = privateKeyProvider; + this.failureAction = failureAction; + } + + /** + * @return the provider used to load private keys for decryption, or {@code null} + * when the consumer doesn't decrypt and falls back to the failure action + * (DISCARD or CONSUME) + */ + public PrivateKeyProvider privateKeyProvider() { + return privateKeyProvider; + } + + /** + * @return the action the consumer takes when decryption fails + */ + public ConsumerCryptoFailureAction failureAction() { + return failureAction; + } + + /** + * @return a new builder for constructing a {@link ConsumerEncryptionPolicy} + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link ConsumerEncryptionPolicy}. + */ + public static final class Builder { + private PrivateKeyProvider privateKeyProvider; + private ConsumerCryptoFailureAction failureAction = ConsumerCryptoFailureAction.FAIL; + + private Builder() { + } + + /** + * Provider used to load private keys. Required. + * + * @param privateKeyProvider the private-key provider + * @return this builder + */ + public Builder privateKeyProvider(PrivateKeyProvider privateKeyProvider) { + this.privateKeyProvider = privateKeyProvider; + return this; + } + + /** + * Action to take when decryption fails. Default: {@link ConsumerCryptoFailureAction#FAIL}. + * + * @param failureAction the failure action + * @return this builder + */ + public Builder failureAction(ConsumerCryptoFailureAction failureAction) { + this.failureAction = failureAction; + return this; + } + + /** + * @return a new {@link ConsumerEncryptionPolicy} instance + */ + public ConsumerEncryptionPolicy build() { + return new ConsumerEncryptionPolicy(privateKeyProvider, failureAction); + } + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java deleted file mode 100644 index c196a8f20fb94..0000000000000 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api.v5.config; - -import java.util.List; -import java.util.Objects; -import lombok.EqualsAndHashCode; -import lombok.ToString; -import org.apache.pulsar.client.api.v5.auth.CryptoFailureAction; -import org.apache.pulsar.client.api.v5.auth.CryptoKeyReader; - -/** - * End-to-end encryption configuration for producers and consumers. - * - *

For producers, supply a {@link CryptoKeyReader} and one or more encryption key names — - * use {@link #forProducer(CryptoKeyReader, String...)} as the typical entry point. - * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link CryptoFailureAction} — - * use {@link #forConsumer(CryptoKeyReader, CryptoFailureAction)}. - * - *

{@link #builder()} is available for callers that need to tune both sides explicitly. - */ -@EqualsAndHashCode -@ToString -public final class EncryptionPolicy { - - private final CryptoKeyReader keyReader; - private final List keyNames; - private final CryptoFailureAction failureAction; - - private EncryptionPolicy(CryptoKeyReader keyReader, List keyNames, - CryptoFailureAction failureAction) { - Objects.requireNonNull(keyReader, "keyReader must not be null"); - if (keyNames == null) { - keyNames = List.of(); - } - if (failureAction == null) { - failureAction = CryptoFailureAction.FAIL; - } - this.keyReader = keyReader; - this.keyNames = List.copyOf(keyNames); - this.failureAction = failureAction; - } - - /** - * @return the crypto key reader for loading encryption/decryption keys - */ - public CryptoKeyReader keyReader() { - return keyReader; - } - - /** - * @return the producer-side encryption key names (empty list for consumer/reader) - */ - public List keyNames() { - return keyNames; - } - - /** - * @return the action to take when encryption or decryption fails - */ - public CryptoFailureAction failureAction() { - return failureAction; - } - - /** - * Create an encryption policy for producers. - * - * @param keyReader the crypto key reader for loading encryption keys - * @param keyNames one or more encryption key names to use - * @return an {@link EncryptionPolicy} configured for producer-side encryption - */ - public static EncryptionPolicy forProducer(CryptoKeyReader keyReader, String... keyNames) { - return new EncryptionPolicy(keyReader, List.of(keyNames), CryptoFailureAction.FAIL); - } - - /** - * Create an encryption policy for consumers/readers. - * - * @param keyReader the crypto key reader for loading decryption keys - * @param failureAction the action to take when decryption fails - * @return an {@link EncryptionPolicy} configured for consumer-side decryption - */ - public static EncryptionPolicy forConsumer(CryptoKeyReader keyReader, CryptoFailureAction failureAction) { - return new EncryptionPolicy(keyReader, List.of(), failureAction); - } - - /** - * @return a new builder for constructing an {@link EncryptionPolicy} - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Builder for {@link EncryptionPolicy}. - */ - public static final class Builder { - private CryptoKeyReader keyReader; - private List keyNames = List.of(); - private CryptoFailureAction failureAction = CryptoFailureAction.FAIL; - - private Builder() { - } - - /** - * Crypto key reader used to load encryption/decryption keys. Required. - * - * @param keyReader the key reader - * @return this builder - */ - public Builder keyReader(CryptoKeyReader keyReader) { - this.keyReader = keyReader; - return this; - } - - /** - * Producer-side encryption key names. Leave empty (default) for consumer-side use. - * - * @param keyNames the key names - * @return this builder - */ - public Builder keyNames(String... keyNames) { - this.keyNames = List.of(keyNames); - return this; - } - - /** - * Action to take when encryption (producer) or decryption (consumer) fails. - * Default is {@link CryptoFailureAction#FAIL}. - * - * @param failureAction the failure action - * @return this builder - */ - public Builder failureAction(CryptoFailureAction failureAction) { - this.failureAction = failureAction; - return this; - } - - /** - * @return a new {@link EncryptionPolicy} instance - */ - public EncryptionPolicy build() { - return new EncryptionPolicy(keyReader, keyNames, failureAction); - } - } -} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java new file mode 100644 index 0000000000000..0e793b3737e31 --- /dev/null +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.config; + +import java.util.List; +import java.util.Objects; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider; + +/** + * Producer-side end-to-end encryption configuration. + * + *

Construct via {@link #builder()}. Required: a {@link PublicKeyProvider} and at + * least one key name. + */ +@EqualsAndHashCode +@ToString +public final class ProducerEncryptionPolicy { + + private final PublicKeyProvider publicKeyProvider; + private final List keyNames; + private final ProducerCryptoFailureAction failureAction; + + private ProducerEncryptionPolicy(PublicKeyProvider publicKeyProvider, + List keyNames, + ProducerCryptoFailureAction failureAction) { + Objects.requireNonNull(publicKeyProvider, "publicKeyProvider must not be null"); + Objects.requireNonNull(keyNames, "keyNames must not be null"); + if (keyNames.isEmpty()) { + throw new IllegalArgumentException("at least one key name must be configured"); + } + Objects.requireNonNull(failureAction, "failureAction must not be null"); + this.publicKeyProvider = publicKeyProvider; + this.keyNames = List.copyOf(keyNames); + this.failureAction = failureAction; + } + + /** + * @return the provider used to load public keys for encryption + */ + public PublicKeyProvider publicKeyProvider() { + return publicKeyProvider; + } + + /** + * @return the configured key names; the producer encrypts each message's data + * key with every public key listed here + */ + public List keyNames() { + return keyNames; + } + + /** + * @return the action the producer takes when encryption fails + */ + public ProducerCryptoFailureAction failureAction() { + return failureAction; + } + + /** + * @return a new builder for constructing a {@link ProducerEncryptionPolicy} + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link ProducerEncryptionPolicy}. + */ + public static final class Builder { + private PublicKeyProvider publicKeyProvider; + private List keyNames = List.of(); + private ProducerCryptoFailureAction failureAction = ProducerCryptoFailureAction.FAIL; + + private Builder() { + } + + /** + * Provider used to load public keys. Required. + * + * @param publicKeyProvider the public-key provider + * @return this builder + */ + public Builder publicKeyProvider(PublicKeyProvider publicKeyProvider) { + this.publicKeyProvider = publicKeyProvider; + return this; + } + + /** + * Single key name shortcut — equivalent to {@code keyNames(List.of(name))}. + * + * @param keyName the key name + * @return this builder + */ + public Builder keyName(String keyName) { + this.keyNames = List.of(keyName); + return this; + } + + /** + * Multiple key names. The producer encrypts each message's data key with + * every public key listed here, so any consumer with one of the matching + * private keys can decrypt. + * + * @param keyNames one or more key names + * @return this builder + */ + public Builder keyNames(String... keyNames) { + this.keyNames = List.of(keyNames); + return this; + } + + /** + * Action to take when encryption fails. Default: {@link ProducerCryptoFailureAction#FAIL}. + * + * @param failureAction the failure action + * @return this builder + */ + public Builder failureAction(ProducerCryptoFailureAction failureAction) { + this.failureAction = failureAction; + return this; + } + + /** + * @return a new {@link ProducerEncryptionPolicy} instance + */ + public ProducerEncryptionPolicy build() { + return new ProducerEncryptionPolicy(publicKeyProvider, keyNames, failureAction); + } + } +} diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java index f05cfbd4a21ce..b3439d31c0a46 100644 --- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api.v5; +import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.Map; @@ -26,14 +27,18 @@ import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer; import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer; import org.apache.pulsar.client.api.v5.auth.AuthenticationFactory; +import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider; import org.apache.pulsar.client.api.v5.config.BackoffPolicy; import org.apache.pulsar.client.api.v5.config.BatchingPolicy; import org.apache.pulsar.client.api.v5.config.CompressionPolicy; import org.apache.pulsar.client.api.v5.config.CompressionType; import org.apache.pulsar.client.api.v5.config.ConnectionPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy; import org.apache.pulsar.client.api.v5.config.MemorySize; import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy; +import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; import org.apache.pulsar.client.api.v5.config.TlsPolicy; import org.apache.pulsar.client.api.v5.schema.Schema; @@ -300,6 +305,42 @@ void queueConsumerWithDLQ(PulsarClient client) throws Exception { } } + /** + * End-to-end encryption — producer encrypts with a public key, consumer decrypts + * with the matching private key. The {@link PemFileKeyProvider} is the + * batteries-included reader for local PEM files; for remote key stores + * (KMS, Vault, ...) implement {@link org.apache.pulsar.client.api.v5.auth.PublicKeyProvider} + * or {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} directly. + */ + void encryptedProducerAndConsumer(PulsarClient client) throws Exception { + try (var producer = client.newProducer(Schema.string()) + .topic("orders") + .encryptionPolicy(ProducerEncryptionPolicy.builder() + .publicKeyProvider(PemFileKeyProvider.builder() + .publicKey("orders-v1", Path.of("/etc/keys/orders-pub.pem")) + .build()) + .keyName("orders-v1") + .build()) + .create()) { + producer.newMessage().value("classified payload").send(); + } + + try (var consumer = client.newQueueConsumer(Schema.string()) + .topic("orders") + .subscriptionName("trusted") + .encryptionPolicy(ConsumerEncryptionPolicy.builder() + .privateKeyProvider(PemFileKeyProvider.builder() + .privateKey("orders-v1", Path.of("/etc/keys/orders-priv.pem")) + .build()) + .failureAction(ConsumerCryptoFailureAction.FAIL) + .build()) + .subscribe()) { + Message msg = consumer.receive(Duration.ofSeconds(5)); + // ... use msg.value() + consumer.acknowledge(msg.id()); + } + } + /** Async queue consumer — high-throughput parallel processing. */ void asyncQueueConsumer(PulsarClient client) throws Exception { try (var consumer = client.newQueueConsumer(Schema.json(Order.class)) diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java new file mode 100644 index 0000000000000..74fbc5128fac6 --- /dev/null +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.auth; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletionException; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit coverage for {@link PemFileKeyProvider}: registering a key file → reading it + * back as bytes; missing key → failed future; missing file on disk → failed future. + */ +public class PemFileKeyProviderTest { + + private Path tempDir; + + @BeforeMethod + public void setUp() throws IOException { + tempDir = Files.createTempDirectory("pem-file-key-provider-test"); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + if (tempDir != null) { + try (var stream = Files.walk(tempDir)) { + stream.sorted(java.util.Comparator.reverseOrder()) + .forEach(p -> { + try { + Files.deleteIfExists(p); + } catch (IOException ignored) { + // best effort + } + }); + } + } + } + + @Test + public void testReadRegisteredPublicKey() throws Exception { + Path keyFile = Files.writeString(tempDir.resolve("pub.pem"), "PUBLIC-KEY-BYTES"); + var provider = PemFileKeyProvider.builder() + .publicKey("orders-v1", keyFile) + .build(); + + EncryptionKey key = provider.getPublicKey("orders-v1").get(); + assertEquals(new String(key.key()), "PUBLIC-KEY-BYTES"); + assertTrue(key.metadata().isEmpty()); + } + + @Test + public void testReadRegisteredPrivateKey() throws Exception { + Path keyFile = Files.writeString(tempDir.resolve("priv.pem"), "PRIVATE-KEY-BYTES"); + var provider = PemFileKeyProvider.builder() + .privateKey("orders-v1", keyFile) + .build(); + + EncryptionKey key = provider.getPrivateKey("orders-v1", Map.of()).get(); + assertEquals(new String(key.key()), "PRIVATE-KEY-BYTES"); + } + + @Test + public void testUnknownKeyNameFailsFuture() { + var provider = PemFileKeyProvider.builder().build(); + + var ex = expectThrows(CompletionException.class, + () -> provider.getPublicKey("missing").join()); + assertTrue(ex.getCause() instanceof IllegalArgumentException, + "expected IllegalArgumentException, got: " + ex.getCause()); + } + + @Test + public void testMissingFileOnDiskFailsFuture() { + Path missing = tempDir.resolve("does-not-exist.pem"); + var provider = PemFileKeyProvider.builder() + .publicKey("orders-v1", missing) + .build(); + + var ex = expectThrows(CompletionException.class, + () -> provider.getPublicKey("orders-v1").join()); + assertTrue(ex.getCause() instanceof IOException, + "expected IOException, got: " + ex.getCause()); + } + + @Test + public void testProviderUsableAsBothSides() throws Exception { + Path pub = Files.writeString(tempDir.resolve("pub.pem"), "PUB"); + Path priv = Files.writeString(tempDir.resolve("priv.pem"), "PRIV"); + var provider = PemFileKeyProvider.builder() + .publicKey("k", pub) + .privateKey("k", priv) + .build(); + + assertEquals(new String(provider.getPublicKey("k").get().key()), "PUB"); + assertEquals(new String(provider.getPrivateKey("k", Map.of()).get().key()), "PRIV"); + } +} diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java new file mode 100644 index 0000000000000..079b5b2962e6e --- /dev/null +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5.config; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.expectThrows; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.EncryptionKey; +import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider; +import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider; +import org.testng.annotations.Test; + +/** + * Builder validation for {@link ProducerEncryptionPolicy} and + * {@link ConsumerEncryptionPolicy}. + */ +public class EncryptionPolicyTest { + + private static final PublicKeyProvider STUB_PUB = + keyName -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0])); + private static final PrivateKeyProvider STUB_PRIV = + (keyName, metadata) -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0])); + + // --- Producer side --- + + @Test + public void testProducerPolicyMissingProviderRejected() { + expectThrows(NullPointerException.class, () -> + ProducerEncryptionPolicy.builder() + .keyName("k1") + .build()); + } + + @Test + public void testProducerPolicyMissingKeyNameRejected() { + expectThrows(IllegalArgumentException.class, () -> + ProducerEncryptionPolicy.builder() + .publicKeyProvider(STUB_PUB) + .build()); + } + + @Test + public void testProducerPolicyDefaultsFailureActionToFail() { + ProducerEncryptionPolicy p = ProducerEncryptionPolicy.builder() + .publicKeyProvider(STUB_PUB) + .keyName("k1") + .build(); + assertSame(p.failureAction(), ProducerCryptoFailureAction.FAIL); + assertSame(p.publicKeyProvider(), STUB_PUB); + assertEquals(p.keyNames(), List.of("k1")); + } + + @Test + public void testProducerPolicyMultipleKeyNames() { + ProducerEncryptionPolicy p = ProducerEncryptionPolicy.builder() + .publicKeyProvider(STUB_PUB) + .keyNames("k1", "k2", "k3") + .failureAction(ProducerCryptoFailureAction.SEND_UNENCRYPTED) + .build(); + assertEquals(p.keyNames(), List.of("k1", "k2", "k3")); + assertSame(p.failureAction(), ProducerCryptoFailureAction.SEND_UNENCRYPTED); + } + + // --- Consumer side --- + + @Test + public void testConsumerPolicyFailModeRequiresProvider() { + // FAIL is the default failure action and requires a provider. + expectThrows(IllegalArgumentException.class, () -> + ConsumerEncryptionPolicy.builder().build()); + } + + @Test + public void testConsumerPolicyDiscardModeWithoutProviderAllowed() { + // DISCARD / CONSUME accept a null provider (consumer doesn't decrypt). + ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder() + .failureAction(ConsumerCryptoFailureAction.DISCARD) + .build(); + assertSame(p.failureAction(), ConsumerCryptoFailureAction.DISCARD); + org.testng.Assert.assertNull(p.privateKeyProvider()); + } + + @Test + public void testConsumerPolicyDefaultsFailureActionToFail() { + ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder() + .privateKeyProvider(STUB_PRIV) + .build(); + assertSame(p.failureAction(), ConsumerCryptoFailureAction.FAIL); + assertSame(p.privateKeyProvider(), STUB_PRIV); + } + + @Test + public void testConsumerPolicyExplicitFailureAction() { + ConsumerEncryptionPolicy p = ConsumerEncryptionPolicy.builder() + .privateKeyProvider(STUB_PRIV) + .failureAction(ConsumerCryptoFailureAction.DISCARD) + .build(); + assertSame(p.failureAction(), ConsumerCryptoFailureAction.DISCARD); + } + + // --- EncryptionKey factories --- + + @Test + public void testEncryptionKeyFactories() { + byte[] bytes = "key-material".getBytes(); + EncryptionKey k1 = EncryptionKey.of(bytes); + assertSame(k1.key(), bytes); + assertEquals(k1.metadata(), Map.of()); + + EncryptionKey k2 = EncryptionKey.of(bytes, Map.of("version", "v1")); + assertEquals(k2.metadata(), Map.of("version", "v1")); + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java index c65e16d0fc51e..119702e949748 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java @@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.v5.CheckpointConsumer; import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder; import org.apache.pulsar.client.api.v5.PulsarClientException; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.schema.Schema; import org.apache.pulsar.common.api.proto.ScalableConsumerType; import org.apache.pulsar.common.naming.TopicName; @@ -40,7 +40,7 @@ final class CheckpointConsumerBuilderV5 implements CheckpointConsumerBuilder< private Checkpoint startPosition = CheckpointV5.LATEST; private String consumerName; private String consumerGroup; - private EncryptionPolicy encryptionPolicy; + private ConsumerEncryptionPolicy encryptionPolicy; CheckpointConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) { this.client = client; @@ -116,7 +116,7 @@ public CheckpointConsumerBuilderV5 consumerGroup(String group) { } @Override - public CheckpointConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { + public CheckpointConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) { this.encryptionPolicy = policy; return this; } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java index d9d2f7e82581d..e5161d848f045 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java @@ -21,32 +21,63 @@ import java.util.Map; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider; +import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider; /** - * Adapts a V5 {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader} to the V4 - * {@link CryptoKeyReader} interface used by the underlying client implementation. + * Bridges the V5 split-by-role key SPIs ({@link PublicKeyProvider}, + * {@link PrivateKeyProvider}) to the v4 {@link CryptoKeyReader}, which has both + * methods on a single interface. + * + *

v4's {@code CryptoKeyReader} is synchronous, so the adapter blocks on the V5 + * provider's {@link java.util.concurrent.CompletableFuture future} via {@code join()}. + * For local providers (e.g. {@code PemFileKeyProvider}) the future is already + * complete; for remote providers (e.g. KMS-backed) this blocks the v4 thread that + * called {@code getXxxKey} — same constraint v4 already imposes today. Async + * end-to-end requires deeper plumbing into {@code MessageCrypto}; out of scope here. */ final class CryptoKeyReaderAdapter implements CryptoKeyReader { - private final org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader; + private final PublicKeyProvider publicKeyProvider; + private final PrivateKeyProvider privateKeyProvider; + + private CryptoKeyReaderAdapter(PublicKeyProvider publicKeyProvider, + PrivateKeyProvider privateKeyProvider) { + this.publicKeyProvider = publicKeyProvider; + this.privateKeyProvider = privateKeyProvider; + } - private CryptoKeyReaderAdapter(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader) { - this.v5Reader = v5Reader; + /** + * Producer-side adapter: only {@link CryptoKeyReader#getPublicKey} is supported. + */ + static CryptoKeyReader forProducer(PublicKeyProvider provider) { + return new CryptoKeyReaderAdapter(provider, null); } - static CryptoKeyReader wrap(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader) { - return new CryptoKeyReaderAdapter(v5Reader); + /** + * Consumer-side adapter: only {@link CryptoKeyReader#getPrivateKey} is supported. + */ + static CryptoKeyReader forConsumer(PrivateKeyProvider provider) { + return new CryptoKeyReaderAdapter(null, provider); } @Override public EncryptionKeyInfo getPublicKey(String keyName, Map metadata) { - var v5Key = v5Reader.getPublicKey(keyName, metadata); + if (publicKeyProvider == null) { + throw new UnsupportedOperationException( + "getPublicKey called on a consumer-side CryptoKeyReaderAdapter"); + } + var v5Key = publicKeyProvider.getPublicKey(keyName).join(); return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata()); } @Override public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) { - var v5Key = v5Reader.getPrivateKey(keyName, metadata); + if (privateKeyProvider == null) { + throw new UnsupportedOperationException( + "getPrivateKey called on a producer-side CryptoKeyReaderAdapter"); + } + var v5Key = privateKeyProvider.getPrivateKey(keyName, metadata).join(); return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata()); } } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java index d5c261ede8950..922eb95885c52 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java @@ -29,8 +29,8 @@ import org.apache.pulsar.client.api.v5.config.BatchingPolicy; import org.apache.pulsar.client.api.v5.config.ChunkingPolicy; import org.apache.pulsar.client.api.v5.config.CompressionPolicy; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; import org.apache.pulsar.client.api.v5.config.ProducerAccessMode; +import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy; import org.apache.pulsar.client.api.v5.schema.Schema; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; @@ -156,12 +156,13 @@ public ProducerBuilderV5 chunkingPolicy(ChunkingPolicy policy) { } @Override - public ProducerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { - conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader())); + public ProducerBuilderV5 encryptionPolicy(ProducerEncryptionPolicy policy) { + conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forProducer(policy.publicKeyProvider())); conf.setEncryptionKeys(new HashSet<>(policy.keyNames())); - conf.setCryptoFailureAction( - org.apache.pulsar.client.api.ProducerCryptoFailureAction.valueOf( - policy.failureAction().name())); + conf.setCryptoFailureAction(switch (policy.failureAction()) { + case FAIL -> org.apache.pulsar.client.api.ProducerCryptoFailureAction.FAIL; + case SEND_UNENCRYPTED -> org.apache.pulsar.client.api.ProducerCryptoFailureAction.SEND; + }); return this; } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java index 7f9e32963be84..d153a77b0f4ff 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java @@ -26,8 +26,8 @@ import org.apache.pulsar.client.api.v5.QueueConsumer; import org.apache.pulsar.client.api.v5.QueueConsumerBuilder; import org.apache.pulsar.client.api.v5.config.BackoffPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy; import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; import org.apache.pulsar.client.api.v5.schema.Schema; @@ -205,8 +205,10 @@ public QueueConsumerBuilderV5 deadLetterPolicy(DeadLetterPolicy policy) { } @Override - public QueueConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { - conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader())); + public QueueConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) { + if (policy.privateKeyProvider() != null) { + conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider())); + } conf.setCryptoFailureAction( org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf( policy.failureAction().name())); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java index 6b9f6b67e0736..24cf63dacbc1e 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java @@ -27,7 +27,7 @@ import org.apache.pulsar.client.api.v5.PulsarClientException; import org.apache.pulsar.client.api.v5.StreamConsumer; import org.apache.pulsar.client.api.v5.StreamConsumerBuilder; -import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; +import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy; import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; import org.apache.pulsar.client.api.v5.schema.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -178,8 +178,10 @@ public StreamConsumerBuilderV5 replicateSubscriptionState(boolean replicate) } @Override - public StreamConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { - conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader())); + public StreamConsumerBuilderV5 encryptionPolicy(ConsumerEncryptionPolicy policy) { + if (policy.privateKeyProvider() != null) { + conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider())); + } conf.setCryptoFailureAction( org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf( policy.failureAction().name()));