"""
Due to an issue with our calibration, you might receive the following error when running this script on early release OAK Thermal devices:
```bash
[ImageAlign(4)] [error] Failed to get calibration data: Extrinsic connection between the requested cameraId's doesn't exist. Please recalibrate or modify your calibration data
```
If this happens, please download the calibration data + script from https://drive.google.com/drive/folders/1Q_MZMqWMKDC1eOqVHGPeDO-NJgFmnY5U,
place them into the same folder, connect the camera to the computer, and run the script. This will update the
calibration and add the required extrinsics between the camera sensors.
"""
import cv2.aruco as aruco  # used by CharucoBoard.__build below
import cv2
import depthai as dai
import numpy as np
import time
import json
from datetime import timedelta
class CharucoBoard:
    def __init__(
        self,
        squaresX=16,
        squaresY=9,
        squareSize=1.0,
        markerSize=0.8,
        dictSize=cv2.aruco.DICT_4X4_1000,
    ):
        """Charuco board configuration used in a captured dataset
        Args:
            squaresX (int, optional): Number of squares horizontally. Defaults to 16.
            squaresY (int, optional): Number of squares vertically. Defaults to 9.
            squareSize (float, optional): Length of the side of one square (cm). Defaults to 1.0.
            markerSize (float, optional): Length of the side of one marker (cm). Defaults to 0.8.
            dictSize (_type_, optional): cv2 aruco dictionary size. Defaults to cv2.aruco.DICT_4X4_1000.
        """
        self.squaresX = squaresX
        self.squaresY = squaresY
        self.squareSize = squareSize
        self.markerSize = markerSize
        self.dictSize = dictSize
    def __getstate__(
        self,
    ):  # Magic to allow pickling the instance without pickling the cv2 dictionary
        state = self.__dict__.copy()
        for hidden in ["_board", "_dictionary"]:
            if hidden in state:
                del state[hidden]
        return state
    def __build(self):
        # Uses the pre-OpenCV-4.7 aruco API (Dictionary_get / CharucoBoard_create)
        self._dictionary = aruco.Dictionary_get(self.dictSize)
        self._board = aruco.CharucoBoard_create(
            self.squaresX,
            self.squaresY,
            self.squareSize,
            self.markerSize,
            self._dictionary,
        )
    @property
    def dictionary(self):
        """cv2 aruco dictionary"""
        if not hasattr(self, "_dictionary"):
            self.__build()
        return self._dictionary
    @property
    def board(self):
        """cv2 aruco board"""
        if not hasattr(self, "_board"):
            self.__build()
        return self._board
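# Usage sketch (hypothetical, not called by this script): render the configured
# board to an image with the pre-4.7 aruco API, e.g. for printing or synthetic tests.
#   board = CharucoBoard()
#   boardImage = board.board.draw((1600, 900))  # (width, height) in pixels
#   cv2.imwrite("charuco_16x9.png", boardImage)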
def detect_charuco_board(image: np.ndarray, board: CharucoBoard):
    arucoParams = cv2.aruco.DetectorParameters_create()
    arucoParams.minMarkerDistanceRate = 0.01
    # First, detect the individual ArUco markers
    markers, marker_ids, rejectedImgPoints = cv2.aruco.detectMarkers(
        image, board.dictionary, parameters=arucoParams
    )
    marker_corners, marker_ids, rejected, recovered = (
        cv2.aruco.refineDetectedMarkers(
            image,
            board.board,
            markers,
            marker_ids,
            rejectedCorners=rejectedImgPoints,
        )
    )
    criteria = (
        cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER,
        10000,
        0.00001,
    )
    # If markers were found, interpolate the chessboard corners and refine them
    if marker_ids is not None and len(marker_ids) > 0:
        _, charuco_corners, charuco_ids = cv2.aruco.interpolateCornersCharuco(
            marker_corners, marker_ids, image, board.board
        )
        if charuco_corners is not None and len(charuco_corners) > 0:
            cv2.cornerSubPix(image, charuco_corners, (5, 5), (-1, -1), criteria)
            return marker_corners, charuco_corners
    return marker_corners, []
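# Usage sketch (hypothetical): run the detector on a grayscale capture;
# cornerSubPix above expects a single-channel image.
#   gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#   marker_corners, charuco_corners = detect_charuco_board(gray, CharucoBoard())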
FPS = 25.0
RGB_SOCKET = dai.CameraBoardSocket.CAM_A
COLOR_RESOLUTION = dai.ColorCameraProperties.SensorResolution.THE_1080_P
class FPSCounter:
    def __init__(self):
        self.frameTimes = []
    def tick(self):
        now = time.time()
        self.frameTimes.append(now)
        self.frameTimes = self.frameTimes[-100:]
    def getFps(self):
        if len(self.frameTimes) <= 1:
            return 0
        # Calculate the FPS
        return (len(self.frameTimes) - 1) / (
            self.frameTimes[-1] - self.frameTimes[0]
        )
#device = dai.Device("10.11.1.239")
#device = dai.Device("10.11.1.206")
device = dai.Device("10.11.1.215")  # replace with your device's IP/MXID, or use dai.Device() for the first available device
print('mxid', device.getMxId())
print(f"device = {device.getDeviceName()}")
thermalWidth, thermalHeight = -1, -1
thermalFound = False
for features in device.getConnectedCameraFeatures():
    if dai.CameraSensorType.THERMAL in features.supportedTypes:
        thermalFound = True
        thermalSocket = features.socket
        thermalWidth, thermalHeight = features.width, features.height
        break
if not thermalFound:
    raise RuntimeError("No thermal camera found!")
ISP_SCALE = 3
calibrationHandler = device.readCalibration()
with open("calibration.json", "w") as fp:
    json.dump(calibrationHandler.eepromToJson(), fp, indent=2)
rgbDistortion = calibrationHandler.getDistortionCoefficients(RGB_SOCKET)
thermalDistortion = calibrationHandler.getDistortionCoefficients(thermalSocket)
distortionModel = calibrationHandler.getDistortionModel(RGB_SOCKET)
if distortionModel != dai.CameraModel.Perspective:
    raise RuntimeError(
        "Unsupported distortion model for RGB camera. This example supports only Perspective model."
    )
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
thermalCam = pipeline.create(dai.node.Camera)
thermalCam.setBoardSocket(thermalSocket)
thermalCam.setFps(FPS)
sync = pipeline.create(dai.node.Sync)
out = pipeline.create(dai.node.XLinkOut)
align = pipeline.create(dai.node.ImageAlign)
cfgIn = pipeline.create(dai.node.XLinkIn)
camRgb.setBoardSocket(RGB_SOCKET)
camRgb.setResolution(COLOR_RESOLUTION)
camRgb.setFps(FPS)
camRgb.setIspScale(1, ISP_SCALE)
out.setStreamName("out")
sync.setSyncThreshold(timedelta(seconds=0.5 / FPS))
cfgIn.setStreamName("config")
cfg = align.initialConfig.get()
staticDepthPlane = cfg.staticDepthPlane
offsetTX = 0
offsetTY = 0
offsetTZ = 0
# Linking
align.outputAligned.link(sync.inputs["aligned"])
camRgb.isp.link(sync.inputs["rgb"])
camRgb.isp.link(align.inputAlignTo)
thermalCam.raw.link(align.input)
thermalCam.raw.link(sync.inputs["thermal"])
sync.out.link(out.input)
cfgIn.out.link(align.inputConfig)
rgbWeight = 0.4
thermalWeight = 0.6
from numba import jit, prange
import math
@jit(nopython=True, parallel=True)
def reprojection(
    thermalFrame,
    depth_image,
    depth_camera_intrinsics,
    camera_extrinsics,
    color_camera_intrinsics,
    depth_image_show=None,
):
    height = len(depth_image)
    width = len(depth_image[0])
    if depth_image_show is not None:
        image = np.zeros((height, width), np.uint8)
    else:
        image = np.zeros((height, width), np.uint16)
    if camera_extrinsics[0][3] > 0:
        sign = 1
    else:
        sign = -1
    for i in prange(0, height):
        for j in range(0, width):  # numba parallelizes only the outer prange loop
            if sign == 1:
                # Reverse the order of the pixels
                j = width - j - 1
            d = depth_image[i][j]
            if d == 0:
                continue
            # Convert pixel to 3d point
            x = (
                (j - depth_camera_intrinsics[0][2])
                * d
                / depth_camera_intrinsics[0][0]
            )
            y = (
                (i - depth_camera_intrinsics[1][2])
                * d
                / depth_camera_intrinsics[1][1]
            )
            z = d
            # Move the point to the camera frame
            x1 = (
                camera_extrinsics[0][0] * x
                + camera_extrinsics[0][1] * y
                + camera_extrinsics[0][2] * z
                + camera_extrinsics[0][3]
            )
            y1 = (
                camera_extrinsics[1][0] * x
                + camera_extrinsics[1][1] * y
                + camera_extrinsics[1][2] * z
                + camera_extrinsics[1][3]
            )
            z1 = (
                camera_extrinsics[2][0] * x
                + camera_extrinsics[2][1] * y
                + camera_extrinsics[2][2] * z
                + camera_extrinsics[2][3]
            )
            u = (
                color_camera_intrinsics[0][0] * (x1 / z1)
                + color_camera_intrinsics[0][2]
            )
            v = (
                color_camera_intrinsics[1][1] * (y1 / z1)
                + color_camera_intrinsics[1][2]
            )
            int_u = int(math.floor(u + 0.5))  # numba-friendly “round”
            int_v = int(math.floor(v + 0.5))
            if int_v != i:
                # f-strings are not supported in numba's nopython mode
                print("v ->", v, "and i ->", i)  # This should never be printed
            if (
                int_u >= 0
                and int_u < (len(image[0]) - 1)
                and int_v >= 0
                and int_v < len(image)
            ):
                # Write the value twice (at u and u+sign) to avoid one-pixel gaps
                image[int_v][int_u] = thermalFrame[i][j]
                image[int_v][int_u + sign] = thermalFrame[i][j]
    return image
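# The loop above is the standard pinhole round trip. Worked single-pixel example
# (hypothetical numbers): with fx = fy = 200, cx = 128, cy = 96 and a flat depth of
# d = 1000 mm, pixel (j=228, i=96) back-projects to
#   x = (228 - 128) * 1000 / 200 = 500 mm,  y = 0 mm,  z = 1000 mm,
# is moved into the color camera frame by [R|t], and projects forward again with
#   u = fx' * (x1 / z1) + cx',  v = fy' * (y1 / z1) + cy'.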
def getAlignedDepth(
    thermalFrame, frameDepth, M1, D1, M2, D2, R, T, TARGET_MATRIX
):
    R1, R2, _, _, _, _, _ = cv2.stereoRectify(
        M1, D1, M2, D2, frameDepth.shape[::-1], R, T
    )  # The image size passed here doesn't matter, as it is not used for calculating the rotation matrices
    # Override the rectification rotations with identity: keep both cameras in their
    # original orientation and let reprojection() handle the full extrinsics
    R1 = np.eye(3)
    R2 = np.eye(3)
    
    leftMapX, leftMapY = cv2.initUndistortRectifyMap(
        M1, D1, R1, TARGET_MATRIX, depthSize, cv2.CV_32FC1
    )
    thermalFrame = cv2.remap(
        thermalFrame, leftMapX, leftMapY, cv2.INTER_NEAREST
    )
    newR = np.dot(R2, np.dot(R, R1.T))  # Should be very close to identity
    newT = np.dot(R2, T)
    combinedExtrinsics = np.eye(4)
    combinedExtrinsics[0:3, 0:3] = newR
    combinedExtrinsics[0:3, 3] = newT
    thermalAligned = reprojection(
        thermalFrame,
        frameDepth,
        TARGET_MATRIX,
        combinedExtrinsics,
        TARGET_MATRIX,
    )
    # Rotate the depth to the RGB frame
    R_back = R2.T
    mapX, mapY = cv2.initUndistortRectifyMap(
        TARGET_MATRIX, None, R_back, M2, rgbSize, cv2.CV_32FC1
    )
    outputAligned = cv2.remap(thermalAligned, mapX, mapY, cv2.INTER_NEAREST)
    return outputAligned
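# Usage note: the main loop below feeds this a constant-valued depth image
# (staticDepthPlane_2D), so the thermal frame is reprojected as if the whole scene
# lay on a flat plane at the trackbar-selected distance. A real per-pixel depth map
# could be passed in the same way.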
def updateBlendWeights(percentRgb):
    """
    Update the RGB and thermal weights used to blend the RGB/thermal image
    @param[in] percentRgb The RGB weight expressed as a percentage (0..100)
    """
    global thermalWeight
    global rgbWeight
    rgbWeight = float(percentRgb) / 100.0
    thermalWeight = 1.0 - rgbWeight
def updateDepthPlane(depth):
    global staticDepthPlane
    staticDepthPlane = depth
def updateOffsetTX(newOffset):
    global offsetTX
    offsetTX = (newOffset - 3000) / 100
def updateOffsetTY(newOffset):
    global offsetTY
    offsetTY = (newOffset - 3000) / 100
def updateOffsetTZ(newOffset):
    global offsetTZ
    offsetTZ = (newOffset - 3000) / 100
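# Worked example of the trackbar mapping: the offset sliders run 0..6000 with 3000
# as the neutral center, so a value of 3150 gives (3150 - 3000) / 100 = +1.5, which
# is added to the matching component of T (in mm here, since T is converted below).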
def to_uint8_gray(img):
    # If color, convert to gray
    if img.ndim == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    if img.dtype == np.uint8:
        return img
    # Robust 16-bit → 8-bit mapping using percentiles
    lo, hi = np.percentile(img, (1, 99))
    scale = 255.0 / max(1e-6, (hi - lo))
    img8 = np.clip((img - lo) * scale, 0, 255).astype(np.uint8)
    return img8
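# Usage sketch (not called by this script): handy for feeding 16-bit thermal frames
# to 8-bit-only routines such as the ChArUco detector above; the percentile scaling
# keeps hot/cold outliers from crushing the contrast.
#   gray8 = to_uint8_gray(thermalRaw.getCvFrame())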
# Connect to device and start pipeline
with device:
    device.startPipeline(pipeline)
    queue = device.getOutputQueue("out", 8, False)
    cfgQ = device.getInputQueue("config")
    # Configure windows; trackbar adjusts blending ratio of rgb/depth
    windowName = "rgb-thermal"
    # Set the window to be resizable and the initial size
    cv2.namedWindow(windowName, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(windowName, 1280, 720)
    cv2.namedWindow("thermal", cv2.WINDOW_NORMAL)
    cv2.resizeWindow("thermal", 1280, 720)
    cv2.createTrackbar(
        "RGB Weight %",
        windowName,
        int(rgbWeight * 100),
        100,
        updateBlendWeights,
    )
    cv2.createTrackbar(
        "Static Depth Plane [mm]",
        windowName,
        0,
        2000,
        updateDepthPlane,
    )
    cv2.createTrackbar(
        "OffsetTX",
        windowName,
        3000,
        6000,
        updateOffsetTX,
    )
    cv2.createTrackbar(
        "OffsetTY",
        windowName,
        3000,
        6000,
        updateOffsetTY,
    )
    cv2.createTrackbar(
        "OffsetTZ",
        windowName,
        3000,
        6000,
        updateOffsetTZ,
    )
    fpsCounter = FPSCounter()
    while True:
        messageGroup = queue.get()
        assert isinstance(messageGroup, dai.MessageGroup)
        frameRgb = messageGroup["rgb"]
        assert isinstance(frameRgb, dai.ImgFrame)
        thermalAligned = messageGroup["aligned"]
        assert isinstance(thermalAligned, dai.ImgFrame)
        thermalRaw = messageGroup["thermal"]
        assert isinstance(thermalRaw, dai.ImgFrame)
        frameRgbCv = frameRgb.getCvFrame()
        fpsCounter.tick()
        # print(int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0]))
        rgbIntrinsics = calibrationHandler.getCameraIntrinsics(
            RGB_SOCKET, int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0])
        )
        cvFrameUndistorted = cv2.undistort(
            frameRgbCv,
            np.array(rgbIntrinsics),
            np.array(rgbDistortion),
        )
        # Colorize the aligned thermal frame
        thermalFrame = thermalAligned.getCvFrame().astype(np.float32)
        thermalIntrinsics = calibrationHandler.getCameraIntrinsics(
            thermalSocket,
            int(thermalFrame.shape[1]),
            int(thermalFrame.shape[0]),
        )
        thermalUndistorted = cv2.undistort(
            thermalFrame,
            np.array(thermalIntrinsics),
            np.array(thermalDistortion),
        )
        # Create a mask for nan values
        mask = np.isnan(thermalUndistorted)
        # Replace nan values with a mean for visualization
        thermalUndistorted[mask] = np.nanmean(thermalUndistorted)
        thermalUndistorted = cv2.normalize(
            thermalUndistorted, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U
        )
        colormappedFrame = cv2.applyColorMap(
            thermalUndistorted, cv2.COLORMAP_MAGMA
        )
        # Apply the mask back with black pixels (0)
        colormappedFrame[mask] = 0
        # Loop-invariant parameters; RGB_SOCKET and COLOR_RESOLUTION are already set above
        ALIGN_SOCKET = dai.CameraBoardSocket.CAM_E
        depthSize = (256, 192)
        rgbSize = (640, 360)
        try:
            M1 = np.array(
                calibrationHandler.getCameraIntrinsics(
                    ALIGN_SOCKET, *depthSize
                )
            )
            D1 = np.array(
                calibrationHandler.getDistortionCoefficients(ALIGN_SOCKET)
            )
            M2 = np.array(
                calibrationHandler.getCameraIntrinsics(RGB_SOCKET, *rgbSize)
            )
            D2 = np.array(
                calibrationHandler.getDistortionCoefficients(RGB_SOCKET)
            )
            T = (
                np.array(
                    calibrationHandler.getCameraTranslationVector(
                        ALIGN_SOCKET, RGB_SOCKET, False
                    )
                )
                * 10
            )  # to mm for matching the depth
            print('tvec1', T)
            T[0] += offsetTX
            T[1] += offsetTY
            T[2] += offsetTZ
            print('tvec1+o', T)
            R = np.array(
                calibrationHandler.getCameraExtrinsics(
                    ALIGN_SOCKET, RGB_SOCKET, False
                )
            )[0:3, 0:3]
            TARGET_MATRIX = M1
            lensPosition = calibrationHandler.getLensPosition(RGB_SOCKET)
        except RuntimeError:
            # Typically missing extrinsics between the sensors; see the note at the top
            raise
        staticDepthPlane_2D = np.full_like(
            thermalRaw.getCvFrame().astype(np.float32),
            staticDepthPlane,
            dtype=np.float32,
        )
        
        try:
            alignedDepth = getAlignedDepth(
                thermalRaw.getCvFrame().astype(np.float32),
                staticDepthPlane_2D,
                M1,
                D1,
                M2,
                D2,
                R,
                T,
                TARGET_MATRIX,
            )
            rgb = cvFrameUndistorted.astype(np.float32)
            # --- colorize alignedDepth (the reprojected thermal values) before blending ---
            depth = alignedDepth.astype(np.float32)
            depth_mask = np.isnan(depth) | (depth <= 0)
            if np.any(~depth_mask):
                vmin = np.nanpercentile(depth[~depth_mask], 2)
                vmax = np.nanpercentile(depth[~depth_mask], 98)
                if vmax <= vmin:
                    vmax = vmin + 1.0
            else:
                vmin, vmax = 0.0, 1.0
            depth_clipped = np.clip(depth, vmin, vmax)
            depth_clipped[depth_mask] = vmin
            depth_u8 = cv2.normalize(
                depth_clipped, None, 0, 255, cv2.NORM_MINMAX
            ).astype(np.uint8)
            depth_color = cv2.applyColorMap(depth_u8, cv2.COLORMAP_MAGMA)
            depth_color[depth_mask] = 0
            depth_bgr32 = depth_color.astype(np.float32)
            blended32 = cv2.addWeighted(
                rgb, rgbWeight, depth_bgr32, thermalWeight, 0, dtype=cv2.CV_32F
            )
            blended = np.clip(blended32, 0, 255).astype(np.uint8)
            # cv2.putText(
            #     blended,
            #     f"FPS: {fpsCounter.getFps():.2f}",
            #     (10, 30),
            #     cv2.FONT_HERSHEY_SIMPLEX,
            #     1,
            #     (255, 255, 255),
            #     2,
            # )
            cv2.imshow(windowName, blended)
            thermalFrame_n = cv2.normalize(
                thermalRaw.getCvFrame().astype(np.float32),
                None,
                0,
                255,
                cv2.NORM_MINMAX,
                cv2.CV_8U,
            )
            thermalFrame_c = cv2.applyColorMap(thermalFrame_n, cv2.COLORMAP_MAGMA)
            # Intrinsics must match the raw-resolution frame undistorted below
            thermalIntrinsics = np.array(
                calibrationHandler.getCameraIntrinsics(
                    thermalSocket,
                    int(thermalFrame_c.shape[1]),
                    int(thermalFrame_c.shape[0]),
                )
            )
            w, h = int(thermalFrame_c.shape[1]), int(thermalFrame_c.shape[0])
            alpha = 0.45
            new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(
                thermalIntrinsics,
                np.array(thermalDistortion),
                (w, h),
                alpha,
                (w, h),
            )
            # Undistort frame
            thermalFrame = cv2.undistort(
                thermalFrame_c,
                thermalIntrinsics,
                np.array(thermalDistortion),
                None,
                new_camera_matrix,
            )
            # cv2.imshow(f'thermal-recfitied, alpha = {alpha}', thermalFrame)
            # cv2.imshow('rgb-rectifted', cvFrameUndistorted)
            # cv2.imshow("thermal", thermalFrame_c)
            # cv2.imshow("rgb", frameRgbCv)
            alpha = 0.45
            w, h = int(frameRgbCv.shape[1]), int(frameRgbCv.shape[0])
            new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(
                np.array(rgbIntrinsics),
                np.array(rgbDistortion),
                (w, h),
                alpha,
                (w, h),
            )
            cvFrameUndistorted = cv2.undistort(
                frameRgbCv,
                np.array(rgbIntrinsics),
                np.array(rgbDistortion),
                None,
                new_camera_matrix,
            )
        except Exception as e:
            print('Failed to display the frame:', e)
        key = cv2.waitKey(1)
        if key == ord("q"):
            break
        elif key == ord("m"):
            # If you run headless and see "libGL error: failed to load driver: swrast",
            # uncomment the next two lines *before* importing pyplot:
            # import matplotlib
            # matplotlib.use("Agg")  # or set env: MPLBACKEND=Agg
            import matplotlib.pyplot as plt
            tvec = calibrationHandler.getCameraTranslationVector(
                RGB_SOCKET, dai.CameraBoardSocket.CAM_E, False
            )
            print('tvec2', tvec)
            tvec = [-2.5, 0, 0]  # manual override of the calibrated vector (for testing)
            tv = np.array(tvec, dtype=float).ravel()
            tv_norm = np.linalg.norm(tv)
            plt.figure(figsize=(12, 7), constrained_layout=True)
            plt.subplot(2, 2, 1)
            plt.title(f"Thermal rectified, alpha = {alpha}")
            im1 = plt.imshow(thermalFrame)
            plt.axis("off")
            plt.colorbar(im1, fraction=0.046, pad=0.04)
            plt.subplot(2, 2, 2)
            plt.title(f"RGB rectified, alpha = {alpha}")
            plt.imshow(cvFrameUndistorted)
            plt.axis("off")
            plt.subplot(2, 2, 3)
            plt.title("RGB - thermal blend, alpha = 0")
            plt.imshow(blended)
            plt.axis("off")
            ax4 = plt.subplot(2, 2, 4)
            ax4.set_title("Translation vector (RGB → CAM_E)")
            txt = (
                f"t = [{tv[0]:.4g}, {tv[1]:.4g}, {tv[2]:.4g}]\n"
                f"|t| = {tv_norm:.4g}"
            )
            ax4.text(
                0.5,
                0.55,
                txt,
                ha="center",
                va="center",
                fontsize=13,
                family="monospace",
                transform=ax4.transAxes,
            )
            ax4.text(
                0.5,
                0.2,
                "Note: units/axes depend on your rig.",
                ha="center",
                va="center",
                fontsize=9,
                transform=ax4.transAxes,
            )
            ax4.axis("off")
            plt.show()
        cfg.staticDepthPlane = staticDepthPlane
        cfgQ.send(cfg)
Thanks, 
Jaka