Java - CompletableFuture使用方法と例

CompletableFutureはFutureとCompletionStageを実装したクラスです。 Futureだ直接スレッドを作成せずにasyncにタスクを処理することができ、いくつかのCompletableFutureを並列に処理するか、マージして処理することができるようになります。 またCancel、Errorを処理することができる方法を提供します。

CompletableFutureの例を見てどのように動作するかを知ってみましょう。

Futureとして使用する方法

CompletableFutureは new CompletableFuture<Type>のように生成することができます。 Typeは、ジョブが完了して保存されているデータのタイプを意味します。

次のようにCompletableFutureを作成して、一般的なFutureのように使用することができます。

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.complete("Finished");
    return null;
});

log(future.get());

上記のコードでは、 newCachedThreadPool()で直接スレッドを作り、そのスレッドの処理が完了すると、 complete("Finished")結果をFutureに保存しました。 結果が格納されると、 get()は値を戻し、blockされた状態で出てきます。

ちなみに log()は以下のようなメソッドを直接実装しました。時間とスレッドの名前を一緒に出力してくれます。

public void log(String msg) {
    System.out.println(LocalTime.now() + " ("
            + Thread.currentThread().getName() + ") " +  msg);
}

Output:

22:58:40.478 (main) Finished

すでに値を知っていれば、スレッドを作成せずに completedFuture()に値を割り当てることができます。

Future<String> completableFuture =
        CompletableFuture.completedFuture("Skip!");

String result = completableFuture.get();
log(result);

Output:

22:59:42.553 (main) Skip!

Cancelの例外処理

次のようにスレッドで cancle()が呼び出されることがあります。このとき、 get()CancellationExceptionが発生するため、以下のように例外処理をする必要があります。

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.cancel(false);
    return null;
});

String result = null;
try {
    result = future.get();
} catch (CancellationException e) {
    e.printStackTrace();
    result = "Canceled!";
}

log(result);

Output:

java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
	at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!

supplyAsync(), runAsync()

CompletableFutureは supplyAsync()runAsync()を提供して直接スレッドを作成せずに作業をasyncに処理するようにすることができます。

このように supplyAsync()のLambdaに引数を渡すことができます。引数として渡されたLambdaは、他のスレッドで処理がされます。

CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> "future example");

log("get(): " + future.get());

結果を見ると、main以外のスレッドで処理されていることがわかります。

15:43:02.923 (main) get(): future example

次の例では、Lambda 2秒間sleepするようにしました。 mainから呼び出される future.get()は2秒間blockingされて完了すると、戻り値を受け取ります。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            log("Starting....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Finished works";
        });

log("get(): " + future.get());

Output:

15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works

runAsync()も使用方法は同じです。 supplyAsync()は戻り値がある一方で runAsync()は戻り値がありません。

したがって CompletableFuture<Void>に宣言する必要があります。 結果が完了するまで、 get()はblockingますが、 nullを返します。

CompletableFuture<Void> future
        = CompletableFuture.runAsync(() -> log("future example"));

log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null

処理が完了するまで待たなくても場合は、次のように短く実装することもできます。

CompletableFuture.runAsync(() -> log("future example"));

Exception handling

CompletableFutureで作業を処理する際にExceptionが発生する可能性があります。 このような場合には、 handle()で例外を処理することができます。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String name = null;
    if (name == null) {
        throw new RuntimeException("Computation error!");
    }
    return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

log(future.get());

Ouput:

15:51:35.385 (main) Hello, Stranger!

thenApply():戻り値があるタスクを実行する

supplyAsync()で何が処理されると、その結果を持って、他の作業も実行するように実装することができます。

thenApply()メソッドは、引数と戻り値があるLambdaを実行します。ここで、引数は supplyAsync()で返される値になります。

次の例を見れば、future1は supplyAsync()でのみ実装されており、future2はfuture1に thenApply()を付けて、他の作業も実行するように実装しました。

CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = future1.thenApply(
        s -> s + " + Future2");

