众所周知,libevent支持多种I/O多路复用,如select、poll、epoll、kqueue等。那么其中是如何实现的呢? 主要就是结构体eventop,它内部成员有几个函数指针,统一了每种I/O多路复用的接口,也就是说,要想libevent支持某种I/O多路复用,就必须实现这几种接口。结构体eventop位于event-internal.h中。
这5个函数指针分别指向某个具体的多路I/O复用机制的初始化、添加事件、删除事件、分发事件、释放资源这5个操作上。这样,libevent就可以支持该多路I/O复用机制了。 但是其实我们使用的时候并没有选择用哪一种,说明是有默认选项的。
在event.c文件中,有一个静态全局的eventops数组,并且按照优先级选择用哪一种多路I/O复用机制。定义如下:
static const struct eventop *eventops[] = { #ifdef HAVE_EVENT_PORTS &evportops, #endif #ifdef HAVE_WORKING_KQUEUE &kqops, #endif #ifdef HAVE_EPOLL &epollops, #endif #ifdef HAVE_DEVPOLL &devpollops, #endif #ifdef HAVE_POLL &pollops, #endif #ifdef HAVE_SELECT &selectops, #endif #ifdef WIN32 &win32ops, #endif NULL };这里主要运用了条件编译,在config.h中define了可以支持的多路I/O复用,由于只是简单的define,并且代码比较多,就不在这里列举了,可以自己看一看。
接下来,我们以epoll为例来看一看epollops的真面目。 在epoll.c文件中:
const struct eventop epollops = { "epoll", epoll_init, epoll_add, epoll_del, epoll_dispatch, epoll_dealloc, 1 /* need reinit */ };那么只用实现epoll_init、epoll_add、epoll_del、epoll_dispatch、epoll_dealloc这几个函数就行了。 首先先看一下这几个函数要用到的结构体
//对应读和写事件 struct evepoll { struct event *evread; struct event *evwrite; }; struct epollop { //每个fd可对应读/写事件 struct evepoll *fds; //fd的数量 int nfds; //epoll事件 struct epoll_event *events; //事件的数量 int nevents; //epoll专用文件描述符 int epfd; };接下来是epoll_init:
static void * epoll_init(struct event_base *base) { int epfd; struct epollop *epollop; /* Disable epollueue when this environment variable is set */ if (evutil_getenv("EVENT_NOEPOLL")) return (NULL); /* Initalize the kernel queue */ //创建epoll句柄 if ((epfd = epoll_create(32000)) == -1) { if (errno != ENOSYS) event_warn("epoll_create"); return (NULL); } //这是为了防止在使用多进程时,子进程继承父进程打开的文件描述符及权限。所以设置FD_CLOEXEC标志。 FD_CLOSEONEXEC(epfd); //给epollop申请空间 if (!(epollop = calloc(1, sizeof(struct epollop)))) return (NULL); epollop->epfd = epfd; /* Initalize fields */ epollop->events = malloc(INITIAL_NEVENTS * sizeof(struct epoll_event)); //当申请空间失败时,把之前申请的也释放了,然后return if (epollop->events == NULL) { free(epollop); return (NULL); } epollop->nevents = INITIAL_NEVENTS; epollop->fds = calloc(INITIAL_NFILES, sizeof(struct evepoll)); //同理 if (epollop->fds == NULL) { free(epollop->events); free(epollop); return (NULL); } epollop->nfds = INITIAL_NFILES; //由于libevent为了将signal也集成到事件主循环中,所以使用了套结字对(socket pair)。这个函数就用于创建socket pair和初始化evsignal_info evsignal_init(base); return (epollop); }epoll_add:
static int epoll_add(void *arg, struct event *ev) { struct epollop *epollop = arg; struct epoll_event epev = {0, {0}}; struct evepoll *evep; int fd, op, events; //如果是signal事件,直接调用evsignal_add来添加就行了 if (ev->ev_events & EV_SIGNAL) return (evsignal_add(ev)); fd = ev->ev_fd; //当前的文件描述符大于nfds时,需要重新扩展。(这点是利用了linux系统优先分配空闲的最小值fd) if (fd >= epollop->nfds) { /* Extent the file descriptor array as necessary */ if (epoll_recalc(ev->ev_base, epollop, fd) == -1) return (-1); } //evep是当前fd(需要add的)对应的struct evepoll(里面是读/写事件) evep = &epollop->fds[fd]; //对应的默认操作是添加操作 op = EPOLL_CTL_ADD; events = 0; //如果已经指向了一个读事件,证明该fd已经在epoll监听中了,所以应该将操作改为EPOLL_CTL_MOD,但是为了防止以前的监听读事件标志被覆盖,所以重新加上。 if (evep->evread != NULL) { //监听读事件 events |= EPOLLIN; op = EPOLL_CTL_MOD; } //同理 if (evep->evwrite != NULL) { events |= EPOLLOUT; op = EPOLL_CTL_MOD; } //如果设置了EV_READ标志,说明是读事件 if (ev->ev_events & EV_READ) events |= EPOLLIN; //如果设置了EV_WRITE标志,说明是写事件 if (ev->ev_events & EV_WRITE) events |= EPOLLOUT; //设置struct epoll_event epev.data.fd = fd; epev.events = events; //修改/增加fd到监听的epollop->epfd中去 if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -1) return (-1); /* Update events responsible */ //如果是读事件,那么让evread指向该event if (ev->ev_events & EV_READ) evep->evread = ev; //如果是写事件,那么让evwrite指向该event if (ev->ev_events & EV_WRITE) evep->evwrite = ev; return (0); }接下来便是最复杂的epoll_dispatch了:
static int epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv) { struct epollop *epollop = arg; struct epoll_event *events = epollop->events; struct evepoll *evep; int i, res, timeout = -1; //如果设置了超时等待时间,那么就将这时间具体多少ms算出来 if (tv != NULL) timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000; //如果该等待时间大于了最长的等待时间,那就直接设置为最长等待时间(该最长等待时间是epoll.c里面设定的) //#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000) //即最久等35分钟.....不过可以自己修改 if (timeout > MAX_EPOLL_TIMEOUT_MSEC) { /* Linux kernels can wait forever if the timeout is too big; * see comment on MAX_EPOLL_TIMEOUT_MSEC. */ //linux内核是可以无限等的,但是关键还是看MAX_EPOLL_TIMEOUT_MSEC timeout = MAX_EPOLL_TIMEOUT_MSEC; } //epoll_wait函数我相信你应该懂的,返回值是触发了的事件总数 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); //返回-1代表出错了 if (res == -1) { //如果不是被信号中断的,那么直接报错了 if (errno != EINTR) { event_warn("epoll_wait"); return (-1); } //处理signal事件 evsignal_process(base); return (0); } else if //查看是否有信号的标志,如若发生,则处理signal事件 (base->sig.evsignal_caught) { evsignal_process(base); } event_debug(("%s: epoll_wait reports %d", __func__, res)); //依次处理被触发的事件 for (i = 0; i < res; i++) { //what:记录什么类型的事件 int what = events[i].events; struct event *evread = NULL, *evwrite = NULL; int fd = events[i].data.fd; if (fd < 0 || fd >= epollop->nfds) continue; evep = &epollop->fds[fd]; //如果是被挂断或者出错导致被触发 if (what & (EPOLLHUP|EPOLLERR)) { evread = evep->evread; evwrite = evep->evwrite; } else { if (what & EPOLLIN) { evread = evep->evread; } if (what & EPOLLOUT) { evwrite = evep->evwrite; } } //如果读/写事件都没有,直接结束本次循环 if (!(evread||evwrite)) continue; //手动激活读/写事件 if (evread != NULL) event_active(evread, EV_READ, 1); if (evwrite != NULL) event_active(evwrite, EV_WRITE, 1); } //当nevents不够用的时候,重新分配 if (res == epollop->nevents && epollop->nevents < MAX_NEVENTS) { /* We used all of the event space this time. We should be ready for more events next time. */ int new_nevents = epollop->nevents * 2; struct epoll_event *new_events; new_events = realloc(epollop->events, new_nevents * sizeof(struct epoll_event)); if (new_events) { epollop->events = new_events; epollop->nevents = new_nevents; } } return (0); }注意epoll_dispatch函数中并没有调用处理事件的业务,而是在event_base_loop中由event_process_actice调用。相当于它的工作是只负责把事件添加到激活队列中,然后由event_process_actice处理。
接下来只剩epoll_del和epoll_dealloc还有epoll_recalc了,epoll_del的逻辑和epoll_add大致相似,就不在这里列出了。而epoll_recalc无非就是重新分配内存,也没什么需要注意的,最后就来个epoll_dealloc收尾吧。
epoll_dealloc:
static void epoll_dealloc(struct event_base *base, void *arg) { struct epollop *epollop = arg; evsignal_dealloc(base); if (epollop->fds) free(epollop->fds); if (epollop->events) free(epollop->events); if (epollop->epfd >= 0) close(epollop->epfd); //释放了空间之后,别忘了将指针指向的地方赋为null,不然指向的是已经释放了的空间,造成野指针 memset(epollop, 0, sizeof(struct epollop)); free(epollop); }这里我们来关注一下void *arg,前面已经出现过多次,可能你会对其产生疑惑,而在event_init中的函数类型也是void *,而返回的是struct epollop *,在event_add或者event_base_loop中,调用的都是evsel->add(evbase, ev)或者evsel->dispatch(base, evbase, tv_p)这样的,这说明每一个多路I/O复用都对应有它自己的struct xxxop *。之所以返回值是void *,是为了将不同的struct xxxop *转换成统一的指针类型。
在本节中,我们终于知道了神秘的libevent如何同时支持了那么多种多路I/O复用机制,再配合上前面讲的主循环以及event还有event_base以及信号、定时事件。你应该可以跟着事件的主循环,在脑海中浮现出一个一个的事件是如何被添加到事件链表,以及被激活,等待调度,调度之后被销毁等等的场景了。 接下来,我们将研究libevent库中缓冲区的部分。