in build_and_train_models/sm-object_detection_birds/tools/birdsOnEdge.py [0:0]
def greengrass_infinite_infer_run():
    """Entry point of the lambda function.

    Loads the SSD bird-detection model onto the DeepLens GPU, then loops
    forever: grab a frame, run inference, draw bounding boxes for detections
    above the confidence threshold, push the annotated frame to the local
    display FIFO, and publish the label->probability map to the IoT topic.

    Relies on module-level globals defined elsewhere in this file:
    `client` (IoT MQTT client), `iot_topic`, `awscam`, `mo`, `cv2`, `json`,
    and the `LocalDisplay` class.
    """
    client.publish(topic=iot_topic, payload="Start of run loop...")
    try:
        # This object detection model is implemented as single shot detector (ssd).
        # Since the number of labels is small we build a dictionary that maps the
        # machine labels (integer class ids) to human readable label strings.
        model_type = "ssd"
        output_map = {}
        with open("classes.txt") as f:
            for line in f:
                (key, val) = line.split()
                output_map[int(key)] = val
        client.publish(topic=iot_topic, payload="Classes to be detected: " + str(output_map))
        # Create a local display instance that dumps the image bytes to a FIFO
        # file so the stream can be rendered locally.
        local_display = LocalDisplay("480p")
        local_display.start()
        # The height and width of the training set images; frames must be
        # resized to this resolution before inference.
        input_height = 512
        input_width = 512
        # Optimize the model artifacts for the device, then load onto the GPU.
        client.publish(topic=iot_topic, payload="Optimizing model...")
        ret, model_path = mo.optimize("deploy_ssd_resnet50_512", input_width, input_height)
        client.publish(topic=iot_topic, payload="Loading model...")
        model = awscam.Model(model_path, {"GPU": 1})
        client.publish(topic=iot_topic, payload="Custom object detection model loaded")
        # Minimum confidence required for a detection to be drawn/reported.
        detection_threshold = 0.40
        # BGR color used for both the bounding boxes and the label text.
        default_color = (255, 165, 20)
        # Pixel offset of the label text above the top edge of its box.
        text_offset = 15
        # Do inference until the lambda is killed.
        while True:
            # Get a frame from the video stream.
            ret, frame = awscam.getLastFrame()
            if not ret:
                raise Exception("Failed to get frame from the stream")
            # Resize frame to the same size as the training set.
            # NOTE: cv2.resize takes dsize as (width, height) — the original
            # code passed (height, width), which only worked because both
            # dimensions are 512 here.
            frame_resize = cv2.resize(frame, (input_width, input_height))
            # Run the frame through the inference engine and parse the results
            # using the parser API. It is possible to take the raw output of
            # doInference and parse it manually, but for an ssd model a simple
            # API is provided.
            client.publish(topic=iot_topic, payload="Calling inference on next frame...")
            parsed_inference_results = model.parseResult(
                model_type, model.doInference(frame_resize)
            )
            # Scale factors to map box coordinates from the resized inference
            # frame back onto the full resolution frame.
            yscale = float(frame.shape[0]) / float(input_height)
            xscale = float(frame.shape[1]) / float(input_width)
            # Dictionary to be filled with labels and probabilities for MQTT.
            cloud_output = {}
            # Draw every detection that clears the confidence threshold.
            for obj in parsed_inference_results[model_type]:
                if obj["prob"] > detection_threshold:
                    # Map the box corners onto the full resolution frame.
                    xmin = int(xscale * obj["xmin"])
                    ymin = int(yscale * obj["ymin"])
                    xmax = int(xscale * obj["xmax"])
                    ymax = int(yscale * obj["ymax"])
                    # See https://docs.opencv.org/3.4.1/d6/d6e/group__imgproc__draw.html
                    # for cv2.rectangle: image, point1, point2, color, thickness.
                    cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), default_color, 10)
                    # cv2.putText: image, text, origin, font face, font scale,
                    # color, thickness. Text is placed just above the box.
                    cv2.putText(
                        frame,
                        "{}: {:.2f}%".format(output_map[obj["label"]], obj["prob"] * 100),
                        (xmin, ymin - text_offset),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        2.5,
                        default_color,
                        6,
                    )
                    # Store label and probability to send to the cloud.
                    cloud_output[output_map[obj["label"]]] = obj["prob"]
            # Set the next frame in the local display stream.
            local_display.set_frame_data(frame)
            # Send results to the cloud.
            client.publish(topic=iot_topic, payload=json.dumps(cloud_output))
    except Exception as ex:
        # Top-level boundary: report the failure over MQTT rather than crashing
        # the long-running Greengrass lambda silently.
        client.publish(topic=iot_topic, payload="Error in object detection lambda: {}".format(ex))