(run_loop.cc)GetTlsDelegate、thread_task_runner_tls
(run_loop.cc)GetTlsDelegate和thread_task_runner_tls是TLS(线程局部存储)中两个重要变量。(run_loop.cc)GetTlsDelegate指run_loop.cc中定义的一个叫GetTlsDelegate()的函数,对应设置该变量的函数是GetTlsDelegate().Set。thread_task_runner_tls的设置、读取则使用了延迟构造实例LazyInstance。
在类型上,(run_loop.cc)GetTlsDelegate虽是RunLoop::Delegate,但其实就是ThreadControllerWithMessagePumpImpl。它的作用是什么?——只要是这线程中创建的RunLoop,都会让共享这个ThreadControllerWithMessagePumpImpl。
thread_task_runner_tls是SingleThreadTaskRunner(下面称TaskRunner),type是TYPE_IO时,对应的真实类型是MessageLoopTaskRunner。在ThreadControllerWithMessagePumpImpl::InitializeThreadTaskRunnerHandle()中被赋值。它的作用是什么?——像句柄,通过它,调用者可以向线程模型添加任务。参考“2.1 app如何投递任务”。
一、模型四结构
SequenceManagerImpl(SequenceManager)、ThreadControllerWithMessagePumpImpl(ThreadController)、MessagePumpForIO(MessagePump)和RunLoop是线程模型的4个结构,其中前三个构成了线程模型的“静态”结构,它们每个模型各一个,模型创建时创建、销毁时销毁。RunLoop是线程模型中的动态对象,想处理任务了,直接用new base::RunLoop()构造,用完即可销毁;下一要用时再创建、然后销毁。
1.1 静态结构:ThreadControllerWithMessagePumpImpl和MessagePumpForIO
SequenceManagerImpl(SequenceManager)、ThreadControllerWithMessagePumpImpl(ThreadController)、MessagePumpForIO(MessagePump)组成了线程模型的“静态”结构,它们每个模型各一个。在层次上,可认为SequenceManager是第一层,它管理着ThreadController、MessagePump。ThreadController、MessagePump之间是平级关系,相互“独立”。它们被创建后,位在上层的SequenceManager调用ThreadController的BindToCurrentThread方法把MessagePump绑定到ThreadController,即赋值给当中的成员变量pump_。因为MessagePump实例最终是存储在ThreadController,所以相互“独立”中的独立加了引号。另外ThreadControllerWithMessagePumpImpl::BindToCurrentThread还会作一件事情,把此个ThreadController放入线程的本地存储,这样之后创建的RunLoop就可共享这个RunLoop::Delegate了。以下是创建三个对象及绑定的步骤。
- MessagePump::Create创建MessagePumpForIO。谁凋用MessagePump::Create?对非base::Thread类型的线程,通过全局函数CreateSequenceManagerForMainThreadType。
- 创建ThreadControllerWithMessagePumpImpl。
- 创建管理结构SequenceManagerImpl。需要ThreadControllerWithMessagePumpImpl作为输入参数。
- (ThreadControllerWithMessagePumpImpl::BindToCurrentThread)把MessagePumpForIO绑定到ThreadControllerWithMessagePumpImpl。
- (ThreadControllerWithMessagePumpImpl::BindToCurrentThread)把此个ThreadControllerWithMessagePumpImpl设置到(run_loop.cc)GetTlsDelegate,使得只要是这线程中创建的RunLoop,都会让共享这个ThreadControllerWithMessagePumpImpl。
1.2 RunLoop(任务循环)
RunLoop是线程模型中的动态对象,RunLoop血缘上是独立的,即不是任何类的基类或派生类。想处理任务了,直接用new base::RunLoop()构造。
std::unique_ptr<base::RunLoop> loop(new base::RunLoop());
RunLoop::Run()阻塞式执行任务循环,循环不执行完不退出。
RunLoop::Delegate* delegate_; void RunLoop::Run() { if (!BeforeRun()) return; delegate_->Run(application_tasks_allowed); AfterRun(); }
中间的delegate_->Run是主体,那delegate_是什么?——类型是RunLoop::Delegate。RunLoop::Delegate是半协议类,真实执行时需要从它派生、实现出自个派生类,这个类就是ThreadControllerWithMessagePumpImpl。但是ThreadControllerWithMessagePumpImpl不仅继承自RunLoop::Delegate,还有MessagePump::Delegate,图1中后面的MessagePumpForIO须要这个继承关系。
构造RunLoop时用不须要参数的new base::RunLoop(),ThreadControllerWithMessagePumpImpl实例是如何传给成员变量delegate_?
RunLoop::RunLoop(Type type) : delegate_(GetTlsDelegate().Get()) , ... {...}
前面已说过,(run_loop.cc)GetTlsDelegate().Get()返回该线程中的ThreadControllerWithMessagePumpImpl实例,所以只要是这线程中创建的RunLoop,都会共享这个ThreadControllerWithMessagePumpImpl。
综上所述,delegate_->Run就是ThreadControllerWithMessagePumpImpl::Run,接下看它执行什么。
std::unique_ptr<MessagePump> pump_; void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed, TimeDelta timeout) { pump_->Run(this); }
pump_类型中MessagePump,MessagePump是个半协议类,真正执行是它的派生类,这个派生类根据不同系统不同定义,像windows是MessagePumpWin,换句话说,是创建过程中被绑定到自已的MessagePumpForIO。
图1中,MessagePumpForIO::DoRunLoop()是任务循环主体,它围绕一个任务队列(TaskQueue)不断地进行循环,直到被通知停止为止。在围绕任务队列循环期间,它会不断地检查任务队列是否为空。如果不为空,那么就会将里面的任务(Task)取出来,并且进行处理。这样,一个线程如果要请求另外一个线程执行某一个操作,那么只需要将该操作封装成一个任务,并且发送到目标线程的任务队列去即可。
在代码中,可认为一个任务对应一次函数回调。
1.3 退出RunLoop::Run、DoIdleWork
有两种方法退出RunLoop::Run,一是让RunLoop::BeforeRun返回false。二是调用MessagePump::Quit()。
- 让RunLoop::BeforeRun返回false。RunLoop::Run首先会调用BeforeRun,后者返回false就会退出Run。BeforeRun返回false的条件是quit_called_等于true。调用RunLoop::Quit()可以把quit_called_设为true。但是,执行BeforeRun时机是Run刚开始运行、还没执行任何任务,这个退出方法往往没机会用。
- 调用MessagePump::Quit()。这个方法基于的规则:当RunLoop::Run进入delegate_->Run后,执行的是MessagePump::Run,只要退出了MessagePump::Run就意味着退出RunLoop::Run。MessagePump::Quit()是个纯虚方法,它给派生类制定了一种机制:当调用该函数后,必须能退出MessagePump::Run。以派生类MessagePumpForIO为例,执行Quit时会把should_quit置为true。针对MessagePumpForIO::DoRunLoop,只要执行过一个任务就会检查should_quit,一旦发现是true就会退出DoRunLoop。
void MessagePumpWin::Quit() { DCHECK(state_); state_->should_quit = true; }
因为第一种方法往往没机会用,以下退出RunLoop::Run指的是用第二种方法,即调用MessagePump::Quit(),具体到调用代码是“pump_->Quit()”。
DoIdleWork在DoRunLoop,主要功能是退出RunLoop::Run。
bool ThreadControllerWithMessagePumpImpl::DoIdleWork() { if (ProcessNextDelayedNonNestableTask()) return true; if (ShouldQuitWhenIdle()) pump_->Quit(); ... return false; } bool RunLoop::Delegate::ShouldQuitWhenIdle() { const auto* top_loop = active_run_loops_.top(); if (top_loop->quit_when_idle_received_) { TRACE_EVENT_WITH_FLOW0("toplevel.flow", "RunLoop_ExitedOnIdle", TRACE_ID_LOCAL(top_loop), TRACE_EVENT_FLAG_FLOW_IN); return true; } return false; }
由以上代码可得知,当该线程最项端RunLoop的quit_when_idle_received_是true时,执行DoIdleWork将导致退出RunLoop::Run。有什么方法让quit_when_idle_received_置为true?1)RunLoop::QuitWhenIdle(),2)RunLoop::QuitCurrentWhenIdleDeprecated()(static函数),3)向它投递base::MessageLoop::QuitWhenIdleClosure()。以下是几种退出RunLoop::Run方法。
- RunLoop::Quit()。类似直接调用pump_->Quit(),执行完该语句所在的函数后就退出,如果除该语句所在任务外还有任务,在退出前这些任务会不会被执行?——不会,delegate->DoWork()/delegate->DoDelayedWork(&delayed_work_time_)最多执行一个任务,执行完一个任务后,会返回true(more_work_is_plausible=true),指示队列中不排除还有任务。退出Run不用等到DoIdleWork。
- RunLoop::QuitWhenIdle()。退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。
- RunLoop::QuitCurrentWhenIdleDeprecated()(static函数)。基于QuitWhenIdle,退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。
- base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, base::MessageLoop::QuitWhenIdleClosure())。基于QuitWhenIdle,退出Run要等到DoIdleWork,执行和退出中间可能会执行其它任务。
2、3、4方法有个共同点,只是把顶端RunLoop的quit_when_idle_received_置为true,以便DoRunLoop执行到DoIdleWork时可退出任务循环。这种方法有个要特别注意地方,要是任务循环中一直有任务,那more_work_is_plausible一直将是true,导致DoRunLoop不会执行DoIdleWork,也就无法退出任务循环。举个例子,rose在实现http时(url_request_http_job_rose.cc),要持续向循环投递show_slice,导致循环中一直有这个任务,所以只能用第一种方法。
总来的说,安全方法还是认为在发出退出后,那些积压的任务还是可能会被执行的,那些任务需考虑判断RunLoop是否处于正处于退出阶段,app估计需增加私有closing变量。
针对RunLoop::Quit(),把quit_called_设为true后,后续的delegate->Quit()会调用pump->Quit(),即第二种方法。
void RunLoop::Quit() { ... quit_called_ = true; if (running_ && delegate_->active_run_loops_.top() == this) { // This is the inner-most RunLoop, so quit now. delegate_->Quit(); } }
如果在本线程的任务中执行退出,是否有效?——有效。当然,对调用RunLoop::Quit()时,即直接调用pump_->Quit(),必须等到调用该语句所在任务执行完后才会出。
1.4 在非Thread创建的线程使用线程模型
chromium中,对base::Thread线程,Thread::StartWithOptions会创建线程模型三静态结构及准备任务队列。但那些非base::Thread创建的线程,该怎么使用线程模型。
void trtspcapture::VideoReceiveStream::DecodeThreadFunction(void* ptr) { scoped_refptr<base::sequence_manager::TaskQueue> task_queue_; scoped_refptr<base::SingleThreadTaskRunner> task_runner_; std::unique_ptr<base::sequence_manager::SequenceManager> sequence_manager_ = base::test::CreateSequenceManagerForMainThreadType(base::test::TaskEnvironment::MainThreadType::IO); sequence_manager_->set_runloop_quit_with_delete_tasks(true); { task_queue_ = sequence_manager_->CreateTaskQueue( base::sequence_manager::TaskQueue::Spec("task_environment_default") .SetTimeDomain(nullptr)); task_runner_ = task_queue_->task_runner(); sequence_manager_->SetDefaultTaskRunner(task_runner_); // simple_task_executor_ = std::make_unique<SimpleTaskExecutor>(task_runner_); CHECK(base::ThreadTaskRunnerHandle::IsSet()) << "ThreadTaskRunnerHandle should've been set now."; // CompleteInitialization(); } while (static_cast<trtspcapture::VideoReceiveStream*>(ptr)->Decode()) {} }
以上代码是“Chromium(3/4):rtsp客户端”中DecodingThread的线程函数,DecodingThread是webrtc创建。为了向socket thread接收帧,它需要一个消息循环。正如上面代码所示,创建线程模型三结构只要一条CreateSequenceManagerForMainThreadType就够了,后面set_runloop_quit_with_delete_tasks和清空任务队列有关,详见后面的“2.3 清空任务队列”。
chromium已考虑到有app会在主线程使用线程模型,提供了一个叫base::test::TaskEnvironment类,只要构造该类的对象就可在该线程使用线程模型。为简化使用,Rose略微修改了base::rose::TaskEnvironment。具体实现上,base_instance内置类型是TaskEnvironment的成员chromium_env_,base_instance构造函数中被构造。
base_instance::base_instance(...) : chromium_env_(base::test::TaskEnvironment::TimeSource::SYSTEM_TIME, base::test::TaskEnvironment::MainThreadType::IO) , ...... {......}
构造TaskEnvironment时,参数TimeSource要赋值TimeSource::SYSTEM_TIME,否则使用的时钟将不是实时时钟,会使得PostDelayTask都出时间不对问题。
二、投递任务(PostTask)
2.1 app如何投递任务
以下是app投递任务的三种方法。
// 已拥有一个base::Thread的指针。一般用于向其它线程投递任务 thread_->task_runner()->PostTask/PostDelayedTask // 没有base::Thread指针。要投递到本代码正在运行的、线程中的任务队列。图1显示了SequencedTaskRunner、SingleThreadTaskRunner是父子关系。 base::SequencedTaskRunnerHandle::Get()->PostTask/PostDelayedTask; base::ThreadTaskRunnerHandle::Get()->PostTask/PostDelayedTask;
SequencedTaskRunnerHandle::Get()得到是一个SequencedTaskRunner指针。它是个static方法。线程A构造出SequencedTaskRunnerHandle后,会把它设为局部存储sequenced_task_runner_tls。SequencedTaskRunnerHandle::Get()执行的是访问局部存储,把sequenced_task_runner_tls中成员task_runner_(不是sequenced_task_runner_tls)值返回给app。
SequencedTaskRunner是如何和TaskQueueImpl实现绑定的?——构造TaskQueueImpl时,会构造成员task_poster_,task_poster_.outer_指向自已。构造结束后,会调用TaskQueueImpl::CreateTaskRunner创建TaskRunner,并把成员变量task_poster_赋值给TaskRunner的同名成员变量。这两种对象中,task_poster_类型都是scoped_refptr<GuardedTaskPoster>,因而它们指向的是同一个对象,必须它们都已被销毁后,task_poster_这对象才会销毁。
类似sequenced_task_runner_tls,ThreadTaskRunnerHandle::Get()访问的是线程局部存储thread_task_runner_tls,得到的值类型SingleThreadTaskRunner。
综上所述,app通过SequencedTaskRunnerHandle::Get()或ThreadTaskRunnerHandle::Get()得到指向本线程TaskRunner指针,调用它的PostTask。通过内部成员,PostTask会找到此个TaskRunner绑定到的TaskQueueImpl,进而调用TaskQueueImpl::PostTask。
2.2 存储、转移任务
参考图1右半部,SequenceManagerImpl把任务放在三个队列:active_queues、queues_to_gracefully_shutdown、queues_to_delete,列队的类型都是TaskQueueImpl。通常只用active_queues。
对每个TaskQueueImpl,内部又有四个队列,两个incoming队列(xxx_incoming_queue)、两个work队列(类型WorkQueue)。不论即时还是延时,任务首先进入incoming队列。执行DoWorkImpl时,任务必须先移到work队列。一旦任务进入work队列,不管这任务是即时还是延时,都意味着任务可以立即执行。chromium把work队列还分两个,猜测是用于后绪选择哪个队列去执行的优先级判断。
图2显示了这4个队列、以及任务在队列之间的移动。从incoming队列移动到work队列,都在SelectNextTaskImpl中执行。
Task* SequenceManagerImpl::SelectNextTaskImpl(SelectTaskOption option) { // 加载即时work队列(main_thread_only().immediate_work_queue)。方法是把any_thread_.immediate_incoming_queue交换给它。具体执行的函数是TaskQueueImpl::ReloadEmptyImmediateWorkQueue ReloadEmptyWorkQueues(); LazyNow lazy_now(controller_->GetClock()); // MoveReadyDelayedTasksToWorkQueue负责把到期的延时任务从delayed_incoming_queue移到delayed_work_queue, MoveReadyDelayedTasksToWorkQueues(&lazy_now); }
- SequencedTaskSource执行存储、转移任务等操作,但SequencedTaskSource是协议类,实现它的是SequenceManagerImpl。也就是说SequenceManagerImpl负责存储任务。
- 在SequenceManagerImpl构造函数会把自已传给ThreadControllerWithMessagePumpImpl,后者把它以指针形式放在自已的MainThreadOnly.task_source。通过task_source,ThreadControllerWithMessagePumpImpl可去操作任务了。
- 任务循环中,具体执行任务的函数是ThreadControllerWithMessagePumpImpl::DoWorkImpl,一次DoWorkImpl包括了提取任务,执行任务。那一次最多处理多少个任务?由MainThreadOnly.work_batch_size决定。work_batch_size不是指任务队列中有多少个任务,而是DoWorkImpl一次最多处理多少个任务,这个值默认是1。运行过程中极可能也不会改。
- 提取任务。MainThreadOnly.task_source->SelectNextTask执行提取任务。即通过SequencedTaskSource.SelectNextTask得到当前要执行的任务,
- 执行任务。得到任务(Task*)后,task_annotator_.RunTask("SequenceManager RunTask", task);执行这个任务。执行任务无非是运行一个函数,为什么要用一个专门类?chromium为让第三方工具可以知道内部正在干什么,每执行一个任务要输出track信息。
2.3 清空任务队列
如果不做修改,析构RunLoop不会清空SequenceManagerImpl中的任务,这可能会造成不便。还是以“Chromium(3/4):rtsp客户端”中的DecodingThread为例,它向socket线程投递收一帧的任务后,就向自已投递5秒后执行的Delayed任务:quit_runloop。如果5秒内socket线程收到一帧,RunLoop退出,否则执行quit_runloop,叫退出RunLoop。也就是说,当第N次是5秒内退出时,此次投递的quit_runloop将不会被执行,而是放在了TaskQueue中的delayed_tasks_,当运行到第N+i次RunLoop::Run,时间离第N次投递时刻超过5秒,于是调用第N次投递的quit_runloop,从而造成本次RunLoop不该有的退出。为此有个疑问,有没有办法主动清空任务。
void SequenceManagerImpl::DeletePendingTasks() { // 要求task_execution_stack必须是空,即不能在某个任务中执行 DCHECK(main_thread_only().task_execution_stack.empty()) << "Tasks should be deleted outside RunLoop"; for (TaskQueueImpl* task_queue : main_thread_only().active_queues) task_queue->DeletePendingTasks(); for (const auto& it : main_thread_only().queues_to_gracefully_shutdown) it.first->DeletePendingTasks(); for (const auto& it : main_thread_only().queues_to_delete) it.first->DeletePendingTasks(); } // 针对每个TaskQueueImpl,就是清空内中的4个队列 void TaskQueueImpl::DeletePendingTasks() { main_thread_only().delayed_work_queue->DeletePendingTasks(); main_thread_only().immediate_work_queue->DeletePendingTasks(); // TODO(altimin): Add clear() method to DelayedIncomingQueue. DelayedIncomingQueue queue_to_delete; main_thread_only().delayed_incoming_queue.swap(&queue_to_delete); TaskDeque deque; { ... deque.swap(any_thread_.immediate_incoming_queue); ... } ... }
SequenceManagerImpl::DeletePendingTasks是Chromium提供的清任务函数,要把它加入到RunLoop的析构函数。
RunLoop::~RunLoop() { // ~RunLoop() must happen-after the RunLoop is done running but it doesn't // have to be on |sequence_checker_| (it usually is but sometimes it can be a // member of a RefCountedThreadSafe object and be destroyed on another thread // after being quit). DCHECK(!running_); if (GetTlsDelegate().Get()) { // for "ThreadPoolServiceThread", this delegate_ has deleted, but delegate_ isn't empty, reference to Delegate::~Delegate(). . DCHECK(delegate_); delegate_->DeletePendingTasksEx(false); } } void ThreadControllerWithMessagePumpImpl::DeletePendingTasksEx(bool force) { if (!runloop_quit_with_delete_tasks_ && !force) { return; } DCHECK(main_thread_only().task_source); main_thread_only().task_source->DeletePendingTasks(); }
清空任务逻辑是这样:如果ThreadControllerWithMessagePumpImpl设了runloop_quit_with_delete_tasks_=true,基于它的RunLoop::Run被析构时,会自动清空任务队列。runloop_quit_with_delete_tasks_默认值是false,可用set_runloop_quit_with_delete_tasks修改它。
void ThreadControllerWithMessagePumpImpl::set_runloop_quit_with_delete_tasks(bool with_delete_tasks) { runloop_quit_with_delete_tasks_ = with_delete_tasks; }
Rose主线程的ThreadControllerWithMessagePumpImpl由base::test::TaskEnvironment构造,默认就把runloop_quit_with_delete_tasks_设为true。即基于Rose写的app,对主线程中的RunLoop,析构时一定会清空任务队列。
回看SequenceManagerImpl::DeletePendingTasks,它要求task_execution_stack必须是空,即不能在某个任务中执行。这导致一个问题,如何在任务中执行清空任务队列。
void base::Thread::SetRequireDeletePendingTasks(base::WaitableEvent& e);
Thread::SetRequireDeletePendingTasks是Rose提供的清队列api,必须在任务中调用。由于调用时刻是在任务中,chromium要等到该任务退出了再去清队列。
TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl(LazyNow* continuation_lazy_now) { for (int i = 0; i < main_thread_only().work_batch_size; i++) { Task* task = main_thread_only().task_source->SelectNextTask(select_task_option); { // 执行任务。在此期间app调用SetRequireDeletePendingTasks。 task_annotator_.RunTask("SequenceManager RunTask", task); // app自写的任务执行结束,app应该等待e变有信号。 // DidRunTask会从main_thread_only().task_execution_stack移除此任务,如果task_execution_stack本来只有一个任务(不嵌套的话,都如此),将变空。意味着可以调用DeletePendingTasks。 main_thread_only().task_source->DidRunTask(); } main_thread_only().run_level_tracker.OnTaskEnded(); if (delete_pending_tasks_e_ != nullptr) { // 调用DeletePendingTasks清空任务 main_thread_only().task_source->DeletePendingTasks(); // 给参数e发信号,通知app,清任务操作完成。这样app就可放心知道队列被已被清空。若想继续投递其它任务,也可投递了。 delete_pending_tasks_e_->Signal(); delete_pending_tasks_e_ = nullptr; } } }
- 必须在任务中调用SetRequireDeletePendingTasks。
- app负责创建、销毁参数e。调用SetRequireDeletePendingTasks时,e必须无信号。
- app必须等e变有信号后才能继续执行和任务有关的操作,像Thread::Reset。Reset会向队列投递ThreadQuitHelper,要是SetRequireDeletePendingTasks一返回就立刻调用Reset,有可能会使新投递的ThreadQuitHelper被DeletePendingTasks清掉!——DoWorkImpl也是要等SetRequireDeletePendingTasks结束后才调用DeletePendingTasks。