in python/de-identifier/research_pacs/de_identifier/main.py [0:0]
def main():
logger.info('Starting de-identifier')
try:
global env
env = get_env()
# Create the clients
global client
client = rpacs_util.ClientList()
client.add('db', DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db))
client.add('db_msg', DBKeyJsonValue(db_client=client.db, table_name="rpacs_related_msg"))
client.add('db_mapping', DBDicomMapping(db_client=client.db))
client.add('src_orthanc', OrthancClient(env.src_orthanc_host, env.src_orthanc_user, env.src_orthanc_pwd))
client.add('dst_orthanc', OrthancClient(env.dst_orthanc_host, env.dst_orthanc_user, env.dst_orthanc_pwd))
client.add('sqs', boto3.client('sqs', region_name=env.region))
# Exit if any of the previous steps failed
except Exception as e:
logger.fatal(f'Failed to initialize the program - {e}')
sys.exit(1)
# Loop until the program is interrupted
killer = rpacs_util.GracefulKiller()
while not killer.kill_now:
# Retrieve up to 10 messages from the SQS queue
try:
messages_returned = False
logger.debug(f'Retrieving messages from the SQS queue')
sqs_response = client.sqs.receive_message(
QueueUrl=env.queue_url,
AttributeNames=['ApproximateReceiveCount'],
MaxNumberOfMessages=10,
MessageAttributeNames=['All'],
VisibilityTimeout=env.queue_timeout,
WaitTimeSeconds=1
)
# Process each message and delete it from the queue if it succeeded
messages = sqs_response['Messages'] if 'Messages' in sqs_response else []
logger.debug(f'SQS returned {len(messages)} messages to process')
if len(messages) > 0:
messages_returned = True
for message in messages:
try:
# Delete the message if it was served more than `queue_max_attemps`
nb_attempts = int(message['Attributes']['ApproximateReceiveCount'])
if nb_attempts > env.queue_max_attemps:
client.sqs.delete_message(QueueUrl=env.queue_url, ReceiptHandle=message['ReceiptHandle'])
continue
process_message(message['Body'])
client.sqs.delete_message(QueueUrl=env.queue_url, ReceiptHandle=message['ReceiptHandle'])
except Exception as e:
logger.error(f'Failed to process the message ({nb_attempts} attempts) - {e}')
except Exception as e:
logger.error(f'Failed to poll messages from SQS - {e}')
# Close the DB connection after each iteration
client.db.close()
# Wait 5 seconds if the previous request returned no SQS message
if messages_returned is False:
logger.debug(f"Waiting 5 seconds")
killer.sleep(5)
# Before the program exits
logger.info('Stopping de-identifier')