I've converted the YOLOv8 instance segmentation model into a blob, but I don't know how to extract the output data of the neural network to segment the RGB image. Has anyone implemented this?
How do I deploy a YOLOv8 segmentation model on an OAK-D Pro camera?
I made some modifications to this class to get the output of the YOLOv8 seg model (you will need to download utils.py too):
ibaiGorordo/ONNX-YOLOv8-Instance-Segmentation/blob/main/yoloseg/YOLOSeg.py#L91
I put the OAK code here if you want to check it:
https://stackoverflow.com/questions/78153689/numpy-array-slow-with-large-list
And this is the modified class; I originally pasted it as a comment, which lost the formatting, so the full, readable listing is in my follow-up post below.
It works, but right now I have a problem with huge latency (4.2 seconds). If you get better results, please let me know!
pedro-UCA Thank you very much for the code!
But I get an error after running the program:
output0 = np.reshape(in_nn_yolo.getLayerFp16("output0"), newshape=([1, 116, 8400]))
ValueError: cannot reshape array of size 0 into shape (1,116,8400)
The blob model I use is converted from YOLO's official yolov8n-seg, and when I run the program I comment out the if __name__ == '__main__': block at the end of YOLOSeg.py. After that, I don't know what the problem is.
Sorry, I posted the code as a comment and not as code. I will list all the code here to make it clear:
This is YOLOSeg.py:
import math
import time
import cv2
import numpy as np
import onnxruntime
from utils import xywh2xyxy, nms, draw_detections, sigmoid
class YOLOSeg:
def __init__(self, path, conf_thres=0.7, iou_thres=0.5, num_masks=32):
self.conf_threshold = conf_thres
self.iou_threshold = iou_thres
self.num_masks = num_masks
# Initialize model
#self.initialize_model(path)
self.initialize_model_for_oak()
def __call__(self, image):
return self.segment_objects(image)
def initialize_model_for_oak(self, input_name = 'images', input_shape = [1, 3, 640, 640], input_height = 640, input_width = 640, output_names = ['output0', 'output1']):
# input details
self.input_names = [input_name]  # keep as a list so inference() can index input_names[0]
self.input_shape = input_shape
self.input_height = input_height
self.input_width = input_width
# output details
self.output_names = output_names
def initialize_model(self, path):
self.session = onnxruntime.InferenceSession(path,
providers=['CUDAExecutionProvider',
'CPUExecutionProvider'])
# Get model info
self.get_input_details()
self.get_output_details()
def segment_objects(self, image):
input_tensor = self.prepare_input(image)
# Perform inference on the image
outputs = self.inference(input_tensor)
self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(outputs[0])
self.mask_maps = self.process_mask_output(mask_pred, outputs[1])
return self.boxes, self.scores, self.class_ids, self.mask_maps
def segment_objects_from_oakd(self, output0, output1):
self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(output0)
self.mask_maps = self.process_mask_output(mask_pred, output1)
return self.boxes, self.scores, self.class_ids, self.mask_maps
def prepare_input_for_oakd(self, shape):
self.img_height = shape[0]
self.img_width = shape[1]
def prepare_input(self, image):
self.img_height, self.img_width = image.shape[:2]
input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Resize input image
input_img = cv2.resize(input_img, (self.input_width, self.input_height))
# Scale input pixel values to 0 to 1
input_img = input_img / 255.0
input_img = input_img.transpose(2, 0, 1)
input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32)
return input_tensor
def inference(self, input_tensor):
start = time.perf_counter()
outputs = self.session.run(self.output_names, {self.input_names[0]: input_tensor})
# print(f"Inference time: {(time.perf_counter() - start)*1000:.2f} ms")
return outputs
def process_box_output(self, box_output):
predictions = np.squeeze(box_output).T
num_classes = box_output.shape[1] - self.num_masks - 4
# Filter out object confidence scores below threshold
scores = np.max(predictions[:, 4:4+num_classes], axis=1)
predictions = predictions[scores > self.conf_threshold, :]
scores = scores[scores > self.conf_threshold]
if len(scores) == 0:
return [], [], [], np.array([])
box_predictions = predictions[..., :num_classes+4]
mask_predictions = predictions[..., num_classes+4:]
# Get the class with the highest confidence
class_ids = np.argmax(box_predictions[:, 4:], axis=1)
# Get bounding boxes for each object
boxes = self.extract_boxes(box_predictions)
# Apply non-maxima suppression to suppress weak, overlapping bounding boxes
indices = nms(boxes, scores, self.iou_threshold)
return boxes[indices], scores[indices], class_ids[indices], mask_predictions[indices]
def process_mask_output(self, mask_predictions, mask_output):
if mask_predictions.shape[0] == 0:
return []
mask_output = np.squeeze(mask_output)
# Calculate the mask maps for each box
num_mask, mask_height, mask_width = mask_output.shape # CHW
masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1)))
masks = masks.reshape((-1, mask_height, mask_width))
# Downscale the boxes to match the mask size
scale_boxes = self.rescale_boxes(self.boxes,
(self.img_height, self.img_width),
(mask_height, mask_width))
# For every box/mask pair, get the mask map
mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width))
blur_size = (int(self.img_width / mask_width), int(self.img_height / mask_height))
for i in range(len(scale_boxes)):
scale_x1 = int(math.floor(scale_boxes[i][0]))
scale_y1 = int(math.floor(scale_boxes[i][1]))
scale_x2 = int(math.ceil(scale_boxes[i][2]))
scale_y2 = int(math.ceil(scale_boxes[i][3]))
x1 = int(math.floor(self.boxes[i][0]))
y1 = int(math.floor(self.boxes[i][1]))
x2 = int(math.ceil(self.boxes[i][2]))
y2 = int(math.ceil(self.boxes[i][3]))
scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2]
crop_mask = cv2.resize(scale_crop_mask,
(x2 - x1, y2 - y1),
interpolation=cv2.INTER_CUBIC)
crop_mask = cv2.blur(crop_mask, blur_size)
crop_mask = (crop_mask > 0.5).astype(np.uint8)
mask_maps[i, y1:y2, x1:x2] = crop_mask
return mask_maps
def extract_boxes(self, box_predictions):
# Extract boxes from predictions
boxes = box_predictions[:, :4]
# Scale boxes to original image dimensions
boxes = self.rescale_boxes(boxes,
(self.input_height, self.input_width),
(self.img_height, self.img_width))
# Convert boxes to xyxy format
boxes = xywh2xyxy(boxes)
# Check the boxes are within the image
boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width)
boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height)
boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width)
boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height)
return boxes
def draw_detections(self, image, draw_scores=True, mask_alpha=0.4):
return draw_detections(image, self.boxes, self.scores,
self.class_ids, mask_alpha)
def draw_masks(self, image, draw_scores=True, mask_alpha=0.5):
return draw_detections(image, self.boxes, self.scores,
self.class_ids, mask_alpha, mask_maps=self.mask_maps)
def get_input_details(self):
model_inputs = self.session.get_inputs()
self.input_names = [model_inputs[i].name for i in range(len(model_inputs))]
self.input_shape = model_inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
def get_output_details(self):
model_outputs = self.session.get_outputs()
self.output_names = [model_outputs[i].name for i in range(len(model_outputs))]
@staticmethod
def rescale_boxes(boxes, input_shape, image_shape):
# Rescale boxes to original image dimensions
input_shape = np.array([input_shape[1], input_shape[0], input_shape[1], input_shape[0]])
boxes = np.divide(boxes, input_shape, dtype=np.float32)
boxes *= np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]])
return boxes
if __name__ == '__main__':
from imread_from_url import imread_from_url
# Initialize YOLOv8 Instance Segmentator
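# Note: this demo path needs the ONNX session, so to run it you must uncomment
# self.initialize_model(path) in __init__ and pass the path to your .onnx model instead of "" below.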
yoloseg = YOLOSeg("", conf_thres=0.3, iou_thres=0.5)
img_url = "https://live.staticflickr.com/13/19041780_d6fd803de0_3k.jpg"
img = imread_from_url(img_url)
# Detect Objects
yoloseg(img)
# Draw detections
combined_img = yoloseg.draw_masks(img)
cv2.namedWindow("Output", cv2.WINDOW_NORMAL)
cv2.imshow("Output", combined_img)
cv2.waitKey(0)
And you will need this utils.py in the same folder:
ibaiGorordo/ONNX-YOLOv8-Instance-Segmentation/blob/main/yoloseg/utils.py
You only need to pass the outputs as I do here:
import cv2
import numpy as np
import depthai as dai
import time
from YOLOSeg import YOLOSeg
pathYoloBlob = "./yolov8n-seg.blob"
# Create OAK-D pipeline
pipeline = dai.Pipeline()
cam_rgb = pipeline.createColorCamera()
cam_rgb.setPreviewSize(640, 640)
cam_rgb.setInterleaved(False)
nn = pipeline.create(dai.node.NeuralNetwork)
nn.setBlobPath(pathYoloBlob)
cam_rgb.preview.link(nn.input)
xout_rgb = pipeline.createXLinkOut()
xout_rgb.setStreamName("rgb")
cam_rgb.preview.link(xout_rgb.input)
xout_nn_yolo = pipeline.createXLinkOut()
xout_nn_yolo.setStreamName("nn_yolo")
nn.out.link(xout_nn_yolo.input)
# Start application
with dai.Device(pipeline) as device:
q_rgb = device.getOutputQueue("rgb")
q_nn_yolo = device.getOutputQueue("nn_yolo")
frame = None
# Since the detections returned by nn have values from <0..1> range, they need to be multiplied by frame width/height to
# receive the actual position of the bounding box on the image
def frameNorm(frame, bbox):
normVals = np.full(len(bbox), frame.shape[0])
normVals[::2] = frame.shape[1]
return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
# Main host-side application loop
while True:
in_rgb = q_rgb.tryGet()
in_nn_yolo = q_nn_yolo.tryGet()
if in_rgb is not None:
frame = in_rgb.getCvFrame()
if in_nn_yolo is not None:
# Here is the problem
output0 = np.reshape(in_nn_yolo.getLayerFp16("output0"), newshape=([1, 116, 8400]))
output1 = np.reshape(in_nn_yolo.getLayerFp16("output1"), newshape=([1, 32, 160, 160]))
# If we have both outputs we can compute the final mask
if( len(output0) > 0 and len(output1) > 0 ):
#Post-process, this is fast, no problems here
yoloseg = YOLOSeg("", conf_thres=0.3, iou_thres=0.5)
yoloseg.prepare_input_for_oakd(frame.shape[:2])
yoloseg.segment_objects_from_oakd(output0,output1)
combined_img = yoloseg.draw_masks(frame.copy())
cv2.imshow("Output", combined_img)
else:
print("in_nn_yolo EMPTY")
else:
print("in_rgb EMPTY")
# at any time, you can press "q" and exit the main loop, therefore exiting the program itself
if cv2.waitKey(1) == ord('q'):
break
The "yolov8n-seg.blob" that I use is the original of ultralytics https://docs.ultralytics.com/es/tasks/segment/#export
yolo export model=yolov8n-seg.pt format=onnx
And convert it to a blob using the Luxonis tool http://blobconverter.luxonis.com/ with:
Choose OpenVINO version -> 2022.1
Choose model source -> ONNX
and with the parameters --data_type=FP16 --mean_values=[0,0,0] --scale_values=[255,255,255], and 6 shaves.
I do not remember for sure whether mean_values was [0,0,0] because I did this on another computer, but if I am not mistaken that is the value, since YOLO works with inputs in the [0,1] range internally.
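If you prefer doing the conversion from Python rather than the web page, the blobconverter package wraps the same service; this is only a minimal sketch, assuming the argument names below match the version you have installed (check its README if not):

import blobconverter

# Convert the exported ONNX to a .blob with the same settings as above
blob_path = blobconverter.from_onnx(
    model="yolov8n-seg.onnx",
    data_type="FP16",
    shaves=6,
    version="2022.1",
    optimizer_params=[
        "--mean_values=[0,0,0]",
        "--scale_values=[255,255,255]",
    ],
)
print(blob_path)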
I think I have summarized the steps I took to make it work; if you get an empty list for output0, check whether you did the same as I did.
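For example, here is a small guard you could put in the host loop right before the reshape; it assumes the blob really exposes layers called output0 and output1 (getAllLayerNames() will show you what yours are called):

# Check which layers the blob exposes and how much data came back before reshaping
print("NN layers:", in_nn_yolo.getAllLayerNames())
raw0 = in_nn_yolo.getLayerFp16("output0")
raw1 = in_nn_yolo.getLayerFp16("output1")
if len(raw0) == 1 * 116 * 8400 and len(raw1) == 1 * 32 * 160 * 160:
    output0 = np.reshape(raw0, (1, 116, 8400))
    output1 = np.reshape(raw1, (1, 32, 160, 160))
else:
    print("Unexpected output sizes:", len(raw0), len(raw1))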
Regards,
Pedro.
Just use https://netron.app/ with the .onnx of your model to check the right output0/1 dimensions.
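If you would rather check it from a script than from Netron, a short sketch with onnxruntime prints the same information (assuming the exported yolov8n-seg.onnx is in the working directory):

import onnxruntime

session = onnxruntime.InferenceSession("yolov8n-seg.onnx", providers=["CPUExecutionProvider"])
# Expect something like: output0 [1, 116, 8400] and output1 [1, 32, 160, 160]
for out in session.get_outputs():
    print(out.name, out.shape)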
Your method for instance segmentation works for me but I also need depth.
The depthai-sdk has MobilenetSpatialDetections and YoloSpatialDetections, which give the z-axis value. I want to achieve something similar for segmentation, something like YoloSpatialSegments, which is not yet implemented.
Could you please provide more information on how to get the depth of the segmented object?
Thanks in advance.
Hi @u111s ,
If you have depth aligned to the color stream, and you do segmentation on the color stream, you can overlay the segmentation results on the depth stream. If you do that, you have a mask plus depth info, and by combining them you get only the depth points of the segmented class. Then you could take e.g. the median depth pixel (or some smarter approach) to get the Z of the segmented object.
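A minimal host-side sketch of that idea. Here depth_frame is assumed to be a depth map you have already aligned to the color stream (e.g. from a StereoDepth node with depth alignment enabled), and the mask comes from yoloseg.mask_maps above; both names are placeholders rather than part of the code in this thread:

import cv2
import numpy as np

def masked_median_depth(depth_frame, mask):
    # Bring the mask to the depth resolution in case the two streams differ
    if mask.shape != depth_frame.shape:
        mask = cv2.resize(mask.astype(np.uint8),
                          (depth_frame.shape[1], depth_frame.shape[0]),
                          interpolation=cv2.INTER_NEAREST)
    # Keep only depth pixels that fall inside the mask and are valid (non-zero)
    depth_values = depth_frame[(mask > 0) & (depth_frame > 0)]
    if depth_values.size == 0:
        return None
    # Median is robust against outliers at the object edges
    return float(np.median(depth_values))

# Example: Z (in depth-frame units, millimeters on OAK-D) of the i-th segmented object
# z = masked_median_depth(depth_frame, yoloseg.mask_maps[i])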
Thoughts?
Thanks, Erik