def usb_update_scan()

in deepracer_systems_pkg/deepracer_systems_pkg/software_update_module/software_update_node.py [0:0]


    def usb_update_scan(self, keyworded_args):
        """Scan the USB media for update packages, then verify, decrypt and install them.

        The power LED is set to blink blue while the scan runs. Whether the scan
        succeeds, finds nothing, or raises, the media unmount is requested (when a
        node name was supplied) and the power LED is switched back to solid blue
        before this method returns.

        Args:
            keyworded_args (dict): Keyworded arguments passed to the function while
                scheduling. Keys read here:
                  "path" (str): Base directory to search; defaults to "".
                  "name" (str): Sub-directory under "path"; defaults to "".
                  "node_name": Value forwarded to the USB mount point manager
                      service to unmount the media; no request is sent when it
                      is missing or None.
        """

        # Signal that an update scan is in progress.
        self.led_blink_request.led_index = constants.LEDIndex.POWER_LED
        self.led_blink_request.color1 = constants.LEDColor.BLUE
        self.led_blink_request.color2 = constants.LEDColor.NO_COLOR
        self.led_blink_request.delay = 0.2
        self.led_blink_service.call(self.led_blink_request)

        base_path = keyworded_args.get("path", "")
        name = keyworded_args.get("name", "")
        node_name = keyworded_args.get("node_name", None)

        try:
            encrypted_list = list()

            # Find potential packages.
            self.get_logger().info("Scanning for update packages...")
            search_path = os.path.join(base_path, name, "*.deb.gpg")
            for package_name in glob.glob(search_path):
                self.get_logger().info(f"  verifying {package_name}...")
                if not software_update_utils.verify_package(package_name):
                    self.get_logger().info("  - failed to validate the package, ignoring.")
                    continue

                encrypted_list.append(package_name)
                self.get_logger().info("  + valid update package.")

            if not encrypted_list:
                self.get_logger().info(f"No valid packages found in: {search_path}")
                # The finally clause below still unmounts and resets the LED.
                return

            decrypted_list = list()

            self.get_logger().info("Decrypting:")
            for encrypted_name in encrypted_list:

                # Remove .gpg part.
                decrypted_name = os.path.splitext(encrypted_name)[0]

                # Remove the path.
                decrypted_name = os.path.basename(decrypted_name)

                # Add temporary path.
                decrypted_name = os.path.join(constants.TEMP_DIRECTORY, decrypted_name)

                self.get_logger().info(f"  {decrypted_name}")
                if not software_update_utils.decrypt_package(encrypted_name, decrypted_name):
                    self.get_logger().info("  * failed to decrypt the package.")
                    continue

                decrypted_list.append(decrypted_name)
                self.get_logger().info("  * successfully decrypted.")

            if not decrypted_list:
                self.get_logger().info("No successfully decrypted packages.")
                return

            self.get_logger().info("Installing packages:")
            for decrypted_name in decrypted_list:

                # Install the package.
                self.get_logger().info(f"  {decrypted_name}")
                try:
                    if not software_update_utils.install_debian(decrypted_name):
                        self.get_logger().info("  * failed to install.")
                    else:
                        self.get_logger().info("  * successfully installed.")
                finally:
                    # Remove the temporary package file even if the installer
                    # raised. A failed removal is logged rather than propagated
                    # so that the remaining packages are still installed.
                    try:
                        os.remove(decrypted_name)
                    except OSError as remove_error:
                        self.get_logger().error(
                            f"  * failed to remove {decrypted_name}: {remove_error}")

        finally:
            # Always release the media and restore the LED, so that an error above
            # cannot leave the USB mounted or the power LED blinking.
            try:
                # Unmount the media.
                if node_name is not None:
                    mount_point_mgr_request = USBMountPointManagerSrv.Request()
                    mount_point_mgr_request.node_name = node_name
                    mount_point_mgr_request.action = 0
                    self.usb_mount_point_manager_client.call_async(mount_point_mgr_request)
            finally:
                # Reset the LED even if sending the unmount request failed.
                self.led_solid_request.led_index = constants.LEDIndex.POWER_LED
                self.led_solid_request.color = constants.LEDColor.BLUE
                self.led_solid_request.hold = 0.0
                self.led_solid_service.call(self.led_solid_request)