Java - CompletableFuture 사용 방법

CompletableFuture는 Future와 CompletionStage를 구현한 클래스입니다. Future이지만 직접 쓰레드를 생성하지 않고 async로 작업을 처리할 수 있고, 여러 CompletableFuture를 병렬로 처리하거나, 병합하여 처리할 수 있게 합니다. 또한 Cancel, Error를 처리할 수 있는 방법을 제공합니다.

CompletableFuture의 예제를 보면서 어떻게 동작하는지 알아보겠습니다.

Future로 사용하는 방법

CompletableFuture는 new CompletableFuture<Type>처럼 생성할 수 있습니다. Type은 작업이 완료되어 저장되는 데이터의 타입을 의미합니다.

다음과 같이 CompletableFuture를 생성하여 일반적인 Future처럼 사용할 수 있습니다.

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.complete("Finished");
    return null;
});

log(future.get());

위의 코드에서는 newCachedThreadPool()으로 직접 쓰레드를 만들었고, 그 쓰레드의 작업이 완료되면 complete("Finished")으로 결과를 Future에 저장하였습니다. 결과가 저장되면 get()은 값을 리턴하며 block된 상태에서 빠져나옵니다.

참고로 log()는 아래와 같은 메소드를 직접구현하였습니다. 시간과 쓰레드의 이름을 함께 출력해 줍니다.

public void log(String msg) {
    System.out.println(LocalTime.now() + " ("
            + Thread.currentThread().getName() + ") " +  msg);
}

Output:

22:58:40.478 (main) Finished

이미 값을 알고 있다면 쓰레드를 만들지 않고 completedFuture()으로 값을 할당할 수 있습니다.

Future<String> completableFuture =
        CompletableFuture.completedFuture("Skip!");

String result = completableFuture.get();
log(result);

Output:

22:59:42.553 (main) Skip!

Cancel에 대한 예외 처리

다음과 같이 쓰레드에서 cancle()이 호출될 수 있습니다. 이 때, get()에서 CancellationException이 발생하기 때문에 아래와 같이 예외처리를 해야 합니다.

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.cancel(false);
    return null;
});

String result = null;
try {
    result = future.get();
} catch (CancellationException e) {
    e.printStackTrace();
    result = "Canceled!";
}

log(result);

Output:

java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
	at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!

supplyAsync(), runAsync()

CompletableFuture는 supplyAsync()runAsync()를 제공하여 직접 쓰레드를 생성하지 않고 작업을 async로 처리하도록 할 수 있습니다.

이런식으로 supplyAsync()에 Lambda로 인자를 전달할 수 있습니다. 인자로 전달된 Lambda는 다른 쓰레드에서 처리가 됩니다.

CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> "future example");

log("get(): " + future.get());

결과를 보면 main이 아닌 다른 쓰레드에서 처리되는 것을 알 수 있습니다.

15:43:02.923 (main) get(): future example

다음 예제는 Lambda에 2초간 sleep하도록 하였습니다. main에서 호출되는 future.get()은 2초간 blocking되고 완료되면 리턴값을 받습니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            log("Starting....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Finished works";
        });

log("get(): " + future.get());

Output:

15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works

runAsync()도 사용방법은 동일합니다. supplyAsync()는 리턴 값이 있는 반면에 runAsync()는 리턴값이 없습니다.

따라서 CompletableFuture<Void>으로 선언을 해야 합니다. 결과가 완료될 때까지 get()은 blocking되지만 null을 리턴합니다.

CompletableFuture<Void> future
        = CompletableFuture.runAsync(() -> log("future example"));

log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null

처리가 완료될 때까지 기다리지 않아도 된다면 다음과 같이 짧게 구현할 수도 있습니다.

CompletableFuture.runAsync(() -> log("future example"));

Exception handling

CompletableFuture에서 작업을 처리하는 중에 Exception이 발생할 수 있습니다. 이런 경우, handle()로 예외를 처리할 수 있습니다.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String name = null;
    if (name == null) {
        throw new RuntimeException("Computation error!");
    }
    return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

log(future.get());

Ouput:

15:51:35.385 (main) Hello, Stranger!

thenApply() : 리턴 값이 있는 작업 수행

supplyAsync()으로 어떤 작업이 처리되면, 그 결과를 가지고 다른 작업도 수행하도록 구현할 수 있습니다.

thenApply() 메소드는 인자와 리턴 값이 있는 Lambda를 수행합니다. 여기서 인자는 supplyAsync()에서 리턴되는 값이 됩니다.

다음 예제를 보시면, future1은 supplyAsync()으로만 구현되어있고, future2는 future1에 thenApply()를 붙여 다른 작업도 수행하도록 구현하였습니다.

CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = future1.thenApply(
        s -> s + " + Future2");

