J
jdevore

  • Apr 17, 2024
  • Joined Apr 15, 2024
  • 0 best answers
  • Hello,

    I just wanted to reach out to see what the best practices were for implementing a "Software Trigger." A lot of the examples assume that the camera is free running and that the user ingests photos at a fixed FPS. In our use case, we need to trigger frames at an externally specified interval, i.e. at a specific robot location, but we don't want the associated complexity of a hardware trigger (and would prefer to stick with the S2 model). Bandwidth is a concern so we have to drop frames at the edge.

    The color camera has what I'm looking for in the "still capture" mode. That said, I'm needing depth data synchronized with the color.

    At first, I used a script to throw away frames until a capture command was received. This worked OK (mostly) but felt like a hack. I then tried syncing still frames with depth data. After getting the queues set up just right, this seems to be stable.

    I found that if things weren't right, longer capture intervals caused a freeze (perhaps a combination of the queue setup, pool size, and the FPS of the camera). I eventually came to the attached code segment that seems to be stable at varying capture intervals.

    My question is -- is the a better / more native way to do software triggering? Thanks!

    #!/usr/bin/env python3
    
    import cv2  # type: ignore
    import numpy as np
    import depthai as dai
    import time
    import datetime
    
    # We'll want this to be high enough to make our sync threshold
    # reasonable
    FPS = 15
    
    # Create pipeline
    pipeline = dai.Pipeline()
    device = dai.Device()
    
    # Define sources and outputs
    camRgb = pipeline.create(dai.node.ColorCamera)
    
    control = pipeline.create(dai.node.XLinkIn)
    control.setStreamName("control")
    control.out.link(camRgb.inputControl)
    
    # Properties
    camRgb.setBoardSocket(dai.CameraBoardSocket.CAM_A)
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
    camRgb.setFps(FPS)
    
    left = pipeline.create(dai.node.MonoCamera)
    left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    left.setCamera("left")
    left.setFps(FPS)
    left.setNumFramesPool(2)
    
    right = pipeline.create(dai.node.MonoCamera)
    right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    right.setCamera("right")
    right.setFps(FPS)
    right.setNumFramesPool(2)
    
    stereo = pipeline.createStereoDepth()
    # stereo.setNumFramesPool(1)
    left.out.link(stereo.left)
    right.out.link(stereo.right)
    
    sync = pipeline.create(dai.node.Sync)  # type: ignore
    sync.setSyncThreshold(datetime.timedelta(milliseconds=5))  # type: ignore
    # We want to sync every frame
    sync.setSyncAttempts(-1)  # type: ignore
    
    camRgb.still.link(sync.inputs["rgb"])  # type: ignore
    stereo.depth.link(sync.inputs["depth"])  # type: ignore
    
    sync.inputs["rgb"].setBlocking(False)  # type: ignore
    sync.inputs["rgb"].setQueueSize(1)  # type: ignore
    sync.inputs["depth"].setBlocking(False)  # type: ignore
    sync.inputs["depth"].setQueueSize(1)  # type: ignore
    
    # This doesn't work as expected... maybe a bug?
    # for input in sync.getInputs():
    #     input.setBlocking(False)
    #     input.setQueueSize(1)
    
    syncOut = pipeline.create(dai.node.XLinkOut)
    syncOut.setStreamName("sync")
    sync.out.link(syncOut.input)  # type: ignore
    
    
    cv2.namedWindow("Luxonis", cv2.WINDOW_NORMAL)
    cv2.namedWindow("Depth", cv2.WINDOW_NORMAL)
    
    with device:
        device.startPipeline(pipeline)
        camera_control = device.getInputQueue(name="control")
    
        # rgbWindowName = "rgb"
        # cv2.namedWindow(rgbWindowName, cv2.WINDOW_NORMAL)
        sync_queue = device.getOutputQueue("sync", maxSize=4, blocking=False)
    
        ctrl = dai.CameraControl()
        ctrl.setCaptureStill(True)
    
        while True:
            print("Triggering Capture")
            camera_control.send(ctrl)
    
            frame = sync_queue.get()
    
            image_oakd: np.ndarray = frame["rgb"].getCvFrame()  # type: ignore
            image_depth: np.ndarray = frame["depth"].getCvFrame()  # type: ignore
    
            cv2.imshow("Luxonis", image_oakd)
            cv2.imshow("Depth", image_depth)
    
            if cv2.waitKey(1000) == ord("q"):
                break