scenarios/acsa_fault_detection/main.py (322 lines of code) (raw):

# Original code taken from: https://gist.github.com/raiever/df5b6c48217df521094cbe2d12c32c66 # import the necessary packages from flask import Response, Flask, render_template, jsonify import threading import argparse import datetime, time import cv2 import os import time import math from math import atan2 import numpy as np import json import sys # GLOBAL Variables OBJECT_AREA_MIN = 9000 OBJECT_AREA_MAX = 50000 LOW_H = 0 LOW_S = 0 LOW_V = 47 # Thresholding of an Image in a color range HIGH_H = 179 HIGH_S = 255 HIGH_V = 255 # Lower and upper value of color Range of the object # for color thresholding to detect the object LOWER_COLOR_RANGE = (0, 0, 0) UPPER_COLOR_RANGE = (174, 73, 255) COUNT_OBJECT = 0 HEIGHT_OF_OBJ = 0 WIDTH_OF_OBJ = 0 OBJECT_COUNT = "Object Number : {}".format(COUNT_OBJECT) source = "rtsp://localhost:554/stream" if 'RTSP_URL' in os.environ: source = os.environ.get('RTSP_URL') print("Incoming video feed read from RTSP_URL: ", source) """#acsa_storage = "/home/aksedge-user/" """ acsa_storage = "./acsa_storage/faultdata" if 'LOCAL_STORAGE' in os.environ: acsa_storage = os.environ.get('LOCAL_STORAGE') print("Storing video frames read from LOCAL_STORAGE: ", acsa_storage) # initialize the output frame and a lock used to ensure thread-safe # exchanges of the output frames (useful when multiple browsers/tabs are viewing the stream) outputFrame = None lock = threading.Lock() # initialize a flask object app = Flask(__name__) cap = cv2.VideoCapture(source) time.sleep(2.0) @app.route("/") def index(): # return the rendered template return render_template("index.html") def dimensions(box): """ Return the length and width of the object. :param box: consists of top left, right and bottom left, right co-ordinates :return: Length and width of the object """ (tl, tr, br, bl) = box x = int(math.sqrt(math.pow((bl[0] - tl[0]), 2) + math.pow((bl[1] - tl[1]), 2))) y = int(math.sqrt(math.pow((tl[0] - tr[0]), 2) + math.pow((tl[1] - tr[1]), 2))) if x > y: return x, y else: return y, x def flaw_detection(base_dir = acsa_storage): """ Measurement and defects such as color, crack and orientation of the object are found. :return: None """ global HEIGHT_OF_OBJ global WIDTH_OF_OBJ global COUNT_OBJECT global OBJ_DEFECT global FRAME_COUNT global OBJECT_COUNT while cap.isOpened(): # Read the frame from the stream ret, frame = cap.read() if not ret: break FRAME_COUNT += 1 # Check every given frame number # (Number chosen based on the frequency of object on conveyor belt) if FRAME_COUNT % frame_number == 0: HEIGHT_OF_OBJ = 0 WIDTH_OF_OBJ = 0 OBJ_DEFECT = [] data_base = [] # Convert BGR image to HSV color space img_hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # Thresholding of an Image in a color range img_threshold = cv2.inRange(img_hsv, (LOW_H, LOW_S, LOW_V), (HIGH_H, HIGH_S, HIGH_V)) # Morphological opening(remove small objects from the foreground) img_threshold = cv2.erode(img_threshold, cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) img_threshold = cv2.dilate(img_threshold, cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) # Morphological closing(fill small holes in the foreground) img_threshold = cv2.dilate(img_threshold, cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) img_threshold = cv2.erode(img_threshold, cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) # Find the contours on the image contours, hierarchy = cv2.findContours(img_threshold, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) for cnt in contours: x, y, w, h = cv2.boundingRect(cnt) if OBJECT_AREA_MAX > w * h > OBJECT_AREA_MIN: box = cv2.minAreaRect(cnt) box = cv2.boxPoints(box) height, width = dimensions(np.array(box, dtype='int')) HEIGHT_OF_OBJ = round(height * one_pixel_length * 10, 2) WIDTH_OF_OBJ = round(width * one_pixel_length * 10, 2) COUNT_OBJECT += 1 OBJECT_COUNT = "Object Number : {}".format(COUNT_OBJECT) # Check for the orientation of the object frame, orientation_flag, orientation_defect = \ detect_orientation(frame, cnt) if orientation_flag: value = 1 data_base.append(value) OBJ_DEFECT.append(str(orientation_defect)) else: value = 0 data_base.append(value) # Check for the color defect of the object frame, color_flag, color_defect = detect_color(frame, cnt) if color_flag: value = 1 data_base.append(value) OBJ_DEFECT.append(str(color_defect)) else: value = 0 data_base.append(value) # Check for the crack defect of the object frame, crack_flag, crack_defect = detect_crack(frame, cnt) if crack_flag: value = 1 data_base.append(value) OBJ_DEFECT.append(str(crack_defect)) else: value = 0 data_base.append(value) # Check if none of the defect is found if not OBJ_DEFECT: value = 1 data_base.append(value) defect = "No Defect" OBJ_DEFECT.append(defect) print("No defect detected in object {}" .format(COUNT_OBJECT)) cv2.putText(frame, "Length (mm): {}".format(HEIGHT_OF_OBJ), (5, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Width (mm): {}".format(WIDTH_OF_OBJ), (5, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.imwrite("{}/Nodefect_{}.png".format( base_dir, COUNT_OBJECT), frame[y : y + h, x : x + w]) else: value = 0 data_base.append(value) print("Length (mm) = {}, width (mm) = {}".format( HEIGHT_OF_OBJ, WIDTH_OF_OBJ)) if not OBJ_DEFECT: continue all_defects = " ".join(OBJ_DEFECT) cv2.putText(frame, "Press q to quit", (410, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (255, 255, 255), 2) cv2.putText(frame, OBJECT_COUNT, (5, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Defect: {}".format(all_defects), (5, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Length (mm): {}".format(HEIGHT_OF_OBJ), (5, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Width (mm): {}".format(WIDTH_OF_OBJ), (5, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) ret, buffer = cv2.imencode('.png', frame) frame=buffer.tobytes() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') cv2.destroyAllWindows() cap.release() def get_orientation(contours): """ Gives the angle of the orientation of the object in radians. Step 1: Convert 3D matrix of contours to 2D. Step 2: Apply PCA algorithm to find angle of the data points. Step 3: If angle is greater than 0.5, return_flag is made to True else false. Step 4: Save the image in "Orientation" folder if it has a orientation defect. :param contours: contour of the object from the frame :return: angle of orientation of the object in radians """ size_points = len(contours) # data_pts stores contour values in 2D data_pts = np.empty((size_points, 2), dtype=np.float64) for i in range(data_pts.shape[0]): data_pts[i, 0] = contours[i, 0, 0] data_pts[i, 1] = contours[i, 0, 1] # Use PCA algorithm to find angle of the data points mean, eigenvector = cv2.PCACompute(data_pts, mean=None) angle = atan2(eigenvector[0, 1], eigenvector[0, 0]) return angle def detect_orientation(frame, contours, base_dir = acsa_storage): """ Identifies the Orientation of the object based on the detected angle. :param frame: Input frame from video :param contours: contour of the object from the frame :return: defect_flag, defect """ defect = "Orientation" global OBJECT_COUNT # Find the orientation of each contour angle = get_orientation(contours) # If angle is less than 0.5 then no orientation defect is present if angle < 0.5: defect_flag = False else: x, y, w, h = cv2.boundingRect(contours) print("Orientation defect detected in object {}".format(COUNT_OBJECT)) defect_flag = True cv2.imwrite("{}/Orientation_{}.png" .format(base_dir, COUNT_OBJECT), frame[y: y + h , x : x + w]) cv2.putText(frame, OBJECT_COUNT, (5, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Defect: {}".format(defect), (5, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Length (mm): {}".format(HEIGHT_OF_OBJ), (5, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Width (mm): {}".format(WIDTH_OF_OBJ), (5, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) return frame, defect_flag, defect def detect_color(frame, cnt, base_dir = acsa_storage): """ Identifies the color defect W.R.T the set default color of the object. Step 1: Increase the brightness of the image. Step 2: Convert the image to HSV Format. HSV color space gives more information about the colors of the image. It helps to identify distinct colors in the image. Step 3: Threshold the image based on the color using "inRange" function. Range of the color, which is considered as a defect for object, is passed as one of the argument to inRange function to create a mask. Step 4: Morphological opening and closing is done on the mask to remove noises and fill the gaps. Step 5: Find the contours on the mask image. Contours are filtered based on the area to get the contours of defective area. Contour of the defective area is then drawn on the original image to visualize. Step 6: Save the image in "color" folder if it has a color defect. :param frame: Input frame from the video :param cnt: Contours of the object :return: color_flag, defect """ defect = "Color" global OBJECT_COUNT color_flag = False # Increase the brightness of the image cv2.convertScaleAbs(frame, frame, 1, 20) # Convert the captured frame from BGR to HSV img_hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # Threshold the image img_threshold = cv2.inRange(img_hsv, LOWER_COLOR_RANGE, UPPER_COLOR_RANGE) # Morphological opening (remove small objects from the foreground) img_threshold = cv2.erode(img_threshold, kernel=cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) img_threshold = cv2.dilate(img_threshold, kernel=cv2.getStructuringElement( cv2.MORPH_ELLIPSE, (5, 5))) contours, hierarchy = cv2.findContours(img_threshold, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) for i in range(len(contours)): area = cv2.contourArea(contours[i]) if 2000 < area < 10000: cv2.drawContours(frame, contours[i], -1, (0, 0, 255), 2) color_flag = True if color_flag: x, y, w, h = cv2.boundingRect(cnt) print("Color defect detected in object {}".format(COUNT_OBJECT)) cv2.imwrite("{}/Color_{}.png".format(base_dir, COUNT_OBJECT), frame[y : y + h, x : x + w]) cv2.putText(frame, OBJECT_COUNT, (5, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Defect: {}".format(defect), (5, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Length (mm): {}".format(HEIGHT_OF_OBJ), (5, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Width (mm): {}".format(WIDTH_OF_OBJ), (5, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) return frame, color_flag, defect def detect_crack(frame, cnt, base_dir = acsa_storage): """ Identify the Crack defect on the object. Step 1: Convert the image to gray scale. Step 2: Blur the gray image to remove the noises. Step 3: Find the edges on the blurred image to get the contours of possible cracks. Step 4: Filter the contours to get the contour of the crack. Step 5: Draw the contour on the orignal image for visualization. Step 6: Save the image in "crack" folder if it has crack defect. :param frame: Input frame from the video :param cnt: Contours of the object :return: defect_flag, defect, cnt """ defect = "Crack" global OBJECT_COUNT defect_flag = False low_threshold = 130 kernel_size = 3 ratio = 3 # Convert the captured frame from BGR to GRAY img = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) img = cv2.blur(img, (7, 7)) # Find the edges detected_edges = cv2.Canny(img, low_threshold, low_threshold * ratio, kernel_size) # Find the contours contours, hierarchy = cv2.findContours(detected_edges, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE) if len(contours) != 0: for i in range(len(contours)): area = cv2.contourArea(contours[i]) if area > 20 or area < 9: cv2.drawContours(frame, contours, i, (0, 255, 0), 2) defect_flag = True if defect_flag: x, y, w, h = cv2.boundingRect(cnt) print("Crack defect detected in object {}".format(COUNT_OBJECT)) cv2.imwrite("{}/Crack_{}.png".format(base_dir, COUNT_OBJECT), frame[y : y + h , x : x + w ]) cv2.putText(frame, OBJECT_COUNT, (5, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Defect: {}".format(defect), (5, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Length (mm): {}".format(HEIGHT_OF_OBJ), (5, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) cv2.putText(frame, "Width (mm): {}".format(WIDTH_OF_OBJ), (5, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2) return frame, defect_flag, defect def stream(frameCount): global outputFrame, lock if cap.isOpened(): count = 0 while True: ret_val, frame = cap.read() if not None and frame.shape: if count % 3 != 2: frame = cv2.resize(frame, (640,360)) with lock: outputFrame = frame.copy() count += 1 else: continue else: print('Camera open failed') def store_jpg_frame(frame_data): current_time = datetime.datetime.now() file_name = current_time.strftime("%Y-%m-%d_%H-%M-%S") file_name = file_name + ".jpg" with open(f"{acsa_storage}/{file_name}", "wb") as f: f.write(frame_data) @app.route('/video_feed') def video_feed(): return Response(flaw_detection(), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route('/data') def data(): files = [] for filename in os.listdir(acsa_storage): file_path = os.path.join(acsa_storage, filename) if os.path.isfile(file_path): size = os.path.getsize(file_path) modified = os.path.getmtime(file_path) files.append({'name': filename, 'size': size, 'modified': modified}) files.sort(key=lambda f: f['modified'], reverse=True) return jsonify({'files': files}) # check to see if this is the main thread of execution if __name__ == '__main__': # construct the argument parser and parse command line arguments ap = argparse.ArgumentParser() ap.add_argument("-i", "--ip", type=str, required=False, default='0.0.0.0', help="ip address of the device") ap.add_argument("-o", "--port", type=int, required=False, default=8000, help="ephemeral port number of the server (1024 to 65535)") ap.add_argument("-f", "--frame-count", type=int, default=32, help="# of frames used to construct the background model") args = vars(ap.parse_args()) input_stream = source cap = cv2.VideoCapture(input_stream) if not cap.isOpened(): print("\nCamera not plugged in... Exiting...\n") sys.exit(0) fps = cap.get(cv2.CAP_PROP_FPS) delay = (int)(1000 / fps) one_pixel_length = 0.0264583333 OBJ_DEFECT = [] frame_number = 40 FRAME_COUNT = 0 # Find dimensions and flaw detections such as color, crack and orientation # of the object. """flaw_detection()""" # start the flask app app.run(host=args["ip"], port=args["port"], debug=True, threaded=True, use_reloader=False) t = threading.Thread(target=stream, args=(args["frame_count"],)) t.daemon = True t.start() # release the video stream pointer cap.release() cv2.destroyAllWindows()