7、并发编程基础-Concurrent.util常用类

xiaoxiao2021-02-28  33

目录

1.CyclicBarrier的使用 2.CountDownLacth的使用 3.Callable和Future的使用 4.Semaphore(计算信号量)的使用


1、CyclicBarrier的使用

场景还原:每个线程线程代表一个跑步运动员,当运动员都准备好后,才一起除非,只要有一个人没有准备好,大家都等待。这个种情况如何使代码实现呢?

这种情况就需要用到CyclicBarrier了,它可以把程序阻塞在一个地方进行等待,指定需要执行的任务达到CyclicBarrier设置的值,此时CyclicBarrier后面的代码才会被执行。

示列如下:

import java.io.IOException; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class UseCyclicBarrier { private final static int PARTIES = 3; static class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(5)); System.out.println(name + " 准备OK."); //此段代码会直接等待,直到达到PARTIES设定的值, barrier.await()后面的代码才会被执行。 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); } } public static void main(String[] args) throws IOException, InterruptedException { //定义CyclicBarrier CyclicBarrier barrier = new CyclicBarrier(PARTIES); //定义三个线程 ExecutorService executor = Executors.newFixedThreadPool(3); //定义三个运动员 executor.submit(new Thread(new Runner(barrier, "吕布"))); executor.submit(new Thread(new Runner(barrier, "赵云"))); executor.submit(new Thread(new Runner(barrier, "马超"))); executor.shutdown(); } }

2、CountDownLacth的使用

CountDownLacth经常用于监听某些初始化的操作,等初始化执行完毕后,通知主线程继续工作。

场景还原:一年级期末考试要开始了,监考老师发下去试卷,然后坐在讲台旁边玩着手机等待着学生答题,有的学生提前交了试卷,并约起打球了,等到最后一个学生交卷了,老师开始整理试卷,贴封条,下班,陪老婆孩子去了。

import java.util.concurrent.CountDownLatch; public class UseCountDownLatch { public static void main(String[] args) { //定义CountDownLatch,需要被到计时2次 final CountDownLatch countDown = new CountDownLatch(2); //定义一个老师T1 Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("T1—监考老师发下去试卷" + "等待学生交卷..."); countDown.await(); System.out.println("T1—监考老师在继续搞事情,坐在讲台旁边玩着手机等待着学生答题..."); } catch (InterruptedException e) { e.printStackTrace(); } } },"t1"); //定义第一学生T2 Thread t2 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("T2—有一个学生开始提前交了试卷..."); Thread.sleep(3000); System.out.println("T2-第一个学生交完卷了,T1-老师线程继续等待其他学生交卷..."); countDown.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //定义最后一个学生T3 Thread t3 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("T3—最后一个学生开始交试卷..."); Thread.sleep(4000); System.out.println("T3-最后一个学生交完卷了,T1-老师线程继续等待其他学生交卷..."); countDown.countDown(); System.out.println("T1老师发现所有学生已经交卷,老师开始整理试卷,贴封条,下班,陪老婆孩子去了"); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t3.start(); } }

注意区分 CyclicBarrier和CountDownLacth的确保。

CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。CountDownLatch,可以把他理解成倒计时锁。 CyclicBarrier是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。 CyclicBarrier:可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。

上面2个解释来自Zone的博文,我觉得他解释很到位,所以在此作了引用。

3、Callable和Future的使用

Future是一个设计模式,可以实现异步获取数据的。Future工作原理和自定义实现可参考我之前的一个示例。并发编程基础-多线程设计模式

Future模式是非常合适在处理耗时很长的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量。

使用concurrent工具类提供的 Future使用示例:

import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class UseFuture implements Callable<String>{ private String para; public UseFuture(String para){ this.para = para; } /** * 这里是真实的业务逻辑,其执行可能很慢 */ @Override public String call() throws Exception { //模拟执行耗时 Thread.sleep(5000); String result = this.para + "处理完成"; return result; } //主控制函数 public static void main(String[] args) throws Exception { String queryStr = "query"; //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类 FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr)); FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr)); //创建一个固定线程的线程池且线程数为1, ExecutorService executor = Executors.newFixedThreadPool(2); //这里提交任务future,则开启线程执行RealData的call()方法执行 //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值 Future f1 = executor.submit(future); //单独启动一个线程去执行的 Future f2 = executor.submit(future2); System.out.println("请求完毕"); try { //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑 System.out.println("处理实际的业务逻辑..."); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待 System.out.println("数据:" + future.get()); System.out.println("数据:" + future2.get()); executor.shutdown(); } }

4、Semaphore(计算信号量)的使用

官网API解释:计数信号量。从概念上讲,一个信号量维护一组允许。每个 acquire()块如果必要的许可证前,是可用的,然后把它。每个 release()添加许可,潜在收购方释放阻塞。然而,不使用实际允许的对象; Semaphore只是计数的数量和相应的行为。 信号量通常是用来限制线程的数量比可以访问一些(物理或逻辑)资源。

Semaphore非常适合高并发访问,新系统在上线之前,要对系统的访问进行评估,当然这值肯定不是随便拍拍脑袋就能想出来的,是经过以往的经验、数据历年的访问量,已经推广力度进行一个合理的评估,当然评估标准不能太大也不能太小,太大的话投入的资源达不到实际效果,纯属浪费资源,太小的话,某时间点一个高峰值的访问量上来直接可以压垮系统。

相关概念: PV(Page View) 网站的总访问量,页面浏览量或者点击量,用户没刷新一次就会被记录一次。

UV(Unique Visitor) 访问网站的一台电脑客户端为一个访客。一般来讲,时间上以00:00-24:00之内相同ip的客户只记录一次。

QPS (Query Per Second)即每秒查询数,qps很大程度上代表了业务系统上的繁忙程度,每次请求的背后,可能对应着多次磁盘I/O,多次网络请求,多个CPU时间片等。我们通过QPS可以非常直观的了解当前系统业务的情况,一旦当前QPS超过设定的预警值,可以考虑增加机器对集群扩容,以免压力过大造成宕机,可以根据前期的压力测试得以评估,再结合后期综合运维情况,估算出的阈值。

RT (Response Time)即请求的响应时间,这个指标非常关键,直接说明前端用户的体验,因此任何系统设计师都想降低RT时间。

当然还涉及cpu,内存,网络,磁盘等情况,细节问题很多。如select,updata,delete等数据层的操作。

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class UseSemaphore { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); //模拟实际业务逻辑 Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); } catch (InterruptedException e) { } } }; exec.execute(run); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println(semp.getQueueLength()); // 退出线程池 exec.shutdown(); } }
转载请注明原文地址: https://www.6miu.com/read-2630506.html

最新回复(0)