[Spring WebFlux] Project Reactor 란? / 마블 다이어그램(Marble Diagram) / 스케쥴러 (Scheduler) / Operators
Project Reactor (=Reactor) 란?
: 리액티브 스트림즈(Reactive Streams) 표준 사양의 구현체 중 하나
➡ Spring Reactive Web Application의 개발에 있어 핵심 of 핵심 역할 라이브러리!
(Spring 5 ~ 지원 : 리액티브 스택)
Reactor의 특징
1) Reactive Stream를 구현한 리액티브 라이브러리
2) 완전한 Non-Blocking 통신 지원 : 요청 쓰레드가 차단되지 X
3) Publisher 타입 - Mono, Flux 제공
- Mono[0|1] : 0, 1건의 데이터 emit 가능
- Flux[N] : 여러 건의 데이터 emit 가능
4) MSA(Microservie Architecture) 구조에 적합
: 서비스간 통신이 잦기 때문에 Non-blocking 통신 굿
*MSA 구조? = 각각을 마이크로하게 나눈 독립적인 서비스를 연결한 구조
5) Backpressure 전략 지원 :
Subscriber의 처리 속도가 Publisher 의 emit 속도를 따라가지 못할 때
적절하게 제어해줌
Reactor 구성 요소
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class HelloReactorExample {
public static void main(String[] args) throws InterruptedException {
Flux // ✨Reactor Sequence 시작점
.just("Hello", "Reactor") // ✨just() ➡ Publisher (데이터 emit)
.map(message -> message.toUpperCase()) // ✨map() ➡ Operator (데이터 가공)
.publishOn(Schedulers.parallel()) // ✨ Scheduler : 쓰레드 관리자
.subscribe(System.out::println, // ✨ subscribe() ➡ emit한 데이터 전달받아 처리
error -> System.out.println(error.getMessage()),
// ✨에러 발생시 에러 전달받아 처리
() -> System.out.println("# onComplete")); //✨Sequence 종료 후 후처리
Thread.sleep(100L); // 0.1초 동작 지연
}
}
마블 다이어그램 (Marble Diagram)
: 시간의 흐름에 따라 변화하는 데이터의 흐름
* marble? = 구슬 / 1구슬 = 1데이터
Mono의 마블 다이어그램 : 0, 1개의 데이터만 emit
Flux의 마블 다이어그램 : n개의 데이터 emit
스케줄러 (Scheduler)란?
: 쓰레드 관리자
= Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드 제공!
⬇
복잡한 멀티쓰레딩 프로세스를 단순하게 해준다
Scheduler 전용 Operator :
적절한 상황에 맞는 쓰레드를 추가로 생성
- subscribeOn()
- publishOn()
1) subscribeOn() :
구독 직후 실행되는 Operator 체인의 실행 쓰레드를 Scheduler로 지정한 쓰레드로 변경함
= 구독 시점 직후의 실행 흐름을 다른 쓰레드로 바꾸는데 사용
= 데이터 소스에서 데이터를 emit하는 원본 Publisher의 실행 쓰레드를 지정하는 역할
스케쥴러 : 주로 Schedulers.*boundedElastic* 사용
* doOnSubscribe() : 구독 직후에 트리거(동작 수행)되는 Operator
2) publishOn() :
publishOn() 다운스트림쪽 쓰레드가 publishOn()에서 스케쥴러로 지정한 쓰레드로 변경됨
= 전달받은 데이터를 가공처리하는 Operator 앞에 추가해서 실행 쓰레드를 별도로 추가하는 역할
스케쥴러 : 주로 Schedulers.*parallel*() 사용
*doOnNext() : 바로 앞에 위치한 Operator가 실행될 때, 트리거 되는 Operator
🤔 쓰레드를 추가하거나, 구분해서 사용해야 할 이유는 무엇이지?
➡ 복잡한 계산이 필요할 경우 하나의 쓰레드만 사용한다면 응답지연이 발생할 수도 있다!
따라서 요청 처리 쓰레드 등 별도의 쓰레드를 Operator에 따라 추가하거나 한다.
Operators
: 종류 多 ➡ 유형별로 분류한 뒤 자주 쓰이는 것 위주로 학습할 것!
마블 다이어그램으로 어떤 Operator일지 우선적으로 생각해본다.
유형(상황)별 Operators 목록
- Sequence Creating
- Sequence Transforming
- Sequence Peeking
- Sequence Data Filtering
- Handling Errors
- 새로운 Sequence를 생성(Create)할 경우
fromStream() : Stream을 입력으로 전달받아 emit
fromIterable() : Iterable을 입력으로 전달받아 emit (List, Map, Set..)
create() : 프로그래밍 방식으로 Signal 이벤트를 발생시킴, 한 번에 여러 건의 데이터 비동기적으로 emit - 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우
map()
flatMap() : 내부로 들어오는 데이터 한 건당 하나의 Sequence 생성, 작업 처리 순서 보장 X
concat() : 논리적으로 하나의 Sequence로 이어붙임 ➡ 이어붙인 Sequence에서 시간 순서데로 데이터 emit
zip() : 여러개의 Publisher Sequence 에서 emit된 데이터들 같은 index로 결합 - Sequence 내부 동작을 확인(Peeking) 하고자 할 경우
doOnNext() : emit 시 트리거되어 부수 효과(side-effect) 추가 가능
log() : Publisher 발생하는 Signal 이벤트를 로그로 출력해줌 - Sequence에서 데이터 필터링(Filtering)이 필요한 경우
ex) filter() / take() - 에러를 처리(Handling errors) 하고자 할 경우
error() : 의도적으로 예외 던져서 onError Signal 이벤트 발생
timeout() : 주어지는 시간 동안 emit 되는 데이터 없을 시 onError Signal 이벤트 발생
retry() : Sequence 에서 에러 발생시 주어진 숫자 만큼 재구독 ➡ Sequence 다시 시작