Hello everyone,
I’m facing the following issue:
I’m using three Oak-D Lite FF cameras with the code shown below. The code works as follows:
I initialize my cameras, creating a queue for the RGB image, one for the left stereo image, one for the right stereo image, and one for the point cloud.
I then set up a button.
Whenever the button is pressed, the most recent RGB image, both stereo images, and the point cloud are saved.
Afterward, the code waits for the next button press and repeats the process.
One key requirement is that the RGB image must be captured in 4K resolution. The point cloud can be smaller, but its resolution must be defined.
I’ve now discovered that the RGB images are about 800 ms behind (“in the past”) when I capture them. The stereo images, on the other hand, are perfectly up to date at the time of the button click. If I run the same script on my laptop, everything works fine. However, on a Raspberry Pi, this 800 ms offset appears.
What could be causing this delay? Am I building the pipeline incorrectly, or am I synchronizing incorrectly? If so, wouldn’t that also affect the laptop? I’m using Y-adapters for power, so a power supply issue should be ruled out.
I set the queue length to 1 in hopes that I would always receive the most current image.
Does anyone have an idea how to fix this 800 ms delay for the RGB images?
Thank you in advance for your help!
import sys
import os
import cv2
import depthai as dai
import contextlib
from datetime import datetime
import json
import open3d as o3d
import numpy as np
import logging
import os
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from gpiozero import Button
log_filename = "logs/application.log"
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
logger = logging.getLogger(__name__)
logger.addHandler(file_handler)
class Sorter_Image_Collector:
def __init__(self):
self.take_picture_event = False
self.output_dir = "output2"
self.button = Button(pin=4, pull_up=True, bounce_time=0.2)
logger.info(
"Sorter_Image_Collector initialized with output directory set to 'output2'."
)
def wait_for_trigger(self):
"""
Waits for the button to be pressed to trigger the event.
"""
logger.info("Waiting for button press to trigger the event.")
self.button.wait_for_press()
logger.info("Button pressed, triggering event.")
self.take_picture_event = True
def save_camera_parameters_to_json(self, device, id, timestamp):
logger.info(
"Saving camera parameters for device ID %d at timestamp %s", id, timestamp
)
try:
# Read device calibration data
calibData = device.readCalibration()
# Intrinsic parameters and distortion coefficients for the RGB camera
M_rgb = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_A)
D_rgb = calibData.getDistortionCoefficients(dai.CameraBoardSocket.CAM_A)
# Intrinsic parameters and distortion coefficients for the left and right cameras
M_left = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_B)
D_left = calibData.getDistortionCoefficients(dai.CameraBoardSocket.CAM_B)
M_right = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_C)
D_right = calibData.getDistortionCoefficients(dai.CameraBoardSocket.CAM_C)
# Stereo rectification rotation matrices
R_left = (
calibData.getStereoLeftRectificationRotation()
) # Rotation-Matrix for the left camera
R_right = (
calibData.getStereoRightRectificationRotation()
) # Rotation-Matrix for the right camera
# Extrinsics between the left and right cameras and between the left camera and the RGB camera
extrinsics_left_right = calibData.getCameraExtrinsics(
dai.CameraBoardSocket.CAM_B, dai.CameraBoardSocket.CAM_C
)
extrinsics_left_rgb = calibData.getCameraExtrinsics(
dai.CameraBoardSocket.CAM_B, dai.CameraBoardSocket.CAM_A
)
# Create a dictionary with the camera parameters
camera_params = {
"RGB_Camera": {
"intrinsics_matrix": M_rgb, # Intrinsic parameters (RGB camera)
"distortion_coefficients": {
"k1": D_rgb[0],
"k2": D_rgb[1],
"p1": D_rgb[2],
"p2": D_rgb[3],
"k3": D_rgb[4],
},
},
"Left_Camera": {
"intrinsics_matrix": M_left, # Intrinsic parameters (Left camera)
"distortion_coefficients": {
"k1": D_left[0],
"k2": D_left[1],
"p1": D_left[2],
"p2": D_left[3],
"k3": D_left[4],
},
"rectification_rotation_matrix": R_left, # Rotation matrix (Rectification)
},
"Right_Camera": {
"intrinsics_matrix": M_right, # Intrinsische Parameter (Rechte Kamera)
"distortion_coefficients": {
"k1": D_right[0],
"k2": D_right[1],
"p1": D_right[2],
"p2": D_right[3],
"k3": D_right[4],
},
"rectification_rotation_matrix": R_right, # Rotation matrix (Rectification)
},
"Extrinsics": {
"Left_to_Right": extrinsics_left_right, # Extrinsic parameters between left and right camera
"Left_to_RGB": extrinsics_left_rgb, # Extrinsic parameters between left camera and RGB camera
},
}
# Define the directory and filename for the JSON file
directory = f"{self.output_dir}/parameter"
filename = f"{directory}/{timestamp}_camera_parameters_{id}.json"
# Create the directory if it doesn't exist
os.makedirs(directory, exist_ok=True)
# Save the camera parameters to a JSON file
with open(filename, "w") as json_file:
json.dump(camera_params, json_file, indent=4)
logger.info("Camera parameters saved to %s", filename)
except Exception as e:
logger.error("Failed to save camera parameters: %s", e)
def create_pipeline(self):
logger.info("Creating pipeline with camera configurations.")
pipeline = dai.Pipeline()
# Erstelle die erforderlichen Nodes
camRgb = pipeline.create(dai.node.ColorCamera)
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
depth = pipeline.create(dai.node.StereoDepth)
pointcloud = pipeline.create(dai.node.PointCloud)
sync = pipeline.create(dai.node.Sync)
xOut = pipeline.create(dai.node.XLinkOut)
xOut.setStreamName("out")
xOut.input.setBlocking(False)
xOut.input.setQueueSize(1)
# Einstellungen für die Farb-Kamera (4K)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
camRgb.setBoardSocket(dai.CameraBoardSocket.CAM_A)
camRgb.setFps(30)
camRgb.initialControl.setManualFocus(60)
# Einstellungen für die Mono-Kameras
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.CAM_B)
monoLeft.setFps(30)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
monoRight.setBoardSocket(dai.CameraBoardSocket.CAM_C)
monoRight.setFps(30)
# StereoDepth konfigurieren
depth.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_ACCURACY)
depth.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
depth.setLeftRightCheck(True)
depth.setExtendedDisparity(False)
depth.setSubpixel(True)
# Tiefe an die Farb-Kamera ausrichten (CAM_A)
depth.setDepthAlign(dai.CameraBoardSocket.CAM_A)
# Tiefe soll auf 1920x1080 skaliert werden (z.B. für die Point-Cloud)
depth.setOutputSize(1920, 1080)
# Verknüpfe die Mono-Ausgänge mit dem StereoDepth-Node
monoLeft.out.link(depth.left)
monoRight.out.link(depth.right)
# Verknüpfe die Tiefen-Ausgabe mit dem PointCloud-Node
depth.depth.link(pointcloud.inputDepth)
# Falls du nur die synchronisierten Outputs (RGB + PCL) ausgeben willst:
camRgb.isp.link(sync.inputs["rgb"])
pointcloud.outputPointCloud.link(sync.inputs["pcl"])
sync.out.link(xOut.input)
# Zusätzliche Ausgänge für Debug/Monitoring der Mono-Kameras
leftQueue = pipeline.create(dai.node.XLinkOut)
leftQueue.setStreamName("left")
monoLeft.out.link(leftQueue.input)
rightQueue = pipeline.create(dai.node.XLinkOut)
rightQueue.setStreamName("right")
monoRight.out.link(rightQueue.input)
logger.info("Pipeline created successfully.")
return pipeline
def run(self):
"""
Executes the main process for device connection, data acquisition, and storage.
"""
logger.info(
"Starting main process for device connection, data acquisition, and storage."
)
# Connect to devices using ExitStack for auto-closing
with contextlib.ExitStack() as stack:
device_infos = dai.Device.getAllAvailableDevices()
devices, rgb_queues, left_queues, right_queues = self.initialize_devices(
device_infos, stack
)
while True:
self.wait_for_trigger()
if self.take_picture_event:
time.sleep(0.8)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
frames = self.capture_frames(rgb_queues, left_queues, right_queues)
self.save_all_frames(frames, timestamp)
self.save_all_parameters(devices, timestamp)
self.take_picture_event = False
logger.info(f"Picture taken and saved at timestamp {timestamp}")
def initialize_devices(self, device_infos, stack):
devices, rgb_queues, left_queues, right_queues = [], [], [], []
for device_info in device_infos:
try:
device = stack.enter_context(
dai.Device(
dai.OpenVINO.Version.VERSION_2021_4,
device_info,
dai.UsbSpeed.SUPER,
)
)
devices.append(device)
logger.info(f"Connected to device {device_info.getMxId()}")
pipeline = self.create_pipeline()
device.startPipeline(pipeline)
rgb_queues.append(
(device.getOutputQueue("out", 1, False), "RGB_" + device.getMxId())
)
left_queues.append(
(
device.getOutputQueue("left", 1, False),
"Stereo_Left_" + device.getMxId(),
)
)
right_queues.append(
(
device.getOutputQueue("right", 1, False),
"Stereo_Right_" + device.getMxId(),
)
)
except Exception as e:
logger.error(f"Error initializing device {device_info.getMxId()}: {e}")
return devices, rgb_queues, left_queues, right_queues
def capture_frames(self, rgb_queues, left_queues, right_queues):
frames = {"rgb": {}, "left": {}, "right": {}, "pointcloud": {}}
for q_rgb, stream_name in rgb_queues:
messages = q_rgb.tryGetAll()
if messages:
newest = messages[-1]
frames["rgb"][stream_name] = newest["rgb"].getCvFrame()
frames["pointcloud"][stream_name] = newest["pcl"]
for q, stream_name in left_queues:
messages = q.tryGetAll()
if messages:
newest = messages[-1]
frames["left"][stream_name] = newest.getCvFrame()
for q, stream_name in right_queues:
messages = q.tryGetAll()
if messages:
newest = messages[-1]
frames["right"][stream_name] = newest.getCvFrame()
return frames
def save_all_frames(self, frames, timestamp):
def save_frame(frame, dir_name, file_name):
os.makedirs(dir_name, exist_ok=True)
cv2.imwrite(file_name, frame)
logger.info(f"Saved frame to {file_name}")
for stream_name, rgb_frame in frames["rgb"].items():
save_frame(
rgb_frame,
f"{self.output_dir}/images/rgb",
f"{self.output_dir}/images/rgb/{timestamp}_{stream_name}.jpg",
)
if stream_name in frames["pointcloud"]:
self.save_pointcloud(
frames["pointcloud"][stream_name], rgb_frame, stream_name, timestamp
)
for stream_name, left_frame in frames["left"].items():
save_frame(
left_frame,
f"{self.output_dir}/images/stereo_left",
f"{self.output_dir}/images/stereo_left/{timestamp}_{stream_name}.jpg",
)
for stream_name, right_frame in frames["right"].items():
save_frame(
right_frame,
f"{self.output_dir}/images/stereo_right",
f"{self.output_dir}/images/stereo_right/{timestamp}_{stream_name}.jpg",
)
def save_pointcloud(self, pointcloud, rgb_frame, stream_name, timestamp):
if pointcloud:
# Skalieren des RGB-Bildes auf die Pointcloud-Auflösung (1280x720)
rgb_resized = cv2.resize(rgb_frame, (1920, 1080))
# Punktkoordinaten aus der Pointcloud extrahieren und normalisieren
points = pointcloud.getPoints().astype(np.float64) / 1000.0
distances = np.linalg.norm(points, axis=1)
valid_indices = distances < 1.0
# Farben vom skalierten RGB-Bild holen
colors = cv2.cvtColor(rgb_resized, cv2.COLOR_BGR2RGB).reshape(-1, 3)
colors = colors[: len(points)][valid_indices]
# Nur gültige Punkte und Farben auswählen
points = points[valid_indices]
if len(points) > 0:
# Pointcloud-Objekt erstellen und Punkte & Farben setzen
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)
pcd.colors = o3d.utility.Vector3dVector(colors / 255.0)
# Verzeichnis erstellen und Pointcloud speichern
os.makedirs(f"{self.output_dir}/pointcloud", exist_ok=True)
filename = f"{self.output_dir}/pointcloud/{timestamp}_{stream_name}.ply"
o3d.io.write_point_cloud(filename, pcd)
logger.info(f"Pointcloud saved to {filename}")
def save_all_parameters(self, devices, timestamp):
for i, device in enumerate(devices):
try:
self.save_camera_parameters_to_json(
device=device, id=i, timestamp=timestamp
)
logger.info(
f"Camera parameters saved for device {i} at timestamp {timestamp}"
)
except Exception as e:
logger.error(f"Error saving camera parameters for device {i}: {e}")
def main():
"""
Entry point for the Sorter Image Collector application.
This function initializes the Sorter_Image_Collector application and starts its execution.
"""
try:
logger.info("Starting Sorter Image Collector application.")
app = Sorter_Image_Collector() # Create Sorter_Image_Collector object
app.run() # Run Sorter_Image_Collector application
logger.info("Sorter Image Collector application finished.")
except RuntimeError as e:
print(f"Fehler aufgetreten: {e}")
if "X_LINK_ERROR" in str(e) or "Communication exception" in str(e):
print("Kameraabsturz erkannt. Anwendung wird neu gestartet...")
time.sleep(5) # Kurze Pause vor dem Neustart
os.execl(sys.executable, sys.executable, *sys.argv)
if __name__ == "__main__":
main() # Call main function