Многопоточность в Java. Лекция 3: блокировки и классы синхронизации потоков

1 июня
Владимир Фролов, Java-разработчик, Никита Сизинцев, Android-разработчик
Многопоточность в Java. Лекция 3: блокировки и классы синхронизации потоков

Темную силу чувствую я. 
Даешь парсек за три года. 

В первой лекции нашего курса мы поделились общими сведениями о многопоточности, во второй рассмотрели основы многопоточных программ. Теперь речь пойдет о блокировках и прочих методах синхронизации потоков.

3.1 ReentrantLock

С появлением Java 1.5 был добавлен пакет java.util.concurrent, содержащий инструменты для работы в многопоточной среде, включая блокировки и коллекции. 

В пакете locks находятся высокоуровневые блокировки, использование которых более удобно по сравнению с ключевым словом synchronized. Базовый интерфейс блокировок — java.util.concurrent.locks.Lock. Методы этого интерфейса:

  • void lock() — захватывает блокировку, если она доступна. Если блокировка занята другим потоком, текущий поток, который выполняет этот код, переходит в статус BLOCKED;
  • void lockInterruptibly() — делает то же, что и метод lock(), но позволяет прервать блокированный поток и восстановить выполнение через InterruptedException;
  • boolean tryLock() — неблокирующий вариант метода lock(). Если удалось получить блокировку, то метод возвращает true;
  • boolean tryLock(long timeout, TimeUnit timeUnit) — то же, что tryLock(), за исключением того, что метод ждет определенное время, перед тем остановить попытку получения блокировки; 
  • void unlock() — отпускает блокировку.

Захват и освобождение монитора — два независимых метода:  lock.lock() и lock.unlock(). Вспомним, что стандартная блокировка на основе ключевого слова synchronized — синтаксическая конструкция, т. е. за ее правильное использования отвечает компилятор:

Листинг 1:

synchronized(synchronizedObj) {
//some code that should be synchronized
}

Например, вот так: synchronized(synchronizedObj); написать не получится. 

Ключевое слово synchronized можно использовать вот так:

Листинг 2:

Runnable r = new MyRunnable() { () ->
    System.out.println(“Hello!”);
}

Thread t = new Thread(r);

Для запуска потока необходимо использовать метод Thread.start().  Если вызвать метод run(), то он выполнится в вызывающем потоке:

Листинг 2:

Object a = new Object();
Object b = new Object();
Object c = new Object();
synchronized(a) {
    synchronized(b) {
        synchronized(c) {
        }
    }
}

Блокировка c не знает, что была захвачена блокировка b, соответственно, блокировка b не знает, что была захвачена блокировка a. Отпустить блокировку a внутри блока синхронизации блокировки b или с не представляется возможным. Блокировна, основанная на ReentrantLock, такую возможность предоставляет, потому что методы захвата и освобождения блокировки это отдельные методы. Класс ReentrantLock возлагает на программиста корректность захвата и отпускания блокировки.  Записать это можно, например так: 

Листинг 3:

rLock.lock();
rLock.lock();
rLock.unlock();
rLock.unlock();

Если отпустить блокировку на один раз больше, т. е. сделать rLock.unlock(); еще раз, получим IllegalMonitorStateException. В Листинге 4 приведен правильный способ захвата и отпускание блокировки:

Листинг 4:

private final Lock R_LOCK = ReentrantLock();
R_LOCK.lock();
try {
    //this code will be executed in synchronization section
} finally {
    R_LOCK.unlock();
}

Такие возможности Lock могут пригодиться, когда производится работа, например, с древовидной структурой данных из нескольких потоков. Если будет производится блокировка корневого узла, при этом будут изменяться какие-то дочерние узлы, производительность всей системы будет низкой, потому что в каждый конкретный момент времени с деревом будет работать только один поток. Однако каждый узел дерева можно сопоставить с отдельной блокировкой и использовать ту, поддерево которой подлежит изменению. Остальные блокировки, расположенные выше по дереву, можно отпустить, чтобы другие потоки могли их захватить, и изменять другую часть дерева. 

graph

Рис 1: Древовидная структура данных.

3.2 Честная блокировка и ReentrantLock

