Vert.x Circuit Breaker

Vert.x Circuit Breaker is an implementation of the Circuit Breaker pattern for Vert.x. It keeps track of the number of failures and opens the circuit when a threshold is reached. Optionally, a fallback is executed.

Supported failures are:

  • failures reported by your code in a Future

  • exceptions thrown by your code

  • uncompleted futures (timeout)

Operations guarded by a circuit breaker are intended to be non-blocking and asynchronous in order to benefit from the Vert.x execution model.
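The three failure kinds listed above can be illustrated with a small plain-JDK sketch. It is only an analogy and not how Vert.x implements the breaker: CompletableFuture stands in for the Vert.x Future, and the class and method names (FailureKindsSketch, classify) are made up for this illustration. It requires Java 9 or later for orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Plain-JDK illustration only; the names here are hypothetical, not Vert.x API. */
class FailureKindsSketch {

  /** Completes with "ok", or with a label naming which failure kind occurred. */
  static CompletableFuture<String> classify(
      Supplier<CompletableFuture<String>> operation, long timeoutMs) {
    CompletableFuture<String> pending;
    try {
      pending = operation.get();
    } catch (RuntimeException thrown) {
      // The guarded code threw before returning a future.
      return CompletableFuture.completedFuture("thrown");
    }
    return pending
        // Fails the future with a TimeoutException if it is still pending after timeoutMs.
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
        .handle((value, error) -> {
          if (error == null) {
            return "ok";
          }
          Throwable cause = (error instanceof CompletionException && error.getCause() != null)
              ? error.getCause()
              : error;
          // Either the future never completed in time, or the code reported a failure.
          return cause instanceof TimeoutException ? "timeout" : "reported";
        });
  }
}
```

For instance, passing a supplier that returns a future which is never completed yields "timeout", while a supplier that returns an already failed future yields "reported".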

Using the Vert.x circuit breaker

To use the Vert.x Circuit Breaker, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-circuit-breaker</artifactId>
 <version>4.5.7</version>
</dependency>

  • Gradle (in your build.gradle file):

implementation 'io.vertx:vertx-circuit-breaker:4.5.7'

Using the circuit breaker

To use the circuit breaker you need to:

  1. Create a circuit breaker with the configuration you want (timeout, number of failures before opening the circuit)

  2. Execute some code using the breaker

Important: Don’t recreate a circuit breaker on every call. A circuit breaker is a stateful entity. It is recommended to store the circuit breaker instance in a field.

Here is an example:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions()
        .setMaxFailures(5) // number of failures before opening the circuit
        .setTimeout(2000) // consider a failure if the operation does not succeed in time
        .setFallbackOnFailure(true) // do we call the fallback on failure
        .setResetTimeout(10000) // time spent in open state before attempting to re-try
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.execute(promise -> {
  // some code executing with the breaker
  // the code reports failure or success on the given promise.
  // if this promise is marked as failed, the breaker increases the
  // number of failures
}).onComplete(ar -> {
  // Get the operation result.
});

The executed block receives a Promise as a parameter, which it uses to report the success or failure of the operation as well as its result. In the following example, the result is the output of a REST endpoint invocation:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.<String>execute(promise -> {
  vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(req -> req
      .send()
      .compose(resp -> {
        if (resp.statusCode() != 200) {
          return Future.failedFuture("HTTP error");
        } else {
          return resp.body().map(Buffer::toString);
        }
      })).onComplete(promise);
}).onComplete(ar -> {
  // Do something with the result
});

The result of the operation is provided using the:

  • returned Future when calling execute methods

  • provided Promise when calling the executeAndReport methods

Optionally, you can provide a fallback which is executed when the circuit is open:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);

// ---
// Store the circuit breaker in a field and access it as follows
// ---

breaker.executeWithFallback(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    }, v -> {
      // Executed when the circuit is open
      return "Hello";
    })
    .onComplete(ar -> {
      // Do something with the result
    });

The fallback is called whenever the circuit is open, or on every failure if isFallbackOnFailure is enabled. When a fallback is set, the result of the call is the output of the fallback function. The fallback function takes a Throwable as its parameter and returns an object of the expected type.
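The rule for when the fallback runs can be modeled in a few lines of plain Java. This is a simplified, synchronous sketch under stated assumptions, not the Vert.x implementation: FallbackSketch, call and CircuitOpen are hypothetical names, and the circuit state is passed in as a boolean instead of being tracked.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Simplified model of the fallback rule; all names here are hypothetical. */
class FallbackSketch {

  /** Stand-in for the failure handed to the fallback while the circuit is open. */
  static class CircuitOpen extends RuntimeException {
    CircuitOpen() {
      super("circuit open");
    }
  }

  static <T> T call(boolean circuitOpen, boolean fallbackOnFailure,
                    Supplier<T> operation, Function<Throwable, T> fallback) {
    if (circuitOpen) {
      // Open circuit: the operation is not attempted at all.
      return fallback.apply(new CircuitOpen());
    }
    try {
      return operation.get();
    } catch (RuntimeException failure) {
      if (fallbackOnFailure) {
        // Closed circuit, but the fallback was requested for failures too.
        return fallback.apply(failure);
      }
      throw failure;
    }
  }
}
```

With a closed circuit and fallbackOnFailure disabled, a failing operation simply propagates its exception; enabling fallbackOnFailure turns the same failure into the fallback value.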

The fallback can also be set on the CircuitBreaker object directly:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
  // Executed when the circuit is opened.
  return "hello";
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

Retries

You can also specify how many times the circuit breaker should retry your code before failing with setMaxRetries. If you set this to a value higher than 0, your code is executed several times before finally failing on the last execution. If the code succeeds in one of the retries, your handler is notified and any remaining retries are skipped. Retries are only supported when the circuit is closed.

Note
If you set maxRetries to 2, your operation may be called 3 times: the initial attempt and 2 retries.
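The arithmetic in the note can be checked with a minimal plain-Java loop. This is only a counting sketch with hypothetical names (RetryCountSketch, invocations); it models the operation as a BooleanSupplier that returns true on success and ignores delays entirely.

```java
import java.util.function.BooleanSupplier;

/** Counting sketch only; names are hypothetical and delays are ignored. */
class RetryCountSketch {

  /** Returns how many times the operation ran: one initial attempt plus at most maxRetries retries. */
  static int invocations(int maxRetries, BooleanSupplier operation) {
    int calls = 0;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      calls++;
      if (operation.getAsBoolean()) {
        break; // success: remaining retries are skipped
      }
    }
    return calls;
  }
}
```

An operation that always fails is invoked 3 times with maxRetries set to 2, and exactly once with maxRetries set to 0.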

By default, the timeout between retries is set to 0, which means that retries will be executed one after another without any delay. This, however, will result in increased load on the called service and may delay its recovery. In order to mitigate this problem, it is recommended to execute retries with a delay.

The retryPolicy method can be used to specify a retry policy. A retry policy is a function which receives the operation failure and the retry count as arguments and returns a delay in milliseconds before the retry is executed.

This makes it possible to implement complex policies, e.g. using the value of the Retry-After header sent by an unavailable service. A few common policies are provided out of the box: RetryPolicy.constantDelay, RetryPolicy.linearDelay and RetryPolicy.exponentialDelayWithJitter.

Below is an example of exponential delay with jitter:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
  new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
}).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500));

breaker.<String>execute(
  promise -> {
    vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
      .compose(req -> req
        .send()
        .compose(resp -> {
          if (resp.statusCode() != 200) {
            return Future.failedFuture("HTTP error");
          } else {
            return resp.body().map(Buffer::toString);
          }
        })).onComplete(promise);
  });
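To make the shape of a retry policy concrete, here is a plain-JDK sketch of delay functions taking the failure and the retry count. Everything in it is an assumption made for illustration: DelayPolicy is a hypothetical stand-in for the library's policy type, ServiceUnavailable is a made-up exception carrying an already parsed Retry-After value, the retry count is assumed to start at 1, and the formulas are common textbook variants that may differ from the ones Vert.x uses.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Plain-JDK sketch; DelayPolicy is a hypothetical stand-in, not the Vert.x RetryPolicy type. */
class RetryDelaySketch {

  /** Same shape as described above: (failure, retry count) -> delay in milliseconds. */
  @FunctionalInterface
  interface DelayPolicy {
    long delayMs(Throwable failure, int retryCount);
  }

  /** Hypothetical failure type carrying the server's Retry-After hint, already parsed to ms. */
  static class ServiceUnavailable extends RuntimeException {
    final long retryAfterMs;

    ServiceUnavailable(long retryAfterMs) {
      super("service unavailable");
      this.retryAfterMs = retryAfterMs;
    }
  }

  static DelayPolicy constant(long delayMs) {
    return (failure, retryCount) -> delayMs;
  }

  /** Grows by initialMs per retry, capped at maxMs. Assumes retryCount starts at 1. */
  static DelayPolicy linear(long initialMs, long maxMs) {
    return (failure, retryCount) -> Math.min(maxMs, initialMs * retryCount);
  }

  /** Doubles the ceiling on each retry up to maxMs, then picks a random delay in [0, ceiling]. */
  static DelayPolicy exponentialWithJitter(long initialMs, long maxMs) {
    return (failure, retryCount) -> {
      long ceiling = Math.min(initialMs, maxMs);
      for (int i = 1; i < retryCount && ceiling < maxMs; i++) {
        ceiling = Math.min(maxMs, ceiling * 2);
      }
      return ThreadLocalRandom.current().nextLong(ceiling + 1);
    };
  }

  /** Uses the server hint when present, otherwise defers to another policy. */
  static DelayPolicy honorRetryAfter(DelayPolicy otherwise) {
    return (failure, retryCount) -> failure instanceof ServiceUnavailable
        ? ((ServiceUnavailable) failure).retryAfterMs
        : otherwise.delayMs(failure, retryCount);
  }
}
```

The jitter spreads retries from many clients over time, so that they do not all hit a recovering service at the same instant. A function with the same two arguments could be passed to retryPolicy as a lambda, subject to the exact signature of the Vert.x version in use.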

Failure Policy

By default, a circuit breaker reports a failure if the command doesn’t complete successfully. Alternatively, you can configure the failure policy of the circuit breaker with failurePolicy. This lets you specify the criteria under which an AsyncResult is treated as a failure by the circuit breaker. If you override the failure policy, be aware that it could let failed results through to the future provided to methods like executeAndReport.

Below is an example of using a custom defined failure policy.

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
breaker.<HttpClientResponse>failurePolicy(ar -> {
  // A failure will be either a failed operation or a response with a status code other than 200
  if (ar.failed()) {
    return true;
  }
  HttpClientResponse resp = ar.result();
  return resp.statusCode() != 200;
});

Future<HttpClientResponse> future = breaker.execute(promise -> {
  vertx.createHttpClient()
    .request(HttpMethod.GET, 8080, "localhost", "/")
    .compose(request -> request.send()
      // Complete when the body is fully received
      .compose(response -> response.body().map(response)))
    .onComplete(promise);
});

Callbacks

You can also configure callbacks invoked when the circuit is opened or closed:

CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
    new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
  System.out.println("Circuit opened");
}).closeHandler(v -> {
  System.out.println("Circuit closed");
});

breaker.<String>execute(
    promise -> {
      vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
        .compose(req -> req
          .send()
          .compose(resp -> {
            if (resp.statusCode() != 200) {
              return Future.failedFuture("HTTP error");
            } else {
              return resp.body().map(Buffer::toString);
            }
          })).onComplete(promise);
    });

You can also be notified when the circuit breaker decides to attempt to reset (half-open state). You can register such a callback with halfOpenHandler.

Event bus notification

Every time the circuit state changes, an event can be published on the event bus.

To enable this feature, set the notification address to a value that is not null:

options.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);

The event contains circuit breaker metrics. Computing these metrics requires the following dependency to be added to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>org.hdrhistogram</groupId>
 <artifactId>HdrHistogram</artifactId>
 <version>2.1.12</version>
</dependency>

  • Gradle (in your build.gradle file):

implementation 'org.hdrhistogram:HdrHistogram:2.1.12'

Note
When enabled, notifications are, by default, delivered only to local consumers. If the notification must be sent to all consumers in a cluster, you can change this behavior with setNotificationLocalOnly.

Each event contains a JSON object with:

  • state: the new circuit breaker state (OPEN, CLOSED, HALF_OPEN)

  • name: the name of the circuit breaker

  • failures: the number of failures

  • node: the identifier of the node (local if Vert.x is not running in cluster mode)

  • metrics

The half-open state

When the circuit is "open", calls to the circuit breaker fail immediately, without any attempt to execute the real operation. After a suitable amount of time (configured with setResetTimeout), the circuit breaker decides that the operation has a chance of succeeding, so it goes into the half-open state. In this state, the next call to the circuit breaker is allowed to execute the dangerous operation. Should the call succeed, the circuit breaker resets and returns to the closed state, ready for more routine operation. If this trial call fails, however, the circuit breaker returns to the open state until another timeout elapses.
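The transitions described above can be sketched as a small state machine in plain Java. This is a deliberately simplified model and not the Vert.x implementation: the names (BreakerStateSketch, allowCall, recordSuccess, recordFailure) are hypothetical, the clock is injected so the example needs no real waiting, the move to half-open is checked lazily on the next call instead of being driven by a timer, and any success resets the failure count.

```java
import java.util.function.LongSupplier;

/** Simplified, single-threaded model of the three states; all names are hypothetical. */
class BreakerStateSketch {

  enum State { CLOSED, OPEN, HALF_OPEN }

  private final int maxFailures;
  private final long resetTimeoutMs;
  private final LongSupplier clock; // injected so the sketch can be exercised without sleeping
  private State state = State.CLOSED;
  private int failures;
  private long openedAt;

  BreakerStateSketch(int maxFailures, long resetTimeoutMs, LongSupplier clock) {
    this.maxFailures = maxFailures;
    this.resetTimeoutMs = resetTimeoutMs;
    this.clock = clock;
  }

  /** Returns true if a call may run now; moves OPEN to HALF_OPEN once the reset timeout elapsed. */
  boolean allowCall() {
    if (state == State.OPEN && clock.getAsLong() - openedAt >= resetTimeoutMs) {
      state = State.HALF_OPEN;
    }
    return state != State.OPEN;
  }

  /** A success, including a successful trial call, closes the circuit. */
  void recordSuccess() {
    failures = 0;
    state = State.CLOSED;
  }

  /** A failed trial call, or reaching maxFailures, opens the circuit and restarts the timeout. */
  void recordFailure() {
    failures++;
    if (state == State.HALF_OPEN || failures >= maxFailures) {
      state = State.OPEN;
      openedAt = clock.getAsLong();
    }
  }

  State state() {
    return state;
  }
}
```

A caller would check allowCall() before running the guarded operation and then report the outcome with recordSuccess() or recordFailure(). A production implementation would also have to limit the half-open state to a single trial call and be safe under concurrency, which this sketch does not attempt.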

Reported exceptions

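The fallback receives a:

  • OpenCircuitException when the circuit breaker is open

  • TimeoutException when the operation timed out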

Pushing circuit breaker metrics to the Hystrix Dashboard

Netflix Hystrix comes with a dashboard to present the current state of the circuit breakers. The Vert.x circuit breakers can publish their metrics in order to be consumed by this Hystrix Dashboard. The Hystrix Dashboard requires an SSE stream sending the metrics. This stream is provided by the HystrixMetricHandler Vert.x Web Handler:

CircuitBreakerOptions options = new CircuitBreakerOptions()
  .setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions(options));
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx, new CircuitBreakerOptions(options));

// Create a Vert.x Web router
Router router = Router.router(vertx);
// Register the metric handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));

// Create the HTTP server using the router to dispatch the requests
vertx.createHttpServer()
  .requestHandler(router)
  .listen(8080);

In the Hystrix Dashboard, configure the stream URL, for example http://localhost:8080/hystrix-metrics. The dashboard then consumes the metrics from the Vert.x circuit breakers.

Important
The metrics are collected by the Vert.x Web handler using Event bus notification. The feature must be enabled and, if you don’t use the default notification address, you need to pass it when creating the metrics handler.

Using Netflix Hystrix

Hystrix provides an implementation of the circuit breaker pattern. You can use Hystrix with Vert.x instead of this circuit breaker, or in combination with it. This section describes how to use Hystrix in a Vert.x application.

First, add the Hystrix dependency to your classpath or build descriptor. Refer to the Hystrix page for details. Then, isolate the "protected" call in a Command. Once you have your command, you can execute it:

HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();

However, the command execution is blocking, so you have to call it either in an executeBlocking block or in a worker verticle:

HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
    future -> future.complete(someCommand.execute()))
  .onComplete(ar -> {
    // back on the event loop
    String result = ar.result();
  });

If you use the async support of Hystrix, be aware that callbacks are not called on a Vert.x thread. You have to keep a reference to the context before the execution (with getOrCreateContext) and, in the callback, switch back to the event loop using runOnContext. Without this, you lose the Vert.x concurrency model and have to manage synchronization and ordering yourself:

vertx.runOnContext(v -> {
  Context context = vertx.getOrCreateContext();
  HystrixCommand<String> command = getSomeCommandInstance();
  command.observe().subscribe(result -> {
    context.runOnContext(v2 -> {
      // Back on context (event loop or worker)
      String r = result;
    });
  });
});
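The same capture-and-switch-back idea can be shown with plain JDK executors, which may help when reasoning about which thread runs which callback. This is only an analogy under stated assumptions: a single-thread executor named "event-loop" stands in for the Vert.x context, another named "foreign-pool" stands in for the thread that delivers the Hystrix callback, and ContextHopSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy of switching back to the original context; names are hypothetical. */
class ContextHopSketch {

  /** Returns the name of the thread on which the final callback ran. */
  static String run() {
    ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
    ExecutorService foreign = Executors.newSingleThreadExecutor(r -> new Thread(r, "foreign-pool"));
    CompletableFuture<String> seenOn = new CompletableFuture<>();
    try {
      // Start on the "event loop" and keep a reference to it, like getOrCreateContext.
      eventLoop.execute(() ->
          // The asynchronous callback fires on a thread we do not own.
          foreign.execute(() ->
              // Hop back explicitly, like context.runOnContext.
              eventLoop.execute(() ->
                  seenOn.complete(Thread.currentThread().getName()))));
      return seenOn.orTimeout(5, TimeUnit.SECONDS).join();
    } finally {
      eventLoop.shutdown();
      foreign.shutdown();
    }
  }
}
```

Calling ContextHopSketch.run() returns "event-loop", because the innermost task is submitted back to the first executor. Without that last hop, the completion would run on the "foreign-pool" thread.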