Ros

Publisher::publish源码分析

  • Ros规则:相同话题名必须相同消息类型。
  • 如何判断advertiese和subscribe是进程内关系?——同一进程共享内存空间,单例对象TopicManager记载着每次advertiese、subscribe。通过它们是否拥有相同的话题名称,并具有着相同的消息类型(用消息中md5sum字段),就可判断了。
  • 假设message是将要publish的消息,以下是publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。
  • Publication怎么知道系统中有哪些Subscription正订阅name_这个话题?——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。即使进程内有多次针对该topic name_的订阅,subscriber_links_也只一个单元和它们相对应,和这些多次对应的是Subscription中的callbacks_。
  • 支持在一进程内发布和订阅同名、同类型topic,这是nocopy-intra模式的基础。但在用它时,开发者需清晰知道这样做的结果,避免自已发送的被自已收到,造成无意义轮回。

在分析Publisher::publish源码前,让先看下Ros的一条规则:相同话题名必须相同消息类型。看TopicManager是如何利用这条规则存储advertise、subscribe的调用信息。

 

一、相同话题名必须相同消息类型

Ros中有这么条规则:系统内如果两次advertise同一个话题名,那这两次必须是相同的消息类型。举个例子,第一次advertise时,advertise的了话题名“/scan”,消息类型是sensor_msgs::LaserScan,那第二次advertise话题“/scan”时,消息类型必须是sensor_msgs::LaserScan。

图1 TopicManager中的advertised_topics_、subscriptions_

TopicManager是进程用来管理topic通信的类,采用单例设计模式,也就是说在同一个进程中,仅有一个TopicManager类对象,它管理了所有的topic相关的函数调用,它的初始化是在init函数。图1显示了它的两个成员变量advertised_topics_和subscriptions_的即时值。

std::vector<boost::shared_ptr<Publication> > advertised_topics_;
std::list<boost::shared_ptr<Subscription> > subscriptions_;

advertised_topics_存储着该进程advertise过的话题信息,subscriptions_则存储着该进程subscribe过的话题信息。在图1时刻,该进程已累计执行过三种advertiesed,分别是/rosout、/cmd_ver和/chatter;累计执行过4种subscribe,分别是/map、/tf、/tf_static和、/chatter。

可以多次advertise同一个话题。这时advertise第二个时,不会改修TopicManager::advertised_topics_,但该话题名对应的Publication的callbacks_长度会增1。

可以多次subscribe同一个话题。这时subscribe第二个时,不会改修TopicManager::subscriptions_,但该话题名对应的Subscription的callbacks_长度会增1,即callbacks_长度等于该进程内对同种话题订阅次数。一个进程内多次订阅同种话题,这是很容易发生的。举个例子,move_base_node要知道tf树,它就要订阅/tf话题,cartographer_node也想知道tf树,那它也要订阅/tf,这样callbacks_中的单元就至少两个了。

可以间接把话题名当做advertised_topics_、subscriptions_的key,以下是TopicManager提供的以话题名查找Publication的lookupPublicationWithoutLock。

PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
{
  PublicationPtr t;
  for (V_Publication::iterator i = advertised_topics_.begin();
       !t && i != advertised_topics_.end(); ++i)
  {
    if (((*i)->getName() == topic) && (!(*i)->isDropped()))
    {
      t = *i;
      break;
    }
  }

  return t;
}

 

二、Publisher::publish流程

图2 Publisher::publish函数栈

本文重点讨论进程内且非复制的Publish。

<libros>/include/ros/publisher.h
------
class ROSCPP_DECL Publisher
{
    template <typename M>
      void publish(const boost::shared_ptr<M>& message) const
    {
      using namespace serialization;

      if (!impl_)
        {
          ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
          return;
        }

      if (!impl_->isValid())
        {
          ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
          return;
        }

      ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(*message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(*message),
                     "Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
                     mt::datatype<M>(*message), mt::md5sum<M>(*message),
                     impl_->datatype_.c_str(), impl_->md5sum_.c_str());

      SerializedMessage m;
      m.type_info = &typeid(M);
      m.message = message;

      publish(boost::bind(serializeMessage<M>, boost::ref(*message)), m);
    }
}

NodeHandle.advertise注册发布者时得到一个Publisher对象。Publisher有两种public的publish方法声明:void publish(const M& message)和这里贴出的void publish(const boost::shared_ptr<M>& message)。对第一种,若不作修改,会强制使用序列化message逻辑。具体是到了调用TopicManager::publish(...)时,会进入“serialize = true”那个入口,这将导致序列化消息。为让第一种也能做到不执行序列化,代码中加了“if (ros::nocopy_intra())”判断,判断是true时改去执行这里贴出的以shared_ptr类型的传参版本publish。此个publish是针对进程内非复制,接下就要调用prviate通用版publish。调用时须生成个参数m,m的type_info、message赋上有效值,但buf值是nullptr。准备好m后,调用通用版publish。

<libros>/ros_comm/roscpp/src/libros/publisher.cpp
------
@serfunc:函数,执行它后,会对msg执行序列化。
@m:包含有msg的SerializedMessage。虽然用的类型是“已序列化消息”,但其实没执行序列化,只是准备好了序化时须要参数type_info、message。
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
  if (!impl_)
  {
    ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
    return;
  }

  if (!impl_->isValid())
  {
    ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
    return;
  }

  TopicManager::instance()->publish(impl_->topic_, serfunc, m);

  if (isLatched()) {
    boost::mutex::scoped_lock lock(impl_->last_message_mutex_);
    impl_->last_message_ = m;
  }
}

在确定此个Publisher已调用过有效的advertise后(impl_->isValid()==true),调用TopicManager单例对象的publish方法。由于整系统只一个TopicManager对象,在系统中所有NodeHandle都共享这个方法。

<libros>/ros_comm/roscpp/src/libros/topic_manager.cpp
------
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);

  if (isShuttingDown())
  {
    return;
  }

  // 之前已说过lookupPublicationWithoutLock,它的功能是根据话题名找到Publication。
  PublicationPtr p = lookupPublicationWithoutLock(topic);
  if (p->hasSubscribers() || p->isLatching())
  {
    // 此个publication有订阅者或者advertise(...)是设置latch属性,意味着有订阅者关心这消息
    ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

    // Determine what kinds of subscribers we're publishing to.  If they're intraprocess with the same C++ type we can
    // do a no-copy publish.
    // 确认要发布给的订阅者是什么类别,如果是进程内的,并且使用者相同的c++类型,就可以进行no-copy模式的publish
    // 这里定义两个标志nocopy和serialize。
    bool nocopy = false;
    bool serialize = false;

    // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
    if (m.type_info && m.message)
    {
      // 非复制版本pushlish会为m设置有效type_info、message。
      // 注意这里参数serialize、nocopy都是传值,getPublishTypes可能会修改serialize和nocopy的值
      p->getPublishTypes(serialize, nocopy, *m.type_info);
    }
    else
    {
      serialize = true;
    }

    if (!nocopy)
    {
      m.message.reset();
      m.type_info = 0;
    }

    if (serialize || p->isLatching())
    {
      // serfunc是个函数,执行它后,会对msg执行序列化,以下语句会执行序列化msg,结果存在m2。
      // 由于可能出现nocopy=true,那时m.message、type_info是有效的,此处只是把和序列化结果相关的字段进行赋值,不整个覆盖m。
      SerializedMessage m2 = serfunc();
      m.buf = m2.buf;
      m.num_bytes = m2.num_bytes;
      m.message_start = m2.message_start;
    }

    p->publish(m);

    // If we're not doing a serialized publish we don't need to signal the pollset.  The write()
    // call inside signal() is actually relatively expensive when doing a nocopy publish.
    if (serialize)
    {
      poll_manager_->getPollSet().signal();
    }
  }
  else
  {
    // 如果publication既没有订阅者,也没有latch属性,就执行自增序列就好,sequence一般是分布式系统的唯一id。
    p->incrementSequence();
  }
}

publication的latch属性。这个属性的含义是是否为订阅者保留最后一次发送的信息。如果该属性设为true,当新的订阅者订阅时,就必然会收到之前发布者所发布的最后一条消息。

针对进程内非复制场景,TopicManager::publish依次执行两个操作。

  1. p->getPublishTypes(...)判断此次发布是否是进程内非复制。
  2. p->publish(...),执行发布。

p->getPublishTypes只是根据当前订阅者情况计算出nocopy、serialize,不执行对发布有影响的操作。如果是进程内不复制,执行p->getPublishTypes后,nocopy=true,serialize=false。接下进入p->publish(...)。

<libros>/ros_comm/roscpp/src/libros/publication.cpp
------
void Publication::publish(SerializedMessage& m)
{
  if (m.message)
  {
    // nocopy == true
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    V_SubscriberLink::const_iterator it = subscriber_links_.begin();
    V_SubscriberLink::const_iterator end = subscriber_links_.end();
    for (; it != end; ++it)
    {
      const SubscriberLinkPtr& sub = *it;
      if (sub->isIntraprocess())
      {
        sub->enqueueMessage(m, false, true);
      }
    }

    m.message.reset();
  }

  if (m.buf)
  {
    // serialize || p->isLatching()。
    boost::mutex::scoped_lock lock(publish_queue_mutex_);
    publish_queue_.push_back(m);
  }
}

