in source/lambda/iot-dr-region-syncer/iot-region-to-region-syncer.py [0:0]
def lambda_handler(event, context):
    """Lambda entry point: sync IoT things from the primary to the secondary region.

    Resets the module-level counters, validates MAX_WORKERS, builds one IoT
    client per region, and hands the work to ``get_search_things`` (when
    registry indexing is enabled in the primary region) or ``get_list_things``
    (otherwise). The thread pool is always drained and shut down before the
    handler returns or propagates an error.

    Args:
        event: Invocation payload; only logged here.
        context: Lambda context object; not used here.

    Returns:
        True once the pool has been shut down and the stats have been logged.

    Raises:
        Exception: If MAX_WORKERS is greater than 50.
    """
    global NUM_THINGS_SYNCED, NUM_THINGS_EXIST, NUM_ERRORS

    logger.info('syncer: start')
    logger.info('event: {}'.format(event))

    # Reset the counters on every invocation; module globals would otherwise
    # keep their values if the module stays loaded between invocations.
    NUM_THINGS_SYNCED = 0
    NUM_THINGS_EXIST = 0
    NUM_ERRORS = 0

    if MAX_WORKERS > 50:
        message = 'max allowed workers is 50 defined: {}'.format(MAX_WORKERS)
        logger.error(message)
        raise Exception(message)

    # Keep a pool of 10 connections for small worker counts; from 10 workers
    # upwards size the pool at 1.2x the worker count.
    # NOTE(review): presumably the extra 20% is headroom so worker threads do
    # not wait on each other for a connection - confirm.
    max_pool_connections = 10
    if MAX_WORKERS >= 10:
        max_pool_connections = round(MAX_WORKERS * 1.2)
    logger.info('max_pool_connections: {}'.format(max_pool_connections))

    boto3_config = Config(
        max_pool_connections=max_pool_connections,
        retries={'max_attempts': 10, 'mode': 'standard'},
    )

    # One client per region, both sharing the same pool/retry configuration.
    c_iot_p = boto3.client('iot', config=boto3_config, region_name=PRIMARY_REGION)
    c_iot_s = boto3.client('iot', config=boto3_config, region_name=SECONDARY_REGION)

    executor = futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
    logger.info('executor: started: {}'.format(executor))

    try:
        if registry_indexing_enabled(c_iot_p):
            logger.info('registry indexing enabled - using search_index to get things')
            get_search_things(c_iot_p, c_iot_s, QUERY_STRING, 100, executor)
        else:
            logger.info('registry indexing disabled - using list_things to get things')
            # NOTE(review): unlike get_search_things, this call is not handed
            # the executor - confirm that is intentional.
            get_list_things(c_iot_p, c_iot_s)
    finally:
        # Always drain and release the pool, even when enumeration fails, so
        # no worker threads are left running after the handler exits.
        logger.info('executor: waiting to finish')
        executor.shutdown(wait=True)
        logger.info('executor: shutted down')

    # NUM_THINGS_EXIST is only reported in "smart" sync mode.
    if SYNC_MODE == "smart":
        logger.info('syncer: stats: NUM_THINGS_SYNCED: {} NUM_THINGS_EXIST: {} NUM_ERRORS: {}'.format(NUM_THINGS_SYNCED, NUM_THINGS_EXIST, NUM_ERRORS))
    else:
        logger.info('syncer: stats: NUM_THINGS_SYNCED: {} NUM_ERRORS: {}'.format(NUM_THINGS_SYNCED, NUM_ERRORS))

    logger.info('syncer: stop')
    return True