术语
- cb。表示app注册向ros的回调函数。
- 消息。(topic机制)publisher发布的topic,(server机制)client发出的请求。往往作为cb的一个参数。
- CallbackQueue::callAvailable在哪线程调用,决定了cb是运行在哪线程。不是CallbackQueue在哪线程创建。
CallbackQueue设计了么这一套消息处理框架,可以让不同线程,甚至不同进程,能一致性处理消息的产生和消费。当中有Callback字眼,是说消费时,使用的方式是回调app向ros注册的那些个cb。像(topic机制)subcriber收到topic;(server机制)server收到client发来的请求;(action机制)server收到client发来的goal、cancel,client收到server发来的feedback、status、result。
CallbackQueue中的callbacks_存储着消息,数据结构是双端队列(std::deque)。单个消息在队列中的存储方式是boost::shared_ptr。因而挪动callbacks_中消息时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。
callbacks_中单元存储的信息不仅仅是cb参数,像sensor_msgs::LaserScan,还包括app注册的函数。因而有了该单元就可直接回调cb了。
CallbackQueue::callAvailable负责消费消息,它在哪线程执行,决定了CallbackQueue中那些个处理消息的cb是运行在哪线程。
一、CallbackQueue::callAvailable
功能是收集上次callAvailable到此次callAvailable之间生产者产生的任务,并把这些任务挪到更安全的tls->callback。然后调用callOneCB逐个处理任务。另外,若进入时callback_没有新任务,而且参数timeout不是0,函数会先等待condition_事件。
为确保在消费者(topic机制中的subcriber)线程调用cb,要让消费者线程调用callAvailable。
class CallbackQueue : public CallbackQueueInterface
{
struct CallbackInfo
{
CallbackInfo()
: removal_id(0)
, marked_for_removal(false)
{}
// 封装了topic数据及cb。
// CallbackInterface是基类,实际存储时:
// topic机制中类型是SubscriptionQueue。server机制中类型是ServiceCallback。
CallbackInterfacePtr callback;
uint64_t removal_id;
// 一旦为true,表示此个任务已被移除,不须要真的去回调app注册的cb。
bool marked_for_removal;
};
typedef std::list<CallbackInfo> L_CallbackInfo;
typedef std::deque<CallbackInfo> D_CallbackInfo;
D_CallbackInfo callbacks_;
};
<libros>/ros_comm/roscpp/src/libros/callback_queue.cpp
------
void CallbackQueue::callAvailable(ros::WallDuration timeout)
{
setupTLS();
TLS* tls = tls_.get();
{
boost::mutex::scoped_lock lock(mutex_);
if (!enabled_)
{
return;
}
// 生产者负责填充callbacks_。
// callbacks_是std::deque,队列中一个单元对应一个未处理任务。
if (callbacks_.empty())
{
if (!timeout.isZero())
{
// 生产者没产出新任务,等待timeout。
// 生产者一旦把数据放到callbacks_,它会置condition_有信号。
condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
}
if (callbacks_.empty() || !enabled_)
{
// 等了timeout还没数据,结束此次处理
return;
}
}
bool was_empty = tls->callbacks.empty();
// 把此次新搜到的待处理任务callbacks_追加到另一个队列tls->callbacks。
tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
callbacks_.clear();
calling_ += tls->callbacks.size();
if (was_empty)
{
// 后绪callOnCB(tls)处理的是tls->cb_it指向的单元。
tls->cb_it = tls->callbacks.begin();
}
}
size_t called = 0;
while (!tls->callbacks.empty())
{
if (callOneCB(tls) != Empty)
{
++called;
}
}
{
boost::mutex::scoped_lock lock(mutex_);
calling_ -= called;
}
}针对每个任务,调用callOneCB。callOneCB处理tls->cb_it指向的那个任务,当然,它往往就是tls->callbacks中的第一个任务。处理方法就是回调app注册向ros的cb。
顺便说下,CallbackQueue提供了另个版本的提取任务操作:callOne(timeout),它和callAvailable区别是每次最多只处理一个任务。
二、CallbackInterfacePtr、CallbackInterface
CallbackInterface用于封装一个任务。这个任务包括cb和数据(即cb的参数)。
callbacks_队列的单元类型是CallbackInfo(见上面代码)。CallbackInfo当中有个callback字段,其类型是CallbackInterfacePtr。
<libros>/include/ros/callback_queue_interface.h
------
class ROSCPP_DECL CallbackInterface
{
public:
/**
* \brief Possible results for the call() method
*/
enum CallResult
{
Success, ///< Call succeeded
TryAgain, ///< Call not ready, try again later
Invalid, ///< Call no longer valid
};
virtual ~CallbackInterface() {}
/**
* \brief Call this callback
* \return The result of the call
*/
virtual CallResult call() = 0;
/**
* \brief Provides the opportunity for specifying that a callback is not ready to be called
* before call() actually takes place.
*/
virtual bool ready() { return true; }
};
typedef boost::shared_ptr<CallbackInterface> CallbackInterfacePtr;callback类型是boost::shared_ptr,因而挪动callbacks_中任务时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。
CallbackInterface::call会调用app注册的cb。
callbacks_限定了CallbackQueue能处理的数据类型,即数据必须是CallbackInterface。从以上定义可看出,CallbackInterface是个协议类,以下是ros中定义的CallbackInterface派生类。
| 派生类 | 使用场景 |
| SubscriptionQueue | topic机制 |
| ServiceCallback | service机制 |
| TimerQueueCallback | - |
| PeerConnDisconnCallback | - |
三、生产者、消费者通用逻辑
3.1 生产者(topic中的publisher,server中的client)
- (topic机制)publisher要publish新topic了,基于该topic生成SubscriptionQueue。(service机制)service收到client发来的网络请求了,基于请求生成ServiceCallback。
- 准备好CallbackInterface后,调用CallbackQueue::addCallback。addCallback主要操作是把此个CallbackInterface加入callbacks_,并调用condition_.notify_one()。
3.2 消费者(topic中的subcriber,server中的server)
- 创建ros::NodeHandle后,调用NodeHandle.setCallbackQueue,设置自个的CallbackQueue。
- 不断调用CallbackQueue::callAvailable(timeout)或CallbackQueue::callOne(ros::WallDuration timeout)。如果是轮询,timeout置为0。
CallbackQueue::callAvailable在哪线程调用决定了cb是运行在哪线程,因而以上操作会确保app注册的cb运行在消费者线程。