Publication怎么知道系统中有哪些Subscription正订阅name_这个话题,——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。

sub->enqueueMessage是个虚函数,进程内订阅对应的是IntraProcessSubscriberLink::enqueueMessage。

<libros>/ros_comm/roscpp/src/libros/intraprocess_subscriber_link.cpp
-------
void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
  if (dropped_)
  {
    return;
  }

  ROS_ASSERT(subscriber_);
  subscriber_->handleMessage(m, ser, nocopy);
}

在这里最让人难受的就是subscriber_的理解,它的声明出现在intra_processSubscriber_link.h文件中。

IntraProcessPublisherLinkPtr subscriber_;

可以这样理解,发布者向订阅者发布信息时,Publication会先从自己的订阅列表中找出进程内的订阅者IntraProcessSubscriberLink,然后就会让后者进行enqueueMessage操作,这个入队操作到下面实际又交给了IntraProcessPublisherLink对象,调用后者的handleMessage。

<libros>/ros_comm/roscpp/src/libros/intraprocess_publisher_link.cpp
------
void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
  if (dropped_)
  {
    return;
  }

  stats_.bytes_received_ += m.num_bytes;
  stats_.messages_received_++;

  SubscriptionPtr parent = parent_.lock();

  if (parent)
  {
    stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this());
  }
}

到这里,m这个message就已经成功交给了正订阅Publication.name_的Subscription,接下就是Subscription调用handleMessage处理m。

<libros>/ros_comm/roscpp/src/libros/subscription.cpp
------
uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
{
  boost::mutex::scoped_lock lock(callbacks_mutex_);

  uint32_t drops = 0;

  // 为提高构建MessageDeserializer效率,用了个缓存:cached_deserializers_。
  // 对这topic name_,若只有一个subscribe,会降低些效率,但有多个subscribe时应该会有提升。
  cached_deserializers_.clear();

  ros::Time receipt_time = ros::Time::now();

  for (V_CallbackInfo::iterator cb = callbacks_.begin();
       cb != callbacks_.end(); ++cb)
  {
    const CallbackInfoPtr& info = *cb;

    ROS_ASSERT(info->callback_queue_);

    const std::type_info* ti = &info->helper_->getTypeInfo();

    if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
    {
      // deserializer(MessageDeserializer)包含了m(SerializedMessage),
      // 最终放入订阅者的是deserializer,m是包含在了deserializer。
      // 虽然放置MessageDeserializer,不等于要对消息进行序列化,见MessageDeserializer::deserialize()。
      MessageDeserializerPtr deserializer;

      V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
      V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
      for (; des_it != des_end; ++des_it)
      {
        if (*des_it->first == *ti)
        {
          deserializer = des_it->second;
          break;
        }
      }

      if (!deserializer)
      {
        deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
        cached_deserializers_.push_back(std::make_pair(ti, deserializer));
      }

      // 到此已有一个包含m的deserializer
      bool was_full = false;
      bool nonconst_need_copy = false;
      if (callbacks_.size() > 1)
      {
        nonconst_need_copy = true;
      }
      // subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。
      // CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
      info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);

      if (was_full)
      {
        ++drops;
      }
      else
      {
        // 此个消息已放入subscription_queue_,CallbackQueue有新消息到了,CallbackQueue::callAvailable可醒来处理消息了。
        info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
      }
    }
  }

  ...
  cached_deserializers_.clear();

  return drops;
}

对同个topic name_,进程内可能存在多次subscribe,每次subscribe会向callbacks_增加一个单元,单元类型是Subscription::CallbackInfo。

<libros>/include/ros/subscription.h
------
class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
{
  struct CallbackInfo
  {
    // 它就是订阅者callAvailable时的CallbackQueue
    CallbackQueueInterface* callback_queue_;

    // Only used if callback_queue_ is non-NULL (NodeHandle API)
    SubscriptionCallbackHelperPtr helper_;

    // SubscriptionQueue基类是CallbackInterface。
    // CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
    SubscriptionQueuePtr subscription_queue_;

    bool has_tracked_object_;
    VoidConstWPtr tracked_object_;
  };
  typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
  typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
  V_CallbackInfo callbacks_;
};

callbacks_长度就是进程内针对这个topic name_的subscribe次数,Subscription::handleMessage会枚举这些个subscribe,对每个subscribe依次执行三个操作。

  1. 生成一个包含m的deserializer。最终放入订阅者的是deserializer。
  2. info->subscription_queue_->push(...)。把deserializer等信息放入订阅者队列。
  3. info->callback_queue_->addCallback(...)。此个消息已放入subscription_queue_,意味着CallbackQueue有新消息到了,通知CallbackQueue::callAvailable可醒来处理新消息。

subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback。结合CallbackQueue机制,有人可能会进入个误区:callAvailable从CallbackQueue的callbacks_(这个callbacks_和Subscription的callbacks_是完全不同的两个东西)取出一个cb后,该cb会被删除,那是不是把subscription_queue_销毁了?——不会。callAvailable取出一个cb后,的确会删除该cb,但cb中保存的类型是CallbackInterfacePtr,删除的是boost::shared_ptr<CallbackInterface>,而不是CallbackInterface,删除只是把引用计数减1。总的来说,在callAvailable执行完一次callOneCB后,subscription_queue_不会被删除,删除的是subscription_queue_内消息队列queue_中的一个单元(见后面的“SubscriptionQueue::call”)。和这个删除相对应,之前第二步的SubscriptionQueue::push则是向queue_增加此个单元。

假设message是将要publish的消息,小结publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。

 

二、SubscriptionQueue::call和反序列化消息

在CallbackQueue::callAvailable上下文,从CallbackQueue的callbacks_取出某个cb后,会调用该cb的call()方法,从而完成回调的具体操作。在topic机制中,cb对应类型是SubscriptionQueue。

<libros>/ros_comm/roscpp/src/libros/subscription_queue.cpp
------
CallbackInterface::CallResult SubscriptionQueue::call()
{
  // The callback may result in our own destruction.  Therefore, we may need to keep a reference to ourselves
  // that outlasts the scoped_try_lock
  boost::shared_ptr<SubscriptionQueue> self;
  boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);

  if (!allow_concurrent_callbacks_)
  {
    lock.try_lock();
    if (!lock.owns_lock())
    {
      return CallbackInterface::TryAgain;
    }
  }

  VoidConstPtr tracker;
  Item i;

  {
    boost::mutex::scoped_lock lock(queue_mutex_);

    if (queue_.empty())
    {
      return CallbackInterface::Invalid;
    }

    i = queue_.front();

    if (queue_.empty())
    {
      return CallbackInterface::Invalid;
    }

    if (i.has_tracked_object)
    {
      tracker = i.tracked_object.lock();

      if (!tracker)
      {
        return CallbackInterface::Invalid;
      }
    }

    queue_.pop_front();
    --queue_size_;
  }

  VoidConstPtr msg = i.deserializer->deserialize();

  // msg can be null here if deserialization failed
  if (msg)
  {
    try
    {
      self = shared_from_this();
    }
    catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr
    {}

    SubscriptionCallbackHelperCallParams params;
    params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
    i.helper->call(params);
  }

  return CallbackInterface::Success;
}

SubscriptionQueue::call依次会执行三个操作。

  1. 取出queue_队列的头单元,赋值给i。
  2. i.deserializer->deserialize()。反序列化出消息的原始数据msg。
  3. i.helper->call(params)。以msg为参数执行回调。

针对第二步,有人会有误区:一直说non-copy-intra是不序列化消息,这里怎么还是反序列化了?——不要被deserialize这名称误导了,其它没执行反序列化,让进入MessageDeserializer::deserialize()。

<libros>/ros_comm/roscpp/src/libros/message_deserializer.cpp
------
VoidConstPtr MessageDeserializer::deserialize()
{
  boost::mutex::scoped_lock lock(mutex_);

  if (msg_)
  {
    return msg_;
  }

  if (serialized_message_.message)
  {
    // 由于non-copy-intra时,message总是有效的,于是会进入这里。
    // 只是赋值而已,不会执行真正的反序列化操作helper_->deserialize。
    msg_ = serialized_message_.message;
    return msg_;
  }

  if (!serialized_message_.buf && serialized_message_.num_bytes > 0)
  {
    // If the buffer has been reset it means we tried to deserialize and failed
    return VoidConstPtr();
  }

  try
  {
    SubscriptionCallbackHelperDeserializeParams params;
    params.buffer = serialized_message_.message_start;
    params.length = serialized_message_.num_bytes - (serialized_message_.message_start - serialized_message_.buf.get());
    params.connection_header = connection_header_;
    msg_ = helper_->deserialize(params);
  }
  catch (std::exception& e)
  {
    ROS_ERROR("Exception thrown when deserializing message of length [%d] from [%s]: %s", (uint32_t)serialized_message_.num_bytes, (*connection_header_)["callerid"].c_str(), e.what());
  }

  serialized_message_.buf.reset();

  return msg_;
}

由于non-copy-intra时,serialized_message_.message总是有效,于是“msg_ = serialized_message_.message”,不会执行真正的反序列化操作helper_->deserialize。

全部评论: 0

    写评论: