in SNS/SNSFastPublish/sns_fast_publish.py [0:0]
def lambda_handler(event, context):
global publish_errors, bucket, key
# Start time logging
log('start')
try:
# Read the uploaded object from bucket
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
response = s3.get_object(Bucket=bucket, Key=key)
body = json.loads(zlib.decompress(response['Body'].read(), 16+zlib.MAX_WBITS))
# Check if sending same message to all endpoints
if "SameMessage" in body.keys() and body['SameMessage']:
message = body['Message']
else:
message = None
endpoints = body['Endpoints']
# Publish in parallel using several threads
e = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads)
for endpoint in endpoints:
e.submit(publish, endpoint, message)
e.shutdown()
except Exception as e:
print(e.message + ' Aborting...')
raise e
print('Publish complete.')
# Finish time logging
log('end')
# Remove the uploaded object
try:
response = s3.delete_object(Bucket=bucket, Key=key)
if 'ResponseMetadata' in response.keys() and response['ResponseMetadata']['HTTPStatusCode'] == 204:
print('Removed s3://%s/%s' % (bucket, key))
except Exception as e:
print(e)
# Upload errors if any to S3
if len(publish_errors) > 0:
result_data = '\n'.join(publish_errors)
logfile_key = key.replace('.json.gz', '') + '_error.log'
save_to_s3(result_data, bucket, logfile_key)
# Reset publish error log
publish_errors = []
# Store time log to s3
log('save')