cen's blog cen's blog
首页
  • 编程文章

    • markdown使用
  • 学习笔记

    • C++学习
    • C++数据结构
    • MySQL
    • Linux
    • 网络编程
算法
  • Git
  • ProtoBuf
  • 分类
  • 标签
  • 归档
关于
GitHub (opens new window)

cen

十年饮冰,难凉热血
首页
  • 编程文章

    • markdown使用
  • 学习笔记

    • C++学习
    • C++数据结构
    • MySQL
    • Linux
    • 网络编程
算法
  • Git
  • ProtoBuf
  • 分类
  • 标签
  • 归档
关于
GitHub (opens new window)
  • Linux环境搭建
  • 基本指令
  • 权限
  • Linux基础开发工具
  • 进程概念
  • 进程控制
  • 基础IO流
  • 动态库和静态库
  • 进程通信
  • 进程信号
  • 多线程
  • 线程安全
  • 生产者消费者
  • 线程池
  • 高级IO
    • 高效 IO
    • 五种 IO 模型
    • 非阻塞 IO
    • 多路转接-select
      • 基本概念和接口
      • select 服务器
      • 特点
    • 多路转接-poll
      • 基本概念和接口
      • poll 服务器
      • 特点
  • 多路转接epoll
  • Reactor
  • Linux
cen
2025-09-22
目录

高级IO

# 高效 IO

一言概之,IO 的时间就是等待的时间 + 拷贝的时间。例如,应用层中的read函数在传输层的 tcp 接收缓冲区中没有数据,需要阻塞和等待,有了数据才可以拷贝到上层。
高级 IO 就是如何尽量减少“等待”的时间?

# 五种 IO 模型

钓鱼例子:

  1. 张三:死等着鱼上钩
  2. 李四:有鱼上钩就把鱼钓上来,否则做其他事情
  3. 王五:鱼漂上绑一个铃铛,然后就去做其他事情,如果铃铛响了就挥动鱼竿将鱼钓上来
  4. 赵六:将 100 个鱼竿抛入水中,接着反复检查,有鱼上钩就把鱼钓上来
  5. 钱七:给田八一个桶、一个电话、一个鱼竿去钓鱼并要求当鱼桶装满的时候再打电话告诉钱七,而他去做其他事情

例子中,5 个人的钓鱼方式对应了五种 IO 模型:

  • 张三 --> 阻塞式 IO
  • 李四 --> 非阻塞式 IO
  • 王五 --> 信号驱动式 IO
  • 赵六 --> 多路复用/多路转接
  • 钱七 --> 异步 IO

其中,鱼就是数据,河就是内核空间,鱼竿就是文件描述符,鱼漂就是数据就绪的事件。

阻塞式 vs 非阻塞式

需要做一件事能不能立即得到返回应答,如果不能立即获得返回,需要等待,那就阻塞了(进程或线程就阻塞在那了,不能做其它事情),否则就可以理解为非阻塞(在等待的过程中可以做其它事情)。

同步 vs 异步

  • 同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果;
  • 异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后,被调用者来通知调用者,或通过回调函数处理这个调用

另外线程同步是指多线程执行具有顺序性;而这里的 IO 同步指的是参与 IO 过程。

具体理解

阻塞&非阻塞&同步&异步都是描述一种 IO 的状态,例如调用 recv 函数,当工作在阻塞模式下,数据没有就绪时,当前线程被挂起;在非阻塞模式下,就会立即返回,进而通过返回值来判断。同步标识调用者主动等待被调用者的结果。调用者需要自己来关心和获取这个结果;而异步是被调用者完成后主动通知调用者。调用者只需发起请求,然后等待通知即可。

while (true) {
    ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);
    if (size < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;  // 当前数据读完,退出循环
        } else if (errno == EINTR) {
            continue;  // 被中断,继续读取
        } else {
            return -1; // 真正出错
        }
    } else if (size == 0) {
        return -1;  // 对端连接关闭
    } else {
        // 处理读取到的数据
        buffer[size] = '\0';
        // ...
    }
}
return 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 非阻塞 IO

fcntl 是一个用于操作文件描述符的系统调用函数。它可以执行多种操作,如复制文件描述符、获取和设置文件描述符标志、文件状态标志以及记录锁等。

#include <fcntl.h>

int fcntl(int fd, int cmd, ... /* arg */);
1
2
3

