HOME > java > concurrency

Java - FixedThreadPool(ThreadPoolExecutor) 사용 방법

By JS | 09 Nov 2019

FixedThreadPool은 고정된 크기의 쓰레드풀을 말합니다. 다음처럼 Executors의 newFixedThreadPool() 메소드를 통해서 고정된 크기의 쓰레드풀을 생성할 수 있습니다.

// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

ExecutorService는 인터페이스이며 ThreadPoolExecutor는 이 인터페이스를 구현하였습니다. 그렇기 때문에 아래와 같은 방법으로 사용할 수 있습니다.

ExecutorService executor = Executors.newFixedThreadPool(4);
ThreadPoolExecutor executor1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

우리는 이 쓰레드풀에 여러 Task들을 추가하여 처리하도록 만들 수 있습니다. 쓰레드는 이미 생성되어있기 때문에 작업을 처리할 때마다 쓰레드를 만들고 종료하는 작업을 할 필요가 없습니다.

고정된 쓰레드풀을 사용하는 예제

다음은 FixedThreadPool의 가장 기본적인 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample {

    public static void main(String[] args) {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // ThreadPoolExecutor에 Task를 예약하면, 여유가 있을 때 Task를 수행합니다.
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // 더이상 ThreadPoolExecutor에 Task를 추가할 수 없습니다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        executor.shutdown();
    }
}
  • execute()로 Task를 예약할 수 있으며 쓰레드풀의 여유가 있을 때 Task를 수행합니다. 인자로 Runnable을 구현한 객체를 전달할 수 있으며, 익명클래스나, 람다식으로 전달할 수도 있습니다.
  • shutdown()를 호출하면 더 이상 Task를 받지 않습니다. 처리중인 Task를 모두 완료한 뒤 쓰레드풀을 종료합니다.

결과

09:33:38.221 Execute task 0
09:33:38.228 Execute task 1
09:33:38.228 Doing job 0
09:33:38.228 Done job 0
09:33:38.228 Execute task 2
09:33:38.228 Doing job 1
09:33:38.228 Done job 1
09:33:38.228 Execute task 3
09:33:38.228 Doing job 2
09:33:38.228 Done job 2
09:33:38.228 Execute task 4
09:33:38.228 Doing job 3
09:33:38.228 Done job 3
09:33:38.228 Execute task 5
09:33:38.228 Doing job 4
09:33:38.228 Execute task 6
09:33:38.228 Doing job 5
09:33:38.228 Doing job 6
09:33:38.229 Done job 5
09:33:38.228 Done job 4
09:33:38.229 Done job 6
09:33:38.228 Execute task 7
09:33:38.229 Execute task 8
09:33:38.229 Doing job 7
09:33:38.229 Execute task 9
09:33:38.229 Done job 7
09:33:38.229 Doing job 8
09:33:38.229 Doing job 9
09:33:38.229 Done job 8
09:33:38.229 Done job 9

모든 Task가 완료될 때까지 기다리기

shutdown()을 호출하면 더 이상 Task를 받지 않습니다. 하지만 이전에 추가된 Task는 수행됩니다. 모든 Task가 완료될 때까지 기다리려면 다음과 같이 awaitTermination()을 사용해야 합니다.

public class FixedSizeThreadPoolExecutorExample1 {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                sleepSec(3);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // 더이상 ThreadPoolExecutor에 Task를 추가할 수 없습니다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        executor.shutdown();

        // shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있습니다.
        // Timeout을 20초 설정하고 완료되기를 기다립니다.
        // 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴합니다.
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");

