Sorry, I posted the code as a comment rather than as code. I will list all of the code here to make it clear.
This is YOLOSeg.py:
import math
import time
import cv2
import numpy as np
import onnxruntime
from utils import xywh2xyxy, nms, draw_detections, sigmoid
class YOLOSeg:
    """Post-processor for YOLOv8 instance-segmentation outputs.

    The class can be used in two ways:

    * ONNX Runtime: pass a model path to the constructor and call the
      instance (or ``segment_objects``) with a BGR image.  The session is
      created lazily on the first inference call.
    * OAK-D: run the network on the device, call
      ``prepare_input_for_oakd`` with the frame shape and then
      ``segment_objects_from_oakd`` with the two raw output tensors.

    After either call the results are also available as ``self.boxes``,
    ``self.scores``, ``self.class_ids`` and ``self.mask_maps``.
    """

    def __init__(self, path, conf_thres=0.7, iou_thres=0.5, num_masks=32):
        """Store thresholds and set up the default (OAK-D) tensor layout.

        Args:
            path: Path to an ONNX model, or an empty string when only the
                OAK-D post-processing entry points are used.
            conf_thres: Minimum class score for a prediction to be kept.
            iou_thres: IoU threshold handed to ``nms``.
            num_masks: Number of mask coefficients per prediction.
        """
        self.conf_threshold = conf_thres
        self.iou_threshold = iou_thres
        self.num_masks = num_masks

        # The ONNX session is only created when ONNX inference is actually
        # requested, so OAK-D-only users never touch onnxruntime.
        self.model_path = path
        self.session = None

        # Initialize the input/output description used by the OAK-D path
        self.initialize_model_for_oak()

    def __call__(self, image):
        """Shortcut for ``segment_objects(image)``."""
        return self.segment_objects(image)

    def initialize_model_for_oak(self, input_name='images',
                                 input_shape=(1, 3, 640, 640),
                                 input_height=640, input_width=640,
                                 output_names=('output0', 'output1')):
        """Describe the network tensors without loading an ONNX session.

        ``input_names`` and ``output_names`` are stored as fresh lists so
        they have the same form as the values set by
        ``get_input_details`` / ``get_output_details`` and are never shared
        between instances.
        """
        # input details
        if isinstance(input_name, str):
            self.input_names = [input_name]
        else:
            self.input_names = list(input_name)
        self.input_shape = list(input_shape)
        self.input_height = input_height
        self.input_width = input_width

        # output details
        self.output_names = list(output_names)

    def initialize_model(self, path):
        """Create the ONNX Runtime session for ``path`` and read its I/O details."""
        self.session = onnxruntime.InferenceSession(
            path,
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])

        # Get model info
        self.get_input_details()
        self.get_output_details()

    def _ensure_session(self):
        """Create the ONNX session on first use, or fail with a clear error."""
        if getattr(self, "session", None) is not None:
            return

        model_path = getattr(self, "model_path", None)
        if not model_path:
            raise RuntimeError(
                "No ONNX model loaded: pass a model path to YOLOSeg(...) or "
                "call initialize_model(path) before running inference.")
        self.initialize_model(model_path)

    def segment_objects(self, image):
        """Run ONNX inference on a BGR image and post-process the result.

        Returns:
            Tuple ``(boxes, scores, class_ids, mask_maps)``.

        Raises:
            RuntimeError: If no ONNX model path was given and no session
                has been created.
        """
        # The session must exist before prepare_input so that the input
        # size comes from the model rather than from the OAK-D defaults.
        self._ensure_session()
        input_tensor = self.prepare_input(image)

        # Perform inference on the image
        outputs = self.inference(input_tensor)

        self.boxes, self.scores, self.class_ids, mask_pred = \
            self.process_box_output(outputs[0])
        self.mask_maps = self.process_mask_output(mask_pred, outputs[1])

        return self.boxes, self.scores, self.class_ids, self.mask_maps

    def segment_objects_from_oakd(self, output0, output1):
        """Post-process raw network outputs produced outside this class.

        ``prepare_input_for_oakd`` must have been called first, because the
        box and mask scaling reads ``self.img_height`` / ``self.img_width``.

        Args:
            output0: Box output, indexed as (batch, channels, predictions).
            output1: Mask prototype output.

        Returns:
            Tuple ``(boxes, scores, class_ids, mask_maps)``.
        """
        self.boxes, self.scores, self.class_ids, mask_pred = \
            self.process_box_output(output0)
        self.mask_maps = self.process_mask_output(mask_pred, output1)

        return self.boxes, self.scores, self.class_ids, self.mask_maps

    def prepare_input_for_oakd(self, shape):
        """Record the (height, width) of the frame the outputs refer to."""
        self.img_height = shape[0]
        self.img_width = shape[1]

    def prepare_input(self, image):
        """Convert a BGR image into a float32 tensor indexed (1, C, H, W)."""
        self.img_height, self.img_width = image.shape[:2]

        input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Resize input image
        input_img = cv2.resize(input_img, (self.input_width, self.input_height))

        # Scale input pixel values to 0 to 1
        input_img = input_img / 255.0
        input_img = input_img.transpose(2, 0, 1)
        input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32)

        return input_tensor

    def inference(self, input_tensor):
        """Run the ONNX session on ``input_tensor`` and return its outputs."""
        self._ensure_session()
        return self.session.run(self.output_names,
                                {self.input_names[0]: input_tensor})

    def process_box_output(self, box_output):
        """Filter predictions by score and apply non-maxima suppression.

        Each prediction column is read as 4 box values, then the class
        scores, then ``self.num_masks`` mask coefficients.

        Returns:
            ``(boxes, scores, class_ids, mask_coefficients)``.  When nothing
            passes the confidence threshold the result is
            ``([], [], [], np.array([]))``.
        """
        box_output = np.asarray(box_output)
        # Drop only the batch axis; np.squeeze would also collapse a
        # single-prediction axis and break the transpose below.
        predictions = box_output[0].T
        num_classes = box_output.shape[1] - self.num_masks - 4

        # Filter out object confidence scores below threshold
        scores = np.max(predictions[:, 4:4 + num_classes], axis=1)
        keep = scores > self.conf_threshold
        predictions = predictions[keep, :]
        scores = scores[keep]

        if len(scores) == 0:
            return [], [], [], np.array([])

        box_predictions = predictions[..., :num_classes + 4]
        mask_predictions = predictions[..., num_classes + 4:]

        # Get the class with the highest confidence
        class_ids = np.argmax(box_predictions[:, 4:], axis=1)

        # Get bounding boxes for each object
        boxes = self.extract_boxes(box_predictions)

        # Apply non-maxima suppression to suppress weak, overlapping bounding boxes
        indices = nms(boxes, scores, self.iou_threshold)

        return (boxes[indices], scores[indices],
                class_ids[indices], mask_predictions[indices])

    def process_mask_output(self, mask_predictions, mask_output):
        """Build one full-resolution binary mask per kept box.

        Reads ``self.boxes``, ``self.img_height`` and ``self.img_width``.

        Returns:
            ``[]`` when there are no predictions, otherwise an array of
            shape (num_boxes, img_height, img_width) holding 0/1 values.
            Boxes with zero area keep an all-zero mask.
        """
        if mask_predictions.shape[0] == 0:
            return []

        mask_output = np.squeeze(mask_output)

        # Calculate the mask maps for each box
        num_mask, mask_height, mask_width = mask_output.shape  # CHW
        masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1)))
        masks = masks.reshape((-1, mask_height, mask_width))

        # Downscale the boxes to match the mask size
        scale_boxes = self.rescale_boxes(self.boxes,
                                         (self.img_height, self.img_width),
                                         (mask_height, mask_width))

        # For every box/mask pair, get the mask map
        mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width))
        # cv2.blur rejects a zero-sized kernel, which would otherwise happen
        # whenever the image is smaller than the mask prototypes.
        blur_size = (max(1, int(self.img_width / mask_width)),
                     max(1, int(self.img_height / mask_height)))

        for i, (box, scale_box) in enumerate(zip(self.boxes, scale_boxes)):
            scale_x1 = int(math.floor(scale_box[0]))
            scale_y1 = int(math.floor(scale_box[1]))
            scale_x2 = int(math.ceil(scale_box[2]))
            scale_y2 = int(math.ceil(scale_box[3]))

            x1 = int(math.floor(box[0]))
            y1 = int(math.floor(box[1]))
            x2 = int(math.ceil(box[2]))
            y2 = int(math.ceil(box[3]))

            # Degenerate boxes would make cv2.resize raise; leave their
            # mask empty instead of aborting the whole frame.
            if x2 <= x1 or y2 <= y1:
                continue

            scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2]
            if scale_crop_mask.size == 0:
                continue

            crop_mask = cv2.resize(scale_crop_mask,
                                   (x2 - x1, y2 - y1),
                                   interpolation=cv2.INTER_CUBIC)
            crop_mask = cv2.blur(crop_mask, blur_size)
            crop_mask = (crop_mask > 0.5).astype(np.uint8)
            mask_maps[i, y1:y2, x1:x2] = crop_mask

        return mask_maps

    def extract_boxes(self, box_predictions):
        """Return boxes in xyxy format, scaled and clipped to the image."""
        # Extract boxes from predictions
        boxes = box_predictions[:, :4]

        # Scale boxes to original image dimensions
        boxes = self.rescale_boxes(boxes,
                                   (self.input_height, self.input_width),
                                   (self.img_height, self.img_width))

        # Convert boxes to xyxy format
        boxes = xywh2xyxy(boxes)

        # Check the boxes are within the image
        boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height)

        return boxes

    def draw_detections(self, image, draw_scores=True, mask_alpha=0.4):
        """Draw the last detections (boxes only) on ``image``.

        ``draw_scores`` is accepted for interface compatibility but is not
        forwarded to the drawing helper.
        """
        return draw_detections(image, self.boxes, self.scores,
                               self.class_ids, mask_alpha)

    def draw_masks(self, image, draw_scores=True, mask_alpha=0.5):
        """Draw the last detections together with their masks on ``image``.

        ``draw_scores`` is accepted for interface compatibility but is not
        forwarded to the drawing helper.
        """
        return draw_detections(image, self.boxes, self.scores,
                               self.class_ids, mask_alpha,
                               mask_maps=self.mask_maps)

    def get_input_details(self):
        """Read input names and input size from the ONNX session."""
        model_inputs = self.session.get_inputs()
        self.input_names = [model_input.name for model_input in model_inputs]

        self.input_shape = model_inputs[0].shape
        self.input_height = self.input_shape[2]
        self.input_width = self.input_shape[3]

    def get_output_details(self):
        """Read output names from the ONNX session."""
        model_outputs = self.session.get_outputs()
        self.output_names = [model_output.name for model_output in model_outputs]

    @staticmethod
    def rescale_boxes(boxes, input_shape, image_shape):
        """Rescale 4-value boxes from ``input_shape`` to ``image_shape``.

        Both shapes are (height, width); columns 0 and 2 are scaled by the
        width ratio and columns 1 and 3 by the height ratio.  The result is
        a new float32 array.
        """
        input_shape = np.array([input_shape[1], input_shape[0],
                                input_shape[1], input_shape[0]])
        boxes = np.divide(boxes, input_shape, dtype=np.float32)
        boxes *= np.array([image_shape[1], image_shape[0],
                           image_shape[1], image_shape[0]])

        return boxes
