본문 바로가기
Java

Java - 비동기 api 호출 (CompletableFuture 2편)

by 오늘부터개발시작 2022. 9. 5.

지난 시간에는 Java에서 비동기로 api를 호출하기 위해서 CompletableFuture를 사용하는 방법에 대해서 알아보았다. 오늘은 CompletableFuture를 사용할 때 성능을 최적화하는 방법과 여러 개의 비동기 작업을 파이프라인을 통해서 선언적으로 사용하는 방법에 대해서 알아보도록하겠다. CompletableFuture의 기본 사용 방법을 알고 싶다면 이전 글을 참고하길 바란다.

 

https://today-devstart.tistory.com/32

 

Java - 비동기 api 호출 (CompletableFuture 1편)

동기 API Java 개발을 하다보면 loop 안에서 여러번의 api를 호출해야할 때가 있다. 예를 들어서 아래와 같이 10번의 api 호출을 해야하는 코드가 있다. for (int i = 0; i < 10; i++) { blockingApiCall(); } Jav..

obv-cloud.com

 

성능 최적화 (Cusotom Executor 사용)

비동기로 api를 처리할 때 속도에 큰 영향을 미치는 것은 스레드의 개수이다. 스레드가 4개 인데 요청개수가 4개보다 작다면 Stream으로 병렬처리하는 것이 더 빠르고 만약에 4개보다 크다면 병렬과 비동기가 비슷한 결과를 보여준다. 그러면 그냥 병렬처리를 써야할까?

 

Stream의 병렬 처리는 Runtime.getRuntime().availableProcessors() 메소드가 반환하는 스레드 수 만큼 병렬로 처리할 수 있지만 다행히도CompletableFuture는 스레드 개수가 커스텀이 가능하다. 그렇다면 스레드는 몇개로 하는게 좋을까?

 

최적의 스레드 개수를 정하는 공식이 있지만 Java에서 사용하기에 비동기로 호출할 때 알맞은 스레드의 개수는 100개가 넘지 않는게 좋다. 스레드가 너무 많으면 CPU 메모리 경쟁이 너무 심해져서 오버헤드가 너무 커질 수 있고 Java에서는 스레드 수가 너무 많으면 서버가 크래시 될 위험도 있다. 모던 자바 인 액션에서는 api 호출 개수와 100개 중에서 min 값으로 스레드 개수를 정하는 것이 바람직하다고 나와 있다. 

 

Executor executor = Executors.newFixedThreadPool(Math.min(apiList.size(), 100), (Runnable r) -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

Stream<CompletableFuture<Double>> apiCallList = Stream.iterate(0, n -> n + 1)
    .limit(10)
    .map((i) -> CompletableFuture.supplyAsync(() -> i + 0.0, executor))
    .collect(Collectors.toList())
    .stream();

List<Double> apiCallResultList = apiCallList
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList());

 

위의 코드에서는 스레드 풀의 크기를 api 요청 개수와 100개 중에 적은 숫자를 선택하고 스레드를 데몬 스레드로 설정해준다. 데몬 스레드와 일반 스레드의 성능은 같지만 데몬 스레드는 프로그램이 종료될 때 같이 종료돼서 예기치 못한 에러를 방지할 수 있다.

 

비동기 api 파이프라인 

Java 5에서 제공하는 Future 대신에 Java 8의 CompletableFuture를 사용하는 이유는 Stream처럼 동기 + 비동기 로직을 파이프라인으로 만들어서 선언적으로 처리할 수 있기 때문이다. 이렇게 처리하면 훨씬 가독성도 높아지고 구현하기도 쉬워진다. 

 

Executor executor = Executors.newFixedThreadPool(Math.min(apiList.size(), 100), (Runnable r) -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

Stream<CompletableFuture<Double>> apiCallList = Stream.iterate(0, n -> n + 1)
        .limit(10)
        .map((i) -> CompletableFuture.supplyAsync(() -> getDoubleValue(), executor))
        .map(completableFuture -> completableFuture.thenApply((d) -> d * 10))
        .map(completableFuture -> completableFuture.thenCompose(
        		d -> CompletableFuture.supplyAsync(() -> getDoubleValue2(), executor)))
        .collect(Collectors.toList())
        .stream();

List<Double> apiCallResultList = 
		apiCallList.map(CompletableFuture::join).collect(Collectors.toList());

 

위 코드에서 thenApply는 동기 처리를 파이프라인에 추가할 때 넣을 수 있고 thenCompose는 비동기 처리를 파이프라인에 추가할 때 넣을 수 있다. 이처럼 Java 8의 CompletableFuture를 사용하면 api 결과를 받아서 복잡하게 처리하지 않고 한 눈에 플로우를 확인할 수 있다.