自定义Java线程池

xiaoxiao2021-02-28  104

一、引言

      我们先简述一下Java线程池到底是什么。线程池,其实就是封装线程的一个容器,里面有一组线程,说成池比较直观一点。为什么要这样做呢?众所周知,线程的创建和调度都是需要消耗内存和CPU的。如果用普通线程来处理任务的时候,一般是一条线程处理一个任务。而在线程池,线程数量是有限的,任务可以暂时的放到队列中,等待线程来完成它们。       在学习线程池之前我们先来明晰一个很简单的概念,Thread在创建(new)的时候创建,但此时并没有CPU调度权限;直到调用Thread的start()方法时,才开始有权进行CPU调度。

二、分析Java线程池构造函数

先看看一段来自JDK的代码:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

我们来简单分析一下入参:

corePoolSize:核心线程数量,当所有核心线程都参与工作,队列又未满时,才将任务放进队列中。maximumPoolSize:最大线程数,当队列满后,而且核心线程又没空的时候,线程池又要开始创建线程了,最终线程数目不能超过maximumPoolSize。如果任务还需要更多线程来处理,则执行相应的策略。keepAliveTime:线程存活时间,也就是队列隔多长时间没有任务后终止线程,退出程序。unit:线程存活时间单位。workQueue:工作队列,用于存储需要执行的任务。

      到这里,我们已经对线程池的主要参数和要实现的功能有了基本了解,接下来,我们介绍一下我们自定义的线程池。为了方便起见,我们将keepAliveTime直接默认为毫秒单位,在创建线程池的时候先把核心线程创建出来,并且调度(在JDK的线程池中是在执行任务的时候才动态创建的),直接用ArrayBlockingQueue作为存储任务的队列(JDK中是通过过构造函数传进来)。其实从上面的改动可以看出,线程池的核心元素无非就是那几个。

三、一步步自定义线程池

1、构造函数

public MyThreadPool(int min, int max, int queueNum, long time){ this.max = max; this.queueNum = queueNum; queue = new ArrayBlockingQueue<Runnable>(queueNum); threads = new MyThread[min]; this.time = time; for(int i = 0; i<min; i++){ threads[i] = new MyThread(); threads[i].start(); } } min:最小线程数,也就是核心线程数,在线程池创建之后就一直存在。当然这样做性能有一定的消耗,但是这样模拟不影响线程池的核心思想。max:最大线程数,在任务队列满后如果还有任务进来,就要继续创建线程处理,但是最后线程数不能超过max。queueNum:队列容量。time:当无可处理的任务时线程的最长存活时间。

      在新建线程池的时候,我们会先初始化一些全局变量,同时创建出核心线程并调度方法。大家可能已经看出来了,我们需要自定义线程来处理任务,接下来我们介绍一下MyThread,它是线程池的内部类。

2、自定义的任务处理线程

class MyThread extends Thread{ private boolean stop = false; @Override public void run() { Runnable r = null; //如果线程不停止,则一直循环 while (!stop) { //队列空的时候,先等time毫秒,如果还没任务就停止线程 while (!stop && queue.isEmpty()) { try { Thread.sleep(time); if(queue.isEmpty()){ stopThread(); } } catch (InterruptedException e) { e.printStackTrace(); } } if (!queue.isEmpty()) { r = queue.poll(); } if(r != null){ r.run(); } r = null; } } /** * 设置停止标记为true */ public void stopThread(){ stop = true; } }

