Java - ThreadPoolExecutor로 제한된 개수의 쓰레드풀 사용

Java에서 Executors를 사용하여 제한된 개수의 쓰레드 풀(Fixed Thread Pool)을 생성하는 방법을 소개합니다.

1. Fixed thread pool

Fixed Thread Pool은 정해진 개수의 쓰레드가 만들어져 있는 Thread Pool을 의미합니다. 즉, 쓰레드풀에서 사용할 수 있는 최대 쓰레드 개수가 제한되어있기 때문에, 실수로 쓰레드를 많이 생성하여 리소스를 낭비하는 일은 발생하지 않습니다.

2. Executors.newFixedThreadPool()로 Fixed Thread Pool 생성

Executors.newFixedThreadPool()ThreadPoolExecutor를 리턴하며, 이 객체는 제한된 개수의 쓰레드 풀을 관리합니다.

// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

다음은 4개의 쓰레드를 갖고 있는 Thrad pool을 생성하는 예제입니다. ExecutorService는 인터페이스이며 ThreadPoolExecutor는 이 인터페이스를 구현한 클래스입니다.

ExecutorService executor = Executors.newFixedThreadPool(4);

아래와 같이 직접 ThreadPoolExecutor로 형변환하여 사용할 수도 있습니다.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

이렇게 만들어진 ExecutorService 객체를 이용하여 여러 작업들을 4개의 쓰레드에서 실행되도록 만들 수 있습니다. 4개의 쓰레드들은 어떤 작업을 수행 완료한 뒤에 종료되지는 않으며, 다음에 요청될 작업을 기다립니다.

3. Fixed Thread Pool을 사용하는 예제

다음은 ThreadPoolExecutor의 가장 기본적인 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample {

    public static void main(String[] args) {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // ThreadPoolExecutor에 Task를 예약하면, 여유가 있을 때 Task를 수행합니다.
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        // shutdown() 이후에는 Task를 추가할 수 없습니다.
        executor.shutdown();
    }
}
  • execute()로 Task를 예약할 수 있으며 쓰레드풀의 여유가 있을 때 Task를 수행합니다. 인자로 Runnable을 구현한 객체를 전달할 수 있으며, 익명클래스나, 람다식으로 전달할 수도 있습니다.
  • shutdown()를 호출하면 더 이상 Task를 받지 않습니다. 처리중인 Task를 모두 완료한 뒤 쓰레드풀을 종료합니다.

Output:

09:33:38.221 Execute task 0
09:33:38.228 Execute task 1
09:33:38.228 Doing job 0
09:33:38.228 Done job 0
09:33:38.228 Execute task 2
09:33:38.228 Doing job 1
09:33:38.228 Done job 1
09:33:38.228 Execute task 3
09:33:38.228 Doing job 2
09:33:38.228 Done job 2
09:33:38.228 Execute task 4
09:33:38.228 Doing job 3
09:33:38.228 Done job 3
09:33:38.228 Execute task 5
09:33:38.228 Doing job 4
09:33:38.228 Execute task 6
09:33:38.228 Doing job 5
09:33:38.228 Doing job 6
09:33:38.229 Done job 5
09:33:38.228 Done job 4
09:33:38.229 Done job 6
09:33:38.228 Execute task 7
09:33:38.229 Execute task 8
09:33:38.229 Doing job 7
09:33:38.229 Execute task 9
09:33:38.229 Done job 7
09:33:38.229 Doing job 8
09:33:38.229 Doing job 9
09:33:38.229 Done job 8
09:33:38.229 Done job 9

4. shutdown() 수행 시, 모든 작업이 완료될 때까지 대기

shutdown()을 호출하면 쓰레드풀은 종료됩니다. 하지만 만약 아직 완료되지 않은 작업(Task)이 있다면 모두 완료되고 종료됩니다.

shutdown()는 비동기적으로 종료되기 때문에, 다른 작업 완료 후 쓰레드가 종료되는 것을 기다리려면 awaitTermination()를 호출해야 합니다.

참고로, shutdown() 호출 후에 쓰레드풀은 더 이상 작업(Task)을 요청받지 않습니다. 이전에 요청된 작업만 수행합니다.

public class FixedSizeThreadPoolExecutorExample1 {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                sleepSec(3);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // 더이상 ThreadPoolExecutor에 Task를 추가할 수 없습니다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        executor.shutdown();

        // shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있습니다.
        // Timeout을 20초 설정하고 완료되기를 기다립니다.
        // 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴합니다.
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");

            // 모든 Task를 강제 종료합니다.
            executor.shutdownNow();
        }
    }

    private static void sleepSec(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • awaitTermination(): 설정한 시간동안 Task가 완료되기를 기다립니다.
  • shutdownNow() : 예약된 Task를 취소하고, 실행 중인 모든 Task를 강제로 종료시킵니다.

Output:

09:38:54.556 Execute task 0
09:38:54.565 Execute task 1
09:38:54.565 Doing job 0
09:38:54.565 Execute task 2
09:38:54.566 Doing job 1
09:38:54.566 Execute task 3
09:38:54.566 Doing job 2
09:38:54.566 Execute task 4
09:38:54.566 Execute task 5
09:38:54.566 Doing job 3
09:38:54.566 Execute task 6
09:38:54.566 Execute task 7
09:38:54.566 Execute task 8
09:38:54.566 Execute task 9
09:38:57.566 Done job 0
09:38:57.566 Done job 1
09:38:57.566 Doing job 4
09:38:57.566 Done job 2
09:38:57.566 Doing job 5
09:38:57.566 Done job 3
09:38:57.566 Doing job 6
09:38:57.567 Doing job 7
09:39:00.566 Done job 4
09:39:00.566 Doing job 8
09:39:00.567 Done job 5
09:39:00.567 Doing job 9
09:39:00.567 Done job 6
09:39:00.567 Done job 7
09:39:03.567 Done job 8
09:39:03.567 Done job 9
09:39:03.568 All jobs are terminated

위의 코드에서 awaitTermination()의 Timeout을 5초로 변경했을 때, 실행 결과

09:37:49.566 Execute task 0
09:37:49.574 Execute task 1
09:37:49.574 Doing job 0
09:37:49.574 Execute task 2
09:37:49.574 Doing job 1
09:37:49.574 Execute task 3
09:37:49.574 Doing job 2
09:37:49.579 Execute task 4
09:37:49.579 Doing job 3
09:37:49.579 Execute task 5
09:37:49.579 Execute task 6
09:37:49.579 Execute task 7
09:37:49.579 Execute task 8
09:37:49.579 Execute task 9
09:37:52.574 Done job 0
09:37:52.574 Done job 1
09:37:52.574 Doing job 4
09:37:52.574 Doing job 5
09:37:52.574 Done job 2
09:37:52.575 Doing job 6
09:37:52.579 Done job 3
09:37:52.580 Doing job 7
09:37:54.581 some jobs are not terminated
09:37:54.584 Done job 7
java.lang.InterruptedException: sleep interrupted
09:37:54.585 Done job 6
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
09:37:54.586 Done job 4
09:37:54.586 Done job 5

5. 사용가능한 코어 개수 만큼 쓰레드 생성 방법

쓰레드풀에서 생성할 쓰레드 개수를 정할 때 얼만큼 생성하는 것이 효율적인지 고민이 될 수 있습니다.

CPU의 core 개수가 작은데 매우 많은 쓰레드를 만들면, 잦은 Context change로 적은 개수의 쓰레드보다 성능이 안좋을 수 있기 때문입니다.

이런 이유로, CPU 코어 개수로 쓰레드의 개수를 정하기도 합니다. Runtime.availableProcessors()는 Java가 실행 중인 JVM 환경에서 사용가능한 코어 개수를 리턴해줍니다.

아래와 같이 CPU 코어 개수 만큼 쓰레드풀의 쓰레드를 생성할 수 있습니다.

package concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedSizeThreadPoolExecutorExample3 {

    public static void main(String[] args) {

        final int maxCore = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxCore);
        System.out.println("Max thread pool size: " + executor.getMaximumPoolSize());

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println("Execute task " + jobId);
            executor.execute(() -> {
                System.out.println("Doing a job " + jobId);
            });
        }
        executor.shutdown();
    }
}

Output:

Max thread pool size: 8
Execute task 0
Execute task 1
Doing a job 0
Execute task 2
Doing a job 1
Execute task 3
Doing a job 2
Execute task 4
Doing a job 3
Execute task 5
Doing a job 4
Execute task 6
Doing a job 5
Execute task 7
Doing a job 6
Doing a job 7
Execute task 8
Execute task 9
Doing a job 8
Doing a job 9

6. Future로 Task의 결과 리턴 받기

이전 예제에서는 Task를 Runnable로 전달하였습니다. Runnable은 리턴 값이 없기 때문에, 작업을 수행만 하고 결과를 리턴 받을 수는 없었습니다.

이번에는 Callable을 쓰레드풀에 전달하고, Callable의 구현 내용을 수행하고 결과를 리턴하는 방법을 알아보겠습니다. Callable은 리턴 값이 있습니다.

아래 예제는 ThreadPoolExecutor.submit()은 Callable을 인자로 받고 Future를 리턴합니다. 그리고 Future를 통해 Callable의 결과를 리턴 받을 수 있습니다.

다음은 Callable Task를 전달하고, 결과로 String을 받는 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample4 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // submit()은 인자로 Callable도 받으며 Future를 리턴합니다.
            Future<String> result = executor.submit(() -> {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(LocalTime.now() + " Done task " + jobId);
                return "finished job " + jobId;
            });
            futures.add(result);
        }

        // future.get()은 Task의 result가 리턴될 때까지 기다립니다.
        for (Future<String> future : futures) {
            String result = future.get();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}

Future.get()Future에 결과가 리턴될 때까지 기다립니다. 위의 for문은 첫번째 future의 값이 리턴되면 두번째 future 값이 리턴되기를 기다립니다. 예를 들어, 첫번째 Task가 10초 걸리고, 두번째 Task가 1초걸린다면, 첫번째에서 계속 기다리기 때문에 두번째의 결과를 늦게 가져오게 됩니다.

만약 Task가 완료되는 순서대로 결과를 처리하고 싶다면 BlockingQueue를 사용하여 구현할 수 있습니다.

Output:

12:24:36.948 Execute task 0
12:24:36.958 Execute task 1
12:24:36.958 Execute task 2
12:24:36.959 Execute task 3
12:24:36.959 Execute task 4
12:24:36.959 Execute task 5
12:24:36.959 Execute task 6
12:24:36.959 Execute task 7
12:24:36.959 Execute task 8
12:24:36.959 Execute task 9
12:24:38.959 Done task 0
12:24:38.959 Done task 1
12:24:38.959 Done task 2
12:24:38.959 finished job 0
12:24:38.959 Done task 3
12:24:38.959 finished job 1
12:24:38.959 finished job 2
12:24:38.959 finished job 3
12:24:40.959 Done task 4
12:24:40.959 Done task 6
12:24:40.959 Done task 5
12:24:40.959 Done task 7
12:24:40.959 finished job 4
12:24:40.960 finished job 5
12:24:40.960 finished job 6
12:24:40.960 finished job 7
12:24:42.959 Done task 8
12:24:42.959 Done task 9
12:24:42.960 finished job 8
12:24:42.960 finished job 9

7. BlockingQueue로 결과 리턴 받기

Future를 사용하지 않고 BlockingQueue를 이용하여 결과를 받을 수 있습니다. BlockingQueue는 동시성을 보장하기 때문에 멀티쓰레드에서 접근해도 문제가 없습니다.

아래 예제에서는 Callable을 사용하지 않고 Runnable을 사용합니다. 여기서는 결과를 리턴할 필요가 없기 때문입니다.

그리고 Task 0은 10초를 sleep하고, Task9는 1초를 sleep하도록 하였습니다. 이처럼 먼저 수행되는 Task가 더 오래 걸리도록 만들었습니다. 이렇게 하는 이유는 Task가 완료되는 순서대로 결과를 처리할 수 있다는 것을 보여주기 위해서입니다.

다음은 그 예제입니다.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample5 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            final int sleepSec = 10 - i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // Result를 BlockingQueue에 저장합니다.
            executor.submit(() -> {
                System.out.println(LocalTime.now() + " Doing Task " + jobId + ", sleepSec: " + sleepSec);
                try {
                    TimeUnit.SECONDS.sleep(sleepSec);
                    String result = "finished job " + jobId;
                    queue.put(result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // take()는 Queue에 아이템이 추가될 때까지 기다렸다가 리턴합니다.
        for (int i = 0; i < 10; i++) {
            String result = queue.take();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}

Output:

12:26:09.009 Execute task 0
12:26:09.018 Execute task 1
12:26:09.018 Doing Task 0, sleepSec: 10
12:26:09.018 Execute task 2
12:26:09.018 Doing Task 1, sleepSec: 9
12:26:09.018 Execute task 3
12:26:09.018 Doing Task 2, sleepSec: 8
12:26:09.018 Execute task 4
12:26:09.018 Doing Task 3, sleepSec: 7
12:26:09.018 Execute task 5
12:26:09.018 Execute task 6
12:26:09.018 Execute task 7
12:26:09.018 Execute task 8
12:26:09.018 Execute task 9
12:26:16.019 Doing Task 4, sleepSec: 6
12:26:16.019 finished job 3
12:26:17.018 finished job 2
12:26:17.018 Doing Task 5, sleepSec: 5
12:26:18.018 Doing Task 6, sleepSec: 4
12:26:18.018 finished job 1
12:26:19.018 finished job 0
12:26:19.018 Doing Task 7, sleepSec: 3
12:26:22.018 Doing Task 8, sleepSec: 2
12:26:22.018 finished job 7
12:26:22.019 Doing Task 9, sleepSec: 1
12:26:22.019 finished job 6
12:26:22.019 finished job 4
12:26:22.019 finished job 5
12:26:23.019 finished job 9
12:26:24.019 finished job 8
Loading script...

Related Posts

codechachaCopyright ©2019 codechacha