Ros

ros的CallbackQueue机制

术语

  • cb。表示app注册向ros的回调函数。
  • 消息。(topic机制)publisher发布的topic,(server机制)client发出的请求。往往作为cb的一个参数。

CallbackQueue设计了么这一套消息处理框架,可以让不同线程,甚至不同进程,能一致性处理消息的产生和消费。当中有Callback字眼,是说消费时,使用的方式是回调app向ros注册的那些个cb。像(topic机制)subcriber收到topic;(server机制)server收到client发来的请求;(action机制)server收到client发来的goal、cancel,client收到server发来的feedback、status、result。

CallbackQueue中的callbacks_存储着消息,数据结构是双端队列(std::deque)。单个消息在队列中的存储方式是boost::shared_ptr。因而挪动callbacks_中消息时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。

callbacks_中单元存储的信息不仅仅是cb参数,像sensor_msgs::LaserScan,还包括app注册的函数。因而有了该单元就可直接回调cb了。

CallbackQueue::callAvailable负责消费消息,它在哪线程执行,决定了CallbackQueue中那些个处理消息的cb是运行在哪线程。

 

一、CallbackQueue::callAvailable

功能是收集上次callAvailable到此次callAvailable之间生产者产生的任务,并把这些任务挪到更安全的tls->callback。然后调用callOneCB逐个处理任务。另外,若进入时callback_没有新任务,而且参数timeout不是0,函数会先等待condition_事件。

为确保在消费者(topic机制中的subcriber)线程调用cb,要让消费者线程调用callAvailable。

class CallbackQueue : public CallbackQueueInterface
{
  struct CallbackInfo
  {
    CallbackInfo()
    : removal_id(0)
    , marked_for_removal(false)
    {}
    // 封装了topic数据及cb。
    // CallbackInterface是基类,实际存储时:
    // topic机制中类型是SubscriptionQueue。server机制中类型是ServiceCallback。
    CallbackInterfacePtr callback;
    uint64_t removal_id;
    // 一旦为true,表示此个任务已被移除,不须要真的去回调app注册的cb。
    bool marked_for_removal;
  };
  typedef std::list<CallbackInfo> L_CallbackInfo;
  typedef std::deque<CallbackInfo> D_CallbackInfo;
  D_CallbackInfo callbacks_;
};

<libros>/ros_comm/roscpp/src/libros/callback_queue.cpp
------
void CallbackQueue::callAvailable(ros::WallDuration timeout)
{
  setupTLS();
  TLS* tls = tls_.get();

  {
    boost::mutex::scoped_lock lock(mutex_);

    if (!enabled_)
    {
      return;
    }
    // 生产者负责填充callbacks_。
    // callbacks_是std::deque,队列中一个单元对应一个未处理任务。
    if (callbacks_.empty())
    {
      if (!timeout.isZero())
      { 
        // 生产者没产出新任务,等待timeout。
        // 生产者一旦把数据放到callbacks_,它会置condition_有信号。
        condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
      }

      if (callbacks_.empty() || !enabled_)
      {
        // 等了timeout还没数据,结束此次处理
        return;
      }
    }

    bool was_empty = tls->callbacks.empty();

    // 把此次新搜到的待处理任务callbacks_追加到另一个队列tls->callbacks。
    tls->callbacks.insert(tls->callbacks.end(), callbacks_.begin(), callbacks_.end());
    callbacks_.clear();

    calling_ += tls->callbacks.size();

    if (was_empty)
    {
      // 后绪callOnCB(tls)处理的是tls->cb_it指向的单元。
      tls->cb_it = tls->callbacks.begin();
    }
  }

  size_t called = 0;

  while (!tls->callbacks.empty())
  {
    if (callOneCB(tls) != Empty)
    {
      ++called;
    }
  }

  {
    boost::mutex::scoped_lock lock(mutex_);
    calling_ -= called;
  }
}

针对每个任务,调用callOneCB。callOneCB处理tls->cb_it指向的那个任务,当然,它往往就是tls->callbacks中的第一个任务。处理方法就是回调app注册向ros的cb。

顺便说下,CallbackQueue提供了另个版本的提取任务操作:callOne(timeout),它和callAvailable区别是每次最多只处理一个任务。

 

二、CallbackInterfacePtr、CallbackInterface

CallbackInterface用于封装一个任务。这个任务包括cb和数据(即cb的参数)。

callbacks_队列的单元类型是CallbackInfo(见上面代码)。CallbackInfo当中有个callback字段,其类型是CallbackInterfacePtr。

<libros>/include/ros/callback_queue_interface.h
------
class ROSCPP_DECL CallbackInterface
{
public:
  /**
   * \brief Possible results for the call() method
   */
  enum CallResult
  {
    Success,   ///< Call succeeded
    TryAgain,  ///< Call not ready, try again later
    Invalid,   ///< Call no longer valid
  };

  virtual ~CallbackInterface() {}

  /**
   * \brief Call this callback
   * \return The result of the call
   */
  virtual CallResult call() = 0;
  /**
   * \brief Provides the opportunity for specifying that a callback is not ready to be called
   * before call() actually takes place.
   */
  virtual bool ready() { return true; }
};
typedef boost::shared_ptr<CallbackInterface> CallbackInterfacePtr;

callback类型是boost::shared_ptr,因而挪动callbacks_中任务时,虽然有构造/析构boost::shared_ptr对象开销,但不会复制数据。

CallbackInterface::call会调用app注册的cb。

callbacks_限定了CallbackQueue能处理的数据类型,即数据必须是CallbackInterface。从以上定义可看出,CallbackInterface是个协议类,以下是ros中定义的CallbackInterface派生类。

派生类使用场景
SubscriptionQueuetopic机制
ServiceCallbackservice机制
TimerQueueCallback-
PeerConnDisconnCallback-

 

三、生产者、消费者通用逻辑

3.1 生产者(topic中的publisher,server中的client)

  1. (topic机制)publisher要publish新topic了,基于该topic生成SubscriptionQueue。(service机制)service收到client发来的网络请求了,基于请求生成ServiceCallback。
  2. 准备好CallbackInterface后,调用CallbackQueue::addCallback。addCallback主要操作是把此个CallbackInterface加入callbacks_,并调用condition_.notify_one()。

3.2 消费者(topic中的subcriber,server中的server)

  1. 创建ros::NodeHandle后,调用NodeHandle.setCallbackQueue,设置自个的CallbackQueue。
  2. 不断调用CallbackQueue::callAvailable(timeout)或CallbackQueue::callOne(ros::WallDuration timeout)。如果是轮询,timeout置为0。

CallbackQueue::callAvailable在哪线程调用决定了cb是运行在哪线程,因而以上操作会确保app注册的cb运行在消费者线程。

 

全部评论: 0

    写评论: