ArrayBlockingQueue is a BlockingQueue implementation backed by an array. Because the capacity is fixed when the queue is created, items cannot be added without limit. Items are kept in insertion order and retrieved in FIFO (First In First Out) order.
A BlockingQueue can wait instead of returning null: when you retrieve an item from an empty queue, it can block until an item is added. Conversely, when you add an item to a full queue, it can block until space becomes available.
ArrayBlockingQueue is designed for use in a multi-threaded environment. It is internally thread-safe, so it can be used without the synchronized keyword.
In summary:
- ArrayBlockingQueue implements the BlockingQueue interface.
- The capacity is set when the queue is created, and an array is used internally to store items.
- It is thread-safe and can be shared between multiple threads without additional synchronization.
- When retrieving an item from an empty queue, the caller can wait until an item is added.
- When adding an item to a full queue, the call may throw an exception, return false, or wait (indefinitely or for a given time), depending on the method used.
Create ArrayBlockingQueue
You can create an object as follows, passing the capacity of the queue as an argument.
int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
You can also pass fair as a second argument, as shown below. If fair is true, threads that are blocked on the queue are granted access in FIFO order.
If fair is false, the access order is not specified, so some threads may wait a long time before they can use the queue.
int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true);
Also, capacity, fair and initial values can be passed as arguments as follows. The initial value list can be passed as a Collection object.
List<Integer> list = Arrays.asList(10, 20, 30);
int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true, list);
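As a rough sketch of how the three-argument constructor behaves: the initial items are added in the iteration order of the collection, and the constructor throws IllegalArgumentException when the capacity is smaller than the number of initial items. The class name InitialValuesSketch and the helper create() are illustrative names, not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class InitialValuesSketch {
    // Hypothetical helper: builds a fair queue pre-filled with the given values.
    static ArrayBlockingQueue<Integer> create(int capacity, List<Integer> initial) {
        return new ArrayBlockingQueue<>(capacity, true, initial);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(10, 20, 30);

        ArrayBlockingQueue<Integer> queue = create(10, list);
        System.out.println(queue);                     // [10, 20, 30]
        System.out.println(queue.remainingCapacity()); // 7

        try {
            create(2, list); // capacity 2 cannot hold 3 initial items
        } catch (IllegalArgumentException e) {
            System.out.println("capacity too small");
        }
    }
}
```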
add() : add item
You can add items to the queue with add(). They are stored in the queue in FIFO order.
int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
queue.add(1);
System.out.println(queue);
queue.add(2);
queue.add(3);
System.out.println(queue);
queue.add(4);
queue.add(5);
System.out.println(queue);
Output:
[1]
[1, 2, 3]
[1, 2, 3, 4, 5]
If you try to add more items than the queue size, an exception is thrown.
int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
System.out.println(queue);
if (queue.remainingCapacity() == 0) {
    System.out.println("Queue is full");
} else {
    System.out.println("Queue is not full");
}
try {
    queue.add(6);
} catch (Exception e) {
    e.printStackTrace();
}
remainingCapacity() returns the number of additional items the queue can accept, so a value of 0 means the queue is full.
If add() is called when the queue is full, an IllegalStateException is thrown.
Output:
[1, 2, 3, 4, 5]
Queue is full
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at example.concurrent.ArrayBlockingQueueExample.main(ArrayBlockingQueueExample.java:63)
put() : Wait until space is available and add item
When the queue is full, add() throws an exception.
put() also adds an item, but when the queue is full it does not throw an exception; instead it waits indefinitely until space becomes available.
In the following code, the call that inserts 6 waits indefinitely because the queue is already full and no other thread removes an item.
int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    queue.put(1);
    System.out.println("Queue: " + queue);
    queue.put(2);
    System.out.println("Queue: " + queue);
    queue.put(6);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}
Output:
Queue: [1]
Queue: [1, 2]
// waiting for space to become available
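For contrast, here is a minimal sketch in which a second thread removes an item, so the blocked put() can complete. The class name PutUnblockSketch, the helper run(), and the 200 ms delay are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutUnblockSketch {
    // Fills a queue of capacity 2, then lets a helper thread remove one item
    // so that the pending put(6) can finish. Returns the final contents.
    static String run() throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.put(1);
        queue.put(2);

        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // give the main thread time to block
                queue.take();                     // removes 1 and frees one slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put(6); // cannot complete until the consumer has taken an item
        consumer.join();
        return queue.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Queue: " + run()); // Queue: [2, 6]
    }
}
```

The head item 1 is removed first, so the queue ends up holding 2 and 6.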
offer() : Add item without exception
When the queue is full, add() throws an exception. If you do not want an exception to be thrown, you can use offer().
offer() is almost identical to add(), but it returns false instead of throwing an exception when the queue is full.
Here is an example using offer(). It returns true if the item is added, and false if it is not.
int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
System.out.println("success: " + queue.offer(1));
System.out.println("success: " + queue.offer(2));
try {
    boolean success = queue.offer(6);
    System.out.println("success: " + success);
} catch (Exception e) {
    e.printStackTrace();
}
Output:
success: true
success: true
success: false
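Because offer() reports failure through its return value, the caller decides what to do with a rejected item. The following sketch simply counts the rejected items; the class name OfferRejectSketch and the helper offerAll() are hypothetical, and a real program might retry or log instead.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class OfferRejectSketch {
    // Hypothetical helper: tries to add every value and returns how many
    // were rejected because the queue was full at that moment.
    static int offerAll(ArrayBlockingQueue<Integer> queue, int... values) {
        int rejected = 0;
        for (int value : values) {
            if (!queue.offer(value)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int rejected = offerAll(queue, 1, 2, 6, 7);
        System.out.println("rejected: " + rejected); // rejected: 2
        System.out.println("Queue: " + queue);       // Queue: [1, 2]
    }
}
```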
offer(timeout) : Add item after waiting a certain amount of time
When the queue is full, offer() returns false immediately without adding the item.
If a timeout is passed as an argument to offer(), it waits up to the given time, and if space becomes available within that time, the item is added.
If you run the following code, 1 and 2 are added immediately, but the call for 6 waits 5 seconds and then returns false. If space had become available within those 5 seconds, the item would have been added and true returned.
int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    long timeout = 5000;
    System.out.println("success: " + queue.offer(1, timeout, TimeUnit.MILLISECONDS));
    System.out.println("success: " + queue.offer(2, timeout, TimeUnit.MILLISECONDS));
    boolean success = queue.offer(6, timeout, TimeUnit.MILLISECONDS);
    System.out.println("success: " + success);
} catch (InterruptedException e) {
    // offer() with a timeout can be interrupted while it waits
    e.printStackTrace();
}
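The success case can be sketched by letting another thread remove an item while offer() is waiting. The class name OfferTimeoutSketch, the helper run(), and the 500 ms delay are assumptions for illustration; with a 5 second timeout the call should normally return true well before the timeout elapses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferTimeoutSketch {
    // Fills the queue, then frees one slot from another thread while the
    // timed offer() is waiting. Returns the result of that offer().
    static boolean run() throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.offer(1);
        queue.offer(2);

        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500); // well within the 5 second timeout
                queue.take();                     // frees one slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Waits up to 5 seconds for space to become available.
        boolean success = queue.offer(6, 5000, TimeUnit.MILLISECONDS);
        consumer.join();
        return success;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("success: " + run());
    }
}
```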
take() : Get an item
take() removes the item at the head of the queue and returns it.
The item that was added first is removed first, as shown below. Exception handling is necessary because take() can throw InterruptedException if the thread is interrupted while it waits for an item.
int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
queue.add(1);
queue.add(2);
queue.add(3);
System.out.println("Queue: " + queue);
try {
    int head = queue.take();
    System.out.println("head: " + head);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}
Output:
Queue: [1, 2, 3]
head: 1
Queue: [2, 3]
Wait indefinitely for item to be added
If take() is called when the queue is empty, it blocks and waits for an item to be added.
If an item is added from another thread while waiting, that item is returned.
The thread may be interrupted while waiting, so InterruptedException must be handled.
Running the code below makes it wait indefinitely, because no other thread is adding items to this queue.
int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    int head = queue.take();
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}
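To see take() return instead of waiting forever, another thread has to add an item. The following is a minimal sketch under that assumption; the class name TakeUnblockSketch, the helper run(), the value 42, and the 200 ms delay are all illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeUnblockSketch {
    // Calls take() on an empty queue while a producer thread adds one item
    // a little later. Returns the item that take() received.
    static int run() throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // give the main thread time to block
                queue.put(42);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int head = queue.take(); // waits until the producer has added an item
        producer.join();
        return head;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("head: " + run()); // head: 42
    }
}
```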
poll(timeout) : Wait for a certain amount of time when fetching an item
You can pass a timeout to poll() when fetching an item to avoid waiting indefinitely.
The following code sets the timeout to 1 second. poll() returns null if the timeout elapses before an item is available.
int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    long timeout = 1000;
    Integer head = queue.poll(timeout, TimeUnit.MILLISECONDS);
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}
Output:
head: null
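The sketch below puts three cases side by side: poll() without arguments on an empty queue, poll() with a timeout when an item is already present, and poll() with a timeout that elapses. The class name PollSketch, the helper run(), and the chosen values are assumptions for illustration. Note that the result is stored in an Integer rather than an int, because unboxing a null result would throw a NullPointerException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollSketch {
    // Returns the three poll results joined into one string.
    static String run() throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // No timeout: returns null immediately because the queue is empty.
        Integer first = queue.poll();

        // An item is present, so the timed poll returns it without waiting.
        queue.add(7);
        Integer second = queue.poll(1000, TimeUnit.MILLISECONDS);

        // The queue is empty again, so this returns null after about 100 ms.
        Integer third = queue.poll(100, TimeUnit.MILLISECONDS);

        return first + ", " + second + ", " + third;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // null, 7, null
    }
}
```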
Example of BlockingQueue running in multiple threads
This is an example of a BlockingQueue used from multiple threads.
Refer to the "BlockingQueue" code of How to use ExecutorService.
put() is called from the worker threads of ParallelExcutorService, and take() is called from the main thread.
package test;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ExecutorServiceTest4 {

    public static void main(String[] args) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");

        for (int i = 0; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }

        System.out.println("end");
        service.close();
    }

    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        public ParallelExcutorService() {
        }

        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}
Output (the order of the lines may differ between runs because the jobs execute in parallel):
finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end