# message_handler()
#
# Excerpt from source/CRRMonitor/CRRMonitor.py

def message_handler(event):
    """Process an S3 PutObject event and track cross-region replication.

    Accepts the event either directly from CloudWatch Events or wrapped in
    an SNS notification. Performs a head_object on the affected object and,
    based on its x-amz-replication-status header, records replication
    start/end timestamps, elapsed time, and transfer rate in the DynamoDB
    table ``ddbtable``. Aggregate per-bucket-pair statistics are written to
    ``stattable`` in 5-minute buckets via the nested log_statistics().

    Relies on module-level state defined elsewhere in this file:
    client, s3client, ddbtable, stattable, initfail, timefmt,
    purge_thresh, DEBUG.

    Raises on unrecognized event formats, unknown replication status, or
    DynamoDB failures; returns None (ignoring the event) for non-PutObject
    events, inaccessible/deleted objects, and non-replicated objects.
    """
    def log_statistics(Src, Dst, Tstamp, Size, ET, roundTo):
        """Accumulate object count, size, and elapsed time for the
        Src->Dst pair into a time bucket of roundTo seconds
        (callers pass 300, i.e. a 5-minute rolling window)."""
        # -------------------------------------------------------------
        # Derive the statistic bucket from source/dest and time bucket
        # (5 minute rolling window)
        #
        statbucket = Src + ':' + Dst
        ts = datetime.strptime(Tstamp, timefmt)
        # Round the timestamp to the nearest roundTo-second boundary
        secs = (ts.replace(tzinfo=None) - ts.min).seconds
        rounding = (secs+roundTo/2) // roundTo * roundTo
        ts = ts + timedelta(0, rounding-secs, -ts.microsecond)
        timebucket = datetime.strftime(ts, timefmt)
        statbucket += ':' + timebucket
        # -------------------------------------------------------------
        # Init a dict to use to hold our attrs for DDB
        stat_exp_attrs = {}
        # -------------------------------------------------------------
        # Build the DDB UpdateExpression
        stat_update_exp = 'SET timebucket = :t, source_bucket = :o, dest_bucket = :r ADD objects :a, size :c, elapsed :d'
        # -------------------------------------------------------------
        # One object of Size bytes replicated in ET seconds
        stat_exp_attrs[':a'] = {'N': '1'}
        stat_exp_attrs[':c'] = {'N': Size}
        stat_exp_attrs[':d'] = {'N': ET}
        stat_exp_attrs[':t'] = {'S': timebucket}
        stat_exp_attrs[':o'] = {'S': Src}
        stat_exp_attrs[':r'] = {'S': Dst}

        # Update the DDB table
        try:
            client['ddb']['handle'].update_item(
                TableName=stattable,
                Key={'OriginReplicaBucket': {'S': statbucket}},
                UpdateExpression=stat_update_exp,
                ExpressionAttributeValues=stat_exp_attrs)
        except Exception as e:
            print(e)
            print('Table ' + stattable + ' update failed')
            raise e

        # Initialize a counter for failed replications for the source bucket
        # (once per time bucket), so the FAILED row exists even when no
        # failures occur.
        if Src not in initfail:
            initfail[Src] = 'foo'
        if Dst != 'FAILED' and initfail[Src] != timebucket:
            print('Initializing FAILED bucket for ' + Src + ':' + timebucket)
            statbucket = Src + ':FAILED:' + timebucket
            stat_exp_attrs = {}
            # -------------------------------------------------------------
            # Build the DDB UpdateExpression
            stat_update_exp = 'SET timebucket = :t, source_bucket = :o, dest_bucket = :r ADD objects :a, size :c, elapsed :d'
            # -------------------------------------------------------------
            # Zero objects: this row only primes the FAILED counter
            stat_exp_attrs[':a'] = {'N': '0'}
            stat_exp_attrs[':c'] = {'N': '1'}
            stat_exp_attrs[':d'] = {'N': '1'}
            stat_exp_attrs[':t'] = {'S': timebucket}
            stat_exp_attrs[':o'] = {'S': Src}
            stat_exp_attrs[':r'] = {'S': 'FAILED'}

            try:
                client['ddb']['handle'].update_item(
                    TableName=stattable,
                    Key={'OriginReplicaBucket': {'S': statbucket }},
                    UpdateExpression=stat_update_exp,
                    ExpressionAttributeValues=stat_exp_attrs)
                # Remember we already primed this time bucket
                initfail[Src] = timebucket
            except Exception as e:
                print(e)
                print('Table ' + stattable + ' update failed')
                raise e

        #print('Stats written to ' + statbucket)

    # So this will work with CloudWatch Events directly or via SNS, let's look
    #   at the structure of the incoming JSON. Note that this has not been
    #   tested with CloudWatch events directly, but should be a simple matter.
    #   I kept the code here as it adds no overhead but is a solid flexible
    #   example.
    #
    # A Cloudwatch Event looks like event[event json]
    # An SNS notification looks like event['Records'][0][event json]
    # print("Received raw event: " + json.dumps(event, indent=2))

    # Create a reference in evdata that points to the correct element in the
    #   event dictionary
    if 'detail-type' in event:
        # Direct CloudWatch Event: the payload is the event itself
        evdata = event
    elif 'Records' in event:
        # An SNS notification will have another layer in the dict. Look for
        #   EventSource = aws:sns. Otherwise generate an exception and get out.
        if event['Records'][0]['EventSource'] == 'aws:sns':
            #print('Message is ' + event['Records'][0]['Sns']['Message'])
            evdata = json.loads(event['Records'][0]['Sns']['Message'])
            #print("Message event: " + json.dumps(evdata, indent=2))

        else:
            # Unrecognized event format: uncomment print statements to
            #    identify the format and enhance this logic. At the end of
            #    the day, evdata must contain the dict for the event record
            #    of the Cloudwatch log event for the S3 update notification
            print('Error: unrecognized event format received')
            raise Exception('Unrecognized event format')

    elif 'MessageId' in event:
        # Raw SNS message (e.g. delivered via SQS): payload is in 'Message'
        evdata = json.loads(event['Message'])
    else:
        # Assume the event is already in the expected shape
        evdata = event

    if DEBUG > 1:
        print(json.dumps(evdata))

    #-----------------------------------------------------------------
    # Quietly ignore all but PutObject
    #
    if evdata['detail']['eventName'] != 'PutObject':
        if DEBUG > 0:
            print('Ignoring ' + evdata['detail']['eventName'] + ' event')
        return

    #-----------------------------------------------------------------
    #
    # Collect the data we want for the DynamoDB table
    #
    region = evdata['region']
    bucket = evdata['detail']['requestParameters']['bucketName']
    key = evdata['detail']['requestParameters']['key']

    # This timestamp is from the CW Event record and is most accurate
    now = evdata['detail']['eventTime']

    # Init a dict to use to hold our attrs for DDB
    ddb_exp_attrs = {}
    # Build the DDB UpdateExpression
    ddb_update_exp = 'set s3Object = :a'
    # push the first attr: s3Object
    ddb_exp_attrs[':a'] = {'S': key}


    # establish s3 client per region, but only once.
    if region not in s3client:
        s3client[region] = boto3.client('s3', region)

    # -----------------------------------------------------------------
    # Do a head_object. If the object no longer exists just return.
    #
    try:
        response = s3client[region].head_object(
            Bucket=bucket,
            Key=key
            )
    except ClientError as e:
        # Example error response shape:
        #  {  "Error": {
        #         "Code": "403",
        #         "Message": "Forbidden"
        #     },
        #     "ResponseMetadata": {
        #         "RequestId": "B7C8873E3C067128",
        #         "HostId": "kYARs5PKMuah57ewyzYq6l5laO4xu9fcWFYVnEPLMHeqNSF4yLhrYIhbbUT0Tw7hp3f2PgCQO9E=",
        #         "HTTPStatusCode": 403,
        #         "HTTPHeaders": { ... },
        #         "RetryAttempts": 0
        #     }
        #   }

        if e.response['Error']['Code'] == '403':
            print('IGNORING: CRRMonitor does not have access to Object - ' + \
                evdata['detail']['requestParameters']['bucketName'] + '/' + \
                evdata['detail']['requestParameters']['key'])
        elif e.response['Error']['Code'] == '404':
            print('IGNORING: Object no longer exists - ' + \
                evdata['detail']['requestParameters']['bucketName'] + '/' + \
                evdata['detail']['requestParameters']['key'])

        else:
            print('Unhandled ClientError ' + str(e))
            print(json.dumps(e.response))

        #print('Removing from queue / ignoring')
        return

    except Exception as e:
        print('Unhandled Exception ' + str(e))
        print('Removing from queue / ignoring')
        return


    # 2) check that the x-amz-replication-status header is present
    #    response['ResponseMetadata']['HTTPHeaders']['x-amz-replication-status']
    #
    # Note that this function is only called when an object is written. Assume that
    #   the object was written and the x-amz-replication-status is a final status for
    #   this object in this bucket. So, if it is the source it can be COMPLETED, PENDING,
    #   or FAILED. If it is the replica it can only be REPLICA.
    #
    # That in mind, the update date/time for the REPLICA will always be definitive for
    #   the end_datetime column
    #
    # Conversely, the source object is always definitive for the start_datetime.
    #
    # Code must not assume that the events (source and dest) are processed in the correct
    #   order. Any process consuming the DynamoDB table should do their own Elapsed Time
    #   calculation.
    #
    # Reference the dict we want for clarity in the code
    headers = response['ResponseMetadata']['HTTPHeaders']

    # If this object has no x-amz-replication-status header then we can leave
    if 'x-amz-replication-status' not in headers:
        # This is not a replicated object - get out
        if DEBUG > 0:
            print('Not a replicated object')
        return

    # repstatus is a pointer to the headers (for code clarity)
    repstatus = headers['x-amz-replication-status']

    # -----------------------------------------------------------------
    # Verify that the DynamoDB table exists. Note: we could create it
    #  but that takes so long that the lambda function may time out.
    #  Better to create it in the CFn template and handle this as a
    #  failure condition
    #
    try:
        client['ddb']['handle'].describe_table(
            TableName=ddbtable
        )
    except Exception as e:
        print(e)
        print('Table ' + ddbtable + ' does not exist - need to create it')
        raise e

    # Update object size
    objsize = headers['content-length']
    ddb_update_exp += ', ObjectSize = :s'
    ddb_exp_attrs[':s'] = {'N': objsize}

    # Key is etag:version-id with the surrounding quotes stripped
    ETag = {'S': headers['etag'][1:-1] + ':' + headers['x-amz-version-id'][1:-1]}

    # -----------------------------------------------------------------
    # If the object already has a DDB record get it
    #
    ddbdata = client['ddb']['handle'].get_item(
        TableName=ddbtable,
        Key={'ETag': ETag},
        ConsistentRead=True
        )

    ddbitem = {} # reset the dict
    if 'Item' in ddbdata:
        ddbitem = ddbdata['Item']
        if DEBUG > 4:
            print("DDB record: " + json.dumps(ddbitem, indent=2))

    #
    # Is this a REPLICA? Use timestamp as completion time
    #
    # Note: replica only updates s3Replica, replication_status, and end_datetime.
    #
    # We do this so we don't have to handle conditional update of fields that might get
    # stepped on if the events are processed out of order.
    #
    if repstatus == 'REPLICA':
        # print('Processing a REPLICA object: ' + ETag['S'])
        ddb_update_exp += ', s3Replica = :d'
        ddb_exp_attrs[':d'] = {'S': bucket}

        ddb_update_exp += ', end_datetime = :e'
        ddb_exp_attrs[':e'] = {'S': now} # 'now' is from the event data

        # Set the ttl so DynamoDB purges the record purge_thresh hours later
        purge = datetime.strptime(now, timefmt) - timedelta(hours=purge_thresh) # datetime object
        ttl = purge.strftime('%s')
        ddb_update_exp += ', itemttl = :p'
        ddb_exp_attrs[':p'] = {'N': ttl}

        # If this is a replica then status is COMPLETE
        ddb_update_exp += ', replication_status = :b'
        ddb_exp_attrs[':b'] = {'S': 'COMPLETED'}

        # Source event already seen and rate not yet computed: derive
        # elapsed time and replication rate now.
        if 'start_datetime' in ddbitem and 'crr_rate' not in ddbitem:
            etime = datetime.strptime(now, timefmt) - datetime.strptime(ddbitem['start_datetime']['S'], timefmt)
            etimesecs = (etime.days * 24 * 60 * 60) + etime.seconds
            # bits per second; add 1 to prevent /0 errors
            crr_rate = int(objsize) * 8 / (etimesecs + 1)
            ddb_update_exp += ', crr_rate = :r'
            ddb_exp_attrs[':r'] = {'N': str(crr_rate)}

            ddb_update_exp += ', elapsed = :t'
            ddb_exp_attrs[':t'] = {'N': str(etimesecs)}
            log_statistics(
                ddbitem['s3Origin']['S'],
                bucket,
                ddbitem['start_datetime']['S'],
                objsize,
                str(etimesecs),
                300)
    # -----------------------------------------------------------------
    # Or is this a SOURCE? Use timestamp as replication start time
    #
    else:

        ddb_update_exp += ', s3Origin = :f'
        ddb_exp_attrs[':f'] = {'S': bucket}

        # If this is not a replica then do not report status. It's not important and
        # makes the DynamoDB update much more complicated. Just get the start time
        #
        # We also do not care what the status is. If it has a FAILED status we could
        # write code to send a notification, but that's outside our scope.
        if repstatus == 'COMPLETED' or repstatus == 'FAILED' or repstatus == 'PENDING':
            # print('Processing a ORIGINAL object: ' + ETag['S'] + ' status: ' + repstatus)
            ddb_update_exp += ', start_datetime = :g'
            ddb_exp_attrs[':g'] = {'S': now}
            # ---------------------------------------------------------
            # If we already got the replica event...
            #
            if 'end_datetime' in ddbitem and 'crr_rate' not in ddbitem:
                etime = datetime.strptime(ddbitem['end_datetime']['S'], timefmt) - datetime.strptime(now, timefmt)
                etimesecs = (etime.days * 24 * 60 * 60) + etime.seconds
                # bits per second; add 1 to prevent /0 errors
                crr_rate = int(objsize) * 8 / (etimesecs + 1)
                ddb_update_exp += ', crr_rate = :r'
                ddb_exp_attrs[':r'] = {'N': str(crr_rate)}

                # Set the ttl from the replica's completion time
                purge = datetime.strptime(ddbitem['end_datetime']['S'], timefmt) - timedelta(hours=purge_thresh) # datetime object
                ttl = purge.strftime('%s')
                ddb_update_exp += ', itemttl = :p'
                ddb_exp_attrs[':p'] = {'N': ttl}

                ddb_update_exp += ', elapsed = :t'
                ddb_exp_attrs[':t'] = {'N': str(etimesecs)}

                log_statistics(
                    bucket,
                    ddbitem['s3Replica']['S'],
                    ddbitem['end_datetime']['S'],
                    objsize,
                    str(etimesecs),
                    300)
            # ---------------------------------------------------------
            # We did not yet get the replica event
            #
            else:
                if repstatus == 'FAILED':
                    # If replication failed this is the only time we will see this object.
                    # Update the status to FAILED
                    ddb_update_exp += ', replication_status = :b'
                    ddb_exp_attrs[':b'] = {'S': 'FAILED'}
                    log_statistics(
                        bucket,
                        'FAILED',
                        now,
                        '0',
                        '1',
                        300)

        else:
            print('Unknown Replication Status: ' + repstatus)
            raise Exception('Unknown Replication Status')

    # -----------------------------------------------------------------
    # Create/update the record in the DDB table
    try:
        client['ddb']['handle'].update_item(
            TableName=ddbtable,
            Key={'ETag': ETag},
            UpdateExpression=ddb_update_exp,
            ExpressionAttributeValues=ddb_exp_attrs)
    except Exception as e:
        print(e)
        print('Table ' + ddbtable + ' update failed')
        raise e