浅谈 Select & Epoll 多路复用技术
传统阻塞 Socket 编程
传统的并发通信是如何进行的?
阻塞模式下使用阻塞模式下的多进程 + recv/send 进行通信。
-
【主进程】依次调用
socket
创建和初始化 socket,调用bind
将 socket 关联到ip:port
二元组,调用listen
开始被动监听。 -
【主进程】然后开一个循环反复阻塞在
accept
上等待连接,一旦连接建立就通过fork()
创建子进程 -
【子进程】在子进程中通过
recv
和send
和客户端通信。需要注意的是 TCP 是基于字节流的,因此需要在设计应用层协议时约定消息边界。往往需要在子进程中设计好状态机来处理。 -
【主进程】通过
sigaction
或signal(SIGCHLD, SIG_IGN)
处理子进程结束信号,避免僵尸进程。
send
调用将用户 buf 复制到内核 sndbuf,实际发送由协议层完成,如果失败会在下一个 socket 函数报错.
recv
调用实质只从内核 rcvbuf 复制数据,实际接收由协议完成。
如何知道 socket 连接断开?
可以用 setsockopt
设置超时时间。如果出现问题,recv
会返回 -1
。
非阻塞 Socket 编程
非阻塞模式下,只需要一个线程,从而大大减轻了运行开销。一个线程能在(宏观上)同时处理大量 socketfd,这是一种多路复用。
多路复用有 select,poll,epoll 三种方式。
而 Select 需要完整遍历,且 fdset 本质是位图,容量小。Poll 利用 pollfd 数组解决容量问题。二者都需要遍历 fd。
Epoll 解决低效遍历问题。只需要直接管理一个 fd,而各个 socketfd 通过内部的红黑树管理,利用事件监听
Select 、Poll 多路复用
-
select 方式:监视 writefds、readfds、和 exceptfds 三类 fdset。以读为例,调用 select 后,内核会遍历检查各个 fd 是否可读。三类描述符状态变化,或者超时,则将 fds 集合拷贝到用户内存,并唤醒用户进程。
-
poll 方式:采用 pollfd 描述 fd 集合,解决了 fdset 的大小限制。
问题总结:
-
用户态需要开个循环反复调用
-
反复调用导致反复传参:每次调用都要把 fdsets 从用户空间拷贝到内核空间
-
处理能力有限:只能监听有限个 fd
-
用户每次都只能挨个遍历每个 socket 来收集可读事件
这俩了解即可,只是作为历史的尘埃和反面教材罢了。重点是 epoll。
EPoll 方式
基础了解:
epoll 方式:采用 epoll 对象管理 fd。内部采用红黑树管理。按需得到 events,解决低效问题。提供三个函数:
-
epoll_create:创建一个 epoll 句柄
-
epoll_ctl:向 epoll 对象中添加/修改/删除要管理的连接
-
epoll_wait:等待其管理的连接上的 IO 事件
-
按需拷贝:fd 首次调用 epoll_ctl 拷贝,每次调用 epoll_wait 不拷贝
-
无限大小:其内部有 wq(等待队列链表)、rbr(红黑树,索引 socket)、rdllist(就绪 fd 链表)
-
回调通知:不需要遍历所有 fd
-
-
epoll 水平触发和边缘触发
-
LT:只要内核缓冲区有数据就一直通知
-
ET:只有状态发生变化才通知,只有当 socket 由不可写到可写或由不可读到可读,才会返回其sockfd
-
Epoll 使用实践
API 和相关结构
1#include <sys/epoll.h>
2
3int epoll_create(int size);
-
作用:创建一个 epoll 的句柄,size 用来告诉内核这个监听的数目一共有多大。
-
注意:使用完 epoll 后,必须调用
close()
关闭,否则可能导致 fd 被耗尽。 -
参数:
- size:监听 socket 的数目
-
返回值:
- 如果成功,返回 epoll fd 句柄,否则返回 -1
1#include <sys/epoll.h>
2
3int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
-
作用:epoll 的事件注册函数,epoll_ctl 向 epoll 对象中添加、修改或者删除感兴趣的事件
-
参数:
-
epfd:epoll 句柄
-
op:操作类型,EPOLL_CTL_ADD 添加、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除
-
fd:要监听的 fd
-
event:监听事件,是告诉内核需要监听什么事。包含了要监听的 fd 的事件类型,以及回调函数
-
-
返回值:返回 0 表示成功,返回 - 1 表示失败
epoll_event 和 epoll_event 结构体:
1typedef union epoll_data {
2 void *ptr;
3 int fd;
4 __uint32_t u32;
5 __uint64_t u64;
6} epoll_data_t;
7
8struct epoll_event {
9 __uint32_t events; /* Epoll events */
10 epoll_data_t data; /* User data variable */
11};
events 的取值:
-
EPOLLIN:表示对应的 fd 可读
-
EPOLLOUT:表示对应的 fd 可写
-
EPOLLPRI:表示对应的 fd 有紧急数据可读
-
EPOLLERR:表示对应的 fd 发生错误
-
EPOLLHUP:表示对应的 fd 连接关闭
-
EPOLLET:表示对应的 fd 被 epoll 注册的时候,使用 edge-trigger 模式,也就是只有 fd 的状态发生变化时才触发回调(性能更高)
-
EPOLLONESHOT:表示一次性的事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个socket加入到EPOLL队列里
1#include <sys/epoll.h>
2
3int epoll_wait(int epfd, struct epoll_event *events,
4 int maxevents, int timeout);
-
作用:epoll 的事件监听函数,epoll_wait 用来监听 epoll 对象上注册的事件
-
参数:
-
epfd:epoll 句柄
-
events:用来保存监听到的事件
-
maxevents:最多监听多少个事件
-
timeout:超时时间,单位是毫秒
-
-
返回值:返回监听到的事件数目,如果返回 -1 表示失败
C++ 实践
下面的代码,我们使用 epoll 实现一个简单的 echo server。
1#include <arpa/inet.h>
2#include <sys/epoll.h>
3#include <unistd.h>
4#include <csignal>
5#include <cstring>
6#include <iostream>
7#include <string>
8
9namespace lb {
10
11class cli_options {
12 private:
13 // IPv4 Address
14 std::string address_;
15 u_int16_t port_;
16
17 public:
18 cli_options(const std::string& address, const u_int16_t& port)
19 : address_(address), port_(port) {}
20 const std::string& address() const { return address_; }
21 u_int16_t port() const { return port_; }
22};
23
24class server {
25 private:
26 static server* instance_;
27 static constexpr int max_events = 1024;
28 lb::cli_options options_;
29 int epoll_fd_;
30
31 public:
32 server(const lb::cli_options& options) : options_(options) {
33 instance_ = this;
34 }
35 void run();
36 // signal handler
37 static void handle_signal(int signo);
38};
39
40} // namespace lb
41
42lb::server* lb::server::instance_ = nullptr;
43
44lb::cli_options init_args(int argc, char const* const argv[]) {
45 std::string address = "0.0.0.0";
46 u_int16_t port = 8080;
47 const char* usage = "usage: lb_server [--address=<address>] [--port=<port>]";
48 const char* address_opt = "--address";
49 const char* port_opt = "--port";
50 for (int i = 1; i < argc; i++) {
51 if (std::strcmp(argv[i], address_opt) == 0) {
52 if (i + 1 < argc) {
53 address = argv[i + 1];
54 } else {
55 std::cerr << usage << std::endl;
56 exit(1);
57 }
58 } else if (std::strcmp(argv[i], port_opt) == 0) {
59 if (i + 1 < argc) {
60 port = atoi(argv[i + 1]);
61 } else {
62 std::cerr << usage << std::endl;
63 exit(1);
64 }
65 } else {
66 std::cerr << usage << std::endl;
67 exit(1);
68 }
69 }
70 lb::cli_options options(address, port);
71 return options;
72}
73
74void lb::server::handle_signal(int signo) {
75 if (signo == SIGINT) {
76 std::cout << "server is shutting down..." << std::endl;
77 auto instance = lb::server::instance_;
78 close(instance->epoll_fd_);
79 exit(0);
80 }
81}
82
83void lb::server::run() {
84 // 初始化信号处理函数
85 struct sigaction sa;
86 sa.sa_handler = lb::server::handle_signal;
87 sigemptyset(&sa.sa_mask);
88 sa.sa_flags = 0;
89 sigaction(SIGINT, &sa, NULL);
90
91 // 初始化地址
92 in_addr_t addr = inet_addr(options_.address().c_str());
93 if (addr == INADDR_NONE) {
94 std::cerr << "invalid address" << std::endl;
95 exit(1);
96 }
97 sockaddr_in addr_in;
98 addr_in.sin_family = AF_INET;
99 addr_in.sin_port = htons(options_.port());
100 addr_in.sin_addr.s_addr = addr;
101
102 // 初始化 socket_fd,并进行 bind 和 listen
103 auto listen_fd = socket(AF_INET, SOCK_STREAM, 0);
104 if (listen_fd == -1) {
105 std::cerr << "socket error" << std::endl;
106 exit(1);
107 }
108 int opt = 1;
109 setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
110
111 int ret;
112 ret = bind(listen_fd, (sockaddr*)&addr_in, sizeof(addr_in));
113 if (ret == -1) {
114 std::cerr << "bind error: " << strerror(errno) << std::endl;
115 exit(1);
116 }
117 ret = listen(listen_fd, SOMAXCONN);
118 if (ret == -1) {
119 std::cerr << "listen error" << std::endl;
120 exit(1);
121 }
122
123 std::cout << "server is listening on " << options_.address() << ":"
124 << options_.port() << std::endl;
125
126 // 初始化 epoll
127
128 epoll_fd_ = epoll_create(max_events);
129 if (epoll_fd_ == -1) {
130 std::cerr << "epoll_create error" << std::endl;
131 exit(1);
132 }
133
134 epoll_event event;
135 event.data.fd = listen_fd;
136 event.events = EPOLLIN | EPOLLET;
137
138 ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd, &event);
139 if (ret == -1) {
140 std::cerr << "epoll_ctl error when initialize: " << strerror(errno)
141 << std::endl;
142 exit(1);
143 }
144 epoll_event events[max_events];
145
146 // 事件循环,接收到的事件会被放到 events 中
147 while (true) {
148 int n = epoll_wait(epoll_fd_, events, max_events, -1);
149 if (n == -1) {
150 std::cerr << "epoll_wait error" << std::endl;
151 exit(1);
152 }
153 if (n == 0) {
154 std::cout << ".";
155 continue;
156 }
157 // 处理每个事件
158 for (int i = 0; i < n; i++) {
159 // 如果是新连接,则 accept
160 if (events[i].data.fd == listen_fd) {
161 sockaddr_in client_addr;
162 socklen_t client_addr_len = sizeof(client_addr);
163 // 新的连接
164 int client_fd =
165 accept(listen_fd, (sockaddr*)&client_addr, &client_addr_len);
166 if (client_fd == -1) {
167 std::cerr << "accept error" << std::endl;
168 exit(1);
169 }
170 std::cout << "client connected (from "
171 << inet_ntoa(client_addr.sin_addr) << ":"
172 << ntohs(client_addr.sin_port) << ")" << std::endl;
173
174 // 将新的 client_fd 添加到 epoll 中
175 event.data.fd = client_fd;
176 // event.events = EPOLLIN ;
177 event.events = EPOLLIN | EPOLLET;
178 ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event);
179 if (ret == -1) {
180 std::cerr << "epoll_ctl error when add client: " << strerror(errno)
181 << std::endl;
182 exit(1);
183 }
184 continue;
185 }
186
187 // 处理 EPOLLIN 事件
188 if (!(events[i].events & EPOLLIN)) {
189 continue;
190 }
191
192 // 对于已经连接的 client,如果有数据到来,则读取数据
193 char buf[32];
194 int client_fd = events[i].data.fd;
195 int n = recv(client_fd, buf, sizeof(buf), 0);
196 if (n == -1) {
197 // 查询错误码
198 int err = errno;
199 // 如果是不能立即完成,则继续等待
200 if (err == EAGAIN || err == EWOULDBLOCK) {
201 continue;
202 } else {
203 // 其他错误,关闭连接
204 std::cerr << "recv error" << std::endl;
205 close(client_fd);
206 // 删除 epoll 中的连接
207 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, NULL);
208 // 打印错误信息
209 std::cerr << "recv error when recv: " << strerror(err) << std::endl;
210 continue;
211 }
212 }
213 // 获取 client 的地址,这一步只是为了输出地址
214 sockaddr_in client_addr;
215 socklen_t client_addr_len = sizeof(client_addr);
216 ret = getsockname(client_fd, (sockaddr*)&client_addr, &client_addr_len);
217 if (ret == -1) {
218 std::cerr << "getsockname error" << std::endl;
219 exit(1);
220 }
221
222 // 收到 0 字节的数据,则表示 client 正常地断开连接
223 if (n == 0) {
224 // https://stackoverflow.com/questions/8707601/is-it-necessary-to-deregister-a-socket-from-epoll-before-closing-it
225 std::cout << "client disconnected (from "
226 << inet_ntoa(client_addr.sin_addr) << ":"
227 << ntohs(client_addr.sin_port) << ")" << std::endl;
228 close(client_fd);
229 // ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, NULL);
230 // if (ret == -1) {
231 // std::cerr << "epoll_ctl error when remove fd: " << strerror(errno) <<
232 // std::endl; exit(1);
233 // }
234 }
235 // 正常收到数据,则打印出来
236 else {
237 // 打印收到的 bytes
238 std::cout << "time: " << time(NULL) << std::endl
239 << "client addr: " << inet_ntoa(client_addr.sin_addr) << ":"
240 << ntohs(client_addr.sin_port) << std::endl
241 << "received " << n << " bytes" << std::endl
242 << "data: " << std::string(buf, 0, n) << std::endl
243 << std::endl;
244 // echo back
245 send(client_fd, buf, n, 0);
246 }
247 }
248 }
249}
250
251int main(int argc, char const* argv[]) {
252 auto options = init_args(argc, argv);
253 lb::server server(options);
254 server.run();
255 return 0;
256}
当然,还有一些不足之处:
-
程序轻易使用
exit(1)
来退出,导致健壮性不是很好。 -
TCP 提供的是字节流服务,我们没有处理消息边界,所以收到的数据可能会被分割。
-
安全性不是很好,无论谁来连接我们都允许了。
其他问题
epoll 可以用回调函数处理吗?
答:epoll_ctl 虽然可以设置要监听的 fd,但 epoll_event 并未提供函数指针之类的接口,因此无法直接设置回调。需要的话得自己封装。
epoll 可以处理 UDP 吗?
答:可以,但没必要。多路复用是用于面向连接的服务,而 UDP 已经提供了面向消息的服务,并且不可靠,所以内核不会像 TCP 那样进行复杂的连接管理,自然也不需要多路复用。
对于 UDP,我们迭代+状态机就足矣处理了。
HTTP/3 中的底层支撑协议 QUIC,是基于 UDP 的协议,在应用层面实现了连接,因此又引入了多路复用。
epoll 如果想结合多线程如何使用?
可以。Reactor 模型了解一下?