Одна из реализаций интерфейса Lock — класс ReenterantLock. Он позволяет одному и тому же потоку вызывать метод lock, даже если он его вызывал ранее, без освобождения блокировки.

 У класса ReentrantLock, кроме методов интерфейса Lock, есть фабричный метод newCondition(). Этот метод возвращает объект Condition, который позволяет добавить текущий поток в wait set данного объекта Condition. Это дает возможность организовывать разные условия ожидания по одной и той же блокировке, чего не позволяют ключевое слово synchronized и связки методов wait()/notify(). Для того чтоб объект попал в wait set для данного Condition объекта, нужно вызвать метод await(). Чтобы разбудить поток или потоки, которые есть в wait set, необходимо вызвать методы signal() или signalAll(). Эти методы аналогичны методам wait(), notify() и notifyAll() у объекта Object. Поскольку методы wait(), notify() и notifyAll() в объекте Object — final, методам для объекта Condition придумали другие наименования. 

Следует также заметить, что у объекта Condition есть свои методы wait() notify() и notifyAll(), однако настоятельно не рекомендуется использовать методы от объекта Object и методы от объекта Condition совместно. Методы await(), signal() и signalAll() необходимо вызывать, только если захватили соответствующую блокировку ReentrantLock. Каждый объект Condition «знает», от какого объекта блокировки был порожден объект. В связке ReentrantLock и Condition для каждого объекта Condition есть свой wait set, однако blocked set общий для всех объектов Condition. 

Wait set и blocked set — множества (sets) , а не массив и не очередь, и если потоков в множестве больше одного, есть вероятность, что какой-то поток будет находиться в нем бесконечно долго и никогда не будет разблокирован. Возможна ситуация, когда поток, который ожидает дольше всего, разблокирован не будет вовсе. 

Это демонстрирует программа в Листинге 5. В нем три потока в бесконечном цикле пытаются захватить блокировку ref бесконечное количество раз. Реализация synchronized допускает сценарий, в котором поток А будет все время владеть блокировкой, а остальные потоки: B и C, не будут успевать ее захватить. Это доказывает что wait set и blocked set у synchronized — не честные, т. е. являются неупорядоченными коллекциями. Мы видим конкуренцию, три потока постоянно пытаются захватить блокировку. Захватив блокировку, поработав и выйдя, поток может опять захватить блокировку. Остальные два могут продолжать ожидание. Реализация synchronized допускает такую ситуацию. Чтобы блокировка была честной, нужно использовать класс ReentrantLock, передав в конструктор этого класса параметр true. 

Листинг 5:

public class AppFair {
    public static void main(String[] agrs) {
        final Object ref = new Object();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“A”);
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“B”);
                    }
                }
            }
        }).start();
            new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    synchronized(ref) {
                        System.out.println(“C”);
                    }
                }
            }
        }).start();
    }
}

Если в конструктор ReentrantLock передать параметр true, получается честная блокировка. Она превращает blocked set в упорядоченную очередь.

Листинг 6:

public class AppFair2 {
    public static void main(String[] agrs) {
        final Lock ref = new ReentrantLock(true);
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“A”);
                    ref.unlock();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“B”);
                    ref.unlock();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true) {
                    ref.lock();
                    System.out.println(“C”);
                    ref.unlock();
                }
            }
        }).start();
    }
}

3.3 ReentrantReadWriteLock

ReentrantReadWriteLock — уже не монитор. Говорят, что это read/write блокировка или shared/exclusive блокировка. У объекта ReadWriteLock можно получить отдельно блокировку на чтение и на запись. Если несколько раз вызвать метод readLock(), получим ссылку на один и тот же объект. Такая же ситуация — с методом writeLock(). Программа, которая демонстрирует, что readLock и writeLock остаются одними и теми же объектами при последующих вызовах:

Листинг 7:

ReadWriteLock lock = new ReentrantReadWriteLock();
Lock rLock1 = lock.readLock();
Lock rLock2 = lock.readLock();
Lock wLock1 = lock.writeLock();
Lock wLock2 = lock.writeLock();
System.out.println(rLock1 == rLock2);
System.out.println(wLock1 == wLock2);

У объекта write lock может быть любое количество объектов condition. В write lock может заходить только один поток. 

ReadLock может захватить несколько потоков одновременно. Если один поток получил readLock, а затем другой поток пытается получить writeLock, блокировку на запись он не получит и будет заблокирован, т. е. попадет в blocked set. Если один поток захватил блокировку на запись, а другой поток пытается сделать то же самое, второй поток будет заблокирован.Если блокировка на запись захвачена, а другой поток также хочет захватить блокировку на чтение, этот поток будет заблокирован. 

Потоки, которые захватили блокировку для чтения, не могут  выполняться одновременно с потоком, который захватил блокировку на запись. 

