
[Reactor 3 Reference Guide] 9. Advanced Features and Concepts - 2

Developer RyanKim 2021. 11. 18. 03:37

https://projectreactor.io/docs/core/release/reference/#advanced

 


9. Advanced Features and Concepts
9.6 ~ 9.10

9.6. Replacing Default Schedulers

As we described in the Threading and Schedulers section, Reactor Core comes with several Scheduler implementations. While you can always create new instances through the new* factory methods, each Scheduler flavor also has a default singleton instance that is accessible through the direct factory method (such as Schedulers.boundedElastic() versus Schedulers.newBoundedElastic(…​)).

These default instances are the ones used by operators that need a Scheduler to work when you do not explicitly specify one. For example, Flux#delayElements(Duration) uses the Schedulers.parallel() instance.

In some cases, however, you might need to change these default instances with something else in a cross-cutting way, without having to make sure every single operator you call has your specific Scheduler as a parameter. An example is measuring the time every single scheduled task takes by wrapping the real schedulers, for instrumentation purposes. In other words, you might want to change the default Schedulers.

Changing the default schedulers is possible through the Schedulers.Factory class. By default, a Factory creates all the standard Schedulers through similarly named methods. You can override each of these with your custom implementation.

Additionally, the factory exposes one additional customization method: decorateExecutorService. It is invoked during the creation of every Reactor Core Scheduler that is backed by a ScheduledExecutorService (even non-default instances, such as those created by calls to Schedulers.newParallel()).

This lets you tune the ScheduledExecutorService to be used: The default one is exposed as a Supplier and, depending on the type of Scheduler being configured, you can choose to entirely bypass that supplier and return your own instance or you can get() the default instance and wrap it.

Once you create a Factory that fits your needs, you must install it by calling Schedulers.setFactory(Factory).
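For instance, a sketch of such a Factory could look like the following. It only intercepts the parallel flavor and, for brevity, simply delegates to the default behavior; a real factory would return a wrapped or instrumented Scheduler instead:

import java.util.concurrent.ThreadFactory;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Schedulers.setFactory(new Schedulers.Factory() {
    @Override
    public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
        // Delegate to the standard implementation; wrap the result here to instrument it.
        return Schedulers.Factory.super.newParallel(parallelism, threadFactory);
    }
});

// Restore the default factory once the customization is no longer needed:
Schedulers.resetFactory();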

Finally, there is a last customizable hook in Schedulers: onHandleError. This hook is invoked whenever a Runnable task submitted to a Scheduler throws an Exception (note that if there is an UncaughtExceptionHandler set for the Thread that ran the task, both the handler and the hook are invoked).
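A minimal sketch of such a hook, which simply prints the failing thread and exception (stand-ins for real logging), could look like this:

import reactor.core.scheduler.Schedulers;

Schedulers.onHandleError((thread, error) ->
    System.err.println("Scheduler worker " + thread.getName() + " failed: " + error));

// Revert to the default behavior:
Schedulers.resetOnHandleError();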

9.7. Using Global Hooks

Reactor has another category of configurable callbacks that are invoked by Reactor operators in various situations. They are all set in the Hooks class, and they fall into three categories:

9.7.1. Dropping Hooks

Dropping hooks are invoked when the source of an operator does not comply with the Reactive Streams specification. These kinds of errors are outside the normal execution path (that is, they cannot be propagated through onError).

Typically, a Publisher calls onNext on the operator despite having already called onComplete on it previously. In that case, the onNext value is dropped. The same is true for an extraneous onError signal.

The corresponding hooks, onNextDropped and onErrorDropped, let you provide a global Consumer for these drops. For example, you can use it to log the drop and clean up resources associated with a value if needed (as it never makes it to the rest of the reactive chain).

Setting the hooks twice in a row is additive: every consumer you provide is invoked. The hooks can be fully reset to their defaults by using the Hooks.resetOn*Dropped() methods.
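For example, both dropping hooks could be set (and later reset) along these lines, with the println calls standing in for real logging or cleanup:

import reactor.core.publisher.Hooks;

Hooks.onNextDropped(value ->
    System.out.println("Dropped value: " + value));

Hooks.onErrorDropped(error ->
    System.err.println("Dropped error: " + error));

// Back to the default behavior:
Hooks.resetOnNextDropped();
Hooks.resetOnErrorDropped();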

9.7.2. Internal Error Hook

One hook, onOperatorError, is invoked by operators when an unexpected Exception is thrown during the execution of their onNext, onError, and onComplete methods.

Unlike the previous category, this is still within the normal execution path. A typical example is the map operator with a map function that throws an Exception (such as division by zero). It is still possible at this point to go through the usual channel of onError, and that is what the operator does.

First, it passes the Exception through onOperatorError. The hook lets you inspect the error (and the incriminating value, if relevant) and change the Exception. Of course, you can also do something on the side, such as log and return the original Exception.

Note that you can set the onOperatorError hook multiple times. You can provide a String identifier for a particular BiFunction, and subsequent calls with different keys concatenate the functions, which are all executed. On the other hand, reusing the same key twice lets you replace a function you previously set.

As a consequence, the default hook behavior can be either fully reset (by using Hooks.resetOnOperatorError()) or partially reset for a specific key only (by using Hooks.resetOnOperatorError(String)).
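A sketch of a keyed onOperatorError hook might look like the following ("logOnError" is an arbitrary key chosen for this example):

import reactor.core.publisher.Hooks;

Hooks.onOperatorError("logOnError", (error, value) -> {
    if (value != null) {
        System.err.println("Operator error on value " + value + ": " + error);
    }
    return error; // return the original (or a rewritten) Exception
});

// Remove just this hook, leaving other keyed hooks in place:
Hooks.resetOnOperatorError("logOnError");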

9.7.3. Assembly Hooks

These hooks tie into the lifecycle of operators. They are invoked when a chain of operators is assembled (that is, instantiated). onEachOperator lets you dynamically change each operator as it is assembled in the chain, by returning a different Publisher. onLastOperator is similar, except that it is invoked only on the last operator in the chain before the subscribe call.

If you want to decorate all operators with a cross-cutting Subscriber implementation, you can look into the Operators#lift* methods to help you deal with the various types of Reactor Publishers out there (Flux, Mono, ParallelFlux, GroupedFlux, and ConnectableFlux), as well as their Fuseable versions.

Like onOperatorError, these hooks are cumulative and can be identified with a key. They can also be reset partially or totally.
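A sketch of a keyed assembly hook built with Operators.lift might look like this. The lifter below returns the actual subscriber unchanged, so it is a no-op; a real hook would return a decorating CoreSubscriber instead:

import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

Hooks.onEachOperator("subscriberDecoration",
    Operators.<Object>lift((scannable, actual) -> actual));

// Remove this particular assembly hook:
Hooks.resetOnEachOperator("subscriberDecoration");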

9.7.4. Hook Presets

The Hooks utility class provides two preset hooks. These are alternatives to the default behaviors that you can use by calling their corresponding method, rather than coming up with the hook yourself:

  • onNextDroppedFail(): onNextDropped used to throw an Exceptions.failWithCancel() exception. It now defaults to logging the dropped value at the DEBUG level. To go back to the old default behavior of throwing, use onNextDroppedFail().
  • onOperatorDebug(): This method activates debug mode. It ties into the onOperatorError hook, so calling resetOnOperatorError() also resets it. You can independently reset it by using resetOnOperatorDebug(), as it uses a specific key internally.

9.8. Adding a Context to a Reactive Sequence

One of the big technical challenges encountered when switching from an imperative programming perspective to a reactive programming mindset lies in how you deal with threading.

Contrary to what you might be used to, in reactive programming, you can use a Thread to process several asynchronous sequences that run at roughly the same time (actually, in non-blocking lockstep). The execution can also easily and often jump from one thread to another.

This arrangement is especially hard for developers that use features dependent on the threading model being more “stable,” such as ThreadLocal. As it lets you associate data with a thread, it becomes tricky to use in a reactive context. As a result, libraries that rely on ThreadLocal at least introduce new challenges when used with Reactor. At worst, they work badly or even fail. Using the MDC of Logback to store and log correlation IDs is a prime example of such a situation.

The usual workaround for ThreadLocal usage is to move the contextual data, C, alongside your business data, T, in the sequence, by using (for instance) Tuple2<T, C>. This does not look good and leaks an orthogonal concern (the contextual data) into your method and Flux signatures.

Since version 3.1.0, Reactor comes with an advanced feature that is somewhat comparable to ThreadLocal but can be applied to a Flux or a Mono instead of a Thread. This feature is called Context.

As an illustration of what it looks like, the following example both reads from and writes to Context:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();

In the following sections, we cover Context and how to use it, so that you can eventually understand the preceding example.

  This is an advanced feature that is more targeted at library developers. It requires good understanding of the lifecycle of a Subscription and is intended for libraries that are responsible for the subscriptions.

 

9.8.1. The Context API

Context is an interface reminiscent of Map. It stores key-value pairs and lets you fetch a value you stored by its key. It has a simplified version that only exposes read methods, the ContextView. More specifically:

  • Both keys and values are of type Object, so a Context (and ContextView) instance can contain any number of highly divergent values from different libraries and sources.
  • A Context is immutable. It exposes write methods like put and putAll, but they produce a new instance.
  • For a read-only API that doesn’t even expose such write methods, there’s the ContextView superinterface, available since 3.4.0.
  • You can check whether the key is present with hasKey(Object key).
  • Use getOrDefault(Object key, T defaultValue) to retrieve a value (cast to a T) or fall back to a default one if the Context instance does not have that key.
  • Use getOrEmpty(Object key) to get an Optional<T> (the Context instance attempts to cast the stored value to T).
  • Use put(Object key, Object value) to store a key-value pair, returning a new Context instance. You can also merge two contexts into a new one by using putAll(ContextView).
  • Use delete(Object key) to remove the value associated to a key, returning a new Context.
 
When you create a Context, you can create pre-valued Context instances with up to five key-value pairs by using the static Context.of methods. They take 2, 4, 6, 8, or 10 Object instances, each pair of Object instances being a key-value pair to add to the Context.
Alternatively you can also create an empty Context by using Context.empty().
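The following sketch exercises the API described above (the keys are arbitrary ones chosen for the example):

import java.util.Optional;

import reactor.util.context.Context;

Context initial = Context.of("userId", 42, "tenant", "acme");  // two key-value pairs
Context extended = initial.put("traceId", "abc-123");          // returns a NEW Context

boolean hasUser = extended.hasKey("userId");                   // true
String tenant = extended.getOrDefault("tenant", "default");    // "acme"
Optional<Object> missing = extended.getOrEmpty("unknownKey");  // Optional.empty()
Context trimmed = extended.delete("tenant");                   // new Context without "tenant"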

 

9.8.2. Tying a Context to a Flux and Writing

To make a Context be useful, it must be tied to a specific sequence and be accessible by each operator in a chain. Note that the operator must be a Reactor-native operator, as Context is specific to Reactor.

Actually, a Context is tied to each Subscriber in a chain. It uses the Subscription propagation mechanism to make itself available to each operator, starting with the final subscribe and moving up the chain.

In order to populate the Context, which can only be done at subscription time, you need to use the contextWrite operator.

contextWrite(ContextView) merges the ContextView you provide and the Context from downstream (remember, the Context is propagated from the bottom of the chain towards the top). This is done through a call to putAll, resulting in a NEW Context for upstream.

  You can also use the more advanced contextWrite(Function<Context, Context>). It receives a copy of the Context from downstream, lets you put or delete values as you see fit, and returns the new Context to use. You can even decide to return a completely different instance, although it is really not recommended (doing so might impact third-party libraries that depend on the Context).
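A short sketch of the Function variant follows ("requestId" and "legacyKey" are arbitrary keys chosen for the example):

import reactor.core.publisher.Flux;

Flux<String> decorated = Flux.just("a", "b")
    .contextWrite(ctx -> ctx
        .put("requestId", "req-1")   // visible to operators above this point
        .delete("legacyKey"));       // drop an entry inherited from downstream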

9.8.3. Reading a Context, through the ContextView

Once you have populated a Context, you may want to peek into it at runtime. Most of the time, the responsibility of putting information into the Context is on the end user’s side, while exploiting that information is on the third-party library’s side, as such libraries are usually upstream of the client code.

The read-oriented operators let you obtain data from the Context in a chain of operators by exposing its ContextView:

  • To access the context from a source-like operator, use the deferContextual factory method.
  • To access the context from the middle of an operator chain, use transformDeferredContextual(BiFunction) (a short sketch follows below).
  • Alternatively, when dealing with an inner sequence (like inside a flatMap), the ContextView can be materialized using Mono.deferContextual(Mono::just). Usually, though, you might want to perform meaningful work directly within the defer’s lambda, e.g. Mono.deferContextual(ctx -> doSomethingAsyncWithContextData(v, ctx.get(key))), where v is the value being flatMapped.
  In order to read from the Context without misleading users into thinking one can write to it while data is running through the pipeline, only the ContextView is exposed by the operators above.
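A short sketch of transformDeferredContextual could look like this ("traceId" is an arbitrary key chosen for the example):

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

Flux<String> tagged = Flux.just("alpha", "beta")
    .transformDeferredContextual((flux, ctxView) ->
        // ctxView is read-only; tag each element with a value from the context
        flux.map(v -> v + " [" + ctxView.getOrDefault("traceId", "none") + "]"))
    .contextWrite(Context.of("traceId", "abc-123"));
// tagged emits "alpha [abc-123]" and "beta [abc-123]"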

 

9.8.4. Simple Context Examples

The examples in this section are meant as ways to better understand some of the caveats of using a Context.

We first look back at our simple example from the introduction in a bit more detail, as the following example shows:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key)))) 
    .contextWrite(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello World") 
            .verifyComplete();
  1. The chain of operators ends with a call to contextWrite(Function) that puts "World" into the Context under a key of "message".
  2. We flatMap on the source element, materializing the ContextView with Mono.deferContextual() and directly extracting the data associated with "message" to concatenate with the original word.
  3. The resulting Mono<String> emits "Hello World".

  The numbering above versus the actual line order is not a mistake. It represents the execution order. Even though contextWrite is the last piece of the chain, it is the one that gets executed first (due to its subscription-time nature and the fact that the subscription signal flows from bottom to top).

In your chain of operators, the relative positions of where you write to the Context and where you read from it matter. The Context is immutable and its content can only be seen by operators above it, as demonstrated in the following example:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) 
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); 

StepVerifier.create(r)
            .expectNext("Hello Stranger") 
            .verifyComplete();
  1. The Context is written to too high in the chain.
  2. As a result, in the flatMap, there is no value associated with our key, and the default value is used instead.
  3. The resulting Mono<String> thus emits "Hello Stranger".

Similarly, in the case of several attempts to write the same key to the Context, the relative order of the writes matters, too. Operators that read the Context see the value that was set closest to being under them, as demonstrated in the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) 
    .contextWrite(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello Reactor") 
            .verifyComplete();
  1. A write attempt on key "message" (the contextWrite that puts "Reactor").
  2. Another write attempt on key "message" (the contextWrite that puts "World").
  3. The deferContextual only sees the value set closest to it (and below it): "Reactor".

In the preceding example, the Context is populated with "World" during subscription. Then the subscription signal moves upstream and another write happens. This produces a second immutable Context with a value of "Reactor". After that, data starts flowing. The deferContextual sees the Context closest to it, which is our second Context with the "Reactor" value (exposed to the user as a ContextView).

You might wonder if the Context is propagated along with the data signal. If that was the case, putting another flatMap between these two writes would use the value from the top Context. But this is not the case, as demonstrated by the following example:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) 
    .contextWrite(ctx -> ctx.put(key, "Reactor")) 
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) 
    .contextWrite(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello Reactor World") 
            .verifyComplete();
  1. The bottom contextWrite (putting "World") is the first write to happen.
  2. The contextWrite above the flatMap (putting "Reactor") is the second write to happen.
  3. The deferContextual at the top of the chain sees the second write, "Reactor".
  4. The flatMap concatenates the result of that initial read with the value from the first write, "World".
  5. The Mono emits "Hello Reactor World".

The reason is that the Context is associated to the Subscriber and each operator accesses the Context by requesting it from its downstream Subscriber.

One last interesting propagation case is the one where the Context is also written to inside a flatMap, as in the following example:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) 
    )
    .contextWrite(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
  1. The contextWrite inside the inner sequence of the second flatMap (putting "Reactor") does not impact anything outside of that flatMap.
  2. The contextWrite at the bottom of the main chain (putting "World") impacts the main sequence’s Context.

In the preceding example, the final emitted value is "Hello World Reactor" and not "Hello Reactor World", because the contextWrite that writes "Reactor" does so as part of the inner sequence of the second flatMap. As a consequence, it is not visible or propagated through the main sequence and the first flatMap does not see it. Propagation and immutability isolate the Context in operators that create intermediate inner sequences such as flatMap.

9.8.5. Full Example

Now we can consider a more real life example of a library reading information from the Context: a reactive HTTP client that takes a Mono<String> as the source of data for a PUT but also looks for a particular Context key to add a correlation ID to the request’s headers.

From the user perspective, it is called as follows:

doPut("www.example.com", Mono.just("Walter"))

In order to propagate a correlation ID, it would be called as follows:

doPut("www.example.com", Mono.just("Walter"))
	.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))

As the preceding snippets show, the user code uses contextWrite to populate a Context with an HTTP_CORRELATION_ID key-value pair. The upstream of the operator is a Mono<Tuple2<Integer, String>> (a simplistic representation of an HTTP response) returned by the HTTP client library. So it effectively passes information from the user code to the library code.

The following example shows mock code from the library’s perspective that reads the context and “augments the request” if it can find the correlation ID:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.deferContextual(c -> 
          Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) 
      );

  return dataAndContext.<String>handle((dac, sink) -> {
      if (dac.getT2().isPresent()) { 
        sink.next("PUT <" + dac.getT1() + "> sent to " + url +
            " with header X-Correlation-ID = " + dac.getT2().get());
      }
      else {
        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
      }
        sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}
  1. Materialize the ContextView through Mono.deferContextual and…
  2. within the defer, extract a value for the correlation ID key, as an Optional.
  3. If the key was present in the context, use the correlation ID as a header.

The library snippet zips the data Mono with a Mono.deferContextual that reads the correlation ID from the ContextView as an Optional (through getOrEmpty). This gives the library a Tuple2<String, Optional<Object>>, and the ContextView it reads from contains the HTTP_CORRELATION_ID entry written downstream (as the library code is on the direct path to the subscriber).

The library code then uses handle to check whether that Optional is present and, if it is, uses the passed correlation ID as an X-Correlation-ID header when building the message. A final map wraps the message in a Tuple2 with a status code, simulating the HTTP response.

The whole test that validates that the library code used the correlation ID can be written as follows:

@Test
public void contextForLibraryReactivePut() {
  Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
      .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
      .filter(t -> t.getT1() < 300)
      .map(Tuple2::getT2);

  StepVerifier.create(put)
              .expectNext("PUT <Walter> sent to www.example.com" +
                  " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
              .verifyComplete();
}

 

9.9. Dealing with Objects that Need Cleanup

In very specific cases, your application may deal with types that necessitate some form of cleanup once they are no longer in use. This is an advanced scenario — for example, when you have reference-counted objects or when you deal with off-heap objects. Netty’s ByteBuf is a prime example of both.

In order to ensure proper cleanup of such objects, you need to account for it on a Flux-by-Flux basis, as well as in several of the global hooks (see Using Global Hooks):

  • The doOnDiscard Flux/Mono operator
  • The onOperatorError hook
  • The onNextDropped hook
  • Operator-specific handlers

This is needed because each hook is made with a specific subset of cleanup in mind, and users might want (for example) to implement specific error-handling logic in addition to cleanup logic within onOperatorError.

Note that some operators are less adapted to dealing with objects that need cleanup. For example, bufferWhen can introduce overlapping buffers, and that means that the discard “local hook” we used earlier might see a first buffer as being discarded and clean up an element in it that is in a second buffer, where it is still valid.

  For the purpose of cleaning up, all these hooks MUST be IDEMPOTENT. They might on some occasions get applied several times to the same object. Unlike the doOnDiscard operator, which performs a class-level instanceof check, the global hooks are also dealing with instances that can be any Object. It is up to the user’s implementation to distinguish between which instances need cleanup and which do not.

9.9.1. The doOnDiscard Operator or Local Hook

This hook has been specifically put in place for cleanup of objects that would otherwise never be exposed to user code. It is intended as a cleanup hook for flows that operate under normal circumstances (not malformed sources that push too many items, which is covered by onNextDropped).

It is local, in the sense that it is activated through an operator and applies only to a given Flux or Mono.

Obvious cases include operators that filter elements from upstream. These elements never reach the next operator (or final subscriber), but this is part of the normal path of execution. As such, they are passed to the doOnDiscard hook. Examples of when you might use the doOnDiscard hook include the following:

  • filter: Items that do not match the filter are considered to be “discarded.”
  • skip: Skipped items are discarded.
  • buffer(maxSize, skip) with maxSize < skip: A “dropping buffer” — items in between buffers are discarded.

But doOnDiscard is not limited to filtering operators, and is also used by operators that internally queue data for backpressure purposes. More specifically, most of the time, this is important during cancellation. An operator that prefetches data from its source and later drains to its subscriber upon demand could have un-emitted data when it gets cancelled. Such operators use the doOnDiscard hook during cancellation to clear up their internal backpressure Queue.

  Each call to doOnDiscard(Class, Consumer) is additive with the others, to the extent that it is visible and used by only operators upstream of it.
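The following sketch shows the local hook in action. PooledResource is a hypothetical type standing in for something like a reference-counted buffer; note that doOnDiscard sits downstream of the filter so that the filter can see and use it:

import reactor.core.publisher.Flux;

class PooledResource {
    final int id;
    PooledResource(int id) { this.id = id; }
    void release() { System.out.println("released resource " + id); }
}

Flux.range(1, 5)
    .map(PooledResource::new)
    .filter(r -> r.id % 2 == 0)                                   // odd-id resources are discarded...
    .doOnDiscard(PooledResource.class, PooledResource::release)   // ...and released by the local hook
    .subscribe(r -> System.out.println("using resource " + r.id));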

9.9.2. The onOperatorError hook

The onOperatorError hook is intended to modify errors in a transverse manner (similar to an AOP catch-and-rethrow).

When the error happens during the processing of an onNext signal, the element that was being emitted is passed to onOperatorError.

If that type of element needs cleanup, you need to implement it in the onOperatorError hook, possibly on top of error-rewriting code.
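Such cleanup, reusing the hypothetical PooledResource type from the doOnDiscard sketch above, might look like the following:

import reactor.core.publisher.Hooks;

Hooks.onOperatorError("releasePooledResources", (error, value) -> {
    if (value instanceof PooledResource) {
        ((PooledResource) value).release(); // clean up the element that triggered the error
    }
    return error; // the error itself is passed through unchanged here
});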

9.9.3. The onNextDropped Hook

With malformed Publishers, there could be cases where an operator receives an element when it expected none (typically, after having received the onError or onComplete signals). In such cases, the unexpected element is “dropped” — that is, passed to the onNextDropped hook. If you have types that need cleanup, you must detect these in the onNextDropped hook and implement cleanup code there as well.

9.9.4. Operator-specific Handlers

Some operators that deal with buffers or collect values as part of their operations have specific handlers for cases where collected data is not propagated downstream. If you use such operators with the type(s) that need cleanup, you need to perform cleanup in these handlers.

For example, distinct has such a callback that is invoked when the operator terminates (or is cancelled) in order to clear the collection it uses to judge whether an element is distinct or not. By default, the collection is a HashSet, and the cleanup callback is a HashSet::clear. However, if you deal with reference-counted objects, you might want to change that to a more involved handler that would release each element in the set before calling clear() on it.
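Such a handler might look like the following sketch, which uses the distinct variant that accepts a custom store supplier, a distinct predicate, and a cleanup consumer (RefCounted, its release() method, and source are hypothetical):

import java.util.HashSet;
import java.util.function.Function;

import reactor.core.publisher.Flux;

Flux<RefCounted> deduplicated = source.distinct(
    Function.identity(),                // key selector: the element itself
    () -> new HashSet<RefCounted>(),    // the store used to track already-seen keys
    (store, next) -> store.add(next),   // add(...) returns false for duplicates
    store -> {                          // replaces the default HashSet::clear cleanup
        store.forEach(RefCounted::release);
        store.clear();
    });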

9.10. Null Safety

Although Java does not allow expressing null-safety with its type system, Reactor now provides annotations to declare nullability of APIs, similar to those provided by Spring Framework 5.

Reactor uses these annotations, but they can also be used in any Reactor-based Java project to declare null-safe APIs. Nullability of the types used inside method bodies is outside of the scope of this feature.

These annotations are meta-annotated with JSR 305 annotations (a dormant JSR that is supported by tools such as IntelliJ IDEA) to provide useful warnings to Java developers related to null-safety in order to avoid NullPointerException at runtime. JSR 305 meta-annotations let tooling vendors provide null safety support in a generic way, without having to hard-code support for Reactor annotations.

  It is not necessary nor recommended with Kotlin 1.1.5+ to have a dependency on JSR 305 in your project classpath.

They are also used by Kotlin, which natively supports null safety. See this dedicated section for more details.

The following annotations are provided in the reactor.util.annotation package:

  • @NonNull: Indicates that a specific parameter, return value, or field cannot be null. (It is not needed on parameters and return values where @NonNullApi applies.)
  • @Nullable: Indicates that a parameter, return value, or field can be null.
  • @NonNullApi: Package-level annotation that indicates non-null is the default behavior for parameters and return values.
  Nullability for generic type arguments, variable arguments, and array elements is not yet supported. See issue #878 for up-to-date information.
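The following sketch shows how a library might apply these annotations (UserDirectory and findDisplayName are hypothetical names):

// UserDirectory.java — a hypothetical API declaring its nullability explicitly.
// (With a package-level @NonNullApi, the @NonNull below would be redundant.)
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

public interface UserDirectory {

    @Nullable // the return value may be null when no user matches
    String findDisplayName(@NonNull String userId);
}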