Java基础-2、并发

xiaoxiao2021-02-28  97

目录

 

synchronized

Volatile

信号量Semaphore

实现所有线程在等待某个事件的发生才去执行

CAS缺陷以及如何解决

AQS

死锁、活锁

线程池

线程池的种类

配置线程池大小

工作机制及原理

ThreadPoolExecutor类

ThreadLoad原理

LockSupport工具

Condition原理

Fork/Join框架的理解

分段锁(JUC)的原理、锁力度的减小思考

八种阻塞队列以及各个阻塞队列的特性


synchronized

class TestSynchronized{     Object lock = null;     public void method1(){         lock = new Object();         synchronized (lock){             System.out.println("----method1----");         }     }     public void method2(){         synchronized (this){             System.out.println("----method2----");         }     }     public synchronized void method3(){         System.out.println("----method3----");     }     public static synchronized void method4(){         System.out.println("----method4----");     } }

 

 

类别

Synchronized

Lock

存在形式

JAVA关键字,JVM

是一个类

锁的释放

获取锁的线程执行完成同步中的代码线程发生异常

在finally中必须释放锁,不然会造成死锁

获取锁

如果A线程获取了锁,那么B线程等待。如果A线程阻塞,那么B线程会一直等待

Lock有多种锁的获取方式,都可以尝试获得锁,线程可以不用一直等待

锁状态

无法判断

可判断

锁类型

悲观锁(可重入 不可中断 非公平)

乐观锁(可重入 可判断 可公平(两者皆可))

性能

少量同步

大量同步

 

偏向锁-》轻量级锁-》重量级锁(1.6后)

互斥锁ReentrantLock

 

Volatile

volatile:用来修饰被不同线程访问和修改的变量

volatile关键字与JVM内存模型相关

  Java语言是支持多线程的,为了解决线程并发的问题,在语言内部引入了同步块和volatile关键字机制

volatile关键字机制:

 synchronized修饰方法和代码块,以实现同步

 用volatile修饰的变量,线程在每次使用变量的时候,都会读取变量修改后的值。volatile经常被误用进行原子性的操作。但是这些操作并不是真正的原子性。在读取加载之后,如果变量发生了变化,线程工作内存中的值由于已加载,不会产生对应的变法。对于volatile修改的变量,JVM只是保证从内存加载到线程工作内存的值是最新的。

交互图:

public class VolatileTest {     //public static int count = 0; //实际运算每次结果都不一样     public static volatile int count = 0; //     public static void inc(){         //这里延迟1毫秒,使得结果明显         try {             Thread.sleep(1);         } catch (Exception e) {             // TODO: handle exception         }         count ++;     }          public static void main(String[] args) {         //同时启动1000个线程,去进行i++运算,看看实际结果         for (int i = 0; i < 1000; i++) {             new Thread(new Runnable() {                  @Override                 public void run() {                     VolatileTest.inc();                 }             }).start();         }                  System.out.println("运行结果:"+count);     } }

 

信号量Semaphore

       在多线程中,线程间传递信号的一种方式。

