在beanstalk中,网络处理模块没有使用第三方库,而是作者自己实现的一个模块,总计代码不到100行。
是epoll的使用典范。
1. epoll_event
struct epoll_event结构如下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */};typedef union epoll_data { void ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;beanstalk中将epoll_data存储一个socket抽象,结构如下:
socket抽象 struct Socket { int fd; //socket句柄 Handle f; //回调处理函数 void *x; //回调函数输入参数 int added; //是否加入epoll};
2. epoll处理
epoll操作包括包括创建epoll对象,将socket加入epoll对象,获取准备好的socket。
epollintsockinit(void) //epoll初始化,非常简单,没有特殊处理{ epfd = epoll_create(1); if (epfd == -1) { twarn("epoll_create"); return -1; } return 0;}intsockwant(Socket *s, int rw) //管理epoll的socket,增删改{ int op; struct epoll_event ev = {}; //epoll操作 if (!s->added && !rw) { return 0; } else if (!s->added && rw) { s->added = 1; op = EPOLL_CTL_ADD; } else if (!rw) { op = EPOLL_CTL_DEL; } else { op = EPOLL_CTL_MOD; } //epoll_event的事件和数据,使用的是epoll默认的LT(level-trigger )模式 switch (rw) { case 'r': ev.events = EPOLLIN; break; case 'w': ev.events = EPOLLOUT; break; } ev.events |= EPOLLRDHUP | EPOLLPRI; ev.data.ptr = s; return epoll_ctl(epfd, op, s->fd, &ev);}intsocknext(Socket **s, int64 timeout) //获取一个触发epoll事件的socket{ int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); if (r == -1 && errno != EINTR) { //由于epoll_wait同步等待,有可能被信号中断,返回EINTR错误 twarn("epoll_wait"); exit(1); } //将事件转化为抽象的操作 if (r) { *s = ev.data.ptr; if (ev.events & (EPOLLHUP|EPOLLRDHUP)) { return 'h'; } else if (ev.events & EPOLLIN) { return 'r'; } else if (ev.events & EPOLLOUT) { return 'w'; } } return 0;
3. epoll在程序中的回调
在beanstalk中,socket抽象包括两类:
a. 监听socket,回调函数就是接受处理srvaccept
b.和客户端连接socket,回调函数就是协议处理prothandle
整体来说流程如下:
a. Server负责初始化epoll;并将自己的监听socket加入epoll;
b. 当客户端有连接的时候,监听socket通过接受处理srvaccept和客户端建立socket连接,并将其加入epoll;
c. 时钟处理通过定时任务prottick,把超时的客户端连接socket加入epoll。
这里的加入epoll,第一个是直接调用socketwant加入的,其他两种都是将socket放入协议处理文件的静态变量dirty中,
然后调用update_conns()来完成的。
4. 网络处理流程总结
a. Server创建epoll;
b. 监听socket加入epoll,接受客户端连接h_accept、连接管理update_conns、关闭连接connclose调用socketwant处理socket请求;
c. 在连接Conn.state为STATE_CLOSE时,或者事件fd和连接fd不同时,或者在update_conns中调用socketwant失败时,调用关闭连接;
d. 接受客户端连接h_accept、协议处理prothandle、定时处理prottick三个函数,函数返回之前都要调用连接管理函数update_conns;
5.. 网络连接接受处理
1 void 2 h_accept(const int fd, const short which, Server *s) 3 { 4 Conn *c; 5 int cfd, flags, r; 6 socklen_t addrlen; 7 struct sockaddr_in6 addr; 8 9 addrlen = sizeof addr;10 cfd = accept(fd, (struct sockaddr *)&addr, &addrlen);11 if (cfd == -1) {12 if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");13 update_conns();14 return;15 }16 if (verbose) {17 printf("accept %d\n", cfd);18 }19 20 flags = fcntl(cfd, F_GETFL, 0);21 if (flags < 0) {22 twarn("getting flags");23 close(cfd);24 if (verbose) {25 printf("close %d\n", cfd);26 }27 update_conns();28 return;29 }30 31 r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);32 if (r < 0) {33 twarn("setting O_NONBLOCK");34 close(cfd);35 if (verbose) {36 printf("close %d\n", cfd);37 }38 update_conns();39 return;40 }41 42 c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);43 if (!c) {44 twarnx("make_conn() failed");45 close(cfd);46 if (verbose) {47 printf("close %d\n", cfd);48 }49 update_conns();50 return;51 }52 c->srv = s;53 c->sock.x = c;54 c->sock.f = (Handle)prothandle;55 c->sock.fd = cfd;56 57 r = sockwant(&c->sock, 'r');58 if (r == -1) {59 twarn("sockwant");60 close(cfd);61 if (verbose) {62 printf("close %d\n", cfd);63 }64 update_conns();65 return;66 }67 update_conns();68 }