LinuxIO多路复用

本文最后更新于:2021年4月21日 晚上

概览:LinuxIO多路复用

同步/异步、阻塞/非阻塞、并行/并发

同步:发出一个功能调用,在没有得到结果之前,该调用就不返回或这继续执行后续才做,必须一件一件事情去做。调用者主动等待调用结果

异步:当一个异步过程调用发出以后,调用者在没有得到结果之前,就可以继续执行后续操作,且当调用完成以后,一般会通过状态、通知和回调来通知调用者,对于异步调用,调用的返回并不受调用者的控制。

同步/异步关心的时消息通知机制。

在同步的情况下,由调用者自己去处理去等待消息被触发

而异步情况下,则是由某些触发机制来通知处理消息者。

阻塞/非阻塞关心的是程序等待调用结果时的状态。

阻塞:是指调用结果返回之前,当前线程会被挂起,只有在得到结果之后才会返回。

非阻塞:是指不能立刻得到结果之前,该调用不会阻塞当前进程,通过轮询的凡是查询调用是否完成。

https://zhuanlan.zhihu.com/p/88403724

https://www.jianshu.com/p/74a63eab9cbe

并行和并发表示CPU执行多个任务的方式。

并行:多个CPU时会出现并行,两个进程占用两个不同的CPU,可以同时进行,指多个任务同一时间点发生,不会相互抢占资源。

并发:在操作系统之中,某个时间段内多个进程都已经启动运行,它们共同占用一个处理机。指同一时间段内发生,多个任务之间会互相抢占资源。

https://cloud.tencent.com/developer/article/1424249


IO多路复用

IO多路复用使得程序能够同时监听多个文件描述符,能够提高程序的性能。

是最常使用的I/O通知机制,它指的是应用程序通过I/O复用函数向内核注册一组事件,内核通过I/O复用函数把其中就绪的事件通知给应用程序

  • 常用api:select、poll、epoll_wait
  • I/O复用函数本身是阻塞的,它们能提高程序效率的原因在于它们具有同时监听多个I/O事件的能力。

需要使用IO多路复用的情况

  • 客户端程序要同时处理多个socket
  • 客户端程序要同时处理用户输入和网络连接
  • TCP服务器要同时处理监听socket和连接socket。
  • 服务器要同时处理TCP请求和UDP请求。
  • 服务器要同时监听多个端口,或者处理多种服务。

且当多个文件描述符同时就绪时,如果不采取额外措施,程序就只能按顺序依次处理其中的每一个文件描述符,就像是在串行工作。要实现并发,需要多进程或者多线程。

select

思想:

首先要构造一个关于文件描述符的列表,将要监听的文件描述符添加到该列表中。

调用一个系统函数,监听该列表中的文件描述符,直到这些描述符中的一个或者多个进行I/O 操作时,该函数才返回。 (这个函数是阻塞 ,函数对文件描述符的检测的操作是由内核完成的 )

在返回时,它会告诉进程有多少(哪些)描述符要进行I/O操作。

API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// sizeof(fd_set) = 128 1024
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
/*
- 参数:
- nfds : 委托内核检测的最大文件描述符的值 + 1
- readfds : 要检测的文件描述符的读的集合,委托内核检测哪些文件描述符的读的属性
- 一般检测读操作
- 对应的是对方发送过来的数据,因为读是被动的接收数据,检测的就是读缓冲

- 是一个传入传出参数
- writefds : 要检测的文件描述符的写的集合,委托内核检测哪些文件描述符的写的属性
- 委托内核检测写缓冲区是不是还可以写数据(不满的就可以写)
- exceptfds : 检测发生异常的文件描述符的集合
- timeout : 设置的超时时间
struct timeval {
long tv_sec; // seconds
long tv_usec; // microseconds
};
- NULL : 永久阻塞,直到检测到了文件描述符有变化
- tv_sec = 0 tv_usec = 0, 不阻塞
- tv_sec > 0 tv_usec > 0, 阻塞对应的时间
- 返回值 :
- -1 : 失败
- >0(n) : 检测的集合中有n个文件描述符发生了变化
*/
// 将参数文件描述符fd对应的标志位设置为0
void FD_CLR(int fd, fd_set *set);
// 判断fd对应的标志位是0还是1, 返回值 : fd对应的标志位的值,0,返回0, 1,返回1
int FD_ISSET(int fd, fd_set *set);
// 将参数文件描述符fd 对应的标志位,设置为1
void FD_SET(int fd, fd_set *set);
// fd_set一共有1024 bit, 全部初始化为0
void FD_ZERO(fd_set *set);

