Java - ArrayBlockingQueue 사용 방법

JS · 09 Jun 2020

ArrayBlockingQueue는 Array로 구현된 BlockingQueue입니다. Queue의 크기가 정해져 있기 때문에 무한히 아이템을 추가할 수 없습니다. 추가되는 아이템은 순서가 있으며, FIFO(First In First Out) 순서를 따릅니다.

BlockingQueue는 Queue에서 아이템을 가져올 때 비어있으면 null을 리턴하지 않고 아이템이 추가될 때까지 기다립니다. 반대로, 아이템을 추가할 때 Queue가 가득차 있으면 공간이 생길 때까지 기다립니다.

따라서, ArrayBlockingQueue는 멀티 쓰레드 환경에서 사용하기 위해 구현되었으며 내부적으로 동시성에 안전하기 때문에 synchronized 구문 없이 사용해도 됩니다.

정리해보면 다음과 같습니다.

  • ArrayBlockingQueue는 BlockingQueue 인터페이스를 구현
  • Queue를 생성할 때 크기를 설정하며 내부적으로 배열을 사용하여 아이템을 저장
  • 동시성에 안전하여 멀티 쓰레드에서 synchronized 없이 사용 가능
  • 아이템을 꺼낼 때 비어있으며 추가될 때까지 기다림
  • 아이템을 추가할 때 Queue가 가득차있으면 Exception이 발생하거나 일정 시간 기다릴 수 있음

ArrayBlockingQueue 생성

간단하게 다음과 같이 객체를 생성할 수 있습니다. 인자로 Queue의 크기를 전달합니다.

int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

아래와 같이 인자로 fair를 전달할 수도 있습니다. fair가 true라면 멀티쓰레드에서 이 Queue에 접근할 때 Lock을 공정하게 얻게 됩니다. false라면 정해진 규칙없이 없습니다. fair가 false가 되면 어떤 쓰레드는 오랫동안 Lock을 획득하지 못하여 Queue를 사용하지 못하는 현상이 발생할 수 있습니다.

int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true);

또한, 다음과 같이 capacity, fair 그리고 초기값을 인자로 전달할 수 있습니다. 초기값 리스트는 Collection 객체로 전달하면 됩니다.

List<Integer> list = Arrays.asList(10, 20, 30);
int capacity = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity, true, list);

add() : 아이템 추가

add()으로 아이템을 Queue에 추가할 수 있습니다. FIFO 순서로 Queue에 저장됩니다.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
System.out.println(queue);

queue.add(2);
queue.add(3);
System.out.println(queue);

queue.add(4);
queue.add(5);
System.out.println(queue);

Output:

[1]
[1, 2, 3]
[1, 2, 3, 4, 5]

만약 Queue의 크기보다 더 많은 아이템을 추가하려고 하면 Exception이 발생합니다.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
System.out.println(queue);

if (queue.remainingCapacity() == 0) {
    System.out.println("Queue is full");
} else {
    System.out.println("Queue is not full");
}

try {
    queue.add(6);
} catch (Exception e) {
    e.printStackTrace();
}

remainingCapacity()는 Queue의 여유 공간을 리턴해 줍니다. 이것으로 Queue가 가득 찻는지 알 수 있습니다. Full인 상태에서 add()를 추가하면 IllegalStateException이 발생합니다.

Output:

[1]
[1, 2, 3]
[1, 2, 3, 4, 5]
Queue is full
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at example.concurrent.ArrayBlockingQueueExample.main(ArrayBlockingQueueExample.java:63)

put() : 공간이 생길 때까지 기다렸다가 아이템 추가

Queue가 full인 상태에서 add()는 Exception을 발생시킵니다. put()add()와 거의 동일하지만 Queue가 full일 때 Exception을 발생시키지 않고 여유 공간이 생길 때까지 무한히 기다립니다.

다음 코드를 보면 6을 넣을 때 무한히 기다리게 됩니다.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    queue.put(1);
    System.out.println("Queue: " + queue);
    queue.put(2);
    System.out.println("Queue: " + queue);
    queue.put(6);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

Queue: [1]
Queue: [1, 2]
// waiting for space to become available

offer() : 예외 없이 아이템 추가

Queue가 full인 상태에서 add()는 Exception을 발생시킵니다. Exception을 발생시키고 싶지 않다면 offer()를 사용할 수 있습니다.

offer()add()와 거의 동일하지만 Queue가 full일 때 Exception을 발생시키지 않고 false를 리턴합니다.

다음은 offer를 사용하는 예제입니다. 아이템이 추가되면 true를 리턴하며, 추가하지 못하면 false를 리턴합니다.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

