一个UDP可读缓冲区不够导致丢包的现象

今天看到一篇写UDP 丢包场景的文章,其中提到如果UDP 缓冲区填满导致丢包的问题,写了个小程序验证了下,确实之前没有细究过,描述如下:

  • 数据报分片重组丢失:UDP 协议本身规定的大小是 64kb,但是在数据链路层有 MTU 的限制,大小大概在 5kb,所以当你发送一个很大的 UDP 包的时候,这个包会在 IP 层进行分片,然后重组。这个过程就有可能导致分片的包丢失。UDP 本身有 CRC 检测机制,会抛弃掉丢失的 UDP 包;

  • UDP 缓冲区填满:当 UDP 的缓冲区已经被填满的时候,接收方还没有处理这部分的 UDP 数据报,这个时候再过来的数据报就没有地方可以存了,自然就都被丢弃了。

    client发送两次UDP数据,第一次 500字节,第二次300字节,server端阻塞模式下接包,第一次recvfrom( 1000 ),收到是 1000,还是500,还是300,还是其他?

    由于UDP通信的有界性,接收到只能是500或300,又由于UDP的无序性和非可靠性,接收到可能是300,也可能是500,也可能一直阻塞在recvfrom调用上,直到超时返回(也就是什么也收不到)。

    第二种情况:在假定数据包是不丢失并且是按照发送顺序按序到达的情况下,server端阻塞模式下接包,先后三次调用:recvfrom( 200),recvfrom( 1000),recvfrom( 1000),接收情况如何呢?
    由于UDP通信的有界性,第一次recvfrom( 200)将接收第一个500字节的数据包,但是因为用户空间buf只有200字节,于是只会返回前面200字节,剩下300字节将丢弃。第二次recvfrom( 1000)将返回300字节,第三次recvfrom( 1000)将会阻塞。


如何解决:

以libevent测试程序为例,在接收到缓冲区有数据的事件后,首先通过如下的方法,或者libevent封装的方法,获取到系统缓冲区中可读数据的大小,然后申请到对应大小的buffer去调用recvfrom方法,否则会出现如上UDP可读缓冲区小余可读数据的情况,导致出现UDP数据读不全的问题!

int ret = ioctl(fd, FIONREAD, &totallen);

或者

static int
get_n_bytes_readable_on_socket(evutil_socket_t fd)
{
#if defined(FIONREAD) && defined(_WIN32)
        unsigned long lng = EVBUFFER_MAX_READ;
        if (ioctlsocket(fd, FIONREAD, &lng) < 0)
                return -1;
        /* Can overflow, but mostly harmlessly. XXXX */
        return (int)lng;
#elif defined(FIONREAD)
        int n = EVBUFFER_MAX_READ;
        if (ioctl(fd, FIONREAD, &n) < 0)
                return -1;
        return n;
#else
        return EVBUFFER_MAX_READ;
#endif
}


服务器收到包之后全部回发给客户端:

[root@localhost sample]# ./test_udp_server

bind() success – [0.0.0.0] [8888]

Read: len [2] – content [bc]

Read: len [4] – content [abcd]

Read: len [4] – content [abcd]

Read: len [17] – content [12345678901111111]

Read: len [13] – content [aaaaaaaaab123]


客户端发送字符给服务器,并输出服务器发回来的字符串:

[root@localhost sample]# ./test_udp_client

set sz:2097152, et recv_buff:0, len:0

set sz:2097152, get snd_buff:0, len:0

bind() success – [11.12.115.239] [8888]

input 0: exit!

input other: send to other msg!

abc

input message to others : 

sendToServer: len [2] – content [bc]

input message to others : Read:count:2 ,but read len [2] – content [bc]

recvfrom() no pending data.: Success

abcd


sendToServer: len [4] – content [abcd]

input message to others : Read:count:4 ,but read len [4] – content [abcd]

recvfrom() no pending data.: Success

abcd


sendToServer: len [4] – content [abcd]

input message to others : Read:count:4 ,but read len [4] – content [abcd]

recvfrom() no pending data.: Success

12345678901111111


sendToServer: len [17] – content [12345678901111111]

input message to others : Read:count:17 ,but read len [10] – content [1234567890

                                                                                 ­]

recvfrom() no pending data.: Success

aaaaaaaaab123


sendToServer: len [13] – content [aaaaaaaaab123]

input message to others : Read:count:13 ,but read len [10] – content [aaaaaaaaab

                                                                                 ­]

recvfrom() no pending data.: Success


服务器的代码:

//event test udp socket server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <event.h>
#include <event2/listener.h>

