线程模型

(run_loop.cc)GetTlsDelegate、thread_task_runner_tls

(run_loop.cc)GetTlsDelegate和thread_task_runner_tls是TLS(线程局部存储)中两个重要变量。(run_loop.cc)GetTlsDelegate指run_loop.cc中定义的一个叫GetTlsDelegate()的函数,对应设置该变量的函数是GetTlsDelegate().Set。thread_task_runner_tls的设置、读取则使用了延迟构造实例LazyInstance。

在类型上,(run_loop.cc)GetTlsDelegate虽是RunLoop::Delegate,但其实就是ThreadControllerWithMessagePumpImpl。它的作用是什么?——只要是这线程中创建的RunLoop,都会让共享这个ThreadControllerWithMessagePumpImpl。

thread_task_runner_tls是SingleThreadTaskRunner(下面称TaskRunner),type是TYPE_IO时,对应的真实类型是MessageLoopTaskRunner。在ThreadControllerWithMessagePumpImpl::InitializeThreadTaskRunnerHandle()中被赋值。它的作用是什么?——像句柄,通过它,调用者可以向线程模型添加任务。参考“2.1 app如何投递任务”。

一、模型四结构

SequenceManagerImpl(SequenceManager)、ThreadControllerWithMessagePumpImpl(ThreadController)、MessagePumpForIO(MessagePump)和RunLoop是线程模型的4个结构,其中前三个构成了线程模型的“静态”结构,它们每个模型各一个,模型创建时创建、销毁时销毁。RunLoop是线程模型中的动态对象,想处理任务了,直接用new base::RunLoop()构造,用完即可销毁;下一要用时再创建、然后销毁。

1.1 静态结构:ThreadControllerWithMessagePumpImpl和MessagePumpForIO

图1 Chromium中的线程模型

SequenceManagerImpl(SequenceManager)、ThreadControllerWithMessagePumpImpl(ThreadController)、MessagePumpForIO(MessagePump)组成了线程模型的“静态”结构,它们每个模型各一个。在层次上,可认为SequenceManager是第一层,它管理着ThreadController、MessagePump。ThreadController、MessagePump之间是平级关系,相互“独立”。它们被创建后,位在上层的SequenceManager调用ThreadController的BindToCurrentThread方法把MessagePump绑定到ThreadController,即赋值给当中的成员变量pump_。因为MessagePump实例最终是存储在ThreadController,所以相互“独立”中的独立加了引号。另外ThreadControllerWithMessagePumpImpl::BindToCurrentThread还会作一件事情,把此个ThreadController放入线程的本地存储,这样之后创建的RunLoop就可共享这个RunLoop::Delegate了。以下是创建三个对象及绑定的步骤。

  1. MessagePump::Create创建MessagePumpForIO。谁凋用MessagePump::Create?对非base::Thread类型的线程,通过全局函数CreateSequenceManagerForMainThreadType。
  2. 创建ThreadControllerWithMessagePumpImpl。
  3. 创建管理结构SequenceManagerImpl。需要ThreadControllerWithMessagePumpImpl作为输入参数。
  4. (ThreadControllerWithMessagePumpImpl::BindToCurrentThread)把MessagePumpForIO绑定到ThreadControllerWithMessagePumpImpl。
  5. (ThreadControllerWithMessagePumpImpl::BindToCurrentThread)把此个ThreadControllerWithMessagePumpImpl设置到(run_loop.cc)GetTlsDelegate,使得只要是这线程中创建的RunLoop,都会让共享这个ThreadControllerWithMessagePumpImpl。

1.2 RunLoop(任务循环)

RunLoop是线程模型中的动态对象,RunLoop血缘上是独立的,即不是任何类的基类或派生类。想处理任务了,直接用new base::RunLoop()构造。

std::unique_ptr<base::RunLoop> loop(new base::RunLoop());

RunLoop::Run()阻塞式执行任务循环,循环不执行完不退出。

RunLoop::Delegate* delegate_;
void RunLoop::Run() {
	if (!BeforeRun())
		return;
	delegate_->Run(application_tasks_allowed);
	AfterRun();
}

