pathology/transformation_pipeline/gke_main.py (156 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""DPAS DICOM transformation pipeline.

Overview:
  - Pipeline hosted in GKE
  - Sync polls cloud storage pub/sub subscription or ingests specified list
    of files.
  - Downloads image to container.
  - Converts downloaded image to DICOM using specified conversion tooling.
  - Performs hash based duplicate testing DICOMs for duplication.
  - Uploads generated non duplicated DICOM to DICOM Store.
  - Moves image trigging pipeline from input to output/success or
    output/failure bucket.
  - Published ingest complete pub/sub msg.
  - All Operations logged in cloud operations.
"""
import threading
from typing import Mapping, Optional

from absl import app

from pathology.shared_libs.build_version import build_version
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.transformation_pipeline import ingest_flags
from pathology.transformation_pipeline.ingestion_lib import abstract_pubsub_message_handler
from pathology.transformation_pipeline.ingestion_lib import polling_client
from pathology.transformation_pipeline.ingestion_lib import redis_client
from pathology.transformation_pipeline.ingestion_lib.dicom_gen import ingestion_dicom_store_urls
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.ai_to_dicom import png_to_dicom
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.wsi_to_dicom import ingest_dicom_store_handler
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.wsi_to_dicom import ingest_gcs_handler
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.wsi_to_dicom import ingestion_complete_oof_trigger_pubsub_topic
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.wsi_to_dicom import metadata_storage_client


# Path to the open-source license bundle baked into the container image.
_LICENSE_FILE_PATH = './thirdparty_licenses.txt'


class LicenseMissingError(Exception):
  """Raised when the container's bundled software license file is absent."""


class RedisConnectionFailedError(Exception):
  """Raised when a configured redis server cannot be pinged."""


# Currently running polling client; guarded by _running_polling_client_lock
# so that stop_polling_client() may be invoked safely from another thread
# (e.g. a signal handler).
_running_polling_client: Optional[polling_client.PollingClient] = None
_running_polling_client_lock = threading.Lock()


def stop_polling_client() -> None:
  """Requests the currently running polling client, if any, to stop."""
  with _running_polling_client_lock:
    if _running_polling_client is not None:
      _running_polling_client.stop_polling_client()


def _set_running_polling_client(
    client: Optional[polling_client.PollingClient],
) -> None:
  """Records the active polling client; pass None to clear it."""
  global _running_polling_client
  with _running_polling_client_lock:
    _running_polling_client = client


def _run_polling_client_ingest(
    project: str,
    sub_to_handler: Mapping[
        str, abstract_pubsub_message_handler.AbstractPubSubMsgHandler
    ],
) -> None:
  """Runs polling client ingestion.

  Args:
    project: GCP project id for Pub/Sub subscriptions.
    sub_to_handler: Map of Pub/Sub subscription id to message handler.
  """
  # Wrap the polling client in a 'with' block to automatically call close() to
  # close the underlying gRPC channel when done.
  with polling_client.PollingClient(project, sub_to_handler) as client:
    _set_running_polling_client(client)
    try:
      client.run()
    finally:
      # Always clear the global reference, even if run() raises, so
      # stop_polling_client() never touches a closed client.
      _set_running_polling_client(None)


def _run_oof_ingest() -> None:
  """Runs OOF ingestion.

  Raises:
    ValueError: If the OOF Pub/Sub subscription is not configured.
  """
  if not ingest_flags.OOF_SUBSCRIPTION_FLG.value:
    oof_msg = (
        'Missing OOF subscription for OOF transformation pipeline. Either '
        '--oof_subscription flag or OOF_SUBSCRIPTION env variable must be set.'
    )
    cloud_logging_client.critical(oof_msg)
    raise ValueError(oof_msg)
  ingest_handler = png_to_dicom.AiPngtoDicomSecondaryCapture(
      dicom_store_web_path=ingestion_dicom_store_urls.get_main_dicom_web_url(),
      use_oof_legacy_pipeline=ingest_flags.OOF_LEGACY_INFERENCE_PIPELINE_FLG.value,
  )
  sub_to_handler = {ingest_flags.OOF_SUBSCRIPTION_FLG.value: ingest_handler}
  _run_polling_client_ingest(ingest_flags.PROJECT_ID_FLG.value, sub_to_handler)


def _get_oof_trigger_config() -> (
    Optional[ingest_gcs_handler.InferenceTriggerConfig]
):
  """Returns OOF inference trigger config, or None if no OOF store is set."""
  oof_dicom_store_url = ingestion_dicom_store_urls.get_oof_dicom_web_url()
  if not oof_dicom_store_url:
    return None
  return ingest_gcs_handler.InferenceTriggerConfig(
      dicom_store_web_path=oof_dicom_store_url,
      pubsub_topic=ingestion_complete_oof_trigger_pubsub_topic.get_oof_trigger_pubsub_topic(),
      use_oof_legacy_pipeline=ingest_flags.OOF_LEGACY_INFERENCE_PIPELINE_FLG.value,
      inference_config_path=ingest_flags.OOF_INFERENCE_CONFIG_PATH_FLG.value,
  )


def _run_default_ingest() -> None:
  """Runs default ingestion.

  If --dicom_store_subscription is set, runs round robin with GCS and DICOM
  store polling.

  Raises:
    ValueError: If the GCS Pub/Sub subscription is not configured.
  """
  if not ingest_flags.GCS_SUBSCRIPTION_FLG.value:
    gcs_msg = (
        'Missing GCS subscription for default transformation pipeline. Either '
        '--gcs_subscription flag or GCS_SUBSCRIPTION env variable must be set.'
    )
    cloud_logging_client.critical(gcs_msg)
    raise ValueError(gcs_msg)
  sub_to_handler = {}
  # Metadata client is shared between the GCS and DICOM store handlers.
  metadata_client = metadata_storage_client.MetadataStorageClient()
  gcs_handler = ingest_gcs_handler.IngestGcsPubSubHandler(
      ingest_succeeded_uri=ingest_flags.INGEST_SUCCEEDED_URI_FLG.value,
      ingest_failed_uri=ingest_flags.INGEST_FAILED_URI_FLG.value,
      dicom_store_web_path=ingestion_dicom_store_urls.get_main_dicom_web_url(),
      ingest_ignore_root_dirs=frozenset(
          ingest_flags.INGEST_IGNORE_ROOT_DIR_FLG.value
      ),
      metadata_client=metadata_client,
      oof_trigger_config=_get_oof_trigger_config(),
  )
  sub_to_handler[ingest_flags.GCS_SUBSCRIPTION_FLG.value] = gcs_handler
  if ingest_flags.DICOM_STORE_SUBSCRIPTION_FLG.value:
    dicom_store_handler = (
        ingest_dicom_store_handler.IngestDicomStorePubSubHandler(
            metadata_client
        )
    )
    sub_to_handler[ingest_flags.DICOM_STORE_SUBSCRIPTION_FLG.value] = (
        dicom_store_handler
    )
  else:
    cloud_logging_client.info(
        'Running GCS ingest only. To include DICOM store, either '
        '--dicom_store_subscription flag or DICOM_STORE_SUBSCRIPTION env '
        'variable must be set. '
    )
  _run_polling_client_ingest(ingest_flags.PROJECT_ID_FLG.value, sub_to_handler)


def _load_software_license() -> str:
  """Reads the bundled third-party license file.

  Returns:
    License file contents.

  Raises:
    LicenseMissingError: If the license file is absent or empty.
  """
  try:
    with open(_LICENSE_FILE_PATH, 'rt') as infile:
      software_license = infile.read()
  except FileNotFoundError:
    software_license = None
  # Treat an empty file the same as a missing one.
  if not software_license:
    cloud_logging_client.critical('GKE container is missing software license.')
    raise LicenseMissingError('Missing software license')
  return software_license


def _validate_redis_connection() -> None:
  """Logs redis configuration and, if configured, verifies connectivity.

  Raises:
    RedisConnectionFailedError: If a configured redis server cannot be pinged.
  """
  log = {'redis_server': ingest_flags.REDIS_SERVER_IP_FLG.value}
  if not redis_client.redis_client().has_redis_client():
    # Redis is optional; absence is informational only.
    cloud_logging_client.info('Redis is not configured', log)
    return
  cloud_logging_client.info('Redis is configured', log)
  if redis_client.redis_client().ping():
    cloud_logging_client.info('Successfully pinged redis server', log)
  else:
    cloud_logging_client.critical('Could not ping redis server', log)
    raise RedisConnectionFailedError(
        'Could not connect to redis server '
        f'{ingest_flags.REDIS_SERVER_IP_FLG.value}.'
    )


def main(unused_argv):
  """Validates runtime prerequisites and starts the selected pipeline.

  Args:
    unused_argv: Command line arguments (unused; flags are read via absl).

  Raises:
    LicenseMissingError: If the container license file is missing or empty.
    RedisConnectionFailedError: If a configured redis server is unreachable.
    ValueError: If TRANSFORMATION_PIPELINE names an unknown pipeline.
  """
  copyright_notification = (
      'Copyright 2021 Google LLC.\n\n'
      'Your use of this software is subject to your agreement with Google. '
      'You may not copy, modify, or distribute this software except as '
      'permitted under your agreement with Google.'
  )
  build_version.init_cloud_logging_build_version()
  cloud_logging_client.info(copyright_notification)
  software_license = _load_software_license()
  cloud_logging_client.info(
      'Opensource software licenses', {'licenses': software_license}
  )
  _validate_redis_connection()
  transformation_pipeline = (
      ingest_flags.TRANSFORMATION_PIPELINE_FLG.value.strip().lower()
  )
  if transformation_pipeline == 'oof':
    _run_oof_ingest()
  elif transformation_pipeline == 'default':
    _run_default_ingest()
  else:
    err_msg = (
        f'Invalid TRANSFORMATION_PIPELINE value: {transformation_pipeline}. '
        'Expected: default or oof.'
    )
    cloud_logging_client.critical(err_msg)
    raise ValueError(err_msg)


if __name__ == '__main__':
  app.run(main)