How do I deploy a YOLOv8 segmentation model on an OAK-D Pro camera?

I've converted the YOLOv8 instance segmentation model to a blob, but I don't know how to extract the neural network's output data to segment the RGB image. Has anyone implemented this?

I made some modifications to this class to get the output of the YOLO seg (you will need to download utils.py too):

ibaiGorordo/ONNX-YOLOv8-Instance-Segmentation/blob/main/yoloseg/YOLOSeg.py#L91

I put the OAK code here if you want to check it:

https://stackoverflow.com/questions/78153689/numpy-array-slow-with-large-list

And this is the modified class:

    import math
    import time
    import cv2
    import numpy as np
    import onnxruntime

    from utils import xywh2xyxy, nms, draw_detections, sigmoid


    class YOLOSeg:

        def __init__(self, path, conf_thres=0.7, iou_thres=0.5, num_masks=32):
            self.conf_threshold = conf_thres
            self.iou_threshold = iou_thres
            self.num_masks = num_masks

            # Initialize model
            #self.initialize_model(path)
            self.initialize_model_for_oak()

        def __call__(self, image):
            return self.segment_objects(image)

        def initialize_model_for_oak(self, input_name = 'images', input_shape = [1, 3, 640, 640], input_height = 640, input_width = 640, output_names = ['output0', 'output1']):

            # input details
            self.input_names = input_name
            self.input_shape = input_shape
            self.input_height = input_height
            self.input_width = input_width

            # output details
            self.output_names = output_names

        def initialize_model(self, path):
            self.session = onnxruntime.InferenceSession(path,
                                                        providers=['CUDAExecutionProvider',
                                                                   'CPUExecutionProvider'])
            # Get model info
            self.get_input_details()
            self.get_output_details()

        def segment_objects(self, image):
            input_tensor = self.prepare_input(image)

            # Perform inference on the image
            outputs = self.inference(input_tensor)

            self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(outputs[0])
            self.mask_maps = self.process_mask_output(mask_pred, outputs[1])

            return self.boxes, self.scores, self.class_ids, self.mask_maps

        def segment_objects_from_oakd(self, output0, output1):
            self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(output0)
            self.mask_maps = self.process_mask_output(mask_pred, output1)

            return self.boxes, self.scores, self.class_ids, self.mask_maps

        def prepare_input_for_oakd(self, shape):
            self.img_height = shape[0]
            self.img_width = shape[1]

        def prepare_input(self, image):
            self.img_height, self.img_width = image.shape[:2]

            input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Resize input image
            input_img = cv2.resize(input_img, (self.input_width, self.input_height))

            # Scale input pixel values to 0 to 1
            input_img = input_img / 255.0
            input_img = input_img.transpose(2, 0, 1)
            input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32)

            return input_tensor

        def inference(self, input_tensor):
            start = time.perf_counter()
            outputs = self.session.run(self.output_names, {self.input_names[0]: input_tensor})

            # print(f"Inference time: {(time.perf_counter() - start)*1000:.2f} ms")
            return outputs

        def process_box_output(self, box_output):
            predictions = np.squeeze(box_output).T
            num_classes = box_output.shape[1] - self.num_masks - 4

            # Filter out object confidence scores below threshold
            scores = np.max(predictions[:, 4:4+num_classes], axis=1)
            predictions = predictions[scores > self.conf_threshold, :]
            scores = scores[scores > self.conf_threshold]

            if len(scores) == 0:
                return [], [], [], np.array([])

            box_predictions = predictions[..., :num_classes+4]
            mask_predictions = predictions[..., num_classes+4:]

            # Get the class with the highest confidence
            class_ids = np.argmax(box_predictions[:, 4:], axis=1)

            # Get bounding boxes for each object
            boxes = self.extract_boxes(box_predictions)

            # Apply non-maxima suppression to suppress weak, overlapping bounding boxes
            indices = nms(boxes, scores, self.iou_threshold)

            return boxes[indices], scores[indices], class_ids[indices], mask_predictions[indices]

        def process_mask_output(self, mask_predictions, mask_output):
            if mask_predictions.shape[0] == 0:
                return []

            mask_output = np.squeeze(mask_output)

            # Calculate the mask maps for each box
            num_mask, mask_height, mask_width = mask_output.shape  # CHW
            masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1)))
            masks = masks.reshape((-1, mask_height, mask_width))

            # Downscale the boxes to match the mask size
            scale_boxes = self.rescale_boxes(self.boxes,
                                       (self.img_height, self.img_width),
                                       (mask_height, mask_width))

            # For every box/mask pair, get the mask map
            mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width))
            blur_size = (int(self.img_width / mask_width), int(self.img_height / mask_height))
            for i in range(len(scale_boxes)):
                scale_x1 = int(math.floor(scale_boxes[i][0]))
                scale_y1 = int(math.floor(scale_boxes[i][1]))
                scale_x2 = int(math.ceil(scale_boxes[i][2]))
                scale_y2 = int(math.ceil(scale_boxes[i][3]))

                x1 = int(math.floor(self.boxes[i][0]))
                y1 = int(math.floor(self.boxes[i][1]))
                x2 = int(math.ceil(self.boxes[i][2]))
                y2 = int(math.ceil(self.boxes[i][3]))

                scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2]
                crop_mask = cv2.resize(scale_crop_mask,
                                  (x2 - x1, y2 - y1),
                                  interpolation=cv2.INTER_CUBIC)

                crop_mask = cv2.blur(crop_mask, blur_size)

                crop_mask = (crop_mask > 0.5).astype(np.uint8)
                mask_maps[i, y1:y2, x1:x2] = crop_mask

            return mask_maps

        def extract_boxes(self, box_predictions):
            # Extract boxes from predictions
            boxes = box_predictions[:, :4]

            # Scale boxes to original image dimensions
            boxes = self.rescale_boxes(boxes,
                                       (self.input_height, self.input_width),
                                       (self.img_height, self.img_width))

            # Convert boxes to xyxy format
            boxes = xywh2xyxy(boxes)

            # Check the boxes are within the image
            boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width)
            boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height)
            boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width)
            boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height)

            return boxes

        def draw_detections(self, image, draw_scores=True, mask_alpha=0.4):
            return draw_detections(image, self.boxes, self.scores,
                                   self.class_ids, mask_alpha)

        def draw_masks(self, image, draw_scores=True, mask_alpha=0.5):
            return draw_detections(image, self.boxes, self.scores,
                                   self.class_ids, mask_alpha, mask_maps=self.mask_maps)

        def get_input_details(self):
            model_inputs = self.session.get_inputs()
            self.input_names = [model_inputs[i].name for i in range(len(model_inputs))]

            self.input_shape = model_inputs[0].shape
            self.input_height = self.input_shape[2]
            self.input_width = self.input_shape[3]

        def get_output_details(self):
            model_outputs = self.session.get_outputs()
            self.output_names = [model_outputs[i].name for i in range(len(model_outputs))]

        @staticmethod
        def rescale_boxes(boxes, input_shape, image_shape):
            # Rescale boxes to original image dimensions
            input_shape = np.array([input_shape[1], input_shape[0], input_shape[1], input_shape[0]])
            boxes = np.divide(boxes, input_shape, dtype=np.float32)
            boxes *= np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]])

            return boxes


    if __name__ == '__main__':
        from imread_from_url import imread_from_url

        model_path = "./modelo_base_segmentacion_coco/yolov8n-seg.onnx"

        # Initialize YOLOv8 Instance Segmentator
        yoloseg = YOLOSeg(model_path, conf_thres=0.3, iou_thres=0.5)

        img_url = "https://live.staticflickr.com/13/19041780_d6fd803de0_3k.jpg"
        img = imread_from_url(img_url)

        # Detect Objects
        yoloseg(img)

        # Draw detections
        combined_img = yoloseg.draw_masks(img)
        cv2.namedWindow("Output", cv2.WINDOW_NORMAL)
        cv2.imshow("Output", combined_img)
        cv2.waitKey(0)

It works, but right now I have a problem with huge latency (4.2 seconds). If you get better results, please let me know!
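To see where those 4.2 seconds go, a quick timing check like the sketch below can be dropped into the host loop of the OAK script (it assumes the same in_nn_yolo message and layer names as in the script I linked above); it separates reading the FP16 layers from the device from the list-to-numpy conversion:

    import time
    import numpy as np

    t0 = time.perf_counter()
    raw0 = in_nn_yolo.getLayerFp16("output0")   # large Python list coming from the device
    raw1 = in_nn_yolo.getLayerFp16("output1")
    t1 = time.perf_counter()
    output0 = np.array(raw0, dtype=np.float32).reshape(1, 116, 8400)
    output1 = np.array(raw1, dtype=np.float32).reshape(1, 32, 160, 160)
    t2 = time.perf_counter()
    print(f"getLayerFp16: {(t1 - t0)*1000:.1f} ms, to numpy: {(t2 - t1)*1000:.1f} ms")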

    pedro-UCA Thank you very much for the code!

    But I get an error after running the program:

    output0 = np.reshape(in_nn_yolo.getLayerFp16("output0"), newshape=([1, 116, 8400]))

    ValueError: cannot reshape array of size 0 into shape (1,116,8400)

    The blob model I use is the official YOLO yolov8n-seg.blob, and when I run the program I comment out the if __name__ == '__main__': block in YOLOSeg.py, but after that I don't know what the problem is.

    Sorry, I posted the code as a comment rather than as code; I will list all the code here to make it clear:

    This is the YOLOSeg.py

    import math
    import time
    import cv2
    import numpy as np
    import onnxruntime
    
    from utils import xywh2xyxy, nms, draw_detections, sigmoid
    
    
    class YOLOSeg:
    
        def __init__(self, path, conf_thres=0.7, iou_thres=0.5, num_masks=32):
            self.conf_threshold = conf_thres
            self.iou_threshold = iou_thres
            self.num_masks = num_masks
    
            # Initialize model
            #self.initialize_model(path)
            self.initialize_model_for_oak()
    
        def __call__(self, image):
            return self.segment_objects(image)
            
        def initialize_model_for_oak(self, input_name = 'images', input_shape = [1, 3, 640, 640], input_height = 640, input_width = 640, output_names = ['output0', 'output1']):
    
            # input details
            self.input_names = input_name        
            self.input_shape = input_shape
            self.input_height = input_height
            self.input_width = input_width
    
            # output details        
            self.output_names = output_names
    
        def initialize_model(self, path):
            self.session = onnxruntime.InferenceSession(path,
                                                        providers=['CUDAExecutionProvider',
                                                                   'CPUExecutionProvider'])
            # Get model info
            self.get_input_details()
            self.get_output_details()
            
    
        def segment_objects(self, image):
            input_tensor = self.prepare_input(image)
    
            # Perform inference on the image
            outputs = self.inference(input_tensor)
    
            self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(outputs[0])
            self.mask_maps = self.process_mask_output(mask_pred, outputs[1])
    
            return self.boxes, self.scores, self.class_ids, self.mask_maps
            
        def segment_objects_from_oakd(self, output0, output1):
            
            
            self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(output0)
            self.mask_maps = self.process_mask_output(mask_pred, output1)
    
            return self.boxes, self.scores, self.class_ids, self.mask_maps
        def prepare_input_for_oakd(self, shape):
        
            self.img_height = shape[0]
            self.img_width = shape[1]
            
        def prepare_input(self, image):
            self.img_height, self.img_width = image.shape[:2]
    
            input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
            # Resize input image
            input_img = cv2.resize(input_img, (self.input_width, self.input_height))
    
            # Scale input pixel values to 0 to 1
            input_img = input_img / 255.0
            input_img = input_img.transpose(2, 0, 1)
            input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32)
    
            return input_tensor
    
        def inference(self, input_tensor):
            start = time.perf_counter()
            outputs = self.session.run(self.output_names, {self.input_names[0]: input_tensor})
    
            # print(f"Inference time: {(time.perf_counter() - start)*1000:.2f} ms")
            return outputs
    
        def process_box_output(self, box_output):
    
            predictions = np.squeeze(box_output).T
            num_classes = box_output.shape[1] - self.num_masks - 4
    
            # Filter out object confidence scores below threshold
            scores = np.max(predictions[:, 4:4+num_classes], axis=1)
            predictions = predictions[scores > self.conf_threshold, :]
            scores = scores[scores > self.conf_threshold]
    
            if len(scores) == 0:
                return [], [], [], np.array([])
    
            box_predictions = predictions[..., :num_classes+4]
            mask_predictions = predictions[..., num_classes+4:]
    
            # Get the class with the highest confidence
            class_ids = np.argmax(box_predictions[:, 4:], axis=1)
    
            # Get bounding boxes for each object
            boxes = self.extract_boxes(box_predictions)
    
            # Apply non-maxima suppression to suppress weak, overlapping bounding boxes
            indices = nms(boxes, scores, self.iou_threshold)
    
            return boxes[indices], scores[indices], class_ids[indices], mask_predictions[indices]
    
        def process_mask_output(self, mask_predictions, mask_output):
    
            if mask_predictions.shape[0] == 0:
                return []
    
            mask_output = np.squeeze(mask_output)
    
            # Calculate the mask maps for each box
            num_mask, mask_height, mask_width = mask_output.shape  # CHW
            masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1)))
            masks = masks.reshape((-1, mask_height, mask_width))
    
            # Downscale the boxes to match the mask size
            scale_boxes = self.rescale_boxes(self.boxes,
                                       (self.img_height, self.img_width),
                                       (mask_height, mask_width))
    
            # For every box/mask pair, get the mask map
            mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width))
            blur_size = (int(self.img_width / mask_width), int(self.img_height / mask_height))
            for i in range(len(scale_boxes)):
    
                scale_x1 = int(math.floor(scale_boxes[i][0]))
                scale_y1 = int(math.floor(scale_boxes[i][1]))
                scale_x2 = int(math.ceil(scale_boxes[i][2]))
                scale_y2 = int(math.ceil(scale_boxes[i][3]))
    
                x1 = int(math.floor(self.boxes[i][0]))
                y1 = int(math.floor(self.boxes[i][1]))
                x2 = int(math.ceil(self.boxes[i][2]))
                y2 = int(math.ceil(self.boxes[i][3]))
    
                scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2]
                crop_mask = cv2.resize(scale_crop_mask,
                                  (x2 - x1, y2 - y1),
                                  interpolation=cv2.INTER_CUBIC)
    
                crop_mask = cv2.blur(crop_mask, blur_size)
    
                crop_mask = (crop_mask > 0.5).astype(np.uint8)
                mask_maps[i, y1:y2, x1:x2] = crop_mask
    
            return mask_maps
    
        def extract_boxes(self, box_predictions):
            # Extract boxes from predictions
            boxes = box_predictions[:, :4]
    
            # Scale boxes to original image dimensions
            boxes = self.rescale_boxes(boxes,
                                       (self.input_height, self.input_width),
                                       (self.img_height, self.img_width))
    
            # Convert boxes to xyxy format
            boxes = xywh2xyxy(boxes)
    
            # Check the boxes are within the image
            boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width)
            boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height)
            boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width)
            boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height)
    
            return boxes
    
        def draw_detections(self, image, draw_scores=True, mask_alpha=0.4):
            return draw_detections(image, self.boxes, self.scores,
                                   self.class_ids, mask_alpha)
    
        def draw_masks(self, image, draw_scores=True, mask_alpha=0.5):
            return draw_detections(image, self.boxes, self.scores,
                                   self.class_ids, mask_alpha, mask_maps=self.mask_maps)
    
        def get_input_details(self):
            model_inputs = self.session.get_inputs()
            self.input_names = [model_inputs[i].name for i in range(len(model_inputs))]
    
            self.input_shape = model_inputs[0].shape
            self.input_height = self.input_shape[2]
            self.input_width = self.input_shape[3]
    
        def get_output_details(self):
            model_outputs = self.session.get_outputs()
            self.output_names = [model_outputs[i].name for i in range(len(model_outputs))]
    
    
        @staticmethod
        def rescale_boxes(boxes, input_shape, image_shape):
            # Rescale boxes to original image dimensions
            input_shape = np.array([input_shape[1], input_shape[0], input_shape[1], input_shape[0]])
            boxes = np.divide(boxes, input_shape, dtype=np.float32)
            boxes *= np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]])
    
            return boxes
    
    
    if __name__ == '__main__':
        from imread_from_url import imread_from_url
    
    
        # Initialize YOLOv8 Instance Segmentator
        yoloseg = YOLOSeg("", conf_thres=0.3, iou_thres=0.5)
    
        img_url = "https://live.staticflickr.com/13/19041780_d6fd803de0_3k.jpg"
        img = imread_from_url(img_url)
    
        # Detect Objects
        yoloseg(img)
    
        # Draw detections
        combined_img = yoloseg.draw_masks(img)
        cv2.namedWindow("Output", cv2.WINDOW_NORMAL)
        cv2.imshow("Output", combined_img)
        cv2.waitKey(0)

    And you will need this utils.py in the same folder:

    ibaiGorordo/ONNX-YOLOv8-Instance-Segmentation/blob/main/yoloseg/utils.py

    You only need to pass the output as I do here:

    import cv2
    import numpy as np
    import depthai as dai
    import time
    from YOLOSeg import YOLOSeg
    
    pathYoloBlob = "./yolov8n-seg.blob"
    
    # Create OAK-D pipeline
    pipeline = dai.Pipeline()
    
    cam_rgb = pipeline.createColorCamera()
    cam_rgb.setPreviewSize(640, 640)  
    cam_rgb.setInterleaved(False)
    
    nn = pipeline.create(dai.node.NeuralNetwork)
    nn.setBlobPath(pathYoloBlob)
    
    cam_rgb.preview.link(nn.input)
    
    xout_rgb = pipeline.createXLinkOut()
    xout_rgb.setStreamName("rgb")
    cam_rgb.preview.link(xout_rgb.input)
    
    xout_nn_yolo = pipeline.createXLinkOut()
    xout_nn_yolo.setStreamName("nn_yolo")
    nn.out.link(xout_nn_yolo.input)
    
    # Start application
    with dai.Device(pipeline) as device:
    
        q_rgb = device.getOutputQueue("rgb")
        q_nn_yolo = device.getOutputQueue("nn_yolo")
    
    
        frame = None
    
        # Since the detections returned by nn have values from <0..1> range, they need to be multiplied by frame width/height to
        # receive the actual position of the bounding box on the image
        def frameNorm(frame, bbox):
            normVals = np.full(len(bbox), frame.shape[0])
            normVals[::2] = frame.shape[1]
            return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
    
        # Main host-side application loop
        while True:
    
            in_rgb = q_rgb.tryGet()
            in_nn_yolo = q_nn_yolo.tryGet()
    
            if in_rgb is not None:
    
                frame = in_rgb.getCvFrame()  
    
                if in_nn_yolo is not None:
                    
                    # Here is the problem
                    output0 = np.reshape(in_nn_yolo.getLayerFp16("output0"), newshape=([1, 116, 8400]))
                    output1 = np.reshape(in_nn_yolo.getLayerFp16("output1"), newshape=([1, 32, 160, 160]))               
     
                    # If we have both outputs we can compute the final mask
                    if( len(output0) > 0 and len(output1) > 0 ):                 
    
                        #Post-process, this is fast, no problems here
                        yoloseg = YOLOSeg("", conf_thres=0.3, iou_thres=0.5)
                        yoloseg.prepare_input_for_oakd(frame.shape[:2])
                        yoloseg.segment_objects_from_oakd(output0,output1)
                        combined_img = yoloseg.draw_masks(frame.copy())
                        cv2.imshow("Output", combined_img)
    
                else:
                    print("in_nn_yolo EMPTY")
    
            else:
                print("in_rgb EMPTY")
            # at any time, you can press "q" and exit the main loop, therefore exiting the program itself
            if cv2.waitKey(1) == ord('q'):
                break

    The "yolov8n-seg.blob" that I use is the original of ultralytics https://docs.ultralytics.com/es/tasks/segment/#export
    yolo export model=yolov8n-seg.pt format=onnx
    And convert it to blob using the luxonis tool http://blobconverter.luxonis.com/ with:
    Choose OpenVINO version -> 2022.1
    Choose model source -> ONNX
    and with the parameter --data_type=FP16 --mean_values=[0,0,0] --scale_values=[255,255,255] and 6 shaves
    I do not remember right now if mean_values=[0,0,0] because I did it in another computer but if I am not wrong is that because yolo works with [0,1] internaly.
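    If you prefer to script the conversion instead of using the web tool, something like the sketch below with the blobconverter Python package should be equivalent (untested on my side, but the parameters mirror the ones from the web tool):

        import blobconverter

        # Convert the exported ONNX model to a .blob for the OAK (assumed parameters shown)
        blob_path = blobconverter.from_onnx(
            model="yolov8n-seg.onnx",
            data_type="FP16",
            shaves=6,
            version="2022.1",  # OpenVINO version
            optimizer_params=[
                "--mean_values=[0,0,0]",
                "--scale_values=[255,255,255]",
            ],
        )
        print(blob_path)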

    I think I have summarized the steps I followed to make it work. If you get an empty list for output0, check that you did the same as I did; the small sketch below also helps to see what the blob actually outputs.
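    A minimal check, assuming the same nn_yolo queue as in the script above (getAllLayerNames is part of the depthai NNData message):

        in_nn_yolo = q_nn_yolo.get()             # blocking get, just for debugging
        print(in_nn_yolo.getAllLayerNames())     # expect something like ['output0', 'output1']
        for layer_name in in_nn_yolo.getAllLayerNames():
            data = in_nn_yolo.getLayerFp16(layer_name)
            print(layer_name, len(data))         # 1*116*8400 and 1*32*160*160 for the COCO model

    If the names or sizes differ, adjust the getLayerFp16 calls and the reshapes accordingly.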

    Regards,
    Pedro.

      pedro-UCA Thank you very much for your detailed guidance! I have successfully run the code, and like you I have a long delay, about four seconds. But the model I trained on my own dataset still gives an error: ValueError: cannot reshape array of size 310800 into shape (1,116,8400).
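      That error is just the channel count: output0 has 4 box values + num_classes scores + 32 mask coefficients per anchor, so a single-class model gives 4 + 1 + 32 = 37 channels and 37 x 8400 = 310800 values. A sketch of the general reshape, with nc set to your number of classes (YOLOSeg then derives num_classes from this shape on its own):

          nc = 1  # number of classes in the custom model (hypothetical value, set it to yours)
          output0 = np.reshape(in_nn_yolo.getLayerFp16("output0"), newshape=(1, 4 + nc + 32, 8400))
          output1 = np.reshape(in_nn_yolo.getLayerFp16("output1"), newshape=(1, 32, 160, 160))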

      pedro-UCA Thank you very much for the method, I tried it and it worked. When I changed output0 to the actual dimensions, the program worked fine. Since I trained only one class, there is almost no lag when running, but the segmentation does not work as well.

      @pedro-UCA

      @erik

      Your method for instance segmentation works for me but I also need depth.

      The depthai-sdk has MobilenetSpatialDetections and YoloSpatialDetections, which give the z-axis value. I want to achieve something similar for segmentation, something like YoloSpatialSegments, which is not yet implemented.

      Could you please provide more information on how to get the depth of the segmented object?

      Thanks in advance.

      Hi @u111s ,
      If you have depth aligned to the color stream, and do segmentation on the color stream, you could overlay the segmentation results on the depth stream. If you do that, you have a mask and depth info; by combining them you would get only the depth points of the segmented class. Then you could take e.g. the median depth pixel (or some smarter approach) to get the Z of the segmented class.
      Thoughts?
      Thanks, Erik
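      A minimal host-side sketch of that idea, assuming the depth frame is already aligned to the color stream (e.g. via stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)) and resized to the same resolution as the masks in yoloseg.mask_maps:

          import numpy as np

          def segment_depth_mm(depth_frame, mask):
              # depth_frame: HxW uint16 depth image (mm), aligned to the color stream
              # mask:        HxW binary mask for one instance from yoloseg.mask_maps
              valid = (mask > 0) & (depth_frame > 0)   # drop pixels with no depth (0 = invalid)
              if not np.any(valid):
                  return None
              return float(np.median(depth_frame[valid]))

          # per detected instance:
          # for mask in yoloseg.mask_maps:
          #     z_mm = segment_depth_mm(depth_frame, mask)

      Taking the median rather than the mean keeps the estimate robust when the mask bleeds slightly over background edges.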