Vert.x Common SQL interface

The common SQL interface is used to interact with Vert.x SQL services.

You obtain a connection to the database via the service interface for the specific SQL service that you are using (e.g. JDBC/MySQL/PostgreSQL).

To use this project, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-sql-common</artifactId>
 <version>3.9.16</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-sql-common:3.9.16'

Simple SQL Operations

There are times when you will want to run a single SQL operation, e.g.: a single select of a row, or a update to a set of rows which do not require to be part of a transaction or have dependencies on the previous or next operation.

For these cases, clients provide a boilerplate-less API SQLOperations. This interface will perform the following steps for you:

  1. acquire a connection from the connection pool

  2. perform your action

  3. close and return the connection to the connection pool

An example where users get loaded from the USERS table could be:

client.query("SELECT * FROM USERS", ar -> {
  if (ar.succeeded()) {
    if (ar.succeeded()) {
      ResultSet result = ar.result();
    } else {
      // Failed!
    }
    // NOTE that you don't need to worry about
    // the connection management (e.g.: close)
  }
});

You can perform the following operations as a simple one "shot" method call:

For further details on these API please refer to the SQLOperations interface.

The SQL Connection

A connection to the database is represented by SQLConnection.

Auto-commit

When you obtain a connection auto commit is set to true. This means that each operation you perform will effectively execute in its own transaction.

If you wish to perform multiple operations in a single transaction you should set auto commit to false with setAutoCommit.

When the operation is complete, the handler will be called:

connection.setAutoCommit(false, res -> {
  if (res.succeeded()) {
    // OK!
  } else {
    // Failed!
  }
});

Executing queries

To execute a query use query

The query string is raw SQL that is passed through without changes to the actual database.

The handler will be called with the results, represented by ResultSet when the query has been run.

connection.query("SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE", res -> {
  if (res.succeeded()) {
    // Get the result set
    ResultSet resultSet = res.result();
  } else {
    // Failed!
  }
});

The ResultSet instance represents the results of a query.

The list of column names are available with getColumnNames, and the actual results available with getResults

The results are a list of JsonArray instances, one for each row of the results.

List<String> columnNames = resultSet.getColumnNames();

List<JsonArray> results = resultSet.getResults();

for (JsonArray row : results) {

  String id = row.getString(0);
  String fName = row.getString(1);
  String lName = row.getString(2);
  int shoeSize = row.getInteger(3);

}

You can also retrieve the rows as a list of Json object instances with getRows - this can give you a somewhat simpler API to work with, but please be aware that SQL results can contain duplicate column names - if that’s the case you should use getResults instead.

Here’s an example of iterating through the results as Json object instances:

List<JsonObject> rows = resultSet.getRows();

for (JsonObject row : rows) {

  String id = row.getString("ID");
  String fName = row.getString("FNAME");
  String lName = row.getString("LNAME");
  int shoeSize = row.getInteger("SHOE_SIZE");

}

Prepared statement queries

To execute a prepared statement query you can use queryWithParams.

This takes the query, containing the parameter place holders, and a JsonArray or parameter values.

String query = "SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE WHERE LNAME=? AND SHOE_SIZE > ?";
JsonArray params = new JsonArray().add("Fox").add(9);

connection.queryWithParams(query, params, res -> {

  if (res.succeeded()) {
    // Get the result set
    ResultSet resultSet = res.result();
  } else {
    // Failed!
  }
});

Executing INSERT, UPDATE or DELETE

To execute an operation which updates the database use update.

The update string is raw SQL that is passed through without changes to the actual database.

The handler will be called with the results, represented by UpdateResult when the update has been run.

The update result holds the number of rows updated with getUpdated, and if the update generated keys, they are available with getKeys.

connection.update("INSERT INTO PEOPLE VALUES (null, 'john', 'smith', 9)", res -> {
  if (res.succeeded()) {

    UpdateResult result = res.result();
    System.out.println("Updated no. of rows: " + result.getUpdated());
    System.out.println("Generated keys: " + result.getKeys());

  } else {
    // Failed!
  }
});

Prepared statement updates

To execute a prepared statement update you can use updateWithParams.

This takes the update, containing the parameter place holders, and a JsonArray or parameter values.

String update = "UPDATE PEOPLE SET SHOE_SIZE = 10 WHERE LNAME=?";
JsonArray params = new JsonArray().add("Fox");

connection.updateWithParams(update, params, res -> {

  if (res.succeeded()) {

    UpdateResult updateResult = res.result();

    System.out.println("No. of rows updated: " + updateResult.getUpdated());

  } else {

    // Failed!

  }
});

Callable statements

To execute a callable statement (either SQL functions or SQL procedures) you can use callWithParams.

This takes the callable statement using the standard JDBC format { call func_proc_name() }, optionally including parameter place holders e.g.: { call func_proc_name(?, ?) }, a JsonArray containing the parameter values and finally a JsonArray containing the output types e.g.: [null, 'VARCHAR'].

Note that the index of the output type is as important as the params array. If the return value is the second argument then the output array must contain a null value as the first element.

A SQL function returns some output using the return keyword, and in this case one can call it like this:

String func = "{ call one_hour_ago() }";

connection.call(func, res -> {

  if (res.succeeded()) {
    ResultSet result = res.result();
  } else {
    // Failed!
  }
});

When working with Procedures you and still return values from your procedures via its arguments, in the case you do not return anything the usage is as follows:

String func = "{ call new_customer(?, ?) }";

connection.callWithParams(func, new JsonArray().add("John").add("Doe"), null, res -> {

  if (res.succeeded()) {
    // Success!
  } else {
    // Failed!
  }
});

However you can also return values like this:

String func = "{ call customer_lastname(?, ?) }";

