jakaskerl
Here is the full code. I am debating whether to calibrate the camera again, since my issue could just be a bad calibration. However, I followed the calibration instructions thoroughly, and the calibration completed and returned its results:

```python
import os
import time
import json
import cv2
import depthai as dai
import numpy as np
from datetime import timedelta
from numba import jit, prange
@jit(nopython=True, parallel=True)
def reprojection(depth_image, depth_camera_intrinsics, camera_extrinsics, color_camera_intrinsics, depth_image_show=None):
    height = len(depth_image)
    width = len(depth_image[0])
    if depth_image_show is not None:
        image = np.zeros((height, width), np.uint8)
    else:
        image = np.zeros((height, width), np.uint16)
    if camera_extrinsics[0][3] > 0:
        sign = 1
    else:
        sign = -1
    for i in prange(0, height):
        for j in prange(0, width):
            if sign == 1:
                # Reverse the order of the pixels
                j = width - j - 1
            d = depth_image[i][j]
            if d == 0:
                continue
            # Convert pixel to 3d point
            x = (j - depth_camera_intrinsics[0][2]) * d / depth_camera_intrinsics[0][0]
            y = (i - depth_camera_intrinsics[1][2]) * d / depth_camera_intrinsics[1][1]
            z = d
            # Move the point to the camera frame
            x1 = camera_extrinsics[0][0] * x + camera_extrinsics[0][1] * y + camera_extrinsics[0][2] * z + camera_extrinsics[0][3]
            y1 = camera_extrinsics[1][0] * x + camera_extrinsics[1][1] * y + camera_extrinsics[1][2] * z + camera_extrinsics[1][3]
            z1 = camera_extrinsics[2][0] * x + camera_extrinsics[2][1] * y + camera_extrinsics[2][2] * z + camera_extrinsics[2][3]
            u = color_camera_intrinsics[0][0] * (x1 / z1) + color_camera_intrinsics[0][2]
            v = color_camera_intrinsics[1][1] * (y1 / z1) + color_camera_intrinsics[1][2]
            int_u = round(u)
            int_v = round(v)
            if int_v != i:
                print(f'v -> {v} and i -> {i}')  # This should never be printed
            if int_u >= 0 and int_u < (len(image[0]) - 1) and int_v >= 0 and int_v < len(image):
                if depth_image_show is not None:
                    image[int_v][int_u] = depth_image_show[i][j][0]
                    image[int_v][int_u + sign] = depth_image_show[i][j][0]
                else:
                    image[int_v][int_u] = z1
                    image[int_v][int_u + sign] = z1
    return image
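
# Note on reprojection() as used here (between two rectified frames): the two
# views differ only by a translation along the x-axis, so every valid depth
# pixel should land on the same row in the target image - that is what the
# "This should never be printed" check above verifies. Writing to
# image[int_v][int_u + sign] fills one-pixel occlusion holes. One caveat: the
# bounds check guards int_u + 1 but not int_u - 1, so when sign == -1 and
# int_u == 0 the second write wraps around to the last column.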
def create_pipeline(ALIGN_SOCKET):
    pipeline = dai.Pipeline()

    # Create ToF node
    tof = pipeline.create(dai.node.ToF)
    tof.setNumShaves(4)

    # Configure the ToF node
    tofConfig = tof.initialConfig.get()
    tofConfig.enableFPPNCorrection = True
    tofConfig.enableOpticalCorrection = True
    tofConfig.enableWiggleCorrection = True
    tofConfig.enableTemperatureCorrection = True
    tofConfig.phaseUnwrappingLevel = 4
    tof.initialConfig.set(tofConfig)

    # Input for ToF configuration
    xinTofConfig = pipeline.create(dai.node.XLinkIn)
    xinTofConfig.setStreamName("tofConfig")
    xinTofConfig.out.link(tof.inputConfig)

    # Create ToF camera node
    cam_tof = pipeline.create(dai.node.Camera)
    cam_tof.setFps(20)
    cam_tof.setImageOrientation(dai.CameraImageOrientation.ROTATE_180_DEG)
    cam_tof.setBoardSocket(dai.CameraBoardSocket.CAM_A)
    cam_tof.raw.link(tof.input)

    # Create ColorCamera nodes for the stereo pair
    colorLeft = pipeline.create(dai.node.ColorCamera)
    colorRight = pipeline.create(dai.node.ColorCamera)

    # Configure the ColorCameras
    colorLeft.setBoardSocket(dai.CameraBoardSocket.CAM_B)
    colorRight.setBoardSocket(dai.CameraBoardSocket.CAM_C)
    colorLeft.setResolution(dai.ColorCameraProperties.SensorResolution.THE_800_P)
    colorRight.setResolution(dai.ColorCameraProperties.SensorResolution.THE_800_P)
    colorLeft.setFps(20)
    colorRight.setFps(20)
    colorLeft.setInterleaved(False)
    colorRight.setInterleaved(False)
    colorLeft.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
    colorRight.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
    # colorLeft.setImageOrientation(dai.CameraImageOrientation.NORMAL)
    # colorRight.setImageOrientation(dai.CameraImageOrientation.NORMAL)
    # colorLeft.setIspScale(2, 2)
    # colorRight.setIspScale(2, 1)

    # Create StereoDepth node
    stereo = pipeline.create(dai.node.StereoDepth)
    stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
    stereo.initialConfig.setMedianFilter(dai.MedianFilter.MEDIAN_OFF)
    stereo.setLeftRightCheck(True)
    stereo.setExtendedDisparity(False)
    stereo.setSubpixel(False)
    stereo.setDepthAlign(ALIGN_SOCKET)

    # Link the ISP outputs of the ColorCameras to the StereoDepth node
    colorLeft.isp.link(stereo.left)
    colorRight.isp.link(stereo.right)

    # Create Sync node
    sync = pipeline.create(dai.node.Sync)
    sync.setSyncThreshold(timedelta(milliseconds=50))

    # Link outputs to the Sync node with named inputs
    tof.depth.link(sync.inputs["depth_tof"])
    stereo.depth.link(sync.inputs["depth_stereo"])
    stereo.rectifiedLeft.link(sync.inputs["left_img"])
    stereo.rectifiedRight.link(sync.inputs["right_img"])
    colorLeft.isp.link(sync.inputs["rgb_img"])

    # Create XLinkOut node
    xout = pipeline.create(dai.node.XLinkOut)
    xout.setStreamName("sync_out")
    sync.out.link(xout.input)

    return pipeline
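
# Pipeline summary: the ToF on CAM_A produces tof.depth; the two color sensors
# on CAM_B/CAM_C feed a StereoDepth node whose depth is aligned to
# ALIGN_SOCKET; and a Sync node (50 ms threshold) bundles ToF depth, stereo
# depth, both rectified images, and the CAM_B ISP frame into one "sync_out"
# stream.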
def get_calib(device, RGB_SOCKET, ALIGN_SOCKET, depthSize, rgbSize):
    calibData = device.readCalibration2()
    M1 = np.array(calibData.getCameraIntrinsics(ALIGN_SOCKET, *depthSize))
    D1 = np.array(calibData.getDistortionCoefficients(ALIGN_SOCKET))
    M2 = np.array(calibData.getCameraIntrinsics(RGB_SOCKET, *rgbSize))
    D2 = np.array(calibData.getDistortionCoefficients(RGB_SOCKET))
    try:
        T = (
            np.array(calibData.getCameraTranslationVector(ALIGN_SOCKET, RGB_SOCKET, False))
            * 10
        )  # to mm for matching the depth
    except Exception:
        T = np.array([0.0, 0.0, 0.001])
    try:
        R = np.array(calibData.getCameraExtrinsics(ALIGN_SOCKET, RGB_SOCKET, False))[
            0:3, 0:3
        ]
    except Exception:
        R = np.eye(3)
    TARGET_MATRIX = M1
    lensPosition = calibData.getLensPosition(RGB_SOCKET)  # currently unused
    return M1, D1, M2, D2, T, R, TARGET_MATRIX
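
# Note on get_calib(): getCameraTranslationVector() returns centimeters, hence
# the * 10 to get millimeters and match the depth units. The fallbacks
# (near-zero translation, identity rotation) keep the script running on a
# device with incomplete extrinsics instead of crashing.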
def getAlignedDepth(frameDepth, M1, D1, M2, D2, T, R, TARGET_MATRIX, depthSize, rgbSize):
    R1, R2, _, _, _, _, _ = cv2.stereoRectify(M1, D1, M2, D2, (100, 100), R, T)  # The (100, 100) doesn't matter as it is not used for calculating the rotation matrices
    leftMapX, leftMapY = cv2.initUndistortRectifyMap(M1, None, R1, TARGET_MATRIX, depthSize, cv2.CV_32FC1)
    depthRect = cv2.remap(frameDepth, leftMapX, leftMapY, cv2.INTER_NEAREST)
    newR = np.dot(R2, np.dot(R, R1.T))  # Should be very close to identity
    newT = np.dot(R2, T)
    combinedExtrinsics = np.eye(4)
    combinedExtrinsics[0:3, 0:3] = newR
    combinedExtrinsics[0:3, 3] = newT
    depthAligned = reprojection(depthRect, TARGET_MATRIX, combinedExtrinsics, TARGET_MATRIX)
    # Rotate the depth to the RGB frame
    R_back = R2.T
    mapX, mapY = cv2.initUndistortRectifyMap(TARGET_MATRIX, None, R_back, M2, rgbSize, cv2.CV_32FC1)
    outputAligned = cv2.remap(depthAligned, mapX, mapY, cv2.INTER_NEAREST)
    return outputAligned
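
# Alignment chain in getAlignedDepth(): (1) remap the depth into its rectified
# frame via R1, (2) reproject along the now-horizontal epipolar lines into the
# rectified RGB frame using the combined extrinsics, (3) remap out of the
# rectified RGB frame (R2.T) into the RGB camera's intrinsics M2.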
MIN_DEPTH = 500  # mm
MAX_DEPTH = 10000  # mm

def colorizeDepth(frameDepth, minDepth=MIN_DEPTH, maxDepth=MAX_DEPTH):
    invalidMask = frameDepth == 0
    # Log the depth, minDepth and maxDepth
    logDepth = np.log(frameDepth, where=frameDepth != 0)
    logMinDepth = np.log(minDepth)
    logMaxDepth = np.log(maxDepth)
    depthFrameColor = np.interp(logDepth, (logMinDepth, logMaxDepth), (0, 255)).astype(
        np.uint8
    )
    depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_JET)
    # Set invalid depth pixels to black
    depthFrameColor[invalidMask] = 0
    return depthFrameColor
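
# Note on colorizeDepth(): np.log(..., where=frameDepth != 0) leaves the
# masked-out entries uninitialized; that is only safe because invalidMask
# blanks those same pixels after the color map is applied.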
RGB_SOCKET = dai.CameraBoardSocket.RGB
TOF_SOCKET = dai.CameraBoardSocket.CAM_A
LEFT_SOCKET = dai.CameraBoardSocket.LEFT
RIGHT_SOCKET = dai.CameraBoardSocket.RIGHT
ALIGN_SOCKET = RIGHT_SOCKET
COLOR_RESOLUTION = dai.ColorCameraProperties.SensorResolution.THE_1080_P
LEFT_RIGHT_RESOLUTION = dai.MonoCameraProperties.SensorResolution.THE_800_P
toFSize = (640, 480)
rgbSize = (1280, 800)
rgbWeight = 0.4
depthWeight = 0.6
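
# Heads-up on the socket aliases: in depthai, RGB == CAM_A, LEFT == CAM_B and
# RIGHT == CAM_C, so RGB_SOCKET above points at the ToF's socket while the
# frames blended below come from colorLeft on CAM_B - worth double-checking
# that this pairing is intentional.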
def updateBlendWeights(percent_rgb):
    global depthWeight
    global rgbWeight
    rgbWeight = float(percent_rgb) / 100.0
    depthWeight = 1.0 - rgbWeight
if __name__ == '__main__':
    pipeline = create_pipeline(ALIGN_SOCKET)
    rgb_depth_window_name = "rgb-depth"
    with dai.Device(pipeline) as device:
        cv2.namedWindow(rgb_depth_window_name)
        cv2.createTrackbar(
            "RGB Weight %",
            rgb_depth_window_name,
            int(rgbWeight * 100),
            100,
            updateBlendWeights,
        )
        # Create output queue
        q_sync = device.getOutputQueue(name="sync_out", maxSize=4, blocking=False)
        # Retrieve calibration data
        M1, D1, M2, D2, T, R, TARGET_MATRIX = get_calib(device, RGB_SOCKET, ALIGN_SOCKET, toFSize, rgbSize)
        # Create timestamped directory
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        save_dir = os.path.join("data", timestamp)
        os.makedirs(save_dir, exist_ok=True)
        frame_counter = 0
        while True:
            msgGrp = q_sync.get()
            frames = {}
            for name, msg in msgGrp:
                frames[name] = msg.getCvFrame()
            if len(frames) == 5:
                # Process frames for display and save only when necessary
                depth_tof_frame = frames['depth_tof']
                depth_stereo_frame = frames['depth_stereo']
                left_frame = frames['left_img']
                right_frame = frames['right_img']
                rgb_frame = frames['rgb_img']

                # Save frames - commented out to reduce lag when saving is not needed
                # depth_tof_filename = os.path.join(save_dir, f"depth_tof_{frame_counter:06d}.npy")
                # depth_stereo_filename = os.path.join(save_dir, f"depth_stereo_{frame_counter:06d}.npy")
                # left_filename = os.path.join(save_dir, f"left_img_{frame_counter:06d}.png")
                # right_filename = os.path.join(save_dir, f"right_img_{frame_counter:06d}.png")
                # rgb_filename = os.path.join(save_dir, f"rgb_img_{frame_counter:06d}.png")
                # np.save(depth_tof_filename, depth_tof_frame)
                # np.save(depth_stereo_filename, depth_stereo_frame)
                # cv2.imwrite(left_filename, left_frame)
                # cv2.imwrite(right_filename, right_frame)
                # cv2.imwrite(rgb_filename, rgb_frame)

                # Optional displays - commented out to reduce lag
                # depth_tof_display = cv2.normalize(depth_tof_frame, None, 0, 255, cv2.NORM_MINMAX)
                # depth_tof_display = np.uint8(depth_tof_display)
                # depth_tof_display = cv2.applyColorMap(depth_tof_display, cv2.COLORMAP_JET)
                # cv2.imshow("Depth ToF", depth_tof_display)
                # depth_stereo_display = cv2.normalize(depth_stereo_frame, None, 0, 255, cv2.NORM_MINMAX)
                # depth_stereo_display = np.uint8(depth_stereo_display)
                # depth_stereo_display = cv2.applyColorMap(depth_stereo_display, cv2.COLORMAP_JET)
                # cv2.imshow("Depth Stereo", depth_stereo_display)
                # cv2.imshow("Left Image", left_frame)
                # cv2.imshow("Right Image", right_frame)
                # cv2.imshow("RGB Image", rgb_frame)

                # Align the ToF depth with the RGB frame
                alignedDepth = getAlignedDepth(depth_tof_frame, M1, D1, M2, D2, T, R, TARGET_MATRIX, toFSize, rgbSize)
                alignedDepthColorized = colorizeDepth(alignedDepth)

                # Blend RGB and depth for visualization
                blended = cv2.addWeighted(rgb_frame, rgbWeight, alignedDepthColorized, depthWeight, 0)
                cv2.imshow(rgb_depth_window_name, blended)

                frame_counter += 1  # Update frame count if saving or referencing frames

            # Exit condition
            if cv2.waitKey(1) == ord('q'):
                break
    # The device is closed automatically when the `with` block exits
    print('Data collection complete.')
```
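
Separately, since I suspect the calibration itself, below is a minimal standalone sketch I can run to dump what is actually stored on the device before recalibrating. It only uses the standard depthai calibration getters; the 640x480 resize and the CAM_C -> CAM_A pair are just choices mirroring the script above:

```python
import depthai as dai
import numpy as np

# Dump the calibration stored on the connected device as a sanity check.
with dai.Device() as device:
    calib = device.readCalibration2()
    for socket in (dai.CameraBoardSocket.CAM_A,
                   dai.CameraBoardSocket.CAM_B,
                   dai.CameraBoardSocket.CAM_C):
        M = np.array(calib.getCameraIntrinsics(socket, 640, 480))
        D = np.array(calib.getDistortionCoefficients(socket))
        print(f"{socket}: fx={M[0][0]:.1f} fy={M[1][1]:.1f} "
              f"cx={M[0][2]:.1f} cy={M[1][2]:.1f}")
        print(f"  distortion: {np.round(D, 4)}")

    # Translation between CAM_C and CAM_A in centimeters; its magnitude
    # should roughly match the physical distance between the two lenses.
    T = np.array(calib.getCameraTranslationVector(
        dai.CameraBoardSocket.CAM_C, dai.CameraBoardSocket.CAM_A, False))
    print(f"CAM_C -> CAM_A translation (cm): {np.round(T, 3)}, "
          f"baseline ~{np.linalg.norm(T):.2f} cm")
```

If the focal lengths or the baseline printed here are far off from the board's spec, I would recalibrate before digging further into the reprojection code.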