Tomcat源码分析---线程池原理

xiaoxiao2021-02-28  22

Tomcat线程池目前出现过三种,第一个是5.0的线程池模型,这个线程池目前在6.x的版本还存在,主要是用于AJP的, 第二个是5.5.x时代使用了一仲线程池 第三个是6.x版本的线程池,实际上这个不是tomcat的线程池了,tomcat使用的是JDK的线程池ThreadPoolExecutor 首先来分析一下最老的版本5.0使用的线程池 线程池只有一个类,ThreadPool,它里面包含了两个内部类一个接口 静态内部类 ControlRunnable,这是用来运行工作线程的 静态内部类  MonitorRunnable,这是监控线程,用来回收空闲的线程 接口 ThreadPoolListener,这个很奇怪,我全文搜索了,只有ThreadPool的一些方法使用了这个接口,但是它却没有实现累 首先来看看监控类 MonitorRunnable,它是通过ControlRunnable类来启动的,它的run()方法如下:          public void  run() {              while ( true ) {                 try {                      // Sleep for a while.                      synchronized ( this ) {                          this .wait(interval);                     }                     // Check if should terminate.                     // termination happens when the pool is shutting down.                      if (shouldTerminate) {                          break ;                     }                      // Harvest idle threads.                     p.checkSpareControllers();                 }  catch ( Throwable  t) {              ThreadPool .log.error(" Unexpected exception ", t);                 }             }         } interval在创建的时候就已经指定是,是60 * 1000,也就是会睡眠60秒, 这个类最主要的工作是 每过60秒检查一下是否有空闲的线程,就是空闲线程数 是否大于 最大空闲线程数,如果是的话,就回收一些线程,保证最多空闲的线程数=最大空闲线程数 checkSpareControllers()内容如下:      protected synchronized void  checkSpareControllers() {          if (stopThePool) {              return ;         }          if ((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {              int  toFree = currentThreadCount - currentThreadsBusy -maxSpareThreads;              for ( int  i = 0 ; i < toFree ; i++) {                  ControlRunnable  c = pool[currentThreadCount - currentThreadsBusy - 1];                 c.terminate();                 pool[currentThreadCount - currentThreadsBusy - 1] =  null ;                 currentThreadCount --;             }         }     } 再来看看运行工作线程 ControlRunnable,它的run()方法如下:          public void  run() {              boolean  _shouldRun =  false ;              boolean  _shouldTerminate =  false ;              ThreadPoolRunnable  _toRun =  null ;             try {                  while  ( true ) {                      try  {                          /* Wait for work. */                          synchronized  ( this ) {                              while  (!shouldRun && !shouldTerminate) {                                  this .wait();                             }                             _shouldRun = shouldRun;                             _shouldTerminate = shouldTerminate;                             _toRun = toRun;                         }                          if  (_shouldTerminate) {                              if  ( ThreadPool .log.isDebugEnabled())                                  ThreadPool .log.debug(" Terminate ");                              break ;                         }                          /* Check if should execute a runnable.  */                          try  {                              if  (noThData) {                                  if  (_toRun !=  null ) {                                      Object  thData[] = _toRun.getInitData();                                     t.setThreadData(p, thData);                                      if  ( ThreadPool .log.isDebugEnabled())                                         ThreadPool.log.debug(" Getting new thread data ");                                 }                                 noThData =  false ;                             }                              if  (_shouldRun) {                                  if  (_toRun !=  null ) {                                     _toRun.runIt(t.getThreadData(p));                                 }  else if  (toRunRunnable !=  null ) {                                     toRunRunnable.run();                                 }  else  {                                      if  ( ThreadPool .log.isDebugEnabled())                                      ThreadPool .log.debug(" No toRun ??? ");                                 }                             }                         }  catch  ( Throwable  t) {                              ThreadPool .log.error(sm.getString                                 (" threadpool.thread_error ", t, toRun.toString()));                             /*                              * The runnable throw an exception (can be even a ThreadDeath),                              * signalling that the thread die.                              *                             * The meaning is that we should release the thread from                             * the pool.                             */                             _shouldTerminate =  true ;                             _shouldRun =  false ;                             p.notifyThreadEnd( this );                         }  finally  {                              if  (_shouldRun) {                                 shouldRun =  false ;                                  /*                                 * Notify the pool that the thread is now idle.                                  */                                 p.returnController( this );                             }                         }                         /*                         * Check if should terminate.                         * termination happens when the pool is shutting down.                         */                          if  (_shouldTerminate) {                              break ;                         }                     }  catch  ( InterruptedException  ie) {  /* for the wait operation */                          // can never happen, since we don't call interrupt                          ThreadPool .log.error(" Unexpected exception ", ie);                     }                 }             }  finally  {                 p.removeThread( Thread .currentThread());             }         } 这里有一个ThreadPoolRunnable,这是一个接口,里面有一个runIt()方法,可以把它当做Runnable来看,凡是实现了这个接口的类,工作线程都会负责调用这个类的runIt()执行。 ControlRunnable#run()方法的最开始是让这个线程wait(),因为一开始是没有数据的所以就一直等待,直到执行了ControlRunnable#runIt()方法之后,就会唤醒一个等待的线程:          public synchronized void  runIt( ThreadPoolRunnable  toRun) {          this .toRun = toRun;         // Do not re-init, the whole idea is to run init only once per         // thread - the pool is supposed to run a single task, that is         // initialized once.              // noThData = true;             shouldRun =  true ;             this.notify();         }   当调用了这个方法之后,ControlRunnable#run()就会继续执行下去,之后会做一个初始化数据的工作,也就是if (noThData) 那个判断,工作线程里每个运行的线程实际上扩展了Thread,用的是ThreadWithAttribute,它可以将一些数据绑定在当前线程上,所以这个初始化数据是从ThreadPoolRunnable的实现类上取出数据,绑定到当前线程上。   初始化完成之后就开始真正运行了,如果当前的类继承自Runnalbe或者ThreadPoolRunnable都会被工作线程调用,当工作线程执行完后会将shouldTerminate标记设置为false,这样当下次循环的时候就会继续wait()了。最后将自己返还给工作线程数组,然后notify(),这是唤醒其他线程,因为可能因为当前的线程数超过了最大线程了,被迫等待,所以这里需要有一个唤醒机制告诉工作线程数组,有空闲的线程可以调用了。   如果发生错误了就退出循环,当退出循环后会将自己从线程map中删除,注意是线程map,不是工作线程数组,所以线程池中目前的线程数不会减少,只有等到监控线程检查完后才有可能减少。  当执行ThreadPool#runIt()方法时,会将一个ThreadPoolRunnable的实现类交给线程池运行,线程池首先需要从数组中找出一个线程,然后调用运行这个线程。 runIt()方法如下:      public void  runIt( ThreadPoolRunnable  r) {          if ( null  == r) {              throw new   NullPointerException ();         }          ControlRunnable  c = findControlRunnable();         c.runIt(r);     } 调用ControlRunnable#runIt(),这样就将ThreadPoolRunnable的实现类交给了线程池,然后会唤醒一个线程。注意这里有一步很重要,在ControlRunnable#runIt()里,这是一个同步方法,执行了语句: this .toRun = toRun; 这样它拿到的就是成员变量toRun,因为另一个线程赋值的时候是讲ThreadPoolRunnable的实现赋给了ThreadPool的成员变量toRun,所以线程池数组里每一个正在等待的ControlRunnable都能拿到这个值,还记得happend before原则吗,这个赋值和获取值都是同步的,所以肯定没有问题,任意唤醒的一个线程在它的同步块中会拿到这个toRun:                          synchronized  ( this ) {                              while  (!shouldRun && !shouldTerminate) {                                  this .wait();                             }                             _shouldRun = shouldRun;                             _shouldTerminate = shouldTerminate;                             _toRun = toRun;                         } 如果不是赋给ThreadPool的成员变量,而是随便赋给一个线程池成员ControlRunnable,那就有问题了,比如赋给的是A,但是唤醒是随意的,可能唤醒了B,这样B就没有toRun的值导致无法运行,而由了这个同步的赋值,同步的取值就没问题了。 下面看一下findControlRunnable()方法:      private   ControlRunnable  findControlRunnable() {          ControlRunnable  c= null ;          if  ( stopThePool ) {              throw new  IllegalStateException ();         }          // Obtain a free thread from the pool.          synchronized ( this ) {              while  (currentThreadsBusy == currentThreadCount) {                   // All threads are busy                  if  (currentThreadCount < maxThreads) {                     // Not all threads were open,                     // Open new threads up to the max number of idel threads                      int  toOpen = currentThreadCount + minSpareThreads;                     openThreads(toOpen);                 }  else  {                     logFull(log, currentThreadCount, maxThreads);                      // Wait for a thread to become idel.                      try  {                          this .wait();                     }                     // was just catch Throwable -- but no other                     // exceptions can be thrown by wait, right?                     // So we catch and ignore this one, since                     // it'll never actually happen, since nowhere                     // do we say pool.interrupt().                      catch ( InterruptedException  e) {                         log.error(" Unexpected exception ", e);                     }              if ( log.isDebugEnabled() ) {             log.debug(" Finished waiting: CTC= "+currentThreadCount +" , CTB= " + currentThreadsBusy);                     }                      // Pool was stopped. Get away of the pool.                      if ( stopThePool) {                          break ;                     }                 }             }              // Pool was stopped. Get away of the pool.              if (0 == currentThreadCount || stopThePool) {                  throw new   IllegalStateException ();             }                                 // If we are here it means that there is a free thread. Take it.              int  pos = currentThreadCount - currentThreadsBusy - 1;             c = pool[pos];             pool[pos] = null;             currentThreadsBusy++;         }          return  c;     } 这段代码的意思是坚持当前繁忙线程如果==当前线程,就是说所有的线程都在忙着执行那么会新开启一些线程,但如果当前的现在已经等于最大线程数了,那么就wait(),等有线程执行完了会notify()的。如果满足条件就新创建一些线程,最后将线程数组中的pos位置设置为null,表示这个位置的工作线程已经被取走了,然后返回这个工作线程ControlRunnable。 再来看看它的openThreads():      protected void  openThreads( int  toOpen) {          if (toOpen > maxThreads) {             toOpen = maxThreads;         }          for ( int  i = currentThreadCount ; i < toOpen ; i++) {             pool[i - currentThreadsBusy] =  new  ControlRunnable(this);         }         currentThreadCount = toOpen;     } 这里会创建若干个新ControlRunnable 最后看看线程池的启动和停止,首先是ThreadPool#start():      public synchronized  void start() {         stopThePool=false;         currentThreadCount  = 0;         currentThreadsBusy  = 0;         adjustLimits();         pool =  new  ControlRunnable [maxThreads];         openThreads(minSpareThreads);          if  (maxSpareThreads < maxThreads) {             monitor =  new  MonitorRunnable ( this );         }     } 可以看到,它创建了ControlRunnable和监控线程,而ControlRunnable是以数组的形式保存的,所以这个线程池使用的是数组的形式,这样比较高效,启动的时候就已经设置好数组的大小了,运行的时候就不会再改变了。这里的adjustLimits()主要是调整最大线程,最大空闲线程数的边界值,这里就不介绍了。 下面是线程池的停止方法:      public synchronized void  shutdown() {          if (!stopThePool) {             stopThePool =  true ;              if  (monitor !=  null ) {                 monitor.terminate();                 monitor =  null ;             }              for ( int  i = 0; i < currentThreadCount - currentThreadsBusy; i++) {                  try  {                     pool[i].terminate();                 }  catch ( Throwable  t) {               /*              * Do nothing... The show must go on, we are shutting              * down the pool and nothing should stop that.              */             log.error(" Ignored exception while shutting down thread poo l", t);                 }             }             currentThreadsBusy = currentThreadCount = 0;             pool =  null ;             notifyAll();         }     } 这里会调用监控线程和工作线程的terminate()方法,让停止标记置为true,从而退出执行关闭线程。 有个问题,ThreadPool#runIt()是怎么被调用的? 启动的时候就会有一个AJP线程执行监听,它是SocketAcceptor,继承了ThreadPoolRunnable。 然后它会调用ChannelSocket#acceptConnections(),这个方法内容如下:      void  acceptConnections() {          if  (log.isDebugEnabled())             log.debug(" Accepting ajp connections on  " + port);          while  (running) {              try  {                  MsgContext  ep = createMsgContext(packetSize);                 ep.setSource(this);                 ep.setWorkerEnv(wEnv);                  this .accept(ep);                  if  (!running)                      break ;                 // Since this is a long-running connection, we don't care                 // about the small GC                  SocketConnection  ajpConn =  new SocketConnection ( this , ep);                 tp.runIt(ajpConn);             }  catch  (Exception ex) {                  if  (running)                     log.warn(" Exception executing accept ", ex);             }         }     } 这里可以看到,是AJP的接收线程,接收到数据之后,会调用tp.runIt(ajpConn),将SocketConnection交给线程池运行,同时也会唤醒一个正在wait()的线程池,所以接收和处理不是同一个线程,但是它们都运行在线程池中。 到这里,Tomcat5.0模式的线程池就介绍完了。 关于Tomcat6.0的线程池模型,也就是JDK的ThreadPoolExecutor,请看这篇文章: http://rdc.taobao.com/team/jm/archives/595
转载请注明原文地址: https://www.6miu.com/read-1750148.html

最新回复(0)