redis源码学习4 ae.c 事件循环

xiaoxiao2021-02-28  39

基于一个定时器的事件循环,每次循环取已被触发的事件做处理 先列模块 ae.c event loop 主模块ae_evport.c 负责调用 evport 的接口ae_epoll.c 负责调用 epoll 的接口ae_kqueue.c 负责调用 kqueue 的接口ae_select.c 负责调用 select 的接口 注意 evport, epoll, kqueue, select 按是否支持而选择使用最前者,性能递减一个客户端一个文件描述符

主要函数

`aeCreateFileEvent()` 创建文件事件`aeApiPoll()` 取命中的事件`aeProcessEvents()` 处理事件,先文件事件,后时间事件`processTimeEvents()` 处理定时事件

写了一个精简版的服务器,框架就是redis抄下来的,直接看server.c 里调用的 ae*() 函数就明白调用流程了

server.c 代码

#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netinet/tcp.h> #include <fcntl.h> #include "ae.h" #define IPADDR "127.0.0.1" #define PORT 8787 #define MAXLINE 1024 #define LISTEN_BACKLOG 5 #define CLIENT_SIZE 2 #define AE_SIZE (CLIENT_SIZE + 32) #define UNUSED(V) ((void) V) void readQueryFromClient(struct aeEventLoop *el, int fd, void *privdata, int mask); typedef struct aClient { int fd; } aClient; typedef struct aServer { int ipfd; aeEventLoop *el; aClient clients[CLIENT_SIZE]; long long stat_numconnections; int hz; } aServer; static aServer server; /*===========================================================================*/ int listenToPort(const char* ip,int port) { int fd; struct sockaddr_in servaddr; fd = socket(AF_INET, SOCK_STREAM,0); if (fd == -1) { fprintf(stderr, "create socket fail,erron:%d,reason:%s\n", errno, strerror(errno)); return -1; } server.ipfd = fd; /*一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。*/ int reuse = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) { return -1; } /* 设置非阻塞, accept() 时需要 */ if (fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { fprintf(stderr,"set non block error\n"); return -1; } bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); inet_pton(AF_INET,ip,&servaddr.sin_addr); if (bind(fd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) { perror("bind error: "); close(fd); return -1; } if (listen(fd,LISTEN_BACKLOG) == -1) { perror("listen error: "); close(fd); return -1; } return 1; } // 定时回调 int serverCron(struct aeEventLoop *el, long long id, void *clientData) { UNUSED(el); UNUSED(id); UNUSED(clientData); return 1000/server.hz; } // tcp 接收器 void acceptTcpHandler(struct aeEventLoop *el, int fd, void *privdata, int mask) { int max = 1000, tmpval; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { struct sockaddr_in cliaddr; socklen_t cliaddrlen; cliaddrlen = sizeof(cliaddr); int clifd; clifd = accept(fd,(struct sockaddr*)&cliaddr,&cliaddrlen); if (clifd == -1) { if (errno == EINTR) continue; if (errno != EWOULDBLOCK) fprintf(stderr, "accept fail,error:%s\n", strerror(errno)); return; } tmpval = 1; if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &tmpval, sizeof(tmpval)) == -1) { return; } fprintf(stdout, "accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port); /* 将新客户端添加到数组中 */ int i = 0; for (i = 0; i < CLIENT_SIZE; i++) { if (server.clients[i].fd < 0) { server.clients[i].fd = clifd; if (aeCreateFileEvent(server.el, clifd, AE_READABLE, readQueryFromClient, &server.clients[i]) == AE_ERR) { fprintf(stdout, "client client error: %s:%d\n", inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port); server.clients[i].fd = -1; close(clifd); return; } server.stat_numconnections++; break; } } /* 加不进新的客户端 */ if (i == CLIENT_SIZE) { fprintf(stderr,"too many clients.\n"); close(clifd); return; } } } // 客户端消息处理器 void readQueryFromClient(struct aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); int n; char buf[MAXLINE] = {0}; aClient *client = (aClient *)privdata; //接收客户端发送的信息 n = (int)read(fd, buf, MAXLINE); if (n <= 0) { printf("recv %d\n", n); /*n==0表示读取完成,客户端关闭套接字*/ aeDeleteFileEvent(el, fd, AE_READABLE); aeDeleteFileEvent(el, fd, AE_WRITABLE); close(fd); client->fd = -1; server.stat_numconnections--; return; } printf("recv buf is:%s, len:%d\n", buf, (int)strlen(buf)); write(fd, buf, strlen(buf) +1); } static int server_init() { int i; for (i = 0; i < CLIENT_SIZE; ++i) { server.clients[i].fd = -1; } server.stat_numconnections = 0; server.hz = 10; server.el = aeCreateEventLoop(AE_SIZE); if (server.el == NULL) { return -1; } if (listenToPort(IPADDR, PORT) < 0) return -1; if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { return -1; } if (aeCreateFileEvent(server.el, server.ipfd, AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) return -1; return 0; } static void server_uninit() { if (server.el) aeDeleteEventLoop(server.el); } int main(int argc,char *argv[]) { /* 初始化服务端 */ if (server_init() < 0) { server_uninit(); return -1; } /* 主循环开始 */ aeMain(server.el); server_uninit(); return 0; } void *zmalloc(size_t size) { return malloc(size); } void zfree(void *ptr) { free(ptr); } void *zrealloc(void *ptr, size_t size) { return realloc(ptr, size); }https://github.com/antirez/redis/blob/unstable/src/ae.h https://github.com/antirez/redis/blob/unstable/src/ae.c 直接用原版的 ae.c,然后在前面加上上面几个函数