connection.callWithParams(func, new JsonArray().add("John"), new JsonArray().addNull().add("VARCHAR"), res -> {

  if (res.succeeded()) {
    ResultSet result = res.result();
  } else {
    // Failed!
  }
});

Note that the index of the arguments matches the index of the ? and that the output parameters expect to be a String describing the type you want to receive.

To avoid ambiguation the implementations are expected to follow the following rules:

  • When a place holder in the IN array is NOT NULL it will be taken

  • When the IN value is NULL a check is performed on the OUT When the OUT value is not null it will be registered as a output parameter When the OUT is also null it is expected that the IN value is the NULL value.

The registered OUT parameters will be available as an array in the result set under the output property.

Batch operations

The SQL common interface also defines how to execute batch operations. There are 3 types of batch operations:

A batches statement will exeucte a list of sql statements as for example:

List<String> batch = new ArrayList<>();
batch.add("INSERT INTO emp (NAME) VALUES ('JOE')");
batch.add("INSERT INTO emp (NAME) VALUES ('JANE')");

connection.batch(batch, res -> {
  if (res.succeeded()) {
    List<Integer> result = res.result();
  } else {
    // Failed!
  }
});

While a prepared or callable statement batch will reuse the sql statement and take an list of arguments as for example:

List<JsonArray> batch = new ArrayList<>();
batch.add(new JsonArray().add("joe"));
batch.add(new JsonArray().add("jane"));

connection.batchWithParams("INSERT INTO emp (name) VALUES (?)", batch, res -> {
  if (res.succeeded()) {
    List<Integer> result = res.result();
  } else {
    // Failed!
  }
});

Executing other operations

To execute any other database operation, e.g. a CREATE TABLE you can use execute.

The string is passed through without changes to the actual database. The handler is called when the operation is complete

String sql = "CREATE TABLE PEOPLE (ID int generated by default as identity (start with 1 increment by 1) not null," +
  "FNAME varchar(255), LNAME varchar(255), SHOE_SIZE int);";

connection.execute(sql, execute -> {
  if (execute.succeeded()) {
    System.out.println("Table created !");
  } else {
    // Failed!
  }
});

Multiple ResultSet responses

In some cases your query might return more than one result set, in this case and to preserve the compatibility when the returned result set object is converted to pure json, the next result sets are chained to the current result set under the property next. A simple walk of all result sets can be achieved like this:

while (rs != null) {
  // do something with the result set...

  // next step
  rs = rs.getNext();
}

Streaming

When dealing with large data sets, it is not advised to use API just described but to stream data since it avoids inflating the whole response into memory and JSON and data is just processed on a row by row basis, for example:

connection.queryStream("SELECT * FROM large_table", stream -> {
  if (stream.succeeded()) {
    stream.result().handler(row -> {
      // do something with the row...
    });
  }
});

You still have full control on when the stream is pauses, resumed and ended. For cases where your query returns multiple result sets you should use the result set ended event to fetch the next one if available. If there is more data the stream handler will receive the new data, otherwise the end handler is invoked.

connection.queryStream("SELECT * FROM large_table; SELECT * FROM other_table", stream -> {
  if (stream.succeeded()) {
    SQLRowStream sqlRowStream = stream.result();

    sqlRowStream
      .resultSetClosedHandler(v -> {
        // will ask to restart the stream with the new result set if any
        sqlRowStream.moreResults();
      })
      .handler(row -> {
        // do something with the row...
      })
      .endHandler(v -> {
        // no more data available...
      });
  }
});

Using transactions

To use transactions first set auto-commit to false with setAutoCommit.

You then do your transactional operations and when you want to commit or rollback use commit or rollback.

Once the commit/rollback is complete the handler will be called and the next transaction will be automatically started.

connection.commit(res -> {
  if (res.succeeded()) {
    // Committed OK!
  } else {
    // Failed!
  }
});

Closing connections

When you’ve done with the connection you should return it to the pool with close.

RxJava 2 API

SQL Transaction management with RxJava

Managing SQL transactions manually with the Rxified SQLConnection requires quite a lot of boilerplate code.

Vert.x provides observable transformers that you can apply to your flows with compose to make them transactional:

  • SQLClientHelper#txFlowableTransformer

  • SQLClientHelper#txObservableTransformer

  • SQLClientHelper#txSingleTransformer

  • SQLClientHelper#txMaybeTransformer

  • SQLClientHelper#txCompletableTransformer

These transformers wrap the corresponding source of events with SQL transaction management. Let’s take an example.

In your music library application, you need to insert a row in table albums, then some rows in table tracks. These two steps shall be part of the same atomic transaction. If it succeeds, the application must return results from a query involving both tables.

After your got an instance of io.vertx.reactivex.ext.sql.SQLConnection, you can use it to perform the SQL operations:

conn.rxExecute("... insert into album ...")
  .andThen(conn.rxExecute("... insert into tracks ..."))
  .compose(SQLClientHelper.txCompletableTransformer(conn)) (1)
  .andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults))
  .subscribe(rows -> {
    // send to client
  }, throwable -> {
    // handle error
  });
  1. Transaction management appplied to the Completable source

Source transformers provide maximum flexibility: you are still able to execute operations with the connection after the transaction completes.

But more often than not, you do not need the connection after the changes are commited or rollbacked. In this case, you may simply create you source observable with one of the transactional helper methods in io.vertx.reactivex.ext.sql.SQLClientHelper.

Let’s rewrite the previous example:

SQLClientHelper.inTransactionSingle(sqlClient, conn -> {
  return conn.rxExecute("... insert into album ...")
    .andThen(conn.rxExecute("... insert into tracks ..."))
    .andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults)); (1)
}).subscribe(rows -> {
  // send to client
}, throwable -> {
  // handle error
});
  1. the SELECT query is now part of the transaction