今天看到一篇讲 UDP 丢包场景的文章,其中提到了 UDP 缓冲区填满导致丢包的问题。之前确实没有细究过,于是写了个小程序验证了一下,描述如下:
数据报分片重组丢失:UDP 协议本身规定的数据报最大长度是 64KB,但是数据链路层有 MTU 的限制(以太网通常为 1500 字节),所以当你发送一个很大的 UDP 包的时候,这个包会在 IP 层进行分片,到接收端再重组。这个过程中只要有任何一个分片丢失,整个 UDP 数据报就无法重组,会被整体丢弃;另外 UDP 本身有校验和(checksum)检测机制,校验失败的 UDP 包同样会被抛弃;
UDP 缓冲区填满:当 UDP 的缓冲区已经被填满的时候,接收方还没有处理这部分的 UDP 数据报,这个时候再过来的数据报就没有地方可以存了,自然就都被丢弃了。
client发送两次UDP数据,第一次 500字节,第二次300字节,server端阻塞模式下接包,第一次recvfrom( 1000 ),收到是 1000,还是500,还是300,还是其他?
由于UDP通信的有界性,接收到只能是500或300,又由于UDP的无序性和非可靠性,接收到可能是300,也可能是500,也可能一直阻塞在recvfrom调用上,直到超时返回(也就是什么也收不到)。
第二种情况:在假定数据包是不丢失并且是按照发送顺序按序到达的情况下,server端阻塞模式下接包,先后三次调用:recvfrom( 200),recvfrom( 1000),recvfrom( 1000),接收情况如何呢?
由于UDP通信的有界性,第一次recvfrom( 200)将接收第一个500字节的数据包,但是因为用户空间buf只有200字节,于是只会返回前面200字节,剩下300字节将丢弃。第二次recvfrom( 1000)将返回300字节,第三次recvfrom( 1000)将会阻塞。
如何解决:
以libevent测试程序为例,在接收到缓冲区有数据的可读事件后,首先通过如下的方法,或者libevent封装的方法,获取到系统缓冲区中可读数据的大小,然后申请对应大小的buffer再去调用recvfrom方法;否则会出现如上所述用户提供的接收缓冲区小于可读数据的情况,导致UDP数据报被截断、读不全的问题!
int ret = ioctl(fd, FIONREAD, &totallen);
或者
/*
 * Ask the OS how many bytes are currently readable on `fd` via FIONREAD.
 *
 * Returns the reported byte count, -1 if the FIONREAD query fails, or
 * EVBUFFER_MAX_READ when the platform does not define FIONREAD at all.
 */
