[Reactor] 상황별 사용해야하는 Operator 정리

2023년 02월 16일

TOC

해당 게시글은 projectreactor.io에서 제공하는 문서를 번역한 글입니다. 각각의 오퍼레이터들의 확실한 동작에 대해 살펴보기보다 “이런 상황에는 이런 메서드를 사용해야하는구나!”, “이런 메서드들도 존재하구나!”와 같은 배움을 얻기 위해 문서를 읽으며 간단히 번역해봤습니다. 아직 사용해보지 못한 오퍼레이터들이 많아 번역한 글이 미흡할 수 있는 점 양해부탁드립니다.

Creating a New Sequence…

  • T 타입을 방출하는 시퀀스 생성: just()

    • Optional<T>로부터 생성: Mono#justOrEmpty(Optional<T>)
    • null이 될 수 있는 T로부터 생성: Mono#justOrEmpty(T)
    • T 타입을 방출하는 메서드로부터 생성할 때도 사용 가능하다.
    • 데이터를 Lazy하게 처리하고 싶다면 Mono#fromSupplier 또는 just 안에서 defer를 사용
    • 여러개의 T를 방출: Flux#just(T…)
  • Iterate 관련

    • 배열: Flux#fromArray
    • Collection, Iterable: Flux#fromIterable
    • 범위 내의 정수: Flux#range
    • 각각의 Subscription에 제공된 Stream: Flux#fromStream(Supplier<Stream>)
  • 다양한 단일 값

    • SupplierMono#fromSupplier
    • task: Mono#fromCallableMono#fromRunnable
    • CompletableFutureMono#fromFuture
  • 비어있는 Sequence생성: empty
  • 에러 생성: error

    • Throwable을 Lazy하게 처리하고 싶다면: error(Supplier<Throwable>)
  • 아무것도 하지 않는 Sequence생성: never
  • 구독시점에 정해지는 Sequence생성: defer
  • disposable 자원에 의존하는 Sequence를 생성: using
  • 프로그래밍 방식으로 이벤트를 생성

    • 동기적으로 1개씩: Flux#generate
    • 비동기적으로(동기적도 가능), 한번에 여러개씩: create (Mono의 경우 여러개씩 방출은 안된다.)

