这一章节,我们来分析多线程TcpServer,首先分析muduo one loop per thread的基石——EventLoopThread class,EventLoopThread会启动自己的线程,并在其中运行EventLoop::loop(),关键的startLoop()函数定义如下,此函数会返回新线程中的EventLoop对象的地址:
EventLoop* EventLoopThread::startLoop() { assert(started_ == false); started_ = true; { std::unique_lock<mutex> ulck(mutex_); while(loop_ == NULL) { cond_.wait(ulck); } } return loop_; }由于EventLoop是在新线程中建立的,而在构造函数中,新线程是threadFunc: EventLoopThread::EventLoopThread() :loop_(NULL), exiting_(false), started_(false), thread_(std::bind(&EventLoopThread::threadFunc,this)), mutex_(), cond_() { }因此,在startLoop中需要用条件变量等待EventLoop的建立,直到新线程中EventLoop变量的建立并赋值给loop_成员变量: void EventLoopThread::threadFunc() { EventLoop loop; { std::unique_lock<mutex> ulck(mutex_); loop_ = &loop; cond_.notify_one(); } std::cout<<"thread id:"<<std::this_thread::get_id()<<"\n"; loop_->loop(); }之后,如果想要在新线程中添加任务,要么添加channel,要么使用runInLoop函数。由于EventLoop是局部变量,threadFunc函数结束后,变量就失效。接着我们来分析一下EventLoopThreadPool。用one loop per thread思想实现多线程TcpServer的关键步骤是在新建的TcpConnection时从event loop pool里挑选一个loop给TcpConnection用。就是说多线程TcpServer自己的EventLoop只用来接受新连接,而新连接会用其他EventLoop来执行IO,EventLoopThreadPool类定义如下:
class EventLoopThreadPool : boost::noncopyable { public: EventLoopThreadPool(EventLoop* baseLoop); ~EventLoopThreadPool(); void setThreadNum(int numThreads) { numThreads_ = numThreads; } void start(); EventLoop* getNextLoop(); private: EventLoop* baseLoop_; bool started_; int numThreads_; int next_; // always in loop thread boost::ptr_vector<EventLoopThread> threads_; std::vector<EventLoop*> loops_; };其中start函数用来向loops_内添加内容(由于都是nocopyable,所以智能拷贝指针),最主要的函数是getNextLoop: EventLoop* EventLoopThreadPool::getNextLoop() { baseLoop_->assertInLoopThread(); EventLoop* loop = baseLoop_; if (!loops_.empty()) { // round-robin loop = loops_[next_]; ++next_; if (static_cast<size_t>(next_) >= loops_.size()) { next_ = 0; } } return loop; }记录一个next_下标,采用轮询的方式返回pool中的EventLoopThread,每个TcpServer都有自己的EventLoopThreadPool。