def lambda_handler()

in source/CRRMonitorHousekeeping/CRRMonitorHousekeeping.py [0:0]
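
# --- Assumed module-level setup (not shown in this excerpt) -----------
# lambda_handler leans on several module-scope names. The sketch below is
# a minimal, illustrative reconstruction: the boto3 calls are real, but
# the table/stream names and constant values are assumptions inferred
# from how the handler uses them.
import json
from datetime import datetime, timedelta

import boto3

client = {
    'ddb': {'handle': boto3.client('dynamodb')},
    'cw': {'handle': boto3.client('cloudwatch')},
    'firehose': {'handle': boto3.client('firehose')}
}
ddbtable = 'CRRMonitor'              # per-object replication table (assumed name)
stattable = 'CRRMonitorStatistics'   # 5-minute aggregate table (assumed name)
kinesisfirestream = 'CRRMonitorDeliveryStream'  # Firehose stream (assumed name)
timefmt = '%Y-%m-%dT%H:%M'           # timebucket string format (assumed)
roundTo = 300                        # stat bucket size in seconds (assumed)
stream_to_kinesis = 'No'             # 'Yes' enables the Firehose archive
# ----------------------------------------------------------------------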


def lambda_handler(event, context):
    # -----------------------------------------------------------------
    # save_item - stream a completed item to Firehose (for archival to S3)
    #
    def save_item(item):
        print('Save Item ' + json.dumps(item))
        try:
            response = client['firehose']['handle'].put_record(
                DeliveryStreamName=kinesisfirestream,
                Record={
                    'Data': json.dumps(item) + '\n'  # newline-delimit records in the stream
                }
            )
        except Exception as e:
            print(e)
            print('Error saving ' + item['ETag']['S'] + ' from ' + ddbtable)
            raise e

    #======================== save_item ===============================
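    # Design note: put_record ships one record per call. If a window ever
    # contains many completed items, Firehose also offers put_record_batch
    # (up to 500 records per call) to reduce API overhead.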

    # -----------------------------------------------------------------
    # post_stats - post statistics to CloudWatch
    #
    def post_stats(item):
        print('Posting statistics to CloudWatch for ' + item['source_bucket']['S'] + ' time bucket ' + item['timebucket']['S'])

        ts = item['timebucket']['S']

        # -------------------------------------------------------------
        # Special Handling: Failed replications are reported in the
        # same data format. The destination bucket will be FAILED.
        # Pull these out separately to a different CW metric.
        if item['dest_bucket']['S'] == 'FAILED':
            try:
                client['cw']['handle'].put_metric_data(
                    Namespace='CRRMonitor',
                    MetricData=[
                        {
                            'MetricName': 'FailedReplications',
                            'Dimensions': [
                                {
                                    'Name': 'SourceBucket',
                                    'Value': item['source_bucket']['S']
                                }
                            ],
                            'Timestamp': ts,
                            'Value': int(item['objects']['N'])
                        },
                    ]
                )
            except Exception as e:
                print(e)
                print('Error creating CloudWatch metric FailedReplications')
                raise e

        else:
            try:
                client['cw']['handle'].put_metric_data(
                    Namespace='CRRMonitor',
                    MetricData=[
                        {
                            'MetricName': 'ReplicationObjects',
                            'Dimensions': [
                                {
                                    'Name': 'SourceBucket',
                                    'Value': item['source_bucket']['S']
                                },
                                {
                                    'Name': 'DestBucket',
                                    'Value': item['dest_bucket']['S']
                                }
                            ],
                            'Timestamp': ts,
                            'Value': int(item['objects']['N'])
                        },
                    ]
                )
            except Exception as e:
                print(e)
                print('Error creating CloudWatch metric')
                raise e

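            # The Value below is a rough throughput figure: size in bytes
            # is converted to kilobits (* 8 / 1024) and divided by elapsed
            # seconds; the +1 avoids division by zero on sub-second copies.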
            try:
                client['cw']['handle'].put_metric_data(
                    Namespace='CRRMonitor',
                    MetricData=[
                        {
                            'MetricName': 'ReplicationSpeed',
                            'Dimensions': [
                                {
                                    'Name': 'SourceBucket',
                                    'Value': item['source_bucket']['S']
                                },
                                {
                                    'Name': 'DestBucket',
                                    'Value': item['dest_bucket']['S']
                                }
                            ],
                            'Timestamp': ts,
                            'Value': ((int(item['size']['N'])*8)/1024)/(int(item['elapsed']['N'])+1)
                        },
                    ]
                )
            except Exception as e:
                print(e)
                print('Error creating CloudWatch metric')
                raise e

        print('Statistics posted to ' + ts)

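        # Delete the stat record once posted so the same bucket is not
        # double-counted on the next housekeeping run.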
        try:
            client['ddb']['handle'].delete_item(
                TableName=stattable,
                Key={
                    'OriginReplicaBucket': {
                        'S': item['source_bucket']['S'] + ':' + item['dest_bucket']['S'] + ':' + ts
                    }
                }
            )
            print('Purged statistics data for ' + ts)
        except Exception as e:
            print(e)
            print('Error purging from ' + ts)
            raise e

    #======================== post_stats ==============================

    #==================================================================
    # firehose: retrieve all records completed in the last 5 minutes
    # Stream them to firehose
    def firehose(ts):
        begts = ts - timedelta(minutes=5)
        arch_beg = begts.strftime(timefmt)
        arch_end = ts.strftime(timefmt)
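        # e.g. with ts = 12:10, the archive window is [12:05, 12:10)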
        # Set scan filter attrs
        eav = {
                ":archbeg": { "S": arch_beg },
                ":archend": { "S": arch_end }
            }
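        # Note: a DynamoDB scan applies the FilterExpression after items
        # are read, so the whole table is still consumed; Limit counts
        # items read, not matched, so a page may match few or no items,
        # and the LastEvaluatedKey pagination loop below is required.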

        print('Reading from ' + ddbtable)
        try:
            response = client['ddb']['handle'].scan(
                TableName=ddbtable,
                ExpressionAttributeValues=eav,
                FilterExpression="end_datetime >= :archbeg and end_datetime < :archend",
                Limit=1000
                )
        except Exception as e:
            print(e)
            print('Table ' + ddbtable + ' scan failed')
            raise e

        print('Archiving items from ' + ddbtable + ' beg>=' + arch_beg + ' end<' + arch_end)

        for i in response['Items']:
            save_item(i)

        while 'LastEvaluatedKey' in response:
            response = client['ddb']['handle'].scan(
                TableName=ddbtable,
                FilterExpression="end_datetime >= :archbeg and end_datetime < :archend",
                ExpressionAttributeValues=eav,
                ExclusiveStartKey=response['LastEvaluatedKey'],
                Limit=1000
                )

            for i in response['Items']:
                print('Items LastEvaluated ' + i['ETag']['S'])
                save_item(i)
    #====================== firehose ==================================

    # What time is it?
    ts = datetime.utcnow()

    # CRRMonitor logs forward (rounds up). We want to read from the last
    # bucket, not the current one, so round down to the previous 5 min interval.
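    # e.g. with roundTo = 300, ts = 12:07:49 lands in the 12:05:00 bucket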
    secs = (ts.replace(tzinfo=None) - ts.min).seconds
    rounding = (secs - roundTo / 2) // roundTo * roundTo
    ts = ts + timedelta(0,rounding-secs,-ts.microsecond)

    # save the timestamp we created in a str
    statbucket = datetime.strftime(ts, timefmt) # We'll get stats from this bucket
    print('Logging from ' + statbucket)

    # -----------------------------------------------------------------
    # Process Statistics
    #
    # Get the name of the 5 minute stat bucket that we just stopped
    #  logging to, read the data, and delete the record.
    #
    try:
        client['ddb']['handle'].describe_table(
            TableName = stattable
        )
    except Exception as e:
        print(e)
        print('Table ' + stattable + ' does not exist - need to create it')
        raise e

    eav = {
            ":stats": { "S": statbucket }
        }

    try:
        response = client['ddb']['handle'].scan(
            TableName=stattable,
            ExpressionAttributeValues=eav,
            FilterExpression="timebucket <= :stats",
            ConsistentRead=True
            )
    except Exception as e:
        print(e)
        print('Table ' + stattable + ' scan failed')
        raise e

    if len(response['Items']) == 0:
        print('WARNING: No stats bucket found for ' + statbucket)

    for i in response['Items']:
        post_stats(i)

    while 'LastEvaluatedKey' in response:
        try:
            response = client['ddb']['handle'].scan(
                TableName=stattable,
                FilterExpression="timebucket <= :stats",
                ExpressionAttributeValues=eav,
                ExclusiveStartKey=response['LastEvaluatedKey'],
                ConsistentRead=True
                )
        except Exception as e:
            print(e)
            print('Table ' + stattable + ' scan failed')
            raise e

        for i in response['Items']:
            post_stats(i)

    # Archive to firehose
    if stream_to_kinesis == 'Yes':
        firehose(ts)
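
# Minimal local smoke test (illustrative, not part of the deployed Lambda):
# the handler ignores its event and context arguments, so it can be driven
# directly, given the module setup above and AWS credentials that can reach
# DynamoDB, CloudWatch, and (optionally) the Firehose stream.
if __name__ == '__main__':
    lambda_handler({}, None)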