Carma-platform v4.2.0
CARMA Platform is built on robot operating system (ROS) and utilizes open source software (OSS) that enables Cooperative Driving Automation (CDA) features to allow Automated Driving Systems to interact and cooperate with infrastructure and other vehicles through communication.
speedharm_auto_configure.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2
3# Copyright (C) 2018-2021 LEIDOS.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17
18from __future__ import print_function
19import json
20import requests
21import re
22import urlparse
23from datetime import datetime
24import time
25import calendar
26
27# Disable annoying insecure requests warnings, we're using a self-signed cert
28from requests.packages.urllib3.exceptions import InsecureRequestWarning
29requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
30
31version_id = "v0.1" # Version string for cli client
32base_url = "http://35.153.64.44:8081" # Base url of server, defaults to CARMA AWS server
33
34# Constants
35vehicles_url = "/rest/vehicles"
36experiments_url = "/rest/experiments"
37algorithms_url = "/rest/algorithms"
38default_algorithm = "gov.dot.fhwa.saxton.speedharm.executive.algorithms.ReduceSpeedAlgorithm"
39sleep_duration_millis = 1000
40
41def log(msg):
42 cur_datetime = datetime.utcnow()
43 unix_ts = calendar.timegm(cur_datetime.utctimetuple())
44 print("[" + str(unix_ts) + "] " + str(msg))
45
47 log(resp.status_code)
48 log(resp.headers)
49 log(resp.json())
50
52 r = requests.get(urlparse.urljoin(base_url, experiments_url), verify=False).json()
53 for exp in r:
54 if exp["description"].find("AUTO-CONFIG") > -1:
55 return urlparse.urljoin(base_url, experiments_url + "/" + exp["id"])
56
57 return None
58
59def create_experiment(desc, loc):
60 full_url = urlparse.urljoin(base_url, experiments_url)
61 headers = { "Content-Type" : "application/json" }
62 r = requests.post(full_url, data=json.dumps({"description": desc, "location": loc}), headers=headers, verify=False)
64 return r.headers["Location"]
65
66def assign_experiment(veh_id, experiment_url):
67 headers = { "Content-Type" : "application/json" }
68 data = json.dumps({"id": int(veh_id)})
69 r = requests.post(urlparse.urljoin(experiment_url + "/vehicles", ""), data=data, headers=headers, verify=False)
70
72 r = requests.get(urlparse.urljoin(base_url, vehicles_url), verify=False)
74 return r.json()
75
76def create_algorithm(algo_name):
77 full_url = urlparse.urljoin(base_url, algorithms_url)
78 headers = { "Content-Type" : "application/json" }
79 data = json.dumps({"className": algo_name})
80
81 r = requests.post(full_url, data=data, headers=headers, verify=False)
83 return r.headers["Location"]
84
85def assign_algorithm(veh_id, algo_url):
86 headers = { "Content-Type" : "application/json" }
87 data = json.dumps({"id": int(veh_id)})
88 r = requests.post(urlparse.urljoin(algo_url + "/vehicles", ""), data=data, headers=headers, verify=False)
90
91def main():
92 log("Speed Harmonization Auto Configurator " + version_id + ".")
93 log("Connecting to " + base_url + "...")
94 while True:
95 for veh in get_registered_veh_data():
96 if not "expId" in veh:
97 # Vehicle isn't yet assigned to an experiment, assume it's also not assigned to an algorithm
98 log("Detected new vehicle ID=" + str(veh["id"]) + " with no experiment, assigning to experiment.")
99 active_experiment_url = get_active_experiment_url()
100 if not active_experiment_url:
101 cur_datetime = datetime.utcnow()
102 unix_ts = calendar.timegm(cur_datetime.utctimetuple())
103 active_experiment_url = create_experiment("AUTO-CONFIG Experiment Generated @ " + str(unix_ts), "UNSPECIFIED")
104 log("Created new experiment at " + active_experiment_url)
105 else:
106 log("Discovered active experiment at " + active_experiment_url)
107 assign_experiment(veh["id"], active_experiment_url)
108 log("Assigned vehicle ID=" + str(veh["id"]) + " to experiment " + active_experiment_url)
109 # Vehicle isn't yet assigned to an algorithm
110 log("Detected new vehicle ID=" + str(veh["id"]) + " with no algorithm, assigning to algorithm.")
111 algo_url = create_algorithm(default_algorithm)
112 log("Created new algorithm of type " + default_algorithm + " at " + algo_url)
113 assign_algorithm(veh["id"], algo_url)
114 log("Assigned vehicle ID=" + str(veh["id"]) + " to algorithm " + algo_url)
115 log("Sleeping " + str(sleep_duration_millis) + "ms...")
116 time.sleep(sleep_duration_millis / 1000.0)
117
118if __name__ == "__main__":
119 main()
def assign_algorithm(veh_id, algo_url)
def assign_experiment(veh_id, experiment_url)