in pathology/transformation_pipeline/ingestion_lib/dicom_gen/wsi_to_dicom/metadata_storage_client.py [0:0]
def update_metadata(self):
  """Checks if the local metadata cache is out of date and refreshes it.

  Lists CSV and schema blobs in the configured ingest bucket, compares them
  (name/md5/size/create-time) against the cached blob list, and if anything
  changed downloads all metadata files into a fresh temporary directory.

  Raises:
    MetadataDownloadExceptionError: If the metadata ingest bucket cannot be
      queried (underlying google.api_core NotFound is chained as the cause).
  """
  metadata_files_found = []
  try:
    storage_client = cloud_storage.Client()
    metadata_bucket = cloud_storage.bucket.Bucket(
        client=storage_client, name=self._metadata_ingest_storage_bucket
    )
    metadata_blobs = []
    bucket_uri = f'gs://{self._metadata_ingest_storage_bucket}'
    cloud_logging_client.info(
        'Checking for new wsi-slide metadata.',
        {'metadata_bucket': bucket_uri},
    )
    for blob in storage_client.list_blobs(metadata_bucket):
      name = blob.name
      upper_name = name.upper()
      # Only CSV metadata files and schema files participate in the cache.
      if upper_name.endswith('.CSV') or is_schema(upper_name):
        md5_hash = blob.md5_hash
        size = blob.size
        create_time = blob.time_created.timestamp()
        # Last field (local path) is filled in later by _download_blob.
        metadata_blobs.append(
            MetadataBlob(name, md5_hash, size, create_time, '')
        )
        metadata_files_found.append(name)
  except google.api_core.exceptions.NotFound as exp:
    msg = (
        f'Error querying {self._metadata_ingest_storage_bucket} '
        'for csv metadata.'
    )
    cloud_logging_client.error(
        msg,
        {
            'Metadata_storage_bucket': self._metadata_ingest_storage_bucket,
            'metadata_files_found': str(metadata_files_found),
        },
        exp,
    )
    raise MetadataDownloadExceptionError(msg) from exp
  # Newest-first ordering so later consumers see the most recent metadata
  # files before older ones.
  metadata_blobs = sorted(
      metadata_blobs, key=lambda x: x.create_time, reverse=True
  )
  if not self._has_metadata_changed(metadata_blobs):
    cloud_logging_client.info(
        'Metadata unchanged. Using cached files.',
        {'metadata_files_found': str(metadata_files_found)},
    )
  else:
    cloud_logging_client.info(
        'Metadata changed.',
        {'metadata_files_found': str(metadata_files_found)},
    )
    # Drop any previously downloaded files before re-downloading.
    if self._working_root_metadata_dir is not None:
      self._working_root_metadata_dir.cleanup()
    self._working_root_metadata_dir = tempfile.TemporaryDirectory('metadata')
    start_time = time.time()
    downloaded_metadata_list = [
        _download_blob(
            storage_client,
            self._working_root_metadata_dir.name,
            self._metadata_ingest_storage_bucket,
            blob,
        )
        for blob in metadata_blobs
    ]
    cloud_logging_client.info(
        'Downloaded metadata',
        {
            'metadata_file_list': str(downloaded_metadata_list),
            'download_time_sec': time.time() - start_time,
        },
    )
    # Only replace the cache when metadata actually changed; otherwise the
    # cached entries (with their downloaded state) remain valid.
    self._csv_metadata_cache = metadata_blobs