System.out.println("success: " + queue.offer(1));
System.out.println("success: " + queue.offer(2));

try {
    boolean success = queue.offer(6);
    System.out.println("success: " + success);
} catch (Exception e) {
    e.printStackTrace();
}

Output:

success: true
success: true
success: false

offer(timeout) : 일정시간 기다렸다 아이템 추가

Queue가 Full일 때 offer()는 아이템을 추가하지 않고 false를 리턴합니다.

offer()의 인자로 Timeout을 전달하면 설정한 시간 만큼 기다리고, 정해진 시간 내에 여유 공간이 생기면 아이템을 추가합니다.

다음 코드를 실행하시면, 1과 2는 바로 추가되지만 6은 5초동안 기다렸다가 false를 리턴합니다. 5초 내에 공간이 생기면 아이템을 추가하고 true를 리턴합니다.

int capacity = 2;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

try {
    long timeout = 5000;
    System.out.println("success: " + queue.offer(1, timeout, TimeUnit.MILLISECONDS));
    System.out.println("success: " + queue.offer(2, timeout, TimeUnit.MILLISECONDS));

    boolean success = queue.offer(6, timeout, TimeUnit.MILLISECONDS);
    System.out.println("success: " + success);
} catch (Exception e) {
    e.printStackTrace();
}

take() : 아이템 가져오기

take()는 Queue에서 아이템을 삭제하고 그 값을 리턴합니다.

다음과 같이 먼저 추가한 것을 먼저 꺼냅니다. Queue가 비어있는 경우 InterruptedException가 발생할 수 있기 때문에 예외처리가 필요합니다.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);

queue.add(1);
queue.add(2);
queue.add(3);
System.out.println("Queue: " + queue);

try {
    int head = queue.take();
    System.out.println("head: " + head);
    System.out.println("Queue: " + queue);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

Queue: [1, 2, 3]
head: 1
Queue: [2, 3]

아이템이 추가될 때까지 무한히 대기

Queue가 empty일 때 take()를 호출하면 block되고, 아이템이 추가될 때까지 기다립니다. 대기 중에 다른 쓰레드에서 아이템이 추가되면 그 값을 리턴합니다.

대기 중에 Interrupt가 발생할 수 있기 때문에 InterruptedException에 대한 예외처리가 필요합니다.

아래 코드를 실행하면 무한히 기다리게 됩니다. 왜냐하면 다른 쓰레드에서 이 Queue에 아이템을 추가하지 않기 때문입니다.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    int head = queue.take();
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}

poll(timeout) : 아이템 가져올 때 일정 시간 대기

poll()을 사용하면 아이템을 가져올 때 Timeout을 설정하여 무한히 기다리는 일이 발생하지 않도록 할 수 있습니다.

다음 코드는 Timeout을 1초로 설정한 코드입니다. Timeout이 발생하면 null을 리턴합니다.

int capacity = 5;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
try {
    long timeout = 1000;
    Integer head = queue.poll(timeout, TimeUnit.MILLISECONDS);
    System.out.println("head: " + head);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

head: null

멀티쓰레드에서 동작하는 BlockingQueue 예제

멀티쓰레드에서 동작하는 BlockingQueue 예제입니다.

ExecutorService를 사용하는 방법의 "BlockingQueue" 코드를 참고하였습니다.

ParallelExcutorService의 병렬 쓰레드에서 add()를 호출하고 Main 쓰레드에서 take()를 호출하고 있습니다.

package test;

import java.util.List;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ExecutorServiceTest4 {

    public static void main(String args[]) {
        ParallelExcutorService service = new ParallelExcutorService();
        service.submit("job1");
        service.submit("job2");
        service.submit("job3");
        service.submit("job4");

        for (int i = 0 ; i < 4; i++) {
            String result = service.take();
            System.out.println(result);
        }

        System.out.println("end");
        service.close();
    }

    private static class ParallelExcutorService {
        private final int maxCore = Runtime.getRuntime().availableProcessors();
        private final ExecutorService executor = Executors.newFixedThreadPool(maxCore);
        private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        public ParallelExcutorService() {
        }

        public void submit(String job) {
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished " + job);
                String result = job + ", " + threadName;
                try {
                    queue.put(result);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        public String take() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public void close() {
            List<Runnable> unfinishedTasks = executor.shutdownNow();
            if (!unfinishedTasks.isEmpty()) {
                System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
            }
        }
    }
}

Output:

finished job1
finished job3
finished job4
finished job2
job1, pool-1-thread-1
job3, pool-1-thread-3
job4, pool-1-thread-4
job2, pool-1-thread-2
end

참고

codechachaCopyright ©2019 codechacha