pathology/transformation_pipeline/local_main.py
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""DOCKER CLI entrypoint for transformation pipeline."""
import contextlib
import dataclasses
import os
import shutil
from typing import Any, List, Mapping, Optional
from unittest import mock
import uuid
from absl import app
from absl import flags
from absl.testing import flagsaver
from pathology.shared_libs.flags import flag_utils
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.shared_libs.test_utils.dicom_store_mock import dicom_store_mock
from pathology.shared_libs.test_utils.gcs_mock import gcs_mock
from pathology.shared_libs.test_utils.gcs_mock import gcs_pubsub_mock
from pathology.transformation_pipeline import gke_main
from pathology.transformation_pipeline.ingestion_lib.dicom_gen.wsi_to_dicom import metadata_storage_client
_CONTAINER_BASE_DIR = ''
_MOCK_DICOM_STORE_URL = 'https://mock.dicom.store.com/dicomWeb'
_EXAMPLE_METADATA_MAPPING_SCHEMA_PATH = os.path.join(
os.path.dirname(__file__), 'example', 'metadata_mapping_schemas'
)
_POLL_IMAGE_INGESTION_DIR_FLG = flags.DEFINE_boolean(
'poll_image_ingestion_dir',
flag_utils.env_value_to_bool('POLL_IMAGE_INGESTION_DIR', False),
    'Monitor the image ingestion dir for new images; when enabled, the'
    ' transformation pipeline continues to run after all images in the'
    ' ingestion dir have been processed.',
)
_ENABLE_CLOUD_OPS_LOGGING_FLG = flags.DEFINE_boolean(
'enable_cloud_ops_logging',
flag_utils.env_value_to_bool('ENABLE_CLOUD_OPS_LOGGING', False),
    'Enables publishing transformation pipeline logs to Cloud Operations.',
)
_ENABLE_TRANSFORMATION_PIPELINE_GENERATED_PUBSUB_MESSAGES_FLG = (
flags.DEFINE_boolean(
'enable_transformation_pipeline_generated_published_pubsub_messages',
flag_utils.env_value_to_bool(
'ENABLE_TRANSFORMATION_PIPELINE_GENERATED_PUBSUB_MESSAGES', False
),
'Enables transformation pipeline generation of pub/sub messages.',
)
)
_PUBSUB_MOCK_MESSAGE_DELAY_SEC_FLG = flags.DEFINE_float(
'pubsub_mock_message_delay_sec',
float(os.getenv('PUBSUB_MOCK_MESSAGE_DELAY_SEC', '30.0')),
    'Delay, in seconds, to wait before processing a mock GCS pub/sub message'
    ' to ensure the file has finished copying into the image input bucket.',
)
_LOCALHOST_DICOM_STORE_FLG = flags.DEFINE_string(
'localhost_dicom_store',
os.getenv('LOCALHOST_DICOM_STORE', ''),
    'External directory or http(s) server to use as the DICOM store'
    ' (required).',
)
class _LocalhostMissingDicomStoreConfigurationError(Exception):
  """Raised when the localhost DICOM store is undefined."""


class _UnableToReadWriteFromDirError(Exception):
  """Raised when a mounted directory cannot be read from or written to."""
@dataclasses.dataclass(frozen=True)
class _ContainerDirs:
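  """Container directory mount points used by the local pipeline."""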
metadata: str
imaging: str
processed_images: str
dicom_store: str
def _has_metadata_schema_in_dir(metadata_dir: str) -> bool:
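  """Returns True if the metadata dir contains a metadata mapping schema."""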
for path in os.listdir(metadata_dir):
if metadata_storage_client.is_schema(path):
return True
return False
def _copy_default_schema_to_metadata_dir(metadata_dir: str):
"""Copy example metadata mapping schemas to metadata dir."""
  for schema_filename in os.listdir(_EXAMPLE_METADATA_MAPPING_SCHEMA_PATH):
    example_schema_path = os.path.join(
        _EXAMPLE_METADATA_MAPPING_SCHEMA_PATH, schema_filename
    )
    if not metadata_storage_client.is_schema(example_schema_path):
      continue
    try:
      shutil.copyfile(
          example_schema_path, os.path.join(metadata_dir, schema_filename)
      )
    except OSError as exp:
      cloud_logging_client.warning(
          'Error occurred copying example metadata mapping schema to metadata'
          ' dir.',
          exp,
      )
      continue
def _is_mocked_dicom_store() -> bool:
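  """Returns True if the DICOM store flag is not an http(s) URL (mock mode)."""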
store = _LOCALHOST_DICOM_STORE_FLG.value.lower()
return not store.startswith('http://') and not store.startswith('https://')
def _get_container_dirs() -> _ContainerDirs:
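  """Returns the container directory paths used by the local pipeline."""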
base_dir = _CONTAINER_BASE_DIR.rstrip('/')
return _ContainerDirs(
f'{base_dir}/input_metadata',
f'{base_dir}/input_imaging',
f'{base_dir}/processed_imaging',
f'{base_dir}/mock_dicom_store' if _is_mocked_dicom_store() else '',
)
def _can_read_write_dir(path: str, readonly: bool):
"""Validates that mounted path supports expected, read/write."""
if not os.path.isdir(path):
cloud_logging_client.critical(f'Directory {path} does not exist.')
raise _UnableToReadWriteFromDirError(f'Directory {path} does not exist.')
if not os.access(path, os.R_OK):
cloud_logging_client.critical(f'Cannot read from directory: {path}')
raise _UnableToReadWriteFromDirError(f'Cannot read directory {path}.')
if readonly:
return
if not os.access(path, os.W_OK):
cloud_logging_client.critical(f'Cannot write to directory: {path}')
raise _UnableToReadWriteFromDirError(f'Cannot write to directory {path}.')
def _call_if_no_files() -> None:
"""Call back called when mock gcs pub/sub queue has no files."""
return
def _build_ingest_file_list(bucket: str) -> Optional[List[str]]:
"""Returns a list of files to ingest."""
if _POLL_IMAGE_INGESTION_DIR_FLG.value:
return None
base_dir = _get_container_dirs().imaging.rstrip('/')
base_dir_len = len(base_dir)
return_files = []
for root, _, files in os.walk(base_dir):
gcs_root = f'gs://{bucket}{root[base_dir_len:]}'
for fname in files:
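      # Skip hidden files.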
if fname.startswith('.'):
continue
return_files.append(os.path.join(gcs_root, fname))
return return_files
def _get_logging_destination(project_id: str) -> Mapping[str, Any]:
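  """Returns logging overrides; uses absl logging when Cloud Ops is disabled."""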
if _ENABLE_CLOUD_OPS_LOGGING_FLG.value:
return dict()
return dict(
ops_log_project=project_id,
ops_log_name='transformation_pipeline',
debug_logging_use_absl_logging=True,
)
def main(unused_argv):
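  """Validates mounted dirs, configures local mocks, and runs the pipeline."""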
container_dirs = _get_container_dirs()
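  # Directories that must exist and be readable and writable in the container.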
test_dirs_exist = [
container_dirs.metadata,
container_dirs.imaging,
container_dirs.processed_images,
]
if _is_mocked_dicom_store():
test_dirs_exist.append(container_dirs.dicom_store)
if not _LOCALHOST_DICOM_STORE_FLG.value:
cloud_logging_client.critical('Localhost DICOM Store is undefined.')
raise _LocalhostMissingDicomStoreConfigurationError(
'Localhost DICOM Store is undefined.'
)
for path in test_dirs_exist:
_can_read_write_dir(path, False)
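  # Seed the metadata dir with example mapping schemas if none are present.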
if not _has_metadata_schema_in_dir(container_dirs.metadata):
_copy_default_schema_to_metadata_dir(container_dirs.metadata)
try:
gcs_bucket_name_generating_pubsub = 'image_ingest'
gcs_bucket_name_holding_processed_imaging = 'transform_output'
gcs_bucket_name_holding_metadata = 'metadata'
gcs_file_to_ingest_list = _build_ingest_file_list(
gcs_bucket_name_generating_pubsub
)
    # Project ID hosting the storage buckets and pub/sub subscription.
project_id = 'mock-project-id'
transform_pod_uid = str(uuid.uuid4())
    # Name of the pub/sub subscription being listened on.
gcs_subscription = 'mock-gcs-subscription'
    # DICOM store that ingested images are uploaded into.
if _is_mocked_dicom_store():
dicomweb_url = _MOCK_DICOM_STORE_URL
else:
dicomweb_url = _LOCALHOST_DICOM_STORE_FLG.value
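    # Override gke_main flags and install GCS, pub/sub, and DICOM store mocks
    # for the duration of the local run.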
with contextlib.ExitStack() as context_list:
context_list.enter_context(
flagsaver.flagsaver(
metadata_bucket=gcs_bucket_name_holding_metadata,
gcs_subscription=gcs_subscription,
ingest_succeeded_uri=(
f'gs://{gcs_bucket_name_holding_processed_imaging}/success'
),
ingest_failed_uri=(
f'gs://{gcs_bucket_name_holding_processed_imaging}/failure'
),
dicomweb_url=dicomweb_url,
project_id=project_id,
gcs_file_to_ingest_list=gcs_file_to_ingest_list,
              # Send logs to container standard out.
**_get_logging_destination(project_id),
transform_pod_uid=transform_pod_uid,
pod_uid=transform_pod_uid,
# Pod host is not set.
pod_hostname='',
)
)
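      # When polling, mock GCS pub/sub messages are generated for files that
      # appear in the imaging ingestion dir.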
if gcs_file_to_ingest_list is None:
context_list.enter_context(
gcs_pubsub_mock.MockPubSub(
project_id,
gcs_subscription,
gcs_bucket_name_generating_pubsub,
_get_container_dirs().imaging,
call_if_no_files=_call_if_no_files,
message_queue_delay_sec=_PUBSUB_MOCK_MESSAGE_DELAY_SEC_FLG.value,
)
)
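      # Unless pipeline-generated pub/sub messages are enabled, patch the
      # pub/sub publisher so the local run does not publish real messages.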
if (
not _ENABLE_TRANSFORMATION_PIPELINE_GENERATED_PUBSUB_MESSAGES_FLG.value
):
context_list.enter_context(
mock.patch('google.cloud.pubsub_v1.PublisherClient', autospec=True)
)
context_list.enter_context(
gcs_mock.GcsMock({
gcs_bucket_name_holding_metadata: container_dirs.metadata,
gcs_bucket_name_holding_processed_imaging: (
container_dirs.processed_images
),
gcs_bucket_name_generating_pubsub: container_dirs.imaging,
})
)
if _is_mocked_dicom_store():
mock_dicom_store = context_list.enter_context(
dicom_store_mock.MockDicomStores(
_MOCK_DICOM_STORE_URL,
real_http=True, # Pass unhandled requests through mock.
)
)
mock_dicom_store[_MOCK_DICOM_STORE_URL].set_dicom_store_disk_storage(
container_dirs.dicom_store
)
gke_main.main(unused_argv=None)
cloud_logging_client.info('Transformation pipeline done.')
except Exception as exp:
cloud_logging_client.critical(
'Unexpected error running transformation pipeline', exp
)
raise
if __name__ == '__main__':
app.run(main)