jakaskerl
Hi jakaskerl. I hope you're doing great, and thank you so much for your patience and guidance. Here's the full code. I'm sorry I can't provide an MRE, because I still can't find what actually causes the data to change. I suspect the pipeline part, because of its lack of synchronization, and I'm still trying to pin down where the error occurs so I can fix it. Any tips and advice are very much appreciated; please help. I'm using an OAK-D Pro. Once again, thanks a lot.
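
To make clearer what I mean by the missing synchronization: the host currently pulls the frame, the face detections and the recognition results from separate non-blocking queues, so they can come from different moments in time. The sketch below is purely illustrative and not part of my script (the HostSync class, the stream names and the plain-dict messages are made up); it just shows the kind of host-side sequence-number matching I think is missing:

# Minimal sketch of host-side sequence-number matching (illustrative only).
# Plain strings stand in for DepthAI messages and msg.getSequenceNum().
class HostSync:
    """Buffer messages per stream and release them once every stream
    has a message with the same sequence number."""
    def __init__(self, streams):
        self.streams = streams
        self.buffers = {name: {} for name in streams}  # stream -> {seq: msg}

    def add(self, stream, seq, msg):
        self.buffers[stream][seq] = msg
        # Release only when this sequence number is present in every stream
        if all(seq in self.buffers[s] for s in self.streams):
            synced = {s: self.buffers[s].pop(seq) for s in self.streams}
            # Drop anything older than the sequence number just released
            for s in self.streams:
                self.buffers[s] = {k: v for k, v in self.buffers[s].items() if k > seq}
            return synced
        return None

if __name__ == "__main__":
    sync = HostSync(["color", "detections"])
    print(sync.add("color", 1, "frame-1"))       # None, detections not there yet
    print(sync.add("detections", 1, "dets-1"))   # both present -> synced pair

Matching by sequence number on the host is just one option; syncing on the device (for example inside the Script node) would be another.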
Sincerely,
Marc
from pathlib import Path

import blobconverter
import cv2
import depthai as dai
import numpy as np

MIN_THRESHOLD = 15.  # Degrees in yaw/pitch/roll to be considered as head movement

def frame_norm(debug_frame, bbox):
    normVals = np.full(len(bbox), debug_frame.shape[0])
    normVals[::2] = debug_frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

emotions = ['neutral', 'happy', 'sad', 'surprise', 'anger']
def create_pipeline(stereo):
    print("Creating pipeline...")
    pipeline = dai.Pipeline()
    # ColorCamera
    print("Creating Color Camera...")
    cam = pipeline.create(dai.node.ColorCamera)
    cam.setPreviewSize(1080, 1080)
    cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    cam.setInterleaved(False)
    cam.setBoardSocket(dai.CameraBoardSocket.RGB)

    cam_xout = pipeline.create(dai.node.XLinkOut)
    cam_xout.setStreamName("cam_out")
    cam.preview.link(cam_xout.input)
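
    # ImageManip that simply forwards the preview frames, but with a larger
    # frame pool for the detection/recognition branches below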
    copy_manip = pipeline.create(dai.node.ImageManip)
    copy_manip.setNumFramesPool(15)
    copy_manip.setMaxOutputFrameSize(3499200)
    cam.preview.link(copy_manip.inputImage)

    # ImageManip that will crop the frame before sending it to the face detection NN node
    face_det_manip = pipeline.create(dai.node.ImageManip)
    face_det_manip.initialConfig.setResize(300, 300)
    face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
    copy_manip.out.link(face_det_manip.inputImage)

    # NeuralNetwork for face detection
    # Link Face ImageManip -> Face detection NN node
    if stereo:
        monoLeft = pipeline.create(dai.node.MonoCamera)
        monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
        monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
        monoRight = pipeline.create(dai.node.MonoCamera)
        monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
        monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)

        stereo = pipeline.create(dai.node.StereoDepth)
        stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
        stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
        monoLeft.out.link(stereo.left)
        monoRight.out.link(stereo.right)

        # Spatial detection network if OAK-D
        print("OAK-D detected, app will display spatial coordinates")
        face_det_nn = pipeline.create(dai.node.MobileNetSpatialDetectionNetwork)
        face_det_nn.setBoundingBoxScaleFactor(0.8)
        face_det_nn.setDepthLowerThreshold(100)
        face_det_nn.setDepthUpperThreshold(5000)
        stereo.depth.link(face_det_nn.inputDepth)
    else:  # Detection network if OAK-1
        print("OAK-1 detected, app won't display spatial coordinates")
        face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)

    face_det_nn.setConfidenceThreshold(0.5)
    face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
    face_det_nn.input.setQueueSize(1)
    face_det_manip.out.link(face_det_nn.input)
    # Send face detections to the host (for bounding boxes)
    face_det_xout = pipeline.create(dai.node.XLinkOut)
    face_det_xout.setStreamName("face_det_out")
    face_det_nn.out.link(face_det_xout.input)
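
    # Script node that turns every face detection into crop configs for the
    # age/gender, emotion and head-pose ImageManips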
    image_manip_script = pipeline.create(dai.node.Script)
    image_manip_script.inputs['face_det_in'].setBlocking(False)
    image_manip_script.inputs['face_det_in'].setQueueSize(4)
    face_det_nn.out.link(image_manip_script.inputs['face_det_in'])
    image_manip_script.setScript("""
while True:
    face_dets = node.io['face_det_in'].get().detections
    # node.warn(f"Faces detected: {len(face_dets)}")
    for det in face_dets:
        cfg = ImageManipConfig()
        cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
        cfg.setResize(62, 62)
        cfg.setKeepAspectRatio(False)
        node.io['to_manip'].send(cfg)

        cfg1 = ImageManipConfig()
        cfg1.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
        cfg1.setResize(64, 64)
        cfg1.setKeepAspectRatio(False)
        node.io['emotions_manip_cfg'].send(cfg1)

        cfg2 = ImageManipConfig()
        cfg2.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
        cfg2.setResize(60, 60)
        cfg2.setKeepAspectRatio(False)
        node.io['pose_manip_cfg'].send(cfg2)
""")
    # ImageManip for age and gender recognition
    age_gender_manip = pipeline.create(dai.node.ImageManip)
    age_gender_manip.initialConfig.setResize(62, 62)
    age_gender_manip.setWaitForConfigInput(False)
    image_manip_script.outputs['to_manip'].link(age_gender_manip.inputConfig)

    # ImageManip for emotion recognition
    emotion_manip = pipeline.create(dai.node.ImageManip)
    emotion_manip.initialConfig.setResize(64, 64)
    emotion_manip.setWaitForConfigInput(False)
    image_manip_script.outputs['emotions_manip_cfg'].link(emotion_manip.inputConfig)

    # ImageManip for head pose estimation
    pose_manip = pipeline.create(dai.node.ImageManip)
    pose_manip.initialConfig.setResize(60, 60)
    pose_manip.setWaitForConfigInput(False)
    image_manip_script.outputs['pose_manip_cfg'].link(pose_manip.inputConfig)
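
    # The recognition ImageManips get their frames straight from cam.preview;
    # only the crop configs come from the Script node above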
    cam.preview.link(face_det_manip.inputImage)
    cam.preview.link(age_gender_manip.inputImage)
    cam.preview.link(emotion_manip.inputImage)
    cam.preview.link(pose_manip.inputImage)
    # NeuralNetwork for age/gender recognition
    print("Creating age_gender Detection Neural Network...")
    age_gender_nn = pipeline.create(dai.node.NeuralNetwork)
    age_gender_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
    age_gender_manip.out.link(age_gender_nn.input)
    age_gender_nn_xout = pipeline.create(dai.node.XLinkOut)
    age_gender_nn_xout.setStreamName("age_gender_out")
    age_gender_nn.out.link(age_gender_nn_xout.input)

    # NeuralNetwork for emotion recognition
    print("Creating emotion Detection Neural Network...")
    emotion_nn = pipeline.create(dai.node.NeuralNetwork)
    emotion_nn.setBlobPath(blobconverter.from_zoo(name="emotions-recognition-retail-0003", shaves=6))
    emotion_manip.out.link(emotion_nn.input)
    emotion_nn_xout = pipeline.create(dai.node.XLinkOut)
    emotion_nn_xout.setStreamName("emotion_out")
    emotion_nn.out.link(emotion_nn_xout.input)

    # NeuralNetwork for head pose estimation
    print("Creating head pose Detection Neural Network...")
    pose_nn = pipeline.create(dai.node.NeuralNetwork)
    pose_nn.setBlobPath(blobconverter.from_zoo(name="head-pose-estimation-adas-0001", shaves=6))
    pose_manip.out.link(pose_nn.input)
    pose_nn_xout = pipeline.create(dai.node.XLinkOut)
    pose_nn_xout.setStreamName("pose_out")
    pose_nn.out.link(pose_nn_xout.input)

    print("Pipeline successfully created")
    return pipeline
# Upload the pipeline to the device
with dai.Device() as device:
    stereo = 1 < len(device.getConnectedCameras())
    device.setLogLevel(dai.LogLevel.WARN)
    device.setLogOutputLevel(dai.LogLevel.WARN)
    print("Starting pipeline...")
    device.startPipeline(create_pipeline(stereo))

    cam_out = device.getOutputQueue("cam_out", 4, False)
    face_q = device.getOutputQueue("face_det_out", 4, False)
    age_gender_q = device.getOutputQueue("age_gender_out", 4, False)
    emotion_q = device.getOutputQueue("emotion_out", 4, False)
    pose_q = device.getOutputQueue("pose_out", 4, False)
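    # All output queues are non-blocking with maxSize=4, so older messages are
    # dropped whenever the host falls behind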
    def get_frame():
        return True, cam_out.get().getCvFrame()

    try:
        while True:
            read_correctly, frame = get_frame()
            if not read_correctly:
                break

            if frame is not None:
                debug_frame = frame.copy()

                det_in = face_q.tryGet()
                if det_in is not None:
                    detections = det_in.detections
                    for detection in detections:
                        bbox = frame_norm(debug_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
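                        # Blocking .get() calls: these return the next available recognition
                        # results, which are not guaranteed to belong to this detection/frame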
                        det = age_gender_q.get()
                        det2 = emotion_q.get()
                        det3 = pose_q.get()

                        # Decoding of recognition results
                        emotion_results = np.array(det2.getFirstLayerFp16())
                        emotion_name = emotions[np.argmax(emotion_results)]
                        age = int(float(np.squeeze(np.array(det.getLayerFp16('age_conv3')))) * 100)
                        gender = np.squeeze(np.array(det.getLayerFp16('prob')))
                        gender_str = "female" if gender[0] > gender[1] else "male"
                        confidence = detection.confidence
                        yaw = det3.getLayerFp16('angle_y_fc')[0]
                        pitch = det3.getLayerFp16('angle_p_fc')[0]
                        roll = det3.getLayerFp16('angle_r_fc')[0]
                        """
                        pitch > 0 Head down, < 0 look up
                        yaw > 0 Turn right, < 0 Turn left
                        roll > 0 Tilt right, < 0 Tilt left
                        """
                        # Check whether the dominant head rotation exceeds the movement threshold
                        vals = np.array([abs(pitch), abs(yaw), abs(roll)])
                        max_index = np.argmax(vals)
                        txt = None
                        if vals[max_index] > MIN_THRESHOLD:
                            cv2.putText(debug_frame, "pitch:{:.0f}, yaw:{:.0f}, roll:{:.0f}".format(pitch, yaw, roll), (bbox[0]+10-15, bbox[1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 8)
                            if max_index == 0:  # pitch dominates (see the mapping above)
                                if pitch > 0:
                                    txt = "Look down"
                                else:
                                    txt = "Look up"
                            elif max_index == 1:
                                if yaw > 0:
                                    txt = "Turn right"
                                else:
                                    txt = "Turn left"
                            elif max_index == 2:
                                if roll > 0:
                                    txt = "Tilt right"
                                else:
                                    txt = "Tilt left"
                            cv2.putText(debug_frame, txt, (bbox[0], bbox[1]+30), cv2.FONT_HERSHEY_TRIPLEX, 1, (0, 0, 0), 8)
                            cv2.putText(debug_frame, txt, (bbox[0], bbox[1]+30), cv2.FONT_HERSHEY_TRIPLEX, 1, (255, 255, 255), 2)

                        cv2.rectangle(debug_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (10, 245, 10), 2)
                        cv2.putText(debug_frame, emotion_name, (bbox[0], bbox[1]-125), cv2.FONT_HERSHEY_TRIPLEX, .5, (0, 0, 0), 8)
                        cv2.putText(debug_frame, emotion_name, (bbox[0], bbox[1]-125), cv2.FONT_HERSHEY_TRIPLEX, .5, (255, 255, 255), 2)
                        cv2.putText(debug_frame, str(age), (bbox[0]+125, bbox[1]-10), cv2.FONT_HERSHEY_TRIPLEX, .5, (0, 0, 0), 8)
                        cv2.putText(debug_frame, str(age), (bbox[0]+125, bbox[1]-10), cv2.FONT_HERSHEY_TRIPLEX, .5, (255, 255, 255), 2)
                        cv2.putText(debug_frame, f"Score:{confidence}", (bbox[0], bbox[1]-50), cv2.FONT_HERSHEY_TRIPLEX, .5, (0, 0, 0), 8)
                        cv2.putText(debug_frame, f"Score:{confidence}", (bbox[0], bbox[1]-50), cv2.FONT_HERSHEY_TRIPLEX, .5, (255, 255, 255), 2)
                        cv2.putText(debug_frame, gender_str, (bbox[0], bbox[1]-10), cv2.FONT_HERSHEY_TRIPLEX, .5, (0, 0, 0), 8)
                        cv2.putText(debug_frame, gender_str, (bbox[0], bbox[1]-10), cv2.FONT_HERSHEY_TRIPLEX, .5, (255, 255, 255), 2)

                        # Measuring depth distance (z in meters)
                        if stereo:
                            # You could also get detection.spatialCoordinates.x and detection.spatialCoordinates.y coordinates
                            coords = "{:.2f}".format(detection.spatialCoordinates.z / 1000)
                            cv2.putText(debug_frame, coords, (bbox[0], bbox[1] + 60), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (0, 0, 0), 8)
                            cv2.putText(debug_frame, coords, (bbox[0], bbox[1] + 60), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 255, 255), 2)

                aspect_ratio = debug_frame.shape[1] / debug_frame.shape[0]
                cv2.imshow("Camera_view", debug_frame)

            if cv2.waitKey(1) == ord('q'):
                cv2.destroyAllWindows()
                break
    except KeyboardInterrupt:
        pass