I/O复用

I/O复用

概述

I/O复用即可以同时监视多个I/O状态的能力,例如stdin,stdout,tcp等。

I/O模型

阻塞式I/O

以UDP距离,recvfrom在调用开始到返回的整段时间是阻塞的,如果没有读入就会永远阻塞下去。平常写的scanf,cin是最简单的阻塞式I/O.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>

int main(int argc,char* argv[])
{
int sfd=socket(AF_INET,SOCK_DGRAM,0);

char buf[512]={0};
struct sockaddr_in fromaddr;
bzero(&fromaddr,sizeof(fromaddr));
socklen_t fromaddrlen=sizeof(struct sockaddr);
//没有报文一直阻塞
int ret=recvfrom(sfd,buf,sizeof(buf),0,(struct sockaddr*)&fromaddr,&fromaddrlen);
printf("%d\n",ret);
puts(buf);
return 0;
}

非阻塞式I/O

进程把一个套接字设置成非阻塞,当所请求I/O操作非得把本进程投入睡眠才能完成时,不把本进程投入休眠,而是返回一个错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>

int setnoblocking(int sock)
{
//获取标志符本身状态
int opts=fcntl(sock,F_GETFL);
if(opts<0) return -1;
//添加非阻塞属性
opts|=O_NONBLOCK;
if(fcntl(sock,F_SETFL,opts)<0) return -1;
return 1;
}

int main(int argc,char* argv[])
{
int sfd=socket(AF_INET,SOCK_DGRAM,0);
setnoblocking(sfd);
char buf[512]={0};
struct sockaddr_in fromaddr;
bzero(&fromaddr,sizeof(fromaddr));
socklen_t fromaddrlen=sizeof(struct sockaddr);
int ret=recvfrom(sfd,buf,sizeof(buf),0,(struct sockaddr*)&fromaddr,&fromaddrlen);
if(errno==EWOULDBLOCK) printf("无数据\n");
printf("%d\n",ret);
puts(buf);
return 0;
}

以上代码仅为实例,实际使用需要使用轮询(polling)查询数据报。会浪费大量CPU时间。

I/O复用模型

I/O复用模型主要为select、poll以及epoll,把阻塞转移到两个系统调用上,而不是阻塞在真正的I/O系统调用上。

epoll(红黑树)非阻塞模型:(非阻塞+ET触发):多线程可以使用阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#include <iostream>
#include <algorithm>
#include <string>
#include <vector>
#include <map>
#include <set>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>
#include <pthread.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <memory.h>

const int MAXLINE=10;
const int OPEN_MAX=100;
const int LISTENQ=20;
const int SERV_PORT=1025;
const int INFTIM=1000;
struct task{
int fd;
task *next;
};
struct user_data{
int fd;
unsigned int n_size;
char line[MAXLINE];
};
void* readtask(void* args);
void* writetask(void* args);
epoll_event ev,event[20];
int epfd;
pthread_mutex_t mutex;
pthread_cond_t cond;
task *readhead=NULL,*readtail=NULL,*writehead=NULL;

using namespace std;
void check(int i,char* j)
{
if(i<0) perror(j);
}
void setnonblocking(int sock)
{
int opts=fcntl(sock,F_GETFL);
if(opts<0){
perror("fcntl\n");
exit(-1);
}
opts|=O_NONBLOCK;
if(fcntl(sock,F_SETFL,opts)<0)
{
perror("fcntl setfl\n");
exit(-1);
}
}
int main()
{
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond,NULL);
task *new_task=NULL;
user_data *rdata=NULL;
pthread_t pid1,pid2;
pthread_create(&pid1,NULL,readtask,NULL);
pthread_create(&pid2,NULL,readtask,NULL);
epfd=epoll_create(10);//2.6.8之后参数被忽略

sockaddr_in clientadr,serveraddr;
int listenfd=socket(AF_INET,SOCK_STREAM,0);
setnonblocking(listenfd);
ev.data.fd=listenfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