       与互斥量的区别

互斥量用于线程的互斥,信号量用于线程的同步互斥量值只能为0/1,信号量值可以为非负整数。信号量可以实现多个同类资源的多线程互斥和同步。互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。

class TestSemaphore{     Semaphore semaphore = new Semaphore(10);//控制10个共享资源的使用     public void use(){         try {             semaphore.acquire();         } catch (InterruptedException e) {             e.printStackTrace();         }     }     public  void  release(){         semaphore.release();     } }

 

实现所有线程在等待某个事件的发生才去执行

1.闭锁CountDownLatch

闭锁是典型的等待事件发生的同步工具类,将闭锁的初始值设置1,所有线程调用await方法等待,当事件发生时调用countDown将闭锁值减为0,则所有await等待闭锁的线程得以继续执行。

class TestCountDownLatch{     public static void main(String[] args){         int num = 2;         final CountDownLatch latch = new CountDownLatch(num);         for(;num> 0;num--){             new Thread(){                 public void run() {                     try {                         System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");                         Thread.sleep(3000);                         System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");                         latch.countDown();                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                 };             }.start();         }         try {             System.out.println("等待2个子线程执行完毕...");             latch.await();             System.out.println("2个子线程已经执行完毕");             System.out.println("继续执行主线程");         } catch (InterruptedException e) {             e.printStackTrace();         }     } }

 

2.阻塞队列BlockingQueue

所有等待事件的线程尝试从空的阻塞队列获取元素,将阻塞,当事件发生时,向阻塞队列中同时放入N个元素(N的值与等待的线程数相同),则所有等待的线程从阻塞队列中取出元素后得以继续执行。

 

3.信号量Semaphore

设置信号量的初始值为等待的线程数N,一开始将信号量申请完,让剩余的信号量为0,待事件发生时,同时释放N个占用的信号量,则等待信号量的所有线程将获取信号量得以继续执行。

class TestSemaphore{     public static void main(String[] args){         // 8位工人         int n = 8;         // 5台机器         Semaphore semaphore = new Semaphore(5);         for(int i=0;i<n;i++){             new Worker(i,semaphore).start();         }     }     static class Worker extends Thread{         private int num;         private Semaphore semaphore;         public Worker(int num,Semaphore semaphore){             this.num = num;             this.semaphore = semaphore;         }         @Override         public void run() {             try {                 semaphore.acquire();                 System.out.println("工人"+this.num+"占用一个机器在生产...");                 Thread.sleep(2000);                 System.out.println("工人"+this.num+"释放出机器");                 semaphore.release();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }

 

4.栅栏CyclicBarrier

设置栅栏的初始值为1,当事件发生时,调用barrier.wait()冲破设置的栅栏,将调用指定的Runable线程执行,在该线程中启动N个新的子线程执行。这个方法并不是让执行中的线程全部等待在某个点,待某一事件发生后继续执行。

class TestCyclicBarrier{     public static void main(String[] args) {         int N = 4;         CyclicBarrier barrier  = new CyclicBarrier(N);         for(int i=0;i<N;i++){             new Writer(barrier).start();         }     }     static class Writer extends Thread{         private CyclicBarrier cyclicBarrier;         public Writer(CyclicBarrier cyclicBarrier) {             this.cyclicBarrier = cyclicBarrier;         }         @Override         public void run() {             System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");             try {                 //以睡眠来模拟写入数据操作                 Thread.sleep(5000);                 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");                 cyclicBarrier.await();             } catch (InterruptedException e) {                 e.printStackTrace();             }catch(BrokenBarrierException e){                 e.printStackTrace();             }             System.out.println("所有线程写入完毕,继续处理其他任务...");         }     } }

 

CAS缺陷以及如何解决

参考地址:https://blog.csdn.net/jeffleo/article/details/75269618

CAS:Compare And Swap,即比较并交换。(AtomicInteger)

实现了Java多线程的并发操作。整个AQS同步组件、Atomic原子类操作等都是以CAS实现的,甚至ConcurrentHashMap在1.8的版本中也调整为了CAS+Synchronized。可以说CAS是整个JUC的基石。

 

缺陷:

主要表现在三个方法:

循环时间太长

只能保证一个共享变量原子操作

ABA问题:解决方案则是版本号

AQS

       参考资料:https://www.cnblogs.com/daydaynobug/p/6752837.html

     AQS:AbstractQueuedSynchronized, 抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch

       AQS里的state只有两种状态:0表示未锁定,1表示锁定

死锁、活锁

死锁:指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。4个产生条件:

互斥条件请求和保持条件不剥夺条件环路等待条件

活锁:指的是任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试—失败—尝试—失败的过程。处于活锁的实体是在不断的改变状态,活锁有可能自行解开。

线程池

资源地址:https://blog.csdn.net/wxq544483342/article/details/53118674  

线程池的种类

1、创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

         ExecutorService pool = Executors.newCachedThreadPool();

2、创建容量为1的缓冲池

        ExecutorService pool = Executors.newSingleThreadExecutor();

3、创建固定容量大小的缓冲池

ExecutorService pool = Executors.newFixedThreadPool(int);

4、创建一个定长线程池,支持定时及周期性任务执行。

         ExecutorService pool = Executors.newScheduledThreadPool(1);

5、创建一个拥有多个任务队列(以便减少连接数)的线程池。

