def lambda_handler()

in SNS/SNSFastPublish/sns_fast_publish.py


def lambda_handler(event, context):
    global publish_errors, bucket, key
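    # publish_errors is shared, module-level state: the publish() worker
    # threads append to it and the handler drains it near the end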

    # Start time logging
    log('start')

    try:
        # Read the uploaded object from bucket
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
        response = s3.get_object(Bucket=bucket, Key=key)
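        # wbits of 16 + zlib.MAX_WBITS tells zlib to expect a gzip wrapper,
        # so the compressed S3 object can be inflated directly in memory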
        body = json.loads(zlib.decompress(response['Body'].read(), 16+zlib.MAX_WBITS))

        # Check if sending same message to all endpoints
        if "SameMessage" in body.keys() and body['SameMessage']:
            message = body['Message']
        else:
            message = None

        endpoints = body['Endpoints']
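        # Example payload shape, inferred from the keys read above (the exact
        # per-endpoint entry format when SameMessage is false is an assumption):
        #   {"SameMessage": true, "Message": "...", "Endpoints": [...]}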

        # Publish in parallel using several threads; the executor's context
        # manager blocks on exit until every queued publish call has finished
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            for endpoint in endpoints:
                executor.submit(publish, endpoint, message)
    except Exception as e:
        print('%s Aborting...' % e)
        raise

    print('Publish complete.')

    # Finish time logging
    log('end')

    # Remove the uploaded object
    try:
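        # S3 DeleteObject reports success with HTTP 204 (No Content)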
        response = s3.delete_object(Bucket=bucket, Key=key)
        if response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 204:
            print('Removed s3://%s/%s' % (bucket, key))
    except Exception as e:
        print(e)

    # Upload errors if any to S3
    if publish_errors:
        result_data = '\n'.join(publish_errors)
        logfile_key = key.replace('.json.gz', '') + '_error.log'
        save_to_s3(result_data, bucket, logfile_key)

        # Reset publish error log
        publish_errors = []

    # Store time log to s3
    log('save')
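
For context, lambda_handler() leans on module-level names defined elsewhere in
sns_fast_publish.py: s3, max_threads, publish_errors, and the helpers publish(),
save_to_s3(), and log(). The sketch below shows one way those pieces could be wired
up. Only the names and call signatures come from the handler above; every body, the
sns client, the time_log list, and the max_threads value are assumptions rather than
the module's actual implementation.

import time

import boto3

s3 = boto3.client('s3')
sns = boto3.client('sns')   # assumed: the module publishes via boto3's SNS client
max_threads = 50            # assumed value; sized to the Lambda's allocation
publish_errors = []         # appended to by publish(), drained by the handler
time_log = []               # assumed backing store for log()


def publish(endpoint, message):
    # Sketch: with a shared message, publish straight to the endpoint ARN; the
    # per-endpoint entry format when message is None is an assumption.
    try:
        if message is not None:
            sns.publish(TargetArn=endpoint, Message=message)
        else:
            sns.publish(TargetArn=endpoint['Endpoint'], Message=endpoint['Message'])
    except Exception as e:
        publish_errors.append('Failed to publish to %s: %s' % (endpoint, e))


def save_to_s3(data, bucket, key):
    # Sketch: write a small text artifact (such as the error log) back to S3
    s3.put_object(Bucket=bucket, Key=key, Body=data)


def log(marker):
    # Sketch: record coarse timing marks; on 'save', persist them next to the
    # input object (the '_time.log' key name is an assumption)
    time_log.append('%s %f' % (marker, time.time()))
    if marker == 'save':
        save_to_s3('\n'.join(time_log), bucket, key.replace('.json.gz', '') + '_time.log')

Note the design choice the handler relies on: publish() swallows its own exceptions
into publish_errors, so a single failing endpoint cannot abort the whole fan-out, and
the handler only inspects publish_errors after all worker threads have finished.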