多路转接epoll
# 基本概念和接口
- epoll_create epoll_create 创建一个 epoll 模型
#include <sys/epoll.h>
int epoll_create(int size);
/* size大要求,大于0即可 */
1
2
3
4
2
3
4
- epoll_ctl
epoll_ctl 用于管理 epoll 模型所关注的文件描述符集合。
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
};
typedef union epoll_data epoll_data_t;
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
参数:
epfd:epoll_create 成功调用后返回的 epoll 模型的文件描述符
op:指定要执行的操作类型
EPOLL_CTL_ADD:注册新的文件描述符到指定的 epoll 模型中。EPOLL_CTL_MOD:修改已经注册的文件描述符的监听事件。EPOLL_CTL_DEL:从 epoll 模型中删除指定的文件描述符。
fd:需要监视的文件描述符
event:表示关心的哪些事件
EPOLLIN: 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭)EPOLLOUT: 表示对应的文件描述符可以写EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)EPOLLERR: 表示对应的文件描述符发生错误EPOLLHUP: 表示对应的文件描述符被挂断EPOLLET: 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里
epoll_wait
epoll_wait 收集关心的事件中已经就绪的事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
1
参数:
- epfd:epoll_create 成功调用后返回的 epoll 模型的文件描述符
- events:输出型参数,内核会将已经就绪的事件拷贝到 events 数组当中
- maxevents:指定 events 数组的最大容量,即一次调用最多可以接收多少个事件
- timeout:ms 为单位,同 poll
返回值:
- 返回值小于 0, 表示出错;
- 返回值等于 0, 表示 poll 函数等待超时;
- 返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回
# 底层原理
红黑树和就绪队列
- 当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体
- 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件
- 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来
回调机制
- 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法
- 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体
- 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.
- 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户,而且 epoll_wait 可以将所有的就绪事件按顺序放到用户传入的数组中
# LT 模式和 ET 模式
- 水平触发(LT)模式:如果有人在门口按着门铃不放手,门铃就会持续不断地响,直到你开门或者对方松手。只要“有访客”(对应 socket 有数据可读)这个状态存在,它就会一直提醒你。
- 边缘触发(ET)模式:无论访客在门口站多久,门铃只在访客按下按钮的那个瞬间响一次。之后即使他还在门口,门铃也不会再响了。它只关心状态变化的瞬间。
什么叫做事件就绪?即 IO 条件满足了,可以进行 IO 了,select/poll/epoll 就是等待事件就绪,是一种 IO 就绪事件的通知机制。
- LT:只要底层有数据没读完,epoll 就会一直通知用户读取数据
- ET:只要底层有数据没读完,epoll 不再通知用户,指导底层的数据再次增多时,才会再通知用户一次
理解 ET:
ET 模式下,底层只有在数据从无到有变化的时候才会通知上层,只会通知一次,这样就倒逼程序员将本轮就绪的数据全部读取上,可以循环读取,知道读取不到数据来实现。默认的 fd 是阻塞式的,所以 ET 模式下 fd 必须是非阻塞式的,否则进程挂起,导致服务器瘫痪。
# epoll 服务器
#pragma once
#include <functional>
#include <vector>
#include <sys/epoll.h>
#include "log.hpp"
#include "socket.hpp"
using namespace std;
typedef function<string(char*)> func_t;
namespace Server {
const static int eventnum = 10;
const static int timeout = 3000;
class epollServer {
private:
void Accepter() {
string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport);
if (sock < 0) {
logMessage(WARNING, "accept fail");
}
logMessage(NORMAL, "have a new link [%s : %d], sock: %d", clientip.c_str(), clientport, sock);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
}
void Recver(int sock) {
logMessage(NORMAL, "Recver in");
char buffer[1024];
ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
// 你怎么保证读的是一个报文?不能保证
if (n <= 0) {
logMessage(WARNING, "client quit");
// 建议:先 epoll 移除后 close 关闭
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
} else {
buffer[n] = 0;
string resp = func(buffer);
logMessage(NORMAL, "client# %s", buffer);
send(sock, resp.c_str(), resp.size(), 0);
}
logMessage(NORMAL, "Recver off");
}
void handlerEvent(int n) {
for (int i = 0; i < n; i++) {
uint32_t event = events[i].events;
int sock = events[i].data.fd;
if (sock == listensock && (event & EPOLLIN)) {
// listen 套接字就绪
Accepter();
} else if (event & EPOLLIN) {
// 普通套接字就绪
Recver(sock);
}
}
}
public:
epollServer(uint16_t _port, func_t _func) : port(_port), listensock(-1), func(_func), epfd(-1), events(nullptr) {}
void init() {
// 1. 创建 socket
listensock = Sock::Socket();
Sock::Bind(listensock, port);
Sock::Listen(listensock);
// 2. 创建 epoll 模型
epfd = epoll_create(256);
if (epfd < 0) {
logMessage(FATAL, "epoll_create error!");
exit(EPOLL_CREATE_ERR);
}
// 3. 将 listen 套接字的读事件关心起来
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listensock; // 当事件就绪时,可以知道是哪一个 fd
epoll_ctl(epfd, EPOLL_CTL_ADD, listensock, &ev);
// 4. 存放就绪时间的数组
events = new struct epoll_event[eventnum];
}
void start() {
while (true) {
// 5. 得到就绪的时间
int n = epoll_wait(epfd, events, eventnum, timeout);
switch (n) {
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(ERROR, "%s", strerror(errno));
break;
default:
logMessage(DEBUG, "have %d links", n);
handlerEvent(n);
break;
}
}
}
~epollServer() {
close(listensock);
close(epfd);
listensock = -1;
epfd = -1;
delete[] events;
}
private:
uint16_t port;
int listensock;
func_t func;
int epfd;
struct epoll_event* events;
};
} // namespace Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
上次更新: 2025/11/11, 22:03:54