in python/website/research_pacs/website/main.py [0:0]
def export_page():
    """Page Export DICOM Instances to Amazon S3.

    On a POST request, the submitted form is validated and a new export task
    is created. If the creation succeeds, the user is redirected to
    `/aws/export` (GET) to avoid multiple form submissions. Otherwise, and on
    any GET request, the `export.html` template is rendered with the export
    tasks returned by the database and the error message, if any.

    Returns:
        The redirect response or the rendered `export.html` template.
    """

    def create_export_task():
        """Create a new export task.

        Returns:
            `None` if the task was created, or an error message (str) that
            should be displayed to the user.
        """
        # Retrieve inputs from the form submitted
        query = request.form.get('query', default='')
        export_format = request.form.get('format', default='')
        export_json = request.form.get('json') == 'on'
        transcode = request.form.get('transcode', default='')
        s3_path = request.form.get('s3_path', default='')
        access_key = request.form.get('access_key', default='')
        secret_key = request.form.get('secret_key', default='')
        session_token = request.form.get('session_token', default='')

        # Verify the inputs. Explicit checks are used rather than `assert`
        # statements, because assertions are removed when Python runs with
        # `-O`, which would silently disable the validation.
        if export_format not in ('dicom', 'png', 'jpeg'):
            return 'The export format must be DICOM, PNG or JPEG.'
        # Group 1 is the bucket name, group 2 is the (possibly empty) prefix
        # that must end with a "/"
        match = re.search(r'^s3:\/\/([^\/]+)\/((?:|.+\/))$', s3_path)
        if not match:
            return 'The S3 path is invalid. It must be "s3://bucket/" or "s3://bucket/prefix/", and it must end with a "/".'
        if not access_key:
            return 'You must provide an AWS Access Key.'
        if not secret_key:
            return 'You must provide an AWS Secret Key.'

        # Translate the query and the user's instance access permissions into
        # a JSON Path query
        try:
            jsonpath_query = client.permissions.get_jsonpath_query(query)
            logger.debug(f'Export - JSON Path Query: {jsonpath_query}')
        except ValueError:
            return 'Your query is invalid.'

        # Reject the request if the user already has tasks with the
        # "exporting" status
        if db_exports.has_user_ongoing_exports(g.user) is True:
            return 'You already have an ongoing export task. Please wait for the task to complete, or for one hour after the previous task was created.'

        # Add the export task into the database. The AWS credentials are
        # deliberately left out of the stored parameters.
        parameters = {
            'Query': query,
            'JSONPathQuery': jsonpath_query,
            'Format': export_format,
            'ExportJSON': export_json,
            'Transcode': transcode,
            'S3Bucket': match.group(1),
            'S3Prefix': match.group(2),
        }
        task_id = db_exports.insert_task(g.user, parameters)
        client.access_logger.log_new_export(parameters, task_id)

        # Send a SQS message so that the website worker can process the
        # export task.
        # NOTE(review): the AWS credentials are sent in the SQS message body;
        # confirm that the queue is encrypted and access-restricted.
        client_sqs = boto3.client('sqs', region_name=env.region)
        client_sqs.send_message(
            QueueUrl=env.queue_url,
            MessageBody=json.dumps({
                'EventType': 'NewExport',
                'TaskId': task_id,
                'AccessKey': access_key,
                'SecretKey': secret_key,
                'SessionToken': session_token,
            }),
        )
        return None

    # `db_exports` is read by `create_export_task` through its closure, so it
    # must be bound before that function is called below
    g.db = DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db)
    db_exports = DBExportTasks(g.db)
    error_message = None

    # Create a new export task if the form was submitted (method POST). If the
    # creation succeeded, redirect the user to the same page with a GET method
    # to avoid multiple form submissions
    if request.method == 'POST':
        error_message = create_export_task()
        if error_message is None:
            return flask.redirect('/aws/export')

    # Display the export tasks returned by the database
    tasks = [
        {'Date': task[0], 'Status': task[1], 'Parameters': task[2], 'Results': task[3]}
        for task in db_exports.list_tasks()
    ]
    return flask.render_template('export.html', error_message=error_message, tasks=tasks)