Ros

ServiceClient::call流程

  • 如果server没在运行,client的call立即失败,没有等待时间。

本文重点在ServiceClient::call,如果要想了解其它细节,像ServiceServer创建及初始化,可参考“ros源码分析(四)service的实时反馈机制”。

一、ServiceClient::call流程

图1 ServiceClient::call流程
  1. [cient-thread]以service名称为参数在rosmaster查找该服务提供者的service_url,如果没找到该service_url,返回false。
  2. [cient-thread]把call请求写入Connection::write_buffer_。然后就等待info->finished_condition_信号量。
  3. [client进程中的PollManager-thread]向service_url发送存储在write_buffer_中的call请求。
  4. [server进程中的PollManager-thread]收到call请求,以请求数据生成ServiceCallback对象,把它放入server-thread的CallbackQueue。通知callAvailable可醒来处理新消息。
  5. [server-thread]在callAvailable内,callOneCB调用ServiceCallback::call,内中会调用app注册的cb函数,得到计算结果。
  6. [server进程中的PollManager-thread]得到ServiceCallback::call的计算结果后,把结果发向client进程。
  7. [client进程中的PollManager-thread]收到了server发来的处理结果,置finished_condition_有信号。
  8. [cient-thread]检测到finished_condition_有信号,结束第二步的阻塞状态,把计算结果作为resp,反馈到app。

 

1.1、client-thread

client线程发起call。参数req是计算请求,resp存储着call的结果。call是阻塞式调用,即要等到计算出结果后才会退出。

<libros>/include/ros/service_client.h
------
@req。[IN]req需要填写的请求,到此处时已序化为字节流格式。
@resp。[out]字节流格式表示的计算结果。后绪需要把它反序列为MRes类型的resp。
bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp, const std::string& service_md5sum)
{
  if (service_md5sum != impl_->service_md5sum_)
  {
    ROS_ERROR("Call to service [%s] with md5sum [%s] does not match md5sum when the handle was created ([%s])", impl_->name_.c_str(), service_md5sum.c_str(), impl_->service_md5sum_.c_str());

    return false;
  }

  ServiceServerLinkPtr link;

  if (impl_->persistent_)
  {
    if (!impl_->server_link_)
    {
      impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);

      if (!impl_->server_link_)
      {
        return false;
      }
    }

    link = impl_->server_link_;
  }
  else
  {
    // impl_->persistent_默认是false,进这个入口
    link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);

    if (!link)
    {
      return false;
    }
  }

  bool ret = link->call(req, resp);
  // 上面的link->call是阻塞式调用,一直要有结果后才会退出。到此处,表示已得到计算结果了。
  link.reset();

  // If we're shutting down but the node haven't finished yet, wait until we do
  while (ros::isShuttingDown() && ros::ok())
  {
    ros::WallDuration(0.001).sleep();
  }

  return ret;
}
impl_->persistent_默认是false,即对每次call都会创建个新的ServiceServerLink,此次call结束后就会销毁。call依次有两个操作。
1)createServiceServerLink。为此次call创建一个ServiceServerLink对象。
2)link->call(req, resp)。内中会执行finished_condition_.wait(lock),client将在此等待,直到server执行完。

让进入第一个操作:createServiceServerLink。
ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent,
                                             const std::string& request_md5sum, const std::string& response_md5sum,
                                             const M_string& header_values)
{

  boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
  if (shutting_down_)
  {
    return ServiceServerLinkPtr();
  }

  uint32_t serv_port;
  std::string serv_host;
  if (!lookupService(service, serv_host, serv_port))
  {
    return ServiceServerLinkPtr();
  }

  TransportTCPPtr transport(boost::make_shared<TransportTCP>(&poll_manager_->getPollSet()));

  // Make sure to initialize the connection *before* transport->connect()
  // is called, otherwise we might miss a connect error (see #434).
  ConnectionPtr connection(boost::make_shared<Connection>());
  connection_manager_->addConnection(connection);
  connection->initialize(transport, false, HeaderReceivedFunc());

  if (transport->connect(serv_host, serv_port))
  {
    ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values));

    {
      boost::mutex::scoped_lock lock(service_server_links_mutex_);
      service_server_links_.push_back(client);
    }

    client->initialize(connection);

    return client;
  }

  ROSCPP_LOG_DEBUG("Failed to connect to service [%s] (mapped=[%s]) at [%s:%d]", service.c_str(), service.c_str(), serv_host.c_str(), serv_port);

  return ServiceServerLinkPtr();
}

