用chromium既可以写Client,也可以写服务。相比Client,服务有更固定模式,1)创建用于侦听的socket,2)收到Client向侦听socket发来的连接请求,创建针对此次连接的会话socket,3)连接断开后销毁会话socket。
一、http服务
HttpServer是Chromium内封装了http服务的类,<chromium>/net/server/http_server_unittest.cc提供了如何使用HttpServer示例代码。核心是app构建个从HttpServer::Delegate派生的类(Rose中的HttpServerRose),当HttpServer收到“事件”时,调用HttpServerRose内函数处理这些“事件”,于是实现了app的自定义http功能。
步骤一:创建侦听socket、listen,构造HttpServer,投递DoAcceptLoop任务
void HttpServerRose::SetUp() { std::unique_ptr<ServerSocket> server_socket(new TCPServerSocket(NULL, NetLogSource())); server_socket->ListenWithAddressAndPort("192.168.0.100", 8080, 1); server_.reset(new HttpServer(std::move(server_socket), this)); server_->GetLocalAddress(&server_address_); }
以上是步骤一的简化版。server_socket是服务要用的侦听socket,和某个本地IP:Port绑定后,侦听。随后以它为参数构造HttpServer,构造期间会向消息循环投递DoAcceptLoop任务
步骤二、DoAcceptLoop创建针对此次连接的会话socket,启动DoReadLoop
DoAcceptLoop作用是接收Client发向侦听socket的连接请求,并创建会话socket。
void HttpServer::DoAcceptLoop() { int rv; do { rv = server_socket_->Accept(&accepted_socket_, base::Bind(&HttpServer::OnAcceptCompleted, weak_ptr_factory_.GetWeakPtr())); if (rv == ERR_IO_PENDING) return; rv = HandleAcceptResult(rv); } while (rv == OK); }
server_socket_->Accept是异步操作,大多情况会立即返回ERR_IO_PENDING。一旦有Client连接了,会调用Accept时设置的HttpServer::OnAcceptCompleted。根据Chromium的socket异步处理机制,直觉会猜到OnAcceptCompleted主要工作是调用HandleAcceptResult,然后触发后续的DoAcceptLoop,以便让侦听socket继续accept下一个Client。
void HttpServer::OnAcceptCompleted(int rv) { if (HandleAcceptResult(rv) == OK) DoAcceptLoop(); }
HandleAcceptResult处理有新连接的具体工作。
int HttpServer::HandleAcceptResult(int rv) { if (rv < 0) { LOG(ERROR) << "Accept error: rv=" << rv; return rv; } std::unique_ptr<HttpConnection> connection_ptr = std::make_unique<HttpConnection>(++last_id_, std::move(accepted_socket_)); HttpConnection* connection = connection_ptr.get(); id_to_connection_[connection->id()] = std::move(connection_ptr); delegate_->OnConnect(connection->id()); if (!HasClosedConnection(connection)) DoReadLoop(connection); return OK; }
HandleAcceptResult三个操作。1)以HttpServer内唯一整数和accepted_socket_(此次连接的会话socket)构造HttpConnection对象,2)调用delegate_->OnConnect,它是代理提供的针对有新会话的自处理方法。3)HasClosedConnection是false表示此个Connect正处于激活状态,以刚创建的HttpConnection为参数调用DoReadLoop。
步骤三:DoReadLoop读客户端发来的请求,有请求后调用代理提供的OnHttpRequest
void HttpServer::DoReadLoop(HttpConnection* connection) { int rv; do { HttpConnection::ReadIOBuffer* read_buf = connection->read_buf(); // Increases read buffer size if necessary. if (read_buf->RemainingCapacity() == 0 && !read_buf->IncreaseCapacity()) { Close(connection->id()); return; } rv = connection->socket()->Read( read_buf, read_buf->RemainingCapacity(), base::Bind(&HttpServer::OnReadCompleted, weak_ptr_factory_.GetWeakPtr(), connection->id())); if (rv == ERR_IO_PENDING) return; rv = HandleReadResult(connection, rv); } while (rv == OK); }
DoReadLoop向会话socket触发一次读任务,如果当前socket正好有未读请求,调用HandleReadResult,否则将来收到请求后,系统会回调OnReadCompleted。根据Chromium的socket异步处理机制,直觉会猜到OnReadCompleted主要工作是调用HandleReadResult,然后又触发后续的DoReadLoop。
void HttpServer::OnReadCompleted(int connection_id, int rv) { HttpConnection* connection = FindConnection(connection_id); if (!connection) // It might be closed right before by write error. return; if (HandleReadResult(connection, rv) == OK) DoReadLoop(connection); }
HandleReadResult处理收到的数据,按http协议解析头部,产生HttpServerRequestInfo,以它为参数调用delegate_的OnHttpRequest。处理完此次读到的数据后,调用DoReadLoop继续向会话socket要数据,于是形成了周而复始的读请求。
步骤四:app代理生成应答,通过HttpServer提供的Send发送出去
Json::Value json_root; json_root["response_code"] = 0; json_root["msg"] = "success"; Json::FastWriter writer; std::string data = writer.write(json_root); server_->Send(connection_id, HTTP_OK, data, "application/json; charset=UTF-8", TRAFFIC_ANNOTATION_FOR_TESTS);
使用了开源项目jsoncpp,用它生成json格式数据,然后调用server_->Send发送给对面的Client。请求和应答是成对出现,以上代码会是delegate_->OnHttpRequest内的一部分。另外,delegate_->OnHttpRequest被调用时处在httpd线程,为避免同步造成问题,app希望解析请求、处理请求、生成应答是在主线程,为此需要做跨进程处理,Rose用的是Webrtc中的Invoke。

使用Rose提供的http服务框架,app要做的就是让base_instance派生类重载app_handle_http_request。
步骤四:连接断开,调用代理提供的OnClose,销毁会话socket
void HttpServer::Close(int connection_id) { auto it = id_to_connection_.find(connection_id); if (it == id_to_connection_.end()) return; std::unique_ptr<HttpConnection> connection = std::move(it->second); id_to_connection_.erase(it); delegate_->OnClose(connection_id); // The call stack might have callbacks which still have the pointer of // connection. Instead of referencing connection with ID all the time, // destroys the connection in next run loop to make sure any pending // callbacks in the call stack return. base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, connection.release()); }
连接一断就会触发HttpServer::Close,主要三个操作。1)销毁为此次连接创建的HttpConnection对象。2)调用代理提供的OnClose。3)销毁会话socket。
二、rtsp服务
根据以上对http服务分析,可遵循HttpServer套路实现RtspServer,代码基本可以照抄,具体实现参考Rose源码中的<chromium>/net/server/rtsp_server.cc。依样画葫芦,RtspServer要求app提供个实现RtspServer::Delegate接口的代理。
class Delegate { public: virtual void OnConnect(int connection_id) = 0; virtual int OnRtspRequest(int connection_id, const char* buf, int buf_len) = 0; virtual void OnClose(int connection_id) = 0; };
OnRtspRequest对应着OnHttpRequest,这三个函数的调用时机和HttpServer一样。谁来实现RtspServer::Delegate?Rose把它放在了live555中最顶层的rtsp服务对象DynamicRTSPServer。
void DynamicRTSPServer::OnConnect(int connection_id) { DCHECK(connection_map_.find(connection_id) == connection_map_.end()); struct sockaddr_in clientAddr = {0}; connection_map_[connection_id] = RTSPServerSupportingHTTPStreaming::createNewClientConnection(-1, clientAddr, server_->FindSocket(connection_id), connection_id); } int DynamicRTSPServer::OnRtspRequest(int connection_id, const char* buf, int buf_len) { std::map<int, ClientConnection*>::iterator find_it = connection_map_.find(connection_id); DCHECK(find_it != connection_map_.end()); ClientConnection* connection = find_it->second; tsessionid_lock lock(connection_id); return connection->handleRequestBytes_chromium(connection_id, buf, buf_len); } void DynamicRTSPServer::OnClose(int connection_id) { std::map<int, ClientConnection*>::iterator find_it = connection_map_.find(connection_id); DCHECK(find_it != connection_map_.end()); delete find_it->second; connection_map_.erase(find_it); }
OnConnect创建一个针对此次连接的ClientConnection。该连接收到Client发来数据时,调用handleRequestBytes_chromium进行处理,此函数复制于live555自提供的handleRequestBytes。连接断开后销毁ClientConnection。至于ClientConection、handleRequestBytes是live555中概念,代码都不用动。
以上是新建连接、收到数据、断开连接时处理,和socket有关的还有发送。当live555d要发送rtsp应答、rtp包时,修改方法是换用RtspServer去发送。
发送任务之rtsp应答
int RTSPServer::RTSPClientConnection::handleRequestBytes_chromium(int connection_id, const char* buf, int newBytesRead) { ...... // fResponseBuffer是根据请求生成的rtsp应答 std::string data((char const*)fResponseBuffer, strlen((char*)fResponseBuffer)); fOurServer.rtsp_server().SendRaw(connection_id, data, TRAFFIC_ANNOTATION_FOR_TESTS); ...... }
发送任务之rtp包
bool RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize, int socketNum, net::StreamSocket* netSock, unsigned char streamChannelId) { u_int8_t framingHeader[4]; framingHeader[0] = '$'; framingHeader[1] = streamChannelId; framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8); framingHeader[3] = (u_int8_t) (packetSize&0xFF); sendDataOverTCP_chromium(netSock, framingHeader, 4); sendDataOverTCP_chromium(netSock, packet, packetSize); } bool sendDataOverTCP_chromium(net::StreamSocket* netSock, u_int8_t const* data, unsigned dataSize) { net::RtspServer& server = GenericMediaServer::instance->rtsp_server(); std::string data_str((const char*)data, dataSize); return server.SendRawBySocket(netSock, data_str, TRAFFIC_ANNOTATION_FOR_TESTS); }
发送rtsp应答的SendRaw,发送rtp包的SendRawBySocket,原理和HttpServer一样,先把要发送数据放到Server开的一个类型是QueuedWriteIOBuffer缓冲区,当会话socket可写时,就write出去。这里会遇到要发送数据量非常大的问题。对http,HttpServer开的QueuedWriteIOBuffer容量是1M字节,这对应答量不大的http或许是够了,但对要传视频数据的rtsp不够,得加大容量,Rose把这操作放在HandleAcceptResult。
int RtspServer::HandleAcceptResult(int rv) { ... // 8 seconds * 8Mbps connection->write_buf()->set_max_buffer_size(posix_align_ceil(8 * 1000000, 1024)); delegate_->OnConnect(connection->id()); }
为什么把缓冲区设为是“8 * 1000000”字节,这和发送时append_result是false就强制断开连接有关。
bool RtspServer::SendRawBySocket(StreamSocket* netSock, const std::string& data, NetworkTrafficAnnotationTag traffic_annotation) { ... bool append_result = selected_connection->write_buf()->Append(data); if (append_result) { ... } else { PostCloseConnection(*selected_connection); } return true; }
为什么append_result是false要断开连接?当Client所在设备直接关机时,read、write的异步完成都不会立即被调用,从关机到什么时候被调用的时间不同操作系统不一样,像windows可能20秒,android甚至超过60秒。为让尽快“知道”Client断了,把写入缓冲区溢出时候等同认为Client已经断了。append_result是false表示写入缓冲区溢出,于是缓冲区容量就成为Client断了时间门限,“8 * 1000000”字节大概等同8秒的8Mbps码流。
以上说的是用Chromium的socket库代替live555内置库(注:TCP时,live555的socket库有BUG,见“live555服务端无故停止发送”,按那里办法依旧不可能解决)。既然用了Chromium中socket,相应得用Chromium中的消息循环代替live555的taskScheduler().doEventLoop。如何用Chromium消息循环实现rtsp服务,1)创建一个Chromium线程,假设线程名称“Live555dThread”。2)在该线程执行setup_chromium。3)live555源码中taskScheduler().scheduleDelayedTask替换为base::ThreadTaskRunnerHandle::Get()->PostTask。
void GenericMediaServer::setup_chromium() { std::unique_ptr<net::ServerSocket> server_socket(new net::TCPServerSocket(NULL, net::NetLogSource())); uint32_t addrNBO = htonl(get_local_ipaddr()); std::stringstream addr_ss; addr_ss << (int)((addrNBO >> 24) & 0xFF) << "." << (int)((addrNBO>>16) & 0xFF); addr_ss << "." << (int)((addrNBO>>8)&0xFF) << "." << (int)(addrNBO & 0xFF); server_socket->ListenWithAddressAndPort(addr_ss.str(), htons(fServerPort.num()), 1); server_.reset(new net::RtspServer(std::move(server_socket), this)); server_->GetLocalAddress(&server_address_); }
它的设计等同“一、http服务”中“步骤一”说的setup。上面已分析过,一旦构造RtspServer,就会向“Live555dThread”线程投递DoAcceptLoop任务,于是整个rtsp服务流程就被启动。
H264VideoFileServerMediaSubsession::getAuxSDPLine
第三点把“taskScheduler().scheduleDelayedTask替换为base::ThreadTaskRunnerHandle::Get()->PostTask”,是不是live555原有的scheduleDelayedTask机制就没用了,——不是,getAuxSDPLine过程中的还得继续用。auxSDPLine是SDP中一部分,SDP生成过程参考“live555 源码分析:子会话SDP行生成”。说这之前简单小结live555d为什么要用scheduleDelayedTask。
FramedSource是源,可分为两种,第一类是由它可以得到单独一帧,别名framerSource,像H264or5VideoStreamFramer。第二类是从视频流中得到一段,别名fileSource,像ByteStreamFileSource。从层次来说,使用者直接使用的是framerSource,framerSource再使用fileSource。
RTPSink表示一种使用场景,它的startPlaying方法启动向framerSource要数据,后者再启动向内中的fileSource要数据。得到数据后的工作是解析,live555由于得到数据时的函数栈已经很深,为避免更深,就用scheduleDelayedTask把FramedSource::afterGetting任务加到任务调度器。调度器调用afterGettting,它解析收到的数据块,先后执行1)解析出一帧后,执行发送动作,2)继续解析余下数据,总之会触发scheduleDelayedTask再次把afterGettting放入调度器,于是形成了收数据、解析数据、发送帧,再收数据、解析数据……
afterGetting解析出一帧后,会调用RTPInterface::sendPacket执行发送。有些场景不须要真正发送,像生成SDP中的getAuxSDPLine,这时要做的是让RTPInterface中fTCPStreams是nullptr。
根据以上分析,一旦把scheduleDelayedTask替换为ThreadTaskRunnerHandle::Get()->PostTask,就实现了向消息循环投递任务,为什么还要有scheduleDelayedTask,这须要深入getAuxSDPLine代码。
char const* H264VideoFileServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource) { if (fAuxSDPLine != NULL) return fAuxSDPLine; // it's already been set up (for a previous client) CHECK(inputSource->fSessionid != nposm); inGetAuxSDPLines.insert(inputSource->fSessionid); if (fDummyRTPSink == NULL) { // we're not already setting it up for another, concurrent stream // Note: For H264 video files, the 'config' information ("profile-level-id" and "sprop-parameter-sets") isn't known // until we start reading the file. This means that "rtpSink"s "auxSDPLine()" will be NULL initially, // and we need to start reading data from our file until this changes. fDummyRTPSink = rtpSink; // Start reading the file: fDummyRTPSink->startPlaying(*inputSource, afterPlayingDummy, this); // Check whether the sink's 'auxSDPLine()' is ready: checkForAuxSDPLine(this); } envir().taskScheduler().doEventLoop(&fDoneFlag); inGetAuxSDPLines.erase(inputSource->fSessionid); return fAuxSDPLine; }
getAuxSDPLine执行了RTPSink的startPlaying()之后,就会通过checkForAuxSDPLine(void* clientData)检查一下auxSDP的读取是否结束,具体方法是在100毫秒后判断fDoneFlag是否是true,true表示得到了auxSDP,否则没有。由于紧挨startPlaying,第一个checkForAuxSDPLine检查到的fDoneFlag基本是false,它的作用可认是向调度器投递下一个checkForAuxSDPLine,100毫秒后执行。100毫秒后,调度器执行checkForAuxSDPLine,判断fDoneFlag,如果是true,auxSDP得到了,退出doEventLoop(&fDonwFlag),否则向调度器投递下一个checkForAuxSDPLine,100毫秒后执行。直到终有一次checkForAuxSDPLine检测到fDoneFlag是true。getAuxSDPLine也过了doEventLoop(&fDoneFlag),向上返回。
getAuxSDPLine用的调度策略是doEventLoop,导致过程中依旧得用scheduleDelayedTask。计划是将来实现网络操控kOS时一并改掉。
发现网络断开,不直接调用Close,改用ThreadTaskRunnerHandle::Get()->PostTask投递Close
上面已出现的RtspServer::SendRawBySocket,还有HandleWriteResult中rv<0,它们都不直接调用Close,而是改为PostTask投递Close。原因是在执行到它们时,live555已执行很多操作,函数栈很深,一旦Close,会消毁一些对象,像HttpConnection,live555中ClientConnection,而这些操作的上层函数可能还须要这些对象有效。为了不修改上层对这些对象依赖,改用投递。
至于函数栈不深时,像DoReadLoop,发现断开就直接调用Close。
修正GenericMediaServer::removeServerMediaSession中BUG
void GenericMediaServer::removeServerMediaSession(ServerMediaSession* serverMediaSession) { if (serverMediaSession == NULL) return; ServerMediaSession* found_sms = (ServerMediaSession*)(fServerMediaSessions->Lookup(serverMediaSession->streamName())); if (found_sms == serverMediaSession) { fServerMediaSessions->Remove(serverMediaSession->streamName()); } if (serverMediaSession->referenceCount() == 0) { Medium::close(serverMediaSession); } else { serverMediaSession->deleteWhenUnreferenced() = True; } }
原来是无条件调用fServerMediaSessions->Remove,解决BUG方法是增加“if (found_sms == serverMediaSession)”。为什么要确定fServerMediaSessions中存在该sms才删除,须要说下live555怎么管理sms。关于sms可参考“live555 源码分析:ServerMediaSession”,这里主要说怎么销毁sms。
streamName。“IP:Port/”后的那串字符串,称为资源路径。举个例子,rtsp://192.168.1.107:8554/1,streamName指是的“1”。
规则:对同一streamName的不同会话,为保证各次会话参数不会串扰,分别要对应不同的sms。什么是串扰?举个例子,Scale(sms内参数)是播时的一个参数,第一个会话用的1,第二个会话是2,不能因为第二个会话是2连带把第一个会话也设到2。
fServerMediaSessions是GenericMediaServer成员变量,整系统只一个,作用是存储sms的哈希表,哈希表的key就是streamName。构造时用的标志是STRING_HASH_KEYS,意味着key是STRING类型,所以查找时比较的不是key指针值,而是key中字符串内容。于是有个疑问,同一streamName的sms在fServerMediaSessions只能存在一个,同一streamName的N个会话如何有着N个sms?——要解决这疑问,回看上面removeServerMediaSession代码。
如果sms的引用计数是0,析构sms,否则把sms的deleteWhenUnreferenced置为true。换句话说,虽然该sms是被从fServerMediaSessions移除了,但并不表示它被析构,它会以着“私有”变量保存在拥有它引用计数的对象中,这个对象就是ClientSession。当ClientSession被析构时,这个“私有”sms被析构。
GenericMediaServer::ClientSession::~ClientSession() { ... if (fOurServerMediaSession != NULL) { fOurServerMediaSession->decrementReferenceCount(); if (fOurServerMediaSession->referenceCount() == 0 && fOurServerMediaSession->deleteWhenUnreferenced()) { fOurServer.removeServerMediaSession(fOurServerMediaSession); fOurServerMediaSession = NULL; } } }
removeServerMediaSession时已把它的deleteWhenUnreferenced置为true。顺着~ClientSession执行逻辑让恢复BUG现场。此时调用removeServerMediaSession,sms-A已不在fServerMediaSessions中,但如果fServerMediaSessions内有个相同streamName的sms-B,一旦不作修正无条件把sms-B移出fServerMediaSessions,sms-B将没人销毁了(此刻deleteWhenUnreferenced是false,~ClientSession认为是带内sms,不会销毁),造成内存泄漏。
补说下为什么“fServerMediaSessions内有个相同streamName的sms-B”。把sms加入fServerMediaSessions用的是addServerMediaSession。
void GenericMediaServer::addServerMediaSession(ServerMediaSession* serverMediaSession) { if (serverMediaSession == NULL) return; char const* sessionName = serverMediaSession->streamName(); if (sessionName == NULL) sessionName = ""; removeServerMediaSession(sessionName); // in case an existing "ServerMediaSession" with this name already exists fServerMediaSessions->Add(sessionName, (void*)serverMediaSession); }
addServerMediaSession会执行两个操作,1)从fServerMediaSessions移除同streamName的sms-A。2)把参数给的sms-B加入fServerMediaSessions。也就是说,同一个streamName的sms在fServerMediaSessions内只有一个,后来的sms-B会挤掉sms-A,让sms-A孤悬带外。
deleteWhenUnreferenced是true,表示这已是个带外sms,由拥有者去销毁;否则是带内sms,等到析构RTSPServer时,通过~RTSPServer调用cleanup去销毁。销毁sms方法都是调用removeServerMediaSession。
void GenericMediaServer::cleanup() { ... ServerMediaSession* serverMediaSession; while ((serverMediaSession = (ServerMediaSession*)fServerMediaSessions->getFirst()) != NULL) { removeServerMediaSession(serverMediaSession); // will delete it, because it no longer has any 'client session' objects using it } }
live555什么时候“构造”sms?一是在收到DESCRIBE,二是在收到SETUP。