15from launch
import LaunchDescription, LaunchContext
16from launch_ros.actions
import Node
17from launch.actions
import OpaqueFunction
18from launch.substitutions
import LaunchConfiguration
19from launch.actions
import DeclareLaunchArgument, ExecuteProcess
20from launch.substitutions
import PathJoinSubstitution
21from launch_ros.substitutions
import FindPackageShare
23from datetime
import datetime
29def record_ros2_rosbag(context: LaunchContext, vehicle_config_param_file, rosbag2_qos_override_param_file):
32 vehicle_config_param_file_string = context.perform_substitution(vehicle_config_param_file)
37 overriding_qos_profiles = context.perform_substitution(rosbag2_qos_override_param_file)
40 with open(vehicle_config_param_file_string,
'r')
as f:
42 vehicle_config_params = yaml.safe_load(f)
44 if "use_ros2_rosbag" in vehicle_config_params:
45 if vehicle_config_params[
"use_ros2_rosbag"] ==
True:
47 if "exclude_default" in vehicle_config_params:
48 if (vehicle_config_params[
"exclude_default"] ==
True)
and (
"excluded_default_topics" in vehicle_config_params):
49 exclude_topics.extend(vehicle_config_params[
"excluded_default_topics"])
51 if "exclude_lidar" in vehicle_config_params:
52 if (vehicle_config_params[
"exclude_lidar"] ==
True)
and (
"excluded_lidar_topics" in vehicle_config_params):
53 exclude_topics.extend(vehicle_config_params[
"excluded_lidar_topics"])
55 if "exclude_camera" in vehicle_config_params:
56 if (vehicle_config_params[
"exclude_camera"] ==
True)
and (
"excluded_camera_topics" in vehicle_config_params):
57 exclude_topics.extend(vehicle_config_params[
"excluded_camera_topics"])
59 if "exclude_can" in vehicle_config_params:
60 if (vehicle_config_params[
"exclude_can"] ==
True)
and (
"excluded_can_topics" in vehicle_config_params):
61 exclude_topics.extend(vehicle_config_params[
"excluded_can_topics"])
64 exclude_topics_regex =
"|".join(exclude_topics)
if exclude_topics
else ""
66 proc = ExecuteProcess(
68 'bash',
'-c',
'ros2 bag record -a -s mcap --qos-profile-overrides-path '
69 + overriding_qos_profiles +
' -o /opt/carma/logs/rosbag2_'
70 +
str(datetime.now().strftime(
'%Y-%m-%d_%H%M%S')) +
' -x "'
71 + exclude_topics_regex
73 +
'" >/dev/null 2>&1'],
83 vehicle_config_dir = LaunchConfiguration(
'vehicle_config_dir')
84 declare_vehicle_config_dir_arg = DeclareLaunchArgument(
85 name =
'vehicle_config_dir',
86 default_value =
"/opt/carma/vehicle/config",
87 description =
"Path to file containing vehicle config directories"
91 vehicle_config_param_file = LaunchConfiguration(
'vehicle_config_param_file')
92 declare_vehicle_config_param_file_arg = DeclareLaunchArgument(
93 name =
'vehicle_config_param_file',
94 default_value = [vehicle_config_dir,
"/VehicleConfigParams.yaml"],
95 description =
"Path to file contain vehicle configuration parameters"
98 rosbag2_qos_override_param_file = LaunchConfiguration(
'rosbag2_qos_override_param_file')
99 declare_rosbag2_qos_override_param_file = DeclareLaunchArgument(
100 name=
'rosbag2_qos_override_param_file',
101 default_value = PathJoinSubstitution([
102 FindPackageShare(
'carma'),
'config',
103 'rosbag2_qos_overrides.yaml'
105 description =
"Path to file containing rosbag2 override qos settings"
108 return LaunchDescription([
109 declare_vehicle_config_dir_arg,
110 declare_vehicle_config_param_file_arg,
111 declare_rosbag2_qos_override_param_file,
112 OpaqueFunction(function=record_ros2_rosbag, args=[vehicle_config_param_file, rosbag2_qos_override_param_file])
def generate_launch_description()
def record_ros2_rosbag(LaunchContext context, vehicle_config_param_file, rosbag2_qos_override_param_file)