Monday, June 19, 2017

Connecting to Cassandra Cluster via SSH Tunnels with the DataStax Java Client/Driver

Introduction

This is probably a little obscure, but if you have only one choice for connecting into a remote environment, like AWS, and that happens to be an SSH connection with tunnels to a "jump box", and you need to connect to a Cassandra cluster using the DataStax driver, I suspect that's why you found this, so read on.

The problem is...

DataStax wrote their Java driver to use Netty instead of using the core network connection classes in a typical Java virtual machine.  Netty is written to use Java's NIO API.  NIO does not recognize the JVM-wide settings like socksProxyHost, so it always attempts to make a direct connection to whatever host/port the Java code says.

The other part of the problem is...

Connecting the DataStax client/driver to one node of a Cassandra cluster results in a handshake that retrieves network information for the other nodes in the cluster and tries to open additional connections.  If the primary connection is established via an SSH tunnel, the network information for the rest of the cluster nodes is likely to still be routable only within the remote environment.  That doesn't work even if you created additional SSH tunnels.

The solution (in a nutshell)...

Create tunnels for all of the cluster nodes, and register an instance of the DataStax AddressTranslater when the connection to Cassandra is opened.

The solution (details)...

The JSCH library makes it somewhat easy to open an SSH connection with tunnels.

Assuming tunnelDefinitions is a collection of simple TunnelDefinition POJOs to contain a set of attributes for a local- to-remote host/port mappings.
A three node cluster might have mappings with bindAddress:localPort:remoteHost:remotePort like:

  • 127.0.0.1:19042:cassandra-cluster-node1:9042
  • 127.0.0.1:29042:cassandra-cluster-node2:9042
  • 127.0.0.1:39042:cassandra-cluster-node3:9042
public void connect(String jumpUserName, String sshPrivateKeyFilePath, String jumpHost, int jumpPort) {
    this.jumpHost = jumpHost;
    this.jumpPort = jumpPort;
    jsch = new JSch();
    try {
        LOGGER.info("Using SSH PK identity file: " + sshPrivateKeyFilePath);
        // Point to the PK file for authentication        jsch.addIdentity(sshPrivateKeyFilePath);
        LOGGER.info("Opening SSH Session to Jumpbox: " + jumpHost + ":" + jumpPort + " with username " + jumpUserName);
        session=jsch.getSession(jumpUserName, jumpHost, jumpPort);
        Properties config = new java.util.Properties();
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);
        session.connect();
        for (TunnelDefinition tunnelDefinition : tunnelDefinitions) {
            // Note: Each call to "set" is actually an "add".
            // Note: The bind addresses are typically localhost or 127.0.0.1.
            session.setPortForwardingL(tunnelDefinition.bindAddress, 
                tunnelDefinition.localPort, tunnelDefinition.remoteHost, 
                tunnelDefinition.remotePort);
        }
    } catch (JSchException e) {
        e.printStackTrace();
    }
}

Then, using the same tunnelDefinitions to implement DataStax AddressTranslater...
AddressTranslater customAddressTranslater = new AddressTranslater() {
    private SshTunnelHelper sshTunnelHelperRef = sshTunnelHelper;
    private Map<String, InetSocketAddress> translationMappings = new HashMap<>();

    @Override    public InetSocketAddress translate(InetSocketAddress inetSocketAddress) {
        // Lazy Load        if (translationMappings.isEmpty()) {
            for (SshTunnelHelper.TunnelDefinition tunnelDefinition : sshTunnelHelper.getTunnelDefinitions()) {
                InetSocketAddress local = new InetSocketAddress(tunnelDefinition.bindAddress, tunnelDefinition.localPort);
                InetSocketAddress remote = new InetSocketAddress(tunnelDefinition.remoteHost, tunnelDefinition.remotePort);
                String mappingKey = remote.toString();
                LOGGER.info("Registering Cassandra Driver AddressTranslation mapping with key: '" + mappingKey + "'");
                translationMappings.put(mappingKey, local);
            }
        }
        // Note: The result of InetAddress.toString() has a leading "/"        String keyToMatch = inetSocketAddress.toString();
        LOGGER.info("Cassandra driver is attempting to establish a connection to: '" + keyToMatch + "'");
        InetSocketAddress matchingAddressTranslation = translationMappings.get(keyToMatch);
        if (matchingAddressTranslation != null) {
            LOGGER.info("Matched address translation from config properties for: " + inetSocketAddress.getAddress().toString());
            return matchingAddressTranslation;
        } else {
            LOGGER.info("Retaining unmatched InetSocketAddress: " + inetSocketAddress.toString());
            return inetSocketAddress;
        }
    }
};

The connection to the Cassandra cluster can then be established with the AddressTranslater...
Note: Even if the Cluster object is built with an AddressTranslater, the initial contact point must be manually translated first:
InetSocketAddress initialContactPoint = new InetSocketAddress("cassandra-cluster-node1", 9042);
InetSocketAddress initialContactPointTranslated = addressTranslaterWrapper.translate(initialContactPoint);
LOGGER.debug("Initial contact point (translated): " + initialContactPointTranslated.toString());
Set<InetSocketAddress> initialContactPoints = new HashSet<>();
initialContactPoints.add(initialContactPointTranslated);
final Cluster cluster = Cluster.builder().withAddressTranslater(addressTranslaterWrapper).addContactPointsWithPorts(initialContactPoints).build();
final Session session = cluster.connect("mykeyspace");