Hi Erik,
Sorry for the delay.
Could you please provide an example of how to get the frame from the passthrough?
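For context, this is roughly how I am reconstructing a displayable frame from the passthrough at the moment (just a sketch, assuming the preview is planar FP16 CHW because of camRgb.setFp16(True); passthrough_to_bgr is only a helper name I made up here). I am not sure this is the intended way:

import numpy as np

def passthrough_to_bgr(img_frame, width=640, height=640):
    # img_frame: the dai.ImgFrame received from nn.passthrough via XLinkOut
    # getData() gives the raw bytes; reinterpret them as FP16 planar (C, H, W)
    data = np.array(img_frame.getData()).view(np.float16)
    chw = data.reshape((3, height, width))
    hwc = chw.transpose(1, 2, 0)  # to (H, W, C) for OpenCV
    return np.clip(hwc, 0, 255).astype(np.uint8).copy()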
Here is the MRE: the Python script, the blob and ONNX models, and an image of the cubes (the model should recognize them).
#!/usr/bin/env python3
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
import time
import tensorflow as tf
import keras_cv
import keras
import onnxruntime
nnPath = str((Path('./models/YOLO KERAS/model_fp16_full.blob')).resolve().absolute())
nnPath_onnx = str((Path('./models/YOLO KERAS/model_fp16_full.onnx')).resolve().absolute())
session = onnxruntime.InferenceSession(nnPath_onnx)
input_name = session.get_inputs()[0].name
output_name0 = session.get_outputs()[0].name
output_name1 = session.get_outputs()[1].name
image_path = "all.jpg"
BOX_REGRESSION_CHANNELS=64
def decode_regression_to_boxes(preds):
"""Decodes the results of the YOLOV8Detector forward-pass into boxes.
Returns left / top / right / bottom predictions with respect to anchor
points.
Each coordinate is encoded with 16 predicted values. Those predictions are
softmaxed and multiplied by [0..15] to make predictions. The resulting
predictions are relative to the stride of an anchor box (and correspondingly
relative to the scale of the feature map from which the predictions came).
"""
preds_bbox = keras.layers.Reshape((-1, 4, BOX_REGRESSION_CHANNELS // 4))(
preds
)
preds_bbox = tf.nn.softmax(preds_bbox, axis=-1) * tf.range(
BOX_REGRESSION_CHANNELS // 4, dtype="float32"
)
return tf.reduce_sum(preds_bbox, axis=-1)
def dist2bbox(distance, anchor_points):
"""Decodes distance predictions into xyxy boxes.
Input left / top / right / bottom predictions are transformed into xyxy box
predictions based on anchor points.
The resulting xyxy predictions must be scaled by the stride of their
corresponding anchor points to yield an absolute xyxy box.
"""
left_top, right_bottom = tf.split(distance, 2, axis=-1)
x1y1 = anchor_points - left_top
x2y2 = anchor_points + right_bottom
return tf.concat((x1y1, x2y2), axis=-1) # xyxy bbox
def get_anchors(
    image_shape,
    strides=[8, 16, 32],
    base_anchors=[0.5, 0.5],
):
    """Gets anchor points for YOLOV8.

    YOLOV8 uses anchor points representing the center of proposed boxes, and
    matches ground truth boxes to anchors based on center points.

    Args:
        image_shape: tuple or list of two integers representing the height and
            width of input images, respectively.
        strides: tuple or list of integers, the size of the strides across the
            image size that should be used to create anchors.
        base_anchors: tuple or list of two integers representing the offset from
            (0, 0) to start creating the center of anchor boxes, relative to the
            stride. For example, using the default (0.5, 0.5) creates the first
            anchor box for each stride such that its center is half of a stride
            from the edge of the image.

    Returns:
        A tuple of anchor centerpoints and anchor strides. Multiplying the
        two together will yield the centerpoints in absolute x,y format.
    """
    base_anchors = tf.constant(base_anchors, dtype="float32")

    all_anchors = []
    all_strides = []
    for stride in strides:
        hh_centers = tf.range(0, image_shape[0], stride)
        ww_centers = tf.range(0, image_shape[1], stride)
        ww_grid, hh_grid = tf.meshgrid(ww_centers, hh_centers)
        grid = tf.cast(
            tf.reshape(tf.stack([hh_grid, ww_grid], 2), [-1, 1, 2]),
            "float32",
        )
        anchors = (
            tf.expand_dims(
                base_anchors * tf.constant([stride, stride], "float32"), 0
            )
            + grid
        )
        anchors = tf.reshape(anchors, [-1, 2])
        all_anchors.append(anchors)
        all_strides.append(tf.repeat(stride, anchors.shape[0]))

    all_anchors = tf.cast(tf.concat(all_anchors, axis=0), "float32")
    all_strides = tf.cast(tf.concat(all_strides, axis=0), "float32")

    all_anchors = all_anchors / all_strides[:, None]

    # Swap the x and y coordinates of the anchors.
    all_anchors = tf.concat(
        [all_anchors[:, 1, None], all_anchors[:, 0, None]], axis=-1
    )
    return all_anchors, all_strides
def decode_predictions(
    boxes_,
    scores_,
    images,
):
    boxes = boxes_
    scores = scores_

    boxes = decode_regression_to_boxes(boxes)
    anchor_points, stride_tensor = get_anchors(image_shape=(640, 640, 3))
    stride_tensor = tf.expand_dims(stride_tensor, axis=-1)

    box_preds = dist2bbox(boxes, anchor_points) * stride_tensor

    prediction_decoder = keras_cv.layers.MultiClassNonMaxSuppression(
        bounding_box_format="xyxy",
        from_logits=False,
        iou_threshold=0.5,
        confidence_threshold=0.5,
    )
    return prediction_decoder(box_preds, scores)
# Label map for the cube colors the model detects
labelMap = [
    "green", "pink", "orange"
]
# Create pipeline
pipeline = dai.Pipeline()
camRgb = pipeline.create(dai.node.ColorCamera)
camRgb.setPreviewSize(640, 640)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFp16(True) # Model requires FP16 input
# NeuralNetwork node that runs the YOLOv8 cube-detection blob on the preview frames
nn = pipeline.create(dai.node.NeuralNetwork)
nn.setBlobPath(nnPath)
nn.setNumInferenceThreads(2)
camRgb.preview.link(nn.input)
# Send the NN output (raw 'box' and 'class' layers) to the host via XLink
nn_xout = pipeline.create(dai.node.XLinkOut)
nn_xout.setStreamName("nn")
nn.out.link(nn_xout.input)
# Send the passthrough frames (the exact frames fed into the NN) to the host
rgb_xout = pipeline.create(dai.node.XLinkOut)
rgb_xout.setStreamName("rgb")
nn.passthrough.link(rgb_xout.input)
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Output queues will be used to get the rgb frames and nn data from the outputs defined above
    qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
    qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
    detections = []
    while True:
        inRgb = qRgb.get()
        # Reinterpret the raw FP16 planar (3, 640, 640) passthrough data as an HWC uint8 image for display
        frame = np.array(inRgb.getData()).view(np.float16).reshape((3, 640, 640)).transpose(1, 2, 0).astype(np.uint8).copy()
        in_nn = qDet.tryGet()
        if in_nn is not None:
            # [print(f"Layer name: {l.name}, Type: {l.dataType}, Dimensions: {l.dims}") for l in in_nn.getAllLayers()]
            # Output layers reshaped to (batch_size, num_predictions, channels)
            boxes = np.array(in_nn.getLayerFp16('box')).reshape(1, 8400, 64).astype(dtype=np.float32)
            classes = np.array(in_nn.getLayerFp16('class')).reshape(1, 8400, 3).astype(dtype=np.float32)
            detections = []
            # print(classes)
            result = decode_predictions(boxes, classes, np.expand_dims(np.array(frame), axis=0))
            # print(result)
            result_boxes = result["boxes"]
            num_of_dects = result["num_detections"]
            print("num_of_dects")
            print(num_of_dects)
            if result_boxes[0][0][0] != -1.0:
                detection = {
                    "label": 1,
                    "confidence": 0.1,
                    "box": result_boxes[0][0]}
                detections.append(detection)
            # Load and preprocess the image
            image = cv2.imread(image_path)
            image = cv2.resize(image, (640, 640))  # Resize to model's input size
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            # Use the passthrough frame instead of the file image for the ONNX comparison
            image = inRgb.getCvFrame()
            print(image.dtype)
            image = np.expand_dims(image, axis=0)
            image = np.reshape(image, (1, 3, 640, 640))
            res = session.run(
                output_names=[output_name0, output_name1],
                input_feed={input_name: image}
            )
            result = decode_predictions(res[0], res[1], np.expand_dims(np.array(frame), axis=0))
            # print(result)
            result_boxes = result["boxes"]
            num_of_dects = result["num_detections"]
            print("num_of_dects onnx")
            print(num_of_dects)
        cv2.imshow("rgb", frame)
        if cv2.waitKey(1) == ord('q'):
            break
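In case it helps, this is the kind of check I can add to see whether the raw blob outputs diverge from the ONNX outputs for the same input (only a sketch, not part of the MRE above; compare_layer is a helper name I made up):

import numpy as np

def compare_layer(dev_layer_fp16, onnx_output):
    # dev_layer_fp16: list of floats from in_nn.getLayerFp16('box') or getLayerFp16('class')
    # onnx_output:    the matching array returned by session.run(...)
    dev = np.array(dev_layer_fp16, dtype=np.float32).reshape(onnx_output.shape)
    return float(np.max(np.abs(dev - onnx_output.astype(np.float32))))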
- blob model
- onnx model
Converted with these parameters (see the blobconverter sketch at the end of this post): --data_type=FP16 --mean_values=[0,0,0] --scale_values=[1,1,1] --layout=NHWC --input_shape=[1,3,640,640], --ip FP16, shaves: 6, version: 2022.1
- image of the cubes
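If it helps with reproducing the blob, I believe the conversion above corresponds roughly to this blobconverter call (a sketch under that assumption, using blobconverter's from_onnx helper; the exact invocation I used may differ):

import blobconverter

blob_path = blobconverter.from_onnx(
    model="models/YOLO KERAS/model_fp16_full.onnx",
    data_type="FP16",
    shaves=6,
    version="2022.1",
    optimizer_params=[
        "--mean_values=[0,0,0]",
        "--scale_values=[1,1,1]",
        "--layout=NHWC",
        "--input_shape=[1,3,640,640]",
    ],
    compile_params=["-ip FP16"],
)
print(blob_path)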