in deepracer_systems_pkg/deepracer_systems_pkg/software_update_module/software_update_node.py [0:0]
def usb_update_scan(self, keyworded_args):
"""Main function to scan for packages in the USB, verify the packages, decrypt
and install packages, update power LED status during the process.
Args:
keyworded_args (dict): Keyworded arguments passed to the function while scheduling.
"""
self.led_blink_request.led_index = constants.LEDIndex.POWER_LED
self.led_blink_request.color1 = constants.LEDColor.BLUE
self.led_blink_request.color2 = constants.LEDColor.NO_COLOR
self.led_blink_request.delay = 0.2
self.led_blink_service.call(self.led_blink_request)
base_path = keyworded_args.get("path", "")
name = keyworded_args.get("name", "")
node_name = keyworded_args.get("node_name", None)
encrypted_list = list()
# Find potential packages.
self.get_logger().info("Scanning for update packages...")
search_path = os.path.join(base_path, name, "*.deb.gpg")
for package_name in glob.glob(search_path):
self.get_logger().info(f" verifying {package_name}...")
if not software_update_utils.verify_package(package_name):
self.get_logger().info(" - failed to validate the package, ignoring.")
continue
encrypted_list.append(package_name)
self.get_logger().info(" + valid update package.")
if len(encrypted_list) == 0:
self.get_logger().info(f"No valid packages found in: {search_path}")
else:
decrypted_list = list()
self.get_logger().info("Decrypting:")
for encrypted_name in encrypted_list:
# Remove .gpg part.
decrypted_name = os.path.splitext(encrypted_name)[0]
# Remove the path.
decrypted_name = os.path.basename(decrypted_name)
# Add temporary path.
decrypted_name = os.path.join(constants.TEMP_DIRECTORY, decrypted_name)
self.get_logger().info(f" {decrypted_name}")
if not software_update_utils.decrypt_package(encrypted_name, decrypted_name):
self.get_logger().info(" * failed to decrypt the package.")
continue
decrypted_list.append(decrypted_name)
self.get_logger().info(" * successfully decrypted.")
if len(decrypted_list) == 0:
self.get_logger().info("No successfully decrypted packages.")
else:
self.get_logger().info("Installing packages:")
for decrypted_name in decrypted_list:
# Install the package.
self.get_logger().info(f" {decrypted_name}")
if not software_update_utils.install_debian(decrypted_name):
self.get_logger().info(" * failed to install.")
else:
self.get_logger().info(" * successfully installed.")
# Remove the temporary package file.
os.remove(decrypted_name)
# Unmount the media.
if node_name is not None:
mount_point_mgr_request = USBMountPointManagerSrv.Request()
mount_point_mgr_request.node_name = node_name
mount_point_mgr_request.action = 0
self.usb_mount_point_manager_client.call_async(mount_point_mgr_request)
self.led_solid_request.led_index = constants.LEDIndex.POWER_LED
self.led_solid_request.color = constants.LEDColor.BLUE
self.led_solid_request.hold = 0.0
self.led_solid_service.call(self.led_solid_request)