杨记

碎片化学习令人焦虑,系统化学习使人进步

0%

Socket编程2

黑马程序员,man 7 ip ,高并发服务器、UDP

高并发服务器

image-20220617124915060

多进程并发服务器

使用多进程并发服务器时要考虑以下几点:

  1. 父进程最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符)

  2. 系统内创建进程个数(与内存大小相关)

  3. 进程创建过多是否降低整体服务性能(进程调度)

server.c ,和每个客户端的数据交互放在子进程中。子进程结束后,捕捉SIGCHLD信号回收进程资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <sys/wait.h>

#include "wrap.h"

#define SERV_IP "106.13.232.203"
#define SERV_PORT 8888

void wait_child(int signo)
{
while( waitpid(0, NULL, WNOHANG) > 0 );
return;
}

int main()
{
int sfd, cfd, i, pid;
struct sockaddr_in serv_addr, clie_addr;
socklen_t clie_addr_len;
char buf[BUFSIZ], clie_IP[256];
struct sigaction newact;

newact.sa_handler = wait_child;
newact.sa_flags = 0;
sigemptyset(&newact.sa_mask);
sigaction(SIGCHLD, &newact, NULL);

sfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERV_PORT);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// inet_pton(AF_INET, SERV_IP, &serv_addr.sin_addr.s_addr);
Bind(sfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

Listen(sfd, 128);

while(1) {
clie_addr_len = sizeof(clie_addr);
cfd = Accept(sfd, (struct sockaddr*)&clie_addr, &clie_addr_len);

bzero(&clie_IP, sizeof(clie_IP));
printf("client: IP:%s\tport:%d\n", inet_ntop(AF_INET, &clie_addr.sin_addr.s_addr, clie_IP, sizeof(clie_IP)), ntohs(clie_addr.sin_port));

pid = fork();
if(pid < 0) {
perr_exit("fork");
} else if(pid == 0) {
close(sfd);
break;
} else {
close(cfd);
}
}
if(pid == 0) {
int n;
while(1) {
n = Read(cfd, buf, sizeof(buf));
if(n == 0) { // client close
close(cfd);
return 0;
} else if(n == -1) {
perr_exit("read");
} else {
for(i = 0; i < n; ++i)
buf[i] = toupper(buf[i]);
write(cfd, buf, n);
write(STDOUT_FILENO, buf, n);
}
}
}
return 0;
}

多线程并发服务器

在使用线程模型开发服务器时需考虑以下问题:

  1. 调整进程内最大文件描述符上限

  2. 线程如有共享数据,考虑线程同步

  3. 服务于客户端线程退出时,退出处理。(退出值,分离态)

  4. 系统负载,随着链接客户端增加,导致其它线程不能及时得到CPU

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <fcntl.h>
#include <pthread.h>
#include <arpa/inet.h>

#include "wrap.h"

#define MAXLINE 8192
#define SERV_PORT 8000

struct s_info { //定义一个结构体, 将地址结构跟cfd捆绑
struct sockaddr_in clie_addr;
int connfd;
};

void *do_work(void *arg)
{
int n, i;
struct s_info *ts = (struct s_info*)arg;
char buf[BUFSIZ];
char str[INET_ADDRSTRLEN]; // #define INET_ADDRSTRLEN 16 "[+d"可查看

while(1) {
n = Read(ts->connfd, buf, sizeof(buf)); // 读客户端
if(n == 0) {
printf("the client %d closed ...\n", ts->connfd);
break;
}
printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &(*ts).clie_addr.sin_addr.s_addr, str, sizeof(str)), ntohs((*ts).clie_addr.sin_port));

for(i = 0; i < n; ++i)
buf[i] = toupper(buf[i]); // 小写-->大写

Write(STDOUT_FILENO, buf, n); //写出至屏幕
Write(ts->connfd, buf, n); //回写至客户端
}
Close(ts->connfd);

return (void*)0;
}

int main()
{
struct sockaddr_in serv_addr, clie_addr;
socklen_t clie_addr_len;
int listenfd, connfd;
pthread_t tid;
struct s_info ts[256]; //根据最大线程数创建结构体数组
int i = 0;

listenfd = Socket(AF_INET, SOCK_STREAM, 0); //创建一个socket, 得到文件描述符

bzero(&serv_addr, sizeof(serv_addr)); // 地址结构清零
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT); //指定端口号 8000

Bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

Listen(listenfd, 128); //设置同一时刻链接服务器上限数

printf("Accepting client connect ...\n");

while(1) {
clie_addr_len = sizeof(clie_addr);
connfd = Accept(listenfd, (struct sockaddr*)&clie_addr, &clie_addr_len); //阻塞监听客户端链接请求
ts[i].clie_addr = clie_addr;
ts[i].connfd = connfd;

//达到线程最大数是,pthread_create出错处理,增加服务器稳定性
pthread_create(&tid, NULL, do_work, (void*)&ts[i]);
pthread_detach(tid); // 子线程分离,防止僵尸线程产生
++i;
}
return 0;
}

多路I/O转接服务器

多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。

主要使用的方法有三种

  • select函数实现
  • poll函数实现
  • epoll函数实现
既然是文件,则使用文件描述符进行操作。所有除了对套接字(也是文件)适用外,之前的管道等也同样适用。

select实现

select能监听的文件描述符个数受限于FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数

解决1024以下客户端时使用select是很合适的,但如果链接客户端过多,select采用的是轮询模型,会大大降低服务器响应效率,不应在select上投入更多精力

select函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

/*
nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态
readfds: 监控有读数据到达文件描述符集合,传入传出参数
writefds: 监控写数据到达文件描述符集合,传入传出参数
exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数
timeout: 定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询
返回值: 三个文件描述符集合所有 有相应事件发生的个数
*/

struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};

//把文件描述符集合里所有位清0 初始化set(位图)
void FD_ZERO(fd_set *set);
//把文件描述符集合里fd清0 即从set中删除fd
void FD_CLR(int fd, fd_set *set);
//测试文件描述符集合里fd是否置1 即判断fd是否在set中
int FD_ISSET(int fd, fd_set *set);
//把文件描述符集合里fd位置1 即将fd添加到set中
void FD_SET(int fd, fd_set *set);

存在的问题:

  1. 文件描述符有上限 1024
  2. 不知道具体对应哪个文件描述符有反馈,需要for(1 —> 1023)或自定义数据结构(数组)
  3. 监听集合作为传入传出参数,会发生改变。需要保存原集合
使用流程
  1. select()监听 服务器端的监听套接字listenfd的 读(有读数据说明有客户端进行连接)
  2. 使用accept()建立连接返回交互套接字connfd,后select()监听connfd的读写
  3. select()返回 需要处理的文件描述符的个数,使用FD_ISSET()进行判断是哪些文件描述符

server.c 还是小写转大写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>

#include "wrap.h"

#define SERV_PORT 6666

int main(int argc, char* argv[])
{
int i, j, n, maxi;

int nready, client[FD_SETSIZE]; // 自定义数组client 防止遍历1024个文件描述符 FD_SETSIZE 1024
int maxfd, listenfd, connfd, sockfd;
char buf[BUFSIZ], str[INET_ADDRSTRLEN]; // #define INET_ADDRSTRLEN 16

struct sockaddr_in clie_addr, serv_addr;
socklen_t clie_addr_len;
fd_set rset, allset; // rset读事件文件描述符集合 allset用来暂存

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // 端口复用

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
Listen(listenfd, 128);

printf("wait for connect ...\n");

maxfd = listenfd; // 起初 listenfd 即为最大文件描述符
maxi = -1; // 之后用作client[]的下标 初始值指向0个元素之前下标位置

for(i = 0; i < FD_SETSIZE; ++i)
client[i] = -1; // 用-1初始化client[]

FD_ZERO(&allset);
FD_SET(listenfd, &allset); //构造select监控文件描述符集

while(1) {
rset = allset; //每次循环时都从新设置select监控信号集
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if(nready < 0)
perr_exit("select");

// 有新的客户端链接请求
if(FD_ISSET(listenfd, &rset)) {
clie_addr_len = sizeof(clie_addr);
connfd = Accept(listenfd, (struct sockaddr*)&clie_addr, &clie_addr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, (void*)&clie_addr.sin_addr.s_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));

for(i = 0; i < FD_SETSIZE; ++i) {
if(client[i] < 0) { // 找client[]中没有使用的位置
client[i] = connfd; // 保存accept返回的文件描述符到client[]里
break;
}
}

if(i == FD_SETSIZE) { //达到select能监控的文件个数上限
fputs("too many clients\n", stderr);
exit(1);
}

FD_SET(connfd, &allset); // 向监控文件描述符集合allset添加新的文件描述符connfd
if(connfd > maxfd)
maxfd = connfd; // select 第一个参数需要

if(i > maxi)
maxi = i; // 保证maxi存的总是client[]最后一个元素下标

if(--nready == 0)
continue;
}

// 有数据需要读取
for(i = 0; i <= maxi; ++i) { // 检测哪个clients有数据就绪
if((sockfd = client[i]) < 0)
continue;
if(FD_ISSET(sockfd, &rset)) {
if((n = Read(sockfd, buf, sizeof(buf))) == 0) { // 当client关闭链接时 服务器端也关闭对应连接
Close(sockfd);
FD_CLR(sockfd, &allset); // 解除select对此文件描述符的监控
client[i] = -1;
} else if(n > 0) { // 处理数据 转成大写
for(j = 0; j < n; ++j)
buf[j] = toupper(buf[j]);
Write(sockfd, buf, n);
Write(STDOUT_FILENO, buf, n);
}
if(--nready == 0)
break; // 跳出for循环 还在while中
}
}
}

Close(listenfd);
return 0;
}
pselect函数

pselect原型如下,此模型应用较少。

1
2
3
4
5
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask);
// 用sigmask替代当前进程的阻塞信号集,调用返回后还原原有阻塞信号集

poll实现

优点:

  1. 能监听的文件描述符可以突破1024(方法参见epoll)
  2. 监听、返回集合 分离开
  3. 搜索范围小
poll函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; // 文件描述符
short events; // 监控的事件
short revents; // 监控事件中满足条件返回的事件
};

/*
fds 结构体数组首地址
nfds 监控数组中有多少文件描述符需要被监控(不一定是数组长度)
timeout 毫秒级等待
-1:阻塞等待 #define INFTIM -1 (Linux中没有定义此宏)
0:立即返回,不阻塞进程
>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

事件:
POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND (*)
POLLRDNORM 数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级可读数据
POLLOUT 普通或带外数据可写 (*)
POLLWRNORM 数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误 (*)
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件
*/

如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0

使用示例

server.c 小写转大写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <poll.h>
#include <arpa/inet.h>

#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 8000
#define OPEN_MAX 1024

int main(int argc, char* argv[])
{
int i, j, maxi, listenfd, connfd, sockfd;
int nready; // 接收poll返回值 记录满足监听事件的fd个数
ssize_t n;

char buf[MAXLINE], str[INET_ADDRSTRLEN];
socklen_t clie_len;
struct pollfd client[OPEN_MAX];
struct sockaddr_in clie_addr, serv_addr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

Listen(listenfd, 128);

client[0].fd = listenfd; //要监听的第一个文件描述符 存入client[0]
client[0].events = POLLIN; //listenfd监听普通读事件

for(i = 1; i < OPEN_MAX; ++i)
client[i].fd = -1; // 用-1初始化client[]中剩余元素 0也是文件描述符,不可用

maxi = 0;

while(1) {
nready = poll(client, maxi+1, -1); // 阻塞监听是否有客户端连接请求
if(client[0].revents & POLLIN) { //listenfd有读事件就绪
clie_len = sizeof(clie_addr);
connfd = Accept(listenfd, (struct sockaddr*)&clie_addr, &clie_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));

for(i = 1; i < OPEN_MAX; ++i)
if(client[i].fd < 0) {
client[i].fd = connfd; //找到client[]中空闲的位置 存放accept返回的connfd
break;
}
if(i == OPEN_MAX) // 达到最大客户端数
perr_exit("too many clients");

if(i > maxi)
maxi = i; // 更新client[]中最大元素下标
if(--nready <= 0)
continue;
}

for(i = 1; i <= maxi; ++i) {
if(client[i].fd < 0)
continue;
if(client[i].revents & POLLIN) {
if((n = Read(sockfd, buf, MAXLINE)) < 0) {
if(errno == ECONNRESET) { // 收到RST标志
printf("client[%d] aborted connection\n", i);
Close(sockfd);
client[i].fd = -1; //poll中不监控该文件描述符 置为-1 不用像select那样移除
} else
perr_exit("read");
} else if(n == 0) { // 说明客户端关闭连接
printf("client[%d], closed connection\n", i);
Close(sockfd);
client[i].fd = -1;
} else {
for(j = 0; j < n; ++j)
buf[j] = toupper(buf[j]);
Writen(sockfd, buf, n);
}

if(--nready <= 0)
break;
}
}
}
return 0;
}
ppoll函数

GNU定义了ppoll(非POSIX标准),可以支持设置信号屏蔽字。

