Java - ForkJoinPool

There are many good articles explaining ForkJoinPool, such as "Java Fork and Join using ForkJoinPool". The text and diagrams in that blog make it easy to understand how ForkJoinPool works. This post modifies its example slightly so you can see how tasks are split and processed across multiple threads.

ForkJoinPool is similar to ExecutorService (it actually implements ExecutorService): it creates a thread pool and runs multiple tasks in parallel. What sets ForkJoinPool apart is that a task is split (Fork) according to its size, and once the split tasks have been processed, their results are combined (Join). It works like a divide-and-conquer algorithm.
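Because ForkJoinPool implements ExecutorService, it can also run ordinary Callable jobs with no splitting at all. The sketch below (the class PoolAsExecutor and the method sumOfJobs are hypothetical) uses it exactly like any other executor; the fork/join behavior only comes into play with the task classes described later.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class PoolAsExecutor {

    // Runs three independent Callables on a ForkJoinPool used as a plain ExecutorService
    static int sumOfJobs() throws Exception {
        ExecutorService pool = new ForkJoinPool(4); // ForkJoinPool implements ExecutorService
        try {
            List<Callable<Integer>> jobs = List.of(() -> 1 + 1, () -> 2 * 3, () -> 10 - 4);
            int total = 0;
            // invokeAll() waits until every job is done and returns their Futures
            for (Future<Integer> f : pool.invokeAll(jobs)) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("total = " + sumOfJobs()); // 2 + 6 + 6 = 14
    }
}
```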

Fork

In ForkJoinPool, Fork means splitting a task so that the pieces are processed in different threads. As shown in the following figure, one task is divided into several smaller tasks, and those tasks are assigned to several threads.

[Figure: forkjoinpool - fork]
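A minimal sketch of the Fork step, assuming a task that doubles every element of an int array (ForkDemo, DoubleAll and the threshold of 4 are hypothetical). When the range is large, the task forks its left half so an idle worker can steal it, keeps processing the right half itself, and then waits for the left half with join(), which is described in the next section.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkDemo {

    // Doubles every element of array[from, to), splitting the range while it is large
    static class DoubleAll extends RecursiveAction {
        private static final int THRESHOLD = 4; // hypothetical "small enough" size
        private final int[] array;
        private final int from;
        private final int to;

        DoubleAll(int[] array, int from, int to) {
            this.array = array;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: do the work directly in this thread
                for (int i = from; i < to; i++) {
                    array[i] *= 2;
                }
                return;
            }
            int mid = (from + to) >>> 1;
            DoubleAll left = new DoubleAll(array, from, mid);
            DoubleAll right = new DoubleAll(array, mid, to);
            left.fork();     // Fork: push the left half so an idle worker can steal it
            right.compute(); // keep working on the right half in the current thread
            left.join();     // wait for the forked half before this task counts as done
        }
    }

    // Returns a doubled copy of the input, computed on a 4-thread pool
    static int[] doubled(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new DoubleAll(copy, 0, copy.length));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        // prints [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
        System.out.println(Arrays.toString(doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
    }
}
```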

Join

In ForkJoinPool, Join means waiting for the results processed by other threads and combining them. In other words, as shown below, a parent waits until the tasks processed by its children are completed, then combines their results and delivers the combined result to its own parent.

[Figure: forkjoinpool - join]
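The Join step can be sketched with a RecursiveTask that sums an array (JoinDemo, SumTask and the threshold of 1,000 are hypothetical). Each parent forks one child, computes the other itself, waits for the forked child with join(), and returns the combined sum to its own parent.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class JoinDemo {

    // Sums array[from, to); each parent joins its child's partial sum and adds its own
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // hypothetical "small enough" size
        private final long[] array;
        private final int from;
        private final int to;

        SumTask(long[] array, int from, int to) {
            this.array = array;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(array, from, mid);
            SumTask right = new SumTask(array, mid, to);
            left.fork();                     // child may run in another thread
            long rightSum = right.compute(); // this thread handles the other child
            long leftSum = left.join();      // Join: wait for the forked child's result
            return leftSum + rightSum;       // combined result goes up to our own parent
        }
    }

    static long sum(long[] array) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] numbers = new long[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(sum(numbers)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```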

RecursiveAction and RecursiveTask

RecursiveAction and RecursiveTask are classes that represent a Task. To have ForkJoinPool process your own work, you typically define it by extending one of these two classes.

The differences between these two classes are:

  • RecursiveAction: a Task with no return value. If your Task does not need to return a value, define it with this class.
  • RecursiveTask: a Task with a return value. A parent waits for the return values of its child Tasks, merges them, and passes the merged result on to its own parent.
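To make the difference concrete, here is a hedged sketch that counts the multiples of 3 in a range twice (ActionVsTask, CountAction, CountTask and the threshold of 100 are hypothetical). The RecursiveAction version has nowhere to return its count, so it adds to a shared LongAdder and pool.invoke() returns null; the RecursiveTask version returns each partial count and lets the parent merge them.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.LongAdder;

public class ActionVsTask {

    private static final int THRESHOLD = 100; // hypothetical "small enough" range size

    // RecursiveAction: compute() returns nothing, so results go into a shared LongAdder
    static class CountAction extends RecursiveAction {
        private final int from;
        private final int to;
        private final LongAdder counter;

        CountAction(int from, int to, LongAdder counter) {
            this.from = from;
            this.to = to;
            this.counter = counter;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    if (i % 3 == 0) {
                        counter.increment();
                    }
                }
                return;
            }
            int mid = (from + to) >>> 1;
            // fork both halves and wait for them; there is no value to merge
            invokeAll(new CountAction(from, mid, counter), new CountAction(mid, to, counter));
        }
    }

    // RecursiveTask<Long>: each task returns its count and the parent merges the two counts
    static class CountTask extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        CountTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long count = 0;
                for (int i = from; i < to; i++) {
                    if (i % 3 == 0) {
                        count++;
                    }
                }
                return count;
            }
            int mid = (from + to) >>> 1;
            CountTask left = new CountTask(from, mid);
            left.fork();
            long right = new CountTask(mid, to).compute();
            return left.join() + right; // merge the child results
        }
    }

    // Counts multiples of 3 in [0, n) both ways: {via RecursiveAction, via RecursiveTask}
    static long[] countBothWays(int n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            LongAdder counter = new LongAdder();
            pool.invoke(new CountAction(0, n, counter));      // returns null (type Void)
            long fromTask = pool.invoke(new CountTask(0, n)); // returns the merged count
            return new long[] {counter.sum(), fromTask};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] counts = countBothWays(1_000);
        System.out.println("action: " + counts[0] + ", task: " + counts[1]); // 334 each
    }
}
```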

Example of ForkJoinPool

First, create a ForkJoinPool, passing the number of threads to use (its parallelism level) as an argument.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);
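The argument is the pool's target parallelism. The hedged sketch below (PoolParallelism and parallelisms() are hypothetical names) compares it with the no-argument constructor, which uses the number of available processors, and with the shared ForkJoinPool.commonPool(). According to the ForkJoinPool documentation, the pool may add worker threads to keep enough workers active while some are blocked waiting in join(), so logs can show more worker threads than the parallelism value.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolParallelism {

    // Returns {explicit parallelism, default parallelism} for two freshly created pools
    static int[] parallelisms() {
        ForkJoinPool four = new ForkJoinPool(4); // parallelism passed explicitly
        ForkJoinPool byCpu = new ForkJoinPool(); // defaults to the number of available processors
        try {
            return new int[] {four.getParallelism(), byCpu.getParallelism()};
        } finally {
            four.shutdown();
            byCpu.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] p = parallelisms();
        System.out.println("new ForkJoinPool(4): " + p[0]);
        System.out.println("new ForkJoinPool():  " + p[1]
                + " (available processors: " + Runtime.getRuntime().availableProcessors() + ")");
        // The shared common pool (used by parallel streams, among others) is not shut down by user code
        System.out.println("commonPool():        " + ForkJoinPool.commonPool().getParallelism());
    }
}
```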

Now we just need to define a Task and let the ForkJoinPool process it.

RecursiveAction

RecursiveAction is a Task with no return value.

The following is a Task implemented with RecursiveAction. When the Task is executed, compute() is called. To split the work, create new subtasks (here, smaller MyRecursiveAction instances) and call fork() on them so they can be processed in other threads.

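import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class MyRecursiveAction extends RecursiveAction {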

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<MyRecursiveAction>();

            subtasks.addAll(createSubtasks());

            // fork() schedules each subtask asynchronously; this example never joins them
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }

        } else {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

A few notes on the code above:

  • sleep() simulates a workload. If each Task finished immediately, the work might not look parallel, because several Tasks could run one after another on the same thread.
  • Printing the thread name and the time makes it easy to see which thread processes which Task, and when.
  • In compute(), a workload greater than 16 is split into two halves; a workload of 16 or less is processed in the current thread without further splitting.
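MyRecursiveAction forks its subtasks but never joins them, so its compute() can return while the subtasks are still queued or running. If the caller needs every subtask to be finished when invoke() returns, one option is ForkJoinTask.invokeAll(), which forks the given tasks and waits for them. A minimal sketch, assuming the same split rule but no sleep or logging; JoiningRecursiveAction and its AtomicLong counter are hypothetical:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;

public class JoiningRecursiveAction extends RecursiveAction {

    private final long workLoad;
    private final AtomicLong processed; // hypothetical: total workload handled by leaf tasks

    public JoiningRecursiveAction(long workLoad, AtomicLong processed) {
        this.workLoad = workLoad;
        this.processed = processed;
    }

    @Override
    protected void compute() {
        if (workLoad > 16) {
            // invokeAll() forks both halves and returns only after both have completed,
            // so this task finishes after all of its descendants
            invokeAll(new JoiningRecursiveAction(workLoad / 2, processed),
                      new JoiningRecursiveAction(workLoad / 2, processed));
        } else {
            processed.addAndGet(workLoad); // stand-in for the real work on a small piece
        }
    }

    // Runs the task and returns the total workload processed by the leaves
    static long run(long workLoad) {
        AtomicLong processed = new AtomicLong();
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new JoiningRecursiveAction(workLoad, processed));
        } finally {
            pool.shutdown(); // no awaitTermination() needed: invoke() already waited for every leaf
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed = " + run(128)); // 8 leaves x 16 = 128
    }
}
```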

You can pass the RecursiveAction to forkJoinPool.invoke() to have it processed, as follows.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(128);
forkJoinPool.invoke(myRecursiveAction);

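// invoke() returns as soon as the root task's compute() returns; the forked
// subtasks are never joined, so give them time to finish (waits up to 5 seconds)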
forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);
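When subtasks are forked and never joined, ForkJoinPool.awaitQuiescence() is an alternative to awaitTermination(): it returns as soon as the pool has no more work (or the timeout expires), whereas awaitTermination() on a pool that has not been shut down effectively waits for the whole timeout. The sketch below is a hypothetical, stripped-down version of the same fire-and-forget pattern (QuiescenceDemo and FireAndForget are made-up names) that counts the leaf tasks:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QuiescenceDemo {

    // Same shape as MyRecursiveAction: forks its subtasks but never joins them
    static class FireAndForget extends RecursiveAction {
        private final long workLoad;
        private final AtomicInteger leaves;

        FireAndForget(long workLoad, AtomicInteger leaves) {
            this.workLoad = workLoad;
            this.leaves = leaves;
        }

        @Override
        protected void compute() {
            if (workLoad > 16) {
                new FireAndForget(workLoad / 2, leaves).fork();
                new FireAndForget(workLoad / 2, leaves).fork();
            } else {
                leaves.incrementAndGet(); // count each leaf that actually ran
            }
        }
    }

    // Returns how many leaf tasks ran before the pool became quiescent
    static int run(long workLoad) {
        AtomicInteger leaves = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(new FireAndForget(workLoad, leaves)); // may return before the subtasks finish
            // true once no worker is active, or false if 5 seconds pass first
            boolean quiescent = pool.awaitQuiescence(5, TimeUnit.SECONDS);
            System.out.println("quiescent = " + quiescent);
        } finally {
            pool.shutdown();
        }
        return leaves.get();
    }

    public static void main(String[] args) {
        System.out.println("leaves = " + run(128)); // 128 -> 64 -> 32 -> 16: 8 leaves
    }
}
```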

The execution result is as follows.

[19:45:23.430][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[19:45:24.432][ForkJoinPool-1-worker-1] Splitting workLoad : 64
[19:45:24.432][ForkJoinPool-1-worker-2] Splitting workLoad : 64
[19:45:25.433][ForkJoinPool-1-worker-1] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-3] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-0] Splitting workLoad : 32
[19:45:25.433][ForkJoinPool-1-worker-2] Splitting workLoad : 32
[19:45:26.434][ForkJoinPool-1-worker-1] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-0] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-2] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-3] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-2] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-0] Doing workLoad myself: 16
[19:45:26.434][ForkJoinPool-1-worker-1] Doing workLoad myself: 16
[19:45:26.435][ForkJoinPool-1-worker-3] Doing workLoad myself: 16

RecursiveTask

A RecursiveTask is a Task with a return value. It runs the same way as a RecursiveAction, but to return the processed result, the parent calls join() to wait until each child Task has completed.

The following is a Task implemented with RecursiveTask.

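import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class MyRecursiveTask extends RecursiveTask<Long> {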

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected Long compute() {
        String threadName = Thread.currentThread().getName();

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Splitting workLoad : " + this.workLoad);
            sleep(1000);
            List<MyRecursiveTask> subtasks =
                    new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());

            for(MyRecursiveTask subtask : subtasks){
                subtask.fork();
            }

            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
                System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                        + "Received result from subtask");
            }
            return result;

        } else {
            sleep(1000);
            System.out.println("[" + LocalTime.now() + "][" + threadName + "]"
                    + " Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The Task is run the same way, but this time invoke() returns the merged result.

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);

// Not strictly needed here: invoke() returns only after every subtask has been joined
forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);

Running the above code will output the following:

[20:52:53.978][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[20:52:54.981][ForkJoinPool-1-worker-3] Splitting workLoad : 64
[20:52:54.981][ForkJoinPool-1-worker-2] Splitting workLoad : 64
[20:52:55.983][ForkJoinPool-1-worker-0] Splitting workLoad : 32
[20:52:55.983][ForkJoinPool-1-worker-2] Splitting workLoad : 32
[20:52:55.984][ForkJoinPool-1-worker-4] Splitting workLoad : 32
[20:52:57.984][ Doing workLoad myself: 16
[20:52:57.984][ Doing workLoad myself: 16
[20:52:57.984][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:57.984][ForkJoinPool-1-worker-0]Received result from subtask
[20:52:57.985][ Doing workLoad myself: 16
[20:52:57.985][ForkJoinPool-1-worker-4]Received result from subtask
[20:52:58.985][ Doing workLoad myself: 16
[20:52:58.986][ Doing workLoad myself: 16
[20:52:58.985][ Doing workLoad myself: 16
[20:52:58.986][ForkJoinPool-1-worker-4]Received result from subtask
[20:52:58.986][ForkJoinPool-1-worker-0]Received result from subtask
[20:52:58.987][ForkJoinPool-1-worker-4] Splitting workLoad : 32
[20:52:58.987][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-3]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.988][ForkJoinPool-1-worker-2]Received result from subtask
[20:52:58.989][ForkJoinPool-1-worker-1]Received result from subtask
[20:53:00.989][ Doing workLoad myself: 16
[20:53:00.989][ Doing workLoad myself: 16
[20:53:00.989][ForkJoinPool-1-worker-4]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-4]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-3]Received result from subtask
[20:53:00.990][ForkJoinPool-1-worker-1]Received result from subtask
mergedResult = 384
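The final value follows from the split rule: 128 is halved three times (64, 32, 16), giving 2^3 = 8 leaf Tasks that each return 16 × 3 = 48, and 8 × 48 = 384. To check this quickly without the one-second sleeps, here is a stripped-down variant of MyRecursiveTask (the class name TripleWorkLoad is hypothetical) that keeps the threshold of 16 and the leaf rule workLoad * 3, but uses ForkJoinTask.invokeAll() to fork both halves and wait for them.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TripleWorkLoad extends RecursiveTask<Long> {

    private final long workLoad;

    public TripleWorkLoad(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected Long compute() {
        if (workLoad > 16) {
            TripleWorkLoad left = new TripleWorkLoad(workLoad / 2);
            TripleWorkLoad right = new TripleWorkLoad(workLoad / 2);
            invokeAll(left, right);            // fork both halves and wait for both
            return left.join() + right.join(); // already done, so join() just reads the results
        }
        return workLoad * 3; // leaf rule copied from MyRecursiveTask
    }

    static long run(long workLoad) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new TripleWorkLoad(workLoad));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mergedResult = " + run(128)); // prints "mergedResult = 384"
    }
}
```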

Process tasks asynchronously (Future)

Tasks can also be processed asynchronously. Passing a Task to submit() returns a Future right away, and the Future lets you wait for and read the result when you need it.

You can use it like this:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
Future<Long> future = forkJoinPool.submit(myRecursiveTask);

System.out.println("Do something....");

System.out.println("mergedResult = " + future.get());

forkJoinPool.awaitTermination(5, TimeUnit.SECONDS);

The execution result is as follows.

Do something....
[21:03:41.125][ForkJoinPool-1-worker-1] Splitting workLoad : 128
[21:03:42.126][ForkJoinPool-1-worker-2] Splitting workLoad : 64
....
[21:03:48.134][ForkJoinPool-1-worker-3]Received result from subtask
[21:03:48.134][ForkJoinPool-1-worker-1]Received result from subtask
mergedResult = 384
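As a self-contained variant of the submit()/get() code above, the sketch below uses a hypothetical RangeSum task instead of MyRecursiveTask. submit() returns a ForkJoinTask, which implements Future, and calling get() with a timeout bounds how long the caller can block if something goes wrong.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class AsyncSubmit {

    // Hypothetical task: sums the integers in [from, to) by halving the range
    static class RangeSum extends RecursiveTask<Long> {
        private final long from;
        private final long to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {
                long sum = 0;
                for (long i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            long mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            invokeAll(left, right);
            return left.join() + right.join();
        }
    }

    static long sumAsync(long n) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // submit() returns at once; the returned ForkJoinTask implements Future
            Future<Long> future = pool.submit(new RangeSum(0, n));
            System.out.println("Do something...."); // the caller is free while the pool works
            // get() blocks until the result is ready; the timeout bounds the wait
            return future.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sum = " + sumAsync(100_000)); // 0 + 1 + ... + 99999 = 4999950000
    }
}
```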

Reference

Copyright ©2019 codechacha