CheM

- Jul 19, 2024
- Joined Jul 1, 2024
- 0 best answers
- Edited
Hi All,
I am trying to do a 2 stage prediction with pretrained models face-detection-retail-0004 and saved_model_openvino_2021.4_6shave for face and mask detection. Where only if a face is detected them submit the whole frame for mask detection. I get each individual model to predict properly when run individually in separate scripts, but when I try to mat it a 2 stage prediction in one script, the face detection works, but mask detection never happens. What am I doing wrong in the script below? Any Thoughts?
`import sys
sys.path.insert(0,'../..')
import cv2 # opencv - display the videos stream
import depthai # depthai - access the camera and its data packets
from uielements import DisplayValueLabel
import pyttsx3
import threading
import queue
# Color for label text and confidence overlay, in BGR order (pure blue).
text_color = (255, 0, 0)
# Work queue feeding the TTS worker thread; serializing speech requests
# through one queue prevents multiple utterances from overlapping.
tts_queue = queue.Queue()
# Background text-to-speech worker.
def tts_worker():
    """Speak each message pulled from tts_queue; a None sentinel stops the loop."""
    engine = pyttsx3.init()
    while True:
        msg = tts_queue.get()
        if msg is None:
            break
        engine.say(msg)
        engine.runAndWait()
        tts_queue.task_done()
# Launch the TTS worker in the background; daemon=True so it never
# blocks interpreter shutdown (same effect as setting .daemon before start).
tts_thread = threading.Thread(target=tts_worker, daemon=True)
tts_thread.start()
#Main
def main():
    """Build and run a 2-stage DepthAI pipeline: face detection, then mask
    classification on the full frame whenever a face is present.

    Speaks "Please wear a mask" (via the module-level tts_queue) when a face
    is detected without a mask, and shows the annotated preview until 'q'.
    """
    pipeline = depthai.Pipeline()

    # ColorCamera node: 400x400 planar (non-interleaved) BGR preview frames.
    cam_rgb = pipeline.create(depthai.node.ColorCamera)
    cam_rgb.setPreviewSize(400, 400)
    cam_rgb.setInterleaved(False)

    # ImageManip that resizes frames for the mask-classification NN (224x224).
    manip_mask = pipeline.create(depthai.node.ImageManip)
    manip_mask.initialConfig.setResize(224, 224)

    # ImageManip that resizes frames for the face-detection NN (300x300).
    manip_face = pipeline.create(depthai.node.ImageManip)
    manip_face.initialConfig.setResize(300, 300)

    # Feed the 400x400 preview into the face-detection resizer.
    cam_rgb.preview.link(manip_face.inputImage)

    # BUG FIX: the original script never linked anything into
    # manip_mask.inputImage, so custom_nn received no frames and the
    # "custom_nn" host queue was always empty — mask detection never ran.
    cam_rgb.preview.link(manip_mask.inputImage)

    # Face-detection NeuralNetwork (face-detection-retail-0004).
    face_nn = pipeline.create(depthai.node.NeuralNetwork)
    face_nn.setBlobPath("./face-detection-retail-0004.blob")
    manip_face.out.link(face_nn.input)

    # Custom mask-classification NeuralNetwork.
    custom_nn = pipeline.create(depthai.node.NeuralNetwork)
    custom_nn.setBlobPath("./saved_model_openvino_2021.4_6shave.blob")
    manip_mask.out.link(custom_nn.input)

    # XLinkOut: camera preview frames to the host.
    xout_rgb = pipeline.create(depthai.node.XLinkOut)
    xout_rgb.setStreamName("rgb")
    cam_rgb.preview.link(xout_rgb.input)

    # XLinkOut: face-detection results to the host.
    xout_face_nn = pipeline.create(depthai.node.XLinkOut)
    xout_face_nn.setStreamName("face_nn")
    face_nn.out.link(xout_face_nn.input)

    # XLinkOut: mask-classification results to the host.
    xout_custom_nn = pipeline.create(depthai.node.XLinkOut)
    xout_custom_nn.setStreamName("custom_nn")
    custom_nn.out.link(xout_custom_nn.input)

    display_label = DisplayValueLabel(10, 10, 275, 40, 'Mask?: ')

    # Get the virtual device and loop forever reading messages from its queues.
    with depthai.Device(pipeline) as device:
        q_rgb = device.getOutputQueue("rgb")
        q_fnn = device.getOutputQueue("face_nn")
        q_nn = device.getOutputQueue("custom_nn")
        det_fac = ""
        det_msk = ""
        frame = None
        while True:
            # Non-blocking reads; any of these may be None on a given pass.
            in_rgb = q_rgb.tryGet()
            in_face_nn = q_fnn.tryGet()
            in_custom_nn = q_nn.tryGet()
            if in_rgb is not None:
                frame = in_rgb.getCvFrame()
            if in_face_nn is not None:
                detections = in_face_nn.getFirstLayerFp16()
                print("Face Detection output:", detections)
                # face-detection-retail-0004 emits 7 floats per detection;
                # index 2 within each group is the confidence.
                for i in range(0, len(detections), 7):
                    confidence = detections[i + 2]
                    if confidence > 0.6:
                        print(f"Face detected with confidence {confidence}")
                        det_fac = "YES"
                        # Check for a mask only when a face was detected
                        # and a mask-NN result arrived on this pass.
                        if in_custom_nn is not None:
                            print("CUSTOM_NOT_NONE")
                            mask, no_mask = in_custom_nn.getLayerFp16(
                                'StatefulPartitionedCall/model/dense_1/Softmax')
                            print(f"Mask[{round(mask,1)}], No Mask[{round(no_mask,1)}]")
                            if round(mask, 1) > round(no_mask, 1):
                                display_label.set_value("MASK")
                                # BUG FIX: record the masked state; the original
                                # left det_msk latched at "NO", so the alert
                                # would keep firing after a mask was put on.
                                det_msk = "YES"
                            else:
                                display_label.set_value("NO MASK")
                                det_msk = "NO"
            if det_fac == "YES" and det_msk == "NO":
                print("HIT")
                # Only enqueue if the worker is idle, to avoid stacked speech.
                if tts_queue.empty():
                    tts_queue.put("Please wear a mask")
            if frame is not None:
                display_label.draw(frame)
                # Show the latest frame from the OAK device.
                cv2.imshow("TF Face Mask", frame)
            if cv2.waitKey(1) == ord('q'):
                break

    # Signal the TTS worker thread to exit and wait for it.
    tts_queue.put(None)
    tts_thread.join()
# Script entry guard (reconstructed: the forum's markdown stripped the
# double underscores from __name__ / __main__, leaving it unrunnable).
if __name__ == "__main__":
    main()