使用libcorrect 库,实现视频FEC过程记录


使用异或实现冗余包方式,冗余包大,并且很难抵抗连续丢包,这里尝试用里德所罗门编码,实现丢包测试。

如下代码效果:25个包分一组,能任意恢复其中的3个包,冗余包比率为25%,抗丢包能力为10%左右。


呱牛笔记

图仅为一个示例,该情况下可能恢复不出来包。

分块分组实现FEC纠错码,就是为了更大范围的保护数据包,在该包丢失情况下,能被恢复出来。这个思路的理解,对代码实现尤为关键。


组包规则:

每个rtp包每4个字节拆分为1组,每组200个字节,所以理论上可以最多50个包一组,这里设置为25个包一组

#define DATA_PACKETS_PER_BLOCK 25  //25个包,能任意恢复3个包

#define NUM_BLOCKS 350    //350*8 = 1400   350*32=11200  

#define REAL_BLOCK_SIZE 4 // 

#define MAX_RESTORE_PACKET 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <errno.h>
#include <time.h>
 
// 引入 libcorrect 库头文件
#include <correct/correct.h>
#include <correct/reed-solomon.h>
 
// 定义系统参数
#define RTP_HEADER_SIZE sizeof(RTPHeader)
#define FEC_HEADER_SIZE sizeof(FECHeader)
#define MAX_PACKET_SIZE 1400
#define FEC_PAYLOAD_TYPE 127  // FEC 专用载荷类型
#define DATA_PACKETS_PER_BLOCK 25  // 每块 25 个 RTP 包
#define FEC_PACKETS_PER_BLOCK 1    // 
#define TOTAL_PACKETS_PER_BLOCK (DATA_PACKETS_PER_BLOCK + FEC_PACKETS_PER_BLOCK) // 每块总数据包数
#define MAX_BLOCKS_IN_BUFFER 100   // 最大块缓冲区数量
#define RS_PRIMITIVE_POLYNOMIAL 0x11D  // 原始多项式 (GF(2^8))
#define RS_FIRST_CONSECUTIVE_ROOT 1   // 第一个连续根
#define RS_GENERATOR_ROOT_GAP 2       // 生成器根间隔
#define RS_NUM_ROOTS 32 // 校验符号数 = M=4)
 
#define MAX_RTP_SIZE MAX_PACKET_SIZE
  
#define BLOCK_SIZE 223           // 保持RS(255,223)标准块
#define SYMBLE_SIZE 32
#define REED_SOLOMON_LEN 255
#define SYMBOL_SIZE MAX_RTP_SIZE // 符号大小=包最大长度
 
#define K_RS            223     // RS编码消息长度(需K_RS ≥ BLOCK_SIZE)
#define M_RS            32      // RS编码校验长度
#define K DATA_PACKETS_PER_BLOCK       // 原始包数
#define M FEC_PACKETS_PER_BLOCK        // 冗余包数
//#define NUM_BLOCKS 175    //175*8 = 1400   每块 1 个 FEC 包  175*32=5600 = 1400 * 2
//#define REAL_BLOCK_SIZE 8 // 
 
//#define NUM_BLOCKS 200    //175*8 = 1400   200*32=6400  
//#define REAL_BLOCK_SIZE 7 // 
 
#define NUM_BLOCKS 350    //350*8 = 1400   350*32=11200  
#define REAL_BLOCK_SIZE 4 // 
#define MAX_RESTORE_PACKET 3
//#define NUM_BLOCKS 35 // 分块数=35 ,200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
 //40*35 = 1400
//#define REAL_BLOCK_SIZE 40
#define FEC_PACKETS_SIZE NUM_BLOCKS*SYMBLE_SIZE
// 定义RS编码块大小
#define RS_MAX_DATA_LENGTH BLOCK_SIZE  // // RS(255,223) 分块大小
 
// RTP 头部结构定义
typedef struct {
    uint8_t version:2;     // 版本号
    uint8_t padding:1;     // 填充标志
    uint8_t extension:1;   // 扩展标志
    uint8_t cc:4;          // CSRC 计数器
    uint8_t marker:1;      // 标记位
    uint8_t payload:7;     // 载荷类型
    uint16_t seq;          // 序列号
    uint32_t timestamp;    // 时间戳
    uint32_t ssrc;         // 同步源标识符
    // 可选的 CSRC 列表、扩展头等
} __attribute__((packed)) RTPHeader;
 
