JUC 并发通信工具类
CountDownLatch
CountDownLatch 用来控制一个或者等待多个线程。
内部通过维护计数器,调用 countDown()方法会让计数器减 1,减到 0 的时候,调用 await()在等待的线程就会被唤醒
java
public class CountdownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int num = 10;
CountDownLatch countDownLatch = new CountDownLatch(num);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(() -> {
System.out.print("executor ");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
service.shutdown();
}
}
输出如下:
text
executor executor executor executor executor executor executor executor executor executor end
CyclicBarrier
CyclicBarrier ⽤来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执⾏。也是通过计数器来实现,线程调用 await()方法计数器减 1,并等待,直到计数器为 0,所有调用 await()方法在等待的线程才能继续执行
java
public class CyclicBarrierDemo {
public static void main(String[] args) {
int num = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.print("begin ");
try {
cyclicBarrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
System.out.print ("end ");
});
}
pool.shutdown();
}
}
输出如下:
text
begin begin begin begin begin begin begin begin begin begin end end end end end end end end end end
Semaphone
Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。可以用来进行限流控制
java
public class SemaphoneDemo {
public static void main(String[] args) {
int maxCount = 3;
int total = 10;
Semaphore semaphore = new Semaphore(maxCount);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < total; i++) {
pool.execute(() -> {
try {
semaphore.acquire();
System.out.println("当前可用信号量" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
pool.shutdown();
}
}
输出
text
当前可用信号量1
当前可用信号量2
当前可用信号量0
当前可用信号量0
当前可用信号量1
当前可用信号量0
当前可用信号量0
当前可用信号量0
当前可用信号量0
当前可用信号量0
Exchanger
Exchanger 类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。
java
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
输出如下:
text
这个时候线程A是阻塞的,在等待线程B的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据