in deepracer_systems_pkg/deepracer_systems_pkg/software_update_module/software_update_node.py [0:0]
def execute_update(self, keyworded_args):
    """Run the software update triggered from the console and reboot the device.

    Marks every package in ``self.update_list`` for installation, commits the apt
    cache, verifies the installation (with retries), publishes the update percentage,
    updates the power LED during the process and finally reboots the device. The
    reboot happens whether the installation succeeded or failed.

    The call is ignored (nothing is installed and the device is not rebooted) when
    the software is already up to date or another update is already in progress.

    Args:
        keyworded_args (dict): Keyworded arguments passed to the function while
                               scheduling. The optional "reboot_after" entry is the
                               time in seconds to wait before rebooting (default 0).
    """
    self.get_logger().info("Starting software update...")

    # Check and switch the state while holding state_guard so that only one
    # update can be marked as in progress.
    with utility.AutoLock(self.state_guard):
        # Ignore if already up to date.
        if self.update_state == software_update_config.SoftwareUpdateState.UP_TO_DATE:
            self.get_logger().info(f"Software is up to date, ignoring: {self.update_state}")
            return

        # Ignore if in progress.
        if self.update_state == software_update_config.SoftwareUpdateState.UPDATE_IN_PROGRESS:
            self.get_logger().info("Software update is already in progress, ignoring.")
            return

        # Mark as in progress.
        self.update_state = software_update_config.SoftwareUpdateState.UPDATE_IN_PROGRESS

    # Start the periodic progress publisher only once this call owns the update.
    # Creating it before the checks above would leave a never-cancelled timer behind
    # on an ignored request and would overwrite the handle of the timer that belongs
    # to an update already in progress.
    self.publish_pct_timer = self.create_timer(2.0, self.publish_update_pct)

    # Blink the power LED blue while the update is running.
    self.led_blink_request.led_index = constants.LEDIndex.POWER_LED
    self.led_blink_request.color1 = constants.LEDColor.BLUE
    self.led_blink_request.color2 = constants.LEDColor.NO_COLOR
    self.led_blink_request.delay = 0.2
    self.led_blink_service.call(self.led_blink_request)

    try:
        for package in self.update_list:
            self.get_logger().info(f"** Installing Software update for {package.name}")
            package.mark_install()

        commit_response = self.cache.commit(
            fetch_progress.FetchProgress(self.pct_dict_db, self.get_logger()),
            install_progress.InstallProgress(self.pct_dict_db, self.get_logger()))
        self.get_logger().info(f"** Apt cache committed with latest packages: {commit_response}")

        max_retries = software_update_config.MAX_UPDATE_VERIFICATION_RETRY_COUNT
        for attempt in range(1, max_retries + 1):
            if self.verify_software_update():
                self.get_logger().info("Software update verified")
                # Write to software update status as at least one
                # update has completed successfully.
                with open(constants.SOFTWARE_UPDATE_STATUS_PATH, "w") \
                        as software_update_status_file:
                    software_update_status = {"update_completed": True}
                    json.dump(software_update_status, software_update_status_file)
                self.get_logger().info("Written to software update status file.")
                break

            # Report the configured retry limit rather than a hard-coded number.
            self.get_logger().error("Software update could not be verified. "
                                    f"Retrying {attempt}/{max_retries}..")
            time.sleep(5)
        else:
            # Every verification attempt failed (or no attempt was allowed).
            raise Exception("Failed to install packages")

        # Update is only 100% after the commit call exits.
        # Stop the periodic publisher and publish the final state explicitly.
        self.publish_pct_timer.cancel()
        self.pct_dict_db.put({software_update_config.PROG_STATE_KEY:
                              software_update_config.PROGRESS_STATES[5],
                              software_update_config.PROG_PCT_KEY: 100.0})
        self.publish_update_pct()

        with utility.AutoLock(self.state_guard):
            self.update_state = software_update_config.SoftwareUpdateState.UP_TO_DATE
        self.get_logger().info("Software update complete.")

    except Exception as ex:
        # Show a solid red power LED to signal the failure.
        self.led_solid_request.led_index = constants.LEDIndex.POWER_LED
        self.led_solid_request.color = constants.LEDColor.RED
        self.led_solid_request.hold = 2.0
        self.led_solid_service.call(self.led_solid_request)

        # Make the update available again so that it can be retried.
        with utility.AutoLock(self.state_guard):
            self.update_state = software_update_config.SoftwareUpdateState.UPDATE_AVAILABLE
        self.get_logger().error(f"Software update failed: {ex}")

        # Stop the periodic publisher here as well, mirroring the success path, so
        # that it does not compete with the explicit publish of the reset progress.
        self.publish_pct_timer.cancel()
        self.pct_dict_db.put({software_update_config.PROG_STATE_KEY:
                              software_update_config.PROGRESS_STATES[0],
                              software_update_config.PROG_PCT_KEY: 0.0})
        self.publish_update_pct()

    #
    # Not rebooting after the software update is dangerous and leaves the device in
    # an unstable state. The parameter passed to the software update is to reboot
    # after some sleep time. This is required so that the webserver can return 100%
    # completion to the client before the device reboots itself.
    #
    reboot_after = keyworded_args.get("reboot_after", 0)
    self.get_logger().info(f"Reboot after: {reboot_after} seconds")
    time.sleep(reboot_after)

    # Turn the power LED off right before rebooting.
    self.led_solid_request.led_index = constants.LEDIndex.POWER_LED
    self.led_solid_request.color = constants.LEDColor.NO_COLOR
    self.led_solid_request.hold = 0.0
    self.led_solid_service.call(self.led_solid_request)

    self.get_logger().info("Rebooting...")
    os.system("reboot")