Hi erik , here you go.
device: Oak-D PRO AF
latest version of depthai and depthai-sdk
Note device = dai.Device("111111111111111111", maxUsbSpeed=dai.UsbSpeed.SUPER_PLUS)
, please change it to your own device mxID.
# %%
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
import time
import matplotlib.pyplot as plt
import math
import PySimpleGUI as gui
from depthai_sdk.fps import FPSHandler
import torch
# %%
class HostSync:
# adapted from https://github.com/luxonis/depthai-experiments/blob/master/gen2-depth-driven-focus/main.py
def __init__(self):
self.arrays = {}
def add_msg(self, name, msg):
if not name in self.arrays:
self.arrays[name] = []
# Add msg to array
self.arrays[name].append({"msg": msg, "seq": msg.getSequenceNum()})
synced = {}
for name, arr in self.arrays.items():
for i, obj in enumerate(arr):
if msg.getSequenceNum() == obj["seq"]:
synced[name] = obj["msg"]
if len(synced) == (2): # Color, Depth
# Remove old msgs
for name, arr in self.arrays.items():
for i, obj in enumerate(arr):
if obj["seq"] < msg.getSequenceNum():
arr.remove(obj)
else:
break
return synced
return False
class TextHelper:
# from: https://github.com/luxonis/depthai-experiments/blob/master/gen2-calc-spatials-on-host/utility.py
def __init__(self) -> None:
self.bg_color = (0, 0, 0)
self.color = (255, 255, 255)
self.text_type = cv2.FONT_HERSHEY_SIMPLEX
self.line_type = cv2.LINE_AA
def putText(self, frame, text, coords):
cv2.putText(frame, text, coords, self.text_type, 0.5, self.bg_color, 3, self.line_type)
cv2.putText(frame, text, coords, self.text_type, 0.5, self.color, 1, self.line_type)
def rectangle(self, frame, p1, p2):
cv2.rectangle(frame, p1, p2, self.bg_color, 3)
cv2.rectangle(frame, p1, p2, self.color, 1)
# %%
def printSystemInformation(info):
# from https://docs.luxonis.com/projects/api/en/latest/samples/SystemLogger/system_information/#system-information
m = 1024 * 1024 # MiB
print(f"Ddr used / total - {info.ddrMemoryUsage.used / m:.2f} / {info.ddrMemoryUsage.total / m:.2f} MiB")
print(f"Cmx used / total - {info.cmxMemoryUsage.used / m:.2f} / {info.cmxMemoryUsage.total / m:.2f} MiB")
print(f"LeonCss heap used / total - {info.leonCssMemoryUsage.used / m:.2f} / {info.leonCssMemoryUsage.total / m:.2f} MiB")
print(f"LeonMss heap used / total - {info.leonMssMemoryUsage.used / m:.2f} / {info.leonMssMemoryUsage.total / m:.2f} MiB")
t = info.chipTemperature
print(f"Chip temperature - average: {t.average:.2f}, css: {t.css:.2f}, mss: {t.mss:.2f}, upa: {t.upa:.2f}, dss: {t.dss:.2f}")
print(f"Cpu usage - Leon CSS: {info.leonCssCpuUsage.average * 100:.2f}%, Leon MSS: {info.leonMssCpuUsage.average * 100:.2f} %")
print("----------------------------------------")
# %%
def calc_spatials(frame, frame_resized, result, depthFrame, calibData, depthData):
# adapted from : https://github.com/luxonis/depthai-experiments/blob/master/gen2-calc-spatials-on-host/calc.py
def calc_angle(frame, offset, HFOV):
return math.atan(math.tan(HFOV / 2.0) * offset / (frame.shape[1] / 2.0))
imgH, imgW, _ = frame.shape
resizedh, resizedw, _ = frame_resized.shape
# only process the first detection
scaled_bb = {
"xmin": int(result.xmin[0]/imgW*resizedw),
"ymin": int(result.ymin[0]/imgH*resizedh),
"xmax": int(result.xmax[0]/imgW*resizedw),
"ymax": int(result.ymax[0]/imgH*resizedh),
}
depthROI = depthFrame[scaled_bb["ymin"]:scaled_bb["ymax"], scaled_bb["xmin"]:scaled_bb["xmax"]]
inRange = (150 <= depthROI) & (depthROI <= 1000) # treshold
averageDepth = np.mean(depthROI[inRange])
centroid = { # Get centroid of the ROI
'x': int((scaled_bb["xmax"] + scaled_bb["xmin"]) / 2),
'y': int((scaled_bb["ymax"] + scaled_bb["ymin"]) / 2)
}
midW = int(depthFrame.shape[1] / 2) # middle of the depth img width
midH = int(depthFrame.shape[0] / 2) # middle of the depth img height
bb_x_pos = centroid['x'] - midW
bb_y_pos = centroid['y'] - midH
HFOV = np.deg2rad(calibData.getFov(dai.CameraBoardSocket(depthData.getInstanceNum())))
angle_x = calc_angle(depthFrame, bb_x_pos, HFOV)
angle_y = calc_angle(depthFrame, bb_y_pos, HFOV)
spatials = {
'z': averageDepth,
'x': averageDepth * math.tan(angle_x),
'y': -averageDepth * math.tan(angle_y)
}
return spatials, scaled_bb, centroid
# %%
device = dai.Device("184430102145421300", maxUsbSpeed=dai.UsbSpeed.SUPER_PLUS)
# %%
# Create pipeline
pipeline = dai.Pipeline()
pipeline.setXLinkChunkSize(0)
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)
xoutRgb = pipeline.create(dai.node.XLinkOut)
xoutDepth = pipeline.create(dai.node.XLinkOut)
xoutRgb.setStreamName("rgb")
xoutDepth.setStreamName("depth")
# mono and rgb properties
set_fps = 30
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_12_MP)
camRgb.setFps(set_fps)
# camRgb.setInterleaved(False)
try:
calibData = device.readCalibration2()
lensPosition = calibData.getLensPosition(dai.CameraBoardSocket.RGB)
if lensPosition:
camRgb.initialControl.setManualFocus(lensPosition)
else:
gui.popup("No Calib data for lensposition")
except:
raise
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoLeft.setFps(set_fps)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)
monoRight.setFps(set_fps)
# stereoConfig
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
stereoConfig = stereo.initialConfig.get()
stereoConfig.postProcessing.speckleFilter.enable = True
stereoConfig.postProcessing.speckleFilter.speckleRange = 50
stereoConfig.postProcessing.spatialFilter.enable = True
stereoConfig.postProcessing.spatialFilter.holeFillingRadius = 2
stereoConfig.postProcessing.spatialFilter.numIterations = 1
stereoConfig.postProcessing.thresholdFilter.minRange = 150
stereoConfig.postProcessing.thresholdFilter.maxRange = 1000
stereoConfig.postProcessing.decimationFilter.decimationFactor = 2
stereo.initialConfig.set(stereoConfig)
stereo.setLeftRightCheck(True)
stereo.setExtendedDisparity(True)
# Align depth map to rgb
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
stereo.setOutputSize(800, 599)
# Linking
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
camRgb.isp.link(xoutRgb.input)
stereo.depth.link(xoutDepth.input)
# logger
sysLog = pipeline.create(dai.node.SystemLogger)
linkOut = pipeline.create(dai.node.XLinkOut)
linkOut.setStreamName("sysinfo")
sysLog.setRate(0.5)
sysLog.out.link(linkOut.input)
# gets coordinate (pixels) of mouse on cv2 window when mouse hover overs cv2 window
point = (0, 0)
def show_distance(event, x,y, args, params):
global point
point = (x,y)
# print(f"point: {point}")
cv2.namedWindow("12mp")
cv2.setMouseCallback("12mp", show_distance)
# %%
# obj detection
model = torch.hub.load('ultralytics/yolov5', "yolov5s")
model.classes = [0] # detects `people` only
# %%
device.startPipeline(pipeline)
device.setLogLevel(dai.LogLevel.INFO)
device.setLogOutputLevel(dai.LogLevel.INFO)
calibData = device.readCalibration()
outputs = ['rgb', 'depth']
queues = [device.getOutputQueue(name, 4, False) for name in outputs]
qSysInfo = device.getOutputQueue(name="sysinfo", maxSize=4, blocking=False)
sync = HostSync()
fps = FPSHandler()
text = TextHelper()
synced_msgs = None
while True:
sysInfo = qSysInfo.tryGet()
if sysInfo is not None:
printSystemInformation(sysInfo)
for q in queues:
if q.has():
synced_msgs = sync.add_msg(q.getName(), q.get())
if synced_msgs:
fps.nextIter()
print('FPS', fps.fps())
frame = synced_msgs["rgb"].getCvFrame()
if 'depth' in synced_msgs:
depthFrame: dai.ImgFrame = synced_msgs["depth"].getFrame()
fov = synced_msgs['depth'].getInstanceNum()
frame_resized = cv2.resize(frame,(800,599), interpolation=cv2.INTER_NEAREST)
# object det yolov5 on host
result = model(frame[:,:,::-1],size=840)
result = result.pandas().xyxy[0]
if len(result)>0:
spatials, scaled_bb, centroid = calc_spatials(frame, frame_resized, result, depthFrame, calibData, synced_msgs["depth"])
text.rectangle(frame_resized, (scaled_bb["xmin"], scaled_bb["ymin"]), (scaled_bb["xmax"], scaled_bb["ymax"]))
text.putText(frame_resized, "X: " + ("{:.1f}cm".format(spatials['x']/10) if not math.isnan(spatials['x']) else "--"), (centroid["x"] + 10, centroid["y"] + 20))
text.putText(frame_resized, "Y: " + ("{:.1f}cm".format(spatials['y']/10) if not math.isnan(spatials['y']) else "--"), (centroid["x"] + 10, centroid["y"] + 35))
text.putText(frame_resized, "Z: " + ("{:.1f}cm".format(spatials['z']/10) if not math.isnan(spatials['z']) else "--"), (centroid["x"] + 10, centroid["y"] + 50))
distance = depthFrame[point[1]][point[0]]
cv2.circle(frame_resized, point, 10, (0,0,255))
cv2.putText(frame_resized, "{}cm".format(distance), (point[0],point[1]), cv2.FONT_HERSHEY_PLAIN, 2 , (255,255,255), 2 )
cv2.imshow("12mp", frame_resized)
if cv2.waitKey(1) == ord('q'):
break
device.close()