in pathology/transformation_pipeline/local/start_local.py [0:0]
def _resolve_dicom_store(stack_list):
    """Selects the DICOM store location and prints it.

    Args:
      stack_list: contextlib.ExitStack that takes ownership of the temporary
        directory created when no DICOM store was configured.

    Returns:
      _parsed_args.dicom_store when it is set; otherwise the path of a newly
      created temporary directory that is removed when stack_list closes.
    """
    if _parsed_args.dicom_store is not None:
        print(f'DICOM STORE: {_parsed_args.dicom_store}')
        return _parsed_args.dicom_store
    dicom_store = stack_list.enter_context(tempfile.TemporaryDirectory())
    print(
        '\n'.join([
            f'DICOM STORE: {dicom_store}',
            (
                'Warning: DICOM Store was not set and is initialized a'
                ' temporary directory. DICOM images created will not persist'
                ' after termination of the executing python script.'
            ),
            '',
        ])
    )
    return dicom_store


def _log_transform_env(docker_args):
    """Logs the environment settings derived from the docker arguments."""
    log = _get_transform_env_log(_get_env_settings(docker_args))
    msg = [
        '\n\nTransform Pipleine Enviromental Variables',
        '-' * 41,
        log,
        '-' * 41,
    ]
    logging.info('\n'.join(msg))


def _run_docker_and_stream_logs(docker_args, log_file):
    """Starts the docker container and hands its output to _handle_log.

    Args:
      docker_args: Argument list used to launch the container process.
      log_file: Open text file that receives captured output, or None.
    """
    docker_instance_name = _parsed_args.running_docker_instance_name
    # Remove a pre-existing container with the same name; check=False because
    # it is fine when there is nothing to remove.
    subprocess.run(['docker', 'rm', docker_instance_name], check=False)
    # Evaluate once so the stdout/stderr wiring and the notice below agree.
    to_cloud_ops = _is_logging_to_cloud_ops()
    # Start the container. Output is only piped back to this process when it
    # is not being sent to cloud operations; stderr is merged into stdout.
    proc = subprocess.Popen(
        args=docker_args,
        stdout=None if to_cloud_ops else subprocess.PIPE,
        stderr=None if to_cloud_ops else subprocess.STDOUT,
    )
    if to_cloud_ops:
        logging.info('Logs are being sent to cloud operations.')
    try:
        _handle_log(proc, log_file)
    finally:
        proc.terminate()
        # Remove the stopped container.
        subprocess.run(['docker', 'rm', docker_instance_name], check=False)
        # NOTE(review): presumably drains output emitted while the container
        # shuts down -- confirm this call belongs inside the finally suite.
        _handle_log(proc, log_file)


def main(unused_argv):
    """Prepares local directories and runs the transformation container.

    Depending on the parsed command line flags this either logs the
    environment settings derived from the docker arguments, writes a shell
    script containing the docker run command, or launches the container and
    processes its output until it finishes.

    Args:
      unused_argv: Positional command line arguments; not used.

    Raises:
      _StartLocalHostError: If a local log file is requested while logs are
        sent to cloud operations, or if a shell script is requested while the
        metadata, image ingestion, or processed image directory is temporary.
    """
    if _parsed_args.log and _is_logging_to_cloud_ops():
        raise _StartLocalHostError(
            'Invalid configuration, can not capture logs locally and send logs to'
            ' cloud operations.'
        )
    # Everything entered on the stack (log file, temporary directories) is
    # released when this block exits, including on the early returns below.
    with contextlib.ExitStack() as stack_list:
        # Explicit UTF-8 keeps the log file independent of the host locale.
        log_file = (
            stack_list.enter_context(
                open(_parsed_args.log, 'wt', encoding='utf-8')
            )
            if _parsed_args.log
            else None
        )
        metadata_dir = _get_dir(
            'Metadata', stack_list, _parsed_args.metadata_dir
        )
        image_ingestion_dir = _get_dir(
            'Image ingestion', stack_list, _parsed_args.image_ingestion_dir
        )
        processed_image_dir = _get_dir(
            'Processed image', stack_list, _parsed_args.processed_image_dir
        )
        _test_dicom_store()
        poll_image_ingestion_dir = _get_poll_image_ingestion_dir(
            image_ingestion_dir
        )
        _copy_file_list_to_input_dir(
            stack_list,
            _parsed_args.input_images,
            image_ingestion_dir.path,
            poll_image_ingestion_dir,
        )
        dicom_store = _resolve_dicom_store(stack_list)
        docker_args = _gen_docker_execution_args(
            image_ingestion_dir,
            metadata_dir,
            processed_image_dir,
            dicom_store,
            poll_image_ingestion_dir,
        )
        if _parsed_args.log_environment_variables:
            # Only report the settings; the container is not started.
            _log_transform_env(docker_args)
            return
        shell_script_path = _parsed_args.write_docker_run_shellscript
        if shell_script_path:
            # Temporary directories are released when this function returns,
            # so a script written for later use must not reference them.
            if any(
                directory.is_temp
                for directory in (
                    metadata_dir,
                    image_ingestion_dir,
                    processed_image_dir,
                )
            ):
                raise _StartLocalHostError(
                    'Invalid configuration, metadata, image ingestion, and processed'
                    ' image dirs must be defined.'
                )
            _write_docker_run_shellscript(shell_script_path, docker_args)
            return
        _run_docker_and_stream_logs(docker_args, log_file)