Vert.x Common SQL interface
The common SQL interface is used to interact with Vert.x SQL services.
You obtain a connection to the database via the service interface for the specific SQL service that you are using (e.g. JDBC/MySQL/PostgreSQL).
To use this project, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-sql-common</artifactId>
<version>3.9.16</version>
</dependency>
- Gradle (in your build.gradle file):
compile 'io.vertx:vertx-sql-common:3.9.16'
Simple SQL Operations
There are times when you will want to run a single SQL operation, e.g. a single select of a row, or an update to a set of rows that does not need to be part of a transaction or depend on the previous or next operation.
For these cases, clients provide a boilerplate-less API, SQLOperations. This interface will perform the following steps for you:
- acquire a connection from the connection pool
- perform your action
- close and return the connection to the connection pool
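The three steps above can be pictured with a small, synchronous model. This is only a sketch under stated assumptions: OneShotSketch and FakeConnection are hypothetical names that are not part of Vert.x, and the real clients perform these steps asynchronously.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical stand-in for a connection pool; not part of Vert.x.
final class OneShotSketch {
    // Hypothetical connection type that only records whether it is in use.
    static final class FakeConnection {
        boolean inUse;
    }

    private final Deque<FakeConnection> idle = new ArrayDeque<>();

    OneShotSketch(int size) {
        for (int i = 0; i < size; i++) {
            idle.push(new FakeConnection());
        }
    }

    // Number of connections currently available in the pool.
    int idleCount() {
        return idle.size();
    }

    // Acquire a connection, run the action, and always hand the connection back.
    <T> T oneShot(Function<FakeConnection, T> action) {
        FakeConnection conn = idle.pop();   // 1. acquire a connection from the pool
        conn.inUse = true;
        try {
            return action.apply(conn);      // 2. perform your action
        } finally {
            conn.inUse = false;             // 3. close and return it to the pool,
            idle.push(conn);                //    even when the action throws
        }
    }
}
```

The try/finally block is the part that a one-shot API saves you from writing: the connection goes back to the pool whether the action succeeds or fails.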
An example where users get loaded from the USERS table could be:
client.query("SELECT * FROM USERS", ar -> {
  if (ar.succeeded()) {
    ResultSet result = ar.result();
  } else {
    // Failed!
  }
  // NOTE that you don't need to worry about
  // the connection management (e.g.: close)
});
You can perform the following operations as simple "one shot" method calls:
- query
- queryWithParams
- update
- updateWithParams
- call
- callWithParams
For further details on this API, please refer to the SQLOperations interface.
The SQL Connection
A connection to the database is represented by SQLConnection.
Auto-commit
When you obtain a connection, auto-commit is set to true. This means that each operation you perform will effectively execute in its own transaction.
If you wish to perform multiple operations in a single transaction, you should set auto-commit to false with setAutoCommit.
When the operation is complete, the handler will be called:
connection.setAutoCommit(false, res -> {
if (res.succeeded()) {
// OK!
} else {
// Failed!
}
});
Executing queries
To execute a query, use query.
The query string is raw SQL that is passed through without changes to the actual database.
The handler will be called with the results, represented by ResultSet, when the query has been run.
connection.query("SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE", res -> {
if (res.succeeded()) {
// Get the result set
ResultSet resultSet = res.result();
} else {
// Failed!
}
});
The ResultSet instance represents the results of a query.
The list of column names is available with getColumnNames, and the actual results are available with getResults.
The results are a list of JsonArray instances, one for each row of the results.
List<String> columnNames = resultSet.getColumnNames();
List<JsonArray> results = resultSet.getResults();
for (JsonArray row : results) {
Integer id = row.getInteger(0); // ID is declared as an int column in the PEOPLE table
String fName = row.getString(1);
String lName = row.getString(2);
int shoeSize = row.getInteger(3);
}
You can also retrieve the rows as a list of JsonObject instances with getRows. This can give you a somewhat simpler API to work with, but please be aware that SQL results can contain duplicate column names; if that is the case, you should use getResults instead.
Here’s an example of iterating through the results as JsonObject instances:
List<JsonObject> rows = resultSet.getRows();
for (JsonObject row : rows) {
Integer id = row.getInteger("ID"); // ID is declared as an int column in the PEOPLE table
String fName = row.getString("FNAME");
String lName = row.getString("LNAME");
int shoeSize = row.getInteger("SHOE_SIZE");
}
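The caveat about duplicate column names can be illustrated with plain collections. The sketch below is a simplified model, not the ResultSet implementation: DuplicateColumnsSketch and toNamedRow are hypothetical names, and the only point is that a name-keyed view can hold one value per column name while a positional row keeps every value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; it models a name-keyed view over a positional row.
final class DuplicateColumnsSketch {
    // Build a name -> value map from the column names and one positional row.
    static Map<String, Object> toNamedRow(List<String> columnNames, List<Object> row) {
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.size(); i++) {
            // A repeated column name overwrites the value stored earlier.
            named.put(columnNames.get(i), row.get(i));
        }
        return named;
    }
}
```

For a join that selects two columns both named ID, the positional row still has both values, whereas the named row keeps only one of them. That is the situation in which the positional form is the safer choice.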
Prepared statement queries
To execute a prepared statement query you can use queryWithParams.
This takes the query, containing the parameter placeholders, and a JsonArray of parameter values.
String query = "SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE WHERE LNAME=? AND SHOE_SIZE > ?";
JsonArray params = new JsonArray().add("Fox").add(9);
connection.queryWithParams(query, params, res -> {
if (res.succeeded()) {
// Get the result set
ResultSet resultSet = res.result();
} else {
// Failed!
}
});
Executing INSERT, UPDATE or DELETE
To execute an operation which updates the database, use update.
The update string is raw SQL that is passed through without changes to the actual database.
The handler will be called with the results, represented by UpdateResult, when the update has been run.
The update result holds the number of rows updated with getUpdated, and if the update generated keys, they are available with getKeys.
connection.update("INSERT INTO PEOPLE VALUES (null, 'john', 'smith', 9)", res -> {
if (res.succeeded()) {
UpdateResult result = res.result();
System.out.println("Updated no. of rows: " + result.getUpdated());
System.out.println("Generated keys: " + result.getKeys());
} else {
// Failed!
}
});
Prepared statement updates
To execute a prepared statement update you can use updateWithParams.
This takes the update, containing the parameter placeholders, and a JsonArray of parameter values.
String update = "UPDATE PEOPLE SET SHOE_SIZE = 10 WHERE LNAME=?";
JsonArray params = new JsonArray().add("Fox");
connection.updateWithParams(update, params, res -> {
if (res.succeeded()) {
UpdateResult updateResult = res.result();
System.out.println("No. of rows updated: " + updateResult.getUpdated());
} else {
// Failed!
}
});
Callable statements
To execute a callable statement (either SQL functions or SQL procedures) you can use callWithParams.
This takes the callable statement using the standard JDBC format { call func_proc_name() }, optionally including parameter placeholders, e.g. { call func_proc_name(?, ?) }, a JsonArray containing the parameter values, and finally a JsonArray containing the output types, e.g. [null, 'VARCHAR'].
Note that the index of each output type is as important as its position in the params array. If the return value is the second argument, then the output array must contain a null value as the first element.
A SQL function returns some output using the return keyword, and in this case one can call it like this:
String func = "{ call one_hour_ago() }";
connection.call(func, res -> {
if (res.succeeded()) {
ResultSet result = res.result();
} else {
// Failed!
}
});
When working with procedures you can still return values via their arguments. If you do not return anything, the usage is as follows:
String func = "{ call new_customer(?, ?) }";
connection.callWithParams(func, new JsonArray().add("John").add("Doe"), null, res -> {
if (res.succeeded()) {
// Success!
} else {
// Failed!
}
});
However you can also return values like this:
String func = "{ call customer_lastname(?, ?) }";
connection.callWithParams(func, new JsonArray().add("John"), new JsonArray().addNull().add("VARCHAR"), res -> {
if (res.succeeded()) {
ResultSet result = res.result();
} else {
// Failed!
}
});
Note that the index of the arguments matches the index of the ? and that the output parameters are expected to be a String describing the type you want to receive.
To avoid ambiguity, implementations are expected to follow these rules:
- When a placeholder in the IN array is NOT NULL, it will be taken.
- When the IN value is NULL, a check is performed on the OUT:
  - When the OUT value is not null, it will be registered as an output parameter.
  - When the OUT is also null, it is expected that the IN value is the NULL value.
The registered OUT parameters will be available as an array in the result set under the output property.
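The resolution rules for IN and OUT values can be written down as a small decision function. This is a sketch of the rules as stated here, not code from any Vert.x client: CallableParamsSketch and classify are hypothetical names, and treating a missing entry in the shorter array as null is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; it only labels how each placeholder would be bound.
final class CallableParamsSketch {
    // For every placeholder index, apply the IN/OUT rules and describe the binding.
    static List<String> classify(List<Object> in, List<String> out) {
        int size = Math.max(in.size(), out.size());
        List<String> bindings = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            // Assumption: an index beyond the end of an array counts as null.
            Object inValue = i < in.size() ? in.get(i) : null;
            String outType = i < out.size() ? out.get(i) : null;
            if (inValue != null) {
                bindings.add("IN=" + inValue);      // a non-null IN value is taken
            } else if (outType != null) {
                bindings.add("OUT:" + outType);     // null IN, non-null OUT: output parameter
            } else {
                bindings.add("IN=NULL");            // both null: the IN value is NULL
            }
        }
        return bindings;
    }
}
```

With the customer_lastname example, an IN array holding "John" and an OUT array holding null followed by "VARCHAR" classify as one IN value at the first placeholder and one registered output parameter at the second.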
Batch operations
The SQL common interface also defines how to execute batch operations. There are 3 types of batch operations:
- Batched statements: batch
- Batched prepared statements: batchWithParams
- Batched callable statements: batchCallableWithParams
A batched statement will execute a list of SQL statements, for example:
List<String> batch = new ArrayList<>();
batch.add("INSERT INTO emp (NAME) VALUES ('JOE')");
batch.add("INSERT INTO emp (NAME) VALUES ('JANE')");
connection.batch(batch, res -> {
if (res.succeeded()) {
List<Integer> result = res.result();
} else {
// Failed!
}
});
A prepared or callable statement batch, by contrast, reuses the SQL statement and takes a list of arguments, for example:
List<JsonArray> batch = new ArrayList<>();
batch.add(new JsonArray().add("joe"));
batch.add(new JsonArray().add("jane"));
connection.batchWithParams("INSERT INTO emp (name) VALUES (?)", batch, res -> {
if (res.succeeded()) {
List<Integer> result = res.result();
} else {
// Failed!
}
});
Executing other operations
To execute any other database operation, e.g. a CREATE TABLE, you can use execute.
The string is passed through without changes to the actual database. The handler is called when the operation is complete.
String sql = "CREATE TABLE PEOPLE (ID int generated by default as identity (start with 1 increment by 1) not null," +
"FNAME varchar(255), LNAME varchar(255), SHOE_SIZE int);";
connection.execute(sql, execute -> {
if (execute.succeeded()) {
System.out.println("Table created !");
} else {
// Failed!
}
});
Multiple ResultSet responses
In some cases your query might return more than one result set. In this case, and to preserve compatibility when the returned result set object is converted to pure JSON, the subsequent result sets are chained to the current result set under the property next. A simple walk of all result sets can be achieved like this:
while (rs != null) {
// do something with the result set...
// next step
rs = rs.getNext();
}
Streaming
When dealing with large data sets, it is not advised to use the API just described. Instead, stream the data: this avoids inflating the whole response into memory and JSON, and the data is processed on a row-by-row basis. For example:
connection.queryStream("SELECT * FROM large_table", stream -> {
if (stream.succeeded()) {
stream.result().handler(row -> {
// do something with the row...
});
}
});
You still have full control over when the stream is paused, resumed and ended. For cases where your query returns multiple result sets, you should use the result set closed event to fetch the next one if available. If there is more data, the stream handler will receive the new data; otherwise the end handler is invoked.
connection.queryStream("SELECT * FROM large_table; SELECT * FROM other_table", stream -> {
if (stream.succeeded()) {
SQLRowStream sqlRowStream = stream.result();
sqlRowStream
.resultSetClosedHandler(v -> {
// will ask to restart the stream with the new result set if any
sqlRowStream.moreResults();
})
.handler(row -> {
// do something with the row...
})
.endHandler(v -> {
// no more data available...
});
}
});
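The order in which these handlers fire can be traced with a small in-memory model. This is only a sketch under stated assumptions: MultiResultStreamSketch is a hypothetical class, not SQLRowStream. It emits rows synchronously, and its default behaviour of advancing automatically when no closed handler is set is a choice of the sketch.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of a row stream over several result sets.
final class MultiResultStreamSketch {
    private final List<List<String>> resultSets;
    private int current = 0;
    private Consumer<String> rowHandler = row -> { };
    // Sketch choice: without an explicit handler, move on to the next result set.
    private Runnable resultSetClosedHandler = this::moreResults;
    private Runnable endHandler = () -> { };

    MultiResultStreamSketch(List<List<String>> resultSets) {
        this.resultSets = resultSets;
    }

    MultiResultStreamSketch handler(Consumer<String> h) { rowHandler = h; return this; }
    MultiResultStreamSketch resultSetClosedHandler(Runnable h) { resultSetClosedHandler = h; return this; }
    MultiResultStreamSketch endHandler(Runnable h) { endHandler = h; return this; }

    // Begin streaming with the first result set, or end at once if there is none.
    void start() {
        if (resultSets.isEmpty()) {
            endHandler.run();
        } else {
            emitCurrent();
        }
    }

    // Ask for the next result set; if there is none, the end handler is invoked.
    void moreResults() {
        current++;
        if (current < resultSets.size()) {
            emitCurrent();
        } else {
            endHandler.run();
        }
    }

    // Deliver every row of the current result set, then signal that it is closed.
    private void emitCurrent() {
        for (String row : resultSets.get(current)) {
            rowHandler.accept(row);
        }
        resultSetClosedHandler.run();
    }
}
```

In this model the closed handler fires after every result set, including the last one, and the end handler fires only once moreResults finds nothing left to stream.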
Using transactions
To use transactions, first set auto-commit to false with setAutoCommit.
You then do your transactional operations, and when you want to commit or roll back, use commit or rollback.
Once the commit/rollback is complete, the handler will be called and the next transaction will be automatically started.
connection.commit(res -> {
if (res.succeeded()) {
// Committed OK!
} else {
// Failed!
}
});
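The interaction between auto-commit, commit and rollback can be modelled in a few lines. This is a simplified sketch, assuming a hypothetical AutoCommitSketch class that stores statements as strings. It is not how a database or Vert.x implements transactions, and it ignores details such as what happens when auto-commit is switched back on in the middle of a transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a connection that tracks committed and pending statements.
final class AutoCommitSketch {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean autoCommit = true; // a new connection starts with auto-commit on

    void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    // With auto-commit on, each statement is committed as its own transaction.
    void update(String statement) {
        pending.add(statement);
        if (autoCommit) {
            commit();
        }
    }

    // Make all pending statements durable; the next transaction starts empty.
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    // Discard all pending statements; the next transaction starts empty.
    void rollback() {
        pending.clear();
    }

    // Snapshot of the statements committed so far.
    List<String> committed() {
        return new ArrayList<>(committed);
    }
}
```

After either commit or rollback the pending list is empty, which mirrors the statement that the next transaction is started automatically.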
Closing connections
When you are done with the connection, you should return it to the pool with close.
RxJava 2 API
SQL Transaction management with RxJava
Managing SQL transactions manually with the Rxified SQLConnection requires quite a lot of boilerplate code.
Vert.x provides observable transformers that you can apply to your flows with compose to make them transactional:
- SQLClientHelper#txFlowableTransformer
- SQLClientHelper#txObservableTransformer
- SQLClientHelper#txSingleTransformer
- SQLClientHelper#txMaybeTransformer
- SQLClientHelper#txCompletableTransformer
These transformers wrap the corresponding source of events with SQL transaction management. Let’s take an example.
In your music library application, you need to insert a row in table albums, then some rows in table tracks. These two steps shall be part of the same atomic transaction. If it succeeds, the application must return results from a query involving both tables.
After you get an instance of io.vertx.reactivex.ext.sql.SQLConnection, you can use it to perform the SQL operations:
conn.rxExecute("... insert into album ...")
.andThen(conn.rxExecute("... insert into tracks ..."))
.compose(SQLClientHelper.txCompletableTransformer(conn)) // (1)
.andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults))
.subscribe(rows -> {
// send to client
}, throwable -> {
// handle error
});
(1) Transaction management applied to the Completable source
Source transformers provide maximum flexibility: you are still able to execute operations with the connection after the transaction completes.
But more often than not, you do not need the connection after the changes are committed or rolled back. In this case, you may simply create your source observable with one of the transactional helper methods in io.vertx.reactivex.ext.sql.SQLClientHelper.
Let’s rewrite the previous example:
SQLClientHelper.inTransactionSingle(sqlClient, conn -> {
return conn.rxExecute("... insert into album ...")
.andThen(conn.rxExecute("... insert into tracks ..."))
.andThen(conn.rxQuery("... select from album, tracks ...").map(ResultSet::getResults)); // (1)
}).subscribe(rows -> {
// send to client
}, throwable -> {
// handle error
});
(1) The SELECT query is now part of the transaction
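The boilerplate that such helpers take off your hands can be pictured with a synchronous analogue. This is a sketch under stated assumptions, not the SQLClientHelper implementation: InTransactionSketch and LoggingConnection are hypothetical names, and the exact sequence shown (disable auto-commit, commit on success, roll back on failure, re-enable auto-commit) is an assumption about what transactional wrapping typically involves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical synchronous analogue of wrapping work in a transaction.
final class InTransactionSketch {
    // Hypothetical connection that only logs the transaction calls it receives.
    static final class LoggingConnection {
        final List<String> log = new ArrayList<>();
        void setAutoCommit(boolean value) { log.add("autoCommit=" + value); }
        void commit() { log.add("commit"); }
        void rollback() { log.add("rollback"); }
    }

    // Run the work in a transaction: commit if it returns, roll back if it throws.
    static <T> T inTransaction(LoggingConnection conn, Function<LoggingConnection, T> work) {
        conn.setAutoCommit(false);
        try {
            T result = work.apply(conn);
            conn.commit();
            return result;
        } catch (RuntimeException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(true); // leave the connection as it was found
        }
    }
}
```

Everything passed as work, including a final query, runs before the commit, which corresponds to the SELECT being part of the transaction in the rewritten example.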