一、事件多路分发器一般基于 I/O 复用接口实现。Libevent 使用 eventop 结构作为各种 I/O 复用机制的统一接口,libevent 中称之为后端(backend),也就是一种用于检测哪些事件已经就绪的方法。
/** Structure to define the backend of a given event_base.
 *
 * Each I/O multiplexing mechanism (epoll, kqueue, poll, select, ...) is
 * described by one instance of this struct. event_base_new_with_config()
 * picks one of them and stores it in base->evsel.
 */
struct eventop {
	/* Backend name such as "epoll"; compared against the names the config
	 * avoids and the names disabled via the environment. */
	const char *name;
	/* Creates the backend-private data for this base. The returned pointer
	 * is stored in base->evbase (not "ev_base"); returning NULL makes
	 * event_base_new_with_config() try the next backend. */
	void *(*init)(struct event_base *);
	/* Start watching `events` on fd. `old` is passed through as the fd's
	 * previously registered events (epoll copies it to ch.old_events);
	 * `fdinfo` presumably points at fdinfo_len bytes of per-fd backend
	 * data - confirm in evmap.c. */
	int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
	/* Stop watching `events` on fd; same parameters as add. */
	int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
	/* Called once per loop iteration by event_base_loop(): waits for events
	 * (up to the given timeout) and moves the ready ones onto the base's
	 * active queues. It does not run the callbacks itself. */
	int (*dispatch)(struct event_base *, struct timeval *);
	/* Frees the data created by init (implementation not shown here). */
	void (*dealloc)(struct event_base *);
	/* Non-zero if the base needs re-initialization for this backend,
	 * presumably after fork(); the epoll backend sets it to 1. */
	int need_reinit;
	/* Bitmask of event_method_feature flags this backend supports; it must
	 * contain every bit of cfg->require_features for the backend to be used. */
	enum event_method_feature features;
	/* Length of the extra per-fd record the backend wants kept with each fd. */
	size_t fdinfo_len;
};
name: 表示后端的名字,如:evport, kqueue, epoll, devpoll, poll, select, win32等. init: 回调函数,入参为event_base, 该函数创建一个新结构体,这个结构体里面包含了后端需要使用的数据, 返回一个void*指针,这个返回值指针最终赋值给event_base.ev_base add: 回调函数,将fd或者信号加入到对应的I/O复用结构中 del: 回调函数,将fd或者信号移除对应的I/O服用结构 dispath: 回调函数, 实现事件循环处理的主要函数,等待事件发生,将对应事件加入激活事件队列,调用事件处理函数 dealloc:回调函数,从event_base中清理和释放数据 need_reinit: 标记位, 是否需要重新初始化event base features: 位数组,表示后端能够支持的event_method_features fdinfo_len: 额外需要为每一个fd记录的信息长度
二、在 Libevent 中,eventop 存放在 event_base 结构的 const struct eventop *evsel 字段中,在创建 event_base 时初始化,代码如下:
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
	.....
	/* Walk the compiled-in backends in preference order. The loop stops at
	 * the NULL sentinel or as soon as one backend's init() has produced a
	 * non-NULL base->evbase, so the first usable backend wins. */
	for (i = 0; eventops[i] && !base->evbase; i++) {
		if (cfg != NULL) {
			/* determine if this backend should be avoided */
			if (event_config_is_avoided_method(cfg, eventops[i]->name))
				continue;
			/* Skip backends lacking any feature the config requires. */
			if ((eventops[i]->features & cfg->require_features)!= cfg->require_features)
				continue;
		}

		/* also obey the environment variables */
		if (should_check_environment && event_is_method_disabled(eventops[i]->name))
			continue;

		/* Tentatively select this backend and let it build its private
		 * data; a NULL result leaves evbase NULL and the loop continues. */
		base->evsel = eventops[i];
		base->evbase = base->evsel->init(base);
	}

	/* No backend could be initialized: the base is unusable. evsel is
	 * cleared first, presumably so event_base_free() does not call the
	 * dealloc of a backend whose init failed - confirm in event.c. */
	if (base->evbase == NULL) {
		event_warnx("%s: no event mechanism available", __func__);
		base->evsel = NULL;
		event_base_free(base);
		return NULL;
	}
	.....
}
重点关注的结构是eventops数组,取一个有效的eventops数组元素, 赋值给base里面的evsel, 并且调用init回调函数创建后端处理需要的数据结构复制给evbase,如果创建的evbase不为空,就取满足这个条件的第一个eventops数组元素作为最终复制给base里面的evsel。 在linux默认第一个有效的eventops是epollops,使用的I/O复用方法是epoll。
/* Array of backends in order of preference. */
/* Each entry is compiled in only when its _EVENT_HAVE_* (or WIN32) macro is
 * defined. The trailing NULL terminates the selection loop in
 * event_base_new_with_config(), which takes the first entry whose init()
 * succeeds. */