// FEC 包头部结构(简化版)
typedef struct {
    uint8_t version:2;     // RTP 版本
    uint8_t padding:1;     // 填充标志
    uint8_t extension:1;   // 扩展标志
    uint8_t cc:4;          // CSRC 计数器
    uint8_t marker:1;      // 标记位
    uint8_t payload:7;     // 载荷类型(FEC 专用)
    uint16_t seq;          // 序列号
    uint32_t timestamp;    // 时间戳
    uint32_t ssrc;         // 同步源标识符
    uint16_t block_id;     // 块 ID
    uint8_t fec_index;     // FEC 包索引
    uint8_t original_packet_count; // 原始数据包数量 
} __attribute__((packed)) FECHeader;
 
typedef struct 
    int block_num;  // 块号
    /*
    组合1 200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
    组合2 200个字节一组,25个rtp4个fec包 5600/4 = 1400个字节,每包分成175个块,每块8个字节,一共175个分组 每包175*32个字节 = 5600 字节
    *组合2能恢复2个RTP包
    */
    uint8_t fec_data[NUM_BLOCKS * SYMBLE_SIZE]; 
} __attribute__((packed))  FECPacket;
 
// 发送缓冲区结构
typedef struct {
    uint8_t *data;         // 数据包数据
    int length;            // 数据包长度
    int seq_num;           // 序列号
    int timestamp;         // 时间戳
    int is_fec;            // 是否为 FEC 包
    int block_id;          // 所属块 ID
    int packet_id;         // 在块中的 ID
} PacketBuffer;
 
 
// 编码器上下文
typedef struct {
    correct_reed_solomon *rs;      // libcorrect Reed-Solomon编码器
    PacketBuffer packets[DATA_PACKETS_PER_BLOCK]; // 原始数据包
    FECPacket fecData[M];    // 内存池:  M冗余包
    int packet_count;              // 当前块中的数据包数量
    int next_seq_num;              // 下一个序列号
    int next_block_id;             // 下一个块 ID
    int ssrc;                      // 同步源标识符
    // 网络相关
    int sockfd;                    // 套接字描述符
    struct sockaddr_in dest_addr;  // 目标地址
} EncoderContext;
 
// 解码器上下文
typedef struct {
    correct_reed_solomon *rs;      // libcorrect Reed-Solomon解码器
    PacketBuffer *received_packets[MAX_BLOCKS_IN_BUFFER]; // 接收的数据包
    int block_map[MAX_BLOCKS_IN_BUFFER];  // 块ID映射
    int block_packet_count[MAX_BLOCKS_IN_BUFFER]; // 每个块收到的包数量
    int block_recovered[MAX_BLOCKS_IN_BUFFER]; // 块是否已恢复
    int ssrc;                      // 同步源标识符
    // 网络相关
    int sockfd;                    // 套接字描述符
    struct sockaddr_in src_addr;   // 源地址
    // 统计信息
    int total_packets;             // 接收的总数据包数
    int recovered_packets;         // 成功恢复的数据包数
    int total_blocks;              // 处理的总块数
    int recovered_blocks;          // 成功恢复的块数
} DecoderContext;
  
 
void printf_hex(uint8_t *data, int len);
 
// 初始化编码器
EncoderContext* init_encoder(const char* ip, int port) {
    EncoderContext* ctx = malloc(sizeof(EncoderContext));
    if (!ctx) return NULL;
     
    // 初始化libcorrect Reed-Solomon编码器
    ctx->rs = correct_reed_solomon_create(
        RS_PRIMITIVE_POLYNOMIAL,
        RS_FIRST_CONSECUTIVE_ROOT,
        RS_GENERATOR_ROOT_GAP,
        RS_NUM_ROOTS
    );
     
    if (!ctx->rs) {
        free(ctx);
        return NULL;
    }
    //int k = correct_reed_solomon_message_length(ctx->rs);
    //int m = correct_reed_solomon_block_length(ctx->rs);
    //printf("k:%d, M:%d\r\n", k , m);
    // 验证参数兼容性
    //assert(correct_reed_solomon_message_length(enc->rs) == K);
    //assert(correct_reed_solomon_block_length(enc->rs) == K+M);
     
    ctx->packet_count = 0;
    ctx->next_seq_num = 0;
    ctx->next_block_id = 0;
    ctx->ssrc = rand();
     
    // 创建 UDP 套接字
    ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (ctx->sockfd < 0) {
        perror("socket creation failed");
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
     
    memset(&ctx->dest_addr, 0, sizeof(ctx->dest_addr));
    ctx->dest_addr.sin_family = AF_INET;
    ctx->dest_addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &ctx->dest_addr.sin_addr) <= 0) {
        perror("invalid address");
        close(ctx->sockfd);
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
     
    // 初始化数据包缓冲区
    for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
        ctx->packets[i].data = NULL;
    
    return ctx;
}
 
