# Object detector.
import argparse
from datetime import datetime
import depthai as dai
from pathlib import Path
import pdb
import threading
import time
# Naive-UTC Unix epoch used as the zero reference for unix_time_micros().
# datetime(1970, 1, 1) equals the deprecated datetime.utcfromtimestamp(0)
# but avoids the DeprecationWarning emitted since Python 3.12.
EPOCH = datetime(1970, 1, 1)


def unix_time_micros(dt):
    """Return microseconds since the Unix epoch for a naive-UTC datetime.

    Args:
        dt: naive datetime assumed to be in UTC (must be naive to be
            subtractable from the naive EPOCH).

    Returns:
        int: whole microseconds elapsed since 1970-01-01T00:00:00Z.
    """
    return int((dt - EPOCH).total_seconds() * 1e6)
# Yolo object detector which uses stereo to predict bounding boxes and depth.
class YoloSpatialObjectDetector():
    """One OAK-D camera running YOLO spatial detection on-device.

    Builds a DepthAI pipeline in which the RGB preview feeds a
    YoloSpatialDetectionNetwork and stereo depth is aligned to the RGB
    view, opens the device, and exposes the five output queues
    (colorize / rgb / detections / depth / nnNetwork) as attributes.
    """

    def __init__(self, device_info, camera_type):
        """Build the pipeline and connect to one camera.

        Args:
            device_info: DepthAI device descriptor for the camera to open.
            camera_type: 'STANDARD' or 'WIDE'; selects RGB sensor
                resolution and ISP scale. Any other value raises.

        Raises:
            FileNotFoundError: if the compiled YOLO blob is missing.
            ValueError: on an unknown `camera_type`.
        """
        # The compiled model blob is expected to sit next to this source file.
        nnPath = str((Path(__file__).parent / Path('20240721_best_ckpt_by_epoch_300_exp2_openvino_2022.1_6shave.blob')).resolve().absolute())
        if not Path(nnPath).exists():
            raise FileNotFoundError(f'Required file not found: {nnPath}')
        # YOLO label names (80-class COCO order; list index = class id).
        self.labelMap = [
            "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train",
            "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
            "bird", "cat", "dog", "horse", "sheep", "cow", "elephant",
            "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie",
            "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
            "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
            "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich",
            "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake",
            "chair", "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor",
            "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven",
            "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors",
            "teddy bear", "hair drier", "toothbrush"
        ]
        # NOTE(review): presumably label -> per-label threshold consumed by a
        # caller (not used within this class) — confirm semantics of the value.
        self.avoidLabels = {
            "person": 0.4,
        }
        # Create pipeline.
        pipeline = dai.Pipeline()
        # Define sources and outputs.
        camRgb = pipeline.create(dai.node.ColorCamera)
        spatialDetectionNetwork = pipeline.create(dai.node.YoloSpatialDetectionNetwork)
        monoLeft = pipeline.create(dai.node.MonoCamera)
        monoRight = pipeline.create(dai.node.MonoCamera)
        stereo = pipeline.create(dai.node.StereoDepth)
        nnNetworkOut = pipeline.create(dai.node.XLinkOut)
        xoutColorize = pipeline.create(dai.node.XLinkOut)
        xoutRgb = pipeline.create(dai.node.XLinkOut)
        xoutNN = pipeline.create(dai.node.XLinkOut)
        xoutDepth = pipeline.create(dai.node.XLinkOut)
        # Stream names must match the getOutputQueue() names below.
        xoutColorize.setStreamName("colorize")
        xoutRgb.setStreamName("rgb")
        xoutNN.setStreamName("detections")
        xoutDepth.setStreamName("depth")
        nnNetworkOut.setStreamName("nnNetwork")
        # Properties. Preview is 416x416, the YOLO network input size.
        camRgb.setPreviewSize(416, 416)
        if camera_type == 'STANDARD':
            camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
            camRgb.setIspScale(2, 5)
        elif camera_type == 'WIDE':
            camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_800_P)
            camRgb.setIspScale(3, 4)
        else:
            raise ValueError('Invalid camera type.')
        camRgb.setPreviewKeepAspectRatio(False)
        camRgb.setInterleaved(False)
        camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
        # Full ISP frame goes out on the "colorize" stream, alongside the
        # 416x416 preview that feeds the network.
        camRgb.isp.link(xoutColorize.input)
        camRgb.setFps(10)
        # Mono cameras run at the same 10 FPS as RGB.
        monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
        monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
        monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
        monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)
        monoLeft.setFps(10)
        monoRight.setFps(10)
        # Setting node configs.
        stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
        stereo.initialConfig.setMedianFilter(dai.StereoDepthProperties.MedianFilter.KERNEL_7x7)
        stereo.initialConfig.setConfidenceThreshold(40)
        stereo.setLeftRightCheck(True)
        # Align depth map to the perspective of RGB camera, on which inference is done.
        stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
        stereo.setSubpixel(True)
        spatialDetectionNetwork.setBlobPath(nnPath)
        spatialDetectionNetwork.setConfidenceThreshold(0.29)
        spatialDetectionNetwork.input.setBlocking(False)
        # Depth is sampled from a box scaled to 0.5x the detection bbox.
        spatialDetectionNetwork.setBoundingBoxScaleFactor(0.5)
        spatialDetectionNetwork.setDepthLowerThreshold(100)  # mm
        spatialDetectionNetwork.setDepthUpperThreshold(18000)  # mm
        # Yolo specific parameters.
        spatialDetectionNetwork.setNumClasses(80)
        spatialDetectionNetwork.setCoordinateSize(4)
        spatialDetectionNetwork.setIouThreshold(0.5)
        spatialDetectionNetwork.setSpatialCalculationAlgorithm(dai.SpatialLocationCalculatorAlgorithm.MEDIAN)
        # Linking.
        monoLeft.out.link(stereo.left)
        monoRight.out.link(stereo.right)
        camRgb.preview.link(spatialDetectionNetwork.input)
        spatialDetectionNetwork.passthrough.link(xoutRgb.input)
        spatialDetectionNetwork.out.link(xoutNN.input)
        stereo.depth.link(spatialDetectionNetwork.inputDepth)
        spatialDetectionNetwork.passthroughDepth.link(xoutDepth.input)
        spatialDetectionNetwork.outNetwork.link(nnNetworkOut.input)
        # Oak-D connected to onboard computer.
        self.device_info = device_info
        self.cap = dai.Device(pipeline, device_info)
        self.cap.setLogLevel(dai.LogLevel.DEBUG)
        self.cap.setLogOutputLevel(dai.LogLevel.DEBUG)
        # Output queues will be used to get the rgb frames and nn data from the
        # outputs defined above. maxSize=1 + non-blocking keeps only the
        # newest message (older ones are dropped).
        self.colorizeQueue = self.cap.getOutputQueue(name="colorize", maxSize=1, blocking=False)
        self.previewQueue = self.cap.getOutputQueue(name="rgb", maxSize=1, blocking=False)
        self.detectionNNQueue = self.cap.getOutputQueue(name="detections", maxSize=1, blocking=False)
        self.depthQueue = self.cap.getOutputQueue(name="depth", maxSize=1, blocking=False)
        self.networkQueue = self.cap.getOutputQueue(name="nnNetwork", maxSize=1, blocking=False)
        print('Bootloader version: ', self.cap.getBootloaderVersion(), device_info)
        print('Initialized YoloSpatialObjectDetector.')
class ObjectDetectorManager():
    """Owns four YoloSpatialObjectDetector cameras and polls them in lockstep.

    All four configured cameras must be discoverable (matched by device
    name or mxid) or construction fails.
    """

    def __init__(self,
                 camera_one_id='169.254.1.222',
                 camera_two_id='169.254.1.221',
                 camera_three_id='169.254.1.220',
                 camera_four_id='169.254.1.219'):
        """Discover the cameras and open one detector per camera.

        Args:
            camera_one_id..camera_four_id: device name or mxid of each camera.

        Raises:
            ValueError: if fewer than four devices are visible, or if any
                configured id cannot be matched to a discovered device.
        """
        devices = dai.Device.getAllAvailableDevices()
        print(f'Found {len(devices)} cameras:', devices)
        if len(devices) < 4:
            raise ValueError('Not enough devices. Object detection requires 4 cameras.')
        wanted_ids = [camera_one_id, camera_two_id, camera_three_id, camera_four_id]
        detectors = [None] * len(wanted_ids)
        for device in devices:
            for i, cam_id in enumerate(wanted_ids):
                if device.name == cam_id or device.mxid == cam_id:
                    detectors[i] = YoloSpatialObjectDetector(device, 'STANDARD')
                    break
        # Raise (rather than assert, which is stripped under -O) when a
        # configured camera id was not found among the discovered devices.
        if any(d is None for d in detectors):
            missing = [cam_id for cam_id, d in zip(wanted_ids, detectors) if d is None]
            raise ValueError(f'Could not find cameras: {missing}')
        (self.detector_one, self.detector_two,
         self.detector_three, self.detector_four) = detectors

    def run(self):
        """Continuously pull frames and detections from all four cameras.

        Polls at most once every `min_time_elapsed_image` seconds, sleeping
        out the remainder of the interval instead of busy-spinning (the
        original `continue` loop pegged a CPU core while waiting).
        """
        print('Launching ObjectDetectorManager.')
        # Give time to warm up the cameras.
        time.sleep(5)
        # Minimum time between polls (seconds).
        min_time_elapsed_image = 0.2
        detectors = (self.detector_one, self.detector_two,
                     self.detector_three, self.detector_four)
        startTimeImage = time.monotonic()
        while True:
            currentTime = time.monotonic()
            remaining = min_time_elapsed_image - (currentTime - startTimeImage)
            if remaining > 0:
                time.sleep(remaining)
                currentTime = time.monotonic()
            print('Getting images and detections', currentTime)
            # Drain every camera's image queues first, then the detection
            # queues, matching the original acquisition order.
            frames, colorFrames, depthFrames = [], [], []
            for det in detectors:
                frames.append(det.previewQueue.get().getCvFrame())
                colorFrames.append(det.colorizeQueue.get().getCvFrame())
                depthFrames.append(det.depthQueue.get().getCvFrame())
            detections = [det.detectionNNQueue.get().detections for det in detectors]
            print('Done getting images and detections', time.monotonic())
            startTimeImage = currentTime
class PropagatingThread(threading.Thread):
def run(self):
self.exc = None
try:
if hasattr(self, '_Thread__target'):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret
# Command-line interface. No options are defined yet, but parsing still
# validates argv and provides --help.
parser = argparse.ArgumentParser()
opt = parser.parse_args()

if __name__ == '__main__':
    # Keep the detector alive: relaunch after a delay on failure. (The
    # original printed "Relaunching again in 5 seconds" but then exited
    # without sleeping or retrying.)
    while True:
        try:
            # Object detection thread.
            manager = ObjectDetectorManager()
            detection_thread = PropagatingThread(target=manager.run)
            detection_thread.start()
            detection_thread.join()
        except Exception as e:
            print(f'Object detector failed: {e}. Relaunching again in 5 seconds.')
            time.sleep(5)