The Reactive MySQL Client is a client for MySQL with a straightforward API focusing on scalability and low overhead.

Features

  • Event driven

  • Lightweight

  • Built-in connection pooling

  • Prepared queries caching

  • Cursor support

  • Row streaming

  • RxJava 1 and RxJava 2

  • Direct memory to object without unnecessary copies

  • Java 8 Date and Time

  • MySQL utility commands support

  • Compatible with MySQL 5.6 and 5.7

Usage

To use the Reactive MySQL Client add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-mysql-client</artifactId>
 <version>3.8.0</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-mysql-client:3.8.0'
}

Getting started

Here is the simplest way to connect, query and disconnect:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

// A simple query
client.query("SELECT * FROM users WHERE id='julien'", ar -> {
  if (ar.succeeded()) {
    RowSet result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // Now close the pool
  client.close();
});

Connecting to MySQL

Most of the time you will use a pool to connect to MySQL:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

The pooled client uses a connection pool: each operation borrows a connection from the pool, executes on it, and releases it back to the pool.

If you are running with Vert.x you can pass it your Vertx instance:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// Create the pooled client
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

You need to release the pool when you don’t need it anymore:

client.close();

When you need to execute several operations on the same connection, you need to use a client connection.

You can easily get one from the pool:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

// Get a connection from the pool
client.getConnection(ar1 -> {

  if (ar1.succeeded()) {

    System.out.println("Connected");

    // Obtain our connection
    SqlConnection conn = ar1.result();

    // All operations execute on the same connection
    conn.query("SELECT * FROM users WHERE id='julien'", ar2 -> {
      if (ar2.succeeded()) {
        conn.query("SELECT * FROM users WHERE id='emad'", ar3 -> {
          // Release the connection to the pool
          conn.close();
        });
      } else {
        // Release the connection to the pool
        conn.close();
      }
    });
  } else {
    System.out.println("Could not connect: " + ar1.cause().getMessage());
  }
});

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

Configuration

There are several alternatives for you to configure the client.

data object

A simple way to configure the client is to specify a MySQLConnectOptions data object.

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

pool.getConnection(ar -> {
  // Handling your connection
});

You can also configure the connection attributes with the setProperties or addProperty methods. Note that setProperties will override the default client properties.

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// Add a connection attribute
connectOptions.addProperty("_java_version", "1.8.0_212");

// Override the attributes
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);

More information about client connection attributes can be found in the MySQL Reference Manual.

connection uri

Apart from configuring with a MySQLConnectOptions data object, you can also connect by configuring the client with a connection URI:

String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";

// Create the pool from the connection URI
MySQLPool pool = MySQLPool.pool(connectionUri);

// Create the connection from the connection URI
MySQLConnection.connect(vertx, connectionUri, res -> {
  // Handling your connection
});

More information about connection string formats can be found in the MySQL Reference Manual.

Currently the client supports the following parameter keywords in the connection URI:

  • host

  • port

  • user

  • password

  • schema

  • socket
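As a rough illustration of how the parts of a connection URI line up with the keywords above, the sketch below splits a URI using the JDK's java.net.URI. It is not the client's own parser: the UriParts class, the sample URI, and the choice to read the path as the schema are all assumptions made for this example.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the client: splits a mysql:// URI with the JDK parser.
final class UriParts {

  static Map<String, String> parse(String connectionUri) {
    URI uri = URI.create(connectionUri);
    Map<String, String> parts = new LinkedHashMap<>();
    parts.put("host", uri.getHost());
    parts.put("port", String.valueOf(uri.getPort()));

    // The user info section has the form "user" or "user:password"
    String userInfo = uri.getUserInfo();
    if (userInfo != null) {
      int colon = userInfo.indexOf(':');
      parts.put("user", colon < 0 ? userInfo : userInfo.substring(0, colon));
      if (colon >= 0) {
        parts.put("password", userInfo.substring(colon + 1));
      }
    }

    // Assumption: the path after the leading '/' names the schema
    String path = uri.getPath();
    if (path != null && path.length() > 1) {
      parts.put("schema", path.substring(1));
    }
    return parts;
  }

  public static void main(String[] args) {
    // Made-up sample URI
    System.out.println(parse("mysql://dbuser:secretpassword@database.server.com:3211/mydb"));
  }
}
```

Passwords containing reserved characters would need percent-encoding in a real URI; the sketch leaves that aside.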

Running queries

When you don’t need a transaction or only run single queries, you can run queries directly on the pool; the pool will use one of its connections to run the query and return the result to you.

Here is how to run simple queries:

client.query("SELECT * FROM users WHERE id='julien'", ar -> {
  if (ar.succeeded()) {
    RowSet result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

You can do the same with prepared queries.

The SQL string can refer to parameters by position, using ? placeholders that are bound in order from the tuple:

client.preparedQuery("SELECT * FROM users WHERE id=?", Tuple.of("julien"), ar -> {
  if (ar.succeeded()) {
    RowSet rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Query methods provide an asynchronous RowSet instance that works for SELECT queries:

client.preparedQuery("SELECT first_name, last_name FROM users", ar -> {
  if (ar.succeeded()) {
    RowSet rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

or UPDATE/INSERT queries:

client.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)", Tuple.of("Julien", "Viet"), ar -> {
  if (ar.succeeded()) {
    RowSet rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

The Row gives you access to your data by index:

System.out.println("User " + row.getString(0) + " " + row.getString(1));

or by name:

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

You can access a wide variety of types:

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

You can execute a prepared batch:

You can cache prepared queries:

Note that MySQL 5.6 and 5.7 do not support a 'RETURNING' clause, so generated keys cannot be fetched that way.

Using connections

When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool:

pool.getConnection(ar1 -> {
  if (ar1.succeeded()) {
    SqlConnection connection = ar1.result();

    connection.query("SELECT * FROM users WHERE id='julien'", ar2 -> {
      if (ar2.succeeded()) {
        connection.query("SELECT * FROM users WHERE id='paulo'", ar3 -> {
          // Do something with rows and return the connection to the pool
          connection.close();
        });
      } else {
        // Return the connection to the pool
        connection.close();
      }
    });
  }
});

Prepared queries can be created:

connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> {
  if (ar1.succeeded()) {
    PreparedQuery pq = ar1.result();
    pq.execute(Tuple.of("julien"), ar2 -> {
      if (ar2.succeeded()) {
        // All rows
        RowSet rows = ar2.result();
      }
    });
  }
});

Note
Prepared query caching depends on setCachePreparedStatements and not on whether you create prepared queries or use direct prepared queries.

PreparedQuery can perform efficient batching:

Cursors and streaming

By default, prepared query execution fetches all rows. You can use a Cursor to control the number of rows you want to read:

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  if (ar1.succeeded()) {
    PreparedQuery pq = ar1.result();

    // Create a cursor
    Cursor cursor = pq.cursor(Tuple.of(18));

    // Read 50 rows
    cursor.read(50, ar2 -> {
      if (ar2.succeeded()) {
        RowSet rows = ar2.result();

        // Check for more ?
        if (cursor.hasMore()) {
          // Repeat the process...
        } else {
          // No more rows - close the cursor
          cursor.close();
        }
      }
    });
  }
});
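The "Repeat the process..." step above can be written as a method that calls itself from the read handler. The sketch below shows that control flow against an in-memory stand-in. ModelCursor is hypothetical and only mimics read(count, handler) and hasMore(); it invokes the handler synchronously, which the real asynchronous cursor does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CursorLoopSketch {

  // Hypothetical in-memory stand-in for a cursor, not the client's Cursor type.
  static final class ModelCursor {
    private final List<String> rows;
    private int position;

    ModelCursor(List<String> rows) {
      this.rows = rows;
    }

    // Hands at most 'count' rows to the handler and advances the position
    void read(int count, Consumer<List<String>> handler) {
      int end = Math.min(position + count, rows.size());
      List<String> batch = new ArrayList<>(rows.subList(position, end));
      position = end;
      handler.accept(batch);
    }

    boolean hasMore() {
      return position < rows.size();
    }
  }

  // Reads batch after batch until the cursor is exhausted, recording each batch size
  static void readAll(ModelCursor cursor, int fetch, List<Integer> batchSizes) {
    cursor.read(fetch, batch -> {
      batchSizes.add(batch.size());
      if (cursor.hasMore()) {
        // Repeat the process with the next batch
        readAll(cursor, fetch, batchSizes);
      }
      // Otherwise this is the point where a real cursor would be closed
    });
  }

  static List<Integer> batchSizes(int totalRows, int fetch) {
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < totalRows; i++) {
      rows.add("user-" + i);
    }
    List<Integer> sizes = new ArrayList<>();
    readAll(new ModelCursor(rows), fetch, sizes);
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(batchSizes(120, 50)); // prints [50, 50, 20]
  }
}
```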

PostgreSQL destroys cursors at the end of a transaction, so the cursor API must be used within a transaction; otherwise you will likely get the 34000 PostgreSQL error.

Cursors must be closed when they are released prematurely:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

A stream API is also available for cursors, which can be more convenient, especially with the Rxified version.

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  if (ar1.succeeded()) {
    PreparedQuery pq = ar1.result();

    // Fetch 50 rows at a time
    RowStream<Row> stream = pq.createStream(50, Tuple.of(18));

    // Use the stream
    stream.exceptionHandler(err -> {
      System.out.println("Error: " + err.getMessage());
    });
    stream.endHandler(v -> {
      System.out.println("End of stream");
    });
    stream.handler(row -> {
      System.out.println("User: " + row.getString("last_name"));
    });
  }
});

The stream reads the rows in batches of 50 and streams them. Once the rows have been passed to the handler, a new batch of 50 is read, and so on.

The stream can be paused or resumed. While it is paused, the loaded rows remain in memory until they are delivered, and the cursor stops iterating.

MySQL type mapping

Currently the client supports the following MySQL types:

  • BOOL,BOOLEAN (java.lang.Byte)

  • TINYINT (java.lang.Byte)

  • SMALLINT (java.lang.Short)

  • MEDIUMINT (java.lang.Integer)

  • INT,INTEGER (java.lang.Integer)

  • BIGINT (java.lang.Long)

  • FLOAT (java.lang.Float)

  • DOUBLE (java.lang.Double)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • DATE (java.time.LocalDate)

  • DATETIME (java.time.LocalDateTime)

  • TIME (java.time.Duration)

  • TIMESTAMP (java.time.LocalDateTime)

  • YEAR (java.lang.Short)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • BINARY (io.vertx.core.buffer.Buffer)

  • VARBINARY (io.vertx.core.buffer.Buffer)

  • TINYBLOB (io.vertx.core.buffer.Buffer)

  • TINYTEXT (java.lang.String)

  • BLOB (io.vertx.core.buffer.Buffer)

  • TEXT (java.lang.String)

  • MEDIUMBLOB (io.vertx.core.buffer.Buffer)

  • MEDIUMTEXT (java.lang.String)

  • LONGBLOB (io.vertx.core.buffer.Buffer)

  • LONGTEXT (java.lang.String)

Tuple decoding uses the above types when storing values.
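One mapping in the list above that may look surprising is TIME to java.time.Duration. MySQL TIME values range from -838:59:59 to 838:59:59, so they can hold negative or multi-day intervals as well as times of day. The sketch below converts the textual form into a Duration to show the correspondence. TimeText is a hypothetical helper written for this example, not the client's decoder, and it ignores fractional seconds.

```java
import java.time.Duration;

// Hypothetical helper: converts MySQL TIME text such as "838:59:59" or "-01:30:00" to a Duration.
final class TimeText {

  static Duration toDuration(String text) {
    boolean negative = text.startsWith("-");
    String[] parts = (negative ? text.substring(1) : text).split(":");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Expected [-]HHH:MM:SS but got " + text);
    }
    Duration duration = Duration.ofHours(Long.parseLong(parts[0]))
      .plusMinutes(Long.parseLong(parts[1]))
      .plusSeconds(Long.parseLong(parts[2]));
    // The sign applies to the whole value, not only to the hours field
    return negative ? duration.negated() : duration;
  }

  public static void main(String[] args) {
    System.out.println(toDuration("838:59:59").getSeconds()); // prints 3020399
    System.out.println(toDuration("-01:30:00").getSeconds()); // prints -5400
  }
}
```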

Handling BOOLEAN

In MySQL, the BOOLEAN and BOOL data types are synonyms for TINYINT(1). A value of zero is considered false; non-zero values are considered true. A BOOLEAN value is stored in a Row or Tuple as a java.lang.Byte. You can call Row#getValue to retrieve it as a java.lang.Byte, or Row#getBoolean to retrieve it as a java.lang.Boolean.

client.query("SELECT graduated FROM students WHERE id = 0", ar -> {
  if (ar.succeeded()) {
    RowSet rowSet = ar.result();
    for (Row row : rowSet) {
      int pos = row.getColumnIndex("graduated");
      Byte value = row.get(Byte.class, pos);
      Boolean graduated = row.getBoolean("graduated");
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});
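The zero versus non-zero rule described above can be stated in a few lines of plain Java. This is only a sketch of the convention: TinyIntBoolean and its methods are hypothetical names and do not reflect how the client implements Row#getBoolean.

```java
// Hypothetical helper showing the TINYINT(1) <-> boolean convention.
final class TinyIntBoolean {

  // Zero is false; any other stored value is true
  static boolean toBoolean(byte stored) {
    return stored != 0;
  }

  // true is written as 1 and false as 0
  static byte toTinyInt(boolean value) {
    return (byte) (value ? 1 : 0);
  }

  public static void main(String[] args) {
    System.out.println(toBoolean((byte) 0)); // prints false
    System.out.println(toBoolean((byte) 2)); // prints true
    System.out.println(toTinyInt(true));     // prints 1
  }
}
```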

When you want to execute a prepared statement with a BOOLEAN parameter, you can simply add the java.lang.Boolean value to the parameter list.

client.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0", Tuple.of(true), ar -> {
  if (ar.succeeded()) {
    System.out.println("Updated with the boolean value");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Handling NUMERIC

The Numeric Java type is used to represent the MySQL NUMERIC type.

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // Handle NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

Collector queries

You can use Java collectors with the query API:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users",
  collector,
  ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // Get the map created by the collector
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

The collector processing must not keep a reference to the Row, as a single row instance is used for processing the entire set.
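To see why holding on to the row is unsafe, the sketch below models the reuse described above with a plain mutable object. ReusedRow and the collect helper are hypothetical stand-ins written for this example, not the client's Row or its internals. A collector that extracts values sees each name, while one that stores the row itself ends up with the same object repeated.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class RowReuseSketch {

  // Hypothetical stand-in for a row object that is reused for every record.
  static final class ReusedRow {
    private String lastName;

    String getString() {
      return lastName;
    }
  }

  // Feeds every name through the collector using one mutable row instance
  static <A, R> R collect(List<String> names, Collector<ReusedRow, A, R> collector) {
    ReusedRow row = new ReusedRow();
    A container = collector.supplier().get();
    for (String name : names) {
      row.lastName = name;
      collector.accumulator().accept(container, row);
    }
    return collector.finisher().apply(container);
  }

  // Safe: the value is copied out of the row while it is current
  static List<String> extractingValues(List<String> names) {
    return collect(names, Collectors.mapping(ReusedRow::getString, Collectors.toList()));
  }

  // Unsafe: the list holds the same row object three times
  static List<String> keepingRows(List<String> names) {
    List<ReusedRow> rows = collect(names, Collectors.toList());
    return rows.stream().map(ReusedRow::getString).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("Viet", "Lee", "Fox");
    System.out.println(extractingValues(names)); // prints [Viet, Lee, Fox]
    System.out.println(keepingRows(names));      // prints [Fox, Fox, Fox]
  }
}
```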

The Java Collectors class provides many useful predefined collectors. For example, you can easily create a string directly from the row set:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client.query("SELECT * FROM users",
  collector,
  ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL utility command

Sometimes you want to use MySQL utility commands, and the client provides support for this. More information can be found in the MySQL documentation on utility commands.

COM_PING

You can use the COM_PING command to check if the server is alive. The handler will be notified if the server responds to the PING; otherwise the handler will never be called.

connection.ping(ar -> {
  System.out.println("The server has responded to the PING");
});

COM_RESET_CONNECTION

You can reset the session state with the COM_RESET_CONNECTION command. This resets connection state such as:

  • user variables

  • temporary tables

  • prepared statements

connection.resetConnection(ar -> {
  if (ar.succeeded()) {
    System.out.println("Connection has been reset now");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_CHANGE_USER

You can change the user of the current connection. This performs a re-authentication and resets the connection state, as COM_RESET_CONNECTION does.

MySQLConnectOptions authenticationOptions = new MySQLConnectOptions()
  .setUser("newuser")
  .setPassword("newpassword")
  .setDatabase("newdatabase");
connection.changeUser(authenticationOptions, ar -> {
  if (ar.succeeded()) {
    System.out.println("User of current connection has been changed.");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_INIT_DB

You can use the COM_INIT_DB command to change the default schema of the connection.

connection.specifySchema("newschema", ar -> {
  if (ar.succeeded()) {
    System.out.println("Default schema changed to newschema");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_STATISTICS

You can use the COM_STATISTICS command to get a human-readable string of some internal status variables of the MySQL server.

connection.getInternalStatistics(ar -> {
  if (ar.succeeded()) {
    System.out.println("Statistics: " + ar.result());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_DEBUG

You can use the COM_DEBUG command to dump debug info to the MySQL server’s STDOUT.

connection.debug(ar -> {
  if (ar.succeeded()) {
    System.out.println("Debug info dumped to server's STDOUT");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_SET_OPTION

You can use the COM_SET_OPTION command to set options for the current connection. Currently only CLIENT_MULTI_STATEMENTS can be set.

For example, you can disable CLIENT_MULTI_STATEMENTS with this command.

connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
  if (ar.succeeded()) {
    System.out.println("CLIENT_MULTI_STATEMENTS is off now");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});