client.c 代码

#include <netinet/in.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/select.h> #include <arpa/inet.h> #include <unistd.h> #include <errno.h> #define IPADDR "127.0.0.1" #define PORT 8787 #define MAXLINE 1024 static void handle_recv_msg(int sockfd, char *buf) { printf("client recv msg is:%s\n", buf); sleep(5); write(sockfd, buf, strlen(buf) +1); } static void handle_connection(int sockfd) { char recvline[MAXLINE]; int maxfdp; fd_set readfds; int n; struct timeval tv; int retval = 0; while (1) { FD_ZERO(&readfds); FD_SET(sockfd,&readfds); maxfdp = sockfd; tv.tv_sec = 5; tv.tv_usec = 0; retval = select(maxfdp+1,&readfds,NULL,NULL,&tv); if (retval == -1) { return ; } if (retval == 0) { printf("client timeout.\n"); continue; } if (FD_ISSET(sockfd, &readfds)) { n = (int)read(sockfd,recvline,MAXLINE); if (n <= 0) { fprintf(stderr,"client: server is closed.\n"); close(sockfd); FD_CLR(sockfd,&readfds); return; } handle_recv_msg(sockfd, recvline); } } } int main(int argc,char *argv[]) { int sockfd; struct sockaddr_in servaddr; sockfd = socket(AF_INET,SOCK_STREAM,0); bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(PORT); inet_pton(AF_INET,IPADDR,&servaddr.sin_addr); int retval = 0; retval = connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr)); if (retval < 0) { fprintf(stderr, "connect fail,error:%s\n", strerror(errno)); return -1; } printf("client send to server .\n"); char hello[30] = {0}; sprintf(hello, "hello server by %d", sockfd); write(sockfd, hello, strlen(hello) + 1); handle_connection(sockfd); return 0; } Makefile

cc = gcc CFLAGS=-g -Wall -std=c99 -fPIC objects = server.o ae.o client.o OUTPUT=server client all: $(OUTPUT) echo "ok" server: server.o ae.o $(cc) $(CFLAGS) -o $@ $^ client: client.o $(cc) $(CFLAGS) -o $@ $^ server.o: client.o: ae.o: .PHONY: compile clean compile: $(cc) -c $(objects) clean: -rm $(OUTPUT) $(objects)

另附本机运行环境是 macOS High Sierra 10.13.2

转载请注明原文地址: https://www.6miu.com/read-2150106.html

最新回复(0)