- Ros规则:相同话题名必须相同消息类型。
- 如何判断advertiese和subscribe是进程内关系?——同一进程共享内存空间,单例对象TopicManager记载着每次advertiese、subscribe。通过它们是否拥有相同的话题名称,并具有着相同的消息类型(用消息中md5sum字段),就可判断了。
- 假设message是将要publish的消息,以下是publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。
- Publication怎么知道系统中有哪些Subscription正订阅name_这个话题?——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。即使进程内有多次针对该topic name_的订阅,subscriber_links_也只一个单元和它们相对应,和这些多次对应的是Subscription中的callbacks_。
- 支持在一进程内发布和订阅同名、同类型topic,这是nocopy-intra模式的基础。但在用它时,开发者需清晰知道这样做的结果,避免自已发送的被自已收到,造成无意义轮回。
在分析Publisher::publish源码前,让先看下Ros的一条规则:相同话题名必须相同消息类型。看TopicManager是如何利用这条规则存储advertise、subscribe的调用信息。
一、相同话题名必须相同消息类型
Ros中有这么条规则:系统内如果两次advertise同一个话题名,那这两次必须是相同的消息类型。举个例子,第一次advertise时,advertise的了话题名“/scan”,消息类型是sensor_msgs::LaserScan,那第二次advertise话题“/scan”时,消息类型必须是sensor_msgs::LaserScan。

TopicManager是进程用来管理topic通信的类,采用单例设计模式,也就是说在同一个进程中,仅有一个TopicManager类对象,它管理了所有的topic相关的函数调用,它的初始化是在init函数。图1显示了它的两个成员变量advertised_topics_和subscriptions_的即时值。
std::vector<boost::shared_ptr<Publication> > advertised_topics_; std::list<boost::shared_ptr<Subscription> > subscriptions_;
advertised_topics_存储着该进程advertise过的话题信息,subscriptions_则存储着该进程subscribe过的话题信息。在图1时刻,该进程已累计执行过三种advertiesed,分别是/rosout、/cmd_ver和/chatter;累计执行过4种subscribe,分别是/map、/tf、/tf_static和、/chatter。
可以多次advertise同一个话题。这时advertise第二个时,不会改修TopicManager::advertised_topics_,但该话题名对应的Publication的callbacks_长度会增1。
可以多次subscribe同一个话题。这时subscribe第二个时,不会改修TopicManager::subscriptions_,但该话题名对应的Subscription的callbacks_长度会增1,即callbacks_长度等于该进程内对同种话题订阅次数。一个进程内多次订阅同种话题,这是很容易发生的。举个例子,move_base_node要知道tf树,它就要订阅/tf话题,cartographer_node也想知道tf树,那它也要订阅/tf,这样callbacks_中的单元就至少两个了。
可以间接把话题名当做advertised_topics_、subscriptions_的key,以下是TopicManager提供的以话题名查找Publication的lookupPublicationWithoutLock。
PublicationPtr TopicManager::lookupPublicationWithoutLock(const string &topic)
{
PublicationPtr t;
for (V_Publication::iterator i = advertised_topics_.begin();
!t && i != advertised_topics_.end(); ++i)
{
if (((*i)->getName() == topic) && (!(*i)->isDropped()))
{
t = *i;
break;
}
}
return t;
}
二、Publisher::publish流程