#include <fcntl.h>

#include <pthread.h>

#include <signal.h>

#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/resource.h>

/*
[root@localhost sample]# gcc -o test_udp_server test_udp_server.c -levent -lpthread
*/
#define SVR_IP "0.0.0.0"
#define SVR_PORT 8888
#define BUF_SIZE 1024

#define UR_CLIENT_SOCK_BUF_SIZE (65536)
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)

void read_cb(int fd, short event, void *arg) {
	char buf[BUF_SIZE];
	int len;
	int size = sizeof(struct sockaddr);
	struct sockaddr_in client_addr;

	memset(buf, 0, sizeof(buf));
	len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size);

	if (len == -1) {
		perror("recvfrom()");
	} else if (len == 0) {
		printf("Connection Closed\n");
	} else {
		printf("Read: len [%d] – content [%s]\n", len, buf);

		/* Echo */
		sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size);
	}
}

int set_sock_buf_size(int fd, int sz0)
{
	int sz;

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket rcv size"); 
	}

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket snd size"); 
	}

	return 0;
}

int bind_socket(struct event *ev) {
	int sock_fd;
	int flag = 1;
	struct sockaddr_in sin;

	/* Create endpoint */
	if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		perror("socket()");
		return -1;
	}

	/* Set socket option */
	if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) {
		perror("setsockopt()");
		return 1;
	}

	/* Set IP, port */
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = inet_addr(SVR_IP);
	sin.sin_port = htons(SVR_PORT);

#ifdef IP_RECVERR
		if (sin.sin_family != AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IP_RECVERR");
		}
#endif

#ifdef IPV6_RECVERR
		if (sin.sin_family == AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IPV6_RECVERR");
		}
#endif

    if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }
	set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

	/* Bind */
	if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) {
		perror("bind()");
		return -1;
	} else {
		printf("bind() success – [%s] [%u]\n", SVR_IP, SVR_PORT);
	}


	/* Init one event and add to active events */
	event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
	if (event_add(ev, NULL) == -1) {
		printf("event_add() failed\n");
	}

	return 0;
}

int
main(int argc, char **argv)
{
	struct event ev;

	/* Init. event */
	if (event_init() == NULL) {
		printf("event_init() failed\n");
		return -1;
	}

	/* Bind socket */
	if (bind_socket(&ev) != 0) {  
		printf("bind_socket() failed\n");
		return -1;
	}

	/* Enter event loop */
	event_dispatch();

	printf("done\n");
	return 0;
}

客户端的代码:

//event test udp socket server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <event.h>
#include <event2/listener.h>


#include <fcntl.h>

#include <pthread.h> 
#include <signal.h>

#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/resource.h>
/*

[root@localhost sample]# gcc -o test_udp_client test_udp_client.c -levent -lpthread
*/

#define LOCAL_IP "0.0.0.0"
#define LOCAL_PORT 19393

#define SVR_IP "11.12.115.239"
#define SVR_PORT  8888
#define BUF_SIZE 10 //1024

#define UR_CLIENT_SOCK_BUF_SIZE (65536)
#define UR_SERVER_SOCK_BUF_SIZE (UR_CLIENT_SOCK_BUF_SIZE * 32)

void read_cb(int fd, short event, void *arg) {
	char buf[BUF_SIZE];
	int len = 0;
	int totallen = 0;
	int size = sizeof(struct sockaddr);
	struct sockaddr_in client_addr;

	while(1){
        int ret = ioctl(fd, FIONREAD, &totallen);
        if(ret < 0 || !totallen) {
			perror("recvfrom() no pending data.");
        	break;
        }
		memset(buf, 0, sizeof(buf));
		len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&client_addr, &size);

		if (len == -1) {
			perror("recvfrom() error");
			break;
		} else if (len == 0) {
			printf("Connection Closed\n");
		} else {
			printf("Read:count:%d ,but read len [%d] – content [%s]\n",totallen, len, buf);

			/* Echo */
			//sendto(fd, buf, len, 0, (struct sockaddr *)&client_addr, size);
		}

	}
}
 

int set_sock_buf_size(int fd, int sz0)
{
	int sz;

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}

	if (sz < 1) {
		perror("Cannot set socket rcv size"); 
	}

        int val=0, len = 0;
        int ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&val, &len);
	printf("set sz:%d, et recv_buff:%d, len:%d\r\n", sz, val, len);

	sz = sz0;
	while (sz > 0) {
		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void*) (&sz), (socklen_t) sizeof(sz)) < 0) {
			sz = sz / 2;
		} else {
			break;
		}
	}
        
	val=0, len = 0;
	ret = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&val, &len);
	printf("set sz:%d, get snd_buff:%d, len:%d\r\n", sz, val, len);

	if (sz < 1) {
		perror("Cannot set socket snd size"); 
	}

	return 0;
}

