Muduo之Acceptor源码分析笔记

xiaoxiao2021-02-27  299

Muduo之Acceptor源码解析

前面我们大概介绍了Muduo网络库的整体架构以及Reactor模式的一般架构,从这里就开始分析Muduo网络库中用到的一些类。

本篇文章主要是介绍Acceptor

muduo网络库的example中作为server的类都是继承于class TcpServer, 而TcpServer初始化的时候创建一个Acceptor对象、一个EventThreadPool对象以及一些Callback。因此从整体上来看Acceptor主要就是供TcpServer使用,生命期由后者控制,负责管理服务器监听的socket并将回调通知使用者,进行连接分发。下面我就专门分析下Acceptor的实现。

首先先看下类图:

首先从class Acceptor类的声明中可以看出该类包含主要包含如下几个数据成员:

class Acceptor : boost::noncopyable { public: typedef boost::function<void (int sockfd, const InetAddress&)> NewConnectionCallback; Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport); ~Acceptor(); void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } bool listenning() const { return listenning_; } void listen(); private: void handleRead(); EventLoop* loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listenning_; int idleFd_; };

从上面可以看到主要包含一个EventLoop、Socket和一个Channel对象。 Acceptor主要负责服务器端的端口监听、事件连接以及事件分发。

acceptSocket_就是服务器监听端口的对象,class Socket把传统意义上的socket进行封装,而class Channel又是对socket的封装,他将socket以及对应的多个处理函数Callback封装在一起,那么当socket发生事件的时候,能够很快的找到相应的处理函数。 handleRead()是读事件处理函数,服务器因为会把具体事件分发给别的线程,那么针对主线程来说就只有连接建立的读事件需要处理。

而loop_就是Acceptor的EventLoop,因为Acceptor需要一直监听连接事件,Acceptor拥有自己的EventLoop和Channel。

通过源代码中的例子可以看到如果要服务器开始工作总是需要调用server.start()和loop_.loop()

当我们通过TcpServer.start()的时候,实际上就是调用Acceptor.listen()。

那么就详细分析下Acceptor这个类。

构造函数

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport) : loop_(loop), acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), acceptChannel_(loop, acceptSocket_.fd()), listenning_(false), idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { assert(idleFd_ >= 0); acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(reuseport); acceptSocket_.bindAddress(listenAddr); acceptChannel_.setReadCallback( boost::bind(&Acceptor::handleRead, this)); }

从构造函数的传入参数可以看出需要传入当前线程的EventLoop对象,而在acceptChannel的初始化中完成了Loop和acceptSocket的关联。

前三句话是设置监听套接字的,而之前我们提到监听服务器一般只会有读事件发生,那么最后一行就是设置读事件发生的回调函数的,当有连接请求建立的时候时候就会产生读事件,那么这时候就会回调handleRead(),

读事件回调函数

void Acceptor::handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; //FIXME loop until no more int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // string hostport = peerAddr.toIpPort(); // LOG_TRACE << "Accepts of " << hostport; if (newConnectionCallback_) { //这边就开始分发连接了。 newConnectionCallback_(connfd, peerAddr); } else { sockets::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of libev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }

通过accpet可以获得连接建立的套接字,然后判断是否newConnectionCallback回调是否设置,有的话就直接调用它。这个回调函数是在TcpServer的构造函数中设置的,这边是关联到TcpServer::newConnection(),这个函数呆会再分析。

回到之前当TcpServer初始化完成之后(这里其实就是把accpetor和threadPool以及该主线程需要的EventLoop都初始化好了,之后就会调用start()),那么我们这边再来看下Acceptor::listen()

Acceptor::listen()

void Acceptor::listen() { loop_->assertInLoopThread(); listenning_ = true; acceptSocket_.listen(); acceptChannel_.enableReading(); }

listen函数首先判断loop是否在当前线程中,之后设置listening为true表明开始监听过程,之后就是调用Socket.listen()以及Channel.enableReading(),前者最后还是会执行系统调用的::listen(),关键是后者,我们有提到Channel是对socket和回调函数的封装,他也会有成员指针指向该线程的loop,我们就需要将读事件设置到关心的条件里面,这里就是通过Channgel类里面的如下几个接口去设置的。

void enableReading() { events_ |= kReadEvent; update(); } void disableReading() { events_ &= ~kReadEvent; update(); } void enableWriting() { events_ |= kWriteEvent; update(); } void disableWriting() { events_ &= ~kWriteEvent; update(); } void disableAll() { events_ = kNoneEvent; update(); }

而其中的update()就会去调用loop_->updateChannel(),在Looper里面再去更新Poller关心的条件/监听的事件。然后调用loop.loop()就开始poll了。

回到刚才的TcpServer::newConnection(),我们来看下当有新的连接请求进来的时候回发生什么?

事件的分发

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); EventLoop* ioLoop = threadPool_->getNextLoop(); char buf[64]; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe //这边因为TcpServer的线程ID和选择的ioLoop肯定不相同,所以在runInloop里面会直接将要执行的任务添加到ioLoop的任务队列里面 ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn)); }

从上面的代码可以看出首先从threadPool里面按照一定原则取出一个Eventloop,然后创建了一个TcpConnection对象并进行初始化,然后将该TcpConnection添加到刚才取出的EventLoop的任务队列里面。

总结

从上面的分析可以看出,Acceptor有自己的EventLoop和Channel、Socket对象,因为监听连接请求也是个poll行为。Acceptor通过将这些对象进行封装,实现了服务器端的端口监听以及连接分发。

当收到连接请求的时候,会从threadPool里面取一个EventLoop然后创建一个TcpConnection对象并加入到任务队列里面。

转载请注明原文地址: https://www.6miu.com/read-5519.html

最新回复(0)