Druid数据连接池源码分析

xiaoxiao2021-02-28  13

Druid是阿里巴巴公司的数据库连接池工具,昨天突然想学习一下阿里的druid源码,于是下载下来分析了一下。也就2个多小时粗略看了一下,中间有些知识点没见过,不懂,现查BAIDU学习。简单总结一下,边总结边继续看代码,估计错误不少,欢迎指正!      在自己看之前,想找找druid源码分析,居然在BAIDU上搜索不到任何信息,只是介绍如何配置,只能自己看过来了。这里的介绍,细节不说了,着眼于大方向与设计思路。  1。为监控而生,那么何时,又如何监控呢?      简单的操作数据库通常涉及datasource,connection,preparedstatement ,ResultSet等东西,如果我要监控这些,必然要建些代理类。     我们的操作都由代理类完成,在完成的过程中,产生监控的数据。     druid号称为监控而生,监控功能就是一根针,正所谓见缝插针,没有缝隙就要创造缝隙,所以就要建个代理类,代理类与被代理理之间就是缝隙。而代理对象必然持有被代理对象。      public interface PreparedStatementProxy extends PreparedStatement, StatementProxy      作为实现类:PreparedStatementProxyImpl,就持有一个java.sql.PreparedStatement。      随便看看它的查询方法:      public ResultSet executeQuery() throws SQLException {  .....          return createChain().preparedStatement_executeQuery(this);//产生过滤链,并由过滤链执行。      }    FilterChainImpl中有:  Java代码   public ResultSetProxy preparedStatement_executeQuery(PreparedStatementProxy statement) throws SQLException {       if (this.pos < filterSize) {           return nextFilter().preparedStatement_executeQuery(this, statement);       }       ResultSet resultSet = statement.getRawObject().executeQuery();       return wrap(statement, resultSet);   }       上面的方法说明:在执行查询前,要经过过滤链处理,等处理完了,再由statement执行,执行完了,得到一个ResultSet后,包装一个产生最后返回的代理类。  2.说说统计过滤器吧,只是过滤链上的一环     此模式见的最多的就是web.xml中配置的过滤器filter了,你配置几个过滤器filter,都实现了dofilter()方法,那么把filter们组织成来,放在filterchain的List之类的容器中,那么就可以循环执行dofilter来处理些东东。可以看出filterchain持有所有的filter,那filter执行后,要告诉filterchain执行下一个,所以fiter还必须持有filterchain。但filter不需要一直持有filterchain对象,只是临时持有一会,所以filterchain是作为方法的参数传进来的。这也说明filter可以是这个chain中的一环,也可以同时是另一个chain中的一环,谁来都行。  这个过程,又点象观察者模式,又象回调模式。     看个简单的,StatFilter中的一个统计连接提交的方法:(StatFilter中的过滤方法超级多,对各种数据库操作都记账;当然logfilter中也一样,防御SQL注入攻击的WallFilter,估计也一样:)  Java代码   @Override   public void connection_commit(FilterChain chain, ConnectionProxy connection) throws SQLException {       chain.connection_commit(connection);       JdbcDataSourceStat dataSourceStat = chain.getDataSource().getDataSourceStat();       dataSourceStat.getConnectionStat().incrementConnectionCommitCount();   }     先是让chain去作一步(就是nextFilter开始干活,所有的filter都干完了,就真正commit一下)然后,对数据源的commit的操作计数进行增加。  Java代码   public void connection_commit(ConnectionProxy connection) throws SQLException {       if (this.pos < filterSize) {           nextFilter().connection_commit(this, connection);//让下一个干活           return;       }       connection.getRawObject().commit();//都干完了,才真正提交。这个连接也是一个代理,让里面真正的java.sql.connection提交。   }   private Filter nextFilter() {       Filter filter = getFilters().get(pos++);       return filter;   }   DataSourceProxyConfig中有一个private final List<Filter> filters = new ArrayList<Filter>();//就是普通的arraylist放过滤器。  上面两点合在一起,就是原来执行一个数据库操作,现在给代理类执行,执行中先经过一个个过滤器进行统计,之后再真正执行数据库操作。对最终用户透明的。  3.统计的东东怎么记录的呢?  在stat包里面,随便找一个对象看看吧。比如:JdbcStatementStat  ..........................  Java代码   private final AtomicLong    createCount      = new AtomicLong(0);                                     // 执行createStatement的计数   private final AtomicLong    prepareCount     = new AtomicLong(0);                                     // 执行parepareStatement的计数   private final AtomicLong    prepareCallCount = new AtomicLong(0);                                     // 执行preCall的计数   private final AtomicLong    closeCount       = new AtomicLong(0);    .....................................  哇哦,一大堆统计计数器,都是AtomicLong的,就是线程同步的,增加计数时调用它的incrementAndGet()方法。不过TableStat中就是普通的int了,呵呵。  4.说说最上面的蓝字的执行  ResultSet resultSet = statement.getRawObject().executeQuery();       PreparedStatementProxy代理了一个实现了java.sql.statement接口的对象,所以它用那个对象执行查询。那个对象是什么样的呢?       对了,是这个:DruidPooledPreparedStatement,看的出,这个家伙也是代理了别人,因为它用stat来做查询,只是代理前后做了点其它事情。  Java代码   public ResultSet executeQuery() throws SQLException {       checkOpen();       incrementExecuteCount();       transactionRecord(sql);       oracleSetRowPrefetch();       conn.beforeExecute();       try {           ResultSet rs = stmt.executeQuery();           if (rs == null) {               return null;           }           DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);           addResultSetTrace(poolableResultSet);           return poolableResultSet;       } catch (Throwable t) {           throw checkException(t);       } finally {           conn.afterExecute();       }   }       其中:addResultSetTrace是把查询结果放在List<ResultSet>       resultSetTrace;中。为何?  5.看看几个Holder是干什么的?  DruidConnectionHolder中有什么?new DruidPooledConnection时,正好用这个holder。  Java代码       private final DruidAbstractDataSource       dataSource;       private final Connection                    conn;       private final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();       private final List<StatementEventListener>  statementEventListeners  = new CopyOnWriteArrayList<StatementEventListener>();       private PreparedStatementPool               statementPool;   //这是一个LRU算法的池。放的就是下面的PreparedStatementHolder!!!!       private final List<Statement>               statementTrace           = new ArrayList<Statement>(2);      PreparedStatementHolder中呢?new DruidPooledPreparedStatement时,正好用这个holder。       private final PreparedStatementKey key;       private final PreparedStatement    statement;   Holder从名字来看就是持有什么,DruidConnectionHolder必然持有Connection,PreparedStatementHolder必然持有PreparedStatement。DruidConnectionHolder当然还持有属于这个连接的PreparedStatement之类的。透过几个调用关系,差不多可以猜测出设计思路:  一般我们用connect对象,再产生statment对象,再执行SQL之类的,当我们在一个对象执行前后做一些统计之类的操作时,那就用代理对象来做,比如前面的filterchain用于代理对象中。可是如果调用其它对象时,想把它们之间的一些关联的东东保持下来,比如一个连接下的所有的PreparedStatement,那就需要一个holder对象来帮忙了。也许你可以把一堆其它的东东都给这个对象身上,不过这样就不清晰了,太乱了。  没准holder也可以称为一个设计模式。当然proxy也是,还记得有叫handler的吧,都有行为学上的意义!!  DruidPooledConnection中的PreparedStatement prepareStatement(String sql)方法,就是看看池子里有没有(stmtHolder = holder.getStatementPool().get(key);),没有的话才new PreparedStatementHolder(key, conn.prepareStatement(sql));,有的话内存容器中取了。  大概关系这样的:DruidPooledConnection-->DruidConnectionHolder-->ConnectionProxy-->filterChain---connection。  6.讲讲上面用到的LRU缓存,就是存PreparedStatementHolder的池子。    一种LinkedHashMap吧,自己实现一下removeEldestEntry方法就可以了,容量达到就扔掉最OLD的。  Java代码   public class LRUCache extends LinkedHashMap<PreparedStatementKey, PreparedStatementHolder> {       private static final long serialVersionUID = 1L;       public LRUCache(int maxSize){           super(maxSize, 0.75f, true);       }       protected boolean removeEldestEntry(Entry<PreparedStatementKey, PreparedStatementHolder> eldest) {           boolean remove = (size() > dataSource.getMaxPoolPreparedStatementPerConnectionSize());           if (remove) {               closeRemovedStatement(eldest.getValue());           }           return remove;       }   }   7.MOCK包    这个也不太懂,网上查了一下,说是一些造假的,且方便测试的对象,它实现了相关的接口,所以可以被当成它所假冒的东西使用。特别是真实的东西不方便使用,或者很慢,或者有其它不理想的情况下。  8,sqlPaser    druid下的主要的功能包除这个以外,都介绍完了,说是SQL解析器,不过没空看了,下次看了再补充吧。  9.connectPool连接池    连接池当然是重头戏了,简单先提一下,主要用到的是ReentrantLock锁,还有 notEmpty empty两个条件,生产连接与消费连接的线程在两个条件上等待与唤醒。连接池是由数据源确定的,所以具体要看pool包里的DruidAbstractDataSource与DruidDataSource两个类了。  哇,这两个类很庞大,首先看了一下属性,主要有很多count,time,还有一些default值,留意集合字段,比如Map<DruidPooledConnection, Object> activeConnections  private volatile DruidConnectionHolder[] connections;之类的。里面还有些线程。  9.1创建连接    先看看DruidDataSource里的CreateConnectionThread都干什么,首先是一些条件,比如下面这个代码(省略不重要的代码),连接太多了的时候,在empty条件上等待,就是等空了再运行,现在别急着创建连接,等着吧!  // 防止创建超过maxActive数量的连接  Java代码           if (activeCount + poolingCount >= maxActive) {               empty.await();               continue;           }   try {       connection = createPhysicalConnection();       setFailContinuous(false);   boolean result = put(connection);   后面是创建一个物理连接,然后put一下,这个put很可能是放池子中,那么仔细看一下。主要下面几句,说明都写在后面了:      holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);//产生一个连接holder              connections[poolingCount] = holder;//这不就是池子吗?就是一个DruidConnectionHolder的Array了。              incrementPoolingCount();//池子中的计数加1.              notEmpty.signal();//发出非空的信号,所有在非空条件上等待的线程,你们可能动起来了。              notEmptySignalCount++;  9.2使用连接      那么谁在notEmpty条件上等待呢?我们查一下,发现是方法  DruidConnectionHolder takeLast()之中,当poolingCount中数量为0时等待。正好说明使用连接的线程,当连接没有时,就等待着呗。如果池中有连接呢?就执行下面的语句了:          decrementPoolingCount();//减少池中连接的计数,当然拿走一个少一个了。          DruidConnectionHolder last = connections[poolingCount];//正好拿走的是池中最后一个。          connections[poolingCount] = null;//最后一个就成null了。      顺便看看谁在用takeLast(),查找发现是getPooledConnection(),从名字就知道是获取链接的主要方法了。仔细看看getPooledConnection中又调用getConnection(),这里面又是插入了过滤链,果然是为统计而生,都记录在案了。仔细看filterChain.dataSource_connect()参数中有this,说明它把自己传进去了,说明这个filterChain并不从属于任何datasource,可以是这具数据源,也可以是那个数据源。具体过滤哪个,临时传入。      当我们设计过滤链时,如果我们的功能是为多人服务的,那就说明要传入服务对象进来。而不是setpropety设置成关联关系。一个人如何设计复杂的代码呢?当然是头脑中有一个非常抽象,而明确的思路。  9.2减少连接  在创建连接线程附近还有一个DestroyConnectionThread(),看看吧  跟踪里面,有destroyTask.run();----->shrink(true);看名字是收缩嘛,可能连接空闲的太多了,就缩小呗。  在shrink()方法中,重点有下面的语句,分析放在后面:  final int checkCount = poolingCount - minIdle;//池中的数量-最小空闲数量,感觉是收缩的条件之一嘛。  Java代码   for (DruidConnectionHolder item : evictList) {//可回收的都放在这里了       Connection connection = item.getConnection();       JdbcUtils.close(connection);//关闭这些连接了。       destroyCount.incrementAndGet();   }   9.3 初始化方法init()  都是谁在用这两个线程呢?查找一下,发现创建线程与收缩线程是由void init()来调用的,看名字就知道这是系统启动的主方法了。  void init(){       initFromSPIServiceLoader();//load filters from SPI ServiceLoader,这个spi就不介绍了,在分析dubbo的另一个帖子里,里面已经有SPI介绍了。反正是把配置的filter放在filterchain中(List<Filter>)  Java代码   connections = new DruidConnectionHolder[maxActive];//新建连接池,个数是最大活动连接数maxActive。                   for (int i = 0, size = getInitialSize(); i < size; ++i) {//放入连接池中连接                       PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();                       DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);                       connections[poolingCount] = holder;                       incrementPoolingCount();                   }               createAndLogThread();//看名字是日志,就不看了               createAndStartCreatorThread();//创建连接的线程,一直在工作,池子满了就是等待状态。               createAndStartDestroyThread();//收缩池子的线程,一直在工作。                  initedLatch.await();//主线程在计数器为0前一直等待。       init = true;   }   就里有一个知识点。CountDownLatch             initedLatch             = new CountDownLatch(2); 就叫倒计时同步器。当前同步数为2,在变成0后,主线程才能运行,否则一直等待中。  在创建连接与收缩池子的线程中都有initedLatch.countDown();,一共正好两个,那么主线程就是等待上面两个线程都运行了才运行吧,才置init状态标识为true。看来理解没有错喽。  单看一个create线程,里面有一个countDown方法:  Java代码   protected void createAndStartCreatorThread() {       if (createScheduler == null) {           String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);           createConnectionThread = new CreateConnectionThread(threadName);           createConnectionThread.start();           return;       }       initedLatch.countDown();//如果有createScheduler就直接-1;   }   public class CreateConnectionThread extends Thread {       public void run() {           initedLatch.countDown();//如果没有时,在这里面-1;   10。总结:  看过了源码?我们究竟如何提高?我们也可以做出这么好的东东吗?  1.首先你会掌握足够多的基础知识,比如会用到多线程,concurrent包里的东西,甚至很少用的容器。说明作者看过很厚的资料书或者看过很多java源码。  2.深入学习国外的源码。我发现一些相似的处理方式,在这个产品中出现,在那个产品中也出现,当然不一定完全相同。比如hadoop中的异步转同步与dubbo就差不多,什么 fastfail在秒杀中也有人用。  3.看过两个源码了,应该可以抽象出处理问题的方式了,这样自己碰到相似情况时,马上就可以套用。  【彩蛋】      基本上看完了主要代码,为监控而生这句体会更深了,我们就择其一点,深入体会一下过滤链模式吧,算是本贴的彩蛋。这个东东真的很多地方在用,那么我们如何用呢?使用时,应该有三个对象,分别是被处理的对象,过滤器,过滤器链,之前的帖子我提出一个观点,对象的本质是各种关系的组合,组合是最重要的。那么这两个对象之间的包含关系,引用关系,以及主要的方法应该怎么设计,以及为什么要这么设计呢?  过滤器是一个基本单元,特点是:它不会引用过滤器链,因为它可以属于不同的过滤器链。它不会引用处理对象,因为它可以处理这个对象也可以处理那个对象。所以在这三者中,过滤器的过滤方法中,会传入另外两个对象,而不会在其它属性和方法中产生。web中的过滤器中的dofilter参数就是chain与req与res三个。  过滤链一般都是可装配的,过滤器是一个个基本单元,所以链条要给用户配置的机会,过滤器以后可能会增加,但这个不是给用户的功能,可以通过spi的机制加进来。  过滤器链条是一个组合器,它的特点:产生过滤器时,必须把基本过滤器传进来,而且是稳定的引用关系,但因为链条上有多个过滤器,所以要有一个容器来放它们。所以过滤链持有一个容器,init的时候,放入一个个过滤器。过滤器是串起来一个个执行的,所以还要有一个定位信息,现在执行到那一个了。再深入思考一下,过滤链处理的对象,有的是处理到第2个,有的处理到第5个,有的处理到第3个。那多线程的问题来了。查一下代码(protected int   pos = 0;if (this.pos < filterSize) donext),看来pos定位计数不是一个threadlocal啊,再看public FilterChainImpl(DataSourceProxy dataSource),构造函数中传入的是数据源,所以一个数据源不会传入计数指针,只会传过来过滤器容器。回头我想查一下web中filter的源码,里面的容器是啥,定位是啥?是不是threadlocal变量呢?  ( 补一下,后来在tomcat这里查到了:org.apache.catalina.core.ApplicationFilterChain的属性中 private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];  private int pos = 0; 还包含了beforeFilter和afterFilter等功能,作者肯定学习到这些过滤代码了,而我too young了,看的东西太少了)  有点复杂了,再看看FilterChainImpl,里面并不持有过滤单元,而其中的重要方法,getFilters、nextFilter,都是从构造chain的dataSource中来的。有点象什么呢?象以前你去找饭店吃饭,现在你带着菜让饭店加工。有道理啊,因为过滤链规则并不一定要持有基本单位啊,就象冒泡排序不一定持有排序元素,是可以单独抽象出来的。过滤链既然含有非线程的pos,那么每个过滤链条都是一次性使用,否则pos指针就乱了。那么查找一下工程中所有的new FilterChainImpl(),我们发现pool包里有4个,其它都在proxy包里,正好证明了前面说的见缝插针,弄出个代理来,在代理与被代理之间插入过滤链,或者其它什么你想要的功能。比如ClobProxyImpl中有createChain()方法,而且每个clob操作中都要调用createChain,就是说每个操作都是新的过滤链,那么前面提到的pos计数就是仅自己方法使用,不会有什么共享冲突的发生了。  我们再理一下思路,三个主要的元素(被处理的对象,过滤器,过滤器链),其中过滤器链被拆分了,只剩下过滤规则与计数器。过滤容器交给了被处理对象持有了,变来料加工。过滤链条是每一个proxy对象、准确的说是每一个proxy对象的每一个具体要使用过滤的方法临时create出来的。再理一个思路,我们看一个filter,看接口与对象都成,发现里面有处理connect的一大堆方法,有处理resultset的一大堆方法,有处理statemente的一大堆方法,有处理....。  哇哦,太多了,所有的都在这里,每一个过滤器都很庞大,很全面,是重量级对象,只有一份。一个数据源持有几个过滤器,过滤链不是细长形状,而是矮胖形状的。而你的任何一个对象(当然是代理对象喽)的任何一个简单操作,都会生成一个过滤链。所以你的过滤链中不应该持有过滤器,持有每一个简单计数pos是非常合理的,过滤链是一个轻量级的对象。  再引申思考一下,如果我们一开始想到一个功能,比如这么一个监控功能,我们可以肯定的知道用filter方式,我们能否如这般组织好这些对象之间的关系呢?而且这么组织有没有什么进一步重构的可能呢?  我们可能会有一批与连接相关的过滤器,一批与resultset有关的过滤器;从另一个角度,我们有统计的过滤器,有日志的过滤器。我们还有过滤链规则,我们有很多要过滤的对象,可以先打散,比如过滤链的规则与过滤链容器是分开的。再尝试组合,可以按连接来组合(连接的统计,连接的日志在一起),可以从功能上组合(连接日志与resultset日志在一起)。那么我们插拨的要么是整个统计,要么是整个日志。如果另一种组合,我们可以只要连接的统计与日志,可以只要查询的日志与统计。但如果要连接的统计,要查询的日志,那么druid目前是不支持的。这有点象拆成块,再搭积木的感觉。  再比如说,现在由datasource来持有过滤器容器。那么过滤器的粒度只是不同的数据源。有些过滤是对connect的,有些是对resultset的,所有的数据源的下级对象都是一样的过滤器容器,这方面有没有个性化的需求呢?比如对clob我只想日志不想统计,createChain的时候是不是有什么参数可以配置一下?  另外,在看FilterChainImpl时,它除了有一大批要过滤的方法外,还有好几个wrap方法。wrap恰好是根据原始对象来生成过滤缝隙的代理对象,同时把产生filterchain的条件都传给它,让代理对象执行每一个方法都可以new filterchain出来。而在代理对象的每一个具体方法中,调用filterchain来处理时,又把代理自己给filterchain,filterchain执行过程中,还要用代理对象的getRawObject得到被代理对象来执行最后的业务。如果代理对象连续执行了三个不同的方法,那就是第一次new 一个filterchain,一个链条处理完了就会recycleFilterChain(chain);重置pos计数,后面两个方法就还是用这个代理对象持有的filterchain,不过计数变成0了。  再理一下:我有一个原始对象,通过FilterChainImpl产生代理对象,代理对象再传给FilterChainImpl的具体方法用,方法里面进行过滤后再取出原始对象来用。再比喻:我有一个不锈钢杯子,让别人给包上泥巴,成了一个泥杯子,再让别人做烧制,着色,刻画等处理。最后成为精美的陶瓷水杯,当然喝水功能还是靠原来的不锈钢杯子来实现的,因为它不是不不锈钢菜刀。当然菜刀也可以包上泥巴,也做烧制,着色,刻画等处理,最后成为一个陶瓷刀具。包泥巴和烧制,着色,刻画当然是不同的业务,但可以开一个店来做。  再全面总结一下:  关于druid,配置一个数据源,它持有一堆过滤器零件,持有一个连接池,连接池里有两个主要的线程在跑。我对任何对象做任何操作时,给我包装一下,再产生或重置一个过虑链并过滤处理后再执行操作,就酱紫。  查看图片附件
转载请注明原文地址: https://www.6miu.com/read-850077.html

最新回复(0)