http、rtsp服务

用chromium既可以写Client,也可以写服务。相比Client,服务有更固定模式,1)创建用于侦听的socket,2)收到Client向侦听socket发来的连接请求,创建针对此次连接的会话socket,3)连接断开后销毁会话socket。

一、http服务

HttpServer是Chromium内封装了http服务的类,<chromium>/net/server/http_server_unittest.cc提供了如何使用HttpServer示例代码。核心是app构建个从HttpServer::Delegate派生的类(Rose中的HttpServerRose),当HttpServer收到“事件”时,调用HttpServerRose内函数处理这些“事件”,于是实现了app的自定义http功能。

步骤一:创建侦听socket、listen,构造HttpServer,投递DoAcceptLoop任务

void HttpServerRose::SetUp()
{
  std::unique_ptr<ServerSocket> server_socket(new TCPServerSocket(NULL, NetLogSource()));
  server_socket->ListenWithAddressAndPort("192.168.0.100", 8080, 1);

  server_.reset(new HttpServer(std::move(server_socket), this));
  server_->GetLocalAddress(&server_address_);
}

以上是步骤一的简化版。server_socket是服务要用的侦听socket,和某个本地IP:Port绑定后,侦听。随后以它为参数构造HttpServer,构造期间会向消息循环投递DoAcceptLoop任务

步骤二、DoAcceptLoop创建针对此次连接的会话socket,启动DoReadLoop

DoAcceptLoop作用是接收Client发向侦听socket的连接请求,并创建会话socket。

void HttpServer::DoAcceptLoop()
{
  int rv;
  do {
    rv = server_socket_->Accept(&accepted_socket_,
                                base::Bind(&HttpServer::OnAcceptCompleted,
                                           weak_ptr_factory_.GetWeakPtr()));
    if (rv == ERR_IO_PENDING)
      return;
    rv = HandleAcceptResult(rv);
  } while (rv == OK);
}

server_socket_->Accept是异步操作,大多情况会立即返回ERR_IO_PENDING。一旦有Client连接了,会调用Accept时设置的HttpServer::OnAcceptCompleted。根据Chromium的socket异步处理机制,直觉会猜到OnAcceptCompleted主要工作是调用HandleAcceptResult,然后触发后续的DoAcceptLoop,以便让侦听socket继续accept下一个Client。

void HttpServer::OnAcceptCompleted(int rv) {
  if (HandleAcceptResult(rv) == OK)
    DoAcceptLoop();
}

HandleAcceptResult处理有新连接的具体工作。

int HttpServer::HandleAcceptResult(int rv)
{
  if (rv < 0) {
    LOG(ERROR) << "Accept error: rv=" << rv;
    return rv;
  }

  std::unique_ptr<HttpConnection> connection_ptr =
      std::make_unique<HttpConnection>(++last_id_, std::move(accepted_socket_));
  HttpConnection* connection = connection_ptr.get();
  id_to_connection_[connection->id()] = std::move(connection_ptr);
  delegate_->OnConnect(connection->id());
  if (!HasClosedConnection(connection))
    DoReadLoop(connection);
  return OK;
}

HandleAcceptResult三个操作。1)以HttpServer内唯一整数和accepted_socket_(此次连接的会话socket)构造HttpConnection对象,2)调用delegate_->OnConnect,它是代理提供的针对有新会话的自处理方法。3)HasClosedConnection是false表示此个Connect正处于激活状态,以刚创建的HttpConnection为参数调用DoReadLoop。

步骤三:DoReadLoop读客户端发来的请求,有请求后调用代理提供的OnHttpRequest

