in collection_manager/collection_manager/services/CollectionWatcher.py [0:0]
def __init__(self,
             collections_path: str,
             granule_updated_callback: Callable[[str, Collection], Awaitable],
             s3_bucket: Optional[str] = None,
             collections_refresh_interval: float = 30):
    """Set up the watcher's configuration and pick a filesystem/S3 observer.

    Nothing is started here; the constructor only validates the path,
    stores the arguments, and instantiates the observer object.

    :param collections_path: Absolute path to the collections config.
    :param granule_updated_callback: Awaitable-returning callable taking a
        ``str`` and a ``Collection``; stored for later use.
    :param s3_bucket: When truthy, an ``S3Observer`` is created for this
        bucket with ``initial_scan=True``. When ``None`` (or any other
        falsy value, e.g. an empty string) a plain ``Observer`` is used.
    :param collections_refresh_interval: Stored as-is; presumably a number
        of seconds between config refreshes -- TODO confirm with the code
        that consumes it.
    :raises RelativePathError: If ``collections_path`` is not absolute.
    """
    path_is_absolute = os.path.isabs(collections_path)
    if not path_is_absolute:
        raise RelativePathError("Collections config path must be an absolute path.")

    # Plain configuration captured from the caller.
    self._collections_path = collections_path
    self._granule_updated_callback = granule_updated_callback
    self._collections_refresh_interval = collections_refresh_interval

    # Directory -> collections mapping; missing keys start as empty sets.
    self._collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)

    # A configured bucket selects the S3-backed observer, otherwise the
    # default observer is used.
    if s3_bucket:
        self._observer = S3Observer(s3_bucket, initial_scan=True)
    else:
        self._observer = Observer()

    # Starts empty; populated elsewhere in the class.
    self._granule_watches = set()