要求
- 至少能在Android、Windows上运行。Android上用NDK编译,Windows上用Visual Studio编译。
- 在Android,只支持进程内不复制(nocopy-intra)。Windows则需要支持nocopy-intra和Ros原有一node一进程,并可以灵活切换。
一、进程内不复制(nocopy-intra)
进程内不复制包含两个要求:
- 所有node运行在同一个进程。实现方法是改为每个node运行在一个新线程。在操作系统,可认为一个进程就是一个app,因而进程内不复制意味着所有功能要运行一个app,对windows就是一个*.exe。
- node间传递消息时尽可能避免序列化/反序列化,以及减少复制次数。原有一node一进程时,node间通信要使用xml-rpc,像发布、订阅话题,消息往往有着这样流程:node#1生成消息 --> 复制消息(可能没有,可能N次) --> 消息对象序列化为xml格式的字节流 ==>socket==> node#2反序列化回消息对象 --> 复制消息(可能没有,可能N次) --> node#2使用消息。因为node#1、node#2是在两个进程,为适应两进程间xml-rpc发送,消息不得不进行序列化、反序列化。在进程内不复制时,node#1、node#2已在同一个进程,用着同一个内存空间,node#2可简单地用新指针指向或内存复制来得到node#1生成的消息对象。“进程内不复制”中的不复制就是要在过程中不执行消息的序列化、反序列化,同时尽可能减少消息对象复制次数。在这过程中可能会更多使用std::shared_ptr/boost::shared_ptr,相比序列化、反序列、复制等开销,构造shared_ptr的基本可忽略不计,尤其遇到大内存消息对象,像nav_msgs::OccupancyGrid。
<libros>/ros_comm/roscpp/src/libros/common.cpp ------ namespace ros { #ifdef _WIN32 bool nocopy_intra = false; #else bool nocopy_intra = true; #endif bool disable_latching = nocopy_intra || true; }
为支持nocopy-intra,以及和Ros已有的机制相互切换,移植时增加了两个变量。
- nocopy_intra。true时表示使用nocopy-intra,false时表示保持Ros原有node机制。由于android下,一个node对应一个进程是不现实的,在android强制使用nocopy-intra。windows则须要同时支持两种。
- disable_latching。true时表示强制禁用掉发布topic时的latching,即publisher不会有保留最后一个消息功能。在nocopy-intra时,不能让publisher出现需要序列化的消息,所以要求disable_latching是true。强制禁用latching是否会引出副作用,须要做更广泛讨论。
1.1 每个node须要有各自的CallbackQueue,禁止使用g_global_queue
Ros原有机制下,因为每个node运行在新进程,天然保证了每个g_global_queue各自独立。node改为运行在一个线程后,若用g_global_queue就极容易出现有多个node共用一个CallbackQueue。为减少查问题难度,遵循一个规则:禁止使用g_global_queue。
<libros>/ros_comm/roscpp/src/libros/node_handle.cpp ------ Subscriber NodeHandle::subscribe(SubscribeOptions& ops) { ops.topic = resolveName(ops.topic); if (ops.callback_queue == 0) { if (callback_queue_) { ops.callback_queue = callback_queue_; } else { // 一旦进入这里,意味着要使用g_global_queue。 ops.callback_queue = getGlobalCallbackQueue(); 有效性诊断,禁止进这个入口 VALIDATE(false, null_str); } } .... }
各node须自个提供CallbackQueue,为检查代码是否满足了这个要求,可在node_handle.cpp做有类似上面的有效性诊断,调用下面这些函数时callback_queue_不能是nullptr。
- Publisher NodeHandle::advertise(AdvertiseOptions& ops)
- ServiceServer NodeHandle::advertiseService(AdvertiseServiceOptions& ops)
- Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
- Timer NodeHandle::createTimer(TimerOptions& ops) const
出于同样原因,要禁止调用会使用g_global_queue的api,像spin()、spinOnce()。
1.2 node代码
各node为实现自个提供CallbackQueue,做类似下面修改。
#include <ros/callback_queue.h> void package__node(bool& exit) { ros::CallbackQueue cbqueue; n.setCallbackQueue(&cbqueue); ros::WallDuration timeout(0.1f); while (!exit & ros::ok()) { cbqueue.callAvailable(timeout); } }
尽量不要使用“cbqueue.callAvailable(ros::WallDuration())”,这种调用没有sleep,这会让cpu占用率很快到100%。
不要存在全局的ros::Publisher、ros::Subscriber、ros::NodeHandle。一旦定义为全局,那它们在析构前,如果已经调用过ros::shutdown(最后一个ros::NodeHandle已析构),析构它们时会导致非法。所以为了安全,不要把它们定义全局变量。
怎么把全局改为非全局,可参考<libros>/ros_comm/topic_tools/src/throttle.cpp。
node的主函数main,改到一node一线程后,不可能叫main了,命名统一用“<功能包名>__<node名>”格式。举个例子,“cartographer_ros__cartographer_node”表示“cartographer_ros”功能包中的“cartographer_node”节点。tf__static_transform_publisher则是“tf”功能包中的“static_transform_publisher”节点
1.3 ros master
nocopy-intra将不会使用xml-rpc,但考虑到灵活切换,没有彻底不编译xml-rpc。改动绝大部分在master.cpp。
<libros>/ros_comm/roscpp/src/libros/master.cpp ------ static trosmaster rosmaster; bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlRpc::XmlRpcValue& response, XmlRpc::XmlRpcValue& payload, bool wait_for_master) { threading::lock lock(rosmaster.mutex_); XmlRpc::XmlRpcValue payload2; int methodid = rosmaster.methodid(method); if (methodid != nposm) { XmlRpc::XmlRpcValue& _payload = ros::nocopy_intra? payload: payload2; bool ret = rosmaster.handle(methodid, request, _payload); if (ros::nocopy_intra) { return ret; } } else { VALIDATE(!ros::nocopy_intra, null_str); } ... }
ros要向ros master发请求时一定会调用execute。在nocopy_intra时,rosmaster处理完后就返回,不再发向后续的xml-rpc。
trosmaster是为支持nocopy_intra专门增加的类,在master.cpp定义并实现。
二、去除plugin机制
一来plugin依赖的库太难编译,像poco,二来计划用别的方法(lua)代替这功能,决定不支持plugin机制。因为不支持plugin了,须要修改使用了该机制的node代码。下面以替换move_base中的bgp_loader_为例进行说明。
1、注释掉move_base.h中的定义bgp_loader_
2、修改planner_的创建方法
planner_ = bgp_loader_.createInstance(global_planner); planner_->initialize(bgp_loader_.getName(global_planner), planner_costmap_ros_); 替换为 planner_.reset(new global_planner::GlobalPlanner); planner_->initialize("GlobalPlanner", planner_costmap_ros_);
3、注释掉PLUGINLIB_EXPORT_CLASS
// PLUGINLIB_EXPORT_CLASS(costmap_2d::StaticLayer, costmap_2d::Layer)
去除plugin机制后,下面的头文件或目录就不在ros源码中了。
- 目录。<libros>/include/class_loader
- 目录。<libros>/include/pluginlib
- 文件。<libros>/pluginlib/class_list_macros.h
- 文件。<libros>/pluginlib/class_loader.hpp
三、注意条件ros::ok(),避免让进入死循环
改为所有node运行在同一个进程后,意味着要所有node退出后ros::ok()才会返回true,某个node若以ros::ok()为结束条件,那会造成死循环。举个例子,MoveBase要使用Costmap2DROS。构造Costmap2DROS时,它会“死”等odom和base_footprint两坐标系之间可转换。
Costmap2DROS::Costmap2DROS(...) { ... while (ros::ok() && !tf_.canTransform(global_frame_, robot_base_frame_, ros::Time(), ros::Duration(0.1), &tf_error)) { ... } }
由于ros::ok()一直true,发布这两坐标转换关系的node可能已终止,tf_.canTransform就一直是false,上面这个while就进入死循环,须要增加条件让能退出这个循环。
Costmap2DROS::Costmap2DROS(...) { ... while (ros::ok() && !exit && !tf_.canTransform(global_frame_, robot_base_frame_, ros::Time(), ros::Duration(0.1), &tf_error)) { ... } }
四、去除boost::thread
一开始把boost::thread想简单了,认为去除boost::thread就可不编译<boost>/libs/thread。去除之后发现还是需要编译thread。开发者如果认为没必要,那可略过这里。
个人对代码要求是简洁、易懂,认为boost库是严重违背这原则,所以尽可能不用boost。rose已提供了一个基于chromium线程模型写的net::tworker,修改思路就是用它代替boost::thread。下面用move_base.h中的planner_thread_为例进行说明。
<libros>/include/move_base/move_base.h ------ boost::thread* planner_thread_; 改为 net::tworker* planner_thread_; <libros>/move_base/src/move_base.cpp ------ planner_thread_ = new boost::thread(boost::bind(&MoveBase::planThread, this)); 改为 planner_thread_ = new net::tworker(boost::bind(&MoveBase::planThread, this, _1), NULL, NULL, boost::bind(&MoveBase::OnTriggerExit, this), "MoveBase_planThread"); void MoveBase::OnTriggerExit() { planner_cond_.notify_one(); }
如果在终止boost线程时用了interrupt(),那要关注在线程函数是否使用了等待中断点。在终止线程时,须额外唤醒那些中断点。MoveBase恰是用了interrupt(),因而需提供一个额外触发退出函数OnTriggerExit,在该函数让planner_cond_有信号。
五、调试输出
5.1. 不要使用::ros::console::levels::Debug级别的输出函数
像ROS_DEBUG_STREAM_NAMED,因为level是::ros::console::levels::Debug时,isEnabledFor总是返回false。估计是levels::Debug级别的输出太多,光输出就要耗掉极大资源,官方默认就把它禁了。
bool isEnabledFor(void* handle, ::ros::console::Level level) { return level != ::ros::console::levels::Debug; }
5.2 修改最低可输出的调试级别
<libros>/console_bridge/console.cpp ------ struct DefaultOutputHandler { DefaultOutputHandler(void) { output_handler_ = static_cast<console_bridge::OutputHandler*>(&std_output_handler_); previous_output_handler_ = output_handler_; logLevel_ = console_bridge::CONSOLE_BRIDGE_LOG_WARN; { // 修正自已希望的最低调试级别到logLevel_。 logLevel_ = console_bridge::CONSOLE_BRIDGE_LOG_DEBUG; } } ... };
5.3 修改自已控制台输出函数
ros默认控制台输出函数是fprintf,在Visual Studio,这时输出到Dos风格控制台,不是常用的Output窗口。为输出到Output,须要修改。这把它改为跨平台输出函数SDL_Log。
/rosconsole/src/rosconsole/rosconsole.cpp ------ void Formatter::print(void* logger_handle, ::ros::console::Level level, const char* str, const char* file, const char* function, int line) { // print in red to stderr if level doesn't match any of the predefined ones const char* color = COLOR_RED; FILE* f = stderr; ... // 注释掉默认fprintf,改为自已的SDL_Log。 // 再添加“f = nullptr”,是为了让接下那个if不会生效。 // fprintf(f, "%s\n", ss.str().c_str()); SDL_Log("%s", ss.str().c_str()); f = nullptr; if (g_force_stdout_line_buffered && f == stdout) { ... } }
六、boost::function/boost::bind改为std::function/std::bind
会要求编辑环境至少c++11,而混用boost::function/boost::bind、std::function/std::bind,会给编译带来麻烦。和这修改对应,占位符“boost::placeholders::_1”等,写要改为“_1”。
七、C++编译器升到C++17,要做的修改
升到C++17后,一些地方或不修改,会出警告或出错。
7.1 去除<std::allocator<void> >
typedef ::geometry_msgs::Pose_<std::allocator<void> > Pose; 改为 typedef ::geometry_msgs::Pose_<std::allocator<char> > Pose;
7.2 修改typename ContainerAllocator::template rebind<char>::other
typedef std::basic_string<char, std::char_traits<char>, typename ContainerAllocator::template rebind<char>::other > _child_frame_id_type; 改为 typedef std::basic_string<char, std::char_traits<char>, typename std::allocator_traits<ContainerAllocator>::template rebind_alloc<char> > _child_frame_id_type;
7.3 去除public std::iterator<...>
class XYIndexRangeIterator : public std::iterator<std::input_iterator_tag, Eigen::Array2i> { ... }; ==> class XYIndexRangeIterator { public: // 定义迭代器所需的类型别名 using iterator_category = std::input_iterator_tag; using value_type = Eigen::Array2i; using difference_type = std::ptrdiff_t; // 迭代器距离类型 using pointer = Eigen::Array2i*; // 值类型指针 using reference = Eigen::Array2i&; // 值类型引用 // 类的其他成员(构造函数、运算符重载等) // ... };
八、针对一些场景时,修改代码让方便调试
这些修改只在windows系统上有效
移动后执行某个任务,但希望移动时不会让机器人产生移动,即不会发cmd_vel。
示例:要测试充电时的对准充电桩。
- 不要连IMU。
- 选中地图中一个位置,该位置的yaw不限角度。
- dwa_planner_ros.cpp中的dbg_charge_align改为true。有想到是在选位置时,选的(x, y)都小于0.1,但这个在第二次导航时,是可能导致移动的,还是改dbg_charge_align为true安全。