ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Mono의 zip method는 항상 async하지 않다
    개발연습/WebFlux 2024. 12. 13. 19:46

    WebFlux를 열심히 공부하며 사용하고 있는 요즘, API latency mornitoring을 진행하다 이상한 점을 발견했다.

    이 API는 두 종류의 I/O 작업을 호출한 뒤 응답을 만드는 간단한 로직을 가지고 있다. 물론 두 I/O를 Synchronus하게 호출해도 되지만, 나는 WebFlux를 사용하니까 더욱 빠른 응답을 위해 Mono의 zip 메소드를 사용하여 Asynchronus하게 호출해야지! 라고 생각하고 구현하였다.

    위 그래프에서 맨 아래의 파란색, 붉은색은 각 I/O 작업을 진행한 시간을 의미한다. 분명 Mono의 zip 메소드를 사용했지만 두 I/O 작업은 Synchronus하게 진행되고 있었다.

    왜 Mono의 zip 메소드는 Synchronus하게 호출될까? 분명 검색해보면 두 Mono가 동시에 구독된다는데 (예를 들면 https://d2.naver.com/helloworld/2771091) 모두가 날 속이고 있는걸까?

    케이스 별로 분석하며 Mono의 zip 메소드로 의도한 Asynchronus 한 호출을 달성해보자.

     

    1. Mono zip 메소드 예제

    public static void main(String[] args) {
        Mono.zip(mono1(), mono2())
            .subscribe(tuple -> System.out.println(tuple));
      }
    
      static Mono<String> mono1() {
        return Mono.just("Hello");
      }
    
      static Mono<String> mono2() {
        return Mono.just("World");
      }

    아주 간단한 코드를 만들었다. 

    [Hello,World]

    두 mono를 합쳐 tuple로 mono를 반환하는 것이 확인되었다.

    그럼 이제 실제 I/O작업과 유사하게, Mono의 내부에서 blocking을 해보자.

     

     

    2. Block in each Mono

    public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .map(
                tuple -> {
                  log.info("{} {}", tuple.getT1(), tuple.getT2());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1");
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2");
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start", monoName);
              Thread.sleep(1000);
              log.info("{} I/O end", monoName);
              return monoName + " Heavy IO";
            });
      }

    heavyIO method를 추가했다. 여기서는 blocking call을 호출하는 Mono를 return한다.
    과연 로그의 순서는 어떻게 될까? 각자 생각해보고 아래로 스크롤을 내려보자.

    Mono zip은 두 Mono를 동시에 구독하니까 아래와 같은 로그를 예상했는가?

    Mono1 I/O start
    Mono2 I/O start
    Mono1 I/O end
    Mono2 I/O end
    Mono1 Heavy IO Mono2 Heavy IO

    필자도 처음엔 이 결과를 예상했으나, 그 결과는 다음과 같았다.

    Mono1 I/O start
    Mono1 I/O end
    Mono2 I/O start
    Mono2 I/O end
    Mono1 Heavy IO Mono2 Heavy IO

    동시에 구독하지 않고 Mono1이 방출된 뒤에 Mono2가 구독되었다.

    우연은 아닐까? 테스트코드를 작성해보았다.

    @RepeatedTest(100)
    void test() {
        StepVerifier.create(MonoZip.zip()).expectNextCount(1).expectComplete().verify();
        assertEquals(
            List.of(
                LoggingEvent.info("{} I/O start", "Mono1"),
                LoggingEvent.info("{} I/O end", "Mono1"),
                LoggingEvent.info("{} I/O start", "Mono2"),
                LoggingEvent.info("{} I/O end", "Mono2"),
                LoggingEvent.info("{} {}", "Mono1 Heavy IO", "Mono2 Heavy IO")),
            logger.getLoggingEvents());
    }

    결과는...

    왜 그럴까?

    그 이유는 thread 사용에 있다. WebFlux에서는 subscribeOn / publishOn 등을 이용하여 Schedulers를 명시하지 않으면 기본적으로 "현재 실행중인 context"에서 실행하게 된다. main thread에서 Mono를 구독하니 WebFlux를 사용했지만 사실 Sync Blocking으로 작동하고 있었다.

      public static void main(String[] args) {
        log.info("{}", getCurrentTime());
        zip().block();
        log.info("{}", getCurrentTime());
      }
    
      public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .map(
                tuple -> {
                  log.info(
                      "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1");
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2");
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start in {}", monoName, Thread.currentThread().getName());
              Thread.sleep(1000);
              log.info("{} I/O end in {}", monoName, Thread.currentThread().getName());
              return monoName + " Heavy IO";
            });
      }
      
      
      ### 로깅 결과
    19:09:55 942
    Mono1 I/O start in main
    Mono1 I/O end in main
    Mono2 I/O start in main
    Mono2 I/O end in main
    Mono1 Heavy IO Mono2 Heavy IO
    19:09:58 012

    소요 시간도 2초가 걸리는 것을 확인할 수 있다.

     

    그럼 어떻게 Schedulers를 명시해줘야 할까?

    케이스 별로 알아보도록 하자

    Mono 1만 I/O thread에서 실행할 때

      public static void main(String[] args) {
        log.info("{}", getCurrentTime());
        zip().block();
        log.info("{}", getCurrentTime());
      }
    
      public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .map(
                tuple -> {
                  log.info(
                      "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1").subscribeOn(Schedulers.boundedElastic());
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2");
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start in {}", monoName, Thread.currentThread().getName());
              Thread.sleep(1000);
              log.info("{} I/O end in {}", monoName, Thread.currentThread().getName());
              return monoName + " Heavy IO";
            });
      }
      
      ### 로깅결과
    19:11:03 788
    Mono2 I/O start in main
    Mono1 I/O start in boundedElastic-1
    Mono2 I/O end in main
    Mono1 I/O end in boundedElastic-1
    Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1
    19:11:04 871

    이제부턴 start가 동시에 되고, end가 동시에 된다. Mono1, Mono2간의 start / end 순서는 랜덤하게 등장하는 것을 확인할 수 있다. 

    Mono 2만 I/O thread에서 실행할 때

      public static void main(String[] args) {
        log.info("{}", getCurrentTime());
        zip().block();
        log.info("{}", getCurrentTime());
      }
    
      public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .map(
                tuple -> {
                  log.info(
                      "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1");
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2").subscribeOn(Schedulers.boundedElastic());
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start in {}", monoName, Thread.currentThread().getName());
              Thread.sleep(1000);
              log.info("{} I/O end in {}", monoName, Thread.currentThread().getName());
              return monoName + " Heavy IO";
            });
      }
      
      ## 로깅결과
    19:17:25 421
    Mono1 I/O start in main
    Mono1 I/O end in main
    Mono2 I/O start in boundedElastic-1
    Mono2 I/O end in boundedElastic-1
    Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1
    19:17:27 513

    이럴수가, 다시 Synchronus하게 실행되고 있는 모습이다. 분명 스레드가 다른데 말이다.

    왜 두번째 Mono 2만 I/O thread에서 실행하면 Asynchronus하게 실행되지 않을까?

    그 이유는 subscribeOn을 적용하기 전과 같다. main thread에서 Mono를 구독중으로 다음 실행흐름으로 이동하지 못하고 blocking되어 있는 것이다. 반면 Mono1을 I/O thread 에서 구독하게 되면 main thread는 blocking되지 않으므로 Mono2를 구독할 수 있게 된다.

     

    그런데 잠깐, "Mono1 Heavy IO Mono2 Heavy IO"는 왜 I/O thread에서 남을까?

    Mono1만 I/O thread에서 구독하는 경우 이런 로그가 남기도 한다.

    19:31:54 011
    Mono1 I/O start in boundedElastic-1
    Mono2 I/O start in main
    Mono1 I/O end in boundedElastic-1
    Mono2 I/O end in main
    Mono1 Heavy IO Mono2 Heavy IO in main
    19:31:55 090

    반면 Mono2만 I/O thread에서 구독하는 경우 "Mono1 Heavy IO Mono2 Heavy IO"는 항상 I/O thread에서 남는다.

    map의 실행 context는 publisher에 의해 결정된다. 즉 zipped tuple을 생성한 publisher의 스레드에서 실행되게 된다.
    Mono2만 I/O thread에서 구독하는 경우 sync blocking이므로 Mono2가 방출된 시점에서 zipped tuple이 생성된다. Mono2는 I/O thread에서 구독했으므로 "Mono1 Heavy IO Mono2 Heavy IO" 로그는 항상 I/O thread에서 실행된다.
    그러나 Mono1만 I/O thread에서 구독하는 경우 async하게 두 Mono가 실행되므로 어떤 publisher가 zipped tuple을 생성할 지 알 수 없다. 따라서 상황에 따라 다른 스레드에서 "Mono1 Heavy IO Mono2 Heavy IO" 로그가 남게 된다.

    결론: Mono의 zip 메서드에서 두 Mono를 Async & Non-blocking으로 실행하려면?

    public static void main(String[] args) {
        log.info("{}", getCurrentTime());
        zip().block();
        log.info("{}", getCurrentTime());
      }
    
      public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .map(
                tuple -> {
                  log.info(
                      "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1").subscribeOn(Schedulers.boundedElastic());
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2").subscribeOn(Schedulers.boundedElastic());
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start in {}", monoName, Thread.currentThread().getName());
              Thread.sleep(1000);
              log.info("{} I/O end in {}", monoName, Thread.currentThread().getName());
              return monoName + " Heavy IO";
            });
      }
      
      ## 로깅결과
    19:39:47 171
    Mono1 I/O start in boundedElastic-1
    Mono2 I/O start in boundedElastic-2
    Mono1 I/O end in boundedElastic-1
    Mono2 I/O end in boundedElastic-2
    Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-2
    19:39:48 252

    main thread를 block하지 않으면서 두 Mono를 Async하게 실행하려면 각 Mono를 subscribe할 스레드를 명시해주면 된다!

    API Latency 개선.. 성공적!

     

    번외: 그냥 zip 메서드의 subscribe thread를 명시해주면 안되나요?

    public static void main(String[] args) {
        log.info("{}", getCurrentTime());
        zip().block();
        log.info("{}", getCurrentTime());
      }
    
      public static Mono<String> zip() {
        return Mono.zip(mono1(), mono2())
            .subscribeOn(Schedulers.boundedElastic())
            .map(
                tuple -> {
                  log.info(
                      "{} {} in {}", tuple.getT1(), tuple.getT2(), Thread.currentThread().getName());
                  return tuple.getT1() + tuple.getT2();
                });
      }
    
      static Mono<String> mono1() {
        return heavyIO("Mono1");
      }
    
      static Mono<String> mono2() {
        return heavyIO("Mono2");
      }
    
      static Mono<String> heavyIO(String monoName) {
        return Mono.fromCallable(
            () -> {
              log.info("{} I/O start in {}", monoName, Thread.currentThread().getName());
              Thread.sleep(1000);
              log.info("{} I/O end in {}", monoName, Thread.currentThread().getName());
              return monoName + " Heavy IO";
            });
      }
      
      ## 로깅결과
    19:43:17 818
    Mono1 I/O start in boundedElastic-1
    Mono1 I/O end in boundedElastic-1
    Mono2 I/O start in boundedElastic-1
    Mono2 I/O end in boundedElastic-1
    Mono1 Heavy IO Mono2 Heavy IO in boundedElastic-1
    19:43:19 906

    zip 메서드의 Mono들을 I/O thread에서 구독하는 것이라, 결국은 해당 스레드의 blocking이 일어나 main thread를 사용하는것과 별반 다른게 없다. 물론 main thread를 blocking하는건 아니므로 다른 실행흐름이 진행될 수 있지만, 각 Mono를 각 IO Thread에서 구독하는 것보다는 느리게 된다.

     

    WebFlux 잘 쓰고 싶다...

    댓글

Designed by Tistory.