中间的delegate_->Run是主体,那delegate_是什么?——类型是RunLoop::Delegate。RunLoop::Delegate是半协议类,真实执行时需要从它派生、实现出自个派生类,这个类就是ThreadControllerWithMessagePumpImpl。但是ThreadControllerWithMessagePumpImpl不仅继承自RunLoop::Delegate,还有MessagePump::Delegate,图1中后面的MessagePumpForIO须要这个继承关系。

构造RunLoop时用不须要参数的new base::RunLoop(),ThreadControllerWithMessagePumpImpl实例是如何传给成员变量delegate_?

RunLoop::RunLoop(Type type)
	: delegate_(GetTlsDelegate().Get())
	, ...
{...}

前面已说过,(run_loop.cc)GetTlsDelegate().Get()返回该线程中的ThreadControllerWithMessagePumpImpl实例,所以只要是这线程中创建的RunLoop,都会共享这个ThreadControllerWithMessagePumpImpl。

综上所述,delegate_->Run就是ThreadControllerWithMessagePumpImpl::Run,接下看它执行什么。

std::unique_ptr<MessagePump> pump_;
void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed, TimeDelta timeout) {
	pump_->Run(this);
}

pump_类型中MessagePump,MessagePump是个半协议类,真正执行是它的派生类,这个派生类根据不同系统不同定义,像windows是MessagePumpWin,换句话说,是创建过程中被绑定到自已的MessagePumpForIO。

图1中,MessagePumpForIO::DoRunLoop()是任务循环主体,它围绕一个任务队列(TaskQueue)不断地进行循环,直到被通知停止为止。在围绕任务队列循环期间,它会不断地检查任务队列是否为空。如果不为空,那么就会将里面的任务(Task)取出来,并且进行处理。这样,一个线程如果要请求另外一个线程执行某一个操作,那么只需要将该操作封装成一个任务,并且发送到目标线程的任务队列去即可。

在代码中,可认为一个任务对应一次函数回调。

1.3 退出RunLoop::Run、DoIdleWork

有两种方法退出RunLoop::Run,一是让RunLoop::BeforeRun返回false。二是调用MessagePump::Quit()。

  • 让RunLoop::BeforeRun返回false。RunLoop::Run首先会调用BeforeRun,后者返回false就会退出Run。BeforeRun返回false的条件是quit_called_等于true。调用RunLoop::Quit()可以把quit_called_设为true。但是,执行BeforeRun时机是Run刚开始运行、还没执行任何任务,这个退出方法往往没机会用。
  • 调用MessagePump::Quit()。这个方法基于的规则:当RunLoop::Run进入delegate_->Run后,执行的是MessagePump::Run,只要退出了MessagePump::Run就意味着退出RunLoop::Run。MessagePump::Quit()是个纯虚方法,它给派生类制定了一种机制:当调用该函数后,必须能退出MessagePump::Run。以派生类MessagePumpForIO为例,执行Quit时会把should_quit置为true。针对MessagePumpForIO::DoRunLoop,只要执行过一个任务就会检查should_quit,一旦发现是true就会退出DoRunLoop。
void MessagePumpWin::Quit() {
  DCHECK(state_);
  state_->should_quit = true;
}

因为第一种方法往往没机会用,以下退出RunLoop::Run指的是用第二种方法,即调用MessagePump::Quit(),具体到调用代码是“pump_->Quit()”。

DoIdleWork在DoRunLoop,主要功能是退出RunLoop::Run。

bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
  if (ProcessNextDelayedNonNestableTask())
    return true;
  if (ShouldQuitWhenIdle())
    pump_->Quit();
  ...
  return false;
}

bool RunLoop::Delegate::ShouldQuitWhenIdle() {
  const auto* top_loop = active_run_loops_.top();
  if (top_loop->quit_when_idle_received_) {
    TRACE_EVENT_WITH_FLOW0("toplevel.flow", "RunLoop_ExitedOnIdle",
                           TRACE_ID_LOCAL(top_loop), TRACE_EVENT_FLAG_FLOW_IN);
    return true;
  }
  return false;
}

