I've inherited demo code (Python) for a human detection application using an OAK-D SR PoE camera and DepthAI v3-rc4. I am working to integrate it in a more robust form into a robotics application. I am new to DepthAI.
The robot that contains this camera includes several other types of OAK cameras for other needs. All the cameras have statically assigned IPs on the robot's internal network. The other cameras are used by a third-party navigation platform, so the other 4 cameras exist on the network, but I do not have access to the code that communicates with them.
Rather than staying fully abstract until the pipeline is started, the pipeline setup I am working to improve appears to trigger a camera connection attempt as part of creating the nodes, cameras, detection network, etc. Because of this, and because there are 5 cameras on the network (4 of which do not match the pipeline I am working with), pipeline creation regularly fails. I've had to run a retry loop that tries again after catching the exception ("Camera socket not found on the connected device."). This makes startup slow, and it is brittle and ugly given that we know exactly which camera should be used.
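For context, the retry wrapper I'm currently running is essentially the following (the helper name and retry parameters are mine, simplified from the real code):

```python
import time


def create_with_retry(factory, attempts=10, delay=2.0):
    """Keep calling `factory` (in my case, the pipeline-creation routine)
    until it stops raising, e.g. RuntimeError("Camera socket not found on
    the connected device."). Simplified sketch of my current workaround."""
    last_exc = None
    for _ in range(attempts):
        try:
            return factory()
        except RuntimeError as exc:
            last_exc = exc
            time.sleep(delay)  # give discovery a moment before retrying
    # All attempts failed: surface the last error to the caller
    raise last_exc
```

In the real application this is called as `create_with_retry(self._create_depthai_pipeline)`, which is exactly the slow, nondeterministic startup I want to eliminate.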
The code I have does work, but it is not sufficiently robust for production. I need to tackle these connection issues, pin a cached snapshot of the detection model blob from the Model Zoo, etc.
Generally speaking, what's the right approach in v3 to set up a pipeline and run a detection network that only attempts to connect to one specific, target camera? The code I'm working with does not presently use a with depthai.Device(…) context and is definitely trying to connect to cameras behind the scenes during pipeline setup, before detection even begins.
Specifically, what in the pipeline setup routine below is triggering camera connection, and how can I modify this routine and the consumers of the queues and detection network it creates to only communicate with a specific camera IP address?
I've tried a handful of approaches but am still fumbling through the basic concepts of DepthAI and the v3 API. So far I've not found a code example that demonstrates connecting to a specific camera by IP address while using the nodes and functions this pipeline needs.
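Separately from the routine below, the closest I've come is opening the device by its IP first and handing it to the pipeline. This is my own untested sketch, not a pattern I've confirmed from the docs: `build_pipeline_for_ip` is a hypothetical wrapper of mine, and I don't know whether constructing dai.Pipeline against an existing dai.Device actually suppresses the discovery/connection attempts I'm seeing during node setup:

```python
def build_pipeline_for_ip(ip: str):
    """Hypothetical sketch: connect to the one camera at `ip` first, then
    build the pipeline against that already-connected device. Untested on
    my setup; I am not sure this is the intended v3 pattern."""
    import depthai as dai  # deferred so the sketch parses without the SDK

    # DeviceInfo accepts an IP address directly for PoE devices, which
    # should avoid scanning the network and hitting the other 4 cameras.
    device = dai.Device(dai.DeviceInfo(ip))

    # My assumption: a Pipeline constructed from an existing Device binds
    # all node/socket lookups to that device during setup.
    return dai.Pipeline(device)
```

If this is the right direction, I'd presumably pass the resulting pipeline (or device) into `_create_depthai_pipeline` instead of having it call `dai.Pipeline()` itself, but I'd like confirmation before restructuring.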
def _create_depthai_pipeline(self):
    """Create and configure the DepthAI pipeline"""
    # Find the target device by IP address
    # target_device_info = self._find_device_by_ip('10.4.8.125')
    # if not target_device_info:
    #     raise RuntimeError(f"DepthAI device not found at IP: {'10.4.8.125'}")
    self._pipeline = dai.Pipeline()

    # Set up ToF sensor
    tof = self._pipeline.create(dai.node.ToF).build()

    # Set up RGB camera
    camera_node = self._pipeline.create(dai.node.Camera).build(
        boardSocket=dai.CameraBoardSocket.CAM_C,  # RGB_SOCKET
        sensorFps=self._fps
    )
    camera_full = camera_node.requestFullResolutionOutput(type=dai.ImgFrame.Type.BGR888p)
    camera_nn = camera_node.requestOutput(
        size=(416, 416),
        resizeMode=dai.ImgResizeMode.LETTERBOX,
        type=dai.ImgFrame.Type.BGR888p
    )

    # Set up image processing for ToF alignment
    tof_resize = self._pipeline.create(dai.node.ImageManip)
    tof_resize.initialConfig.setOutputSize(416, 416, dai.ImageManipConfig.ResizeMode.LETTERBOX)

    # Set up detection network
    self._detection_network = self._pipeline.create(dai.node.DetectionNetwork)
    self._detection_network.setFromModelZoo(
        dai.NNModelDescription(model="yolov6-nano", platform="RVC2"),
        useCached=True
    )

    # Create sync and align nodes
    sync = self._pipeline.create(dai.node.Sync)
    sync.setSyncThreshold(timedelta(seconds=0.5 / self._fps))
    align = self._pipeline.create(dai.node.ImageAlign)

    # Linking
    camera_nn.link(self._detection_network.input)
    tof.depth.link(align.input)
    camera_full.link(align.inputAlignTo)
    align.outputAligned.link(tof_resize.inputImage)
    self._detection_network.passthrough.link(sync.inputs["rgb"])
    tof_resize.out.link(sync.inputs["depth_aligned"])
    sync.inputs["rgb"].setBlocking(False)

    # Create output queues
    self._detection_queue = self._detection_network.out.createOutputQueue()
    self._sync_queue = sync.out.createOutputQueue()