#!/usr/bin/env python3
import argparse
import collections
import os
import signal
import sys
import contextlib
import math
import time
from collections import deque
from pathlib import Path
from typing import Optional
from datetime import timedelta
import numpy as np
import cv2
import depthai as dai
import psutil
from colorama import just_fix_windows_console
from rich.console import Console, ConsoleOptions, ConsoleRenderable, RenderResult
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.prompt import IntPrompt
from rich.text import Text
from rich_argparse import RichHelpFormatter
from stress_test import stress_test, YOLO_LABELS, create_yolo
SYNC =1
just_fix_windows_console()
if globals().get("compiled"):
image_path = Path(sys.argv[0]).parent.joinpath("save_img_data")
elif getattr(sys, "frozen", False):
image_path = Path(sys.executable).joinpath("save_img_data")
else:
image_path = Path(file).parent.joinpath("save_img_data")
image_path.mkdir(parents=True, exist_ok=True)
socket_pair = {
"rgb": "rgb",
"left": "left",
"right": "right",
"cama": "rgb",
"camb": "left",
"camc": "right",
"camd": "camd",
"came": "came",
}
def socket_type_pair(arg):
socket, type_ = arg.split(",")
if socket not in socket_pair:
msg = f"{socket} is not support yet"
raise ValueError(msg)
socket = socket_pair[socket]
if type_ not in {"m", "mono", "c", "color", "t", "tof", "th", "thermal"}:
msg = f"{type} is not support yet"
raise ValueError(msg)
is_color = type in {"c", "color"}
is_tof = type_ in {"t", "tof"}
is_thermal = type_ in {"th", "thermal"}
return [socket, is_color, is_tof, is_thermal]
parser = argparse.ArgumentParser(formatter_class=RichHelpFormatter, add_help=False)
parser.add_argument(
"-cams",
"--cameras",
type=socket_type_pair,
nargs="+",
default=[],
help="Which camera sockets to enable, and type: c[olor] / m[ono] / t[of] / th[ermal]. "
"E.g: -cams rgb,m right,c . If not specified, all connected cameras will be used.",
)
parser.add_argument(
"-mres",
"--mono-resolution",
type=str,
default="800p",
choices={"480p", "400p", "720p", "800p", "1200p"},
help="Select mono camera resolution (height). Default: %(default)s",
)
parser.add_argument(
"-cres",
"--color-resolution",
default="1080p",
choices={
"720p",
"800p",
"1080p",
"1200p",
"4k",
"5mp",
"12mp",
"13mp",
"48mp",
"1352X1012",
"1440X1080",
"2024X1520",
"4000X3000",
"5312X6000",
},
help="Select color camera resolution / height. Default: %(default)s",
)
parser.add_argument(
"-rot",
"--rotate",
const="all",
choices={"all", "rgb", "mono"},
nargs="?",
help="Which cameras to rotate 180 degrees. All if not filtered",
)
parser.add_argument("-isp3afps", "--isp3afps", type=int, default=0, help="3A FPS to set for all cameras")
parser.add_argument("-ds", "--isp-downscale", default=1, type=int, help="Downscale the ISP output by this factor")
parser.add_argument(
"-rs",
"--resizable-windows",
action="store_true",
help="Make OpenCV windows resizable. Note: may introduce some artifacts",
)
parser.add_argument("-tun", "--camera-tuning", type=Path, help="Path to custom camera tuning database")
parser.add_argument("-raw", "--enable-raw", default=False, action="store_true", help="Enable the RAW camera streams")
parser.add_argument(
"-tofraw", "--tof-raw", action="store_true", help="Show just ToF raw output instead of post-processed depth"
)
parser.add_argument(
"-tofamp", "--tof-amplitude", action="store_true", help="Show also ToF amplitude output alongside depth"
)
parser.add_argument(
"-tofcm", "--tof-cm", action="store_true", help="Show ToF depth output in centimeters, capped to 255"
)
parser.add_argument(
"-tofmedian", "--tof-median", choices=[0, 3, 5, 7], default=5, type=int, help="ToF median filter kernel size"
)
parser.add_argument(
"-show",
"--show-meta",
action="store_true",
help=r"List frame metadata (seqno, timestamp, exp, iso etc). Can also toggle with \
",
)
parser.add_argument(
"-rgbprev", "--rgb-preview", action="store_true", help="Show RGB preview
stream instead of full size isp
"
)
parser.add_argument("-d", "--device", default="", type=str, help="Optional MX ID of the device to connect to.")
parser.add_argument(
"-ctimeout",
"--connection-timeout",
default=30000,
help="Connection timeout in ms. Default: %(default)s (sets DEPTHAI_CONNECTION_TIMEOUT environment variable)",
)
parser.add_argument(
"-btimeout",
"--boot-timeout",
default=30000,
help="Boot timeout in ms. Default: %(default)s (sets DEPTHAI_BOOT_TIMEOUT environment variable)",
)
parser.add_argument(
"-stress",
action="store_true",
help="Run stress test. This will override all other options (except -d/--device) and will run a heavy pipeline until the user stops it.",
)
parser.add_argument(
"-stereo", action="store_true", default=False, help="Create a stereo depth node if the device has a stereo pair."
)
parser.add_argument(
"-yolo",
type=str,
default="",
help=f"Create a yolo detection network on the specified camera. E.g: -yolo cama. Available cameras: {socket_pair.keys()}",
)
parser.add_argument("-gui", action="store_true", help="Use GUI instead of CLI")
parser.add_argument(
"-h", "--help", action="store_true", default=False, help="Show this help message and exit"
) # So you can forward --help to stress test, without it being consumed by cam_test.py
parser.add_argument(
"-usbs", "--usbSpeed", type=str, default="usb3", choices=["usb2", "usb3"], help="Force USB communication speed."
)
parser.add_argument(
"-fps", "--fps", type=float, default=30, help="Camera sensor FPS, applied to all cams"
)
args = parser.parse_args()
Set timeouts before importing depthai
os.environ["DEPTHAI_CONNECTION_TIMEOUT"] = str(args.connection_timeout)
os.environ["DEPTHAI_BOOT_TIMEOUT"] = str(args.boot_timeout)
if args.stress:
stress_test(args.device)
exit(0)
if args.help:
parser.print_help()
exit(0)
if args.gui:
import cam_test_gui
import cam_test_gui_grid_user as cam_test_gui
cam_test_gui.main()
cam_socket_opts = {
"rgb": dai.CameraBoardSocket.CAM_A,
"left": dai.CameraBoardSocket.CAM_B,
"right": dai.CameraBoardSocket.CAM_C,
"cama": dai.CameraBoardSocket.CAM_A,
"camb": dai.CameraBoardSocket.CAM_B,
"camc": dai.CameraBoardSocket.CAM_C,
"camd": dai.CameraBoardSocket.CAM_D,
}
socket_name_map = {
"rgb": "rgb",
"left": "left",
"right": "right",
"cam_a": "rgb",
"cam_b": "left",
"cam_c": "right",
"cam_d": "camd",
}
rotate = {
"rgb": args.rotate in {"all", "rgb"},
"left": args.rotate in {"all", "mono"},
"right": args.rotate in {"all", "mono"},
"cama": args.rotate in {"all", "rgb"},
"camb": args.rotate in {"all", "mono"},
"camc": args.rotate in {"all", "mono"},
"camd": args.rotate in {"all", "rgb"},
}
mono_res_opts = {
"400p": dai.MonoCameraProperties.SensorResolution.THE_400_P,
"480p": dai.MonoCameraProperties.SensorResolution.THE_480_P,
"720p": dai.MonoCameraProperties.SensorResolution.THE_720_P,
"800p": dai.MonoCameraProperties.SensorResolution.THE_800_P,
"1200p": dai.MonoCameraProperties.SensorResolution.THE_1200_P,
}
color_res_opts = {
"720p": dai.ColorCameraProperties.SensorResolution.THE_720_P,
"800p": dai.ColorCameraProperties.SensorResolution.THE_800_P,
"1080p": dai.ColorCameraProperties.SensorResolution.THE_1080_P,
"1200p": dai.ColorCameraProperties.SensorResolution.THE_1200_P,
"4k": dai.ColorCameraProperties.SensorResolution.THE_4_K,
"5mp": dai.ColorCameraProperties.SensorResolution.THE_5_MP,
"12mp": dai.ColorCameraProperties.SensorResolution.THE_12_MP,
"13mp": dai.ColorCameraProperties.SensorResolution.THE_13_MP,
"48mp": dai.ColorCameraProperties.SensorResolution.THE_48_MP,
"1352X1012": dai.ColorCameraProperties.SensorResolution.THE_1352X1012,
"1440X1080": dai.ColorCameraProperties.SensorResolution.THE_1440X1080,
"2024X1520": dai.ColorCameraProperties.SensorResolution.THE_2024X1520,
"4000X3000": dai.ColorCameraProperties.SensorResolution.THE_4000X3000,
"5312X6000": dai.ColorCameraProperties.SensorResolution.THE_5312X6000,
}
def clamp(num, v0, v1):
return max(v0, min(num, v1))
Calculates FPS over a moving window, configurable
class FPS:
def init(self, window_size=30):
if not isinstance(window_size, int) or window_size <= 0:
msg = "Window size must be a positive integer"
raise ValueError(msg)
self.dq = collections.deque(maxlen=window_size)
self.fps = 0
def update(self, timestamp=None):
if timestamp is None:
timestamp = time.monotonic()
elif not (isinstance(timestamp, (int, float))) or timestamp < 0:
msg = "Timestamp must be a positive number"
raise ValueError(msg)
self.dq.append(timestamp)
count = len(self.dq)
if count > 1:
elapsed_time = self.dq[-1] - self.dq[0]
self.fps = count / elapsed_time
else:
self.fps = 0
def get(self):
return self.fps
class Cycle:
def init(self, enum_type, start_item=None):
self.items = [item for name, item in vars(enum_type).items() if name.isupper()]
If start_item is provided, set the index to its position. Otherwise, default to 0
self.index = self.items.index(start_item) if start_item else 0
def step(self, n):
self.index = (self.index + n) % len(self.items)
return self.items[self.index]
def next(self):
return self.step(1)
def prev(self):
return self.step(-1)
def exit_cleanly(signum, frame):
print("Exiting cleanly")
cv2.destroyAllWindows()
sys.exit(0)
def socket_to_socket_opt(socket: dai.CameraBoardSocket) -> str:
return str(socket).split(".")[-1].replace("_", "").lower()
signal.signal(signal.SIGINT, exit_cleanly)
cam_socket_opts = {
"CAM_A": dai.CameraBoardSocket.CAM_A,
"CAM_B": dai.CameraBoardSocket.CAM_B,
"CAM_C": dai.CameraBoardSocket.CAM_C,
"CAM_D": dai.CameraBoardSocket.CAM_D,
}
controls = []
control_queues = {}
def create_pipeline(cam_list):
Start defining a pipeline
pipeline = dai.Pipeline()
control = pipeline.createXLinkIn()
control.setStreamName("control"+mxid)
controls.append(control)
sysInfo = pipeline.create(dai.node.SystemLogger)
linkOut = pipeline.create(dai.node.XLinkOut)
linkOut.setStreamName("sysinfo")
sysInfo.out.link(linkOut.input)
sysInfo.setRate(args.fps)
cam = {}
xout = {}
for c in cam_list:
xout[c] = pipeline.create(dai.node.XLinkOut)
xout[c].setStreamName(c)
cam[c] = pipeline.create(dai.node.ColorCamera)
cam[c].setResolution(color_res_opts[args.color_resolution])
# cam[c].setIspScale(1, 3) # 1920x1080 -> 1280x720
cam[c].video.link(xout[c].input)
cam[c].setBoardSocket(cam_socket_opts[c])
cam[c].setFps(args.fps)
control.out.link(cam[c].inputControl)
cam[c].setBoardSocket(cam_socket_opts[c])
return pipeline
def clamp(num, v0, v1):
return max(v0, min(num, v1))
class Cycle:
def init(self, enum_type, start_item=None):
self.items = [item for name, item in vars(enum_type).items() if name.isupper()]
If start_item is provided, set the index to its position. Otherwise, default to 0
self.index = self.items.index(start_item) if start_item else 0
def step(self, n):
self.index = (self.index + n) % len(self.items)
return self.items[self.index]
def next(self):
return self.step(1)
def prev(self):
return self.step(-1)
def draw_text(
frame,
text,
org,
color=(255, 255, 255),
bg_color=(128, 128, 128),
font_scale=0.5,
thickness=1,
):
cv2.putText(
frame,
text,
org,
cv2.FONT_HERSHEY_SIMPLEX,
font_scale,
bg_color,
thickness + 3,
cv2.LINE_AA,
)
cv2.putText(
frame,
text,
org,
cv2.FONT_HERSHEY_SIMPLEX,
font_scale,
color,
thickness,
cv2.LINE_AA,
)
class FPSHandler:
"""
Class that handles all FPS-related operations.
Mostly used to calculate different streams FPS, but can also be
used to feed the video file based on its FPS property, not app performance (this prevents the video from being sent
to quickly if we finish processing a frame earlier than the next video frame should be consumed)
"""
_fps_bg_color = (0, 0, 0)
_fps_color = (255, 255, 255)
_fps_type = cv2.FONT_HERSHEY_SIMPLEX
_fps_line_type = cv2.LINE_AA
def __init__(self, cap=None, max_ticks=100):
"""
Constructor that initializes the class with a video file object and a maximum ticks amount for FPS calculation
Args:
cap (cv2.VideoCapture, Optional): handler to the video file object
max_ticks (int, Optional): maximum ticks amount for FPS calculation
"""
self._timestamp = None
self._start = None
self._framerate = cap.get(cv2.CAP_PROP_FPS) if cap is not None else None
self._useCamera = cap is None
self._iterCnt = 0
self._ticks = {}
if max_ticks < 2: # noqa: PLR2004
msg = (
f"Proviced max_ticks value must be 2 or higher (supplied: {max_ticks})"
)
raise ValueError(msg)
self._maxTicks = max_ticks
def next_iter(self):
"""Marks the next iteration of the processing loop. Will use `time.sleep` method if initialized with video file object"""
if self._start is None:
self._start = time.monotonic()
if not self._useCamera and self._timestamp is not None:
frame_delay = 1.0 / self._framerate
delay = (self._timestamp + frame_delay) - time.monotonic()
if delay > 0:
time.sleep(delay)
self._timestamp = time.monotonic()
self._iterCnt += 1
def tick(self, name):
"""
Marks a point in time for specified name
Args:
name (str): Specifies timestamp name
"""
if name not in self._ticks:
self._ticks[name] = collections.deque(maxlen=self._maxTicks)
self._ticks[name].append(time.monotonic())
def tick_fps(self, name):
"""
Calculates the FPS based on specified name
Args:
name (str): Specifies timestamps' name
Returns:
float: Calculated FPS or `0.0` (default in case of failure)
"""
if name in self._ticks and len(self._ticks[name]) > 1:
time_diff = self._ticks[name][-1] - self._ticks[name][0]
return (len(self._ticks[name]) - 1) / time_diff if time_diff != 0 else 0.0
return 0.0
def fps(self):
"""
Calculates FPS value based on `nextIter` calls, being the FPS of processing loop
Returns:
float: Calculated FPS or `0.0` (default in case of failure)
"""
if self._start is None or self._timestamp is None:
return 0.0
time_diff = self._timestamp - self._start
return self._iterCnt / time_diff if time_diff != 0 else 0.0
def print_status(self):
"""prints total FPS for all names stored in :func:`tick` calls"""
print("=== TOTAL FPS ===")
for name in self._ticks:
print(f"[{name}]: {self.tick_fps(name):.1f}")
def draw_fps(self, frame, name):
"""
Draws FPS values on requested frame, calculated based on specified name
Args:
frame (numpy.ndarray): Frame object to draw values on
name (str): Specifies timestamps' name
"""
frame_fps = f"{name.upper()} FPS: {round(self.tick_fps(name), 1)}"
# cv2.rectangle(frame, (0, 0), (120, 35), (255, 255, 255), cv2.FILLED)
draw_text(
frame,
frame_fps,
(5, 15),
self._fps_color,
self._fps_bg_color,
)
if "nn" in self._ticks:
draw_text(
frame,
frame_fps,
(5, 30),
self._fps_color,
self._fps_bg_color,
)
with contextlib.ExitStack() as stack:
device_infos = dai.Device.getAllAvailableDevices()
if len(device_infos) == 0:
msg = "No devices found!"
raise RuntimeError(msg)
else:
print("Found", len(device_infos), "devices")
queues = []
mxids = []
out = {}
controlQueues = []
fps_host = {} # FPS computed based on the time we receive frames in app
fps_capt = {} # FPS computed based on capture timestamps from device
save_frame = False
for device_info in device_infos:
# Note: the pipeline isn't set here, as we don't know yet what device it is.
# The extra arguments passed are required by the existing overload variants
openvino_version = dai.OpenVINO.Version.VERSION_2021_4
usb_speed = dai.UsbSpeed.SUPER_PLUS
device = stack.enter_context(
dai.Device(openvino_version, device_info, usb_speed)
)
# Note: currently on POE, DeviceInfo.getMxId() and Device.getMxId() are different!
print("=== Connected to " + device_info.getMxId())
# 获取所有的 mxid
mxid = device.getMxId()
mxids.append(mxid)
# 打印所有的 mxid
for mxid in mxids:
print(mxid)
cameras = device.getConnectedCameras()
usb_speed = device.getUsbSpeed()
print(" >>> MXID:", mxid)
print(" >>> Cameras:", *[c.name for c in cameras])
print(" >>> USB speed:", usb_speed.name)
cam_list = {c.name for c in cameras}
# Get a customized pipeline based on identified device type
device.startPipeline(create_pipeline(cam_list))
# Output queue will be used to get the rgb frames from the output defined above
for cam in cam_list:
queues.append(
{
"queue": device.getOutputQueue(name=cam, maxSize=4, blocking=False),
"msgs": [], # Frame msgs
"mx": device.getMxId(),
"cam": cam,
}
)
input_queue_names = device.getInputQueueNames()
for queue_name in input_queue_names:
print(queue_name)
controlQueue = device.getInputQueue("control" + mxid)
controlQueues.append(controlQueue)
controlQueue1 = controlQueues[0]
controlQueue2 = controlQueues[1]
def check_sync(queues, timestamp):
matching_frames = []
for q in queues:
for i, msg in enumerate(q["msgs"]):
time_diff = abs(msg.getTimestamp() - timestamp)
# So below 17ms @ 30 FPS => frames are in sync
if time_diff <= timedelta(milliseconds=math.ceil(300 / args.fps)):
matching_frames.append(i)
break
if len(matching_frames) == len(queues):
# We have all frames synced. Remove the excess ones
for i, q in enumerate(queues):
q["msgs"] = q["msgs"][matching_frames[i] :]
return True
return False
qSysInfo = device.getOutputQueue(name="sysinfo", maxSize=4, blocking=False)
# Manual exposure/focus set step
EXP_STEP = 500 # us
ISO_STEP = 50
LENS_STEP = 3
DOT_STEP = 100
FLOOD_STEP = 100
DOT_MAX = 1200
FLOOD_MAX = 1500
WB_STEP = 200
# Defaults and limits for manual focus/exposure controls
lensPos = 150
lensMin = 0
lensMax = 255
expTime = 20000
expMin = 1
expMax = 33000
sensIso = 800
sensMin = 100
sensMax = 1600
dotIntensity = 0
floodIntensity = 0
T1 = time.perf_counter()
awb_mode = Cycle(dai.CameraControl.AutoWhiteBalanceMode)
anti_banding_mode = Cycle(dai.CameraControl.AntiBandingMode)
effect_mode = Cycle(dai.CameraControl.EffectMode)
scene_mode = Cycle(dai.CameraControl.SceneMode)
control_mode = Cycle(dai.CameraControl.ControlMode)
capture_intent = Cycle(dai.CameraControl.CaptureIntent)
ae_comp = 0
ae_lock = False
awb_lock = False
saturation = 0
contrast = 0
brightness = 0
sharpness = 0
luma_denoise = 0
chroma_denoise = 0
wbManual = 4000
control = "none"
show = args.show_meta
capture_list: list[str] = []
capture_time = time.strftime("%Y%m%d_%H%M%S")
jet_custom = cv2.applyColorMap(np.arange(256, dtype=np.uint8), cv2.COLORMAP_JET)
jet_custom[0] = [0, 0, 0]
fps_handler = FPSHandler()
for q in queues:
if args.resizable_windows:
cv2.namedWindow(f"{q['cam']} - {q['mx']}", cv2.WINDOW_NORMAL)
cv2.resizeWindow(f"{q['cam']} - {q['mx']}", 1000, 800)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
filename = f"{q['cam']} - {q['mx']}" + '.mp4'
out[f"{q['cam']} - {q['mx']}"] = cv2.VideoWriter(filename, fourcc, 20, (1920, 1080))
fps_host[f"{q['cam']} - {q['mx']}"] = FPS()
fps_capt[f"{q['cam']} - {q['mx']}"] = FPS()
while True:
if SYNC:
for q in queues:
new_msg = q["queue"].tryGet()
if new_msg is not None:
q["msgs"].append(new_msg)
fps_host[f"{q['cam']} - {q['mx']}"].update()
A=fps_host[f"{q['cam']} - {q['mx']}"].get()
#print(fps_host[f"{q['cam']} - {q['mx']}"])
fps_capt[f"{q['cam']} - {q['mx']}"].update(new_msg.getTimestamp().total_seconds())
B = fps_capt[f"{q['cam']} - {q['mx']}"].get()
#print(fps_capt[f"{q['cam']} - {q['mx']}"])
if check_sync(queues, new_msg.getTimestamp()):
#print("=" * 50)
for q in queues:
#print("Press V to start saving frames and B to stop and exit...")
fps_handler.tick(f"{q['cam']} - {q['mx']}")
pkg = q["msgs"].pop(0)
frame = pkg.getCvFrame()
#print(frame)
fps_handler.draw_fps(frame, f"{q['cam']} - {q['mx']}")
draw_text(frame, f"{pkg.getTimestamp()}", (5, 45))
cv2.imshow(f"{q['cam']} - {q['mx']}", frame)
#print(f"{q['cam']} - {q['mx']}: {pkg.getTimestamp()}")
width, height = new_msg.getWidth(), new_msg.getHeight()
if save_frame:
out[f"{q['cam']} - {q['mx']}"].write(frame)
if show:
txt = f"[{q['cam']} - {q['mx']:5}, {pkg.getSequenceNum():4}, {pkg.getTimestamp().total_seconds():.6f}] "
txt += f"Exp: {pkg.getExposureTime().total_seconds() * 1000:6.3f} ms, "
txt += f"ISO: {pkg.getSensitivity():4}, "
txt += f"Color temp: {pkg.getColorTemperature()} K"
print(txt)
capture = f"{q['cam']} - {q['mx']}" in capture_list
if capture:
capture_file_info = image_path.joinpath(
"capture_"
+ f"{q['cam']} - {q['mx']}"
+ "_"
+ str(width)
+ "x"
+ str(height)
+ "_"
+ capture_time
+ "_exp_"
+ str(int(pkg.getExposureTime().total_seconds() * 1e6))
+ "_iso_"
+ str(pkg.getSensitivity())
+ "_"
+ str(pkg.getColorTemperature())
+ "K"
+ "_"
+ str(pkg.getSequenceNum())
).as_posix()
capture_list.remove(f"{q['cam']} - {q['mx']}")
if capture:
filename = capture_file_info + ".png"
print("Saving: ", filename)
cv2.imencode(".png", frame)[1].tofile(filename)
# cv2.imwrite(filename, frame)
sysInfo = qSysInfo.get() # Blocking call, will wait until new data has arrived
t = sysInfo.chipTemperature
pid = os.getpid()
p = psutil.Process(pid)
with p.oneshot():
print(
f"\rFPS: {' '.join([f'{A:>6.2f}|{B:<6.2f}' for q in queues])} "
f"\naverage: {t.average:.2f}°C "
f"css: {t.css:.2f}°C "
f"mss: {t.mss:.2f}°C "
f"upa: {t.upa:.2f}°C "
f"dss: {t.dss:.2f}°C "
f"\ntime: {time.strftime('%H:%M:%S', time.gmtime(time.perf_counter() - T1))} ",
f"cpu: {p.cpu_percent()}% ",
f"mem: {p.memory_info().rss / 1024 ** 2:.2f} MB",
# end="\r",
# flush=True,
)
else:
for q in queues:
new_msg = q["queue"].tryGet()
if new_msg is not None:
fps_handler.tick(f"{q['cam']} - {q['mx']}")
frame = new_msg.getCvFrame()
fps_handler.draw_fps(frame, f"{q['cam']} - {q['mx']}")
draw_text(frame, f"{new_msg.getTimestamp()}", (5, 45))
cv2.imshow(f"{q['cam']} - {q['mx']}", frame)
key = cv2.waitKey(1)
if key ==ord("y"):
controlQueue=controlQueue1
if key == ord("u"):
controlQueue=controlQueue2
if key == ord("q"):
break
elif key == ord("/"):
show = not show
# print empty string as FPS status new-line separator
print("" if show else "\nprinting camera settings: OFF")
elif key == ord('v'):
save_frame = True
print("Frame saved!")
sys.stdout.flush()
elif key == ord("c"):
for item in queues:
cam = item['cam']
mx = item['mx']
extracted = f"{cam} - {mx}"
capture_list.append(extracted)
capture_time = time.strftime("%Y%m%d_%H%M%S")
elif key == ord("t"):
print("Autofocus trigger (and disable continuous)")
ctrl = dai.CameraControl()
ctrl.setAutoFocusMode(dai.CameraControl.AutoFocusMode.AUTO)
ctrl.setAutoFocusTrigger()
controlQueue.send(ctrl)
controlQueue.send(ctrl)
for q in queues:
out[f"{q['cam']} - {q['mx']}"].release()
print("Released out")
print()