1
2
3
4
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <poll.h>
int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *timeout_ts, const sigset_t *sigmask);

epoll实现

epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

目前epell是linux大规模并发网络程序中的热门首选模型。

epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

可以使用cat命令查看一个进程可以打开的socket描述符上限:cat /proc/sys/fs/file-max

如有需要,可以通过修改配置文件的方式修改该上限值。

1
2
3
4
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536
* hard nofile 100000

image-20220615221830468

基础API
  1. epoll_create函数
  2. epoll_ctl函数
  3. epoll_wait函数

1)创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。

1
2
3
4
#include <sys/epoll.h>
int epoll_create(int size); // size 监听数目
// 内核和根据要监听的文件个数创建一个红黑树结构,并分配内存
// 返回文件描述符(句柄),用来操作内核中的开辟出的红黑树

2)控制某个epoll监控的文件描述符上的事件:注册、修改、删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

/*
fd: 要进行操作的文件描述符
epfd: 为epoll_creat的句柄
op: 表示动作,用3个宏来表示:
EPOLL_CTL_ADD (注册新的fd到epfd),
EPOLL_CTL_MOD (修改已经注册的fd的监听事件),
EPOLL_CTL_DEL (从epfd删除一个fd);
event: 告诉内核需要监听的事件

EPOLLIN: 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT: 表示对应的文件描述符可以写
EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR: 表示对应的文件描述符发生错误
EPOLLHUP: 表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
*/

3)等待所监控文件描述符上有事件的产生,类似于select()调用

1
2
3
4
5
6
7
8
9
10
11
12
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

/*
events: 结构体数组,传出参数。用来存内核得到事件的集合
maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size
timeout: 是超时时间
-1: 阻塞
0: 立即返回,非阻塞
>0: 指定毫秒
返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
*/
使用示例

server.c 小写转大写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <ctype.h>

#include "wrap.h"

#define MAXLINE 8192
#define SERV_PORT 8000
#define OPEN_MAX 5000

int main(int argc, char *argv[])
{
int i, listenfd, connfd, sockfd;
int n, num = 0;
ssize_t nready, efd, res;
char buf[MAXLINE], str[INET_ADDRSTRLEN];
socklen_t clie_len;

struct sockaddr_in clie_addr, serv_addr;
struct epoll_event tep, ep[OPEN_MAX]; //tep: epoll_ctl参数 ep[]:epoll_wait参数

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //端口复用

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

Listen(listenfd, 128);

efd = epoll_create(OPEN_MAX); //创建epoll模型,efd指向红黑树根节点
if(efd == -1)
perr_exit("epoll_create");

tep.events = EPOLLIN; // 指定listenfd监听事件为读
tep.data.fd = listenfd;
res = epoll_ctl(efd, EPOLL_CTL_ADD,listenfd, &tep); // 将listenfd及对应结构体设置到efd操纵的树上
if(res == -1)
perr_exit("epoll_ctl");

for(;;) {
// epoll为server阻塞监听事件 ep为struct epoll_event类型数组 OPEN_MAX为数组容量 -1永久阻塞
nready = epoll_wait(efd, ep, OPEN_MAX, -1);
if(nready == -1)
perr_exit("epoll_wait");

for(i = 0; i < nready; ++i) {
if(!ep[i].events & EPOLLIN) // 如果不是 读事件 , 继续循环
continue;

if(ep[i].data.fd == listenfd) { // 判断满足事件的fd是不是listenfd
clie_len = sizeof(clie_addr);
connfd = Accept(listenfd, (struct sockaddr*)&clie_addr, &clie_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));
printf("cfd %d---client %d\n", connfd, ++num);

tep.events = EPOLLIN;
tep.data.fd = connfd;
res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
if(res == -1)
perr_exit("epoll_ctl");
} else { // 不是listenfd
sockfd = ep[i].data.fd;
n = Read(sockfd, buf, MAXLINE);

if(n == 0) { //读到0,说明客户端关闭连接
res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL); //将该文件描述符从红黑树中删除
if(res == -1)
perr_exit("epoll_ctl");
Close(sockfd); //关闭与该客户端的连接
printf("client[%d] closed connection\n", sockfd);
} else if (n < 0) { //出错
perror("read n < 0");
res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
Close(sockfd);
} else { //实际读到了字节数
for(i = 0; i < n; ++i)
buf[i] = toupper(buf[i]); //转大写
Write(STDOUT_FILENO, buf, n);
Writen(sockfd, buf, n);
}
}
}
}
Close(listenfd);
Close(efd);
return 0;
}

epoll进阶

事件模型

EPOLL事件有两种模型:

  • Edge Triggered (ET)边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
  • Level Triggered (LT)水平触发只要有数据都会触发。

image-20220616150558550

思考如下步骤:

  1. 假定我们已经把一个用来从管道中读取数据的文件描述符(RFD)添加到epoll描述符。

  2. 管道的另一端写入了2KB的数据

  3. 调用epoll_wait,并且它会返回RFD,说明它已经准备好读取操作

  4. 读取1KB的数据

  5. 调用epoll_wait……

在这个过程中,有两种工作模式:ET模式 和 LT模式

ET模式

ET模式即Edge Triggered工作模式。

如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

1) 基于非阻塞文件句柄

2) 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。

LT模式

LT模式即Level Triggered工作模式。

与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。

LT(level triggered):LT是缺省的工作方式,并且同时支持blockno-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

管道示例

  1. 创建一个管道,子进程写数据,父进程读数据
  2. 子进程每次写10B大小的数据,再睡眠10s
  3. 父进程使用epoll监听管道的读端,每次读5B大小的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>

#define MAXLINE 10

int main(int argc, char *argv[])
{
int epfd, i;
int pfd[2]; // epoll和管道
pid_t pid;
char buf[MAXLINE], ch = 'a';

pipe(pfd);
pid = fork();

if(pid == 0) { // 写
close(pfd[0]); //关闭读端
while(1) {
// aaaa\n
for(i = 0; i < MAXLINE/2; ++i)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
// bbbb\n
for(; i < MAXLINE; ++i)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
// aaaa\nbbbb\n
write(pfd[1], buf, sizeof(buf));
printf("child write to pipe-------------\n");
printf("%s", buf);
printf("--------------------------------\n");
sleep(5);
}
close(pfd[1]);
} else if(pid > 0) { //父 读
struct epoll_event event, resevent;
int res, len;

close(pfd[1]); //关闭写端

epfd = epoll_create(1);
event.events = EPOLLIN | EPOLLET; // ET 边沿触发
//event.events = EPOLLIN; // LT 水平触发(默认)
event.data.fd = pfd[0];
epoll_ctl(epfd, EPOLL_CTL_ADD, pfd[0], &event);

while(1) {
res = epoll_wait(epfd, &resevent, 1, -1);
printf("father read from pipe\n");
printf("res %d\n", res);
if(resevent.data.fd == pfd[0]) {
len = read(pfd[0], buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}

close(pfd[0]);
close(epfd);
} else {
perror("fork");
exit(-1);
}
return 0;
}

ET边沿触发模式:子进程写一次aaaa\nbbbb\n, 父进程读一次aaaa\n;……

image-20220616152303401

LT水平触发模式: 子进程写一次aaaa\nbbbb\n, 父进程读一次aaaa\n,父进程再读一次bbbb\n;……

image-20220616152702823

C/S示例

和管道示例类似,子进程换成了客户端,父进程换成了服务器

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <stdio.h>
#include <sys/epoll.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#define MAXLINE 10
#define SERV_PORT 9000

int main()
{
struct sockaddr_in serv_addr, clie_addr;
socklen_t clie_addr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int epfd;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

listen(listenfd, 128);

struct epoll_event event;
struct epoll_event resevent;
int res, len;

epfd = epoll_create(1);
event.events = EPOLLIN | EPOLLET; // ET 边沿触发
//event.events = EPOLLIN; // LT 水平触发 默认

printf("Accept connections ...\n");
clie_addr_len = sizeof(clie_addr);
connfd = accept(listenfd, (struct sockaddr*)&clie_addr, &clie_addr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));

event.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event);

while(1) {
res = epoll_wait(epfd, &resevent, 1, -1);

printf("server read from socket buf\n");
printf("res %d\n", res);
if(resevent.data.fd == connfd) {
len = read(connfd, buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}

close(connfd);
close(listenfd);
close(epfd);
return 0;
}

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <arpa/inet.h>

#define MAXLINE 10
#define SERV_IP "127.0.0.1"
#define SERV_PORT 9000

int main(int argc, char *argv[])
{
int connfd, i;
struct sockaddr_in serv_addr;
char buf[MAXLINE];
char ch = 'a';

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET, SERV_IP, (void*)&serv_addr.sin_addr.s_addr);

connfd = socket(AF_INET, SOCK_STREAM, 0);

connect(connfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

while(1) {
// aaaa\n
for(i = 0; i < MAXLINE/2; ++i)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
// bbbb\n
for(; i < MAXLINE; ++i)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
// aaaa\nbbbb\n
write(connfd, buf, sizeof(buf));
printf("client write to socket buf-------------\n");
printf("%s", buf);
printf("---------------------------------------\n");
sleep(5);
}
close(connfd);
return 0;
}

ET边沿触发模式

image-20220616161411020

LT水平触发模式:

image-20220616162101547

非阻塞读示例

基于网络C/S非阻塞模型的epoll ET触发模式。服务器对套接字非阻塞读,但要加上while循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

#define MAXLINE 10
#define SERV_PORT 9000

int main()
{
struct sockaddr_in clie_addr, serv_addr;
socklen_t clie_addr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int epfd, flag;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
listen(listenfd, 128);

struct epoll_event event;
struct epoll_event resevent;
int res, len;

epfd = epoll_create(1);

event.events = EPOLLIN | EPOLLET; // ET 边沿触发
//event.events = EPOLLIN
printf("Accepting connections ...\n");
clie_addr_len = sizeof(clie_addr);
connfd = accept(listenfd, (struct sockaddr*)&clie_addr, &clie_addr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));

flag = fcntl(connfd, F_GETFL); // 修改connfd为非阻塞读
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);

event.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event); //将connfd放入监听红黑树
while(1) {
printf("epoll_wait begin\n");
res = epoll_wait(epfd, &resevent, 1, -1); //只监听1个
printf("epoll_wait end res %d\n", res);

if(resevent.data.fd == connfd) {
while((len = read(connfd, buf, MAXLINE/2)) > 0) //非阻塞读 轮询
write(STDOUT_FILENO, buf, len);
}
}

return 0;
}

image-20220616164717253

反应堆模型

libevent 核心思想实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
* epoll基于非阻塞I/O事件驱动
*/
#include <stdio.h>
#include <errno.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

#define MAX_EVENTS 1024 //监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080

struct myevent_s {
int fd; //要监听的文件描述符
int events; //对应的监听事件
void *arg; //泛型参数
void (*call_back)(int fd, int events, void *arg); //回调函数
int status; //是否在监听: 1->在红黑树上 0->不在
char buf[BUFLEN];
int len;
long last_active; //记录每次加入红黑树 g_efd 的时间值
};

int g_efd; //全局变量 保存epoll_create的返回值
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组 +1 --> listen fd

void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg);
void eventadd(int efd, int events, struct myevent_s *ev);
void eventdel(int efd, struct myevent_s *ev);
void acceptconn(int lfd, int events, void *arg);
void recvdata(int fd, int events, void *arg);
void senddata(int fd, int events, void *arg);
void initlistensocket(int efd, short port);

int main(int argc, char *argv[])
{
unsigned short port = SERV_PORT;

if(argc == 2)
port = atoi(argv[1]); //使用用户指定端口 未指定则用默认端口

g_efd = epoll_create(MAX_EVENTS+1); //创建红黑树 返回给全局g_efd
if(g_efd <= 0) {
printf("create efd in %s err %s\n", __func__, strerror(errno));
exit(-1);
}

initlistensocket(g_efd, port); //初始化监听socket

struct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组
printf("server running:port[%d]\n", port);

int checkpos = 0, i;

while(1) {
/* 超时验证,每次测试100个链接,不测试listenfd 当客户端60秒内没有和服务器通信,则关闭与该客户端链接 */

long now = time(NULL); //当前时间
for(i = 0; i < 100; ++i, checkpos++) { //一次循环检测100个 checkpos控制检测对象
if(checkpos == MAX_EVENTS)
checkpos = 0;
if(g_events[checkpos].status != 1) //不在红黑树 g_efd 上
continue;

long duration = now - g_events[checkpos].last_active; //客户端不活跃的时间

if(duration >= 60) {
close(g_events[checkpos].fd); //关闭与该客户端链接
printf("[fd=%d\n] timeout\n", g_events[checkpos].fd);
eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树g_efd移除
}
}

/* 监听红黑树g_efd,将满足的事件的文件描述符加至events数组中,1秒没有实践满足,返回0 */
int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
if(nfd < 0) {
perror("epoll_wait error");
break;
}

for(i = 0; i < nfd; ++i) {
/* 使用自定义结构体myevent_s类型指针,接收 联合体data的void *ptr成员 */
struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;

if((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}

/* 退出前释放所有资源 */
return 0;
}

/*创建 socket, 初始化lfd */
void initlistensocket(int efd, short port)
{
int lfd = socket(AF_INET, SOCK_STREAM, 0);
int flag = fcntl(lfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(lfd, F_SETFL, flag); // 将socket设为非阻塞

eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]); //g_events数组的最后一个元素对应lfd

eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);

struct sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(port);

bind(lfd, (struct sockaddr*)&sin, sizeof(sin));

listen(lfd, 128);
}

/* 将结构体 myevent_s 成员变量 初始化 */
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
//memset(ev->buf, 0, sizeof(ev->buf));
//ev->len = 0;
ev->last_active = time(NULL); //调用eventset函数的时间
}

