CompletableFuture is a class that implements Future and CompletionStage. Although it is a Future, it can process tasks with async without creating threads directly, and allows multiple CompletableFutures to be processed in parallel or merged. It also provides a way to handle Cancel and Error.
Let`s look at an example of CompletableFuture to see how it works.
How to use it as a Future
CompletableFuture is new CompletableFuture<Type>
. Type means the type of data to be saved after the operation is completed.
You can create a CompletableFuture like this and use it like a normal Future.
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.complete("Finished");
return null;
});
log(future.get());
In the code above, a thread is created directly with newCachedThreadPool()
, and when the threads work is completed, the result is stored in a Future with
complete("Finished").
When the result is saved,
get()` returns a value and exits the blocked state.
For reference, log()
directly implements the following methods. It prints the time and thread name together.
public void log(String msg) {
System.out.println(LocalTime.now() + " ("
+ Thread.currentThread().getName() + ") " + msg);
}
Output:
22:58:40.478 (main) Finished
If you already know the value, you can assign it with completedFuture()
without creating a thread.
Future<String> completableFuture =
CompletableFuture.completedFuture("Skip!");
String result = completableFuture.get();
log(result);
Output:
22:59:42.553 (main) Skip!
Exception handling for Cancel
cancle()
can be called from a thread like this: At this time, since CancellationException
occurs in CancellationException
, the exception should be handled as follows.
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.cancel(false);
return null;
});
String result = null;
try {
result = future.get();
} catch (CancellationException e) {
e.printStackTrace();
result = "Canceled!";
}
log(result);
Output:
java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!
supplyAsync(), runAsync()
CompletableFuture provides supplyAsync()
and runAsync()
so you can async tasks without creating threads yourself.
This way you can pass arguments to Lambda to supplyAsync()
. Lambda passed as an argument is processed in another thread.
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "future example");
log("get(): " + future.get());
If you look at the result, you can see that it is being processed in a thread other than main
15:43:02.923 (main) get(): future example
The following example tells Lambda to sleep for 2 seconds. future.get()
called from main blocks for 2 seconds and receives a return value when completed.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log("Starting....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Finished works";
});
log("get(): " + future.get());
Output:
15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works
The usage of runAsync()
is the same.
supplyAsync()
has a return value whereas runAsync()
has no return value.
So CompletableFuture<Void>
.
get()
will block until the result is complete, but return null
.
CompletableFuture<Void> future
= CompletableFuture.runAsync(() -> log("future example"));
log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null
If you don`t need to wait for processing to complete, you can also implement a shorter implementation like this:
CompletableFuture.runAsync(() -> log("future example"));
Exception handling
CompletableFuture may throw an Exception while processing an operation.
In this case, you can handle the exception with handle()
.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String name = null;
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
log(future.get());
Ouput:
15:51:35.385 (main) Hello, Stranger!
thenApply() : Perform an action with a return value
When a task is processed with supplyAsync()
, it can be implemented to perform other tasks with the result.
The thenApply()
method executes a Lambda with arguments and return values. Here, the argument will be the value returned by supplyAsync()
.
In the following example, future1 is implemented only with supplyAsync()
, and future2 is implemented to perform other tasks by attaching thenApply()
to future1.
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = future1.thenApply(
s -> s + " + Future2");
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
Output:
15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2
The above future2 can also be defined at once as follows.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply(s -> s + " + Future2");
log("future.get(): " + future.get());
Output:
15:57:49.343 (main) future.get(): Future1 + Future2
thenApply()
also has a return value, so you can apply thenApply()
in succession.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + " Future");
log("future.get(): " + future.get());
Output:
16:00:00.513 (main) future.get(): Hello World Future
thenAccept() : Perform an operation with no return value
thenAccept()
is similar to thenApply()
. However, you can handle Lambdas that have arguments but no return values.
Here is an example using thenAccept()
. Since there is no return value, thenAccept()
calls CompletableFuture<Void>
.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<Void> future2 = future1.thenAccept(
s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
Looking at the result, future2 returns null.
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null
thenCompose() : perform multiple operations sequentially
thenCompose()
serves to make two CompletableFutures into one CompletableFuture like a chain.
When the result of the first CompletableFuture is returned, the result is passed to the second CompletableFuture, and the operations are processed sequentially.
In the following example, thenCompose()
receives a CompletableFuture composed of Lambda as an argument. where s
is the String returned by supplyAsync()
.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());
Output:
16:07:33.196 (main) Hello World
You can also apply thenCompose()
in succession like this:
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));
log(future.get());
Output:
16:08:18.859 (main) Hello World Java
thenCombine() : Perform multiple operations simultaneously
If thenCompose()
causes multiple CompletableFutures to be processed sequentially, thenCombine()
causes multiple CompletableFutures to be processed in parallel.
All processing is done and the results can be merged into one.
Below are two futures. thenCombine()
causes these two futures to run in parallel and merges the results into one.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApply((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
Looking at the results, it looks like they were processed sequentially. The reason is that thenApply()
uses the same thread, so there was a waiting time.
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!
thenApply() vs thenApplyAsync()
Use thenApplyAsync()
instead of thenApplyAsync()
to make it run on a different thread.
The code below is the code that changed thenApply()
to thenApplyAsync()
in the code above.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApplyAsync((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApplyAsync((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
Thread.sleep(5000);
If you look at the result, you can see that the two tasks are processed in different threads and are processed in 2 seconds.
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!
anyOf()
anyOf()
is a method that brings only one result that is processed quickly among multiple CompletableFutures.
You can use anyOf()
as follows, and out of 3 futures, only 1 result processed first is passed to thenAccept()
.
Of course, all three futures are executed. There is only one passed to thenAccept()
.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
log("starting future1");
return "Future1";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
log("starting future2");
return "Future2";
});
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> {
log("starting future3");
return "Future3";
});
CompletableFuture.anyOf(future1, future2, future3)
.thenAccept(s -> log("Result: " + s));
Output:
16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2
allOf()
allOf()
can receive the results of all futures and process them.
Unlike anyOf()
, you can use the Stream api to process the result. get()
returns null.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2");
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "Future3");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" + "));
log("Combined: " + combined);
Output:
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3
async method
There are always methods with async appended to them, such as thenApply()
and thenApplyAsync()
.
As introduced in the example above, when you want to process using a different thread instead of using the same thread, you can use the async method.
For example, thenAccept()
has a method called thenAcceptAsync()
. In this way, most methods with async exist as a pair.
Reference
Related Posts
- Java - Remove items from List while iterating
- Java - How to find key by value in HashMap
- Java - Update the value of a key in HashMap
- Java - How to put quotes in a string
- Java - How to put a comma (,) after every 3 digits
- BiConsumer example in Java 8
- Java 8 - Consumer example
- Java 8 - BinaryOperator example
- Java 8 - BiPredicate Example
- Java 8 - Predicate example
- Java 8 - Convert Stream to List
- Java 8 - BiFunction example
- Java 8 - Function example
- Java - Convert List to Map
- Exception testing in JUnit
- Hamcrest Collections Matcher
- Hamcrest equalTo () Matcher
- AAA pattern of unit test (Arrange/Act/Assert)
- Hamcrest Text Matcher
- Hamcrest Custom Matcher
- Why Junit uses Hamcrest
- Java - ForkJoinPool
- Java - How to use Futures
- Java - Simple HashTable implementation
- Java - Create a file in a specific path
- Java - Mockito의 @Mock, @Spy, @Captor, @InjectMocks
- Java - How to write test code using Mockito
- Java - Synchronized block
- Java - How to decompile a ".class" file into a Java file (jd-cli decompiler)
- Java - How to generate a random number
- Java - Calculate powers, Math.pow()
- Java - Calculate the square root, Math.sqrt()
- Java - How to compare String (==, equals, compare)
- Java - Calculate String Length
- Java - case conversion & comparison insensitive (toUpperCase, toLowerCase, equalsIgnoreCase)