Tomcat线程池目前出现过三种,第一个是5.0的线程池模型,这个线程池目前在6.x的版本还存在,主要是用于AJP的,
第二个是5.5.x时代使用了一仲线程池
第三个是6.x版本的线程池,实际上这个不是tomcat的线程池了,tomcat使用的是JDK的线程池ThreadPoolExecutor
首先来分析一下最老的版本5.0使用的线程池
线程池只有一个类,ThreadPool,它里面包含了两个内部类一个接口
静态内部类 ControlRunnable,这是用来运行工作线程的
静态内部类 MonitorRunnable,这是监控线程,用来回收空闲的线程
接口 ThreadPoolListener,这个很奇怪,我全文搜索了,只有ThreadPool的一些方法使用了这个接口,但是它却没有实现累
首先来看看监控类 MonitorRunnable,它是通过ControlRunnable类来启动的,它的run()方法如下:
public void
run() {
while
(
true
) {
try {
// Sleep for a while.
synchronized
(
this
) {
this
.wait(interval);
}
// Check if should terminate.
// termination happens when the pool is shutting down.
if
(shouldTerminate) {
break
;
}
// Harvest idle threads.
p.checkSpareControllers();
}
catch
(
Throwable
t) {
ThreadPool
.log.error("
Unexpected exception
", t);
}
}
}
interval在创建的时候就已经指定是,是60 * 1000,也就是会睡眠60秒,
这个类最主要的工作是
每过60秒检查一下是否有空闲的线程,就是空闲线程数 是否大于 最大空闲线程数,如果是的话,就回收一些线程,保证最多空闲的线程数=最大空闲线程数
checkSpareControllers()内容如下:
protected synchronized void
checkSpareControllers() {
if
(stopThePool) {
return
;
}
if
((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
int
toFree = currentThreadCount - currentThreadsBusy -maxSpareThreads;
for
(
int
i = 0 ; i < toFree ; i++) {
ControlRunnable
c = pool[currentThreadCount - currentThreadsBusy - 1];
c.terminate();
pool[currentThreadCount - currentThreadsBusy - 1] =
null
;
currentThreadCount --;
}
}
}
再来看看运行工作线程 ControlRunnable,它的run()方法如下:
public void
run() {
boolean
_shouldRun =
false
;
boolean
_shouldTerminate =
false
;
ThreadPoolRunnable
_toRun =
null
;
try {
while
(
true
) {
try
{
/* Wait for work. */
synchronized
(
this
) {
while
(!shouldRun && !shouldTerminate) {
this
.wait();
}
_shouldRun = shouldRun;
_shouldTerminate = shouldTerminate;
_toRun = toRun;
}
if
(_shouldTerminate) {
if
(
ThreadPool
.log.isDebugEnabled())
ThreadPool
.log.debug("
Terminate
");
break
;
}
/* Check if should execute a runnable. */
try
{
if
(noThData) {
if
(_toRun !=
null
) {
Object
thData[] = _toRun.getInitData();
t.setThreadData(p, thData);
if
(
ThreadPool
.log.isDebugEnabled())
ThreadPool.log.debug("
Getting new thread data
");
}
noThData =
false
;
}
if
(_shouldRun) {
if
(_toRun !=
null
) {
_toRun.runIt(t.getThreadData(p));
}
else if
(toRunRunnable !=
null
) {
toRunRunnable.run();
}
else
{
if
(
ThreadPool
.log.isDebugEnabled())
ThreadPool
.log.debug("
No toRun ???
");
}
}
}
catch
(
Throwable
t) {
ThreadPool
.log.error(sm.getString
("
threadpool.thread_error
", t, toRun.toString()));
/*
* The runnable throw an exception (can be even a ThreadDeath),
* signalling that the thread die.
*
* The meaning is that we should release the thread from
* the pool.
*/
_shouldTerminate =
true
;
_shouldRun =
false
;
p.notifyThreadEnd(
this
);
}
finally
{
if
(_shouldRun) {
shouldRun =
false
;
/*
* Notify the pool that the thread is now idle.
*/
p.returnController(
this
);
}
}
/*
* Check if should terminate.
* termination happens when the pool is shutting down.
*/
if
(_shouldTerminate) {
break
;
}
}
catch
(
InterruptedException
ie) {
/* for the wait operation */
// can never happen, since we don't call interrupt
ThreadPool
.log.error("
Unexpected exception
", ie);
}
}
}
finally
{
p.removeThread(
Thread
.currentThread());
}
}
这里有一个ThreadPoolRunnable,这是一个接口,里面有一个runIt()方法,可以把它当做Runnable来看,凡是实现了这个接口的类,工作线程都会负责调用这个类的runIt()执行。
ControlRunnable#run()方法的最开始是让这个线程wait(),因为一开始是没有数据的所以就一直等待,直到执行了ControlRunnable#runIt()方法之后,就会唤醒一个等待的线程:
public synchronized void
runIt(
ThreadPoolRunnable
toRun) {
this
.toRun = toRun;
// Do not re-init, the whole idea is to run init only once per
// thread - the pool is supposed to run a single task, that is
// initialized once.
// noThData = true;
shouldRun =
true
;
this.notify();
}
当调用了这个方法之后,ControlRunnable#run()就会继续执行下去,之后会做一个初始化数据的工作,也就是if (noThData) 那个判断,工作线程里每个运行的线程实际上扩展了Thread,用的是ThreadWithAttribute,它可以将一些数据绑定在当前线程上,所以这个初始化数据是从ThreadPoolRunnable的实现类上取出数据,绑定到当前线程上。
初始化完成之后就开始真正运行了,如果当前的类继承自Runnalbe或者ThreadPoolRunnable都会被工作线程调用,当工作线程执行完后会将shouldTerminate标记设置为false,这样当下次循环的时候就会继续wait()了。最后将自己返还给工作线程数组,然后notify(),这是唤醒其他线程,因为可能因为当前的线程数超过了最大线程了,被迫等待,所以这里需要有一个唤醒机制告诉工作线程数组,有空闲的线程可以调用了。
如果发生错误了就退出循环,当退出循环后会将自己从线程map中删除,注意是线程map,不是工作线程数组,所以线程池中目前的线程数不会减少,只有等到监控线程检查完后才有可能减少。
当执行ThreadPool#runIt()方法时,会将一个ThreadPoolRunnable的实现类交给线程池运行,线程池首先需要从数组中找出一个线程,然后调用运行这个线程。
runIt()方法如下:
public void
runIt(
ThreadPoolRunnable
r) {
if
(
null
== r) {
throw new
NullPointerException
();
}
ControlRunnable
c = findControlRunnable();
c.runIt(r);
}
调用ControlRunnable#runIt(),这样就将ThreadPoolRunnable的实现类交给了线程池,然后会唤醒一个线程。注意这里有一步很重要,在ControlRunnable#runIt()里,这是一个同步方法,执行了语句:
this
.toRun = toRun;
这样它拿到的就是成员变量toRun,因为另一个线程赋值的时候是讲ThreadPoolRunnable的实现赋给了ThreadPool的成员变量toRun,所以线程池数组里每一个正在等待的ControlRunnable都能拿到这个值,还记得happend before原则吗,这个赋值和获取值都是同步的,所以肯定没有问题,任意唤醒的一个线程在它的同步块中会拿到这个toRun:
synchronized
(
this
) {
while
(!shouldRun && !shouldTerminate) {
this
.wait();
}
_shouldRun = shouldRun;
_shouldTerminate = shouldTerminate;
_toRun = toRun;
}
如果不是赋给ThreadPool的成员变量,而是随便赋给一个线程池成员ControlRunnable,那就有问题了,比如赋给的是A,但是唤醒是随意的,可能唤醒了B,这样B就没有toRun的值导致无法运行,而由了这个同步的赋值,同步的取值就没问题了。
下面看一下findControlRunnable()方法:
private
ControlRunnable
findControlRunnable() {
ControlRunnable
c=
null
;
if
( stopThePool ) {
throw new
IllegalStateException
();
}
// Obtain a free thread from the pool.
synchronized
(
this
) {
while
(currentThreadsBusy == currentThreadCount) {
// All threads are busy
if
(currentThreadCount < maxThreads) {
// Not all threads were open,
// Open new threads up to the max number of idel threads
int
toOpen = currentThreadCount + minSpareThreads;
openThreads(toOpen);
}
else
{
logFull(log, currentThreadCount, maxThreads);
// Wait for a thread to become idel.
try
{
this
.wait();
}
// was just catch Throwable -- but no other
// exceptions can be thrown by wait, right?
// So we catch and ignore this one, since
// it'll never actually happen, since nowhere
// do we say pool.interrupt().
catch
(
InterruptedException
e) {
log.error("
Unexpected exception
", e);
}
if
( log.isDebugEnabled() ) {
log.debug("
Finished waiting: CTC=
"+currentThreadCount +"
, CTB=
" + currentThreadsBusy);
}
// Pool was stopped. Get away of the pool.
if
( stopThePool) {
break
;
}
}
}
// Pool was stopped. Get away of the pool.
if
(0 == currentThreadCount || stopThePool) {
throw new
IllegalStateException
();
}
// If we are here it means that there is a free thread. Take it.
int
pos = currentThreadCount - currentThreadsBusy - 1;
c = pool[pos];
pool[pos] = null;
currentThreadsBusy++;
}
return
c;
}
这段代码的意思是坚持当前繁忙线程如果==当前线程,就是说所有的线程都在忙着执行那么会新开启一些线程,但如果当前的现在已经等于最大线程数了,那么就wait(),等有线程执行完了会notify()的。如果满足条件就新创建一些线程,最后将线程数组中的pos位置设置为null,表示这个位置的工作线程已经被取走了,然后返回这个工作线程ControlRunnable。
再来看看它的openThreads():
protected void
openThreads(
int
toOpen) {
if
(toOpen > maxThreads) {
toOpen = maxThreads;
}
for
(
int
i = currentThreadCount ; i < toOpen ; i++) {
pool[i - currentThreadsBusy] =
new
ControlRunnable(this);
}
currentThreadCount = toOpen;
}
这里会创建若干个新ControlRunnable
最后看看线程池的启动和停止,首先是ThreadPool#start():
public synchronized
void start() {
stopThePool=false;
currentThreadCount = 0;
currentThreadsBusy = 0;
adjustLimits();
pool =
new
ControlRunnable
[maxThreads];
openThreads(minSpareThreads);
if
(maxSpareThreads < maxThreads) {
monitor =
new
MonitorRunnable
(
this
);
}
}
可以看到,它创建了ControlRunnable和监控线程,而ControlRunnable是以数组的形式保存的,所以这个线程池使用的是数组的形式,这样比较高效,启动的时候就已经设置好数组的大小了,运行的时候就不会再改变了。这里的adjustLimits()主要是调整最大线程,最大空闲线程数的边界值,这里就不介绍了。
下面是线程池的停止方法:
public synchronized void
shutdown() {
if
(!stopThePool) {
stopThePool =
true
;
if
(monitor !=
null
) {
monitor.terminate();
monitor =
null
;
}
for
(
int
i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
try
{
pool[i].terminate();
}
catch
(
Throwable
t) {
/*
* Do nothing... The show must go on, we are shutting
* down the pool and nothing should stop that.
*/
log.error("
Ignored exception while shutting down thread poo
l", t);
}
}
currentThreadsBusy = currentThreadCount = 0;
pool =
null
;
notifyAll();
}
}
这里会调用监控线程和工作线程的terminate()方法,让停止标记置为true,从而退出执行关闭线程。
有个问题,ThreadPool#runIt()是怎么被调用的?
启动的时候就会有一个AJP线程执行监听,它是SocketAcceptor,继承了ThreadPoolRunnable。
然后它会调用ChannelSocket#acceptConnections(),这个方法内容如下:
void
acceptConnections() {
if
(log.isDebugEnabled())
log.debug("
Accepting ajp connections on
" + port);
while
(running) {
try
{
MsgContext
ep = createMsgContext(packetSize);
ep.setSource(this);
ep.setWorkerEnv(wEnv);
this
.accept(ep);
if
(!running)
break
;
// Since this is a long-running connection, we don't care
// about the small GC
SocketConnection
ajpConn =
new SocketConnection
(
this
, ep);
tp.runIt(ajpConn);
}
catch
(Exception ex) {
if
(running)
log.warn("
Exception executing accept
", ex);
}
}
}
这里可以看到,是AJP的接收线程,接收到数据之后,会调用tp.runIt(ajpConn),将SocketConnection交给线程池运行,同时也会唤醒一个正在wait()的线程池,所以接收和处理不是同一个线程,但是它们都运行在线程池中。
到这里,Tomcat5.0模式的线程池就介绍完了。
关于Tomcat6.0的线程池模型,也就是JDK的ThreadPoolExecutor,请看这篇文章:
http://rdc.taobao.com/team/jm/archives/595
转载请注明原文地址: https://www.6miu.com/read-1750148.html