参考了:Skynet服务器框架(六) Socket服务源码剖析和应用(linshuhe1的专栏)以及Skynet 源码学习 -- Socket Server 和 Skynet_socket(cchd0001的专栏)
用了Skynet下的Socket接口后,越发想看看它的底层实现。和我之前想的一样,Skynet底层的网络并发在Linux下使用的正是 epoll。
EPOLL封装层:
./skynet-src/socket_poll.h 给了我答案:
#ifndef socket_poll_h #define socket_poll_h #include <stdbool.h> typedef int poll_fd; struct event { void * s; bool read; bool write; }; static bool sp_invalid(poll_fd fd); static poll_fd sp_create(); static void sp_release(poll_fd fd); static int sp_add(poll_fd fd, int sock, void *ud); static void sp_del(poll_fd fd, int sock); static void sp_write(poll_fd, int sock, void *ud, bool enable); static int sp_wait(poll_fd, struct event *e, int max); static void sp_nonblocking(int sock); #ifdef __linux__ #include "socket_epoll.h" #endif #if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) #include "socket_kqueue.h" #endif #endif
可以发现Skynet在Linux下使用了 epoll 来管理网络并发,在FreeBSD等平台下使用了 kqueue。该头文件定义了一个结构体 event ,后面可以知道,该结构体就是对 epoll 下的 epoll_event 做了简易封装,抛弃了 epoll_event 下EPOLLPRI、EPOLLERR等不常用事件,仅仅保留了EPOLLIN(读) 、EPOLLOUT(写)两个事件,分别用 read 和 write 两个 bool 值来简单标记。以上的函数实现在 socket_epoll.h 和 socket_kqueue.h 里。
总体来说,Skynet在 epoll 的基础上封装了五层。本文先介绍最底下两层,下一篇介绍上三层。
首先是 ./skynet-src/socket_epoll.h,这一层是对 epoll 的简单封装。
// 用于判断产生的 epoll fd 是否有效 static bool sp_invalid(int efd) { return efd == -1; } // 用于产生一个 epoll fd,1024是用来建议内核监听的数目,自从 linux 2.6.8 之后,该参数是被忽略的,即可以填大于0的任意值。 static int sp_create() { return epoll_create(1024); } // 释放 epoll fd static void sp_release(int efd) { close(efd); } /* * 为 epoll 添加一个监听的文件描述符,仅监控读事件 * fd : sp_create() 返回值 * sock : 待监听的文件描述符 * ud : 自己使用的指针地址 * : 返回0表示添加成功, -1表示失败 */ static int sp_add(int efd, int sock, void *ud) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = ud; if (epoll_ctl(efd, EPOLL_CTL_ADD, sock, &ev) == -1) { return 1; } return 0; } /* * 删除 epoll 中监听的 fd * fd : sp_create()创建的fd * sock : 待删除的fd */ static void sp_del(int efd, int sock) { epoll_ctl(efd, EPOLL_CTL_DEL, sock , NULL); } /* * 修改 epoll 中已有 fd 的监听事件 * efd : epoll fd * sock : 待修改的fd * ud : 用户自定义数据指针 * enable: true表示开启写监听, false表示还是读监听 */ static void sp_write(int efd, int sock, void *ud, bool enable) { struct epoll_event ev; ev.events = EPOLLIN | (enable ? EPOLLOUT : 0); ev.data.ptr = ud; epoll_ctl(efd, EPOLL_CTL_MOD, sock, &ev); } /* * 轮询fd事件 * efd : sp_create()创建的fd * e : 一段struct event内存的首地址 * max : e内存能够使用的最大值 * : 返回监听到事件的fd数量,write与read分别对应写和读事件flag,值为true时表示该事件发生 */ static int sp_wait(int efd, struct event *e, int max) { struct epoll_event ev[max]; int n = epoll_wait(efd , ev, max, -1); int i; for (i=0;i<n;i++) { e[i].s = ev[i].data.ptr; unsigned flag = ev[i].events; e[i].write = (flag & EPOLLOUT) != 0; e[i].read = (flag & EPOLLIN) != 0; } return n; } /* * 将fd设置为非阻塞 */ static void sp_nonblocking(int fd) { int flag = fcntl(fd, F_GETFL, 0); if ( -1 == flag ) { return; } fcntl(fd, F_SETFL, flag | O_NONBLOCK); }
接着是 ./skynet-src/socket_server.c
这一层对上一层的封装较为复杂。
socket_server 封装:
先看几个重要的结构体:
struct write_buffer { struct write_buffer * next; void *buffer; char *ptr; int sz; bool userobject; uint8_t udp_address[UDP_ADDRESS_SIZE]; }; #define SIZEOF_TCPBUFFER (offsetof(struct write_buffer, udp_address[0])) #define SIZEOF_UDPBUFFER (sizeof(struct write_buffer)) //写缓冲队列 struct wb_list { struct write_buffer * head; //写缓冲区的头指针 struct write_buffer * tail; //写缓冲区的尾指针 }; struct socket { uintptr_t opaque; //所属服务在skynet中对应的handle struct wb_list high;//高优先级写队列 struct wb_list low; //低优先级写队列 int64_t wb_size; //写缓存尚未发送的数据大小 int fd; int id; //用于索引socket_server里的slot数组 uint16_t protocol; //使用的协议类型(TCP/UDP) uint16_t type; //scoket的类型或状态(读、写、监听等) union { int size; //读缓存预估需要的大小 uint8_t udp_address[UDP_ADDRESS_SIZE]; } p; }; struct socket_server { int recvctrl_fd; // pipe读端 int sendctrl_fd; // pipe写端 int checkctrl; poll_fd event_fd; // epoll/kqueue的fd int alloc_id; int event_n; // epoll_wait 返回的事件数 int event_index; // 当前处理的事件序号 struct socket_object_interface soi; struct event ev[MAX_EVENT]; // epoll_wait 返回的事件集合 struct socket slot[MAX_SOCKET]; // 每个socket_server可以包含多个socket,slot存储这些socket char buffer[MAX_INFO]; uint8_t udpbuffer[MAX_UDP_PACKAGE]; fd_set rfds; // select监测的fd集 }; struct request_open { int id; // 用于在socket_server的slot找到对应的socket int port; uintptr_t opaque; char host[1]; }; struct request_send { int id; int sz; char * buffer; }; struct request_send_udp { struct request_send send; uint8_t address[UDP_ADDRESS_SIZE]; }; struct request_setudp { int id; uint8_t address[UDP_ADDRESS_SIZE]; }; struct request_close { int id; uintptr_t opaque; }; struct request_listen { int id; int fd; uintptr_t opaque; char host[1]; }; struct request_bind { int id; int fd; uintptr_t opaque; }; struct request_start { int id; uintptr_t opaque; }; struct request_setopt { int id; int what; int value; }; struct request_udp { int id; int fd; int family; uintptr_t opaque; }; /* The first byte is TYPE S Start socket B Bind socket L Listen socket K Close socket O Connect to (Open) X Exit D Send package (high) P Send package (low) A Send UDP package T Set opt U Create UDP socket C set udp address */ struct request_package { uint8_t header[8]; // 6 bytes dummy union { char buffer[256]; struct request_open open; struct request_send send; struct request_send_udp send_udp; struct request_close close; struct request_listen listen; struct request_bind bind; struct request_start start; struct request_setopt setopt; struct request_udp udp; struct request_setudp set_udp; } u; uint8_t dummy[256]; }; struct socket_message { int id; uintptr_t opaque; // 在skynet中对应一个Actor实体的handle句柄 int ud; // 对于accept连接来说, ud是新连接的id;对于数据(data)来说, ud是数据的大小 char * data; };此外,还有几个宏:
// 宏定义socket_server_poll()返回的socket消息类型 #define SOCKET_DATA 0 //数据data到来 #define SOCKET_CLOSE 1 //关闭连接 #define SOCKET_OPEN 2 //多处用到,参见代码 #define SOCKET_ACCEPT 3 //被动连接建立 #define SOCKET_ERROR 4 //错误 #define SOCKET_EXIT 5 //退出socket #define SOCKET_UDP 6 //udp通信 // socket状态 #define SOCKET_TYPE_INVALID 0 // 无效的套接字 #define SOCKET_TYPE_RESERVE 1 // 预留,即将被使用 #define SOCKET_TYPE_PLISTEN 2 // 监听套接字,尚未加入 epoll 管理 #define SOCKET_TYPE_LISTEN 3 // 监听套接字,已加入 epoll 管理 #define SOCKET_TYPE_CONNECTING 4 // 尝试连接中的套接字 #define SOCKET_TYPE_CONNECTED 5 // 已连接的套接字(主动或被动) #define SOCKET_TYPE_HALFCLOSE 6 // 上层已发起关闭套接字请求,但发送缓冲区尚未发送完毕,未调用close #define SOCKET_TYPE_PACCEPT 7 // accept()后的套接字,但尚未加入 epoll 管理 #define SOCKET_TYPE_BIND 8 // 已绑定其他类型描述符,如 stdin, stdout先来看该层的初始化函数:
struct socket_server * socket_server_create() { int i; int fd[2]; poll_fd efd = sp_create(); // 创建一个监听 epoll,非常重要! if (sp_invalid(efd)) { fprintf(stderr, "socket-server: create event pool failed.\n"); return NULL; } if (pipe(fd)) { // 创建 pipe sp_release(efd); fprintf(stderr, "socket-server: create socket pair failed.\n"); return NULL; } if (sp_add(efd, fd[0], NULL)) { // 将 pipe 的读端放入 epoll 中监听,注意 pipe 消息是没有 socket* 变量的,为NULL // add recvctrl_fd to event poll fprintf(stderr, "socket-server: can't add server fd to event pool.\n"); close(fd[0]); close(fd[1]); sp_release(efd); return NULL; } struct socket_server *ss = MALLOC(sizeof(*ss)); // 创建 socket_server 实例,然后一系列初始化 ss->event_fd = efd; ss->recvctrl_fd = fd[0]; ss->sendctrl_fd = fd[1]; ss->checkctrl = 1; for (i=0;i<MAX_SOCKET;i++) { struct socket *s = &ss->slot[i]; s->type = SOCKET_TYPE_INVALID; // 所有socket的类型初始化为SOCKET_TYPE_INVALID clear_wb_list(&s->high); clear_wb_list(&s->low); } ss->alloc_id = 0; ss->event_n = 0; ss->event_index = 0; memset(&ss->soi, 0, sizeof(ss->soi)); FD_ZERO(&ss->rfds); assert(ss->recvctrl_fd < FD_SETSIZE); return ss; }接着是该层的核心代码,该函数作为中枢,掌管着内外数据流动。初始化的 epoll 和 pipe 都在该函数中扮演重要角色。以该代码为核心,我们可以画出这样一幅图:
参照下面的源码,我们知道 socket_server_poll 作为任务处理分发器,处理着一个 socket_server 里所有的事件。除了一些控制指令由 pipe 传输,还有一些其他事件由 epoll 监听。不论是控制指令还是 epoll 监听到的其他读写事件,都由 socket_server_poll 分发给相应的函数去处理。这里的其他描述符事件主要是指socket连接的事件,比如TCP异步连接成功触发的事件,socket的读写事件等等。
// return type int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { for (;;) { if (ss->checkctrl) { // 每次处理完epoll的事件后会设置checkctrl=1 if (has_cmd(ss)) { // 检测管道读端是否可读 //printf("has_cmd = 1\n"); int type = ctrl_cmd(ss, result); // 处理控制命令 if (type != -1) { clear_closed_event(ss, result, type); return type; } else continue; } else { //printf("has_cmd = 0\n"); ss->checkctrl = 0; // pipe 里没有数据,置为0,此时如果有socket连接到来,接着可从sp_wait()获取事件, } // 当所有事件处理完毕后,会重新置为1,然后再次调用非阻塞select接受pipe事件。处理 // 一批事件的过程是连续的,不会被pipe事件打断,直到处理完。 } printf("event: %d %d\n", ss->event_index, ss->event_n); if (ss->event_index == ss->event_n) { // 相等说明事件处理完毕,可以调用 sp_wait() 接收新事件 ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT); // epoll 监听很多东西,如 pipe 读端, 标准输出1,listen_fd, connect_fd 等等 printf("now we get: %d\n", ss->event_n); ss->checkctrl = 1; if (more) { *more = 0; } ss->event_index = 0; if (ss->event_n <= 0) { ss->event_n = 0; return -1; } } struct event *e = &ss->ev[ss->event_index++]; struct socket *s = e->s; if (s == NULL) { // s = NULL 说明是 pipe 消息,直接跳过。 待本批所有事件处理完毕后再交由 has_cmd 和 ctrl_cmd 处理 // dispatch pipe message at beginning continue; } printf("get fd: %d, opa: %d, type: %d, read: %d, write: %d\n", s->fd, s->opaque, s->type, e->read, e->write); switch (s->type) { // 处理 epoll 事件 case SOCKET_TYPE_CONNECTING: // 由于使用了异步tcp连接,连接成功后,客户端connect_fd可写 return report_connect(ss, s, result); case SOCKET_TYPE_LISTEN: if (report_accept(ss, s, result)) { // 由于使用了异步tcp连接,连接成功后,服务器listen_fd可读。 return SOCKET_ACCEPT; } break; case SOCKET_TYPE_INVALID: fprintf(stderr, "socket-server: invalid socket\n"); break; default: if (e->read) { int type; if (s->protocol == PROTOCOL_TCP) { type = forward_message_tcp(ss, s, result); } else { type = forward_message_udp(ss, s, result); if (type == SOCKET_UDP) { // try read again --ss->event_index; return SOCKET_UDP; } } if (e->write) { // Try to dispatch write message next step if write flag set. e->read = false; --ss->event_index; } if (type == -1) break; clear_closed_event(ss, result, type); return type; } if (e->write) { int type = send_buffer(ss, s, result); if (type == -1) break; clear_closed_event(ss, result, type); return type; } break; } } }
除此之外,还有两个函数协助:
/* * 该函数使用非阻塞 select 来监测 pipe 读端,当 pipe 中写入数据后,pipe 将变为可读,返回 1 表明可读,0 为不可读。 */ static int has_cmd(struct socket_server *ss) { struct timeval tv = {0,0}; int retval; FD_SET(ss->recvctrl_fd, &ss->rfds); retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv); if (retval == 1) { return 1; } return 0; } /* * 该函数从 pipe 中读取数据,首先读2字节的头,取出数据类型和大小后,读取相应大小的数据后按消息类型发给相应的处理函数。result由socket_server_poll * 传入。依据不同的消息类型,交由 start_socket、bind_socket 等函数填写。 */ // return type static int ctrl_cmd(struct socket_server *ss, struct socket_message *result) { int fd = ss->recvctrl_fd; // the length of message is one byte, so 256+8 buffer size is enough. uint8_t buffer[256]; uint8_t header[2]; block_readpipe(fd, header, sizeof(header)); int type = header[0]; int len = header[1]; block_readpipe(fd, buffer, len); // ctrl command only exist in local fd, so don't worry about endian. switch (type) { case 'S': return start_socket(ss,(struct request_start *)buffer, result); case 'B': return bind_socket(ss,(struct request_bind *)buffer, result); case 'L': return listen_socket(ss,(struct request_listen *)buffer, result); case 'K': return close_socket(ss,(struct request_close *)buffer, result); case 'O': return open_socket(ss, (struct request_open *)buffer, result); case 'X': result->opaque = 0; result->id = 0; result->ud = 0; result->data = NULL; return SOCKET_EXIT; case 'D': return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_HIGH, NULL); case 'P': return send_socket(ss, (struct request_send *)buffer, result, PRIORITY_LOW, NULL); case 'A': { struct request_send_udp * rsu = (struct request_send_udp *)buffer; return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address); } case 'C': return set_udp_address(ss, (struct request_setudp *)buffer, result); case 'T': setopt_socket(ss, (struct request_setopt *)buffer); return -1; case 'U': add_udp_socket(ss, (struct request_udp *)buffer); return -1; default: fprintf(stderr, "socket-server: Unknown ctrl %c.\n",type); return -1; }; return -1; }该层的核心接口:
假如要在C语言中直接使用socket_server,基本上是用这些封装好的接口基本上也就足够了:
// 由于所有的接口实现都写在头文件里面,全部声明为static。 //创建一个socket_server struct socket_server * socket_server_create(); //释放一个socket_server的资源占用 void socket_server_release(struct socket_server *); /* * 封装了的epoll或kqueue,用来获取socket的网络事件或消息 * (通常放在循环体中持续监听网络消息) * socket_server : socket_server_create() 返回的socket_server实例 * result : 结果数据存放的地址指针 * : 返回消息类型,对应于宏定义中的SOCKET_DATA的类型 */ int socket_server_poll(struct socket_server *, struct socket_message *result, int *more); //退出socket_server void socket_server_exit(struct socket_server *); /* * 关闭socket_server * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * id : socket_server_listen() 返回的id */ void socket_server_close(struct socket_server *, uintptr_t opaque, int id); /* * 停止socket * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * id : socket句柄 */ void socket_server_shutdown(struct socket_server *, uintptr_t opaque, int id); /* * 将该socket放入epoll中监听(启动之前要先通过socket_server_listen()开启TCP的socket(),bind(),listen()步骤) * 或将服务器 report_accept() 后的socket放入epoll中监听。总之,对于socket的fd,想要收发数据,都得先调用 socket_server_start() * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * id : socket_server_listen() 返回的id */ void socket_server_start(struct socket_server *, uintptr_t opaque, int id); /* * 发送数据 * socket_server : socket_server_create() 返回的socket_server实例 * buffer : 要发送的数据 * sz : 数据的大小 * id : socket_server_listen() 返回的id * : 假如返回-1表示error */ int64_t socket_server_send(struct socket_server *, int id, const void * buffer, int sz); void socket_server_send_lowpriority(struct socket_server *, int id, const void * buffer, int sz); /* * 开启TCP监听,执行了socket(),bind(),listen() 步骤 * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * addr : ip地址 * port : 端口号 * : 返回一个id作为操作此端口监听的句柄 */ int socket_server_listen(struct socket_server *, uintptr_t opaque, const char * addr, int port, int backlog); /* * 以非阻塞的方式连接服务器 * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * addr : ip地址 * port : 端口号 * : 返回一个id作为操作此端口监听的句柄 */ int socket_server_connect(struct socket_server *, uintptr_t opaque, const char * addr, int port); /* * 并不对应bind函数,而是将stdin、stout这类IO加入到epoll中管理 * socket_server : socket_server_create() 返回的socket_server实例 * opaque : skynet中服务handle的句柄 * fd : socket的文本描述 */ int socket_server_bind(struct socket_server *, uintptr_t opaque, int fd); // for tcp void socket_server_nodelay(struct socket_server *, int id); /* * 创建一个udp socket监听,并绑定skynet服务的handle,udp不需要像tcp那样要调用socket_server_start后才能接收消息 * 如果port != 0, 绑定socket,如果addr == NULL, 绑定 ipv4 0.0.0.0。如果想要使用ipv6,地址使用“::”,端口中port设为0 */ int socket_server_udp(struct socket_server *, uintptr_t opaque, const char * addr, int port); // 设置默认的端口地址,返回0表示成功 int socket_server_udp_connect(struct socket_server *, int id, const char * addr, int port); /* * 假如 socket_udp_address 是空的, 使用最后最后调用 socket_server_udp_connect 时传入的address代替 * 也可以使用 socket_server_send 来发送udp数据 */ int64_t socket_server_udp_send(struct socket_server *, int id, const struct socket_udp_address *, const void *buffer, int sz); // 获取传入消息的IP地址 address, 传入的 socket_message * 必须是SOCKET_UDP类型 const struct socket_udp_address * socket_server_udp_address(struct socket_server *, struct socket_message *, int *addrsz); // if you send package sz == -1, use soi. void socket_server_userobject(struct socket_server *, struct socket_object_interface *soi); 以上函数有的会调用 reserve_id() 来获取一个 socket_server 中的 slot[] 中的一个 socket,该 socket 会存储很多相关信息。然而 reserve_id() 仅仅是初始化 socket 结构体,尚有很多其他变量并未被赋值。这些函数并不是真正的执行者,它们会将任务消息写入 pipe,然后由 socket_server_poll() 读取 pipe 再将任务消息交给真正的执行者。与前面 reserve_id() 对应的,在这些真正的执行者中有的会调用new_fd() 函数,进一步对之前 reserve_id() 后的 socket 进一步赋值,并按需将 fd 加入 epoll 的监管下。正如之前分析的 socket_server_poll() 函数。这些消息以字符区分:
常用的指令有:
S Start socket 启动一个Socket B Bind socket 绑定一个Socket L Listen socket 监听一个Socket K Close socket 关闭一个Socket O Connect to (Open) 连接一个Socket X Exit 退出一个Socket D Send package (high) 发送数据 P Send package (low) (不常用,也用于发送数据) A Send UDP package T Set opt U Create UDP socket C set udp address
以上函数的执行伴随着自定义 socket 结构体里 type 改变,关系剖析如下:
下面尝试用用这些API:
以云风socket-server为例,里面有个test.c,为了便于分析,我们稍作修改:
#include "socket_server.h" #include <sys/socket.h> #include <pthread.h> #include <sys/select.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <signal.h> int c; static void * _poll(void * ud) { struct socket_server *ss = ud; struct socket_message result; for (;;) { int type = socket_server_poll(ss, &result, NULL); // DO NOT use any ctrl command (socket_server_close , etc. ) in this thread. switch (type) { case SOCKET_EXIT: return NULL; case SOCKET_DATA: printf("message(%lu) [id=%d] size=%d data= %s\n",result.opaque,result.id, result.ud, result.data); free(result.data); break; case SOCKET_CLOSE: printf("close(%lu) [id=%d]\n",result.opaque,result.id); break; case SOCKET_OPEN: printf("open(%lu) [id=%d] %s\n",result.opaque,result.id,result.data); break; case SOCKET_ERROR: printf("error(%lu) [id=%d]\n",result.opaque,result.id); break; case SOCKET_ACCEPT: printf("accept(%lu) [id=%d %s] from [%d]\n",result.opaque, result.ud, result.data, result.id); break; } } } static void test(struct socket_server *ss) { pthread_t pid; pthread_create(&pid, NULL, _poll, ss); /* int c = socket_server_connect(ss,100,"127.0.0.1",80); printf("connecting %d\n",c); */ int l = socket_server_listen(ss,200,"127.0.0.1",8888,32); // 使用 127.0.0.1:8888 开启TCP监听 printf("listening %d\n",l); socket_server_start(ss,201,l); // 让epoll监听该TCP int b = socket_server_bind(ss,300,1); // 让epoll监听标准输出 printf("binding stdin %d\n",b); int i; c = socket_server_connect(ss, 400, "127.0.0.1", 8888); // 异步连接 127.0.0.1:8888 //sleep(2); char *data = (char *) malloc(sizeof(char) * 20); memcpy(data, "hello world", 20); socket_server_send(ss, c, data, strlen(data)); // 发送数据 /* for (i=0;i<100;i++) { socket_server_connect(ss, 400+i, "127.0.0.1", 8888); } socket_server_exit(ss); */ pthread_join(pid, NULL); } int main() { struct sigaction sa; sa.sa_handler = SIG_IGN; sigaction(SIGPIPE, &sa, 0); struct socket_server * ss = socket_server_create(); test(ss); socket_server_release(ss); return 0; } 编译后运行,发现数据并未被服务器接收,排查半天,发现是 report_accept() 函数并未将 accept() 新创建的 fd 纳入 epoll 监听。这里为了便于分析,我们暂时修改成:将所有新 accept() 得到的 fd 都纳入 epoll 监听。修改仅一处,在 report_accept() 函数中: struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
修改为:
struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, true); 重新编译,这回可以正常接收。输出如下(包含部分自己添加的调试信息o(╯□╰)o): checkctrl = 1 has_cmd = 1 LLLLLLLLLLLLL checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 0 0 listening 1 binding stdin 2 now we get: 1 checkctrl = 1 has_cmd = 1 SSSSSSSSSSSS s->opaque old: 200, new: 201 oooooooppppppppp3333333 open(201) [id=1] start checkctrl = 1 has_cmd = 1 BBBBBBBBBBBB oooooooppppppppp222222 open(300) [id=2] binding checkctrl = 1 has_cmd = 1 OOOOOOOOOOOOOO checkctrl = 1 has_cmd = 1 DDDDDDDDDDDDDD s->type: 4, id = 3 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 2 get fd: 6, opa: 201, type: 3, read: 1, write: 0 client_fd: 8 accept(201) [id=4 127.0.0.1:58840] from [1] checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 2 get fd: 7, opa: 400, type: 4, read: 0, write: 1 Error: 0 PTR: 1207962912, 0AAAAAAAAAAAAAAAAAAAAA oooooooppppppppp5555 open(400) [id=3] 127.0.0.1 checkctrl = 0 event: 2 2 now we get: 1 get fd: 7, opa: 400, type: 5, read: 0, write: 1 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 1 get fd: 8, opa: 201, type: 7, read: 1, write: 0 message(201) [id=4] size=11 data= hello world // 正常接收到数据 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 skynet是异步读写,这让用户在调用API时更加容易,不用考虑同步问题,同步问题由skynet内部解决。前面我们接触了异步connect,现在我们简单看看它是如何异步写的。正如 test.c 里的代码,我们在调用 socket_server_send() 时并没有考虑之前的 socket_server_connect() 是否完成。可见异步操作的便捷。socket_server_send() 将 send 消息写入 pipe,然后由 send_socket() 来真正处理。send_socket()代码如下:
/* When send a package , we can assign the priority : PRIORITY_HIGH or PRIORITY_LOW If socket buffer is empty, write to fd directly. If write a part, append the rest part to high list. (Even priority is PRIORITY_LOW) Else append package to high (PRIORITY_HIGH) or low (PRIORITY_LOW) list. */ static int send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) { int id = request->id; struct socket * s = &ss->slot[HASH_ID(id)]; struct send_object so; send_object_init(ss, &so, request->buffer, request->sz); if (s->type == SOCKET_TYPE_INVALID || s->id != id || s->type == SOCKET_TYPE_HALFCLOSE || s->type == SOCKET_TYPE_PACCEPT) { so.free_func(request->buffer); return -1; } assert(s->type != SOCKET_TYPE_PLISTEN && s->type != SOCKET_TYPE_LISTEN); if (send_buffer_empty(s) && s->type == SOCKET_TYPE_CONNECTED) { printf("s->type: %d, id = %d\n", s->type, id); if (s->protocol == PROTOCOL_TCP) { int n = write(s->fd, so.buffer, so.sz); if (n<0) { switch(errno) { case EINTR: case EAGAIN: n = 0; break; default: fprintf(stderr, "socket-server: write to %d (fd=%d) error :%s.\n",id,s->fd,strerror(errno)); force_close(ss,s,result); return SOCKET_CLOSE; } } if (n == so.sz) { so.free_func(request->buffer); return -1; } append_sendbuffer(ss, s, request, n); // add to high priority list, even priority == PRIORITY_LOW } else { // udp if (udp_address == NULL) { udp_address = s->p.udp_address; } union sockaddr_all sa; socklen_t sasz = udp_socket_address(s, udp_address, &sa); int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz); if (n != so.sz) { append_sendbuffer_udp(ss,s,priority,request,udp_address); } else { so.free_func(request->buffer); return -1; } } sp_write(ss->event_fd, s->fd, s, true); } else { printf("s->type: %d, id = %d\n", s->type, id); if (s->protocol == PROTOCOL_TCP) { if (priority == PRIORITY_LOW) { append_sendbuffer_low(ss, s, request); } else { append_sendbuffer(ss, s, request, 0); } } else { if (udp_address == NULL) { udp_address = s->p.udp_address; } append_sendbuffer_udp(ss,s,priority,request,udp_address); } } return -1; }
send_socket()不经可以发送udp数据,也可发送tcp数据。send_socket() 首先判断当前 socket 的状态,如果连接尚未建立,如出于 CONNECTING,那么将会调用 append 系列函数将数据暂时保存起来,待连接建立后再发送。特别需要格外小心的是一系列sp_write(ss->event_fd, s->fd, s, false) 及 sp_write(ss->event_fd, s->fd, s, true)的用法。对于 epoll 里的 EPOLLOUT 事件,当发送缓冲有空间,可以被写入数据时,该事件会一直被触发(有很大几率一直触发)。sp_write() 函数就是用于管理是否监听 EPOLLOUT 事件的钥匙。比如我们通过源码可以发现,如果在连接建立之前我们就调用了 send 函数,epoll 就会持续监听 EPOLLOUT 事件,直到被暂存的发送数据全部被 write() 成功(由 socket_server_poll 里的 send_buffer 发起),才会调用 sp_write(ss->event_fd, s->fd, s, false) 结束监听。send_buffer() 会检测自定义 socket 结构体里的发送缓存是否全部发送完毕。
上面的例子中,只实现了客户端向服务器发数据,如何实现双向发数据呢?仔细看 send_socket() 这个函数,发现该函数仅保留对 CONNECTING 和 CONNECTED 两种 socket 类型的处理,过滤掉了其他所有类型socket。那么按之前的写法, 服务器的 socket 在 report_accept() 后处于 PACCEPT 的状态。难道是只有连接发起者才能发数据?想想不对啊。不小心又看了看 start_socket() 函数,明白了。。
static int start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) { int id = request->id; result->id = id; result->opaque = request->opaque; result->ud = 0; result->data = NULL; struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID || s->id !=id) { return SOCKET_ERROR; } if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) { if (sp_add(ss->event_fd, s->fd, s)) { s->type = SOCKET_TYPE_INVALID; return SOCKET_ERROR; } s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN; s->opaque = request->opaque; result->data = "start"; return SOCKET_OPEN; } else if (s->type == SOCKET_TYPE_CONNECTED) { s->opaque = request->opaque; result->data = "transfer"; return SOCKET_OPEN; } return -1; } start_socket() 不仅可以让处于 PLISTEN 状态的 socket 变为 LISTEN 状态,还可以让处于 PACCEPT 状态变为 CONNECTED!!同时还能将其 fd 纳入 epoll 的监控下。可见, start_socket() 就是像是使能键一样。关于这一点,我们在之后上层的剖析中会有进一步介绍。由此推想到之前 report_accept() 函数对于 accept() 产生的新 fd,并未将其放在 epoll 的监控下,如果你想使用它进行 socket 通信,还需调用 socket_server_start() 函数来使能。结论:socket_server_listen() 及 report_accept() 新创建的 socket 都需要通过调用 socket_server_start() 来使能,才能收数据。于是,我们将 report_accept() 里的修改回滚为之前的:
struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false); 然后,在收到 SOCKET_ACCEPT 消息后,调用 socket_server_start(),修改如下:
int c; static void * _poll(void * ud) { struct socket_server *ss = ud; struct socket_message result; for (;;) { int type = socket_server_poll(ss, &result, NULL); // DO NOT use any ctrl command (socket_server_close , etc. ) in this thread. switch (type) { case SOCKET_EXIT: return NULL; case SOCKET_DATA: printf("message(%lu) [id=%d] size=%d data= %s\n",result.opaque,result.id, result.ud, result.data); free(result.data); char *data = (char *) malloc(sizeof(char) * 20); // 新增 memcpy(data, "hello cxl", 20); // 新增 socket_server_send(ss, result.id, data, strlen(data)); // 新增 break; case SOCKET_CLOSE: printf("close(%lu) [id=%d]\n",result.opaque,result.id); break; case SOCKET_OPEN: printf("open(%lu) [id=%d] %s\n",result.opaque,result.id,result.data); break; case SOCKET_ERROR: printf("error(%lu) [id=%d]\n",result.opaque,result.id); break; case SOCKET_ACCEPT: printf("accept(%lu) [id=%d %s] from [%d]\n",result.opaque, result.ud, result.data, result.id); socket_server_start(ss, 600, result.ud); // 新增 break; } } } 于是就能双向通信了。(我这种写法是无穷的回声,根本停不下来。。。)输出如下:
listening 1 binding stdin 2 checkctrl = 1 has_cmd = 1 LLLLLLLLLLLLL checkctrl = 1 has_cmd = 1 SSSSSSSSSSSS s->opaque old: 200, new: 201 oooooooppppppppp3333333 open(201) [id=1] start checkctrl = 1 has_cmd = 1 BBBBBBBBBBBB error(300) [id=2] checkctrl = 1 has_cmd = 1 OOOOOOOOOOOOOO checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 0 0 now we get: 2 get fd: 6, opa: 201, type: 3, read: 1, write: 0 client_fd: 8 accept(201) [id=4 127.0.0.1:56814] from [1] checkctrl = 1 has_cmd = 1 SSSSSSSSSSSS s->opaque old: 201, new: 600 oooooooppppppppp3333333 open(600) [id=4] start checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 2 get fd: 7, opa: 400, type: 4, read: 0, write: 1 Error: 0 PTR: 0, 0AAAAAAAAAAAAAAAAAAAAA oooooooppppppppp5555 open(400) [id=3] 127.0.0.1 checkctrl = 0 event: 2 2 now we get: 1 checkctrl = 1 has_cmd = 1 DDDDDDDDDDDDDD s->type111111111111: 5, id = 3 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 1 get fd: 8, opa: 600, type: 5, read: 1, write: 0 message(600) [id=4] size=11 data= hello world checkctrl = 1 has_cmd = 1 DDDDDDDDDDDDDD s->type111111111111: 5, id = 4 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 1 get fd: 7, opa: 400, type: 5, read: 1, write: 0 message(400) [id=3] size=9 data= hello cxlld checkctrl = 1 has_cmd = 1 DDDDDDDDDDDDDD s->type111111111111: 5, id = 3 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 1 get fd: 8, opa: 600, type: 5, read: 1, write: 0 message(600) [id=4] size=9 data= hello cxlld checkctrl = 1 has_cmd = 1 DDDDDDDDDDDDDD s->type111111111111: 5, id = 4 checkctrl = 1 has_cmd = 0 checkctrl = 0 event: 1 1 now we get: 1 get fd: 7, opa: 400, type: 5, read: 1, write: 0 message(400) [id=3] size=9 data= hello cxlld . . .
关于socket_server源码解析,还可以参考一下视频:
skynet源码分析 - 01skynet源码分析(一)epoll及事件循环工作流程:skynet源码分析 - 02skynet源码分析(二)skynet源码分析 - 03skynet源码分析(三)