in source/CRRMonitorHousekeeping/CRRMonitorHousekeeping.py [0:0]
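# NOTE: this handler relies on module-level setup that is not part of this
# excerpt. A rough sketch of that setup, inferred from how the names are used
# below; the exact values, env-var wiring, and timestamp format here are
# assumptions, not the solution's confirmed configuration.
#
#   import json
#   import boto3
#   from datetime import datetime, timedelta
#
#   timefmt = '%Y-%m-%dT%H:%M:%SZ'     # stat bucket timestamp format (assumed)
#   roundTo = 300                      # stat bucket size in seconds (assumed 5 min)
#   ddbtable = 'CRRMonitor'            # replication item table (assumed name)
#   stattable = ddbtable + 'Statistics'
#   kinesisfirestream = 'CRRMonitorDeliveryStream'
#   stream_to_kinesis = 'No'           # 'Yes' enables archiving via Firehose
#   client = {
#       'ddb': {'handle': boto3.client('dynamodb')},
#       'cw': {'handle': boto3.client('cloudwatch')},
#       'firehose': {'handle': boto3.client('firehose')}
#   }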
def lambda_handler(event, context):
    # -----------------------------------------------------------------
    # save_item - stream the item to Kinesis Firehose as newline-delimited
    # JSON (for archival to S3)
    #
def save_item(item):
        print('Save Item ' + json.dumps(item))
try:
response = client['firehose']['handle'].put_record(
DeliveryStreamName=kinesisfirestream,
Record={
'Data': json.dumps(item) + '\n'
}
)
except Exception as e:
print(e)
print('Error saving ' + item['ETag']['S'] + ' from ' + ddbtable)
raise e
# -----------------------------------------------------------------
# -----------------------------------------------------------------
# post_stats - post statistics to CloudWatch
#
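    # For each stat record this posts to the CRRMonitor namespace:
    #   FailedReplications - object count when the destination is 'FAILED'
    #   ReplicationObjects - object count per source/destination bucket pair
    #   ReplicationSpeed   - approximate throughput in Kbit/s
    # and then deletes the stat record from the statistics table.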
def post_stats(item):
print('Posting statistics to CloudWatch for ' + item['source_bucket']['S'] + ' time bucket ' + item['timebucket']['S'])
ts=item['timebucket']['S']
# -------------------------------------------------------------
        # Special Handling: Failed replications are reported in the
# same data format. The destination bucket will be FAILED.
# Pull these out separately to a different CW metric.
if item['dest_bucket']['S'] == 'FAILED':
try:
client['cw']['handle'].put_metric_data(
Namespace='CRRMonitor',
MetricData=[
{
'MetricName': 'FailedReplications',
'Dimensions': [
{
'Name': 'SourceBucket',
'Value': item['source_bucket']['S']
}
],
'Timestamp': ts,
'Value': int(item['objects']['N'])
},
]
)
except Exception as e:
print(e)
print('Error creating CloudWatch metric FailedReplications')
raise e
else:
try:
client['cw']['handle'].put_metric_data(
Namespace='CRRMonitor',
MetricData=[
{
'MetricName': 'ReplicationObjects',
'Dimensions': [
{
'Name': 'SourceBucket',
'Value': item['source_bucket']['S']
},
{
'Name': 'DestBucket',
'Value': item['dest_bucket']['S']
}
],
'Timestamp': ts,
'Value': int(item['objects']['N'])
},
]
)
except Exception as e:
print(e)
print('Error creating CloudWatch metric')
raise e
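            # ReplicationSpeed value: object size in bytes * 8 / 1024 = kilobits,
            # divided by the elapsed replication time in seconds (+1 guards
            # against a zero-second divide), giving an approximate Kbit/s rate.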
try:
client['cw']['handle'].put_metric_data(
Namespace='CRRMonitor',
MetricData=[
{
'MetricName': 'ReplicationSpeed',
'Dimensions': [
{
'Name': 'SourceBucket',
'Value': item['source_bucket']['S']
},
{
'Name': 'DestBucket',
'Value': item['dest_bucket']['S']
}
],
'Timestamp': ts,
'Value': ((int(item['size']['N'])*8)/1024)/(int(item['elapsed']['N'])+1)
},
]
)
except Exception as e:
print(e)
print('Error creating CloudWatch metric')
raise e
        print('Statistics posted for ' + ts)
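        # Remove the stat record so this time bucket is not reported again;
        # the table key is '<source_bucket>:<dest_bucket>:<timebucket>'.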
try:
client['ddb']['handle'].delete_item(
TableName=stattable,
Key={
'OriginReplicaBucket': {
'S': item['source_bucket']['S'] + ':' + item['dest_bucket']['S'] + ':' + ts
}
}
)
            print('Purged statistics data for ' + ts)
except Exception as e:
print(e)
            print('Error purging statistics for ' + ts)
raise e
#======================== post_stats ==============================
#==================================================================
# firehose: retrieve all records completed in the last 5 minutes
# Stream them to firehose
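    # The scan covers end_datetime values in [ts - 5 min, ts) and follows
    # LastEvaluatedKey to page through results (Limit=1000 per scan call).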
def firehose(ts):
begts=ts - timedelta(minutes=5)
arch_beg = begts.strftime(timefmt)
arch_end = ts.strftime(timefmt)
# Set scan filter attrs
eav = {
":archbeg": { "S": arch_beg },
":archend": { "S": arch_end }
}
print('Reading from ' + ddbtable)
try:
response = client['ddb']['handle'].scan(
TableName=ddbtable,
ExpressionAttributeValues=eav,
FilterExpression="end_datetime >= :archbeg and end_datetime < :archend",
Limit=1000
)
except Exception as e:
print(e)
print('Table ' + ddbtable + ' scan failed')
raise e
print('Archiving items from ' + ddbtable + ' beg>=' + arch_beg + ' end=' + arch_end)
for i in response['Items']:
save_item(i)
while 'LastEvaluatedKey' in response:
response = client['ddb']['handle'].scan(
TableName=ddbtable,
FilterExpression="end_datetime >= :archbeg and end_datetime < :archend",
ExpressionAttributeValues=eav,
ExclusiveStartKey=response['LastEvaluatedKey'],
Limit=1000
)
for i in response['Items']:
                print('Archiving paged item ' + i['ETag']['S'])
save_item(i)
#====================== firehose ==================================
# What time is it?
ts = datetime.utcnow()
    # CRRMonitor logs forward (rounds up), so we want to read from the last
    # bucket, not the current one: round down to the previous 5-minute interval.
secs = (ts.replace(tzinfo=None) - ts.min).seconds
rounding = (secs-roundTo/2) // roundTo * roundTo
ts = ts + timedelta(0,rounding-secs,-ts.microsecond)
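    # Example, assuming roundTo is 300 (a 5-minute bucket): at 12:03:40 UTC
    # this yields 12:00:00, the bucket CRRMonitor just stopped logging to.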
# save the timestamp we created in a str
statbucket = datetime.strftime(ts, timefmt) # We'll get stats from this bucket
print('Logging from ' + statbucket)
# -----------------------------------------------------------------
# Process Statistics
#
# Get the name of the 5 minute stat bucket that we just stopped
# logging to, read the data, and delete the record.
#
try:
client['ddb']['handle'].describe_table(
TableName = stattable
)
except Exception as e:
print(e)
print('Table ' + stattable + ' does not exist - need to create it')
raise e
eav = {
":stats": { "S": statbucket }
}
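    # Scan for the just-closed stat bucket plus any older buckets still
    # pending (the filter uses timebucket <= :stats, not equality).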
try:
response = client['ddb']['handle'].scan(
TableName=stattable,
ExpressionAttributeValues=eav,
FilterExpression="timebucket <= :stats",
ConsistentRead=True
)
except Exception as e:
print(e)
        print('Table ' + stattable + ' scan failed')
raise e
if len(response['Items']) == 0:
print('WARNING: No stats bucket found for ' + statbucket)
for i in response['Items']:
post_stats(i)
while 'LastEvaluatedKey' in response:
try:
response = client['ddb']['handle'].scan(
                TableName=stattable,
FilterExpression="timebucket <= :stats",
ExpressionAttributeValues=eav,
ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=True
)
except Exception as e:
print(e)
            print('Table ' + stattable + ' scan failed')
raise e
for i in response['Items']:
post_stats(i)
# Archive to firehose
if stream_to_kinesis == 'Yes':
firehose(ts)
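
# A minimal local smoke test; assumes the module-level setup sketched at the
# top of the file is in place and that AWS credentials with DynamoDB,
# CloudWatch, and Firehose access are available. The handler ignores the
# incoming event, so an empty dict is sufficient.
if __name__ == '__main__':
    lambda_handler({}, None)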