in pathology/transformation_pipeline/local_main.py [0:0]
def main(unused_argv):
  """Runs the transformation pipeline locally with cloud services mocked.

  Passes the container directories to the read/write check, copies the
  default schema into the metadata directory when it has none, then invokes
  gke_main.main with pipeline flags overridden and GCS (plus, depending on
  configuration, Pub/Sub, the Pub/Sub publisher client and the DICOM store)
  replaced by local mocks.

  Args:
    unused_argv: Ignored; this function never reads it.

  Raises:
    _LocalhostMissingDicomStoreConfigurationError: If the localhost DICOM
      store flag has no value.
    Exception: Anything raised while setting up or running the pipeline is
      logged as critical and re-raised unchanged.
  """
  container_dirs = _get_container_dirs()
  required_dirs = [
      container_dirs.metadata,
      container_dirs.imaging,
      container_dirs.processed_images,
  ]
  if _is_mocked_dicom_store():
    # The mocked store is later pointed at this directory for disk storage,
    # so it is checked along with the others.
    required_dirs.append(container_dirs.dicom_store)
  if not _LOCALHOST_DICOM_STORE_FLG.value:
    undefined_store_msg = 'Localhost DICOM Store is undefined.'
    cloud_logging_client.critical(undefined_store_msg)
    raise _LocalhostMissingDicomStoreConfigurationError(undefined_store_msg)
  for directory in required_dirs:
    _can_read_write_dir(directory, False)
  if not _has_metadata_schema_in_dir(container_dirs.metadata):
    _copy_default_schema_to_metadata_dir(container_dirs.metadata)
  try:
    # Bucket names used by the mocked GCS layer.
    ingest_bucket = 'image_ingest'
    output_bucket = 'transform_output'
    metadata_bucket = 'metadata'
    files_to_ingest = _build_ingest_file_list(ingest_bucket)
    # Project id hosting the storage buckets and the pub/sub subscription.
    project_id = 'mock-project-id'
    pod_uid = str(uuid.uuid4())
    # Name of the pub/sub subscription being listened on.
    subscription = 'mock-gcs-subscription'
    # DICOM store that images are uploaded into.
    dicomweb_url = (
        _MOCK_DICOM_STORE_URL
        if _is_mocked_dicom_store()
        else _LOCALHOST_DICOM_STORE_FLG.value
    )
    with contextlib.ExitStack() as stack:
      # Flag overrides are entered first so that the mocks entered below and
      # the pipeline run all happen while they are in effect.
      stack.enter_context(
          flagsaver.flagsaver(
              metadata_bucket=metadata_bucket,
              gcs_subscription=subscription,
              ingest_succeeded_uri=f'gs://{output_bucket}/success',
              ingest_failed_uri=f'gs://{output_bucket}/failure',
              dicomweb_url=dicomweb_url,
              project_id=project_id,
              gcs_file_to_ingest_list=files_to_ingest,
              # Send logs to container standard out.
              **_get_logging_destination(project_id),
              transform_pod_uid=pod_uid,
              pod_uid=pod_uid,
              # Pod host is not set.
              pod_hostname='',
          )
      )
      if files_to_ingest is None:
        # No explicit ingest list was built, so a mocked Pub/Sub
        # subscription over the imaging directory is installed instead.
        pubsub_mock = gcs_pubsub_mock.MockPubSub(
            project_id,
            subscription,
            ingest_bucket,
            _get_container_dirs().imaging,
            call_if_no_files=_call_if_no_files,
            message_queue_delay_sec=_PUBSUB_MOCK_MESSAGE_DELAY_SEC_FLG.value,
        )
        stack.enter_context(pubsub_mock)
      publish_enabled = (
          _ENABLE_TRANSFORMATION_PIPELINE_GENERATED_PUBSUB_MESSAGES_FLG.value
      )
      if not publish_enabled:
        # Replace the Pub/Sub publisher client with an autospec'd mock.
        stack.enter_context(
            mock.patch('google.cloud.pubsub_v1.PublisherClient', autospec=True)
        )
      # Each mocked bucket name is paired with a container directory.
      bucket_to_dir = {
          metadata_bucket: container_dirs.metadata,
          output_bucket: container_dirs.processed_images,
          ingest_bucket: container_dirs.imaging,
      }
      stack.enter_context(gcs_mock.GcsMock(bucket_to_dir))
      if _is_mocked_dicom_store():
        store_mocks = stack.enter_context(
            dicom_store_mock.MockDicomStores(
                _MOCK_DICOM_STORE_URL,
                real_http=True,  # Pass unhandled requests through mock.
            )
        )
        store_mocks[_MOCK_DICOM_STORE_URL].set_dicom_store_disk_storage(
            container_dirs.dicom_store
        )
      gke_main.main(unused_argv=None)
      cloud_logging_client.info('Transformation pipeline done.')
  except Exception as err:
    # Top-level boundary: record the failure, then let it propagate.
    cloud_logging_client.critical(
        'Unexpected error running transformation pipeline', err
    )
    raise