Hey @erik @jakaskerl ,
Some updates on our end.
Currently, we are actively using 13 cameras and we’re still facing the "ping was missed" issue at two points in our Docker application:
When we start our Docker application.
In the middle of the week.
As our solution is deployed on-prem, we check on it every week. During our checks we discovered that a few cameras had hit "ping was missed", which caused our Docker application to hang.
As an example, we started our Docker service on 07/10/2024 and the "ping was missed" error showed up on 09/10/2024, causing our service to hang.
Here are some troubleshooting steps we’ve tried:
We used a few scripts from your repository to collect some metrics, which let us compare our setup against the desired results shown below:
Benchmark
Scripts
PoE link test result
Here’s a sample result of a few cameras
# Cam 1
Connecting to 10.0.0.11 ...
mxid: 184430101110E8F400 (OK)
speed: 1000 (OK)
full duplex: 1 (OK)
boot mode: 3 (OK)
# Cam 3
Connecting to 10.0.0.13 ...
mxid: 18443010F16EE6F400 (OK)
speed: 1000 (OK)
full duplex: 1 (OK)
boot mode: 3 (OK)
# Cam 6
Connecting to 10.0.0.16 ...
mxid: 18443010517BE7F400 (OK)
speed: 1000 (OK)
full duplex: 1 (OK)
boot mode: 3 (OK)
As seen, our link test met the benchmark, which requires a link speed of 1000 Mbps.
OAK bandwidth test result
Here’s a sample result of a few cameras:
# Cam 1
Downlink 883.4 mbps
Uplink 219.4 mbps
# Cam 3
Downlink 890.0 mbps
Uplink 218.3 mbps
# Cam 6
Downlink 890.6 mbps
Uplink 218.4 mbps
As seen, our downlink and uplink are within the acceptable range of at least 800 Mbps downlink and 200 Mbps uplink.
OAK Latency test
Here’s a sample result of a few cameras:
# Cam 1
Average latency 2.48 ms, Std: 0.4
# Cam 3
Average latency 2.73 ms, Std: 0.3
# Cam 6
Average latency 3.06 ms, Std: 0.3
As seen, the average latency across our cameras is below the 10 ms required to pass the benchmark.
After running the tests, we can conclude that we’re not limited by our network equipment, as the resulting latency, bandwidth and link speed are all within the benchmark ranges stated above.
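To make the comparison above easy to repeat across all 13 cameras, here is a minimal sketch that checks the sample numbers from this post against the thresholds we quoted. The names (`CameraMetrics`, `failed_checks`, the threshold constants) are purely illustrative and not part of any Luxonis script, and the bandwidth limits are read as 800 Mbps downlink and 200 Mbps uplink, which is what the measured values are consistent with.

```python
from dataclasses import dataclass

# Thresholds as quoted in this post (assumed reading: 800 down / 200 up)
MIN_LINK_SPEED_MBPS = 1000
MIN_DOWNLINK_MBPS = 800
MIN_UPLINK_MBPS = 200
MAX_LATENCY_MS = 10


@dataclass
class CameraMetrics:
    name: str
    link_speed_mbps: int
    downlink_mbps: float
    uplink_mbps: float
    latency_ms: float


def failed_checks(m: CameraMetrics) -> list:
    """Return the names of the benchmark checks this camera fails."""
    failures = []
    if m.link_speed_mbps < MIN_LINK_SPEED_MBPS:
        failures.append("link speed")
    if m.downlink_mbps < MIN_DOWNLINK_MBPS:
        failures.append("downlink")
    if m.uplink_mbps < MIN_UPLINK_MBPS:
        failures.append("uplink")
    if m.latency_ms >= MAX_LATENCY_MS:
        failures.append("latency")
    return failures


# Sample values copied from the test results in this post
MEASURED = [
    CameraMetrics("Cam 1", 1000, 883.4, 219.4, 2.48),
    CameraMetrics("Cam 3", 1000, 890.0, 218.3, 2.73),
    CameraMetrics("Cam 6", 1000, 890.6, 218.4, 3.06),
]

for m in MEASURED:
    failures = failed_checks(m)
    status = "OK" if not failures else "FAILED: " + ", ".join(failures)
    print(f"{m.name}: {status}")
```

All three sample cameras pass every check, which matches the conclusion above.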
Additionally, we can confirm that enough power is supplied to the cameras based on the following:
As seen from the image, each port can deliver up to 30 W, while each camera only draws around 4 to 5 W, so the power budget is sufficient for the cameras to function.
CPU Usage of cameras
To make sure our pipeline isn’t so complex that it causes high CPU usage on the LeonOS, which could potentially lead to a "ping was missed" error, we enabled logging on our setup through DEPTHAI_LEVEL=info to observe the CPU usage.
The result shows that our pipeline only consumes up to 12% of the CPU, which indicates that our pipeline isn’t complex enough to cause high CPU usage.
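For reference, this is roughly how the log level can be set from Python rather than from the shell. It is only a sketch: `set_depthai_log_level` is a hypothetical helper name, and the assumption (mirroring the commented-out line in the MRE further down) is that the variable has to be set before `depthai` is imported for it to take effect.

```python
import os


def set_depthai_log_level(level: str = "info") -> None:
    """Set DEPTHAI_LEVEL for this process (hypothetical helper).

    Assumption: this must run before `import depthai`.
    """
    os.environ["DEPTHAI_LEVEL"] = level


set_depthai_log_level("info")

# Import depthai only after the variable is set, e.g.:
# import depthai as dai
```

In a Docker deployment the same thing can be done by passing the variable in the container environment instead.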
Allowing longer boot time for the cameras
A section of the debugging guide indicates that some network equipment might not work well with the default timeouts.
To ensure that there’s enough time for the camera to boot and that the watchdog doesn’t disconnect too early, we’ve introduced two environment variables:
We set their values to 60000 (60 s), allowing more time for the camera to ping the server.
However, this didn’t work, as the "ping was missed" error still occurred upon boot.
Additional Context
Despite the troubleshooting steps above, we’re still facing the "ping was missed" issue. Hence, we’re sharing our hardware architecture to give a better overview of our setup.
Lastly, here’s a Minimal Reproducible Example (MRE) that you guys can run on your end to simulate our environment:
import os
import re
import cv2
import math
import time
import yaml
import numpy as np
import socket

# os.environ["DEPTHAI_LEVEL"] = "debug"
import depthai as dai
from collections import deque
from threading import Thread
from datetime import datetime, timedelta
from loguru import logger
from typing import List, Any
from pydantic import BaseModel


class Frame(BaseModel):
    has_rgb: bool
    has_depth: bool
    rgb_frame: Any = None
    depth_frame: Any = None
    timestamp: timedelta


class OakDProPOE(Thread):
    def __init__(
        self,
        buffer: deque,
        mxid: str,
        server_ip: str,
        server_port: int,
        fps: int = 30,
    ):
        super().__init__()
        self.fps = fps
        self.buffer = buffer
        self.recording = False
        self.ip = server_ip
        self.port = server_port
        self.mxid = mxid
        self.info = self.lookup_camera(mxid)
        self.connection: socket.socket = None

    def lookup_camera(self, mxid: str):
        # Try the lookup up to 3 times before giving up
        for _ in range(3):
            ret, info = dai.Device.getDeviceByMxId(mxid)
            if ret:
                self.mxid = mxid
                return info
        raise Exception(f"{mxid} not found!")

    def start_camera(self):
        self.recording = True

    def stop_camera(self):
        if self.connection:
            self.connection.close()
        self.recording = False

    def poe_pipeline(self):
        logger.info(f"{self.mxid} pipeline initiated")
        pipeline = dai.Pipeline()

        camRgb = pipeline.createColorCamera()
        camRgb.setIspScale(2, 3)

        # Encode frames as MJPEG on the device
        videoEnc = pipeline.create(dai.node.VideoEncoder)
        videoEnc.setDefaultProfilePreset(30, dai.VideoEncoderProperties.Profile.MJPEG)
        camRgb.video.link(videoEnc.input)

        # On-device script that streams the encoded frames to the host over TCP
        script = pipeline.create(dai.node.Script)
        script.setProcessor(dai.ProcessorType.LEON_CSS)
        videoEnc.bitstream.link(script.inputs['frame'])
        script.setScript(
            f"""
HOST_IP = '{self.ip}'
HOST_PORT = {self.port}
import socket
import time
node.warn(f'>Going to connect to {{HOST_IP}}:{{HOST_PORT}}<')
sock = socket.socket()
sock.connect((HOST_IP, HOST_PORT))
while True:
    pck = node.io["frame"].get()
    data = pck.getData()
    ts = pck.getTimestamp()
    header = f"ABCDE " + str(ts.total_seconds()).ljust(18) + str(len(data)).ljust(8)
    sock.send(bytes(header, encoding='ascii'))
    sock.send(data)
"""
        )

        logger.info(self.mxid)
        device_info = dai.DeviceInfo(self.mxid)
        logger.info(device_info)
        try:
            with dai.Device(pipeline, device_info) as device:
                logger.info(f"Pipeline running on {self.mxid}")
                # Keep the device connection open
                while True:
                    time.sleep(1)
        except Exception as e:
            logger.error(e)
            raise

    def run(self):
        def get_frame(conn, size):
            # Read exactly `size` bytes; a single recv() may return fewer
            data = b""
            while len(data) < size:
                chunk = conn.recv(min(4096, size - len(data)))
                if not chunk:
                    raise ConnectionError("socket closed before the full message arrived")
                data += chunk
            return data

        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(("0.0.0.0", self.port))
        cam_thread = Thread(target=self.poe_pipeline)
        cam_thread.daemon = True
        logger.info("Pipeline started")
        server.listen()
        logger.info("Starting cam thread")
        cam_thread.start()
        self.connection, client = server.accept()
        try:
            logger.info(f"{self.mxid} connected")
            while self.recording:
                # The header is always 32 bytes: magic, timestamp, payload size
                header = str(get_frame(self.connection, 32), encoding="ascii")
                chunks = re.split(" +", header)
                if chunks[0] == "ABCDE":
                    # print(f">{header}<")
                    ts = float(chunks[1])
                    imgSize = int(chunks[2])
                    img = get_frame(self.connection, imgSize)
                    buf = np.frombuffer(img, dtype=np.byte)
                    # print(buf.shape, buf.size)
                    frame = cv2.imdecode(buf, cv2.IMREAD_COLOR)
                    frame = Frame(
                        has_rgb=True,
                        has_depth=True,
                        rgb_frame=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB),
                        depth_frame=None,
                        timestamp=timedelta(seconds=ts),
                    )
                    self.buffer.append(frame)
        except Exception as e:
            # TODO: Handle Server Error
            logger.error(f"{self.mxid}: " + str(e))
            raise
        finally:
            # Close the listening socket even when an error is re-raised
            server.close()


class Controller:
    def __init__(self, yaml_file: str, fps: int, server_ip: str):
        self.fps = fps
        self.sync_threshold = timedelta(milliseconds=math.ceil(5000 / self.fps))
        self.server_ip = server_ip
        self.load_camera_mapping_file(yaml_file)

    def load_camera_mapping_file(self, yaml_file):
        logger.info(f"Controller reading yaml file: {yaml_file}")
        with open(yaml_file) as f:
            self.camera_mapping = yaml.load(f, Loader=yaml.FullLoader)
        # Display the yaml config
        logger.info(
            yaml.dump(self.camera_mapping, default_flow_style=False, sort_keys=False)
        )
        camera_type = self.camera_mapping["CameraType"]
        self.active_cameras = {}
        for cam in self.camera_mapping["ActiveCameras"]:
            # Create a buffer for the camera
            buffer = deque([], maxlen=self.fps * 50)  # buffer holds up to 50 seconds worth of frames
            main_camera_key = cam["main_camera"]["camera_id"]
            if camera_type == "OakDProPoE":
                main_camera = OakDProPOE(
                    fps=self.fps,
                    buffer=buffer,
                    mxid=cam["main_camera"]["mxid"],
                    server_ip=self.server_ip,
                    server_port=cam["main_camera"]["server_port"],
                )
            else:
                raise Exception(f"Camera type {camera_type} is invalid!")
            self.active_cameras[main_camera_key] = {
                "buffer": buffer,
                "synced_buffer": deque([], maxlen=500),
                "main_camera": main_camera,
            }

    def get_active_cameras(self):
        return self.active_cameras

    def start_cameras(self):
        for cam_idx in self.active_cameras:
            camera = self.active_cameras[cam_idx]["main_camera"]
            camera.daemon = True
            camera.start_camera()
            camera.start()
        logger.info("Cameras started")

    def stop_cameras(self):
        logger.info("Controller stopping cameras")
        for cam_idx in self.active_cameras:
            camera = self.active_cameras[cam_idx]["main_camera"]
            camera.stop_camera()
            camera.join()
        logger.info("Cameras stopped")

    def check_sync(self, timestamp: timedelta):
        matching_frame_indexes = []
        # Try to find matching frame in each queue
        for active_cam in self.active_cameras.values():
            for i, frame in enumerate(active_cam["synced_buffer"]):
                time_diff = abs(frame.timestamp - timestamp)
                if time_diff <= self.sync_threshold:
                    # We now have the synced frame index for this particular camera
                    matching_frame_indexes.append(i)
                    break
        # When synced frames are found, clear all unused/out-of-sync frames
        if len(matching_frame_indexes) == len(self.active_cameras):
            for i, q in enumerate(self.active_cameras.values()):
                for j in range(0, matching_frame_indexes[i]):
                    q["synced_buffer"].popleft()
            return True
        else:
            return False

    def get_synced_frames(self) -> dict[int, Frame]:
        # Iterate to try and get new frame from any buffer.
        # Returns None when no synchronized group is available yet.
        start = time.perf_counter()
        for cam in self.active_cameras:
            if self.active_cameras[cam]["buffer"]:
                # Get the frame from camera's buffer to controller's sync buffer
                # this ensures we can synchronously process the frames in sync buffer
                # without worrying about thread safety from the camera buffer
                frame = self.active_cameras[cam]["buffer"].popleft()
                self.active_cameras[cam]["synced_buffer"].append(frame)
                # Check sync to see if we have a group of synchronized frames across all sync buffers
                if self.check_sync(frame.timestamp):
                    data = {}
                    for synced_cam in self.active_cameras:
                        data[synced_cam] = self.active_cameras[synced_cam]["synced_buffer"].popleft()
                    logger.debug(f"Sync took {time.perf_counter() - start} seconds")
                    start = time.perf_counter()
                    return data


if __name__ == "__main__":
    controller = Controller(
        yaml_file="configs/camera_mapping_office.yaml",
        fps=5,
        server_ip="192.168.1.236",
    )
    controller.start_cameras()
    while True:
        synced_frame = controller.get_synced_frames()
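To make the wire format used by the MRE easier to follow, here is a small standalone sketch of the 32-byte header that the on-device script sends before each MJPEG frame: the magic string "ABCDE" plus a space (6 bytes), the timestamp padded to 18 characters, and the payload length padded to 8 characters. The helper names (`build_header`, `parse_header`, `HEADER_SIZE`, `MAGIC`) are illustrative only and don't appear in the MRE itself.

```python
import re

HEADER_SIZE = 32  # 6 ("ABCDE ") + 18 (timestamp) + 8 (payload length)
MAGIC = "ABCDE"


def build_header(ts_seconds: float, payload_len: int) -> bytes:
    """Build the header the same way the on-device script does."""
    header = f"{MAGIC} " + str(ts_seconds).ljust(18) + str(payload_len).ljust(8)
    return bytes(header, encoding="ascii")


def parse_header(raw: bytes):
    """Parse a header into (timestamp_seconds, payload_length)."""
    if len(raw) != HEADER_SIZE:
        raise ValueError(f"expected {HEADER_SIZE} header bytes, got {len(raw)}")
    chunks = re.split(" +", raw.decode("ascii"))
    if chunks[0] != MAGIC:
        raise ValueError("header does not start with the expected magic string")
    return float(chunks[1]), int(chunks[2])


raw_header = build_header(12.5, 1024)
print(len(raw_header))           # header length in bytes
print(parse_header(raw_header))  # (timestamp, payload length)
```

Note that `ljust` pads but never truncates, so the header only stays at 32 bytes as long as the timestamp string fits in 18 characters and the payload length in 8.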
To run the MRE, you’d need to install the packages listed in the file’s import statements and have a camera-config.yaml file which reflects the configuration of your cameras.
Here’s what a sample camera-config.yaml looks like:
CameraType: OakDProPoE
ActiveCameras:
  - main_camera:
      camera_id: 25
      mxid: 18443010117CE9F400
      server_port: 10025
    backup:
  - main_camera:
      camera_id: 26
      mxid: 18443010D1F3E7F400
      server_port: 10026
    backup:
  - main_camera:
      camera_id: 27
      mxid: 18443010C1D6E1F400
      server_port: 10027
    backup:
  - main_camera:
      camera_id: 28
      mxid: 19443010C172801300
      server_port: 10028
    backup:
Some notes on the camera-config.yaml:
The actual file should contain the configuration for all 13 cameras.
camera_id & backup: are not required.
Each mxid should contain a unique camera mxid.
It’d be filled in at line 303 of mre.py:
Here’s what a potential directory structure could look like:
.
└── (some-folder)/
    ├── mre.py
    ├── camera-config.yaml
    └── requirements.txt
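As a sanity check before running the MRE against a full 13-camera camera-config.yaml, something like the sketch below could be used to catch duplicated entries. It works on the already-loaded config dict, so it has no dependency on PyYAML. `find_config_problems` is a hypothetical helper, and the unique `server_port` requirement is inferred from the MRE binding one TCP server per camera rather than something stated in any documentation.

```python
from collections import Counter


def find_config_problems(config: dict) -> list:
    """Return human-readable problems found in a loaded camera config."""
    problems = []
    cameras = [entry["main_camera"] for entry in config.get("ActiveCameras", [])]
    # Each camera needs its own mxid, and (inferred from the MRE) its own port
    for field in ("mxid", "server_port"):
        counts = Counter(cam[field] for cam in cameras)
        for value, count in counts.items():
            if count > 1:
                problems.append(f"duplicate {field}: {value}")
    return problems


# First two entries of the sample config from this post, as a loaded dict
SAMPLE_CONFIG = {
    "CameraType": "OakDProPoE",
    "ActiveCameras": [
        {"main_camera": {"camera_id": 25, "mxid": "18443010117CE9F400", "server_port": 10025}},
        {"main_camera": {"camera_id": 26, "mxid": "18443010D1F3E7F400", "server_port": 10026}},
    ],
}

print(find_config_problems(SAMPLE_CONFIG))
```

With a real file, the dict would come from loading camera-config.yaml the same way the MRE’s `load_camera_mapping_file` does.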