Java - CountDownLatch 사용 방법

CountDownLatch는 어떤 쓰레드가 다른 쓰레드에서 작업이 완료될 때 까지 기다릴 수 있도록 해주는 클래스입니다.

예를 들어, Main thread에서 5개의 쓰레드를 생성하여 어떤 작업을 병렬로 처리되도록 할 수 있습니다. 이 때 Main thread는 다른 쓰레드가 종료되는 것을 기다리지 않고 다음 코드(statements)를 수행합니다. 여기서 CountDownLatch를 사용하면 다음 코드(statements)를 실행하지 않고 기다리도록 만들 수 있습니다.

다른 예로, 어떤 작업을 처리하는데 CPU 리소스가 많이 필요하지 않기 때문에 Main thread에서만 처리하도록 할 수 있습니다. 하지만 어떤 프로세스가 실행되기를 기다리거나 Network 등의 외부에서 어떤 이벤트가 발생하길 기다린다면, 그런 이벤트가 발생하지 않았을 때 무한히 기다리게 될 수도 있습니다. 이럴 때, 다른 Thread에서 이 작업을 수행하도록 하고 Main thread는 일정 시간을 초과하면 작업을 기다리지 않도록, Timeout을 설정할 수 있습니다.

예로 든 것처럼, CountDownLatch를 이용하여 자신의 목적에 맞게 사용할 수 있습니다.

CountDownLatch의 예제를 소개하면서 어떻게 사용하는지 알아보겠습니다.

CountDownLatch 작동 원리

CountDownLatch는 다음과 같이 생성할 수 있습니다. 인자로 Latch의 숫자를 전달합니다.

CountDownLatch countDownLatch = new CountDownLatch(5);

다음과 같이 countDown()을 호출하면 Latch의 숫자가 1개씩 감소합니다.

countDownLatch.countDown();

await()은 Latch의 숫자가 0이 될 때까지 기다리는 코드입니다.

countDownLatch.await();

다른 쓰레드에서 countDown()을 5번 호출하게 된다면 Latch는 0이 되며, await()은 더 이상 기다리지 않고 다음 코드를 실행하게 됩니다.

다른 쓰레드 작업이 완료될 때까지 기다리기

다음은 Main 쓰레드가 5개의 쓰레드를 생성하고, 5개의 쓰레드의 작업이 완료될 때까지 기다리는 예제입니다.

public class CountDownLatchExample {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(countDownLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        System.out.println("Waiting for some work to be finished (tid: "
                + Thread.currentThread().getId() + ")");

        countDownLatch.await();

        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch countDownLatch;

        public Worker(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
            countDownLatch.countDown();
        }
    }
}

실행해보면 다음과 같은 결과가 출력됩니다. Main 쓰레드는 다른 쓰레드의 작업이 모두 완료될 때까지 기다리고 마지막으로 Finished라는 로그를 출력하고 종료하였습니다.

Start multi threads (tid: 1)
Doing something (tid: 11)
Doing something (tid: 12)
Doing something (tid: 13)
Doing something (tid: 14)
Waiting for some work to be finished (tid: 1)
Doing something (tid: 15)
Finished (tid: 1)

위의 코드에서 다음 코드가 쓰레드를 생성하고 실행시키는 코드입니다. Java8의 Stream을 이용하여 쓰레드를 생성하고 실행시켰습니다. 또한, 각각의 쓰레드에 countDownLatch를 인자로 전달하였습니다.

CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
        .generate(() -> new Thread(new Worker(countDownLatch)))
        .limit(5)
        .collect(toList());

workers.forEach(Thread::start);

다음은 Main thread에서 다른 thread의 작업이 끝날 때까지 기다리는 코드입니다. Latch의 초기값을 5로 설정하였기 때문에 Main 쓰레드는 countDown()이 5번 호출될 때까지 계속 기다리게 됩니다.

countDownLatch.await();

다음은 Worker 클래스입니다. 각각의 쓰레드는 Worker.run()을 실행하게 됩니다. run()이 실행되면 로그를 출력하고 countDown()을 호출합니다.

public static class Worker implements Runnable {
    private CountDownLatch countDownLatch;

    public Worker(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
        countDownLatch.countDown();
    }
}

모든 쓰레드가 준비될 때까지 기다리기

위의 예제에서는 먼저 실행되는 쓰레드가 먼저 작업을 수행하고 종료되었습니다. 만약 어떤 작업이 동시에 처리되어야 한다면 다음과 같이 구현할 수 있습니다.

위의 예제와 거의 비슷하지만, 2개의 Latch를 더 사용하여 모든 쓰레드가 준비될 때까지 기다리게 만들었습니다.