函数首先调用lookupService(service, serv_host, serv_port),在rosmaster中参数名称是“serivce”的service,serv_host、serv_port存储着service所在node的service_url,后续请求就发向该url。

因为要进网络收发了,须创建处理此次收发的TransportTCP、Connection对象。transport连接service_url成功后,创建ServiceServerLink,并调用它的initialize方法。

一次call请求包括两部分数据:请求头(Header)和消息,在ServiceServerLink::initialize,会把请求头写入PollManager线程的write_buffer_。

图2 序列化请求头

 

要能网络发送,必须序列化call请求,即把请求变成字节流。消息部分有标准方法,图2中显示了如何序列化请求头。“Watch 1”中key_vals变量是std::map格式请求头。“Memory 1”是字节流表示的请求头,头4个字节是请求头长度。序列化出请求头后,initialize会执行断点指向的write(...),把请求头写入PollManager线程的write_buffer_。

<libros>/ros_comm/roscpp/src/libros/connection.cpp
------
void Connection::write(const boost::shared_array<uint8_t>& buffer, uint32_t size, const WriteFinishedFunc& callback, bool immediate)
{
  if (dropped_ || sending_header_error_)
  {
    return;
  }

  {
    boost::mutex::scoped_lock lock(write_callback_mutex_);

    ROS_ASSERT(!write_callback_);

    write_callback_ = callback;
    write_buffer_ = buffer;
    write_size_ = size;
    write_sent_ = 0;
    has_write_callback_ = 1;
  }

  transport_->enableWrite();

  if (immediate)
  {
    // write immediately if possible
    writeTransport();
  }
}

 

这边把请求头写入PollManager线程的write_buffer_,那边PollManager线程检测到write_buffer_有数据,启动把请求发送到service-url逻辑。

1.2 server进程中的PollManager-thread

socket线程,socket上有可读的数据后,会调用ros::Connection::readTransport(),后者会调用自定义回调函数是callback,在server时,callback指向的是ServiceClientLink::onRequest。

<libros>/ros_comm/roscpp/src/libros/service_client_link.cpp
------
@buffer。序列化后的MReq。
@size。序例化后MReq字节数。
void ServiceClientLink::onRequest(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
  (void)conn;
  if (!success)
    return;

  ROS_ASSERT(conn == connection_);

  if (ServicePublicationPtr parent = parent_.lock())
  {
    parent->processRequest(buffer, size, shared_from_this());
  }
  else
  {
    ROS_BREAK();
  }
}

onRequest是ServiceClientLink的成员函数,注意是ServiceClientLink,和“1.1、client-thread”中的ServiceServerLink是两回事。parent_的类型是boost::weak_ptr<ServicePublication>。

<libros>/ros_comm/roscpp/src/libros/service_publication.cpp
------
void ServicePublication::processRequest(boost::shared_array<uint8_t> buf, size_t num_bytes, const ServiceClientLinkPtr& link)
{
  CallbackInterfacePtr cb(boost::make_shared<ServiceCallback>(helper_, buf, num_bytes, link, has_tracked_object_, tracked_object_));
  callback_queue_->addCallback(cb, (uint64_t)this);
}
  1. 构造ServiceCallback类型对象。ServiceCallback基类是CallbackInterface。CallbackInterface就是CallbackQueue中一次callOneCB处理一个的callback
  2. callback_queue_->addCallback(...)。CallbackQueue有新消息到了,通知在server-thread运行的CallbackQueue::callAvailable可醒来处理新消息。

