in source/CRRMonitor/CRRMonitor.py [0:0]
def message_handler(event):
    """Record S3 Cross-Region Replication progress for one PutObject event.

    Accepts the event either directly from CloudWatch Events or wrapped in an
    SNS notification envelope. For each replicated object it:
      * updates a per-object item (keyed by ETag:versionId) in ``ddbtable``
        with source/replica buckets, start/end timestamps, elapsed time,
        transfer rate, and a TTL; and
      * rolls per source/destination-pair statistics into ``stattable`` in
        5-minute time buckets via the nested ``log_statistics`` helper.

    Relies on module-level state defined elsewhere in this file: ``client``
    (service handles, incl. DynamoDB), ``s3client`` (per-region S3 client
    cache), ``ddbtable``, ``stattable``, ``initfail``, ``timefmt``,
    ``purge_thresh``, and ``DEBUG``.

    Raises on DynamoDB failures and unrecognized event formats; returns
    None (quietly) for non-PutObject events and non-replicated objects.
    """
    def log_statistics(Src, Dst, Tstamp, Size, ET, roundTo):
        """Accumulate replication statistics into ``stattable``.

        Src/Dst: source and destination bucket names ('FAILED' is used as a
        pseudo-destination for failed replications).
        Tstamp: event timestamp string in ``timefmt`` format.
        Size: object size in bytes (string, for the DDB 'N' attribute).
        ET: elapsed seconds (string, for the DDB 'N' attribute).
        roundTo: statistic bucket width in seconds (callers pass 300).
        """
        # -------------------------------------------------------------
        # Derive the statistic bucket from source/dest and time bucket
        # (5 minute rolling window)
        #
        statbucket = Src + ':' + Dst
        ts = datetime.strptime(Tstamp, timefmt)
        # Round the timestamp to the nearest roundTo-second boundary
        secs = (ts.replace(tzinfo=None) - ts.min).seconds
        rounding = (secs + roundTo / 2) // roundTo * roundTo
        ts = ts + timedelta(0, rounding - secs, -ts.microsecond)
        timebucket = datetime.strftime(ts, timefmt)
        statbucket += ':' + timebucket
        # -------------------------------------------------------------
        # Init a dict to use to hold our attrs for DDB
        stat_exp_attrs = {}
        # -------------------------------------------------------------
        # Build the DDB UpdateExpression: SET the bucket metadata, ADD
        # (atomically increment) the object count, total size and elapsed time
        stat_update_exp = 'SET timebucket = :t, source_bucket = :o, dest_bucket = :r ADD objects :a, size :c, elapsed :d'
        # -------------------------------------------------------------
        # push the first attr: s3Object
        stat_exp_attrs[':a'] = {'N': '1'}
        stat_exp_attrs[':c'] = {'N': Size}
        stat_exp_attrs[':d'] = {'N': ET}
        stat_exp_attrs[':t'] = {'S': timebucket}
        stat_exp_attrs[':o'] = {'S': Src}
        stat_exp_attrs[':r'] = {'S': Dst}
        # Update the DDB table
        try:
            client['ddb']['handle'].update_item(
                TableName=stattable,
                Key={'OriginReplicaBucket': {'S': statbucket}},
                UpdateExpression=stat_update_exp,
                ExpressionAttributeValues=stat_exp_attrs)
        except Exception as e:
            print(e)
            print('Table ' + stattable + ' update failed')
            raise e
        # Initialize a counter for failed replications for the source bucket,
        # once per time bucket (initfail caches the last-initialized bucket)
        if Src not in initfail:
            initfail[Src] = 'foo'  # sentinel: never matches a real timebucket
        if Dst != 'FAILED' and initfail[Src] != timebucket:
            print('Initializing FAILED bucket for ' + Src + ':' + timebucket)
            statbucket = Src + ':FAILED:' + timebucket
            stat_exp_attrs = {}
            # -------------------------------------------------------------
            # Build the DDB UpdateExpression
            stat_update_exp = 'SET timebucket = :t, source_bucket = :o, dest_bucket = :r ADD objects :a, size :c, elapsed :d'
            # -------------------------------------------------------------
            # Seed the FAILED row with zero objects so graphs show a zero
            # baseline even when nothing failed in this window
            stat_exp_attrs[':a'] = {'N': '0'}
            stat_exp_attrs[':c'] = {'N': '1'}
            stat_exp_attrs[':d'] = {'N': '1'}
            stat_exp_attrs[':t'] = {'S': timebucket}
            stat_exp_attrs[':o'] = {'S': Src}
            stat_exp_attrs[':r'] = {'S': 'FAILED'}
            try:
                client['ddb']['handle'].update_item(
                    TableName=stattable,
                    Key={'OriginReplicaBucket': {'S': statbucket}},
                    UpdateExpression=stat_update_exp,
                    ExpressionAttributeValues=stat_exp_attrs)
                initfail[Src] = timebucket
            except Exception as e:
                print(e)
                print('Table ' + stattable + ' update failed')
                raise e
        #print('Stats written to ' + statbucket)

    # So this will work with CloudWatch Events directly or via SNS, let's look
    # at the structure of the incoming JSON. Note that this has not been
    # tested with CloudWatch events directly, but should be a simple matter.
    # I kept the code here as it adds no overhead but is a solid flexible
    # example.
    #
    # A Cloudwatch Event looks like event[event json]
    # An SNS notification looks like event['Records'][0][event json]
    # print("Received raw event: " + json.dumps(event, indent=2))
    # Create a reference in evdata that points to the correct element in the
    # event dictionary
    if 'detail-type' in event:
        # Direct CloudWatch Event: the payload is the event itself
        evdata = event
    elif 'Records' in event:
        # An SNS notification will have another layer in the dict. Look for
        # EventSource = aws:sns. Otherwise generate an exception and get out.
        if event['Records'][0]['EventSource'] == 'aws:sns':
            #print('Message is ' + event['Records'][0]['Sns']['Message'])
            evdata = json.loads(event['Records'][0]['Sns']['Message'])
            #print("Message event: " + json.dumps(evdata, indent=2))
        else:
            # Unrecognized event format: uncomment print statements to
            # identify the format and enhance this logic. At the end of
            # the day, evdata must contain the dict for the event record
            # of the Cloudwatch log event for the S3 update notification
            print('Error: unrecognized event format received')
            raise Exception('Unrecognized event format')
    elif 'MessageId' in event:
        # Raw SNS message (no Records wrapper): payload is in 'Message'
        evdata = json.loads(event['Message'])
    else:
        # Fall back to treating the event as the payload itself
        evdata = event

    if DEBUG > 1:
        print(json.dumps(evdata))
    #-----------------------------------------------------------------
    # Quietly ignore all but PutObject
    #
    if evdata['detail']['eventName'] != 'PutObject':
        if DEBUG > 0:
            print('Ignoring ' + evdata['detail']['eventName'] + ' event')
        return
    #-----------------------------------------------------------------
    #
    # Collect the data we want for the DynamoDB table
    #
    region = evdata['region']
    bucket = evdata['detail']['requestParameters']['bucketName']
    key = evdata['detail']['requestParameters']['key']
    # This timestamp is from the CW Event record and is most accurate
    now = evdata['detail']['eventTime']
    # Init a dict to use to hold our attrs for DDB
    ddb_exp_attrs = {}
    # Build the DDB UpdateExpression
    ddb_update_exp = 'set s3Object = :a'
    # push the first attr: s3Object
    ddb_exp_attrs[':a'] = {'S': key}
    # establish s3 client per region, but only once.
    if region not in s3client:
        s3client[region] = boto3.client('s3', region)
    # -----------------------------------------------------------------
    # Do a head_object. If the object no longer exists just return.
    #
    try:
        response = s3client[region].head_object(
            Bucket=bucket,
            Key=key
        )
    except ClientError as e:
        # Example ClientError response shape:
        # { "Error": {
        #     "Code": "403",
        #     "Message": "Forbidden"
        #   },
        #   "ResponseMetadata": {
        #     "RequestId": "B7C8873E3C067128",
        #     "HTTPStatusCode": 403,
        #     ...
        #   }
        # }
        if e.response['Error']['Code'] == '403':
            print('IGNORING: CRRMonitor does not have access to Object - ' + \
                evdata['detail']['requestParameters']['bucketName'] + '/' + \
                evdata['detail']['requestParameters']['key'])
        elif e.response['Error']['Code'] == '404':
            print('IGNORING: Object no longer exists - ' + \
                evdata['detail']['requestParameters']['bucketName'] + '/' + \
                evdata['detail']['requestParameters']['key'])
        else:
            # Need to improve this to recognize specifically a 404
            print('Unhandled ClientError ' + str(e))
            print(json.dumps(e.response))
        #print('Removing from queue / ignoring')
        return
    except Exception as e:
        # Need to improve this to recognize specifically a 404
        print('Unhandled Exception ' + str(e))
        print('Removing from queue / ignoring')
        return
    # 2) check that the x-amz-replication-status header is present
    #    response['ResponseMetadata']['HTTPHeaders']['x-amz-replication-status']
    #
    # Note that this function is only called when an object is written. Assume that
    # the object was written and the x-amz-replication-status is a final status for
    # this object in this bucket. So, if it is the source it can be COMPLETED, PENDING,
    # or FAILED. If it is the replica it can only be REPLICA.
    #
    # That in mind, the update date/time for the REPLICA will always be definitive for
    # the end_datetime column
    #
    # Conversely, the source object is always definitive for the start_datetime.
    #
    # Code must not assume that the events (source and dest) are processed in the correct
    # order. Any process consuming the DynamoDB table should do their own Elapsed Time
    # calculation.
    #
    # Reference the dict we want for clarity in the code
    headers = response['ResponseMetadata']['HTTPHeaders']
    # If this object has no x-amz-replication-status header then we can leave
    if 'x-amz-replication-status' not in headers:
        # This is not a replicated object - get out
        if DEBUG > 0:
            print('Not a replicated object')
        return
    # repstatus is a pointer to the headers (for code clarity)
    repstatus = headers['x-amz-replication-status']
    # -----------------------------------------------------------------
    # Verify that the DynamoDB table exists. Note: we could create it
    # but that takes so long that the lambda function may time out.
    # Better to create it in the CFn template and handle this as a
    # failure condition
    #
    try:
        client['ddb']['handle'].describe_table(
            TableName=ddbtable
        )
    except Exception as e:
        print(e)
        print('Table ' + ddbtable + ' does not exist - need to create it')
        raise e
    # Update object size
    objsize = headers['content-length']
    ddb_update_exp += ', ObjectSize = :s'
    ddb_exp_attrs[':s'] = {'N': objsize}
    # Item key: etag + version id, with the surrounding quote characters
    # stripped ([1:-1]) so the key is stable across both events
    ETag = {'S': headers['etag'][1:-1] + ':' + headers['x-amz-version-id'][1:-1]}
    # -----------------------------------------------------------------
    # If the object already has a DDB record get it
    #
    ddbdata = client['ddb']['handle'].get_item(
        TableName=ddbtable,
        Key={'ETag': ETag},
        ConsistentRead=True
    )
    ddbitem = {}  # reset the dict
    if 'Item' in ddbdata:
        ddbitem = ddbdata['Item']
        if DEBUG > 4:
            print("DDB record: " + json.dumps(ddbitem, indent=2))
    #
    # Is this a REPLICA? Use timestamp as completion time
    #
    # Note: replica only updates s3Replica, replication_status, and end_datetime.
    #
    # We do this so we don't have to handle conditional update of fields that might get
    # stepped on of the events are processed out of order.
    #
    if repstatus == 'REPLICA':
        # print('Processing a REPLICA object: ' + ETag['S'])
        ddb_update_exp += ', s3Replica = :d'
        ddb_exp_attrs[':d'] = {'S': bucket}
        #print('s3Replica: ' + bucket)
        ddb_update_exp += ', end_datetime = :e'
        ddb_exp_attrs[':e'] = {'S': now}  # 'now' is from the event data
        #print('end_datetime: ' + now)
        # Set the ttl
        purge = datetime.strptime(now, timefmt) - timedelta(hours=purge_thresh)  # datetime object
        ttl = purge.strftime('%s')
        ddb_update_exp += ', itemttl = :p'
        ddb_exp_attrs[':p'] = {'N': ttl}
        # If this is a replica then status is COMPLETE
        ddb_update_exp += ', replication_status = :b'
        ddb_exp_attrs[':b'] = {'S': 'COMPLETED'}
        #print('replication_status: COMPLETED (implied)')
        # If the source event already arrived, compute elapsed time / rate
        if 'start_datetime' in ddbitem and 'crr_rate' not in ddbitem:
            etime = datetime.strptime(now, timefmt) - datetime.strptime(ddbitem['start_datetime']['S'], timefmt)
            etimesecs = (etime.days * 24 * 60 * 60) + etime.seconds
            #print("Calculate elapsed time in seconds")
            crr_rate = int(objsize) * 8 / (etimesecs + 1)  # Add 1 to prevent /0 errors
            ddb_update_exp += ', crr_rate = :r'
            ddb_exp_attrs[':r'] = {'N': str(crr_rate)}
            #print('crr_rate: ', crr_rate)
            ddb_update_exp += ', elapsed = :t'
            ddb_exp_attrs[':t'] = {'N': str(etimesecs)}
            #print('elapsed: ', etimesecs)
            log_statistics(
                ddbitem['s3Origin']['S'],
                bucket,
                ddbitem['start_datetime']['S'],
                objsize,
                str(etimesecs),
                300)
    # -----------------------------------------------------------------
    # Or is this a SOURCE? Use timestamp as replication start time
    #
    else:
        ddb_update_exp += ', s3Origin = :f'
        ddb_exp_attrs[':f'] = {'S': bucket}
        # If this is not a replica then do not report status. It's not important and
        # makes the DynamoDB update much more complicated. Just get the start time
        #
        # We also do not care what the status is. If it has a FAILED status we could
        # write code to send a notification, but that's outside our scope.
        if repstatus == 'COMPLETED' or repstatus == 'FAILED' or repstatus == 'PENDING':
            # print('Processing a ORIGINAL object: ' + ETag['S'] + ' status: ' + repstatus)
            ddb_update_exp += ', start_datetime = :g'
            ddb_exp_attrs[':g'] = {'S': now}
            # ---------------------------------------------------------
            # If we already got the replica event...
            #
            if 'end_datetime' in ddbitem and 'crr_rate' not in ddbitem:
                etime = datetime.strptime(ddbitem['end_datetime']['S'], timefmt) - datetime.strptime(now, timefmt)
                etimesecs = (etime.days * 24 * 60 * 60) + etime.seconds
                #print("Calculate elapsed time in seconds")
                crr_rate = int(objsize) * 8 / (etimesecs + 1)  # Add 1 to prevent /0 errors
                ddb_update_exp += ', crr_rate = :r'
                ddb_exp_attrs[':r'] = {'N': str(crr_rate)}
                # Set the ttl
                purge = datetime.strptime(ddbitem['end_datetime']['S'], timefmt) - timedelta(hours=purge_thresh)  # datetime object
                ttl = purge.strftime('%s')
                ddb_update_exp += ', itemttl = :p'
                ddb_exp_attrs[':p'] = {'N': ttl}
                ddb_update_exp += ', elapsed = :t'
                ddb_exp_attrs[':t'] = {'N': str(etimesecs)}
                log_statistics(
                    bucket, ddbitem['s3Replica']['S'],
                    ddbitem['end_datetime']['S'],
                    objsize,
                    str(etimesecs), 300)
            # ---------------------------------------------------------
            # We did not yet get the replica event
            #
            else:
                if repstatus == 'FAILED':
                    # If replication failed this is the only time we will see this object.
                    # Update the status to FAILED
                    ddb_update_exp += ', replication_status = :b'
                    ddb_exp_attrs[':b'] = {'S': 'FAILED'}
                    log_statistics(
                        bucket,
                        'FAILED',
                        now,
                        '0',
                        '1',
                        300)
        else:
            print('Unknown Replication Status: ' + repstatus)
            raise Exception('Unknown Replication Status')
    # Create a record in the DDB table
    try:
        client['ddb']['handle'].update_item(
            TableName=ddbtable,
            Key={'ETag': ETag},
            UpdateExpression=ddb_update_exp,
            ExpressionAttributeValues=ddb_exp_attrs)
    except Exception as e:
        print(e)
        print('Table ' + ddbtable + ' update failed')
        raise e