服务端消费客户端发来的消息进行分析和展示,所以这个的初始化指的是CatHomeModule的初始化
CatHomeModule依赖TcpSocketReceiver和MessageConsumer,前者用来接收客户端发送的消息,后者用来消费消息。 TcpSocketReceiver通过Messagecodec对MessageQueue中的MessageTree进行解码,还原成为MessageTree,然后通过MessageHandler调用Consumer对消息进行消费。(这个消费的过程其实是一个消息分发的过程。消息有不同的消息分析器) 消费的过程是一个周期性的过程,对应上图右边部分。一个Period代表一个周期,每个周期对应一个持续时间(duration),默认为一小时。 RealTimeConsumer是MessageConsumer的实现类,他的作用是进行实时的消费,如何实现周期性消费呢?他需要依赖PeriodManager,进行周期管理。所以在初始化MessageConsumer的过程中会初始化PeriodManager。并且开启periodmanager的守护线程,进行周期开始和结束的控制。
通过m_strategy.next(now)方法进行时间对比,返回大于零或小于零的值,来决定是开始新的周期还是结束旧的周期。这个线程是每隔一秒执行一次的。
public long next(long now) { long startTime = now - now % m_duration;//得到一个整点的时间,作为开始时间,如果now是10:05,startTime就是10:00 Date nowTime = new Date(startTime); // for current period 当前周期返回开始时间, //第一次进入的时候m_lastStartTime初始化为周期开始时间 if (startTime > m_lastStartTime) { m_lastStartTime = startTime; return startTime; } // prepare next period ahead //下一个时期 返回大于0的值,则开始新的周期 if (now - m_lastStartTime >= m_duration - m_aheadTime) { m_lastStartTime = startTime + m_duration; return startTime + m_duration; } // last period is over 上一个周期已经结束了 //返回小于零的值,销毁上一个周期 if (now - m_lastEndTime >= m_duration + m_extraTime) { long lastEndTime = m_lastEndTime; m_lastEndTime = startTime; return -lastEndTime; }在初始化周期管理器的时候,会执行startPeriod方法,来开启一个周期。实例化Period,在这个过程中,会通过analyzerManager得到12种分析器,并声明一个m_task的HashMap<String,List> key就是分析器名称,value就是List,分析器不同对应的list的size也不同,例如transaction分析比较耗时,就会分配两个PeriodTask,如下图
private void startPeriod(long startTime) { long endTime = startTime + m_strategy.getDuration(); Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger); m_periods.add(period); period.start(); }实例化完成后,将period加入到m_periods中,然后调用period.start方法,这个方法循环m_task每一种分析器,启动periodTask的线程,进行analyze,这个方法会一直对队列进行轮询,从队列中取出tree,进行process处理, process是抽象方法,具体会由重写了该方法的子类去执行process方法
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) { List<PeriodTask> taskList = tasks.getValue(); for (int i = 0; i < taskList.size(); i++) { PeriodTask task = taskList.get(i); task.setIndex(i); Threads.forGroup("Cat-RealtimeConsumer").start(task); } } @Override public void run() { try { m_analyzer.analyze(m_queue); } catch (Exception e) { Cat.logError(e); } }讲完有关周期的初始化过程,我们在回头看看server接收到消息是如何放到消费队列中的
tcp接收到消息---->MessageDocoder: decode—>DefaulMessageHandler : handle(MessageTree tree) m_consumer.consume(tree)----> RealtimeConsumer consume方法
@Override public void consume(MessageTree tree) { long timestamp = tree.getMessage().getTimestamp(); Period period = m_periodManager.findPeriod(timestamp); if (period != null) { period.distribute(tree); } else { m_serverStateManager.addNetworkTimeError(1); } } 根据当前时间,找到当前时间对应的周期Period然后调用周期的distribute方法进行消息的分发 a) 循环m_tasks i. 获得list中的某一个PeriodTask ii. 将消息放到队列(m_queue)中可以看出,一个消息将被放到所有类型的消息分析器进行分析
分析器将与后边做具体的分析
说明: 本文涉及到的UML图片来源于大众点评Cat–Server模块架构分析这篇文章,部分内容也有参考