[Reactor 3 Reference Guide] 4. Reactor Core Features - 2
4.6. Handling Errors
For a quick look at the available operators for error handling, see the relevant operator decision tree.
In Reactive Streams, errors are terminal events. As soon as an error occurs, it stops the sequence and gets propagated down the chain of operators to the last step, the Subscriber you defined and its onError method.
Such errors should still be dealt with at the application level. For instance, you might display an error notification in a UI or send a meaningful error payload in a REST endpoint. For this reason, the subscriber’s onError method should always be defined.
If not defined, onError throws an UnsupportedOperationException. You can further detect and triage it with the Exceptions.isErrorCallbackNotImplemented method.
Reactor also offers alternative means of dealing with errors in the middle of the chain, as error-handling operators. The following example shows how to do so:
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
Now we can consider each means of error handling one-by-one. When relevant, we make a parallel with imperative programming’s try patterns.
4.6.1. Error Handling Operators
You may be familiar with several ways of dealing with exceptions in a try-catch block. Most notably, these include the following:
- Catch and return a static default value.
- Catch and execute an alternative path with a fallback method.
- Catch and dynamically compute a fallback value.
- Catch, wrap to a BusinessException, and re-throw.
- Catch, log an error-specific message, and re-throw.
- Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
All of these have equivalents in Reactor, in the form of error-handling operators. Before looking into these operators, we first want to establish a parallel between a reactive chain and a try-catch block.
When subscribing, the onError callback at the end of the chain is akin to a catch block. There, execution skips to the catch in case an Exception is thrown, as the following example shows:
Flux<String> s = Flux.range(1, 10)
.map(v -> doSomethingDangerous(v))
.map(v -> doSecondTransform(v));
s.subscribe(value -> System.out.println("RECEIVED " + value),
error -> System.err.println("CAUGHT " + error)
);
A transformation that can throw an exception is performed.
If everything went well, a second transformation is performed.
Each successfully transformed value is printed out.
In case of an error, the sequence terminates and an error message is displayed.
The preceding example is conceptually similar to the following try-catch block:
try {
for (int i = 1; i < 11; i++) {
String v1 = doSomethingDangerous(i);
String v2 = doSecondTransform(v1);
System.out.println("RECEIVED " + v2);
}
} catch (Throwable t) {
System.err.println("CAUGHT " + t);
}
If an exception is thrown here…
…the rest of the loop is skipped…
…and the execution goes straight to here.
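The text does not define doSomethingDangerous or doSecondTransform. As a minimal sketch, assuming hypothetical stand-ins where the first one fails on the value 4, the following plain-Java class makes the loop runnable and shows that nothing after the failing element is processed:

```java
import java.util.ArrayList;
import java.util.List;

class TryCatchParallel {

    // Hypothetical stand-in (not from the guide): fails when it sees the value 4.
    static String doSomethingDangerous(int i) {
        if (i == 4) {
            throw new IllegalStateException("boom" + i);
        }
        return "v" + i;
    }

    // Hypothetical stand-in (not from the guide): a harmless second step.
    static String doSecondTransform(String v) {
        return v.toUpperCase();
    }

    // Same shape as the try-catch block above, collecting lines instead of printing them.
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        try {
            for (int i = 1; i < 11; i++) {
                String v1 = doSomethingDangerous(i); // throws for i == 4
                String v2 = doSecondTransform(v1);   // skipped once an exception is thrown
                lines.add("RECEIVED " + v2);
            }
        } catch (Throwable t) {
            lines.add("CAUGHT " + t);                // values 5 to 10 are never processed
        }
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With these stand-ins, three RECEIVED lines are followed by a single CAUGHT line, which mirrors how an onError signal ends the reactive sequence.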
Now that we have established a parallel, we can look at the different error handling cases and their equivalent operators.
Static Fallback Value
The equivalent of “Catch and return a static default value” is onErrorReturn. The following example shows how to use it:
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
The following example shows the Reactor equivalent:
Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");
You also have the option of applying a Predicate on the exception to decide whether or not to recover, as the following example shows:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
Recover only if the message of the exception is "boom10".
Fallback Method
If you want more than a single default value and you have an alternative (safer) way of processing your data, you can use onErrorResume. This would be the equivalent of “Catch and execute an alternative path with a fallback method”.
For example, if your nominal process is fetching data from an external and unreliable service but you also keep a local cache of the same data that can be a bit more out of date but is more reliable, you could do the following:
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
The following example shows the Reactor equivalent:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
For each key, asynchronously call the external service.
If the external service call fails, fall back to the cache for that key. Note that we always apply the same fallback, whatever the source error, e, is.
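For comparison, the imperative side of this pattern can be made runnable in plain Java. This is only a sketch under stated assumptions: callExternalService and getFromCache are hypothetical stand-ins (the service here succeeds only for "key1", and the cache is a pre-filled map), and the repeated try-catch blocks are folded into one fetch helper.

```java
import java.util.HashMap;
import java.util.Map;

class CacheFallback {

    // Hypothetical local cache: possibly stale, but always available.
    static final Map<String, String> CACHE = new HashMap<>();
    static {
        CACHE.put("key1", "cached1");
        CACHE.put("key2", "cached2");
    }

    // Hypothetical unreliable service: only "key1" succeeds in this sketch.
    static String callExternalService(String key) {
        if (!"key1".equals(key)) {
            throw new IllegalStateException("service unavailable for " + key);
        }
        return "fresh1";
    }

    static String getFromCache(String key) {
        return CACHE.get(key);
    }

    // One try-catch per key: the same fallback is applied whatever the error is.
    static String fetch(String key) {
        try {
            return callExternalService(key);
        } catch (Throwable error) {
            return getFromCache(key);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch("key1")); // served by the service
        System.out.println(fetch("key2")); // served by the cache
    }
}
```

The per-key try-catch corresponds to placing onErrorResume inside the flatMap, so that a failure for one key does not affect the others.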
Like onErrorReturn, onErrorResume has variants that let you filter which exceptions to fall back on, based either on the exception’s class or on a Predicate. The fact that it takes a Function also lets you choose a different fallback sequence to switch to, depending on the error encountered. The following example shows how to do so:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> {
if (error instanceof TimeoutException)
return getFromCache(k);
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error);
})
);
The function allows dynamically choosing how to continue.
If the source times out, hit the local cache.
If the source says the key is unknown, create a new entry.
In all other cases, “re-throw”.
Dynamic Fallback Value
Even if you do not have an alternative (safer) way of processing your data, you might want to compute a fallback value out of the exception you received. This would be the equivalent of “Catch and dynamically compute a fallback value”.
For instance, if your return type (MyWrapper) has a variant dedicated to holding an exception (think Future.complete(T success) versus Future.completeExceptionally(Throwable error)), you could instantiate the error-holding variant and pass the exception.
An imperative example would look like the following:
try {
    Value v = erroringMethod();
    return MyWrapper.fromValue(v);
}
catch (Throwable error) {
    return MyWrapper.fromError(error);
}
You can do this reactively in the same way as the fallback method solution, by using onErrorResume, with a tiny bit of boilerplate, as follows:
erroringFlux.onErrorResume(error -> Mono.just( MyWrapper.fromError(error) ));
Since you expect a MyWrapper representation of the error, you need to get a Mono<MyWrapper> for onErrorResume. We use Mono.just() for that.
We need to compute the value out of the exception. Here, we achieved that by wrapping the exception with a relevant MyWrapper factory method.
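MyWrapper itself is not defined in the text. The following plain-Java sketch shows one way such a type could look, assuming a String payload. Apart from fromError, every name here (fromValue, isError, erroringMethod, call) is hypothetical and chosen only for illustration.

```java
class MyWrapperSketch {

    // Hypothetical result type: holds either a value or the exception that replaced it.
    static final class MyWrapper {
        private final String value;
        private final Throwable error;

        private MyWrapper(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static MyWrapper fromValue(String value) {
            return new MyWrapper(value, null);
        }

        static MyWrapper fromError(Throwable error) {
            return new MyWrapper(null, error);
        }

        boolean isError() {
            return error != null;
        }

        @Override
        public String toString() {
            return isError() ? "Error(" + error.getMessage() + ")" : "Value(" + value + ")";
        }
    }

    // Hypothetical method that fails for an empty key.
    static String erroringMethod(String key) {
        if (key.isEmpty()) {
            throw new IllegalArgumentException("empty key");
        }
        return "value for " + key;
    }

    // The fallback value is computed from the exception itself.
    static MyWrapper call(String key) {
        try {
            return MyWrapper.fromValue(erroringMethod(key));
        } catch (Throwable error) {
            return MyWrapper.fromError(error);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("k1"));
        System.out.println(call(""));
    }
}
```

Callers always receive a MyWrapper and decide what to do by inspecting it, which is the same contract the onErrorResume version offers to downstream subscribers.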
Catch and Rethrow
"Catch, wrap to a BusinessException, and re-throw" looks like the following in the imperative world:
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
In the “fallback method” example, the last line inside the flatMap gives us a hint at achieving the same reactively, as follows:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
However, there is a more straightforward way of achieving the same effect with onErrorMap:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
Log or React on the Side
For cases where you want the error to continue propagating but still want to react to it without modifying the sequence (logging it, for instance), you can use the doOnError operator. This is the equivalent of “Catch, log an error-specific message, and re-throw” pattern, as the following example shows:
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
The doOnError operator, as well as all operators prefixed with doOn , are sometimes referred to as having a “side-effect”. They let you peek inside the sequence’s events without modifying them.
Like the imperative example shown earlier, the following example still propagates the error yet ensures that we at least log that the external service had a failure:
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
The external service call that can fail…
…is decorated with a logging and stats side-effect…
…after which, it still terminates with an error, unless we use an error-recovery operator here.
We can also imagine we have statistic counters to increment as a second error side-effect.
Using Resources and the Finally Block
The last parallel to draw with imperative programming is cleanup, which can be done either with a finally block or with a Java 7 “try-with-resource” construct, both shown below:
Example 14. Imperative use of finally
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
Example 15. Imperative use of try-with-resource
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
Both have their Reactor equivalents: doFinally and using.
doFinally is about side-effects that you want to be executed whenever the sequence terminates (with onComplete or onError) or is cancelled. It gives you a hint as to what kind of termination triggered the side-effect. The following example shows how to use doFinally:
Reactive finally: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
doFinally consumes a SignalType for the type of termination.
Similarly to finally blocks, we always record the timing.
Here we also increment statistics in case of cancellation only.
take(1) cancels after one item is emitted.
On the other hand, using handles the case where a Flux is derived from a resource and that resource must be acted upon whenever processing is done. In the following example, we replace the AutoCloseable interface of “try-with-resource” with a Disposable:
Example 16. The Disposable resource
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Now we can do the reactive equivalent of “try-with-resource” on it, which looks like the following:
Example 17. Reactive try-with-resource: using()
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
The first lambda generates the resource. Here, we return our mock Disposable.
The second lambda processes the resource, returning a Flux<T>.
The third lambda is called when the Flux returned by the second lambda terminates or is cancelled, to clean up resources.
After subscription and execution of the sequence, the isDisposed atomic boolean becomes true.
Demonstrating the Terminal Aspect of onError
In order to demonstrate that all these operators cause the upstream original sequence to terminate when an error happens, we can use a more visual example with a Flux.interval. The interval operator ticks every x units of time with an increasing Long value. The following example uses an interval operator:
Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh");
flux.subscribe(System.out::println);
Thread.sleep(2100);
Note that interval executes on a timer Scheduler by default. If we want to run that example in a main class, we would need to add a sleep call here so that the application does not exit immediately without any value being produced.
The preceding example prints out one line every 250ms, as follows:
tick 0
tick 1
tick 2
Uh oh
Even with one extra second of runtime, no more ticks come in from the interval. The sequence was indeed terminated by the error.
Retrying
There is another operator of interest with regard to error handling, and you might be tempted to use it in the case described in the previous section. retry, as its name indicates, lets you retry an error-producing sequence.
The thing to keep in mind is that it works by re-subscribing to the upstream Flux. This is really a different sequence, and the original one is still terminated. To verify that, we can re-use the previous example and append a retry(1) to retry once instead of using onErrorReturn. The following example shows how to do so:
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed()
    .subscribe(System.out::println, System.err::println);

Thread.sleep(2100);
elapsed associates each value with the duration since the previous value was emitted.
We also want to see when there is an onError.
Ensure we have enough time for our 4x2 ticks.
The preceding example produces the following output:
259,tick 0
249,tick 1
251,tick 2
506,tick 0
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
A new interval started, from tick 0. The additional 250ms duration is coming from the 4th tick, the one that causes the exception and subsequent retry.
As you can see from the preceding example, retry(1) merely re-subscribed to the original interval once, restarting the tick from 0. The second time around, since the exception still occurs, it gives up and propagates the error downstream.
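An imperative analogy may help here. The sketch below is plain Java rather than Reactor code: source is a hypothetical stand-in for the interval-plus-map chain, and runWithRetry only mimics the re-subscription idea by calling it again from scratch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryAnalogy {

    // Hypothetical "cold" source: every invocation starts again at tick 0
    // and fails on the fourth element, like the map step in the example above.
    static void source(Consumer<String> downstream) {
        for (int i = 0; ; i++) {
            if (i < 3) {
                downstream.accept("tick " + i);
            } else {
                throw new RuntimeException("boom");
            }
        }
    }

    // Re-runs the source after a failure, at most maxRetries times.
    // The failed run is never resumed; a brand new run begins instead.
    static List<String> runWithRetry(int maxRetries) {
        List<String> seen = new ArrayList<>();
        int retries = 0;
        while (true) {
            try {
                source(seen::add);
                return seen; // not reached with this source, which always fails
            } catch (RuntimeException e) {
                if (retries >= maxRetries) {
                    seen.add(e.toString()); // give up and surface the error
                    return seen;
                }
                retries++;
            }
        }
    }

    public static void main(String[] args) {
        runWithRetry(1).forEach(System.out::println);
    }
}
```

With one retry allowed, the ticks 0 to 2 appear twice before the exception is reported, matching the shape of the output shown above (without the elapsed timings).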
There is a more advanced version of retry (called retryWhen) that uses a “companion” Flux to tell whether or not a particular failure should retry. This companion Flux is created by the operator but decorated by the user, in order to customize the retry condition.
The companion Flux is a Flux<RetrySignal> that gets passed to a Retry strategy/function, supplied as the sole parameter of retryWhen. As the user, you define that function and make it return a new Publisher<?>. The Retry class is an abstract class, but it offers a factory method if you want to transform the companion with a simple lambda (Retry.from(Function)).
Retry cycles go as follows:
- Each time an error happens (giving potential for a retry), a RetrySignal is emitted into the companion Flux, which has been decorated by your function. Having a Flux here gives a bird's-eye view of all the attempts so far. The RetrySignal gives access to the error as well as metadata around it.
- If the companion Flux emits a value, a retry happens.
- If the companion Flux completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too.
- If the companion Flux produces an error (e), the retry cycle stops and the resulting sequence errors with e.
The distinction between the previous two cases is important. Simply completing the companion would effectively swallow an error. Consider the following way of emulating retry(3) by using retryWhen:
Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException())
    .doOnError(System.out::println)
    .retryWhen(Retry.from(companion ->
        companion.take(3)));
This continuously produces errors, calling for retry attempts.
doOnError before the retry lets us log and see all failures.
The Retry is adapted from a very simple Function lambda.
Here, we consider the first three errors as retry-able (take(3)) and then give up.
In effect, the preceding example results in an empty Flux, but it completes successfully. Since retry(3) on the same Flux would have terminated with the latest error, this retryWhen example is not exactly the same as a retry(3).
Getting to the same behavior involves a few additional tricks:
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .doOnError(e -> errorCount.incrementAndGet())
        .retryWhen(Retry.from(companion ->
            companion.map(rs -> {
                if (rs.totalRetries() < 3) return rs.totalRetries();
                else throw Exceptions.propagate(rs.failure());
            })
        ));
We customize Retry by adapting from a Function lambda rather than providing a concrete class.
The companion emits RetrySignal objects, which bear the number of retries so far and the last failure.
To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
In order to terminate the sequence in error, we throw the original exception after these three retries.
One can use the builders exposed in Retry to achieve the same in a more fluent manner, as well as more finely tuned retry strategies. For example: errorFlux.retryWhen(Retry.max(3)).
You can use similar code to implement an “exponential backoff and retry” pattern, as shown in the FAQ.
The core-provided Retry helpers, RetrySpec and RetryBackoffSpec, both allow advanced customizations like:
- setting the filter(Predicate) for the exceptions that can trigger a retry
- modifying such a previously set filter through modifyErrorFilter(Function)
- triggering a side effect like logging around the retry trigger (i.e., for backoff, before and after the delay), provided the retry is validated (doBeforeRetry() and doAfterRetry() are additive)
- triggering an asynchronous Mono<Void> around the retry trigger, which lets you add asynchronous behavior on top of the base delay, at the cost of further delaying the trigger (doBeforeRetryAsync and doAfterRetryAsync are additive)
- customizing the exception in case the maximum number of attempts has been reached, through onRetryExhaustedThrow(BiFunction). By default, Exceptions.retryExhausted(…) is used, which can be distinguished with Exceptions.isRetryExhausted(Throwable)
- activating the handling of transient errors (see below)
Transient error handling in the Retry specs makes use of RetrySignal#totalRetriesInARow(): to check whether to retry or not and to compute the retry delays, the index used is an alternative one that is reset to 0 each time an onNext is emitted. This has the consequence that if a re-subscribed source generates some data before failing again, previous failures don’t count toward the maximum number of retry attempts. In the case of exponential backoff strategy, this also means that the next attempt will be back to using the minimum Duration backoff instead of a longer one. This can be especially useful for long-lived sources that see sporadic bursts of errors (or transient errors), where each burst should be retried with its own backoff.
AtomicInteger errorCount = new AtomicInteger();
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
        int i = transientHelper.getAndIncrement();
        if (i == 10) {
            sink.next(i);
            sink.complete();
        }
        else if (i % 3 == 0) {
            sink.next(i);
        }
        else {
            sink.error(new IllegalStateException("Transient error at " + i));
        }
    })
    .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))
    .blockLast();
assertThat(errorCount).hasValue(6);
We will count the number of errors in the retried sequence.
We generate a source that has bursts of errors. It will successfully complete when the counter reaches 10.
If the transientHelper atomic is at a multiple of 3, we emit onNext and thus end the current burst.
In other cases we emit an onError. That’s 2 out of 3 times, so bursts of 2 onError interrupted by 1 onNext.
We use retryWhen on that source, configured for at most 2 retry attempts, but in transientErrors mode.
At the end, the sequence reaches onNext(10) and completes, after 6 errors have been registered in errorCount.
Without the transientErrors(true), the configured maximum attempt of 2 would be reached by the second burst and the sequence would fail after having emitted onNext(3).
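The counting behind these two outcomes can be traced with a small plain-Java simulation. This is a sketch of the bookkeeping only, not Reactor's implementation: the class and method names are hypothetical, and the assumption is that a retry is refused once the relevant index (total retries, or retries in a row in transient mode) has reached the configured maximum.

```java
import java.util.ArrayList;
import java.util.List;

class TransientRetrySimulation {

    // What one simulated run produced.
    static final class Outcome {
        final List<Integer> emitted = new ArrayList<>();
        int errorCount;
        boolean completed;
    }

    // Replays the generator from the example above against a retry budget of maxAttempts.
    static Outcome simulate(int maxAttempts, boolean transientErrors) {
        Outcome out = new Outcome();
        int counter = 0;        // plays the role of transientHelper
        long totalRetries = 0;  // never reset
        long retriesInARow = 0; // reset to 0 on every onNext

        while (true) {
            int i = counter++;
            if (i == 10) {               // onNext(10) followed by onComplete
                out.emitted.add(i);
                out.completed = true;
                return out;
            }
            if (i % 3 == 0) {            // onNext ends the current burst of errors
                out.emitted.add(i);
                retriesInARow = 0;
                continue;
            }
            out.errorCount++;            // onError: decide whether a retry is allowed
            long index = transientErrors ? retriesInARow : totalRetries;
            if (index >= maxAttempts) {
                return out;              // retries exhausted, the sequence fails
            }
            totalRetries++;
            retriesInARow++;
        }
    }

    public static void main(String[] args) {
        Outcome withTransient = simulate(2, true);
        System.out.println(withTransient.errorCount + " errors, completed=" + withTransient.completed);
        Outcome withoutTransient = simulate(2, false);
        System.out.println(withoutTransient.errorCount + " errors, emitted=" + withoutTransient.emitted);
    }
}
```

In transient mode the simulation records 6 errors and completes. Without it, the third error (at counter value 4) exhausts the budget after only 0 and 3 have been emitted.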