-
Mono의 zip method는 항상 async하지 않다개발연습/WebFlux 2024. 12. 13. 19:46
WebFlux를 열심히 공부하며 사용하고 있는 요즘, API latency mornitoring을 진행하다 이상한 점을 발견했다.
이 API는 두 종류의 I/O 작업을 호출한 뒤 응답을 만드는 간단한 로직을 가지고 있다. 물론 두 I/O를 Synchronus하게 호출해도 되지만, 나는 WebFlux를 사용하니까 더욱 빠른 응답을 위해 Mono의 zip 메소드를 사용하여 Asynchronus하게 호출해야지! 라고 생각하고 구현하였다.
위 그래프에서 맨 아래의 파란색, 붉은색은 각 I/O 작업을 진행한 시간을 의미한다. 분명 Mono의 zip 메소드를 사용했지만 두 I/O 작업은 Synchronus하게 진행되고 있었다.
왜 Mono의 zip 메소드는 Synchronus하게 호출될까? 분명 검색해보면 두 Mono가 동시에 구독된다는데 (예를 들면 https://d2.naver.com/helloworld/2771091) 모두가 날 속이고 있는걸까?
케이스 별로 분석하며 Mono의 zip 메소드로 의도한 Asynchronus 한 호출을 달성해보자.
1. Mono zip 메소드 예제
public static void main(String[] args) { Mono.zip(mono1(), mono2()) .subscribe(tuple -> System.out.println(tuple)); } static Mono<String> mono1() { return Mono.just("Hello"); } static Mono<String> mono2() { return Mono.just("World"); }
아주 간단한 코드를 만들었다.
[Hello,World]
두 mono를 합쳐 tuple로 mono를 반환하는 것이 확인되었다.
그럼 이제 실제 I/O작업과 유사하게, Mono의 내부에서 blocking을 해보자.
2. Block in each Mono
public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .map( tuple -> { log.info("{} {}", tuple.getT1(), tuple.getT2()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1"); } static Mono<String> mono2() { return heavyIO("Mono2"); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start", monoName); Thread.sleep(1000); log.info("{} I/O end", monoName); return monoName + " Heavy IO"; }); }
heavyIO method를 추가했다. 여기서는 blocking call을 호출하는 Mono를 return한다.
과연 로그의 순서는 어떻게 될까? 각자 생각해보고 아래로 스크롤을 내려보자.Mono zip은 두 Mono를 동시에 구독하니까 아래와 같은 로그를 예상했는가?
Mono1 I/O start Mono2 I/O start Mono1 I/O end Mono2 I/O end Mono1 Heavy IO Mono2 Heavy IO
필자도 처음엔 이 결과를 예상했으나, 그 결과는 다음과 같았다.
Mono1 I/O start Mono1 I/O end Mono2 I/O start Mono2 I/O end Mono1 Heavy IO Mono2 Heavy IO
동시에 구독하지 않고 Mono1이 방출된 뒤에 Mono2가 구독되었다.
우연은 아닐까? 테스트코드를 작성해보았다.
@RepeatedTest(100) void test() { StepVerifier.create(MonoZip.zip()).expectNextCount(1).expectComplete().verify(); assertEquals( List.of( LoggingEvent.info("{} I/O start", "Mono1"), LoggingEvent.info("{} I/O end", "Mono1"), LoggingEvent.info("{} I/O start", "Mono2"), LoggingEvent.info("{} I/O end", "Mono2"), LoggingEvent.info("{} {}", "Mono1 Heavy IO", "Mono2 Heavy IO")), logger.getLoggingEvents()); }
결과는...
왜 그럴까?
그 이유는 thread 사용에 있다. WebFlux에서는 subscribeOn / publishOn 등을 이용하여 Schedulers를 명시하지 않으면 기본적으로 "현재 실행중인 context"에서 실행하게 된다. main thread에서 Mono를 구독하니 WebFlux를 사용했지만 사실 Sync Blocking으로 작동하고 있었다.
public static void main(String[] args) { log.info("{}", getCurrentTime()); zip().block(); log.info("{}", getCurrentTime()); } public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .map( tuple -> { log.info( "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1"); } static Mono<String> mono2() { return heavyIO("Mono2"); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start in {}", monoName, Thread.currentThread().getName()); Thread.sleep(1000); log.info("{} I/O end in {}", monoName, Thread.currentThread().getName()); return monoName + " Heavy IO"; }); } ### 로깅 결과 19:09:55 942 Mono1 I/O start in main Mono1 I/O end in main Mono2 I/O start in main Mono2 I/O end in main Mono1 Heavy IO Mono2 Heavy IO 19:09:58 012
소요 시간도 2초가 걸리는 것을 확인할 수 있다.
그럼 어떻게 Schedulers를 명시해줘야 할까?
케이스 별로 알아보도록 하자
Mono 1만 I/O thread에서 실행할 때
public static void main(String[] args) { log.info("{}", getCurrentTime()); zip().block(); log.info("{}", getCurrentTime()); } public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .map( tuple -> { log.info( "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1").subscribeOn(Schedulers.boundedElastic()); } static Mono<String> mono2() { return heavyIO("Mono2"); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start in {}", monoName, Thread.currentThread().getName()); Thread.sleep(1000); log.info("{} I/O end in {}", monoName, Thread.currentThread().getName()); return monoName + " Heavy IO"; }); } ### 로깅결과 19:11:03 788 Mono2 I/O start in main Mono1 I/O start in boundedElastic-1 Mono2 I/O end in main Mono1 I/O end in boundedElastic-1 Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1 19:11:04 871
이제부턴 start가 동시에 되고, end가 동시에 된다. Mono1, Mono2간의 start / end 순서는 랜덤하게 등장하는 것을 확인할 수 있다.
Mono 2만 I/O thread에서 실행할 때
public static void main(String[] args) { log.info("{}", getCurrentTime()); zip().block(); log.info("{}", getCurrentTime()); } public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .map( tuple -> { log.info( "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1"); } static Mono<String> mono2() { return heavyIO("Mono2").subscribeOn(Schedulers.boundedElastic()); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start in {}", monoName, Thread.currentThread().getName()); Thread.sleep(1000); log.info("{} I/O end in {}", monoName, Thread.currentThread().getName()); return monoName + " Heavy IO"; }); } ## 로깅결과 19:17:25 421 Mono1 I/O start in main Mono1 I/O end in main Mono2 I/O start in boundedElastic-1 Mono2 I/O end in boundedElastic-1 Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1 19:17:27 513
이럴수가, 다시 Synchronus하게 실행되고 있는 모습이다. 분명 스레드가 다른데 말이다.
왜 두번째 Mono 2만 I/O thread에서 실행하면 Asynchronus하게 실행되지 않을까?
그 이유는 subscribeOn을 적용하기 전과 같다. main thread에서 Mono를 구독중으로 다음 실행흐름으로 이동하지 못하고 blocking되어 있는 것이다. 반면 Mono1을 I/O thread 에서 구독하게 되면 main thread는 blocking되지 않으므로 Mono2를 구독할 수 있게 된다.
그런데 잠깐, "Mono1 Heavy IO Mono2 Heavy IO"는 왜 I/O thread에서 남을까?
Mono1만 I/O thread에서 구독하는 경우 이런 로그가 남기도 한다.
19:31:54 011 Mono1 I/O start in boundedElastic-1 Mono2 I/O start in main Mono1 I/O end in boundedElastic-1 Mono2 I/O end in main Mono1 Heavy IO Mono2 Heavy IO in main 19:31:55 090
반면 Mono2만 I/O thread에서 구독하는 경우 "Mono1 Heavy IO Mono2 Heavy IO"는 항상 I/O thread에서 남는다.
map의 실행 context는 publisher에 의해 결정된다. 즉 zipped tuple을 생성한 publisher의 스레드에서 실행되게 된다.
Mono2만 I/O thread에서 구독하는 경우 sync blocking이므로 Mono2가 방출된 시점에서 zipped tuple이 생성된다. Mono2는 I/O thread에서 구독했으므로 "Mono1 Heavy IO Mono2 Heavy IO" 로그는 항상 I/O thread에서 실행된다.
그러나 Mono1만 I/O thread에서 구독하는 경우 async하게 두 Mono가 실행되므로 어떤 publisher가 zipped tuple을 생성할 지 알 수 없다. 따라서 상황에 따라 다른 스레드에서 "Mono1 Heavy IO Mono2 Heavy IO" 로그가 남게 된다.결론: Mono의 zip 메서드에서 두 Mono를 Async & Non-blocking으로 실행하려면?
public static void main(String[] args) { log.info("{}", getCurrentTime()); zip().block(); log.info("{}", getCurrentTime()); } public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .map( tuple -> { log.info( "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1").subscribeOn(Schedulers.boundedElastic()); } static Mono<String> mono2() { return heavyIO("Mono2").subscribeOn(Schedulers.boundedElastic()); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start in {}", monoName, Thread.currentThread().getName()); Thread.sleep(1000); log.info("{} I/O end in {}", monoName, Thread.currentThread().getName()); return monoName + " Heavy IO"; }); } ## 로깅결과 19:39:47 171 Mono1 I/O start in boundedElastic-1 Mono2 I/O start in boundedElastic-2 Mono1 I/O end in boundedElastic-1 Mono2 I/O end in boundedElastic-2 Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-2 19:39:48 252
main thread를 block하지 않으면서 두 Mono를 Async하게 실행하려면 각 Mono를 subscribe할 스레드를 명시해주면 된다!
API Latency 개선.. 성공적!
번외: 그냥 zip 메서드의 subscribe thread를 명시해주면 안되나요?
public static void main(String[] args) { log.info("{}", getCurrentTime()); zip().block(); log.info("{}", getCurrentTime()); } public static Mono<String> zip() { return Mono.zip(mono1(), mono2()) .subscribeOn(Schedulers.boundedElastic()) .map( tuple -> { log.info( "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName()); return tuple.getT1() + tuple.getT2(); }); } static Mono<String> mono1() { return heavyIO("Mono1"); } static Mono<String> mono2() { return heavyIO("Mono2"); } static Mono<String> heavyIO(String monoName) { return Mono.fromCallable( () -> { log.info("{} I/O start in {}", monoName, Thread.currentThread().getName()); Thread.sleep(1000); log.info("{} I/O end in {}", monoName, Thread.currentThread().getName()); return monoName + " Heavy IO"; }); } ## 로깅결과 19:43:17 818 Mono1 I/O start in boundedElastic-1 Mono1 I/O end in boundedElastic-1 Mono2 I/O start in boundedElastic-1 Mono2 I/O end in boundedElastic-1 Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1 19:43:19 906
zip 메서드의 Mono들을 I/O thread에서 구독하는 것이라, 결국은 해당 스레드의 blocking이 일어나 main thread를 사용하는것과 별반 다른게 없다. 물론 main thread를 blocking하는건 아니므로 다른 실행흐름이 진행될 수 있지만, 각 Mono를 각 IO Thread에서 구독하는 것보다는 느리게 된다.
WebFlux 잘 쓰고 싶다...