// 初始化解码器
DecoderContext* init_decoder(int port) {
    DecoderContext* ctx = malloc(sizeof(DecoderContext));
    if (!ctx) return NULL;
     
    // 初始化libcorrect Reed-Solomon解码器(与编码器参数一致)
    ctx->rs = correct_reed_solomon_create(
        RS_PRIMITIVE_POLYNOMIAL,
        RS_FIRST_CONSECUTIVE_ROOT,
        RS_GENERATOR_ROOT_GAP,
        RS_NUM_ROOTS
    );
     
    if (!ctx->rs) {
        free(ctx);
        return NULL;
    }
     
    // 初始化块映射和计数器
    memset(ctx->block_map, -1, sizeof(ctx->block_map));
    memset(ctx->block_packet_count, 0, sizeof(ctx->block_packet_count));
    memset(ctx->block_recovered, 0, sizeof(ctx->block_recovered));
     
    ctx->ssrc = 0;
     
    // 初始化统计信息
    ctx->total_packets = 0;
    ctx->recovered_packets = 0;
    ctx->total_blocks = 0;
    ctx->recovered_blocks = 0;
     
    // 创建 UDP 套接字
    ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (ctx->sockfd < 0) {
        perror("socket creation failed");
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
     
    memset(&ctx->src_addr, 0, sizeof(ctx->src_addr));
    ctx->src_addr.sin_family = AF_INET;
    ctx->src_addr.sin_addr.s_addr = INADDR_ANY;
    ctx->src_addr.sin_port = htons(port);
     
    if (bind(ctx->sockfd, (const struct sockaddr *)&ctx->src_addr, 
             sizeof(ctx->src_addr)) < 0) {
        perror("bind failed");
        close(ctx->sockfd);
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
     
    // 初始化数据包缓冲区
    for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
        ctx->received_packets[i] = (PacketBuffer *)malloc(sizeof(PacketBuffer));
        memset(ctx->received_packets[i], 0x00, sizeof(PacketBuffer));
    }
     
    return ctx;
}
 
// 清理编码器资源
void cleanup_encoder(EncoderContext* ctx) {
    if (ctx) {
        // 释放数据包缓冲区
        for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
            if (ctx->packets[i].data) {
                free(ctx->packets[i].data);
            }
        }
          
        if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
        if (ctx->sockfd >= 0) close(ctx->sockfd);
        free(ctx);
    }
}
 
// 清理解码器资源
void cleanup_decoder(DecoderContext* ctx) {
    if (ctx) {
        // 释放数据包缓冲区
        for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
            if (ctx->received_packets[i]) {
                free(ctx->received_packets[i]);
            }
        }
         
        if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
        if (ctx->sockfd >= 0) close(ctx->sockfd);
         
        // 打印统计信息
        printf("解码器统计信息:\n");
        printf("  处理的总块数: %d\n", ctx->total_blocks);
        printf("  成功恢复的块数: %d\n", ctx->recovered_blocks);
        printf("  接收的总数据包: %d\n", ctx->total_packets);
        printf("  成功恢复的数据包: %d\n", ctx->recovered_packets);
        printf("  块恢复成功率: %.2f%%\n"
              (float)ctx->recovered_blocks / ctx->total_blocks * 100);
        printf("  数据包恢复成功率: %.2f%%\n"
              (float)ctx->recovered_packets / (ctx->total_blocks * DATA_PACKETS_PER_BLOCK) * 100);
         
        free(ctx);
    }
}
 
// 使用Reed-Solomon码编码并发送RTP包
int encode_and_send(EncoderContext* ctx, const uint8_t* payload, int payload_len, int timestamp) {
    if (!ctx || !payload || payload_len <= 0) {
        errno = EINVAL;
        return -1;
    }
     
    // 确保有效载荷长度不超过最大值
    if (payload_len > MAX_PACKET_SIZE ) {
        payload_len = MAX_PACKET_SIZE;
    }
     
    // 计算所需缓冲区大小
    int packet_size = RTP_HEADER_SIZE + payload_len;
     
    // 分配或调整数据包缓冲区大小
    if (ctx->packets[ctx->packet_count].data == NULL) {
        ctx->packets[ctx->packet_count].data = malloc(packet_size);
        if (!ctx->packets[ctx->packet_count].data) {
            perror("内存分配失败");
            return -1;
        }
    else if (ctx->packets[ctx->packet_count].length < packet_size) {
        ctx->packets[ctx->packet_count].data = realloc(ctx->packets[ctx->packet_count].data, packet_size);
        if (!ctx->packets[ctx->packet_count].data) {
            perror("内存分配失败");
            return -1;
        }
    }
     
    // 创建 RTP 头部
    RTPHeader header;
    header.version = 2;
    header.padding = 0;
    header.extension = 0;
    header.cc = 0;
    header.marker = 0;
    header.payload = 96;  // 假设默认载荷类型 96
    header.seq = htons(ctx->next_seq_num++);
    header.timestamp = htonl(timestamp);
    header.ssrc = htonl(ctx->ssrc);
     
    // 构建完整的 RTP 包
    memcpy(ctx->packets[ctx->packet_count].data, &header, RTP_HEADER_SIZE);
    memcpy(ctx->packets[ctx->packet_count].data + RTP_HEADER_SIZE, payload, payload_len);
    ctx->packets[ctx->packet_count].length = packet_size;
    ctx->packets[ctx->packet_count].seq_num = ntohs(header.seq);
    ctx->packets[ctx->packet_count].timestamp = ntohl(header.timestamp);
    ctx->packets[ctx->packet_count].is_fec = 0;
    ctx->packets[ctx->packet_count].block_id = ctx->next_block_id;
    ctx->packets[ctx->packet_count].packet_id = ctx->packet_count;
     
    // 发送原始 RTP 包
    ssize_t sent = sendto(ctx->sockfd, 
                         ctx->packets[ctx->packet_count].data, 
                         ctx->packets[ctx->packet_count].length, 
                         0, 
                         (const struct sockaddr *)&ctx->dest_addr, 
                         sizeof(ctx->dest_addr));
     
    if (sent < 0) {
        perror("发送失败");
        return -1;
    }
     
    printf("发送原始包 #%d (块 %d/%d), 长度 %d 字节\n"
          ctx->packets[ctx->packet_count].seq_num,
          ctx->next_block_id, ctx->packet_count + 1, payload_len);
     
    // 增加包计数
    int current_packet = ctx->packet_count++;
     
    // 当收集到足够的包时,生成并发送 FEC 包
    if (ctx->packet_count == DATA_PACKETS_PER_BLOCK) {
        generate_and_send_fec(ctx);
        ctx->packet_count = 0;
        ctx->next_block_id++;
    }
     
    return current_packet;
}
 
// 处理变长包:填充至SYMBOL_SIZE
void pad_packet(char *pkt, size_t actual_len) {
    if (actual_len < SYMBOL_SIZE) {
        memset(pkt + actual_len, 0, SYMBOL_SIZE - actual_len);
    }
}
 
// 使用Reed-Solomon码生成并发送FEC包
void generate_and_send_fec(EncoderContext* ctx) {
    int block_id = ctx->next_block_id;  
     
    uint8_t msg[BLOCK_SIZE];  // 当前块数据(223字节)
    uint8_t encoded[BLOCK_SIZE + SYMBLE_SIZE]; // 编码结果(255字节) 
          
      
    printf("generate_and_send_fec  NUM_BLOCKS:%d\n",  NUM_BLOCKS);
     
    int group_index = 0;
    //35个块*40 ==1400
    //175*8=1400
    for (int block=0; block<NUM_BLOCKS;block++){
        int offset = block * REAL_BLOCK_SIZE;
        int size = REAL_BLOCK_SIZE;
          
        memset(msg, 0x00, sizeof(msg));
        // 2. 分块处理(每块223字节)-200字节+23字节补0
        //每个包取40个字节,5个rtp包 一共200个字节
        for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) 
        {
            // 1.准备数据
            uint8_t *full_data = ctx->packets[i].data + RTP_HEADER_SIZE; 
            int payload_len = ctx->packets[i].length - RTP_HEADER_SIZE; 
            // 从完整数据中提取当前块
            if (block == NUM_BLOCKS - 1){
                int reserved = payload_len - REAL_BLOCK_SIZE*block;
                if (reserved < size){
                    size = reserved;
                }
            }
            memcpy((void *)msg+i*REAL_BLOCK_SIZE, (void *)full_data + offset, size);
        }
        // RS编码
        ssize_t ret = correct_reed_solomon_encode(ctx->rs, msg, BLOCK_SIZE, encoded);
        if (ret != REED_SOLOMON_LEN){
            printf("correct_reed_solomon_encode not ret:%d\r\n", ret);
        
         
        if (block == 0){
            printf_hex(encoded+BLOCK_SIZE, SYMBLE_SIZE);
        }
        FECPacket *hdr = (FECPacket*)&(ctx->fecData[group_index]);
        hdr->block_num = block; // 标记保护的块
        memcpy(hdr->fec_data + block*SYMBLE_SIZE, encoded + BLOCK_SIZE, SYMBLE_SIZE); //SYMBLE_SIZE为32个标记位
    }
   
     
    for (int fec_idx = 0; fec_idx < FEC_PACKETS_PER_BLOCK; fec_idx++) {
        FECPacket *fecPacket = (FECPacket*)&(ctx->fecData[fec_idx]);
        // 创建FEC头部
        FECHeader fec_header;
        fec_header.version = 2;
        fec_header.padding = 0;
        fec_header.extension = 0;
        fec_header.cc = 0;
        fec_header.marker = 0;
        fec_header.payload = FEC_PAYLOAD_TYPE;
        fec_header.seq = htons(ctx->next_seq_num++);
        fec_header.timestamp = htonl(ctx->packets[0].timestamp);
        fec_header.ssrc = htons(ctx->ssrc);
        fec_header.block_id = htons(block_id);
        fec_header.fec_index = fec_idx;
        fec_header.original_packet_count = DATA_PACKETS_PER_BLOCK;
        int fec_packet_len = FEC_HEADER_SIZE +NUM_BLOCKS * SYMBLE_SIZE;  //sizeof(FECPacket) 
          
        // 计算FEC包总长度
        printf("fec_idx:%d, fec_packet_len:%d,sizeof(FECPacket):%d\n", fec_idx, fec_packet_len,sizeof(FECPacket));
        uint8_t* fec_packet = malloc(fec_packet_len);
        if (!fec_packet) {
            perror("FEC包内存分配失败");
            continue;
        }
 
        // 复制头部数据
        memcpy(fec_packet, (void *)&fec_header, FEC_HEADER_SIZE);
        uint8_t *fec_data = fecPacket->fec_data;
        memcpy((uint8_t *)(fec_packet+FEC_HEADER_SIZE), fec_data, NUM_BLOCKS * SYMBLE_SIZE);
   
        // 发送FEC包
        ssize_t sent = sendto(ctx->sockfd, fec_packet, fec_packet_len, 0, 
                            (const struct sockaddr *)&ctx->dest_addr, 
                            sizeof(ctx->dest_addr));
         
        if (sent < 0) {
            perror("FEC包发送失败");
        else {
            printf("发送FEC包 #%d (块 %d/%d), 保护 %d 个RTP包\n"
                  ntohs(fec_header.seq), block_id, fec_idx + 1, DATA_PACKETS_PER_BLOCK);
        }
         
        free(fec_packet);
    
     
    // 重置包计数器,准备下一个块
    ctx->packet_count = 0;
    ctx->next_block_id++;
}
 
//MAX_PACKET_SIZE +  RTP_HEADER_SIZE + 1
#define MAX_UDP_PACKET NUM_BLOCKS*32+120
 
// 使用Reed-Solomon码接收并解码RTP包
int receive_and_decode(DecoderContext* ctx, uint8_t** output_buffer, int* output_len, int* seq_num) {
    if (!ctx || !output_buffer || !output_len || !seq_num) {
        errno = EINVAL;
        return -1;
    }
     
    uint8_t buffer[MAX_UDP_PACKET];
    socklen_t addr_len = sizeof(ctx->src_addr);
     
    // 接收数据包
    int recv_len = recvfrom(ctx->sockfd, buffer, MAX_UDP_PACKET, 0, 
                           (struct sockaddr *)&ctx->src_addr, &addr_len);
    if (recv_len < 0) {
        perror("recvfrom failed");
        return -1;
    }
     
    // 解析RTP头部
    RTPHeader* rtp_header = (RTPHeader*)buffer;
    uint16_t packet_seq = ntohs(rtp_header->seq);
     
    // 检查是否是FEC包
    int is_fec = (rtp_header->payload == FEC_PAYLOAD_TYPE);
     
    // 计算块ID和包ID
    int block_id = 0;
    int packet_id = 0;
     
    if (is_fec) {
        // 解析FEC头部
        FECHeader* fec_header = (FECHeader*)buffer;
        packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
        block_id = ntohs(fec_header->block_id);
         
         
        // 计算FEC包在块中的索引
        int fec_index =  fec_header->fec_index;
         
        printf("接收到FEC包 #%d (块 %d) %d len:%d\n"
              packet_seq, block_id, fec_index, recv_len);
    else {
        // 原始 RTP 包
        block_id = packet_seq / TOTAL_PACKETS_PER_BLOCK;
        packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
         
        printf("接收到原始包 #%d (块 %d/%d) len:%d\n"
              packet_seq, block_id, packet_id, recv_len );
    }
      
     
    // 查找或分配块缓冲区
    int block_index = packet_id; 
    // 验证块ID是否有效
    if (block_index < 0 || block_index >= MAX_BLOCKS_IN_BUFFER) {
        printf("警告: 无效的包块ID %d\n", block_id);
        return 0;
    }
     
    // 保存数据包,只缓存一组
    int group_index = 0;
    PacketBuffer* packet = ctx->received_packets[block_index];
     
    // 分配或调整缓冲区大小
    if (packet->data == NULL) {
        packet->data = malloc( MAX_UDP_PACKET);//
        if (!packet->data) {
            perror("内存分配失败");
            return -1;
        }
    else if (packet->length < recv_len) {
        packet->data = realloc(packet->data, recv_len);
        if (!packet->data) {
            perror("[2]内存分配失败");
            return -1;
        }
    }
     
    memcpy(packet->data, buffer, recv_len);
    packet->length = recv_len;
    packet->seq_num = packet_seq;
    packet->timestamp = ntohl(rtp_header->timestamp);
    packet->is_fec = is_fec;
    packet->block_id = block_id;
    packet->packet_id = packet_id;
     
    // 更新接收计数
    ctx->block_packet_count[block_index]++;
     
    // 更新统计信息
    ctx->total_packets++;
     
    // 尝试恢复块数据
    if (block_index >= DATA_PACKETS_PER_BLOCK) {
        int recovered = recover_lost_packets(ctx, block_id);
        if (recovered) {
            ctx->block_recovered[block_index] = 1;
            ctx->recovered_blocks++;
            ctx->recovered_packets += DATA_PACKETS_PER_BLOCK;
             
            printf("成功恢复块 %d 的所有数据包\n", block_id);
            // 返回第一个可用的原始数据包
            for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
                if (ctx->received_packets[i]->data) {
                    packet = ctx->received_packets[i];
                    *output_buffer = ctx->received_packets[i]->data + RTP_HEADER_SIZE;
                    *output_len = ctx->received_packets[i]->length - RTP_HEADER_SIZE;
                    *seq_num = ctx->received_packets[i]->seq_num;
                     
                     
                    // 标记为已处理
                    free(ctx->received_packets[i]->data);
                    ctx->received_packets[i]->data = NULL;
                      
                     memset((void *)packet, 0x00 ,sizeof(packet));
                }
            }
        }
    }
     
    return 0; // 没有可用的完整数据包
}
 
void printf_hex(uint8_t *data, int len){
    printf("*****print len:%d***\n", len);
    for (int i=0; i<len; i++){
        printf("%02x", data[i]);
    
    printf("******end**\n");
}
 
// 使用Reed-Solomon码恢复丢失的数据包
int recover_lost_packets(DecoderContext* ctx, int block_index) { 
    int group_index = 0;
    int missing_packets  = 0;
    int received[TOTAL_PACKETS_PER_BLOCK] = {0};
    int erasures[BLOCK_SIZE] = {0}, num_erasures = 0;
      
     int random =  rand() % (TOTAL_PACKETS_PER_BLOCK -1);
     int random2 =   rand() % (TOTAL_PACKETS_PER_BLOCK -1);// random+1;//
     int random3 =  rand() % (TOTAL_PACKETS_PER_BLOCK -1);
      
    //random = random2;
      
    printf("尝试恢复块 %d,%d,%d: %d 的数据包...\n", random, random2, random3, block_index);
    // 检查是否有FEC包可用
    int available_fec_packets = 0;
    int available_fec_packet_index = 0;
    for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
        PacketBuffer* packet = ctx->received_packets[i];
        if (packet->data){
            if (random == i || random2 == i || random3 == i){
                received[i] = 0; 
                uint8_t *data = packet->data+RTP_HEADER_SIZE;
                printf("test lost:%d ,orign data \n", i);
                printf_hex(data, packet->length - RTP_HEADER_SIZE);
                missing_packets++; 
            }else{
                received[i] = 1;
                if (packet->is_fec) {
                    available_fec_packets++;
                    available_fec_packet_index = i;
                     
                    //printf("fec lost:%d ,orign data \n", i);
                    //printf_hex(packet->data+FEC_HEADER_SIZE, 32);
                }
            }
        }else{
            received[i] = 0; 
        
    
     
    // 如果FEC包全丢,不进行恢复
    if (available_fec_packets == 0) {
        printf("错误: 没有可用的FEC包,无法恢复\n");
        return 0;
    
      
     
    printf("块 %d 有 %d 个丢失的数据包,%d 个可用的FEC包,fec_i:%d\n"
          block_index, missing_packets, available_fec_packets, available_fec_packet_index);
     
    // 检查是否有足够的FEC包来恢复
    if (missing_packets > MAX_RESTORE_PACKET) {
        printf("错误: 可用FEC包不足,无法恢复 (需要至少 %d 个,实际 %d 个)\n"
              missing_packets, available_fec_packets);
        return 0;
    
       
    uint8_t symbols[REED_SOLOMON_LEN], restored[REED_SOLOMON_LEN];
    PacketBuffer* fec_packet = ctx->received_packets[available_fec_packet_index] ;
    FECHeader* fec_header = (FECHeader*)fec_packet;
    // 1. 分块恢复(共35块,每块40个字节)
    // 2. 分块恢复(共175块,每块8个字节)
    for (int block=0; block<NUM_BLOCKS; block++) {
        int offset = block * REAL_BLOCK_SIZE;
        memset(symbols, 0, REED_SOLOMON_LEN);
        memset(restored, 0, REED_SOLOMON_LEN);
        memset(erasures, 0, BLOCK_SIZE);
        num_erasures = 0;
        // 组包1:收集当前块的原始数据 5个包,每个包40个字节
        // 组包2:收集当前块的原始数据 175个包,每个包8个字节
        for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {
            if (received[i] ) {
                uint8_t *data =  ctx->received_packets[i]->data;
                uint8_t *full_data = data + RTP_HEADER_SIZE; 
                memcpy(symbols + i*REAL_BLOCK_SIZE, full_data + offset, REAL_BLOCK_SIZE);
            }else{
                 
                for (int j=0; j<REAL_BLOCK_SIZE; j++){
                    erasures[num_erasures++] = i*REAL_BLOCK_SIZE+j;
                }
                //num_erasures += REAL_BLOCK_SIZE;
            }
        }
  
        // 收集当前块的校验数据(1个FEC包) 
        memcpy(symbols + BLOCK_SIZE , (void*)fec_packet->data + FEC_HEADER_SIZE + block*SYMBLE_SIZE, SYMBLE_SIZE);  
         
        if (block == 0){
             
            printf("***************num_erasures:%d data:",num_erasures);
            for (int j=0; j<num_erasures; j++){ 
                printf("%d,",erasures[j]);
            }
            printf("****\n",num_erasures);
            printf_hex(symbols, REED_SOLOMON_LEN);
        }
        // RS解码
        int success = correct_reed_solomon_decode_with_erasures(
            ctx->rs, 
            symbols, 
            REED_SOLOMON_LEN,
            erasures, 
            num_erasures, 
            restored
        );
        if (block == 0){
         printf_hex(restored, REED_SOLOMON_LEN);
         printf("#########success:%d#############\n",success);
        }
 
        // 3. 回写恢复的原始数据 
        if (success) {
            for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {  
                if (!received[i]) { 
                    PacketBuffer *packetbuffer = ctx->received_packets[i] ;
                    if (packetbuffer && packetbuffer->data == NULL){
                        packetbuffer->data = (uint8_t *)malloc(MAX_PACKET_SIZE + RTP_HEADER_SIZE);
                    }  
                    uint8_t *full_data = packetbuffer->data + RTP_HEADER_SIZE; 
                    memcpy(full_data + offset, restored + i*REAL_BLOCK_SIZE, REAL_BLOCK_SIZE);
                }
            }  
        }
    }  
     
    for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {  
        if (!received[i]) { 
            int missing_idx= i;
            if (ctx->received_packets[missing_idx]->data == NULL){
                ctx->received_packets[missing_idx]->data = (uint8_t *)malloc(RTP_HEADER_SIZE+MAX_PACKET_SIZE);
            }
            // 创建恢复包的头部
            RTPHeader* recovered_header = (RTPHeader*)ctx->received_packets[missing_idx]->data;  
              
            // 复制RTP头部(从第一个可用的包复制)
            RTPHeader* header = NULL;
            for (int k = 0; k < DATA_PACKETS_PER_BLOCK; k++) {
                if (ctx->received_packets[k]->data) {
                    header = (RTPHeader*)ctx->received_packets[k]->data;
                    break;
                }
            }
            *recovered_header = *header;
            ctx->received_packets[missing_idx]->length = RTP_HEADER_SIZE + MAX_PACKET_SIZE;
            ctx->received_packets[missing_idx]->seq_num = recovered_header->seq;
            ctx->received_packets[missing_idx]->timestamp = ntohl(recovered_header->timestamp);
            ctx->received_packets[missing_idx]->is_fec = 0;
            ctx->received_packets[missing_idx]->block_id = fec_header->block_id;
            ctx->received_packets[missing_idx]->packet_id = missing_idx;
             
            uint8_t *data = ctx->received_packets[missing_idx]->data + RTP_HEADER_SIZE;
            printf_hex(data, MAX_PACKET_SIZE);
            printf("成功恢复块 %d 的数据包 #%d\n", fec_header->block_id, i);
        }
    
    printf("块 %d 的恢复尝试完成: 成功\n", fec_header->block_id);
    return 1;
}
 
// 编码器主函数示例
void encoder_main(const char* dest_ip, int dest_port) {
    EncoderContext* ctx = init_encoder(dest_ip, dest_port);
    if (!ctx) {
        printf("编码器初始化失败\n");
        return;
    }
     
    printf("编码器开始发送数据到 %s:%d...\n", dest_ip, dest_port);
     
    // 模拟发送 100 个 RTP 包
    for (int i = 0; i < 100; i++) {
        // 创建随机数据作为载荷
        uint8_t payload[MAX_PACKET_SIZE];
        for (int j = 0; j < sizeof(payload); j++) {
            payload[j] = rand() % 256;
        }
         
        // 编码并发送
        encode_and_send(ctx, payload, sizeof(payload), i * 90);
         
        // 控制发送速率
        usleep(10000); // 10ms
    }
     
    cleanup_encoder(ctx);
}
 
// 解码器主函数示例
void decoder_main(int listen_port) {
    DecoderContext* ctx = init_decoder(listen_port);
    if (!ctx) {
        printf("解码器初始化失败\n");
        return;
    }
     
    printf("解码器开始在端口 %d 接收数据...\n", listen_port);
     
    uint8_t* output_buffer = NULL;
    int output_len = 0;
    int seq_num = 0;
     
    // 持续接收和解码数据包
    while (1) {
        int result = receive_and_decode(ctx, &output_buffer, &output_len, &seq_num);
        if (result > 0) {
            printf("处理接收到的数据包 #%d, 长度 %d 字节\n", seq_num, output_len);
             
            // 这里可以处理接收到的数据,例如写入文件或播放
            // 示例中只是简单打印信息
             
            // 注意:output_buffer 指向的是 ctx 内部缓冲区,
            // 在下一次调用 receive_and_decode 前保持有效
        else if (result < 0) {
            printf("接收数据时发生错误\n");
            break;
        }
         
        // 控制循环速率
        usleep(1000); // 1ms
    }
     
    cleanup_decoder(ctx);
}
 
int main(int argc, char* argv[]) {
    // 初始化随机数生成器
    srand(time(NULL));
     
    if (argc < 2) {
        printf("用法: %s [encoder|decoder] [参数...]\n", argv[0]);
        printf("编码器: %s encoder <目标IP> <目标端口>\n", argv[0]);
        printf("解码器: %s decoder <监听端口>\n", argv[0]);
        return 1;
    }
     
    if (strcmp(argv[1], "encoder") == 0) {
        if (argc < 4) {
            printf("编码器需要目标IP和端口参数\n");
            return 1;
        }
         
        const char* dest_ip = argv[2];
        int dest_port = atoi(argv[3]);
         
        encoder_main(dest_ip, dest_port);
    else if (strcmp(argv[1], "decoder") == 0) {
        if (argc < 3) {
            printf("解码器需要监听端口参数\n");
            return 1;
        }
         
        int listen_port = atoi(argv[2]);
         
        decoder_main(listen_port);
    else {
        printf("未知命令: %s\n", argv[1]);
        return 1;
    }
     
    return 0;
}


测试:

接收端:

./testc  decoder 5000


发送端:

./testc  encoder 127.0.0.1 5000

-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com


本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com

请先登录后发表评论
  • 最新评论
  • 总共0条评论