本文重点讨论进程内且非复制的Publish。
<libros>/include/ros/publisher.h
------
class ROSCPP_DECL Publisher
{
template <typename M>
void publish(const boost::shared_ptr<M>& message) const
{
using namespace serialization;
if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher");
return;
}
if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
ROS_ASSERT_MSG(impl_->md5sum_ == "*" || std::string(mt::md5sum<M>(*message)) == "*" || impl_->md5sum_ == mt::md5sum<M>(*message),
"Trying to publish message of type [%s/%s] on a publisher with type [%s/%s]",
mt::datatype<M>(*message), mt::md5sum<M>(*message),
impl_->datatype_.c_str(), impl_->md5sum_.c_str());
SerializedMessage m;
m.type_info = &typeid(M);
m.message = message;
publish(boost::bind(serializeMessage<M>, boost::ref(*message)), m);
}
}NodeHandle.advertise注册发布者时得到一个Publisher对象。Publisher有两种public的publish方法声明:void publish(const M& message)和这里贴出的void publish(const boost::shared_ptr<M>& message)。对第一种,若不作修改,会强制使用序列化message逻辑。具体是到了调用TopicManager::publish(...)时,会进入“serialize = true”那个入口,这将导致序列化消息。为让第一种也能做到不执行序列化,代码中加了“if (ros::nocopy_intra())”判断,判断是true时改去执行这里贴出的以shared_ptr类型的传参版本publish。此个publish是针对进程内非复制,接下就要调用prviate通用版publish。调用时须生成个参数m,m的type_info、message赋上有效值,但buf值是nullptr。准备好m后,调用通用版publish。
<libros>/ros_comm/roscpp/src/libros/publisher.cpp
------
@serfunc:函数,执行它后,会对msg执行序列化。
@m:包含有msg的SerializedMessage。虽然用的类型是“已序列化消息”,但其实没执行序列化,只是准备好了序化时须要参数type_info、message。
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
if (!impl_)
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
if (!impl_->isValid())
{
ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
return;
}
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
if (isLatched()) {
boost::mutex::scoped_lock lock(impl_->last_message_mutex_);
impl_->last_message_ = m;
}
}在确定此个Publisher已调用过有效的advertise后(impl_->isValid()==true),调用TopicManager单例对象的publish方法。由于整系统只一个TopicManager对象,在系统中所有NodeHandle都共享这个方法。
<libros>/ros_comm/roscpp/src/libros/topic_manager.cpp
------
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
if (isShuttingDown())
{
return;
}
// 之前已说过lookupPublicationWithoutLock,它的功能是根据话题名找到Publication。
PublicationPtr p = lookupPublicationWithoutLock(topic);
if (p->hasSubscribers() || p->isLatching())
{
// 此个publication有订阅者或者advertise(...)是设置latch属性,意味着有订阅者关心这消息
ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
// Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
// do a no-copy publish.
// 确认要发布给的订阅者是什么类别,如果是进程内的,并且使用者相同的c++类型,就可以进行no-copy模式的publish
// 这里定义两个标志nocopy和serialize。
bool nocopy = false;
bool serialize = false;
// We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
if (m.type_info && m.message)
{
// 非复制版本pushlish会为m设置有效type_info、message。
// 注意这里参数serialize、nocopy都是传值,getPublishTypes可能会修改serialize和nocopy的值
p->getPublishTypes(serialize, nocopy, *m.type_info);
}
else
{
serialize = true;
}
if (!nocopy)
{
m.message.reset();
m.type_info = 0;
}
if (serialize || p->isLatching())
{
// serfunc是个函数,执行它后,会对msg执行序列化,以下语句会执行序列化msg,结果存在m2。
// 由于可能出现nocopy=true,那时m.message、type_info是有效的,此处只是把和序列化结果相关的字段进行赋值,不整个覆盖m。
SerializedMessage m2 = serfunc();
m.buf = m2.buf;
m.num_bytes = m2.num_bytes;
m.message_start = m2.message_start;
}
p->publish(m);
// If we're not doing a serialized publish we don't need to signal the pollset. The write()
// call inside signal() is actually relatively expensive when doing a nocopy publish.
if (serialize)
{
poll_manager_->getPollSet().signal();
}
}
else
{
// 如果publication既没有订阅者,也没有latch属性,就执行自增序列就好,sequence一般是分布式系统的唯一id。
p->incrementSequence();
}
}publication的latch属性。这个属性的含义是是否为订阅者保留最后一次发送的信息。如果该属性设为true,当新的订阅者订阅时,就必然会收到之前发布者所发布的最后一条消息。
针对进程内非复制场景,TopicManager::publish依次执行两个操作。
- p->getPublishTypes(...)判断此次发布是否是进程内非复制。
- p->publish(...),执行发布。
p->getPublishTypes只是根据当前订阅者情况计算出nocopy、serialize,不执行对发布有影响的操作。如果是进程内不复制,执行p->getPublishTypes后,nocopy=true,serialize=false。接下进入p->publish(...)。
<libros>/ros_comm/roscpp/src/libros/publication.cpp
------
void Publication::publish(SerializedMessage& m)
{
if (m.message)
{
// nocopy == true
boost::mutex::scoped_lock lock(subscriber_links_mutex_);
V_SubscriberLink::const_iterator it = subscriber_links_.begin();
V_SubscriberLink::const_iterator end = subscriber_links_.end();
for (; it != end; ++it)
{
const SubscriberLinkPtr& sub = *it;
if (sub->isIntraprocess())
{
sub->enqueueMessage(m, false, true);
}
}
m.message.reset();
}
if (m.buf)
{
// serialize || p->isLatching()。
boost::mutex::scoped_lock lock(publish_queue_mutex_);
publish_queue_.push_back(m);
}
}Publication怎么知道系统中有哪些Subscription正订阅name_这个话题,——通过subscriber_links_。subscriber_links_是个std::vector,既存储着进程内产生的订阅,也存储着进程间产生的订阅。是进程内时,实现的类是IntraProcessSubscriberLink;是进程间时,实现的类是TransportSubscriberLink。
sub->enqueueMessage是个虚函数,进程内订阅对应的是IntraProcessSubscriberLink::enqueueMessage。
<libros>/ros_comm/roscpp/src/libros/intraprocess_subscriber_link.cpp
-------
void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
boost::recursive_mutex::scoped_lock lock(drop_mutex_);
if (dropped_)
{
return;
}
ROS_ASSERT(subscriber_);
subscriber_->handleMessage(m, ser, nocopy);
}在这里最让人难受的就是subscriber_的理解,它的声明出现在intra_processSubscriber_link.h文件中。
IntraProcessPublisherLinkPtr subscriber_;
可以这样理解,发布者向订阅者发布信息时,Publication会先从自己的订阅列表中找出进程内的订阅者IntraProcessSubscriberLink,然后就会让后者进行enqueueMessage操作,这个入队操作到下面实际又交给了IntraProcessPublisherLink对象,调用后者的handleMessage。
<libros>/ros_comm/roscpp/src/libros/intraprocess_publisher_link.cpp
------
void IntraProcessPublisherLink::handleMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
boost::recursive_mutex::scoped_lock lock(drop_mutex_);
if (dropped_)
{
return;
}
stats_.bytes_received_ += m.num_bytes;
stats_.messages_received_++;
SubscriptionPtr parent = parent_.lock();
if (parent)
{
stats_.drops_ += parent->handleMessage(m, ser, nocopy, header_.getValues(), shared_from_this());
}
}到这里,m这个message就已经成功交给了正订阅Publication.name_的Subscription,接下就是Subscription调用handleMessage处理m。
<libros>/ros_comm/roscpp/src/libros/subscription.cpp
------
uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
{
boost::mutex::scoped_lock lock(callbacks_mutex_);
uint32_t drops = 0;
// 为提高构建MessageDeserializer效率,用了个缓存:cached_deserializers_。
// 对这topic name_,若只有一个subscribe,会降低些效率,但有多个subscribe时应该会有提升。
cached_deserializers_.clear();
ros::Time receipt_time = ros::Time::now();
for (V_CallbackInfo::iterator cb = callbacks_.begin();
cb != callbacks_.end(); ++cb)
{
const CallbackInfoPtr& info = *cb;
ROS_ASSERT(info->callback_queue_);
const std::type_info* ti = &info->helper_->getTypeInfo();
if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
{
// deserializer(MessageDeserializer)包含了m(SerializedMessage),
// 最终放入订阅者的是deserializer,m是包含在了deserializer。
// 虽然放置MessageDeserializer,不等于要对消息进行序列化,见MessageDeserializer::deserialize()。
MessageDeserializerPtr deserializer;
V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
for (; des_it != des_end; ++des_it)
{
if (*des_it->first == *ti)
{
deserializer = des_it->second;
break;
}
}
if (!deserializer)
{
deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
cached_deserializers_.push_back(std::make_pair(ti, deserializer));
}
// 到此已有一个包含m的deserializer
bool was_full = false;
bool nonconst_need_copy = false;
if (callbacks_.size() > 1)
{
nonconst_need_copy = true;
}
// subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。
// CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);
if (was_full)
{
++drops;
}
else
{
// 此个消息已放入subscription_queue_,CallbackQueue有新消息到了,CallbackQueue::callAvailable可醒来处理消息了。
info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
}
}
}
...
cached_deserializers_.clear();
return drops;
}对同个topic name_,进程内可能存在多次subscribe,每次subscribe会向callbacks_增加一个单元,单元类型是Subscription::CallbackInfo。
<libros>/include/ros/subscription.h
------
class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscription>
{
struct CallbackInfo
{
// 它就是订阅者callAvailable时的CallbackQueue
CallbackQueueInterface* callback_queue_;
// Only used if callback_queue_ is non-NULL (NodeHandle API)
SubscriptionCallbackHelperPtr helper_;
// SubscriptionQueue基类是CallbackInterface。
// CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
SubscriptionQueuePtr subscription_queue_;
bool has_tracked_object_;
VoidConstWPtr tracked_object_;
};
typedef boost::shared_ptr<CallbackInfo> CallbackInfoPtr;
typedef std::vector<CallbackInfoPtr> V_CallbackInfo;
V_CallbackInfo callbacks_;
};callbacks_长度就是进程内针对这个topic name_的subscribe次数,Subscription::handleMessage会枚举这些个subscribe,对每个subscribe依次执行三个操作。
- 生成一个包含m的deserializer。最终放入订阅者的是deserializer。
- info->subscription_queue_->push(...)。把deserializer等信息放入订阅者队列。
- info->callback_queue_->addCallback(...)。此个消息已放入subscription_queue_,意味着CallbackQueue有新消息到了,通知CallbackQueue::callAvailable可醒来处理新消息。
subscription_queue_类型是boost::shared_ptr<SubscriptionQueue>,SubscriptionQueue基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback。结合CallbackQueue机制,有人可能会进入个误区:callAvailable从CallbackQueue的callbacks_(这个callbacks_和Subscription的callbacks_是完全不同的两个东西)取出一个cb后,该cb会被删除,那是不是把subscription_queue_销毁了?——不会。callAvailable取出一个cb后,的确会删除该cb,但cb中保存的类型是CallbackInterfacePtr,删除的是boost::shared_ptr<CallbackInterface>,而不是CallbackInterface,删除只是把引用计数减1。总的来说,在callAvailable执行完一次callOneCB后,subscription_queue_不会被删除,删除的是subscription_queue_内消息队列queue_中的一个单元(见后面的“SubscriptionQueue::call”)。和这个删除相对应,之前第二步的SubscriptionQueue::push则是向queue_增加此个单元。
假设message是将要publish的消息,小结publish流程。用message生成类型SerializedMessage的m。以m为参数调用TopicManager::publish。TopicManager找出该topic name_对应的Publication。Publication的subscriber_links_存储着当前系统中有哪些Subscription正订阅name_这个话题。由此可找到正订阅该name_的Subscription,调用Subscription::handleMessage。针对这个topic name_,进程内可能存在多次subscribe,handleMessage会逐个处理。具体到每一次,先由m生成的deserializer,把deserializer等信息放入订阅者队列,然后通知CallbackQueue::callAvailable可醒来处理新消息了。
二、SubscriptionQueue::call和反序列化消息
在CallbackQueue::callAvailable上下文,从CallbackQueue的callbacks_取出某个cb后,会调用该cb的call()方法,从而完成回调的具体操作。在topic机制中,cb对应类型是SubscriptionQueue。
<libros>/ros_comm/roscpp/src/libros/subscription_queue.cpp
------
CallbackInterface::CallResult SubscriptionQueue::call()
{
// The callback may result in our own destruction. Therefore, we may need to keep a reference to ourselves
// that outlasts the scoped_try_lock
boost::shared_ptr<SubscriptionQueue> self;
boost::recursive_mutex::scoped_try_lock lock(callback_mutex_, boost::defer_lock);
if (!allow_concurrent_callbacks_)
{
lock.try_lock();
if (!lock.owns_lock())
{
return CallbackInterface::TryAgain;
}
}
VoidConstPtr tracker;
Item i;
{
boost::mutex::scoped_lock lock(queue_mutex_);
if (queue_.empty())
{
return CallbackInterface::Invalid;
}
i = queue_.front();
if (queue_.empty())
{
return CallbackInterface::Invalid;
}
if (i.has_tracked_object)
{
tracker = i.tracked_object.lock();
if (!tracker)
{
return CallbackInterface::Invalid;
}
}
queue_.pop_front();
--queue_size_;
}
VoidConstPtr msg = i.deserializer->deserialize();
// msg can be null here if deserialization failed
if (msg)
{
try
{
self = shared_from_this();
}
catch (boost::bad_weak_ptr&) // For the tests, where we don't create a shared_ptr
{}
SubscriptionCallbackHelperCallParams params;
params.event = MessageEvent<void const>(msg, i.deserializer->getConnectionHeader(), i.receipt_time, i.nonconst_need_copy, MessageEvent<void const>::CreateFunction());
i.helper->call(params);
}
return CallbackInterface::Success;
}SubscriptionQueue::call依次会执行三个操作。
- 取出queue_队列的头单元,赋值给i。
- i.deserializer->deserialize()。反序列化出消息的原始数据msg。
- i.helper->call(params)。以msg为参数执行回调。
针对第二步,有人会有误区:一直说non-copy-intra是不序列化消息,这里怎么还是反序列化了?——不要被deserialize这名称误导了,其它没执行反序列化,让进入MessageDeserializer::deserialize()。
<libros>/ros_comm/roscpp/src/libros/message_deserializer.cpp
------
VoidConstPtr MessageDeserializer::deserialize()
{
boost::mutex::scoped_lock lock(mutex_);
if (msg_)
{
return msg_;
}
if (serialized_message_.message)
{
// 由于non-copy-intra时,message总是有效的,于是会进入这里。
// 只是赋值而已,不会执行真正的反序列化操作helper_->deserialize。
msg_ = serialized_message_.message;
return msg_;
}
if (!serialized_message_.buf && serialized_message_.num_bytes > 0)
{
// If the buffer has been reset it means we tried to deserialize and failed
return VoidConstPtr();
}
try
{
SubscriptionCallbackHelperDeserializeParams params;
params.buffer = serialized_message_.message_start;
params.length = serialized_message_.num_bytes - (serialized_message_.message_start - serialized_message_.buf.get());
params.connection_header = connection_header_;
msg_ = helper_->deserialize(params);
}
catch (std::exception& e)
{
ROS_ERROR("Exception thrown when deserializing message of length [%d] from [%s]: %s", (uint32_t)serialized_message_.num_bytes, (*connection_header_)["callerid"].c_str(), e.what());
}
serialized_message_.buf.reset();
return msg_;
}由于non-copy-intra时,serialized_message_.message总是有效,于是“msg_ = serialized_message_.message”,不会执行真正的反序列化操作helper_->deserialize。