由以上代码可得知,当该线程最项端RunLoop的quit_when_idle_received_是true时,执行DoIdleWork将导致退出RunLoop::Run。有什么方法让quit_when_idle_received_置为true?1)RunLoop::QuitWhenIdle(),2)RunLoop::QuitCurrentWhenIdleDeprecated()(static函数),3)向它投递base::MessageLoop::QuitWhenIdleClosure()。以下是几种退出RunLoop::Run方法。

  1. RunLoop::Quit()。类似直接调用pump_->Quit(),执行完该语句所在的函数后就退出,如果除该语句所在任务外还有任务,在退出前这些任务会不会被执行?——不会,delegate->DoWork()/delegate->DoDelayedWork(&delayed_work_time_)最多执行一个任务,执行完一个任务后,会返回true(more_work_is_plausible=true),指示队列中不排除还有任务。退出Run不用等到DoIdleWork。
  2. RunLoop::QuitWhenIdle()。退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。
  3. RunLoop::QuitCurrentWhenIdleDeprecated()(static函数)。基于QuitWhenIdle,退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。
  4. base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, base::MessageLoop::QuitWhenIdleClosure())。基于QuitWhenIdle,退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。

2、3、4方法有个共同点,只是把顶端RunLoop的quit_when_idle_received_置为true,以便DoRunLoop执行到DoIdleWork时可退出任务循环。这种方法有个要特别注意地方,要是任务循环中一直有任务,那more_work_is_plausible一直将是true,导致DoRunLoop不会执行DoIdleWork,也就无法退出任务循环。举个例子,rose在实现http时(url_request_http_job_rose.cc),要持续向循环投递show_slice,导致循环中一直有这个任务,所以只能用第一种方法。

总来的说,安全方法还是认为在发出退出后,那些积压的任务还是可能会被执行的,那些任务需考虑判断RunLoop是否处于正处于退出阶段,app估计需增加私有closing变量。

针对RunLoop::Quit(),把quit_called_设为true后,后续的delegate->Quit()会调用pump->Quit(),即第二种方法。

void RunLoop::Quit() {
  ...
  quit_called_ = true;
  if (running_ && delegate_->active_run_loops_.top() == this) {
    // This is the inner-most RunLoop, so quit now.
    delegate_->Quit();
  }
}

如果在本线程的任务中执行退出,是否有效?——有效。当然,对调用RunLoop::Quit()时,即直接调用pump_->Quit(),必须等到调用该语句所在任务执行完后才会出。

1.4 在非Thread创建的线程使用线程模型

chromium中,对base::Thread线程,Thread::StartWithOptions会创建线程模型三静态结构及准备任务队列。但那些非base::Thread创建的线程,该怎么使用线程模型。

void trtspcapture::VideoReceiveStream::DecodeThreadFunction(void* ptr) {
  scoped_refptr<base::sequence_manager::TaskQueue> task_queue_;
  scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  std::unique_ptr<base::sequence_manager::SequenceManager> sequence_manager_ = base::test::CreateSequenceManagerForMainThreadType(base::test::TaskEnvironment::MainThreadType::IO);
  sequence_manager_->set_runloop_quit_with_delete_tasks(true);
  {
    task_queue_ = sequence_manager_->CreateTaskQueue(
			base::sequence_manager::TaskQueue::Spec("task_environment_default")
				.SetTimeDomain(nullptr));
    task_runner_ = task_queue_->task_runner();
    sequence_manager_->SetDefaultTaskRunner(task_runner_);
    // simple_task_executor_ = std::make_unique<SimpleTaskExecutor>(task_runner_);
    CHECK(base::ThreadTaskRunnerHandle::IsSet()) << "ThreadTaskRunnerHandle should've been set now.";
    // CompleteInitialization();
  }
  while (static_cast<trtspcapture::VideoReceiveStream*>(ptr)->Decode()) {}
}

以上代码是“Chromium(3/4):rtsp客户端”中DecodingThread的线程函数,DecodingThread是webrtc创建。为了向socket thread接收帧,它需要一个消息循环。正如上面代码所示,创建线程模型三结构只要一条CreateSequenceManagerForMainThreadType就够了,后面set_runloop_quit_with_delete_tasks和清空任务队列有关,详见后面的“2.3 清空任务队列”。

chromium已考虑到有app会在主线程使用线程模型,提供了一个叫base::test::TaskEnvironment类,只要构造该类的对象就可在该线程使用线程模型。为简化使用,Rose略微修改了base::rose::TaskEnvironment。具体实现上,base_instance内置类型是TaskEnvironment的成员chromium_env_,base_instance构造函数中被构造。

base_instance::base_instance(...)
	: chromium_env_(base::test::TaskEnvironment::TimeSource::SYSTEM_TIME, base::test::TaskEnvironment::MainThreadType::IO)  , ......
{......}

构造TaskEnvironment时,参数TimeSource要赋值TimeSource::SYSTEM_TIME,否则使用的时钟将不是实时时钟,会使得PostDelayTask都出时间不对问题。

二、投递任务(PostTask)

2.1 app如何投递任务

以下是app投递任务的三种方法。

// 已拥有一个base::Thread的指针。一般用于向其它线程投递任务
thread_->task_runner()->PostTask/PostDelayedTask

// 没有base::Thread指针。要投递到本代码正在运行的、线程中的任务队列。图1显示了SequencedTaskRunner、SingleThreadTaskRunner是父子关系。
base::SequencedTaskRunnerHandle::Get()->PostTask/PostDelayedTask;
base::ThreadTaskRunnerHandle::Get()->PostTask/PostDelayedTask;

SequencedTaskRunnerHandle::Get()得到是一个SequencedTaskRunner指针。它是个static方法。线程A构造出SequencedTaskRunnerHandle后,会把它设为局部存储sequenced_task_runner_tls。SequencedTaskRunnerHandle::Get()执行的是访问局部存储,把sequenced_task_runner_tls中成员task_runner_(不是sequenced_task_runner_tls)值返回给app。

SequencedTaskRunner是如何和TaskQueueImpl实现绑定的?——构造TaskQueueImpl时,会构造成员task_poster_,task_poster_.outer_指向自已。构造结束后,会调用TaskQueueImpl::CreateTaskRunner创建TaskRunner,并把成员变量task_poster_赋值给TaskRunner的同名成员变量。这两种对象中,task_poster_类型都是scoped_refptr<GuardedTaskPoster>,因而它们指向的是同一个对象,必须它们都已被销毁后,task_poster_这对象才会销毁。

类似sequenced_task_runner_tls,ThreadTaskRunnerHandle::Get()访问的是线程局部存储thread_task_runner_tls,得到的值类型SingleThreadTaskRunner。

综上所述,app通过SequencedTaskRunnerHandle::Get()或ThreadTaskRunnerHandle::Get()得到指向本线程TaskRunner指针,调用它的PostTask。通过内部成员,PostTask会找到此个TaskRunner绑定到的TaskQueueImpl,进而调用TaskQueueImpl::PostTask。

2.2 存储、转移任务

参考图1右半部,SequenceManagerImpl把任务放在三个队列:active_queues、queues_to_gracefully_shutdown、queues_to_delete,列队的类型都是TaskQueueImpl。通常只用active_queues。

对每个TaskQueueImpl,内部又有四个队列,两个incoming队列(xxx_incoming_queue)、两个work队列(类型WorkQueue)。不论即时还是延时,任务首先进入incoming队列。执行DoWorkImpl时,任务必须先移到work队列。一旦任务进入work队列,不管这任务是即时还是延时,都意味着任务可以立即执行。chromium把work队列还分两个,猜测是用于后绪选择哪个队列去执行的优先级判断。

图2 任务在TaskQueueImpl中流动过程

图2显示了这4个队列、以及任务在队列之间的移动。从incoming队列移动到work队列,都在SelectNextTaskImpl中执行。

