from pathlib import Path
import cv2
from depthai import CameraBoardSocket
from depthai import ColorCameraProperties
from depthai import Device
from depthai import IMUSensor
from depthai import ImgFrame
from depthai import MonoCameraProperties
from depthai import Pipeline
from depthai import RawStereoDepthConfig
from depthai import StereoDepthProperties
from depthai import VideoEncoderProperties
# Stream names assigned to the XLink nodes created further down in this file.
# The host uses the same names when it opens queues on the device.
HI_RES_STREAM_NAME = "hi_res"
COLOR_STREAM_NAME = "color"
CONTROL_STREAM_NAME = "control"
RIGHT_STREAM_NAME = "right"
# NOTE(review): DEPTH_COLOR_STREAM_NAME is not referenced in the code visible
# here; only DEPTH_RIGHT_STREAM_NAME is assigned to an output.
DEPTH_COLOR_STREAM_NAME = "depth_color_aligned"
DEPTH_RIGHT_STREAM_NAME = "depth_right_aligned"
FEATURES_STREAM_NAME = "features"
DETECTIONS_STREAM_NAME = "detections"
IMU_STREAM_NAME = "imu"
# Names of the input and output of the script node that thins out the
# full-size colour frames (see create_hi_res_image_script).
HI_RES_IMAGE_SCRIPT_ISTREAM = "IN"
HI_RES_IMAGE_SCRIPT_OSTREAM = "OUT"
def create_hi_res_image_script(istream: str, ostream: str, modulo: int) -> str:
    """Build the source of a script that forwards every ``modulo``-th message.

    The returned text is Python source intended for a script node. It relies
    on a global ``node`` object whose ``node.io[name]`` entries provide
    ``get()`` (on the input) and ``send(message)`` (on the output). The script
    loops forever, counting received messages and forwarding one whenever the
    counter wraps to zero, so the first forwarded message is the
    ``modulo``-th one received.

    Args:
        istream: Name of the script input to read messages from.
        ostream: Name of the script output to forward messages to.
        modulo: Forward one message out of every ``modulo`` received.

    Returns:
        The script source, starting and ending with a newline.

    Raises:
        ValueError: If ``modulo`` is less than 1. A value of 0 would otherwise
            produce a script that fails with ``ZeroDivisionError`` on its
            first ``i %= 0``.
    """
    if modulo < 1:
        raise ValueError(f"modulo must be a positive integer, got {modulo!r}")

    # The template body starts at column 0 because the generated script is
    # parsed as a stand-alone module.
    return f"""
i = 0
while True:
    image = node.io["{istream}"].get()
    i += 1
    i %= {modulo}
    if i == 0:
        node.io["{ostream}"].send(image)
"""
# The pipeline that every node in this file is attached to.
pipeline = Pipeline()

# Colour camera on the RGB socket: 4K sensor mode, 2144x2144 video output,
# non-interleaved frames, 5 FPS.
color = pipeline.createColorCamera()
color.setBoardSocket(CameraBoardSocket.RGB)
color.setResolution(ColorCameraProperties.SensorResolution.THE_4_K)
color.setVideoSize(2144, 2144)
color.setInterleaved(False)
color.setFps(5.0)

# Initial image-tuning values: sharpening and luma denoise set to 0, chroma
# denoise set to 1.
color.initialControl.setLumaDenoise(0)
color.initialControl.setChromaDenoise(1)
color.initialControl.setSharpness(0)

# Auto-focus lens range and auto-exposure limit.
# NOTE(review): the exposure limit is presumably in microseconds - confirm
# against the depthai CameraControl documentation.
color.initialControl.setAutoFocusLensRange(0, 255)
color.initialControl.setAutoExposureLimit(1000)
# Mono cameras are created left first, then right, and then given the same
# settings: 400p resolution at 10 FPS, each bound to its own board socket.
left = pipeline.createMonoCamera()
right = pipeline.createMonoCamera()
for _mono, _socket in (
    (left, CameraBoardSocket.LEFT),
    (right, CameraBoardSocket.RIGHT),
):
    _mono.setBoardSocket(_socket)
    _mono.setFps(10.0)
    _mono.setResolution(MonoCameraProperties.SensorResolution.THE_400_P)
# Stereo depth node fed by the two mono cameras (linked further down).
depth = pipeline.createStereoDepth()
depth.setLeftRightCheck(True)
# Matching and filtering options: confidence threshold 180, median filter
# off, bilateral filter sigma 0.
depth.initialConfig.setConfidenceThreshold(180)
depth.initialConfig.setMedianFilter(StereoDepthProperties.MedianFilter.MEDIAN_OFF)
# Subpixel mode with 5 fractional bits and a disparity shift of 30.
depth.initialConfig.setSubpixel(True)
depth.initialConfig.setBilateralFilterSigma(0)
depth.initialConfig.setSubpixelFractionalBits(5)
depth.initialConfig.setDisparityShift(30)
# Switch the depth output to a custom unit, then set its multiplier on the raw
# config. Statement order matters here: get() is called after the setters
# above and set() writes the whole raw config object back.
# NOTE(review): the multiplier is presumably expressed relative to one metre
# (50000 units per metre) - confirm against the depthai documentation.
depth.initialConfig.setDepthUnit(RawStereoDepthConfig.AlgorithmControl.DepthUnit.CUSTOM)
depth_config = depth.initialConfig.get()
depth_config.algorithmControl.customDepthUnitMultiplier = 50000
depth.initialConfig.set(depth_config)
depth.enableDistortionCorrection(True)
# Image manipulation node: resize to 448x448 and convert to BGR888p.
manip = pipeline.createImageManip()
manip.initialConfig.setResize(448, 448)
manip.initialConfig.setFrameType(ImgFrame.Type.BGR888p)

# Script node that forwards one message out of every 40 it receives.
hi_res_image_script = pipeline.createScript()
hi_res_image_script.setScript(
    create_hi_res_image_script(
        istream=HI_RES_IMAGE_SCRIPT_ISTREAM,
        ostream=HI_RES_IMAGE_SCRIPT_OSTREAM,
        modulo=40,
    )
)

# MJPEG encoder at quality 90.
video_encoder = pipeline.createVideoEncoder()
video_encoder.setQuality(90)
video_encoder.setProfile(VideoEncoderProperties.Profile.MJPEG)

# Feature tracker given two shaves and two memory slices.
features = pipeline.createFeatureTracker()
features.setHardwareResources(numShaves=2, numMemorySlices=2)
# YOLO detection network loaded from a blob under the user's home directory.
detection = pipeline.createYoloDetectionNetwork()
detection.setBlobPath(Path.home().joinpath('path', 'to', 'blob'))

# Decoding parameters: two classes, four box coordinates, confidence
# threshold 0.1 and IoU threshold 0.5.
detection.setNumClasses(2)
detection.setCoordinateSize(4)
detection.setConfidenceThreshold(0.1)
detection.setIouThreshold(0.5)

# Two inference threads; the input is non-blocking with a queue of one.
detection.setNumInferenceThreads(2)
detection.input.setQueueSize(1)
detection.input.setBlocking(False)
# IMU node: raw gyroscope and raw accelerometer, both enabled with a report
# rate argument of 50.
imu = pipeline.createIMU()
for _sensor in (IMUSensor.GYROSCOPE_RAW, IMUSensor.ACCELEROMETER_RAW):
    imu.enableIMUSensor(_sensor, 50)

# Batch settings: report threshold 1, at most 10 reports per batch.
imu.setMaxBatchReports(10)
imu.setBatchReportThreshold(1)
# One XLink input for camera control, followed by seven XLink outputs created
# in this order: hi-res, colour, right, depth, features, detections, IMU.
control_in = pipeline.createXLinkIn()
(
    hi_res_out,
    color_out,
    right_out,
    depth_out,
    features_out,
    detection_out,
    imu_out,
) = (pipeline.createXLinkOut() for _ in range(7))

# Give every endpoint its stream name.
for _endpoint, _stream_name in (
    (control_in, CONTROL_STREAM_NAME),
    (hi_res_out, HI_RES_STREAM_NAME),
    (color_out, COLOR_STREAM_NAME),
    (right_out, RIGHT_STREAM_NAME),
    (depth_out, DEPTH_RIGHT_STREAM_NAME),
    (features_out, FEATURES_STREAM_NAME),
    (detection_out, DETECTIONS_STREAM_NAME),
    (imu_out, IMU_STREAM_NAME),
):
    _endpoint.setStreamName(_stream_name)
# Control messages from the XLink input drive the colour camera.
control_in.out.link(color.inputControl)

# Colour video feeds the image manipulation node and the frame-thinning script.
color.video.link(manip.inputImage)
color.video.link(hi_res_image_script.inputs[HI_RES_IMAGE_SCRIPT_ISTREAM])

# The manipulated image goes to the colour output and to the detection
# network, whose results go to the detections output.
manip.out.link(color_out.input)
manip.out.link(detection.input)
detection.out.link(detection_out.input)

# Frames forwarded by the script are encoded and sent to the hi-res output.
hi_res_image_script.outputs[HI_RES_IMAGE_SCRIPT_OSTREAM].link(video_encoder.input)
video_encoder.bitstream.link(hi_res_out.input)

# Mono cameras feed stereo depth. The depth output input is non-blocking with
# a queue of one.
# NOTE(review): presumably so that a slow reader gets the newest depth frame
# instead of stalling the pipeline - confirm.
left.out.link(depth.left)
right.out.link(depth.right)
depth_out.input.setBlocking(False)
depth_out.input.setQueueSize(1)
depth.depth.link(depth_out.input)

# The rectified right image feeds the feature tracker and the right output.
depth.rectifiedRight.link(features.inputImage)
depth.rectifiedRight.link(right_out.input)
features.outputFeatures.link(features_out.input)

# IMU reports go straight to the IMU output.
imu.out.link(imu_out.input)
#
# Upload the pipeline to a device and preview the colour stream on the host.
with Device(pipeline) as device:
    # Non-blocking host queue holding at most four messages of the colour stream.
    color_queue = device.getOutputQueue(name=COLOR_STREAM_NAME, maxSize=4, blocking=False)

    # Nothing in this script writes calibration data, so read it once here
    # instead of twice per displayed frame.
    calibration = device.readCalibration()
    intrinsics = calibration.getCameraIntrinsics(CameraBoardSocket.RGB)
    distortion = calibration.getDistortionCoefficients(CameraBoardSocket.RGB)

    try:
        while True:
            color_frame = color_queue.get().getCvFrame()
            # Printed once per frame, as before.
            # NOTE(review): getCameraIntrinsics is called without a target
            # size, so these values may not correspond to the 448x448 frame
            # shown below - confirm.
            print(intrinsics)
            print(distortion)
            cv2.imshow('color', color_frame)
            # waitKey also services the window's event loop; pressing 'q'
            # ends the preview.
            if cv2.waitKey(100) & 0xFF == ord('q'):
                break
    finally:
        # Close the preview window however the loop ends.
        cv2.destroyAllWindows()