Java - ForkJoinPool

Java Fork and Join using ForkJoinPool와 같이 ForkJoinPool을 설명하는 좋은 글들이 많습니다. 이 블로그의 글과 그림을 보면 어떻게 동작하는지 이해하 쉽지만, 예제를 조금 고쳐, Task들이 어떻게 분리되어 멀티 쓰레드에서 처리되는를 보았습니다.

ForkJoinPool은 ExecutorService와 비슷합니다. Thread Pool을 생성하여 여러 작업을 병렬처리로 수행할 수 있습니다. ForkJoinPool가 조금 다른 부분은 Task의 크기에 따라 분할(Fork)하고, 분할된 Task가 처리되면 그것을 합쳐(Join) 리턴해 줍니다. 마치 분할정복법(Divide And Conquer) 알고리즘처럼 동작합니다.

Fork

ForkJoinPool에서 Fork는 Task를 분할하여 다른 쓰레드에서 처리시킨다는 의미입니다. 다음 그림처럼 하나의 Task를 작은 여러 Task로 나누고, 여러 쓰레드에 Task를 할당해 줍니다. forkjoinpool - fork

Join

ForkJoinPool에서 Join은 다른 쓰레드에서 처리된 결과를 기다렸다가 합친다는 의미입니다. 즉, 아래 그럼처럼 Parent는 Child에서 처리되는 Task가 완료될 때까지 기다린 후 결과를 합쳐 더 상위의 Parent로 전달합니다. forkjoinpool - join

RecursiveAction과 RecursiveTask

RecursiveAction과 RecursiveTask는 Task를 대표하는 클래스입니다. ForkJoinPool에서 어떤 Task를 처리하려면 이 두개의 클래스를 이용해야 합니다.

이 두 클래스의 차이점은 다음과 같습니다.

  • RecursiveAction: 리턴 값이 없는 Task입니다. 리턴 값이 필요하지 않는 Task라면 이 클래스로 Task를 정의하면 됩니다.
  • RecursiveTask: 리턴 값이 있는 Task입니다. Parent는 Child Task의 리턴 값을 기다려 합친 후 상위 Parent로 전달합니다.

ForkJoinPool 예제

먼저 ForkJoinPool를 생성합니다. 인자로 생성할 쓰레드 개수를 전달합니다.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

이제 Task를 정의하고 ForkJoinPool가 그 Task를 처리하도록 만들면 됩니다.

RecursiveAction

RecursiveAction는 리턴 값이 없는 Task입니다.

다음은 RecursiveAction로 구현한 Task입니다. Task가 실행되면 compute()가 호출됩니다. Task를 분할하고 싶다면 RecursiveTask를 생성하고 fork()를 호출하여 다른 쓰레드에서 작업이 처리되도록 합니다.

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<MyRecursiveAction>();

            subtasks.addAll(createSubtasks());

            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }

        } else {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

위 코드를 보시면

  • 작업에 부하를 주기 위해 sleep() 코드를 추가하였습니다. 바로 종료되면 동일 쓰레드에서 Task가 연달아 실행될 수 있기 때문에 병렬처리되는 것처럼 보이지 않을 수 있습니다.
  • Task이름과 시간을 출력하여 어떤 쓰레드에서 어떤 작업이 언제 처리되는지 쉽게 볼 수 있습니다.
  • compute()에서는 workload가 16보다 클 때 Task를 나누고 16이하면 더 이상 나누지 않고 그 쓰레드에서 처리하도록 정의하였습니다.

다음과 같이 forkJoinPool.invoke()으로 RecursiveAction을 인자로 전달하고 처리되도록 할 수 있습니다.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(128);
forkJoinPool.invoke(myRecursiveAction);

// Just wait until all tasks done
forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);

실행 결과는 다음과 같습니다.

[19:45:23.430][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[19:45:24.432][ForkJoinPool-1-worker-1] Splitting workLoad : 64
[19:45:24.432][ForkJoinPool-1-worker-2] Splitting workLoad : 64
[19:45:25.433][ForkJoinPool-1-worker-1] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-3] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-0] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-2] Splitting workLoad : 32
[19:45:26.434][ForkJoinPool-1-worker-1] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-0] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-2] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-3] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-2] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-0] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-1] Doing workLoad myself: 16
[19:45:26.435][ForkJoinPool-1-worker-3] Doing workLoad myself: 16

RecursiveTask

RecursiveTask는 리턴 값이 있는 Task입니다. RecursiveAction과 실행 방법은 동일하지만 처리된 결과를 리턴 받기 위해, Parent는 join()으로 Child의 Task가 완료될 때까지 기다립니다.

다음은 RecursiveTask로 구현한 Task입니다.

public class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveTask> subtasks =
                    new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());

            for(MyRecursiveTask subtask : subtasks){
                subtask.fork();
            }

            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
                System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                        + "Received result from subtask");
            }
            return result;

        } else {
            sleep(1000);
            System.out.println("[" + LocalTime.now() + "]["
                    + " Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);

// Just wait until all tasks done
forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);

위의 코드를 실행하면 다음과 같이 출력됩니다.

[20:52:53.978][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[20:52:54.981][ForkJoinPool-1-worker-3] Splitting workLoad : 64
[20:52:54.981][ForkJoinPool-1-worker-2] Splitting workLoad : 64
[20:52:55.983][ForkJoinPool-1-worker-0] Splitting workLoad : 32
[20:52:55.983][ForkJoinPool-1-worker-2] Splitting workLoad : 32
[20:52:55.984][ForkJoinPool-1-worker-4] Splitting workLoad : 32
[20:52:57.984][ Doing workLoad myself: 16
[20:52:57.984][ Doing workLoad myself: 16
[20:52:57.984][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:57.984][ForkJoinPool-1-worker-0]Received result from subtask
[20:52:57.985][ Doing workLoad myself: 16
[20:52:57.985][ForkJoinPool-1-worker-4]Received result from subtask
[20:52:58.985][ Doing workLoad myself: 16
[20:52:58.986][ Doing workLoad myself: 16
[20:52:58.985][ Doing workLoad myself: 16
[20:52:58.986][ForkJoinPool-1-worker-4]Received result from subtask
[20:52:58.986][ForkJoinPool-1-worker-0]Received result from subtask
[20:52:58.987][ForkJoinPool-1-worker-4] Splitting workLoad : 32
[20:52:58.987][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-3]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.989][ForkJoinPool-1-worker-1]Received result from subtask
[20:53:00.989][ Doing workLoad myself: 16
[20:53:00.989][ Doing workLoad myself: 16
[20:53:00.989][ForkJoinPool-1-worker-4]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-4]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-3]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-1]Received result from subtask
mergedResult = 384

비동기적으로 Task 처리(Future)

Task를 비동기적으로 처리할 수 있습니다. submit()에 Task를 인자로 전달할 수 있으며 Future를 리턴받습니다. Future를 통해 필요할 때 Result를 기다리거나 읽을 수 있습니다.

다음과 같이 사용할 수 있습니다.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
Future<Long> future = forkJoinPool.submit(myRecursiveTask);

System.out.println("Do something....");

System.out.println("mergedResult = " + future.get());

forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);

실행 결과는 다음과 같습니다.

Do something....
[21:03:41.125][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[21:03:42.126][ForkJoinPool-1-worker-2] Splitting workLoad : 64
....
[21:03:48.134][ForkJoinPool-1-worker-3]Received result from subtask
[21:03:48.134][ForkJoinPool-1-worker-1]Received result from subtask
mergedResult = 384

참고

Loading script...

Related Posts

codechachaCopyright ©2019 codechacha