CompletableFuture는 Future와 CompletionStage를 구현한 클래스입니다. Future이지만 직접 쓰레드를 생성하지 않고 async로 작업을 처리할 수 있고, 여러 CompletableFuture를 병렬로 처리하거나, 병합하여 처리할 수 있게 합니다. 또한 Cancel, Error를 처리할 수 있는 방법을 제공합니다.
CompletableFuture의 예제를 보면서 어떻게 동작하는지 알아보겠습니다.
Future로 사용하는 방법
CompletableFuture는 new CompletableFuture<Type>
처럼 생성할 수 있습니다. Type은 작업이 완료되어 저장되는 데이터의 타입을 의미합니다.
다음과 같이 CompletableFuture를 생성하여 일반적인 Future처럼 사용할 수 있습니다.
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.complete("Finished");
return null;
});
log(future.get());
위의 코드에서는 newCachedThreadPool()
으로 직접 쓰레드를 만들었고, 그 쓰레드의 작업이 완료되면 complete("Finished")
으로 결과를 Future에 저장하였습니다.
결과가 저장되면 get()
은 값을 리턴하며 block된 상태에서 빠져나옵니다.
참고로 log()
는 아래와 같은 메소드를 직접구현하였습니다. 시간과 쓰레드의 이름을 함께 출력해 줍니다.
public void log(String msg) {
System.out.println(LocalTime.now() + " ("
+ Thread.currentThread().getName() + ") " + msg);
}
Output:
22:58:40.478 (main) Finished
이미 값을 알고 있다면 쓰레드를 만들지 않고 completedFuture()
으로 값을 할당할 수 있습니다.
Future<String> completableFuture =
CompletableFuture.completedFuture("Skip!");
String result = completableFuture.get();
log(result);
Output:
22:59:42.553 (main) Skip!
Cancel에 대한 예외 처리
다음과 같이 쓰레드에서 cancle()
이 호출될 수 있습니다. 이 때, get()
에서 CancellationException
이 발생하기 때문에 아래와 같이 예외처리를 해야 합니다.
CompletableFuture<String> future
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(2000);
future.cancel(false);
return null;
});
String result = null;
try {
result = future.get();
} catch (CancellationException e) {
e.printStackTrace();
result = "Canceled!";
}
log(result);
Output:
java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
at CompletableFutureExample.lambda$ex3$1(CompletableFutureExample.java:47)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
23:02:53.074 (main) Canceled!
supplyAsync(), runAsync()
CompletableFuture는 supplyAsync()
와 runAsync()
를 제공하여 직접 쓰레드를 생성하지 않고 작업을 async로 처리하도록 할 수 있습니다.
이런식으로 supplyAsync()
에 Lambda로 인자를 전달할 수 있습니다. 인자로 전달된 Lambda는 다른 쓰레드에서 처리가 됩니다.
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "future example");
log("get(): " + future.get());
결과를 보면 main이 아닌 다른 쓰레드에서 처리되는 것을 알 수 있습니다.
15:43:02.923 (main) get(): future example
다음 예제는 Lambda에 2초간 sleep하도록 하였습니다. main에서 호출되는 future.get()
은 2초간 blocking되고 완료되면 리턴값을 받습니다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log("Starting....");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Finished works";
});
log("get(): " + future.get());
Output:
15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works
runAsync()
도 사용방법은 동일합니다.
supplyAsync()
는 리턴 값이 있는 반면에 runAsync()
는 리턴값이 없습니다.
따라서 CompletableFuture<Void>
으로 선언을 해야 합니다.
결과가 완료될 때까지 get()
은 blocking되지만 null
을 리턴합니다.
CompletableFuture<Void> future
= CompletableFuture.runAsync(() -> log("future example"));
log("get(): " + future.get());
15:47:01.328 (ForkJoinPool.commonPool-worker-1) future example
15:47:01.328 (main) get(): null
처리가 완료될 때까지 기다리지 않아도 된다면 다음과 같이 짧게 구현할 수도 있습니다.
CompletableFuture.runAsync(() -> log("future example"));
Exception handling
CompletableFuture에서 작업을 처리하는 중에 Exception이 발생할 수 있습니다.
이런 경우, handle()
로 예외를 처리할 수 있습니다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String name = null;
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
log(future.get());
Ouput:
15:51:35.385 (main) Hello, Stranger!
thenApply() : 리턴 값이 있는 작업 수행
supplyAsync()
으로 어떤 작업이 처리되면, 그 결과를 가지고 다른 작업도 수행하도록 구현할 수 있습니다.
thenApply()
메소드는 인자와 리턴 값이 있는 Lambda를 수행합니다. 여기서 인자는 supplyAsync()
에서 리턴되는 값이 됩니다.
다음 예제를 보시면, future1은 supplyAsync()
으로만 구현되어있고, future2는 future1에 thenApply()
를 붙여 다른 작업도 수행하도록 구현하였습니다.
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = future1.thenApply(
s -> s + " + Future2");
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
Output:
15:56:26.203 (main) future1.get(): Future1
15:56:26.203 (main) future2.get(): Future1 + Future2
위의 future2는 다음과 같이 한번에 정의할 수도 있습니다.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply(s -> s + " + Future2");
log("future.get(): " + future.get());
Output:
15:57:49.343 (main) future.get(): Future1 + Future2
thenApply()
또한 리턴값이 있기 때문에, 연달아 thenApply()
를 적용할 수 있습니다.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(s -> s + " Future");
log("future.get(): " + future.get());
Output:
16:00:00.513 (main) future.get(): Hello World Future
thenAccept() : 리턴 값이 없는 작업 수행
thenAccept()
도 thenApply()
와 비슷합니다. 하지만, 인자는 있지만 리턴값이 없는 Lambda를 처리할 수 있습니다.
다음은 thenAccept()
를 사용하는 예제입니다. 리턴 값이 없기 때문에 thenAccept()
는 CompletableFuture<Void>
를 리턴하게 됩니다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<Void> future2 = future1.thenAccept(
s -> log(s + " World"));
log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());
결과를 보면 future2는 null을 리턴합니다.
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null
thenCompose() : 여러 작업을 순차적으로 수행
thenCompose()
는 chain처럼 두개의 CompletableFuture를 하나의 CompletableFuture으로 만들어주는 역할을 합니다.
첫번째 CompletableFuture의 결과가 리턴되면 그 결과를 두번째 CompletableFuture으로 전달하며, 순차적으로 작업이 처리됩니다.
다음 예제를 보시면 thenCompose()
는 Lambda로 구성된 CompletableFuture를 인자로 받습니다. 여기서 s
는 supplyAsync()
에서 리턴되는 String입니다.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());
Output:
16:07:33.196 (main) Hello World
다음과 같이 연속적으로 thenCompose()
를 적용할 수도 있습니다.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Java"));
log(future.get());
Output:
16:08:18.859 (main) Hello World Java
thenCombine() : 여러 작업을 동시에 수행
thenCompose()
가 여러개의 CompletableFuture를 순차적으로 처리되도록 만들었다면, thenCombine()
는 여러 CompletableFuture를 병렬로 처리되도록 만듭니다.
모든 처리가 완료되고 그 결과를 하나로 합칠 수 있습니다.
아래에는 두개의 future가 있습니다. thenCombine()
으로 이 두개의 future가 병렬로 진행되도록 하고 그 결과를 하나로 합치도록 합니다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApply((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApply((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
결과를 보면, 순차적으로 처리된 것처럼 보입니다. 그 이유는 thenApply()
가 동일한 쓰레드를 사용하기 때문에 대기하는 시간이 있었습니다.
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!
thenApply() vs thenApplyAsync()
thenApply()
대신에 thenApplyAsync()
를 사용하면 다른 쓰레드에서 동작하도록 만들 수 있습니다.
아래 코드는 위의 코드에서 thenApply()
를 thenApplyAsync()
로 변경한 코드입니다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1")
.thenApplyAsync((s) -> {
log("Starting future1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2")
.thenApplyAsync((s) -> {
log("Starting future2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + "!";
});
future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
.thenAccept((s) -> log(s));
Thread.sleep(5000);
결과를 보면 두개의 작업이 다른 쓰레드에서 처리되어 2초만에 처리되는 것을 볼 수 있습니다.
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!
anyOf()
anyOf()
는 여러개의 CompletableFuture 중에서 빨리 처리되는 1개의 결과만을 가져오는 메소드입니다.
다음과 같이 anyOf()
를 사용할 수 있으며, 3개의 future 중에 가장 먼저 처리되는 1개의 결과만 thenAccept()
으로 전달됩니다.
물론 3개의 future는 모두 실행이 됩니다. thenAccept()
에 전달되는 것이 1개일 뿐입니다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
log("starting future1");
return "Future1";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
log("starting future2");
return "Future2";
});
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> {
log("starting future3");
return "Future3";
});
CompletableFuture.anyOf(future1, future2, future3)
.thenAccept(s -> log("Result: " + s));
Output:
16:19:56.826 (ForkJoinPool.commonPool-worker-2) starting future2
16:19:56.826 (ForkJoinPool.commonPool-worker-1) starting future1
16:19:56.826 (ForkJoinPool.commonPool-worker-3) starting future3
16:19:56.826 (ForkJoinPool.commonPool-worker-2) Result: Future2
allOf()
allOf()
는 모든 future의 결과를 받아서 처리를 할 수 있습니다.
anyOf()
와 다르게 Stream api를 사용하여 결과를 처리할 수 있습니다. get()
은 null을 리턴합니다.
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Future1");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Future2");
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "Future3");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" + "));
log("Combined: " + combined);
Output:
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3
async method
thenApply()
와 thenApplyAsync()
처럼 뒤에 async가 붙은 메소드들이 항상 존재합니다.
위의 예제에서 소개한 것처럼 동일한 쓰레드를 사용하지 않고 다른 쓰레드를 사용하여 처리하고 싶을 때 async가 붙은 메소드를 사용하면 됩니다.
예를 들어, thenAccept()
는 thenAcceptAsync()
라는 메소드를 갖고 있습니다. 이런식으로 대부분 async가 붙은 메소드들이 pair로 존재합니다.
참고
Related Posts
- Java - Unsupported class file major version 61 에러
- Java - String.matches()로 문자열 패턴 확인 및 다양한 예제 소개
- Java - 문자열 공백제거 (trim, replace)
- Java - replace()와 replaceAll()의 차이점
- Java - ArrayList 초기화, 4가지 방법
- Java - 배열 정렬(Sorting) (오름차순, 내림차순)
- Java - 문자열(String)을 비교하는 방법 (==, equals, compare)
- Java - StringBuilder 사용 방법, 예제
- Java - 로그 출력, 파일 저장 방법 (Logger 라이브러리)
- Java IllegalArgumentException 의미, 발생 이유
- Java - NullPointerException 원인, 해결 방법
- Seleninum의 ConnectionFailedException: Unable to establish websocket connection 해결
- Java - compareTo(), 객체 크기 비교
- Java - BufferedWriter로 파일 쓰기
- Java - BufferedReader로 파일 읽기
- Java charAt() 함수 알아보기
- Java - BigInteger 범위, 비교, 연산, 형변환
- Java contains()로 문자(대소문자 X) 포함 확인
- Java - Set(HashSet)를 배열로 변환
- Java - 문자열 첫번째 문자, 마지막 문자 확인
- Java - 문자열 한글자씩 자르기
- Java - 문자열 단어 개수 가져오기
- Java - 1초마다 반복 실행
- Java - 배열을 Set(HashSet)로 변환
- Java - 여러 Set(HashSet) 합치기
- Java - 명령행 인자 입력 받기
- Java - 리스트 역순으로 순회, 3가지 방법
- Java - 특정 조건으로 리스트 필터링, 3가지 방법
- Java - HashMap 모든 요소들의 합계, 평균 계산
- Java - 특정 조건으로 HashMap 필터링
- Java - 싱글톤(Singleton) 패턴 구현
- Java - 숫자 왼쪽에 0으로 채우기
- Java - String 배열 초기화 방법
- Java - 정렬된 순서로 Map(HashMap) 순회
- Java - HashMap에서 key, value 가져오기