[Reactor 3 Reference Guide] 4. Reactor Core Features

by Developer RyanKim 2021. 10. 13.

Flux

: Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
In short: a Flux is a standard Publisher that emits 0 to N items asynchronously (1)
and may be terminated by a completion (2) or an error (3) signal.
These three signals map to calls to the Subscriber's onNext, onComplete, and onError methods.

 

Mono

: Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).


In short: a Mono is a specialized Publisher. It emits at most one item via onNext and then terminates with onComplete (with or without a value), or it emits only an onError signal (failed Mono).

 

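Flux.generate() : synchronous
Flux.create() : asynchronous and multi-threaded

Flux.push() : asynchronous and single-threaded

Scheduler: in Reactor, the execution model and where execution happens are determined by the Scheduler that is used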


4. Reactor Core Features

The Reactor project main artifact is reactor-core, a reactive library that focuses on the Reactive Streams specification and targets Java 8.

 

Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono. A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result.

In short: Reactor introduces Flux and Mono, which implement Publisher and come with a rich set of operators.
A Flux holds 0..N items, and a Mono holds a single value or an empty (0..1) result.

 

This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
In other words, the type itself tells you the rough cardinality of the asynchronous processing.

For instance, an HTTP request produces only one response, so there is not much sense in doing a count operation. Expressing the result of such an HTTP call as a Mono<HttpResponse> thus makes more sense than expressing it as a Flux<HttpResponse>, as it offers only operators that are relevant to a context of zero items or one item.

Put differently, an HTTP request produces only one response, so counting its results has little meaning.
Mono<HttpResponse> is therefore the better fit compared with Flux<HttpResponse>,
because Mono exposes only the operators that are relevant to zero items or one item.

Operators that change the maximum cardinality of the processing also switch to the relevant type. For instance, the count operator exists in Flux, but it returns a Mono<Long>.
In short: an operator can change the type during processing.
For example, the count operator is defined on Flux, but it returns a Mono<Long>.
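The type switch described above can be sketched as follows. This is a minimal illustration, not code from the guide: the function name countItems and the sample values are made up, and block() is used only to read the result in a plain demo.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Counting collapses a 0..N sequence into one value, so the result type is Mono<Long>.
fun countItems(): Long? {
    val items: Flux<String> = Flux.just("a", "b", "c")
    val count: Mono<Long> = items.count()
    // block() subscribes and waits for the single result (demo use only).
    return count.block()
}
```

Calling countItems() should return 3 for the three sample items.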

4.1. Flux, an Asynchronous Sequence of 0-N Items

The following image shows how a Flux transforms items:

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

 

With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
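To make the signal combinations concrete, here is a small sketch that records which signals a given Flux delivers. The helper name signalsOf is hypothetical, and the error case is turned into an empty completion downstream only so that blockLast() returns normally.

```kotlin
import reactor.core.publisher.Flux

// Records every signal a Flux delivers: onNext values, then the terminal signal.
fun <T : Any> signalsOf(flux: Flux<T>): List<String> {
    val seen = mutableListOf<String>()
    flux.doOnNext { seen.add("onNext($it)") }
        .doOnComplete { seen.add("onComplete") }
        .onErrorResume { e ->
            // Record the error, then continue with an empty sequence so the demo can finish.
            seen.add("onError(${e.message})")
            Flux.empty<T>()
        }
        .blockLast()
    return seen
}
```

For instance, signalsOf(Flux.just(1, 2)) yields two onNext entries followed by onComplete, while signalsOf(Flux.empty<Int>()) yields only onComplete. An infinite sequence such as Flux.interval(Duration) has to be bounded (for example with take) before it is passed in, otherwise the call never returns.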

4.2. Mono, an Asynchronous 0-1 Result

The following image shows how a Mono transforms an item:

A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

 

Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext. Mono.never() is an outlier: it doesn’t emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
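In short: most Mono implementations are expected to call onComplete on their Subscriber immediately after onNext.
Mono.never() emits no signal at all, which is not technically forbidden but is of little use outside of tests.
A combination of onNext and onError, on the other hand, is explicitly forbidden.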

 

Mono offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.
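The two return types mentioned here can be checked with a short sketch. The function name combineMonos and the values "a" and "b" are illustrative assumptions.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

fun combineMonos(): Pair<List<String>?, String?> {
    val first: Mono<String> = Mono.just("a")
    val second: Mono<String> = Mono.just("b")

    // Concatenation can yield up to two items, so the result type widens to Flux.
    val both: Flux<String> = first.concatWith(second)

    // then(Mono) waits for the first Mono to complete, ignores its value,
    // and continues with the second, so the result stays a Mono.
    val onlySecond: Mono<String> = first.then(second)

    return Pair(both.collectList().block(), onlySecond.block())
}
```

With these inputs the Flux carries "a" then "b", and the Mono carries only "b".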

 

Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.

In short: a Mono can represent a value-less asynchronous process that only has the notion of completion (similar to a Runnable).

An empty Mono<Void> serves this purpose.
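One way to build such a Mono<Void> is Mono.fromRunnable, sketched below. The function name runSideEffect and the AtomicBoolean flag are assumptions made for the demo.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicBoolean

fun runSideEffect(): Pair<Boolean, Void?> {
    val ran = AtomicBoolean(false)
    // Wraps a side effect: no value is emitted, only onComplete (or onError).
    val task: Mono<Void> = Mono.fromRunnable { ran.set(true) }
    // Nothing runs until subscription; block() subscribes and returns null for an empty Mono.
    val value: Void? = task.block()
    return Pair(ran.get(), value)
}
```

The returned pair shows that the side effect ran and that no value was produced.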

4.3.1. subscribe Method Examples

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

 

        // requires: import reactor.core.publisher.Flux
        // Set up a Flux that produces four values when a subscriber attaches.
        val ints = Flux.range(1, 4)
            .map { i: Int ->
                // The guard holds for every value in 1..4, so no error is triggered here.
                if (i <= 4) return@map i
                throw RuntimeException("Got to 4")
            }
        // Subscribe with handlers for values, errors, and completion events.
        ints.subscribe(
            { i: Int -> println(i) },
            { error: Throwable -> System.err.println("Error: $error") }
        ) { println("DONE") }

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

4.3.2. Cancelling a subscribe() with Its Disposable

All these lambda-based variants of subscribe() have a Disposable return type. In this case, the Disposable interface represents the fact that the subscription can be cancelled, by calling its dispose() method.

In short: every lambda-based subscribe() variant returns a Disposable.
Calling dispose() on it cancels the subscription.

 

For a Flux or Mono, cancellation is a signal that the source should stop producing elements. However, it is NOT guaranteed to be immediate: Some sources might produce elements so fast that they could complete even before receiving the cancel instruction.
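In short: for a Flux or Mono, cancellation is a signal that the source should stop producing elements.
It is not guaranteed to take effect immediately, because a fast source may finish before the cancel instruction arrives.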
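Cancelling through the Disposable might look like the sketch below. The function name cancelAfter, the 10 ms tick interval, and the sleep-based timing are assumptions for the demo.

```kotlin
import reactor.core.Disposable
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

fun cancelAfter(millis: Long): Boolean {
    val ticks = AtomicInteger()
    // The lambda-based subscribe() returns a Disposable handle for the subscription.
    val subscription: Disposable = Flux.interval(Duration.ofMillis(10))
        .subscribe { ticks.incrementAndGet() }
    Thread.sleep(millis)
    // Asks the source to stop; ticks already in flight may still be delivered.
    subscription.dispose()
    return subscription.isDisposed
}
```

Without the dispose() call, the interval would keep ticking for as long as the JVM runs.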

 

Some utilities around Disposable are available in the Disposables class. Among these, Disposables.swap() creates a Disposable wrapper that lets you atomically cancel and replace a concrete Disposable. This can be useful, for instance, in a UI scenario where you want to cancel a request and replace it with a new one whenever the user clicks on a button. Disposing the wrapper itself closes it. Doing so disposes the current concrete value and all future attempted replacements.

 

Another interesting utility is Disposables.composite(…​). This composite lets you collect several Disposable — for instance, multiple in-flight requests associated with a service call — and dispose all of them at once later on. Once the composite’s dispose() method has been called, any attempt to add another Disposable immediately disposes it.


swap() : creates a wrapper that can atomically cancel and replace a Disposable.
composite() : collects several Disposables so that they can all be disposed at once.
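A small sketch of both utilities follows. It uses Disposables.single() as a stand-in for real subscriptions, and the function name swapAndComposite is hypothetical. On the Swap container, update() is the call that disposes the previous value while installing the new one.

```kotlin
import reactor.core.Disposable
import reactor.core.Disposables

fun swapAndComposite(): List<Boolean> {
    val first = Disposables.single()
    val second = Disposables.single()

    // swap(): update() installs the new value and disposes the previous one.
    val current: Disposable.Swap = Disposables.swap()
    current.update(first)
    current.update(second) // disposes `first`, keeps `second` active

    // composite(): dispose() cancels everything that was added.
    val group: Disposable.Composite = Disposables.composite()
    val a = Disposables.single()
    val b = Disposables.single()
    group.add(a)
    group.add(b)
    group.dispose()

    // Adding to an already disposed composite disposes the new element immediately.
    val late = Disposables.single()
    group.add(late)

    return listOf(first.isDisposed, second.isDisposed, a.isDisposed, b.isDisposed, late.isDisposed)
}
```

The returned flags are in the order first, second, a, b, late; only second should still be active.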

4.3.3. An Alternative to Lambdas: BaseSubscriber

There is an additional subscribe method that is more generic and takes a full-blown Subscriber rather than composing one out of lambdas. In order to help with writing such a Subscriber, we provide an extendable class called BaseSubscriber.

  Instances of BaseSubscriber (or subclasses of it) are single-use, meaning that a BaseSubscriber cancels its subscription to the first Publisher if it is subscribed to a second Publisher. That is because using an instance twice would violate the Reactive Streams rule that the onNext method of a Subscriber must not be called in parallel. As a result, anonymous implementations are fine only if they are declared directly within the call to Publisher#subscribe(Subscriber).

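In short: an instance of BaseSubscriber (or of a subclass) is single-use.
If a BaseSubscriber is subscribed to a second Publisher (2), it cancels its subscription to the first Publisher (1).
The reason is that using an instance twice could violate the Reactive Streams rule that a Subscriber's onNext method must not be called in parallel.
Anonymous implementations are therefore fine only when they are declared directly inside the call to Publisher#subscribe(Subscriber).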

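import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

    // Test method; it belongs inside a test class.
    @Test
    fun baseSubscriber() {
        val ss = SampleSubscriber<Int>()
        val ints = Flux.range(1, 4)
        ints.subscribe(ss)
    }


class SampleSubscriber<T> : BaseSubscriber<T>() {
    // Called once on subscription: print a message and request the first element only.
    public override fun hookOnSubscribe(subscription: Subscription) {
        println("Subscribed")
        request(1)
    }

    // Called for each element: print it, then request one more.
    public override fun hookOnNext(value: T) {
        println(value)
        request(1)
    }
}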

The SampleSubscriber class extends BaseSubscriber, which is the recommended abstract class for user-defined Subscribers in Reactor. The class offers hooks that can be overridden to tune the subscriber’s behavior. By default, it triggers an unbounded request and behaves exactly as subscribe(). However, extending BaseSubscriber is much more useful when you want a custom request amount.

For a custom request amount, the bare minimum is to implement hookOnSubscribe(Subscription subscription) and hookOnNext(T value), as we did. In our case, the hookOnSubscribe method prints a statement to standard out and makes the first request. Then the hookOnNext method prints a statement and performs additional requests, one request at a time.


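Overridable hooks: hookOnSubscribe, hookOnNext, hookOnComplete, hookOnError, hookOnCancel, hookFinally
BaseSubscriber also provides requestUnbounded() to switch to unbounded mode.
Overriding just these methods is enough to build a Subscriber with minimal code!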

  When manipulating a request, you must be careful to produce enough demand for the sequence to advance, or your Flux can get “stuck”. That is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe. When overriding this hook, you should usually call request at least once.

In short: when manipulating a request, take care to produce enough demand for the sequence to advance.
Otherwise the Flux can get "stuck".
This is why BaseSubscriber defaults to an unbounded request in hookOnSubscribe.
When overriding this hook, you should usually call request at least once.


Operators that Change the Demand from Downstream

One thing to keep in mind is that demand expressed at the subscribe level can be reshaped by each operator in the upstream chain. A textbook case is the buffer(N) operator: If it receives a request(2), it is interpreted as a demand for two full buffers. As a consequence, since buffers need N elements to be considered full, the buffer operator reshapes the request to 2 x N.

In short: demand expressed at the subscribe level can be reshaped by each operator in the upstream chain.
The textbook case is the buffer(N) operator: when it receives request(2), it interprets this as a demand for two full buffers.
Since a buffer needs N elements to be considered full, the buffer operator reshapes the request to 2 x N.
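The reshaping can be observed by placing doOnRequest upstream of buffer, as in this sketch. The function name upstreamDemandOfBuffer, the buffer size of 5, and the range of 100 items are illustrative choices.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

fun upstreamDemandOfBuffer(): List<Long> {
    val upstreamRequests = mutableListOf<Long>()
    Flux.range(1, 100)
        // Records what buffer asks of its source.
        .doOnRequest { n -> upstreamRequests.add(n) }
        .buffer(5)
        .subscribe(object : BaseSubscriber<List<Int>>() {
            override fun hookOnSubscribe(subscription: Subscription) {
                // Ask for two full buffers, then never request again.
                request(2)
            }
        })
    return upstreamRequests
}
```

A downstream request(2) with buffer(5) should appear upstream as a single request of 2 x 5 = 10.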

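Some operators that deal with inner sequences (such as flatMap) have variants that take an int parameter called prefetch. Prefetch is a way to tune the initial request made on these inner sequences. If unspecified, most of these operators start with a demand of 32.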

 

These operators usually also implement a replenishing optimization: Once the operator has seen 75% of the prefetch request fulfilled, it re-requests 75% from upstream. This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.

 

Finally, a couple of operators let you directly tune the request: limitRate and limitRequest.

 

limitRate(N) splits the downstream requests so that they are propagated upstream in smaller batches. For instance, a request of 100 made to limitRate(10) would result in, at most, 10 requests of 10 being propagated to the upstream. Note that, in this form, limitRate actually implements the replenishing optimization discussed earlier.

 

The operator has a variant that also lets you tune the replenishing amount (referred to as the lowTide in the variant): limitRate(highTide, lowTide). Choosing a lowTide of 0 results in strict batches of highTide requests, instead of batches further reworked by the replenishing strategy.
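The batching done by limitRate can be observed in the same way, by recording the requests that reach the source. This is a sketch under assumptions: the function name upstreamRequestsWithLimitRate and the sizes are made up, and only the first batch size is stated here because the later ones depend on the replenishing strategy.

```kotlin
import reactor.core.publisher.Flux

fun upstreamRequestsWithLimitRate(): List<Long> {
    val upstreamRequests = mutableListOf<Long>()
    Flux.range(1, 100)
        // Records the request sizes that reach the source.
        .doOnRequest { n -> upstreamRequests.add(n) }
        .limitRate(10)
        // blockLast() subscribes with an unbounded downstream request.
        .blockLast()
    return upstreamRequests
}
```

Even though the downstream demand is unbounded, the source should see an initial request of 10 followed by further small batches, never one large request.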

 

limitRequest(N), on the other hand, caps the downstream request to a maximum total demand. It adds up requests up to N. If a single request does not make the total demand overflow over N, that particular request is wholly propagated upstream. After that amount has been emitted by the source, limitRequest considers the sequence complete, sends an onComplete signal downstream, and cancels the source.

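limitRate : splits downstream requests into smaller batches before propagating them upstream.

limitRequest : caps the total demand at N, then completes the sequence and cancels the source.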


4.4. Programmatically creating a sequence

In this section, we introduce the creation of a Flux or a Mono by programmatically defining its associated events (onNext, onError, and onComplete). All these methods share the fact that they expose an API to trigger the events that we call a sink. There are actually a few sink variants, which we’ll get to shortly.

4.4.1. Synchronous generate

The simplest form of programmatic creation of a Flux is through the generate method, which takes a generator function.

This is for synchronous and one-by-one emissions, meaning that the sink is a SynchronousSink and that its next() method can only be called at most once per callback invocation. You can then additionally call error(Throwable) or complete(), but this is optional.

The most useful variant is probably the one that also lets you keep a state that you can refer to in your sink usage to decide what to emit next. The generator function then becomes a BiFunction<S, SynchronousSink<T>, S>, with <S> the type of the state object. You have to provide a Supplier<S> for the initial state, and your generator function now returns a new state on each round.
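A state-based generate call might look like the following sketch. The function name timesTable and the multiplication-table content are illustrative; the state is a plain Int that is returned from each round.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
import java.util.concurrent.Callable
import java.util.function.BiFunction

fun timesTable(): List<String>? =
    Flux.generate<String, Int>(
        // Supplies the initial state.
        Callable { 0 },
        BiFunction { state: Int, sink: SynchronousSink<String> ->
            // At most one next() call per round.
            sink.next("3 x $state = ${3 * state}")
            if (state == 10) sink.complete()
            // The returned value becomes the state of the next round.
            state + 1
        }
    ).collectList().block()
```

The sequence runs from "3 x 0 = 0" to "3 x 10 = 30" and then completes.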

 

4.4.2. Asynchronous and Multi-threaded: create

create is a more advanced form of programmatic creation of a Flux which is suitable for multiple emissions per round, even from multiple threads.

It exposes a FluxSink, with its next, error, and complete methods. Contrary to generate, it doesn’t have a state-based variant. On the other hand, it can trigger multi-threaded events in the callback.

  create can be very useful to bridge an existing API with the reactive world - such as an asynchronous API based on listeners.
  create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.

Imagine that you use a listener-based API. It processes data by chunks and has two events: (1) a chunk of data is ready and (2) the processing is complete (terminal event), as represented in the MyEventListener interface:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

You can use create to bridge this into a Flux<T>:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register(
        new MyEventListener<String>() {

            public void onDataChunk(List<String> chunk) {
                for (String s : chunk) {
                    sink.next(s);
                }
            }

            public void processComplete() {
                sink.complete();
            }
        });
});

  Bridge to the MyEventListener API
  Each element in a chunk becomes an element in the Flux.
  The processComplete event is translated to onComplete.
  All of this is done asynchronously whenever the myEventProcessor executes.

Additionally, since create can bridge asynchronous APIs and manages backpressure, you can refine how to behave backpressure-wise, by indicating an OverflowStrategy:

  • IGNORE to completely ignore downstream backpressure requests. This may yield IllegalStateException when queues get full downstream.
  • ERROR to signal an IllegalStateException when the downstream can’t keep up.
  • DROP to drop the incoming signal if the downstream is not ready to receive it.
  • LATEST to let downstream only get the latest signals from upstream.
  • BUFFER (the default) to buffer all signals if the downstream can’t keep up. (This does unbounded buffering and may lead to OutOfMemoryError.)
  Mono also has a create generator. The MonoSink of Mono’s create doesn’t allow several emissions. It will drop all signals after the first one.
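As an illustration of one of these strategies, the sketch below pushes five values into a sink created with DROP while the subscriber only ever asks for two. The function name receivedWithDrop and the numbers are assumptions for the demo.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

fun receivedWithDrop(): List<Int> {
    val received = mutableListOf<Int>()
    val source = Flux.create<Int>({ sink ->
        // Pushes five values regardless of how much was requested.
        for (i in 1..5) sink.next(i)
        sink.complete()
    }, FluxSink.OverflowStrategy.DROP)

    source.subscribe(object : BaseSubscriber<Int>() {
        override fun hookOnSubscribe(subscription: Subscription) {
            // Demand for two items only.
            request(2)
        }

        override fun hookOnNext(value: Int) {
            received.add(value)
        }
    })
    return received
}
```

With DROP, the values that arrive without pending demand are discarded, so only the first two should reach the subscriber. Switching the strategy to ERROR would signal an error instead.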


A hybrid push/pull model

Most Reactor operators, like create, follow a hybrid push/pull model. What we mean by that is that despite most of the processing being asynchronous (suggesting a push approach), there is a small pull component to it: the request.

The consumer pulls data from the source in the sense that it won’t emit anything until first requested.

 

The source pushes data to the consumer whenever it becomes available, but within the bounds of its requested amount.

 

Note that push() and create() both allow you to set up an onRequest consumer in order to manage the request amount and to ensure that data is pushed through the sink only when there is a pending request.

 

4.5. Threading and Schedulers

Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.

Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:

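In short: a Flux or Mono does not necessarily run in a dedicated thread. Instead, most operators continue working in the thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the thread in which the subscribe() call was made. The following example runs a Mono in a new thread.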
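A sketch of such an example is given below. The function name messageFromNewThread and the thread name worker-1 are made up for the demo; the point is that the Mono is assembled on the calling thread but executes on the thread that calls subscribe().

```kotlin
import reactor.core.publisher.Mono

fun messageFromNewThread(): String {
    // Assembled on the calling thread; nothing runs yet.
    val mono: Mono<String> = Mono.just("hello ")
    var result = ""
    val worker = Thread {
        // subscribe() is called on `worker`, so map and the consumer also run there.
        mono.map { msg -> msg + "thread " }
            .subscribe { v -> result = v + Thread.currentThread().name }
    }
    worker.name = "worker-1"
    worker.start()
    worker.join()
    return result
}
```

The result should read "hello thread worker-1", showing that both callbacks ran on the new thread rather than on the thread that created the Mono.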

 

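subscribeOn applies to the subscription process, when the backward chain is constructed. As a consequence, no matter where you place subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn: they still switch the execution context for the part of the chain after them.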
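The effect of subscribeOn can be sketched by recording the thread name inside two map steps. The scheduler name sub-scheduler, the function name threadsWithSubscribeOn, and the sample range are assumptions for the demo.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

fun threadsWithSubscribeOn(): Pair<String, String> {
    val scheduler = Schedulers.newParallel("sub-scheduler", 4)
    var firstMap = ""
    var secondMap = ""
    try {
        Flux.range(1, 2)
            .map { i -> firstMap = Thread.currentThread().name; i + 10 }
            .map { i -> secondMap = Thread.currentThread().name; "value $i" }
            // Placed last in the chain, yet it moves the source emission to the scheduler.
            .subscribeOn(scheduler)
            .blockLast()
    } finally {
        scheduler.dispose()
    }
    return Pair(firstMap, secondMap)
}
```

Both recorded names should start with "sub-scheduler", even though subscribeOn comes after both map calls.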

 

 

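publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in).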
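The same thread-recording approach shows the effect of publishOn. The scheduler name parallel-scheduler, the function name threadsAroundPublishOn, and the sample range are assumptions for the demo.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

fun threadsAroundPublishOn(): Pair<String, String> {
    val scheduler = Schedulers.newParallel("parallel-scheduler", 4)
    var before = ""
    var after = ""
    try {
        Flux.range(1, 2)
            // Runs on the thread that subscribes (here: the caller of blockLast()).
            .map { i -> before = Thread.currentThread().name; i + 10 }
            .publishOn(scheduler)
            // Runs on a worker of the scheduler given to publishOn.
            .map { i -> after = Thread.currentThread().name; "value $i" }
            .blockLast()
    } finally {
        scheduler.dispose()
    }
    return Pair(before, after)
}
```

The first name should be that of the calling thread, and the second should start with "parallel-scheduler". Only the operators after publishOn are moved.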
