rtsp客户端

streamUsingTCP。rtsp协议要传的数据可分为两种,一是用于管理rtsp会话的消息,像DESCRIBE、SETUP、PLAY,它们一定通过TCP,端口号像554。第二种是媒体数据,像视频、音频,这些数据被封装成rtp包,于是称rtp数据。传送rtp数据可用UDP也可用TCP。用UDP时,需新开端口,以UDP收发。TCP时,rtp数据将和rtsp会话命令分时复用,统一通过554端口收发。streamUsingTCP指示用哪种方法,true表示使用第二种。

  1. 从网络收rtsp,解析出rtp,并从rtp拆包出编码过的视/音频帧。所用技术:live555。
  2. 解码视/音频帧。所用技术:webrtc中的解码模块。
  3. 渲染解码出的数据。所用技术:从webrtc解码输出中取VideoFrame,即基于rtc::VideoSinkInterface::OnFrame获得解码帧。

以上是之前用live555+webrtc实现的rtsp客户端,考虑到一些原因要用chromium代替live555。

  • chromium提供了非常好的连接溢出时间机制,能实现灵活的rtsp重连。
  • app基于Rose,Rose已在广泛使用chromium,live555又自带一套消息循环和socket库,这重复了。

说是用chromium代替live555,具体实现上是用chromium的消息循环和socket库,至于协议处理逻辑还是用live555中代码。

注:为让chromium中的GURL支持解析rtsp url,需修改chromium源码。具体是增加变量kRtspScheme,把它加到kStandardURLSchemes。

<chromium>/url/url_constants.cc
const char kRtspScheme[] = "rtsp";

<chromium>/url/url_util.cc
const SchemeWithType kStandardURLSchemes[] = {
  ......
  {kRtspScheme, SCHEME_WITH_PORT},  // Rtsp.
};

注:只要把live555.cpp中的use_chromium值改为false,就会使用live555自带的消息循环和socket库。

一、线程模型

图1 rtsp的线程模型

系统中至少存在1+1+N个线程,第一个1是main线程,第二个1是socket线程,N表示要同时接收N个rtsp设备,每个设备需要一个DecodingThread。

  • main线程。main函数所在的线程,系统创建。用于和各设备建立rtsp连接。
  • socket线程。使用Chromium中的base::Thread创建。使用TCP传rtsp时,建立rtsp连接(DESCRIBE、SETUP、PLAY)和后面接收媒体流数据是用同一个socket,加上Chromium硬性规定,“同一个socket上的操作必须放在同一个线程,包括创建、连接、读、写、关闭”,于是新开一个线程,专门处理和socket相关任务。当要接收N个rtsp设备时,系统也只有一个socket线程。
  • DecodingThread。webrtc创建。通过socket线程收到一帧后,解码,解码出的帧通过rtc::VideoSinkInterface::OnFrame传给app。

二、建立阶段

图1中“live555::start”执行建立rtsp连接,过程中要发送DESCRIBE、SETUP、PLAY。以下是流程。

  1. main线程向socket线程投递Start_chromium。
  2. 新建一个RunLoop对象。
  3. main线程向自已投递rtsp_setup_slice。rtsp_setup_slice做成定时器,用于检测发了请求后,如果4.5秒后还没应答,向main线程投递Quit,让退出步骤4运行的RunLoop。
  4. 运行RunLoop::Run。

步骤1的Start_chromium运行在socket线程,设置next_state_是解析域名状态,随即调用DoLoop。

int RTSPClient::DoLoop(int result) {
  DCHECK_NE(next_state_, STATE_NONE);

  int rv = result;
  do {
    State state = next_state_;
    next_state_ = STATE_NONE;
    switch (state) {
      case STATE_RESOLVE_HOST:
        DCHECK_EQ(net::OK, rv);
        rv = DoResolveHost();
        break;
      case STATE_RESOLVE_HOST_COMPLETE:
        rv = DoResolveHostComplete(rv);
        break;
      case STATE_TRANSPORT_CONNECT:
        DCHECK_EQ(net::OK, rv);
        rv = DoTransportConnect();
        break;
      case STATE_TRANSPORT_CONNECT_COMPLETE:
        rv = DoTransportConnectComplete(rv);
        break;
	  case STATE_TRANSPORT_WRITE_COMPLETE:
        rv = DoTransportWriteComplete(rv);
        break;
      case STATE_TRANSPORT_READ_COMPLETE:
        rv = DoTransportReadComplete(rv);
        break;
      default:
        NOTREACHED();
        rv = net::ERR_FAILED;
        break;
    }
  } while (rv != net::ERR_IO_PENDING && next_state_ != STATE_NONE);

  return rv;
}

