Hello everyone,
I’m working on a high-performance DepthAI pipeline with a target latency of under 100 milliseconds. However, despite extensive testing and optimizations, I haven’t been able to reduce the latency below 128 milliseconds. This setup uses YOLOv6 Nano with 6 SHAVEs, compiled at 640x640 resolution.
My focus is on measuring actual end-to-end latency rather than just inference latency. To achieve this, I record a stopwatch, capture frames, and then compare the timestamps of the received image with the real-time frame.
The script runs on a Raspberry Pi CM4 mounted on a custom carrier board, and since no real-time display is required, it does not include an image output.
Has anyone tackled similar latency challenges with DepthAI? Any insights or suggestions would be greatly appreciated!
import cv2
import depthai as dai
blob = "35k_openvino_2022.1_6shave.blob"
class Oak:
def init(self):
self.pipeline = dai.Pipeline()
self.detections = []
self._camera = self._pipeline.createColorCamera()
self._camera.setFps(21)
self._camera.setPreviewSize(640, 640)
self._camera.setBoardSocket(dai.CameraBoardSocket.AUTO)
self._camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
self._camera.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
self._camera.setInterleaved(*False*)
self._yolo = self._pipeline.createYoloDetectionNetwork()
self._yolo.setConfidenceThreshold(0.5)
self._yolo.setIouThreshold(0.5)
self._yolo.setNumClasses(5)
self._yolo.setCoordinateSize(4)
self._yolo.setBlobPath(blob)
self._yolo.input.setBlocking(*False*)
self._yoloOut = self._pipeline.createXLinkOut()
self._yoloOut.setStreamName("yolo")
self._camera.preview.link(self._yolo.input)
self._yolo.out.link(self._yoloOut.input)
*def* run(self):
*try*:
*with* dai.Device(self._pipeline) *as* device:
nnQueue = device.getOutputQueue(name="yolo", maxSize=4, blocking=*False*)
*while True*:
nnInput = nnQueue.get()
*if* nnInput *is not None*:
self._detections = nnInput.detections
*if* cv2.waitKey(1) == ord('q'):
*break*
Exception as e:
print(f"Error: {e}")
finally:
pass
get(self, label=None):
filteredDetections = []
*for* detection *in* self._detections:
*if label is None or* detection.label == *label*:
filteredDetections.append({
"label": detection.label,
"confidence": detection.confidence,
"bbox": (detection.xmin, detection.ymin, detection.xmax, detection.ymax),
})
*return* filteredDetections