         ExecutorService pool = Executors.newWorkStealingPool(); ExecutorService类的方法: void shutdown();          关闭线程池 不可以再 submit 新的 task,已经 submit 的将继续执行          List<Runnable> shutdownNow();          关闭线程池 试图停止当前正在执行的 task,并返回尚未执行的 task 的 list          boolean isShutdown();          是否已经关闭了线程池,当调用shutdown()或shutdownNow()方法后返回为true。           boolean isTerminated();          当调用shutdown()方法后,并且所有提交的任务完成后返回为true;         当调用shutdownNow()方法后,成功停止后返回为true;          boolean awaitTermination(long timeout, TimeUnit unit) throws         InterruptedException;          <T> Future<T> submit(Callable<T> task);          <T> Future<T> submit(Runnable task, T result);          Future<?> submit(Runnable task);          <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)          throws InterruptedException;          <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                               long timeout, TimeUnit unit)          throws InterruptedException;          <T> T invokeAny(Collection<? extends Callable<T>> tasks)          throws InterruptedException, ExecutionException;          <T> T invokeAny(Collection<? extends Callable<T>> tasks,                 long timeout, TimeUnit unit)          throws InterruptedException, ExecutionException, TimeoutException;

配置线程池大小

CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1IO密集型任务,参考值可以设置为2*NCPU

工作机制及原理

线程池的两个核心队列:

线程等待池,即线程队列BlockingQueue任务处理池(PoolWorker),即正在工作的Thread列表(HashSet)

线程池的核心参数:

核心池大小(corePoolSize),即固定大小,设定好之后,线程池的稳定峰值,达到这个值之后池的线程数大小不会释放。最大处理线程池数(maximumPoolSize),当线程池里面的线程数超过corePoolSize,小于maximumPoolSize时会动态创建与回收线池里面的线程池资源。

ThreadPoolExecutor类

构造方法:

