Spring

[Spring WebFlux] Project Reactor 란? / 마블 다이어그램(Marble Diagram) / 스케쥴러 (Scheduler) / Operators

jungha_k 2022. 11. 30. 21:43

Project Reactor (=Reactor) 란?

: 리액티브 스트림즈(Reactive Streams) 표준 사양의 구현체 중 하나

 

Spring Reactive Web Application의 개발에 있어 핵심 of 핵심 역할 라이브러리!

(Spring 5 ~ 지원 : 리액티브 스택)

 

 

https://projectreactor.io/

Reactor의 특징

 

1) Reactive Stream를 구현한 리액티브 라이브러리

 

 

2) 완전한 Non-Blocking 통신 지원 : 요청 쓰레드가 차단되지 X

 

 

3) Publisher 타입 - Mono, Flux 제공

  • Mono[0|1] : 0, 1건의 데이터 emit 가능
  • Flux[N] : 여러 건의 데이터 emit 가능

4) MSA(Microservie Architecture) 구조에 적합

: 서비스간 통신이 잦기 때문에 Non-blocking 통신 굿

 

*MSA 구조? = 각각을 마이크로하게 나눈 독립적인 서비스를 연결한 구조

5) Backpressure 전략 지원 :

Subscriber의 처리 속도가 Publisher 의 emit 속도를 따라가지 못할 때

적절하게 제어해줌

 


Reactor 구성 요소

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    // ✨Reactor Sequence 시작점
            .just("Hello", "Reactor")                    // ✨just() ➡ Publisher (데이터 emit)
            .map(message -> message.toUpperCase())       // ✨map() ➡ Operator (데이터 가공)
            .publishOn(Schedulers.parallel())            // ✨ Scheduler : 쓰레드 관리자
            .subscribe(System.out::println,          // ✨ subscribe() ➡ emit한 데이터 전달받아 처리
                    error -> System.out.println(error.getMessage()),
                    // ✨에러 발생시 에러 전달받아 처리
                    () -> System.out.println("# onComplete"));   //✨Sequence 종료 후 후처리

        Thread.sleep(100L); // 0.1초 동작 지연
        
    }
}

마블 다이어그램 (Marble Diagram)

: 시간의 흐름에 따라 변화하는 데이터의 흐름

 

* marble? = 구슬 / 1구슬 = 1데이터

 

 

 

Mono의 마블 다이어그램 : 0, 1개의 데이터만 emit

 

 

Flux의 마블 다이어그램 : n개의 데이터 emit

 

 


스케줄러 (Scheduler)란?

: 쓰레드 관리자 

= Reactor Sequence 상에서 처리되는 동작들하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드 제공! 

 

복잡한 멀티쓰레딩 프로세스를 단순하게 해준다

 


Scheduler 전용 Operator :

적절한 상황에 맞는 쓰레드를 추가로 생성

  • subscribeOn()
  • publishOn()

 

1) subscribeOn() :

구독 직후 실행되는 Operator 체인의 실행 쓰레드를 Scheduler로 지정한 쓰레드로 변경

= 구독 시점 직후의 실행 흐름을 다른 쓰레드로 바꾸는데 사용

= 데이터 소스에서 데이터를 emit하는 원본 Publisher의 실행 쓰레드를 지정하는 역할

 

스케쥴러 : 주로  Schedulers.*boundedElastic* 사용

* doOnSubscribe() : 구독 직후에 트리거(동작 수행)되는 Operator

 

 

 

2) publishOn() :

publishOn() 다운스트림쪽 쓰레드가 publishOn()에서 스케쥴러로 지정한 쓰레드로 변경

= 전달받은 데이터를 가공처리하는 Operator 앞에 추가해서 실행 쓰레드를 별도로 추가하는 역할

 

스케쥴러 : 주로 Schedulers.*parallel*() 사용

 

*doOnNext() : 바로 앞에 위치한 Operator가 실행될 때, 트리거 되는 Operator

 

 


🤔 쓰레드를 추가하거나, 구분해서 사용해야 할 이유는 무엇이지?

➡ 복잡한 계산이 필요할 경우 하나의 쓰레드만 사용한다면 응답지연이 발생할 수도 있다!

따라서 요청 처리 쓰레드 등 별도의 쓰레드를 Operator에 따라 추가하거나 한다.

 


Operators

: 종류 多 ➡ 유형별로 분류한 뒤 자주 쓰이는 것 위주로 학습할 것!

마블 다이어그램으로 어떤 Operator일지 우선적으로 생각해본다.

 

 

유형(상황)별 Operators 목록

  • Sequence Creating
  • Sequence Transforming
  • Sequence Peeking
  • Sequence Data Filtering
  • Handling Errors

  • 새로운 Sequence를 생성(Create)할 경우

    fromStream() : Stream을 입력으로 전달받아 emit
    fromIterable() : Iterable을 입력으로 전달받아 emit (List, Map, Set..)
    create() : 프로그래밍 방식으로 Signal 이벤트를 발생시킴, 한 번에 여러 건의 데이터 비동기적으로 emit



  • 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우

    map() 
    flatMap() : 내부로 들어오는 데이터 한 건당 하나의 Sequence 생성, 작업 처리 순서 보장 X
    concat() : 논리적으로 하나의 Sequence로 이어붙임 ➡ 이어붙인 Sequence에서 시간 순서데로 데이터 emit
    zip() : 여러개의 Publisher Sequence 에서 emit된 데이터들 같은 index로 결합



  • Sequence 내부 동작을 확인(Peeking) 하고자 할 경우

    doOnNext() : emit 시 트리거되어 부수 효과(side-effect) 추가 가능
    log() : Publisher 발생하는 Signal 이벤트를 로그로 출력해줌


  • Sequence에서 데이터 필터링(Filtering)이 필요한 경우

    ex) filter() / take()



  • 에러를 처리(Handling errors) 하고자 할 경우

    error() : 의도적으로 예외 던져서 onError Signal 이벤트 발생 
    timeout() : 주어지는 시간 동안 emit 되는 데이터 없을 시 onError Signal 이벤트 발생 
    retry() : Sequence 에서 에러 발생시 주어진 숫자 만큼 재구독 ➡ Sequence 다시 시작