15from launch 
import LaunchDescription, LaunchContext
 
   16from launch_ros.actions 
import Node
 
   17from launch.actions 
import OpaqueFunction
 
   18from launch.substitutions 
import LaunchConfiguration
 
   19from launch.actions 
import DeclareLaunchArgument, ExecuteProcess
 
   20from launch.substitutions 
import PathJoinSubstitution
 
   21from launch_ros.substitutions 
import FindPackageShare
 
   23from datetime 
import datetime
 
   29def record_ros2_rosbag(context: LaunchContext, vehicle_config_param_file, rosbag2_qos_override_param_file):
 
   32    vehicle_config_param_file_string = context.perform_substitution(vehicle_config_param_file)
 
   37    overriding_qos_profiles = context.perform_substitution(rosbag2_qos_override_param_file)
 
   40    with open(vehicle_config_param_file_string, 
'r') 
as f:
 
   42        vehicle_config_params = yaml.safe_load(f)
 
   44        if "use_ros2_rosbag" in vehicle_config_params:
 
   45            if vehicle_config_params[
"use_ros2_rosbag"] == 
True:
 
   47                if "exclude_default" in vehicle_config_params:
 
   48                    if (vehicle_config_params[
"exclude_default"] == 
True) 
and (
"excluded_default_topics" in vehicle_config_params):
 
   49                        exclude_topics.extend(vehicle_config_params[
"excluded_default_topics"])
 
   51                if "exclude_lidar" in vehicle_config_params:
 
   52                    if (vehicle_config_params[
"exclude_lidar"] == 
True) 
and (
"excluded_lidar_topics" in vehicle_config_params):
 
   53                        exclude_topics.extend(vehicle_config_params[
"excluded_lidar_topics"])
 
   55                if "exclude_camera" in vehicle_config_params:
 
   56                    if (vehicle_config_params[
"exclude_camera"] == 
True) 
and (
"excluded_camera_topics" in vehicle_config_params):
 
   57                        exclude_topics.extend(vehicle_config_params[
"excluded_camera_topics"])
 
   59                if "exclude_can" in vehicle_config_params:
 
   60                    if (vehicle_config_params[
"exclude_can"] == 
True) 
and (
"excluded_can_topics" in vehicle_config_params):
 
   61                        exclude_topics.extend(vehicle_config_params[
"excluded_can_topics"])
 
   64                exclude_topics_regex = 
"|".join(exclude_topics) 
if exclude_topics 
else "" 
   66                proc = ExecuteProcess(
 
   68                            'bash', 
'-c', 
'ros2 bag record -a -s mcap --qos-profile-overrides-path ' 
   69                            + overriding_qos_profiles + 
' -o /opt/carma/logs/rosbag2_' 
   70                            + 
str(datetime.now().strftime(
'%Y-%m-%d_%H%M%S')) + 
' -x "' 
   71                            + exclude_topics_regex
 
   73                            + 
'" >/dev/null 2>&1'],  
 
   83    vehicle_config_dir = LaunchConfiguration(
'vehicle_config_dir')
 
   84    declare_vehicle_config_dir_arg = DeclareLaunchArgument(
 
   85        name = 
'vehicle_config_dir',
 
   86        default_value = 
"/opt/carma/vehicle/config",
 
   87        description = 
"Path to vehicle configuration directory populated by carma-config" 
   91    vehicle_config_param_file = LaunchConfiguration(
'vehicle_config_param_file')
 
   92    declare_vehicle_config_param_file_arg = DeclareLaunchArgument(
 
   93        name = 
'vehicle_config_param_file',
 
   94        default_value = [vehicle_config_dir, 
"/VehicleConfigParams.yaml"],
 
   95        description = 
"Path to file contain vehicle configuration parameters" 
   98    rosbag2_qos_override_param_file = LaunchConfiguration(
'rosbag2_qos_override_param_file')
 
   99    declare_rosbag2_qos_override_param_file = DeclareLaunchArgument(
 
  100        name=
'rosbag2_qos_override_param_file',
 
  101        default_value = PathJoinSubstitution([
 
  102                    FindPackageShare(
'carma'),
'config',
 
  103                    'rosbag2_qos_overrides.yaml' 
  105        description = 
"Path to file containing rosbag2 override qos settings" 
  108    return LaunchDescription([
 
  109        declare_vehicle_config_dir_arg,
 
  110        declare_vehicle_config_param_file_arg,
 
  111        declare_rosbag2_qos_override_param_file,
 
  112        OpaqueFunction(function=record_ros2_rosbag, args=[vehicle_config_param_file, rosbag2_qos_override_param_file])
 
def generate_launch_description()
def record_ros2_rosbag(LaunchContext context, vehicle_config_param_file, rosbag2_qos_override_param_file)