Hello, I am having trouble converting "spatial_mono_video.py" from the depthai examples to run on monoLeft/monoRight video footage instead of the monoLeft/monoRight cameras.
Approach:
- Replace the two MonoCamera nodes with XLinkIn nodes
- Open the left/right videos and read frames with cv2.VideoCapture()
- Put each frame into a dai.ImgFrame
- Send the dai.ImgFrames into the input queues of the XLinkIn nodes (a condensed sketch of this step is shown right after this list)
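In short, for each pair of video frames the send step looks roughly like this (condensed from the full code below; send_frame is just an illustrative helper name, the actual script does this inline for the left and right frames):

import time
import cv2
import depthai as dai
import numpy as np

def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    # Resize and convert HWC -> CHW, then flatten so it can be passed to ImgFrame.setData()
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

def send_frame(queue, frame):
    # Wrap a cv2 frame in a dai.ImgFrame and send it to an XLinkIn input queue
    img = dai.ImgFrame()
    img.setData(to_planar(frame, (300, 300)))
    img.setTimestamp(time.monotonic())
    img.setWidth(1280)   # width/height set to 1280x720, matching the full code below
    img.setHeight(720)
    queue.send(img)

# e.g. send_frame(monoLeftInQ, frameL) and send_frame(monoRightInQ, frameR) inside the read loop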
I tried to keep the changes minimal and apply the approach above to the original code.
However, I am running into an error saying it cannot read data from the stream. I am not sure whether the error occurs only at the end, at the XLinkOut node, or somewhere before that. Would you have any suggestions for how to proceed?
Error message:
[14442C10E1A9F4D600] [169.254.1.222] [6.288] [StereoDepth(2)] [error] There is no available extrinsic calibration between camera ID: '0' and '0'
[2025-03-05 05:55:59.408] [depthai] [error] Device with id 14442C10E1A9F4D600 has crashed. Crash dump logs are stored in: C:\Users\jkim\Documents\Interactive_Application\timelab-cameras\spatial_detection.cache\depthai\crashdumps\71dcaa019bd5a7cb7646c64b2af962f567c9a9dd\crash_dump.json - please report to developers.
Traceback (most recent call last):
File "C:\Users\jkim\Documents\Interactive_Application\timelab-cameras\spatial_detection\spatial_mobilenet_mono_video.py", line 125, in <module>
inRectified = previewQueue.get()
RuntimeError: Communication exception - possible device error/misconfiguration. Original message 'Couldn't read data from stream: 'right' (X_LINK_ERROR)'
Code:
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
import time
import os
# Function used to put video frames into a dai.ImgFrame
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
#Paths
nnPath = str((Path(__file__).parent / Path('../models/mobilenet-ssd_openvino_2021.4_6shave.blob')).resolve().absolute())
monoLeft_path = 'C:\\Users\\jkim\\Documents\\Interactive_Application\\timelab-cameras\\videos\\mono_left.mp4'
monoRight_path = 'C:\\Users\\jkim\\Documents\\Interactive_Application\\timelab-cameras\\videos\\mono_right.mp4'
# MobilenetSSD label texts
labelMap = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
"diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
syncNN = True
#create pipeline
pipeline = dai.Pipeline()
monoLeft = pipeline.create(dai.node.XLinkIn)
monoRight = pipeline.create(dai.node.XLinkIn)
stereo = pipeline.create(dai.node.StereoDepth)
spatialDetectionNetwork = pipeline.create(dai.node.MobileNetSpatialDetectionNetwork)
imageManip = pipeline.create(dai.node.ImageManip)
xoutManip = pipeline.create(dai.node.XLinkOut)
nnOut = pipeline.create(dai.node.XLinkOut)
xoutDepth = pipeline.create(dai.node.XLinkOut)
monoLeft.setStreamName("monoLeft")
monoRight.setStreamName("monoRight")
xoutManip.setStreamName("right")
nnOut.setStreamName("detections")
xoutDepth.setStreamName("depth")
# Properties
imageManip.initialConfig.setResize(300, 300)
# The NN model expects BGR input. By default ImageManip output type would be same as input (gray in this case)
imageManip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)
# StereoDepth
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setSubpixel(True)
# Define a neural network that will make predictions based on the source frames
spatialDetectionNetwork.setConfidenceThreshold(0.5)
spatialDetectionNetwork.setBlobPath(nnPath)
spatialDetectionNetwork.input.setBlocking(False)
spatialDetectionNetwork.setBoundingBoxScaleFactor(0.5)
spatialDetectionNetwork.setDepthLowerThreshold(100)
spatialDetectionNetwork.setDepthUpperThreshold(5000)
# Linking
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
imageManip.out.link(spatialDetectionNetwork.input)
if syncNN:
    spatialDetectionNetwork.passthrough.link(xoutManip.input)
else:
    imageManip.out.link(xoutManip.input)
spatialDetectionNetwork.out.link(nnOut.input)
stereo.rectifiedRight.link(imageManip.inputImage)
stereo.depth.link(spatialDetectionNetwork.inputDepth)
spatialDetectionNetwork.passthroughDepth.link(xoutDepth.input)
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Input queues to send image frames from the videos to the device
    monoLeftInQ = device.getInputQueue(name="monoLeft")
    monoRightInQ = device.getInputQueue(name="monoRight")
    # Output queues will be used to get the rectified right frames and nn data from the outputs defined above
    previewQueue = device.getOutputQueue(name="right", maxSize=4, blocking=False)
    detectionNNQueue = device.getOutputQueue(name="detections", maxSize=4, blocking=False)
    depthQueue = device.getOutputQueue(name="depth", maxSize=4, blocking=False)
    rectifiedRight = None
    detections = []
    startTime = time.monotonic()
    counter = 0
    fps = 0
    color = (255, 255, 255)
    capL = cv2.VideoCapture(monoLeft_path)
    capR = cv2.VideoCapture(monoRight_path)
    while capL.isOpened():
        print("debug: capL opened")
        readL, frameL = capL.read()
        readR, frameR = capR.read()
        if not readL:  # should check both readL and readR
            break
        # From the left video: put the frame into a dai.ImgFrame and send it to the input queue
        # (you should set this up as a function to make it reusable)
        imgL = dai.ImgFrame()
        imgL.setData(to_planar(frameL, (300, 300)))
        imgL.setTimestamp(time.monotonic())
        imgL.setWidth(1280)
        imgL.setHeight(720)
        monoLeftInQ.send(imgL)
        # From the right video: put the frame into a dai.ImgFrame and send it to the input queue
        imgR = dai.ImgFrame()
        imgR.setData(to_planar(frameR, (300, 300)))
        imgR.setTimestamp(time.monotonic())
        imgR.setWidth(1280)
        imgR.setHeight(720)
        monoRightInQ.send(imgR)
        inRectified = previewQueue.get()
        inDet = detectionNNQueue.get()
        inDepth = depthQueue.get()
        counter += 1
        currentTime = time.monotonic()
        if (currentTime - startTime) > 1:
            fps = counter / (currentTime - startTime)
            counter = 0
            startTime = currentTime
        rectifiedRight = inRectified.getCvFrame()
        depthFrame = inDepth.getFrame()  # depthFrame values are in millimeters
        depth_downscaled = depthFrame[::4]
        if np.all(depth_downscaled == 0):
            min_depth = 0  # Set a default minimum depth value when all elements are zero
        else:
            min_depth = np.percentile(depth_downscaled[depth_downscaled != 0], 1)
        max_depth = np.percentile(depth_downscaled, 99)
        depthFrameColor = np.interp(depthFrame, (min_depth, max_depth), (0, 255)).astype(np.uint8)
        depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_HOT)
        detections = inDet.detections
        # If the rectifiedRight is available, draw bounding boxes on it and show the rectifiedRight
        height = rectifiedRight.shape[0]
        width = rectifiedRight.shape[1]
        for detection in detections:
            roiData = detection.boundingBoxMapping
            roi = roiData.roi
            roi = roi.denormalize(depthFrameColor.shape[1], depthFrameColor.shape[0])
            topLeft = roi.topLeft()
            bottomRight = roi.bottomRight()
            xmin = int(topLeft.x)
            ymin = int(topLeft.y)
            xmax = int(bottomRight.x)
            ymax = int(bottomRight.y)
            cv2.rectangle(depthFrameColor, (xmin, ymin), (xmax, ymax), color, cv2.FONT_HERSHEY_SCRIPT_SIMPLEX)
            # Denormalize bounding box
            x1 = int(detection.xmin * width)
            x2 = int(detection.xmax * width)
            y1 = int(detection.ymin * height)
            y2 = int(detection.ymax * height)
            try:
                label = labelMap[detection.label]
            except:
                label = detection.label
            cv2.putText(rectifiedRight, str(label), (x1 + 10, y1 + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(rectifiedRight, "{:.2f}".format(detection.confidence*100), (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(rectifiedRight, f"X: {int(detection.spatialCoordinates.x)} mm", (x1 + 10, y1 + 50), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(rectifiedRight, f"Y: {int(detection.spatialCoordinates.y)} mm", (x1 + 10, y1 + 65), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(rectifiedRight, f"Z: {int(detection.spatialCoordinates.z)} mm", (x1 + 10, y1 + 80), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.rectangle(rectifiedRight, (x1, y1), (x2, y2), color, cv2.FONT_HERSHEY_SIMPLEX)
cv2.putText(rectifiedRight, "NN fps: {:.2f}".format(fps), (2, rectifiedRight.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, color)
cv2.imshow("depth", depthFrameColor)
cv2.imshow("rectified right", rectifiedRight)
if cv2.waitKey(1) == ord('q'):
break