static const struct eventop *eventops[] = {
#ifdef _EVENT_HAVE_EVENT_PORTS
	&evportops,
#endif
#ifdef _EVENT_HAVE_WORKING_KQUEUE
	&kqops,
#endif
#ifdef _EVENT_HAVE_EPOLL
	&epollops,
#endif
#ifdef _EVENT_HAVE_DEVPOLL
	&devpollops,
#endif
#ifdef _EVENT_HAVE_POLL
	&pollops,
#endif
#ifdef _EVENT_HAVE_SELECT
	&selectops,
#endif
#ifdef WIN32
	&win32ops,
#endif
	NULL
};
epollops 结构
const struct eventop epollops = { "epoll", epoll_init, epoll_nochangelist_add, epoll_nochangelist_del, epoll_dispatch, epoll_dealloc, 1, /* need reinit */ EV_FEATURE_ET|EV_FEATURE_O1, 0 };调用base->evbase = base->evsel->init(base); 实际就是调用epoll_init, base->evbase最终指向了epollop结构
struct epollop { struct epoll_event *events; int nevents; int epfd; }; #define INITIAL_NEVENT 32 #define MAX_NEVENT 4096 /* On Linux kernels at least up to 2.6.24.4, epoll can't handle timeout * values bigger than (LONG_MAX - 999ULL)/HZ. HZ in the wild can be * as big as 1000, and LONG_MAX can be as small as (1<<31)-1, so the * largest number of msec we can support here is 2147482. Let's * round that down by 47 seconds. */ #define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000) static void * epoll_init(struct event_base *base) { int epfd; struct epollop *epollop; /* Initialize the kernel queue. (The size field is ignored since * 2.6.8.) */ if ((epfd = epoll_create(32000)) == -1) { if (errno != ENOSYS) event_warn("epoll_create"); return (NULL); } evutil_make_socket_closeonexec(epfd); if (!(epollop = mm_calloc(1, sizeof(struct epollop)))) { close(epfd); return (NULL); } epollop->epfd = epfd; /* Initialize fields */ epollop->events = mm_calloc(INITIAL_NEVENT, sizeof(struct epoll_event)); if (epollop->events == NULL) { mm_free(epollop); close(epfd); return (NULL); } epollop->nevents = INITIAL_NEVENT; if ((base->flags & EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST) != 0 || ((base->flags & EVENT_BASE_FLAG_IGNORE_ENV) == 0 && evutil_getenv("EVENT_EPOLL_USE_CHANGELIST") != NULL)) base->evsel = &epollops_changelist; evsig_init(base); return (epollop); }至此,事件多路分发器已经创建完毕。 接下来就应该将event事件注册到事件多路分发器中 三、事件的添加 Libevent中通过 event_add来添加event, 实质上是调用event_add_internal
/*
 * Public entry point for registering an event (tv is its optional timeout).
 * Rejects events that were never bound to a base, then runs
 * event_add_internal() while holding the base's th_base_lock.
 */
int event_add(struct event *ev, const struct timeval *tv)
{
	int res;

	if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return -1;
	}

	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

	/* Third argument: tv_is_absolute = 0. */
	res = event_add_internal(ev, tv, 0);

	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

	return (res);
}

static inline int event_add_internal(struct event *ev, const struct timeval *tv, int tv_is_absolute)
{
	......
	/* Register READ/WRITE/SIGNAL events that are neither already inserted
	 * nor currently active. */
	if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
	    !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
		/* I/O event: record the fd -> event mapping (evmap_io_add also
		 * registers the fd with the backend). */
		if (ev->ev_events & (EV_READ|EV_WRITE))
			res = evmap_io_add(base, ev->ev_fd, ev);
		/* Signal event: record the signal -> event mapping. */
		else if (ev->ev_events & EV_SIGNAL)
			res = evmap_signal_add(base, (int)ev->ev_fd, ev);
		/* On success, link the event into the base's queue of inserted
		 * (registered) events. */
		if (res != -1)
			event_queue_insert(base, ev, EVLIST_INSERTED);
		if (res == 1) {
			/* evmap says we need to notify the main thread. */
			/* A new event was added to the demultiplexer, so the main
			 * (loop) thread has to be notified. */
			notify = 1;
			res = 0;
		}
	}
	......
}

int evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
	......
	/* First, register the fd with the backend via evsel->add
	 * (epoll_nochangelist_add for the default epollops backend). */
	if (evsel->add(base, ev->ev_fd, old, (ev->ev_events & EV_ET) | res, extra) == -1)
		return (-1);
	......
	/* Second, append the event to this fd's list in the base's io map.
	 * When epoll_wait later reports the fd, the event is found there and
	 * moved to the active queue. nread/nwrite are presumably the counts of
	 * EV_READ/EV_WRITE events on this fd (computed in the elided code). */
	ctx->nread = (ev_uint16_t) nread;
	ctx->nwrite = (ev_uint16_t) nwrite;
	TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);
	......
}

/*
 * Records that `ev` waits for signal `sig` and, for the first such event,
 * registers the signal with the base's *signal* backend.
 * Returns -1 on failure and 1 on success; the 1 makes event_add_internal()
 * set notify = 1.
 */
int evmap_signal_add(struct event_base *base, int sig, struct event *ev)
{
	/* NOTE: this is base->evsigsel, not base->evsel. Signal registration
	 * therefore does not go through epoll_nochangelist_add. The signal
	 * backend is presumably installed by evsig_init(), which epoll_init()
	 * calls - confirm in signal.c. */
	const struct eventop *evsel = base->evsigsel;
	struct event_signal_map *map = &base->sigmap;
	struct evmap_signal *ctx = NULL;

	/* Grow the signal map so that index `sig` exists. */
	if (sig >= map->nentries) {
		if (evmap_make_space(
			map, sig, sizeof(struct evmap_signal *)) == -1)
			return (-1);
	}
	/* Fetch the slot for `sig`, presumably constructing it with
	 * evmap_signal_init when it does not exist yet. */
	GET_SIGNAL_SLOT_AND_CTOR(ctx, map, sig, evmap_signal, evmap_signal_init,
	    base->evsigsel->fdinfo_len);

	/* Only the first event for this signal registers it with the signal
	 * backend; later events just join the list. */
	if (TAILQ_EMPTY(&ctx->events)) {
		if (evsel->add(base, ev->ev_fd, 0, EV_SIGNAL, NULL)
		    == -1)
			return (-1);
	}

	TAILQ_INSERT_TAIL(&ctx->events, ev, ev_signal_next);

	return (1);
}
如果是fd事件,则通过evmap_io_add将fd加入到I/O复用函数中, 如果是信号事件,则通过evmap_signal_add将信号加入到I/O复用函数。 在evmap_io_add和evmap_signal_add分别调用evsel->add, 即epoll_nochangelist_add, epoll_nochangelist_add再调用epoll_apply_one_change,里面调用epoll_ctl将对应的fd或者信号加入到epoll中
/*
 * add hook of epollops: converts the generic (fd, old, events) request into
 * one struct event_change and applies it to epoll immediately, with no
 * changelist batching. EV_ET is carried into the read/write change.
 * `p` (the per-fd info) is not used.
 */
static int epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;
	ch.fd = fd;
	ch.old_events = old;
	ch.read_change = ch.write_change = 0;
	if (events & EV_WRITE)
		ch.write_change = EV_CHANGE_ADD |
		    (events & EV_ET);
	if (events & EV_READ)
		ch.read_change = EV_CHANGE_ADD |
		    (events & EV_ET);

	/* Note: the argument passed here is base->evbase, i.e. the backend
	 * data (struct epollop) that epoll_init() returned. :) */
	return epoll_apply_one_change(base, base->evbase, &ch);
}

/*
 * Issues a single epoll_ctl() for one event_change. The computation of
 * `op` and `events` from ch is elided in this excerpt.
 */
