Java - How to use ExecutorService

By using java.util.concurrent.Executors and java.util.concurrent.ExecutorService, you can simply create a thread pool for parallel processing.

Create ExecutorService

Executors create ExecutorService object and provide the following methods to set the number and type of thread pool.

  • newFixedThreadPool(int) : Creates a thread pool fixed by the number of arguments.
  • newCachedThreadPool(): Creates as many threadpools as needed, when needed. There can be performance benefits because already created threads can be recycled
  • newScheduledThreadPool(int): If you have a task that runs after a certain amount of time, or a task that runs periodically, you might consider a ScheduledThreadPool.
  • newSingleThreadExecutor(): Returns an ExecutorService with 1 thread. It is used to process tasks that need to be run on a single thread.

The following code creates an ExecutorService with 4 fixed thread pools.

ExecutorService executor = Executors.newFixedThreadPool(4);

Add Job to ExecutorService

If you create an ExecutorService with Executors, the ExecutorService can handle the work. Just add the task with the ExecutorService.submit() method.

In the code below, newFixedThreadPool(4) means to create 4 threads. And submit(() -> { }) schedules a task for multithreading. You can pass a lambda expression as an argument.

In the code below, 4 tasks are scheduled, and the 4 threads created at the same time as the reservation process the tasks.

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceTest {

    public static void main(String args[]) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });

        // Can`t add Task to ExecutorService anymore
        // When all tasks are complete, the thread pool is terminated.
        executor.shutdown();

        // Among the tasks registered before the shutdown() call, there may be tasks that are not yet completed.
        // Set Timeout to 20 seconds and wait for it to complete
        // Returns true if completed before 20 seconds, false if not completed after 20 seconds
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");

            // Force close all tasks
            executor.shutdownNow();
        }

        System.out.println("end");
    }
}

Looking at the log, two threads processed all four tasks. Most of the tasks are processed in the order they are scheduled, but in some cases, the order may be reversed due to the delay of thread 2 as shown below.

Job1 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
Job2 pool-1-thread-2
end

shutdown() will stop adding any more tasks to the threadpool. And when all the tasks being processed are completed, the thread pool is terminated.

awaitTermination() waits for the already running task to finish for the specified time. If it does not finish within the specified time, false is returned. At this time, calling shutdownNow() can forcefully terminate all running tasks.

SingleThreadExecutor

SingleThreadExecutor is an Executor with 1 Thread. Since there is only one, the jobs are processed in the order they were scheduled. There is no need to consider concurrency.

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceTest2 {

    public static void main(String args[]) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job1 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job2 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job3 " + threadName);
        });
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Job4 " + threadName);
        });

        executor.shutdown();
        executor.awaitTermination(20, TimeUnit.SECONDS)
        System.out.println("end");
    }
}

The logs are processed in order.

Job1 pool-1-thread-1
Job2 pool-1-thread-1
Job3 pool-1-thread-1
Job4 pool-1-thread-1
end

Future

Futures let you know the results of scheduled tasks.

executor.submit() returns a Future object. When scheduling all tasks, if you save the Future separately, you can know the result of processing in the thread pool from the main thread.

If you look at the previous code, you add a task and don`t check the result of processing, Here, future.get() waits until the operation is finished.

Because I added futures 1-4 to the list in order, In the For statement below it, it waits for operations 1 to 4 in order. So, if you print the log, the output is in order. When the for statement on the Future is finished, the ExecutorService is not needed, so it can exit immediately.

Runtime.getRuntime().availableProcessors() returns the number of currently available cores.

Since you can know the number of available cores of the current PC (device), you can create threads efficiently.

package test;

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceTest3 {

    public static void main(String args[]) {
        final int maxCore = Runtime.getRuntime().availableProcessors();
        final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        final List<Future<String>> futures = new ArrayList<>();

        for (int i = 1; i < 5; i++) {
            final int index = i;
            futures.add(executor.submit(() -> {
                System.out.println("finished job" + index);
                return "job" + index + " " + Thread.currentThread().getName();
            }));
        }

        for (Future<String> future : futures) {
            String result = null;
            try {
                result = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(result);
        }

        executor.shutdownNow();
        System.out.println("end");
    }
}

Jobs may not be processed in sequence, but logs are output sequentially.

finished job1
finished job3
finished job2
job1 pool-1-thread-1
job2 pool-1-thread-2
job3 pool-1-thread-3
finished job4
job4 pool-1-thread-4
end

BlockingQueue

In fact, if you look at the code for the Future above, there is an inefficient part. If the first job is processed late, the logs for other jobs are also output late. Of course, you can output the log by finding jobs that end first with a for statement.

BlockingQueue makes this easy. When the task is finished, add the result to the BlockingQueue and wait for the Queue on the main thread.

Overall, it is a structure that adds to this queue in multi-thread. It seems to be a concurrency problem, but BlockingQueue objects are implemented to guarantee concurrency. (The amount of code seems to have increased, so I implemented the ParallelExcutorService object.)

package test;

import java.util.List;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ExecutorServiceTest4 {

    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");

        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }

        System.out.println("end");
        service.close();
    }

    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        public ParallelExcutorService() {
        }

        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}

If you look at the log, the log is output from the main thread in the order in which it was processed.

finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end

Reference

codechachaCopyright ©2019 codechacha