jakaskerl
Of course. So what the code does is:
- Set up the camera pipelines
- Record a segment of video (color and stereo) and IMU data while a certain condition is satisfied.
- Convert the encoded files at the end
"I'd make sure to power the device via that powerbank and only use the Pi for communication." -> I'm already doing that. The Pi and the camera are both connected to the powerbank.
class CameraRecorder(Recorder):
    """Recorder for a DepthAI device: H.265 color video, H.265 disparity video and IMU samples.

    ``initialize`` builds the on-device pipeline and opens the device;
    ``record_segment`` drains the device streams into files for as long as
    ``self.record_condition`` holds and then converts the raw H.265 files.
    """

    # Host-side queue depth for the encoded video streams. The streams are
    # inter-frame coded, so a dropped packet damages the frames that depend on
    # it. A depth of 1 drops a packet whenever one loop iteration of
    # ``record_segment`` takes longer than a frame period.
    _ENCODED_QUEUE_SIZE = 30

    def __init__(self) -> None:
        """Pass the recorder name ``"camera"`` to the ``Recorder`` base class."""
        super().__init__("camera")

    def initialize(self) -> None:
        """Look for a device, set ``self.state`` and, if one is found, start the pipeline.

        When a device is found this sets ``self.color_fps``, ``self.stereo_fps``
        and ``self.device``. When none is found only ``self.state`` is set
        (to ``Recorder.State.NOT_FOUND``) and ``self.device`` is left unassigned.
        """
        found, _device_info = dai.Device.getFirstAvailableDevice()
        self.state = Recorder.State.IDLE if found else Recorder.State.NOT_FOUND
        if not found:
            return

        pipeline = dai.Pipeline()

        # --- Color camera -> H.265 encoder -> "color" stream, plus raw "preview" stream ---
        self.color_fps = 20
        color = pipeline.create(dai.node.ColorCamera)
        color_enc = pipeline.create(dai.node.VideoEncoder)
        xout_preview = pipeline.create(dai.node.XLinkOut)
        xout_preview.setStreamName("preview")
        xout_color = pipeline.create(dai.node.XLinkOut)
        xout_color.setStreamName("color")

        color.setCamera("color")
        color.setBoardSocket(dai.CameraBoardSocket.CAM_A)
        color.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
        color.setFps(self.color_fps)
        color_enc.setDefaultProfilePreset(self.color_fps, dai.VideoEncoderProperties.Profile.H265_MAIN)

        # NOTE(review): a non-blocking input of size 1 lets the device discard
        # encoded packets when the link is slower than the encoder; the stereo
        # XLinkOut below keeps its defaults. Left as-is - confirm this is intended.
        xout_color.input.setBlocking(False)
        xout_color.input.setQueueSize(1)

        color.video.link(color_enc.input)
        color.preview.link(xout_preview.input)
        color_enc.bitstream.link(xout_color.input)

        # --- Mono pair -> stereo disparity -> H.265 encoder -> "stereo" stream ---
        self.stereo_fps = 20
        mono_left = pipeline.create(dai.node.MonoCamera)
        mono_right = pipeline.create(dai.node.MonoCamera)
        stereo = pipeline.create(dai.node.StereoDepth)
        stereo_enc = pipeline.create(dai.node.VideoEncoder)
        xout_stereo = pipeline.create(dai.node.XLinkOut)

        mono_left.setCamera("left")
        mono_right.setCamera("right")
        for mono in (mono_left, mono_right):
            mono.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
            mono.setFps(self.stereo_fps)

        stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
        stereo.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
        stereo.setRectifyEdgeFillColor(0)
        stereo.setLeftRightCheck(False)
        stereo.setExtendedDisparity(False)
        stereo.setSubpixel(False)

        xout_stereo.setStreamName("stereo")
        stereo_enc.setDefaultProfilePreset(self.stereo_fps, dai.VideoEncoderProperties.Profile.H265_MAIN)

        mono_left.out.link(stereo.left)
        mono_right.out.link(stereo.right)
        stereo.disparity.link(stereo_enc.input)
        stereo_enc.bitstream.link(xout_stereo.input)

        # --- IMU -> "imu" stream (raw accelerometer and gyroscope, rate argument 100) ---
        imu = pipeline.create(dai.node.IMU)
        xout_imu = pipeline.create(dai.node.XLinkOut)
        xout_imu.setStreamName("imu")
        imu.enableIMUSensor([dai.IMUSensor.ACCELEROMETER_RAW, dai.IMUSensor.GYROSCOPE_RAW], 100)
        imu.setBatchReportThreshold(1)
        imu.setMaxBatchReports(10)
        imu.out.link(xout_imu.input)

        self.device = dai.Device(pipeline, maxUsbSpeed=dai.UsbSpeed.SUPER_PLUS)

    def record_segment(self) -> str:
        """Record one segment and return the names of the files it produced.

        Encoded color and stereo packets are appended to temporary ``.h265``
        files and IMU packets to a CSV file (opened in append mode) while
        ``self.record_condition(elapsed_seconds)`` is true. Afterwards both
        ``.h265`` files are passed to ``convert_h265_file``.

        Requires ``initialize`` to have found a device; otherwise
        ``self.device`` does not exist.

        Returns:
            ``"<color file>:<stereo file>:<imu file>"`` - the base names of the
            converted color video, converted stereo video and IMU CSV.
        """
        start_time = time.monotonic()

        # Encoded streams get a deeper queue than the original size of 1 so
        # that a slow loop iteration does not overwrite packets.
        color_queue = self.device.getOutputQueue(name="color", maxSize=self._ENCODED_QUEUE_SIZE, blocking=False)
        stereo_queue = self.device.getOutputQueue(name="stereo", maxSize=self._ENCODED_QUEUE_SIZE, blocking=False)
        imu_queue = self.device.getOutputQueue(name="imu", maxSize=50, blocking=False)

        # With the deeper queues, packets buffered before this call would end
        # up at the start of the new segment; throw them away.
        color_queue.tryGetAll()
        stereo_queue.tryGetAll()

        color_enc_output_path = str(get_output_path() / self.subfolder / "color_temp.h265")
        color_output_path = str(get_output_file_path("color", "mp4", self.subfolder))
        stereo_enc_output_path = str(get_output_path() / self.subfolder / "stereo_temp.h265")
        stereo_output_path = str(get_output_file_path("stereo", "mp4", self.subfolder))
        imu_output_path = str(get_output_file_path("imu", "csv", self.subfolder))

        # Loop-invariant preview setup, only touched when previews are enabled.
        preview_queue = None
        preview_path = None
        if Config.ENABLE_PREVIEW:
            preview_queue = self.device.getOutputQueue(name="preview")
            preview_path = str(get_output_path() / self.subfolder / "preview.jpg")

        with open(imu_output_path, "a", newline="") as imu_file, open(
            color_enc_output_path, "wb"
        ) as color_file, open(stereo_enc_output_path, "wb") as stereo_file:
            imu_writer = csv.writer(imu_file)

            while self.record_condition(time.monotonic() - start_time):
                # Drain everything currently buffered for both video streams.
                while color_queue.has():
                    color_queue.get().getData().tofile(color_file)
                while stereo_queue.has():
                    stereo_queue.get().getData().tofile(stereo_file)

                # get() waits for the next IMU message, which paces this loop.
                for packet in imu_queue.get().packets:
                    gyro = packet.gyroscope
                    accel = packet.acceleroMeter
                    # NOTE(review): initialize() enables only the raw
                    # accelerometer and gyroscope, so these two fields are
                    # presumably default values - confirm before relying on
                    # the magnetometer / rotation columns.
                    mag = packet.magneticField
                    rot = packet.rotationVector
                    # Host wall-clock time when the packet is processed.
                    time_stamp = datetime.now()
                    imu_writer.writerow(
                        [
                            time_stamp,
                            imuF.format(accel.x),
                            imuF.format(accel.y),
                            imuF.format(accel.z),
                            imuF.format(gyro.x),
                            imuF.format(gyro.y),
                            imuF.format(gyro.z),
                            imuF.format(mag.x),
                            imuF.format(mag.y),
                            imuF.format(mag.z),
                            imuF.format(rot.i),
                            imuF.format(rot.j),
                            imuF.format(rot.k),
                            rot.real,
                            rot.accuracy,
                        ]
                    )

                # NOTE(review): the source this was taken from had lost its
                # indentation; the preview capture is assumed to run once per
                # loop iteration - confirm.
                if preview_queue is not None:
                    cv2.imwrite(preview_path, preview_queue.get().getFrame())

        # Convert only after the raw files are closed so all data is on disk.
        convert_h265_file(color_enc_output_path, color_output_path, self.color_fps)
        convert_h265_file(stereo_enc_output_path, stereo_output_path, self.stereo_fps)

        color_name = os.path.basename(color_output_path)
        stereo_name = os.path.basename(stereo_output_path)
        imu_name = os.path.basename(imu_output_path)
        return f"{color_name}:{stereo_name}:{imu_name}"