def transfer_models()

in deepracer_systems_pkg/deepracer_systems_pkg/model_loader_module/model_loader_node.py [0:0]


    def transfer_models(self, keyworded_args: dict) -> None:
        """Identify, extract and stage model archives for installation.

        Scans ``<path>/<name>`` for archives, extracts each into its own
        temp directory, keeps only archives containing exactly one model,
        skips models whose checksum matches an already-installed copy, and
        records the survivors in ``self.models_in_progress`` (as
        ``ModelInstallState`` objects) for a later installation step.

        Args:
            keyworded_args (dict): Keyworded arguments passed to the function
                while scheduling. Keys read here: ``"path"`` (base directory,
                default ``""``), ``"name"`` (leaf directory to scan, default
                ``""``) and ``"node_name"`` (USB mount-point node identifier,
                default ``None``).
        """
        # Blink the LED to signal that model loading is in progress.
        self.call_blink_led_service()
        base_path = keyworded_args.get("path", "")
        name = keyworded_args.get("name", "")
        node_name = keyworded_args.get("node_name", None)

        self.get_logger().info("Reading the source directory...")

        # Get the list of archives from the source directory.
        search_path = os.path.join(base_path, name)
        list_of_archives = self.get_list_of_archives(search_path)

        # Reset the work queue: any previously tracked in-progress models
        # are discarded on every scheduling pass.
        self.models_in_progress = dict()

        # Remove possible old remaining directories.
        # These are stale "<MODEL_TEMP_LEAF_DIRECTORY>-*" temp dirs left
        # behind by an earlier run that did not complete.
        for old_temp_directory in glob.glob(os.path.join(constants.TEMP_DIRECTORY,
                                                         f"{model_loader_config.MODEL_TEMP_LEAF_DIRECTORY}-*")):
            file_system_utils.remove_dir_tree(old_temp_directory)

        # Remove all installed models if requested.
        if self.enable_model_wipe:
            self.wipe_existing_models()

        source_count = len(list_of_archives)
        if source_count == 0:
            self.get_logger().info("No new models to install detected...")
        else:
            self.get_logger().info(f"Processing {source_count} potential model(s)...")

        for archive_name in list_of_archives:
            self.get_logger().info(f"  * processing {archive_name}...")

            # Determine the model name: everything before the FIRST dot,
            # so "my.model.tar.gz" yields "my", not "my.model".
            model_name = archive_name
            dot_pos = model_name.find(".")
            if dot_pos != -1:
                model_name = model_name[:dot_pos]

            # Create the temp directory. An existing directory means another
            # archive in this same batch already claimed this model name.
            model_temp_directory = os.path.join(constants.TEMP_DIRECTORY,
                                                f"{model_loader_config.MODEL_TEMP_LEAF_DIRECTORY}-{model_name}")
            if os.path.isdir(model_temp_directory):
                self.get_logger().info(f"    ! ignoring model with duplicate name: {model_name}")
                continue

            # create_dir is assumed to return False (not raise) on failure;
            # the archive is silently skipped in that case.
            if not file_system_utils.create_dir(model_temp_directory):
                continue

            # Extract the archive. On failure, clean up the temp directory
            # so a retry of the same archive is not treated as a duplicate.
            archive_path = os.path.join(search_path, archive_name)
            if not self.extract_archive(archive_path, model_temp_directory):
                file_system_utils.remove_dir_tree(model_temp_directory)
                continue

            # Get the list of models found inside the extracted archive.
            model_list = self.get_model_list(model_temp_directory)

            # Exactly one model per archive is required; zero or multiple
            # models abort this archive (temp dir removed on every path).
            if len(model_list) == 0:
                file_system_utils.remove_dir_tree(model_temp_directory)
                self.get_logger().info("    ! no models found in the archive, ignoring")
                continue

            if len(model_list) > 1:
                file_system_utils.remove_dir_tree(model_temp_directory)
                self.get_logger().info("    ! unexpected: more than one models found in the archive,"
                                       " ignoring all of them")
                continue

            # Golden model?
            # NOTE(review): this assignment is a no-op — the condition above
            # already requires model_name == GOLDEN_MODEL_SOURCE_NAME. It may
            # have been intended to rename to a destination/install constant
            # (e.g. a GOLDEN_MODEL_NAME); confirm against model_loader_config.
            if model_loader_config.ENABLE_GOLDEN_MODEL \
               and (model_name == model_loader_config.GOLDEN_MODEL_SOURCE_NAME):
                self.get_logger().info("    golden model detected")
                model_name = model_loader_config.GOLDEN_MODEL_SOURCE_NAME

            # Intel model optimizer does not handle spaces in the path correctly, replace with underscores.
            if model_loader_config.REPLACE_MODEL_NAMESPACES:
                model_name = model_name.replace(" ", "_")

            # Extract model info. model_list entries are assumed to be
            # (file_path, checksum) pairs — inferred from the unpack below.
            model_file_path, model_checksum = model_list[0]
            model_install_directory = os.path.join(model_loader_config.MODEL_INSTALL_ROOT_DIRECTORY, model_name)
            checksum_path = os.path.join(model_install_directory, model_loader_config.MODEL_CHECKSUM_FILE)

            # Verify the checksum against the installed copy (if any).
            # NOTE(review): assumes read_line returns a string (e.g. "") when
            # checksum_path does not exist; if it can return None, .strip()
            # raises AttributeError on a first-time install — verify helper.
            if model_checksum == file_system_utils.read_line(checksum_path).strip():
                self.get_logger().info("    model already installed, ignoring")
                file_system_utils.remove_dir_tree(model_temp_directory)
                continue

            # Add to the dictionary to be processed later.
            self.get_logger().info("    scheduling for installation")
            self.models_in_progress[model_name] = model_install_state.ModelInstallState(model_temp_directory,
                                                                                        model_file_path,
                                                                                        model_checksum,
                                                                                        model_install_directory,
                                                                                        self.model_optimizer_client,
                                                                                        self.get_logger())

        # Unmount the media.
        # NOTE(review): the request is constructed but never sent to a service
        # client — as written this block has no effect and the media is never
        # unmounted. Confirm whether a call_async on the mount-point-manager
        # client is missing here. (action = 0 presumably means "unmount".)
        if node_name is not None:
            mount_point_mgr_request = USBMountPointManagerSrv.Request()
            mount_point_mgr_request.node_name = node_name
            mount_point_mgr_request.action = 0

        # Restore the LED to solid to signal the scan/stage pass is done.
        self.call_solid_led_service()