def main()

in python/change-pooler/research_pacs/change_pooler/main.py [0:0]


def main():
  """Poll Orthanc for changes and send a SQS message for each new DICOM instance.

  Initializes the module-level `env` and `client` globals, reads from the database the last
  Orthanc change ID (Seq) that was already processed, then loops until `GracefulKiller` reports
  that the program should stop. Each iteration fetches the changes after the last processed
  change ID, sends a SQS message for every `NewInstance` change, and stores the new last change
  ID in the database.

  The process exits with status 1 if any initialization step fails.

  """
  global env
  global client

  logger.info('Starting change pooler')

  try:
    env = get_env()
    
    # Create the clients
    client = ClientList()
    client.add('db', DB(env.pg_host, env.pg_port, env.pg_user, env.pg_pwd, env.pg_db))
    client.add('db_last_state', DBKeyJsonValue(client.db, table_name="rpacs_change_pooler_last_state"))
    client.add('orthanc', OrthancClient(env.orthanc_host, env.orthanc_user, env.orthanc_pwd))
    client.add('sqs', boto3.client('sqs', region_name=env.region))

    # Retrieve the last Orthanc change ID (Seq) already processed from the database
    last_state = client.db_last_state.get(key=env.orthanc_host, init_value={"last_seq": 0})
    current_last_seq = last_state['last_seq']
    logger.info(f'Last Orthanc change ID (Seq) already processed = {current_last_seq}')
    
    # This variable will be set to True if we fail to update the last change ID (Seq) that 
    # was processed in the database
    db_update_orthanc_failed = False
  
  # Exit if any of the previous initialization steps failed
  except Exception as e:
    logger.fatal(f'Failed to initialize the program - {e}')
    sys.exit(1)

  # Loop until the program is interrupted by SIGINT or SIGTERM
  killer = GracefulKiller()
  while not killer.kill_now:

    # Progress made during this iteration. It is initialized before the `try` block so that the
    # changes already processed are still recorded if an exception interrupts the batch.
    new_last_seq = current_last_seq
    
    # Retrieve and process the changes that occurred in Orthanc whose change ID (Seq) is larger 
    # than `current_last_seq`
    try:
      changes, last_seq = client.orthanc.get_changes(from_seq=current_last_seq)
      
      # If you run Orthanc with no persistent storage, the Orthanc change ID is zeroed if you lose 
      # the Orthanc database content. In that case, we reset the change ID to zero.
      if last_seq < current_last_seq:
        logger.warning('Orthanc may have been reinitialized. Setting the last Orthanc change ID (Seq) to 0')
        new_last_seq = 0

      else:
        for change in changes:
          logger.debug(f'New Orthanc change: {json.dumps(change)}')
          
          if change['ChangeType'] == 'NewInstance':
            logger.info(f"New DICOM instance in Orthanc - ID={change['ID']}")
            
            # Send a SQS message to notify of the new DICOM instance. If the message could not be 
            # sent, we interrupt the loop iteration and retry later
            if send_sqs_message({'EventType': 'NewDICOM', 'Source': f"orthanc://{change['ID']}"}) is False:
              break
          
          # Increment the last Orthanc change ID already processed
          new_last_seq = change['Seq']
          
    except Exception as e:
      logger.error(f'Failed to get and process Orthanc changes - {e}')

    # Update the last Orthanc change ID in the database, if its value changed or if the previous 
    # update failed. This runs even when the block above raised, so that changes for which a SQS 
    # message was already sent are not notified a second time in the next iteration.
    if new_last_seq != current_last_seq or db_update_orthanc_failed:
      current_last_seq = new_last_seq
      try:
        client.db_last_state.update(key=env.orthanc_host, new_value_dict={"last_seq": current_last_seq})
        db_update_orthanc_failed = False
      except Exception as e:
        logger.warning(f'Failed to update the last Orthanc change ID - {e}')
        db_update_orthanc_failed = True

    # Close the DB connection after each iteration
    client.db.close()

    # Wait 5 seconds before the next iteration
    logger.debug("Waiting 5 seconds")
    killer.sleep(5)

  # Before the program exits
  logger.info('Stopping change pooler')