- Ros规则:相同话题名必须相同消息类型。
- 如何判断advertiese和subscribe是进程内关系?——同一进程共享内存空间,单例对象TopicManager记载着每次advertiese、subscribe。通过它们是否拥有相同的话题名称,并具有着相同的消息类型(用消息中md5sum字段),就可判断了。
- 假设message是将要publish的消息,以下是publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。
- Publication怎么知道系统中有哪些Subscription正订阅name_这个话题?——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。即使进程内有多次针对该topic name_的订阅,subscriber_links_也只一个单元和它们相对应,和这些多次对应的是Subscription中的callbacks_。
- 支持在一进程内发布和订阅同名、同类型topic,这是nocopy-intra模式的基础。但在用它时,开发者需清晰知道这样做的结果,避免自已发送的被自已收到,造成无意义轮回。
在分析Publisher::publish源码前,让先看下Ros的一条规则:相同话题名必须相同消息类型。看TopicManager是如何利用这条规则存储advertise、subscribe的调用信息。
一、相同话题名必须相同消息类型
Ros中有这么条规则:系统内如果两次advertise同一个话题名,那这两次必须是相同的消息类型。举个例子,第一次advertise时,advertise的了话题名“/scan”,消息类型是sensor_msgs::LaserScan,那第二次advertise话题“/scan”时,消息类型必须是sensor_msgs::LaserScan。

TopicManager是进程用来管理topic通信的类,采用单例设计模式,也就是说在同一个进程中,仅有一个TopicManager类对象,它管理了所有的topic相关的函数调用,它的初始化是在init函数。图1显示了它的两个成员变量advertised_topics_和subscriptions_的即时值。
std::vector<boost::shared_ptr<Publication> > advertised_topics_; std::list<boost::shared_ptr<Subscription> > subscriptions_;
advertised_topics_存储着该进程advertise过的话题信息,subscriptions_则存储着该进程subscribe过的话题信息。在图1时刻,该进程已累计执行过三种advertiesed,分别是/rosout、/cmd_ver和/chatter;累计执行过4种subscribe,分别是/map、/tf、/tf_static和、/chatter。
可以多次advertise同一个话题。这时advertise第二个时,不会改修TopicManager::advertised_topics_,但该话题名对应的Publication的callbacks_长度会增1。
可以多次subscribe同一个话题。这时subscribe第二个时,不会改修TopicManager::subscriptions_,但该话题名对应的Subscription的callbacks_长度会增1,即callbacks_长度等于该进程内对同种话题订阅次数。一个进程内多次订阅同种话题,这是很容易发生的。举个例子,move_base_node要知道tf树,它就要订阅/tf话题,cartographer_node也想知道tf树,那它也要订阅/tf,这样callbacks_中的单元就至少两个了。
可以间接把话题名当做advertised_topics_、subscriptions_的key,以下是TopicManager提供的以话题名查找Publication的lookupPublicationWithoutLock。
PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic) { PublicationPtr t; for (V_Publication::iterator i = advertised_topics_.begin(); !t && i != advertised_topics_.end(); ++i) { if (((*i)->getName() == topic) && (!(*i)->isDropped())) { t = *i; break; } } return t; }
二、Publisher::publish流程

