文章目录
概述Executor框架简介Executor框架的两级调度模型Executor框架成员任务执行任务异步计算的结果
Executor框架的类与接口
使用Executor框架Executors 工厂方法newFixedThreadPool 固定大小的线程池newSingleThreadExecutor 单线程的线程池newCachedThreadPool 可缓存的线程池newSingleThreadScheduledExecutornewScheduledThreadPool 定时任务调度的线程池
实例newFixedThreadPool示例newSingleThreadExecutor示例newCachedThreadPool示例newSingleThreadScheduledExecutor示例newScheduledThreadPool示例
概述
在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java线程既是工作单元,也是执行单元。从JDK1.5开始,把工作单元与执行机制分离开来。工作单元包括Runnable 和 Callable,而执行机制由Executor框架提供。
Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架)。
Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
类关系图如下:
在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。
Executor框架简介
Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。 Java线程启动时会创建一个本地操作系统线程;当Java线程终止时,这个操作系统线程也会被回收。操作系统会调用所有线程并将他们分配给可用的CPU。
可以将此种模式分为两层
在上层,Java多线程程序通常把应用程序分解为若干任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程
在底层,操作系统内核将这些线程映射到硬件处理器上。
两级调度模型的示意图如下:
从图中可以看出,该框架用来控制应用程序的上层调度,下层调度由操作系统内核控制,不受应用程序控制.
Executor框架成员
任务
被执行任务需要实现的接口:Runnable接口和Callable接口
执行任务
任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。
Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExector.
异步计算的结果
Future和实现Future接口的FutureTask类。
Executor框架的类与接口
Executor是一个接口,Executor框架的基础,它将任务的提交与任务的执行分离。
Executors 线程池工厂类
AbstractExecutorService 执行框架抽象类。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
Future接口和它的实现FutureTask类,代表异步计算的结果。
Runnable和Callable接口的实现类,都可以被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor执行.
使用Executor框架
1。 主线程首先要创建实现 Runnable接口或者Callable接口的任务对象。Executors可以把一个Runnable对象封装为一个Callable对象,如下
Executors
.callable(Runnale task
);
或者
Executors
.callable(Runnable task
, Object result
);
2。 然后把Runnable对象直接交给ExecutorService执行
ExecutorService
.execute(Runnable command
);
或者也可以把Runnable对象或Callable对象提交给ExecutorService执行。 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable接口,我们也可以创建FutureTask类,然后直接交给ExecutorService执行。
ExecutorService
.submit(Runnable task
);
3。 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
Executors 工厂方法
JDK内部提供了五种最常见的线程池。由Executors类的五个静态工厂方法创建
newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool
newSingleThreadScheduledExecutor
newScheduledThreadPool
newFixedThreadPool 固定大小的线程池
我们来看下源码及注释
只有一个入参nThreads的静态方法
public static ExecutorService
newFixedThreadPool(int nThreads
) {
return new ThreadPoolExecutor(nThreads
, nThreads
,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>());
}
两个入参nThreads和threadFactory的静态方法
public static ExecutorService
newFixedThreadPool(int nThreads
, ThreadFactory threadFactory
) {
return new ThreadPoolExecutor(nThreads
, nThreads
,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>(),
threadFactory
);
}
每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
该方法返回一个包含指定数目线程的线程池,如果任务数量多于线程数目,那么没有没有执行的任务必须等待,直到有任务完成为止。
newSingleThreadExecutor 单线程的线程池
public static ExecutorService
newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService
newSingleThreadExecutor(ThreadFactory threadFactory
) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>(),
threadFactory
));
}
这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
返回单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
说明:LinkedBlockingQueue会无限的添加需要执行的Runnable。
newCachedThreadPool 可缓存的线程池
public static ExecutorService
newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer
.MAX_VALUE
,
60L
, TimeUnit
.SECONDS
,
new SynchronousQueue<Runnable>());
}
public static ExecutorService
newCachedThreadPool(ThreadFactory threadFactory
) {
return new ThreadPoolExecutor(0, Integer
.MAX_VALUE
,
60L
, TimeUnit
.SECONDS
,
new SynchronousQueue<Runnable>(),
threadFactory
);
}
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newCachedThreadPool方法创建的线程池可以自动的扩展线程池的容量。核心线程数量为0。
SynchronousQueue是个特殊的队列。 SynchronousQueue队列的容量为0。当试图为SynchronousQueue添加Runnable,则执行会失败。只有当一边从SynchronousQueue取数据,一边向SynchronousQueue添加数据才可以成功。SynchronousQueue仅仅起到数据交换的作用,并不保存线程。但newCachedThreadPool()方法没有线程上限。Runable添加到SynchronousQueue会被立刻取出。
根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于JVM能创建线程的数量,可能引起内存不足。
newSingleThreadScheduledExecutor
public static ScheduledExecutorService
newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService
newSingleThreadScheduledExecutor(ThreadFactory threadFactory
) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory
));
}
该线程池支持定时以及周期性执行任务的需求。
newScheduledThreadPool 定时任务调度的线程池
public static ScheduledExecutorService
newScheduledThreadPool(int corePoolSize
) {
return new ScheduledThreadPoolExecutor(corePoolSize
);
}
public static ScheduledExecutorService
newScheduledThreadPool(
int corePoolSize
, ThreadFactory threadFactory
) {
return new ScheduledThreadPoolExecutor(corePoolSize
, threadFactory
);
}
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
实例
newFixedThreadPool示例
package com
.xgj
.master
.java
.executor
.newFixedThreadPool
;
import java
.util
.concurrent
.Callable
;
import java
.util
.concurrent
.ExecutionException
;
import java
.util
.concurrent
.ExecutorService
;
import java
.util
.concurrent
.Executors
;
import java
.util
.concurrent
.Future
;
import org
.junit
.Test
;
public class NewFixedThreadPoolDemo {
@Test
public void test() {
ExecutorService fixPool
= Executors
.newFixedThreadPool(2);
Callable
<String> callable
= new Callable<String>() {
String result
= "Bussiness deals successfully";
@Override
public String
call() throws Exception
{
System
.out
.println("Callable is working");
Thread
.sleep(5 * 1000);
System
.out
.println("Callable some bussiness logic is here ");
return result
;
}
};
Runnable runnable
= new Runnable() {
@Override
public void run() {
try {
System
.out
.println("Runnable is working");
Thread
.sleep(5 * 1000);
System
.out
.println("Runnable some bussiness logic is here ");
} catch (InterruptedException e
) {
e
.printStackTrace();
}
}
};
Future
<String> callableFuture
= fixPool
.submit(callable
);
Future
<?> runnableFuture
= fixPool
.submit(runnable
);
if (callableFuture
.isDone()) {
System
.out
.println("\t\tCallable is done !");
} else {
System
.out
.println("\t\tCallable is not done !");
}
if (runnableFuture
.isDone()) {
System
.out
.println("\t\tRunnable is done !");
} else {
System
.out
.println("\t\tRunnable is not done !");
}
try {
String result
= callableFuture
.get();
System
.out
.println("CallableFuture的返回值为:" + result
);
} catch (InterruptedException e
) {
e
.printStackTrace();
} catch (ExecutionException e
) {
e
.printStackTrace();
}
fixPool
.shutdown();
System
.out
.println("fixPool shutdown");
}
}
运行结果:
Callable is not done
!
Callable is working
Runnable is not done
!
Runnable is working
Callable some bussiness logic is here
Runnable some bussiness logic is here
CallableFuture的返回值为
:Bussiness deals successfully
fixPool shutdown
newSingleThreadExecutor示例
package com
.xgj
.master
.java
.executor
.newSingleThreadExecutor
;
import java
.util
.concurrent
.Callable
;
import java
.util
.concurrent
.ExecutionException
;
import java
.util
.concurrent
.ExecutorService
;
import java
.util
.concurrent
.Executors
;
import java
.util
.concurrent
.Future
;
import java
.util
.concurrent
.TimeUnit
;
import org
.junit
.Test
;
public class NewSingleThreadExecutorDemo {
@Test
public void test() throws InterruptedException
, ExecutionException
{
ExecutorService newSingleThreadPool
= Executors
.newSingleThreadExecutor();
Future
<Integer> callableFuture
= newSingleThreadPool
.submit(new NewSingleThreadExecutorDemo().new CallableThread());
int callval
= callableFuture
.get();
System
.out
.println("Callable:" + callval
);
boolean isTerminated
= newSingleThreadPool
.isTerminated();
System
.out
.println("newSingleThreadPool isTerminated :" + isTerminated
);
newSingleThreadPool
.awaitTermination(10, TimeUnit
.SECONDS
);
newSingleThreadPool
.shutdownNow();
System
.out
.println("newSingleThreadPool shutdownNow ");
}
class CallableThread implements Callable<Integer> {
@Override
public Integer
call() throws Exception
{
int cnt
= 0;
for (; cnt
< 5; cnt
++) {
Thread
.sleep(5 * 1000);
System
.out
.println("call:" + cnt
);
}
return cnt
;
}
}
}
运行结果
call
:0
call
:1
call
:2
call
:3
call
:4
Callable
:5
newSingleThreadPool isTerminated
:false
newSingleThreadPool shutdownNow
newCachedThreadPool示例
package com
.xgj
.master
.java
.executor
.newCachedThreadPool
;
import java
.util
.concurrent
.Callable
;
import java
.util
.concurrent
.ExecutionException
;
import java
.util
.concurrent
.ExecutorService
;
import java
.util
.concurrent
.Executors
;
import java
.util
.concurrent
.Future
;
import org
.junit
.Test
;
public class NewCachedThreadPoolDemo {
@Test
public void test() {
ExecutorService cachedThreadPool
= Executors
.newCachedThreadPool();
Callable
<String> callable
= new Callable<String>() {
String message
= "Callable is done !";
@Override
public String
call() throws Exception
{
for (int i
= 0; i
< 10; i
++) {
System
.out
.println("Callable is doing something");
Thread
.sleep(500);
}
return message
;
}
};
Runnable runnable
= new Runnable() {
@Override
public void run() {
try {
for (int i
= 0; i
< 10; i
++) {
System
.out
.println("\tRunnable is doing something");
Thread
.sleep(1000);
}
} catch (Exception e
) {
}
}
};
Future
<String> callableFuture
= cachedThreadPool
.submit(callable
);
Future
<?> runnableFuture
= cachedThreadPool
.submit(runnable
);
if (callableFuture
.isDone()) {
System
.out
.println("\t\tCallable is done !");
} else {
System
.out
.println("\t\tCallable is not done !");
}
if (runnableFuture
.isDone()) {
System
.out
.println("\t\tRunnable is done !");
} else {
System
.out
.println("\t\tRunnable is not done !");
}
try {
String returnedValue
= callableFuture
.get();
System
.out
.println(returnedValue
);
} catch (InterruptedException e
) {
e
.printStackTrace();
} catch (ExecutionException e
) {
e
.printStackTrace();
}
cachedThreadPool
.shutdown();
}
}
运行结果
Callable is not done
!
Runnable is not done
!
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is doing something
Callable is doing something
Runnable is doing something
Callable is done
!
newSingleThreadScheduledExecutor示例
package com
.xgj
.master
.java
.executor
.newSingleThreadScheduledExecutor
;
import java
.util
.concurrent
.Executors
;
import java
.util
.concurrent
.ScheduledExecutorService
;
import java
.util
.concurrent
.ThreadFactory
;
import java
.util
.concurrent
.TimeUnit
;
public class NewSingleThreadScheduleExectorDemo {
private static String threadNamePrefix
= "XiaoGongJiang";
public static void main(String
[] args
) {
ScheduledExecutorService scheduleSingleThreadPool
= Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread
newThread(Runnable r
) {
return new Thread(r
, "Thread-" + threadNamePrefix
);
}
});
Runnable runnable
= new Runnable() {
@Override
public void run() {
try {
System
.out
.println("Begin");
for (int i
= 0; i
< 3; i
++) {
System
.out
.println("\tRunnable is doing something");
Thread
.sleep(1000);
}
} catch (Exception e
) {
System
.out
.println(e
.getMessage());
}
}
};
scheduleSingleThreadPool
.scheduleAtFixedRate(runnable
, 2, 5, TimeUnit
.SECONDS
);
}
}
运行结果:
Begin
Runnable is doing something
Runnable is doing something
Runnable is doing something
Begin
Runnable is doing something
Runnable is doing something
Runnable is doing something
Begin
........
等上个任务处理完成后,紧接着处理下一个,一直循环下去。
两个方法的区别:
scheduleAtFixedRate ,以固定的频率来执行某个任务。
scheduleWithFixedDealy, 相对固定的延迟后,执行某个任务。
newScheduledThreadPool示例
package com
.xgj
.master
.java
.executor
.newScheduledThreadPool
;
import java
.text
.DateFormat
;
import java
.util
.Date
;
import java
.util
.concurrent
.Executors
;
import java
.util
.concurrent
.ScheduledFuture
;
import java
.util
.concurrent
.ScheduledThreadPoolExecutor
;
import java
.util
.concurrent
.TimeUnit
;
public class NewScheduledThreadPoolDemo {
final static DateFormat fmt
= DateFormat
.getTimeInstance(DateFormat
.LONG
);
public static void main(String
[] args
) {
ScheduledThreadPoolExecutor sch
= (ScheduledThreadPoolExecutor
) Executors
.newScheduledThreadPool(5);
Runnable oneShotTask
= new Runnable() {
@Override
public void run() {
System
.out
.println("\t oneShotTask Execution Time: " + fmt
.format(new Date()));
}
};
Runnable delayTask
= new Runnable() {
@Override
public void run() {
try {
System
.out
.println("\t delayTask Execution Time: " + fmt
.format(new Date()));
Thread
.sleep(10 * 1000);
System
.out
.println("\t delayTask End Time: " + fmt
.format(new Date()));
} catch (Exception e
) {
}
}
};
Runnable periodicTask
= new Runnable() {
@Override
public void run() {
try {
System
.out
.println("\t periodicTask Execution Time: " + fmt
.format(new Date()));
Thread
.sleep(10 * 1000);
System
.out
.println("\t periodicTask End Time: " + fmt
.format(new Date()));
} catch (Exception e
) {
}
}
};
System
.out
.println("Submission Time: " + fmt
.format(new Date()));
ScheduledFuture
<?> periodicFuture
= sch
.scheduleAtFixedRate(periodicTask
, 5, 5, TimeUnit
.SECONDS
);
}
}
Submission Time
: 上午
12时
26分
27秒
periodicTask Execution Time
: 上午
12时
26分
32秒
periodicTask End Time
: 上午
12时
26分
42秒
periodicTask Execution Time
: 上午
12时
26分
42秒
periodicTask End Time
: 上午
12时
26分
52秒
periodicTask Execution Time
: 上午
12时
26分
52秒
periodicTask End Time
: 上午
12时
27分
02秒
periodicTask Execution Time
: 上午
12时
27分
02秒
.........
.........