in src/markov/boto/s3/files/model_metadata.py [0:0]
def parse_model_metadata(local_model_metadata_path):
    """Parse a local model_metadata.json file into a normalized dictionary.

    Missing optional entries (sensor, version, neural network, training
    algorithm, action space type, lidar configuration) are filled in with
    defaults; the sensor list and lidar configuration are validated.

    Args:
        local_model_metadata_path (str): path of the local model metadata
            JSON file.

    Returns:
        dict (str, obj): dictionary keyed by ModelMetadataKeys values holding
            the sensor list, neural network, simapp version, training
            algorithm, action space, action space type and lidar config.

    Raises:
        ValueError: if the action space is missing, the sensor list or lidar
            configuration is invalid, or any other ValueError is raised while
            reading/parsing the file (re-raised with a
            'model_metadata ValueError' prefix).
        Exception: for any other failure (e.g. the file cannot be opened or a
            required key is missing); re-raised as a generic Exception.
    """
    try:
        with open(local_model_metadata_path, "r") as json_file:
            data = json.load(json_file)

        if ModelMetadataKeys.ACTION_SPACE.value not in data:
            raise ValueError("no action space defined")
        action_values = data[ModelMetadataKeys.ACTION_SPACE.value]

        # simapp_version 2.0+ should contain version as key in
        # model_metadata.json
        if ModelMetadataKeys.VERSION.value in data:
            simapp_version = float(data[ModelMetadataKeys.VERSION.value])
            if simapp_version >= SIMAPP_VERSION_2:
                sensor = data[ModelMetadataKeys.SENSOR.value]
            else:
                sensor = [Input.OBSERVATION.value]
        elif ModelMetadataKeys.SENSOR.value in data:
            # No explicit version: the presence of a sensor list implies 2.0.
            sensor = data[ModelMetadataKeys.SENSOR.value]
            simapp_version = SIMAPP_VERSION_2
        else:
            sensor = [Input.OBSERVATION.value]
            simapp_version = SIMAPP_VERSION_1

        # Model Metadata Sensor validation: exactly one camera-type sensor
        # and at most one lidar-type sensor.
        camera_sensors = [Input.OBSERVATION.value, Input.CAMERA.value,
                          Input.LEFT_CAMERA.value, Input.STEREO.value]
        camera_sensor_type_count = sum(sensor_elem in camera_sensors
                                       for sensor_elem in sensor)
        if camera_sensor_type_count != 1:
            raise ValueError(
                "model_metadata MUST contain only 1 camera sensor: {}".format(sensor))

        lidar_sensors = [Input.LIDAR.value, Input.SECTOR_LIDAR.value,
                         Input.DISCRETIZED_SECTOR_LIDAR.value]
        lidar_sensor_type_count = sum(sensor_elem in lidar_sensors
                                      for sensor_elem in sensor)
        if lidar_sensor_type_count > 1:
            raise ValueError(
                "model_metadata contains more than 1 lidar sensor: {}".format(sensor))

        # Lidar configuration: start from the defaults and override with any
        # validated values found in the metadata file.
        num_lidar_sectors_key = ModelMetadataKeys.NUM_SECTORS.value
        num_values_per_sector_key = ModelMetadataKeys.NUM_VALUES_PER_SECTOR.value
        clipping_dist_key = ModelMetadataKeys.CLIPPING_DISTANCE.value
        lidar_config = {
            num_lidar_sectors_key: DiscretizedSectorLidarDefaults.NUMBER_OF_SECTORS,
            num_values_per_sector_key: DiscretizedSectorLidarDefaults.NUM_VALUES_PER_SECTOR,
            clipping_dist_key: DiscretizedSectorLidarDefaults.CLIPPING_DIST,
        }
        if ModelMetadataKeys.LIDAR_CONFIG.value in data:
            config = data[ModelMetadataKeys.LIDAR_CONFIG.value]

            if num_lidar_sectors_key in config:
                num_sectors = int(config[num_lidar_sectors_key])
                # The sector count must be positive and evenly divide the
                # 360 degree sample count.
                if num_sectors <= 0 or LIDAR_360_DEGREE_SAMPLE % num_sectors != 0:
                    err_msg_format = "{} % Number of sector ({}) must be 0!"
                    raise ValueError(err_msg_format.format(LIDAR_360_DEGREE_SAMPLE,
                                                           num_sectors))
                lidar_config[num_lidar_sectors_key] = num_sectors

            if num_values_per_sector_key in config:
                num_values_per_sector = int(config[num_values_per_sector_key])
                if num_values_per_sector < 1:
                    err_msg_format = "Number of values per sector ({}) cannot be smaller than 1."
                    raise ValueError(err_msg_format.format(num_values_per_sector))
                lidar_config[num_values_per_sector_key] = num_values_per_sector

            if clipping_dist_key in config:
                clipping_dist = config[clipping_dist_key]
                # Reject values outside the inclusive [MIN, MAX] range. The
                # previous chained test (MIN > value > MAX) could never be
                # true, so out-of-range distances were silently accepted.
                if not (LIDAR_360_DEGREE_MIN_RANGE <= clipping_dist
                        <= LIDAR_360_DEGREE_MAX_RANGE):
                    err_msg_format = "Clipping distance ({}) must be between {} and {} inclusively."
                    # Report the clipping distance itself (the message used
                    # to be formatted with the values-per-sector entry).
                    raise ValueError(err_msg_format.format(clipping_dist,
                                                           LIDAR_360_DEGREE_MIN_RANGE,
                                                           LIDAR_360_DEGREE_MAX_RANGE))
                lidar_config[clipping_dist_key] = clipping_dist

        if ModelMetadataKeys.NEURAL_NETWORK.value in data:
            network = data[ModelMetadataKeys.NEURAL_NETWORK.value]
        else:
            network = NeuralNetwork.DEEP_CONVOLUTIONAL_NETWORK_SHALLOW.value

        training_algorithm = TrainingAlgorithm.CLIPPED_PPO.value
        if ModelMetadataKeys.TRAINING_ALGORITHM.value in data:
            data_training_algorithm = \
                data[ModelMetadataKeys.TRAINING_ALGORITHM.value].lower().strip()
            # Update the training algorithm value if its valid else log and exit
            if TrainingAlgorithm.has_training_algorithm(data_training_algorithm):
                training_algorithm = data_training_algorithm
            else:
                log_and_exit("Unknown training_algorithm found while parsing model_metadata. "
                             "training_algorithm: {}".format(data_training_algorithm),
                             SIMAPP_SIMULATION_WORKER_EXCEPTION,
                             SIMAPP_EVENT_ERROR_CODE_500)

        action_space_type = ActionSpaceTypes.DISCRETE.value
        if ModelMetadataKeys.ACTION_SPACE_TYPE.value in data:
            data_action_space_type = \
                data[ModelMetadataKeys.ACTION_SPACE_TYPE.value].lower().strip()
            # Update the action space type value if its valid else log and exit
            if ActionSpaceTypes.has_action_space(data_action_space_type):
                action_space_type = data_action_space_type
            else:
                log_and_exit("Unknown action_space_type found while parsing model_metadata. "
                             "action_space_type: {}".format(data_action_space_type),
                             SIMAPP_SIMULATION_WORKER_EXCEPTION,
                             SIMAPP_EVENT_ERROR_CODE_500)

        LOG.info("Sensor list %s, network %s, simapp_version %s, training_algorithm %s, "
                 "action_space_type %s lidar_config %s",
                 sensor, network,
                 simapp_version, training_algorithm, action_space_type, lidar_config)

        return {
            ModelMetadataKeys.SENSOR.value: sensor,
            ModelMetadataKeys.NEURAL_NETWORK.value: network,
            ModelMetadataKeys.VERSION.value: simapp_version,
            ModelMetadataKeys.TRAINING_ALGORITHM.value: training_algorithm,
            ModelMetadataKeys.ACTION_SPACE.value: action_values,
            ModelMetadataKeys.ACTION_SPACE_TYPE.value: action_space_type,
            ModelMetadataKeys.LIDAR_CONFIG.value: lidar_config,
        }
    except ValueError as ex:
        raise ValueError('model_metadata ValueError: {}'.format(ex))
    except Exception as ex:
        # NOTE(review): this branch also catches errors other than a missing
        # file (e.g. KeyError/TypeError from malformed metadata); the message
        # is kept unchanged for backward compatibility.
        raise Exception('Model metadata does not exist: {}'.format(ex))