Skip to content

Latest commit

 

History

History
370 lines (265 loc) · 12.7 KB

Executor and Callback.md

File metadata and controls

370 lines (265 loc) · 12.7 KB

Executor and Callback

根据官方设计说明,ROS1 和 ROS2 的 C++实现中均有单线程模型和多线程模型,而在 ROS2 中能自行设计颗粒度更细的线程模型

Usage

🔧 用例 1: 使用多线程执行回调函数

ROS1

  • multi-single多线程执行多个不同的回调函数(比如同时执行AB两个回调函数)
// 方案一:
ros::MultiThreadedSpinner spinner(4); // Use 4 threads
spinner.spin(); // spin() will not return until the node has been shutdown

// 方案二:(颗粒度更细,需要自行控制开关 start/stop)
ros::AsyncSpinner spinner(4); // Use 4 threads
spinner.start();
ros::waitForShutdown();
  • multi-single多线程执行多个相同的回调函数(比如同时执行两个A回调函数)
void ChatterCallback(const std_msgs::String::ConstPtr &msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());
  std::this_thread::sleep_for(0.02s);
}

int main(int argc, char **argv) {
  ros::init(argc, argv, "listener");
  ros::NodeHandle n;
  
    // 默认情况下是对相同的回调函数上互斥锁而不能同时执行
  ros::SubscribeOptions ops;
  ops.template init<std_msgs::String>("chatter", 1, ChatterCallback);
  ops.allow_concurrent_callbacks = true;
  ros::Subscriber sub1 = n.subscribe(ops);
    
  ros::MultiThreadedSpinner spinner(2);
  spinner.spin();
  return 0;
}

ROS2