void RTSPClient::OnIOComplete(int result)
{
  DoLoop(result);
}

以上是基于Chromium的socket库写的标准DoLoop、OnIOComplete(关于这两函数细节参考“Chromium(2/4):消息循环和socket库”中“二、Socket库”)。DoLoop把建立rtsp连接分为三个步骤:解析域名、连接和发请求/收应答。发请求/收应答的个数是1+N+1。N是sdp中的媒体流数目,有多少个媒体流就须要处理多少个SETUP。以下摘取当中DoTransportWriteComplete进行分析。

int RTSPClient::DoTransportWriteComplete(int result)
{
	if (result > 0) {
		next_state_ = STATE_TRANSPORT_READ_COMPLETE;
		VALIDATE(fResponseBufferBytesLeft > 1, null_str);
		// why fResponseBufferBytesLeft - 1, references to begin of handleResponseBytes1:
		if (!read_by_rtpinterface_) {
			const int max_bytes = SDL_min(envir().read_buf->size(), (int)fResponseBufferBytesLeft - 1);
			result = ctrl_socket_->Read(envir().read_buf.get(), max_bytes,	envir().iocomplete);
		} else {
			// let RTPInterface read this socket. net::ERR_IO_PENDING make DoLoop exit.
			if (!in_rtpinterface_iocomplete_) {
				SDL_Log("RTSPClient::DoTransportWriteComplete(result: %i)", result);
				envir().iocomplete.Run(SPECIAL_CALL_MAGIC);
			}
			result = net::ERR_IO_PENDING;
		}
		if (result > 0) {
			result = DoTransportReadComplete(result);
		}
	}

	if (result < 0 && result != net::ERR_IO_PENDING) {
		next_state_ = STATE_NONE;
		socket_io_fail();
	}
	return result;
}

参数result>0时,表示之前Write成功,于是发起Read。当Read返回值>0,表示此次Read是同步读,立即调用DoTransportReadComplete,让处理读到的应答。否则或是异步(ERR_IO_PENDING),或错误,是错误时调用soket_io_fail。是异步时返回ERR_IO_PENDING,回到上层DoLoop,不再满足while继续循环条件,DoLoop以ERR_IO_PENDING退出。后续任务得靠系统触发OnIOComplete才能处理。

Read时,为什么要使用read_by_rtpinterface_?——使用tcp时,server发出SETUP应答后,就有可能向外发rtp包。此时在554上会出现rtsp消息和rtp包混杂情况,read_by_rtpinterface_=true表示client收到过一个SETUP应答了,为区分是SETUP/PLAY应答还是rtp数据,后面一个字节一个字节接收(rtp还是按负载长度收),直到收完PALY应答,它确保了不会收走一个本属于rtp的字节。

三、接收媒体数据阶段

首先说下TCP传输时流媒体数据的顶层格式。

  • 整个流被拆分成好多个MTU,每个MTU由两部分组成,4字节前缀和payload。4字节前缀中第一个字节是“$”,第二个字节channel号,第三个、第四个字节是后面payload字节数。RTP包位在payload中。
  • MTU长度是服务器自个设的一个值,像4+1408。而一视频帧不可能才1000多字节,于是一帧会被拆成多个MTU。
  • 会不会发生一个MTU包含多帧数据?举个例子,#8帧只有最后400个字节了,于是放在了接下MTU a中,此时MTU a还有数百字节空着,会不会放#9帧的前面数百字节。——个人认为不会出现这种情况。

要了解live555如何接收rtsp数据可参考“live555从RTSP服务器读取数据到使用接收到的数据流程分析”。核心函数是MultiFramedRTPSource::networkReadHandler1,它既负责从socket接收媒体流,又负责处理收到的数据。处理过程包括,1)数据存到BufferedPacket链表,2)收到完整的一帧后,存到app要求的fTo,并调用app设置的、收到一帧后的回调函数afterGettingFrame。

3.1 live555::frame_slice接收一帧

让回看图1中“live555::frame_slice”,它向socket线程接收一帧数据。以下是流程。

  1. DecodingThread向socket线程投递continuePlaying_chromium。
  2. DecodingThread线程向自已投递quit_runloop,延迟时间5秒。一旦5秒后步骤3运行的RunLoop还没退出(意味着5秒也没收到一帧),让退出RunLoop。
  3. 运行RunLoop::Run。导致Run退出的可能原因,1)接收到一帧,2)中间发生错误,3)5秒超时。

步骤1的continuePlaying_chromium运行在socket线程,执行两个操作。1)调用continuePlaying,它设置相关变量,让live555进入NeedDelivery状态。当中有个参数叫fIsCurrentlyAwaitingData,执行前必须false,执行后设为true。2)调用RTPInterface::chromium_slice,它会循环调用SocketDescriptor::tcpReadhandle1_chromium,直到后者返回false。

tcpReadhandle1_chromium来自live555提供的tcpReadhandle1,只是把读socket部分改为用chromium的socket库。从socket收到数据后,原有的live555处理逻辑都不用变。

3.2 fIsCurrentlyAwaitingData变量

为什么要额外说这个变量,让看tcpReadHandler1_chromium中代码。

Boolean SocketDescriptor::tcpReadHandler1_chromium(int rv, ...)
{
  ...        
  Boolean callAgain = True;
  switch (fTCPReadingState) {
  ...
  case AWAITING_PACKET_DATA: {
    callAgain = False;
    fTCPReadingState = AWAITING_DOLLAR;
    // fStreamChannelId存储着此个MTU的channel号,由channel号找到能处理它的RTPInterface。
    RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
    if (rtpInterface->fNextTCPReadSize == 0) {
      // 已读出该MTU的所有payload,告知caller再次以rv=0调用tcpReadHandler1_chromium
      callAgain = true;
      break;
    }
    if (rtpInterface->fReadHandlerProc != NULL) {
      fTCPReadingState = AWAITING_PACKET_DATA;
      rtpInterface->fLastTCPReadResult_ = rv;
      rtpInterface->fReadHandlerProc(rtpInterface->fOwner, 0);
      if (rtpInterface->fNextTCPReadSize == 0) {
        if (fEnv.setup_finished) {
          RTPSource* source = nullptr;
          if (rtpInterface->fOwner->isSource()) {
            // 此个RTPInterface用于RTPSource
            source = static_cast<RTPSource*>(rtpInterface->fOwner);
          } else {
            // 此个RTPInterface既然不用于RTPSource,那一定用于RTCPInstance
            VALIDATE(rtpInterface->fOwner->isRTCPInstance(), null_str);
          }
          if (source == nullptr || source->isCurrentlyAwaitingData()) {
            // 如果此个RTPInterface用于RTPSource,还须满足isCurrentlyAwaitingData=true,caller才主动发read。isCurrentlyAwaitingData=false意味着是由DummySink::continuePlaying_chromiumy主动发read。
            callAgain = true;
            break;
          }
      } else {
        // during setup, continue finish it.
        callAgain = true;
        break;
      }
    }
  }
  break;
  } // case AWAITING_PACKET_DATA
  } // switch (fTCPReadingState)
  return callAgain;
}

处理了MTU前缀4字节后,SocketDescriptor进入AWAITING_PACKET_DATA状态,rtpInterface->fNextTCPReadSize存储着4字节中后两字节的值,即payload长度。rtpInterface->fReadHandlerProc会很快调用核心函数MultiFramedRTPSource::networkReadHandler1,后者去读socket时,每次最多读fNextTCPReadSize字节。随着networkReadHandler1不断被执行,socket读出数据越多,fNextTCPReadSize会不断减少,减少到0时表示已读完这个MTU。一旦读完MTU,后续不会再有异步触发出OnIOComplete,此刻需要主动把callAgain置为true,通知上层的chromium_slice或OnIOComplete再次调用tcpReadHandler1_chromium,去读下一个MTU。当isCurrentlyAwaitingData=false,意味着是由DummySink::continuePlaying_chromiumy主动发read。

