Java - How to use CompletableFuture

CompletableFuture is a class that implements Future and CompletionStage. Although it is a Future, it can process tasks with async without creating threads directly, and allows multiple CompletableFutures to be processed in parallel or merged. It also provides a way to handle Cancel and Error.

Let`s look at an example of CompletableFuture to see how it works.

How to use it as a Future

CompletableFuture is new CompletableFuture<Type>. Type means the type of data to be saved after the operation is completed.

You can create a CompletableFuture like this and use it like a normal Future.

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.complete("Finished");
    return null;
});

log(future.get());

In the code above, a thread is created directly with newCachedThreadPool(), and when the threads work is completed, the result is stored in a Future with complete("Finished"). When the result is saved, get()` returns a value and exits the blocked state.

For reference, log() directly implements the following methods. It prints the time and thread name together.

public void log(String msg) {
    System.out.println(LocalTime.now() + " ("
            + Thread.currentThread().getName() + ") " +  msg);
}

Output:

22:58:40.478 (main) Finished

If you already know the value, you can assign it with completedFuture() without creating a thread.

Future<String> completableFuture =
        CompletableFuture.completedFuture("Skip!");

String result = completableFuture.get();
log(result);

Output:

22:59:42.553 (main) Skip!

Exception handling for Cancel

cancle() can be called from a thread like this: At this time, since CancellationException occurs in CancellationException, the exception should be handled as follows.

CompletableFuture<String> future
        = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
    Thread.sleep(2000);
    future.cancel(false);
    return null;
});

String result = null;
try {
    result = future.get();
} catch (CancellationException e) {
    e.printStackTrace();
    result = "Canceled!";
}

log(result);

Output:

java.util.concurrent.CancellationException
	at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
	at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!

supplyAsync(), runAsync()

CompletableFuture provides supplyAsync() and runAsync() so you can async tasks without creating threads yourself.

This way you can pass arguments to Lambda to supplyAsync(). Lambda passed as an argument is processed in another thread.

CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> "future example");

log("get(): " + future.get());

If you look at the result, you can see that it is being processed in a thread other than main

15:43:02.923 (main) get(): future example

The following example tells Lambda to sleep for 2 seconds. future.get() called from main blocks for 2 seconds and receives a return value when completed.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            log("Starting....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Finished works";
        });

log("get(): " + future.get());

Output:

15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works

The usage of runAsync() is the same. supplyAsync() has a return value whereas runAsync() has no return value.

So CompletableFuture<Void>. get() will block until the result is complete, but return null.

CompletableFuture<Void> future
        = CompletableFuture.runAsync(() -> log("future example"));

log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null

If you don`t need to wait for processing to complete, you can also implement a shorter implementation like this:

CompletableFuture.runAsync(() -> log("future example"));

Exception handling

CompletableFuture may throw an Exception while processing an operation. In this case, you can handle the exception with handle().

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String name = null;
    if (name == null) {
        throw new RuntimeException("Computation error!");
    }
    return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

log(future.get());

Ouput:

15:51:35.385 (main) Hello, Stranger!

thenApply() : Perform an action with a return value

When a task is processed with supplyAsync(), it can be implemented to perform other tasks with the result.

The thenApply() method executes a Lambda with arguments and return values. Here, the argument will be the value returned by supplyAsync().

In the following example, future1 is implemented only with supplyAsync(), and future2 is implemented to perform other tasks by attaching thenApply() to future1.

CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = future1.thenApply(
        s -> s + " + Future2");

log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

Output:

15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2

The above future2 can also be defined at once as follows.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply(s -> s + " + Future2");

log("future.get(): " + future.get());

Output:

15:57:49.343 (main) future.get(): Future1 + Future2

thenApply() also has a return value, so you can apply thenApply() in succession.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenApply(s -> s + " World")
        .thenApply(s -> s + " Future");

log("future.get(): " + future.get());

Output:

16:00:00.513 (main) future.get(): Hello World Future

thenAccept() : Perform an operation with no return value

thenAccept() is similar to thenApply(). However, you can handle Lambdas that have arguments but no return values.

Here is an example using thenAccept(). Since there is no return value, thenAccept() calls CompletableFuture<Void>.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Hello");

CompletableFuture<Void> future2 = future1.thenAccept(
        s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

Looking at the result, future2 returns null.

16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null

thenCompose() : perform multiple operations sequentially

thenCompose() serves to make two CompletableFutures into one CompletableFuture like a chain. When the result of the first CompletableFuture is returned, the result is passed to the second CompletableFuture, and the operations are processed sequentially.

In the following example, thenCompose() receives a CompletableFuture composed of Lambda as an argument. where s is the String returned by supplyAsync().

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());

Output:

16:07:33.196 (main) Hello World

You can also apply thenCompose() in succession like this:

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));

log(future.get());

Output:

16:08:18.859 (main) Hello World Java

thenCombine() : Perform multiple operations simultaneously

If thenCompose() causes multiple CompletableFutures to be processed sequentially, thenCombine() causes multiple CompletableFutures to be processed in parallel. All processing is done and the results can be merged into one.

Below are two futures. thenCombine() causes these two futures to run in parallel and merges the results into one.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApply((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Looking at the results, it looks like they were processed sequentially. The reason is that thenApply() uses the same thread, so there was a waiting time.

16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!

thenApply() vs thenApplyAsync()

Use thenApplyAsync() instead of thenApplyAsync() to make it run on a different thread.

The code below is the code that changed thenApply() to thenApplyAsync() in the code above.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApplyAsync((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApplyAsync((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Thread.sleep(5000);

If you look at the result, you can see that the two tasks are processed in different threads and are processed in 2 seconds.

16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!

anyOf()

anyOf() is a method that brings only one result that is processed quickly among multiple CompletableFutures.

You can use anyOf() as follows, and out of 3 futures, only 1 result processed first is passed to thenAccept(). Of course, all three futures are executed. There is only one passed to thenAccept().

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future1");
            return "Future1";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future2");
            return "Future2";
        });

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> {
            log("starting future3");
            return "Future3";
        });

CompletableFuture.anyOf(future1, future2, future3)
        .thenAccept(s -> log("Result: " + s));

Output:

16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2

allOf()

allOf() can receive the results of all futures and process them.

Unlike anyOf(), you can use the Stream api to process the result. get() returns null.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2");

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> "Future3");

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(future1, future2, future3);

log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());

String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" + "));
log("Combined: " + combined);

Output:

16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3

async method

There are always methods with async appended to them, such as thenApply() and thenApplyAsync(). As introduced in the example above, when you want to process using a different thread instead of using the same thread, you can use the async method.

For example, thenAccept() has a method called thenAcceptAsync(). In this way, most methods with async exist as a pair.

Reference

codechachaCopyright ©2019 codechacha