Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.net.ssl</groupId>
<artifactId>jboss-jsse-api_8.0_spec</artifactId>
<version>1.0.0.Final</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
97 changes: 94 additions & 3 deletions src/main/java/com/yugabyte/sample/apps/AppBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import java.io.FileInputStream;
import java.io.IOException;

import javax.net.ssl.*;
import java.security.*;
import java.security.cert.CertificateException;

import com.datastax.driver.core.ConsistencyLevel;
import org.apache.log4j.Logger;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
Expand Down Expand Up @@ -179,11 +187,60 @@ protected synchronized void createCassandraClient(List<ContactPoint> contactPoin
LOG.info("Connecting to nodes: " + builder.getContactPoints().stream()
.map(it -> it.toString()).collect(Collectors.joining(",")));
setupLoadBalancingPolicy(builder);
cassandra_cluster =
builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
if (appConfig.enableSSL) {
LOG.info("Attempting SSL connection to this cluster");
try {
SSLContext context = getSSLContext(appConfig.ssl_truststore,
appConfig.ssl_truststorepassword,
appConfig.ssl_keystore,
appConfig.ssl_keystorepassword);

LOG.info("Got the SSL context");

JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(context)
.build();

if (appConfig.enableAuth) {
LOG.info("Connecting with user: " + appConfig.auth_user);
cassandra_cluster =
builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
.withCredentials(appConfig.auth_user, appConfig.auth_password)
.withSSL(sslOptions)
.build();
} else {
cassandra_cluster =
builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
.withSSL(sslOptions)
.build();
LOG.debug("Connected to cluster: " + cassandra_cluster.getClusterName());
}

} catch (Exception e) {
LOG.fatal("Caught exception on getSSLContext", e);
}
} else {
if (appConfig.enableAuth) {
LOG.info("Connecting with user: " + appConfig.auth_user);
cassandra_cluster =
builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
.withCredentials(appConfig.auth_user, appConfig.auth_password)
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
.build();
} else {
cassandra_cluster =
builder.withQueryOptions(new QueryOptions().setDefaultIdempotence(true))
.withCredentials(appConfig.auth_user, appConfig.auth_password)
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
.build();
}
}
if (appConfig.enableSSL) {
LOG.debug("Connected to cluster w/SSL: " + cassandra_cluster.getClusterName());
} else {
LOG.debug("Connected to cluster: " + cassandra_cluster.getClusterName());
}
}
if (cassandra_session == null) {
LOG.debug("Creating a session...");
Expand All @@ -206,6 +263,40 @@ protected void setupLoadBalancingPolicy(Cluster.Builder builder) {
}
}

private static SSLContext getSSLContext(String truststorePath,
String truststorePassword,
String keystorePath,
String keystorePassword)
throws Exception {

FileInputStream truststore = null;
KeyStore ks = KeyStore.getInstance("jks");
SSLContext ctx = SSLContext.getInstance("TLS");
try {
truststore = new FileInputStream(truststorePath);
ks.load(truststore, truststorePassword.toCharArray());
TrustManagerFactory tmf =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks);
ctx.init(null, tmf.getTrustManagers(), null);
} catch (KeyStoreException kse) {
LOG.fatal(kse.getMessage(), kse);
} catch (CertificateException e) {
LOG.fatal(e.getMessage(), e);
} catch (NoSuchAlgorithmException e) {
LOG.fatal(e.getMessage(), e);
} catch (KeyManagementException e) {
LOG.fatal(e.getMessage(), e);
} catch (IOException e) {
LOG.fatal(e.getMessage(), e);
} finally {
if (truststore!=null) {
truststore.close();
}
}
return ctx;
}