void HttpServer::DoReadLoop(HttpConnection* connection)
{
  int rv;
  do {
    HttpConnection::ReadIOBuffer* read_buf = connection->read_buf();
    // Increases read buffer size if necessary.
    if (read_buf->RemainingCapacity() == 0 && !read_buf->IncreaseCapacity()) {
      Close(connection->id());
      return;
    }

    rv = connection->socket()->Read(
        read_buf,
        read_buf->RemainingCapacity(),
        base::Bind(&HttpServer::OnReadCompleted,
                   weak_ptr_factory_.GetWeakPtr(), connection->id()));
    if (rv == ERR_IO_PENDING)
      return;
    rv = HandleReadResult(connection, rv);
  } while (rv == OK);
}

DoReadLoop向会话socket触发一次读任务,如果当前socket正好有未读请求,调用HandleReadResult,否则将来收到请求后,系统会回调OnReadCompleted。根据Chromium的socket异步处理机制,直觉会猜到OnReadCompleted主要工作是调用HandleReadResult,然后又触发后续的DoReadLoop。

void HttpServer::OnReadCompleted(int connection_id, int rv) {
  HttpConnection* connection = FindConnection(connection_id);
  if (!connection)  // It might be closed right before by write error.
    return;

  if (HandleReadResult(connection, rv) == OK)
    DoReadLoop(connection);
}

HandleReadResult处理收到的数据,按http协议解析头部,产生HttpServerRequestInfo,以它为参数调用delegate_的OnHttpRequest。处理完此次读到的数据后,调用DoReadLoop继续向会话socket要数据,于是形成了周而复始的读请求。

步骤四:app代理生成应答,通过HttpServer提供的Send发送出去

Json::Value json_root;
json_root["response_code"] = 0;
json_root["msg"] = "success";
Json::FastWriter writer;
std::string data = writer.write(json_root);
server_->Send(connection_id, HTTP_OK, data, "application/json; charset=UTF-8", TRAFFIC_ANNOTATION_FOR_TESTS);

使用了开源项目jsoncpp,用它生成json格式数据,然后调用server_->Send发送给对面的Client。请求和应答是成对出现,以上代码会是delegate_->OnHttpRequest内的一部分。另外,delegate_->OnHttpRequest被调用时处在httpd线程,为避免同步造成问题,app希望解析请求、处理请求、生成应答是在主线程,为此需要做跨进程处理,Rose用的是Webrtc中的Invoke。

图1 Rose实现的Http服务框架

使用Rose提供的http服务框架,app要做的就是让base_instance派生类重载app_handle_http_request。

步骤四:连接断开,调用代理提供的OnClose,销毁会话socket

void HttpServer::Close(int connection_id)
{
  auto it = id_to_connection_.find(connection_id);
  if (it == id_to_connection_.end())
    return;

  std::unique_ptr<HttpConnection> connection = std::move(it->second);
  id_to_connection_.erase(it);
  delegate_->OnClose(connection_id);

  // The call stack might have callbacks which still have the pointer of
  // connection. Instead of referencing connection with ID all the time,
  // destroys the connection in next run loop to make sure any pending
  // callbacks in the call stack return.
  base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE, connection.release());
}

连接一断就会触发HttpServer::Close,主要三个操作。1)销毁为此次连接创建的HttpConnection对象。2)调用代理提供的OnClose。3)销毁会话socket。

二、rtsp服务

根据以上对http服务分析,可遵循HttpServer套路实现RtspServer,代码基本可以照抄,具体实现参考Rose源码中的<chromium>/net/server/rtsp_server.cc。依样画葫芦,RtspServer要求app提供个实现RtspServer::Delegate接口的代理。

class Delegate {
  public:
    virtual void OnConnect(int connection_id) = 0;
    virtual int OnRtspRequest(int connection_id, const char* buf, int buf_len) = 0;
    virtual void OnClose(int connection_id) = 0;
};

OnRtspRequest对应着OnHttpRequest,这三个函数的调用时机和HttpServer一样。谁来实现RtspServer::Delegate?Rose把它放在了live555中最顶层的rtsp服务对象DynamicRTSPServer。