if __name__ == '__main__':
    from imread_from_url import imread_from_url

    # Path to the exported ONNX model; adjust it to where the file lives.
    model_path = "yolov8n-seg.onnx"

    # Initialize YOLOv8 Instance Segmentator
    yoloseg = YOLOSeg(model_path, conf_thres=0.3, iou_thres=0.5)

    # This demo runs ONNX inference, so an ONNX Runtime session must exist
    # before the first call; create it explicitly if the constructor did not.
    if getattr(yoloseg, "session", None) is None:
        yoloseg.initialize_model(model_path)

    img_url = "https://live.staticflickr.com/13/19041780_d6fd803de0_3k.jpg"
    img = imread_from_url(img_url)

    # Detect Objects
    yoloseg(img)

    # Draw detections
    combined_img = yoloseg.draw_masks(img)
    cv2.namedWindow("Output", cv2.WINDOW_NORMAL)
    cv2.imshow("Output", combined_img)
    cv2.waitKey(0)
You will also need this utils.py in the same folder:
https://github.com/ibaiGorordo/ONNX-YOLOv8-Instance-Segmentation/blob/main/yoloseg/utils.py
Then you only need to pass the outputs to it, as I do here:
import cv2
import numpy as np
import depthai as dai
import time
from YOLOSeg import YOLOSeg
pathYoloBlob = "./yolov8n-seg.blob"

# Shapes the flat FP16 output layers are reshaped to before post-processing
OUTPUT0_SHAPE = (1, 116, 8400)
OUTPUT1_SHAPE = (1, 32, 160, 160)

# Create OAK-D pipeline
pipeline = dai.Pipeline()

# Colour camera feeding a 640x640 planar preview to the network
cam_rgb = pipeline.create(dai.node.ColorCamera)
cam_rgb.setPreviewSize(640, 640)
cam_rgb.setInterleaved(False)

nn = pipeline.create(dai.node.NeuralNetwork)
nn.setBlobPath(pathYoloBlob)
cam_rgb.preview.link(nn.input)

# Stream the preview frames to the host
xout_rgb = pipeline.create(dai.node.XLinkOut)
xout_rgb.setStreamName("rgb")
cam_rgb.preview.link(xout_rgb.input)

# Stream the raw network outputs to the host
xout_nn_yolo = pipeline.create(dai.node.XLinkOut)
xout_nn_yolo.setStreamName("nn_yolo")
nn.out.link(xout_nn_yolo.input)

# The post-processor keeps no per-frame state that needs resetting, so it is
# built once instead of once per frame.
yoloseg = YOLOSeg("", conf_thres=0.3, iou_thres=0.5)

# Start application
with dai.Device(pipeline) as device:
    q_rgb = device.getOutputQueue("rgb")
    q_nn_yolo = device.getOutputQueue("nn_yolo")
    frame = None

    # Since the detections returned by nn have values from <0..1> range, they need to be multiplied by frame width/height to
    # receive the actual position of the bounding box on the image
    def frameNorm(frame, bbox):
        """Scale normalised bbox values to pixel coordinates of ``frame``."""
        normVals = np.full(len(bbox), frame.shape[0])
        normVals[::2] = frame.shape[1]
        return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

    # Main host-side application loop
    while True:
        in_rgb = q_rgb.tryGet()
        in_nn_yolo = q_nn_yolo.tryGet()

        if in_rgb is not None:
            frame = in_rgb.getCvFrame()

            if in_nn_yolo is not None:
                # Original author's note on this step: "Here is the problem"
                raw_output0 = in_nn_yolo.getLayerFp16("output0")
                raw_output1 = in_nn_yolo.getLayerFp16("output1")

                # Only when both outputs are present can the final mask be
                # computed. The check runs on the raw layers because
                # reshaping an empty layer would raise.
                if len(raw_output0) > 0 and len(raw_output1) > 0:
                    output0 = np.reshape(raw_output0, OUTPUT0_SHAPE)
                    output1 = np.reshape(raw_output1, OUTPUT1_SHAPE)

                    # Post-process, this is fast, no problems here
                    yoloseg.prepare_input_for_oakd(frame.shape[:2])
                    yoloseg.segment_objects_from_oakd(output0, output1)
                    combined_img = yoloseg.draw_masks(frame.copy())
                    cv2.imshow("Output", combined_img)
            else:
                print("in_nn_yolo EMPTY")
        else:
            print("in_rgb EMPTY")

        # at any time, you can press "q" and exit the main loop, therefore exiting the program itself
        if cv2.waitKey(1) == ord('q'):
            break
The "yolov8n-seg.blob" that I use comes from the original Ultralytics model (https://docs.ultralytics.com/es/tasks/segment/#export), exported to ONNX with:
yolo export model=yolov8n-seg.pt format=onnx
Then convert it to a blob using the Luxonis tool http://blobconverter.luxonis.com/ with:
Choose OpenVINO version -> 2022.1
Choose model source -> ONNX
and with the parameters --data_type=FP16 --mean_values=[0,0,0] --scale_values=[255,255,255] and 6 shaves.
I do not remember right now whether it was mean_values=[0,0,0], because I did it on another computer, but if I am not wrong that is the value, because YOLO works with inputs in [0,1] internally.
I think I have summarized the steps I took to make it work. If you get an empty list for output0, check whether you did the same as I did.
Regards,
Pedro.