I cannot run a simple pipeline that rectifies the frames from the main camera at full (48MP) resolution. I successfully rectified the image at 1920×1080 and 3840×2160 using the calibration data provided by the device, but running the same pipeline at 2000×1500, 4000×3000, or 8000×6000 produces the following artefact without raising any errors:

Could you help me rectify the image on the device side?
Here is an MRE:
from dataclasses import dataclass
from typing import Optional

import cv2
import depthai as dai
import numpy as np
from numpy.typing import NDArray
@dataclass
class PoseData:
    """Bundle of values describing a rectified view, returned next to a warp mesh."""

    # Image size; get_mesh stores the calibration resolution here.
    resolution: tuple[int, int]

    # Camera matrix of the rectified view; get_mesh stores the matrix returned
    # by cv2.getOptimalNewCameraMatrix here.
    pose: NDArray

    # Region of interest; get_mesh stores the ROI returned by
    # cv2.getOptimalNewCameraMatrix here (presumably x, y, width, height -
    # confirm against the OpenCV documentation).
    roi: tuple[int, int, int, int]
@dataclass(frozen=True)
class LensCalibrationData:
    """Immutable intrinsic calibration of one camera at one image size."""

    # Camera intrinsic matrix
    intrinsics: NDArray[np.float64]
    # Distortion coefficients
    distortion: NDArray[np.float64]
    # Resolution (width, height) the intrinsics refer to
    resolution: tuple[int, int]

    @classmethod
    def from_luxonis(
        cls,
        calib_data: dai.CalibrationHandler,
        camera_socket: dai.CameraBoardSocket,
        target_size: Optional[tuple[int, int]] = None,
    ) -> "LensCalibrationData":
        """Build the calibration from a DepthAI calibration handler.

        Args:
            calib_data: Calibration handler read from the device.
            camera_socket: Socket of the camera whose calibration is wanted.
            target_size: Optional ``(width, height)``. When given, the
                intrinsics are requested from the handler for that size;
                otherwise the handler's default intrinsics and their size
                are used.

        Returns:
            A new ``LensCalibrationData`` holding the intrinsics, the
            distortion coefficients and the matching resolution.

        Raises:
            ValueError: If the camera's distortion model is not
                ``dai.CameraModel.Perspective``.
        """
        if target_size is not None:
            width, height = target_size
            intrinsics = calib_data.getCameraIntrinsics(
                camera_socket, resizeWidth=width, resizeHeight=height
            )
        else:
            intrinsics, width, height = calib_data.getDefaultIntrinsics(camera_socket)

        # Raised explicitly (instead of asserted) so the check survives
        # `python -O`, which strips assert statements.
        distortion_model = calib_data.getDistortionModel(camera_socket)
        if distortion_model != dai.CameraModel.Perspective:
            raise ValueError(
                f"Unsupported distortion model {distortion_model}; "
                f"expected {dai.CameraModel.Perspective}"
            )

        return cls(
            intrinsics=np.array(intrinsics, dtype=np.float64),
            distortion=np.array(
                calib_data.getDistortionCoefficients(camera_socket), dtype=np.float64
            ),
            resolution=(width, height),
        )
def get_rectification_map(calibration_data: LensCalibrationData, alpha=0):
    """Compute OpenCV undistortion maps for the given calibration.

    Args:
        calibration_data: Intrinsics, distortion coefficients and image size.
        alpha: Forwarded to ``cv2.getOptimalNewCameraMatrix`` as its fourth
            argument (OpenCV's free scaling parameter).

    Returns:
        ``(map_x, map_y, new_camera_matrix, roi)``: the two maps from
        ``cv2.initUndistortRectifyMap`` (requested as ``cv2.CV_32F``), plus the
        camera matrix and ROI from ``cv2.getOptimalNewCameraMatrix``.
    """
    camera_matrix = calibration_data.intrinsics
    dist_coeffs = calibration_data.distortion
    image_size = calibration_data.resolution

    # The new camera matrix is computed for the same image size as the input.
    new_camera_matrix, valid_roi = cv2.getOptimalNewCameraMatrix(
        camera_matrix, dist_coeffs, image_size, alpha, image_size
    )

    # Identity rotation: undistort only, no extra rectifying rotation.
    no_rotation = np.eye(3)
    undistort_maps = cv2.initUndistortRectifyMap(
        camera_matrix, dist_coeffs, no_rotation, new_camera_matrix, image_size, cv2.CV_32F
    )
    map_x, map_y = undistort_maps
    return map_x, map_y, new_camera_matrix, valid_roi
def get_mesh(calibration_data: LensCalibrationData, alpha=0, cell_size: int = 64):
    """Build a subsampled warp mesh from the undistortion maps.

    The per-pixel maps from ``get_rectification_map`` are sampled every
    ``cell_size`` pixels in both directions. Each mesh row interleaves the
    samples as ``x0, y0, x1, y1, ...``.

    Args:
        calibration_data: Calibration used to compute the undistortion maps.
        alpha: Forwarded unchanged to ``get_rectification_map``.
        cell_size: Sampling step in pixels. Defaults to 64, the value that
            was previously hard-coded.

    Returns:
        ``(mesh, pose)``: ``mesh`` is an array of shape
        ``(rows, 2 * cols)`` with ``rows = map_height // cell_size`` and
        ``cols = map_width // cell_size``; ``pose`` is a ``PoseData`` holding
        the calibration resolution, the new camera matrix and the ROI.

    Raises:
        ValueError: If ``cell_size`` is not positive.
    """
    if cell_size <= 0:
        raise ValueError(f"cell_size must be a positive integer, got {cell_size}")

    map_x, map_y, mtx, roi = get_rectification_map(calibration_data, alpha)
    assert map_x.shape == map_y.shape

    rows = map_x.shape[0] // cell_size
    cols = map_x.shape[1] // cell_size

    # Stopping the slices at rows/cols * cell_size discards the trailing
    # partial cell, so only whole multiples of cell_size are sampled.
    row_stop, col_stop = rows * cell_size, cols * cell_size
    m = np.empty((rows, 2 * cols))
    m[:, ::2] = map_x[:row_stop:cell_size, :col_stop:cell_size]
    m[:, 1::2] = map_y[:row_stop:cell_size, :col_stop:cell_size]
    return m, PoseData(calibration_data.resolution, mtx, tuple(roi))
def _warp_mesh(rectify: dai.node.Warp, mesh: NDArray):
    """Upload an interleaved mesh to a DepthAI ``Warp`` node.

    Args:
        rectify: Warp node that receives the mesh via ``setWarpMesh``.
        mesh: 2-D array whose rows interleave the coordinates as
            ``x0, y0, x1, y1, ...``; it is not modified.

    Raises:
        ValueError: If ``mesh`` is not 2-D or has an odd number of columns,
            in which case it cannot be split into (x, y) pairs.
    """
    if mesh.ndim != 2 or mesh.shape[1] % 2 != 0:
        raise ValueError(
            f"mesh must be 2-D with an even number of columns, got shape {mesh.shape}"
        )

    mesh_height, interleaved_width = mesh.shape
    mesh_width = interleaved_width // 2

    # reshape() returns a new array object, so the caller's mesh keeps its
    # shape (the previous in-place resize() changed it as a side effect).
    points = mesh.reshape(mesh_width * mesh_height, 2)

    # tolist() yields plain Python floats for the Point2f constructor.
    rectify.setWarpMesh(
        [dai.Point2f(x, y) for x, y in points.tolist()], mesh_width, mesh_height
    )
# 1280*720@120fps,1920*1080@60fps,2000*1500@50fps,3840*2160@20fps,4000*3000@14fps,8000*6000@3fps
# https://www.arducam.com/downloads/datasheet/B0478_48MP_IMX586_USB3.0_Camera_Datasheet.pdf
FULL_RES = (8000, 6000)  # 48MP
LOW_RES = (3840, 2160)
RESOLUTION = LOW_RES  # FULL_RES does not work

# One name shared by namedWindow() and imshow(); previously they differed
# ("Image" vs "detections"), so the resizable keep-ratio window was never used.
WINDOW_NAME = "Image"

# Read the calibration and build the mesh for the requested resolution.
with dai.Device() as device:
    print(f'Camera {device.getProductName()}, MXID {device.getDeviceId()}')
    print(f"IMU type: {device.getConnectedIMU()}, firmware version: {device.getIMUFirmwareVersion()}")
    mesh, pose = get_mesh(
        LensCalibrationData.from_luxonis(device.readCalibration(), dai.CameraBoardSocket.CAM_A, RESOLUTION), 1)

# NOTE(review): the pasted source lost its indentation. dai.Pipeline() is
# created without the `device` argument, so this block is laid out after the
# device session above rather than nested in it - confirm.
with dai.Pipeline() as pipeline:
    camera = pipeline.create(dai.node.Camera).build()
    camera_output = camera.requestOutput(size=RESOLUTION, type=dai.ImgFrame.Type.GRAY8)

    rectify = pipeline.create(dai.node.Warp)
    rectify.setInterpolation(dai.Interpolation.BILINEAR)
    _warp_mesh(rectify, mesh)
    rectify.setOutputSize(RESOLUTION)
    # Output buffer sized at twice the pixel count of one frame.
    rectify.setMaxOutputFrameSize(2 * RESOLUTION[0] * RESOLUTION[1])

    camera_output.link(rectify.inputImage)
    output_queue = rectify.out.createOutputQueue()

    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO)
    pipeline.start()
    while pipeline.isRunning():
        message: dai.ImgFrame = output_queue.get()
        frame = message.getCvFrame()
        assert message.getType() == dai.ImgFrame.Type.GRAY8
        print(f'Resolution: {message.getWidth()}x{message.getHeight()}')
        cv2.imshow(WINDOW_NAME, frame)
        # Mask to the low byte so the comparison also works on platforms
        # where waitKey() returns extra modifier bits.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

cv2.destroyAllWindows()