Monday, June 19, 2017

Connecting to Cassandra Cluster via SSH Tunnels with the DataStax Java Client/Driver

Introduction

This is probably a little obscure, but if your only way into a remote environment such as AWS is an SSH connection with tunnels through a "jump box", and you need to connect to a Cassandra cluster there using the DataStax driver, I suspect that's why you found this, so read on.

The problem is...

DataStax wrote their Java driver on top of Netty rather than the standard blocking socket classes in the JVM.  Netty is written to use Java's NIO API, and NIO channels do not honor JVM-wide proxy settings like socksProxyHost, so the driver always attempts a direct connection to whatever host/port the Java code specifies.
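To make that gap concrete, here is a minimal, self-contained sketch (plain JDK, no driver involved). The socksProxyHost/socksProxyPort system properties are JVM-wide and are honored by the classic java.net.Socket implementation, but an NIO SocketChannel dials the target directly and never consults them:

```java
class ProxyPropertyDemo {
    public static void main(String[] args) {
        // JVM-wide SOCKS proxy settings (same effect as launching with
        // -DsocksProxyHost=127.0.0.1 -DsocksProxyPort=1080).
        System.setProperty("socksProxyHost", "127.0.0.1");
        System.setProperty("socksProxyPort", "1080");

        // Classic blocking sockets (java.net.Socket) would now route
        // connections through this SOCKS proxy.
        System.out.println("socksProxyHost = " + System.getProperty("socksProxyHost"));

        // java.nio.channels.SocketChannel ignores these properties entirely:
        // SocketChannel.open(new InetSocketAddress(host, port)) always
        // connects directly - which is why the Netty-based driver does too.
    }
}
```

This is why no amount of JVM proxy configuration helps here; the translation has to happen at the address level instead.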

The other part of the problem is...

Connecting the DataStax client/driver to one node of a Cassandra cluster triggers a handshake that retrieves network information for the other nodes in the cluster, and the driver then tries to open additional connections to them.  If the primary connection was established via an SSH tunnel, the addresses reported for the rest of the cluster nodes are likely to be routable only within the remote environment, so those connections fail even if you created additional SSH tunnels.

The solution (in a nutshell)...

Create tunnels for all of the cluster nodes, and register an instance of the DataStax AddressTranslater when the connection to Cassandra is opened.

The solution (details)...

The JSch library makes it fairly easy to open an SSH connection with tunnels.

Assume tunnelDefinitions is a collection of simple TunnelDefinition POJOs, each holding the attributes for one local-to-remote host/port mapping.
A three-node cluster might have mappings (bindAddress:localPort:remoteHost:remotePort) like:

  • 127.0.0.1:19042:cassandra-cluster-node1:9042
  • 127.0.0.1:29042:cassandra-cluster-node2:9042
  • 127.0.0.1:39042:cassandra-cluster-node3:9042
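The TunnelDefinition POJO itself is not shown here; a minimal sketch (field names matching the code below, everything else an assumption) might look like:

```java
// Minimal sketch of the TunnelDefinition POJO assumed by the code below.
class TunnelDefinition {
    public final String bindAddress;  // typically "127.0.0.1"
    public final int localPort;       // e.g. 19042
    public final String remoteHost;   // e.g. "cassandra-cluster-node1"
    public final int remotePort;      // the Cassandra native port, typically 9042

    public TunnelDefinition(String bindAddress, int localPort,
                            String remoteHost, int remotePort) {
        this.bindAddress = bindAddress;
        this.localPort = localPort;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }
}
```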
public void connect(String jumpUserName, String sshPrivateKeyFilePath, String jumpHost, int jumpPort) {
    this.jumpHost = jumpHost;
    this.jumpPort = jumpPort;
    jsch = new JSch();
    try {
        LOGGER.info("Using SSH PK identity file: " + sshPrivateKeyFilePath);
        // Point to the PK file for authentication
        jsch.addIdentity(sshPrivateKeyFilePath);
        LOGGER.info("Opening SSH Session to Jumpbox: " + jumpHost + ":" + jumpPort + " with username " + jumpUserName);
        session=jsch.getSession(jumpUserName, jumpHost, jumpPort);
        Properties config = new java.util.Properties();
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);
        session.connect();
        for (TunnelDefinition tunnelDefinition : tunnelDefinitions) {
            // Note: Each call to "set" is actually an "add".
            // Note: The bind addresses are typically localhost or 127.0.0.1.
            session.setPortForwardingL(tunnelDefinition.bindAddress, 
                tunnelDefinition.localPort, tunnelDefinition.remoteHost, 
                tunnelDefinition.remotePort);
        }
    } catch (JSchException e) {
        LOGGER.error("Failed to open SSH session/tunnels to " + jumpHost + ":" + jumpPort, e);
    }
}

Then, use the same tunnelDefinitions to implement the DataStax AddressTranslater interface...
AddressTranslater customAddressTranslater = new AddressTranslater() {
    private SshTunnelHelper sshTunnelHelperRef = sshTunnelHelper;
    private Map<String, InetSocketAddress> translationMappings = new HashMap<>();

    @Override
    public InetSocketAddress translate(InetSocketAddress inetSocketAddress) {
        // Lazy load the mappings on first use.
        if (translationMappings.isEmpty()) {
            for (SshTunnelHelper.TunnelDefinition tunnelDefinition : sshTunnelHelperRef.getTunnelDefinitions()) {
                InetSocketAddress local = new InetSocketAddress(tunnelDefinition.bindAddress, tunnelDefinition.localPort);
                InetSocketAddress remote = new InetSocketAddress(tunnelDefinition.remoteHost, tunnelDefinition.remotePort);
                String mappingKey = remote.toString();
                LOGGER.info("Registering Cassandra Driver AddressTranslation mapping with key: '" + mappingKey + "'");
                translationMappings.put(mappingKey, local);
            }
        }
        // Note: The result of InetSocketAddress.toString() has a leading "/"
        String keyToMatch = inetSocketAddress.toString();
        LOGGER.info("Cassandra driver is attempting to establish a connection to: '" + keyToMatch + "'");
        InetSocketAddress matchingAddressTranslation = translationMappings.get(keyToMatch);
        if (matchingAddressTranslation != null) {
            LOGGER.info("Matched address translation from config properties for: " + inetSocketAddress.getAddress().toString());
            return matchingAddressTranslation;
        } else {
            LOGGER.info("Retaining unmatched InetSocketAddress: " + inetSocketAddress.toString());
            return inetSocketAddress;
        }
    }
};
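The map keys above depend on InetSocketAddress.toString() formatting, which prepends a "/" to a resolved numeric address. A quick standalone check of the key format:

```java
import java.net.InetSocketAddress;

class AddressKeyDemo {
    public static void main(String[] args) {
        // A resolved literal IP prints with a leading "/" and a ":port"
        // suffix, e.g. "/127.0.0.1:9042" - the key format matched above.
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 9042);
        System.out.println(addr.toString());  // prints "/127.0.0.1:9042"
    }
}
```

Both sides of the lookup must be built the same way, which is why the mapping key is taken from remote.toString() rather than assembled by hand.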

The connection to the Cassandra cluster can then be established with the AddressTranslater...
Note: Even if the Cluster object is built with an AddressTranslater, the initial contact point must be manually translated first:
InetSocketAddress initialContactPoint = new InetSocketAddress("cassandra-cluster-node1", 9042);
InetSocketAddress initialContactPointTranslated = customAddressTranslater.translate(initialContactPoint);
LOGGER.debug("Initial contact point (translated): " + initialContactPointTranslated.toString());
Set<InetSocketAddress> initialContactPoints = new HashSet<>();
initialContactPoints.add(initialContactPointTranslated);
final Cluster cluster = Cluster.builder()
        .withAddressTranslater(customAddressTranslater)
        .addContactPointsWithPorts(initialContactPoints)
        .build();
final Session session = cluster.connect("mykeyspace");

1 comment:

Unknown said...

Hiya Whirly. Thanks for this. I'm about to try your solution now.

First though, I found something simpler which should work, but doesn't. On my Mac, I created local ips to mirror the remote ones that I'll want to connect to on the jump server, and then port forwarded to them.

e.g.
# Create local ips that mirror the remote ones.
sudo ifconfig lo0 alias xxx.yyy.zzz.140 up
sudo ifconfig lo0 alias xxx.yyy.zzz.172 up
sudo ifconfig lo0 alias xxx.yyy.zzz.6 up

# Now setup an ssh tunnel from the local ips to the ips on the jump server.
ssh -v -D -C -N <user>@<jumpHost> \
-Lxxx.yyy.zzz.140:9042:xxx.yyy.zzz.140:9042 \
-Lxxx.yyy.zzz.172:9042:xxx.yyy.zzz.172:9042 \
-Lxxx.yyy.zzz.6:9042:xxx.yyy.zzz.6:9042

This avoids needing to map hosts + ports to the real thing and may make your solution simpler.

Using these connections from Datastax Studio, I can connect and run queries. However, quite oddly, running through the Datastax Java Driver fails with the exception below. Your thoughts are very welcome. I'll likely file a bug/issue with Datastax on this today.

com.datastax.driver.core.exceptions.ConnectionException: [/xxx.yyy.zzz.6:9042] Pool was closed during initialization
at com.datastax.driver.core.HostConnectionPool$2.onSuccess(HostConnectionPool.java:149)
at com.datastax.driver.core.HostConnectionPool$2.onSuccess(HostConnectionPool.java:135)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:310)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
....blah blah blah.....
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:518)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:490)
at io.netty.util.concurrent.DefaultPromise.notifyListenersWithStackOverFlowProtection(DefaultPromise.java:431)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:126)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:222)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:394)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:748)