in deepracer_systems_pkg/deepracer_systems_pkg/model_loader_module/model_loader_node.py [0:0]
def transfer_models(self, keyworded_args):
    """Find model archives in a source directory and schedule them for installation.

    Every archive found under ``<path>/<name>`` is extracted into its own
    temporary directory below ``constants.TEMP_DIRECTORY``. An archive is
    scheduled (recorded in ``self.models_in_progress``) only when it contains
    exactly one model whose checksum differs from the checksum file found in
    the model's install directory under
    ``model_loader_config.MODEL_INSTALL_ROOT_DIRECTORY``. Archives that are
    skipped after extraction have their temporary directory removed.

    Args:
        keyworded_args (dict): Keyworded arguments passed to the function while
            scheduling. Keys read here:
              * "path": base directory of the source media (default "").
              * "name": sub-directory searched for archives (default "").
              * "node_name": when not None, a USBMountPointManagerSrv request
                is prepared for that node after the archives are processed.
    """
    # Blink the LED while the transfer is running; it is set back to solid
    # at the very end of this method.
    self.call_blink_led_service()
    logger = self.get_logger()

    base_path = keyworded_args.get("path", "")
    name = keyworded_args.get("name", "")
    node_name = keyworded_args.get("node_name", None)

    logger.info("Reading the source directory...")

    # Get the list of archives from the source directory.
    search_path = os.path.join(base_path, name)
    list_of_archives = self.get_list_of_archives(search_path)

    # Start from a clean slate: results of this run only.
    self.models_in_progress = dict()

    # Remove temporary directories possibly left behind by a previous run,
    # otherwise they would be mistaken for duplicates below.
    stale_pattern = os.path.join(constants.TEMP_DIRECTORY,
                                 f"{model_loader_config.MODEL_TEMP_LEAF_DIRECTORY}-*")
    for old_temp_directory in glob.glob(stale_pattern):
        file_system_utils.remove_dir_tree(old_temp_directory)

    # Remove all installed models if requested.
    if self.enable_model_wipe:
        self.wipe_existing_models()

    source_count = len(list_of_archives)
    if source_count == 0:
        logger.info("No new models to install detected...")
    else:
        logger.info(f"Processing {source_count} potential model(s)...")

    # With an empty list the loop body never runs, so it does not need to
    # live inside the else branch above.
    for archive_name in list_of_archives:
        logger.info(f" * processing {archive_name}...")

        # The model name is the archive name up to (not including) its first dot.
        model_name = archive_name.partition(".")[0]

        # Create the temp directory. An already existing directory means an
        # earlier archive in this run produced the same model name.
        model_temp_directory = os.path.join(
            constants.TEMP_DIRECTORY,
            f"{model_loader_config.MODEL_TEMP_LEAF_DIRECTORY}-{model_name}")
        if os.path.isdir(model_temp_directory):
            logger.info(f" ! ignoring model with duplicate name: {model_name}")
            continue
        if not file_system_utils.create_dir(model_temp_directory):
            # Previously this archive was dropped without any trace in the log.
            logger.info(f" ! failed to create temporary directory: {model_temp_directory}")
            continue

        # Extract the archive.
        archive_path = os.path.join(search_path, archive_name)
        if not self.extract_archive(archive_path, model_temp_directory):
            file_system_utils.remove_dir_tree(model_temp_directory)
            continue

        # Get the list of models; exactly one model per archive is accepted.
        model_list = self.get_model_list(model_temp_directory)
        if len(model_list) == 0:
            file_system_utils.remove_dir_tree(model_temp_directory)
            logger.info(" ! no models found in the archive, ignoring")
            continue
        if len(model_list) > 1:
            file_system_utils.remove_dir_tree(model_temp_directory)
            logger.info(" ! unexpected: more than one models found in the archive,"
                        " ignoring all of them")
            continue

        # Golden model?
        if model_loader_config.ENABLE_GOLDEN_MODEL \
                and (model_name == model_loader_config.GOLDEN_MODEL_SOURCE_NAME):
            logger.info(" golden model detected")
            # NOTE(review): model_name already equals GOLDEN_MODEL_SOURCE_NAME
            # here, so this assignment changes nothing. Presumably a distinct
            # target name was intended - confirm against model_loader_config.
            model_name = model_loader_config.GOLDEN_MODEL_SOURCE_NAME

        # Intel model optimizer does not handle spaces in the path correctly,
        # replace with underscores.
        if model_loader_config.REPLACE_MODEL_NAMESPACES:
            model_name = model_name.replace(" ", "_")

        # Two archives can end up with the same final name after the
        # normalisation above (e.g. "my model" and "my_model"). Both would
        # target the same install directory, and the later one used to
        # overwrite the earlier entry while orphaning its temp directory.
        # Keep the first one, as the duplicate check on the temp directory does.
        if model_name in self.models_in_progress:
            logger.info(f" ! ignoring model with duplicate name: {model_name}")
            file_system_utils.remove_dir_tree(model_temp_directory)
            continue

        # Extract model info.
        model_file_path, model_checksum = model_list[0]
        model_install_directory = os.path.join(model_loader_config.MODEL_INSTALL_ROOT_DIRECTORY,
                                               model_name)
        checksum_path = os.path.join(model_install_directory,
                                     model_loader_config.MODEL_CHECKSUM_FILE)

        # Verify the checksum: an identical checksum on disk means this exact
        # model is already installed.
        if model_checksum == file_system_utils.read_line(checksum_path).strip():
            logger.info(" model already installed, ignoring")
            file_system_utils.remove_dir_tree(model_temp_directory)
            continue

        # Add to the dictionary to be processed later.
        logger.info(" scheduling for installation")
        self.models_in_progress[model_name] = model_install_state.ModelInstallState(
            model_temp_directory,
            model_file_path,
            model_checksum,
            model_install_directory,
            self.model_optimizer_client,
            logger)

    # Unmount the media.
    if node_name is not None:
        mount_point_mgr_request = USBMountPointManagerSrv.Request()
        mount_point_mgr_request.node_name = node_name
        mount_point_mgr_request.action = 0
        # NOTE(review): the request is populated but never passed to a service
        # client in this method, so nothing is actually unmounted from here.
        # Confirm whether the client call was dropped by mistake.

    self.call_solid_led_service()