log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

Output:

15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2

上記のfuture2は次のように一度に定義することもできます。

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply(s -> s + " + Future2");

log("future.get(): " + future.get());

Output:

15:57:49.343 (main) future.get(): Future1 + Future2

thenApply()も戻り値があるので、相次いで thenApply()を適用することができます。

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenApply(s -> s + " World")
        .thenApply(s -> s + " Future");

log("future.get(): " + future.get());

Output:

16:00:00.513 (main) future.get(): Hello World Future

thenAccept():戻り値がないタスクを実行する

thenAccept()thenApply()と似ています。しかし、引数が、戻り値がないLambdaを処理することができます。

以下は、 thenAccept()を使用する例です。戻り値がないため、 thenAccept()CompletableFuture<Void>を返しになります。

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Hello");

CompletableFuture<Void> future2 = future1.thenAccept(
        s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

結果を見ると、future2はnullを返します。

16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null

thenCompose():複数のタスクを順番に実行

thenCompose()はchainのように二つのCompletableFutureを一つのCompletableFutureで作ってくれる役割をします。 最初CompletableFutureの結果が返されると、その結果を第二CompletableFutureに伝達し、順次ジョブが処理されます。

次の例を見れば thenCompose()はLambdaで構成されたCompletableFutureを引数として受け取ります。ここで、 ssupplyAsync()で返されるStringです。

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());

Output:

16:07:33.196 (main) Hello World

次のように連続的に thenCompose()を適用することもできます。

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));

log(future.get());

Output:

16:08:18.859 (main) Hello World Java

thenCombine():複数のタスクを同時に実行

thenCompose()が複数のCompletableFutureを順次処理されるように作成した場合、 thenCombine()は、複数のCompletableFutureを並列に処理されるようになります。 すべての処理が完了し、その結果を一つに結合することができます。

下には、二つのfutureがあります。 thenCombine()でこの二つのfutureが並列に行われるようにし、その結果を一つに合わせるようにします。

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApply((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

結果を見ると、順次処理されたかのように見えます。その理由は、 thenApply()が同じスレッドを使用するので、待機時間がありました。

16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!

thenApply() vs thenApplyAsync()

thenApply()の代わりに thenApplyAsync()を使用すると、他のスレッドで動作するようにすることができます。

次のコードは、上記のコードで thenApply()thenApplyAsync()に変更したコードです。

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApplyAsync((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApplyAsync((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Thread.sleep(5000);

結果を見ると、二つの作業が他のスレッドで処理され、2秒後に処理されるのを見ることができます。

16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!

anyOf()

anyOf()は、複数のCompletableFuture中高速に処理される1つの結果だけを取得メソッドです。

次のように anyOf()を使用することができ、3つのfutureの中で最も最初に処理されている1つの結果だけ thenAccept()に渡されます。 もちろん3つのfutureはすべて実行されます。 thenAccept()に渡されることが1存在するだけです。

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future1");
            return "Future1";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future2");
            return "Future2";
        });

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future3");
            return "Future3";
        });

CompletableFuture.anyOf(future1, future2, future3)
        .thenAccept(s -> log("Result: " + s));

Output:

16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2

allOf()

allOf()は、すべてのfutureの結果を受けて処理を行うことができます。

anyOf()と違っStream apiを使用して結果を処理することができます。 get()はnullを返します。

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2");

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> "Future3");

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(future1, future2, future3);

log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());

String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" + "));
log("Combined: " + combined);

Output:

16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3

async method

thenApply()thenApplyAsync()のように後ろにasyncがついたメソッドが常に存在します。 上記の例で紹介したように、同じスレッドを使用せずに、他のスレッドを使用して処理したいときにasyncがついたメソッドを使用します。

たとえば、 thenAccept()thenAcceptAsync()と呼ばれるメソッドを持っています。このように、ほとんどasyncがついたメソッドがpairに存在します。

参考

codechachaCopyright ©2019 codechacha