fcntl 函数有 5 种功能:

  • 复制一个现有的描述符(cmd=F_DUPFD)
  • 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD)
  • 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL)
  • 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN)
  • 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW)

设置文件描述符为非阻塞:

int fd1 = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, fd1 | O_NONBLOCK);
1
2

实现

void setNonBlock(int fd) {
    int fd1 = fcntl(fd, F_GETFL);
    if (fd1 < 0) {
        return;
    }
    fcntl(fd, F_SETFL, fd1 | O_NONBLOCK);
}

void execTasks() {
    cout << "exec other task..." << endl;
}

int main() {
    setNonBlock(0);
    char buffer[1024];
    while (true) {
        std::cout << ">>> ";
        ssize_t size = read(0, buffer, sizeof(buffer) - 1);
        if (size < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                std::cout << strerror(errno) << std::endl;
                sleep(1);
                execTasks();
                continue;
            } else if (errno == EINTR) {
                std::cout << strerror(errno) << std::endl;
                sleep(1);
                execTasks();
                continue;
            } else {
                std::cout << strerror(errno) << std::endl;
                break;
            }
        }
        if (size == 0) {
            std::cout << "buffer end# " << std::endl;
            break;
        }
        buffer[size - 1] = 0;
        std::cout << "echo# " << buffer << std::endl;
        execTasks();
    }
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 多路转接-select

# 基本概念和接口

select 函数来实现多路复用输入/输出模型,select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。

#include <sys/select.h>

typedef /* ... */ fd_set;

struct timeval {
    time_t       tv_sec;   /* Seconds */
    suseconds_t  tv_usec;  /* Microseconds */
};

typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

int select(int nfds,
           fd_set* restrict readfds,
           fd_set* restrict writefds,
           fd_set* restrict exceptfds,
           struct timeval* restrict timeout);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

参数:

  • nfds:需要监视的文件描述符中,最大的文件描述符值 + 1

  • readfds、writefds、exceptfds:输入输出型参数,位图类型,分别表示输入时用户告诉内核 fd 上的读、写、异常事件需要关心,比特位的位置表示哪个 fd,比特位的内容表示是否需要关心;输出时内核告诉用户需要关心的多个 fd 中,那些已经就绪了,比特位的位置表示哪个 fd,比特位的内容表示 fd 上的读、写、异常事件就绪了。其中,

    操作 fd_set 的系列接口:
    void FD_CLR(int fd, fd_set *set);
    int FD_ISSET(int fd, fd_set *set);
    void FD_SET(int fd, fd_set *set);
    void FD_ZERO(fd_set *set);

  • timeout:调用时由用户设置 select 的等待时间,返回时表示 timeout 的剩余时间

  1. timeout 设置为 nullptr,表示阻塞式等待;
  2. timeout 设置为 {0, 0},表示非阻塞式等待;
  3. timeout 设置为特定时间值,表示在该时间内阻塞式等待,超过该时间,以非阻塞式返回,且返回时表示剩余时间

返回值:

  • 调用成功,返回就绪的文件描述符个数;
  • timeout 时间耗尽,超时返回,返回 0;
  • 调用失败,则返回-1,同时错误码会被设置

# select 服务器

selectServer.hpp

#pragma once

#include <poll.h>

#include <vector>

#include "log.hpp"
#include "socket.hpp"

using namespace std;

namespace Server {

const static int fdnum = sizeof(fd_set) * 8;
const static int defaultfd = -1;

class selectServer {
private:
    // for debug:
    void print() {
        cout << "fd list: ";
        for (auto& i : fdarray) {
            if (i != defaultfd) {
                cout << i << " ";
            }
        }
        cout << endl;
    }

    void Accepter() {
        logMessage(NORMAL, "Accepter in...");
        string clientip;
        uint16_t clientport;
        int sock = Sock::Accept(listensock, &clientip, &clientport);
        if (sock < 0) {
            logMessage(ERROR, "accept fail");
            exit(ACCEPT_ERR);
        }
        logMessage(NORMAL, "have a new link [%s : %d], sock: %d", clientip.c_str(), clientport, sock);
        // 新的文件描述符放入数组
        int i = 0;
        for (; i < fdarray.size(); i++) {
            if (fdarray[i] != defaultfd) {
                continue;
            }
            break;
        }
        if (i == fdnum) {
            logMessage(WARNING, "server have fulled, please wait...");
        } else {
            fdarray[i] = sock;
        }
        print();

        logMessage(NORMAL, "Accepter off...");
    }

    void Recver(int fd, int pos) {
        logMessage(NORMAL, "Recver in...");
        char buffer[1024];
        ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0) {
            close(fd);
            fdarray[pos] = defaultfd;
            logMessage(WARNING, "client quit");
            return;
        }

        buffer[n] = 0;
        string msg = "client# ";
        msg += buffer;
        write(fd, msg.c_str(), msg.size());
        logMessage(NORMAL, "Recver off...");
    }

    void handlerEvent(fd_set& rset) {
        if (FD_ISSET(listensock, &rset)) {
            Accepter();
        }
        for (int i = 0; i < fdnum; i++) {
            if (fdarray[i] == defaultfd || fdarray[i] == listensock) {
                continue;
            }
            logMessage(DEBUG, "fdarray[%d] = %d", i, fdarray[i]);

            if (FD_ISSET(fdarray[i], &rset)) {
                Recver(fdarray[i], i);
            }
        }
    }

public:
    selectServer(uint16_t _port) : port(_port), listensock(-1), fdarray(fdnum, defaultfd) {}

    void init() {
        listensock = Sock::Socket();
        Sock::Bind(listensock, port);
        Sock::Listen(listensock);
        fdarray[0] = listensock;
    }

    void start() {
        fd_set rset;

        while (true) {
            FD_ZERO(&rset);
            struct timeval timeout = {3, 0};
            int maxfd = fdarray[0];
            for (auto& fd : fdarray) {
                if (fd == defaultfd) {
                    continue;
                } else {
                    FD_SET(fd, &rset);
                }
                maxfd = max(maxfd, fd);
            }
            logMessage(DEBUG, "max fd = %d", maxfd);
            int n = select(maxfd + 1, &rset, nullptr, nullptr, &timeout);
            switch (n) {
                case 0:
                    logMessage(NORMAL, "timeout...");
                    break;
                case -1:
                    logMessage(ERROR, "select fail...");
                    break;
                default:
                    logMessage(DEBUG, "have %d links", n);
                    handlerEvent(rset);
                    break;
            }
        }
    }

    ~selectServer() {
        close(listensock);
        listensock = -1;
    }

private:
    uint16_t port;
    int listensock;
    vector<int> fdarray;  // 存放所有的合法文件描述符
};

}  // namespace Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145

注意点:

  1. 接听套接字先交给 select,连接事件就绪就是读事件就绪
  2. vector<int> fdarray用来存放所有的合法文件描述符
  3. 每次 select 都需要重新设置位图
FD_ZERO(&rset);
struct timeval timeout = {3, 0};
int maxfd = fdarray[0];
for (auto& fd : fdarray) {
    if (fd == defaultfd) {
        continue;
    } else {
        FD_SET(fd, &rset);
    }
    maxfd = max(maxfd, fd);
}
logMessage(DEBUG, "max fd = %d", maxfd);
int n = select(maxfd + 1, &rset, nullptr, nullptr, &timeout);
1
2
3
4
5
6
7
8
9
10
11
12
13

# 特点

  1. 可监控的文件描述符个数取决与 sizeof(fd_set)的值,有上限
  2. 需要借助于辅助数组来保存合法的 fd,作为源数据和 fd_set 进行 FD_ISSET 判断。
  3. 调用 select 之前,要重新设置 fd;调用之后,要重新更新 fd,增加了拷贝的开销

# 多路转接-poll

# 基本概念和接口

#include <poll.h>

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
1
2
3
4
5
6
7
8
9

参数

  • fds 是一个 poll 函数监听的结构列表,每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合,可以想象成一个动态数组;也就是用户告知操作系统关心 fd 上的那些 events,然后内核告知用户那些 revents 就绪了。
    常用的事件宏定义:
#define POLLIN		0x001		/* There is data to read.  */
#define POLLPRI		0x002		/* There is urgent data to read.  */
#define POLLOUT		0x004		/* Writing now will not block.  */
1
2
3
  • nfds 表示 fds 数组的长度
  • timeout 表示 poll 函数的超时时间, 单位是毫秒(ms),与 select 的 timeout 类似
  1. timeout 设置为 -1,表示阻塞式等待;
  2. timeout 设置为 0,表示非阻塞式等待;
  3. timeout 设置为特定时间值,表示在该时间内阻塞式等待,超过该时间,以非阻塞式返回,且返回时表示剩余时间

返回值:

  • 返回值小于 0, 表示出错;
  • 返回值等于 0, 表示 poll 函数等待超时;
  • 返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回

# poll 服务器

pollServer.cpp

#pragma once

#include <vector>

#include <poll.h>

#include "log.hpp"
#include "socket.hpp"

using namespace std;

namespace Server {

const static int fdnum = 1024;
const static int defaultfd = -1;
const static int timeout = 3000;

class pollServer {
private:
    // for debug:
    void print() {
        cout << "fd list: ";
        for (int i = 0; i < fdnum; i++) {
            if (rfds[i].fd != defaultfd)
                cout << rfds[i].fd << " ";
        }
        cout << endl;
    }

    void Accepter() {
        logMessage(NORMAL, "Accepter in...");
        string clientip;
        uint16_t clientport;
        int sock = Sock::Accept(listensock, &clientip, &clientport);
        if (sock < 0) {
            logMessage(ERROR, "accept fail");
            exit(ACCEPT_ERR);
        }
        logMessage(NORMAL, "have a new link [%s : %d], sock: %d", clientip.c_str(), clientport, sock);

        int i = 0;
        for (; i < fdnum; i++) {
            if (rfds[i].fd != defaultfd) {
                continue;
            }
            break;
        }
        if (i == fdnum) {
            logMessage(WARNING, "server have fulled, please wait...");
        } else {
            rfds[i].fd = sock;
            rfds[i].events = POLLIN;
            rfds[i].revents = 0;
        }
        print();
        logMessage(NORMAL, "Accepter off...");
    }

    void Recver(int pos) {
        logMessage(NORMAL, "Recver in...");
        char buffer[1024];
        int fd = rfds[pos].fd;
        ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0) {
            close(fd);
            rfds[pos].fd = defaultfd;
            rfds[pos].events = POLLIN;
            rfds[pos].revents = 0;
            logMessage(WARNING, "client quit");
            return;
        }

        buffer[n] = 0;
        string msg = "client# ";
        msg += buffer;
        write(fd, msg.c_str(), msg.size());
        logMessage(NORMAL, "Recver off...");
    }

    void handlerEvent() {
        if (rfds[0].revents & POLLIN) {
            Accepter();
        }
        for (int i = 0; i < fdnum; i++) {
            if (rfds[i].fd == defaultfd || rfds[i].fd == listensock) {
                continue;
            }
            logMessage(DEBUG, "rfds[%d] = %d", i, rfds[i]);

            if (rfds[i].events & POLLIN && rfds[i].revents & POLLIN) {
                Recver(i);
            }
        }
    }

    void resetRfds(int pos) {
        rfds[pos].fd = defaultfd;
        rfds[pos].events = 0;
        rfds[pos].revents = 0;
    }

public:
    pollServer(uint16_t _port) : port(_port), listensock(-1), rfds(nullptr) {}

    void init() {
        listensock = Sock::Socket();
        Sock::Bind(listensock, port);
        Sock::Listen(listensock);
        rfds = new struct pollfd[fdnum];
        for (int i = 0; i < fdnum; i++) {
            resetRfds(i);
        }
        rfds[0].fd = listensock;
        rfds[0].events = POLLIN;
    }

    void start() {
        while (true) {
            int n = poll(rfds, fdnum, timeout);
            switch (n) {
                case 0:
                    logMessage(NORMAL, "timeout...");
                    break;
                case -1:
                    logMessage(ERROR, "%s", strerror(errno));
                    break;
                default:
                    logMessage(DEBUG, "have %d links", n);
                    handlerEvent();
                    break;
            }
        }
    }

    ~pollServer() {
        close(listensock);
        listensock = -1;
        delete[] rfds;
    }

private:
    uint16_t port;
    int listensock;
    struct pollfd* rfds;
};

}  // namespace Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147

# 特点

  • pollfd 结构包含了要监视的 event 和发生的 event,接口使用比 select 更方便
  • poll 并没有最大数量限制 (但是数量过大后性能也是会下降).
  • 文件描述符数目增多时,需要轮询 pollfd 来获取就绪的描述符;
  • 每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核中,增加拷贝开销
上次更新: 2025/11/11, 22:03:54
线程池
多路转接epoll

← 线程池 多路转接epoll→

最近更新
01
动态规划
11-08
02
ProtoBuf
09-28
03
Git
09-28
更多文章>
Theme by Vdoing | Copyright © 2024-2025 京ICP备2020044002号-3 京公网安备11010502056119号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式