Transforming an Existing Sequence

  • 기존 데이터를 변형할 경우

    • 1:1로 변형 : map
    • 단순히 캐스팅만 할 경우 : cast
    • 각 소스 값의 index를 구체화 : Flux#index
    • 1:N으로 변형 : flatMap + 팩터리 메서드
    • 프로그래밍 방식으로 1:N 각 source element 또는 state로 변형 : handle
    • 각각의 source item에 대해 비동기로 작업을 수행할 경우(e.g. urls to http request): flatMap + 비동기 publisher를 반환하는 메서드
    • 일부 데이터를 무시할 경우: 무시할 데이터가 해당하는 조건에 대해 flatMap의 람다식에서 Mono.empty()를 반환
    • 원래 sequence 순서를 유지할 경우 : Flux#flatMapSequential (비동기 프로세스들을 받을 때마다 바로 트리거하지만 결과를 재정렬하며 순서룰 유지한다.)
    • Mono source이고 비동기 작업이 여러 값을 반환할 경우: Mono#flatMapMany
  • 기존 sequence에 요소를 추가하고 싶은 경우

    • 시작 부분에 추가할 경우: Flux#startWith(T...)
    • 끝 부분에 추가할 경우: Flux#concatWith(T...)
  • Flux를 집계할 경우

    • List로 : Flux#collectListFlux#collectSortedList(Mono<List<T>>로 변환되어 반환된다)
    • Map으로 : Flux#collectMapFlux#collectMultiMap (각각 Mono<Map<K, T>>, Mono<Map<K, Collection>>로 변환되어 반환된다.)
    • 임의의 container로 : Flux#collect
    • sequence 크기 반환: Flux#count
    • 각각의 element사이에 function을 적용할 경우: Flux#reduce
    • 중간에 값을 방출할 경우 : Flux#scan
    • predicate을 통해 boolean값
    • 모든 값에 대해 (AND) : Flux#all
    • 최소한 한개 값에 (OR) : Flux#any
    • 최소한 한개의 값이라도 있는지 확인: Flux#hasElements
    • 특정 값이 값이 있는지 확인 : Flux#hasElement
  • Publisher를 결합할 경우

    • 순서대로 : Flux#concat 또는 .concatWith(other)
    • 남은 Publisher가 방출될 때까지 오류를 지연시킬 경우 : Flux#concatDelayError
    • 다음 Publisher를 바로 subscribe할 경우 : Flux#mergeSequential
    • 방출된 순서대로 결합 (기존 순서가 아닌 도착하는 순서로 결합) : Flux#merge / .mergeWith(other)
    • 타입이 다르다면 변환후 결합: Flux#zip / Flux#zipWith
    • 짝으로 된 값으로
    • 2개의 Mono를 Tuple2로 변환 : Mono#zipWith
    • N개의 Mono가 모두 완료되었을 때 : Mono#zip
    • 종료를 조정하며
    • 1개의 Mono와 아무 source를 Mono로 : Mono#and
    • N개의 source가 모두 완료되었을 때 : Mono#when
    • 임의의 container 유형으로 :

      • 모든 side에서 값을 방출할 때마다 : Flux#zip
      • 다른 side에 새로운 값이 도착할 때마다 : Flux#combineLatest
    • 첫번째 Publisher만을 선택:
    • 값을 생성(onNext): firstWithValue
    • 신호 생성 : firstWithSignal
    • source sequence의 요소에 의해 트리거(각 요소는 Publisher에 매핑된다) : switchMap
    • 다음 publisher의 시작에 의해 트리거 : switchOnNext
  • 기존 sequence를 반복 : repeat

    • 시간 간격으로 : Flux.interval(duration).flatMap(tick -> myExistingPublisher)
  • empty sequence를 가지고 있는 경우

    • 대체 값을 원하는 경우 : defaultIfEmpty
    • 다른 sequence를 원하는 경우 : switchIfEmpty
  • sequence를 가지고 있지만 값에는 관심이 없는경우 : ignoreElements

    • Mono<Void>로 완료를 표현하고 싶은 경우 : then
    • 마지막에 다른 작업의 완료를 기다리고 싶은 경우 : thenEmpty
    • 마지막에 다른 Mono로 변경하고 싶은 경우 : Mono#then(mono)
    • 마지막에 단일 값을 방출하고 싶은 경우 : Mono#thenReturn(T)
    • 마지막에 Flux로 변경하고 싶은 경우 : thenMany
  • 완료를 연기하고 싶은 Mono가 있는 경우:

    • dl 값에서 파생된 다른 Publisher가 완료될 때까지 : Mono#delayUntil(Function)
  • 요소를 sequence graph로 재귀적으로 확장하고 조합하고 싶은 경우

    • 너비 우선으로 먼저 확장하기 : expand(Function)
    • 깊이 우선으로 먼저 확장하기 : expandDeep(Function)

Peeking into a Sequence

  • 최종 시퀀시의 변형 없이

    • 다음에 대해 통지를 받거나 추가적인 동작을 실행하고 싶을 때,
    • 방출 : doOnNext
    • 완료(값이 있다면 그 값도 포함) : Flux#doOnComplete, Mono#doOnSuccess
    • 에러 종료 : doOnError
    • 취소 : doOnCancel
    • 시퀀스의 시작: doFirst

      • Publisher#subscribe(Subscriber)에 연결된다.
    • 구독(subscription)이후: doOnSubscribe

      • Subscriber#onSubscribe(Subscription)에 연결된다.
    • 요청: doOnRequest
    • 완료 또는 에러: doOnTerminate

      • 다운 스트림으로 전파된 이후에 필요하면: doAfterTerminate
    • 모든 Signal 타입: doOnEach
    • 모든 종료 조건(완료, 에러, 취소): doFinally
    • 내부적으로 발생하는 로그를 볼 때: log
  • 모든 이벤트를 알고 싶을 때

    • 각각의 Signal로 표현되는 객체에 대해
    • sequence 외부에서 callback으로: doOnEach
    • 기본의 onNext를 통한 방출 대신: materialize

      • onNext로 다시 돌아가고 싶다면 dematerialize
    • 로그를 한 줄로: log