/**
* Helper method to create a Jedis client.
* @return a Jedis (Redis client) object.
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/yugabyte/sample/apps/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,15 @@ public static enum Type {
public long runTimeSeconds = -1;

public String localDc;

public boolean enableSSL = false;
public String ssl_keystore;
public String ssl_truststore;
public String ssl_keystorepassword;
public String ssl_truststorepassword;

public boolean enableAuth = false;
public String auth_user;
public String auth_password;
// Used by CassandraPersonalization workload.

// Number of stores.
Expand Down
71 changes: 71 additions & 0 deletions src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,70 @@ public void initialize(CommandLine commandLine) throws ClassNotFoundException {
}
AppBase.appConfig.localDc = commandLine.getOptionValue("with_local_dc");
}
if (commandLine.hasOption("auth_user") && commandLine.hasOption("auth_password")) {
AppBase.appConfig.enableAuth = true;
AppBase.appConfig.auth_user = commandLine.getOptionValue("auth_user");
AppBase.appConfig.auth_password = commandLine.getOptionValue("auth_password");
}
else {
AppBase.appConfig.enableAuth = false;
}
if (commandLine.hasOption("auth_user") && !commandLine.hasOption("auth_password")) {
LOG.fatal("--auth_user requires --auth_password to be specified");
System.exit(-1);
}
if (commandLine.hasOption("auth_password") && !commandLine.hasOption("auth_user")) {
LOG.fatal("--auth_password requires --auth_user to be specified");
System.exit(-1);
}
if (commandLine.hasOption("enable_ssl")) {
AppBase.appConfig.enableSSL = true;
}
else {
AppBase.appConfig.enableSSL = false;
}
if (AppBase.appConfig.enableSSL && !commandLine.hasOption("ssl_truststore")) {
LOG.fatal("--ssl_truststore must be provided with --enableSSL");
System.exit(-1);
}
if (AppBase.appConfig.enableSSL && commandLine.hasOption("ssl_truststore") &&
!commandLine.hasOption("ssl_truststorepassword")) {
LOG.fatal("--enableSSL and --ssltruststore usually require --ssl_truststorepassword");
System.exit(1);
}
if (AppBase.appConfig.enableSSL && commandLine.hasOption("ssl_keystore") &&
!commandLine.hasOption("ssl_keystorepassword")) {
LOG.fatal("--enableSSL and --sslkeystore usually require --ssl_keystorepassword");
System.exit(-1);
}
if (commandLine.hasOption("ssl_keystore")) {
if (AppBase.appConfig.enableSSL == false) {
LOG.fatal("--ssl_keystore must be used with --enableSSL");
System.exit(-1);
}
AppBase.appConfig.ssl_keystore = commandLine.getOptionValue("ssl_keystore");
}
if (commandLine.hasOption("ssl_truststore")) {
if (AppBase.appConfig.enableSSL == false) {
LOG.fatal("--ssl_truststore must be used with --enableSSL");
System.exit(-1);
}
AppBase.appConfig.ssl_truststore = commandLine.getOptionValue("ssl_truststore");
}
if (commandLine.hasOption("ssl_keystorepassword")) {
if (AppBase.appConfig.enableSSL == false) {
LOG.fatal("--ssl_keystorepassword must be used with --enableSSL");
System.exit(-1);
}
AppBase.appConfig.ssl_keystorepassword = commandLine.getOptionValue("ssl_keystorepassword");
}
if (commandLine.hasOption("ssl_truststorepassword")) {
if (AppBase.appConfig.enableSSL == false) {
LOG.fatal("--ssl_truststorepassword must be used with --enableSSL");
System.exit(-1);
}
AppBase.appConfig.ssl_truststorepassword = commandLine.getOptionValue("ssl_truststorepassword");
}
if (commandLine.hasOption("use_redis_cluster")) {
AppBase.appConfig.useRedisCluster = true;
}
Expand Down Expand Up @@ -556,6 +620,13 @@ public static CmdLineOpts createFromArgs(String[] args) throws Exception {
"[CassandraBatchTimeseries] Time unit delta back from current time unit.");

options.addOption("with_local_dc", true, "Local DC name.");
options.addOption("enable_ssl", false, "Enable SSL connections to cluster");
options.addOption("ssl_truststore", true, "Path to trust store");
options.addOption("ssl_truststorepassword", true, "Password for trust store");
options.addOption("ssl_keystore", true, "Path for client keystore");
options.addOption("ssl_keystorepassword", true, "Password for keystore");
options.addOption("auth_user", true, "Username for authentication");
options.addOption("auth_password", true, "Password for authentication");

// Options for CassandraSparkWordCount app.
options.addOption("wordcount_input_file", true,
Expand Down