Skip to content

JUC 并发通信工具类

CountDownLatch

CountDownLatch 用来控制一个或者等待多个线程。

内部通过维护计数器,调用 countDown()方法会让计数器减 1,减到 0 的时候,调用 await()在等待的线程就会被唤醒

java
public class CountdownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch countDownLatch = new CountDownLatch(num);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            service.execute(() -> {
                System.out.print("executor  ");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        service.shutdown();
    }
}

输出如下:

text
executor  executor  executor  executor  executor  executor  executor  executor  executor  executor  end

CyclicBarrier

CyclicBarrier ⽤来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执⾏。也是通过计数器来实现,线程调用 await()方法计数器减 1,并等待,直到计数器为 0,所有调用 await()方法在等待的线程才能继续执行

java
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        int num = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                System.out.print("begin  ");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print ("end  ");
            });
        }
        pool.shutdown();
    }
}

输出如下:

text
begin  begin  begin  begin  begin  begin  begin  begin  begin  begin  end  end  end  end  end  end  end  end  end  end

Semaphone

Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。可以用来进行限流控制

java
public class SemaphoneDemo {

    public static void main(String[] args) {
        int maxCount = 3;
        int total = 10;

        Semaphore semaphore = new Semaphore(maxCount);

        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                    System.out.println("当前可用信号量" + semaphore.availablePermits());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        pool.shutdown();
    }
}

输出

text
当前可用信号量1
当前可用信号量2
当前可用信号量0
当前可用信号量0
当前可用信号量1
当前可用信号量0
当前可用信号量0
当前可用信号量0
当前可用信号量0
当前可用信号量0

Exchanger

Exchanger 类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。

java
public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");

        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

    }
}

输出如下:

text
这个时候线程A是阻塞的,在等待线程B的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据

Released under the MIT License.