Рис 2: Несколько читающих потоков и заблокированный поток записи

Когда захвачено несколько блокировок на чтение, а другой поток пытается захватить блокировку на запись, после чего опять происходит чтение,  преимущество отдается потоку на запись.

Рис 3: Заблокированные читающие потоки

В ReentrantReadWriteLock есть два режима: справедливый и несправедливый. В несправедливом режиме порядок получения блокировок не определён, в справедливом — реализована очередь потоков. Когда текущая блокировка отпущена, одиночный поток записи, ожидающий дольше других, получит доступ к блокировке. Или же блокировку получит группа потоков-читателей, которая ждёт дольше чем поток писатель. 

Есть еще и возможность понижения блокировки в статусе. Можно получить блокировку записи и понизить ее до блокировки чтения, отпустив блокировку записи. Однако обратная операция невозможна.

3.4 StampedLock

Этот тип блокировки появился в Java 8. Это попытка реализовать оптимистическую блокировку, которая не блокирует текущий поток при чтении данных. Также  StampedLock  — попытка сделать версионированную структуру данных. Эта блокировка может быть и оптимистической (получается блокировка на чтение, которая не блокирует текущий поток), и пессимистической (если не удается получить блокировку, поток, который читает данные, блокируется). При получении блокировки возвращается штамп — число типа long. Это значение используется при отпускании блокировки и для проверки того, является ли полученная ранее блокировка всё ещё валидной. Также блокировку можно рассматривать как номер текущей версии данных, которую эта блокировка защищает. 

