- 如果server没在运行,client的call立即失败,没有等待时间。
本文重点在ServiceClient::call,如果要想了解其它细节,像ServiceServer创建及初始化,可参考“ros源码分析(四)service的实时反馈机制”。
一、ServiceClient::call流程

- [cient-thread]以service名称为参数在rosmaster查找该服务提供者的service_url,如果没找到该service_url,返回false。
- [cient-thread]把call请求写入Connection::write_buffer_。然后就等待info->finished_condition_信号量。
- [client进程中的PollManager-thread]向service_url发送存储在write_buffer_中的call请求。
- [server进程中的PollManager-thread]收到call请求,以请求数据生成ServiceCallback对象,把它放入server-thread的CallbackQueue。通知callAvailable可醒来处理新消息。
- [server-thread]在callAvailable内,callOneCB调用ServiceCallback::call,内中会调用app注册的cb函数,得到计算结果。
- [server进程中的PollManager-thread]得到ServiceCallback::call的计算结果后,把结果发向client进程。
- [client进程中的PollManager-thread]收到了server发来的处理结果,置finished_condition_有信号。
- [cient-thread]检测到finished_condition_有信号,结束第二步的阻塞状态,把计算结果作为resp,反馈到app。
1.1、client-thread
client线程发起call。参数req是计算请求,resp存储着call的结果。call是阻塞式调用,即要等到计算出结果后才会退出。
<libros>/include/ros/service_client.h
------
@req。[IN]req需要填写的请求,到此处时已序化为字节流格式。
@resp。[out]字节流格式表示的计算结果。后绪需要把它反序列为MRes类型的resp。
bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp, const std::string& service_md5sum)
{
if (service_md5sum != impl_->service_md5sum_)
{
ROS_ERROR("Call to service [%s] with md5sum [%s] does not match md5sum when the handle was created ([%s])", impl_->name_.c_str(), service_md5sum.c_str(), impl_->service_md5sum_.c_str());
return false;
}
ServiceServerLinkPtr link;
if (impl_->persistent_)
{
if (!impl_->server_link_)
{
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
if (!impl_->server_link_)
{
return false;
}
}
link = impl_->server_link_;
}
else
{
// impl_->persistent_默认是false,进这个入口
link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
if (!link)
{
return false;
}
}
bool ret = link->call(req, resp);
// 上面的link->call是阻塞式调用,一直要有结果后才会退出。到此处,表示已得到计算结果了。
link.reset();
// If we're shutting down but the node haven't finished yet, wait until we do
while (ros::isShuttingDown() && ros::ok())
{
ros::WallDuration(0.001).sleep();
}
return ret;
}
impl_->persistent_默认是false,即对每次call都会创建个新的ServiceServerLink,此次call结束后就会销毁。call依次有两个操作。
1)createServiceServerLink。为此次call创建一个ServiceServerLink对象。
2)link->call(req, resp)。内中会执行finished_condition_.wait(lock),client将在此等待,直到server执行完。
让进入第一个操作:createServiceServerLink。
ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent,
const std::string& request_md5sum, const std::string& response_md5sum,
const M_string& header_values)
{
boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
if (shutting_down_)
{
return ServiceServerLinkPtr();
}
uint32_t serv_port;
std::string serv_host;
if (!lookupService(service, serv_host, serv_port))
{
return ServiceServerLinkPtr();
}
TransportTCPPtr transport(boost::make_shared<TransportTCP>(&poll_manager_->getPollSet()));
// Make sure to initialize the connection *before* transport->connect()
// is called, otherwise we might miss a connect error (see #434).
ConnectionPtr connection(boost::make_shared<Connection>());
connection_manager_->addConnection(connection);
connection->initialize(transport, false, HeaderReceivedFunc());
if (transport->connect(serv_host, serv_port))
{
ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values));
{
boost::mutex::scoped_lock lock(service_server_links_mutex_);
service_server_links_.push_back(client);
}
client->initialize(connection);
return client;
}
ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port);
return ServiceServerLinkPtr();
}函数首先调用lookupService(service, serv_host, serv_port),在rosmaster中参数名称是“serivce”的service,serv_host、serv_port存储着service所在node的service_url,后续请求就发向该url。
因为要进网络收发了,须创建处理此次收发的TransportTCP、Connection对象。transport连接service_url成功后,创建ServiceServerLink,并调用它的initialize方法。
一次call请求包括两部分数据:请求头(Header)和消息,在ServiceServerLink::initialize,会把请求头写入PollManager线程的write_buffer_。

