使用异或实现冗余包方式,冗余包大,并且很难抵抗连续丢包,这里尝试用里德所罗门编码,实现丢包测试。
如下代码效果:25个包分一组,能任意恢复其中的3个包,冗余包比率为25%,抗丢包能力为10%左右。
图仅为一个示例,该情况下可能恢复不出来包。
分块分组实现FEC纠错码,就是为了更大范围的保护数据包,在该包丢失情况下,能被恢复出来。这个思路的理解,对代码实现尤为关键。
组包规则:
每个rtp包每4个字节拆分为1组,每组200个字节,所以理论上可以最多50个包一组,这里设置为25个包一组
#define DATA_PACKETS_PER_BLOCK 25 //25个包,能任意恢复3个包
#define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200
#define REAL_BLOCK_SIZE 4 //
#define MAX_RESTORE_PACKET 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 | #include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdint.h> #include <unistd.h> #include <arpa/inet.h> #include <errno.h> #include <time.h> // 引入 libcorrect 库头文件 #include <correct/correct.h> #include <correct/reed-solomon.h> // 定义系统参数 #define RTP_HEADER_SIZE sizeof(RTPHeader) #define FEC_HEADER_SIZE sizeof(FECHeader) #define MAX_PACKET_SIZE 1400 #define FEC_PAYLOAD_TYPE 127 // FEC 专用载荷类型 #define DATA_PACKETS_PER_BLOCK 25 // 每块 25 个 RTP 包 #define FEC_PACKETS_PER_BLOCK 1 // #define TOTAL_PACKETS_PER_BLOCK (DATA_PACKETS_PER_BLOCK + FEC_PACKETS_PER_BLOCK) // 每块总数据包数 #define MAX_BLOCKS_IN_BUFFER 100 // 最大块缓冲区数量 #define RS_PRIMITIVE_POLYNOMIAL 0x11D // 原始多项式 (GF(2^8)) #define RS_FIRST_CONSECUTIVE_ROOT 1 // 第一个连续根 #define RS_GENERATOR_ROOT_GAP 2 // 生成器根间隔 #define RS_NUM_ROOTS 32 // 校验符号数 = M=4) #define MAX_RTP_SIZE MAX_PACKET_SIZE #define BLOCK_SIZE 223 // 保持RS(255,223)标准块 #define SYMBLE_SIZE 32 #define REED_SOLOMON_LEN 255 #define SYMBOL_SIZE MAX_RTP_SIZE // 符号大小=包最大长度 #define K_RS 223 // RS编码消息长度(需K_RS ≥ BLOCK_SIZE) #define M_RS 32 // RS编码校验长度 #define K DATA_PACKETS_PER_BLOCK // 原始包数 #define M FEC_PACKETS_PER_BLOCK // 冗余包数 //#define NUM_BLOCKS 175 //175*8 = 1400 每块 1 个 FEC 包 175*32=5600 = 1400 * 2 //#define REAL_BLOCK_SIZE 8 // //#define NUM_BLOCKS 200 //175*8 = 1400 200*32=6400 //#define REAL_BLOCK_SIZE 7 // #define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200 #define REAL_BLOCK_SIZE 4 // #define MAX_RESTORE_PACKET 3 //#define NUM_BLOCKS 35 // 分块数=35 ,200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节 //40*35 = 1400 //#define REAL_BLOCK_SIZE 40 #define FEC_PACKETS_SIZE NUM_BLOCKS*SYMBLE_SIZE // 定义RS编码块大小 #define RS_MAX_DATA_LENGTH BLOCK_SIZE // // RS(255,223) 分块大小 // RTP 头部结构定义 typedef struct { uint8_t version:2; // 版本号 uint8_t padding:1; // 填充标志 uint8_t extension:1; // 扩展标志 uint8_t cc:4; // CSRC 计数器 uint8_t marker:1; // 标记位 uint8_t payload:7; // 载荷类型 uint16_t seq; // 序列号 uint32_t timestamp; // 时间戳 uint32_t ssrc; // 同步源标识符 // 可选的 CSRC 列表、扩展头等 } __attribute__((packed)) RTPHeader; // FEC 包头部结构(简化版) typedef struct { uint8_t version:2; // RTP 版本 uint8_t padding:1; // 填充标志 uint8_t extension:1; // 扩展标志 uint8_t cc:4; // CSRC 计数器 uint8_t marker:1; // 标记位 uint8_t payload:7; // 载荷类型(FEC 专用) uint16_t seq; // 序列号 uint32_t timestamp; // 时间戳 uint32_t ssrc; // 同步源标识符 uint16_t block_id; // 块 ID uint8_t fec_index; // FEC 包索引 uint8_t original_packet_count; // 原始数据包数量 } __attribute__((packed)) FECHeader; typedef struct { int block_num; // 块号 /* 组合1 200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节 组合2 200个字节一组,25个rtp4个fec包 5600/4 = 1400个字节,每包分成175个块,每块8个字节,一共175个分组 每包175*32个字节 = 5600 字节 *组合2能恢复2个RTP包 */ uint8_t fec_data[NUM_BLOCKS * SYMBLE_SIZE]; } __attribute__((packed)) FECPacket; // 发送缓冲区结构 typedef struct { uint8_t *data; // 数据包数据 int length; // 数据包长度 int seq_num; // 序列号 int timestamp; // 时间戳 int is_fec; // 是否为 FEC 包 int block_id; // 所属块 ID int packet_id; // 在块中的 ID } PacketBuffer; // 编码器上下文 typedef struct { correct_reed_solomon *rs; // libcorrect Reed-Solomon编码器 PacketBuffer packets[DATA_PACKETS_PER_BLOCK]; // 原始数据包 FECPacket fecData[M]; // 内存池: M冗余包 int packet_count; // 当前块中的数据包数量 int next_seq_num; // 下一个序列号 int next_block_id; // 下一个块 ID int ssrc; // 同步源标识符 // 网络相关 int sockfd; // 套接字描述符 struct sockaddr_in dest_addr; // 目标地址 } EncoderContext; // 解码器上下文 typedef struct { correct_reed_solomon *rs; // libcorrect Reed-Solomon解码器 PacketBuffer *received_packets[MAX_BLOCKS_IN_BUFFER]; // 接收的数据包 int block_map[MAX_BLOCKS_IN_BUFFER]; // 块ID映射 int block_packet_count[MAX_BLOCKS_IN_BUFFER]; // 每个块收到的包数量 int block_recovered[MAX_BLOCKS_IN_BUFFER]; // 块是否已恢复 int ssrc; // 同步源标识符 // 网络相关 int sockfd; // 套接字描述符 struct sockaddr_in src_addr; // 源地址 // 统计信息 int total_packets; // 接收的总数据包数 int recovered_packets; // 成功恢复的数据包数 int total_blocks; // 处理的总块数 int recovered_blocks; // 成功恢复的块数 } DecoderContext; void printf_hex(uint8_t *data, int len); // 初始化编码器 EncoderContext* init_encoder( const char * ip, int port) { EncoderContext* ctx = malloc ( sizeof (EncoderContext)); if (!ctx) return NULL; // 初始化libcorrect Reed-Solomon编码器 ctx->rs = correct_reed_solomon_create( RS_PRIMITIVE_POLYNOMIAL, RS_FIRST_CONSECUTIVE_ROOT, RS_GENERATOR_ROOT_GAP, RS_NUM_ROOTS ); if (!ctx->rs) { free (ctx); return NULL; } //int k = correct_reed_solomon_message_length(ctx->rs); //int m = correct_reed_solomon_block_length(ctx->rs); //printf("k:%d, M:%d\r\n", k , m); // 验证参数兼容性 //assert(correct_reed_solomon_message_length(enc->rs) == K); //assert(correct_reed_solomon_block_length(enc->rs) == K+M); ctx->packet_count = 0; ctx->next_seq_num = 0; ctx->next_block_id = 0; ctx->ssrc = rand (); // 创建 UDP 套接字 ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (ctx->sockfd < 0) { perror ( "socket creation failed" ); correct_reed_solomon_destroy(ctx->rs); free (ctx); return NULL; } memset (&ctx->dest_addr, 0, sizeof (ctx->dest_addr)); ctx->dest_addr.sin_family = AF_INET; ctx->dest_addr.sin_port = htons(port); if (inet_pton(AF_INET, ip, &ctx->dest_addr.sin_addr) <= 0) { perror ( "invalid address" ); close(ctx->sockfd); correct_reed_solomon_destroy(ctx->rs); free (ctx); return NULL; } // 初始化数据包缓冲区 for ( int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) { ctx->packets[i].data = NULL; } return ctx; } // 初始化解码器 DecoderContext* init_decoder( int port) { DecoderContext* ctx = malloc ( sizeof (DecoderContext)); if (!ctx) return NULL; // 初始化libcorrect Reed-Solomon解码器(与编码器参数一致) ctx->rs = correct_reed_solomon_create( RS_PRIMITIVE_POLYNOMIAL, RS_FIRST_CONSECUTIVE_ROOT, RS_GENERATOR_ROOT_GAP, RS_NUM_ROOTS ); if (!ctx->rs) { free (ctx); return NULL; } // 初始化块映射和计数器 memset (ctx->block_map, -1, sizeof (ctx->block_map)); memset (ctx->block_packet_count, 0, sizeof (ctx->block_packet_count)); memset (ctx->block_recovered, 0, sizeof (ctx->block_recovered)); ctx->ssrc = 0; // 初始化统计信息 ctx->total_packets = 0; ctx->recovered_packets = 0; ctx->total_blocks = 0; ctx->recovered_blocks = 0; // 创建 UDP 套接字 ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (ctx->sockfd < 0) { perror ( "socket creation failed" ); correct_reed_solomon_destroy(ctx->rs); free (ctx); return NULL; } memset (&ctx->src_addr, 0, sizeof (ctx->src_addr)); ctx->src_addr.sin_family = AF_INET; ctx->src_addr.sin_addr.s_addr = INADDR_ANY; ctx->src_addr.sin_port = htons(port); if (bind(ctx->sockfd, ( const struct sockaddr *)&ctx->src_addr, sizeof (ctx->src_addr)) < 0) { perror ( "bind failed" ); close(ctx->sockfd); correct_reed_solomon_destroy(ctx->rs); free (ctx); return NULL; } // 初始化数据包缓冲区 for ( int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) { ctx->received_packets[i] = (PacketBuffer *) malloc ( sizeof (PacketBuffer)); memset (ctx->received_packets[i], 0x00, sizeof (PacketBuffer)); } return ctx; } // 清理编码器资源 void cleanup_encoder(EncoderContext* ctx) { if (ctx) { // 释放数据包缓冲区 for ( int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) { if (ctx->packets[i].data) { free (ctx->packets[i].data); } } if (ctx->rs) correct_reed_solomon_destroy(ctx->rs); if (ctx->sockfd >= 0) close(ctx->sockfd); free (ctx); } } // 清理解码器资源 void cleanup_decoder(DecoderContext* ctx) { if (ctx) { // 释放数据包缓冲区 for ( int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) { if (ctx->received_packets[i]) { free (ctx->received_packets[i]); } } if (ctx->rs) correct_reed_solomon_destroy(ctx->rs); if (ctx->sockfd >= 0) close(ctx->sockfd); // 打印统计信息 printf ( "解码器统计信息:\n" ); printf ( " 处理的总块数: %d\n" , ctx->total_blocks); printf ( " 成功恢复的块数: %d\n" , ctx->recovered_blocks); printf ( " 接收的总数据包: %d\n" , ctx->total_packets); printf ( " 成功恢复的数据包: %d\n" , ctx->recovered_packets); printf ( " 块恢复成功率: %.2f%%\n" , ( float )ctx->recovered_blocks / ctx->total_blocks * 100); printf ( " 数据包恢复成功率: %.2f%%\n" , ( float )ctx->recovered_packets / (ctx->total_blocks * DATA_PACKETS_PER_BLOCK) * 100); free (ctx); } } // 使用Reed-Solomon码编码并发送RTP包 int encode_and_send(EncoderContext* ctx, const uint8_t* payload, int payload_len, int timestamp) { if (!ctx || !payload || payload_len <= 0) { errno = EINVAL; return -1; } // 确保有效载荷长度不超过最大值 if (payload_len > MAX_PACKET_SIZE ) { payload_len = MAX_PACKET_SIZE; } // 计算所需缓冲区大小 int packet_size = RTP_HEADER_SIZE + payload_len; // 分配或调整数据包缓冲区大小 if (ctx->packets[ctx->packet_count].data == NULL) { ctx->packets[ctx->packet_count].data = malloc (packet_size); if (!ctx->packets[ctx->packet_count].data) { perror ( "内存分配失败" ); return -1; } } else if (ctx->packets[ctx->packet_count].length < packet_size) { ctx->packets[ctx->packet_count].data = realloc (ctx->packets[ctx->packet_count].data, packet_size); if (!ctx->packets[ctx->packet_count].data) { perror ( "内存分配失败" ); return -1; } } // 创建 RTP 头部 RTPHeader header; header.version = 2; header.padding = 0; header.extension = 0; header.cc = 0; header.marker = 0; header.payload = 96; // 假设默认载荷类型 96 header.seq = htons(ctx->next_seq_num++); header.timestamp = htonl(timestamp); header.ssrc = htonl(ctx->ssrc); // 构建完整的 RTP 包 memcpy (ctx->packets[ctx->packet_count].data, &header, RTP_HEADER_SIZE); memcpy (ctx->packets[ctx->packet_count].data + RTP_HEADER_SIZE, payload, payload_len); ctx->packets[ctx->packet_count].length = packet_size; ctx->packets[ctx->packet_count].seq_num = ntohs(header.seq); ctx->packets[ctx->packet_count].timestamp = ntohl(header.timestamp); ctx->packets[ctx->packet_count].is_fec = 0; ctx->packets[ctx->packet_count].block_id = ctx->next_block_id; ctx->packets[ctx->packet_count].packet_id = ctx->packet_count; // 发送原始 RTP 包 ssize_t sent = sendto(ctx->sockfd, ctx->packets[ctx->packet_count].data, ctx->packets[ctx->packet_count].length, 0, ( const struct sockaddr *)&ctx->dest_addr, sizeof (ctx->dest_addr)); if (sent < 0) { perror ( "发送失败" ); return -1; } printf ( "发送原始包 #%d (块 %d/%d), 长度 %d 字节\n" , ctx->packets[ctx->packet_count].seq_num, ctx->next_block_id, ctx->packet_count + 1, payload_len); // 增加包计数 int current_packet = ctx->packet_count++; // 当收集到足够的包时,生成并发送 FEC 包 if (ctx->packet_count == DATA_PACKETS_PER_BLOCK) { generate_and_send_fec(ctx); ctx->packet_count = 0; ctx->next_block_id++; } return current_packet; } // 处理变长包:填充至SYMBOL_SIZE void pad_packet( char *pkt, size_t actual_len) { if (actual_len < SYMBOL_SIZE) { memset (pkt + actual_len, 0, SYMBOL_SIZE - actual_len); } } // 使用Reed-Solomon码生成并发送FEC包 void generate_and_send_fec(EncoderContext* ctx) { int block_id = ctx->next_block_id; uint8_t msg[BLOCK_SIZE]; // 当前块数据(223字节) uint8_t encoded[BLOCK_SIZE + SYMBLE_SIZE]; // 编码结果(255字节) printf ( "generate_and_send_fec NUM_BLOCKS:%d\n" , NUM_BLOCKS); int group_index = 0; //35个块*40 ==1400 //175*8=1400 for ( int block=0; block<NUM_BLOCKS;block++){ int offset = block * REAL_BLOCK_SIZE; int size = REAL_BLOCK_SIZE; memset (msg, 0x00, sizeof (msg)); // 2. 分块处理(每块223字节)-200字节+23字节补0 //每个包取40个字节,5个rtp包 一共200个字节 for ( int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { // 1.准备数据 uint8_t *full_data = ctx->packets[i].data + RTP_HEADER_SIZE; int payload_len = ctx->packets[i].length - RTP_HEADER_SIZE; // 从完整数据中提取当前块 if (block == NUM_BLOCKS - 1){ int reserved = payload_len - REAL_BLOCK_SIZE*block; if (reserved < size){ size = reserved; } } memcpy (( void *)msg+i*REAL_BLOCK_SIZE, ( void *)full_data + offset, size); } // RS编码 ssize_t ret = correct_reed_solomon_encode(ctx->rs, msg, BLOCK_SIZE, encoded); if (ret != REED_SOLOMON_LEN){ printf ( "correct_reed_solomon_encode not ret:%d\r\n" , ret); } if (block == 0){ printf_hex(encoded+BLOCK_SIZE, SYMBLE_SIZE); } FECPacket *hdr = (FECPacket*)&(ctx->fecData[group_index]); hdr->block_num = block; // 标记保护的块 memcpy (hdr->fec_data + block*SYMBLE_SIZE, encoded + BLOCK_SIZE, SYMBLE_SIZE); //SYMBLE_SIZE为32个标记位 } for ( int fec_idx = 0; fec_idx < FEC_PACKETS_PER_BLOCK; fec_idx++) { FECPacket *fecPacket = (FECPacket*)&(ctx->fecData[fec_idx]); // 创建FEC头部 FECHeader fec_header; fec_header.version = 2; fec_header.padding = 0; fec_header.extension = 0; fec_header.cc = 0; fec_header.marker = 0; fec_header.payload = FEC_PAYLOAD_TYPE; fec_header.seq = htons(ctx->next_seq_num++); fec_header.timestamp = htonl(ctx->packets[0].timestamp); fec_header.ssrc = htons(ctx->ssrc); fec_header.block_id = htons(block_id); fec_header.fec_index = fec_idx; fec_header.original_packet_count = DATA_PACKETS_PER_BLOCK; int fec_packet_len = FEC_HEADER_SIZE +NUM_BLOCKS * SYMBLE_SIZE; //sizeof(FECPacket) // 计算FEC包总长度 printf ( "fec_idx:%d, fec_packet_len:%d,sizeof(FECPacket):%d\n" , fec_idx, fec_packet_len, sizeof (FECPacket)); uint8_t* fec_packet = malloc (fec_packet_len); if (!fec_packet) { perror ( "FEC包内存分配失败" ); continue ; } // 复制头部数据 memcpy (fec_packet, ( void *)&fec_header, FEC_HEADER_SIZE); uint8_t *fec_data = fecPacket->fec_data; memcpy ((uint8_t *)(fec_packet+FEC_HEADER_SIZE), fec_data, NUM_BLOCKS * SYMBLE_SIZE); // 发送FEC包 ssize_t sent = sendto(ctx->sockfd, fec_packet, fec_packet_len, 0, ( const struct sockaddr *)&ctx->dest_addr, sizeof (ctx->dest_addr)); if (sent < 0) { perror ( "FEC包发送失败" ); } else { printf ( "发送FEC包 #%d (块 %d/%d), 保护 %d 个RTP包\n" , ntohs(fec_header.seq), block_id, fec_idx + 1, DATA_PACKETS_PER_BLOCK); } free (fec_packet); } // 重置包计数器,准备下一个块 ctx->packet_count = 0; ctx->next_block_id++; } //MAX_PACKET_SIZE + RTP_HEADER_SIZE + 1 #define MAX_UDP_PACKET NUM_BLOCKS*32+120 // 使用Reed-Solomon码接收并解码RTP包 int receive_and_decode(DecoderContext* ctx, uint8_t** output_buffer, int * output_len, int * seq_num) { if (!ctx || !output_buffer || !output_len || !seq_num) { errno = EINVAL; return -1; } uint8_t buffer[MAX_UDP_PACKET]; socklen_t addr_len = sizeof (ctx->src_addr); // 接收数据包 int recv_len = recvfrom(ctx->sockfd, buffer, MAX_UDP_PACKET, 0, ( struct sockaddr *)&ctx->src_addr, &addr_len); if (recv_len < 0) { perror ( "recvfrom failed" ); return -1; } // 解析RTP头部 RTPHeader* rtp_header = (RTPHeader*)buffer; uint16_t packet_seq = ntohs(rtp_header->seq); // 检查是否是FEC包 int is_fec = (rtp_header->payload == FEC_PAYLOAD_TYPE); // 计算块ID和包ID int block_id = 0; int packet_id = 0; if (is_fec) { // 解析FEC头部 FECHeader* fec_header = (FECHeader*)buffer; packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK; block_id = ntohs(fec_header->block_id); // 计算FEC包在块中的索引 int fec_index = fec_header->fec_index; printf ( "接收到FEC包 #%d (块 %d) %d len:%d\n" , packet_seq, block_id, fec_index, recv_len); } else { // 原始 RTP 包 block_id = packet_seq / TOTAL_PACKETS_PER_BLOCK; packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK; printf ( "接收到原始包 #%d (块 %d/%d) len:%d\n" , packet_seq, block_id, packet_id, recv_len ); } // 查找或分配块缓冲区 int block_index = packet_id; // 验证块ID是否有效 if (block_index < 0 || block_index >= MAX_BLOCKS_IN_BUFFER) { printf ( "警告: 无效的包块ID %d\n" , block_id); return 0; } // 保存数据包,只缓存一组 int group_index = 0; PacketBuffer* packet = ctx->received_packets[block_index]; // 分配或调整缓冲区大小 if (packet->data == NULL) { packet->data = malloc ( MAX_UDP_PACKET); // if (!packet->data) { perror ( "内存分配失败" ); return -1; } } else if (packet->length < recv_len) { packet->data = realloc (packet->data, recv_len); if (!packet->data) { perror ( "[2]内存分配失败" ); return -1; } } memcpy (packet->data, buffer, recv_len); packet->length = recv_len; packet->seq_num = packet_seq; packet->timestamp = ntohl(rtp_header->timestamp); packet->is_fec = is_fec; packet->block_id = block_id; packet->packet_id = packet_id; // 更新接收计数 ctx->block_packet_count[block_index]++; // 更新统计信息 ctx->total_packets++; // 尝试恢复块数据 if (block_index >= DATA_PACKETS_PER_BLOCK) { int recovered = recover_lost_packets(ctx, block_id); if (recovered) { ctx->block_recovered[block_index] = 1; ctx->recovered_blocks++; ctx->recovered_packets += DATA_PACKETS_PER_BLOCK; printf ( "成功恢复块 %d 的所有数据包\n" , block_id); // 返回第一个可用的原始数据包 for ( int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) { if (ctx->received_packets[i]->data) { packet = ctx->received_packets[i]; *output_buffer = ctx->received_packets[i]->data + RTP_HEADER_SIZE; *output_len = ctx->received_packets[i]->length - RTP_HEADER_SIZE; *seq_num = ctx->received_packets[i]->seq_num; // 标记为已处理 free (ctx->received_packets[i]->data); ctx->received_packets[i]->data = NULL; memset (( void *)packet, 0x00 , sizeof (packet)); } } } } return 0; // 没有可用的完整数据包 } void printf_hex(uint8_t *data, int len){ printf ( "*****print len:%d***\n" , len); for ( int i=0; i<len; i++){ printf ( "%02x" , data[i]); } printf ( "******end**\n" ); } // 使用Reed-Solomon码恢复丢失的数据包 int recover_lost_packets(DecoderContext* ctx, int block_index) { int group_index = 0; int missing_packets = 0; int received[TOTAL_PACKETS_PER_BLOCK] = {0}; int erasures[BLOCK_SIZE] = {0}, num_erasures = 0; int random = rand () % (TOTAL_PACKETS_PER_BLOCK -1); int random2 = rand () % (TOTAL_PACKETS_PER_BLOCK -1); // random+1;// int random3 = rand () % (TOTAL_PACKETS_PER_BLOCK -1); //random = random2; printf ( "尝试恢复块 %d,%d,%d: %d 的数据包...\n" , random, random2, random3, block_index); // 检查是否有FEC包可用 int available_fec_packets = 0; int available_fec_packet_index = 0; for ( int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) { PacketBuffer* packet = ctx->received_packets[i]; if (packet->data){ if (random == i || random2 == i || random3 == i){ received[i] = 0; uint8_t *data = packet->data+RTP_HEADER_SIZE; printf ( "test lost:%d ,orign data \n" , i); printf_hex(data, packet->length - RTP_HEADER_SIZE); missing_packets++; } else { received[i] = 1; if (packet->is_fec) { available_fec_packets++; available_fec_packet_index = i; //printf("fec lost:%d ,orign data \n", i); //printf_hex(packet->data+FEC_HEADER_SIZE, 32); } } } else { received[i] = 0; } } // 如果FEC包全丢,不进行恢复 if (available_fec_packets == 0) { printf ( "错误: 没有可用的FEC包,无法恢复\n" ); return 0; } printf ( "块 %d 有 %d 个丢失的数据包,%d 个可用的FEC包,fec_i:%d\n" , block_index, missing_packets, available_fec_packets, available_fec_packet_index); // 检查是否有足够的FEC包来恢复 if (missing_packets > MAX_RESTORE_PACKET) { printf ( "错误: 可用FEC包不足,无法恢复 (需要至少 %d 个,实际 %d 个)\n" , missing_packets, available_fec_packets); return 0; } uint8_t symbols[REED_SOLOMON_LEN], restored[REED_SOLOMON_LEN]; PacketBuffer* fec_packet = ctx->received_packets[available_fec_packet_index] ; FECHeader* fec_header = (FECHeader*)fec_packet; // 1. 分块恢复(共35块,每块40个字节) // 2. 分块恢复(共175块,每块8个字节) for ( int block=0; block<NUM_BLOCKS; block++) { int offset = block * REAL_BLOCK_SIZE; memset (symbols, 0, REED_SOLOMON_LEN); memset (restored, 0, REED_SOLOMON_LEN); memset (erasures, 0, BLOCK_SIZE); num_erasures = 0; // 组包1:收集当前块的原始数据 5个包,每个包40个字节 // 组包2:收集当前块的原始数据 175个包,每个包8个字节 for ( int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (received[i] ) { uint8_t *data = ctx->received_packets[i]->data; uint8_t *full_data = data + RTP_HEADER_SIZE; memcpy (symbols + i*REAL_BLOCK_SIZE, full_data + offset, REAL_BLOCK_SIZE); } else { for ( int j=0; j<REAL_BLOCK_SIZE; j++){ erasures[num_erasures++] = i*REAL_BLOCK_SIZE+j; } //num_erasures += REAL_BLOCK_SIZE; } } // 收集当前块的校验数据(1个FEC包) memcpy (symbols + BLOCK_SIZE , ( void *)fec_packet->data + FEC_HEADER_SIZE + block*SYMBLE_SIZE, SYMBLE_SIZE); if (block == 0){ printf ( "***************num_erasures:%d data:" ,num_erasures); for ( int j=0; j<num_erasures; j++){ printf ( "%d," ,erasures[j]); } printf ( "****\n" ,num_erasures); printf_hex(symbols, REED_SOLOMON_LEN); } // RS解码 int success = correct_reed_solomon_decode_with_erasures( ctx->rs, symbols, REED_SOLOMON_LEN, erasures, num_erasures, restored ); if (block == 0){ printf_hex(restored, REED_SOLOMON_LEN); printf ( "#########success:%d#############\n" ,success); } // 3. 回写恢复的原始数据 if (success) { for ( int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (!received[i]) { PacketBuffer *packetbuffer = ctx->received_packets[i] ; if (packetbuffer && packetbuffer->data == NULL){ packetbuffer->data = (uint8_t *) malloc (MAX_PACKET_SIZE + RTP_HEADER_SIZE); } uint8_t *full_data = packetbuffer->data + RTP_HEADER_SIZE; memcpy (full_data + offset, restored + i*REAL_BLOCK_SIZE, REAL_BLOCK_SIZE); } } } } for ( int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (!received[i]) { int missing_idx= i; if (ctx->received_packets[missing_idx]->data == NULL){ ctx->received_packets[missing_idx]->data = (uint8_t *) malloc (RTP_HEADER_SIZE+MAX_PACKET_SIZE); } // 创建恢复包的头部 RTPHeader* recovered_header = (RTPHeader*)ctx->received_packets[missing_idx]->data; // 复制RTP头部(从第一个可用的包复制) RTPHeader* header = NULL; for ( int k = 0; k < DATA_PACKETS_PER_BLOCK; k++) { if (ctx->received_packets[k]->data) { header = (RTPHeader*)ctx->received_packets[k]->data; break ; } } *recovered_header = *header; ctx->received_packets[missing_idx]->length = RTP_HEADER_SIZE + MAX_PACKET_SIZE; ctx->received_packets[missing_idx]->seq_num = recovered_header->seq; ctx->received_packets[missing_idx]->timestamp = ntohl(recovered_header->timestamp); ctx->received_packets[missing_idx]->is_fec = 0; ctx->received_packets[missing_idx]->block_id = fec_header->block_id; ctx->received_packets[missing_idx]->packet_id = missing_idx; uint8_t *data = ctx->received_packets[missing_idx]->data + RTP_HEADER_SIZE; printf_hex(data, MAX_PACKET_SIZE); printf ( "成功恢复块 %d 的数据包 #%d\n" , fec_header->block_id, i); } } printf ( "块 %d 的恢复尝试完成: 成功\n" , fec_header->block_id); return 1; } // 编码器主函数示例 void encoder_main( const char * dest_ip, int dest_port) { EncoderContext* ctx = init_encoder(dest_ip, dest_port); if (!ctx) { printf ( "编码器初始化失败\n" ); return ; } printf ( "编码器开始发送数据到 %s:%d...\n" , dest_ip, dest_port); // 模拟发送 100 个 RTP 包 for ( int i = 0; i < 100; i++) { // 创建随机数据作为载荷 uint8_t payload[MAX_PACKET_SIZE]; for ( int j = 0; j < sizeof (payload); j++) { payload[j] = rand () % 256; } // 编码并发送 encode_and_send(ctx, payload, sizeof (payload), i * 90); // 控制发送速率 usleep(10000); // 10ms } cleanup_encoder(ctx); } // 解码器主函数示例 void decoder_main( int listen_port) { DecoderContext* ctx = init_decoder(listen_port); if (!ctx) { printf ( "解码器初始化失败\n" ); return ; } printf ( "解码器开始在端口 %d 接收数据...\n" , listen_port); uint8_t* output_buffer = NULL; int output_len = 0; int seq_num = 0; // 持续接收和解码数据包 while (1) { int result = receive_and_decode(ctx, &output_buffer, &output_len, &seq_num); if (result > 0) { printf ( "处理接收到的数据包 #%d, 长度 %d 字节\n" , seq_num, output_len); // 这里可以处理接收到的数据,例如写入文件或播放 // 示例中只是简单打印信息 // 注意:output_buffer 指向的是 ctx 内部缓冲区, // 在下一次调用 receive_and_decode 前保持有效 } else if (result < 0) { printf ( "接收数据时发生错误\n" ); break ; } // 控制循环速率 usleep(1000); // 1ms } cleanup_decoder(ctx); } int main( int argc, char * argv[]) { // 初始化随机数生成器 srand ( time (NULL)); if (argc < 2) { printf ( "用法: %s [encoder|decoder] [参数...]\n" , argv[0]); printf ( "编码器: %s encoder <目标IP> <目标端口>\n" , argv[0]); printf ( "解码器: %s decoder <监听端口>\n" , argv[0]); return 1; } if ( strcmp (argv[1], "encoder" ) == 0) { if (argc < 4) { printf ( "编码器需要目标IP和端口参数\n" ); return 1; } const char * dest_ip = argv[2]; int dest_port = atoi (argv[3]); encoder_main(dest_ip, dest_port); } else if ( strcmp (argv[1], "decoder" ) == 0) { if (argc < 3) { printf ( "解码器需要监听端口参数\n" ); return 1; } int listen_port = atoi (argv[2]); decoder_main(listen_port); } else { printf ( "未知命令: %s\n" , argv[1]); return 1; } return 0; } |
测试:
接收端:
./testc decoder 5000
发送端:
./testc encoder 127.0.0.1 5000
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com