diff --git a/pom.xml b/pom.xml
index 304a128..c30bc44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,11 @@
commons-cli
1.2
+
+ org.jboss.spec.javax.net.ssl
+ jboss-jsse-api_8.0_spec
+ 1.0.0.Final
+
commons-codec
commons-codec
diff --git a/src/main/java/com/yugabyte/sample/apps/AppBase.java b/src/main/java/com/yugabyte/sample/apps/AppBase.java
index efa8406..c0e5901 100644
--- a/src/main/java/com/yugabyte/sample/apps/AppBase.java
+++ b/src/main/java/com/yugabyte/sample/apps/AppBase.java
@@ -32,6 +32,13 @@
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import javax.net.ssl.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+
import com.datastax.driver.core.ConsistencyLevel;
import org.apache.log4j.Logger;
@@ -39,6 +46,7 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
@@ -179,11 +187,60 @@ protected synchronized void createCassandraClient(List contactPoin
LOG.info("Connecting to nodes: " + builder.getContactPoints().stream()
.map(it -> it.toString()).collect(Collectors.joining(",")));
setupLoadBalancingPolicy(builder);
- cassandra_cluster =
- builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
+ if (appConfig.enableSSL) {
+ LOG.info("Attempting SSL connection to this cluster");
+ try {
+ SSLContext context = getSSLContext(appConfig.ssl_truststore,
+ appConfig.ssl_truststorepassword,
+ appConfig.ssl_keystore,
+ appConfig.ssl_keystorepassword);
+
+ LOG.info("Got the SSL context");
+
+ JdkSSLOptions sslOptions = JdkSSLOptions.builder()
+ .withSSLContext(context)
+ .build();
+
+ if (appConfig.enableAuth) {
+ LOG.info("Connecting with user: " + appConfig.auth_user);
+ cassandra_cluster =
+ builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
+ .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
+ .withCredentials(appConfig.auth_user, appConfig.auth_password)
+ .withSSL(sslOptions)
+ .build();
+ } else {
+ cassandra_cluster =
+ builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
+ .withSSL(sslOptions)
.build();
- LOG.debug("Connected to cluster: " + cassandra_cluster.getClusterName());
+ }
+
+ } catch (Exception e) {
+ LOG.fatal("Caught exception on getSSLContext", e);
+ }
+ } else {
+ if (appConfig.enableAuth) {
+ LOG.info("Connecting with user: " + appConfig.auth_user);
+ cassandra_cluster =
+ builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
+ .withCredentials(appConfig.auth_user, appConfig.auth_password)
+ .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
+ .build();
+ } else {
+ cassandra_cluster =
+ builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
+ .withCredentials(appConfig.auth_user, appConfig.auth_password)
+ .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
+ .build();
+ }
+ }
+ if (appConfig.enableSSL) {
+ LOG.debug("Connected to cluster w/SSL: " + cassandra_cluster.getClusterName());
+ } else {
+ LOG.debug("Connected to cluster: " + cassandra_cluster.getClusterName());
+ }
}
if (cassandra_session == null) {
LOG.debug("Creating a session...");
@@ -206,6 +263,40 @@ protected void setupLoadBalancingPolicy(Cluster.Builder builder) {
}
}
+ private static SSLContext getSSLContext(String truststorePath,
+ String truststorePassword,
+ String keystorePath,
+ String keystorePassword)
+ throws Exception {
+
+ FileInputStream truststore = null;
+ KeyStore ks = KeyStore.getInstance("jks");
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ try {
+ truststore = new FileInputStream(truststorePath);
+ ks.load(truststore, truststorePassword.toCharArray());
+ TrustManagerFactory tmf =
+ TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(ks);
+ ctx.init(null, tmf.getTrustManagers(), null);
+ } catch (KeyStoreException kse) {
+ LOG.fatal(kse.getMessage(), kse);
+ } catch (CertificateException e) {
+ LOG.fatal(e.getMessage(), e);
+ } catch (NoSuchAlgorithmException e) {
+ LOG.fatal(e.getMessage(), e);
+ } catch (KeyManagementException e) {
+ LOG.fatal(e.getMessage(), e);
+ } catch (IOException e) {
+ LOG.fatal(e.getMessage(), e);
+ } finally {
+ if (truststore!=null) {
+ truststore.close();
+ }
+ }
+ return ctx;
+ }
+
/**
* Helper method to create a Jedis client.
* @return a Jedis (Redis client) object.
diff --git a/src/main/java/com/yugabyte/sample/apps/AppConfig.java b/src/main/java/com/yugabyte/sample/apps/AppConfig.java
index d3352de..ff21f47 100644
--- a/src/main/java/com/yugabyte/sample/apps/AppConfig.java
+++ b/src/main/java/com/yugabyte/sample/apps/AppConfig.java
@@ -133,7 +133,15 @@ public static enum Type {
public long runTimeSeconds = -1;
public String localDc;
-
+ public boolean enableSSL = false;
+ public String ssl_keystore;
+ public String ssl_truststore;
+ public String ssl_keystorepassword;
+ public String ssl_truststorepassword;
+
+ public boolean enableAuth = false;
+ public String auth_user;
+ public String auth_password;
// Used by CassandraPersonalization workload.
// Number of stores.
diff --git a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
index d4ead15..82de9e9 100644
--- a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
+++ b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
@@ -266,6 +266,70 @@ public void initialize(CommandLine commandLine) throws ClassNotFoundException {
}
AppBase.appConfig.localDc = commandLine.getOptionValue("with_local_dc");
}
+ if (commandLine.hasOption("auth_user") && commandLine.hasOption("auth_password")) {
+ AppBase.appConfig.enableAuth = true;
+ AppBase.appConfig.auth_user = commandLine.getOptionValue("auth_user");
+ AppBase.appConfig.auth_password = commandLine.getOptionValue("auth_password");
+ }
+ else {
+ AppBase.appConfig.enableAuth = false;
+ }
+ if (commandLine.hasOption("auth_user") && !commandLine.hasOption("auth_password")) {
+ LOG.fatal("--auth_user requires --auth_password to be specified");
+ System.exit(-1);
+ }
+ if (commandLine.hasOption("auth_password") && !commandLine.hasOption("auth_user")) {
+ LOG.fatal("--auth_password requires --auth_user to be specified");
+ System.exit(-1);
+ }
+ if (commandLine.hasOption("enable_ssl")) {
+ AppBase.appConfig.enableSSL = true;
+ }
+ else {
+ AppBase.appConfig.enableSSL = false;
+ }
+ if (AppBase.appConfig.enableSSL && !commandLine.hasOption("ssl_truststore")) {
+ LOG.fatal("--ssl_truststore must be provided with --enableSSL");
+ System.exit(-1);
+ }
+ if (AppBase.appConfig.enableSSL && commandLine.hasOption("ssl_truststore") &&
+ !commandLine.hasOption("ssl_truststorepassword")) {
+ LOG.fatal("--enableSSL and --ssltruststore usually require --ssl_truststorepassword");
+ System.exit(1);
+ }
+ if (AppBase.appConfig.enableSSL && commandLine.hasOption("ssl_keystore") &&
+ !commandLine.hasOption("ssl_keystorepassword")) {
+ LOG.fatal("--enableSSL and --sslkeystore usually require --ssl_keystorepassword");
+ System.exit(-1);
+ }
+ if (commandLine.hasOption("ssl_keystore")) {
+ if (AppBase.appConfig.enableSSL == false) {
+ LOG.fatal("--ssl_keystore must be used with --enableSSL");
+ System.exit(-1);
+ }
+ AppBase.appConfig.ssl_keystore = commandLine.getOptionValue("ssl_keystore");
+ }
+ if (commandLine.hasOption("ssl_truststore")) {
+ if (AppBase.appConfig.enableSSL == false) {
+ LOG.fatal("--ssl_truststore must be used with --enableSSL");
+ System.exit(-1);
+ }
+ AppBase.appConfig.ssl_truststore = commandLine.getOptionValue("ssl_truststore");
+ }
+ if (commandLine.hasOption("ssl_keystorepassword")) {
+ if (AppBase.appConfig.enableSSL == false) {
+ LOG.fatal("--ssl_keystorepassword must be used with --enableSSL");
+ System.exit(-1);
+ }
+ AppBase.appConfig.ssl_keystorepassword = commandLine.getOptionValue("ssl_keystorepassword");
+ }
+ if (commandLine.hasOption("ssl_truststorepassword")) {
+ if (AppBase.appConfig.enableSSL == false) {
+ LOG.fatal("--ssl_truststorepassword must be used with --enableSSL");
+ System.exit(-1);
+ }
+ AppBase.appConfig.ssl_truststorepassword = commandLine.getOptionValue("ssl_truststorepassword");
+ }
if (commandLine.hasOption("use_redis_cluster")) {
AppBase.appConfig.useRedisCluster = true;
}
@@ -556,6 +620,13 @@ public static CmdLineOpts createFromArgs(String[] args) throws Exception {
"[CassandraBatchTimeseries] Time unit delta back from current time unit.");
options.addOption("with_local_dc", true, "Local DC name.");
+ options.addOption("enable_ssl", false, "Enable SSL connections to cluster");
+ options.addOption("ssl_truststore", true, "Path to trust store");
+ options.addOption("ssl_truststorepassword", true, "Password for trust store");
+ options.addOption("ssl_keystore", true, "Path for client keystore");
+ options.addOption("ssl_keystorepassword", true, "Password for keystore");
+ options.addOption("auth_user", true, "Username for authentication");
+ options.addOption("auth_password", true, "Password for authentication");
// Options for CassandraSparkWordCount app.
options.addOption("wordcount_input_file", true,