fIsCurrentlyAwaitingData有什么用?——多个MTU组成一帧,总会遇到读完一个MTU时,该帧恰好读完,这变量让知道什么时候已读完一帧。它在continuePlaying时被置为true。networkReadHandler1判断出收完一帧后,在调用app的afterGettingFrame前会把它置为false。

为避免发生线程争抢写fIsCurrentlyAwaitingData,要让在socket线程执行continuePlaying。要是换放在DecodingThread执行,会产成BUG,让看以下序列。

  1. (socket线程)在第N次的tcpReadHandler1_chromium,判断出收完一帧了,置fIsCurrentlyAwaitingData=false。调用app的afterGettingFrame,后者向DecodingThread投递退出RunLoop任务。
  2.  (DecodingThread)执行步骤1投递来的退出RunLoop任务,消费该帧。完后了调用continuePlaying,于是fIsCurrentlyAwaitingData又被设为true。并按图1所示把chromium_slice投递到socket线程。
  3.  (socket线程)继续执行到第N次tcpReadHandler1_chromium中的if (rtpInterface->fNextTCPReadSize == 0 && source->isCurrentlyAwaitingData())(注:还是第N次,由于线程调度,整个步骤2在1和3之间执行),此时isCurrentlyAwaitingData是true,callAgain被错误地置为true,退不出上一个的chromium_slice任务。加上步骤2新投递的,socket线程的任务队列将有两个chromium_slice,此时极可能发生破坏“如果Connect/Read/Write返回ERR_IO_PENDING,在没触发下次连接完成/可读/可写事件前,禁止再次调用Connect/Read/Write。”规则的致命错误。

四、live555注释

4.1 _Tables

class _Tables {
	MediaLookupTable* mediaTable;
	void* socketTable;
};
  • mediaTable存储着那些从Medium派生的对象。静态函数MediaLookupTable::ourMedia(UsageEnvironment& env)得到这个指针。
  • socketTable真正类型是HashTable*,存储着SocketDescriptor。表中key或是sockNum(使用live555自带socket库),或是netSock(使用chromium的socket库)。全局函数socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent)得到这个指针。

每个RTSPClient实例有且只有一个UsageEnvironment,_Tables存放在UsageEnvironment中的liveMediaPriv成员,该成员定义的类型是void*,真正类型是_Tables*。静态函数_Tables::getOurTables(env)得到这个指针。

综上所述,每个RTSPClient实例有且只有一个mediaTable、一个socketTable。

4.2 MediaSession、MediaSubsession、MediaSubsessionIterator

收到sdp后,就要由它调用静态函数MediaSession::createNew。

MediaSession* MediaSession::createNew(UsageEnvironment& env, char const* sdpDescription) {
  MediaSession* newSession = new MediaSession(env);
  if (newSession != NULL) {
    if (!newSession->initializeWithSDP(sdpDescription)) {
      delete newSession;
      return NULL;
    }
  }
  return newSession;
}

createNew执行两个操作,一是创建MediaSession,二是调用新建对象的initializeWithSDP方法。于是经过createNew后,此个rtsp会话需要的MediaSession、MediaSubsession就都已创建了,并且MediaSession中的fSubsessionsHead、fSubsessionsTail都已指向了正确位置。

MediaSubsessionIterator用于枚举MediaSession中的所有流。新建MediaSubsessionIterator对象后调用next,或reset后调用next,此个next得到的是第一条流。以下是个枚举范例。

MediaSubsessionIterator iter(mediaSession);
MediaSubsession* subsession;
while ((subsession = iter.next()) != NULL) {
	subsession指向一条有效MediaSubsession。
}

4.3 SocketDescriptor

SocketDescriptor用于封装socket,一个SocketDescriptor对应一个socket。一次会话时创建的socket集中存放在每个会话只一个的socketTable(参考“4.1 _Tables”)。

SocketDescriptor内有个叫fSubChannelHashTable的HashTable,这表的key是streamChannelId,value是RTPInterface*。streamChannelId是怎么来的?它包含在服务器对SETUP的应答中,一般从0开始。假设有两条流,每条流都有RTP、RTCP,通常来说,第一条流RTP的streamChannelId是0,RTCP是1;第二条流RTP的streamChannelId是2,RTCP是3。

fSubChannelHashTable有什么用?——当streamUsingTCP是true,在接收媒体数据阶段,554上的流被拆分成好多个MTU,每个MTU由两部分组成,4字节前缀和payload。4字节前缀中第一个字节是“$”,第二个字节channel号,第三个、第四个字节是后面payload字节数。channel号的值就是streamChannelId。于是SocketDescriptor根据channel号在fSubChannelHashTable找出能处理它的RTPInterface。这部分逻辑参考“3.2 fIsCurrentlyAwaitingData变量“。补说下,RTCPInstance也是通过RTPInteface来收发网络数据。

4.4 TaskScheduler(任务调度模型)

在调用上,上层使用TaskScheduler的方法就是调用TaskScheduler的doEventLoop。

void BasicTaskScheduler0::doEventLoop(char volatile* watchVariable) {
  // Repeatedly loop, handling readble sockets and timed events:
  while (1) {
    if (watchVariable != NULL && *watchVariable != 0) break;
    SingleStep();
  }
}
三种任务用到的两种操作函数
typedef void TaskFunc(void* clientData);
typedef void BackgroundHandlerProc(void* clientData, int mask);

doEventLoop是个阻塞式函数,退出条件是SignleStep在执行过程中把watchVariable置为非0。参数watchVariable是上层自个管理的变量,SingleStep在执行过程中会调用上层提供的操作函数,上层什么时候想退出循环了,就把watchVariable置为非0。doEventLoop好处是可以让多种操作序列化到一个线程执行,这很好解决了多线程同步问题。

SingleStep是TaskScheduler的时间片函数,它其实有一个参数maxDelayTime,作用是做为select时溢出等待时间。缺省时值填0,意味着select只作即时检查,不等待。它会依次去执行三种任务,这三种任务被放在三个独立的集合,分别是HandlerDescriptor、TriggeredEventHandler和DelayQueue。

  • HandlerDescriptor。1)用途。用在后台读,它和socket机制有关,一个HandlerDescriptor对应一个socket。当select机制查询到有socket发生事件时,就在此链表查找处理者,然且调用相应的处理者操作。2)集合结构:链表。节点类型是HandlerDescriptor,操作函数原型。BackgroundHandlerProc。3)如何添加。BasicTaskScheduler::setBackgroundHandling。
  • 为方便遍历集合中节点,提供辅助类HandlerIterator,该类主要操作是next,HandlerIterator::next有4个特点。1)返回值是“上一次”的HandlerDescriptor。2)执行完后内部指向下一个HandlerDescriptor。3)HandlerIterator构造函数时内部指向第一个HandlerDescriptor,所以第一次next返回的是第一个HandlerDescriptor。4)当此次返回的已是最后一个时,内部指向nullptr,因而下一次next将返回nullptr,caller可根据next的返回值是否nullptr来判断是否枚举完了。
  • TriggeredEventHandler。1)用途。有事件发生了,但不在SingleStep线程,为简化多线程同步,就要把这些事件要执行的处理序列化到SingleStep线程。2)集合结构:数组。单元类型是TaskFunc,同时就是操作函数类型。3)如何添加。BasicTaskScheduler0::createEventTrigger。
  • DelayQueue。1)用途。它有点像TriggeredEventHandler,但TriggeredEventHandler是即时执行,它则可以设置一个延时时间,一旦该节点给出时刻到了,SingleStep就会调用这个DelayQueue中的操作。2)集合结构:链表。节点类型是DelayQueue,操作函数类型TaskFunc。3)如何添加。BasicTaskScheduler0::scheduleDelayedTask。

全部评论: 0

    写评论: