Sure,
Here is a MRE for the object at 7m.
You must put "left_7m.png" (resp. "right_7m.png") image in a left/ (resp. right/) folder, both folders in same parent folder.
I had to anonymize the images, but I obtained the same depth in the ROI as in the non-anonymized images:
- using speckle filter or no filter, I got a distance of 6680 mm (4.6% error)
- using spatial filter, I got a distance of 5465 mm (21.9% error)


#!/usr/bin/env python3
import cv2
import depthai as dai
from time import sleep
import datetime
import os
dataset_path = '/PATH/TO/LEFT_RIGHT_FOLDERS/'
output_root = dataset_path
show = False
save_depth = False
rectify = True
config_dict = {
"leftRightCheck": True, # Better handling for occlusions
"extendedDisparity": True, # Closer-in minimum depth, disparity range is doubled.
"subpixel": False, # Better accuracy for longer distance, fractional disparity 32-levels
"speckleFilter": {"enable": False},
"spatialFilter": {"enable": True},
}
if save_depth:
out_depth_path = os.path.join(output_root, 'depth')
if not os.path.exists(out_depth_path):
os.makedirs(out_depth_path)
lrcheck = config_dict["leftRightCheck"] # Better handling for occlusions
extended = config_dict["extendedDisparity"] # Closer-in minimum depth, disparity range is doubled. Unsupported for now.--> VL: seems supported
subpixel = config_dict["subpixel"] # Better accuracy for longer distance, fractional disparity 32-levels
class DatasetManager:
def __init__(self, path, left_folder, right_folder):
self.path = path
self.index = 0
self.left_folder = left_folder
self.right_folder = right_folder
self.left_path = os.path.join(self.path, self.left_folder)
self.right_path = os.path.join(self.path, self.right_folder)
self.left_names = self.get_sorted_images(self.left_path)
self.right_names = self.get_sorted_images(self.right_path)
self.length = len(self.left_names)
if self.length == 0:
raise RuntimeError("No dataset found at {}".format(path))
assert self.length == len(self.right_names), "Not same number of images in left and right folders"
def get_sorted_images(self, path):
image_files = [file for file in os.listdir(path)]
return sorted(image_files)
def get(self):
return os.path.join(self.left_path, self.left_names[self.index]), os.path.join(self.right_path, self.right_names[self.index])
def get_name(self):
return self.left_names[self.index], self.right_names[self.index], self.left_names[self.index].split('_')[1].split('.')[0]
def next(self):
self.index = (self.index + 1) % self.length
return self.get()
def prev(self):
self.index = (self.index - 1) % self.length
return self.get()
def __len__(self):
return self.length
dataset = DatasetManager(dataset_path, 'left', 'right')
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
stereo = pipeline.create(dai.node.StereoDepth)
monoLeft = pipeline.create(dai.node.XLinkIn)
monoRight = pipeline.create(dai.node.XLinkIn)
rgb = pipeline.create(dai.node.XLinkIn)
xoutDepth = pipeline.create(dai.node.XLinkOut)
monoLeft.setStreamName("in_left")
monoRight.setStreamName("in_right")
rgb.setStreamName("in_rgb")
xoutDepth.setStreamName("depth")
# Properties
camRgb.setBoardSocket(dai.CameraBoardSocket.CAM_A)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.setRectifyEdgeFillColor(0) # Black, to better see the cutout
stereo.setLeftRightCheck(lrcheck)
stereo.setExtendedDisparity(extended)
stereo.setSubpixel(subpixel)
stereo.setRectification(rectify)
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
stereo.depth.link(xoutDepth.input)
config = stereo.initialConfig.get()
config.postProcessing.speckleFilter.enable = config_dict["speckleFilter"]["enable"]
config.postProcessing.spatialFilter.enable = config_dict["spatialFilter"]["enable"]
stereo.initialConfig.set(config)
stereo.setPostProcessingHardwareResources(3, 3)
stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
streams = ['depth']
print("Connecting and starting the pipeline")
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
inStreams = ["in_left", "in_right"]
inStreamsCameraID = [dai.CameraBoardSocket.CAM_B, dai.CameraBoardSocket.CAM_C]
in_q_list = []
for s in inStreams:
q = device.getInputQueue(s)
in_q_list.append(q)
# Create a receive queue for each stream
q_list = []
for s in streams:
q = device.getOutputQueue(s, 8, blocking=False)
q_list.append(q)
# Need to set a timestamp for input frames, for the sync stage in Stereo node
timestamp_ms = 0
index = 0
prevQueues = q_list.copy()
while True:
# Handle input streams, if any
if in_q_list:
dataset_size = len(dataset) # 1 # Number of image pairs
frame_interval_ms = 100
dataset.index = index
for i, q in enumerate(in_q_list):
if q.getName() == "in_left":
path = dataset.get()[0]
elif q.getName() == "in_right":
path = dataset.get()[1]
data = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
height, width = data.shape
data = data.reshape(height*width)
tstamp = datetime.timedelta(seconds=timestamp_ms // 1000,
milliseconds=timestamp_ms % 1000)
img = dai.ImgFrame()
img.setData(data)
img.setTimestamp(tstamp)
img.setInstanceNum(inStreamsCameraID[i])
img.setType(dai.ImgFrame.Type.RAW8)
img.setWidth(width)
img.setHeight(height)
q.send(img)
# print("Sent frame: {:25s}".format(path), "timestamp_ms:", timestamp_ms)
timestamp_ms += frame_interval_ms
index = (index + 1) % dataset_size
sleep(frame_interval_ms / 1000)
queues = q_list.copy()
def ListDiff(li1, li2):
return list(set(li1) - set(li2)) + list(set(li2) - set(li1))
diff = ListDiff(prevQueues, queues)
for s in diff:
name = s.getName()
cv2.destroyWindow(name)
prevQueues = queues.copy()
for q in queues:
if q.getName() in ["left", "right"]: continue
data = q.get()
frame = data.getFrame()
_, _, img_prefix = dataset.get_name()
if q.getName() == 'depth':
if save_depth:
cv2.imwrite(os.path.join(out_depth_path, f'depth_{img_prefix}.png'), frame)
print(f"Estimated depth at pixel (712, 955): {frame[712, 955]} mm")
if show:
cv2.imshow(q.getName(), frame)
if index == 0:
break
key = cv2.waitKey(1)
if key == ord("q"):
break