in assets/model_monitoring/components/src/shared_utilities/store_url.py [0:0]
def any_files(self, relative_path_pattern: str,
container_client: Union[FileSystemClient, ContainerClient, None] = None) -> bool:
"""Check if file matching the `relative_path_pattern` exists, wildcard supported."""
def file_name_match(file_name: str, pattern: str) -> bool:
# fnmatch will match /a/b.txt with /*.txt, but we want it to return False
# so split the path and pattern and then match with fnmatch within each section
path_sections = file_name.split("/")
pattern_sections = pattern.split("/")
if len(path_sections) != len(pattern_sections):
return False
return all(fnmatch.fnmatch(file_name, pattern)
for file_name, pattern in zip(path_sections, pattern_sections))
def any_files(file_names: list, pattern):
return any(file_name_match(file_name, pattern) for file_name in file_names)
base_path = self._base_url.replace('\\', '/') if self.is_local_path() else self.path
full_path_pattern = f"{base_path.rstrip('/')}/{relative_path_pattern.strip('/')}"
# find the non-wildcard part of the path
pattern_sections = full_path_pattern.split("/")
for idx in range(len(pattern_sections)):
if "*" in pattern_sections[idx] or '?' in pattern_sections[idx]:
break
non_wildcard_path = ("/".join(pattern_sections[:idx]) + "/").lstrip("/") # lstrip to handle idx == 0
path_pattern = "/".join(pattern_sections[idx:])
# match the wildcard part of the path
container_client = container_client or self.get_container_client()
if not container_client: # local
return any(glob.iglob(full_path_pattern))
try:
if isinstance(container_client, FileSystemClient): # gen2
if container_client.get_directory_client(non_wildcard_path).exists():
paths = container_client.get_paths(non_wildcard_path, True)
file_names = [path.name[len(non_wildcard_path):] for path in paths if not path.is_directory]
else:
return False
else: # blob
blobs = container_client.list_blobs(name_starts_with=non_wildcard_path)
file_names = [blob.name[len(non_wildcard_path):] for blob in blobs]
except HttpResponseError as hre:
if hre.status_code == 403:
error_message = self._get_no_permission_message()
raise InvalidInputError(error_message)
else:
raise hre
return any_files(file_names, path_pattern)