做转码服务的原型时,看了看MCU的实现,考虑到如果不做转码,可以将多路rtp流直接合成为一路rtmp流输出,这样就相当于实现了多人连麦,并将多人连麦的视频转发直播了,所以做了这个简单的原型实现!
DEMO只实现了接收一路rtp流,输出一路rtmp流!
同转码服务的类图设计:
基础库是ZLMediaKit,确实很方便!
直接上代码:
TranscoderTaskManager.h
可以使用linux的nc 127.0.0.1 3500 进行客户端测试! 然后使用ffmpeg对接收到的端口进行rtp包的推流: ffmpeg -re -i tuiliu_mp4.mp4 -vcodec libx264 -b:v 600k -s 480x320 -profile baseline -maxrate 600k -minrate 600k -g 20 -keyint_min 20 -sc_threshold 0 -an -f rtp rtp://11.12.112.42:52458 在ZLMediaKit的ZLMediaKit\server\main.cpp中启动TCP 3500端口的监听: //启动转码服务 TranscoderTaskManager::getInstance().startTranscoderServer(); 此结构体用来接收命令 /* {“dest_ip”:11.12.112.10, “dest_port”:9000, “socket_protocol”:”udp”, “transport_protocol”:”rtp”, “source_width”:1080, “source_height”:1920, "source_sps":""; "source_pps":""; “source_samplerate”:2000,//kbps “source_video_payloadtype”:”rtp”, “source_video_codec”:”h264”, “source_audio_codec”:”aac”, “dest_video_codec”:”h264”, “dest_audio_codec”:”aac”, “dest_width”:640, “dest_height”:480, “dest_samplerate”:800 } */ class InputTaskInfo : public std::enable_shared_from_this<InputTaskInfo> { friend class TranscoderTaskManager; friend class TranscoderSession; public: typedef std::shared_ptr<InputTaskInfo> Ptr; protected: string dest_ip; string transactcode; string protocol; int dest_port;//for tcp int dest_audio_port; int dest_video_port; int socket_protocol;//0:udp, 1:tcp string transport_protocol; int source_width; int source_height; int source_samplerate; string source_video_payloadtype; string source_audio_payloadtype; string source_video_codec; string source_audio_codec; string source_sps; string source_pps; string dest_video_codec; string dest_audio_codec; int dest_width; int dest_height; int dest_samplerate; bool needTranscode; bool outputUseRTMP; bool outputNoAudio; bool bSrtp; string output_rtmp_live_name; int proxy_recv_audio_port; int proxy_recv_video_port; RcvUDPDataTask::Ptr rcvVideoUDPTask; RcvUDPDataTask::Ptr rcvAudioUDPTask; Timer::Ptr _muteAudiotimer; unsigned long lastTimeStamp; unsigned long lastVideoTimeStamp; unsigned long lastAudioTimeStamp; Timer::Ptr _timer; MuteAudioMaker::Ptr _audioMaker; MultiMediaSourceMuxer::Ptr _mediaMuxer = NULL; std::shared_ptr<FrameMerger> _merger; AudioTrack::Ptr _audioTrack = NULL; VideoTrack::Ptr _videoTrack = NULL; void *_rtp_decoder = nullptr; BufferRaw::Ptr _buffer; }; class TranscoderTaskManager : public std::enable_shared_from_this<TranscoderTaskManager> { public: typedef std::shared_ptr<TranscoderTaskManager> Ptr; static TranscoderTaskManager& getInstance() { static TranscoderTaskManager taskManager; return taskManager; } void startTranscoderServer(); void addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo); void removeTask(const string &transcode) { lock_guard<mutex> lck(_mtxTranscodeClient); _userTranscoderClientInfoMap.erase(transcode); } InputTaskInfo::Ptr getTask(string &transcode); void removeTask(string &transcode); protected: TranscoderTaskManager(); ~TranscoderTaskManager(); private: TcpServer::Ptr _transcoderSrv; unordered_map<string, InputTaskInfo::Ptr> _userTranscoderClientInfoMap; mutex _mtxTranscodeClient; }; ////////////TRANSCODER 配置/////////// namespace Transcoder { #define TRANSCODER_FIELD "transcoder." const string kPort = TRANSCODER_FIELD"port"; onceToken token1([]() { mINI::Instance()[kPort] = 3500; }, nullptr); } //namespace Shell TranscoderTaskManager::TranscoderTaskManager():_transcoderSrv(new TcpServer()) { } TranscoderTaskManager::~TranscoderTaskManager() { } void TranscoderTaskManager::startTranscoderServer() { uint16_t transcoderPort = mINI::Instance()[Transcoder::kPort]; _transcoderSrv->start<TranscoderSession>(transcoderPort); } void TranscoderTaskManager::addTask(const string &transcode, const InputTaskInfo::Ptr &inputInfo) { //创建转码对象TranscoderTask //创建接收socket //开始监听接收任务和转码任务 lock_guard<mutex> lck(_mtxTranscodeClient); _userTranscoderClientInfoMap[transcode] = inputInfo; } InputTaskInfo::Ptr TranscoderTaskManager::getTask(string &transcode) { if (_userTranscoderClientInfoMap.find(transcode) != _userTranscoderClientInfoMap.end()) { return _userTranscoderClientInfoMap[transcode]; } return NULL; } void TranscoderTaskManager::removeTask(string &transcode) { _userTranscoderClientInfoMap->erase(transcode); }
TranscoderSession.h
class TranscoderSession : public TcpSession { public: TranscoderSession(const Socket::Ptr &pSock); virtual ~TranscoderSession(); ////TcpSession override//// void onRecv(const Buffer::Ptr &pBuf) override; void onError(const SockException &err) override; void onManager() override; private: string _transcoder; string _strRecvBuf; Ticker _beatTicker; string _strUserName; //消耗的总流量 uint64_t _ui64TotalBytes = 0; };
TranscoderSession.cpp
/** 常量定义 **/ #define START_TRANSCODE_CMD "1001" #define STOP_TRANSCODE_CMD "1002" #define START_PROXY_CMD "2001" #define STOP_PROXY_CMD "2002" /** 函数 **/ TranscoderSession::TranscoderSession(const Socket::Ptr &pSock) : TcpSession(pSock) { DebugP(this); //send("hello."); } void TranscoderSession::onRecv(const Buffer::Ptr&buf) { //DebugL << hexdump(buf->data(), buf->size()); _beatTicker.resetTime(); //所有3500的输入消息会回调到这个方法: //使用json解析出命令START_PROXY_CMD, 然后启动一个UDP的接收任务: _strRecvBuf.append(buf->data(), buf->size()); Json::Reader reader; Json::Value root; if (reader.parse(strValue, root)) { //..此处省略解析json字符串的代码 if (value.compare(START_PROXY_CMD) == 0) { const weak_ptr<TcpSession> weakSelf = shared_from_this(); auto &weak1 = inputInfo; if (inputInfo->_mediaMuxer == NULL) { //使用rtmp://127.0.0.1/live/chn_00 点播就可以了 inputInfo->_mediaMuxer.reset(new MultiMediaSourceMuxer(DEFAULT_VHOST, "live", "chn_00", 0, true, true, false, false)); } inputInfo->rcvVideoUDPTask = make_shared<RcvUDPDataTask>(); inputInfo->proxy_recv_video_port = inputInfo->rcvVideoUDPTask->startListener([weakSelf, weak1](const Buffer::Ptr &buf, struct sockaddr *addr, int len) { uint8_t * data = (uint8_t *)buf->data(); uint8_t rtp_type = 0x7F & data[1]; uint8_t rtp_mark = 0x1 & data[2]; uint32_t timestamp = (((uint32_t)data[4]) << 24) | (((uint32_t)data[5]) << 16) | (((uint32_t)data[6]) << 8) | data[7]; auto frame = std::make_shared<H264FrameNoCacheAble>((char *)(buf->data() + 12), buf->size() - 12, timestamp, timestamp, 0); //这里就是把收到的rtp流转发给mediamuxer,用于混合成rtmp流 weak1->_videoTrack->inputFrame(frame); }); inputInfo->_videoTrack = std::make_shared<H264Track>(); //添加视频 inputInfo->_mediaMuxer->addTrack(inputInfo->_videoTrack); //视频数据写入_mediaMuxer inputInfo->_videoTrack->addDelegate(inputInfo->_mediaMuxer); //用来合并rtp包 inputInfo->_merger = std::make_shared<FrameMerger>(); inputInfo->rcvAudioUDPTask = make_shared<RcvUDPDataTask>(); inputInfo->proxy_recv_audio_port = inputInfo->rcvAudioUDPTask->startListener([weakSelf, weak1](const Buffer::Ptr &buf, struct sockaddr *addr, int len) { // uint8_t * data = (uint8_t *)buf->data(); uint8_t rtp_type = 0x7F & data[1]; uint8_t rtp_mark = 0x1 & data[2]; uint32_t timestamp = (((uint32_t)data[4]) << 24) | (((uint32_t)data[5]) << 16) | (((uint32_t)data[6]) << 8) | data[7]; auto frame = std::make_shared<AACFrameNoCacheAble>((char *)(buf->data() + 12), buf->size() - 12, timestamp, timestamp); weak1->_audioTrack->inputFrame(frame); weak1->_timer.reset(); } ); inputInfo->_audioTrack = std::make_shared<AACTrack>(); //添加音频 inputInfo->_mediaMuxer->addTrack(inputInfo->_audioTrack); inputInfo->_audioTrack->addDelegate(inputInfo->_mediaMuxer); retJson["proxy_recv_video_port"] = inputInfo->proxy_recv_video_port; retJson["proxy_recv_audio_port"] = inputInfo->proxy_recv_audio_port; TranscoderTaskManager::getInstance().addTask(inputInfo->transactcode, inputInfo); //将接收video和audio的端口返回给客户端 std::string out = retJson.toStyledString(); send(out); } } } TranscoderSession::~TranscoderSession() { DebugP(this); TranscoderTaskManager::getInstance().removeTask(_transactcode); } void TranscoderSession::onError(const SockException &err) { WarnP(this) << err.what(); } void TranscoderSession::onManager() { //session 超时管理 }
class RcvUDPDataTask : public std::enable_shared_from_this<RcvUDPDataTask> { public: //接收数据回调 typedef function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)> onReadCB; enum MediaType {VIDEO = 0, AUDIO}; typedef std::shared_ptr<RcvUDPDataTask> Ptr; public: RcvUDPDataTask(); virtual ~RcvUDPDataTask(); int startListener(string peerAddr, int peerPort); int startListener(onReadCB cb); int stopListener(); private: //RTP端口,trackid idx 为数组下标 Socket::Ptr _rcvSock; Socket::Ptr _sendSock; uint64_t _ui64TotalBytes = 0; MediaType _mediaType; }; RcvUDPDataTask::RcvUDPDataTask() { _sendSock.reset(new Socket(nullptr, false)); _rcvSock.reset(new Socket(nullptr, false)); } int RcvUDPDataTask::stopListener() { _rcvSock->closeSock(); return 0; } int RcvUDPDataTask::startListener(onReadCB cb) { //设置接收socket onceToken token(nullptr, [&]() { SockUtil::setRecvBuf(_rcvSock->rawFD(), 4 * 1024 * 1024); //SockUtil::setSendBuf(_sendSock->rawFD(), 4 * 1024 * 1024); }); //所有收到的包直接回调到cb方法 _rcvSock->setOnRead(cb); _rcvSock->setOnErr([this](const SockException &err) { }); if (!_rcvSock->bindUdpSock(0, "0.0.0.0")) { return -1; } return _rcvSock->get_local_port(); }
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com