            // 모든 Task를 강제 종료합니다.
            executor.shutdownNow();
        }
    }

    private static void sleepSec(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • awaitTermination(): 설정한 시간동안 Task가 완료되기를 기다립니다.
  • shutdownNow() : 예약된 Task를 취소하고, 실행 중인 모든 Task를 강제로 종료시킵니다.

결과

09:38:54.556 Execute task 0
09:38:54.565 Execute task 1
09:38:54.565 Doing job 0
09:38:54.565 Execute task 2
09:38:54.566 Doing job 1
09:38:54.566 Execute task 3
09:38:54.566 Doing job 2
09:38:54.566 Execute task 4
09:38:54.566 Execute task 5
09:38:54.566 Doing job 3
09:38:54.566 Execute task 6
09:38:54.566 Execute task 7
09:38:54.566 Execute task 8
09:38:54.566 Execute task 9
09:38:57.566 Done job 0
09:38:57.566 Done job 1
09:38:57.566 Doing job 4
09:38:57.566 Done job 2
09:38:57.566 Doing job 5
09:38:57.566 Done job 3
09:38:57.566 Doing job 6
09:38:57.567 Doing job 7
09:39:00.566 Done job 4
09:39:00.566 Doing job 8
09:39:00.567 Done job 5
09:39:00.567 Doing job 9
09:39:00.567 Done job 6
09:39:00.567 Done job 7
09:39:03.567 Done job 8
09:39:03.567 Done job 9
09:39:03.568 All jobs are terminated

위의 코드에서 awaitTermination()의 Timeout을 5초로 변경했을 때, 실행 결과

09:37:49.566 Execute task 0
09:37:49.574 Execute task 1
09:37:49.574 Doing job 0
09:37:49.574 Execute task 2
09:37:49.574 Doing job 1
09:37:49.574 Execute task 3
09:37:49.574 Doing job 2
09:37:49.579 Execute task 4
09:37:49.579 Doing job 3
09:37:49.579 Execute task 5
09:37:49.579 Execute task 6
09:37:49.579 Execute task 7
09:37:49.579 Execute task 8
09:37:49.579 Execute task 9
09:37:52.574 Done job 0
09:37:52.574 Done job 1
09:37:52.574 Doing job 4
09:37:52.574 Doing job 5
09:37:52.574 Done job 2
09:37:52.575 Doing job 6
09:37:52.579 Done job 3
09:37:52.580 Doing job 7
09:37:54.581 some jobs are not terminated
09:37:54.584 Done job 7
java.lang.InterruptedException: sleep interrupted
09:37:54.585 Done job 6
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
09:37:54.586 Done job 4
09:37:54.586 Done job 5

사용가능한 코어 개수 만큼 쓰레드풀을 만드는 방법

쓰레드풀 개수를 정할 때 최대한 기준이 애매할 수 있습니다. availableProcessors()는 Java가 실행 중인 JVM 환경에서 사용가능한 코어 개수를 리턴해줍니다.

package concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedSizeThreadPoolExecutorExample3 {

    public static void main(String[] args) {

        final int maxCore = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxCore);
        System.out.println("Max thread pool size: " + executor.getMaximumPoolSize());

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println("Execute task " + jobId);
            executor.execute(() -> {
                System.out.println("Doing a job " + jobId);
            });
        }
        executor.shutdown();
    }
}

결과

Max thread pool size: 8
Execute task 0
Execute task 1
Doing a job 0
Execute task 2
Doing a job 1
Execute task 3
Doing a job 2
Execute task 4
Doing a job 3
Execute task 5
Doing a job 4
Execute task 6
Doing a job 5
Execute task 7
Doing a job 6
Doing a job 7
Execute task 8
Execute task 9
Doing a job 8
Doing a job 9

Future로 Task의 결과 리턴 받기

이전 예제에서는 Task를 Runnable로 전달하였습니다. 결과를 리턴할 수 있는 Callable 객체를 전달할 수도 있습니다. submit()은 인자로 Callable을 받고 Future를 리턴합니다. Future를 통해 리턴값을 받을 수 있습니다.

다음은 Callable Task를 전달하고, 결과로 String을 받는 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample4 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // submit()은 인자로 Callable도 받으며 Future를 리턴합니다.
            Future<String> result = executor.submit(() -> {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(LocalTime.now() + " Done task " + jobId);
                return "finished job " + jobId;
            });
            futures.add(result);
        }

        // future.get()은 Task의 result가 리턴될 때까지 기다립니다.
        for (Future<String> future : futures) {
            String result = future.get();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}

Future.get()은 Future에 결과가 리턴될 때까지 기다립니다. 위의 for문은 첫번째 future의 값이 리턴되면 두번째 future 값이 리턴되기를 기다립니다. 예를 들어, 첫번째 Task가 10초 걸리고, 두번째 Task가 1초걸린다면, 첫번째에서 계속 기다리기 때문에 두번째의 결과를 늦게 가져오게 됩니다.

만약 Task가 완료되는 순서대로 결과를 처리하고 싶다면 BlockingQueue를 사용하여 구현할 수 있습니다.

결과

12:24:36.948 Execute task 0
12:24:36.958 Execute task 1
12:24:36.958 Execute task 2
12:24:36.959 Execute task 3
12:24:36.959 Execute task 4
12:24:36.959 Execute task 5
12:24:36.959 Execute task 6
12:24:36.959 Execute task 7
12:24:36.959 Execute task 8
12:24:36.959 Execute task 9
12:24:38.959 Done task 0
12:24:38.959 Done task 1
12:24:38.959 Done task 2
12:24:38.959 finished job 0
12:24:38.959 Done task 3
12:24:38.959 finished job 1
12:24:38.959 finished job 2
12:24:38.959 finished job 3
12:24:40.959 Done task 4
12:24:40.959 Done task 6
12:24:40.959 Done task 5
12:24:40.959 Done task 7
12:24:40.959 finished job 4
12:24:40.960 finished job 5
12:24:40.960 finished job 6
12:24:40.960 finished job 7
12:24:42.959 Done task 8
12:24:42.959 Done task 9
12:24:42.960 finished job 8
12:24:42.960 finished job 9

BlockingQueue로 결과 리턴 받기

Future를 사용하지 않고 BlockingQueue를 이용하여 결과를 받을 수 있습니다. BlockingQueue는 동시성을 보장하기 때문에 멀티쓰레드에서 접근해도 문제가 없습니다.

아래 예제에서는 Callable을 사용하지 않고 Runnable을 사용합니다. Task는 결과를 리턴할 필요가 없기 때문입니다.

그리고 Task 0은 10초를 sleep하고, Task9는 1초를 sleep하도록 하였습니다. 이처럼 먼저 수행되는 Task가 더 오래 걸리도록 만들었습니다. 이렇게 하는 이유는 Task가 완료되는 순서대로 결과를 처리할 수 있다는 것을 보여주기 위해서입니다.

다음은 그 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample5 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            final int sleepSec = 10 - i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // Result를 BlockingQueue에 저장합니다.
            executor.submit(() -> {
                System.out.println(LocalTime.now() + " Doing Task " + jobId + ", sleepSec: " + sleepSec);
                try {
                    TimeUnit.SECONDS.sleep(sleepSec);
                    String result = "finished job " + jobId;
                    queue.put(result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // take()는 Queue에 아이템이 추가될 때까지 기다렸다가 리턴합니다.
        for (int i = 0; i < 10; i++) {
            String result = queue.take();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}
12:26:09.009 Execute task 0
12:26:09.018 Execute task 1
12:26:09.018 Doing Task 0, sleepSec: 10
12:26:09.018 Execute task 2
12:26:09.018 Doing Task 1, sleepSec: 9
12:26:09.018 Execute task 3
12:26:09.018 Doing Task 2, sleepSec: 8
12:26:09.018 Execute task 4
12:26:09.018 Doing Task 3, sleepSec: 7
12:26:09.018 Execute task 5
12:26:09.018 Execute task 6
12:26:09.018 Execute task 7
12:26:09.018 Execute task 8
12:26:09.018 Execute task 9
12:26:16.019 Doing Task 4, sleepSec: 6
12:26:16.019 finished job 3
12:26:17.018 finished job 2
12:26:17.018 Doing Task 5, sleepSec: 5
12:26:18.018 Doing Task 6, sleepSec: 4
12:26:18.018 finished job 1
12:26:19.018 finished job 0
12:26:19.018 Doing Task 7, sleepSec: 3
12:26:22.018 Doing Task 8, sleepSec: 2
12:26:22.018 finished job 7
12:26:22.019 Doing Task 9, sleepSec: 1
12:26:22.019 finished job 6
12:26:22.019 finished job 4
12:26:22.019 finished job 5
12:26:23.019 finished job 9
12:26:24.019 finished job 8

참고