Task* SequenceManagerImpl::SelectNextTaskImpl(SelectTaskOption option) {
  // 加载即时work队列(main_thread_only().immediate_work_queue)。方法是把any_thread_.immediate_incoming_queue交换给它。具体执行的函数是TaskQueueImpl::ReloadEmptyImmediateWorkQueue
  ReloadEmptyWorkQueues();

  LazyNow lazy_now(controller_->GetClock());
  // MoveReadyDelayedTasksToWorkQueue负责把到期的延时任务从delayed_incoming_queue移到delayed_work_queue,
  MoveReadyDelayedTasksToWorkQueues(&lazy_now);
}
  1. SequencedTaskSource执行存储、转移任务等操作,但SequencedTaskSource是协议类,实现它的是SequenceManagerImpl。也就是说SequenceManagerImpl负责存储任务。
  2. 在SequenceManagerImpl构造函数会把自已传给ThreadControllerWithMessagePumpImpl,后者把它以指针形式放在自已的MainThreadOnly.task_source。通过task_source,ThreadControllerWithMessagePumpImpl可去操作任务了。
  3. 任务循环中,具体执行任务的函数是ThreadControllerWithMessagePumpImpl::DoWorkImpl,一次DoWorkImpl包括了提取任务,执行任务。那一次最多处理多少个任务?由MainThreadOnly.work_batch_size决定。work_batch_size不是指任务队列中有多少个任务,而是DoWorkImpl一次最多处理多少个任务,这个值默认是1。运行过程中极可能也不会改。
  4. 提取任务。MainThreadOnly.task_source->SelectNextTask执行提取任务。即通过SequencedTaskSource.SelectNextTask得到当前要执行的任务,
  5. 执行任务。得到任务(Task*)后,task_annotator_.RunTask("SequenceManager RunTask", task);执行这个任务。执行任务无非是运行一个函数,为什么要用一个专门类?chromium为让第三方工具可以知道内部正在干什么,每执行一个任务要输出track信息。

2.3 清空任务队列

如果不做修改,析构RunLoop不会清空SequenceManagerImpl中的任务,这可能会造成不便。还是以“Chromium(3/4):rtsp客户端”中的DecodingThread为例,它向socket线程投递收一帧的任务后,就向自已投递5秒后执行的Delayed任务:quit_runloop。如果5秒内socket线程收到一帧,RunLoop退出,否则执行quit_runloop,叫退出RunLoop。也就是说,当第N次是5秒内退出时,此次投递的quit_runloop将不会被执行,而是放在了TaskQueue中的delayed_tasks_,当运行到第N+i次RunLoop::Run,时间离第N次投递时刻超过5秒,于是调用第N次投递的quit_runloop,从而造成本次RunLoop不该有的退出。为此有个疑问,有没有办法主动清空任务。

void SequenceManagerImpl::DeletePendingTasks() {
  // 要求task_execution_stack必须是空,即不能在某个任务中执行
  DCHECK(main_thread_only().task_execution_stack.empty())
      << "Tasks should be deleted outside RunLoop";
  for (TaskQueueImpl* task_queue : main_thread_only().active_queues)
    task_queue->DeletePendingTasks();
  for (const auto& it : main_thread_only().queues_to_gracefully_shutdown)
    it.first->DeletePendingTasks();
  for (const auto& it : main_thread_only().queues_to_delete)
    it.first->DeletePendingTasks();
}

// 针对每个TaskQueueImpl,就是清空内中的4个队列
void TaskQueueImpl::DeletePendingTasks() {
  main_thread_only().delayed_work_queue->DeletePendingTasks();
  main_thread_only().immediate_work_queue->DeletePendingTasks();
  // TODO(altimin): Add clear() method to DelayedIncomingQueue.
  DelayedIncomingQueue queue_to_delete;
  main_thread_only().delayed_incoming_queue.swap(&queue_to_delete);
  TaskDeque deque;
  {
    ...
    deque.swap(any_thread_.immediate_incoming_queue);
  ...
  }
  ...
}

