在網絡編程系列文章中,我們實現了一個基于epoll的網絡框架,并在此基礎上開發了一個簡單的HTTP服務,在那個系列文章中我們使用了讀、寫兩個buffer將網絡IO和數據的讀寫進行了分離,它們之間的扭轉完全通過epoll事件通知,如果你認真研究過源碼,會發現,所有針對網絡IO的操作都是由事件觸發的。這種基于事件觸發的網絡模型通常我們叫做Reactor網絡模型。
由于網絡編程系列文章中代碼實現相對比較復雜,不太好講清楚。所以,我決定單獨出幾篇文章對那個系列文章進行一些拓展,主要涉及到網絡編程思想和性能測試。
這篇文章我們通過實現一個簡單的網絡框架,來說明Reactor網絡模型實現的一般思路,其本質思想和x.NET項目基本上是一樣的,只是在代碼上做了非常大的精簡,理解起來會輕松很多。
#include <sys/socket.h>#include <errno.h>#include <netinet/in.h>#include <stdio.h>#include <string.h>#include <unistd.h>#include <sys/epoll.h>int mAIn() {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(2048);if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {perror("bind fail");return -1;}listen(sockfd, 10);printf("sock-fd:%dn", sockfd);int epfd = epoll_create(1);struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);struct epoll_event events[1024] = {0};while(1) {int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0; i < nready; i++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN && sockfd == connfd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);ev.events = EPOLLIN | EPOLLET;ev.data.fd = clientfd;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);printf("clientfd: %dn", clientfd);} else if (events[i].events & EPOLLIN) {char buffer[10] = {0};int count = recv(connfd, buffer, 10, 0);if (count == 0) {printf("discounnectn");epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);close(i);continue;}send(connfd, buffer, count, 0);printf("clientfd: %d, count: %d, buffer: %sn", connfd, count, buffer);}}}}
熟悉epoll的人應該對上面的代碼比較熟悉,這段代碼的核心在下面的while主循環,如果是當前Server的Socket說明有新的連接進來,調用accept拿到客戶端的fd,將其放在epoll的events中,并注冊EPOLLIN事件,一般我們理解為可讀事件。
如果不是sockfd,說明是客戶端的fd可讀,我們將數據讀出來再原樣發送回去。
上面的代碼存在的主要問題在于,套接字的accept和讀寫操作我們是直接寫在主循環里了,這將會讓代碼的邏輯變得難以琢磨。
int recv_callback(int fd, char *buffer, int size);int send_callback(int fd, char *buffer, int size);
int recv_callback(int fd, char *buffer, int size) {int count = recv(fd, buffer, size, 0);send_callback(fd, buffer, count, 0);return count;}int send_callback(int fd, char *buffer, int size) {int count = send(fd, buffer, size, 0);return count;}
int main() {...while(1) {int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0; i < nready; i++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN && sockfd == connfd) {...} else if (events[i].events & EPOLLIN) {char buffer[10] = {0};int count = recv_callback(fd, buffer, 10);if (count == 0) {printf("disconnect\n");epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);clise(i);continue;}}}}}
int recv_callback(int fd, char *buffer, int size) {int count = recv(fd, buffer, size, 0);return count;}int send_callback(int fd, char *buffer, int size) {int count = send(fd, buffer, size, 0);return count;}
但這樣明顯也是有問題的,在recv_callback中讀完了之后,如何發送數據呢?這里,我們可以想一下,圍繞著一個套接字都有哪些部分呢?是不是可以設計出一個類似字典的結構,這個字典的key對應的就是套接字,而value對應的就是圍繞套接字相關的各個組件。
#define BUF_LEN 1024typedef int(*callback)(int fd);struct conn_channel {int fd;callback recv_call;callback send_call;char wbuf[BUF_LEN];int wlen;char rbuf[BUF_LEN];int rlen;};
struct conn_channel conn_map[1024] = {0};int main() {...while(1) {int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0; i < nready; i++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN && sockfd == connfd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);ev.events = EPOLLIN;ev.data.fd = clientaddr;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);conn_map[clientfd].fd = clientfd;conn_map[clientfd].rlen = 0;conn_map[clientfd].wlen = 0;conn_map[clientfd].recv_call = recv_callback;conn_map[clientfd].send_call = send_callback;memset(conn_map[clientfd].rbuf, 0, BUF_LEN);memset(conn_map[clientfd].wbuf, 0, BUF_LEN);printf("clientfd:%d\n", clientfd);} else if (events[i].events & EPOLLIN) {...}}}}
int recv_callback(int fd) {int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);// do somethingmemcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);conn_map[fd].wlen = conn_map[fd].rlen;conn_map[fd].rlen = 0;return count;}int send_callback(int fd) {int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);return count;}
因為有了conn_map,所以原來傳進來的buffer和size都不需要了,在conn_channel中已經有記錄了。所以只需要一個fd參數就可以了。我們在recv_callback中模擬了回復消息,強行將讀到的數據寫到了wbuffer中。這里補充一下,conn_channel中的rbuffer是用來從套接字中讀數據的,wbuffer表示的是將要發送到套接字的數據。
你可以試著把上面的代碼跑起來,然后你會發現,并沒有按我們的預期執行,send_callback中的send似乎沒有起作用。這是因為我們只是將數據從rbuffer寫到了wbuffer中,而send_callback并沒有機會調用。你可以想一想send_callback放在哪里調用比較合適呢?
在上面的例子中,顯然放在主循環中執行比較合適,在epoll中,EPOLLOUT表示可寫事件,我們可以利用這個事件。在recv_callback執行完之后我們注冊一個EPOLLOUT事件,然后在主循環中我們去監聽EPOLLOUT事件。這樣,當recv_callback將rbuffer的數據復制到wbuffer中之后,send_callback通過EPOLLOUT事件就可以在主循環中得以執行。
int recv_callback(int fd) {int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);// do somethingmemcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);conn_map[fd].wlen = conn_map[fd].rlen;conn_map[fd].rlen = 0;struct epoll_event ev;ev.events = EPOLLOUT;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);return count;}
int main() {...while(1) {int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0; i < nready; i++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN && sockfd == connfd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);ev.events = EPOLLIN;ev.data.fd = clientaddr;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);conn_map[clientfd].fd = clientfd;conn_map[clientfd].rlen = 0;conn_map[clientfd].wlen = 0;conn_map[clientfd].recv_call = recv_callback;conn_map[clientfd].send_call = send_callback;memset(conn_map[clientfd].rbuf, 0, BUF_LEN);memset(conn_map[clientfd].wbuf, 0, BUF_LEN);printf("clientfd:%d\n", clientfd);} else if (events[i].events & EPOLLIN) {int count = conn_map[connfd].recv_call(connfd);printf("recv-count:%d\n", count);} else if (events[i].events & EPOLLOUT) { // 處理EPOLLOUT事件int count = conn_map[connfd].send_call(connfd);printf("send-count:%d\n", count);}}}}
要注意的是,epfd是在main函數中定義的,而我們在recv_callback中有使用,所以我們可以暫時將epfd聲明成一個全局變量,放在外面。
int send_callback(int fd) {int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);return count;}
這樣,我們就將IO操作給屏蔽了,在主循環中我們只關注事件,不同的事件調用不同的回調函數。在對應的回調函數中只做自己該做的,做完之后注冊事件通知其它的回調函數。
int accept_callback(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);ev.events = EPOLLIN;ev.data.fd = clientaddr;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);conn_map[clientfd].fd = clientfd;conn_map[clientfd].rlen = 0;conn_map[clientfd].wlen = 0;conn_map[clientfd].recv_call = recv_callback;conn_map[clientfd].send_call = send_callback;memset(conn_map[clientfd].rbuf, 0, BUF_LEN);memset(conn_map[clientfd].wbuf, 0, BUF_LEN);return clientfd;}
struct conn_channel {int fd;union {callback accept_call;callback recv_call;} call_t;callback send_call;char wbuf[BUF_LEN];int wlen;char rbuf[BUF_LEN];int rlen;};
int main() {int sockfd = create_serv(9000);if (sockfd == -1) {perror("create-server-fail");return -1;}make_nonblocking(sockfd);epfd = epoll_create1(1);struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);struct epoll_event events[1024] = {0};conn_map[sockfd].rlen = 0;conn_map[sockfd].wlen = 0;conn_map[sockfd].fd = sockfd;conn_map[sockfd].call_t.accept_call = accept_callback;conn_map[sodkfd].send_call = send_callback;memset(conn_map[sockfd].rbuf, 0, BUF_LEN);memset(conn_map[sockfd].wbuf, 0, BUF_LEN);while(1) {int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0; i < nready; i++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {int count = conn_map[connfd].call_t.recv_call(connfd);printf("recv-count:%d\n", count);} else if (events[i].events & EPOLLOUT) {int count = conn_map[connfd].send_call(connfd);printf("send-count:%d\n", count);}}}}
你可以想一下,我們注冊的是call_t.accept_call,但在調用的時候確是call_t.recv_call,為什么這樣可行?
我們在網絡編程系列文章中,單獨為accept抽象出了一個對象,你可以對比一下這兩種實現方式,看看它們有什么區別?在系列文件中我們為什么要單獨抽象出一個accepter對象呢?
可以看到,最后主循環中的邏輯,只有兩個分支,這兩個分支代表了兩種事件,這種通過事件驅動的網絡模型便是Reactor網絡模型。本文為了容易理解,將代碼進行了精簡。在實際的工程中我們還要考慮諸多情況。比如,上面的代碼只支持epoll,我們是不是可以將事件驅動相關的代碼抽象成單獨的組件,讓其可以支持其它的事件模型。
本文雖然代碼簡單,但Reactor網絡模型的實現基本上都逃脫不了這個套路,只是在此基礎上可能會將各個部分進行單獨的封裝,比如我們在網絡編程系列文章中就將channel和map進行了抽象,讓它能適配各種場景。
總結
reactor網絡模型是網絡編程中非常重要的一種編程思想,本文通過一個簡短的示例試圖講明白reactor網絡編程模型的核心思想。當然,本文的實現還不是很完善,比如在調用回調函數的時候還是傳入了fd,我們是否可以不需要這個參數,徹徹底底地和IO進行分離呢?