log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

Output:

15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2

위의 future2는 다음과 같이 한번에 정의할 수도 있습니다.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply(s -> s + " + Future2");

log("future.get(): " + future.get());

Output:

15:57:49.343 (main) future.get(): Future1 + Future2

thenApply() 또한 리턴값이 있기 때문에, 연달아 thenApply()를 적용할 수 있습니다.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenApply(s -> s + " World")
        .thenApply(s -> s + " Future");

log("future.get(): " + future.get());

Output:

16:00:00.513 (main) future.get(): Hello World Future

thenAccept() : 리턴 값이 없는 작업 수행

thenAccept()thenApply()와 비슷합니다. 하지만, 인자는 있지만 리턴값이 없는 Lambda를 처리할 수 있습니다.

다음은 thenAccept()를 사용하는 예제입니다. 리턴 값이 없기 때문에 thenAccept()CompletableFuture<Void>를 리턴하게 됩니다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Hello");

CompletableFuture<Void> future2 = future1.thenAccept(
        s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

결과를 보면 future2는 null을 리턴합니다.

16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null

thenCompose() : 여러 작업을 순차적으로 수행

thenCompose()는 chain처럼 두개의 CompletableFuture를 하나의 CompletableFuture으로 만들어주는 역할을 합니다. 첫번째 CompletableFuture의 결과가 리턴되면 그 결과를 두번째 CompletableFuture으로 전달하며, 순차적으로 작업이 처리됩니다.

다음 예제를 보시면 thenCompose()는 Lambda로 구성된 CompletableFuture를 인자로 받습니다. 여기서 ssupplyAsync()에서 리턴되는 String입니다.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());

Output:

16:07:33.196 (main) Hello World

다음과 같이 연속적으로 thenCompose()를 적용할 수도 있습니다.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));

log(future.get());

Output:

16:08:18.859 (main) Hello World Java

thenCombine() : 여러 작업을 동시에 수행

thenCompose()가 여러개의 CompletableFuture를 순차적으로 처리되도록 만들었다면, thenCombine()는 여러 CompletableFuture를 병렬로 처리되도록 만듭니다. 모든 처리가 완료되고 그 결과를 하나로 합칠 수 있습니다.

아래에는 두개의 future가 있습니다. thenCombine()으로 이 두개의 future가 병렬로 진행되도록 하고 그 결과를 하나로 합치도록 합니다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApply((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

결과를 보면, 순차적으로 처리된 것처럼 보입니다. 그 이유는 thenApply()가 동일한 쓰레드를 사용하기 때문에 대기하는 시간이 있었습니다.

16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!

thenApply() vs thenApplyAsync()

thenApply() 대신에 thenApplyAsync()를 사용하면 다른 쓰레드에서 동작하도록 만들 수 있습니다.

아래 코드는 위의 코드에서 thenApply()thenApplyAsync()로 변경한 코드입니다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApplyAsync((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApplyAsync((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Thread.sleep(5000);

결과를 보면 두개의 작업이 다른 쓰레드에서 처리되어 2초만에 처리되는 것을 볼 수 있습니다.

16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!

anyOf()

anyOf()는 여러개의 CompletableFuture 중에서 빨리 처리되는 1개의 결과만을 가져오는 메소드입니다.

다음과 같이 anyOf()를 사용할 수 있으며, 3개의 future 중에 가장 먼저 처리되는 1개의 결과만 thenAccept()으로 전달됩니다. 물론 3개의 future는 모두 실행이 됩니다. thenAccept()에 전달되는 것이 1개일 뿐입니다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future1");
            return "Future1";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future2");
            return "Future2";
        });

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future3");
            return "Future3";
        });

CompletableFuture.anyOf(future1, future2, future3)
        .thenAccept(s -> log("Result: " + s));

Output:

16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2

allOf()

allOf()는 모든 future의 결과를 받아서 처리를 할 수 있습니다.

anyOf()와 다르게 Stream api를 사용하여 결과를 처리할 수 있습니다. get()은 null을 리턴합니다.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2");

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> "Future3");

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(future1, future2, future3);

log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());

String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" + "));
log("Combined: " + combined);

Output:

16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3

async method

thenApply()thenApplyAsync()처럼 뒤에 async가 붙은 메소드들이 항상 존재합니다. 위의 예제에서 소개한 것처럼 동일한 쓰레드를 사용하지 않고 다른 쓰레드를 사용하여 처리하고 싶을 때 async가 붙은 메소드를 사용하면 됩니다.

예를 들어, thenAccept()thenAcceptAsync()라는 메소드를 갖고 있습니다. 이런식으로 대부분 async가 붙은 메소드들이 pair로 존재합니다.

참고

Loading script...

Related Posts

codechachaCopyright ©2019 codechacha