bzero(&serveraddr,sizeof(serveraddr));
serveraddr.sin_family=AF_INET;
serveraddr.sin_port=htons(SERV_PORT);
serveraddr.sin_addr.s_addr=INADDR_ANY;
bind(listenfd,(sockaddr*)&serveraddr,sizeof(serveraddr));
listen(listenfd,LISTENQ);
int maxi=0;
for(;;)
{
int nfds=epoll_wait(epfd,event,20,0);
for(int i=0;i<nfds;i++)
{
if(event[i].data.fd==listenfd)
{
socklen_t len;
int connectfd=accept(listenfd,(sockaddr*)&clientadr,&len);
if(connectfd<0)
{
perror("connect error\n");
exit(-1);
}
setnonblocking(connectfd);
char *str=inet_ntoa(clientadr.sin_addr);
cout<<"connec_from>>"<<str<<endl;
ev.data.fd=connectfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connectfd,&ev);
}else if(event[i].events&EPOLLIN)
{
cout<<"reading"<<endl;
int sockfd;
if((sockfd=event[i].data.fd)<0) continue;
new_task=new task();
new_task->fd=sockfd;
new_task->next=NULL;
pthread_mutex_lock(&mutex);
if(readhead==NULL)
{
readhead=new_task;
readtail=new_task;
}else{
readtail->next=new_task;
readtail=new_task;
}
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
}else if(event[i].events&EPOLLOUT){
auto rdata=(user_data*)event[i].data.ptr;
int sockfd=rdata->fd;
write(sockfd,rdata->line,rdata->n_size);
delete rdata;
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
}
}
return 0;
}
void* readtask(void* args)
{
int fd=-1;
unsigned int n;
user_data *data=NULL;
while(1)
{
pthread_mutex_lock(&mutex);
while(readhead==NULL)
pthread_cond_wait(&cond,&mutex);
fd=readhead->fd;
task* tmp=readhead;
readhead=readhead->next;
delete tmp;
pthread_mutex_unlock(&mutex);
data=new user_data;
data->fd=fd;
if((n=read(fd,data->line,MAXLINE))<0)
{
if(errno==ECONNRESET)
{
close(fd);
}else{
perror("readlin error\n");
}
if(data!=NULL) delete data;
}else{
if(n==0)
{
close(fd);
cout<<"client close connect!"<<endl;
if(data!=NULL) delete data;
}else{
data->n_size=n;
ev.data.ptr=data;
ev.events=EPOLLET|EPOLLOUT;
epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);
}
}
}
}

select模型:(轮询)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <stdio.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>

int main()
{
int sockfd=socket(AF_INET,SOCK_STREAM,0);
struct sockaddr_in addr;
bzero(&addr,sizeof(addr));
addr.sin_family=AF_INET;
addr.sin_port=htons(8888);
addr.sin_addr.s_addr=INADDR_ANY;
bind(sockfd,(struct sockaddr*)&addr,sizeof(addr));
listen(sockfd,19);
int fd=0;
char buf[512]={0};
fd_set rdset;
while(1)
{
FD_ZERO(&rdset);
//注册
FD_SET(sockfd,&rdset);
if(select(20,&rdset,NULL,NULL,NULL)<0)
{
continue;
}
for(fd=0;fd<19;fd++)
{
if(FD_ISSET(fd,&rdset))
{
if(fd==sockfd)
{
struct sockaddr_in client;
int cfd=accept(sockfd,(struct sockaddr*)&client,NULL);
FD_SET(cfd,&rdset);
}else{
bzero(buf,sizeof(buf));
recv(fd,buf,sizeof(buf),0);
puts(buf);
//取消注册
FD_CLR(fd,&rdset);
close(fd);
}
}
}
}
close(sockfd);
return 0;
}

信号驱动式I/O模型

内核在描述符就绪发送SIGIO信号通知即为信号驱动式I/O.

不适用与TCP,因为TCP会在以下情况下都会产生SIGIO信号,无法分辨事件类型

  • 监听套接字上某个连接请求请求已经完成
  • 某个断连接请求已经发起
  • 某个连接之半已经关闭
  • 数据到达套接字
  • 数据已经从套接字发送走(即输出缓冲区有空闲空间)
  • 发生某个异步错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//应用极少

//定义SIGIO信号处理函数
void sig_io(int sig)
{
...
}
//注册信号处理
signal(SIGIO,sig_io);
//设置套接字owner,一般为主进程
fcntl(sockfd,F_SETOWN,getpid());
//开启该套接字信号驱动I/O
fcntl(sockfd,F_SETFL,O_ASYNC);
//or,老内核使用
const int on=1;
ioctl(sockfd,O_ASYNC,&on);

异步I/O模型

POSIX中aio_*相关函数

异步I/O与信号驱动式I/O区别在于,信号驱动式I/O通知我们何时可以启动一个I/O操作,异步I/O模型是内核通知我们I/O操作何时完成

目前内核内aio相关函数依旧在变动,接口还在改变

1
2
3
//Linux 4.5+
ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);

参考:AIO

模型对比

只有AIO是真正的异步。

参考资料

  • UNIX网络编程,第6章