in python/website-worker/research_pacs/website_worker/main.py [0:0]
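# Note: names such as logger, client, env, DB, DBExportTasks, DBDicomJson,
# rpacs_util, rpacs_dicom_json and datetime are assumed to be imported or defined
# at module level elsewhere in main.py; only this function is shown here.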
def process_new_export(msg, msg_content):
"""
Function that runs in a thread to process an export task.
Args:
msg (dict): SQS message
msg_content (dict): Content of the SQS message
"""
def extend_message_timeout():
"""
Keep the message invisible to other SQS consumers while the export task is running.
"""
logger.debug(f'Export #{task_id} - Extending the message visibility timeout')
client.sqs.change_message_visibility(
QueueUrl=env.queue_url,
ReceiptHandle=msg['ReceiptHandle'],
VisibilityTimeout=15
)
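# The whole export runs inside a single try/except/finally block so that the SQS
# message is always deleted and the database connection closed, even if the task fails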
try:
# Initialize thread variables
db = None
db_exports = None
nb_exported = 0
nb_failed = 0
error_messages = {}
assert 'TaskId' in msg_content, 'Missing "TaskId" attribute in the message'
assert 'AccessKey' in msg_content, 'Missing "AccessKey" attribute in the message'
assert 'SecretKey' in msg_content, 'Missing "SecretKey" attribute in the message'
assert 'SessionToken' in msg_content, 'Missing "SessionToken" attribute in the message'
task_id = msg_content['TaskId']
logger.info(f'Export #{task_id} - Starting the export task')
extend_message_timeout()
last_timeout_extension = datetime.now()
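# Remember when the visibility timeout was last extended so the export loop below
# can refresh it roughly every 5 seconds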
try:
db = DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db)
# Retrieve the export task parameters
db_exports = DBExportTasks(db)
parameters = db_exports.get_task(task_id)
# Retrieve the DICOM instances that match the query
db_dicom_json = DBDicomJson(db)
instances = db_dicom_json.search_instances(parameters['JSONPathQuery'])
logger.debug(f'Export #{task_id} - Exporting {len(instances)} instances to Amazon S3')
except Exception as e:
logger.error(f'Export #{task_id} - Failed to list the Orthanc instances to export - {e}')
raise Exception('Failed to list the Orthanc instances to export')
# Prepare S3 credentials
credentials = {
'aws_access_key_id': msg_content['AccessKey'],
'aws_secret_access_key': msg_content['SecretKey']
}
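# Only pass a session token when one was provided (i.e. temporary credentials)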
if msg_content['SessionToken'] != '':
credentials['aws_session_token'] = msg_content['SessionToken']
# For each instance to export
for instance in instances:
instance_id = instance[0]
instance_json = instance[1]
s3_prefix = f"s3://{parameters['S3Bucket']}/{parameters['S3Prefix']}{instance_id}"
# Extend the message visibility timeout every 5 seconds to prevent the SQS message from
# becoming visible to other queue consumers
if (datetime.now() - last_timeout_extension).seconds > 5:
extend_message_timeout()
last_timeout_extension = datetime.now()
try:
logger.debug(f'Export #{task_id} - Exporting the instance {instance_id}')
# If the export format is DICOM, download the file from Orthanc and upload it to the S3
# bucket
if parameters['Format'] == 'dicom':
try:
transcode = parameters['Transcode'] if parameters['Transcode'] != '' else None
file_bytes = client.orthanc.download_instance_dicom(instance_id, transcode)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to download the Orthanc instance - {e}')
raise Exception('Failed to download the DICOM file from Orthanc, the Transfer Syntax UID may be incorrect or incompatible')
try:
s3_key = s3_prefix+'.dcm'
rpacs_util.write_file(file_bytes, s3_key, env.region, 'bytes', credentials)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to write the file to S3 - {e}')
raise Exception('Failed to write the file to the S3 bucket, please check the S3 path and credentials')
# If the export format is PNG or JPEG, count the number of frames and export each frame
else:
try:
nb_frames = client.orthanc.count_instance_frames(instance_id)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to count the frames in a DICOM file - {e}')
raise Exception('Orthanc failed to count the number of frames in the DICOM image')
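# Export each frame of the instance as a separate image file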
for frame in range(nb_frames):
try:
accept = 'image/png' if parameters['Format'] == 'png' else 'image/jpeg'
file_bytes = client.orthanc.download_instance_frame(instance_id, accept, frame)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to download a frame of a DICOM file - {e}')
raise Exception(f'Failed to export the frames in "{accept}" format')
try:
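# Multi-frame instances get a "_<frame>" suffix, single-frame instances use the plain prefix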
s3_key = f"{s3_prefix}_{frame}.{parameters['Format']}" if nb_frames > 1 else f"{s3_prefix}.{parameters['Format']}"
rpacs_util.write_file(file_bytes, s3_key, env.region, 'bytes', credentials)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to write the file to S3 - {e}')
raise Exception('Failed to write the file to the S3 bucket, please check the S3 path and credentials')
# If the DICOM attributes must be exported to a JSON document
if parameters['ExportJSON']:
try:
s3_key = s3_prefix+'.json'
instance_json_keywords = rpacs_dicom_json.add_keywords_to_dicom_json(instance_json)
rpacs_util.write_file(instance_json_keywords, s3_key, env.region, 'json', credentials)
except Exception as e:
logger.warning(f'Export #{task_id} - Failed to write the JSON file to S3 - {e}')
raise Exception('Failed to write the JSON file to the S3 bucket, please check the S3 path and credentials')
nb_exported += 1
# Count the number of occurrences of each error message
except Exception as e:
nb_failed += 1
err_msg = str(e)
if err_msg not in error_messages:
error_messages[err_msg] = 1
else:
error_messages[err_msg] += 1
# Update the export task results
try:
results = {
'NbExported': nb_exported,
'NbFailed': nb_failed,
}
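# Aggregate duplicate error messages into a single summary string, e.g. "3 times: <message>"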
if len(error_messages) > 0:
results['Errors'] = '. '.join([f'{v} times: {k}' for k, v in error_messages.items()])
db_exports.update_task(task_id, 'completed', results)
logger.info(f'Export #{task_id} - Export task has completed - Exported={nb_exported} Failed={nb_failed}')
except Exception as e:
raise Exception("Failed to update the export task results")
except Exception as e:
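# Record the failure; db_exports may not exist if the error occurred before the
# database connection was created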
if db_exports is not None:
  db_exports.update_task(task_id, 'failed', results={'Errors': str(e)})
else:
  logger.error(f'Failed to process the export task - {e}')
finally:
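# Always delete the SQS message and close the database connection, whether the
# export succeeded or failed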
client.sqs.delete_message(QueueUrl=env.queue_url, ReceiptHandle=msg['ReceiptHandle'])
if db is not None:
db.close()