/**  *  * @param corePoolSize  核心池大小 默认情况下,在创建好线程池之后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,  *                      当线程池中的线程数量达到corePoolSize后,就会把这些任务放到缓存队列中  * @param maximumPoolSize 线程池最大线程数量,表示在线程池中最多能创建线程的数量;在corePoolSize和maximumPoolSize的线程数会被自动释放,而小于corePoolSize的则不会。  * @param keepAliveTime 表示线程没有执行任务时最多保持多久时间会终止。  *                      默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会生效,直到线程池数量不大于corePoolSize,  *                      即只有当线程池数量大于corePoolSize数量,超出这个数量的线程一旦到达keepAliveTime就会终止。  *                      但是如果调用了allowCoreThreadTimeout(boolean)方法,即使线程池的线程数量不大于corePoolSize,线程也会在keepAliveTime之后就终止,知道线程池的数量为0为止。  * @param unit 参数keepAliveTime的时间单位,一个时间单位枚举类。 Nanos/Micros/Millis/Seconds/Minutes/Hours/Days  * @param workQueue 一个阻塞队列,用来存储等待执行任务的队列,这个参数选择也很重要,会对线程池的运行过程产生重大影响。  *                  一般来说,这里的阻塞队列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue)。  * @param threadFactory 线程工厂,主要用来创建线程;可以是一个自定义的线程工厂,默认就是Executors.defaultThreadFactory()。用来在线程池中创建线程。  * @param handler 表示当拒绝处理任务时的策略,也是可以自定义的,默认是我们前面的4种取值:  *                  ThreadPoolExecutor.AbortPolicy(默认的,一言不合即抛异常的)  *                  ThreadPoolExecutor.DiscardPolicy(一言不合就丢弃任务)  *                  ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任务给抛弃,然后执行当前任务)  *                  ThreadPoolExecutor.CallerRunsPolicy(由调用者所在线程来执行任务)  */

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

                             BlockingQueue<Runnable> workQueue, public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {

        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

 

ThreadLoad原理

class TestThreadLocal{     private static TestThreadLocal testThreadLocal = null;     private static ThreadLocal<TestThreadLocal> map = new ThreadLocal<TestThreadLocal>();     private TestThreadLocal(){}     public static synchronized TestThreadLocal getInstance1(){         if(testThreadLocal==null){             testThreadLocal = new TestThreadLocal();         }         return testThreadLocal;     }     public static TestThreadLocal getInstance2(){ //效率大于getInstance1()         testThreadLocal = map.get();         if(testThreadLocal==null){             testThreadLocal = new TestThreadLocal();             map.set(testThreadLocal);         }         return testThreadLocal;     } }

 

LockSupport工具

LockSupport提供了park和unpark方法实现阻塞和解除阻塞,都是基于“许可(permit)”作为关联。permit相当于一个信号量(0,1),默认是0。

使用偏移量来获取对象。是因为线程已经被阻塞了,如果不通过内存的方式,直接调用线程内的方法,线程是不会响应的。

park()、unpark()和wait()、notify()的区别

面向的主体不同。LockSupport是针对Thread进行阻塞,可以指定阻塞的对象,每次可以唤醒指定具体的线程。Wait()是以对象,阻塞当前的线程和唤醒单个线程或者所有线程。实现的机制不同。LockSupport可以指定monitor的object对象,但和object.wait()两者的阻塞队列并不交叉。

static void testLockSupport(){     // 调用native方法阻塞当前线程     LockSupport.park();     // 阻塞当前线程,最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回     LockSupport.parkNanos(1);     // 阻塞当前线程,知道deadline时间(deadline - 毫秒数)     LockSupport.parkUntil(1);     Object blocker = new Object();     //1) 记录当前线程等待的对象(阻塞对象);     //2) 阻塞当前线程;     //3) 当前线程等待对象置为null。     LockSupport.park(blocker);     // 阻塞当前线程,最长等待时间不超过nanos毫秒,同样,在阻塞当前线程的时候做了记录当前线程等待的对象操作     LockSupport.parkNanos(blocker,1);     // 阻塞当前线程直到deadline时间,相同的,也做了阻塞前记录当前线程等待对象的操作     LockSupport.parkUntil(blocker,1);     // 唤醒处于阻塞状态的线程Thread     LockSupport.unpark(new Thread());}

 

Condition原理

在经典的生产者和消费者模式中,可以使用Object.wait()和Object. Notify()阻塞和唤醒线程,但这样处理只有一个等待队列。在可重入锁ReentrantLock中,使用AQS的condition可以实现设置多个等待队列,可以使用lock.newCondition生成一个等待队列。

static void testCondition() throws InterruptedException {     // 需要结合lock使用     Lock lock = new ReentrantLock();     final Condition notEmpty = lock.newCondition();     // 将当前线程阻塞,调用await()之前必须先获取锁,     // 调用await()时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程     notEmpty.await();     notEmpty.await(1, TimeUnit.MINUTES);     notEmpty.awaitNanos(1);     notEmpty.awaitUninterruptibly();     notEmpty.awaitUntil(new Date());     // 另一个线程将已经阻塞的线程唤醒     // 其他线程调用signal()时也必须获取锁,     // 当执行signal()方法时将等待队列的节点移入到同步队列,     // 当线程退出临界区释放锁时,唤醒同步队列的首个节点     notEmpty.signal();     notEmpty.signalAll();}

Fork/Join框架的理解

并行流:把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。(fork/join框架)

串行流:则相反

Fork/Join框架:

在必要的情况下,将一个大的任务进行拆分(fork)成若干个子任务(拆到临界值),再将一个个任务的结果进行join汇总。

Fork/Join与线程池的区别:

  Fork/Join采用“工作窃取模式”,当执行新的任务时可以将其拆分成更小的任务执行,并将小任务加到线程队列中。然后再随机从一个线程中偷一个任务并把他加入到自己的队列中。

例如:CPU上有两个线程A/B,A已经执行完了,B还有任务未执行。这时A将B队尾的任务偷过来,加入到自己的队列中。相对于传统的线程池,Fork/Join更有效的使用了CPU资源。

// ForkJoin这个框架的实现需要继承RecursiveTask 或者 RecursiveAction, // RecursiveTask是有返回值的,相反RecursiveAction则没有@Datapublic class TestForkJoinWork extends RecursiveTask<Long> {     private Long start;     private Long end;     public static final Long  CRITICAL = 10000L;     public TestForkJoinWork(Long start , Long end){         this.start = start;         this.end   = end;     }     @Override     protected Long compute() {         Long length = end - start;         if (length <= CRITICAL){             Long sum = 0L;             for (Long i = start;i<=end;i++){                 sum += i;             }             return sum;         }else{             Long middle = (start+end) / 2;             TestForkJoinWork right = new TestForkJoinWork(start,middle);             right.fork(); // 拆分 压入线程队列             TestForkJoinWork left = new TestForkJoinWork(middle+1,end);             left.fork(); // 拆分 压入线程队列             return right.join() + left.join();         }     } }class TestForkJoinWorkDemo{      void test1(){         //Fork/Join实现         long l = System.currentTimeMillis();         ForkJoinPool forkJoinPool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持         TestForkJoinWork task = new TestForkJoinWork(0L,10000000000L);//参数为起始值与结束值         Long invoke = forkJoinPool.invoke(task);         long l1 = System.currentTimeMillis();         System.out.println("invoke = " + invoke+"  time: " + (l1-l));     }     void test2(){          // 普通线程完成         Long x = 0L;         Long y = 10000000000L;         long l = System.currentTimeMillis();         for (Long i = 0L; i <= y; i++) {             x += i;         }         long l1 = System.currentTimeMillis();         System.out.println("invoke = " + x+"  time: " + (l1-l));     }     void test3(){          // 并行流         long l = System.currentTimeMillis();         //parallel()与sequential()在并行流与串行流中随意切换         long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum);         long l1 = System.currentTimeMillis();         System.out.println("invoke = " + reduce+"  time: " + (l1-l));     }     public static void main(String[] args){          TestForkJoinWorkDemo demo = new TestForkJoinWorkDemo();          demo.test1();          demo.test2();          demo.test3();         /**          * invoke = -5340232216128654848  time: 40213          * invoke = -5340232216128654848  time: 52510          * invoke = -5340232216128654848  time: 963          */     } }

 

分段锁(JUC)的原理、锁力度的减小思考

容器中有多把锁,每一把锁用于锁容器中的一部分数据。当多线程时访问容器中的不同数据段的数据时,线程间就不会发生锁竞争了。因而有效的提高了高并发的访问效率。例如ConcurrentHashMap使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也可被其他线程访问。ConcurrentHashMap中使用了一个包含16个锁的数组,每个锁保护所在的散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。合理的使用散列算法来使关键字均匀分布,那么可使对锁的请求减少到原来的1/16。这样可使ConcurrentHashMap的并发达到16个线程。

独占锁:只有一把锁,每次只能一个线程能访问。例如HashTable。采用串行方式,会降低其伸缩性。一般有三种方式降低锁的竞争:

减少锁的持有时间降低锁的请求频率使用带有协调机制的独占锁,这些机制允许更高的并发性。

八种阻塞队列以及各个阻塞队列的特性

public class TestQueue {     public static void main(String[] args) throws InterruptedException {         Object obj = new Object();         boolean flag;         Object val;         // 由数组结构组成的有界阻塞队列         BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);         // 队未满时,返回true,队满时则抛出异常IllegalStateException         flag = arrayBlockingQueue.add(obj);         // 队未满时,返回true,队满时返回false。非阻塞立即返回         arrayBlockingQueue.offer(obj);         // 队未满时直接插入无返回值,队满时阻塞等待,一直等到队列未满时再插入         arrayBlockingQueue.put(obj);         // 队列不为空时,返回队首值并移除。队为空时抛出异常NoSuchElementException         val = arrayBlockingQueue.remove();         // 队列不为空时,返回队首值并移除。队为空时返回null。非阻塞立即返回         val = arrayBlockingQueue.poll();         // 设定等待的时间,如果在指定时间内队列为空则返回null,不为空则返回队首值         val =  arrayBlockingQueue.poll(1,TimeUnit.MINUTES);         // 队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。         val = arrayBlockingQueue.take();         flag = arrayBlockingQueue.contains(obj);         // 由链表结构组成的有界阻塞队列         BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10);         // 支持优先级别排序的无界阻塞队列 默认大小为11         BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();         // 使用有限级队列实现的无界阻塞队列 PriorityQueue         BlockingQueue delayQueue = new DelayQueue();         // 不存储元素的阻塞队列         BlockingQueue synchronousQueue = new SynchronousQueue();         // 由链表结构组成的无界阻塞队列         BlockingQueue linkedTransferQueue = new LinkedTransferQueue();         // 由链表结构组成的双阻塞队列         BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque();     }

 

 

 

转载请注明原文地址: https://www.6miu.com/read-40785.html

最新回复(0)