
A Vert.x client allowing applications to interact with an Apache Cassandra service.

Warning
This module has Tech Preview status, which means the API can change between versions.

Getting started

To use this module, add the following to the dependencies section of your Maven POM file:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-lang-ruby</artifactId>
 <version>3.7.0</version>
</dependency>

Or, if you use Gradle:

compile 'io.vertx:vertx-lang-ruby:3.7.0'

Warning
The Cassandra client is not compatible with the Vert.x Dropwizard Metrics library. The two depend on different major versions of the Dropwizard Metrics library, and the DataStax Java driver won't upgrade to the most recent version because that version drops support for Java 7. The next major version (4.x) of the driver will use a more recent Dropwizard Metrics library version.

Creating a client

Client options

Cassandra is a distributed system, and it can have many nodes. To connect to Cassandra, you need to specify the addresses of some cluster nodes when creating a CassandraClientOptions object:

require 'vertx-cassandra/cassandra_client'
options = {
  'contactPoints' => [
    "node1.address",
    "node2.address",
    "node3.address"
  ]
}
client = VertxCassandra::CassandraClient.create_non_shared(vertx, options)

By default, the Cassandra client for Vert.x connects to port 9042 on the local machine and is not tied to any specific keyspace. You can change either or both of these defaults:

require 'vertx-cassandra/cassandra_client'
options = {
  'port' => 9142,
  'keyspace' => "my_keyspace"
}
client = VertxCassandra::CassandraClient.create_non_shared(vertx, options)
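Both kinds of options can go into a single hash. The sketch below assumes nothing beyond the keys shown above ('contactPoints', 'port', 'keyspace'); the helper name `cassandra_options` is hypothetical and not part of the vertx-cassandra API. Leaving a key out keeps the default described above.

```ruby
# Hypothetical helper (not part of vertx-cassandra): assembles the options hash
# accepted by CassandraClient.create_non_shared and create_shared.
def cassandra_options(contact_points: [], port: nil, keyspace: nil)
  options = {}
  # Omitting 'contactPoints' keeps the default: connect to the local machine
  options['contactPoints'] = contact_points unless contact_points.empty?
  # Omitting 'port' keeps the default port, 9042
  options['port'] = port unless port.nil?
  # Omitting 'keyspace' leaves the client not tied to any keyspace
  options['keyspace'] = keyspace unless keyspace.nil?
  options
end

options = cassandra_options(
  contact_points: ["node1.address", "node2.address", "node3.address"],
  port: 9142,
  keyspace: "my_keyspace"
)
# Inside a Vert.x Ruby verticle you would then call:
# client = VertxCassandra::CassandraClient.create_non_shared(vertx, options)
```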

Tip
For fine tuning purposes, CassandraClientOptions exposes a com.datastax.driver.core.Cluster.Builder instance.

Shared clients

If you deploy multiple instances of your verticle or have different verticles interacting with the same database, it is recommended to create a shared client:

require 'vertx-cassandra/cassandra_client'
options = {
  'contactPoints' => [
    "node1.address",
    "node2.address",
    "node3.address"
  ],
  'keyspace' => "my_keyspace"
}
client = VertxCassandra::CassandraClient.create_shared(vertx, "sharedClientName", options)

Shared clients with the same name will use a single underlying com.datastax.driver.core.Session.

Client lifecycle

After the client is created, it is not connected until the first query is executed.

Tip
A shared client can be connected after creation if another client with the same name has already executed a query.

Clients created inside a verticle are automatically stopped when the verticle is undeployed. In other words, you do not need to invoke close in the verticle stop method.

In all other cases, you must manually close the client.

Note
When a shared client is closed, the driver session is not closed if other clients with the same name are still running.
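The lifecycle rules above may be easier to follow as a small model. The following is a toy sketch with hypothetical classes (`ToySession`, `ToySharedClient`), not the Vert.x implementation: it only mimics lazy connection on the first query, one session per shared-client name, and closing that session when the last client with the name is closed.

```ruby
# Toy model (NOT the Vert.x implementation) of the client lifecycle rules:
# lazy connection, one session per shared-client name, reference-counted close.
class ToySession
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

class ToySharedClient
  # name => { session: ToySession or nil, clients: number of open clients }
  REGISTRY = Hash.new { |hash, name| hash[name] = { session: nil, clients: 0 } }

  def initialize(name)
    @name = name
    @entry = REGISTRY[name]
    @entry[:clients] += 1
  end

  def session
    @entry[:session]
  end

  # True once any client with this name has executed a query
  def connected?
    !session.nil?
  end

  def query(_cql)
    @entry[:session] ||= ToySession.new # connect lazily on the first query
  end

  def close
    @entry[:clients] -= 1
    return unless @entry[:clients].zero? # other clients still use the session
    session.close if session
    REGISTRY.delete(@name)
  end
end

a = ToySharedClient.new("sharedClientName")
b = ToySharedClient.new("sharedClientName")
a.connected?          # => false, no query yet
b.query("SELECT ...")
a.connected?          # => true, b already connected the shared session
shared = a.session
a.close
shared.closed         # => false, b is still open
b.close
shared.closed         # => true
```

In this model, as in the note above, closing one of two clients named "sharedClientName" leaves the session open, and closing the second one closes it.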

Using the API

The client API is represented by CassandraClient.

Querying

You can get query results in three different ways.

Streaming

The streaming API is most appropriate when you need to consume results iteratively, e.g. when you want to process each item. It is very efficient, especially for a large number of rows.

To give you some ideas on how you can use the API, consider this example:

cassandraClient.query_stream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'") { |queryStream_err,queryStream|
  if (queryStream_err == nil)
    stream = queryStream

    # resume stream when queue is ready to accept buffers again
    response.drain_handler() { |v|
      stream.resume()
    }

    stream.handler() { |row|
      value = row.get_string("my_string_col")
      response.write(value)

      # pause the row stream when the write queue is full
      if (response.write_queue_full?())
        stream.pause()
      end
    }

    # end the response when we reach the end of the stream
    stream.end_handler() { |v|
      response.end()
    }

  else
    queryStream_err.print_stack_trace()
    # response with internal server error if we are not able to execute given query
    response.set_status_code(500).end("Unable to execute the query")
  end
}

In the example, we execute a query and stream the results over HTTP.
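The back-pressure logic in the example (pause the row stream when the write queue is full, resume it from the drain handler) can be exercised without a Cassandra cluster. The sketch below replaces the row stream and the HTTP response with hypothetical toy classes that implement only the methods the example calls; `finish` stands in for `response.end()`, and `flush` simulates the socket draining the write queue.

```ruby
# Hypothetical stand-ins (not Vert.x classes) for the row stream and the HTTP
# response, used to exercise the pause/resume back-pressure protocol.
class ToyRowStream
  def initialize(rows)
    @rows = rows.dup
    @paused = true # nothing is delivered until resume is called
    @ended = false
  end

  def handler(&blk)
    @handler = blk
  end

  def end_handler(&blk)
    @end_handler = blk
  end

  def pause
    @paused = true
  end

  # Deliver rows until paused again or out of rows, then signal the end once
  def resume
    @paused = false
    @handler.call(@rows.shift) until @paused || @rows.empty?
    return unless @rows.empty? && !@paused && !@ended
    @ended = true
    @end_handler.call(nil)
  end
end

class ToyResponse
  attr_reader :written, :ended, :peak

  def initialize(max_queue)
    @max_queue = max_queue
    @queue = []
    @written = []
    @ended = false
    @peak = 0 # largest write-queue size observed
  end

  def drain_handler(&blk)
    @drain_handler = blk
  end

  def write(data)
    @queue << data
    @peak = [@peak, @queue.size].max
  end

  def write_queue_full?
    @queue.size >= @max_queue
  end

  # Simulates the socket sending the queued data, then fires the drain handler
  def flush
    @written.concat(@queue)
    @queue.clear
    @drain_handler.call(nil) if @drain_handler
  end

  # Stands in for response.end(): sends whatever is left and closes the response
  def finish
    @written.concat(@queue)
    @queue.clear
    @ended = true
  end
end

rows = (1..10).map { |i| "row#{i}" }
stream = ToyRowStream.new(rows)
response = ToyResponse.new(3)

# Same wiring as the example above
response.drain_handler { |_v| stream.resume }
stream.handler do |row|
  response.write(row)
  stream.pause if response.write_queue_full?
end
stream.end_handler { |_v| response.finish }

stream.resume                       # start the flow of rows
response.flush until response.ended # simulate the socket draining
```

Because the handler pauses the stream as soon as the queue holds three rows, the queue never grows beyond that limit, and every row still reaches the response.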

Bulk fetching

This API should be used when you need to process all the rows at the same time.

cassandraClient.execute_with_full_fetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'") { |executeWithFullFetch_err,executeWithFullFetch|
  if (executeWithFullFetch_err == nil)
    rows = executeWithFullFetch
    rows.each do |row|
      # handle each row here
    end
  else
    puts "Unable to execute the query"
    executeWithFullFetch_err.print_stack_trace()
  end
}

Caution
Use bulk fetching only if you can afford to load the full result set in memory.

Collector queries

You can use Java collectors with the query API. No Ruby example is shown here, because the Java example for this feature cannot be translated to Ruby.

Low level fetch

This API provides greater control over loading at the expense of being a bit lower-level than the streaming and bulk fetching APIs.

cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'") { |execute_err,execute|
  if (execute_err == nil)
    resultSet = execute

    resultSet.one() { |one_err,one|
      if (one_err == nil)
        row = one
        puts "One row successfully fetched"
      else
        puts "Unable to fetch a row"
        one_err.print_stack_trace()
      end
    }

    resultSet.fetch_more_results() { |fetchMoreResults_err,fetchMoreResults|
      if (fetchMoreResults_err == nil)
        availableWithoutFetching = resultSet.get_available_without_fetching()
        puts "Now we have #{availableWithoutFetching} rows fetched, but not consumed!"
        if (resultSet.fully_fetched?())
          puts "The result is fully fetched; there is no need to call this method again!"
        else
          puts "The result is not fully fetched yet"
        end
      else
        puts "Unable to fetch more results"
        fetchMoreResults_err.print_stack_trace()
      end
    }

  else
    puts "Unable to execute the query"
    execute_err.print_stack_trace()
  end
}
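If you want to page through the whole result with the low-level API, one approach is to call fetch_more_results repeatedly until fully_fetched? returns true. The sketch below assumes only those two ResultSet methods and the callback shape used above; `fetch_all_pages` is a hypothetical helper, and `ToyResultSet` is a stand-in that serves rows in fixed-size pages so the loop can run without a cluster.

```ruby
# Hypothetical helper: keeps calling fetch_more_results until the result set
# reports that it is fully fetched, then calls the block once (error or nil).
def fetch_all_pages(result_set, &done)
  if result_set.fully_fetched?
    done.call(nil)
  else
    result_set.fetch_more_results do |err, _res|
      if err.nil?
        fetch_all_pages(result_set, &done) # fetch the next page
      else
        done.call(err)
      end
    end
  end
end

# Toy stand-in (not the Vert.x ResultSet) that serves rows in fixed-size pages
class ToyResultSet
  def initialize(total_rows, page_size)
    @total_rows = total_rows
    @page_size = page_size
    @fetched = [page_size, total_rows].min # first page comes with the query result
  end

  def fully_fetched?
    @fetched == @total_rows
  end

  def get_available_without_fetching
    @fetched # nothing is consumed in this toy, so fetched == available
  end

  def fetch_more_results
    @fetched += [@page_size, @total_rows - @fetched].min
    yield nil, nil
  end
end

result_set = ToyResultSet.new(25, 10)
fetch_all_pages(result_set) do |err|
  if err.nil?
    puts "Fetched #{result_set.get_available_without_fetching} rows" # prints "Fetched 25 rows"
  else
    puts "Unable to fetch more results"
  end
end
```

Each fetch_more_results call only makes more rows available; consuming them (for example with one) is still up to you. Since this pulls every page into memory, the caution about bulk fetching applies here as well.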

Prepared queries

For security and efficiency reasons, it is a good idea to use prepared statements for all the queries you are using more than once.

You can prepare a query:

cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ") { |preparedStatementResult_err,preparedStatementResult|
  if (preparedStatementResult_err == nil)
    puts "The query has successfully been prepared"
    preparedStatement = preparedStatementResult
    # now you can use this PreparedStatement object for the next queries
  else
    puts "Unable to prepare the query"
    preparedStatementResult_err.print_stack_trace()
  end
}

Then use the PreparedStatement for all subsequent queries:

# You can execute your prepared statement with any of the query APIs.

# Low level fetch API
cassandraClient.execute(preparedStatement.bind("my_value")) { |done_err,done|
  results = done
  # handle results here
}

# Bulk fetching API
cassandraClient.execute_with_full_fetch(preparedStatement.bind("my_value")) { |done_err,done|
  results = done
  # handle results here
}

# Streaming API
cassandraClient.query_stream(preparedStatement.bind("my_value")) { |done_err,done|
  results = done
  # handle results here
}
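Since a prepared statement is meant to be reused, you may want to prepare each distinct query only once. The sketch below is a hypothetical cache (not part of vertx-cassandra) that relies only on the prepare method and callback shape shown above; `ToyClient` is a stand-in that counts prepare calls so the behavior can be checked without a cluster.

```ruby
# Hypothetical helper (not part of vertx-cassandra): prepares each distinct
# query string once and passes the cached statement to later callers.
class PreparedStatementCache
  def initialize(client)
    @client = client
    @statements = {}
  end

  def get(query, &callback)
    if @statements.key?(query)
      callback.call(nil, @statements[query])
    else
      @client.prepare(query) do |err, statement|
        @statements[query] = statement if err.nil? # cache successes only
        callback.call(err, statement)
      end
    end
  end
end

# Toy stand-in for the client: counts prepare calls and returns a fake statement
class ToyClient
  attr_reader :prepare_calls

  def initialize
    @prepare_calls = 0
  end

  def prepare(query)
    @prepare_calls += 1
    yield nil, "prepared: #{query}"
  end
end

client = ToyClient.new
cache = PreparedStatementCache.new(client)
query = "SELECT * FROM my_keyspace.my_table where my_key = ?"
2.times do
  cache.get(query) do |err, statement|
    # With the real client you would run e.g.
    # cassandraClient.execute(statement.bind("my_value")) { |done_err, done| ... }
    puts statement unless err
  end
end
puts client.prepare_calls # prints 1
```

The cache stores a statement only after a successful prepare, so a failed attempt is retried on the next call. It does not deduplicate concurrent first requests: two callers asking for the same query before the first prepare completes will both prepare it, which is wasteful but harmless.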

Batching

To execute several queries at once, you can use a BatchStatement:

batchStatement = Java::ComDatastaxDriverCore::BatchStatement.new()
  .add(Java::ComDatastaxDriverCore::SimpleStatement.new("INSERT INTO NAMES (name) VALUES ('Pavel')"))
  .add(Java::ComDatastaxDriverCore::SimpleStatement.new("INSERT INTO NAMES (name) VALUES ('Thomas')"))
  .add(Java::ComDatastaxDriverCore::SimpleStatement.new("INSERT INTO NAMES (name) VALUES ('Julien')"))

cassandraClient.execute(batchStatement) { |result_err,result|
  if (result_err == nil)
    puts "The given batch executed successfully"
  else
    puts "Unable to execute the batch"
    result_err.print_stack_trace()
  end
}

Object Mapper

You can use the object mapper to map between domain classes and queries.

First, add the following to the dependencies section of your Maven POM file:

<dependency>
 <groupId>com.datastax.cassandra</groupId>
 <artifactId>cassandra-driver-mapping</artifactId>
 <version>${datastax.driver.version}</version>
</dependency>

Or, if you use Gradle:

compile 'com.datastax.cassandra:cassandra-driver-mapping:${datastax.driver.version}'

Consider the following entity:

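import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "test", name = "names")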
public class MappedClass {
 @PartitionKey
 private String name;

 public MappedClass(String name) {
   this.name = name;
 }

 MappedClass() {
   // Required for mapping
 }

 // getters / setters
}

Create a mapper for it; you can then save, retrieve, or delete data in the corresponding table:

require 'vertx-cassandra/mapping_manager'
mappingManager = VertxCassandra::MappingManager.create(cassandraClient)
mapper = mappingManager.mapper(Java::ExamplesCassandraClientExamples::MappedClass::class)

value = Java::ExamplesCassandraClientExamples::MappedClass.new("foo")

mapper.save(value) { |handler_err,handler|
  # Entity saved
}

mapper.get(Java::JavaUtil::Collections.singleton_list("foo")) { |handler_err,handler|
  # Entity loaded
}

mapper.delete(Java::JavaUtil::Collections.singleton_list("foo")) { |handler_err,handler|
  # Entity deleted
}

Tip
It is safe to reuse the mapping manager and mapper instances for a given Cassandra client.