in samcli/lib/utils/file_observer.py [0:0]
def on_change(self, event: FileSystemEvent) -> None:
    """
    Callback that runs when watchdog reports a change under one of the observed paths.
    For every group, it works out which tracked paths were really removed or had their
    checksum changed, and calls that group's registered handler with those paths.

    Parameters
    ----------
    event: watchdog.events.FileSystemEvent
        The file system event reported for some file/dir in the observed paths
    """
    with self._watch_lock:
        event_type = event.event_type
        if event_type == EVENT_TYPE_OPENED:
            LOG.debug("Ignoring file system OPENED event")
            return

        src_path = event.src_path
        LOG.debug("a %s change got detected in path %s", event_type, src_path)

        for group, checksums in self._observed_paths_per_group.items():
            if event_type == EVENT_TYPE_DELETED:
                # A deletion affects the tracked path itself, plus any tracked path registered
                # under the deleted path's entry in the watchdog bookkeeping.
                # NOTE(review): the key embeds repr(src_path) (quotes included) -- confirm this
                # matches how the keys of _watch_dog_observed_paths are built elsewhere.
                lookup_key = f"{src_path!r}_False"
                affected = [
                    tracked
                    for tracked in checksums
                    if tracked == src_path or tracked in self._watch_dog_observed_paths.get(lookup_key, [])
                ]
            elif isinstance(src_path, str):
                # Any tracked path that is a string prefix of the event path is a candidate.
                affected = [tracked for tracked in checksums if src_path.startswith(tracked)]
            else:
                # A non-str source path cannot be prefix-matched; nothing to do for this group.
                continue

            if not affected:
                continue

            LOG.debug("affected paths of this change %s", affected)
            changed = []
            for tracked in affected:
                if not Path(tracked).exists():
                    # The path got deleted: stop tracking it and report it as changed.
                    checksums.pop(tracked, None)
                    changed.append(tracked)
                    continue

                latest = calculate_checksum(tracked)
                if not latest or latest == checksums.get(tracked):
                    LOG.debug("the path %s content does not change", tracked)
                    continue

                # Remember the new checksum so that later events compare against it.
                checksums[tracked] = latest
                changed.append(tracked)

            if changed:
                self._observed_groups_handlers[group](changed)