Carma-platform v4.2.0
CARMA Platform is built on robot operating system (ROS) and utilizes open source software (OSS) that enables Cooperative Driving Automation (CDA) features to allow Automated Driving Systems to interact and cooperate with infrastructure and other vehicles through communication.
ros2_rosbag.launch.py
Go to the documentation of this file.
1# Copyright (C) 2023 LEIDOS.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from launch import LaunchDescription, LaunchContext
16from launch_ros.actions import Node
17from launch.actions import OpaqueFunction
18from launch.substitutions import LaunchConfiguration
19from launch.actions import DeclareLaunchArgument, ExecuteProcess
20from launch.substitutions import PathJoinSubstitution
21from launch_ros.substitutions import FindPackageShare
22
23from datetime import datetime
24import pathlib
25import yaml
26
27# This function is used to generate a command to record a ROS 2 rosbag that excludes topics
28# topics as provided in the appropriate configuration file.
29def record_ros2_rosbag(context: LaunchContext, vehicle_config_param_file, rosbag2_qos_override_param_file):
30
31 # Convert LaunchConfiguration object to its string representation
32 vehicle_config_param_file_string = context.perform_substitution(vehicle_config_param_file)
33
34 # Initialize an array that will contain the regex for topics to exclude from the ROS 2 rosbag
35 exclude_topics = []
36
37 overriding_qos_profiles = context.perform_substitution(rosbag2_qos_override_param_file)
38
39 # Open vehicle config params file to process various rosbag settings
40 with open(vehicle_config_param_file_string, 'r') as f:
41
42 vehicle_config_params = yaml.safe_load(f)
43
44 if "use_ros2_rosbag" in vehicle_config_params:
45 if vehicle_config_params["use_ros2_rosbag"] == True:
46
47 if "exclude_default" in vehicle_config_params:
48 if (vehicle_config_params["exclude_default"] == True) and ("excluded_default_topics" in vehicle_config_params):
49 exclude_topics.extend(vehicle_config_params["excluded_default_topics"])
50
51 if "exclude_lidar" in vehicle_config_params:
52 if (vehicle_config_params["exclude_lidar"] == True) and ("excluded_lidar_topics" in vehicle_config_params):
53 exclude_topics.extend(vehicle_config_params["excluded_lidar_topics"])
54
55 if "exclude_camera" in vehicle_config_params:
56 if (vehicle_config_params["exclude_camera"] == True) and ("excluded_camera_topics" in vehicle_config_params):
57 exclude_topics.extend(vehicle_config_params["excluded_camera_topics"])
58
59 if "exclude_can" in vehicle_config_params:
60 if (vehicle_config_params["exclude_can"] == True) and ("excluded_can_topics" in vehicle_config_params):
61 exclude_topics.extend(vehicle_config_params["excluded_can_topics"])
62
63 # Join the topics with | to create a proper regex
64 exclude_topics_regex = "|".join(exclude_topics) if exclude_topics else ""
65
66 proc = ExecuteProcess(
67 cmd=['ros2', 'bag', 'record', '-a', '-s', 'mcap',
68 '--qos-profile-overrides-path', overriding_qos_profiles,
69 '-o', '/opt/carma/logs/rosbag2_' + str(datetime.now().strftime('%Y-%m-%d_%H%M%S')),
70 '-x', exclude_topics_regex],
71 output='screen',
72 shell='true'
73 )
74
75 return [proc]
76
77
79 # Declare the vehicle_config_dir launch argument
80 vehicle_config_dir = LaunchConfiguration('vehicle_config_dir')
81 declare_vehicle_config_dir_arg = DeclareLaunchArgument(
82 name = 'vehicle_config_dir',
83 default_value = "/opt/carma/vehicle/config",
84 description = "Path to file containing vehicle config directories"
85 )
86
87 # Declare the vehicle_config_param_file launch argument
88 vehicle_config_param_file = LaunchConfiguration('vehicle_config_param_file')
89 declare_vehicle_config_param_file_arg = DeclareLaunchArgument(
90 name = 'vehicle_config_param_file',
91 default_value = [vehicle_config_dir, "/VehicleConfigParams.yaml"],
92 description = "Path to file contain vehicle configuration parameters"
93 )
94
95 rosbag2_qos_override_param_file = LaunchConfiguration('rosbag2_qos_override_param_file')
96 declare_rosbag2_qos_override_param_file = DeclareLaunchArgument(
97 name='rosbag2_qos_override_param_file',
98 default_value = PathJoinSubstitution([
99 FindPackageShare('carma'),'config',
100 'rosbag2_qos_overrides.yaml'
101 ]),
102 description = "Path to file containing rosbag2 override qos settings"
103 )
104
105 return LaunchDescription([
106 declare_vehicle_config_dir_arg,
107 declare_vehicle_config_param_file_arg,
108 declare_rosbag2_qos_override_param_file,
109 OpaqueFunction(function=record_ros2_rosbag, args=[vehicle_config_param_file, rosbag2_qos_override_param_file])
110 ])
def generate_launch_description()
def record_ros2_rosbag(LaunchContext context, vehicle_config_param_file, rosbag2_qos_override_param_file)