要能网络发送,必须序列化call请求,即把请求变成字节流。消息部分有标准方法,图2中显示了如何序列化请求头。“Watch 1”中key_vals变量是std::map格式请求头。“Memory 1”是字节流表示的请求头,头4个字节是请求头长度。序列化出请求头后,initialize会执行断点指向的write(...),把请求头写入PollManager线程的write_buffer_。
<libros>/ros_comm/roscpp/src/libros/connection.cpp
------
void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
{
if (dropped_ || sending_header_error_)
{
return;
}
{
boost::mutex::scoped_lock lock(write_callback_mutex_);
ROS_ASSERT(!write_callback_);
write_callback_ = callback;
write_buffer_ = buffer;
write_size_ = size;
write_sent_ = 0;
has_write_callback_ = 1;
}
transport_->enableWrite();
if (immediate)
{
// write immediately if possible
writeTransport();
}
}
这边把请求头写入PollManager线程的write_buffer_,那边PollManager线程检测到write_buffer_有数据,启动把请求发送到service-url逻辑。
1.2 server进程中的PollManager-thread
socket线程,socket上有可读的数据后,会调用ros::Connection::readTransport(),后者会调用自定义回调函数是callback,在server时,callback指向的是ServiceClientLink::onRequest。
<libros>/ros_comm/roscpp/src/libros/service_client_link.cpp
------
@buffer。序列化后的MReq。
@size。序例化后MReq字节数。
void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
(void)conn;
if (!success)
return;
ROS_ASSERT(conn == connection_);
if (ServicePublicationPtr parent = parent_.lock())
{
parent->processRequest(buffer, size, shared_from_this());
}
else
{
ROS_BREAK();
}
}onRequest是ServiceClientLink的成员函数,注意是ServiceClientLink,和“1.1、client-thread”中的ServiceServerLink是两回事。parent_的类型是boost::weak_ptr<ServicePublication>。
<libros>/ros_comm/roscpp/src/libros/service_publication.cpp
------
void ServicePublication::processRequest(boost::shared_array<uint8_t> buf, size_t num_bytes, const ServiceClientLinkPtr& link)
{
CallbackInterfacePtr cb(boost::make_shared<ServiceCallback>(helper_, buf, num_bytes, link, has_tracked_object_, tracked_object_));
callback_queue_->addCallback(cb, (uint64_t)this);
}- 构造ServiceCallback类型对象。ServiceCallback基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
- callback_queue_->addCallback(...)。CallbackQueue有新消息到了,通知在server-thread运行的CallbackQueue::callAvailable可醒来处理新消息。
至此call请求已放入CallbackQueue,PollManager-thread需要一直等,直到计算出此次的结果(MResp)。但CallbackQueue::callAvailable是在server-thread执行,PollManager-thread如何知道server-thread已算出MResp?——server-thread调用app注册的cb算出MResp后,会把序列化后的MResp写入PollManager的Connection::write_buffer_,PollManager线程检测到write_buffer_有数据,就知道计算已得出MResp,而且write_buffer_存的就是序列化后的MResp。
1.3 server-thread
server-thread执行标准CallbackQuery流程:由CallbackQueue::callAvailable调用callOneCB,callOneCB调用CallbackInterface::call。针对service机制,就是ServiceCallback::call,call中则调用app设定的cb函数。callOneCB在退出前,会析构掉ServiceCallback,即此个ServiceCallback是在server进程中的PollManager-thread构造,在server-thread析构。
<libros>/ros_comm/roscpp/src/libros/service_publication.cpp
------
class ServiceCallback : public CallbackInterface
{
public:
...
virtual CallResult call()
{
if (link_->getConnection()->isDropped())
{
return Invalid;
}
VoidConstPtr tracker;
if (has_tracked_object_)
{
tracker = tracked_object_.lock();
if (!tracker)
{
SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
link_->processResponse(false, res);
return Invalid;
}
}
ServiceCallbackHelperCallParams params;
// 由buffer_, num_bytes_反序列化出MReq。
params.request = SerializedMessage(buffer_, num_bytes_);
// 这个connection_header就是call请求的消息头,即图2中的“key_vals”变量。
params.connection_header = link_->getConnection()->getHeader().getValues();
try
{
bool ok = helper_->call(params);
if (ok)
{
link_->processResponse(true, params.response);
}
else
{
SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
link_->processResponse(false, res);
}
}
catch (std::exception& e)
{
...
}
return Success;
}
...
};call()依次做两个操作。
- SerializedMessage。反序列化出MReq,结果放在params.request。
- helper_->call(params)。调用app设置的cb,计算MResp。
- link_->processResponse(...)。把序列化后的MResp写入PollManager的Connection::write_buffer_,“唤起”PollManager-thread进入得到MResp后的逻辑。
<libros>/include/ros/service_callback_helper.h
------
template<typename MReq, typename MRes>
struct ServiceSpec
{
typedef MReq RequestType;
typedef MRes ResponseType;
typedef boost::shared_ptr<RequestType> RequestPtr;
typedef boost::shared_ptr<ResponseType> ResponsePtr;
typedef boost::function<bool(RequestType&, ResponseType&)> CallbackType;
static bool call(const CallbackType& cb, ServiceSpecCallParams<RequestType, ResponseType>& params)
{
return cb(*params.request, *params.response);
}
};
class ServiceCallbackHelperT : public ServiceCallbackHelper
{
virtual bool call(ServiceCallbackHelperCallParams& params)
{
namespace ser = serialization;
RequestPtr req(create_req_());
ResponsePtr res(create_res_());
ser::deserializeMessage(params.request, *req);
ServiceSpecCallParams<RequestType, ResponseType> call_params;
call_params.request = req;
call_params.response = res;
call_params.connection_header = params.connection_header;
bool ok = Spec::call(callback_, call_params);
params.response = ser::serializeServiceResponse(ok, *res);
return ok;
}
};ServiceCallbackHelperT::call会调用Spec::call,是Spec::call最终调用app的cb,在这过程中并没有用到connection_header。得到MResp后,立刻序列化,序列化后的结果放在params.response。要注意,这里序列化用的是ser::serializeServiceResponse,不是ser::serializeMessage。序列化后也是字节流,但这字节流更适合后面的网络传输,不能直接用于标准反序列化操作ser::deserializeMessage。