      线程在调度之后就一直轮询,直到调用了stopThread()方法才停止。轮询期间主要是从任务队列中取出任务,然后执行。在这期间,当队列空的时候,先等time毫秒,如果还没任务就停止线程。在这里,我们从队列中取出任务为什么不加锁呢?这个在队列的poll()方法里面已经帮我们做了。说了那么多,任务到底是怎样进到队列里的呢?接下来,让我们看一下它的execute()方法,这个方法用来将要执行的任务传进来。

3、任务执行

public void execute(Runnable task) throws Exception { //当线程池被清理后,直接返回 if (threads == null) return; //如果队列任务数少于容量,就往队列添加任务 if(queue.size() < queueNum){ queue.add(task); } //如果队列任务数已经封顶,就再创建执行线程 if (queue.size() >= queueNum && threads.length < max){ resize(); } //如果队列任务数和线程数都已经封顶,则清理线程池并抛异常 if (queue.size() >= queueNum && threads.length >= max){ queue.clear(); clear(); throw new Exception("任务太多了"); } }

      在实际应用中,我们除了创建线程池就是执行execute()方法了。任务通过execute()方法传进来,并添加到队列中,针对队列容量和线程数量的不同,有不同的处理策略,如上面代码注释所述。       但是很奇怪,这个方法只是简单地将任务传进来,又是怎样执行任务的呢?其实很简单,这个方法将任务传进来其实都是放到队列中的,只是针对队列的任务数不同,采取不同数量的线程去调度队列里面的任务而已。在每个线程的run()方法中都会轮询任务队列,那才是任务的真正调度时机。       理论上讲,线程池是有容量限制的,无论是线程数量还是队列任务数都有,一旦超过这个容量,都有相应的策略去处理,在这个例子中的策略是抛异常并且清理线程池,那线程池又是怎样清理的呢?接下来我们来分析一下。

4、清理线程池

public void clear(){ //当队列中还有任务的时候,要等它执行完 while (!queue.isEmpty()){ try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } //一个个地停止线程 for (MyThread th : threads) { th.stopThread(); th = null; } //将线程数组置为null,让JVM进行回收。并且清理队列 threads = null; queue.clear(); }

      这个方法主要分为三步,第一步是在队列中还有任务的时候,轮训等待任务执行完;第二步是停止线程;第三步是回收资源。       细心的读者可能发现,我们在队列爆满之后会继续创建线程,那么那些线程也需要用一个容器装起来维护吖(就像核心线程一样,我们可以手动停止并清理它们),要怎么做呢?其实很简单,只要用一个数组将他们连同前面的线程装起来就好,简而言之就是扩容。

5、扩容

public void resize(){ MyThread[] temp = threads; //新建数组,容量比以前多1 threads = new MyThread[temp.length + 1]; //将以前的线程放到新数组中 for (int i = 0; i<temp.length; i++) { threads[i] = temp[i]; } //将新建的一条线程放到新数组中 threads[temp.length] = new MyThread(); threads[temp.length].start(); temp = null; }

      在扩容方法中,我们先用一个临时的数组装旧的线程,接下来新建一个容量比以前多1的线程数组,用原来的threads指向它,然后从临时线程数组将旧的线程取出来装到新建数组中,最后将新建的一条线程也放到新建数组中。好了,说了那么多可能糊里糊涂的,接下来我们来做一下总结。

四、总结

package com.thread.threadPool; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; /** * Created by 程序猿 on 2017/6/7 0007. */ public class MyThreadPool { private int max; private int queueNum; private Queue<Runnable> queue; MyThread[] threads = null; private Runnable task; private long time; /** * 初始化 * @param min * @param max * @param queueNum */ public MyThreadPool(int min, int max, int queueNum, long time){ this.max = max; this.queueNum = queueNum; queue = new ArrayBlockingQueue<Runnable>(queueNum); threads = new MyThread[min]; this.time = time; for(int i = 0; i<min; i++){ threads[i] = new MyThread(); threads[i].start(); } } public void execute(Runnable task) throws Exception { //当线程池被清理后,直接返回 if (threads == null) return; //如果队列任务数少于容量,就往队列添加任务 if(queue.size() < queueNum){ queue.add(task); } //如果队列任务数已经封顶,就再创建执行线程 if (queue.size() >= queueNum && threads.length < max){ resize(); } //如果队列任务数和线程数都已经封顶,则清理线程池并抛异常 if (queue.size() >= queueNum && threads.length >= max){ queue.clear(); clear(); throw new Exception("任务太多了"); } } public void clear(){ //当队列中还有任务的时候,要等它执行完 while (!queue.isEmpty()){ try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } //一个个地停止线程 for (MyThread th : threads) { th.stopThread(); th = null; } //将线程数组置为null,让JVM进行回收。并且清理队列 threads = null; queue.clear(); } public void resize(){ MyThread[] temp = threads; //新建数组,容量比以前多1 threads = new MyThread[temp.length + 1]; //将以前的线程放到新数组中 for (int i = 0; i<temp.length; i++) { threads[i] = temp[i]; } //将新建的一条线程放到新数组中 threads[temp.length] = new MyThread(); threads[temp.length].start(); temp = null; } class MyThread extends Thread{ private boolean stop = false; @Override public void run() { Runnable r = null; //如果线程不停止,则一直循环 while (!stop) { //队列空的时候,先等time毫秒,如果还没任务就停止线程 while (!stop && queue.isEmpty()) { try { Thread.sleep(time); if(queue.isEmpty()){ stopThread(); } } catch (InterruptedException e) { e.printStackTrace(); } } if (!queue.isEmpty()) { r = queue.poll(); } if(r != null){ r.run(); } r = null; } } /** * 设置停止标记为true */ public void stopThread(){ stop = true; } } }

      由于以上介绍中的代码太散,所以我将总体代码放到总结这里来了,方便阅读。在线程池中,我们要注意的无非两点,一是核心参数有哪些?二是任务怎样去调度。核心参数主要有4个:核心线程数、最大线程数、队列、无任务时等待时间。而线程调度主要分为两步:往队列里添加任务并且在线程的run()方法中不断轮询执行队列里的任务。

转载请注明原文地址: https://www.6miu.com/read-49344.html

最新回复(0)