public class CountDownLatchExample2 {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch readyLatch = new CountDownLatch(5);
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch finishLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(readyLatch,
                                    startLatch, finishLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        readyLatch.await();

        System.out.println("Waited for ready and started doing some work (tid: "
                + Thread.currentThread().getId() + ")");
        startLatch.countDown();

        finishLatch.await();
        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch readyLatch;
        private CountDownLatch startLatch;
        private CountDownLatch finishLatch;

        public Worker(CountDownLatch readyLatch, CountDownLatch startLatch,
                      CountDownLatch finishLatch) {
            this.readyLatch = readyLatch;
            this.startLatch = startLatch;
            this.finishLatch = finishLatch;
        }

        @Override
        public void run() {
            readyLatch.countDown();
            try {
                startLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Do something (tid: "
                    + Thread.currentThread().getId() + ")");
            finishLatch.countDown();
        }
    }
}

다음과 같이 3개의 Latch를 생성하였으며, readyLatchstartLatch는 쓰레드가 준비될 때까지 기다리게 만들기 위한 Latch입니다.

CountDownLatch readyLatch = new CountDownLatch(5);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(5);
List<Thread> workers = Stream
        .generate(() -> new Thread(new Worker(readyLatch, startLatch, finishLatch)))
        .limit(5)
        .collect(toList());

다음은 Worker 클래스의 run()입니다. 각각의 쓰레드는 준비가 되면 readyLatch.countDown()으로 준비가 되었음을 Main thread에게 알려줍니다. 그리고 Main thread가 startLatch.countDown()을 호출하면 모든 쓰레드는 작업을 수행하기 시작합니다.

@Override
public void run() {
    readyLatch.countDown();
    try {
        startLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
    finishLatch.countDown();
}

아래 코드가 Main thread에서 쓰레드를 실행시키고, 준비가 되면 Latch로 작업을 수행하라는 신호를 보내는 코드입니다.

workers.forEach(Thread::start);

readyLatch.await();

System.out.println("Waited for ready and started doing some work (tid: "
        + Thread.currentThread().getId() + ")");
startLatch.countDown();

finishLatch.await();
System.out.println("Finished (tid: "
        + Thread.currentThread().getId() + ")");

실행 결과는 다음과 같습니다.

Start multi threads (tid: 1)
Waited for ready and started doing some work (tid: 1)
Do something (tid: 12)
Do something (tid: 14)
Do something (tid: 13)
Do something (tid: 11)
Do something (tid: 15)
Finished (tid: 1)

정해진 시간만 기다리기 (Timeout)

위의 코드들의 문제점은 어떤 쓰레드가 작업을 완료하지 못하면 countDown() 호출이 안되어 Main thread가 무한히 기다리게 된다는 것입니다.

await()에 Timeout을 설정하면, 정해진 시간만 기다리도록 만들 수 있습니다.

다음과 같이 Timeout을 설정할 수 있습니다. 아래 코드는 Timeout을 5초로 설정하였고, 5초가 지날 때 까지 Latch가 0이 되지 않으면 더 기다리지 않고 다음 코드를 수행하게 됩니다. 시간의 단위는 MINUTES 등으로 변경할 수도 있습니다.

await(5, TimeUnit.SECONDS)

다음은 Timeout을 설정하는 예제입니다.

public class CountDownLatchExample3 {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(countDownLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        System.out.println("Waiting for some work to be finished (tid: "
                + Thread.currentThread().getId() + ")");

        countDownLatch.await(5, TimeUnit.SECONDS);

        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch countDownLatch;

        public Worker(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("Doing something (tid: " + Thread.currentThread().getId() + ")");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            System.out.println("Done (tid: " + Thread.currentThread().getId() + ")");
        }
    }
}

Timeout은 5초로 설정하였고, Worker.run()에서 10초를 기다리게 하였습니다.

실행해보면 다음과 같이 기다리다가 포기하고 다음 코드를 수행합니다.

Start multi threads (tid: 1)
Doing something (tid: 11)
Doing something (tid: 12)
Doing something (tid: 13)
Doing something (tid: 14)
Waiting for some work to be finished (tid: 1)
Doing something (tid: 15)
Finished (tid: 1)
Done (tid: 12)
Done (tid: 11)
Done (tid: 14)
Done (tid: 15)
Done (tid: 13)

이 예제의 결과를 보면, 10초가 지나서 다른 쓰레드의 작업이 모두 완료되었습니다. Timeout이 발생했을 때 쓰레드에 대해서 아무것도 하지 않았기 때문입니다. 만약 Timeout이 발생했을 때 모든 쓰레드를 종료하고 싶다면 Main이 다른 쓰레드들을 모두 종료하도록 만들면 됩니다.

Loading script...

Related Posts

codechachaCopyright ©2019 codechacha