@jakaskerl
Just following up: do you have any questions regarding the MRE I provided?
Also, I created the exact same pipeline in Python, running with the depthai-library Docker image, and it ran 10 times without any problem (a sketch of the repeat-run harness I used is included after the script). I have attached the Python script below.
#!/usr/bin/env python3
from pathlib import Path
import cv2
import depthai as dai
import numpy as np
import argparse
import time
import errno
import os
import sys
import json
parser = argparse.ArgumentParser()
parser.add_argument('-nn', '--nn_model', help='select model path for inference',
                    default='/workspaces/yolov6n_openvino_2022.1_6shave.blob', type=str)
parser.add_argument('-c', '--config', help='Provide config path for inference',
                    default='/workspaces/yolov6n.json', type=str)
parser.add_argument('-fps', '--fps', type=float, help='Frame rate of camera capturing', default=15)
args = parser.parse_args()
nn_path = Path(args.nn_model)
config_path = Path(args.config)
config_fps = args.fps
if not nn_path.is_file():
    sys.exit('NN not found!')
if not config_path.is_file():
    sys.exit('Config not found!')
with config_path.open() as f:
    config = json.load(f)
nn_config = config.get("nn_config", {})
if "input_size" in nn_config:
nn_width, nn_height = tuple(map(int, nn_config.get("input_size").split('x')))
metadata = nn_config.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchor_masks = metadata.get("anchor_masks", {})
iou_threshold = metadata.get("iou_threshold", {})
confidence_threshold = metadata.get("confidence_threshold", {})
print(metadata)
nn_mappings = config.get("mappings", {})
labels = nn_mappings.get("labels", {})
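# For reference, a minimal sketch of the config structure this script expects,
# derived from the keys read above (the values in this comment are
# placeholders, not the real model's parameters):
# {
#   "nn_config": {
#     "input_size": "416x416",
#     "NN_specific_metadata": {
#       "classes": 80, "coordinates": 4, "anchors": [], "anchor_masks": {},
#       "iou_threshold": 0.45, "confidence_threshold": 0.5
#     }
#   },
#   "mappings": { "labels": ["person", "bicycle", "..."] }
# }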
# pipeline
pipeline = dai.Pipeline()
camera_l = pipeline.create(dai.node.ColorCamera)
detection_nn_l = pipeline.create(dai.node.YoloDetectionNetwork)
xout_rgb_l = pipeline.create(dai.node.XLinkOut)
xout_nn_l = pipeline.create(dai.node.XLinkOut)
camera_r = pipeline.create(dai.node.ColorCamera)
detection_nn_r = pipeline.create(dai.node.YoloDetectionNetwork)
xout_rgb_r = pipeline.create(dai.node.XLinkOut)
xout_nn_r = pipeline.create(dai.node.XLinkOut)
xout_rgb_l.setStreamName("rgb_l")
xout_nn_l.setStreamName("nn_l")
xout_rgb_r.setStreamName("rgb_r")
xout_nn_r.setStreamName("nn_r")
camera_l.setPreviewSize(nn_width, nn_height)
camera_l.setBoardSocket(dai.CameraBoardSocket.CAM_B)
camera_l.setPreviewKeepAspectRatio(False)
camera_l.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camera_l.setInterleaved(False)
camera_l.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camera_l.setFps(config_fps)
camera_r.setPreviewSize(nn_width, nn_height)
camera_r.setBoardSocket(dai.CameraBoardSocket.CAM_C)
camera_r.setPreviewKeepAspectRatio(False)
camera_r.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camera_r.setInterleaved(False)
camera_r.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camera_r.setFps(config_fps)
detection_nn_l.setConfidenceThreshold(confidence_threshold)
detection_nn_l.setNumClasses(classes)
detection_nn_l.setCoordinateSize(coordinates)
detection_nn_l.setAnchors(anchors)
detection_nn_l.setAnchorMasks(anchor_masks)
detection_nn_l.setIouThreshold(iou_threshold)
detection_nn_l.setBlobPath(str(nn_path))
detection_nn_l.setNumInferenceThreads(1)
detection_nn_l.setNumNCEPerInferenceThread(1)
detection_nn_l.input.setBlocking(False)
detection_nn_r.setConfidenceThreshold(confidence_threshold)
detection_nn_r.setNumClasses(classes)
detection_nn_r.setCoordinateSize(coordinates)
detection_nn_r.setAnchors(anchors)
detection_nn_r.setAnchorMasks(anchor_masks)
detection_nn_r.setIouThreshold(iou_threshold)
detection_nn_r.setBlobPath(str(nn_path))
detection_nn_r.setNumInferenceThreads(1)
detection_nn_r.setNumNCEPerInferenceThread(1)
detection_nn_r.input.setBlocking(False)
# linking
camera_l.preview.link(xout_rgb_l.input)
camera_l.preview.link(detection_nn_l.input)
detection_nn_l.out.link(xout_nn_l.input)
camera_r.preview.link(xout_rgb_r.input)
camera_r.preview.link(detection_nn_r.input)
detection_nn_r.out.link(xout_nn_r.input)
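# Resulting topology (two fully independent branches, one per sensor):
#   camera_l.preview -> xout_rgb_l ("rgb_l")
#   camera_l.preview -> detection_nn_l -> xout_nn_l ("nn_l")
#   camera_r.preview -> xout_rgb_r ("rgb_r")
#   camera_r.preview -> detection_nn_r -> xout_nn_r ("nn_r")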
with dai.Device(pipeline) as device:
    print('Device name:', device.getDeviceName())
    if device.getBootloaderVersion() is not None:
        print('Bootloader version:', device.getBootloaderVersion())
    print('Usb speed:', device.getUsbSpeed().name)
    print('Connected cameras:', device.getConnectedCameraFeatures())
    device_info = device.getDeviceInfo()
    print('Device mxid:', device_info.getMxId())
    q_rgb_l = device.getOutputQueue(name="rgb_l", maxSize=4, blocking=False)
    q_nn_l = device.getOutputQueue(name="nn_l", maxSize=4, blocking=False)
    q_rgb_r = device.getOutputQueue(name="rgb_r", maxSize=4, blocking=False)
    q_nn_r = device.getOutputQueue(name="nn_r", maxSize=4, blocking=False)
    start_time = time.monotonic()
    counter_l = 0
    detections_l = []
    fps_l = 0
    frame_l = None
    counter_r = 0
    detections_r = []
    fps_r = 0
    frame_r = None
    text_color = (0, 0, 255)
    def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

    # Map normalized [0, 1] bbox coordinates to pixel coordinates.
    def frameNorm(frame, bbox):
        normVals = np.full(len(bbox), frame.shape[0])
        normVals[::2] = frame.shape[1]
        return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

    # Draw label, confidence, and bbox for each detection, then show the frame.
    def displayFrame(name, frame, detections):
        color = (0, 0, 255)
        for detection in detections:
            bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
            cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
        cv2.imshow(name, frame)
    while True:
        in_nn_l = q_nn_l.tryGet()
        in_rgb_l = q_rgb_l.tryGet()
        in_nn_r = q_nn_r.tryGet()
        in_rgb_r = q_rgb_r.tryGet()
        if in_rgb_l is not None:
            frame_l = in_rgb_l.getCvFrame()
            fps_l = counter_l / (time.monotonic() - start_time)
            cv2.putText(frame_l, "NN fps: {:.2f}".format(fps_l),
                        (2, frame_l.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.5, text_color)
        if in_rgb_r is not None:
            frame_r = in_rgb_r.getCvFrame()
            fps_r = counter_r / (time.monotonic() - start_time)
            cv2.putText(frame_r, "NN fps: {:.2f}".format(fps_r),
                        (2, frame_r.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.5, text_color)
        if in_nn_l is not None:
            detections_l = in_nn_l.detections
            counter_l += 1
        if in_nn_r is not None:
            detections_r = in_nn_r.detections
            counter_r += 1
        if frame_l is not None:
            displayFrame("left_camera", frame_l, detections_l)
        if frame_r is not None:
            displayFrame("right_camera", frame_r, detections_r)
        if cv2.waitKey(1) == ord('q'):
            break
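For completeness, this is roughly how I repeated the test 10 times inside the container (a minimal sketch; the filename mre.py and the 60-second pass criterion are my assumptions, not part of the original script):

#!/usr/bin/env python3
# Repeat-run harness (sketch). Assumes the MRE above is saved as 'mre.py'
# and treats a run that is still alive after 60 s as a pass.
import subprocess
import sys

for i in range(10):
    print(f"--- run {i + 1}/10 ---")
    try:
        result = subprocess.run([sys.executable, "mre.py"], timeout=60)
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child once the timeout expires
        print("still running after 60 s -> counted as a pass")
        continue
    if result.returncode != 0:
        print(f"run {i + 1} crashed with exit code {result.returncode}")
        break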
The pipeline graph for the Python version is the same.
This link contains the code and model needed to run this Python MRE.
Thanks
Lincoln