CompletableFutureはFutureとCompletionStageを実装したクラスです。 Futureだ直接スレッドを作成せずにasyncにタスクを処理することができ、いくつかのCompletableFutureを並列に処理するか、マージして処理することができるようになります。 またCancel、Errorを処理することができる方法を提供します。
CompletableFutureの例を見てどのように動作するかを知ってみましょう。
Futureとして使用する方法
CompletableFutureは new CompletableFuture<Type>
のように生成することができます。 Typeは、ジョブが完了して保存されているデータのタイプを意味します。
次のようにCompletableFutureを作成して、一般的なFutureのように使用することができます。
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.complete("Finished");
return null;
});
log(future.get());
上記のコードでは、 newCachedThreadPool()
で直接スレッドを作り、そのスレッドの処理が完了すると、 complete("Finished")
結果をFutureに保存しました。
結果が格納されると、 get()
は値を戻し、blockされた状態で出てきます。
ちなみに log()
は以下のようなメソッドを直接実装しました。時間とスレッドの名前を一緒に出力してくれます。
public void log(String msg) {
System.out.println(LocalTime.now() + " ("
+ Thread.currentThread().getName() + ") " + msg);
}
Output:
22:58:40.478 (main) Finished
すでに値を知っていれば、スレッドを作成せずに completedFuture()
に値を割り当てることができます。
Future<String> completableFuture =
CompletableFuture.completedFuture("Skip!");
String result = completableFuture.get();
log(result);
Output:
22:59:42.553 (main) Skip!
Cancelの例外処理
次のようにスレッドで cancle()
が呼び出されることがあります。このとき、 get()
で CancellationException
が発生するため、以下のように例外処理をする必要があります。
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.cancel(false);
return null;
});
String result = null;
try {
result = future.get();
} catch (CancellationException e) {
e.printStackTrace();
result = "Canceled!";
}
log(result);
Output:
java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!
supplyAsync(), runAsync()
CompletableFutureは supplyAsync()
と runAsync()
を提供して直接スレッドを作成せずに作業をasyncに処理するようにすることができます。
このように supplyAsync()
のLambdaに引数を渡すことができます。引数として渡されたLambdaは、他のスレッドで処理がされます。
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "future example");
log("get(): " + future.get());
結果を見ると、main以外のスレッドで処理されていることがわかります。
15:43:02.923 (main) get(): future example
次の例では、Lambda 2秒間sleepするようにしました。 mainから呼び出される future.get()
は2秒間blockingされて完了すると、戻り値を受け取ります。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log("Starting....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Finished works";
});
log("get(): " + future.get());
Output:
15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works
runAsync()
も使用方法は同じです。
supplyAsync()
は戻り値がある一方で runAsync()
は戻り値がありません。
したがって CompletableFuture<Void>
に宣言する必要があります。
結果が完了するまで、 get()
はblockingますが、 null
を返します。
CompletableFuture<Void> future
= CompletableFuture.runAsync(() -> log("future example"));
log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null
処理が完了するまで待たなくても場合は、次のように短く実装することもできます。
CompletableFuture.runAsync(() -> log("future example"));
Exception handling
CompletableFutureで作業を処理する際にExceptionが発生する可能性があります。
このような場合には、 handle()
で例外を処理することができます。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String name = null;
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
log(future.get());
Ouput:
15:51:35.385 (main) Hello, Stranger!
thenApply():戻り値があるタスクを実行する
supplyAsync()
で何が処理されると、その結果を持って、他の作業も実行するように実装することができます。
thenApply()
メソッドは、引数と戻り値があるLambdaを実行します。ここで、引数は supplyAsync()
で返される値になります。
次の例を見れば、future1は supplyAsync()
でのみ実装されており、future2はfuture1に thenApply()
を付けて、他の作業も実行するように実装しました。
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = future1.thenApply(
s -> s + " + Future2");
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
Output:
15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2
上記のfuture2は次のように一度に定義することもできます。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply(s -> s + " + Future2");
log("future.get(): " + future.get());
Output:
15:57:49.343 (main) future.get(): Future1 + Future2
thenApply()
も戻り値があるので、相次いで thenApply()
を適用することができます。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + " Future");
log("future.get(): " + future.get());
Output:
16:00:00.513 (main) future.get(): Hello World Future
thenAccept():戻り値がないタスクを実行する
thenAccept()
も thenApply()
と似ています。しかし、引数が、戻り値がないLambdaを処理することができます。
以下は、 thenAccept()
を使用する例です。戻り値がないため、 thenAccept()
は CompletableFuture<Void>
を返しになります。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<Void> future2 = future1.thenAccept(
s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
結果を見ると、future2はnullを返します。
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null
thenCompose():複数のタスクを順番に実行
thenCompose()
はchainのように二つのCompletableFutureを一つのCompletableFutureで作ってくれる役割をします。
最初CompletableFutureの結果が返されると、その結果を第二CompletableFutureに伝達し、順次ジョブが処理されます。
次の例を見れば thenCompose()
はLambdaで構成されたCompletableFutureを引数として受け取ります。ここで、 s
はsupplyAsync()
で返されるStringです。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());
Output:
16:07:33.196 (main) Hello World
次のように連続的に thenCompose()
を適用することもできます。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));
log(future.get());
Output:
16:08:18.859 (main) Hello World Java
thenCombine():複数のタスクを同時に実行
thenCompose()
が複数のCompletableFutureを順次処理されるように作成した場合、 thenCombine()
は、複数のCompletableFutureを並列に処理されるようになります。
すべての処理が完了し、その結果を一つに結合することができます。
下には、二つのfutureがあります。 thenCombine()
でこの二つのfutureが並列に行われるようにし、その結果を一つに合わせるようにします。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApply((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
結果を見ると、順次処理されたかのように見えます。その理由は、 thenApply()
が同じスレッドを使用するので、待機時間がありました。
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!
thenApply() vs thenApplyAsync()
thenApply()
の代わりに thenApplyAsync()
を使用すると、他のスレッドで動作するようにすることができます。
次のコードは、上記のコードで thenApply()
を thenApplyAsync()
に変更したコードです。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApplyAsync((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApplyAsync((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
Thread.sleep(5000);
結果を見ると、二つの作業が他のスレッドで処理され、2秒後に処理されるのを見ることができます。
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!
anyOf()
anyOf()
は、複数のCompletableFuture中高速に処理される1つの結果だけを取得メソッドです。
次のように anyOf()
を使用することができ、3つのfutureの中で最も最初に処理されている1つの結果だけ thenAccept()
に渡されます。
もちろん3つのfutureはすべて実行されます。 thenAccept()
に渡されることが1存在するだけです。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
log("starting future1");
return "Future1";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
log("starting future2");
return "Future2";
});
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> {
log("starting future3");
return "Future3";
});
CompletableFuture.anyOf(future1, future2, future3)
.thenAccept(s -> log("Result: " + s));
Output:
16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2
allOf()
allOf()
は、すべてのfutureの結果を受けて処理を行うことができます。
anyOf()
と違っStream apiを使用して結果を処理することができます。 get()
はnullを返します。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2");
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "Future3");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" + "));
log("Combined: " + combined);
Output:
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3
async method
thenApply()
と thenApplyAsync()
のように後ろにasyncがついたメソッドが常に存在します。
上記の例で紹介したように、同じスレッドを使用せずに、他のスレッドを使用して処理したいときにasyncがついたメソッドを使用します。
たとえば、 thenAccept()
は thenAcceptAsync()
と呼ばれるメソッドを持っています。このように、ほとんどasyncがついたメソッドがpairに存在します。
参考
Related Posts
- Java - AtomicReference使用方法
- Java - CountDownLatchを使用する方法、および例
- Java - ScheduledThreadPoolExecutor使用方法
- Java - SummaryStatisticsの使用方法(count、min、max、average)
- Java8 - 関数型インタフェース (Functional Interface) について
- Java - String配列をint配列に変換する
- Java - ArrayList要素の値を変更する方法、replaceAll()
- Java - 2つのリストが同じかどうかを比較
- Java - 配列から特定のIndex要素を削除する3つの方法
- Java - HashMapソート、4つの方法
- Java - 文字列を配列に変換する方法
- Java - ArrayListが空であることを確認する3つの方法
- Java - ArrayListの巡回、4つの方法
- Java - ArrayListの最大値、最小 値を見つける
- Java - ArrayListの合計、平均値の計算
- Java - HashMap巡回、3つの方法
- Java - do whileとwhileの違い
- Java - Lambda式と関数型インタフェース
- Java - List empty(null)チェック、3つの方法
- Java - ArrayListの初期化、4つの方法
- Java - Stream.reduce()の使い方と例
- Java - 2つのマップを結合する(merge、putAll)
- Java - java.util.Dateをjava.sql.Dateに変換する
- Java - ArrayListをStringに変換する
- Java - ClassNotFoundExceptionの発生原因と解決策
- Java - 非静的メソッドは静的コンテキストから参照できません
- Java - NoSuchMethodErrorの原因と解決策
- Java - JSONライブラリを使用する方法(JSONObject、JSONArray)
- Java - byte[]配列をFileに保存
- Java - byte[]の配列をStringに変換
- Java - ファイルのアクセス権を確認し、変更
- Java - 一時フォルダ(Temp directory)パスを取得する
- Javaでシェルスクリプトを実行
- Java - Streamを配列に変換する
- Java - リスト重複排除、2つの方法