术语
- cb。表示app注册向ros的回调函数。
- 消息。(topic机制)publisher发布的topic,(server机制)client发出的请求。往往作为cb的一个参数。
- CallbackQueue::callAvailable在哪线程调用,决定了cb是运行在哪线程。不是CallbackQueue在哪线程创建。
CallbackQueue设计了么这一套消息处理框架,可以让不同线程,甚至不同进程,能一致性处理消息的产生和消费。当中有Callback字眼,是说消费时,使用的方式是回调app向ros注册的那些个cb。像(topic机制)subcriber收到topic;(server机制)server收到client发来的请求;(action机制)server收到client发来的goal、cancel,client收到server发来的feedback、status、result。
CallbackQueue中的callbacks_存储着消息,数据结构是双端队列(std::deque)。单个消息在队列中的存储方式是boost::shared_ptr。因而挪动callbacks_中消息时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。
callbacks_中单元存储的信息不仅仅是cb参数,像sensor_msgs::LaserScan,还包括app注册的函数。因而有了该单元就可直接回调cb了。
CallbackQueue::callAvailable负责消费消息,它在哪线程执行,决定了CallbackQueue中那些个处理消息的cb是运行在哪线程。
一、CallbackQueue::callAvailable
功能是收集上次callAvailable到此次callAvailable之间生产者产生的任务,并把这些任务挪到更安全的tls->callback。然后调用callOneCB逐个处理任务。另外,若进入时callback_没有新任务,而且参数timeout不是0,函数会先等待condition_事件。
为确保在消费者(topic机制中的subcriber)线程调用cb,要让消费者线程调用callAvailable。
class CallbackQueue : public CallbackQueueInterface { struct CallbackInfo { CallbackInfo() : removal_id(0) , marked_for_removal(false) {} // 封装了topic数据及cb。 // CallbackInterface是基类,实际存储时: // topic机制中类型是SubscriptionQueue。server机制中类型是ServiceCallback。 CallbackInterfacePtr callback; uint64_t removal_id; // 一旦为true,表示此个任务已被移除,不须要真的去回调app注册的cb。 bool marked_for_removal; }; typedef std::list<CallbackInfo> L_CallbackInfo; typedef std::deque<CallbackInfo> D_CallbackInfo; D_CallbackInfo callbacks_; }; <libros>/ros_comm/roscpp/src/libros/callback_queue.cpp ------ void CallbackQueue::callAvailable(ros::WallDuration timeout) { setupTLS(); TLS* tls = tls_.get(); { boost::mutex::scoped_lock lock(mutex_); if (!enabled_) { return; } // 生产者负责填充callbacks_。 // callbacks_是std::deque,队列中一个单元对应一个未处理任务。 if (callbacks_.empty()) { if (!timeout.isZero()) { // 生产者没产出新任务,等待timeout。 // 生产者一旦把数据放到callbacks_,它会置condition_有信号。 condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec())); } if (callbacks_.empty() || !enabled_) { // 等了timeout还没数据,结束此次处理 return; } } bool was_empty = tls->callbacks.empty(); // 把此次新搜到的待处理任务callbacks_追加到另一个队列tls->callbacks。 tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end()); callbacks_.clear(); calling_ += tls->callbacks.size(); if (was_empty) { // 后绪callOnCB(tls)处理的是tls->cb_it指向的单元。 tls->cb_it = tls->callbacks.begin(); } } size_t called = 0; while (!tls->callbacks.empty()) { if (callOneCB(tls) != Empty) { ++called; } } { boost::mutex::scoped_lock lock(mutex_); calling_ -= called; } }
针对每个任务,调用callOneCB。callOneCB处理tls->cb_it指向的那个任务,当然,它往往就是tls->callbacks中的第一个任务。处理方法就是回调app注册向ros的cb。
顺便说下,CallbackQueue提供了另个版本的提取任务操作:callOne(timeout),它和callAvailable区别是每次最多只处理一个任务。
二、CallbackInterfacePtr、CallbackInterface
CallbackInterface用于封装一个任务。这个任务包括cb和数据(即cb的参数)。
callbacks_队列的单元类型是CallbackInfo(见上面代码)。CallbackInfo当中有个callback字段,其类型是CallbackInterfacePtr。
<libros>/include/ros/callback_queue_interface.h ------ class ROSCPP_DECL CallbackInterface { public: /** * \brief Possible results for the call() method */ enum CallResult { Success, ///< Call succeeded TryAgain, ///< Call not ready, try again later Invalid, ///< Call no longer valid }; virtual ~CallbackInterface() {} /** * \brief Call this callback * \return The result of the call */ virtual CallResult call() = 0; /** * \brief Provides the opportunity for specifying that a callback is not ready to be called * before call() actually takes place. */ virtual bool ready() { return true; } }; typedef boost::shared_ptr<CallbackInterface> CallbackInterfacePtr;
callback类型是boost::shared_ptr,因而挪动callbacks_中任务时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。
CallbackInterface::call会调用app注册的cb。
callbacks_限定了CallbackQueue能处理的数据类型,即数据必须是CallbackInterface。从以上定义可看出,CallbackInterface是个协议类,以下是ros中定义的CallbackInterface派生类。
派生类 | 使用场景 |
SubscriptionQueue | topic机制 |
ServiceCallback | service机制 |
TimerQueueCallback | - |
PeerConnDisconnCallback | - |
三、生产者、消费者通用逻辑
3.1 生产者(topic中的publisher,server中的client)
- (topic机制)publisher要publish新topic了,基于该topic生成SubscriptionQueue。(service机制)service收到client发来的网络请求了,基于请求生成ServiceCallback。
- 准备好CallbackInterface后,调用CallbackQueue::addCallback。addCallback主要操作是把此个CallbackInterface加入callbacks_,并调用condition_.notify_one()。
3.2 消费者(topic中的subcriber,server中的server)
- 创建ros::NodeHandle后,调用NodeHandle.setCallbackQueue,设置自个的CallbackQueue。
- 不断调用CallbackQueue::callAvailable(timeout)或CallbackQueue::callOne(ros::WallDuration timeout)。如果是轮询,timeout置为0。
CallbackQueue::callAvailable在哪线程调用决定了cb是运行在哪线程,因而以上操作会确保app注册的cb运行在消费者线程。