/* 向epoll监听的红黑树 添加一个 文件描述符 */
void eventadd(int efd, int events, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};;
int op;
epv.data.ptr = ev;
epv.events = ev->events = events; // EPOLLIN 或 EPOLLOUT

if(ev->status == 1) { // 已经在红黑树 g_efd 里
op = EPOLL_CTL_MOD; // 修改其属性
} else { // 已经在红黑树中
op = EPOLL_CTL_ADD; // 将其加入红黑树 g_efd, 并将status置1
ev->status = 1;
}

if(epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);
}

/* 从epoll监听的 红黑树中删除一个 文件描述符 */
void eventdel(int efd, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};

if(ev->status != 1) //不在红黑树上
return;

epv.data.ptr = ev;
ev->status = 0; //修改状态
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将ev->fd 摘除
}

/* 当有文件描述符就绪, epoll返回,调用该函数 与客户端建立连接 */
void acceptconn(int lfd, int events, void *arg)
{
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int cfd, i;

if((cfd = accept(lfd, (struct sockaddr*)&cin, &len)) == -1) {
if(errno != EAGAIN && errno != EINTR) {
/* 暂时不做处理 */
}
printf("%s: accept, %s\n", __func__, strerror(errno));
return;
}

do {
for(i = 0; i < MAX_EVENTS; ++i) //从全局数组g_events中找一个空闲元素
if(g_events[i].status == 0) //类似于select中找值为-1的元素
break;

if(i == MAX_EVENTS) {
printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
break; //跳出do while(0) 不执行后续代码
}

int flag = 0;
if((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为阻塞
printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
break;
}

/* 给cfd设置一个 myevent_s结构体 回调函数 设置为 recvdata */

eventset(&g_events[i], cfd, recvdata, &g_events[i]);
eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加红黑树g_efd中 监听读事件
} while(0);

printf("new connect [%s:%d][time:%ld], pos[%d]\n",
inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
}

void recvdata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;

len = recv(fd, ev->buf, sizeof(ev->buf), 0); //读文件描述符,数据存入myevent_s成员buf中

eventdel(g_efd, ev); //将该节点从红黑树上摘除

if(len > 0) {
ev->len = len;
ev->buf[len] = '\0'; //手动添加字符串结束标记
printf("C[%d]:%s\n", fd, ev->buf);

eventset(ev, fd, senddata, ev); //设置该fd对应的回调函数为senddata
eventadd(g_efd, EPOLLOUT, ev); //将fd加入红黑树g_efd中,监听其写事件
} else if(len == 0) {
close(ev->fd);
/* ev-g_events 地址相减得到偏移元素位置 */
printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);
} else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
}

void senddata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;

len = send(fd, ev->buf, ev->len, 0); //直接将数据回写给客户端
/*
* printf("fd=%d\tev->buf=%s\tev->len=%d\n", fd, ev->buf, ev->len);
* printf("send len = %d\n", len);
*/

if(len > 0) {
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
eventdel(g_efd, ev); //从红黑树g_efd中移除
eventset(ev, fd, recvdata, ev); //将该fd的回调函数改为recvdata
eventadd(g_efd, EPOLLIN, ev); //重新添加到红黑树上,设为监听读事件
} else {
close(ev->fd); //关闭连接
eventdel(g_efd, ev); //从红黑树g_efd中移除
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
}

线程池

  1. 预先创建阻塞于accept多线程,使用互斥锁上锁保护accept

  2. 预先创建多线程,由主线程调用accept

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#ifndef __THREADPOOL_H_
#define __THREADPOOL_H_

typedef struct threadpool_t threadpool_t;

threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);

int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);

int threadpool_destroy(threadpool_t *pool);

int threadpool_all_threadnum(threadpool_t *pool);

int threadpool_busy_threadnum(threadpool_t *pool);

#endif

threadpool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>

#include "threadpool.h"

#define DEFAULT_TIME 10 /* 10s检测一次 */
#define MIN_WAIT_TASK_NUM 10 /* 如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池 */
#define DEFAULT_THREAD_VARY 10 /* 每次创建或销毁线程的个数 */
#define true 1
#define false 0

typedef struct {
void *(*function)(void *); /* 函数指针 回调函数 */
void *arg; /* 上面函数的参数 */
} threadpool_task_t;

/* 描述线程池相关信息 */
struct threadpool_t {
pthread_mutex_t lock; /* 用于锁住本结构体 */
pthread_mutex_t thread_counter; /* 记录忙状态线程个数的锁 -- busy_thr_num */
pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

pthread_t *threads; /* 存放线程池中每个线程的tid 数组 */
pthread_t adjust_tid; /* 存管理线程tid */
threadpool_task_t *task_queue; /* 任务队列 */

int min_thr_num; /* 线程池最小线程数 */
int max_thr_num; /* 线程池最大线程数 */
int live_thr_num; /* 当前存活线程个数 */
int busy_thr_num; /* 忙状态线程个数 */
int wait_exit_thr_num; /* 要销毁的线程个数 */

int queue_front; /* task_queue队头下标 */
int queue_rear; /* task_queue队尾下标 */
int queue_size; /* taks_queue队中实际任务数 */
int queue_max_size; /* task_queue队列可容纳任务数上限 */

int shutdown; /* 标志位,线程池使用状态,true或false */
};

void *threadpool_thread(void *threadpool); //销毁线程池
int threadpool_free(threadpool_t *pool); //释放pool资源
void *adjust_thread(void *threadpool); //管理者线程的启动函数,管理线程池的线程数
int is_thread_alive(pthread_t tid); //判断某个线程是否存活

threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool_t *pool = NULL;
do {
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
printf("malloc threadpool fail\n");
break; // 跳出do while
}

pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num; // 存活的线程数 初值=最小线程数
pool->queue_size = 0; // 有0个产品
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false; // 不关闭线程池

/* 根据最大线程上限数,给工作线程数组开辟空间,并清零 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
if(pool->threads == NULL) {
printf("malloc threads fail\n");
break;
}
memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);

/* 任务队列开辟空间 */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
if(pool->task_queue == NULL) {
printf("malloc task_queue fail\n");
break;
}

/* 初始化互斥锁、条件变量 */
if(pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init the lock or cond fail\n");
break;
}

/* 启动 min_thr_num 个 work thread */
for(i = 0; i < min_thr_num; ++i) {
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); // pool指向当前线程池
printf("start thread 0x%x ...\n", (unsigned int)pool->threads[i]);
}

pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void*)pool); // 启动管理者线程

return pool;

} while(0);

threadpool_free(pool); // 前面代码调用失败时,释放pool存储空间
return NULL;
}

/* 向线程池中 添加一个任务 */
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));

/* 队列已经满,调用wait阻塞 */
while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
if(pool->shutdown) {
pthread_mutex_unlock(&(pool->lock));
}

/* 添加的任务插入尾部 先释放尾部arg */
if(pool->task_queue[pool->queue_rear].arg != NULL) {
free(pool->task_queue[pool->queue_rear].arg);
pool->task_queue[pool->queue_rear].arg = NULL;
}
/* 添加任务到任务队列中 */
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; // 队尾指针移动,模拟环形
pool->queue_size++;

/* 添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程 */
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));

return 0;
}

int is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); // 发0号信号,测试线程是否存活
if(kill_rc == ESRCH) {
return false;
}
return true;
}

int threadpool_destroy(threadpool_t *pool)
{
int i;
if(pool == NULL) {
return -1;
}
pool->shutdown = true;

/* 先销毁管理线程 */
pthread_join(pool->adjust_tid, NULL);

for(i = 0; i < pool->live_thr_num; ++i) {
/* 通知所有的空闲线程 */
pthread_cond_broadcast(&(pool->queue_not_empty));
}
for(i = 0; i < pool->live_thr_num; ++i) {
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
return 0;
}

int threadpool_all_threadnum(threadpool_t *pool)
{
int all_threadnum = -1;
pthread_mutex_lock(&(pool->lock));
all_threadnum = pool->live_thr_num;
pthread_mutex_unlock(&(pool->lock));
return all_threadnum;
}

int threadpool_busy_threadnum(threadpool_t *pool)
{
int busy_threadnum = -1;
pthread_mutex_lock(&(pool->thread_counter));
busy_threadnum = pool->busy_thr_num;
pthread_mutex_unlock(&(pool->thread_counter));
return busy_threadnum;
}

/* 管理线程 */
void *adjust_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while(!pool->shutdown) {
sleep(DEFAULT_TIME); // 定时 对线程池管理

pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size; // 关注 任务数
int live_thr_num = pool->live_thr_num; // 存活线程数
pthread_mutex_unlock(&(pool->lock));

pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; // 忙着的线程数
pthread_mutex_unlock(&(pool->thread_counter));

// 创建新线程 任务数大于最小线程池个数,且存活的线程数少于最大线程个数时
if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
pthread_mutex_lock(&(pool->lock));
int add = 0;

// 一次增加 DFAULT_THREAD 个线程
for(i = 0; i < pool->max_thr_num && add <DEFAULT_THREAD_VARY
&& pool->live_thr_num < pool->max_thr_num; ++i) {
if(pool->threads[i] = 0 || !is_thread_alive(pool->threads[i])) {
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
add++;
pool->live_thr_num++;
}
}

pthread_mutex_unlock(&(pool->lock));
}

// 销毁多余的空闲线程 忙线程x2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时
if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
pthread_mutex_lock(&(pool->lock));
// 一次销毁DEFAULT_THREAD个线程 随机10个
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; // 要销毁的线程数 设置为10
pthread_mutex_unlock(&(pool->lock));

for(i = 0; i < DEFAULT_THREAD_VARY; ++i) {
// 通知处于空闲状态的线程,他们会自行终止
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}


void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;

while(true) {
/* 刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务 */
pthread_mutex_lock(&(pool->lock));

/* queue_size == 0 说明没有任务,调wait阻塞在条件变量上,若有任务,跳过该while */
while((pool->queue_size == 0) && (!pool->shutdown)) {
printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

/* 清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程 */
if(pool->wait_exit_thr_num > 0) {
pool->wait_exit_thr_num--;

/* 如果线程池里线程个数大于最小值时可以结束当前线程 */
if(pool->live_thr_num > pool->min_thr_num) {
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}

/* 如果指定了true,要关闭线程池里的每个线程,自行退出处理 */
if(pool->shutdown) {
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pthread_exit(NULL); //线程自行结束
}

/* 从任务队列里获取任务,是一个出队操作 */
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;

pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //出队 模拟环形队列
pool->queue_size--;

/* 通知可以有新任务添加进来 */
pthread_cond_broadcast(&(pool->queue_not_full));

/* 任务取出后,立即将线程池锁 释放 */
pthread_mutex_unlock(&(pool->lock));

/* 执行任务 */
printf("thread 0x%x start working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter)); // 忙状态线程数变量锁
pool->busy_thr_num++; // 忙状态线程数+1
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); // 执行回调函数

/* 任务结束处理 */
printf("thread 0x%x end working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--; // 处理掉一个任务,忙状态线程数-1
pthread_mutex_unlock(&(pool->thread_counter));
}

pthread_exit(NULL);
}

int threadpool_free(threadpool_t *pool)
{
if(pool == NULL) {
return -1;
}

if(pool->task_queue) {
free(pool->task_queue);
}
if(pool->threads) {
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}

#if 1 // 测试代码
/* 线程池中的线程,模拟处理业务 */
void *process(void *arg)
{
printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), *(int *)arg);
sleep(1);
printf("task %d is end\n", *(int *)arg);
return NULL;
}

int main(void)
{
threadpool_t *thp = threadpool_create(3, 100, 100); // 创建线程池,池里最小3个线程,最大100,队列最大100
printf("pool inited");

int num[20], i;
for(i = 0; i < 20; ++i) {
num[i] = 1;
printf("add task %d\n", i);
threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 */
}
sleep(10); /* 等子线程完成任务 */
threadpool_destroy(thp);

return 0;
}

#endif

UDP服务器

传输层主要应用的协议模型有两种,一种是TCP协议,另外一种则是UDP协议。TCP协议在网络通信中占主导地位,绝大多数的网络通信借助TCP协议完成数据传输。但UDP也是网络通信中不可或缺的重要通信手段。

相较于TCP而言,UDP通信的形式更像是发短信。不需要在数据传输之前建立、维护连接。只专心获取数据就好。省去了三次握手的过程,通信速度可以大大提高,但与之伴随的通信的稳定性和正确率便得不到保证。因此,我们称UDP为“无连接的不可靠报文传递”。

那么与我们熟知的TCP相比,UDP有哪些优点和不足呢?由于无需创建连接,所以UDP开销较小,数据传输速度快,实时性较强。多用于对实时性要求较高的通信场合,如视频会议、电话会议等。但随之也伴随着数据传输不可靠,传输数据的正确率、传输顺序和流量都得不到控制和保证。所以,通常情况下,使用UDP协议进行数据传输,为保证数据的正确性,我们需要在应用层添加辅助校验协议来弥补UDP的不足,以达到数据可靠传输的目的。

与TCP类似的,UDP也有可能出现缓冲区被填满后,再接收数据时丢包的现象。由于它没有TCP滑动窗口的机制,通常采用如下两种方法解决:

1) 服务器应用层设计流量控制,控制发送数据速度。

2) 借助setsockopt函数改变接收缓冲区大小。如:

1
2
3
4
5
#include <sys/socket.h>
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);

int n = 220x1024;
setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &n, sizeof(n));

C/S模型

image-20220618141648371

由于UDP不需要维护连接,程序逻辑简单了很多,但是UDP协议是不可靠的,保证通讯可靠性的机制需要在应用层实现。

编译运行server,在两个终端里各开一个client与server交互,看看server是否具有并发服务的能力。用Ctrl+C关闭server,然后再运行server,看此时client还能否和server联系上。和前面TCP程序的运行结果相比较,体会无连接的含义。

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>

#define SERV_PORT 8000

int main()
{
struct sockaddr_in serv_addr, clie_addr;
socklen_t clie_addr_len;
int sockfd;
char buf[BUFSIZ];
char str[INET_ADDRSTRLEN];
int i, n, ret;

sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd == -1)
perror("socket error\n");

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(SERV_PORT);

ret = bind(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if(ret == -1)
perror("bind error\n");

printf("Accepting connectings ... \n");
while(1) {
clie_addr_len = sizeof(clie_addr);
n = recvfrom(sockfd, buf, BUFSIZ, 0, (struct sockaddr*)&clie_addr, &clie_addr_len);
if(n == -1)
perror("recvfrom error\n");

printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)),
ntohs(clie_addr.sin_port));

for(i = 0; i < n; ++i)
buf[i] = toupper(buf[i]);

n = sendto(sockfd, buf, n, 0, (struct sockaddr*)&clie_addr, sizeof(clie_addr));
if(n == -1)
perror("sendto error\n");

}
close(sockfd);

return 0;
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>

#define SERV_IP "106.13.232.203"
#define SERV_PORT 8000

int main()
{
struct sockaddr_in serv_addr;
int sockfd, n;
char buf[BUFSIZ];

sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd == -1)
perror("sockfd error\n");

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
inet_pton(AF_INET, SERV_IP, &serv_addr.sin_addr);
serv_addr.sin_port = htons(SERV_PORT);

while(fgets(buf, sizeof(buf), stdin) != NULL) {
sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if(n == -1)
perror("sendto error\n");

n = recvfrom(sockfd, buf, sizeof(buf), 0, NULL, 0); //NULL: 不保存对端信息
if(n == -1)
perror("recvfrom error\n");

write(STDOUT_FILENO, buf, n);
}
close(sockfd);
return 0;
}

广播

要设置广播权限

1
2
int flag = 1;
setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &flag, sizeof(flag)); //设置广播权限

使用ifconfig可以查看广播地址

image-20220619133706699

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <string.h>

#define SERV_PORT 8000
#define BROADCAST_IP "192.168.15.255"
#define CLIENT_PORT 9000

int main()
{
int sockfd;
struct sockaddr_in serv_addr, clie_addr;
char buf[BUFSIZ];

// 构造用于UDP通信的套接字
sockfd = socket(AF_INET, SOCK_DGRAM, 0);

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET; // ipv4
serv_addr.sin_port = htons(SERV_PORT);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); //本地任意IP

bind(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

int flag = 1;
setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &flag, sizeof(flag)); //设置广播权限

// 构造 client 地址 IP+端口
bzero(&clie_addr, sizeof(clie_addr));
clie_addr.sin_family = AF_INET;
clie_addr.sin_port = htons(CLIENT_PORT);
inet_pton(AF_INET, BROADCAST_IP, &clie_addr.sin_addr.s_addr);

int i = 0;
while(1) {
sprintf(buf, "Drink %d glasses of water\n", ++i);
//fgets(buf, sizeof(buf), stdin);
sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr*)&clie_addr, sizeof(clie_addr));
sleep(1);
}
close(sockfd);
return 0;
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

#define CLIENT_PORT 9000

int main()
{
struct sockaddr_in local_addr;
int connfd;
ssize_t len;
char buf[BUFSIZ];

//创建socket
connfd = socket(AF_INET, SOCK_DGRAM, 0);

//初始化本地端地址
bzero(&local_addr, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
local_addr.sin_port = htons(CLIENT_PORT);

int ret = bind(connfd, (struct sockaddr*)&local_addr, sizeof(local_addr));
if(ret == 0)
printf("...bind ok ...\n");

while(1) {
len = recvfrom(connfd, buf, sizeof(buf), 0, NULL, 0);
write(STDOUT_FILENO, buf, len);
}
close(connfd);
return 0;
}

多播/组播

组播组可以是永久的也可以是临时的。组播组地址中,有一部分由官方分配的,称为永久组播组。永久组播组保持不变的是它的ip地址,组中的成员构成可以发生变化。永久组播组中成员的数量都可以是任意的,甚至可以为零。那些没有保留下来供永久组播组使用的ip组播地址,可以被临时组播组利用。

224.0.0.0~224.0.0.255 为预留的组播地址(永久组地址),地址224.0.0.0保留不做分配,其它地址供路由协议使用;

224.0.1.0~224.0.1.255 是公用组播地址,可以用于Internet;欲使用需申请。

224.0.2.0~238.255.255.255 为用户可用的组播地址(临时组地址),全网范围内有效;

239.0.0.0~239.255.255.255 为本地管理组播地址,仅在特定的本地范围内有效。

可使用ip addressip ad命令查看网卡编号,如:

image-20220619134050224

if_nametoindex函数可以根据网卡名,获取网卡序号。

获取组播权限

1
2
3
4
5
6
7
#define GROUP "239.0.0.2"

struct ip_mreqn group;
inet_pton(AF_INET, GROUP, &group.imr_multiaddr); /* 设置组地址 */
inet_pton(AF_INET, "0.0.0.0", &group.imr_address); /* 本地任意IP */
group.imr_ifindex = if_nametoindex("eth0"); /* 给出网卡名,转换为对应编号: eth0 --> 编号 命令:ip ad */
setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, &group, sizeof(group)); /* 组播权限 */

加入多播组

1
setsockopt(connfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)); //加入多播组

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <net/if.h>

#define SERV_PORT 8000
#define CLIENT_PORT 9000
#define GROUP "239.0.0.2"

int main()
{
int sockfd;
struct sockaddr_in serv_addr, clie_addr;
char buf[BUFSIZ] = "itcast\n";
struct ip_mreqn group;

sockfd = socket(AF_INET, SOCK_DGRAM, 0); //构造用于UDP通信的套接字

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERV_PORT);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);

bind(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

inet_pton(AF_INET, GROUP, &group.imr_multiaddr); // 设置组地址
inet_pton(AF_INET, "0.0.0.0", &group.imr_address); // 本地任意IP
group.imr_ifindex = if_nametoindex("eth0"); // 给出网卡名,获取为对应编号

setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, &group, sizeof(group)); //设置组播权限

bzero(&clie_addr, sizeof(clie_addr));
clie_addr.sin_family = AF_INET;
inet_pton(AF_INET, GROUP, &clie_addr.sin_addr);
clie_addr.sin_port = htons(CLIENT_PORT);

int i = 0;
while(1) {
sprintf(buf, "itcast %d\n", ++i);
//fgets(buf, sizeof(buf), stdin);
sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr*)&clie_addr, sizeof(clie_addr));
sleep(1);
}
close(sockfd);
return 0;
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <unistd.h>
#include <net/if.h>
#include <arpa/inet.h>
#include <string.h>

#define CLIENT_PORT 9000
#define GROUP "239.0.0.2"

int main()
{
struct sockaddr_in local_addr;
int connfd;
ssize_t len;
char buf[BUFSIZ];

struct ip_mreqn group;

connfd = socket(AF_INET, SOCK_DGRAM, 0);

bzero(&local_addr, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_port = htons(CLIENT_PORT);
local_addr.sin_addr.s_addr = htonl(INADDR_ANY);

bind(connfd, (struct sockaddr*)&local_addr, sizeof(local_addr));

inet_pton(AF_INET, GROUP, &group.imr_multiaddr);
inet_pton(AF_INET, "0.0.0.0", &group.imr_address);
group.imr_ifindex = if_nametoindex("eth0");

setsockopt(connfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)); //加入多播组

while(1) {
len = recvfrom(connfd, buf, sizeof(buf), 0, NULL, 0);
write(STDOUT_FILENO, buf, len);
}
close(connfd);
return 0;
}

本地套接字

socket API原本是为网络通讯设计的,但后来在socket的框架上发展出一种IPC机制,就是UNIX Domain Socket。虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。

UNIX Domain Socket是全双工的,API接口语义丰富,相比其它IPC机制有明显的优越性,目前已成为使用最广泛的IPC机制,比如X Window服务器和GUI程序之间就是通过UNIX Domain Socket通讯的。

使用UNIX Domain Socket的过程和网络socket十分相似,也要先调用socket()创建一个socket文件描述符,address family指定为AF_UNIX,type可以选择SOCK_DGRAM或SOCK_STREAM,protocol参数仍然指定为0即可。

UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。

对比网络套接字地址结构和本地套接字地址结构:

1
2
3
4
5
6
7
8
9
10
struct sockaddr_in {
__kernel_sa_family_t sin_family; /*Address family*/
__be16 sin_port; /* Port number */
struct in_addr sin_addr; /*Internet address */
};

struct sockaddr_un { // 头文件 <sys/un.h>
__kernel_sa_family_t sun_family; /*AF_UNIX*/
char sun_path[UNIX_PATH_MAX]; /* pathname */
};

以下程序将UNIX Domain socket绑定到一个地址。

1
2
size = offsetof(struct sockaddr_un, sun_path) + strlen(un.sun_path);
#define offsetof(type, member) ((int)&((type *)0)->MEMBER) // offsetof获取 member在结构体type中的偏移量

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR "serv.socket"

int main(void)
{
int lfd, cfd, len, size, i;
struct sockaddr_un servaddr, cliaddr;
char buf[4096];

lfd = Socket(AF_UNIX, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path,SERV_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* servaddr total len */

unlink(SERV_ADDR); /* 确保bind之前serv.sock文件不存在,bind会创建该文件 */
Bind(lfd, (struct sockaddr *)&servaddr, len); /* 参3不能是sizeof(servaddr) */

Listen(lfd, 20);

printf("Accept ...\n");
while (1) {
len = sizeof(cliaddr);
cfd = Accept(lfd, (struct sockaddr *)&cliaddr, (socklen_t *)&len);

len -= offsetof(struct sockaddr_un, sun_path); /* 得到文件名的长度 */
cliaddr.sun_path[len] = '\0'; /* 确保打印时,没有乱码出现 */

printf("client bind filename %s\n", cliaddr.sun_path);

while ((size = read(cfd, buf, sizeof(buf))) > 0) {
for (i = 0; i < size; i++)
buf[i] = toupper(buf[i]);
write(cfd, buf, size);
}
close(cfd);
}
close(lfd);

return 0;
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stddef.h>

#include "wrap.h"

#define SERV_ADDR "serv.socket"
#define CLIE_ADDR "clie.socket"

int main(void)
{
int cfd, len;
struct sockaddr_un servaddr, cliaddr;
char buf[4096];

cfd = Socket(AF_UNIX, SOCK_STREAM, 0);

bzero(&cliaddr, sizeof(cliaddr));
cliaddr.sun_family = AF_UNIX;
strcpy(cliaddr.sun_path,CLIE_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(cliaddr.sun_path); /* 计算客户端地址结构有效长度 */

unlink(CLIE_ADDR);
Bind(cfd, (struct sockaddr *)&cliaddr, len); /* 客户端也需要bind, 不能依赖自动绑定*/


bzero(&servaddr, sizeof(servaddr)); /* 构造server 地址 */
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path,SERV_ADDR);

len = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path); /* 计算服务器端地址结构有效长度 */

Connect(cfd, (struct sockaddr *)&servaddr, len);

while (fgets(buf, sizeof(buf), stdin) != NULL) {
write(cfd, buf, strlen(buf));
len = read(cfd, buf, sizeof(buf));
write(STDOUT_FILENO, buf, len);
}

close(cfd);

return 0;
}

欢迎关注我的其它发布渠道