select的缺点

  1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
  2. 在内核中也要遍历传递进来的所有fd,这个开销也比较大
  3. select默认支持的文件描述符集合是1024,fd_set是128字节的。
  4. fds集合不能够重用,每次都要重置!

select使用

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <poll.h>
struct pollfd {
int fd; /* 委托内核检测的文件描述符 */
short events; /* 委托内核检测文件描述符的什么事件 */
short revents; /* 文件描述符实际发生的事件 */
};

//使用实例
struct pollfd myfd;
myfd.fd = 5;
myfd.events = POLLIN | POLLOUT;

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
/*
- 参数:
- fds : 是一个struct pollfd 结构体数组,这是一个需要检测的文件描述符的集合
- nfds : 这个是第一个参数数组中最后一个有效元素的下标 + 1
- timeout : 阻塞时长
0 : 不阻塞
-1 : 阻塞,当检测到需要检测的文件描述符有变化,解除阻塞
>0 : 阻塞的时长
- 返回值:
-1 : 失败
>0(n) : 成功,n表示检测到集合中有n个文件描述符发生变化
*/

常用的一些事件:

poll比select的优点

  • 能够支持更多的文件描述符
  • 文件描述符集合不用重置,方便编程!

poll的缺点

  • 依旧是select的问题,拷贝的开销较大、每次都是轮询,花销也比较大!

poll的使用

epoll

API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
include <sys/epoll.h>
// 创建一个新的epoll实例。在内核中创建了一个数据,这个数据中有两个比较重要的数据,一个是需要检测的文件描述符的信息(红黑树),还有一个是就绪列表,存放检测到数据发送改变的文件描述符信息(双向链表)。

int epoll_create(int size);
/*
- 参数:
size : 目前没有意义了。随便写一个数,必须大于0
- 返回值:
-1 : 失败
> 0 : 文件描述符,操作epoll实例的
*/

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

常见的Epoll检测事件:
- EPOLLIN
- EPOLLOUT
- EPOLLERR


// 对epoll实例进行管理:添加文件描述符信息,删除信息,修改信息
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
- 参数:
- epfd : epoll实例对应的文件描述符
- op : 要进行什么操作
EPOLL_CTL_ADD: 添加
EPOLL_CTL_MOD: 修改
EPOLL_CTL_DEL: 删除
- fd : 要检测的文件描述符
- event : 检测文件描述符什么事情
*/

// 检测函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
/*
- 参数:
- epfd : epoll实例对应的文件描述符
- events : 传出参数,保存了发送了变化的文件描述符的信息
- maxevents : 第二个参数结构体数组的大小
- timeout : 阻塞时间
- 0 : 不阻塞
- -1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
- > 0 : 阻塞的时长(毫秒)
- 返回值:
- 成功,返回发送变化的文件描述符的个数 > 0
- 失败 -1
*/

epoll实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include <stdio.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/epoll.h>

int main(){

//1. 创建socket
int lfd = socket(AF_INET,SOCK_STREAM,0);
if(lfd == -1){
perror("socket");
exit(-1);
}

//2. 绑定
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(9876);
inet_pton(AF_INET,"172.26.96.221",&serveraddr.sin_addr.s_addr);
int ret = bind(lfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr));
if(ret == -1){
perror("bind");
exit(-1);
}

//3. 监听
ret = listen(lfd,5);
if(ret == -1){
perror("listen");
exit(-1);
}

//4. epoll注册事件

//先创建一个epoll实例
int epfd = epoll_create(1);
if(epfd == -1){
perror("epoll_create");
exit(-1);
}

struct epoll_event epev;
epev.events = EPOLLIN;
epev.data.fd = lfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&epev);

struct epoll_event epevs[1024];//事件数组

int cfd;//已连接的套接字描述符

while(1){
ret = epoll_wait(epfd,epevs,1024,-1);//-1 表示阻塞
if(ret == -1){
perror("epoll_wait");
exit(-1);
}

printf("ret = %d \n",ret);

for(int i=0;i<ret;i++){
int curfd = epevs[i].data.fd;

if(curfd == lfd){
//触发监听,说明是客户端的连接到达
struct sockaddr_in cliaddr;
int addrlen = sizeof(cliaddr);

cfd = accept(curfd,(struct sockaddr *)&cliaddr,&addrlen);

//添加一个事件为 监听客户端发来的数据
epev.events = EPOLLIN;
epev.data.fd = cfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&epev);
}else if(curfd == cfd){
char buf[1024] = {0};

int len = read(curfd,buf,sizeof(buf));
if(len == -1){
perror("read");
exit(-1);
}else if(len == 0){
printf("client closed \n");
epoll_ctl(epfd,EPOLL_CTL_DEL,curfd,NULL);
close(curfd);
}else if(len > 0){
printf("recv buf = %s \n",buf);
write(curfd,buf,strlen(buf)+1);
}
}
}

}

close(lfd);
close(epfd);

return 0;
}

epoll的工作模式

1.LT模式 水平触发

假设委托内核检测读事件 -> 检测fd的读缓冲区

读缓冲区有数据 - > epoll检测到了会给用户通知

  1. 用户不读数据,数据一直在缓冲区,epoll 会一直通知
  2. 用户只读了一部分数据,epoll会通知
  3. 缓冲区的数据读完了,不通知

LT(level - triggered)是缺省的工作方式,并且同时支持 block 和 no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的 fd 进行 IO 操 作。如果你不作任何操作,内核还是会继续通知你的。

2.ET模式 边沿触发

假设委托内核检测读事件 -> 检测fd的读缓冲区

读缓冲区有数据 - > epoll检测到了会给用户通知

  1. 用户不读数据,数据一致在缓冲区中,epoll下次检测的时候就不通知了
  2. 用户只读了一部分数据,epoll不通知
  3. 缓冲区的数据读完了,不通知

ET(edge - triggered)是高速工作方式,只支持 no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪, 并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述 符不再为就绪状态了。

但是请注意,如果一直不对这个 fd 作 IO 操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

ET 模式在很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。epoll 工作在 ET 模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。


epoll ET模式

  1. 设置已连接的socket非阻塞,fctnl
  2. 对socket的监听事件,添加上epev.events = EPOLLIN | EPOLLET;
  3. 读取数据时,应当循环读取while( (len = read(cfd,recvBuf,sizeof(recvBuf))) > 0 )
  4. 非阻塞读取数据可能会产生错误,这时需要额外检测EAGAIN的错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>

int main() {

// 创建socket
int lfd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in saddr;
saddr.sin_port = htons(9999);
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = INADDR_ANY;

// 绑定
bind(lfd, (struct sockaddr *)&saddr, sizeof(saddr));

// 监听
listen(lfd, 8);

// 调用epoll_create()创建一个epoll实例
int epfd = epoll_create(100);

// 将监听的文件描述符相关的检测信息添加到epoll实例中
struct epoll_event epev;
epev.events = EPOLLIN;
epev.data.fd = lfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &epev);

struct epoll_event epevs[1024];

while(1) {

int ret = epoll_wait(epfd, epevs, 1024, -1);
if(ret == -1) {
perror("epoll_wait");
exit(-1);
}

printf("ret = %d\n", ret);

for(int i = 0; i < ret; i++) {

int curfd = epevs[i].data.fd;

if(curfd == lfd) {
// 监听的文件描述符有数据达到,有客户端连接
struct sockaddr_in cliaddr;
int len = sizeof(cliaddr);
int cfd = accept(lfd, (struct sockaddr *)&cliaddr, &len);

// 设置cfd属性非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);

epev.events = EPOLLIN | EPOLLET; // 设置边沿触发
epev.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &epev);
} else {
if(epevs[i].events & EPOLLOUT) {
continue;
}

// 循环读取出所有数据
char buf[5];
int len = 0;
while( (len = read(curfd, buf, sizeof(buf))) > 0) {
// 打印数据
// printf("recv data : %s\n", buf);
write(STDOUT_FILENO, buf, len);
write(curfd, buf, len);
}
if(len == 0) {
printf("client closed....");
}else if(len == -1) {
if(errno == EAGAIN) { //非阻塞情况下,可能会产生这样的错误!!
printf("data over.....");
}else {
perror("read");
exit(-1);
}

}

}

}
}

close(lfd);
close(epfd);
return 0;
}

IO模型

从理论上来说,阻塞IOI/O复用信号驱动IO都是同步I/O模型,I/O读写操作都是在I/O事件发生后,由应用程序来完成的。

异步IO,用户可以直接对IO执行读写操作,这些操作告诉内核用户读写缓冲区的位置,以及IO操作完成之后内核通知应用程序的方式。

异步IO操作总是立即返回,无论IO是否阻塞,因为真正的读写操作已经由内核接管

p127

同步IO模型要求用户自行执行IO操作(将数据从内核缓冲区读入用户缓冲区,或将数据从用户缓冲区写入内核缓冲区)

异步IO机制则由内核来执行IO操作,(数据在内核缓冲区和用户缓冲区之间的移动是由内核在后台自动完成的)

同步IO向应用程序通知的是IO就绪事件

异步IO向应用程序通知的是IO完成事件

IO模型 读写操作和阻塞阶段
阻塞IO 程序阻塞于读写函数
IO复用 程序阻塞于IO复用系统调用,但可同时监听多个IO事件,对IO本身的速写=读写操作是非阻塞的。
SIGIO信号 信号触发读写就绪事件,用户程序执行读写阶段,程序没有阻塞阶段。
异步IO 内核执行读写操作并触发读写完成事件,程序没有阻塞阶段。

事件处理模式

  • Reactor模式:同步IO模型
  • Proactor模式:异步IO模型

同步:通知IO就绪事件

异步:通知IO完成事件

Reactor模式 — 同步

要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将该事件通知给工作线程。除此之外,主线程不做任何其他实质性地工作。

读写数据、接收新的连接、以及处理客户请求均在工作线程中完成。

使用同步I/O模型(epoll_wait为例)实现地Reactor模式工作流程

1
2
3
4
5
6
7
8
9
10
11
1. 主线程往epoll内核事件表中注册socket上的读就绪事件
2. 主线程调用epoll_wait等待socket上有数据可以读
3.socket上有数据可以读的时候,epoll_wait通知主线程,
主线程则将socket可读事件放入请求队列
4. 睡眠在请求队列上的某个工作线程就被唤醒,它从socket上读取数据,
并处理客户请求,然后往epoll内核事件表中注册socket上的写就绪事件
5. 主线程调用epoll_wait等待socket可写
6.socket可写时,epoll_wait通知主线程。
主线程将socket可写事件放入请求队列
7. 睡眠在请求队列上的某个工作线程被唤醒,
它往socket上写入服务器处理客户请求的结果

Proactor模式 — 异步

Proactor将所有的IO操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑。

使用异步I/O模型(以aio_read和aio_write为例)实现Proactor模式的工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
1. 主线程调用aio_read函数向内核注册socket上的读完成事件,
并告诉内核 用户缓冲区的位置 以及读操作完成时如何通知应用程序。
2. 主线程继续处理其他逻辑
3.socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,
来通知应用程序数据已经可用
4. 应用程序预先定义好的信号处理函数 选择一个工作线程来处理客户请求。
工作线程来处理客户请求之后,调用aio_write函数向内核注册socket上的写完成事件
并告诉内核 用户写缓冲区的位置,以及写操作完成时如何通知应用程序。
5. 主线程继续处理其他逻辑
6. 当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,
来通知应用程序数据已经发送完毕
7. 应用程序预先定义好的信号处理函数许纳泽一个工作线程来做善后处理,
比如决定是否关闭socket

同步方式模拟Proactor模式

原理:主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一“完成事件”。

从工作线程的角度来看,它直接获得了数据读写的结果,接下来只要对读写的结果进行逻辑处理。

使用同步I/O模型(epoll_wait为例)模拟地Proactor模式工作流程

1
2
3
4
5
6
7
8
9
10
1. 主线程往epoll内核事件表中注册socket上的读就绪函数
2. 主线程调用epoll_wait等待soket上有数据可读
3. 当soket上有数据可读时,epoll_wait通知主线程,
主线程从socket循环读取数据,直到没有更多数据可读
然后将读取到的数据封装成一个请求对象并插入请求队列。
4. 睡眠在请求队列上的某个工作线程被唤醒,它获取请求对象并处理客户请求
然后往epoll内核事件表中注册socket上的写就绪事件
5. 主线程调用epoll_wait等待socket可写
6.socket可写时,epoll_wait通知主线程,
主线程往socket上写入服务器处理客户端请求的结果

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!