def get_slide_metadata_from_csv()

in pathology/transformation_pipeline/ingestion_lib/dicom_gen/wsi_to_dicom/metadata_storage_client.py [0:0]


  def get_slide_metadata_from_csv(self, pk_value: str) -> pandas.DataFrame:
    """Looks up the metadata row for a slide in the cached CSV files.

    A primary key resolved by an earlier call is served from the slide
    metadata cache. Otherwise every entry of the CSV metadata cache whose
    filename ends in '.csv' (case-insensitive) is read chunk by chunk and
    searched for rows whose primary key column equals pk_value. The search
    stops at the first chunk that holds exactly one matching row; chunks and
    files after that point are not examined.

    Args:
      pk_value: Slide metadata primary key value.

    Returns:
      Pandas DataFrame containing the single row that matches pk_value.

    Raises:
      MetadataNotFoundExceptionError: No CSV file was available, or no row
        matched pk_value.
      MetadataDefinedOnMultipleRowError: A chunk contained more than one row
        matching pk_value.
    """
    cached_table = self._slide_metadata_cache.get(pk_value)
    if cached_table is not None:
      # Return a copy; the cached frame stays untouched by caller edits.
      return cached_table.copy()

    chunk_size = 10**6
    found_csv_file = False
    for csv_metadata in self._csv_metadata_cache:
      if not csv_metadata.filename.upper().endswith('.CSV'):
        continue
      found_csv_file = True
      with csv_util.read_csv(csv_metadata.filename, chunk_size) as chunks:
        for chunk in chunks:
          pk_column_flag = (
              ingest_flags.METADATA_PRIMARY_KEY_COLUMN_NAME_FLG.value
          )
          pk_column = dicom_schema_util.find_data_frame_column_name(
              chunk, pk_column_flag
          )
          if pk_column is None:
            cloud_logging_client.warning(
                'CSV file does not contain metadata primary key column name;'
                ' CSV file ignored.',
                {
                    ingest_const.LogKeywords.FILENAME: csv_metadata.filename,
                    ingest_const.LogKeywords.METADATA_PRIMARY_KEY_COLUMN_NAME: (
                        pk_column_flag
                    ),
                },
            )
            # Give up on this file's remaining chunks; try the next file.
            break

          matches = chunk.loc[chunk[pk_column] == pk_value]
          match_count = matches.shape[0]
          if match_count > 1:
            cloud_logging_client.error(
                'Multiple primary keys found in metadata',
                {ingest_const.LogKeywords.METADATA_PRIMARY_KEY: pk_value},
            )
            raise MetadataDefinedOnMultipleRowError(
                pk_value, csv_metadata.filename
            )
          if match_count == 1:
            cloud_logging_client.info(
                f'Primary key {pk_value} found in CSV metadata',
                {ingest_const.LogKeywords.METADATA_PRIMARY_KEY: pk_value},
            )
            # Cache a private copy so the returned frame is not the cached one.
            self._slide_metadata_cache[pk_value] = matches.copy()
            return matches

    if found_csv_file:
      raise MetadataNotFoundExceptionError(
          f'Primary key: {pk_value} is not in metadata'
      )
    cloud_logging_client.error(
        'No CSV metadata found. Primary key not found in metadata',
        {ingest_const.LogKeywords.METADATA_PRIMARY_KEY: pk_value},
    )
    raise MetadataNotFoundExceptionError(
        'No CSV metadata found. Primary key is not in metadata'
    )