- 如果server没在运行,client的call立即失败,没有等待时间。
本文重点在ServiceClient::call,如果要想了解其它细节,像ServiceServer创建及初始化,可参考“ros源码分析(四)service的实时反馈机制”。
一、ServiceClient::call流程

- [cient-thread]以service名称为参数在rosmaster查找该服务提供者的service_url,如果没找到该service_url,返回false。
- [cient-thread]把call请求写入Connection::write_buffer_。然后就等待info->finished_condition_信号量。
- [client进程中的PollManager-thread]向service_url发送存储在write_buffer_中的call请求。
- [server进程中的PollManager-thread]收到call请求,以请求数据生成ServiceCallback对象,把它放入server-thread的CallbackQueue。通知callAvailable可醒来处理新消息。
- [server-thread]在callAvailable内,callOneCB调用ServiceCallback::call,内中会调用app注册的cb函数,得到计算结果。
- [server进程中的PollManager-thread]得到ServiceCallback::call的计算结果后,把结果发向client进程。
- [client进程中的PollManager-thread]收到了server发来的处理结果,置finished_condition_有信号。
- [cient-thread]检测到finished_condition_有信号,结束第二步的阻塞状态,把计算结果作为resp,反馈到app。
1.1、client-thread
client线程发起call。参数req是计算请求,resp存储着call的结果。call是阻塞式调用,即要等到计算出结果后才会退出。
<libros>/include/ros/service_client.h ------ @req。[IN]req需要填写的请求,到此处时已序化为字节流格式。 @resp。[out]字节流格式表示的计算结果。后绪需要把它反序列为MRes类型的resp。 bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp, const std::string& service_md5sum) { if (service_md5sum != impl_->service_md5sum_) { ROS_ERROR("Call to service [%s] with md5sum [%s] does not match md5sum when the handle was created ([%s])", impl_->name_.c_str(), service_md5sum.c_str(), impl_->service_md5sum_.c_str()); return false; } ServiceServerLinkPtr link; if (impl_->persistent_) { if (!impl_->server_link_) { impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_); if (!impl_->server_link_) { return false; } } link = impl_->server_link_; } else { // impl_->persistent_默认是false,进这个入口 link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_); if (!link) { return false; } } bool ret = link->call(req, resp); // 上面的link->call是阻塞式调用,一直要有结果后才会退出。到此处,表示已得到计算结果了。 link.reset(); // If we're shutting down but the node haven't finished yet, wait until we do while (ros::isShuttingDown() && ros::ok()) { ros::WallDuration(0.001).sleep(); } return ret; } impl_->persistent_默认是false,即对每次call都会创建个新的ServiceServerLink,此次call结束后就会销毁。call依次有两个操作。 1)createServiceServerLink。为此次call创建一个ServiceServerLink对象。 2)link->call(req, resp)。内中会执行finished_condition_.wait(lock),client将在此等待,直到server执行完。 让进入第一个操作:createServiceServerLink。 ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent, const std::string& request_md5sum, const std::string& response_md5sum, const M_string& header_values) { boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_); if (shutting_down_) { return ServiceServerLinkPtr(); } uint32_t serv_port; std::string serv_host; if (!lookupService(service, serv_host, serv_port)) { return ServiceServerLinkPtr(); } TransportTCPPtr transport(boost::make_shared<TransportTCP>(&poll_manager_->getPollSet())); // Make sure to initialize the connection *before* transport->connect() // is called, otherwise we might miss a connect error (see #434). ConnectionPtr connection(boost::make_shared<Connection>()); connection_manager_->addConnection(connection); connection->initialize(transport, false, HeaderReceivedFunc()); if (transport->connect(serv_host, serv_port)) { ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values)); { boost::mutex::scoped_lock lock(service_server_links_mutex_); service_server_links_.push_back(client); } client->initialize(connection); return client; } ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port); return ServiceServerLinkPtr(); }
函数首先调用lookupService(service, serv_host, serv_port),在rosmaster中参数名称是“serivce”的service,serv_host、serv_port存储着service所在node的service_url,后续请求就发向该url。
因为要进网络收发了,须创建处理此次收发的TransportTCP、Connection对象。transport连接service_url成功后,创建ServiceServerLink,并调用它的initialize方法。
一次call请求包括两部分数据:请求头(Header)和消息,在ServiceServerLink::initialize,会把请求头写入PollManager线程的write_buffer_。

要能网络发送,必须序列化call请求,即把请求变成字节流。消息部分有标准方法,图2中显示了如何序列化请求头。“Watch 1”中key_vals变量是std::map格式请求头。“Memory 1”是字节流表示的请求头,头4个字节是请求头长度。序列化出请求头后,initialize会执行断点指向的write(...),把请求头写入PollManager线程的write_buffer_。
<libros>/ros_comm/roscpp/src/libros/connection.cpp ------ void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate) { if (dropped_ || sending_header_error_) { return; } { boost::mutex::scoped_lock lock(write_callback_mutex_); ROS_ASSERT(!write_callback_); write_callback_ = callback; write_buffer_ = buffer; write_size_ = size; write_sent_ = 0; has_write_callback_ = 1; } transport_->enableWrite(); if (immediate) { // write immediately if possible writeTransport(); } }
这边把请求头写入PollManager线程的write_buffer_,那边PollManager线程检测到write_buffer_有数据,启动把请求发送到service-url逻辑。
1.2 server进程中的PollManager-thread
socket线程,socket上有可读的数据后,会调用ros::Connection::readTransport(),后者会调用自定义回调函数是callback,在server时,callback指向的是ServiceClientLink::onRequest。
<libros>/ros_comm/roscpp/src/libros/service_client_link.cpp ------ @buffer。序列化后的MReq。 @size。序例化后MReq字节数。 void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success) { (void)conn; if (!success) return; ROS_ASSERT(conn == connection_); if (ServicePublicationPtr parent = parent_.lock()) { parent->processRequest(buffer, size, shared_from_this()); } else { ROS_BREAK(); } }
onRequest是ServiceClientLink的成员函数,注意是ServiceClientLink,和“1.1、client-thread”中的ServiceServerLink是两回事。parent_的类型是boost::weak_ptr<ServicePublication>。
<libros>/ros_comm/roscpp/src/libros/service_publication.cpp ------ void ServicePublication::processRequest(boost::shared_array<uint8_t> buf, size_t num_bytes, const ServiceClientLinkPtr& link) { CallbackInterfacePtr cb(boost::make_shared<ServiceCallback>(helper_, buf, num_bytes, link, has_tracked_object_, tracked_object_)); callback_queue_->addCallback(cb, (uint64_t)this); }
- 构造ServiceCallback类型对象。ServiceCallback基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
- callback_queue_->addCallback(...)。CallbackQueue有新消息到了,通知在server-thread运行的CallbackQueue::callAvailable可醒来处理新消息。
至此call请求已放入CallbackQueue,PollManager-thread需要一直等,直到计算出此次的结果(MResp)。但CallbackQueue::callAvailable是在server-thread执行,PollManager-thread如何知道server-thread已算出MResp?——server-thread调用app注册的cb算出MResp后,会把序列化后的MResp写入PollManager的Connection::write_buffer_,PollManager线程检测到write_buffer_有数据,就知道计算已得出MResp,而且write_buffer_存的就是序列化后的MResp。
1.3 server-thread
server-thread执行标准CallbackQuery流程:由CallbackQueue::callAvailable调用callOneCB,callOneCB调用CallbackInterface::call。针对service机制,就是ServiceCallback::call,call中则调用app设定的cb函数。callOneCB在退出前,会析构掉ServiceCallback,即此个ServiceCallback是在server进程中的PollManager-thread构造,在server-thread析构。
<libros>/ros_comm/roscpp/src/libros/service_publication.cpp ------ class ServiceCallback : public CallbackInterface { public: ... virtual CallResult call() { if (link_->getConnection()->isDropped()) { return Invalid; } VoidConstPtr tracker; if (has_tracked_object_) { tracker = tracked_object_.lock(); if (!tracker) { SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0); link_->processResponse(false, res); return Invalid; } } ServiceCallbackHelperCallParams params; // 由buffer_, num_bytes_反序列化出MReq。 params.request = SerializedMessage(buffer_, num_bytes_); // 这个connection_header就是call请求的消息头,即图2中的“key_vals”变量。 params.connection_header = link_->getConnection()->getHeader().getValues(); try { bool ok = helper_->call(params); if (ok) { link_->processResponse(true, params.response); } else { SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0); link_->processResponse(false, res); } } catch (std::exception& e) { ... } return Success; } ... };
call()依次做两个操作。
- SerializedMessage。反序列化出MReq,结果放在params.request。
- helper_->call(params)。调用app设置的cb,计算MResp。
- link_->processResponse(...)。把序列化后的MResp写入PollManager的Connection::write_buffer_,“唤起”PollManager-thread进入得到MResp后的逻辑。
<libros>/include/ros/service_callback_helper.h ------ template<typename MReq, typename MRes> struct ServiceSpec { typedef MReq RequestType; typedef MRes ResponseType; typedef boost::shared_ptr<RequestType> RequestPtr; typedef boost::shared_ptr<ResponseType> ResponsePtr; typedef boost::function<bool(RequestType&, ResponseType&)> CallbackType; static bool call(const CallbackType& cb, ServiceSpecCallParams<RequestType, ResponseType>& params) { return cb(*params.request, *params.response); } }; class ServiceCallbackHelperT : public ServiceCallbackHelper { virtual bool call(ServiceCallbackHelperCallParams& params) { namespace ser = serialization; RequestPtr req(create_req_()); ResponsePtr res(create_res_()); ser::deserializeMessage(params.request, *req); ServiceSpecCallParams<RequestType, ResponseType> call_params; call_params.request = req; call_params.response = res; call_params.connection_header = params.connection_header; bool ok = Spec::call(callback_, call_params); params.response = ser::serializeServiceResponse(ok, *res); return ok; } };
ServiceCallbackHelperT::call会调用Spec::call,是Spec::call最终调用app的cb,在这过程中并没有用到connection_header。得到MResp后,立刻序列化,序列化后的结果放在params.response。要注意,这里序列化用的是ser::serializeServiceResponse,不是ser::serializeMessage。序列化后也是字节流,但这字节流更适合后面的网络传输,不能直接用于标准反序列化操作ser::deserializeMessage。