in lambda/imageprocessor/imageprocessor.py [0:0]
def process_image(event, context):
#Initialize clients
rekog_client = boto3.client('rekognition')
sns_client = boto3.client('sns')
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
#Load config
config = load_config()
s3_bucket = config["s3_bucket"]
s3_key_frames_root = config["s3_key_frames_root"]
ddb_table = dynamodb.Table(config["ddb_table"])
rekog_max_labels = config["rekog_max_labels"]
rekog_min_conf = float(config["rekog_min_conf"])
label_watch_list = config["label_watch_list"]
label_watch_min_conf = float(config["label_watch_min_conf"])
label_watch_phone_num = config.get("label_watch_phone_num", "")
label_watch_sns_topic_arn = config.get("label_watch_sns_topic_arn", "")
#Iterate on frames fetched from Kinesis
for record in event['Records']:
frame_package_b64 = record['kinesis']['data']
frame_package = pickle.loads(base64.b64decode(frame_package_b64))
img_bytes = frame_package["ImageBytes"]
approx_capture_ts = frame_package["ApproximateCaptureTime"]
frame_count = frame_package["FrameCount"]
now_ts = time.time()
frame_id = str(uuid.uuid4())
processed_timestamp = Decimal(now_ts)
approx_capture_timestamp = Decimal(approx_capture_ts)
now = convert_ts(now_ts, config)
year = now.strftime("%Y")
mon = now.strftime("%m")
day = now.strftime("%d")
hour = now.strftime("%H")
try:
rekog_response = rekog_client.detect_labels(
Image={
'Bytes': img_bytes
},
MaxLabels=rekog_max_labels,
MinConfidence=rekog_min_conf
)
except Exception as e:
#Log error and ignore frame. You might want to add that frame to a dead-letter queue.
print(e)
return
#Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
labels_on_watch_list = []
for label in rekog_response['Labels']:
lbl = label['Name']
conf = label['Confidence']
label['OnWatchList'] = False
#Print labels and confidence to lambda console
print('{} .. conf %{:.2f}'.format(lbl, conf))
#Check label watch list and trigger action
if (lbl.upper() in (label.upper() for label in label_watch_list)
and conf >= label_watch_min_conf):
label['OnWatchList'] = True
labels_on_watch_list.append(deepcopy(label))
#Convert from float to decimal for DynamoDB
label['Confidence'] = Decimal(conf)
for instance in label['Instances']:
instance['BoundingBox']['Width'] = Decimal(instance['BoundingBox']['Width'])
instance['BoundingBox']['Height'] = Decimal(instance['BoundingBox']['Height'])
instance['BoundingBox']['Left'] = Decimal(instance['BoundingBox']['Left'])
instance['BoundingBox']['Top'] = Decimal(instance['BoundingBox']['Top'])
instance['Confidence'] = Decimal(instance['Confidence'])
#Send out notification(s), if needed
if len(labels_on_watch_list) > 0 \
and (label_watch_phone_num or label_watch_sns_topic_arn):
notification_txt = 'On {}...\n'.format(now.strftime('%x, %-I:%M %p %Z'))
for label in labels_on_watch_list:
notification_txt += '- "{}" was detected with {}% confidence.\n'.format(
label['Name'],
round(label['Confidence'], 2))
print(notification_txt)
if label_watch_phone_num:
sns_client.publish(PhoneNumber=label_watch_phone_num, Message=notification_txt)
if label_watch_sns_topic_arn:
resp = sns_client.publish(
TopicArn=label_watch_sns_topic_arn,
Message=json.dumps(
{
"message": notification_txt,
"labels": labels_on_watch_list
}
)
)
if resp.get("MessageId", ""):
print("Successfully published alert message to SNS.")
#Store frame image in S3
s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, frame_id)
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=img_bytes
)
#Persist frame data in dynamodb
item = {
'frame_id': frame_id,
'processed_timestamp' : processed_timestamp,
'approx_capture_timestamp' : approx_capture_timestamp,
'rekog_labels' : rekog_response['Labels'],
'rekog_orientation_correction' :
rekog_response['OrientationCorrection']
if 'OrientationCorrection' in rekog_response else 'ROTATE_0',
'processed_year_month' : year + mon, #To be used as a Hash Key for DynamoDB GSI
's3_bucket' : s3_bucket,
's3_key' : s3_key
}
ddb_table.put_item(Item=item)
print('Successfully processed {} records.'.format(len(event['Records'])))
return