Sure. This is the v2.28.0 implementation:
```python
class OakCameraDevice(camera_device.CameraDevice):
"""OAK CameraDevice implementation."""
OAK_DEPTH_RESOLUTIONS = {
(1280, 720): dai.MonoCameraProperties.SensorResolution.THE_720_P,
(640, 480): dai.MonoCameraProperties.SensorResolution.THE_480_P,
(640, 400): dai.MonoCameraProperties.SensorResolution.THE_400_P,
}
    # The keys for this table are "WxH@FPS" strings, and the values are
    # latencies in ms. These values were empirically determined from testing
    # with Premio computers. Luxonis provides guidance on expected latencies,
    # which these thresholds far exceed:
    # https://docs.luxonis.com/software/depthai/optimizing/
RGB_ONLY_LATENCY_THRESHOLDS = defaultdict(
lambda: 200,
{
"3840x2160@6": 5000,
"1920x1080@30": 800,
"1920x1080@20": 320,
},
)
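    # Example lookup: RGB_ONLY_LATENCY_THRESHOLDS["1920x1080@30"] -> 800 ms;
    # any resolution/FPS combination not listed falls back to the 200 ms default.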
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize an OakCameraDevice.
Args:
*args: Variable length argument list. Passed to CameraDevice.
**kwargs: Arbitrary keyword arguments. Passed to CameraDevice.
                latency_threshold: The maximum allowed latency between
                    camera capture and host receive time.
"""
super().__init__(*args, **kwargs)
if self.depth:
self.latency_threshold = self.depth.config.get("latency_threshold", 400)
self.rgb_latency_threshold = 200
else:
self.rgb_latency_threshold = self.RGB_ONLY_LATENCY_THRESHOLDS[
f"{self.rgb.resolution[0]}x{self.rgb.resolution[1]}@{self.rgb.fps}"
]
self.frame_sync_deque_size = self.rgb.fps
# Deques that store the last N RGB & Depth frames for syncing.
self.rgb_deque = deque([], maxlen=self.frame_sync_deque_size)
self.depth_deque = deque([], maxlen=self.frame_sync_deque_size)
self.left_mono_deque = deque([], maxlen=self.frame_sync_deque_size)
self.right_mono_deque = deque([], maxlen=self.frame_sync_deque_size)
self.is_wide = kwargs.get("is_wide", False)
self.last_seq_num_returned = None
self.device = None
def start(self) -> bool:
"""Start the OAK camera pipeline.
Returns:
True if the pipeline was started successfully.
"""
self.device = dai.Device(OAK_FIXED_IP)
self.pipeline = dai.Pipeline()
self.pipeline.setXLinkChunkSize(0) # Disable XLink chunking to reduce latency
        # The RGB stream is required; bail out early if it is missing (before
        # any attempt to read its config).
        if not self.rgb:
            logging.error(
                f"OakCameraDevice[{self.camera_index}]"
                " requires an RGB camera configuration"
            )
            return False
        # Get relevant RGB configurations
        camera_exposure_us = self.rgb.config.get("camera_exposure_us", 1000)
        camera_iso_sensitivity = self.rgb.config.get("camera_iso_sensitivity", 500)
# RGB output node for clients
rgb_out = self.pipeline.create(dai.node.XLinkOut)
rgb_out.setStreamName("rgb")
# Create color camera
rgb_camera = self.pipeline.create(dai.node.ColorCamera)
rgb_camera.setInterleaved(False)
rgb_camera.setBoardSocket(dai.CameraBoardSocket.RGB)
rgb_camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
rgb_camera.setFps(self.rgb.fps)
rgb_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
rgb_camera.initialControl.setSharpness(0)
rgb_camera.initialControl.setLumaDenoise(0)
rgb_camera.initialControl.setChromaDenoise(0)
# Resize 4k to a lower resolution on the ISP if needed
if tuple(self.rgb.resolution) == (1920, 1080):
rgb_camera.setIspScale(1, 2)
elif tuple(self.rgb.resolution) == (1280, 720):
rgb_camera.setIspScale(1, 3)
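        # With the 4K sensor (3840x2160): 1/2 scale -> 1920x1080 and
        # 1/3 scale -> 1280x720.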
# RGB video encoding node
encoder = self.pipeline.create(dai.node.VideoEncoder)
encoder.setDefaultProfilePreset(1, dai.VideoEncoderProperties.Profile.MJPEG)
if self.is_wide:
# Create ImageManip node to correct distortion
            manip = self.pipeline.create(dai.node.ImageManip)
mesh, meshWidth, meshHeight = self.get_mesh(
self.device.readCalibration(), rgb_camera.getIspSize()
)
manip.setWarpMesh(mesh, meshWidth, meshHeight)
manip.setMaxOutputFrameSize(
rgb_camera.getIspWidth() * rgb_camera.getIspHeight() * 3 // 2,
)
rgb_camera.video.link(manip.inputImage)
manip.out.link(encoder.input)
encoder.bitstream.link(rgb_out.input)
else:
rgb_camera.video.link(encoder.input)
encoder.bitstream.link(rgb_out.input)
# If depth is enabled, add a stereo depth node to the pipeline
if self.depth:
depth_out = self.pipeline.create(dai.node.XLinkOut)
depth_out.setStreamName("depth")
brightness = self.depth.config.get("brightness", 0)
contrast = self.depth.config.get("contrast", 0)
camera_exposure_us = self.depth.config.get("camera_exposure_us")
camera_iso_sensitivity = self.depth.config.get("camera_iso_sensitivity")
if self.depth.resolution in self.OAK_DEPTH_RESOLUTIONS:
depth_camera_resolution = self.OAK_DEPTH_RESOLUTIONS[
self.depth.resolution
]
else:
                logging.warning(
f"Unsupported depth resolution {self.depth.resolution}."
" Defaulting to 400p."
)
depth_camera_resolution = (
dai.MonoCameraProperties.SensorResolution.THE_400_P
)
white_balance = self.depth.config.get("white_balance")
# Create left/right mono cameras for Stereo depth
left_camera = self.pipeline.create(dai.node.MonoCamera)
left_camera.setResolution(depth_camera_resolution)
left_camera.setCamera("left")
left_camera.setFps(self.depth.fps)
if camera_exposure_us and camera_iso_sensitivity:
left_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
left_camera.initialControl.setBrightness(brightness)
left_camera.initialControl.setContrast(contrast)
if white_balance:
left_camera.initialControl.setManualWhiteBalance(white_balance)
right_camera = self.pipeline.create(dai.node.MonoCamera)
right_camera.setResolution(depth_camera_resolution)
right_camera.setCamera("right")
right_camera.setFps(self.depth.fps)
if camera_exposure_us and camera_iso_sensitivity:
right_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
right_camera.initialControl.setBrightness(brightness)
right_camera.initialControl.setContrast(contrast)
if white_balance:
right_camera.initialControl.setManualWhiteBalance(white_balance)
# Create a Stereo node that will produce the depth map
stereo = self.pipeline.create(dai.node.StereoDepth)
disparity_shift = self.depth.config.get("disparity_shift", 0)
stereo.initialConfig.setDisparityShift(disparity_shift)
depth_preset_mode = dai.node.StereoDepth.PresetMode.HIGH_DENSITY
stereo.setDefaultProfilePreset(depth_preset_mode)
            # This is a non-edge-preserving median filter, which can be used
            # to reduce noise and smooth the depth map. The median filter is
            # implemented in hardware, so it's the fastest filter.
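            # e.g. median_filter: "KERNEL_5x5" in the depth config selects the
            # 5x5 kernel; any unrecognized value disables the median filter.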
median_filter_str = self.depth.config.get("median_filter", "KERNEL_7x7")
if median_filter_str == "KERNEL_3x3":
median_filter = dai.MedianFilter.KERNEL_3x3
elif median_filter_str == "KERNEL_5x5":
median_filter = dai.MedianFilter.KERNEL_5x5
elif median_filter_str == "KERNEL_7x7":
median_filter = dai.MedianFilter.KERNEL_7x7
else:
median_filter = dai.MedianFilter.MEDIAN_OFF
stereo.initialConfig.setMedianFilter(median_filter)
            # Computes disparities in both L-R and R-L directions and combines
            # them, discarding invalid disparity values for better occlusion
            # handling.
left_right_check = self.depth.config.get("left_right_check", True)
stereo.setLeftRightCheck(left_right_check)
            # Extended disparity increases the disparity range from 0-95 to
            # 0-190, combining results from full-resolution and downscaled
            # images. Suitable for short-range objects. Defaults to True.
extended_disparity = self.depth.config.get("extended_disparity", True)
stereo.setExtendedDisparity(extended_disparity)
            # Subpixel mode improves precision and is especially useful for
            # long-range measurements. Defaults to False.
subpixel = self.depth.config.get("subpixel", False)
stereo.setSubpixel(subpixel)
# Align the depth map to the RGB image
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
# Set depth post-processing
config = stereo.initialConfig.get()
# Spatial Edge-Preserving Filter will fill invalid depth pixels with
# valid neighboring depth pixels. It performs a series of 1D
# horizontal and vertical passes or iterations, to enhance the
# smoothness of the reconstructed data.
spatial_filter = self.depth.config.get("spatial_filter", {})
if spatial_filter:
config.postProcessing.spatialFilter.enable = True
                # Search radius for hole filling: an in-place heuristic
                # symmetric hole-filling mode applied horizontally during the
                # filter passes, intended to rectify minor artifacts with
                # minimal performance impact.
spatial_filter_hole_filling_radius = self.depth.config[
"spatial_filter"
].get("spatial_filter_hole_filling_radius", 3)
config.postProcessing.spatialFilter.holeFillingRadius = (
spatial_filter_hole_filling_radius
)
# Number of iterations over the image in both horizontal and
# vertical direction.
spatial_filter_num_iterations = self.depth.config["spatial_filter"].get(
"spatial_filter_num_iterations", 3
)
config.postProcessing.spatialFilter.numIterations = (
spatial_filter_num_iterations
)
            # Minimum pixel brightness. If an input pixel is less than or
            # equal to this value, the depth value is invalidated.
brightness_filter_min = self.depth.config.get(
"brightness_filter_min_brightness", 0
)
config.postProcessing.brightnessFilter.minBrightness = brightness_filter_min
            # Maximum pixel brightness. If an input pixel is greater than or
            # equal to this value, the depth value is invalidated.
brightness_filter_max = self.depth.config.get(
"brightness_filter_max_brightness", 255
)
config.postProcessing.brightnessFilter.maxBrightness = brightness_filter_max
# Threshold Filter filters out all disparity/depth pixels outside
# the configured min/max threshold values.
threshold_filter_min = self.depth.config.get(
"threshold_filter_min_range",
200, # Defaults to 200mm
)
threshold_filter_max = self.depth.config.get(
"threshold_filter_max_range",
2000, # Defaults to 2m
)
config.postProcessing.thresholdFilter.minRange = threshold_filter_min
config.postProcessing.thresholdFilter.maxRange = threshold_filter_max
            # The speckle filter reduces speckle noise, i.e. regions with
            # large variance between neighboring disparity/depth pixels.
speckle_filter = self.depth.config.get("speckle_filter", {})
if speckle_filter:
config.postProcessing.speckleFilter.enable = True
speckle_range = self.depth.config["speckle_filter"].get(
"speckle_range", 4
)
config.postProcessing.speckleFilter.speckleRange = speckle_range
            # The decimation filter sub-samples the depth map, reducing depth
            # scene complexity and allowing other filters to run faster.
decimation_filter = self.depth.config.get("decimation_filter", {})
if decimation_filter:
decimation_factor = decimation_filter.get("decimation_factor", 4)
config.postProcessing.decimationFilter.decimationFactor = (
decimation_factor
)
stereo.initialConfig.set(config)
left_camera.out.link(stereo.left)
right_camera.out.link(stereo.right)
stereo.depth.link(depth_out.input)
# Set the flood light and dot projector brightness [0-1500]
ir_laser_dot_projector_brightness = self.depth.config.get(
"ir_laser_dot_projector_brightness", 700
)
self.device.setIrLaserDotProjectorBrightness(
ir_laser_dot_projector_brightness
)
ir_flood_light_brightness = self.depth.config.get(
"ir_flood_light_brightness", 700
)
self.device.setIrFloodLightBrightness(ir_flood_light_brightness)
self.device.startPipeline(self.pipeline)
device_info = self.device.getDeviceInfo()
camera_features = self.device.getConnectedCameraFeatures()
# Create RGB Queue and callbacks for new frames
self.rgb_queue = self.device.getOutputQueue(
name="rgb", maxSize=1, blocking=False
)
self.rgb_queue.addCallback(self.__add_rgb_packet)
# If depth is enabled, create Depth Queue and callbacks for new frames
if self.depth:
self.depth_queue = self.device.getOutputQueue(
name="depth", maxSize=1, blocking=False
)
self.depth_queue.addCallback(self.__add_depth_packet)
return True
```
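The main API differences to note before the v3 version, all visible in the code below: the unified `Camera` node (configured with `build()` and read via `requestOutput()`) replaces `ColorCamera`/`MonoCamera`, the explicit `XLinkOut` nodes disappear in favor of `createOutputQueue()` on node outputs, the device is owned by the pipeline and reached through `getDefaultDevice()` rather than constructed up front, and the IR setters take normalized intensities in [0.0, 1.0] instead of raw brightness values.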
And this is v3.2.1:
```python
class OakCameraDevice(camera_device.CameraDevice):
"""OAK CameraDevice implementation."""
OAK_DEPTH_RESOLUTIONS = {
(1280, 720): dai.MonoCameraProperties.SensorResolution.THE_720_P,
(640, 480): dai.MonoCameraProperties.SensorResolution.THE_480_P,
(640, 400): dai.MonoCameraProperties.SensorResolution.THE_400_P,
}
    # The keys for this table are "WxH@FPS" strings, and the values are
    # latencies in ms. These values were empirically determined from testing
    # with Premio computers. Luxonis provides guidance on expected latencies,
    # which these thresholds far exceed:
    # https://docs.luxonis.com/software/depthai/optimizing/
RGB_ONLY_LATENCY_THRESHOLDS = defaultdict(
lambda: 200,
{
"3840x2160@6": 5000,
"1920x1080@30": 800,
"1920x1080@20": 320,
},
)
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize an OakCameraDevice.
Args:
*args: Variable length argument list. Passed to CameraDevice.
**kwargs: Arbitrary keyword arguments. Passed to CameraDevice.
                latency_threshold: The maximum allowed latency between
                    camera capture and host receive time.
"""
super().__init__(*args, **kwargs)
if self.depth:
self.latency_threshold = self.depth.config.get("latency_threshold", 400)
self.rgb_latency_threshold = 200
else:
self.rgb_latency_threshold = self.RGB_ONLY_LATENCY_THRESHOLDS[
f"{self.rgb.resolution[0]}x{self.rgb.resolution[1]}@{self.rgb.fps}"
]
self.frame_sync_deque_size = self.rgb.fps
# Deques that store the last N RGB & Depth frames for syncing.
self.rgb_deque = deque([], maxlen=self.frame_sync_deque_size)
self.depth_deque = deque([], maxlen=self.frame_sync_deque_size)
self.left_mono_deque = deque([], maxlen=self.frame_sync_deque_size)
self.right_mono_deque = deque([], maxlen=self.frame_sync_deque_size)
self.is_wide = kwargs.get("is_wide", False)
self.last_seq_num_returned = None
self.pipeline = None
def start(self) -> bool:
"""Start the OAK camera pipeline.
Returns:
True if the pipeline was started successfully.
"""
# Create pipeline first
self.pipeline = dai.Pipeline()
        # The RGB stream is required; bail out early if it is missing (before
        # any attempt to read its config).
        if not self.rgb:
            logging.error(
                f"OakCameraDevice[{self.camera_index}]"
                " requires an RGB camera configuration"
            )
            return False
        # Get relevant RGB configurations
        camera_exposure_us = self.rgb.config.get("camera_exposure_us", 1000)
        camera_iso_sensitivity = self.rgb.config.get("camera_iso_sensitivity", 500)
# Create color camera using new Camera node
rgb_camera = self.pipeline.create(dai.node.Camera)
# Set camera controls before building
rgb_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
rgb_camera.initialControl.setSharpness(0)
rgb_camera.initialControl.setLumaDenoise(0)
rgb_camera.initialControl.setChromaDenoise(0)
# Build the camera with the board socket FIRST
# Note: build() must be called BEFORE requestOutput()
rgb_camera.build(dai.CameraBoardSocket.RGB)
# RGB video encoding node
encoder = self.pipeline.create(dai.node.VideoEncoder)
encoder.setDefaultProfilePreset(1, dai.VideoEncoderProperties.Profile.MJPEG)
if self.is_wide:
# Request RGB output in RGB888 format for warping
rgb_output_rgb888 = rgb_camera.requestOutput(
size=tuple(self.rgb.resolution),
type=dai.ImgFrame.Type.RGB888p,
fps=self.rgb.fps,
)
warp_node = self.pipeline.create(dai.node.Warp)
warp_node.setOutputSize(tuple(self.rgb.resolution))
max_frame_size = self.rgb.resolution[0] * self.rgb.resolution[1] * 3
warp_node.setMaxOutputFrameSize(max_frame_size)
warp_node.setInterpolation(dai.Interpolation.BILINEAR)
# Convert warped output to NV12 for encoder
manip_post_warp = self.pipeline.create(dai.node.ImageManip)
manip_post_warp.initialConfig.setFrameType(dai.ImgFrame.Type.NV12)
manip_post_warp.setMaxOutputFrameSize(
self.rgb.resolution[0] * self.rgb.resolution[1] * 3 // 2
)
rgb_output_rgb888.link(warp_node.inputImage)
warp_node.out.link(manip_post_warp.inputImage)
manip_post_warp.out.link(encoder.input)
else:
# Request RGB output in NV12 format for encoder
rgb_output_nv12 = rgb_camera.requestOutput(
size=tuple(self.rgb.resolution),
type=dai.ImgFrame.Type.NV12,
fps=self.rgb.fps,
)
rgb_output_nv12.link(encoder.input)
# If depth is enabled, add a stereo depth node to the pipeline
if self.depth:
brightness = self.depth.config.get("brightness", 0)
contrast = self.depth.config.get("contrast", 0)
camera_exposure_us = self.depth.config.get("camera_exposure_us")
camera_iso_sensitivity = self.depth.config.get("camera_iso_sensitivity")
if self.depth.resolution in self.OAK_DEPTH_RESOLUTIONS:
depth_camera_resolution = self.OAK_DEPTH_RESOLUTIONS[
self.depth.resolution
]
else:
                logging.warning(
f"Unsupported depth resolution {self.depth.resolution}."
" Defaulting to 400p."
)
depth_camera_resolution = (
dai.MonoCameraProperties.SensorResolution.THE_400_P
)
white_balance = self.depth.config.get("white_balance")
# Create left/right cameras using Camera node (v3 API)
# MonoCamera is deprecated in v3, use Camera node instead
left_camera = self.pipeline.create(dai.node.Camera)
left_camera.initialControl.setBrightness(brightness)
left_camera.initialControl.setContrast(contrast)
if camera_exposure_us and camera_iso_sensitivity:
left_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
if white_balance:
left_camera.initialControl.setManualWhiteBalance(white_balance)
# Build with LEFT socket
left_camera.build(dai.CameraBoardSocket.LEFT)
# Request output at the depth resolution
left_output = left_camera.requestOutput(
size=self.depth.resolution,
type=dai.ImgFrame.Type.GRAY8,
fps=self.depth.fps,
)
right_camera = self.pipeline.create(dai.node.Camera)
right_camera.initialControl.setBrightness(brightness)
right_camera.initialControl.setContrast(contrast)
if camera_exposure_us and camera_iso_sensitivity:
right_camera.initialControl.setManualExposure(
camera_exposure_us, camera_iso_sensitivity
)
if white_balance:
right_camera.initialControl.setManualWhiteBalance(white_balance)
# Build with RIGHT socket
right_camera.build(dai.CameraBoardSocket.RIGHT)
# Request output at the depth resolution
right_output = right_camera.requestOutput(
size=self.depth.resolution,
type=dai.ImgFrame.Type.GRAY8,
fps=self.depth.fps,
)
# Create a Stereo node that will produce the depth map
stereo = self.pipeline.create(dai.node.StereoDepth)
disparity_shift = self.depth.config.get("disparity_shift", 0)
stereo.initialConfig.setDisparityShift(disparity_shift)
            # This is a non-edge-preserving median filter, which can be used
            # to reduce noise and smooth the depth map. The median filter is
            # implemented in hardware, so it's the fastest filter.
median_filter_str = self.depth.config.get("median_filter", "KERNEL_7x7")
if median_filter_str == "KERNEL_3x3":
median_filter = dai.MedianFilter.KERNEL_3x3
elif median_filter_str == "KERNEL_5x5":
median_filter = dai.MedianFilter.KERNEL_5x5
elif median_filter_str == "KERNEL_7x7":
median_filter = dai.MedianFilter.KERNEL_7x7
else:
median_filter = dai.MedianFilter.MEDIAN_OFF
stereo.initialConfig.setMedianFilter(median_filter)
            # Computes disparities in both L-R and R-L directions and combines
            # them, discarding invalid disparity values for better occlusion
            # handling.
left_right_check = self.depth.config.get("left_right_check", True)
stereo.setLeftRightCheck(left_right_check)
            # Extended disparity increases the disparity range from 0-95 to
            # 0-190, combining results from full-resolution and downscaled
            # images. Suitable for short-range objects. Defaults to True.
extended_disparity = self.depth.config.get("extended_disparity", True)
stereo.setExtendedDisparity(extended_disparity)
            # Subpixel mode improves precision and is especially useful for
            # long-range measurements. Defaults to False.
subpixel = self.depth.config.get("subpixel", False)
stereo.setSubpixel(subpixel)
# Align the depth map to the RGB image
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)
# Set output size to match RGB resolution (must be multiple of 16)
# Round down to nearest multiple of 16 if needed
depth_width = (self.rgb.resolution[0] // 16) * 16
depth_height = (self.rgb.resolution[1] // 16) * 16
stereo.setOutputSize(depth_width, depth_height)
# Log warning if RGB resolution is not a multiple of 16
if (
depth_width != self.rgb.resolution[0]
or depth_height != self.rgb.resolution[1]
):
                logging.warning(
                    f"RGB resolution {self.rgb.resolution} is not a multiple of 16. "
                    f"Depth output will be {depth_width}x{depth_height}. "
                    "This may cause pixel-to-meter calibration mismatches. "
                    "Consider using a resolution whose width and height are "
                    "both multiples of 16 (e.g., 1280x720)."
                )
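            # e.g. 1920x1080 rounds down to 1920x1072 (1080 is not a multiple
            # of 16), while 1280x720 passes through unchanged.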
# Set depth post-processing
# In v3, initialConfig is already the config object, no need for .get()
# Spatial Edge-Preserving Filter will fill invalid depth pixels with
# valid neighboring depth pixels. It performs a series of 1D
# horizontal and vertical passes or iterations, to enhance the
# smoothness of the reconstructed data.
spatial_filter = self.depth.config.get("spatial_filter", {})
if spatial_filter:
stereo.initialConfig.postProcessing.spatialFilter.enable = True
                # Search radius for hole filling: an in-place heuristic
                # symmetric hole-filling mode applied horizontally during the
                # filter passes, intended to rectify minor artifacts with
                # minimal performance impact.
spatial_filter_hole_filling_radius = self.depth.config[
"spatial_filter"
].get("spatial_filter_hole_filling_radius", 3)
stereo.initialConfig.postProcessing.spatialFilter.holeFillingRadius = (
spatial_filter_hole_filling_radius
)
# Number of iterations over the image in both horizontal and
# vertical direction.
spatial_filter_num_iterations = self.depth.config["spatial_filter"].get(
"spatial_filter_num_iterations", 3
)
stereo.initialConfig.postProcessing.spatialFilter.numIterations = (
spatial_filter_num_iterations
)
            # Minimum pixel brightness. If an input pixel is less than or
            # equal to this value, the depth value is invalidated.
brightness_filter_min = self.depth.config.get(
"brightness_filter_min_brightness", 0
)
stereo.initialConfig.postProcessing.brightnessFilter.minBrightness = (
brightness_filter_min
)
            # Maximum pixel brightness. If an input pixel is greater than or
            # equal to this value, the depth value is invalidated.
brightness_filter_max = self.depth.config.get(
"brightness_filter_max_brightness", 255
)
stereo.initialConfig.postProcessing.brightnessFilter.maxBrightness = (
brightness_filter_max
)
# Threshold Filter filters out all disparity/depth pixels outside
# the configured min/max threshold values.
threshold_filter_min = self.depth.config.get(
"threshold_filter_min_range",
200, # Defaults to 200mm
)
threshold_filter_max = self.depth.config.get(
"threshold_filter_max_range",
2000, # Defaults to 2m
)
stereo.initialConfig.postProcessing.thresholdFilter.minRange = (
threshold_filter_min
)
stereo.initialConfig.postProcessing.thresholdFilter.maxRange = (
threshold_filter_max
)
            # The speckle filter reduces speckle noise, i.e. regions with
            # large variance between neighboring disparity/depth pixels.
speckle_filter = self.depth.config.get("speckle_filter", {})
if speckle_filter:
stereo.initialConfig.postProcessing.speckleFilter.enable = True
speckle_range = self.depth.config["speckle_filter"].get(
"speckle_range", 4
)
stereo.initialConfig.postProcessing.speckleFilter.speckleRange = (
speckle_range
)
            # The decimation filter sub-samples the depth map, reducing depth
            # scene complexity and allowing other filters to run faster.
decimation_filter = self.depth.config.get("decimation_filter", {})
if decimation_filter:
decimation_factor = decimation_filter.get("decimation_factor", 4)
stereo.initialConfig.postProcessing.decimationFilter.decimationFactor = decimation_factor # noqa: E501
left_output.link(stereo.left)
right_output.link(stereo.right)
# Create output queues directly from nodes (v3 API - no XLink nodes needed)
self.rgb_queue = encoder.bitstream.createOutputQueue(maxSize=1, blocking=False)
self.rgb_queue.addCallback(self.__add_rgb_packet)
# If depth is enabled, create Depth Queue and callbacks for new frames
if self.depth:
self.depth_queue = stereo.depth.createOutputQueue(maxSize=1, blocking=False)
self.depth_queue.addCallback(self.__add_depth_packet)
# Start the pipeline (v3 API)
self.pipeline.start()
# Handle wide angle mesh after pipeline is started
if self.is_wide:
device = self.pipeline.getDefaultDevice()
# For v3 Camera node, we need to get ISP size differently
# Since we requested a specific output size, we use that
isp_size = tuple(self.rgb.resolution)
mesh, meshWidth, meshHeight = self.get_mesh(
device.readCalibration(), isp_size
)
warp_node.setWarpMesh(mesh, meshWidth, meshHeight)
if self.depth:
ir_laser_dot_projector_brightness = self.depth.config.get(
"ir_laser_dot_projector_brightness", 700
)
ir_flood_light_brightness = self.depth.config.get(
"ir_flood_light_brightness", 700
)
# Note: v3 API expects intensity values in range [0.0, 1.0], but config
# values are in range [0, 1500]. Division by 1500.0 normalizes the config
# values.
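            # e.g. the default of 700 maps to 700 / 1500.0 ≈ 0.47.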
self.pipeline.getDefaultDevice().setIrLaserDotProjectorIntensity(
ir_laser_dot_projector_brightness / 1500.0
)
self.pipeline.getDefaultDevice().setIrFloodLightIntensity(
ir_flood_light_brightness / 1500.0
)
device = self.pipeline.getDefaultDevice()
device_info = device.getDeviceInfo()
camera_features = device.getConnectedCameraFeatures()
return True
def __add_rgb_packet(self, packet: Any) -> None:
"""Add a packet to the RGB deque.
Args:
packet: the packet to add to the deque
"""
latency_ms = (dai.Clock.now() - packet.getTimestamp()).total_seconds() * 1000
if latency_ms > self.rgb_latency_threshold:
logging.info(
f"RGB Latency: {latency_ms:.1f} > {self.rgb_latency_threshold}"
)
decoded_rgb_image = cv2.imdecode(packet.getData(), cv2.IMREAD_UNCHANGED)
if decoded_rgb_image is None:
return
decoded_rgb_image = cv2.cvtColor(decoded_rgb_image, cv2.COLOR_BGR2RGB)
packet.setFrame(decoded_rgb_image)
self.rgb_deque.append(packet)
def __add_depth_packet(self, packet: Any) -> None:
"""Add a packet to the depth deque.
Args:
packet: the packet to add to the deque
"""
latency_ms = (dai.Clock.now() - packet.getTimestamp()).total_seconds() * 1000
if latency_ms > 200:
logging.info(f"DEPTH Latency: {latency_ms:.1f}")
self.depth_deque.append(packet)
def __add_left_mono_packet(self, packet: Any) -> None:
"""Add a packet to the depth deque.
Args:
packet: the packet to add to the deque
"""
self.left_mono_deque.append(packet)
def __add_right_mono_packet(self, packet: Any) -> None:
"""Add a packet to the depth deque.
Args:
packet: the packet to add to the deque
"""
self.right_mono_deque.append(packet)
def get_frame(self) -> Optional["capture.CameraCapture"]:
"""Retrieve a frame from the camera device.
Returns:
A CameraCapture object containing the frame and metadata.
Raises:
Exception: If the depth packet latency is too high.
"""
best_rgb_packet = None
best_depth_packet = None
best_depth_packet_position = None
if self.depth:
            if self.rgb.fps != self.depth.fps:
                logging.error(
                    f"RGB FPS ({self.rgb.fps}) does not match Depth FPS "
                    f"({self.depth.fps}). Both FPS need to match to align "
                    "frames."
                )
# Find the matching RGB-Depth using sequence number
# https://docs.luxonis.com/hardware/platform/deploy/frame-sync#Sequence%20number%20syncing
            depth_list = reversed(list(self.depth_deque))
            # Materialize as a list: a reversed() iterator would be exhausted
            # after the first pass through the inner loop below.
            rgb_list = list(reversed(self.rgb_deque))
for depth_pos, depth_packet in enumerate(depth_list):
depth_seqnum = depth_packet.getSequenceNum()
for rgb_packet in rgb_list:
rgb_seqnum = rgb_packet.getSequenceNum()
if depth_seqnum == rgb_seqnum:
best_rgb_packet = rgb_packet
best_depth_packet = depth_packet
best_depth_packet_position = depth_pos
break
if best_rgb_packet:
break
else:
# If depth is not enabled, return the latest RGB frame
            if self.rgb_deque:
                best_rgb_packet = self.rgb_deque[-1]
if best_rgb_packet is None:
return None
latency_ms = (
dai.Clock.now() - best_rgb_packet.getTimestamp()
).total_seconds() * 1000
# Only check latency on robots (where the depth stream is enabled).
        if self.depth and latency_ms > self.latency_threshold:
            logging.warning(
                f"OakCameraDevice[{self.camera_index}]"
                f" depth packet latency is {latency_ms:.1f}ms"
            )
            raise Exception("Depth packet latency is too high")
camera_capture = capture.CameraCapture()
# According to the documentation, the way to measure latency between
# camera capture and the image being received by the host is to do
# dai.Clock.now() - packet.getTimestamp().
# https://docs.luxonis.com/projects/api/en/latest/samples/host_side/latency_measurement/
# So to get the capture timestamp, we subtract the RGB latency from
# the current time.
camera_capture.timestamp = (
time.time()
- (dai.Clock.now() - best_rgb_packet.getTimestamp()).total_seconds()
)
camera_capture.device = self
rgb_frame = best_rgb_packet.getFrame().reshape(
self.rgb.resolution[1], self.rgb.resolution[0], 3
)
camera_capture.rgb_frame = rgb_frame
if self.depth:
try:
camera_capture.z_frame = best_depth_packet.getFrame()
except Exception as e:
logging.debug(
f"Failed to get depth frame from OakCameraDevice: {e}",
stack_info=True,
)
return camera_capture
def get_mesh(
self, calibData: dai.CalibrationHandler, ispSize: Tuple[int, int]
) -> Tuple:
"""Get mesh to correct distortion.
Adapted from:
https://github.com/luxonis/depthai-python/blob/
develop/examples/ColorCamera/rgb_undistort.py
Args:
calibData: the calibration data from the camera
ispSize: isp resolution
Returns:
Tuple containing the mesh, mesh width, and mesh height.
"""
camSocket = dai.CameraBoardSocket.RGB
M1 = np.array(calibData.getCameraIntrinsics(camSocket, ispSize[0], ispSize[1]))
d1 = np.array(calibData.getDistortionCoefficients(camSocket))
R1 = np.identity(3)
mapX, mapY = cv2.initUndistortRectifyMap(M1, d1, R1, M1, ispSize, cv2.CV_32FC1)
meshCellSize = 16
mesh0 = []
# Creates subsampled mesh which will be loaded on
# the device to undistort the image
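        # e.g. a 1920x1080 map sampled every 16 px yields a 120x68 grid
        # (meshWidth=120, meshHeight=68).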
for y in range(mapX.shape[0] + 1): # iterating over height of the image
if y % meshCellSize == 0:
rowLeft = []
for x in range(mapX.shape[1]): # iterating over width of the image
if x % meshCellSize == 0:
if y == mapX.shape[0] and x == mapX.shape[1]:
rowLeft.append(mapX[y - 1, x - 1])
rowLeft.append(mapY[y - 1, x - 1])
elif y == mapX.shape[0]:
rowLeft.append(mapX[y - 1, x])
rowLeft.append(mapY[y - 1, x])
elif x == mapX.shape[1]:
rowLeft.append(mapX[y, x - 1])
rowLeft.append(mapY[y, x - 1])
else:
rowLeft.append(mapX[y, x])
rowLeft.append(mapY[y, x])
if (mapX.shape[1] % meshCellSize) % 2 != 0:
rowLeft.append(0)
rowLeft.append(0)
mesh0.append(rowLeft)
mesh0 = np.array(mesh0)
meshWidth = mesh0.shape[1] // 2
meshHeight = mesh0.shape[0]
mesh0.resize(meshWidth * meshHeight, 2)
mesh = list(map(tuple, mesh0))
return mesh, meshWidth, meshHeight
def stop(self) -> None:
"""Stop the camera device."""
try:
            if self.pipeline:
                # Stop the pipeline and release our reference so a later
                # start() builds a fresh one.
                self.pipeline.stop()
                self.pipeline = None
except Exception as e:
logging.warning(f"Failed to stop the camera device: {e}", stack_info=True)
```
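For completeness, a minimal sketch of how a caller might drive the v3 class. The constructor arguments here are assumptions about the `CameraDevice` base class, which isn't shown, so treat this as illustrative rather than the real signature:

```python
import time

# Hypothetical construction: the real kwargs are defined by the CameraDevice
# base class, which is not shown above.
device = OakCameraDevice(camera_index=0)

if device.start():
    try:
        while True:
            # get_frame() returns None until a synced frame is available and
            # raises if the depth packet latency exceeds the threshold.
            cap = device.get_frame()
            if cap is None:
                time.sleep(0.01)
                continue
            rgb = cap.rgb_frame  # HxWx3 RGB array
            depth = cap.z_frame if device.depth else None  # RGB-aligned depth map
            # ... process rgb / depth ...
    finally:
        device.stop()
```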