HOME > java > tips

Java Executors를 사용하는 방법

JSFollow25 Jun 2018

Executors

자바의 Executor를 이용하면 간단히 멀티쓰레드 프로그램을 작성할 수 있습니다. 어떤 작업들을 병렬처리로 수행하는데 많이 복잡하지도, 많이 간단하지 않게 작성할 수 있습니다.

ExecutorService

ExecutorService를 생성하고 submit()으로 작업을 추가하면 됩니다. 아래 코드를 참조하면, newFixedThreadPool(4)는 Thread를 4개 생성하겠다는 의미입니다. 그리고 submit(() -> { })은 멀티쓰레드로 처리할 작업을 예약합니다.

4개의 작업을 예약했고, 예약과 동시에 먼저 생성된 4개의 쓰레드는 작업들을 처리합니다. executor를 모두 사용했으면, executor.shutdownNow()로 생성된 쓰레드를 종료해줘야 합니다. 여기서는 4개의 작업이 언제 끝날지 몰라 3초간 sleep한 후 shutdown처리를 하였습니다.

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceTest {
    public static void main(String args[]) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdownNow();
        System.out.println("end");
    }
}

로그를 보면 두개의 쓰레드가 4개의 작업을 모두 처리하였습니다. 대부분 예약한 순서대로 작업이 처리가 되지만, 간혹 아래처럼 2번 쓰레드가 지연이 되어 순서가 뒤바뀌는 일이 발생할 수 있습니다.

Job1 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
Job2 pool-1-thread-2
end

SingleThreadExecutor

SingleThreadExecutor는 Thread가 1개인 Executor입니다. 1개이기 때문에 작업을 예약한 순서대로 처리를 합니다. 동시성(Concurrency)을 고려할 필요가 없습니다.

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceTest2 {
    public static void main(String args[]) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdownNow();
        System.out.println("end");
    }
}

로그를 보면 순서대로 처리를 합니다.

Job1 pool-1-thread-1
Job2 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
end

Future

Future를 이용하면 예약된 작업에 대한 결과를 알 수 있습니다. 아래코드를 보면 Runtime.getRuntime().availableProcessors()이 보입니다. 현재 PC(장치)의 사용가능한 Core 개수를 알 수 있어, 효율적으로 쓰레드를 생성할 수 있습니다.

executor.submit()은 Future객체를 리턴합니다. 모든 작업을 예약할 때, Future도 따로 저장을 해 두면 결과에 대한 정보를 알 수 있습니다.

이전 코드를 보면 작업을 추가하고 처리에 대한 결과를 확인하지 않는데, 여기서는 future.get()로 작업이 종료될 때 까지 기다립니다.

List에 future를 1~4 작업을 순서대로 추가했기 때문에, 그 밑의 For문에서 1~4 작업을 순서대로 기다립니다. 그래서 로그를 출력해보면 순서대로 출력이 됩니다. Future에 대한 for문이 끝나면 ExecutorService는 필요가 없기 때문에 바로 종료할 수 있습니다.

package test;

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceTest3 {
    public static void main(String args[]) {
        final int maxCore = Runtime.getRuntime().availableProcessors();
        final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        final List<Future<String>> futures = new ArrayList<>();

        for (int i = 1; i < 5; i++) {
            final int index = i;
            futures.add(executor.submit(() -> {
                System.out.println("finished job" + index);
                return "job" + index + " " + Thread.currentThread().getName();
            }));
        }

        for (Future<String> future : futures) {
            String result = null;
            try {
                result = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(result);
        }

        executor.shutdownNow();
        System.out.println("end");
    }
}

작업은 순서대로 처리되지 않을 수 있지만, 로그는 순차적으로 출력됩니다.

finished job1
finished job3
finished job2
job1 pool-1-thread-1
job2 pool-1-thread-2
job3 pool-1-thread-3
finished job4
job4 pool-1-thread-4
end

BlockingQueue

사실 위의 Future에 대한 코드를 보면 비효율적인 부분이 있습니다. 첫번째 작업이 늦게 처리된다면 다른 작업에 대한 로그도 늦게 출력이 됩니다. 물론 for문으로 먼저 끝나는 작업을 찾아 로그를 출력할 수 있습니다.

BlockingQueue는 이것을 편하게 도와줍니다. 작업이 끝날 때 BlockingQueue에 결과를 추가하고 메인쓰레드에서 Queue를 기다리면 됩니다.

전체적으로 보면, 멀티쓰레드에서 이 Queue에 add를 하는 구조입니다. 동시성 문제가 발생할 것 같지만, BlockingQueue 객체는 동시성에 안전하도록 구현되어있습니다. (코드양이 많아진 것 같아서 ParallelExcutorService 객체를 구현하였습니다.)

package test;

import java.util.List;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ExecutorServiceTest4 {
    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");

        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }

        System.out.println("end");
        service.close();
    }

    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        public ParallelExcutorService() {
        }

        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}

로그를 보면 처리한 순서대로 메인쓰레드에서 로그를 출력하고 있습니다.

finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end

참고