• DepthAI-v2
  • Issues when applying a `NeuralNetwork` node for object detection

Hi,

I am having issues when using the NeuralNetwork node instead of the YoloDetectionNetwork node for object detection.

The MWE below applies the yolov8n COCO 416x416 model from the DepthAI model zoo to the video stream. When using the YoloDetectionNetwork (use_YDN = True), plausible detections are returned. However, when using the NeuralNetwork (use_YDN = False), no detections are returned at all, even with a confidence threshold of 0.

For decoding the NeuralNetwork's output messages, I followed the example given here: https://github.com/ultralytics/ultralytics/blob/main/examples/YOLOv8-OpenCV-ONNX-Python/main.py
The first anomaly I noticed is that the shape of the resulting tensor appears to be 1x85x2704 instead of the expected 1x84x2704 (80 class scores + 4 bounding box coordinates). In the resulting tensor, all values except the first four bounding-box entries are 0, consistent with the observation that no objects are detected.
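
To double-check, the declared output shape can also be read directly from the compiled blob instead of guessing the reshape. A minimal sketch (assuming depthai v2's dai.OpenVINO.Blob API and that it accepts the path returned by blobconverter as a string):

import blobconverter
import depthai as dai

model_path = blobconverter.from_zoo(name="yolov8n_coco_416x416",
                                    zoo_type="depthai",
                                    shaves=6)

# Print the tensor shapes the compiled blob declares for its inputs and outputs
blob = dai.OpenVINO.Blob(str(model_path))
for name, tensor in blob.networkInputs.items():
    print(f"input  {name}: {tensor.dims}")
for name, tensor in blob.networkOutputs.items():
    print(f"output {name}: {tensor.dims}")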

This is a toy example, as I could simply use the YoloDetectionNetwork here. In practice, however, I want to deploy my own custom model on the OAK device, which forces me to use the NeuralNetwork node, and I have not yet managed to get it running.

Can somebody spot my error and point me in the right direction? I have been stuck on this issue for quite some time now and would be very happy to finally get it working. Thank you very much in advance.

import blobconverter
import cv2
import depthai as dai
import numpy as np
import sys


def setup_pipeline(
    use_YoloDetectionNetwork: bool,
    model_path: str,
    confidence_threshold: float,
    model_width: int = 416,
    model_height: int = 416,
    video_fps: int = 10
) -> dai.Pipeline:
    pipeline = dai.Pipeline()

    # Camera
    cam = pipeline.create(dai.node.ColorCamera)
    cam.setBoardSocket(dai.CameraBoardSocket.CAM_A)
    cam.setInterleaved(False)
    cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    cam.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
    cam.setFps(video_fps)
    cam.setPreviewSize(model_width, model_height)
    cam.setPreviewKeepAspectRatio(False)

    # NeuralNetwork
    if use_YoloDetectionNetwork:
        nn = pipeline.create(dai.node.YoloDetectionNetwork)
        nn.setCoordinateSize(4)
        nn.setNumClasses(80)
        nn.setConfidenceThreshold(confidence_threshold)
    else:
        nn = pipeline.create(dai.node.NeuralNetwork)

    nn.setBlobPath(model_path)
    nn.setNumInferenceThreads(2)

    # Outputs
    nnXout = pipeline.create(dai.node.XLinkOut)
    nnXout.setStreamName("nn")

    video_out = pipeline.create(dai.node.XLinkOut)
    video_out.setStreamName("video")

    # Linking
    cam.preview.link(nn.input)
    nn.out.link(nnXout.input)
    nn.passthrough.link(video_out.input)

    return pipeline


def decode_detections(
    outputs: np.ndarray,
    confidence_threshold: float
):
    # Transpose from (channels, candidates) to (candidates, channels) so that
    # each row holds one candidate box
    outputs = np.array([cv2.transpose(outputs[0])])
    rows = outputs.shape[1]

    boxes = []
    scores = []
    class_ids = []

    # Iterate through output to collect bounding boxes, confidence scores,
    # and class IDs
    for i in range(rows):
        x, y, w, h = outputs[0][i][:4]
        classes_scores = outputs[0][i][4:]
        (_, maxScore, _, (_, maxClassIndex)) = cv2.minMaxLoc(classes_scores)
        box = [x - (0.5 * w),
               y - (0.5 * h),
               w,
               h]

        if maxScore > confidence_threshold:
            boxes.append(box)
            scores.append(maxScore)
            class_ids.append(maxClassIndex)

    # Apply NMS (Non-maximum suppression)
    result_boxes = cv2.dnn.NMSBoxes(boxes,
                                    scores,
                                    confidence_threshold,
                                    0.45,
                                    0.5)

    # Iterate through NMS results to draw bounding boxes and labels
    detections = []
    for i in range(len(result_boxes)):
        index = result_boxes[i]
        box = boxes[index]
        detection = {
            "confidence": scores[index],
            "label": class_ids[index],
            "xmin": box[0],
            "xmax": box[0] + box[2],
            "ymin": box[1] + box[3],
            "ymax": box[1] + box[3],
        }
        detections.append(detection)

    return detections


model_path = blobconverter.from_zoo(name="yolov8n_coco_416x416",
                                    zoo_type="depthai",
                                    shaves=6)

use_YDN = sys.argv[1].lower() in ("true", "1") if len(sys.argv) > 1 else False
confidence_threshold = float(sys.argv[2]) if len(sys.argv) > 2 else 0.3

print(f"using {'YoloDetectionNetwork' if use_YDN else 'NeuralNetwork'}"
      f" with confidence threshold of {confidence_threshold}")

pipeline = setup_pipeline(use_YDN,
                          model_path,
                          confidence_threshold)

with dai.Device(pipeline) as device:
    nn_queue = device.getOutputQueue("nn")
    video_queue = device.getOutputQueue(name="video")

    while True:
        in_nn = nn_queue.get()
        in_video = video_queue.get()

        if use_YDN:
            detections = in_nn.detections
        else:
            in_nn = in_nn.getFirstLayerFp16()
            in_nn = np.asarray(in_nn, dtype=np.int32).reshape((1, 85, 2704))
            detections = decode_detections(in_nn, confidence_threshold)

        if len(detections) > 0:
            print(",".join([str(d.label) for d in detections]))
        else:
            print("No detections in frame.")

    Hi @AxelF
    Have you tried decoding as done here?

    Also, regarding the np.int32 in:

    AxelF: in_nn = np.asarray(in_nn, dtype=np.int32).reshape((1, 85, 2704))

    the output tensor is likely an FP value, so casting to int32 truncates every confidence score below 1.0 down to 0.
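
    For example, a minimal sketch of the corrected conversion (assuming the blob output is FP16, as getFirstLayerFp16 suggests):

    # Keep the raw FP16 values as floats instead of truncating them to int32
    in_nn = in_nn.getFirstLayerFp16()
    in_nn = np.asarray(in_nn, dtype=np.float32).reshape((1, 85, 2704))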

    Thanks,
    Jaka

    Hi @jakaskerl

    Thank you very much for your reply! The hint regarding the data type of the tensor already helped a lot.

    I also tried the decoding method that you linked as well as an updated version of this method by ultralytics. However, the results are still not as expected.

    Below is an updated example using the methods described above. It runs object detection on the attached image bus.jpg. However, the output consists of four identical bounding boxes with the class "umbrella", which differs substantially from the detections produced by the YoloDetectionNetwork as described in my previous post.

    Again, thank you for your help! I would be really happy if you could provide further help.

    import blobconverter
    import cv2
    import depthai as dai
    import numpy as np
    import time
    import torch
    import torchvision
    from typing import List
    
    import ultralytics.utils.ops
    from ultralytics.utils import yaml_load
    from ultralytics.utils.checks import check_yaml
    
    
    def tensor2imgdetection(tensor: torch.Tensor) -> List[dai.ImgDetection]:
        detections = list()
        for res in tensor:
            detection = dai.ImgDetection
            detection.confidence = res[4]
            detection.label = int(res[5])
            detection.xmin = res[0]
            detection.xmax = res[2]
            detection.ymin = res[1]
            detection.ymax = res[3]
            detections.append(detection)
        return detections
    
    
    def xywh2xyxy(x):
        # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2]
        # where xy1=top-left, xy2=bottom-right
        y = torch.zeros_like(x) if isinstance(
            x, torch.Tensor) else np.zeros_like(x)
        y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
        y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
        y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
        y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
        return y
    
    
    def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45):
        prediction = np.array([cv2.transpose(prediction[0])])
        prediction = torch.from_numpy(prediction)
        if prediction.dtype is torch.float16:
            prediction = prediction.float()  # to FP32
    
        xc = prediction[..., 4] >= conf_thres  # candidates

        detections = []  # defined up front so the final return is always valid
        for xi, x in enumerate(prediction):  # image index, image inference
            # Apply constraints
            x = x[xc[xi]]  # confidence
    
            # If none remain process next image
            if not x.shape[0]:
                continue
    
            # Compute conf
            # x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf
    
            # Box (center x, center y, width, height) to (x1, y1, x2, y2)
            box = xywh2xyxy(x[:, :4])
    
            # Detections matrix nx6 (xyxy, conf, cls)
            conf, j = x[:, 5:].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres]
    
            # If none remain process next image
            n = x.shape[0]  # number of boxes
            if not n:
                continue
    
            boxes, scores = x[:, :4], x[:, 4]
            i = torchvision.ops.boxes.nms(boxes, scores, iou_thres)
            detections = tensor2imgdetection(x[i])
    
        return detections
    
    
    def setup_pipeline(model_path: str,
                       image_path: str = None) -> dai.Pipeline:
        pipeline = dai.Pipeline()
    
        # Input image
        img_in = pipeline.create(dai.node.XLinkIn)
        img_in.setStreamName("img_in")
    
        # Detection node
        nn = pipeline.create(dai.node.NeuralNetwork)
        nn.setNumPoolFrames(4)
        nn.setBlobPath(model_path)
        nn.setNumInferenceThreads(2)
    
        # Outputs
        nnXout = pipeline.create(dai.node.XLinkOut)
        nnXout.setStreamName("nn")
    
        video_out = pipeline.create(dai.node.XLinkOut)
        video_out.setStreamName("video")
    
        # Linking
        img_in.out.link(nn.input)
        nn.out.link(nnXout.input)
        nn.passthrough.link(video_out.input)
    
        return pipeline
    
    
    label_map = yaml_load(check_yaml("coco8.yaml"))["names"]
    model_path = blobconverter.from_zoo(name="yolov8n_coco_416x416",
                                        zoo_type="depthai",
                                        shaves=8)
    model_width = 416
    model_height = 416
    tensor_shape = (1, 85, 2704)
    
    
    def print_detection(detection: dai.ImgDetection) -> None:
        d = detection
        print(f"{label_map[d.label]:<8s} (conf={d.confidence:.2f}) "
              f"[xmin={d.xmin:.2f}, ymin={d.ymin:.2f}, "
              f"xmax={d.xmax:.2f}, ymax={d.ymax:.2f}]")
    
    
    def to_planar(arr: np.ndarray, shape: tuple) -> list:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
    
    
    def main(confidence_threshold, image_path):
        print("using NeuralNetwork with confidence threshold of " +
              str(confidence_threshold))
    
        pipeline = setup_pipeline(model_path, image_path)
    
        with dai.Device(pipeline) as device:
            nn_queue = device.getOutputQueue("nn")
            video_queue = device.getOutputQueue(name="video")
            input_queue = device.getInputQueue("img_in")
    
            detections = []
    
            def display_frame(name, frame):
                color = (255, 0, 0)
                for d in detections:
                    xmin = int(d.xmin)
                    xmax = int(d.xmax)
                    ymin = int(d.ymin)
                    ymax = int(d.ymax)
                    cv2.putText(frame, label_map[d.label],
                                (xmin + 10, ymin + 20),
                                cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                    cv2.putText(frame, f"{int(d.confidence * 100)}%",
                                (xmin + 10, ymin + 40),
                                cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                    cv2.rectangle(frame, (xmin, ymin),
                                  (xmax, ymax), color, 2)
                # Show the frame
                cv2.imshow(name, frame)
    
            frame = cv2.imread(image_path)
            tstamp = time.monotonic()
            img_frame = dai.ImgFrame()
            img_frame.setData(to_planar(frame, (model_width, model_height)))
            img_frame.setTimestamp(tstamp)
            img_frame.setSequenceNum(1)
            img_frame.setType(dai.RawImgFrame.Type.BGR888p)
            img_frame.setWidth(model_width)
            img_frame.setHeight(model_height)
            input_queue.send(img_frame)
    
            in_nn = nn_queue.get()
            in_video = video_queue.get()
    
            in_nn = np.array(in_nn.getFirstLayerFp16())
            in_nn = in_nn.reshape(tensor_shape)
    
            # Variant 1: use ultralytics method
            # detections = tensor2imgdetection(
            #     ultralytics.utils.ops.non_max_suppression(
            #         torch.from_numpy(in_nn),
            #         confidence_threshold, nc=len(label_map))[0]
            # )
            # Variant 2: use method by https://github.com/luxonis/depthai-experiments/blob/master/gen2-yolo/host-decoding/main.py
            detections = non_max_suppression(in_nn, confidence_threshold)
    
            display_frame("detections", in_video.getCvFrame())
    
            for d in detections:
                print_detection(d)
    
            cv2.waitKey(0)
    
    
    if __name__ == '__main__':
        confidence_threshold = 0.1
        image_path = 'bus.jpg'
        main(confidence_threshold, image_path)

    Hi @AxelF
    Create each detection with dai.ImgDetection() in tensor2imgdetection (you forgot to call the constructor).
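
    Concretely, the offending line; this also explains the four identical boxes, since the very same object is appended on every loop iteration:

    # Before: binds the class object itself, so each iteration mutates the same thing
    detection = dai.ImgDetection
    # After: creates a fresh ImgDetection instance per result
    detection = dai.ImgDetection()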

    There also seem to be unnecessary type conversions in NMS.

    def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45):
        # Convert prediction to a torch tensor if it's not already
        if not isinstance(prediction, torch.Tensor):
            prediction = torch.from_numpy(prediction)
            if prediction.dtype is torch.float16:
                prediction = prediction.float()  # Convert to FP32
    
        # Filter out predictions below the confidence threshold
        mask = prediction[:, 4] >= conf_thres
        prediction = prediction[mask]
    
        # If no prediction meets the confidence threshold, return an empty list
        if prediction.shape[0] == 0:
            return []
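
    A possible continuation in the same style (a sketch only, assuming prediction has already been transposed to one row per candidate as in the earlier helper, and reusing xywh2xyxy and tensor2imgdetection from above):

        # Boxes from (cx, cy, w, h) to (x1, y1, x2, y2)
        boxes = xywh2xyxy(prediction[:, :4])

        # Best class score and index for each remaining candidate
        conf, cls = prediction[:, 5:].max(1, keepdim=True)

        # Non-maximum suppression on the surviving boxes
        keep = torchvision.ops.nms(boxes, conf.view(-1), iou_thres)
        return tensor2imgdetection(
            torch.cat((boxes, conf, cls.float()), 1)[keep])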

    Thanks,
    Jaka