def update_metadata()

in pathology/transformation_pipeline/ingestion_lib/dicom_gen/wsi_to_dicom/metadata_storage_client.py


  def update_metadata(self):
    """Checks if cache is out of date.

    Updates cache if necessary.

    Raises:
      MetadataDownloadExceptionError: If metadata cannot be listed from the
        ingest bucket.
    """
    metadata_files_found = []
    try:
      storage_client = cloud_storage.Client()
      metadata_bucket = cloud_storage.bucket.Bucket(
          client=storage_client, name=self._metadata_ingest_storage_bucket
      )
      metadata_blobs = []
      bucket_uri = f'gs://{self._metadata_ingest_storage_bucket}'
      cloud_logging_client.info(
          'Checking for new wsi-slide metadata.',
          {'metadata_bucket': bucket_uri},
      )

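      # Collect blobs that look like metadata: CSV files or files that
      # is_schema() recognizes.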
      for blob in storage_client.list_blobs(metadata_bucket):
        name = blob.name
        upper_name = name.upper()
        if upper_name.endswith('.CSV') or is_schema(upper_name):
          md5_hash = blob.md5_hash
          size = blob.size
          create_time = blob.time_created.timestamp()
          metadata_blobs.append(
              MetadataBlob(name, md5_hash, size, create_time, '')
          )
          metadata_files_found.append(name)
    except google.api_core.exceptions.NotFound as exp:
      msg = (
          f'Error querying {self._metadata_ingest_storage_bucket} '
          'for csv metadata.'
      )
      cloud_logging_client.error(
          msg,
          {
              'metadata_storage_bucket': self._metadata_ingest_storage_bucket,
              'metadata_files_found': str(metadata_files_found),
          },
          exp,
      )
      raise MetadataDownloadExceptionError(msg) from exp

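    # Order the listing newest-first by creation time before checking whether
    # anything changed relative to the cache.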
    metadata_blobs = sorted(
        metadata_blobs, key=lambda x: x.create_time, reverse=True
    )
    if not self._has_metadata_changed(metadata_blobs):
      cloud_logging_client.info(
          'Metadata unchanged. Using cached files.',
          {'metadata_files_found': str(metadata_files_found)},
      )
    else:
      cloud_logging_client.info(
          'Metadata changed.',
          {'metadata_files_found': str(metadata_files_found)},
      )
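      # Drop any previously downloaded copy and stage a fresh temporary
      # directory for the new files.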
      if self._working_root_metadata_dir is not None:
        self._working_root_metadata_dir.cleanup()
      self._working_root_metadata_dir = tempfile.TemporaryDirectory('metadata')
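      # Download every discovered metadata blob into the new working directory.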
      start_time = time.time()
      downloaded_metadata_list = [
          _download_blob(
              storage_client,
              self._working_root_metadata_dir.name,
              self._metadata_ingest_storage_bucket,
              blob,
          )
          for blob in metadata_blobs
      ]
      cloud_logging_client.info(
          'Downloaded metadata',
          {
              'metadata_file_list': str(downloaded_metadata_list),
              'download_time_sec': time.time() - start_time,
          },
      )
      self._csv_metadata_cache = metadata_blobs
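
For reference, update_metadata() relies on helpers defined elsewhere in metadata_storage_client.py: MetadataBlob, _download_blob, _has_metadata_changed and is_schema. The sketch below is only a plausible reconstruction of the first two, assuming cloud_storage is google.cloud.storage, that MetadataBlob is a plain record of blob attributes, and that _download_blob copies a single blob into the working directory and returns its local path; the real definitions in the module may differ.

import dataclasses
import os

from google.cloud import storage as cloud_storage


@dataclasses.dataclass(frozen=True)
class MetadataBlob:
  """Illustrative record of a metadata blob's attributes (fields assumed)."""

  filename: str
  md5_hash: str
  size: int
  create_time: float
  local_path: str


def _download_blob(
    storage_client: cloud_storage.Client,
    download_dir: str,
    bucket_name: str,
    metadata: MetadataBlob,
) -> str:
  """Hypothetical per-blob download helper; returns the local file path."""
  local_path = os.path.join(download_dir, os.path.basename(metadata.filename))
  bucket = storage_client.bucket(bucket_name)
  bucket.blob(metadata.filename).download_to_filename(local_path)
  return local_path

Under those assumptions, when the listing has changed update_metadata() leaves the downloaded files under self._working_root_metadata_dir.name and caches the fresh listing in self._csv_metadata_cache.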