本文重点讨论进程内且非复制的Publish。
<libros>/include/ros/publisher.h ------ class ROSCPP_DECL Publisher { template <typename M> void publish(const boost::shared_ptr<M>& message) const { using namespace serialization; if (!impl_) { ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher"); return; } if (!impl_->isValid()) { ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str()); return; } ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(*message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(*message), "Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]", mt::datatype<M>(*message), mt::md5sum<M>(*message), impl_->datatype_.c_str(), impl_->md5sum_.c_str()); SerializedMessage m; m.type_info = &typeid(M); m.message = message; publish(boost::bind(serializeMessage<M>, boost::ref(*message)), m); } }
NodeHandle.advertise注册发布者时得到一个Publisher对象。Publisher有两种public的publish方法声明:void publish(const M& message)和这里贴出的void publish(const boost::shared_ptr<M>& message)。对第一种,若不作修改,会强制使用序列化message逻辑。具体是到了调用TopicManager::publish(...)时,会进入“serialize = true”那个入口,这将导致序列化消息。为让第一种也能做到不执行序列化,代码中加了“if (ros::nocopy_intra())”判断,判断是true时改去执行这里贴出的以shared_ptr类型的传参版本publish。此个publish是针对进程内非复制,接下就要调用prviate通用版publish。调用时须生成个参数m,m的type_info、message赋上有效值,但buf值是nullptr。准备好m后,调用通用版publish。
<libros>/ros_comm/roscpp/src/libros/publisher.cpp ------ @serfunc:函数,执行它后,会对msg执行序列化。 @m:包含有msg的SerializedMessage。虽然用的类型是“已序列化消息”,但其实没执行序列化,只是准备好了序化时须要参数type_info、message。 void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const { if (!impl_) { ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str()); return; } if (!impl_->isValid()) { ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str()); return; } TopicManager::instance()->publish(impl_->topic_, serfunc, m); if (isLatched()) { boost::mutex::scoped_lock lock(impl_->last_message_mutex_); impl_->last_message_ = m; } }
在确定此个Publisher已调用过有效的advertise后(impl_->isValid()==true),调用TopicManager单例对象的publish方法。由于整系统只一个TopicManager对象,在系统中所有NodeHandle都共享这个方法。
<libros>/ros_comm/roscpp/src/libros/topic_manager.cpp ------ void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) { boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_); if (isShuttingDown()) { return; } // 之前已说过lookupPublicationWithoutLock,它的功能是根据话题名找到Publication。 PublicationPtr p = lookupPublicationWithoutLock(topic); if (p->hasSubscribers() || p->isLatching()) { // 此个publication有订阅者或者advertise(...)是设置latch属性,意味着有订阅者关心这消息 ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence()); // Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can // do a no-copy publish. // 确认要发布给的订阅者是什么类别,如果是进程内的,并且使用者相同的c++类型,就可以进行no-copy模式的publish // 这里定义两个标志nocopy和serialize。 bool nocopy = false; bool serialize = false; // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it if (m.type_info && m.message) { // 非复制版本pushlish会为m设置有效type_info、message。 // 注意这里参数serialize、nocopy都是传值,getPublishTypes可能会修改serialize和nocopy的值 p->getPublishTypes(serialize, nocopy, *m.type_info); } else { serialize = true; } if (!nocopy) { m.message.reset(); m.type_info = 0; } if (serialize || p->isLatching()) { // serfunc是个函数,执行它后,会对msg执行序列化,以下语句会执行序列化msg,结果存在m2。 // 由于可能出现nocopy=true,那时m.message、type_info是有效的,此处只是把和序列化结果相关的字段进行赋值,不整个覆盖m。 SerializedMessage m2 = serfunc(); m.buf = m2.buf; m.num_bytes = m2.num_bytes; m.message_start = m2.message_start; } p->publish(m); // If we're not doing a serialized publish we don't need to signal the pollset. The write() // call inside signal() is actually relatively expensive when doing a nocopy publish. if (serialize) { poll_manager_->getPollSet().signal(); } } else { // 如果publication既没有订阅者,也没有latch属性,就执行自增序列就好,sequence一般是分布式系统的唯一id。 p->incrementSequence(); } }
publication的latch属性。这个属性的含义是是否为订阅者保留最后一次发送的信息。如果该属性设为true,当新的订阅者订阅时,就必然会收到之前发布者所发布的最后一条消息。
针对进程内非复制场景,TopicManager::publish依次执行两个操作。
- p->getPublishTypes(...)判断此次发布是否是进程内非复制。
- p->publish(...),执行发布。
p->getPublishTypes只是根据当前订阅者情况计算出nocopy、serialize,不执行对发布有影响的操作。如果是进程内不复制,执行p->getPublishTypes后,nocopy=true,serialize=false。接下进入p->publish(...)。
<libros>/ros_comm/roscpp/src/libros/publication.cpp ------ void Publication::publish(SerializedMessage& m) { if (m.message) { // nocopy == true boost::mutex::scoped_lock lock(subscriber_links_mutex_); V_SubscriberLink::const_iterator it = subscriber_links_.begin(); V_SubscriberLink::const_iterator end = subscriber_links_.end(); for (; it != end; ++it) { const SubscriberLinkPtr& sub = *it; if (sub->isIntraprocess()) { sub->enqueueMessage(m, false, true); } } m.message.reset(); } if (m.buf) { // serialize || p->isLatching()。 boost::mutex::scoped_lock lock(publish_queue_mutex_); publish_queue_.push_back(m); } }
Publication怎么知道系统中有哪些Subscription正订阅name_这个话题,——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。
sub->enqueueMessage是个虚函数,进程内订阅对应的是IntraProcessSubscriberLink::enqueueMessage。
<libros>/ros_comm/roscpp/src/libros/intraprocess_subscriber_link.cpp ------- void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy) { boost::recursive_mutex::scoped_lock lock(drop_mutex_); if (dropped_) { return; } ROS_ASSERT(subscriber_); subscriber_->handleMessage(m, ser, nocopy); }
在这里最让人难受的就是subscriber_的理解,它的声明出现在intra_processSubscriber_link.h文件中。
IntraProcessPublisherLinkPtr subscriber_;
可以这样理解,发布者向订阅者发布信息时,Publication会先从自己的订阅列表中找出进程内的订阅者IntraProcessSubscriberLink,然后就会让后者进行enqueueMessage操作,这个入队操作到下面实际又交给了IntraProcessPublisherLink对象,调用后者的handleMessage。
<libros>/ros_comm/roscpp/src/libros/intraprocess_publisher_link.cpp ------ void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy) { boost::recursive_mutex::scoped_lock lock(drop_mutex_); if (dropped_) { return; } stats_.bytes_received_ += m.num_bytes; stats_.messages_received_++; SubscriptionPtr parent = parent_.lock(); if (parent) { stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this()); } }
到这里,m这个message就已经成功交给了正订阅Publication.name_的Subscription,接下就是Subscription调用handleMessage处理m。
<libros>/ros_comm/roscpp/src/libros/subscription.cpp ------ uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link) { boost::mutex::scoped_lock lock(callbacks_mutex_); uint32_t drops = 0; // 为提高构建MessageDeserializer效率,用了个缓存:cached_deserializers_。 // 对这topic name_,若只有一个subscribe,会降低些效率,但有多个subscribe时应该会有提升。 cached_deserializers_.clear(); ros::Time receipt_time = ros::Time::now(); for (V_CallbackInfo::iterator cb = callbacks_.begin(); cb != callbacks_.end(); ++cb) { const CallbackInfoPtr& info = *cb; ROS_ASSERT(info->callback_queue_); const std::type_info* ti = &info->helper_->getTypeInfo(); if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info))) { // deserializer(MessageDeserializer)包含了m(SerializedMessage), // 最终放入订阅者的是deserializer,m是包含在了deserializer。 // 虽然放置MessageDeserializer,不等于要对消息进行序列化,见MessageDeserializer::deserialize()。 MessageDeserializerPtr deserializer; V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin(); V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end(); for (; des_it != des_end; ++des_it) { if (*des_it->first == *ti) { deserializer = des_it->second; break; } } if (!deserializer) { deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header); cached_deserializers_.push_back(std::make_pair(ti, deserializer)); } // 到此已有一个包含m的deserializer bool was_full = false; bool nonconst_need_copy = false; if (callbacks_.size() > 1) { nonconst_need_copy = true; } // subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。 // CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full); if (was_full) { ++drops; } else { // 此个消息已放入subscription_queue_,CallbackQueue有新消息到了,CallbackQueue::callAvailable可醒来处理消息了。 info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get()); } } } ... cached_deserializers_.clear(); return drops; }
对同个topic name_,进程内可能存在多次subscribe,每次subscribe会向callbacks_增加一个单元,单元类型是Subscription::CallbackInfo。
<libros>/include/ros/subscription.h ------ class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription> { struct CallbackInfo { // 它就是订阅者callAvailable时的CallbackQueue CallbackQueueInterface* callback_queue_; // Only used if callback_queue_ is non-NULL (NodeHandle API) SubscriptionCallbackHelperPtr helper_; // SubscriptionQueue基类是CallbackInterface。 // CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback SubscriptionQueuePtr subscription_queue_; bool has_tracked_object_; VoidConstWPtr tracked_object_; }; typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr; typedef std::vector<CallbackInfoPtr> V_CallbackInfo; V_CallbackInfo callbacks_; };
callbacks_长度就是进程内针对这个topic name_的subscribe次数,Subscription::handleMessage会枚举这些个subscribe,对每个subscribe依次执行三个操作。
- 生成一个包含m的deserializer。最终放入订阅者的是deserializer。
- info->subscription_queue_->push(...)。把deserializer等信息放入订阅者队列。
- info->callback_queue_->addCallback(...)。此个消息已放入subscription_queue_,意味着CallbackQueue有新消息到了,通知CallbackQueue::callAvailable可醒来处理新消息。
subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback。结合CallbackQueue机制,有人可能会进入个误区:callAvailable从CallbackQueue的callbacks_(这个callbacks_和Subscription的callbacks_是完全不同的两个东西)取出一个cb后,该cb会被删除,那是不是把subscription_queue_销毁了?——不会。callAvailable取出一个cb后,的确会删除该cb,但cb中保存的类型是CallbackInterfacePtr,删除的是boost::shared_ptr<CallbackInterface>,而不是CallbackInterface,删除只是把引用计数减1。总的来说,在callAvailable执行完一次callOneCB后,subscription_queue_不会被删除,删除的是subscription_queue_内消息队列queue_中的一个单元(见后面的“SubscriptionQueue::call”)。和这个删除相对应,之前第二步的SubscriptionQueue::push则是向queue_增加此个单元。
假设message是将要publish的消息,小结publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。
二、SubscriptionQueue::call和反序列化消息
在CallbackQueue::callAvailable上下文,从CallbackQueue的callbacks_取出某个cb后,会调用该cb的call()方法,从而完成回调的具体操作。在topic机制中,cb对应类型是SubscriptionQueue。
<libros>/ros_comm/roscpp/src/libros/subscription_queue.cpp ------ CallbackInterface::CallResult SubscriptionQueue::call() { // The callback may result in our own destruction. Therefore, we may need to keep a reference to ourselves // that outlasts the scoped_try_lock boost::shared_ptr<SubscriptionQueue> self; boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock); if (!allow_concurrent_callbacks_) { lock.try_lock(); if (!lock.owns_lock()) { return CallbackInterface::TryAgain; } } VoidConstPtr tracker; Item i; { boost::mutex::scoped_lock lock(queue_mutex_); if (queue_.empty()) { return CallbackInterface::Invalid; } i = queue_.front(); if (queue_.empty()) { return CallbackInterface::Invalid; } if (i.has_tracked_object) { tracker = i.tracked_object.lock(); if (!tracker) { return CallbackInterface::Invalid; } } queue_.pop_front(); --queue_size_; } VoidConstPtr msg = i.deserializer->deserialize(); // msg can be null here if deserialization failed if (msg) { try { self = shared_from_this(); } catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr {} SubscriptionCallbackHelperCallParams params; params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction()); i.helper->call(params); } return CallbackInterface::Success; }
SubscriptionQueue::call依次会执行三个操作。
- 取出queue_队列的头单元,赋值给i。
- i.deserializer->deserialize()。反序列化出消息的原始数据msg。
- i.helper->call(params)。以msg为参数执行回调。
针对第二步,有人会有误区:一直说non-copy-intra是不序列化消息,这里怎么还是反序列化了?——不要被deserialize这名称误导了,其它没执行反序列化,让进入MessageDeserializer::deserialize()。
<libros>/ros_comm/roscpp/src/libros/message_deserializer.cpp ------ VoidConstPtr MessageDeserializer::deserialize() { boost::mutex::scoped_lock lock(mutex_); if (msg_) { return msg_; } if (serialized_message_.message) { // 由于non-copy-intra时,message总是有效的,于是会进入这里。 // 只是赋值而已,不会执行真正的反序列化操作helper_->deserialize。 msg_ = serialized_message_.message; return msg_; } if (!serialized_message_.buf && serialized_message_.num_bytes > 0) { // If the buffer has been reset it means we tried to deserialize and failed return VoidConstPtr(); } try { SubscriptionCallbackHelperDeserializeParams params; params.buffer = serialized_message_.message_start; params.length = serialized_message_.num_bytes - (serialized_message_.message_start - serialized_message_.buf.get()); params.connection_header = connection_header_; msg_ = helper_->deserialize(params); } catch (std::exception& e) { ROS_ERROR("Exception thrown when deserializing message of length [%d] from [%s]: %s", (uint32_t)serialized_message_.num_bytes, (*connection_header_)["callerid"].c_str(), e.what()); } serialized_message_.buf.reset(); return msg_; }
由于non-copy-intra时,serialized_message_.message总是有效,于是“msg_ = serialized_message_.message”,不会执行真正的反序列化操作helper_->deserialize。