Java - FixedThreadPool(ThreadPoolExecutor), how to use fixed threadpool

FixedThreadPool refers to a thread pool with a fixed number of threads. You can create one with the newFixedThreadPool() method of Executors, which is implemented as shown below.

// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

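ExecutorService is an interface, and ThreadPoolExecutor implements it. You can therefore hold the pool as either type. Cast to ThreadPoolExecutor only when you need its extra methods, because the cast depends on the implementation shown above.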

ExecutorService executor = Executors.newFixedThreadPool(4);
ThreadPoolExecutor executor1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
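
As a minimal sketch of what the cast gives you, the snippet below reads a few ThreadPoolExecutor getters from a freshly created fixed pool. The class name PoolInspectionSketch and the helper inspect() are made up for illustration; the getters themselves are part of ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical class name, used only for this illustration.
class PoolInspectionSketch {

    // Returns {corePoolSize, maximumPoolSize, poolSize} for a fresh fixed pool.
    static int[] inspect(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // The cast relies on newFixedThreadPool() returning a ThreadPoolExecutor.
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            return new int[] {
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getPoolSize() // live worker threads; none before the first task
            };
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = inspect(4);
        System.out.println("core=" + sizes[0] + ", max=" + sizes[1] + ", live=" + sizes[2]);
    }
}
```

For a pool of 4, this should print core=4, max=4, live=0: the core and maximum sizes are both nThreads, and no worker thread exists until the first task is submitted.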

We can add multiple tasks to this thread pool to process them. The pool creates its worker threads on demand as tasks arrive, up to the fixed size, and then reuses them, so there is no need to create and terminate a thread every time a task is processed.
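
The reuse of worker threads can be observed with a small sketch like the one below, which records the name of the thread that runs each task. The class name ThreadReuseSketch and the helper workerNames() are assumptions made for this illustration, not part of the original article.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name, used only for this illustration.
class ThreadReuseSketch {

    // Runs taskCount tasks on a pool of poolSize threads and returns
    // the names of the threads that actually executed them.
    static Set<String> workerNames(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        executor.shutdown();
        try {
            done.await(); // wait until every task has recorded its thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(2, 8);
        System.out.println(names.size() + " thread(s) ran 8 tasks: " + names);
    }
}
```

With a pool of 2 and 8 tasks, the set never holds more than two names, which shows that the same threads pick up one task after another.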

Example using a fixed thread pool

The following is the most basic example of a FixedThreadPool.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample {

    public static void main(String[] args) {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // execute() queues the task; a worker thread runs it as soon as one is free.
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // After shutdown(), no new tasks can be added to the ThreadPoolExecutor.
        // The pool terminates once all previously submitted tasks are complete.
        executor.shutdown();
    }
}
  • execute() submits a task, which runs as soon as a thread in the pool is free.

Any object implementing Runnable can be passed as the argument, whether as a named class, an anonymous class, or a lambda expression.

  • After shutdown() is called, the pool accepts no new tasks. It terminates once all previously submitted tasks have finished.

result

09:33:38.221 Execute task 0
09:33:38.228 Execute task 1
09:33:38.228 Doing job 0
09:33:38.228 Done job 0
09:33:38.228 Execute task 2
09:33:38.228 Doing job 1
09:33:38.228 Done job 1
09:33:38.228 Execute task 3
09:33:38.228 Doing job 2
09:33:38.228 Done job 2
09:33:38.228 Execute task 4
09:33:38.228 Doing job 3
09:33:38.228 Done job 3
09:33:38.228 Execute task 5
09:33:38.228 Doing job 4
09:33:38.228 Execute task 6
09:33:38.228 Doing job 5
09:33:38.228 Doing job 6
09:33:38.229 Done job 5
09:33:38.228 Done job 4
09:33:38.229 Done job 6
09:33:38.228 Execute task 7
09:33:38.229 Execute task 8
09:33:38.229 Doing job 7
09:33:38.229 Execute task 9
09:33:38.229 Done job 7
09:33:38.229 Doing job 8
09:33:38.229 Doing job 9
09:33:38.229 Done job 8
09:33:38.229 Done job 9
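
As noted above, the pool accepts no new tasks after shutdown(). The sketch below shows what that looks like in practice: with the default rejection policy, execute() throws RejectedExecutionException. The class name RejectAfterShutdownSketch and its helper method are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical class name, used only for this illustration.
class RejectAfterShutdownSketch {

    // Returns true if a task submitted after shutdown() is rejected.
    static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("accepted before shutdown"));
        executor.shutdown();
        try {
            executor.execute(() -> System.out.println("never printed"));
            return false;
        } catch (RejectedExecutionException e) {
            // The default handler (AbortPolicy) rejects the task with this exception.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedAfterShutdown());
    }
}
```

The task submitted before shutdown() still runs; only the later one is rejected.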

Wait for all tasks to complete

After you call shutdown(), the pool accepts no new tasks, but previously submitted tasks still run. shutdown() itself returns immediately, so if you want to wait for all tasks to complete, use awaitTermination() like this:

public class FixedSizeThreadPoolExecutorExample1 {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);
            executor.execute(() -> {
                System.out.println(LocalTime.now() + " Doing job " + jobId);
                sleepSec(3);
                System.out.println(LocalTime.now() + " Done job " + jobId);
            });
        }

        // After shutdown(), no new tasks can be added to the ThreadPoolExecutor.
        // The pool terminates once all previously submitted tasks are complete.
        executor.shutdown();

        // Among the tasks registered before the shutdown() call, there may be tasks that are not yet completed.
        // Set Timeout to 20 seconds and wait for it to complete
        // Returns true if completed before 20 seconds, false if not completed after 20 seconds
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            System.out.println(LocalTime.now() + " All jobs are terminated");
        } else {
            System.out.println(LocalTime.now() + " some jobs are not terminated");

            // Drop the queued tasks and try to stop the running ones by interrupting them
            executor.shutdownNow();
        }
    }

    private static void sleepSec(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
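  • awaitTermination(): After a shutdown request, blocks until all tasks have completed, the timeout elapses, or the current thread is interrupted.
  • shutdownNow(): Discards the queued tasks that have not started and attempts to stop the running tasks by interrupting their threads. A task that ignores the interrupt keeps running.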

result

09:38:54.556 Execute task 0
09:38:54.565 Execute task 1
09:38:54.565 Doing job 0
09:38:54.565 Execute task 2
09:38:54.566 Doing job 1
09:38:54.566 Execute task 3
09:38:54.566 Doing job 2
09:38:54.566 Execute task 4
09:38:54.566 Execute task 5
09:38:54.566 Doing job 3
09:38:54.566 Execute task 6
09:38:54.566 Execute task 7
09:38:54.566 Execute task 8
09:38:54.566 Execute task 9
09:38:57.566 Done job 0
09:38:57.566 Done job 1
09:38:57.566 Doing job 4
09:38:57.566 Done job 2
09:38:57.566 Doing job 5
09:38:57.566 Done job 3
09:38:57.566 Doing job 6
09:38:57.567 Doing job 7
09:39:00.566 Done job 4
09:39:00.566 Doing job 8
09:39:00.567 Done job 5
09:39:00.567 Doing job 9
09:39:00.567 Done job 6
09:39:00.567 Done job 7
09:39:03.567 Done job 8
09:39:03.567 Done job 9
09:39:03.568 All jobs are terminated

When the timeout of awaitTermination() in the code above is changed to 5 seconds, the result is as follows. The four running tasks (4 to 7) are interrupted, and the queued tasks 8 and 9 never run.

09:37:49.566 Execute task 0
09:37:49.574 Execute task 1
09:37:49.574 Doing job 0
09:37:49.574 Execute task 2
09:37:49.574 Doing job 1
09:37:49.574 Execute task 3
09:37:49.574 Doing job 2
09:37:49.579 Execute task 4
09:37:49.579 Doing job 3
09:37:49.579 Execute task 5
09:37:49.579 Execute task 6
09:37:49.579 Execute task 7
09:37:49.579 Execute task 8
09:37:49.579 Execute task 9
09:37:52.574 Done job 0
09:37:52.574 Done job 1
09:37:52.574 Doing job 4
09:37:52.574 Doing job 5
09:37:52.574 Done job 2
09:37:52.575 Doing job 6
09:37:52.579 Done job 3
09:37:52.580 Doing job 7
09:37:54.581 some jobs are not terminated
09:37:54.584 Done job 7
java.lang.InterruptedException: sleep interrupted
09:37:54.585 Done job 6
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at concurrent.FixedSizeThreadPoolExecutorExample1.sleepSec(FixedSizeThreadPoolExecutorExample1.java:41)
	at concurrent.FixedSizeThreadPoolExecutorExample1.lambda$main$0(FixedSizeThreadPoolExecutorExample1.java:18)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
09:37:54.586 Done job 4
09:37:54.586 Done job 5
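
In the 5-second run, jobs 4 to 7 still print "Done" because sleepSec() catches the InterruptedException and carries on, while jobs 8 and 9 are silently dropped. The sketch below, which uses a hypothetical class ShutdownNowSketch, shows two things under those assumptions: shutdownNow() returns the queued tasks that never started, and a task can stop early by restoring the interrupt flag instead of continuing.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ShutdownNowSketch {

    // Submits three long tasks to a single-thread pool, then calls shutdownNow().
    // Returns the number of queued tasks that never started.
    static int pendingAfterShutdownNow() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                started.countDown();
                try {
                    TimeUnit.SECONDS.sleep(30);
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the worker: restore the flag and stop.
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            started.await(); // make sure the first task is running
            List<Runnable> pending = executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdownNow() + " queued task(s) never started");
    }
}
```

With one worker thread and three tasks, one task is running and two are queued when shutdownNow() is called, so the returned list has two entries. Keeping that list lets the caller log or resubmit the work that was dropped.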

How to create a thread pool sized to the number of available cores

When choosing the size of a thread pool, there is often no obvious criterion. One common choice is the number of available cores. availableProcessors() returns the number of processors available to the JVM in which the program is running.

package concurrent;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedSizeThreadPoolExecutorExample3 {

    public static void main(String[] args) {

        final int maxCore = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxCore);
        System.out.println("Max thread pool size: " + executor.getMaximumPoolSize());

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println("Execute task " + jobId);
            executor.execute(() -> {
                System.out.println("Doing a job " + jobId);
            });
        }
        executor.shutdown();
    }
}

result

Max thread pool size: 8
Execute task 0
Execute task 1
Doing a job 0
Execute task 2
Doing a job 1
Execute task 3
Doing a job 2
Execute task 4
Doing a job 3
Execute task 5
Doing a job 4
Execute task 6
Doing a job 5
Execute task 7
Doing a job 6
Doing a job 7
Execute task 8
Execute task 9
Doing a job 8
Doing a job 9

Get the task's result returned as a Future

In the previous example, we passed the Task as a Runnable. You can also pass a Callable object that can return a result. submit() takes a Callable as an argument and returns a Future. You can receive a return value through a Future.

The following is an example of passing a Callable Task and receiving a String as a result.

package concurrent;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample4 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final List<Future<String>> futures = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // submit() also receives a Callable as an argument and returns a Future.
            Future<String> result = executor.submit(() -> {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(LocalTime.now() + " Done task " + jobId);
                return "finished job " + jobId;
            });
            futures.add(result);
        }

        // future.get() blocks until the task's result is available
        for (Future<String> future : futures) {
            String result = future.get();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}

Future.get() blocks until the result of that Future is available. The for statement above retrieves the results in submission order: it waits for the first future, then the second, and so on. For example, if the first task takes 10 seconds and the second task takes 1 second, the second result is not processed until the first task finishes, even though it has been ready for 9 seconds.

If you want to process results in the order in which tasks are completed, you can implement it using BlockingQueue.

result

12:24:36.948 Execute task 0
12:24:36.958 Execute task 1
12:24:36.958 Execute task 2
12:24:36.959 Execute task 3
12:24:36.959 Execute task 4
12:24:36.959 Execute task 5
12:24:36.959 Execute task 6
12:24:36.959 Execute task 7
12:24:36.959 Execute task 8
12:24:36.959 Execute task 9
12:24:38.959 Done task 0
12:24:38.959 Done task 1
12:24:38.959 Done task 2
12:24:38.959 finished job 0
12:24:38.959 Done task 3
12:24:38.959 finished job 1
12:24:38.959 finished job 2
12:24:38.959 finished job 3
12:24:40.959 Done task 4
12:24:40.959 Done task 6
12:24:40.959 Done task 5
12:24:40.959 Done task 7
12:24:40.959 finished job 4
12:24:40.960 finished job 5
12:24:40.960 finished job 6
12:24:40.960 finished job 7
12:24:42.959 Done task 8
12:24:42.959 Done task 9
12:24:42.960 finished job 8
12:24:42.960 finished job 9
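
The difference between completion order and retrieval order described above can be made visible with a small sketch. The class name SubmissionOrderSketch is hypothetical, and a CountDownLatch is used here (instead of sleeps) only to force task "A" to finish after task "B".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class SubmissionOrderSketch {

    // Returns {completion order, retrieval order} for a slow task "A"
    // submitted before a fast task "B".
    static List<List<String>> run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> completed = new CopyOnWriteArrayList<>();
        CountDownLatch bFinished = new CountDownLatch(1);

        List<Future<String>> futures = new ArrayList<>();
        futures.add(executor.submit(() -> {
            bFinished.await(); // "A" cannot finish before "B"
            completed.add("A");
            return "A";
        }));
        futures.add(executor.submit(() -> {
            completed.add("B");
            bFinished.countDown();
            return "B";
        }));

        List<String> retrieved = new ArrayList<>();
        try {
            for (Future<String> future : futures) {
                retrieved.add(future.get()); // blocks on "A" first
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return Arrays.asList(completed, retrieved);
    }

    public static void main(String[] args) {
        List<List<String>> orders = run();
        System.out.println("completed: " + orders.get(0));
        System.out.println("retrieved: " + orders.get(1));
    }
}
```

The tasks complete in the order [B, A], but the loop over the futures retrieves them as [A, B], because get() on the first future blocks until "A" is done.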

Get result returned with BlockingQueue

You can also collect results with a BlockingQueue instead of a Future. BlockingQueue implementations are thread-safe, so multiple threads can access the queue without additional synchronization.

The example below uses Runnable, not Callable, because each task puts its result into the queue instead of returning it.

Task 0 sleeps for 10 seconds and task 9 sleeps for 1 second, so the tasks that start earlier take longer. This demonstrates that the results can be processed in the order in which the tasks complete.

Here is an example.

package concurrent;

import java.time.LocalTime;
import java.util.concurrent.*;

public class FixedSizeThreadPoolExecutorExample5 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        for (int i = 0; i < 10; i++) {
            final int jobId = i;
            final int sleepSec = 10 - i;
            System.out.println(LocalTime.now() + " Execute task " + jobId);

            // Result is stored in BlockingQueue
            executor.submit(() -> {
                System.out.println(LocalTime.now() + " Doing Task " + jobId + ", sleepSec: " + sleepSec);
                try {
                    TimeUnit.SECONDS.sleep(sleepSec);
                    String result = "finished job " + jobId;
                    queue.put(result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // take() waits for an item to be added to the queue and returns
        for (int i = 0; i < 10; i++) {
            String result = queue.take();
            System.out.println(LocalTime.now() + " " + result);
        }

        executor.shutdown();
    }
}

result
12:26:09.009 Execute task 0
12:26:09.018 Execute task 1
12:26:09.018 Doing Task 0, sleepSec: 10
12:26:09.018 Execute task 2
12:26:09.018 Doing Task 1, sleepSec: 9
12:26:09.018 Execute task 3
12:26:09.018 Doing Task 2, sleepSec: 8
12:26:09.018 Execute task 4
12:26:09.018 Doing Task 3, sleepSec: 7
12:26:09.018 Execute task 5
12:26:09.018 Execute task 6
12:26:09.018 Execute task 7
12:26:09.018 Execute task 8
12:26:09.018 Execute task 9
12:26:16.019 Doing Task 4, sleepSec: 6
12:26:16.019 finished job 3
12:26:17.018 finished job 2
12:26:17.018 Doing Task 5, sleepSec: 5
12:26:18.018 Doing Task 6, sleepSec: 4
12:26:18.018 finished job 1
12:26:19.018 finished job 0
12:26:19.018 Doing Task 7, sleepSec: 3
12:26:22.018 Doing Task 8, sleepSec: 2
12:26:22.018 finished job 7
12:26:22.019 Doing Task 9, sleepSec: 1
12:26:22.019 finished job 6
12:26:22.019 finished job 4
12:26:22.019 finished job 5
12:26:23.019 finished job 9
12:26:24.019 finished job 8
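
The same idea is also available in java.util.concurrent as ExecutorCompletionService, which wraps an executor and hands back Futures in the order the tasks finish. The following is a minimal sketch under that assumption, not the approach used in the example above. The class name CompletionOrderSketch and the 200 ms step are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class CompletionOrderSketch {

    // Submits tasks whose sleep time shrinks as the job id grows and returns
    // the results in the order in which the tasks finished.
    static List<String> run(int jobs) {
        ExecutorService executor = Executors.newFixedThreadPool(jobs);
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        for (int i = 0; i < jobs; i++) {
            final int jobId = i;
            final long sleepMillis = 200L * (jobs - i);
            completion.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
                return "finished job " + jobId;
            });
        }

        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < jobs; i++) {
                // take() blocks until the next task completes, whichever it is
                results.add(completion.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Because the later jobs sleep for a shorter time, the results usually come back in reverse submission order. Unlike the BlockingQueue version, the tasks stay Callables and any exception they throw surfaces through Future.get().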

Copyright ©2019 codechacha