MP3文件流式分发并播放,主要涉及广播分发音频文件,然后在广播端解码播放。
服务器端,读文件,发送流,为了避免粘包,先发长度,然后发送二进制流。
流格式为RTP+1300个字节的音频
struct ClientConfig { public: char ip[64]; unsigned short port; bool useTcp; }; std::vector<ClientConfig*> clients; static volatile uint8_t running_flag = 0; static volatile uint8_t running_end_flag = 1; // RTP头结构 #pragma pack(push, 1) struct RTPHeader { uint8_t version_cc; uint8_t marker_pt; uint16_t seq; uint32_t timestamp; uint32_t ssrc; }; DWORD WINAPI SendWorker(LPVOID param) { char* file_name = (char*)param; int file_size = GetFileSize(file_name); // 一次性读取整个文件 FILE* fp = fopen(file_name, "rb"); if (fp == NULL) { free(file_name); return 0; } //fseek(fp, 0, SEEK_END); //long file_size = ftell(fp); fseek(fp, 0, SEEK_SET); std::vector<SOCKET> clientSocks; int sendBufSize = 1 * 1024 * 1024; // 1MB for (auto* client : clients) { if (client == NULL) { continue; } SOCKET sock = client->useTcp ? socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) : socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&sendBufSize, sizeof(sendBufSize)); sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(client->port); addr.sin_addr.s_addr = inet_addr(client->ip); if (client->useTcp) connect(sock, (sockaddr*)&addr, sizeof(addr)); clientSocks.push_back(sock); } #if 1 //第一个包,发送文件名,文件大小,文件信息 char send_buffer[256] = ""; std::string name = GetFileName(file_name); sprintf(send_buffer, "start#name:%s#file_size:%d",name.c_str(), file_size); for (size_t i = 0; i < clients.size(); ++i) { if (clients[i]->useTcp) { int packetSize = file_size;//strlen(send_buffer)+1; uint32_t netSize = htonl(packetSize); // 发送长度头 send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0); //send(clientSocks[i], send_buffer, packetSize+1, 0); } else { int packetSize = strlen(send_buffer)+1; sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(clients[i]->port); addr.sin_addr.s_addr = inet_addr(clients[i]->ip); sendto(clientSocks[i], send_buffer, packetSize + 1, 0, (sockaddr*)&addr, sizeof(addr)); } } #endif // 分段发送 const int chunkSize = 1300; // 有效载荷大小 RTPHeader header; header.version_cc = 0x80; // V=2, CC=0 header.marker_pt = 0x60; // PT=96 header.ssrc = htonl(0x12345678); char* fileData = new char[chunkSize*2]; running_end_flag = 0; // 构造RTP包 char *packet = (char *)malloc(sizeof(RTPHeader) + chunkSize); int seq = 1; int file_offset = 0; int need_pause = 0; while (!feof(fp)) { if (!running_flag) { need_pause = 1; break; } int readSize = fread(fileData, 1, chunkSize, fp); if (readSize <= 0) { break; } header.seq = htons(seq++); header.timestamp = htonl(file_offset); // 模拟时间戳 memcpy((void *)packet, (void*)&header, HEADER_SIZE); memcpy((char*)packet + HEADER_SIZE, (char*)fileData, readSize); int packetSize = HEADER_SIZE + readSize; // 为每个客户端发送当前块 for (size_t i = 0; i < clients.size(); ++i) { if (clients[i]->useTcp) { uint32_t netSize = htonl(packetSize); // 发送长度头 send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0); send(clientSocks[i], packet, packetSize, 0); } else { sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(clients[i]->port); addr.sin_addr.s_addr = inet_addr(clients[i]->ip); sendto(clientSocks[i], packet, packetSize, 0, (sockaddr*)&addr, sizeof(addr)); } } file_offset += readSize; if (!running_flag) { need_pause = 1; break; } Sleep(10); // 控制速率 } char temp[64] = ""; sprintf(temp, "end!"); if (need_pause) { sprintf(temp, "stop"); } for (size_t i = 0; i < clients.size(); ++i) { if (clients[i]->useTcp) { int len = strlen(temp)+1; uint32_t netSize = htonl(len); // 发送长度头 send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0); send(clientSocks[i], temp, len, 0); } else { sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(clients[i]->port); addr.sin_addr.s_addr = inet_addr(clients[i]->ip); sendto(clientSocks[i], temp, strlen(temp), 0, (sockaddr*)&addr, sizeof(addr)); } } Sleep(10); // 控制速率 for (auto sock : clientSocks) closesocket(sock); delete[] fileData; clients.clear(); running_end_flag = 1; running_flag = 0; fclose(fp); free(packet); free(file_name); return 0; }
Windows电脑模拟客户端:
几个关键点:1、音频流保存为文件;
2、使用ffmpeg的api探测探测音频流的格式,解码并转换为电脑播放的音频采样率、channel和音频PCM格式;
3、使用SDL的API播放音频,关键是AUDIO_F32LSB的转换,要不播放出来的声音有异常;
#define DATA_PORT 5600 #define SAMPLE_RATE 44100 #define OUT_PLAY_CHANNEL 2 enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format); // RTP头结构 #pragma pack(push, 1) struct RTPHeader { uint8_t version_cc; uint8_t marker_pt; uint16_t seq; uint32_t timestamp; uint32_t ssrc; }; #define AUDIO_BUFFER_COUNT 4 #define MAX_FRAME_SIZE 4410*8 #define DETECT_LEN 300*1024 #define OUT_DATA_SIZE 1152 * 8 // 音频解码上下文 struct AudioContext { AVCodecContext* codecCtx; AVFormatContext* fmt_ctx; AVCodecParameters* codecpar; int audio_stream_idx; const AVCodec* codec; AVCodecParserContext* parser; SwrContext* swrCtx; HWAVEOUT hWaveOut; bool isTcpMode; AVFrame* frame; AVFrame* pframePCM; AVPacket* pkt; uint8_t* outData[2]; uint8_t* cache_pcm_buffer; size_t cache_capacity; size_t cache_buffer_size; uint8_t* recv_buffer; size_t capacity; size_t recv_buffer_size; size_t read_buffer; int file_size; unsigned long play_start_time; int duration; double accumulated_play_time; size_t dectected; struct { uint8_t* data; int size; bool inUse; } audioBuffers[AUDIO_BUFFER_COUNT]; int currentBuffer; pthread_mutex_t bufferMutex; bool running;// bool play_pause;// SDL_AudioDeviceID audio_device; }; static AudioContext ctx; int configure_decoder(AudioContext* ctx) { if (ctx->codec != NULL) { avcodec_free_context(&ctx->codecCtx); ctx->codecCtx = NULL; swr_free(&ctx->swrCtx); } if (ctx->codecpar == NULL) { printf("codecpar == NULL \n"); return -1; } // 查找解码器 ctx->codec = avcodec_find_decoder(ctx->codecpar->codec_id); if (!ctx->codec) { printf("avcodec_find_decoder failed \n"); return AVERROR_DECODER_NOT_FOUND; } // 创建解码器上下文 ctx->codecCtx = avcodec_alloc_context3(ctx->codec); if (!ctx->codecCtx) { printf("avcodec_alloc_context3 failed \n"); return AVERROR(ENOMEM); } // 复制参数到解码器上下文 int ret = avcodec_parameters_to_context(ctx->codecCtx, ctx->codecpar); if (ret < 0) { printf("avcodec_parameters_to_context failed: %d\n", ret); return ret; } // 打开解码器 ret = avcodec_open2(ctx->codecCtx, ctx->codec, NULL); if (ret < 0) { printf("Could not open codec: %d\n", ret); avcodec_free_context(&ctx->codecCtx); return -1; } ctx->swrCtx = swr_alloc(); // 初始化重采样器 swr_alloc_set_opts(ctx->swrCtx, AV_CH_LAYOUT_STEREO, #ifdef USE_SDL_DEVICE sdl_format_to_ffmpeg(AUDIO_F32LSB), #else AV_SAMPLE_FMT_S16, #endif SAMPLE_RATE, av_get_default_channel_layout(ctx->codecCtx->channels), ctx->codecCtx->sample_fmt, ctx->codecCtx->sample_rate, 0, NULL); ret = swr_init(ctx->swrCtx); if (ret < 0) { printf("swr_init failed: %d\n", ret); avcodec_free_context(&ctx->codecCtx); swr_free(&ctx->swrCtx); return -1; } return ret; } enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format) { switch (format) { case AUDIO_S16SYS: case AUDIO_S16MSB: return AV_SAMPLE_FMT_S16; case AUDIO_F32LSB: case AUDIO_F32MSB: return AV_SAMPLE_FMT_FLT; case AUDIO_S32LSB: case AUDIO_S32MSB: return AV_SAMPLE_FMT_S32; default: return AV_SAMPLE_FMT_NONE; } } #define SDL_AUDIO_BUFFER_SIZE 4096 int start_play_with_sdl() { // 设置SDL音频参数 SDL_AudioSpec desired, obtained; desired.freq = SAMPLE_RATE; // 采样率 desired.format = AUDIO_F32;// AUDIO_F32LSB;// AUDIO_S16SYS; // 16位有符号整数(系统字节序) desired.channels = 2; // 立体声 desired.silence = 0; // 静音值为0 desired.samples = SDL_AUDIO_BUFFER_SIZE; // 音频缓冲区大小 desired.callback = NULL; // 不使用回调 desired.userdata = NULL; if (SDL_Init(SDL_INIT_AUDIO) < 0) { fprintf(stderr, "SDL初始化失败: %s\n", SDL_GetError()); return -1; } // 打开音频设备 ctx.audio_device = SDL_OpenAudioDevice(NULL, 0, &desired, &obtained, SDL_AUDIO_ALLOW_FORMAT_CHANGE); if (ctx.audio_device == 0) { fprintf(stderr, "Failed to open audio device: %s\n", SDL_GetError()); return -1; } // 检查实际获得的音频格式是否符合要求 if (obtained.format != AUDIO_F32LSB) { fprintf(stderr, "SDL did not provide AUDIO_S16SYS format\n"); return -1; } // 开始播放 SDL_PauseAudioDevice(ctx.audio_device, 0); } // 探测音频格式 int probe_audio_format(AudioContext* ctx, std::string &temp_file) { if (ctx->fmt_ctx != NULL) { avformat_free_context(ctx->fmt_ctx); } ctx->fmt_ctx = avformat_alloc_context(); if (!ctx->fmt_ctx) return AVERROR(ENOMEM); // 探测前20KB数据(可调整) ctx->fmt_ctx->probesize = DETECT_LEN; ctx->fmt_ctx->max_analyze_duration = 1 * AV_TIME_BASE; // 5秒 // 探测格式但不解析完整文件 int ret = avformat_open_input(&ctx->fmt_ctx, temp_file.c_str(), NULL, NULL); if (ret < 0) { printf("open failed"); return ret; } // 查找流信息(仅头部) ret = avformat_find_stream_info(ctx->fmt_ctx, NULL); if (ret < 0) { printf("avformat_find_stream_info failed"); return ret; } // 查找音频流 for (unsigned i = 0; i < ctx->fmt_ctx->nb_streams; i++) { AVStream* stream = ctx->fmt_ctx->streams[i]; if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { ctx->audio_stream_idx = i; ctx->codecpar = stream->codecpar; break; } } if (ctx->audio_stream_idx == -1) { return AVERROR_STREAM_NOT_FOUND; } if (0 != configure_decoder(ctx)) { return 0; } { int64_t total_duration = ctx->fmt_ctx->duration + ((ctx->fmt_ctx->duration <= (INT64_MAX - 5000)) ? 5000 : 0); ctx->duration = (total_duration / AV_TIME_BASE); int us = (total_duration % AV_TIME_BASE); printf("total_duration:%02d:%02d\r\n", ctx->duration, us); } struct timeval tv; gettimeofday(&tv, NULL); unsigned long start_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;//ms ctx->recv_buffer_size = 0; ctx->dectected = 1; ctx->play_start_time = start_time;// GetTickCount();// time(nullptr); register_pjsip_thread("NetworkThread"); //start_play(ctx->codecCtx->sample_rate, ctx->codecCtx->channels, NULL, RemotePlayCallbackFunc); #ifndef USE_SDL_DEVICE start_play(SAMPLE_RATE, OUT_PLAY_CHANNEL, NULL, RemotePlayCallbackFunc); #else start_play_with_sdl(); #endif return 0; } bool appendBinaryFile(const std::string& filePath, const uint8_t* binaryData, size_t dataSize) { // 创建并打开文件流,以二进制追加模式 std::ofstream outFile(filePath, std::ios::out | std::ios::binary | std::ios::app); // 检查文件是否成功打开 if (!outFile) { std::cerr << "无法打开文件: " << filePath << std::endl; return false; } // 将二进制数据追加写入文件 outFile.write(reinterpret_cast<const char*>(binaryData), dataSize); // 关闭文件流 outFile.close(); return true; } bool initDecoder(AudioContext* ctx) { // FFmpeg初始化 avcodec_register_all(); pthread_mutex_init(&ctx->bufferMutex, NULL); ctx->fmt_ctx = NULL;// avformat_alloc_context(); for (int i = 0; i < AUDIO_BUFFER_COUNT; i++) { ctx->audioBuffers[i].data = (uint8_t*)av_malloc(MAX_FRAME_SIZE); if (!ctx->audioBuffers[i].data) return false; ctx->audioBuffers[i].size = 0; ctx->audioBuffers[i].inUse = false; } ctx->currentBuffer = 0; ctx->cache_capacity = 400 * 1024; ctx->cache_pcm_buffer = (uint8_t*)malloc(ctx->cache_capacity); ctx->cache_buffer_size = 0; ctx->capacity = 400 * 1024; ctx->recv_buffer = (uint8_t*)malloc(ctx->capacity); ctx->recv_buffer_size = 0; ctx->dectected = 0; #if 1 ctx->codecCtx = NULL; ctx->codec = NULL; ctx->pkt = av_packet_alloc(); av_init_packet(ctx->pkt); ctx->frame = av_frame_alloc(); ctx->outData[0] = (uint8_t*)av_malloc(OUT_DATA_SIZE); ctx->outData[1] = (uint8_t*)av_malloc(OUT_DATA_SIZE); memset(ctx->outData[0], 0x00, OUT_DATA_SIZE); memset(ctx->outData[1], 0x00, OUT_DATA_SIZE); ctx->pframePCM = av_frame_alloc(); if (ctx->frame == NULL || ctx->pframePCM == NULL) { return false; } AVFrame* pframePCM = ctx->pframePCM; #ifdef USE_SDL_DEVICE pframePCM->format = sdl_format_to_ffmpeg(AUDIO_F32LSB); #else pframePCM->format = AV_SAMPLE_FMT_S16; #endif pframePCM->channel_layout = AV_CH_LAYOUT_STEREO; pframePCM->sample_rate = SAMPLE_RATE; pframePCM->nb_samples = SAMPLE_RATE / 100;// 2 * SAMPLE_RATE / 100;//(rate*channels*AV_CH_LAYOUT_STEREO*20)/8000;// pframePCM->channels = OUT_PLAY_CHANNEL; av_frame_get_buffer(pframePCM, 0); ctx->read_buffer = pframePCM->nb_samples * OUT_PLAY_CHANNEL; ringmalloc(ctx->read_buffer * 2); return true; } // 网络接收线程 int NetworkThread(void * param) { AudioContext* ctx = (AudioContext*)param; SOCKET sock = ctx->isTcpMode ? socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) : socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); sockaddr_in addr = {}; addr.sin_family = AF_INET; addr.sin_port = htons(DATA_PORT); addr.sin_addr.s_addr = INADDR_ANY; bind(sock, (sockaddr*)&addr, sizeof(addr)); register_pjsip_thread("NetworkThread"); std::string curr_path = ""; GetExePath(curr_path); std::string save_temp_path = curr_path + "\\temp.data";// wchar_t* file_temp = str_to_wstr(save_temp_path.c_str()); // 接收数据循环 int start_recv_len = 100 * 1024; #define BUFFER_SIZE 21480 uint8_t *buffer = (uint8_t *)malloc(BUFFER_SIZE); int recvLen = 0; // TCP模式监听连接 if (ctx->isTcpMode) { do { listen(sock, 1); SOCKET client = accept(sock, NULL, NULL); SOCKET new_sock = client; ctx->dectected = 0; ctx->recv_buffer_size = 0; ctx->play_pause = false; int first_packet = 1; int server_play_end = 0; int server_play_stop = 0; while (1) { uint32_t netSize = 0; int ret = recv(new_sock, (char*)&netSize, sizeof(netSize), MSG_WAITALL); if (ret < 0 || recvLen < 0) { printf("1 failed: client closed.\r\n"); break; } if (ret != 4) { //error printf("1 failed: client ret failed.\r\n"); break; } recvLen = ntohl(netSize); if (first_packet == 1) { first_packet = 0; //第一个包是文件大小 //sprintf(send_buffer, "start#name:%s#file_size:%d", name.c_str(), file_size); ctx->file_size = recvLen; printf("2 : server start,file_size:%d.\r\n", recvLen); continue; } ret = recv(new_sock, (char*)(buffer), recvLen, 0); if (ret <= 0 || recvLen <= 0) { printf("2 failed: client closed.\r\n"); break; } if (recvLen == 5) { buffer[recvLen] = '\0'; if (strstr((char *)buffer, "stop") != NULL) { printf("2 failed: server closed.\r\n"); server_play_stop = 1; Sleep(1000); break; } else if (strstr((char*)buffer, "end!") != NULL) { printf("2 failed: server play end.\r\n"); server_play_end = 1; break; } } uint8_t* payload = buffer + HEADER_SIZE; int payload_len = recvLen - HEADER_SIZE; if (payload_len < 0) { continue; } appendBinaryFile(save_temp_path.c_str(), payload, payload_len); ctx->recv_buffer_size += payload_len; if (ctx->recv_buffer_size >= DETECT_LEN) { if (ctx->dectected == 0) { probe_audio_format(ctx, save_temp_path); } } if (ctx->dectected) { ret = decodeFrame(ctx, buffer, recvLen); if (ret == AVERROR_EOF) { printf("2 failed: server closed.\r\n"); break; } } Sleep(5); } if (ctx->dectected && server_play_end) { do { int ret = decodeFrame(ctx, buffer, recvLen); if (ret == AVERROR_EOF) { printf("2 failed: file end.\r\n"); break; } Sleep(10); } while (!ctx->play_pause); do { int ret = do_get_frame(ctx); if (ret == -1) { break; } Sleep(10); } while (!ctx->play_pause); } #ifdef USE_SDL_DEVICE while (SDL_GetQueuedAudioSize(ctx->audio_device) > 0) { Uint32 queue_size = SDL_GetQueuedAudioSize(ctx->audio_device); double remaining_sec = (double)queue_size / (SAMPLE_RATE * OUT_PLAY_CHANNEL * 2); // 2 bytes per sample printf("剩余: %.2f KB (约 %.1f 秒)\r", queue_size / 1024.0, remaining_sec); SDL_Delay(100); } SDL_Delay(100); if (ctx->audio_device != 0) { SDL_ClearQueuedAudio(ctx->audio_device); // 清空音频队列 SDL_CloseAudioDevice(ctx->audio_device); printf("SDL音频设备已关闭\n"); } SDL_Quit(); #endif ctx->dectected = 0; ctx->recv_buffer_size = 0; closesocket(new_sock); Sleep(100); stop_play(); avformat_close_input(&ctx->fmt_ctx); avformat_free_context(ctx->fmt_ctx); avcodec_free_context(&ctx->codecCtx); ctx->codecCtx = NULL; ctx->fmt_ctx = NULL; Sleep(10); DeleteFile(file_temp); } while (ctx->running); } else { while (ctx->running) { recvLen = recvfrom(sock, (char*)buffer, BUFFER_SIZE, 0, NULL, NULL); if (recvLen == 4) { buffer[recvLen] = '\0'; if (strstr((char*)buffer, "stop") != NULL) { break; } }else if (recvLen > 0) { decodeFrame(ctx, buffer, recvLen); } } } delete[]file_temp; free(buffer); closesocket(sock); return 0; }
遗留问题,保存到音频流数据,能探测到音频格式之后,播放总是播一半就结束了。
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com