yishu_corpex
"""
Due to an issue with our calibration, you might receive the following error when running this script on early release OAK Thermal devices:
```bash
[ImageAlign(4)] [error] Failed to get calibration data: Extrinsic connection between the requested cameraId's doesn't exist. Please recalibrate or modify your calibration data
```
If this happens, please download the calibration data + script from https://drive.google.com/drive/folders/1Q_MZMqWMKDC1eOqVHGPeDO-NJgFmnY5U,
place them into the same folder, connect the camera to the computer, and run the script. This will update the
calibration and add the required extrinsics between the camera sensors.
"""
import cv2.aruco as aruco  # needed below by CharucoBoard.__build (legacy aruco API)
import cv2
import depthai as dai
import numpy as np
import time
import json
from datetime import timedelta
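
# --- Hedged addition (not part of the original flow): quick extrinsics check.
# A minimal sketch assuming the thermal sensor sits on CAM_E, as on OAK Thermal.
# getCameraExtrinsics raises when the requested connection is missing, which is
# the same condition behind the ImageAlign error quoted in the docstring above.
def check_thermal_extrinsics(calib: "dai.CalibrationHandler") -> bool:
    try:
        calib.getCameraExtrinsics(
            dai.CameraBoardSocket.CAM_E, dai.CameraBoardSocket.CAM_A
        )
        return True
    except Exception:  # depthai raises RuntimeError here, but stay permissive
        return False
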
class CharucoBoard:
def __init__(
self,
squaresX=16,
squaresY=9,
squareSize=1.0,
markerSize=0.8,
dictSize=cv2.aruco.DICT_4X4_1000,
):
"""Charuco board configuration used in a captured dataset
Args:
squaresX (int, optional): Number of squares horizontally. Defaults to 16.
squaresY (int, optional): Number of squares vertically. Defaults to 9.
squareSize (float, optional): Length of the side of one square (cm). Defaults to 1.0.
markerSize (float, optional): Length of the side of one marker (cm). Defaults to 0.8.
dictSize (_type_, optional): cv2 aruco dictionary size. Defaults to cv2.aruco.DICT_4X4_1000.
"""
self.squaresX = squaresX
self.squaresY = squaresY
self.squareSize = squareSize
self.markerSize = markerSize
self.dictSize = dictSize
def __getstate__(
self,
): # Magic to allow pickling the instance without pickling the cv2 dictionary
state = self.__dict__.copy()
for hidden in ["_board", "_dictionary"]:
if hidden in state:
del state[hidden]
return state
def __build(self):
self._dictionary = aruco.Dictionary_get(self.dictSize)
self._board = aruco.CharucoBoard_create(
self.squaresX,
self.squaresY,
self.squareSize,
self.markerSize,
self._dictionary,
)
@property
def dictionary(self):
"""cv2 aruco dictionary"""
if not hasattr(self, "_dictionary"):
self.__build()
return self._dictionary
@property
def board(self):
"""cv2 aruco board"""
if not hasattr(self, "_board"):
self.__build()
return self._board
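
# Hedged usage sketch: render the configured board to an image for printing.
# This uses the same legacy cv2.aruco API as __build above (OpenCV < 4.7); the
# pixel size below is an arbitrary choice.
# board_png = CharucoBoard().board.draw((1600, 900))
# cv2.imwrite("charuco_16x9.png", board_png)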
def detect_charuco_board(image: np.ndarray, board: CharucoBoard):
arucoParams = cv2.aruco.DetectorParameters_create()
arucoParams.minMarkerDistanceRate = 0.01
markers, marker_ids, rejectedImgPoints = cv2.aruco.detectMarkers(
image, board.dictionary, parameters=arucoParams
) # First, detect markers
    marker_corners, marker_ids, refused, recovered = (
cv2.aruco.refineDetectedMarkers(
image,
board.board,
markers,
marker_ids,
rejectedCorners=rejectedImgPoints,
)
)
criteria = (
cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER,
10000,
0.00001,
)
    # Return the refined marker corners; the second return value is a placeholder
    return marker_corners, []
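
# Note: detect_charuco_board is not called by the live loop below; it looks
# like a leftover helper from calibration-dataset captures. `criteria` above is
# presumably meant for optional cv2.cornerSubPix refinement but is currently
# unused.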
FPS = 25.0
RGB_SOCKET = dai.CameraBoardSocket.CAM_A
COLOR_RESOLUTION = dai.ColorCameraProperties.SensorResolution.THE_1080_P
class FPSCounter:
def __init__(self):
self.frameTimes = []
def tick(self):
now = time.time()
self.frameTimes.append(now)
self.frameTimes = self.frameTimes[-100:]
def getFps(self):
if len(self.frameTimes) <= 1:
return 0
# Calculate the FPS
return (len(self.frameTimes) - 1) / (
self.frameTimes[-1] - self.frameTimes[0]
)
#device = dai.Device("10.11.1.239")
#device = dai.Device("10.11.1.206")
device = dai.Device("10.11.1.215")
print('mxid', device.getMxId())
print(f"device = {device.getDeviceName()}")
thermalWidth, thermalHeight = -1, -1
thermalFound = False
for features in device.getConnectedCameraFeatures():
if dai.CameraSensorType.THERMAL in features.supportedTypes:
thermalFound = True
thermalSocket = features.socket
thermalWidth, thermalHeight = features.width, features.height
break
if not thermalFound:
raise RuntimeError("No thermal camera found!")
ISP_SCALE = 3
calibrationHandler = device.readCalibration()
with open("calibration.json", "w") as fp:
json.dump(calibrationHandler.eepromToJson(), fp, indent=2)
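# calibration.json is a plain-text dump of the device EEPROM calibration;
# worth keeping as a backup before running the recalibration script mentioned
# in the docstring above.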
rgbDistortion = calibrationHandler.getDistortionCoefficients(RGB_SOCKET)
thermalDistortion = calibrationHandler.getDistortionCoefficients(thermalSocket)
distortionModel = calibrationHandler.getDistortionModel(RGB_SOCKET)
if distortionModel != dai.CameraModel.Perspective:
raise RuntimeError(
"Unsupported distortion model for RGB camera. This example supports only Perspective model."
)
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
thermalCam = pipeline.create(dai.node.Camera)
thermalCam.setBoardSocket(thermalSocket)
thermalCam.setFps(FPS)
sync = pipeline.create(dai.node.Sync)
out = pipeline.create(dai.node.XLinkOut)
align = pipeline.create(dai.node.ImageAlign)
cfgIn = pipeline.create(dai.node.XLinkIn)
camRgb.setBoardSocket(RGB_SOCKET)
camRgb.setResolution(COLOR_RESOLUTION)
camRgb.setFps(FPS)
camRgb.setIspScale(1, ISP_SCALE)
out.setStreamName("out")
sync.setSyncThreshold(timedelta(seconds=0.5 / FPS))
cfgIn.setStreamName("config")
cfg = align.initialConfig.get()
staticDepthPlane = cfg.staticDepthPlane
offsetTX = 0
offsetTY = 0
offsetTZ = 0
# Linking
align.outputAligned.link(sync.inputs["aligned"])
camRgb.isp.link(sync.inputs["rgb"])
camRgb.isp.link(align.inputAlignTo)
thermalCam.raw.link(align.input)
thermalCam.raw.link(sync.inputs["thermal"])
sync.out.link(out.input)
cfgIn.out.link(align.inputConfig)
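# Pipeline topology: the thermal raw output feeds both ImageAlign and Sync, and
# the RGB ISP output is both the align target and a Sync input, so every
# MessageGroup arriving on "out" carries matching "rgb", "thermal" and
# "aligned" frames.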
rgbWeight = 0.4
thermalWeight = 0.6
from numba import jit, prange
import math
@jit(nopython=True, parallel=True)
def reprojection(
thermalFrame,
depth_image,
depth_camera_intrinsics,
camera_extrinsics,
color_camera_intrinsics,
depth_image_show=None,
):
height = len(depth_image)
width = len(depth_image[0])
if depth_image_show is not None:
image = np.zeros((height, width), np.uint8)
else:
image = np.zeros((height, width), np.uint16)
if camera_extrinsics[0][3] > 0:
sign = 1
else:
sign = -1
for i in prange(0, height):
        for j in range(0, width):  # inner loop is sequential; numba parallelizes only the outer prange
if sign == 1:
# Reverse the order of the pixels
j = width - j - 1
d = depth_image[i][j]
if d == 0:
continue
# Convert pixel to 3d point
x = (
(j - depth_camera_intrinsics[0][2])
* d
/ depth_camera_intrinsics[0][0]
)
y = (
(i - depth_camera_intrinsics[1][2])
* d
/ depth_camera_intrinsics[1][1]
)
z = d
# Move the point to the camera frame
x1 = (
camera_extrinsics[0][0] * x
+ camera_extrinsics[0][1] * y
+ camera_extrinsics[0][2] * z
+ camera_extrinsics[0][3]
)
y1 = (
camera_extrinsics[1][0] * x
+ camera_extrinsics[1][1] * y
+ camera_extrinsics[1][2] * z
+ camera_extrinsics[1][3]
)
z1 = (
camera_extrinsics[2][0] * x
+ camera_extrinsics[2][1] * y
+ camera_extrinsics[2][2] * z
+ camera_extrinsics[2][3]
)
u = (
color_camera_intrinsics[0][0] * (x1 / z1)
+ color_camera_intrinsics[0][2]
)
v = (
color_camera_intrinsics[1][1] * (y1 / z1)
+ color_camera_intrinsics[1][2]
)
            int_u = int(math.floor(u + 0.5))  # numba-friendly "round"
            int_v = int(math.floor(v + 0.5))
            if int_v != i:
                print("v ->", v, "and i ->", i)  # should never print (plain args: numba nopython lacks f-string support)
if (
int_u >= 0
and int_u < (len(image[0]) - 1)
and int_v >= 0
and int_v < len(image)
):
                # Both branches previously wrote the same values; duplicate the
                # pixel into the neighbouring column to close warping gaps
                image[int_v][int_u] = thermalFrame[i][j]
                image[int_v][int_u + sign] = thermalFrame[i][j]
return image
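
# How reprojection works: each depth pixel (j, i) with depth d is back-projected
# through the depth intrinsics (x = (j - cx) * d / fx, and likewise for y),
# moved into the color frame with the 4x4 extrinsics, and projected with the
# color intrinsics. Writing each value into two adjacent columns (int_u and
# int_u + sign) while scanning in the direction of the baseline fills the
# one-pixel gaps that plain forward warping leaves along the parallax axis.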
def getAlignedDepth(
thermalFrame, frameDepth, M1, D1, M2, D2, R, T, TARGET_MATRIX
):
    # NOTE: the image size passed to stereoRectify is irrelevant here (it is not
    # used for computing the rotation matrices), and R1/R2 are immediately
    # overridden with identity below, so the remaps only undistort, without rotating.
    R1, R2, _, _, _, _, _ = cv2.stereoRectify(
        M1, D1, M2, D2, frameDepth.shape, R, T
    )
    R1 = np.eye(3)
    R2 = np.eye(3)
leftMapX, leftMapY = cv2.initUndistortRectifyMap(
M1, D1, R1, TARGET_MATRIX, depthSize, cv2.CV_32FC1
)
thermalFrame = cv2.remap(
thermalFrame, leftMapX, leftMapY, cv2.INTER_NEAREST
)
newR = np.dot(R2, np.dot(R, R1.T)) # Should be very close to identity
newT = np.dot(R2, T)
combinedExtrinsics = np.eye(4)
combinedExtrinsics[0:3, 0:3] = newR
combinedExtrinsics[0:3, 3] = newT
thermalAligned = reprojection(
thermalFrame,
frameDepth,
TARGET_MATRIX,
combinedExtrinsics,
TARGET_MATRIX,
)
# Rotate the depth to the RGB frame
R_back = R2.T
mapX, mapY = cv2.initUndistortRectifyMap(
TARGET_MATRIX, None, R_back, M2, rgbSize, cv2.CV_32FC1
)
outputAligned = cv2.remap(thermalAligned, mapX, mapY, cv2.INTER_NEAREST)
return outputAligned
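
# Alignment happens in three steps: (1) undistort the thermal frame into the
# TARGET_MATRIX pinhole model, (2) reproject it onto the (static) depth plane
# using the residual extrinsics newR/newT, (3) remap the result into the RGB
# camera model M2. With R1 = R2 = I, steps (1) and (3) reduce to pure
# undistortion/intrinsics changes, so all the geometry lives in step (2).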
def updateBlendWeights(percentRgb):
"""
Update the rgb and depth weights used to blend depth/rgb image
@param[in] percent_rgb The rgb weight expressed as a percentage (0..100)
"""
global thermalWeight
global rgbWeight
rgbWeight = float(percentRgb) / 100.0
thermalWeight = 1.0 - rgbWeight
def updateDepthPlane(depth):
global staticDepthPlane
staticDepthPlane = depth
def updateOffsetTX(newOffset):
global offsetTX
offsetTX = (newOffset - 3000) / 100
def updateOffsetTY(newOffset):
global offsetTY
offsetTY = (newOffset - 3000) / 100
def updateOffsetTZ(newOffset):
global offsetTZ
offsetTZ = (newOffset - 3000) / 100
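
# The offset trackbars (created below) span 0..6000 and map to -30..+30 in
# 0.01 steps; the offsets are added to the translation vector T, which is in
# millimetres, so each slider gives roughly +/-30 mm of manual extrinsic tuning.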
def to_uint8_gray(img):
# If color, convert to gray
if img.ndim == 3:
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
if img.dtype == np.uint8:
return img
# Robust 16-bit → 8-bit mapping using percentiles
lo, hi = np.percentile(img, (1, 99))
scale = 255.0 / max(1e-6, (hi - lo))
img8 = np.clip((img - lo) * scale, 0, 255).astype(np.uint8)
return img8
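
# Hedged usage example: to_uint8_gray is not called in the loop below, but it
# can be handy for previewing raw 16-bit thermal frames, e.g.:
# cv2.imshow("thermal8", to_uint8_gray(thermalRaw.getCvFrame()))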
# Connect to device and start pipeline
with device:
device.startPipeline(pipeline)
queue = device.getOutputQueue("out", 8, False)
cfgQ = device.getInputQueue("config")
    # Configure windows; trackbar adjusts blending ratio of rgb/thermal
windowName = "rgb-thermal"
# Set the window to be resizable and the initial size
cv2.namedWindow(windowName, cv2.WINDOW_NORMAL)
cv2.resizeWindow(windowName, 1280, 720)
cv2.namedWindow("thermal", cv2.WINDOW_NORMAL)
cv2.resizeWindow("thermal", 1280, 720)
cv2.createTrackbar(
"RGB Weight %",
windowName,
int(rgbWeight * 100),
100,
updateBlendWeights,
)
cv2.createTrackbar(
"Static Depth Plane [mm]",
windowName,
0,
2000,
updateDepthPlane,
)
cv2.createTrackbar(
"OffsetTX",
windowName,
3000,
6000,
updateOffsetTX,
)
cv2.createTrackbar(
"OffsetTY",
windowName,
3000,
6000,
updateOffsetTY,
)
cv2.createTrackbar(
"OffsetTZ",
windowName,
3000,
6000,
updateOffsetTZ,
)
fpsCounter = FPSCounter()
while True:
messageGroup = queue.get()
assert isinstance(messageGroup, dai.MessageGroup)
frameRgb = messageGroup["rgb"]
assert isinstance(frameRgb, dai.ImgFrame)
thermalAligned = messageGroup["aligned"]
assert isinstance(thermalAligned, dai.ImgFrame)
thermalRaw = messageGroup["thermal"]
assert isinstance(thermalRaw, dai.ImgFrame)
frameRgbCv = frameRgb.getCvFrame()
fpsCounter.tick()
# print(int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0]))
rgbIntrinsics = calibrationHandler.getCameraIntrinsics(
RGB_SOCKET, int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0])
)
cvFrameUndistorted = cv2.undistort(
frameRgbCv,
np.array(rgbIntrinsics),
np.array(rgbDistortion),
)
# Colorize the aligned depth
thermalFrame = thermalAligned.getCvFrame().astype(np.float32)
thermalIntrinsics = calibrationHandler.getCameraIntrinsics(
thermalSocket,
int(thermalFrame.shape[1]),
int(thermalFrame.shape[0]),
)
thermalUndistorted = cv2.undistort(
thermalFrame,
np.array(thermalIntrinsics),
np.array(thermalDistortion),
)
# Create a mask for nan values
mask = np.isnan(thermalUndistorted)
# Replace nan values with a mean for visualization
thermalUndistorted[mask] = np.nanmean(thermalUndistorted)
thermalUndistorted = cv2.normalize(
thermalUndistorted, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
)
colormappedFrame = cv2.applyColorMap(
thermalUndistorted, cv2.COLORMAP_MAGMA
)
# Apply the mask back with black pixels (0)
colormappedFrame[mask] = 0
RGB_SOCKET = dai.CameraBoardSocket.CAM_A
COLOR_RESOLUTION = (
dai.ColorCameraProperties.SensorResolution.THE_1080_P
)
        ALIGN_SOCKET = dai.CameraBoardSocket.CAM_E  # hardcoded thermal socket (matches thermalSocket on OAK Thermal)
        depthSize = (256, 192)  # thermal sensor resolution; read as a global by getAlignedDepth
        rgbSize = (640, 360)  # 1080p scaled by ISP 1/3; also read as a global by getAlignedDepth
try:
M1 = np.array(
calibrationHandler.getCameraIntrinsics(
ALIGN_SOCKET, *depthSize
)
)
D1 = np.array(
calibrationHandler.getDistortionCoefficients(ALIGN_SOCKET)
)
M2 = np.array(
calibrationHandler.getCameraIntrinsics(RGB_SOCKET, *rgbSize)
)
D2 = np.array(
calibrationHandler.getDistortionCoefficients(RGB_SOCKET)
)
T = (
np.array(
calibrationHandler.getCameraTranslationVector(
ALIGN_SOCKET, RGB_SOCKET, False
)
)
* 10
) # to mm for matching the depth
print('tvec1', T)
T[0] += offsetTX
T[1] += offsetTY
T[2] += offsetTZ
print('tvec1+o', T)
R = np.array(
calibrationHandler.getCameraExtrinsics(
ALIGN_SOCKET, RGB_SOCKET, False
)
)[0:3, 0:3]
TARGET_MATRIX = M1
lensPosition = calibrationHandler.getLensPosition(RGB_SOCKET)
        except Exception:
            raise  # let calibration read errors surface immediately
staticDepthPlane_2D = np.full_like(
thermalRaw.getCvFrame().astype(np.float32),
staticDepthPlane,
dtype=np.float32,
)
        try:
alignedDepth = getAlignedDepth(
thermalRaw.getCvFrame().astype(np.float32),
staticDepthPlane_2D,
M1,
D1,
M2,
D2,
R,
T,
TARGET_MATRIX,
)
rgb = cvFrameUndistorted.astype(np.float32)
# --- colorize alignedDepth before blending ---
depth = alignedDepth.astype(np.float32)
depth_mask = np.isnan(depth) | (depth <= 0)
if np.any(~depth_mask):
vmin = np.nanpercentile(depth[~depth_mask], 2)
vmax = np.nanpercentile(depth[~depth_mask], 98)
if vmax <= vmin:
vmax = vmin + 1.0
else:
vmin, vmax = 0.0, 1.0
depth_clipped = np.clip(depth, vmin, vmax)
depth_clipped[depth_mask] = vmin
depth_u8 = cv2.normalize(
depth_clipped, None, 0, 255, cv2.NORM_MINMAX
).astype(np.uint8)
depth_color = cv2.applyColorMap(depth_u8, cv2.COLORMAP_MAGMA)
depth_color[depth_mask] = 0
depth_bgr32 = depth_color.astype(np.float32)
blended32 = cv2.addWeighted(
rgb, rgbWeight, depth_bgr32, thermalWeight, 0, dtype=cv2.CV_32F
)
blended = np.clip(blended32, 0, 255).astype(np.uint8)
# cv2.putText(
# blended,
# f"FPS: {fpsCounter.getFps():.2f}",
# (10, 30),
# cv2.FONT_HERSHEY_SIMPLEX,
# 1,
# (255, 255, 255),
# 2,
# )
cv2.imshow(windowName, blended)
thermalFrame_n = cv2.normalize(
thermalRaw.getCvFrame().astype(np.float32),
None,
0,
255,
cv2.NORM_MINMAX,
cv2.CV_8U,
)
thermalFrame_c = cv2.applyColorMap(thermalFrame_n, cv2.COLORMAP_MAGMA)
thermalIntrinsics = np.array(
calibrationHandler.getCameraIntrinsics(
thermalSocket,
int(thermalFrame.shape[1]),
int(thermalFrame.shape[0]),
)
)
# thermalFrame = cv2.undistort(
# thermalFrame,
# np.array(thermalIntrinsics),
# np.array(thermalDistortion),
# )
            # NOTE: thermalFrame here is the aligned, RGB-sized frame, while
            # thermalFrame_c below is at raw thermal resolution; the two sizes differ
            w, h = int(thermalFrame.shape[1]), int(thermalFrame.shape[0])
alpha = 0.45
new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(
thermalIntrinsics,
np.array(thermalDistortion),
(w, h),
alpha,
(w, h),
)
# Undistort frame
thermalFrame = cv2.undistort(
thermalFrame_c,
thermalIntrinsics,
np.array(thermalDistortion),
None,
new_camera_matrix,
)
# cv2.imshow(f'thermal-recfitied, alpha = {alpha}', thermalFrame)
# cv2.imshow('rgb-rectifted', cvFrameUndistorted)
# cv2.imshow("thermal", thermalFrame_c)
# cv2.imshow("rgb", frameRgbCv)
alpha = 0.45
w, h = int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0])
new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(
np.array(rgbIntrinsics),
np.array(rgbDistortion),
(w, h),
alpha,
(w, h),
)
cvFrameUndistorted = cv2.undistort(
frameRgbCv,
np.array(rgbIntrinsics),
np.array(rgbDistortion),
None,
new_camera_matrix,
)
except Exception as e:
print('Failed to display the frame:', e)
key = cv2.waitKey(1)
if key == ord("q"):
break
elif key == ord("m"):
import numpy as np
# If you run headless and see "libGL error: failed to load driver: swrast",
# uncomment the next two lines *before* importing pyplot:
# import matplotlib
# matplotlib.use("Agg") # or set env: MPLBACKEND=Agg
import matplotlib.pyplot as plt
tvec = calibrationHandler.getCameraTranslationVector(
RGB_SOCKET, dai.CameraBoardSocket.CAM_E, False
)
print('tvec2', tvec)
            tvec = [-2.5, 0, 0]  # hardcoded override of the calibration tvec printed above
tv = np.array(tvec, dtype=float).ravel()
tv_norm = np.linalg.norm(tv)
plt.figure(figsize=(12, 7), constrained_layout=True)
plt.subplot(2, 2, 1)
plt.title(f"Thermal rectified, alpha = {alpha}")
im1 = plt.imshow(thermalFrame)
plt.axis("off")
plt.colorbar(im1, fraction=0.046, pad=0.04)
plt.subplot(2, 2, 2)
plt.title(f"RGB rectified, alpha = {alpha}")
plt.imshow(cvFrameUndistorted)
plt.axis("off")
plt.subplot(2, 2, 3)
plt.title("RGB - thermal blend, alpha = 0")
plt.imshow(blended)
plt.axis("off")
ax4 = plt.subplot(2, 2, 4)
ax4.set_title("Translation vector (RGB → CAM_E)")
txt = (
f"t = [{tv[0]:.4g}, {tv[1]:.4g}, {tv[2]:.4g}]\n"
f"|t| = {tv_norm:.4g}"
)
ax4.text(
0.5,
0.55,
txt,
ha="center",
va="center",
fontsize=13,
family="monospace",
transform=ax4.transAxes,
)
ax4.text(
0.5,
0.2,
"Note: units/axes depend on your rig.",
ha="center",
va="center",
fontsize=9,
transform=ax4.transAxes,
)
ax4.axis("off")
plt.show()
cfg.staticDepthPlane = staticDepthPlane
cfgQ.send(cfg)
Thanks,
Jaka