Java - CountDownLatchを使用する方法、および例

CountDownLatchはどのスレッドが他のスレッドでの作業が完了するまで待つことができるようにするクラスです。

たとえば、Main threadで5つのスレッドを作成して、どのような操作を並列に処理されるようにすることができます。 このときMain threadは、他のスレッドが終了するのを待たずに、次のコード(statements)を実行します。 ここCountDownLatchを使用すると、次のコード(statements)を実行せずに待つようにすることができます。

他の例として、いくつかのタスクを処理するのにCPUリソースが多く必要としないので、Main threadのみ処理するようにすることができます。 しかし、いくつかのプロセスが実行されるのを待つか、Networkなどの外部からのイベントが発生したい待っている場合、このようなイベントが発生していないとき、無限待たされることがあります。 このようなときは、他のThreadでこのタスクを実行するようにしてMain threadは、一定時間を超えると、作業を待たないように、Timeoutを設定することができます。

例に挙げたように、CountDownLatchを利用して、自分の目的に合わせて使用することができます。

CountDownLatchの例を紹介しながら、どのように使用するかを知ってみましょう。

CountDownLatch動作原理

CountDownLatchは、次のように生成することができます。引数としてLatchの数を渡します。

CountDownLatch countDownLatch = new CountDownLatch(5);

次のように countDown()を呼び出すと、Latchの数が1つずつ減少します。

countDownLatch.countDown();

await()はLatchの数が0になるまで待つのコードです。

countDownLatch.await();

他のスレッドで countDown()を5回呼び出した場合Latchは0になり、 await()は、もはや待たずに次のコードを実行することになります。

他のスレッド処理が完了するまで待つ

以下は、Mainスレッドが5つのスレッドを作成し、5つのスレッドの処理が完了するまで待機する例です。

public class CountDownLatchExample {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(countDownLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        System.out.println("Waiting for some work to be finished (tid: "
                + Thread.currentThread().getId() + ")");

        countDownLatch.await();

        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch countDownLatch;

        public Worker(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
            countDownLatch.countDown();
        }
    }
}

実行してみると次のような結果が出力されます。 Mainスレッドは他のスレッドの処理がすべて完了するまで待って、最後に Finishedという名前のログを出力して終了しました。

Start multi threads (tid: 1)
Doing something (tid: 11)
Doing something (tid: 12)
Doing something (tid: 13)
Doing something (tid: 14)
Waiting for some work to be finished (tid: 1)
Doing something (tid: 15)
Finished (tid: 1)

上記のコードでは、次のコードがスレッドを作成して実行させるコードです。 Java8のStreamを利用して、スレッドを作成して実行しました。 また、それぞれのスレッドに countDownLatchを引数として渡されました。

CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream
        .generate(() -> new Thread(new Worker(countDownLatch)))
        .limit(5)
        .collect(toList());

workers.forEach(Thread::start);

以下は、Main threadで他のthreadの作業が終わるまで待ってコードです。 Latchの初期値を5に設定したのでMainスレッドは countDown()が5回呼び出されるまで継続待たされます。

countDownLatch.await();

以下は、Workerクラスです。それぞれのスレッドは、 Worker.run()を実行することになります。 run()が実行されると、ログを出力して countDown()を呼び出します。

public static class Worker implements Runnable {
    private CountDownLatch countDownLatch;

    public Worker(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
        countDownLatch.countDown();
    }
}

すべてのスレッドが準備されるまで待つ

上記の例では、先に実行されているスレッドは、最初にタスクを実行して終了しました。 もし何が同時に処理される場合は、以下のように実装することができます。

上記の例とほぼ同じですが、2つのLatchをより使用して、すべてのスレッドが準備されるまで待つことしました。

public class CountDownLatchExample2 {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch readyLatch = new CountDownLatch(5);
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch finishLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(readyLatch,
                                    startLatch, finishLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        readyLatch.await();

        System.out.println("Waited for ready and started doing some work (tid: "
                + Thread.currentThread().getId() + ")");
        startLatch.countDown();

        finishLatch.await();
        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch readyLatch;
        private CountDownLatch startLatch;
        private CountDownLatch finishLatch;

        public Worker(CountDownLatch readyLatch, CountDownLatch startLatch,
                      CountDownLatch finishLatch) {
            this.readyLatch = readyLatch;
            this.startLatch = startLatch;
            this.finishLatch = finishLatch;
        }

        @Override
        public void run() {
            readyLatch.countDown();
            try {
                startLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Do something (tid: "
                    + Thread.currentThread().getId() + ")");
            finishLatch.countDown();
        }
    }
}

次のように3つのLatchを作成し、 readyLatchstartLatchは、スレッドが準備されるまで待たさ作成するためのLatchです。

CountDownLatch readyLatch = new CountDownLatch(5);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(5);
List<Thread> workers = Stream
        .generate(() -> new Thread(new Worker(readyLatch, startLatch, finishLatch)))
        .limit(5)
        .collect(toList());

以下は、 Workerクラスのrun()です。それぞれのスレッドは準備ができたら、 readyLatch.countDown()で準備ができていることをMain threadに通知します。 そしてMain threadが startLatch.countDown()を呼び出すと、すべてのスレッドは、タスクを実行する開始します。

@Override
public void run() {
    readyLatch.countDown();
    try {
        startLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Do something (tid: " + Thread.currentThread().getId() + ")");
    finishLatch.countDown();
}

次のコードは、Main threadでスレッドを実行させて、準備ができたらLatchのタスクを実行するように信号を送信するコードです。

workers.forEach(Thread::start);

readyLatch.await();

System.out.println("Waited for ready and started doing some work (tid: "
        + Thread.currentThread().getId() + ")");
startLatch.countDown();

finishLatch.await();
System.out.println("Finished (tid: "
        + Thread.currentThread().getId() + ")");

実行結果は次のとおりです。

Start multi threads (tid: 1)
Waited for ready and started doing some work (tid: 1)
Do something (tid: 12)
Do something (tid: 14)
Do something (tid: 13)
Do something (tid: 11)
Do something (tid: 15)
Finished (tid: 1)

定められた時間だけ待つ(Timeout)

上記のコードの問題は、どのようなスレッドが処理を完了していなければ、 countDown()の呼び出しにならなくてMain threadが無限に待つことになります。

await()のTimeoutを設定すれば、決まった時間だけ待つようにすることができます。

次のようにTimeoutを設定することができます。次のコードは、Timeoutを5秒に設定し、5秒経過するまでLatchが0にされていない場合は、より待たずに次のコードを実行されます。 時間の単位は、MINUTESなどで変更することもできます。

await(5, TimeUnit.SECONDS)

ここでは、Timeoutを設定する例を示します。

public class CountDownLatchExample3 {

    public static void main(String args[]) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        List<Thread> workers = Stream
                .generate(() -> new Thread(new Worker(countDownLatch)))
                .limit(5)
                .collect(toList());

        System.out.println("Start multi threads (tid: "
                + Thread.currentThread().getId() + ")");

        workers.forEach(Thread::start);

        System.out.println("Waiting for some work to be finished (tid: "
                + Thread.currentThread().getId() + ")");

        countDownLatch.await(5, TimeUnit.SECONDS);

        System.out.println("Finished (tid: "
                + Thread.currentThread().getId() + ")");
    }

    public static class Worker implements Runnable {
        private CountDownLatch countDownLatch;

        public Worker(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("Doing something (tid: " + Thread.currentThread().getId() + ")");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            System.out.println("Done (tid: " + Thread.currentThread().getId() + ")");
        }
    }
}

Timeoutは5秒に設定し、 Worker.run()で10秒を待っていました。

実行してみると次のように待ってあきらめて、次のコードを実行します。

Start multi threads (tid: 1)
Doing something (tid: 11)
Doing something (tid: 12)
Doing something (tid: 13)
Doing something (tid: 14)
Waiting for some work to be finished (tid: 1)
Doing something (tid: 15)
Finished (tid: 1)
Done (tid: 12)
Done (tid: 11)
Done (tid: 14)
Done (tid: 15)
Done (tid: 13)

この例の結果を見ると、10秒を過ぎて、他のスレッドの処理がすべて完了しました。 Timeoutが発生したときのスレッドについて何もしていないからです。 もしTimeoutが発生したときにすべてのスレッドを終了したい場合はMain、他のスレッドを終了するように作成されます。

codechachaCopyright ©2019 codechacha