in usb_monitor_pkg/usb_monitor_pkg/usb_monitor_node.py [0:0]
def process(self, filesystem, post_action=None):
    """Parse the contents of the USB drive and publish notification messages
    for watched files/folders.

    One USBFileSystemNotificationMsg is published per entry in
    ``self.subscribers``. Entries flagged with ``verify_name_exists`` are
    skipped when the watched name is neither a directory nor a regular file
    under the mount point.

    Args:
        filesystem (str): Filesystem path where the device is mounted.
        post_action (function, optional): Any function that needs to be executed
                                          while decrementing the reference for the
                                          filesystem after completing processing the
                                          file/folder.
                                          Defaults to None.

    Returns:
        bool: True if the filesystem was processed, False if no mount point
              could be obtained (the returned mount point has an empty name).
    """
    # Mount the filesystem and get the mount point.
    mount_point = self.get_mount_point(filesystem, post_action)
    if mount_point.name == "":
        return False

    try:
        # Check if any of the watched files are present.
        for file_name, callback_name, verify_name_exists in self.subscribers:
            full_name = os.path.join(mount_point.name, file_name)
            if verify_name_exists and not (os.path.isdir(full_name)
                                           or os.path.isfile(full_name)):
                # Report both checks so the log shows why the entry was skipped.
                self.get_logger().info("verify_name_exists : "
                                       f"{full_name} "
                                       f"{os.path.isdir(full_name)} "
                                       f"{os.path.isfile(full_name)}")
                continue

            # Take one extra reference per published notification.
            # NOTE(review): presumably the consumer of the notification releases
            # this reference when it is done with the file - confirm.
            mount_point.ref_inc()

            notification_msg = USBFileSystemNotificationMsg()
            notification_msg.path = mount_point.name
            notification_msg.file_name = file_name
            notification_msg.callback_name = callback_name
            notification_msg.node_name = filesystem
            self.get_logger().info("USB File system notification:"
                                   f" {notification_msg.path}"
                                   f" {notification_msg.file_name}"
                                   f" {notification_msg.node_name}"
                                   f" {notification_msg.callback_name}")
            self.usb_file_system_notification_publisher.publish(notification_msg)
    finally:
        # Dereference mount point. Done in ``finally`` so the reference taken
        # by get_mount_point() is released even if publishing raises.
        mount_point.ref_dec()

    return True