void DynamicRTSPServer::OnConnect(int connection_id)
{
  DCHECK(connection_map_.find(connection_id) == connection_map_.end());
  struct sockaddr_in clientAddr = {0};
  connection_map_[connection_id] = RTSPServerSupportingHTTPStreaming::createNewClientConnection(-1, clientAddr, server_->FindSocket(connection_id), connection_id);
}

int DynamicRTSPServer::OnRtspRequest(int connection_id, const char* buf, int buf_len)
{
  std::map<int, ClientConnection*>::iterator find_it = connection_map_.find(connection_id);
  DCHECK(find_it != connection_map_.end());

  ClientConnection* connection = find_it->second;
  tsessionid_lock lock(connection_id);
  return connection->handleRequestBytes_chromium(connection_id, buf, buf_len);
}

void DynamicRTSPServer::OnClose(int connection_id)
{
  std::map<int, ClientConnection*>::iterator find_it = connection_map_.find(connection_id);
  DCHECK(find_it != connection_map_.end());
  delete find_it->second;
  connection_map_.erase(find_it);
}

OnConnect创建一个针对此次连接的ClientConnection。该连接收到Client发来数据时,调用handleRequestBytes_chromium进行处理,此函数复制于live555自提供的handleRequestBytes。连接断开后销毁ClientConnection。至于ClientConection、handleRequestBytes是live555中概念,代码都不用动。

以上是新建连接、收到数据、断开连接时处理,和socket有关的还有发送。当live555d要发送rtsp应答、rtp包时,修改方法是换用RtspServer去发送。

发送任务之rtsp应答

int RTSPServer::RTSPClientConnection::handleRequestBytes_chromium(int connection_id, const char* buf, int newBytesRead)
{
  ......
  // fResponseBuffer是根据请求生成的rtsp应答
  std::string data((char const*)fResponseBuffer, strlen((char*)fResponseBuffer));
  fOurServer.rtsp_server().SendRaw(connection_id, data, TRAFFIC_ANNOTATION_FOR_TESTS);
  ......
}

发送任务之rtp包

bool RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize, int socketNum, net::StreamSocket* netSock, unsigned char streamChannelId)
{
  u_int8_t framingHeader[4];
  framingHeader[0] = '$';
  framingHeader[1] = streamChannelId;
  framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
  framingHeader[3] = (u_int8_t) (packetSize&0xFF);
  sendDataOverTCP_chromium(netSock, framingHeader, 4);
  sendDataOverTCP_chromium(netSock, packet, packetSize);
}

bool sendDataOverTCP_chromium(net::StreamSocket* netSock, u_int8_t const* data, unsigned dataSize)
{
  net::RtspServer& server = GenericMediaServer::instance->rtsp_server();
  std::string data_str((const char*)data, dataSize);
  return server.SendRawBySocket(netSock, data_str, TRAFFIC_ANNOTATION_FOR_TESTS);
}

发送rtsp应答的SendRaw,发送rtp包的SendRawBySocket,原理和HttpServer一样,先把要发送数据放到Server开的一个类型是QueuedWriteIOBuffer缓冲区,当会话socket可写时,就write出去。这里会遇到要发送数据量非常大的问题。对http,HttpServer开的QueuedWriteIOBuffer容量是1M字节,这对应答量不大的http或许是够了,但对要传视频数据的rtsp不够,得加大容量,Rose把这操作放在HandleAcceptResult。

int RtspServer::HandleAcceptResult(int rv)
{
  ...
  // 8 seconds * 8Mbps
  connection->write_buf()->set_max_buffer_size(posix_align_ceil(8 * 1000000, 1024));
  delegate_->OnConnect(connection->id());
}

为什么把缓冲区设为是“8 * 1000000”字节,这和发送时append_result是false就强制断开连接有关。

bool RtspServer::SendRawBySocket(StreamSocket* netSock, const std::string& data, NetworkTrafficAnnotationTag traffic_annotation)
{
  ...
  bool append_result = selected_connection->write_buf()->Append(data);
  if (append_result) {
    ...
  } else {
    PostCloseConnection(*selected_connection);
  }
  return true;
}