StampedLock — не reentrant, поэтому следует опасаться повторных захватов блокировки, они приведут к deadlock`у. Простейший пример использования блокировки приведен в Листинге 8. 

Листинг 8:

Runnable r = new Runnable() {
    @Override
    public void run() {
        long stamp = lock.writeLock(); 
        try { 
            sleep(1); 
            map.put("foo", "bar"); 
        } finally { 
            lock.unlockWrite(stamp); 
        } 
    }
};
Runnable r2 = new Runnable() {
    @Override
    public void run() {
        long stamp = lock.readLock(); 
        try { 
            System.out.println(map.get("foo")); 
            sleep(1); 
        } finally { 
            lock.unlockRead(stamp); 
        }
    }
};

Этот класс также позволяет конвертировать блокировку на чтение в блокировку на запись. Оптимистичная блокировка для чтения, вызываемая с помощью метода tryOptimisticRead(), отличается тем, что она всегда будет возвращать штамп, не блокируя текущий поток. Вне зависимости от того, занят ли ресурс, к которому она обратилась. Если ресурс был заблокирован блокировкой для записи и другой поток пытается получить блокировку для чтения, возвращенный штамп будет равен нулю. В любой момент можно проверить, является ли блокировка валидной с помощью lock.validate(stamp). Если ресурс был заблокирован блокировкой на запись, а другой поток пытается получить не оптимистическую блокировку на чтение, поток будет заблокирован. 

Пример использования оптимистической блокировки:

Листинг 9:

public class OptimisticLock {
    public static void main(String[] args) {
        Runnable r1 = new Runnable() {
            @Overrride
            public void run() {
    long stamp = lock.tryOptimisticRead(); 
    try { 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(1);
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(2); 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                } finally { 
                    lock.unlock(stamp); 
    }
            }
        }
    }
}
public class OptimisticExample {
    public static void main(String[] args) {
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
    long stamp = lock.tryOptimisticRead(); 
    try { 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(1);
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
                    sleep(2); 
                    System.out.println("Optimistic Lock Valid: " + lock.validate(stamp)); 
    } finally { 
        lock.unlock(stamp); 
    } 
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
    long stamp = lock.writeLock(); 
    try { 
                    System.out.println("Write Lock acquired"); 
                    sleep(2); 
    } finally { 
                    lock.unlock(stamp); 
                    System.out.println("Write done"); 
    } 
            }
        };
        
        new Thread(r1).start();
        new Thread(r2).start();
    }
}

В листинге 9 в потоке r1 захватывается оптимистическая блокировка, проверяется ее валидность и вызывается метод Thread.sleep(), например, для имитации некой деятельности. В это время в потоке r2 получается блокировка на запись, не дожидаясь, когда освободится блокировка на чтение. 

После получения блокировки на запись, блокировка на чтение перестает быть валидной, даже после окончания записи. Таким образом, при использовании оптимистичных блокировок вам НУЖНО постоянно следить за их валидностью (проверять ее нужно уже после того, как выполнены все необходимые операции).

Еще один пример с использованием оптимистической блокировки:

Листинг 10:

double distanceFromOriginV1() { // A read-only method
     long stamp;
     if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
       double currentX = x;
       double currentY = y;
       if (sl.validate(stamp))
         return Math.sqrt(currentX * currentX + currentY * currentY);
     }
     stamp = sl.readLock(); // fall back to read lock
     try {
       double currentX = x;
       double currentY = y;
         return Math.sqrt(currentX * currentX + currentY * currentY);
     } finally {
       sl.unlockRead(stamp);
     }
}

Листинг 11 — пример чтения с использованием оптимистической блокировки:

Листинг 11:

double readValues() {
    long stamp;
    if ((stamp = sl.tryOptimisticRead()) != 0L) { 
        double currentX = //reading value from some shared object
        double currentY = //reading value from some shared object
        if (sl.validate(stamp)) {
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    }
    return <default value>;
}

В коде, представленном в Листинге 11, захватывается оптимистическая блокировка и читаются значения в локальные переменные. Обратите внимание, что чтение должно происходить вплоть до примитивных типов или неизменяемых объектов и чтение должно происходить в конструкции кода tryOptimisticLock()/validate(stamp). Внутри этого блока опасно сохранять ссылку на изменяемый объект и после конструкции validate(stamp) использовать изменяемые поля этого объекта. Необходимо добраться до необходимых вам в дальнейшем примитивных полей, неизменяемых объектов или создать копию изменяемого. Это необходимо, чтобы избежать изменения данных после выхода из блокировки и вызова метода validate. 

Более того, не рекомендуется внутри блока tryOptimisticLock()/validate(stamp) делать необратимые изменения, если validate(stamp) не пройдет. Собственно, лучше вообще ничего, кроме чтения, в этом блоке не делать. Блок tryOptimisticLock()/validate(stamp) — способ получить непротиворечивое представление данных, а все операции с этими данными лучше выполнить после, когда успешный validate(stamp) даст вам право считать, что полученные данные действительно не противоречивы.

3.5 CowntDownLatch

CowntDownLatch — защелка с обратным отсчетом. Позволяет задать значение счетчика, а когда его значение будет равно нулю, заблокированные потоки на этой блокировке будут одновременно запущены. В конструктор CountDownLatch передаётся число. Поток, который использует CountDownLatch, может уменьшить это число и заблокироваться. Для уменьшения числа в счетчике вызывается метод countDown(). После вызова этого метода поток продолжает свое выполнение. Для того чтоб заблокировать поток, необходимо вызвать метод await().

Рассмотрим программу, которая моделирует гонку машин. Сначала каждая машина подъезжает к стартовой линии, затем все они получают команды: «на старт!», «внимание!», «марш!». Каждая машина представлена потоком. Предположим, что каждая машина имеет постоянную скорость, и пока она продолжает гонку, поток на определенное время засыпает. Скорость машины выбирается произвольно. Сначала все машины подъезжают к старту. Это проверяется циклом while. Как только все машины подъехали, дается три команды, счетчик в CountDownLatch становится равным 0, и все 5 потоков запускаются. Какой-то поток выполняется быстрее, другой —  медленнее, затем программа завершает свое выполнение. Метод await() аналогичен методу wait() в классе Object.

Листинг 12:

public class Race {
    //Создаем CountDownLatch на 8 «условий»
    private static final CountDownLatch START = new CountDownLatch(8);
    //Условная длина гоночной трассы
    private static final int TRACK_LENGTH = 500000;
    public static void main(String[] args) throws InterruptedException {
        int carSpeed = 0;
        for (int i = 1; i <= 5; i++) {
            carSpeed = (int) (Math.random() * 100 + 50);
            new Thread(new Car(i, carSpeed)).start();
            Thread.sleep(1000);
        }
        while (START.getCount() > 3) { //Проверяем, собрались ли все автомобили
            Thread.sleep(100);              //у стартовой прямой. Если нет, ждем 100ms
        }
        Thread.sleep(1000);
        System.out.println(«На старт!»);
        START.countDown();//Команда дана, уменьшаем счетчик на 1
        Thread.sleep(1000);
        System.out.println(«Внимание!»);
        START.countDown();//Команда дана, уменьшаем счетчик на 1
        Thread.sleep(1000);
        System.out.println(«Марш!»);
        START.countDown();//Команда дана, уменьшаем счетчик на 1
        //счетчик становится равным нулю, и все ожидающие потоки
        //одновременно разблокируются
    }
    public static class Car implements Runnable {
        private int carNumber;
        private int carSpeed; //считаем, что скорость автомобиля постоянная
        public Car(int carNumber, int carSpeed) {
            this.carNumber = carNumber;
            this.carSpeed = carSpeed;
        }
        @Override
        public void run() {
            try {
                System.out.printf("Автомобиль №%d подъехал к стартовой прямой.\n", carNumber);
                //Автомобиль подъехал к стартовой прямой — условие выполнено
                //уменьшаем счетчик на 1
                START.countDown();
                //метод await() блокирует поток, вызвавший его, до тех пор пока
                //счетчик CountDownLatch не станет равен 0
                START.await();
                Thread.sleep(TRACK_LENGTH / carSpeed);//ждем пока автомобиль   проедет трассу
                System.out.printf("Автомобиль №%d финишировал!\n", carNumber);
            } catch (InterruptedException e) {
            }
        }
    }
}

3.6 CyclicBarrier

Циклический барьер очень похож на CountDownLatch, однако у него есть несколько отличий:

  • методы countDown() и await() объединены в один — await(), после вызова которого поток блокируется, если число не равно нулю; 
  • класс CyclicBarrier можно использовать повторно. Как только значение становится равным нулю, это значение восстанавливается, и объект класса можно использовать заново;
  • как только значение счетчика стало равным нулю, есть возможность выполнить дополнительный runnable, который может быть передан в конструктор CyclicBarrier.

Рис 4: Принцип работы CyclicBarrier

3.7 Semaphore

Примитив синхронизации позволяет определенному количеству потоков исполнять критическую секцию кода, которая защищена семафором. Когда создается семафор, в конструктор передается количество пропусков. Во время захвата семафора количество допустимых пропусков уменьшается. Когда семафор освобождается, количество пропусков увеличивается. Также в методе захвата семафора можно указать, какое количество пропусков возьмет поток. В методе освобождения семафора можно указать количество пропусков, которое будет возвращено: это количество не может быть больше, чем захваченное количество пропусков. 

Когда вторым параметром в конструктор передается true, потоки, которые ожидают получение пропуска, т. е. хотят войти в критическую секцию, выстраиваются в очередь. Т. е. вместо blocking set используется очередь по типу FIFO. 

Листинг 13: 

class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    // Not a particularly efficient data structure; just for demo
    protected Object[] items = new Object[MAX_AVAILABLE];
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }
    public void putItem(Object x) {
        if (markAsUnused(x)) {
            available.release();
        }
    }
    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }
    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else {
                    return false;
                }
            }
        }
        return false;
    }
}

3.8 Exchanger

Exchanger  — точка синхронизации, которая позволяет двум потокам обмениваться значениями. При создании экземпляра этого объекта указывается тип объекта, которым будут обмениваться потоки.  У объекта этого класса есть метод exchange, принимающий значение которое надо передать другому потоку. Метод возвращает значение, которые другой поток передал текущему. Когда поток вызывает метод exchange(), а другой поток не готов к обмену значениями, текущий поток переходит в состояние WAITING. Отметим, что этот класс стоит использовать для обмена значениями между двумя потоками. Если один Exchanger используется только одним потоком или количество потоков-пользователей больше двух, то какой-то поток будет в состоянии WAITING бесконечно долго, пока не остановится программа. Если первый поток подготовил значение, и метод exchange выполняет второй поток, то второй поток после выполнения метода не блокируется. Также у класса есть перегруженный метод exchange, который принимает время, которое поток будет находится в состоянии WAITING. По истечении этого времени генерируется проверяемое исключение TimeOutException.

3.9 Phaser

Phaser — самый сложный синхронизатор, который, тем не менее, обладает самым большим функционалом. Phaser похож на CountDownLatch, он позволяет синхронизировать несколько потоков, причем их количество может меняться в разных фазах. Phaser — блокировка, которую можно переиспользовать. Работа Phaser состоит из выполнения фаз — выполнив очередную фазу, поток вызывает у Phaser метод arriveAndAwaitAdvance(). Работу Phaser можно представить как цепочку заданий или фаз, в каждой из которых может принимать участие разное количество потоков. 

Чтобы текущий поток участвовал в определенной фазе вычислений, он регистрируется вызовом метода register() у объекта Phaser. После того как один поток закончит свои вычисления и захочет подождать остальные потоки, он вызывает метод arriveAndAwaitAdvance() у блокировки. Это блокирующий метод. Поток продолжит свое выполнение, когда количество потоков, вызвавших метод arriveAndAwaitAdvance(), станет равно количеству зарегистрированных потоков, которые вызвали метод register(). Если поток не хочет участвовать в следующей фазе выполнения с использованием Phaser, следует вызвать метод arriveAndDeregister(). Если поток выполнил свои вычисления, но не хочет ожидать, пока другие потоки завершат этап своих вычислений, поток должен вызвать метод arrive(). При переходе на следующий этап выполняется логика рис 5: 

Рис 5. Код, который выполняется, когда Phaser переходит на следующий этап выполнения

Максимальное количество потоков, которые могут быть синхронизированы с использованием блокировки Phaser, 65635. Также можно объединять несколько блокировок Phaser в отношение предок-потомок. На этом синхронизаторе возможно реализовать CountDownLatch и CyclicBarrier.

CountDownLatch — аналог метода countDown():

Листинг 14:

phaser.arriveAndDeregister();
аналог метода await():

Листинг 15:

    if (!phaser.isTerminated()) {
        phaser.awaitAdvance(phaser.getPhase());
    }

Реализация класса CyclicBarrier — просто вызов метод arriveAndAwaitAdvance().

3.10 Процессы при нормальном завершении потока

Рассмотрим, как работает метод join(). Этот метод был изначально в классе Thread, который появился еще в Java 1. Следовательно, можно сделать вывод, что внутри метод join использует методы wait и notify. Это действительно так: если зайти в метод join, можно увидеть, что выполняется вот такой код (код с ожиданием периода времени не приведен):

Листинг 16:

if (millis == 0) {

    while (isAlive()) {
        wait(0);
    }
}

Если выполнилась строка кода t.join(), поток, который вызвал метод join на другом потоке t, попадает в wait set потока t. Возникает закономерный вопрос: если в Java 1 не было никаких дополнительных средств, как поток переводился из состояния wait в состояние blocked, а затем и в runnable? Чтобы разобраться, рассмотрим следующую программу:

Листинг 17:

public class TestNotify {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new ParentThread());
        t.start();
        t.join();
        System.out.println("ExceptionHandlerExample thread finished");
    }
}
class ParentThread implements Runnable {
    @Override
    public void run() {
        final int numberOfChildrenThreads = 5;
        CountDownLatch latch = new CountDownLatch(numberOfChildrenThreads);
        for (int i = 0; i < numberOfChildrenThreads; ++i) {
            Thread thread = new Thread(new ChildThread(currentThread, latch, i));
            thread.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (Thread.currentThread()) {
        }
        System.out.println("Joined thread was finished");
    }
}
class ChildThread implements Runnable {
   private Thread parentThread;
   private CountDownLatch latch;
   private int n = 0;
   public ChildThread(Thread parentThread, CountDownLatch latch, int n) {
       this.parentThread = parentThread;
       this.latch = latch;
       this.n = n;
   }
   @Override
   public void run() {
       Thread currentTread = Thread.currentThread();
       currentTread.setName("Name = " + n);
       synchronized (parentThread) {
           latch.countDown();
           try {
               parentThread.wait();
               System.out.println("Thread" + currentTread.getName() +  "was notified");
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

В Листинге 17 создаётся поток parent thread, и к нему “присоединяется” главный поток. Затем в wait set потока parent thread добавляется пять дочерних потоков. Конструкция в Листинге 18

Листинг 18:

synchronized (Thread.currentThread()) {
}

захватывает блокировку на потоке parent thread. Это производится на тот случай, если планировщик потоков решит переключить контекст с последнего пятого потока после выполнения строки latch.countDown() на поток parent thread. Тогда поток parentThread может завершиться раньше, чем пятый дочерний поток. 

Если использовать конструкцию, приведенную в Листинге 18, такой ситуации не случится, потому что блокировка parent thread будет захвачена последним потоком. Затем поток parent thread завершает свое выполнение, что подтверждает вывод строки “Joined thread was finished”. После этого показываются записи о том, что дочерние потоки были переведены из состояния waiting в состояние blocked, а затем runnable. Т. е. для потока parent thread вызвался метод notifyAll(). И это виртуальная Java-машина производит самостоятельно под капотом. Так же при завершении потока переменная isAlive для потока устанавливается в false. Из примера в Листинге 17 можно сделать вывод, что когда выполнение кода потока проходит закрывающую фигурную скобку, поток еще выполняет дополнительные действия по корректной остановке потока.