in deepracer_systems_pkg/deepracer_systems_pkg/software_update_module/software_update_node.py [0:0]
def __init__(self):
"""Create a SoftwareUpdateNode.
"""
super().__init__("software_update_process")
self.get_logger().info("software_update_process started")
# Scheduler to queue the function calls and run them in a separate thread.
self.scheduler = scheduler.Scheduler(self.get_logger())
# Threading lock object to safely update the update_state variable.
self.state_guard = threading.Lock()
self.update_state = software_update_config.SoftwareUpdateState.UPDATE_UNKNOWN
# Threading Event object to indicate if the software update check is completed.
self.check_complete = threading.Event()
# Flag to indicate software update check in progress.
self.check_in_progress = False
# List of packages that have updates available.
self.update_list = list()
# Flag to identify if the network is connected.
self.is_network_connected = False
# Flag to identify if software update check has to performed
# again when network is connected.
self.reschedule_software_update_check = False
# The apt cache object.
self.cache = apt.Cache()
# Double buffer object containing the current update percentage and progress state.
self.pct_dict_db = utility.DoubleBuffer(clear_data_on_get=False)
self.pct_dict_db.put({software_update_config.PROG_STATE_KEY:
software_update_config.PROGRESS_STATES[0],
software_update_config.PROG_PCT_KEY: 0.0})
# Timer to periodically check for software update.
if software_update_config.ENABLE_PERIODIC_SOFTWARE_UPDATE:
self.get_logger().info("Schedule the software update check every "
f"{software_update_config.SOFTWARE_UPDATE_PERIOD_IN_SECONDS} "
"seconds.")
self.schedule_update_check_cb = ReentrantCallbackGroup()
self.update_check_timer = \
self.create_timer(software_update_config.SOFTWARE_UPDATE_PERIOD_IN_SECONDS,
self.schedule_update_check,
callback_group=self.schedule_update_check_cb)
# Publisher that sends software update pct and status.
# Guaranteed delivery is needed to send messages to late-joining subscription.
qos_profile = QoSProfile(depth=1)
qos_profile.history = QoSHistoryPolicy.KEEP_LAST
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
self.pct_obj = SoftwareUpdatePctMsg()
self.software_update_pct_pub_cb = ReentrantCallbackGroup()
self.software_update_pct_publisher = \
self.create_publisher(SoftwareUpdatePctMsg,
software_update_config.SOFTWARE_UPDATE_PCT_TOPIC_NAME,
qos_profile=qos_profile,
callback_group=self.software_update_pct_pub_cb)
# Service to check if there is a software update available.
self.software_update_check_cb_group = ReentrantCallbackGroup()
self.software_update_check_service = \
self.create_service(SoftwareUpdateCheckSrv,
software_update_config.SOFTWARE_UPDATE_CHECK_SERVICE_NAME,
self.software_update_check,
callback_group=self.software_update_check_cb_group)
# Service to execute the software update and install latest packages.
self.begin_update_service_cb_group = ReentrantCallbackGroup()
self.begin_update_service = \
self.create_service(BeginSoftwareUpdateSrv,
software_update_config.BEGIN_SOFTWARE_UPDATE_SERVICE_NAME,
self.begin_update,
callback_group=self.begin_update_service_cb_group)
# Service to get the current software update state.
self.software_update_state_service_cb_group = ReentrantCallbackGroup()
self.software_update_state_service = \
self.create_service(SoftwareUpdateStateSrv,
software_update_config.SOFTWARE_UPDATE_STATE_SERVICE_NAME,
self.software_update_state,
callback_group=self.software_update_state_service_cb_group)
self.nw_con_status_cb_group = ReentrantCallbackGroup()
self.network_connection_status_sub = \
self.create_subscription(NetworkConnectionStatus,
software_update_config.NETWORK_CONNECTION_STATUS_TOPIC_NAME,
self.network_connection_status_cb,
10,
callback_group=self.nw_con_status_cb_group)
# Clients to Status LED services that are called to indicate progress/success/failure
# status while loading model.
self.led_cb_group = MutuallyExclusiveCallbackGroup()
self.led_blink_service = self.create_client(SetStatusLedBlinkSrv,
constants.LED_BLINK_SERVICE_NAME,
callback_group=self.led_cb_group)
self.led_solid_service = self.create_client(SetStatusLedSolidSrv,
constants.LED_SOLID_SERVICE_NAME,
callback_group=self.led_cb_group)
while not self.led_blink_service.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Led blink service not available, waiting again...")
while not self.led_solid_service.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Led solid service not available, waiting again...")
self.led_blink_request = SetStatusLedBlinkSrv.Request()
self.led_solid_request = SetStatusLedSolidSrv.Request()
# Client to USB File system subscription service that allows the node to install
# signed software packages(*.deb.gpg) present in the USB. The usb_monitor_node
# will trigger notification if it finds the "update" folder from the watchlist
# in the USB drive.
self.usb_sub_cb_group = ReentrantCallbackGroup()
self.usb_file_system_subscribe_client = self.create_client(USBFileSystemSubscribeSrv,
constants.USB_FILE_SYSTEM_SUBSCRIBE_SERVICE_NAME,
callback_group=self.usb_sub_cb_group)
while not self.usb_file_system_subscribe_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("File System Subscribe not available, waiting again...")
# Client to USB Mount point manager service to indicate that the usb_monitor_node can safely
# decrement the counter for the mount point once the action function for the "update" folder
# file being watched by software_update_node is succesfully executed.
self.usb_mpm_cb_group = ReentrantCallbackGroup()
self.usb_mount_point_manager_client = self.create_client(USBMountPointManagerSrv,
constants.USB_MOUNT_POINT_MANAGER_SERVICE_NAME,
callback_group=self.usb_mpm_cb_group)
while not self.usb_mount_point_manager_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("USB mount point manager service not available, waiting again...")
# Subscriber to USB File system notification publisher to recieve the broadcasted messages
# with file/folder details, whenever a watched file is identified in the USB connected.
self.usb_notif_cb_group = ReentrantCallbackGroup()
self.usb_file_system_notification_sub = self.create_subscription(USBFileSystemNotificationMsg,
constants.USB_FILE_SYSTEM_NOTIFICATION_TOPIC,
self.usb_file_system_notification_cb,
10,
callback_group=self.usb_notif_cb_group)
# Add the "update" folder to the watchlist.
usb_file_system_subscribe_request = USBFileSystemSubscribeSrv.Request()
usb_file_system_subscribe_request.file_name = software_update_config.UPDATE_SOURCE_DIRECTORY
usb_file_system_subscribe_request.callback_name = software_update_config.SCHEDULE_USB_UPDATE_SCAN_CB
usb_file_system_subscribe_request.verify_name_exists = True
self.usb_file_system_subscribe_client.call_async(usb_file_system_subscribe_request)
# Set the power led to blue.
self.led_solid_request.led_index = constants.LEDIndex.POWER_LED
self.led_solid_request.color = constants.LEDColor.BLUE
self.led_solid_request.hold = 0.0
self.led_solid_service.call_async(self.led_solid_request)
# Heartbeat timer.
self.timer_count = 0
self.timer = self.create_timer(5.0, self.timer_callback)
# Schedule update check.
if software_update_config.ENABLE_PERIODIC_SOFTWARE_UPDATE:
self.schedule_update_check()
self.get_logger().info("Software Update node successfully created")