Java - How to use ArrayBlockingQueue

ArrayBlockingQueue is a BlockingQueue implementation backed by an array. Because the capacity is fixed when the queue is created, it cannot grow without bound. Items are kept in insertion order and retrieved in FIFO (First In First Out) order.

When you retrieve an item with a blocking method such as take(), a BlockingQueue does not return null if the queue is empty; it waits until an item is added. Conversely, when you add an item with a blocking method such as put() and the queue is full, it waits until space becomes available.

ArrayBlockingQueue is designed for use in a multi-threaded environment. It is thread-safe internally, so it can be shared between threads without the synchronized syntax.

In summary:

  • ArrayBlockingQueue implements the BlockingQueue interface.
  • You set the capacity when creating the queue, and it uses an array internally to store items.
  • It is thread-safe and can be shared by multiple threads without external synchronization.
  • If the queue is empty when retrieving an item, the call can wait until an item is added.
  • If the queue is full when adding an item, the call may throw an exception, return false, or wait (indefinitely or for a set time), depending on the method used.
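
As a quick illustration of the last point, the sketch below fills a queue of capacity 1 and then tries three different insertion methods on it. The class name FullQueueBehaviors and the describe() helper are illustrative names, and the 100 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueBehaviors {

    // Hypothetical helper: reports how add(), offer() and timed offer()
    // each react when the queue is already full.
    static String describe() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);
        queue.add(1); // the queue is now full

        StringBuilder result = new StringBuilder();
        try {
            queue.add(2);
            result.append("add: ok");
        } catch (IllegalStateException e) {
            result.append("add: IllegalStateException");
        }

        // offer() reports failure through its return value
        result.append(", offer: ").append(queue.offer(2));

        try {
            // Waits up to 100 ms for space, then gives up
            result.append(", timed offer: ")
                  .append(queue.offer(2, 100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.toString();
    }

    public static void main(String[] args) {
        // add: IllegalStateException, offer: false, timed offer: false
        System.out.println(describe());
    }
}
```

put() is left out of this sketch because, with no consumer thread running, it would block forever.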

Create ArrayBlockingQueue

You can create a queue as follows, passing its capacity as an argument.

int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

You can also pass fair as a second argument, as shown below. If fair is true, threads blocked on insertion or removal are served in FIFO order. If it is false (the default), the order is unspecified, so under heavy contention some threads may wait a long time before they get access to the queue. Fairness generally reduces throughput, so enable it only when you need it.

int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true);

You can also pass capacity, fair, and initial values together, as follows. The initial values are passed as a Collection object.

List<Integer> list = Arrays.asList(10, 20, 30);
int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true, list);
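
One detail worth knowing about this constructor: according to the Javadoc, it throws IllegalArgumentException when the collection holds more elements than capacity. A minimal sketch of both cases follows; the class name InitialValuesDemo and the canCreate() helper are illustrative names, not part of the API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class InitialValuesDemo {

    // Hypothetical helper: returns true if a queue of the given capacity
    // can be created from the given initial values.
    static boolean canCreate(int capacity, List<Integer> initial) {
        try {
            new ArrayBlockingQueue<Integer>(capacity, true, initial);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown when capacity < initial.size() (or capacity < 1)
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(10, 20, 30);

        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10, true, list);
        System.out.println(queue);                     // [10, 20, 30]
        System.out.println(queue.remainingCapacity()); // 7

        System.out.println(canCreate(2, list));        // false
    }
}
```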

add() : add item

You can add items to the queue with add(). They are stored in the queue in FIFO order.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
System.out.println(queue);

queue.add(2);
queue.add(3);
System.out.println(queue);

queue.add(4);
queue.add(5);
System.out.println(queue);

Output:

[1]
[1, 2, 3]
[1, 2, 3, 4, 5]

If you try to add more items than the queue's capacity allows, an exception is thrown.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
System.out.println(queue);

if (queue.remainingCapacity() == 0) {
    System.out.println("Queue is full");
} else {
    System.out.println("Queue is not full");
}

try {
    queue.add(6);
} catch (Exception e) {
    e.printStackTrace();
}

remainingCapacity() returns the number of free slots in the queue, so a value of 0 means the queue is full. Calling add() on a full queue throws IllegalStateException.

Output:

[1, 2, 3, 4, 5]
Queue is full
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at example.concurrent.ArrayBlockingQueueExample.main(ArrayBlockingQueueExample.java:63)

put() : Wait until space is available and add item

When the queue is full, add() throws an exception. put() behaves like add(), except that when the queue is full it does not throw an exception; it waits indefinitely until space becomes available.

In the following code, the call that inserts 6 waits indefinitely, because the queue is already full and no other thread removes an item.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    queue.put(1);
    System.out.println("Queue: " + queue);
    queue.put(2);
    System.out.println("Queue: " + queue);
    queue.put(6);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

Queue: [1]
Queue: [1, 2]
// waiting for space to become available
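
To show put() actually completing, here is a minimal sketch in which a second thread removes an item after a short delay. The class name PutUnblockDemo, the run() helper, and the 500 ms delay are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutUnblockDemo {

    // Hypothetical helper: fills a capacity-2 queue, then calls put() a third
    // time while a consumer thread removes one item after a short delay.
    // Returns the final queue contents as a string.
    static String run() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
        try {
            queue.put(1);
            queue.put(2);

            Thread consumer = new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500); // illustrative delay
                    queue.take();                      // removes 1 and frees a slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            queue.put(6); // blocks until the consumer has taken an item
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.toString();
    }

    public static void main(String[] args) {
        System.out.println("Queue: " + run()); // Queue: [2, 6]
    }
}
```

The third put() can only return after take() has removed the head, so the queue ends up holding 2 and 6.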

offer() : Add item without exception

When the queue is full, add() throws an exception. If you don't want an exception to be thrown, you can use offer().

offer() is almost identical to add(), but returns false without raising an exception when the queue is full.

Here is an example using offer(). It returns true if the item was added and false if it was not.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

System.out.println("success: " + queue.offer(1));
System.out.println("success: " + queue.offer(2));

try {
    boolean success = queue.offer(6);
    System.out.println("success: " + success);
} catch (Exception e) {
    e.printStackTrace();
}

Output:

success: true
success: true
success: false

offer(timeout) : Add item after waiting a certain amount of time

When the queue is full, offer() returns false without adding any items.

If you pass a timeout to offer(), it waits up to that amount of time for space, and adds the item if space becomes available in time.

In the following code, 1 and 2 are added immediately, but the call for 6 waits 5 seconds and then returns false. If space had become available within those 5 seconds, the item would have been added and the call would have returned true.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

try {
    long timeout = 5000;
    System.out.println("success: " + queue.offer(1, timeout, TimeUnit.MILLISECONDS));
    System.out.println("success: " + queue.offer(2, timeout, TimeUnit.MILLISECONDS));

    boolean success = queue.offer(6, timeout, TimeUnit.MILLISECONDS);
    System.out.println("success: " + success);
} catch (Exception e) {
    e.printStackTrace();
}
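
The sketch below covers the other case, where space frees up before the timeout expires. The class name OfferTimeoutDemo, the offerWhileConsumerRuns() helper, and the delay values are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferTimeoutDemo {

    // Hypothetical helper: returns the result of a timed offer() on a full
    // queue while a consumer thread frees one slot after consumerDelayMs.
    static boolean offerWhileConsumerRuns(long consumerDelayMs, long timeoutMs) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
        queue.offer(1);
        queue.offer(2); // the queue is now full

        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(consumerDelayMs);
                queue.poll(); // removes the head without blocking
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Waits up to timeoutMs for a free slot
            boolean success = queue.offer(6, timeoutMs, TimeUnit.MILLISECONDS);
            consumer.join();
            return success;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A slot frees up after about 0.5 s, well inside the 5 s timeout,
        // so this should print "success: true"
        System.out.println("success: " + offerWhileConsumerRuns(500, 5000));
    }
}
```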

take() : Get an item

take() removes an item from the queue and returns its value.

It returns the item that was added first, as shown below. Exception handling is required because take() throws InterruptedException if the thread is interrupted while waiting for an item.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
queue.add(2);
queue.add(3);
System.out.println("Queue: " + queue);

try {
    int head = queue.take();
    System.out.println("head: " + head);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

Queue: [1, 2, 3]
head: 1
Queue: [2, 3]
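
Note that InterruptedException is not thrown merely because the queue is empty; it is thrown when the thread calling take() is interrupted. The following sketch interrupts a thread that is waiting on an empty queue. The class name TakeInterruptDemo and the interruptBlockedTake() helper are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class TakeInterruptDemo {

    // Hypothetical helper: starts a thread that calls take() on an empty queue,
    // interrupts it, and reports whether it saw an InterruptedException.
    static boolean interruptBlockedTake() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);
        boolean[] interrupted = {false};

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // the queue is empty, so this cannot return an item
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        consumer.start();
        consumer.interrupt(); // take() throws whether it is already waiting or not

        try {
            consumer.join(); // also makes the write to interrupted[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + interruptBlockedTake()); // interrupted: true
    }
}
```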

Wait indefinitely for item to be added

If take() is called when the queue is empty, it blocks and waits for an item to be added. If an item is added from another thread while waiting, its value is returned.

The waiting thread may be interrupted, so InterruptedException must be handled.

Running the code below waits indefinitely, because no other thread adds an item to this queue.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    int head = queue.take();
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}
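
For contrast, here is a minimal sketch in which another thread does add an item, so take() returns. The class name TakeWaitDemo, the takeFromProducer() helper, the value 42, and the 500 ms delay are all assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeWaitDemo {

    // Hypothetical helper: calls take() on an empty queue while a producer
    // thread adds the value 42 after a short delay.
    static int takeFromProducer() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);

        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500); // illustrative delay
                queue.put(42);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            int head = queue.take(); // blocks until the producer adds 42
            producer.join();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("head: " + takeFromProducer()); // head: 42
    }
}
```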

poll(timeout) : Wait for a certain amount of time when fetching an item

You can pass a timeout to poll() to avoid waiting indefinitely when fetching an item.

The following code sets the timeout to 1 second. poll() returns null if the timeout elapses before an item becomes available.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    long timeout = 1000;
    Integer head = queue.poll(timeout, TimeUnit.MILLISECONDS);
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

head: null
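
If an item arrives before the timeout elapses, poll() returns it instead of null. A minimal sketch, assuming a producer thread that adds the value 7 after a short delay (the class name PollTimeoutDemo, the pollWithProducer() helper, and the timings are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {

    // Hypothetical helper: polls an initially empty queue with a timeout while
    // a producer thread adds the value 7 after producerDelayMs milliseconds.
    static Integer pollWithProducer(long producerDelayMs, long timeoutMs) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);

        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(producerDelayMs);
                queue.put(7);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Waits up to timeoutMs for an item; null means the timeout elapsed
            Integer head = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            producer.join();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // The item arrives after about 200 ms, inside the 1 s timeout,
        // so this should print "head: 7"
        System.out.println("head: " + pollWithProducer(200, 1000));
    }
}
```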

Example of BlockingQueue running in multithreading

The following example shares a BlockingQueue between several threads.

It is based on the "BlockingQueue" code in How to use ExecutorService.

put() is called from the worker threads of ParallelExcutorService, and take() is called from the main thread.

package test;

import java.util.List;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ExecutorServiceTest4 {

    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");

        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }

        System.out.println("end");
        service.close();
    }

    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        public ParallelExcutorService() {
        }

        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}

Output (the order of the jobs may differ between runs):

finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end

Copyright ©2019 codechacha