# In python/change-pooler/research_pacs/change_pooler/main.py [0:0]
def main():
    """Poll Orthanc for changes and notify SQS of new DICOM instances.

    Initializes the module-level `env` and `client` objects, then loops until
    the `GracefulKiller` instance reports that the program must stop. Each pass
    fetches the Orthanc changes that follow the last processed change ID (Seq),
    calls `send_sqs_message` for every `NewInstance` change, and stores the last
    processed change ID in the database. The process exits with status 1 if
    any initialization step raises.
    """
    global env, client

    logger.info('Starting change pooler')

    try:
        env = get_env()

        # Build the clients used by the rest of the program
        client = ClientList()
        client.add(
            'db',
            DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db),
        )
        client.add(
            'db_last_state',
            DBKeyJsonValue(client.db, table_name="rpacs_change_pooler_last_state"),
        )
        client.add(
            'orthanc',
            OrthancClient(env.orthanc_host, env.orthanc_user, env.orthanc_pwd),
        )
        client.add('sqs', boto3.client('sqs', region_name=env.region))

        # Load from the database the ID (Seq) of the last Orthanc change already handled
        saved_state = client.db_last_state.get(
            key=env.orthanc_host,
            init_value={"last_seq": 0},
        )
        current_last_seq = saved_state['last_seq']
        logger.info(f'Last Orthanc change ID (Seq) already processed = {current_last_seq}')

        # Becomes True when storing the last processed change ID (Seq) in the database
        # fails, so that the write is attempted again on the next pass
        persist_retry_needed = False

    except Exception as e:
        # Any failure during the initialization steps above is fatal
        logger.fatal(f'Failed to initialize the program - {e}')
        sys.exit(1)

    # Keep polling until SIGINT or SIGTERM asks the program to stop
    shutdown = GracefulKiller()
    while not shutdown.kill_now:

        try:
            # Fetch the changes that occurred in Orthanc whose change ID (Seq) is larger
            # than `current_last_seq`
            pending, orthanc_seq = client.orthanc.get_changes(from_seq=current_last_seq)
            next_seq = current_last_seq

            if orthanc_seq < current_last_seq:
                # When Orthanc runs without persistent storage, its change ID is zeroed if
                # the Orthanc database content is lost, so restart from zero as well
                logger.warning(f'Orthanc may have been reinitialized. Setting the last Orthanc change ID (Seq) to 0')
                next_seq = 0
            else:
                for change in pending:
                    logger.debug(f'New Orthanc change: {json.dumps(change)}')

                    if change['ChangeType'] == 'NewInstance':
                        logger.info(f"New DICOM instance in Orthanc - ID={change['ID']}")

                        # Notify of the new DICOM instance through SQS. If the message could
                        # not be sent, stop handling this batch and retry on a later pass
                        sent = send_sqs_message({'EventType': 'NewDICOM', 'Source': f"orthanc://{change['ID']}"})
                        if sent is False:
                            break

                    # This change is done: remember its ID as the last one processed
                    next_seq = change['Seq']

            # Store the last Orthanc change ID in the database when it moved, or when the
            # previous attempt to store it failed
            seq_changed = next_seq != current_last_seq
            if seq_changed or persist_retry_needed:
                current_last_seq = next_seq
                try:
                    client.db_last_state.update(
                        key=env.orthanc_host,
                        new_value_dict={"last_seq": current_last_seq},
                    )
                except Exception as e:
                    logger.warning(f'Failed to update the last Orthanc change ID - {e}')
                    persist_retry_needed = True
                else:
                    persist_retry_needed = False

        except Exception as e:
            logger.error(f'Failed to get and process Orthanc changes - {e}')

        # The DB connection is closed at the end of every pass
        client.db.close()

        # Pause for 5 seconds before polling again
        logger.debug(f"Waiting 5 seconds")
        shutdown.sleep(5)

    # Reached once the loop has been interrupted
    logger.info('Stopping change pooler')