void sendToServer(int fd, char* msg, int len, struct sockaddr_in *server){
	printf("sendToServer: len [%d] – content [%s]\n", len, msg);
	sendto(fd, msg, len, 0, (struct sockaddr *)server, sizeof(struct sockaddr_in));
}

typedef struct AAA{
    struct sockaddr_in *server;
    int fd;
}ServerInfo;

//recv user input value
static void * recv_input_thread(void *arg)  {
	if (arg == NULL){
    	return NULL;
	}
    ServerInfo *serverInfo = (ServerInfo *) arg;

	char  input;
	char msg[1024];

	printf("input 0: exit!\n"); 
	printf("input other: send to other msg!\n");

    scanf("%c", &input);
    fflush(stdin);
    if (input == '0'){
    	return NULL;
    }
	do{

	    printf("input message to others : "); 
        memset(msg, 0x00, 1024);
        //scanf("%s", &msg); 
        gets(msg);
        msg[1023] = '\0';
        printf("\n"); 


		sendToServer(serverInfo->fd, msg, strlen(msg), serverInfo->server);
 
	}while(input != '0'); 

	printf("Done!\n"); 
    free(serverInfo);
	return NULL;
}


int bind_socket(struct event *ev, int sock_fd, int local_port) { 
	int flag = 1;
	struct sockaddr_in sin;
  
	/* Set IP, port */
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = inet_addr(LOCAL_IP);
	sin.sin_port = local_port;

#ifdef IP_RECVERR
		if (sin.sin_family != AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IP, IP_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IP_RECVERR");
		}
#endif

#ifdef IPV6_RECVERR
		if (sin.sin_family == AF_INET6) {
			int on = 0;
#ifdef TURN_IP_RECVERR
			on = 1;
#endif
			if(setsockopt(sock_fd, IPPROTO_IPV6, IPV6_RECVERR, (void *)&on, sizeof(on))<0)
				perror("IPV6_RECVERR");
		}
#endif

    if (fcntl(sock_fd, F_SETFL, O_NONBLOCK) == -1) {
        perror("O_NONBLOCK");
        return -1;
    }
	set_sock_buf_size(sock_fd, UR_SERVER_SOCK_BUF_SIZE);

	/* Bind */
	if (bind(sock_fd, (struct sockaddr *)&sin, sizeof(struct sockaddr)) < 0) {
		perror("bind()");
		return -1;
	} else {
		printf("bind() success – [%s] [%u]\n", SVR_IP, SVR_PORT);
	}


	/* Init one event and add to active events */
	event_set(ev, sock_fd, EV_READ | EV_PERSIST, &read_cb, NULL);
	if (event_add(ev, NULL) == -1) {
		printf("event_add() failed\n");
	}

	return 0;
}

int
main(int argc, char **argv)
{
	struct event udp_event;

	/* Init. event */
	if (event_init() == NULL) {
		printf("event_init() failed\n");
		return -1;
	} 
 
	int sock_fd;
	int flag = 1;
	struct sockaddr_in server;

	/* Create endpoint */
	if ((sock_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		perror("socket()");
		return -1;
	}

	/* Set socket option */
	if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(int)) < 0) {
		perror("setsockopt()");
		return 1;
	}

	/* Bind socket */
	if (bind_socket(&udp_event, sock_fd, LOCAL_PORT) != 0) {  
		printf("bind_socket() failed\n");
		return -1;
	}

	/* Set IP, port */
	memset(&server, 0, sizeof(server));
	server.sin_family = AF_INET;
	server.sin_addr.s_addr = inet_addr(SVR_IP);
	server.sin_port = htons(SVR_PORT);


	ServerInfo *serverInfo = (ServerInfo *)malloc(sizeof(ServerInfo));
	serverInfo->server = &server;
	serverInfo->fd = sock_fd;

    pthread_t tidp;
	if ((pthread_create(&tidp, NULL, recv_input_thread, (void*)serverInfo)) == -1){
		printf("create error!\n");
		return 1;
	}
 
	/* Enter the event loop; does not return. */
	event_dispatch();
	close(sock_fd);


	printf("done\n");
	return 0;
}

呱牛笔记

请先登录后发表评论
  • 最新评论
  • 总共0条评论