def process_new_export()

in python/website-worker/research_pacs/website_worker/main.py [0:0]


def process_new_export(msg, msg_content):
  """
  Function that runs in a thread to process an export task.
  
  Args:
    msg (dict): SQS message
    msg_content (dict): Content of the SQS message
  
  """
  def extend_message_timeout():
    """
    Keep the message invisible to other SQS consumers while the export task is running.
    
    """
    logger.debug(f'Export #{task_id} - Extending the message visibility timeout')
    client.sqs.change_message_visibility(
      QueueUrl=env.queue_url,
      ReceiptHandle=msg['ReceiptHandle'],
      VisibilityTimeout=15
    )
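  # Note: the 15-second visibility timeout is renewed roughly every 5 seconds in the
  # export loop below, so the message stays hidden as long as the task makes progress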
  
  try:
    # Initialize thread variables. task_id and db_exports start as None so that the
    # outer exception handler can tell whether the task was identified before failing
    db = None
    db_exports = None
    task_id = None
    nb_exported = 0
    nb_failed = 0
    error_messages = {}
    
    assert 'TaskId' in msg_content, 'Missing "TaskId" attribute in the message'
    assert 'AccessKey' in msg_content, 'Missing "AccessKey" attribute in the message'
    assert 'SecretKey' in msg_content, 'Missing "SecretKey" attribute in the message'
    assert 'SessionToken' in msg_content, 'Missing "SessionToken" attribute in the message'
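    
    # Illustrative shape of the expected message content (the values below are
    # assumptions for illustration, not taken from a real message):
    #   {
    #     "TaskId": 42,
    #     "AccessKey": "AKIA...",
    #     "SecretKey": "...",
    #     "SessionToken": ""
    #   }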
    
    task_id = msg_content['TaskId']
    logger.info(f'Export #{task_id} - Starting the export task')
    
    extend_message_timeout()
    last_timeout_extension = datetime.now()
    
    try:
      db = DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db)
      
      # Retrieve the export task parameters
      db_exports = DBExportTasks(db)
      parameters = db_exports.get_task(task_id)
      
      # Retrieve the DICOM instances that match the query
      db_dicom_json = DBDicomJson(db)
      instances = db_dicom_json.search_instances(parameters['JSONPathQuery'])
      logger.debug(f'Export #{task_id} - Exporting {len(instances)} instances to Amazon S3')
      
    except Exception as e:
      logger.error(f'Export #{task_id} - Failed to list the Orthanc instances to export - {e}')
      raise Exception('Failed to list the Orthanc instances to export')
    
    # Prepare S3 credentials
    credentials = {
      'aws_access_key_id': msg_content['AccessKey'],
      'aws_secret_access_key': msg_content['SecretKey']
    }
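    # An empty "SessionToken" means long-term credentials; a non-empty value means
    # temporary STS credentials, which boto3 expects as "aws_session_token"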
    if msg_content['SessionToken'] != '':
      credentials['aws_session_token'] = msg_content['SessionToken']
    
    # For each instance to export
    for instance in instances:
      instance_id = instance[0]
      instance_json = instance[1]
      s3_prefix = f"s3://{parameters['S3Bucket']}/{parameters['S3Prefix']}{instance_id}"
      
      # Extend the message visibility timeout every 5 seconds to prevent the SQS message
      # from becoming visible to other queue consumers
      if (datetime.now() - last_timeout_extension).total_seconds() > 5:
        extend_message_timeout()
        last_timeout_extension = datetime.now()
    
      try:
        logger.debug(f'Export #{task_id} - Exporting the instance {instance_id}')
        
        # If the export format is DICOM, download the file from Orthanc and upload it to the S3 
        # bucket
        if parameters['Format'] == 'dicom':
          
          try:
            transcode = parameters['Transcode'] if parameters['Transcode'] != '' else None
            file_bytes = client.orthanc.download_instance_dicom(instance_id, transcode)
          except Exception as e:
            logger.warning(f'Export #{task_id} - Failed to download the Orthanc instance - {e}')
            raise Exception('Failed to download the DICOM file from Orthanc, the Transfer Syntax UID may be incorrect or incompatible')
            
          try:
            s3_key = s3_prefix+'.dcm'
            rpacs_util.write_file(file_bytes, s3_key, env.region, 'bytes', credentials)
          except Exception as e:
            logger.warning(f'Export #{task_id} - Failed to write the file to S3 - {e}')
            raise Exception('Failed to write the file to the S3 bucket, please check the S3 path and credentials')
        
        # If the export format is PNG or JPEG, count the number of frames and export each frame
        else:

          try:
            nb_frames = client.orthanc.count_instance_frames(instance_id)
          except Exception as e:
            logger.warning(f'Export #{task_id} - Failed to count the frames in a DICOM file - {e}')
            raise Exception('Orthanc failed to count the number of frames in the DICOM image')
          
          for frame in range(nb_frames):
            
            try:
              accept = 'image/png' if parameters['Format'] == 'png' else 'image/jpeg'
              file_bytes = client.orthanc.download_instance_frame(instance_id, accept, frame)
            except Exception as e:
              logger.warning(f'Export #{task_id} - Failed to download a frame of a DICOM file - {e}')
              raise Exception(f'Failed to export frames as "{accept}" format')
              
            try:
              s3_key = f"{s3_prefix}_{frame}.{parameters['Format']}" if nb_frames > 1 else f"{s3_prefix}.{parameters['Format']}"
              rpacs_util.write_file(file_bytes, s3_key, env.region, 'bytes', credentials)
            except Exception as e:
              logger.warning(f'Export #{task_id} - Failed to write the file to S3 - {e}')
              raise Exception('Failed to write the file to the S3 bucket, please check the S3 path and credentials')
          
        # If the DICOM attributes must be exported to a JSON document
        if parameters['ExportJSON']:
          
          try:
            s3_key = s3_prefix+'.json'
            instance_json_keywords = rpacs_dicom_json.add_keywords_to_dicom_json(instance_json)
            rpacs_util.write_file(instance_json_keywords, s3_key, env.region, 'json', credentials)
          except Exception as e:
            logger.warning(f'Export #{task_id} - Failed to write the JSON file to S3 - {e}')
            raise Exception('Failed to write the JSON file to the S3 bucket, please check the S3 path and credentials')
        
        nb_exported += 1
        
      # Count the number of occurrences of each error message
      except Exception as e:
        nb_failed += 1
        err_msg = str(e)
        error_messages[err_msg] = error_messages.get(err_msg, 0) + 1
    
    # Update the export task results
    try:
      results = {
        'NbExported': nb_exported,
        'NbFailed': nb_failed,
      }
      if len(error_messages) > 0:
        results['Errors'] = '. '.join([f'{v} times: {k}' for k,v in error_messages.items()])
      db_exports.update_task(task_id, 'completed', results)
      logger.info(f'Export #{task_id} - Export task has completed - Exported={nb_exported} Failed={nb_failed}')
      
    except Exception as e:
      logger.error(f'Export #{task_id} - Failed to update the export task results - {e}')
      raise Exception('Failed to update the export task results')
    
  except Exception as e:
    logger.error(f'Failed to process the export message - {e}')
    # Mark the task as failed only if it was identified and the DB client was created
    if db_exports is not None and task_id is not None:
      db_exports.update_task(task_id, 'failed', results={'Errors': str(e)})
  
  finally:
    client.sqs.delete_message(QueueUrl=env.queue_url, ReceiptHandle=msg['ReceiptHandle'])
    if db is not None:
      db.close()
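

Usage sketch: a minimal, hypothetical polling loop that could dispatch this function.
This consumer is an assumption for illustration (the actual receive loop in main.py is
not shown here); it reuses the module-level `client`, `env`, and `logger` objects that
process_new_export already depends on.

import json
import threading

def poll_queue():
  # Long-poll the SQS queue and process each export message in its own thread,
  # mirroring the threading model described in the docstring above
  while True:
    response = client.sqs.receive_message(
      QueueUrl=env.queue_url,
      MaxNumberOfMessages=1,
      WaitTimeSeconds=20
    )
    for msg in response.get('Messages', []):
      try:
        msg_content = json.loads(msg['Body'])
      except json.JSONDecodeError as e:
        # Drop malformed messages so they do not cycle through the queue forever
        logger.error(f'Failed to parse a message body - {e}')
        client.sqs.delete_message(QueueUrl=env.queue_url, ReceiptHandle=msg['ReceiptHandle'])
        continue
      threading.Thread(target=process_new_export, args=(msg, msg_content)).start()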