15from launch
import LaunchDescription, LaunchContext
16from launch_ros.actions
import Node
17from launch.actions
import OpaqueFunction
18from launch.substitutions
import LaunchConfiguration
19from launch.actions
import DeclareLaunchArgument, ExecuteProcess
20from launch.substitutions
import PathJoinSubstitution
21from launch_ros.substitutions
import FindPackageShare
23from datetime
import datetime
29def record_ros2_rosbag(context: LaunchContext, vehicle_config_param_file, rosbag2_qos_override_param_file):
32 vehicle_config_param_file_string = context.perform_substitution(vehicle_config_param_file)
35 exclude_topics_regex =
""
37 overriding_qos_profiles = context.perform_substitution(rosbag2_qos_override_param_file)
40 with open(vehicle_config_param_file_string,
'r')
as f:
42 vehicle_config_params = yaml.safe_load(f)
44 if "use_ros2_rosbag" in vehicle_config_params:
45 if vehicle_config_params[
"use_ros2_rosbag"] ==
True:
47 if "exclude_default" in vehicle_config_params:
48 if (vehicle_config_params[
"exclude_default"] ==
True)
and (
"excluded_default_topics" in vehicle_config_params):
49 for topic
in vehicle_config_params[
"excluded_default_topics"]:
50 exclude_topics_regex += str(topic) +
"|"
52 if "exclude_lidar" in vehicle_config_params:
53 if (vehicle_config_params[
"exclude_lidar"] ==
True)
and (
"excluded_lidar_topics" in vehicle_config_params):
54 for topic
in vehicle_config_params[
"excluded_lidar_topics"]:
55 exclude_topics_regex += str(topic) +
"|"
57 if "exclude_camera" in vehicle_config_params:
58 if (vehicle_config_params[
"exclude_camera"] ==
True)
and (
"excluded_camera_topics" in vehicle_config_params):
59 for topic
in vehicle_config_params[
"excluded_camera_topics"]:
60 exclude_topics_regex += str(topic) +
"|"
62 if "exclude_can" in vehicle_config_params:
63 if (vehicle_config_params[
"exclude_can"] ==
True)
and (
"excluded_can_topics" in vehicle_config_params):
64 for topic
in vehicle_config_params[
"excluded_can_topics"]:
65 exclude_topics_regex += str(topic) +
"|"
67 proc = ExecuteProcess(
68 cmd=[
'ros2',
'bag',
'record',
'-a',
'-s',
'mcap',
69 '--qos-profile-overrides-path', overriding_qos_profiles,
70 '-o',
'/opt/carma/logs/rosbag2_' + str(datetime.now().strftime(
'%Y-%m-%d_%H%M%S')),
71 '-x', exclude_topics_regex],
81 vehicle_config_dir = LaunchConfiguration(
'vehicle_config_dir')
82 declare_vehicle_config_dir_arg = DeclareLaunchArgument(
83 name =
'vehicle_config_dir',
84 default_value =
"/opt/carma/vehicle/config",
85 description =
"Path to file containing vehicle config directories"
89 vehicle_config_param_file = LaunchConfiguration(
'vehicle_config_param_file')
90 declare_vehicle_config_param_file_arg = DeclareLaunchArgument(
91 name =
'vehicle_config_param_file',
92 default_value = [vehicle_config_dir,
"/VehicleConfigParams.yaml"],
93 description =
"Path to file contain vehicle configuration parameters"
96 rosbag2_qos_override_param_file = LaunchConfiguration(
'rosbag2_qos_override_param_file')
97 declare_rosbag2_qos_override_param_file = DeclareLaunchArgument(
98 name=
'rosbag2_qos_override_param_file',
99 default_value = PathJoinSubstitution([
100 FindPackageShare(
'carma'),
'config',
101 'rosbag2_qos_overrides.yaml'
103 description =
"Path to file containing rosbag2 override qos settings"
106 return LaunchDescription([
107 declare_vehicle_config_dir_arg,
108 declare_vehicle_config_param_file_arg,
109 declare_rosbag2_qos_override_param_file,
110 OpaqueFunction(function=record_ros2_rosbag, args=[vehicle_config_param_file, rosbag2_qos_override_param_file])
def generate_launch_description()
def record_ros2_rosbag(LaunchContext context, vehicle_config_param_file, rosbag2_qos_override_param_file)