为什么append_result是false要断开连接?当Client所在设备直接关机时,read、write的异步完成都不会立即被调用,从关机到什么时候被调用的时间不同操作系统不一样,像windows可能20秒,android甚至超过60秒。为让尽快“知道”Client断了,把写入缓冲区溢出时候等同认为Client已经断了。append_result是false表示写入缓冲区溢出,于是缓冲区容量就成为Client断了时间门限,“8 * 1000000”字节大概等同8秒的8Mbps码流。

以上说的是用Chromium的socket库代替live555内置库(注:TCP时,live555的socket库有BUG,见“live555服务端无故停止发送”,按那里办法依旧不可能解决)。既然用了Chromium中socket,相应得用Chromium中的消息循环代替live555的taskScheduler().doEventLoop。如何用Chromium消息循环实现rtsp服务,1)创建一个Chromium线程,假设线程名称“Live555dThread”。2)在该线程执行setup_chromium。3)live555源码中taskScheduler().scheduleDelayedTask替换为base::ThreadTaskRunnerHandle::Get()->PostTask。

void GenericMediaServer::setup_chromium()
{
  std::unique_ptr<net::ServerSocket> server_socket(new net::TCPServerSocket(NULL, net::NetLogSource()));

  uint32_t addrNBO = htonl(get_local_ipaddr());
  std::stringstream addr_ss;
  addr_ss << (int)((addrNBO >> 24) & 0xFF) << "." << (int)((addrNBO>>16) & 0xFF);
  addr_ss << "." << (int)((addrNBO>>8)&0xFF) << "." << (int)(addrNBO & 0xFF);
  server_socket->ListenWithAddressAndPort(addr_ss.str(), htons(fServerPort.num()), 1);

  server_.reset(new net::RtspServer(std::move(server_socket), this));
  server_->GetLocalAddress(&server_address_);
}

它的设计等同“一、http服务”中“步骤一”说的setup。上面已分析过,一旦构造RtspServer,就会向“Live555dThread”线程投递DoAcceptLoop任务,于是整个rtsp服务流程就被启动。

H264VideoFileServerMediaSubsession::getAuxSDPLine

第三点把“taskScheduler().scheduleDelayedTask替换为base::ThreadTaskRunnerHandle::Get()->PostTask”,是不是live555原有的scheduleDelayedTask机制就没用了,——不是,getAuxSDPLine过程中的还得继续用。auxSDPLine是SDP中一部分,SDP生成过程参考“live555 源码分析:子会话SDP行生成”。说这之前简单小结live555d为什么要用scheduleDelayedTask。

FramedSource是源,可分为两种,第一类是由它可以得到单独一帧,别名framerSource,像H264or5VideoStreamFramer。第二类是从视频流中得到一段,别名fileSource,像ByteStreamFileSource。从层次来说,使用者直接使用的是framerSource,framerSource再使用fileSource。

RTPSink表示一种使用场景,它的startPlaying方法启动向framerSource要数据,后者再启动向内中的fileSource要数据。得到数据后的工作是解析,live555由于得到数据时的函数栈已经很深,为避免更深,就用scheduleDelayedTask把FramedSource::afterGetting任务加到任务调度器。调度器调用afterGettting,它解析收到的数据块,先后执行1)解析出一帧后,执行发送动作,2)继续解析余下数据,总之会触发scheduleDelayedTask再次把afterGettting放入调度器,于是形成了收数据、解析数据、发送帧,再收数据、解析数据……

afterGetting解析出一帧后,会调用RTPInterface::sendPacket执行发送。有些场景不须要真正发送,像生成SDP中的getAuxSDPLine,这时要做的是让RTPInterface中fTCPStreams是nullptr。

根据以上分析,一旦把scheduleDelayedTask替换为ThreadTaskRunnerHandle::Get()->PostTask,就实现了向消息循环投递任务,为什么还要有scheduleDelayedTask,这须要深入getAuxSDPLine代码。

char const* H264VideoFileServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource)
{
  if (fAuxSDPLine != NULL) return fAuxSDPLine; // it's already been set up (for a previous client)

  CHECK(inputSource->fSessionid != nposm);
  inGetAuxSDPLines.insert(inputSource->fSessionid);

  if (fDummyRTPSink == NULL) { // we're not already setting it up for another, concurrent stream
    // Note: For H264 video files, the 'config' information ("profile-level-id" and "sprop-parameter-sets") isn't known
    // until we start reading the file.  This means that "rtpSink"s "auxSDPLine()" will be NULL initially,
    // and we need to start reading data from our file until this changes.
    fDummyRTPSink = rtpSink;

    // Start reading the file:
    fDummyRTPSink->startPlaying(*inputSource, afterPlayingDummy, this);

    // Check whether the sink's 'auxSDPLine()' is ready:
    checkForAuxSDPLine(this);
  }
  envir().taskScheduler().doEventLoop(&fDoneFlag);
  inGetAuxSDPLines.erase(inputSource->fSessionid);

  return fAuxSDPLine;
}

getAuxSDPLine执行了RTPSink的startPlaying()之后,就会通过checkForAuxSDPLine(void* clientData)检查一下auxSDP的读取是否结束,具体方法是在100毫秒后判断fDoneFlag是否是true,true表示得到了auxSDP,否则没有。由于紧挨startPlaying,第一个checkForAuxSDPLine检查到的fDoneFlag基本是false,它的作用可认是向调度器投递下一个checkForAuxSDPLine,100毫秒后执行。100毫秒后,调度器执行checkForAuxSDPLine,判断fDoneFlag,如果是true,auxSDP得到了,退出doEventLoop(&fDonwFlag),否则向调度器投递下一个checkForAuxSDPLine,100毫秒后执行。直到终有一次checkForAuxSDPLine检测到fDoneFlag是true。getAuxSDPLine也过了doEventLoop(&fDoneFlag),向上返回。

getAuxSDPLine用的调度策略是doEventLoop,导致过程中依旧得用scheduleDelayedTask。计划是将来实现网络操控kOS时一并改掉。

发现网络断开,不直接调用Close,改用ThreadTaskRunnerHandle::Get()->PostTask投递Close

上面已出现的RtspServer::SendRawBySocket,还有HandleWriteResult中rv<0,它们都不直接调用Close,而是改为PostTask投递Close。原因是在执行到它们时,live555已执行很多操作,函数栈很深,一旦Close,会消毁一些对象,像HttpConnection,live555中ClientConnection,而这些操作的上层函数可能还须要这些对象有效。为了不修改上层对这些对象依赖,改用投递。

至于函数栈不深时,像DoReadLoop,发现断开就直接调用Close。

修正GenericMediaServer::removeServerMediaSession中BUG

void GenericMediaServer::removeServerMediaSession(ServerMediaSession* serverMediaSession) {
  if (serverMediaSession == NULL) return;

  ServerMediaSession* found_sms = (ServerMediaSession*)(fServerMediaSessions->Lookup(serverMediaSession->streamName()));
  if (found_sms == serverMediaSession) {
    fServerMediaSessions->Remove(serverMediaSession->streamName());
  }
  if (serverMediaSession->referenceCount() == 0) {
    Medium::close(serverMediaSession);
  } else {
    serverMediaSession->deleteWhenUnreferenced() = True;
  }
}

原来是无条件调用fServerMediaSessions->Remove,解决BUG方法是增加“if (found_sms == serverMediaSession)”。为什么要确定fServerMediaSessions中存在该sms才删除,须要说下live555怎么管理sms。关于sms可参考“live555 源码分析:ServerMediaSession”,这里主要说怎么销毁sms。

streamName。“IP:Port/”后的那串字符串,称为资源路径。举个例子,rtsp://192.168.1.107:8554/1,streamName指是的“1”。

规则:对同一streamName的不同会话,为保证各次会话参数不会串扰,分别要对应不同的sms。什么是串扰?举个例子,Scale(sms内参数)是播时的一个参数,第一个会话用的1,第二个会话是2,不能因为第二个会话是2连带把第一个会话也设到2。

fServerMediaSessions是GenericMediaServer成员变量,整系统只一个,作用是存储sms的哈希表,哈希表的key就是streamName。构造时用的标志是STRING_HASH_KEYS,意味着key是STRING类型,所以查找时比较的不是key指针值,而是key中字符串内容。于是有个疑问,同一streamName的sms在fServerMediaSessions只能存在一个,同一streamName的N个会话如何有着N个sms?——要解决这疑问,回看上面removeServerMediaSession代码。

如果sms的引用计数是0,析构sms,否则把sms的deleteWhenUnreferenced置为true。换句话说,虽然该sms是被从fServerMediaSessions移除了,但并不表示它被析构,它会以着“私有”变量保存在拥有它引用计数的对象中,这个对象就是ClientSession。当ClientSession被析构时,这个“私有”sms被析构。

GenericMediaServer::ClientSession::~ClientSession()
{
  ...
  if (fOurServerMediaSession != NULL) {
    fOurServerMediaSession->decrementReferenceCount();
    if (fOurServerMediaSession->referenceCount() == 0
	&& fOurServerMediaSession->deleteWhenUnreferenced()) {
      fOurServer.removeServerMediaSession(fOurServerMediaSession);
      fOurServerMediaSession = NULL;
    }
  }
}

removeServerMediaSession时已把它的deleteWhenUnreferenced置为true。顺着~ClientSession执行逻辑让恢复BUG现场。此时调用removeServerMediaSession,sms-A已不在fServerMediaSessions中,但如果fServerMediaSessions内有个相同streamName的sms-B,一旦不作修正无条件把sms-B移出fServerMediaSessions,sms-B将没人销毁了(此刻deleteWhenUnreferenced是false,~ClientSession认为是带内sms,不会销毁),造成内存泄漏。

补说下为什么“fServerMediaSessions内有个相同streamName的sms-B”。把sms加入fServerMediaSessions用的是addServerMediaSession。

void GenericMediaServer::addServerMediaSession(ServerMediaSession* serverMediaSession) {
  if (serverMediaSession == NULL) return;
  
  char const* sessionName = serverMediaSession->streamName();
  if (sessionName == NULL) sessionName = "";
  removeServerMediaSession(sessionName); // in case an existing "ServerMediaSession" with this name already exists
  
  fServerMediaSessions->Add(sessionName, (void*)serverMediaSession);
}

addServerMediaSession会执行两个操作,1)从fServerMediaSessions移除同streamName的sms-A。2)把参数给的sms-B加入fServerMediaSessions。也就是说,同一个streamName的sms在fServerMediaSessions内只有一个,后来的sms-B会挤掉sms-A,让sms-A孤悬带外。

deleteWhenUnreferenced是true,表示这已是个带外sms,由拥有者去销毁;否则是带内sms,等到析构RTSPServer时,通过~RTSPServer调用cleanup去销毁。销毁sms方法都是调用removeServerMediaSession。

void GenericMediaServer::cleanup()
{
  ...
  ServerMediaSession* serverMediaSession;
  while ((serverMediaSession = (ServerMediaSession*)fServerMediaSessions->getFirst()) != NULL) {
    removeServerMediaSession(serverMediaSession); // will delete it, because it no longer has any 'client session' objects using it
  }
}

live555什么时候“构造”sms?一是在收到DESCRIBE,二是在收到SETUP。

全部评论: 0

    写评论: