Carma-platform v4.2.0
CARMA Platform is built on robot operating system (ROS) and utilizes open source software (OSS) that enables Cooperative Driving Automation (CDA) features to allow Automated Driving Systems to interact and cooperate with infrastructure and other vehicles through communication.
ros2_rosbag.launch.py
Go to the documentation of this file.
1# Copyright (C) 2023 LEIDOS.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from launch import LaunchDescription, LaunchContext
16from launch_ros.actions import Node
17from launch.actions import OpaqueFunction
18from launch.substitutions import LaunchConfiguration
19from launch.actions import DeclareLaunchArgument, ExecuteProcess
20from launch.substitutions import PathJoinSubstitution
21from launch_ros.substitutions import FindPackageShare
22
23from datetime import datetime
24import pathlib
25import yaml
26
27# This function is used to generate a command to record a ROS 2 rosbag that excludes topics
28# topics as provided in the appropriate configuration file.
29def record_ros2_rosbag(context: LaunchContext, vehicle_config_param_file, rosbag2_qos_override_param_file):
30
31 # Convert LaunchConfiguration object to its string representation
32 vehicle_config_param_file_string = context.perform_substitution(vehicle_config_param_file)
33
34 # Initialize string that will contain the regex for topics to exclude from the ROS 2 rosbag
35 exclude_topics_regex = ""
36
37 overriding_qos_profiles = context.perform_substitution(rosbag2_qos_override_param_file)
38
39 # Open vehicle config params file to process various rosbag settings
40 with open(vehicle_config_param_file_string, 'r') as f:
41
42 vehicle_config_params = yaml.safe_load(f)
43
44 if "use_ros2_rosbag" in vehicle_config_params:
45 if vehicle_config_params["use_ros2_rosbag"] == True:
46
47 if "exclude_default" in vehicle_config_params:
48 if (vehicle_config_params["exclude_default"] == True) and ("excluded_default_topics" in vehicle_config_params):
49 for topic in vehicle_config_params["excluded_default_topics"]:
50 exclude_topics_regex += str(topic) + "|"
51
52 if "exclude_lidar" in vehicle_config_params:
53 if (vehicle_config_params["exclude_lidar"] == True) and ("excluded_lidar_topics" in vehicle_config_params):
54 for topic in vehicle_config_params["excluded_lidar_topics"]:
55 exclude_topics_regex += str(topic) + "|"
56
57 if "exclude_camera" in vehicle_config_params:
58 if (vehicle_config_params["exclude_camera"] == True) and ("excluded_camera_topics" in vehicle_config_params):
59 for topic in vehicle_config_params["excluded_camera_topics"]:
60 exclude_topics_regex += str(topic) + "|"
61
62 if "exclude_can" in vehicle_config_params:
63 if (vehicle_config_params["exclude_can"] == True) and ("excluded_can_topics" in vehicle_config_params):
64 for topic in vehicle_config_params["excluded_can_topics"]:
65 exclude_topics_regex += str(topic) + "|"
66
67 proc = ExecuteProcess(
68 cmd=['ros2', 'bag', 'record', '-a', '-s', 'mcap',
69 '--qos-profile-overrides-path', overriding_qos_profiles,
70 '-o', '/opt/carma/logs/rosbag2_' + str(datetime.now().strftime('%Y-%m-%d_%H%M%S')),
71 '-x', exclude_topics_regex],
72 output='screen',
73 shell='true'
74 )
75
76 return [proc]
77
78
80 # Declare the vehicle_config_dir launch argument
81 vehicle_config_dir = LaunchConfiguration('vehicle_config_dir')
82 declare_vehicle_config_dir_arg = DeclareLaunchArgument(
83 name = 'vehicle_config_dir',
84 default_value = "/opt/carma/vehicle/config",
85 description = "Path to file containing vehicle config directories"
86 )
87
88 # Declare the vehicle_config_param_file launch argument
89 vehicle_config_param_file = LaunchConfiguration('vehicle_config_param_file')
90 declare_vehicle_config_param_file_arg = DeclareLaunchArgument(
91 name = 'vehicle_config_param_file',
92 default_value = [vehicle_config_dir, "/VehicleConfigParams.yaml"],
93 description = "Path to file contain vehicle configuration parameters"
94 )
95
96 rosbag2_qos_override_param_file = LaunchConfiguration('rosbag2_qos_override_param_file')
97 declare_rosbag2_qos_override_param_file = DeclareLaunchArgument(
98 name='rosbag2_qos_override_param_file',
99 default_value = PathJoinSubstitution([
100 FindPackageShare('carma'),'config',
101 'rosbag2_qos_overrides.yaml'
102 ]),
103 description = "Path to file containing rosbag2 override qos settings"
104 )
105
106 return LaunchDescription([
107 declare_vehicle_config_dir_arg,
108 declare_vehicle_config_param_file_arg,
109 declare_rosbag2_qos_override_param_file,
110 OpaqueFunction(function=record_ros2_rosbag, args=[vehicle_config_param_file, rosbag2_qos_override_param_file])
111 ])
def generate_launch_description()
def record_ros2_rosbag(LaunchContext context, vehicle_config_param_file, rosbag2_qos_override_param_file)