Filtering a Sequence

  • 시퀀스를 필터링 하고 싶은 경우

    • 임의의 기준을 기반으로: filter
    • 비동기로 계산해야한다면 filterWhen
    • 방출된 객체의 타입을 제한해야할 경우: ofType
    • 모든 값들을 무시해야할 경우: ignoreElements
    • 중복된 값들을 무시할 경우
    • 전체 시퀀스에 대해(논리적으로 중복이 없는 set): Flux#distinct
    • 이후 방출된 값에 대한 중복 제거: Flux#distincUntilChanged
  • 시퀀스의 subset을 획득하고 싶은 경우

    • N개의 요소 획득
    • 시퀀스의 앞부분을 얻고 싶은 경우: Flux#take(long)

      • 업스트림에서 무제한적으로 요청 : Flux#take(long, false)
      • Duration을 기준으로: Flux#take(Duration)
      • 첫번째 요소만 Mono로: Flux#next()
    • 시퀀스의 마지막부분을 얻고 싶은 경우 : Flux#takeLast
    • 기준을 충족할 때까지 : Flux#takeUntil (pridicate-based), Flux#takeUntilOther (companion publisher-based)
    • 기준이 충족되는 동안: Flux#takeWhile
    • 요소 1개를 획득
    • 특정 위치의 요소: Flux#elementAt
    • 마지막 부분을 얻을 경우: .takeLast(1)

      • 비어 있을 때 에러를 방출할 경우: Flux#last()
      • 비어 있을 때 디폴트 값을 방출할 경우: Flux#last(T)
    • 요소를 건너뛰고 싶은 경우
    • 시퀀스의 앞부분부터: Flux#skip(long)

      • Duration을 기준으로: Flux#skip(Duration)
    • 시퀀스의 마지막 부분을: Flux#skipLast
    • 기준을 충족할 때까지 : Flux#skipUntil (pridicate-based), Flux#skipUntilOther (companion publisher-based)
    • 기준이 충족되는 동안: Flux#skipWhile
    • 아이템들을 샘플링할 경우
    • Duration: Flux#sample(duration)

      • 마지막 요소 대신 샘플링 윈도우에서 첫번째 요소를 유지할 경우: sampleFirst
    • Publisher기반 윈도우로: Flux#sample(Publisher)
    • Publisher ‘timing out’을 기반으로: Flux#sampleTimeout (각각의 요소들은 Publisher를 트리거하고 Publisher가 다음과 겹치지 않으면 방출한다)
    • 요소가 최대 1개라 예상할 경우(1개를 초과하면 에러가 발생한다)
    • 시퀀스가 비어있을 때 에러를 원할 경우: Flux#single()
    • 시퀀스가 비어있을 때 디폴트 값을 원하는 경우: Flux#single(T)
    • 비어있는 시퀀스도 허용할 경우: Flux#singleOrEmpty()

Handling Errors

  • 에러 시퀀스를 생성: error

    • 성공한
  • try/catch와 동일한 처리를 하고 싶은 경우

    • exception을 throwing: error
    • exception을 catch
    • 디폴트 값으로 falling back: onErrorReturn
    • 에러를 swallow(삼키기) onErrorComplete
    • 다른 Flux나 Mono로 falling back: onErrorResume
    • Exception을 감싸고 다시 던질 경우: onErrorMap(t → new RuntimeException(t))
    • finaly 블록: doFinally
    • java7에서의 using 패턴: using
  • 에러를 복구하고 싶은 경우

    • falling back:
    • 값으로: onErrorReturn
    • 에러를 삼켜서 완료 상태로: onErrorComplete
    • 에러에 따라 다른 Publisher나 Mono로: Flux#onErrorResume, Mono#onErrorResume
    • 재시도
    • 간단한 정책(최대 시도 횟수)을 통해: retry(), retry(long)
    • companion control Flux를 트리거: retryWhen
    • 표준 백오프 전략을 사용: retryWhen(Retry.backoff(..))
  • Backpressure의 오류 처리 (upstream에서 최대 요청을 하고 downstream이 충분한 요청을 생성하지 않을 경우 적용)

    • 특별한 IllegalStateException을 throwing : Flux#onBackpressureError
    • 초과한 값들을 드랍 : Flux#onBackpressureDrop
    • 마지막에 받은 아이템은 제외 : Flux#onBackpressureLatest
    • 초과 값들을 버퍼링 (바운드 또는 언바운드) : Flux#onBackpressureBuffer
    • bounded buffer가 overflow 시 전략 적용 : Flux#onBackpressureBuffer with a BufferOverflowStrategy

Working with Time

  • 제공된 데이터의 최상의 정밀도 및 다기능성으로 측정된 시간을 방출하고 싶을 때: timed

    • Timed#elapsed() for Duration since last onNext
    • Timed#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution)
    • Timed#elapsedSinceSubcription() for Duration since subscription (rather than last onNext)
    • can have nanoseconds resolution for elapsed Durations
  • 방출 사이에 너무 많은 지연이 있어 sequence가 중단되기를 원할 경우 : timeout
  • 일정한 시간 간격으로 방출하고 싶은 경우 : Flux#interval
  • 초기 지연 후 단일 0을 방출하고 싶은 경우 : Mono.delay
  • 지연을 유발하고 싶을 때

    • onNext 신호 사이마다 : Mono#delayElementFlux#delayElements
    • subscription을 하기 전 : delaySubscription