static int
get_n_bytes_readable_on_socket(evutil_socket_t fd)
{
#if defined(FIONREAD) && defined(_WIN32)
	/* Windows flavour: ioctlsocket() reports the count as an unsigned long. */
	unsigned long readable = EVBUFFER_MAX_READ;

	if (ioctlsocket(fd, FIONREAD, &readable) < 0) {
		return -1;
	}
	/* Can overflow, but mostly harmlessly. XXXX */
	return (int)readable;
#elif defined(FIONREAD)
	/* ioctl() flavour: the count is written into a plain int. */
	int readable = EVBUFFER_MAX_READ;

	if (ioctl(fd, FIONREAD, &readable) < 0) {
		return -1;
	}
	return readable;
#else
	/* No way to query the socket: fall back to the configured maximum. */
	return EVBUFFER_MAX_READ;
#endif
}
服务器收到包之后全部回发给客户端:
[root@localhost sample]# ./test_udp_server
bind() success – [0.0.0.0] [8888]
Read: len [2] – content [bc]
Read: len [4] – content [abcd]
Read: len [4] – content [abcd]
Read: len [17] – content [12345678901111111]
Read: len [13] – content [aaaaaaaaab123]
客户端发送字符给服务器,并输出服务器发回来的字符串:
[root@localhost sample]# ./test_udp_client
set sz:2097152, et recv_buff:0, len:0
set sz:2097152, get snd_buff:0, len:0
bind() success – [11.12.115.239] [8888]
input 0: exit!
input other: send to other msg!
abc
input message to others :
sendToServer: len [2] – content [bc]
input message to others : Read:count:2 ,but read len [2] – content [bc]
recvfrom() no pending data.: Success
abcd
sendToServer: len [4] – content [abcd]
input message to others : Read:count:4 ,but read len [4] – content [abcd]
recvfrom() no pending data.: Success
abcd
sendToServer: len [4] – content [abcd]
input message to others : Read:count:4 ,but read len [4] – content [abcd]
recvfrom() no pending data.: Success
12345678901111111
sendToServer: len [17] – content [12345678901111111]
input message to others : Read:count:17 ,but read len [10] – content [1234567890
]
recvfrom() no pending data.: Success
aaaaaaaaab123
sendToServer: len [13] – content [aaaaaaaaab123]
input message to others : Read:count:13 ,but read len [10] – content [aaaaaaaaab
]
recvfrom() no pending data.: Success
服务器的代码:
//event test udp socket server #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <event.h> #include <event2/listener.h> #include <fcntl.h> #include <pthread.h> #include <signal.h> #include <sys/types.h> #include <sys/time.h> #include <sys/stat.h> #include <sys/resource.h> /* [root@localhost sample]# gcc -o test_udp_server test_udp_server.c -levent -lpthread */ #define SVR_IP "0.0.0.0" #define SVR_PORT 8888 #define BUF_SIZE 1024 #define UR_CLIENT_SOCK_BUF_SIZE (65536) #define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32) void read_cb(int fd, short event, void *arg) { char buf[BUF_SIZE]; int len; int size = sizeof(struct sockaddr); struct sockaddr_in client_addr; memset(buf, 0, sizeof(buf)); len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size); if (len == -1) { perror("recvfrom()"); } else if (len == 0) { printf("Connection Closed\n"); } else { printf("Read: len [%d] – content [%s]\n", len, buf); /* Echo */ sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size); } } int set_sock_buf_size(int fd, int sz0) { int sz; sz = sz0; while (sz > 0) { if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) { sz = sz / 2; } else { break; } } if (sz < 1) { perror("Cannot set socket rcv size"); } sz = sz0; while (sz > 0) { if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) { sz = sz / 2; } else { break; } } if (sz < 1) { perror("Cannot set socket snd size"); } return 0; } int bind_socket(struct event *ev) { int sock_fd; int flag = 1; struct sockaddr_in sin; /* Create endpoint */ if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { perror("socket()"); return -1; } /* Set socket option */ if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) { perror("setsockopt()"); return 1; } /* Set IP, port */ memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; 
sin.sin_addr.s_addr = inet_addr(SVR_IP); sin.sin_port = htons(SVR_PORT); #ifdef IP_RECVERR if (sin.sin_family != AF_INET6) { int on = 0; #ifdef TURN_IP_RECVERR on = 1; #endif if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0) perror("IP_RECVERR"); } #endif #ifdef IPV6_RECVERR if (sin.sin_family == AF_INET6) { int on = 0; #ifdef TURN_IP_RECVERR on = 1; #endif if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0) perror("IPV6_RECVERR"); } #endif if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) { perror("O_NONBLOCK"); return -1; } set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE); /* Bind */ if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) { perror("bind()"); return -1; } else { printf("bind() success – [%s] [%u]\n", SVR_IP, SVR_PORT); } /* Init one event and add to active events */ event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL); if (event_add(ev, NULL) == -1) { printf("event_add() failed\n"); } return 0; } int main(int argc, char **argv) { struct event ev; /* Init. event */ if (event_init() == NULL) { printf("event_init() failed\n"); return -1; } /* Bind socket */ if (bind_socket(&ev) != 0) { printf("bind_socket() failed\n"); return -1; } /* Enter event loop */ event_dispatch(); printf("done\n"); return 0; }
客户端的代码:
//event test udp socket server #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <event.h> #include <event2/listener.h> #include <fcntl.h> #include <pthread.h> #include <signal.h> #include <sys/ioctl.h> #include <sys/types.h> #include <sys/time.h> #include <sys/stat.h> #include <sys/resource.h> /* [root@localhost sample]# gcc -o test_udp_client test_udp_client.c -levent -lpthread */ #define LOCAL_IP "0.0.0.0" #define LOCAL_PORT 19393 #define SVR_IP "11.12.115.239" #define SVR_PORT 8888 #define BUF_SIZE 10 //1024 #define UR_CLIENT_SOCK_BUF_SIZE (65536) #define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32) void read_cb(int fd, short event, void *arg) { char buf[BUF_SIZE]; int len = 0; int totallen = 0; int size = sizeof(struct sockaddr); struct sockaddr_in client_addr; while(1){ int ret = ioctl(fd, FIONREAD, &totallen); if(ret < 0 || !totallen) { perror("recvfrom() no pending data."); break; } memset(buf, 0, sizeof(buf)); len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size); if (len == -1) { perror("recvfrom() error"); break; } else if (len == 0) { printf("Connection Closed\n"); } else { printf("Read:count:%d ,but read len [%d] – content [%s]\n",totallen, len, buf); /* Echo */ //sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size); } } } int set_sock_buf_size(int fd, int sz0) { int sz; sz = sz0; while (sz > 0) { if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) { sz = sz / 2; } else { break; } } if (sz < 1) { perror("Cannot set socket rcv size"); } int val=0, len = 0; int ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&val, &len); printf("set sz:%d, et recv_buff:%d, len:%d\r\n", sz, val, len); sz = sz0; while (sz > 0) { if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) { sz = sz / 2; } else { break; } } val=0, len = 0; ret = getsockopt(fd, 
SOL_SOCKET, SO_SNDBUF, (void *)&val, &len); printf("set sz:%d, get snd_buff:%d, len:%d\r\n", sz, val, len); if (sz < 1) { perror("Cannot set socket snd size"); } return 0; } void sendToServer(int fd, char* msg, int len, struct sockaddr_in *server){ printf("sendToServer: len [%d] – content [%s]\n", len, msg); sendto(fd, msg, len, 0, (struct sockaddr *)server, sizeof(struct sockaddr_in)); } typedef struct AAA{ struct sockaddr_in *server; int fd; }ServerInfo; //recv user input value static void * recv_input_thread(void *arg) { if (arg == NULL){ return NULL; } ServerInfo *serverInfo = (ServerInfo *) arg; char input; char msg[1024]; printf("input 0: exit!\n"); printf("input other: send to other msg!\n"); scanf("%c", &input); fflush(stdin); if (input == '0'){ return NULL; } do{ printf("input message to others : "); memset(msg, 0x00, 1024); //scanf("%s", &msg); gets(msg); msg[1023] = '\0'; printf("\n"); sendToServer(serverInfo->fd, msg, strlen(msg), serverInfo->server); }while(input != '0'); printf("Done!\n"); free(serverInfo); return NULL; } int bind_socket(struct event *ev, int sock_fd, int local_port) { int flag = 1; struct sockaddr_in sin; /* Set IP, port */ memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = inet_addr(LOCAL_IP); sin.sin_port = local_port; #ifdef IP_RECVERR if (sin.sin_family != AF_INET6) { int on = 0; #ifdef TURN_IP_RECVERR on = 1; #endif if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0) perror("IP_RECVERR"); } #endif #ifdef IPV6_RECVERR if (sin.sin_family == AF_INET6) { int on = 0; #ifdef TURN_IP_RECVERR on = 1; #endif if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0) perror("IPV6_RECVERR"); } #endif if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) { perror("O_NONBLOCK"); return -1; } set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE); /* Bind */ if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) { perror("bind()"); return -1; } else { printf("bind() 
success – [%s] [%u]\n", SVR_IP, SVR_PORT); } /* Init one event and add to active events */ event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL); if (event_add(ev, NULL) == -1) { printf("event_add() failed\n"); } return 0; } int main(int argc, char **argv) { struct event udp_event; /* Init. event */ if (event_init() == NULL) { printf("event_init() failed\n"); return -1; } int sock_fd; int flag = 1; struct sockaddr_in server; /* Create endpoint */ if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { perror("socket()"); return -1; } /* Set socket option */ if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) { perror("setsockopt()"); return 1; } /* Bind socket */ if (bind_socket(&udp_event, sock_fd, LOCAL_PORT) != 0) { printf("bind_socket() failed\n"); return -1; } /* Set IP, port */ memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_addr.s_addr = inet_addr(SVR_IP); server.sin_port = htons(SVR_PORT); ServerInfo *serverInfo = (ServerInfo *)malloc(sizeof(ServerInfo)); serverInfo->server = &server; serverInfo->fd = sock_fd; pthread_t tidp; if ((pthread_create(&tidp, NULL, recv_input_thread, (void*)serverInfo)) == -1){ printf("create error!\n"); return 1; } /* Enter the event loop; does not return. */ event_dispatch(); close(sock_fd); printf("done\n"); return 0; }
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com