SequenceManagerImpl::DeletePendingTasks是Chromium提供的清任务函数,要把它加入到RunLoop的析构函数。

RunLoop::~RunLoop() {
  // ~RunLoop() must happen-after the RunLoop is done running but it doesn't
  // have to be on |sequence_checker_| (it usually is but sometimes it can be a
  // member of a RefCountedThreadSafe object and be destroyed on another thread
  // after being quit).
  DCHECK(!running_);
  if (GetTlsDelegate().Get()) {
    // for "ThreadPoolServiceThread", this delegate_ has deleted, but delegate_ isn't empty, reference to Delegate::~Delegate(). .
    DCHECK(delegate_);
    delegate_->DeletePendingTasksEx(false);
  }
}

void ThreadControllerWithMessagePumpImpl::DeletePendingTasksEx(bool force) {
  if (!runloop_quit_with_delete_tasks_ && !force) {
	  return;
  }
  DCHECK(main_thread_only().task_source);
  main_thread_only().task_source->DeletePendingTasks();
}

清空任务逻辑是这样:如果ThreadControllerWithMessagePumpImpl设了runloop_quit_with_delete_tasks_=true,基于它的RunLoop::Run被析构时,会自动清空任务队列。runloop_quit_with_delete_tasks_默认值是false,可用set_runloop_quit_with_delete_tasks修改它。

void ThreadControllerWithMessagePumpImpl::set_runloop_quit_with_delete_tasks(bool with_delete_tasks) {
  runloop_quit_with_delete_tasks_ = with_delete_tasks;
}

Rose主线程的ThreadControllerWithMessagePumpImpl由base::test::TaskEnvironment构造,默认就把runloop_quit_with_delete_tasks_设为true。即基于Rose写的app,对主线程中的RunLoop,析构时一定会清空任务队列。

回看SequenceManagerImpl::DeletePendingTasks,它要求task_execution_stack必须是空,即不能在某个任务中执行。这导致一个问题,如何在任务中执行清空任务队列。

void base::Thread::SetRequireDeletePendingTasks(base::WaitableEvent& e);

Thread::SetRequireDeletePendingTasks是Rose提供的清队列api,必须在任务中调用。由于调用时刻是在任务中,chromium要等到该任务退出了再去清队列。

TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl(LazyNow* continuation_lazy_now) {
  for (int i = 0; i < main_thread_only().work_batch_size; i++) {
    Task* task = main_thread_only().task_source->SelectNextTask(select_task_option);
    {
      // 执行任务。在此期间app调用SetRequireDeletePendingTasks。
      task_annotator_.RunTask("SequenceManager RunTask", task);
      // app自写的任务执行结束,app应该等待e变有信号。

      // DidRunTask会从main_thread_only().task_execution_stack移除此任务,如果task_execution_stack本来只有一个任务(不嵌套的话,都如此),将变空。意味着可以调用DeletePendingTasks。
      main_thread_only().task_source->DidRunTask();
    }
    main_thread_only().run_level_tracker.OnTaskEnded();

    if (delete_pending_tasks_e_ != nullptr) {
      // 调用DeletePendingTasks清空任务
      main_thread_only().task_source->DeletePendingTasks();
      // 给参数e发信号,通知app,清任务操作完成。这样app就可放心知道队列被已被清空。若想继续投递其它任务,也可投递了。
      delete_pending_tasks_e_->Signal();
      delete_pending_tasks_e_ = nullptr;
    }
  }
}
  • 必须在任务中调用SetRequireDeletePendingTasks。
  • app负责创建、销毁参数e。调用SetRequireDeletePendingTasks时,e必须无信号。
  • app必须等e变有信号后才能继续执行和任务有关的操作,像Thread::Reset。Reset会向队列投递ThreadQuitHelper,要是SetRequireDeletePendingTasks一返回就立刻调用Reset,有可能会使新投递的ThreadQuitHelper被DeletePendingTasks清掉!——DoWorkImpl也是要等SetRequireDeletePendingTasks结束后才调用DeletePendingTasks。

全部评论: 0

    写评论: