Reactive MySQL Client

The Reactive MySQL Client is a client for MySQL with a straightforward API focusing on scalability and low overhead.

The client is reactive and non-blocking, allowing to handle many database connections with a single thread.

Features

  • Event driven

  • Lightweight

  • Built-in connection pooling

  • Prepared queries caching

  • Cursor support

  • Row streaming

  • RxJava API

  • Direct memory to object without unnecessary copies

  • Complete data type support

  • Stored Procedures support

  • TLS/SSL support

  • Query pipelining

  • MySQL utilities commands support

  • Working with MySQL and MariaDB

  • Rich collation and charset support

  • Unix domain socket

Usage

To use the Reactive MySQL Client add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-mysql-client</artifactId>
 <version>4.5.11</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-mysql-client:4.5.11'
}

Getting started

Here is the simplest way to connect, query and disconnect

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
SqlClient client = MySQLBuilder
  .client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> result = ar.result();
      System.out.println("Got " + result.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }

    // Now close the pool
    client.close();
  });

Connecting to MySQL

Most of the time you will use a pool to connect to MySQL:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .build();

The pooled client uses a connection pool and any operation will borrow a connection from the pool to execute the operation and release it to the pool.

If you are running with Vert.x you can pass it your Vertx instance:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

You need to release the pool when you don’t need it anymore:

pool.close();

When you need to execute several operations on the same connection, you need to use a client connection.

You can easily get one from the pool:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Get a connection from the pool
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // All operations execute on the same connection
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

Command pipelining

In some use cases, command pipelining can improve database access performance.

You can configure the client to use pipelining

Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions.setPipeliningLimit(16))
  .using(vertx)
  .build();

The default pipelining limit is 1 which disables pipelining.

Do not enable pipelining if you use a proxy that does not support it. Otherwise, the proxy might close client connections abruptly.

Pool versus pooled client

The MySQLBuilder allows you to create a pool or a pooled client

connectOptions.setPipeliningLimit(64);
SqlClient client = MySQLBuilder.client()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Pipelined
Future<RowSet<Row>> res1 = client.query(sql).execute();

// Connection pool
Pool pool = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Not pipelined
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • pool operations are not pipelined, only connections acquired from the pool are pipelined

  • pooled client operations are pipelined, you cannot acquire a connection from a pooled client

Pool sharing

You can share an pool between multiple verticles or instances of the same verticle. Such pool should be created outside a verticle otherwise it will be closed when the verticle that created it is undeployed

Pool pool = MySQLBuilder.pool()
  .with(new PoolOptions().setMaxSize(maxSize))
  .connectingTo(database)
  .using(vertx)
  .build();
vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    // Use the pool
  }
}, new DeploymentOptions().setInstances(4));

You can also create a shared pool in each verticle:

vertx.deployVerticle(() -> new AbstractVerticle() {
  Pool pool;
  @Override
  public void start() {
    // Get or create a shared pool
    // this actually creates a lease to the pool
    // when the verticle is undeployed, the lease will be released automaticaly
    pool = MySQLBuilder.pool()
      .with(new PoolOptions()
        .setMaxSize(maxSize)
        .setShared(true)
        .setName("my-pool"))
      .using(vertx)
      .build();
  }
}, new DeploymentOptions().setInstances(4));

The first time a shared pool is created it will create the resources for the pool. Subsequent calls will reuse this pool and create a lease to this pool. The resources are disposed after all leases have been closed.

By default, a pool reuses the current event-loop when it needs to create a TCP connection. The shared pool will therefore randomly use event-loops of verticles using it.

You can assign a number of event loop a pool will use independently of the context using it

Pool pool = MySQLBuilder.pool()
  .with(new PoolOptions()
    .setMaxSize(maxSize)
    .setShared(true)
    .setName("my-pool")
    .setEventLoopSize(4))
  .using(vertx)
  .build();

Unix Domain Socket

Sometimes for simplicity, security or performance reasons, it is required to connect via a Unix Domain Socket.

Since the JVM does not support domain sockets, first you must add native transport extensions to your project.

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-transport-native-epoll</artifactId>
 <version>${netty.version}</version>
 <classifier>linux-x86_64</classifier>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
The native epoll support for ARM64 can also be added with the classifier linux-aarch64.
If there are Mac users in your team, add netty-transport-native-kqueue with the classifier osx-x86_64.

Then set the path to the domain socket in MySQLConnectOptions#setHost:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setHost("/var/run/mysqld/mysqld.sock")
  .setDatabase("the-db");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
Pool client = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
// vertxOptions.setPreferNativeTransport(true);
Pool client2 = MySQLBuilder.pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

More information about native transports can be found in the [Vert.x documentation](https://vertx.io/docs/vertx-core/java/#_native_transports).

Configuration

There are several alternatives for you to configure the client.

Data Object

A simple way to configure the client is to specify a MySQLConnectOptions data object.

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
Pool pool = MySQLBuilder
  .pool()
  .with(poolOptions)
  .connectingTo(connectOptions)
  .using(vertx)
  .build();

pool.getConnection()
  .onComplete(ar -> {
    // Handling your connection
  });

collations and character sets

The Reactive MySQL client supports configuring collations or character sets and map them to a correlative java.nio.charset.Charset. For example, you can specify charset for a connection like

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// set connection character set to utf8 instead of the default charset utf8mb4
connectOptions.setCharset("utf8");

The Reactive MySQL Client will take utf8mb4 as the default charset. String values like password and error messages are always decoded in UTF-8 charset.

characterEncoding option is used to determine which Java charset will be used to encode String values such as query string and parameter values, the charset is UTF-8 by default and if it’s set to null then the client will use the default Java charset instead.

You can also specify collation for a connection like

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// set connection collation to utf8_general_ci instead of the default collation utf8mb4_general_ci
// setting a collation will override the charset option
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");

Note setting a collation on the data object will override the charset and characterEncoding option.

You can execute SQL SHOW COLLATION; or SHOW CHARACTER SET; to get the supported collations and charsets by the server.

More information about MySQL charsets and collations can be found in the MySQL Reference Manual.

connection attributes

You can also configure the connection attributes with the setProperties or addProperty methods. Note setProperties will override the default client properties.

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// Add a connection attribute
connectOptions.addProperty("_java_version", "1.8.0_212");

// Override the attributes
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);

More information about client connection attributes can be found in the MySQL Reference Manual.

useAffectedRows

You can configure the useAffectedRows option to decide whether to set CLIENT_FOUND_ROWS flag when connecting to the server. If the CLIENT_FOUND_ROWS flag is specified then the affected rows count is the numeric value of rows found rather than affected.

More information about this can be found in the MySQL Reference Manual

Connection URI

Apart from configuring with a MySQLConnectOptions data object, We also provide you an alternative way to connect when you want to configure with a connection URI:

String connectionUri = "mysql://dbuser:[email protected]:3306/mydb";

// Create the pool from the connection URI
Pool pool = MySQLBuilder
  .pool()
  .connectingTo(connectionUri)
  .using(vertx)
  .build();

// Create the connection from the connection URI
MySQLConnection.connect(vertx, connectionUri)
  .onComplete(res -> {
    // Handling your connection
  });

More information about connection string formats can be found in the MySQL Reference Manual.

Currently, the client supports the following parameter keys (case-insensitive):

  • host

  • port

  • user

  • password

  • schema

  • socket

  • useAffectedRows

Configuring parameters in connection URI will override the default properties.

Connect retries

You can configure the client to retry when a connection fails to be established.

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

Running queries

When you don’t need a transaction or run single queries, you can run queries directly on the pool; the pool will use one of its connection to run the query and return the result to you.

Here is how to run simple queries:

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Prepared queries

You can do the same with prepared queries.

The SQL string can refer to parameters by position, using the database syntax `?`

client
  .preparedQuery("SELECT * FROM users WHERE id=?")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Query methods provides an asynchronous RowSet instance that works for SELECT queries

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute()
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

or UPDATE/INSERT queries:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
  .execute(Tuple.of("Julien", "Viet"))
  .onComplete(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

The Row gives you access to your data by index

System.out.println("User " + row.getString(0) + " " + row.getString(1));
Column indexes start at 0, not at 1.

Alternatively, data can be retrieved by name:

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

The client will not do any magic here and the column name is identified with the name in the table regardless of how your SQL text is.

You can access a wide variety of of types

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

You can use cached prepared statements to execute one-shot prepared queries:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

You can create a PreparedStatement and manage the lifecycle by yourself.

sqlConnection
  .prepare("SELECT * FROM users WHERE id = ?")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"))
        .onComplete(ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

Batches

You can execute prepared batch

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
  .executeBatch(batch)
  .onComplete(res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

MySQL LAST_INSERT_ID

You can get the auto incremented value if you insert a record into the table.

client
  .query("INSERT INTO test(val) VALUES ('v1')")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
      System.out.println("Last inserted id is: " + lastInsertId);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

More information can be found in How to Get the Unique ID for the Last Inserted Row.

Using connections

Getting a connection

When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool. Remember that between acquiring the connection from the pool and returning it to the pool, you should take care of the connection because it might be closed by the server for some reason such as an idle time out.

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

Prepared queries can be created:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

Simplified connection API

When you use a pool, you can call withConnection to pass it a function executed within a connection.

It borrows a connection from the pool and calls the function with this connection.

The function must return a future of an arbitrary result.

After the future completes, the connection is returned to the pool and the overall result is provided.

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

Using transactions

Transactions with connections

You can execute transaction using SQL BEGIN/COMMIT/ROLLBACK, if you do so you must use a SqlConnection and manage it yourself.

Or you can use the transaction API of SqlConnection:

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
    // Begin the transaction
    conn.begin()
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(v -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

When the database server reports the current transaction is failed (e.g the infamous current transaction is aborted, commands ignored until end of transaction block), the transaction is rollbacked and the completion future is failed with a TransactionRollbackException:

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

Simplified transaction API

When you use a pool, you can call withTransaction to pass it a function executed within a transaction.

It borrows a connection from the pool, begins the transaction and calls the function with a client executing all operations in the scope of this transaction.

The function must return a future of an arbitrary result:

  • when the future succeeds the client will commit the transaction

  • when the future fails the client will rollback the transaction

After the transaction completes, the connection is returned to the pool and the overall result is provided.

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

Cursors and streaming

At the time of writing, you cannot use the following features if you connect to your database through ProxySQL. If you try, you will see this error message:

RECEIVED AN UNKNOWN COMMAND: 28 -- PLEASE REPORT A BUG

This is because the proxy does not handle a type of command (COM_STMT_FETCH) that is required to fetch rows from a cursor.

By default, prepared query execution fetches all rows, you can use a Cursor to control the amount of rows you want to read:

connection
  .prepare("SELECT * FROM users WHERE age > ?")
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // Create a cursor
    Cursor cursor = pq.cursor(Tuple.of(18));

    // Read 50 rows
    cursor
      .read(50)
      .onComplete(ar2 -> {
      if (ar2.succeeded()) {
        RowSet<Row> rows = ar2.result();

        // Check for more ?
        if (cursor.hasMore()) {
          // Repeat the process...
        } else {
          // No more rows - close the cursor
          cursor.close();
        }
      }
    });
  }
});

Cursors shall be closed when they are released prematurely:

cursor
  .read(50)
  .onComplete(ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

A stream API is also available for cursors, which can be more convenient, specially with the Rxified version.

connection
  .prepare("SELECT * FROM users WHERE age > ?")
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // Fetch 50 rows at a time
    RowStream<Row> stream = pq.createStream(50, Tuple.of(18));

    // Use the stream
    stream.exceptionHandler(err -> {
      System.out.println("Error: " + err.getMessage());
    });
    stream.endHandler(v -> {
      System.out.println("End of stream");
    });
    stream.handler(row -> {
      System.out.println("User: " + row.getString("last_name"));
    });
  }
});

The stream read the rows by batch of 50 and stream them, when the rows have been passed to the handler, a new batch of 50 is read and so on.

The stream can be resumed or paused, the loaded rows will remain in memory until they are delivered and the cursor will stop iterating.

Tracing queries

The SQL client can trace query execution when Vert.x has tracing enabled.

The client reports the following client spans:

  • Query operation name

  • tags

  • db.system: the database management system product

  • db.user: the database username

  • db.instance: the database instance

  • db.statement: the SQL query

  • db.type: sql

The default tracing policy is PROPAGATE, the client will only create a span when involved in an active trace.

You can change the client policy with setTracingPolicy, e.g you can set ALWAYS to always report a span:

options.setTracingPolicy(TracingPolicy.ALWAYS);

MySQL type mapping

Currently the client supports the following MySQL types

  • BOOL,BOOLEAN (java.lang.Byte)

  • TINYINT (java.lang.Byte)

  • TINYINT UNSIGNED(java.lang.Short)

  • SMALLINT (java.lang.Short)

  • SMALLINT UNSIGNED(java.lang.Integer)

  • MEDIUMINT (java.lang.Integer)

  • MEDIUMINT UNSIGNED(java.lang.Integer)

  • INT,INTEGER (java.lang.Integer)

  • INTEGER UNSIGNED(java.lang.Long)

  • BIGINT (java.lang.Long)

  • BIGINT UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • FLOAT (java.lang.Float)

  • FLOAT UNSIGNED(java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DOUBLE UNSIGNED(java.lang.Double)

  • BIT (java.lang.Long)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • NUMERIC UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • DATE (java.time.LocalDate)

  • DATETIME (java.time.LocalDateTime)

  • TIME (java.time.Duration)

  • TIMESTAMP (java.time.LocalDateTime)

  • YEAR (java.lang.Short)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • BINARY (io.vertx.core.buffer.Buffer)

  • VARBINARY (io.vertx.core.buffer.Buffer)

  • TINYBLOB (io.vertx.core.buffer.Buffer)

  • TINYTEXT (java.lang.String)

  • BLOB (io.vertx.core.buffer.Buffer)

  • TEXT (java.lang.String)

  • MEDIUMBLOB (io.vertx.core.buffer.Buffer)

  • MEDIUMTEXT (java.lang.String)

  • LONGBLOB (io.vertx.core.buffer.Buffer)

  • LONGTEXT (java.lang.String)

  • ENUM (java.lang.String)

  • SET (java.lang.String)

  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • GEOMETRY(io.vertx.mysqlclient.data.spatial.*)

Tuple decoding uses the above types when storing values

Note: In Java there is no specific representations for unsigned numeric values, so this client will convert an unsigned value to the correlated Java type.

Implicit type conversion

The Reactive MySQL Client supports implicit type conversions when executing a prepared statement. Suppose you have a TIME column in your table, the two examples below will both work here.

client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of(LocalTime.of(19, 10, 25)))
  .onComplete(ar -> {
    // handle the results
  });
// this will also work with implicit type conversion
client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of("19:10:25"))
  .onComplete(ar -> {
    // handle the results
  });

The MySQL data type for encoding will be inferred from the parameter values and here is the type mapping

Parameter value type encoding MySQL type

null

MYSQL_TYPE_NULL

java.lang.Byte

MYSQL_TYPE_TINY

java.lang.Boolean

MYSQL_TYPE_TINY

java.lang.Short

MYSQL_TYPE_SHORT

java.lang.Integer

MYSQL_TYPE_LONG

java.lang.Long

MYSQL_TYPE_LONGLONG

java.lang.Double

MYSQL_TYPE_DOUBLE

java.lang.Float

MYSQL_TYPE_FLOAT

java.time.LocalDate

MYSQL_TYPE_DATE

java.time.Duration

MYSQL_TYPE_TIME

java.time.LocalTime

MYSQL_TYPE_TIME

io.vertx.core.buffer.Buffer

MYSQL_TYPE_BLOB

java.time.LocalDateTime

MYSQL_TYPE_DATETIME

io.vertx.mysqlclient.data.spatial.*

MYSQL_TYPE_BLOB

default

MYSQL_TYPE_STRING

Handling BOOLEAN

In MySQL BOOLEAN and BOOL data types are synonyms for TINYINT(1). A value of zero is considered false, non-zero values are considered true. A BOOLEAN data type value is stored in Row or Tuple as java.lang.Byte type, you can call Row#getValue to retrieve it as a java.lang.Byte value, or you can call Row#getBoolean to retrieve it as java.lang.Boolean value.

client
  .query("SELECT graduated FROM students WHERE id = 0")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rowSet = ar.result();
      for (Row row : rowSet) {
        int pos = row.getColumnIndex("graduated");
        Byte value = row.get(Byte.class, pos);
        Boolean graduated = row.getBoolean("graduated");
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

When you want to execute a prepared statement with a param of a BOOLEAN value, you can simply add the java.lang.Boolean value to the params list.

client
  .preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
  .execute(Tuple.of(true))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Updated with the boolean value");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

Handling JSON

MySQL JSON data type is represented by the following Java types:

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL for representing the JSON null literal

Tuple tuple = Tuple.of(
  Tuple.JSON_NULL,
  new JsonObject().put("foo", "bar"),
  3);

// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL

//
value = tuple.get(JsonObject.class, 1); // Expect JSON object

//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3

Handling BIT

The BIT data type is mapped to java.lang.Long type, but Java has no notion of unsigned numeric values, so if you want to insert or update a record with the max value of BIT(64), you can do some tricks setting the parameter to -1L.

Handling TIME

MySQL TIME data type can be used to represent either time of a day or a time interval which ranges from -838:59:59 to 838:59:59. In Reactive MySQL client the TIME data type is mapped to java.time.Duration natively, but you can also retrieve it as a java.time.LocalTime via Row#getLocalTime accessor.

Handling NUMERIC

The Numeric Java type is used to represent the MySQL NUMERIC type.

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // Handle NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

Handling ENUM

MySQL supports ENUM data type and the client retrieves these types as String data type.

You can encode Java enums as String like this:

client
  .preparedQuery("INSERT INTO colors VALUES (?)")
  .execute(Tuple.of(Color.red))
  .onComplete(res -> {
    // ...
  });

You can retrieve the ENUM column as Java enums like this:

client
  .preparedQuery("SELECT color FROM colors")
  .execute()
  .onComplete(res -> {
  if (res.succeeded()) {
    RowSet<Row> rows = res.result();
    for (Row row : rows) {
      System.out.println(row.get(Color.class, "color"));
    }
  }
});

Handling GEOMETRY

MYSQL GEOMETRY data type is also supported, Here are some examples showing that you can handle the geometry data in Well-Known Text (WKT) format or Well-Known Binary (WKB) format, the data are decoded as MySQL TEXT OR BLOB data type. There are many great third-party libraries for handling data in this format.

You can fetch spatial data in WKT format:

client
  .query("SELECT ST_AsText(g) FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data in WKT format
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        String wktString = row.getString(0);
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

Or you can fetch spatial data in WKB format:

client
  .query("SELECT ST_AsBinary(g) FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data in WKB format
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        Buffer wkbValue = row.getBuffer(0);
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

We also provide you a simple way to handle the geometry data type in Reactive MySQL Client.

You can retrieve the geometry data as Vert.x Data Object:

client
  .query("SELECT g FROM geom;")
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      // Fetch the spatial data as a Vert.x Data Object
      RowSet<Row> result = ar.result();
      for (Row row : result) {
        Point point = row.get(Point.class, 0);
        System.out.println("Point x: " + point.getX());
        System.out.println("Point y: " + point.getY());
      }
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

You can also take it as a prepared statement parameter in a WKB representation.

Point point = new Point(0, 1.5, 1.5);
// Send as a WKB representation
client
  .preparedQuery("INSERT INTO geom VALUES (ST_GeomFromWKB(?))")
  .execute(Tuple.of(point))
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Success");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

Collector queries

You can use Java collectors with the query API:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // Get the map created by the collector
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

The collector processing must not keep a reference on the Row as there is a single row used for processing the entire set.

The Java Collectors provides many interesting predefined collectors, for example you can create easily create a string directly from the row set:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL Stored Procedure

You can run stored procedures in queries. The result will be retrieved from the server following the MySQL protocol without any magic here.

client.query("CREATE PROCEDURE multi() BEGIN\n" +
  "  SELECT 1;\n" +
  "  SELECT 1;\n" +
  "  INSERT INTO ins VALUES (1);\n" +
  "  INSERT INTO ins VALUES (2);\n" +
  "END;")
  .execute()
  .onComplete(ar1 -> {
  if (ar1.succeeded()) {
    // create stored procedure success
    client
      .query("CALL multi();")
      .execute()
      .onComplete(ar2 -> {
      if (ar2.succeeded()) {
        // handle the result
        RowSet<Row> result1 = ar2.result();
        Row row1 = result1.iterator().next();
        System.out.println("First result: " + row1.getInteger(0));

        RowSet<Row> result2 = result1.next();
        Row row2 = result2.iterator().next();
        System.out.println("Second result: " + row2.getInteger(0));

        RowSet<Row> result3 = result2.next();
        System.out.println("Affected rows: " + result3.rowCount());
      } else {
        System.out.println("Failure: " + ar2.cause().getMessage());
      }
    });
  } else {
    System.out.println("Failure: " + ar1.cause().getMessage());
  }
});

Note: Prepared statements binding OUT parameters is not supported for now.

MySQL LOCAL INFILE

This client supports for handling the LOCAL INFILE Request, if you want to load data from a local file into the server, you can use query LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;. More information can be found in the MySQL Reference Manual.

Authentication

Default authentication plugin

This client supports for specifying the default authentication plugin to use at the connection start. Currently the following plugins are supported:

  • mysql_native_password

  • caching_sha2_password

  • mysql_clear_password

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // set the default authentication plugin

New authentication method introduced in MySQL 8

MySQL 8.0 introduces a new authentication method named caching_sha2_password and it’s the default one to authenticate. In order to connect to the server using this new authentication method, you need either use a secure connection(i.e. enable TLS/SSL) or exchange the encrypted password using an RSA key pair to avoid leaks of password. The RSA key pair is automatically exchanged during the communication, but the server RSA public key may be hacked during the process since it’s transferred on a insecure connection. So if you’re on a insecure connection and want to avoid the risk of exposing the server RSA public key, you can set the server RSA public key like this:

MySQLConnectOptions options1 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyPath("tls/files/public_key.pem"); // configure with path of the public key

MySQLConnectOptions options2 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
    "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
    "xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
    "HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
    "aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
    "0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
    "7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
    "5QIDAQAB\n" +
    "-----END PUBLIC KEY-----\n")); // configure with buffer of the public key

More information about the caching_sha2_password authentication method can be found in the MySQL Reference Manual.

Using SSL/TLS

To configure the client to use SSL connection, you can configure the MySQLConnectOptions like a Vert.x NetClient. All SSL modes are supported and you are able to configure sslmode. The client is in DISABLED SSL mode by default. ssl parameter is kept as a mere shortcut for setting sslmode. setSsl(true) is equivalent to setSslMode(VERIFY_CA) and setSsl(false) is equivalent to setSslMode(DISABLED).

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));

MySQLConnection.connect(vertx, options)
  .onComplete(res -> {
    if (res.succeeded()) {
      // Connected with SSL
    } else {
      System.out.println("Could not connect " + res.cause());
    }
  });

More information can be found in the Vert.x documentation.

MySQL utility command

Sometimes you want to use MySQL utility commands and we provide support for this. More information can be found in the MySQL utility commands.

COM_PING

You can use COM_PING command to check if the server is alive. The handler will be notified if the server responds to the PING, otherwise the handler will never be called.

connection.ping().onComplete(ar -> {
  System.out.println("The server has responded to the PING");
});

COM_RESET_CONNECTION

You can reset the session state with COM_RESET_CONNECTION command, this will reset the connection state like: - user variables - temporary tables - prepared statements

connection
  .resetConnection()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Connection has been reset now");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_CHANGE_USER

You can change the user of the current connection, this will perform a re-authentication and reset the connection state like COM_RESET_CONNECTION.

MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
  .setUser("newuser")
  .setPassword("newpassword")
  .setDatabase("newdatabase");
connection
  .changeUser(authenticationOptions)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("User of current connection has been changed.");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_INIT_DB

You can use COM_INIT_DB command to change the default schema of the connection.

connection
  .specifySchema("newschema")
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Default schema changed to newschema");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_STATISTICS

You can use COM_STATISTICS command to get a human readable string of some internal status variables in MySQL server.

connection
  .getInternalStatistics()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Statistics: " + ar.result());
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_DEBUG

You can use COM_DEBUG command to dump debug info to the MySQL server’s STDOUT.

connection
  .debug()
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("Debug info dumped to server's STDOUT");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

COM_SET_OPTION

You can use COM_SET_OPTION command to set options for the current connection. Currently only CLIENT_MULTI_STATEMENTS can be set.

For example, you can disable CLIENT_MULTI_STATEMENTS with this command.

connection
  .setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF)
  .onComplete(ar -> {
    if (ar.succeeded()) {
      System.out.println("CLIENT_MULTI_STATEMENTS is off now");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL and MariaDB version support matrix

MySQL MariaDB

Version

Supported

Version

Supported

5.5

10.1

5.6

10.2

5.7

10.3

8.0

10.4

Known issues:

  • Reset connection utility command does not work in MySQL 5.5, 5.6 and MariaDB 10.1

  • Change user utility command is not supported with MariaDB 10.2 and 10.3

Pitfalls & Good Practices

Here are some good practices for you to avoid common pitfalls when using the Reactive MySQL Client.

prepared statement count limit

Sometimes you might meet the notorious error Can’t create more than max_prepared_stmt_count statements (current value: 16382), this is because the server has reached the limit of total number of prepared statement.

You can adjust the server system variable max_prepared_stmt_count but it has an upper bound value so you can’t get rid of the error in this way.

The best way to alleviate this is enabling prepared statement caching, so the prepared statements with the same SQL string could be reused and the client does not have to create a brand new prepared statement for every request. The prepared statement will be automatically closed after the statement is executed. In this way the chances of reaching the limit could be greatly reduced though it could not be totally eliminated.

You can also manage the lifecycle of prepared statements manually by creating a PreparedStatement object via SqlConnection#prepare interface so that you can choose when to deallocate the statement handle, or even use the SQL syntax prepared statement.

demystifying prepared batch

There is time when you want to batch insert data into the database, you can use PreparedQuery#executeBatch which provides a simple API to handle this. Keep in mind that MySQL does not natively support batching protocol so the API is only a sugar by executing the prepared statement one after another, which means more network round trips are required comparing to inserting multiple rows by executing one prepared statement with a list of values.

tricky DATE & TIME data types

Handling MYSQL DATE and TIME data types especially with time zones is tricky therefore the Reactive MySQL Client does no magic transformation for those values.

  • MySQL DATETIME data type does not contain time zone info, so what you get is identical to what you set no matter what time zone is in the current session.

  • MySQL TIMESTAMP data type contains time zone info, so when you set or get the value it’s always transformed by the server with the timezone set in the current session.

Advanced pool configuration

Server load balancing

You can configure the pool with a list of servers instead of a single server.

Pool pool = MySQLBuilder.pool()
  .with(options)
  .connectingTo(Arrays.asList(server1, server2, server3))
  .using(vertx)
  .build();

The pool uses a round-robin load balancing when a connection is created to select different servers.

this provides load balancing when the connection is created and not when the connection is borrowed from the pool.

Pool connection initialization

You can use the withConnectHandler to interact with a connection after it has been created and before it is inserted in a pool.

builder.withConnectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    // Release the connection to the pool, ready to be used by the application
    conn.close();
  });
});

Once you are done with the connection, you should simply close it to signal the pool to use it.

Dynamic connection configuration

You can configure the pool connection details using a Java supplier instead of an instance of SqlConnectOptions.

Since the supplier is asynchronous, it can be used to provide dynamic pool configuration (e.g. password rotation).

Pool pool = MySQLBuilder.pool()
  .connectingTo(() -> {
    Future<SqlConnectOptions> connectOptions = retrieveOptions();
    return connectOptions;
  })
  .using(vertx)
  .build();