def parse_model_metadata()

in src/markov/boto/s3/files/model_metadata.py [0:0]


    def parse_model_metadata(local_model_metadata_path):
        """Parse and validate a local model_metadata.json file.

        The file is read as JSON, the sensor list / lidar configuration are
        validated, and defaults are filled in for every optional key that is
        absent (sensor, version, neural network, training algorithm, action
        space type and lidar configuration).

        Args:
            local_model_metadata_path (str): path of the local model metadata
                JSON file to read.

        Returns:
            dict (str, obj): dictionary keyed by ModelMetadataKeys values holding
                the sensor list, neural network, simapp version, training
                algorithm, action space, action space type and lidar config.

        Raises:
            ValueError: the action space is missing, the sensor list does not
                contain exactly one camera sensor or contains more than one
                lidar sensor, or a lidar configuration value is invalid. Any
                ValueError raised while parsing (e.g. by int()/float()) is
                re-raised with a 'model_metadata ValueError' prefix.
            Exception: any other failure (unreadable file, missing key, ...)
                is re-raised with a 'Model metadata does not exist' prefix.

        """
        model_metadata = dict()
        try:
            with open(local_model_metadata_path, "r") as json_file:
                data = json.load(json_file)
                # simapp_version 2.0+ should contain version as key in
                # model_metadata.json
                if ModelMetadataKeys.ACTION_SPACE.value not in data:
                    raise ValueError("no action space defined")
                action_values = data[ModelMetadataKeys.ACTION_SPACE.value]
                if ModelMetadataKeys.VERSION.value in data:
                    simapp_version = float(data[ModelMetadataKeys.VERSION.value])
                    if simapp_version >= SIMAPP_VERSION_2:
                        sensor = data[ModelMetadataKeys.SENSOR.value]
                    else:
                        sensor = [Input.OBSERVATION.value]
                else:
                    # No explicit version: infer it from whether a sensor list
                    # is present in the file.
                    if ModelMetadataKeys.SENSOR.value in data:
                        sensor = data[ModelMetadataKeys.SENSOR.value]
                        simapp_version = SIMAPP_VERSION_2
                    else:
                        sensor = [Input.OBSERVATION.value]
                        simapp_version = SIMAPP_VERSION_1

                # Model Metadata Sensor validation
                camera_sensors = [Input.OBSERVATION.value, Input.CAMERA.value, Input.LEFT_CAMERA.value,
                                  Input.STEREO.value]
                camera_sensor_type_count = sum(sensor_elem in camera_sensors for sensor_elem in sensor)
                if camera_sensor_type_count != 1:
                    raise ValueError("model_metadata MUST contain only 1 camera sensor: {}".format(sensor))

                lidar_sensors = [Input.LIDAR.value, Input.SECTOR_LIDAR.value, Input.DISCRETIZED_SECTOR_LIDAR.value]
                lidar_sensor_type_count = sum(sensor_elem in lidar_sensors for sensor_elem in sensor)
                if lidar_sensor_type_count > 1:
                    raise ValueError("model_metadata contains more than 1 lidar sensor: {}".format(sensor))

                num_lidar_sectors_key = ModelMetadataKeys.NUM_SECTORS.value
                num_values_per_sector_key = ModelMetadataKeys.NUM_VALUES_PER_SECTOR.value
                clipping_dist_key = ModelMetadataKeys.CLIPPING_DISTANCE.value

                # Start from the defaults and override only the keys the file
                # provides (after validating each of them).
                lidar_config = {num_lidar_sectors_key: DiscretizedSectorLidarDefaults.NUMBER_OF_SECTORS,
                                num_values_per_sector_key: DiscretizedSectorLidarDefaults.NUM_VALUES_PER_SECTOR,
                                clipping_dist_key: DiscretizedSectorLidarDefaults.CLIPPING_DIST}
                if ModelMetadataKeys.LIDAR_CONFIG.value in data:
                    config = data[ModelMetadataKeys.LIDAR_CONFIG.value]
                    if num_lidar_sectors_key in config:
                        config[num_lidar_sectors_key] = int(config[num_lidar_sectors_key])
                        # The sector count must be positive and evenly divide
                        # the lidar sample count.
                        if config[num_lidar_sectors_key] <= 0 or \
                                LIDAR_360_DEGREE_SAMPLE % config[num_lidar_sectors_key] != 0:
                            err_msg_format = "{} % Number of sector ({}) must be 0!"
                            err_msg = err_msg_format.format(LIDAR_360_DEGREE_SAMPLE,
                                                            config[num_lidar_sectors_key])
                            raise ValueError(err_msg)
                        lidar_config[num_lidar_sectors_key] = config[num_lidar_sectors_key]
                    if num_values_per_sector_key in config:
                        config[num_values_per_sector_key] = int(config[num_values_per_sector_key])
                        if config[num_values_per_sector_key] < 1:
                            err_msg_format = "Number of values per sector ({}) cannot be smaller than 1."
                            err_msg = err_msg_format.format(config[num_values_per_sector_key])
                            raise ValueError(err_msg)
                        lidar_config[num_values_per_sector_key] = config[num_values_per_sector_key]
                    if clipping_dist_key in config:
                        clipping_dist = config[clipping_dist_key]
                        # Reject anything outside the inclusive [MIN, MAX] range.
                        # (A chained "MIN > x > MAX" test can never be true, so it
                        # must be written as the negation of the in-range check.)
                        if not (LIDAR_360_DEGREE_MIN_RANGE <= clipping_dist <= LIDAR_360_DEGREE_MAX_RANGE):
                            err_msg_format = "Clipping distance ({}) must be between {} and {} inclusively."
                            err_msg = err_msg_format.format(clipping_dist,
                                                            LIDAR_360_DEGREE_MIN_RANGE,
                                                            LIDAR_360_DEGREE_MAX_RANGE)
                            raise ValueError(err_msg)
                        lidar_config[clipping_dist_key] = clipping_dist

                if ModelMetadataKeys.NEURAL_NETWORK.value in data:
                    network = data[ModelMetadataKeys.NEURAL_NETWORK.value]
                else:
                    network = NeuralNetwork.DEEP_CONVOLUTIONAL_NETWORK_SHALLOW.value
                training_algorithm = TrainingAlgorithm.CLIPPED_PPO.value
                if ModelMetadataKeys.TRAINING_ALGORITHM.value in data:
                    data_training_algorithm = data[ModelMetadataKeys.TRAINING_ALGORITHM.value].lower().strip()
                    # Update the training algorithm value if its valid else log and exit
                    if TrainingAlgorithm.has_training_algorithm(data_training_algorithm):
                        training_algorithm = data_training_algorithm
                    else:
                        log_and_exit("Unknown training_algorithm found while parsing model_metadata. \
                            training_algorithm: {}".format(data_training_algorithm),
                                     SIMAPP_SIMULATION_WORKER_EXCEPTION,
                                     SIMAPP_EVENT_ERROR_CODE_500)
                action_space_type = ActionSpaceTypes.DISCRETE.value
                if ModelMetadataKeys.ACTION_SPACE_TYPE.value in data:
                    data_action_space_type = data[ModelMetadataKeys.ACTION_SPACE_TYPE.value].lower().strip()
                    # Update the action space type value if its valid else log and exit
                    if ActionSpaceTypes.has_action_space(data_action_space_type):
                        action_space_type = data_action_space_type
                    else:
                        log_and_exit("Unknown action_space_type found while parsing model_metadata. \
                            action_space_type: {}".format(data_action_space_type),
                                     SIMAPP_SIMULATION_WORKER_EXCEPTION,
                                     SIMAPP_EVENT_ERROR_CODE_500)

            LOG.info("Sensor list %s, network %s, simapp_version %s, training_algorithm %s, action_space_type %s lidar_config %s",
                     sensor, network,
                     simapp_version, training_algorithm, action_space_type, lidar_config)
            model_metadata[ModelMetadataKeys.SENSOR.value] = sensor
            model_metadata[ModelMetadataKeys.NEURAL_NETWORK.value] = network
            model_metadata[ModelMetadataKeys.VERSION.value] = simapp_version
            model_metadata[ModelMetadataKeys.TRAINING_ALGORITHM.value] = training_algorithm
            model_metadata[ModelMetadataKeys.ACTION_SPACE.value] = action_values
            model_metadata[ModelMetadataKeys.ACTION_SPACE_TYPE.value] = action_space_type
            model_metadata[ModelMetadataKeys.LIDAR_CONFIG.value] = lidar_config
            return model_metadata
        except ValueError as ex:
            # Validation / conversion problems keep the ValueError type so
            # callers can tell them apart from other failures.
            raise ValueError('model_metadata ValueError: {}'.format(ex))
        except Exception as ex:
            raise Exception('Model metadata does not exist: {}'.format(ex))