至此call请求已放入CallbackQueue,PollManager-thread需要一直等,直到计算出此次的结果(MResp)。但CallbackQueue::callAvailable是在server-thread执行,PollManager-thread如何知道server-thread已算出MResp?——server-thread调用app注册的cb算出MResp后,会把序列化后的MResp写入PollManager的Connection::write_buffer_,PollManager线程检测到write_buffer_有数据,就知道计算已得出MResp,而且write_buffer_存的就是序列化后的MResp。

1.3 server-thread

server-thread执行标准CallbackQuery流程:由CallbackQueue::callAvailable调用callOneCB,callOneCB调用CallbackInterface::call。针对service机制,就是ServiceCallback::call,call中则调用app设定的cb函数。callOneCB在退出前,会析构掉ServiceCallback,即此个ServiceCallback是在server进程中的PollManager-thread构造,在server-thread析构。

 

<libros>/ros_comm/roscpp/src/libros/service_publication.cpp
------
class ServiceCallback : public CallbackInterface
{
public:
  ...

  virtual CallResult call()
  {
    if (link_->getConnection()->isDropped())
    {
      return Invalid;
    }

    VoidConstPtr tracker;
    if (has_tracked_object_)
    {
      tracker = tracked_object_.lock();

      if (!tracker)
      {
        SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
        link_->processResponse(false, res);
        return Invalid;
      }
    }

    ServiceCallbackHelperCallParams params;
    // 由buffer_, num_bytes_反序列化出MReq。
    params.request = SerializedMessage(buffer_, num_bytes_);
    // 这个connection_header就是call请求的消息头,即图2中的“key_vals”变量。
    params.connection_header = link_->getConnection()->getHeader().getValues();
    try
    {

      bool ok = helper_->call(params);
      if (ok)
      {
        link_->processResponse(true, params.response);
      }
      else
      {
        SerializedMessage res = serialization::serializeServiceResponse<uint32_t>(false, 0);
        link_->processResponse(false, res);
      }
    }
    catch (std::exception& e)
    {
      ...
    }

    return Success;
  }

  ...
};

call()依次做两个操作。

  • SerializedMessage。反序列化出MReq,结果放在params.request。
  • helper_->call(params)。调用app设置的cb,计算MResp。
  • link_->processResponse(...)。把序列化后的MResp写入PollManager的Connection::write_buffer_,“唤起”PollManager-thread进入得到MResp后的逻辑。
<libros>/include/ros/service_callback_helper.h
------
template<typename MReq, typename MRes>
struct ServiceSpec
{
  typedef MReq RequestType;
  typedef MRes ResponseType;
  typedef boost::shared_ptr<RequestType> RequestPtr;
  typedef boost::shared_ptr<ResponseType> ResponsePtr;
  typedef boost::function<bool(RequestType&, ResponseType&)> CallbackType;

  static bool call(const CallbackType& cb, ServiceSpecCallParams<RequestType, ResponseType>& params)
  {
    return cb(*params.request, *params.response);
  }
};

class ServiceCallbackHelperT : public ServiceCallbackHelper
{
  virtual bool call(ServiceCallbackHelperCallParams& params)
  {
    namespace ser = serialization;
    RequestPtr req(create_req_());
    ResponsePtr res(create_res_());

    ser::deserializeMessage(params.request, *req);

    ServiceSpecCallParams<RequestType, ResponseType> call_params;
    call_params.request = req;
    call_params.response = res;
    call_params.connection_header = params.connection_header;
    bool ok = Spec::call(callback_, call_params);
    params.response = ser::serializeServiceResponse(ok, *res);
    return ok;
  }
};

ServiceCallbackHelperT::call会调用Spec::call,是Spec::call最终调用app的cb,在这过程中并没有用到connection_header。得到MResp后,立刻序列化,序列化后的结果放在params.response。要注意,这里序列化用的是ser::serializeServiceResponse,不是ser::serializeMessage。序列化后也是字节流,但这字节流更适合后面的网络传输,不能直接用于标准反序列化操作ser::deserializeMessage。

全部评论: 0

    写评论: