But Here I have the MRE now.
I hope this is ok. The formatting is a bit off.
In line 51 and 52 the fps and resolution can be changed:
import contextlib
import logging
import os
from datetime import datetime
import cv2
import depthai as dai
filename = None
class VideoSaver(dai.node.HostNode):
def init(self, *args, **kwargs):
dai.node.HostNode.init(self, *args, **kwargs)
global filename
self.file_handle = open(filename, 'wb')
def build(self, \*args):
self.link_args(\*args)
return self
def process(self, frame):
frame.getData().tofile(self.file_handle)
class Camera(object):
_instance = None
_ready = False
fps = None
video_savers = []
def __new__(cls, _mock: bool=False):
if cls._instance is None:
logging.debug("Initiate camera instance.")
cls._instance = super(Camera, cls).__new__(cls)
cls.fps = 25
cls._mock = _mock
return cls._instance
def __init__(self, _mock: bool=False):
self.MAX_DURATION = 20 # Maximum duration in seconds for recording
self.cam_path = "Videorecording"
def createPipeline(self, pipeline, mxId: str) -> tuple[dai.Pipeline, dai.MessageQueue | None]:
pipeline.setXLinkChunkSize(0) # to avoid splitting the encoded frames into chunks (important for 4k)
global filename
os.makedirs(os.path.join(self.cam_path), exist_ok=True)
filename = os.path.join(self.cam_path, f"video_{mxId}.encoded")
fps = 30
resolution = (1280, 800)
# Camera queue definitions
cam = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)
output = cam.requestOutput(
resolution,
type=dai.ImgFrame.Type.NV12,
enableUndistortion=True,
fps=fps
)
encoded = pipeline.create(dai.node.VideoEncoder).build(
output, frameRate=fps, profile=dai.VideoEncoderProperties.Profile.MJPEG
)
self.video_savers.append(pipeline.create(VideoSaver).build(encoded.out))
return pipeline, None
def run(self):
logging.info("Start process to record with camera.")
with contextlib.ExitStack() as stack:
deviceInfos = dai.Device.getAllAvailableDevices()
print("=== Found devices: ", deviceInfos)
pipelines: dict = {}
# Create a pipeline and queues for each connected device
for ctr, deviceInfo in enumerate(deviceInfos):
# Create pipeline context
pipeline = stack.enter_context(dai.Pipeline())
# Create and start pipeline for each device
pipeline, output = self.createPipeline(pipeline, str(ctr))
pipelines[str(ctr)] = pipeline
# Run the main loop for all connected devices
for key, pipeline in pipelines.items():
pipeline.start()
print(f"Camera [{key}] started recording at: {datetime.now()}")
start_time = datetime.now()
while self.ready and (datetime.now() - start_time).total_seconds() < 10:
if cv2.waitKey(1) == ord("q") or not self.ready:
logging.info("Stopping camera recording.")
self.ready = False
break
# Cleanup: Stop all pipelines
for pipeline in pipelines.values():
pipeline.stop()
pipeline.wait()
for saver in self.video_savers:
saver.file_handle.close()
@property
def ready(self) -> bool:
return self._ready
@ready.setter
def ready(self, value: bool):
self._ready = value
if name == "main":
cam = Camera()
cam.ready = True
cam.run()