static int epoll_apply_one_change(struct event_base *base,
    struct epollop *epollop,
    const struct event_change *ch)
{
	struct epoll_event epev;
	int op, events = 0;

	if (1) {
		......
		/* Only the fd is stored in epev.data. epoll_dispatch later passes
		 * events[i].data.fd to evmap_io_active() to find the events. */
		memset(&epev, 0, sizeof(epev));
		epev.data.fd = ch->fd;
		epev.events = events;
		if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1) {
			......
		}
		......
	}

	return 0;
}
注意这里只是将事件号(fd或者信号值)和事件类型赋值给了epoll_event结构。意思就是说在epoll判断有数据的时候,返回的就是fd或者信号值和事件类型,通过fd或者信号值找到对应的hash数组链表,然后在冲突链中根据事件类型找到对应的event
四、事件处理函数 调用 event_base_dispatch 进入事件循环,等待事件多路分发器返回激活的事件,然后调用对应事件的事件处理函数 (*ev->ev_callback)。
event_base_dispatch 调用 event_base_loop,event_base_loop 调用 evsel->dispatch,在 Linux 下实际就是 epoll_dispatch,它内部调用 epoll_wait。epoll_dispatch 通过 evmap_io_active 把就绪的事件插入 base->activequeues 激活队列(其本身返回 0 表示成功),随后 event_base_loop 调用 event_process_active 按优先级依次处理就绪的信号事件和 I/O 事件。
int event_base_loop(struct event_base *base, int flags) { const struct eventop *evsel = base->evsel; while (!done) { ...... /*调用事件多路分发器的dispatch方法等待事件, 将就绪事件插入活动事件队列*/ res = evsel->dispatch(base, tv_p); ...... if (N_ACTIVE_CALLBACKS(base)) { /*调用event_process_active 函数依次处理就绪的信号事件和I/O事件*/ int n = event_process_active(base); if ((flags & EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base) == 0 && n != 0) done = 1; } else if (flags & EVLOOP_NONBLOCK) done = 1; } } static int epoll_dispatch(struct event_base *base, struct timeval *tv) { struct epollop *epollop = base->evbase; struct epoll_event *events = epollop->events; int i, res; long timeout = -1; ...... /*调用epoll_wait等待事件发生*/ res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); ...... /*事件发生后, 找出fd对应的event事件,并将该事件添加到active队列*/ for (i = 0; i < res; i++) { int what = events[i].events; short ev = 0; /*此处有点奇怪,为啥EPOLLHUP和EPOLLERR,认为是 同时可读可写*/ if (what & (EPOLLHUP|EPOLLERR)) { ev = EV_READ | EV_WRITE; } else { if (what & EPOLLIN) ev |= EV_READ; if (what & EPOLLOUT) ev |= EV_WRITE; } if (!ev) continue; evmap_io_active(base, events[i].data.fd, ev | EV_ET); } ...... 
return (0); } /*在epoll判断有数据的时候,返回的就是fd或者信号值和事件类型,通过fd或者信号值找到对应的hash数组链表,然后在冲突链中根据事件类型找到对应的event*/ evmap_io_active(struct event_base *base, evutil_socket_t fd, short events) { struct event_io_map *io = &base->io; struct evmap_io *ctx; struct event *ev; #ifndef EVMAP_USE_HT EVUTIL_ASSERT(fd < io->nentries); #endif GET_IO_SLOT(ctx, io, fd, evmap_io); EVUTIL_ASSERT(ctx); TAILQ_FOREACH(ev, &ctx->events, ev_io_next) { if (ev->ev_events & events) event_active_nolock(ev, ev->ev_events & events, 1); } } event_active_nolock将event插入到&base->activequeues队列中 void event_active_nolock(struct event *ev, int res, short ncalls) { struct event_base *base; event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p", ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback)); /* We get different kinds of events, add them together */ if (ev->ev_flags & EVLIST_ACTIVE) { ev->ev_res |= res; return; } base = ev->ev_base; EVENT_BASE_ASSERT_LOCKED(base); ev->ev_res = res; if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; if (ev->ev_events & EV_SIGNAL) { #ifndef _EVENT_DISABLE_THREAD_SUPPORT if (base->current_event == ev && !EVBASE_IN_THREAD(base)) { ++base->current_event_waiters; EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock); } #endif ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL; } /*根据优先级插入激活队列*/ event_queue_insert(base, ev, EVLIST_ACTIVE); if (EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base); } static int event_process_active(struct event_base *base) { /* Caller must hold th_base_lock */ struct event_list *activeq = NULL; int i, c = 0; /*循环处理base->activequeues队列*/ for (i = 0; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(&base->activequeues[i]) != NULL) { base->event_running_priority = i; activeq = &base->activequeues[i]; /*调用event_process_active_single_queue处理激活事件*/ c = event_process_active_single_queue(base, activeq); if (c < 0) { base->event_running_priority = -1; return -1; } else if (c > 0) break; /* Processed a real event; do 
not * consider lower-priority events */ /* If we get here, all of the events we processed * were internal. Continue. */ } } event_process_deferred_callbacks(&base->defer_queue,&base->event_break); base->event_running_priority = -1; return c; } static int event_process_active_single_queue(struct event_base *base, struct event_list *activeq) { struct event *ev; int count = 0; EVUTIL_ASSERT(activeq != NULL); for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { if (ev->ev_events & EV_PERSIST) event_queue_remove(base, ev, EVLIST_ACTIVE); else event_del_internal(ev); if (!(ev->ev_flags & EVLIST_INTERNAL)) ++count; event_debug(( "event_process_active: event: %p, %s%scall %p", ev, ev->ev_res & EV_READ ? "EV_READ " : " ", ev->ev_res & EV_WRITE ? "EV_WRITE " : " ", ev->ev_callback)); #ifndef _EVENT_DISABLE_THREAD_SUPPORT base->current_event = ev; base->current_event_waiters = 0; #endif switch (ev->ev_closure) { case EV_CLOSURE_SIGNAL: event_signal_closure(base, ev); break; case EV_CLOSURE_PERSIST: event_persist_closure(base, ev); break; default: case EV_CLOSURE_NONE: EVBASE_RELEASE_LOCK(base, th_base_lock); /*调用事件处理函数*/ (*ev->ev_callback)( ev->ev_fd, ev->ev_res, ev->ev_arg); break; } EVBASE_ACQUIRE_LOCK(base, th_base_lock); #ifndef _EVENT_DISABLE_THREAD_SUPPORT base->current_event = NULL; if (base->current_event_waiters) { base->current_event_waiters = 0; EVTHREAD_COND_BROADCAST(base->current_event_cond); } #endif if (base->event_break) return -1; if (base->event_continue) break; } return count; } gwwu@hz-dev2.aerohive.com:~/test/libevent/my_libevent_test>more libevent_test_io.c #include <stdio.h> #include <stdlib.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/types.h> #include <unistd.h> #include <event.h> #include <string.h> #define SERVER_PORT 8888 #define BACKLOG 1024 #define MAX_BUFFER_LEN 1024 void do_process(evutil_socket_t fd, short event, void *arg) { char buff[MAX_BUFFER_LEN]; ssize_t len; memset(buff, 0, MAX_BUFFER_LEN); len = recv(fd, 
buff, MAX_BUFFER_LEN, 0); if (len > 0) { printf("%s\n", buff); send(fd, buff, len, 0); } return; } void do_accept(evutil_socket_t fd, short event, void *arg) { evutil_socket_t connfd; struct event_base *base; struct event *ev; struct sockaddr_in client; socklen_t client_addr_len; base = (struct event_base*)arg; printf("do accept for fd %d\n", fd); client_addr_len = sizeof(struct sockaddr_in); connfd = accept(fd, (struct sockaddr*)&client, &client_addr_len); if (connfd < 0) { printf("Error: Accept socket error!\n"); return; } ev = event_new(base, connfd, EV_READ | EV_PERSIST, do_process, (void*)base); event_add(ev, NULL); return; } int create_listen_fd_event(struct event_base *base) { evutil_socket_t fd; struct sockaddr_in server; struct event *event; fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { printf("Error: Create socket error!\n"); return -1; } bzero(&server, sizeof(struct sockaddr_in)); server.sin_family = AF_INET; server.sin_port = htons(SERVER_PORT); inet_pton(AF_INET, "127.0.0.1", &server.sin_addr.s_addr); if (bind(fd, (struct sockaddr*)&server, sizeof(server)) < 0) { printf("Error: Bind socket error!\n"); return -1; } if (listen(fd, BACKLOG) < 0) { printf("Error: listen socket error!\n"); return -1; } event = event_new(base, fd, EV_READ | EV_PERSIST, do_accept, (void*)base); event_add(event, NULL); return 0; } int main(int argc, char *argv[]) { struct event_base *base; base = event_base_new(); if (!base) { printf("Error: Create an event base error!\n"); return -1; } create_listen_fd_event(base); event_base_dispatch(base); return 0; }