Splitting a Flux

  • 경계 기준을 정해서 FLux를 Flux<Flux>로 분리하고 싶을 경우

    • 사이즈를 경제로: window(int)
    • window를 overlapping하거나 드랍하고 싶을 경우: window(int, int)
    • Duration을 경계로: window(Duration)
    • window를 overlapping하거나 드랍하고 싶을 경우: window(Duration, Duration)
    • 사이즈 또는 시간을 경계로(사이즈 카운트에 도달하거나 시간 초과가되면 윈도우가 닫힌다): windowTimeout(int, Duration)
    • Predicate 기반: windowUtil
    • Control Publisher의 onNext로 표현되는 임의의 경계에 의해 동작: window(Publisher), windowWhen
  • Flux를 분리하여 경계를 정해 같은 경계의 요소들을 버퍼에 나눠 담고 싶은 경우

    • List로
    • 사이즈 경계로: buffer(int)

      • 버퍼를 overlapping하거나 드랍하고 싶을 경우: buffer(int, int)
    • Duration을 경계로: buffer(Duration)

      • window를 overlapping하거나 드랍하고 싶을 경우: buffer(Duration, Duration)
    • 사이즈 또는 시간을 경계로: bufferTimeout(int, Duration)
    • Predicate 기반: bufferUtil(Predicate)

      • 다음 버퍼의 경계를 트리거한 요소를 넣고 싶을 경우: .bufferUtil(predicate, true)
      • Predicate이 일치하는 동안은 버퍼링하고 경계를 트릭한 요소를 드랍하고 싶을 경우: bufferWhile(Predicate)
    • Control Publisher의 onNext로 표현되는 임의의 경계에 의해 동작: buffer(Publisher), bufferWhen
    • 임의의 Collection 타입인 C로: use variants like buffer(int, Supplier<C>)
  • 특성을 공유하는 요소가 동일한 하위 플럭스를 이루도록 Flux를 분할하고 싶다면: groupBy(Function<T,K>)

Going Back to the Synchronous World

Note: Mono#toFuture를 제외한 모든 메서드는 “non-blocking only” 표시가 된 스케줄러 내에서 호출되면 UnsupportedOperationException을 발생시킨다.

  • Flux<T>로부터

    • 첫 번째 요소를 얻을 때까지 차단(block)하고 싶다면: Flux#blockFirst
    • 타임아웃을 주려면: Flux#blockFirst(Duration)
    • 마지막 요소를 얻을 때까지 차단(block)하고 싶다면: Flux#blockLast
    • 타임아웃을 주려면: Flux#blockLast(Duration)
    • 동기로 Iterable<T>로 전환하고 싶다면: Flux#toIterable
    • 동기로 Java8의 Stream<T>로 전환하고 싶다면: Flux#toStream
  • Mono<T>로부터

    • 값을 얻을 때까지 차단(block)하고 싶은 경우: Mono#block
    • 타임아웃을 주려면: Mono#block(Duration)
    • CompletableFuture<T>: Mono#toFuture

Multicasting a Flux to several Subscribers

  • 여러 Subscriber를 하나의 Flux에 연결하고 싶은 경우:

    • connect()를 통해 소스를 트리거하는 시점을 결정: publish() (ConnectableFlux를 반환)
    • 소스를 즉시 트리거(이후의 구독자들은 이후의 데이터를 본다): share()
    • 충분한 구독자가 등록되어 소스에 영구적인 연결: publish().autoConnect(n)
    • 구독자가 임계값 위/아래로 이동하면 자동으로 소스의 연결 및 취소: publish().refCount(n)
    • 취소하기 전에 신규 가입자가 들어올 수 있는 기회를 제공: publish().refCount(n, Duration)
  • Publisher의 데이터를 캐시하고 이후의 구독자들에게 재전송하고 싶은 경우:

    • 요소 n개까지: cache(int)
    • Duration내에서 본 최신 요소를 캐싱한다: cache(Duration)
    • n개 이하의 요소를 유지하고 싶은 경우: cache(int, Duration)
    • 소스를 즉시 트리거하지 않고: Flux#replay (ConnectableFlux를 반환)
Buy me a coffeeBuy me a coffee
Written by

@Seongwon

기술공유를 통해 새로운 가치 창조을 추구하는 백엔드 개발자 오성원입니다.
©SeongwonOh