Callback Group)订阅器的回调函数若没有指定 group 的从属,则使用默认的 callback group(Mutually Exclusive Callback Group

// 创建 option->配置 callback group
rclcpp::SubscriptionOptions options;
options.callback_group = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
my_subscription = this->create_subscription<Int32>("/topic", rclcpp::SensorDataQoS(), callback, options);
🔧 用例 2: 使用特定的线程处理特定的消息队列

设置一个专门处理点云回调函数的线程(线程数 = 1),处理 IMU 回调函数和定时器回调函数的线程(线程数 = 内核数)

#include <thread>

// 创建处理 IMU 回调函数和定时器回调函数的线程(线程数 = 内核数 - 1)
int core_number = std::thread::hardware_concurrency()
ros::AsyncSpinner spinner(core_number - 1); // Simple constructor. Uses the global callback queue.
spinner.start();

// >>> 创建一个专门处理点云回调函数的线程 >>>
// 创建专门接收点云的消息队列和 spinner
ros::NodeHandle pc_nh;
ros::CallbackQueue pc_queue;
// 使所有关联该句柄的订阅回调函数和定时器都走这个回调函数队列,调用 ros::spin() and ros::spinOnce() 时不会处理这部分的回调函数
pc_nh.setCallbackQueue(&pc_queue)
ros::Subscriber sub_a = pc_nh.subscribe("/velodyne_points", 1, callBackPointCloud);
// 为点云回调函数创建一个新的线程
// 方案一:
// std::thread pc_spinner_thread([&pc_queue]() {
//     ros::SingleThreadedSpinner spinner;
//     spinner.spin(&pc_queue);
// });
// pc_spinner_thread.join();

// 方案二:
ros::AsyncSpinner spinner(1, &pc_queue);
spinner.start();
ros::waitForShutdown();

FAQ

暂未找到权威的资料,部分回答为自己的推理

问题 1: queue size 的含义?

queue_size 对应的是publisher queue size(待发布数据的缓存队列)和 subscriber queue size(待处理的接收数据的缓存队列)

问题 2: rospy 和 roscpp 中 spin 的区别?

rospy.spin() 只是起阻塞作用(自旋锁/忙等),防止主进程结束;而roscpp中的 spinspinonce一方面起阻塞作用,另一方面用于触发回调函数的执行

问题 3: 发布器的数据处理逻辑?(猜测)

调用pubish()时,发布器线程(publisher thread)会将相关的原始数据放到对应的发布器队列中(publisher queue ),如果队列已满则丢弃旧的数据;自旋线程(spinner thread)再根据发布器队列中对应的数据,对数据进行序列化和进行发布

问题 4: 订阅器的数据处理逻辑?(猜测)

接收器线程(receiver thread)将接收到的序列化数据放到各自的订阅器队列中(subscriber queue )中,如果队列已满则丢弃旧的数据;自旋线程(spinner thread )根据订阅器队列中对应的数据,对数据进行反序列化和调用相关的回调函数

问题 5: 如何只处理最新的数据?

在 ros 中,可能会遇到一些很耗时的操作,比如点云配准,图像特征提取。这样的话,回调函数的处理时间就会变得很长。如果发布端发布数据的频率高于订阅端处理的速度,同时订阅端没有限制地处理所有的数据的话,就会使订阅端一直处理较旧的数据。最终的数据和数据的处理之间的时延将会很高。希望处理最新的数据的话,就需要将发布器和订阅器的队列长度设置为 1。如下为图像处理时队列长度不为 1 的效果图(左为输出效果,右为输入图像,可看出有较大的时延)(实测:inference时间和ros image数据传输耗时为 ms 级别)

img

问题 6: rospy 回调函数的多线程处理机制

rospy 中处理回调函数时会派生出一个新的线程去执行(线程名与主题名相同);如果有 n 个回调函数(处理的是不同的 topic)则会派生出 n 个线程;如果有回调函数处理的是相同 topic 则共用一个线程

img

问题 7: 各种队列
类型 作用
订阅器队列
(消息队列)
subscriber queue
用于存储将要被回调函数处理的消息;当队列已满,则移除最旧的数据
发布器队列
(消息队列)
publisher queue
用于存储 publish() 将要发布的消息;当队列已满,则移除最旧的数据
回调函数队列
callback queue
问题 8: 如何解决 ROS1 中 rosbag play 的数据丢包的问题? 方案一:降低数据发布的频率,以适配订阅端回调函数处理数据的速度 \ 方案二:在 ROS 1 中,将订阅器的 queue_size 设得足够大;rosbag play 播放时添加选项 --queue <一个足够大的数>;当 rosbag 准备播放完时,按空格键暂停,避免 rosbag 发布器中的数据停止发布。相关原理猜想为,在 ROS 的设计中,发布器要等自己的数据被订阅器处理(包括回调调用或者因为队列的维护而丢弃)了才发下一个数据。而待发的数据也会放到发布队列中,如果发布队列的数据超出上限时则丢掉最旧的数据。

测试代码:

subscriber

import time

try:
    import rclpy
    from rclpy.node import Node
    from rclpy.qos import QoSDurabilityPolicy, QoSProfile, qos_profile_sensor_data

    __ROS_VERSION__ = 2
except ImportError:
    try:
        import rospy

        __ROS_VERSION__ = 1


        class Node:
            def __init__(self, node_name):
                rospy.init_node(node_name)
    except ImportError:
        raise ImportError("Please install ROS2 or ROS1")

from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import Image


class MyNode(Node):

    def __init__(self):
        super().__init__("test_subscriber")

        if __ROS_VERSION__ == 1:
            self.sub = rospy.Subscriber("/test", PoseStamped, self.test_callback, queue_size=10000)
        elif __ROS_VERSION__ == 2:
            self.sub = self.create_subscription(PoseStamped, "/test", self.test_callback, 10000)

    def test_callback(self, msg):
        time.sleep(1)

        if __ROS_VERSION__ == 1:
            rospy.loginfo(f'Subscribe: {msg.header.stamp.secs} {msg.header.stamp.nsecs}')
        elif __ROS_VERSION__ == 2:
            self.get_logger().info(f'Subscribe: {msg.header.stamp}')


def ros1_wrapper():
    """Wrapper function for ROS1."""
    MyNode()
    rospy.spin()


def ros2_wrapper():
    """Wrapper function for ROS2."""
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    if __ROS_VERSION__ == 1:
        ros1_wrapper()
    elif __ROS_VERSION__ == 2:
        ros2_wrapper()

publisher

import time

import cv2
import numpy as np
from tqdm import tqdm

try:
    import rclpy
    from rclpy.node import Node
    from rclpy.qos import (QoSDurabilityPolicy, QoSProfile,
                           qos_profile_sensor_data)

    __ROS_VERSION__ = 2
except ImportError:
    try:
        import rospy

        __ROS_VERSION__ = 1


        class Node:
            def __init__(self, node_name):
                rospy.init_node(node_name)
    except ImportError:
        raise ImportError("Please install ROS2 or ROS1")

from geometry_msgs.msg import PoseStamped


class MyNode(Node):
    def __init__(self):
        super().__init__('test_publisher')

        if __ROS_VERSION__ == 1:
            self.pub = rospy.Publisher("/test", PoseStamped, queue_size=10000)
        elif __ROS_VERSION__ == 2:
            self.pub = self.create_publisher(PoseStamped, 'test', 10000)

        self.process()

    def process(self):
        for i in range(0, 15000):
            msg = PoseStamped()
            if __ROS_VERSION__ == 1:
                msg.header.stamp = rospy.Time.now()
                rospy.loginfo(f'Publish: {msg.header.stamp.secs} {msg.header.stamp.nsecs}')
            elif __ROS_VERSION__ == 2:
                msg.header.stamp = self.get_clock().now().to_msg()
                self.get_logger().info(f'Publish: {msg.header.stamp}')

            self.pub.publish(msg)
            time.sleep(0.1)


def ros1_wrapper():
    """Wrapper function for ROS1."""
    MyNode()
    rospy.spin()


def ros2_wrapper():
    """Wrapper function for ROS2."""
    rclpy.init()
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    if __ROS_VERSION__ == 1:
        ros1_wrapper()
    elif __ROS_VERSION__ == 2:
        ros2_wrapper()

另实测,ROS2 rclpy 不存在发布器 stop 后数据不发布的问题

需根据如下参考资料,进一步补充:

问题 9: 什么时候需要构建多个回调函数队列?

希望处理某些数据时(比如点云)不会阻塞其他的线程;希望创建多个线程,其中一个线程专门用于密集型计算的

Reference

摘要 链接
精讲多线程之 MultiThreadedSpinner https://zhuanlan.zhihu.com/p/375418691
ROS Spinning, Threading, Queuing https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f
ROSCON(concurrency (2019)) https://roscon.ros.org/2019/talks/roscon2019_concurrency.pdf
https://vimeo.com/379127709