java.util.concurrent.Executors와 java.util.concurrent.ExecutorService를 이용하면 간단히 쓰레드풀을 생성하여 병렬처리를 할 수 있습니다.
ExecutorService 생성
Executors는 ExecutorService 객체를 생성하며, 다음 메소드를 제공하여 쓰레드 풀을 개수 및 종류를 정할 수 있습니다.
- newFixedThreadPool(int) : 인자 개수만큼 고정된 쓰레드풀을 만듭니다.
 - newCachedThreadPool(): 필요할 때, 필요한 만큼 쓰레드풀을 생성합니다. 이미 생성된 쓰레드를 재활용할 수 있기 때문에 성능상의 이점이 있을 수 있습니다.
 - newScheduledThreadPool(int): 일정 시간 뒤에 실행되는 작업이나, 주기적으로 수행되는 작업이 있다면 ScheduledThreadPool을 고려해볼 수 있습니다.
 - newSingleThreadExecutor(): 쓰레드 1개인 ExecutorService를 리턴합니다. 싱글 쓰레드에서 동작해야 하는 작업을 처리할 때 사용합니다.
 
다음은 4개의 고정된 쓰레드풀을 갖고 있는 ExecutorService를 생성하는 코드입니다.
ExecutorService executor = Executors.newFixedThreadPool(4);ExecutorService에 작업(Job) 추가
Executors로 ExecutorService를 생성하였다면, ExecutorService는 작업을 처리할 수 있습니다. ExecutorService.submit() 메소드로 작업을 추가하면 됩니다.
아래 코드에서 newFixedThreadPool(4)는 Thread를 4개 생성하겠다는 의미입니다.
그리고 submit(() -> { })은 멀티쓰레드로 처리할 작업을 예약합니다. 인자로 람다식을 전달할 수 있습니다.
아래 코드에서 4개의 작업을 예약했고, 예약과 동시에 먼저 생성된 4개의 쓰레드는 작업들을 처리합니다.
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceTest {
    public static void main(String args[]) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });
        // 더이상 ExecutorService에 Task를 추가할 수 없습니다.
        // 작업이 모두 완료되면 쓰레드풀을 종료시킵니다.
        executor.shutdown();
        // shutdown() 호출 전에 등록된 Task 중에 아직 완료되지 않은 Task가 있을 수 있습니다.
        // Timeout을 20초 설정하고 완료되기를 기다립니다.
        // 20초 전에 완료되면 true를 리턴하며, 20초가 지나도 완료되지 않으면 false를 리턴합니다.
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");
            // 모든 Task를 강제 종료합니다.
            executor.shutdownNow();
        }
        System.out.println("end");
    }
}로그를 보면 두개의 쓰레드가 4개의 작업을 모두 처리하였습니다. 대부분 예약한 순서대로 작업이 처리가 되지만, 간혹 아래처럼 2번 쓰레드가 지연이 되어 순서가 뒤바뀌는 일이 발생할 수 있습니다.
Job1 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
Job2 pool-1-thread-2
endshutdown()은 더 이상 쓰레드풀에 작업을 추가하지 못하도록 합니다. 그리고 처리 중인 Task가 모두 완료되면 쓰레드풀을 종료시킵니다.
awaitTermination()은 이미 수행 중인 Task가 지정된 시간동안 끝나기를 기다립니다.
지정된 시간 내에 끝나지 않으면 false를 리턴하며, 이 때 shutdownNow()를 호출하면 실행 중인 Task를 모두 강제로 종료시킬 수 있습니다.
SingleThreadExecutor
SingleThreadExecutor는 Thread가 1개인 Executor입니다. 1개이기 때문에 작업을 예약한 순서대로 처리를 합니다. 동시성(Concurrency)을 고려할 필요가 없습니다.
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceTest2 {
    public static void main(String args[]) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });
        executor.shutdown();
        executor.awaitTermination(20, TimeUnit.SECONDS)
        System.out.println("end");
    }
}로그를 보면 순서대로 처리를 합니다.
Job1 pool-1-thread-1
Job2 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
endFuture
Future를 이용하면 예약된 작업에 대한 결과를 알 수 있습니다.
executor.submit()은 Future객체를 리턴합니다. 모든 작업을 예약할 때, Future를 따로 저장을 해 두면 메인쓰레드에서 쓰레드풀에서 처리한 결과를 알 수 있습니다.
이전 코드를 보면 작업을 추가하고 처리에 대한 결과를 확인하지 않는데,
여기서는 future.get()로 작업이 종료될 때 까지 기다립니다.
List에 future를 Job1에서 Job4까지의 작업을 순서대로 추가했기 때문에, 그 밑의 For문에서 1~4 작업을 순서대로 기다립니다. 그래서 로그를 출력해보면 순서대로 출력이 됩니다. Future에 대한 for문이 끝나면 ExecutorService는 필요가 없기 때문에 바로 종료할 수 있습니다.
Runtime.getRuntime().availableProcessors()는 현재 사용가능한 core 개수를 리턴해 줍니다.
현재 PC(장치)의 사용가능한 Core 개수를 알 수 있기 때문에 효율적으로 쓰레드를 생성할 수 있습니다.
package test;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceTest3 {
    public static void main(String args[]) {
        final int maxCore = Runtime.getRuntime().availableProcessors();
        final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        final List<Future<String>> futures = new ArrayList<>();
        for (int i = 1; i < 5; i++) {
            final int index = i;
            futures.add(executor.submit(() -> {
                System.out.println("finished job" + index);
                return "job" + index + " " + Thread.currentThread().getName();
            }));
        }
        for (Future<String> future : futures) {
            String result = null;
            try {
                result = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(result);
        }
        executor.shutdownNow();
        System.out.println("end");
    }
}작업은 순서대로 처리되지 않을 수 있지만, 로그는 순차적으로 출력됩니다.
finished job1
finished job3
finished job2
job1 pool-1-thread-1
job2 pool-1-thread-2
job3 pool-1-thread-3
finished job4
job4 pool-1-thread-4
endBlockingQueue
사실 위의 Future에 대한 코드를 보면 비효율적인 부분이 있습니다. 첫번째 작업이 늦게 처리된다면 다른 작업에 대한 로그도 늦게 출력이 됩니다. 물론 for문으로 먼저 끝나는 작업을 찾아 로그를 출력할 수 있습니다.
BlockingQueue는 이것을 편하게 도와줍니다. 작업이 끝날 때 BlockingQueue에 결과를 추가하고 메인쓰레드에서 Queue를 기다리면 됩니다.
전체적으로 보면, 멀티쓰레드에서 이 Queue에 add를 하는 구조입니다. 동시성 문제가 발생할 것 같지만, BlockingQueue 객체는 동시성을 보장하도록 구현되어있습니다. (코드양이 많아진 것 같아서 ParallelExcutorService 객체를 구현하였습니다.)
package test;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ExecutorServiceTest4 {
    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");
        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }
        System.out.println("end");
        service.close();
    }
    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        public ParallelExcutorService() {
        }
        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}로그를 보면 처리한 순서대로 메인쓰레드에서 로그를 출력하고 있습니다.
finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end참고
Related Posts
- Java - Unsupported class file major version 61 에러
 - Java - String.matches()로 문자열 패턴 확인 및 다양한 예제 소개
 - Java - 문자열 공백제거 (trim, replace)
 - Java - replace()와 replaceAll()의 차이점
 - Java - ArrayList 초기화, 4가지 방법
 - Java - 배열 정렬(Sorting) (오름차순, 내림차순)
 - Java - 문자열(String)을 비교하는 방법 (==, equals, compare)
 - Java - StringBuilder 사용 방법, 예제
 - Java - 로그 출력, 파일 저장 방법 (Logger 라이브러리)
 - Java IllegalArgumentException 의미, 발생 이유
 - Java - NullPointerException 원인, 해결 방법
 - Seleninum의 ConnectionFailedException: Unable to establish websocket connection 해결
 - Java - compareTo(), 객체 크기 비교
 - Java - BufferedReader로 파일 읽기
 - Java - BufferedWriter로 파일 쓰기
 - Java - BigInteger 범위, 비교, 연산, 형변환
 - Java charAt() 함수 알아보기
 - Java contains()로 문자(대소문자 X) 포함 확인
 - Java - Set(HashSet)를 배열로 변환
 - Java - 문자열 첫번째 문자, 마지막 문자 확인
 - Java - 문자열 한글자씩 자르기
 - Java - 문자열 단어 개수 가져오기
 - Java - 1초마다 반복 실행
 - Java - 배열을 Set(HashSet)로 변환
 - Java - 여러 Set(HashSet) 합치기
 - Java - 명령행 인자 입력 받기
 - Java - 리스트 역순으로 순회, 3가지 방법
 - Java - 특정 조건으로 리스트 필터링, 3가지 방법
 - Java - HashMap 모든 요소들의 합계, 평균 계산
 - Java - 특정 조건으로 HashMap 필터링
 - Java - 싱글톤(Singleton) 패턴 구현
 - Java - 숫자 왼쪽에 0으로 채우기
 - Java - String 배열 초기화 방법
 - Java - 정렬된 순서로 Map(HashMap) 순회
 - Java - HashMap에서 key, value 가져오기