import os
import sys
import time
import blobconverter
import click
import depthai as dai
if sys.version_info[0] < 3:
raise Exception["Doesn't work with Py2"]
MJPEG = False
os.environ["DEPTHAI_LEVEL"] = "debug"
progressCalled = False
# TODO move this under flash(), will need to handle `progressCalled` differently
def progress(p):
global progressCalled
progressCalled = True
print(f"Flashing progress: {p*100:.1f}%")
# Will flash the bootloader if no pipeline is provided as argument
def flash(pipeline=None):
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
bootloader = dai.DeviceBootloader(bl, True)
startTime = time.monotonic()
if pipeline is None:
print("Flashing bootloader...")
bootloader.flashBootloader(progress)
else:
print("Flashing application pipeline...")
bootloader.flash(progress, pipeline)
if not progressCalled:
raise RuntimeError("Flashing failed, please try again")
elapsedTime = round(time.monotonic() - startTime, 2)
print("Done in", elapsedTime, "seconds")
@click.command()
@click.option(
"-fb",
"--flash-bootloader",
is_flag=True,
help="Updates device bootloader prior to running",
)
@click.option(
"-fp",
"--flash-pipeline",
is_flag=True,
help="Flashes pipeline. If bootloader flash is also requested, this will be flashed after",
)
@click.option(
"-gbs",
"--get-boot-state",
is_flag=True,
help="Prints out the boot state of the connected MX"
)
def main(flash_bootloader, flash_pipeline, get_boot_state):
def get_pipeline():
pipeline = dai.Pipeline()
# # Define a source - color camera
cam = pipeline.createColorCamera()
cam.setBoardSocket(dai.CameraBoardSocket.RGB)
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_48_MP)
cam.setVideoSize(1920, 1080)
cam.initialControl.setSceneMode(dai.CameraControl.SceneMode.FACE_PRIORITY)
# Create MobileNet detection network
mobilenet = pipeline.create(dai.node.MobileNetDetectionNetwork)
mobilenet.setBlobPath(
blobconverter.from_zoo(name="face-detection-retail-0004", shaves=3)
)
mobilenet.setConfidenceThreshold(0.7)
crop_manip = pipeline.create(dai.node.ImageManip)
crop_manip.initialConfig.setResize(300, 300)
crop_manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)
cam.isp.link(crop_manip.inputImage)
crop_manip.out.link(mobilenet.input)
# Create an UVC (USB Video Class) output node. It needs 1920x1080, NV12 input
uvc = pipeline.createUVC()
cam.video.link(uvc.input)
# Script node
script = pipeline.create(dai.node.Script)
mobilenet.out.link(script.inputs["dets"])
script.outputs["cam_cfg"].link(cam.inputConfig)
script.outputs["cam_ctrl"].link(cam.inputControl)
script.setScript(
"""
ORIGINAL_SIZE = (5312, 6000) # 48MP with size constraints described on IMX582 luxonis page
SCENE_SIZE = (1920, 1080) # 1080P
x_arr = []
y_arr = []
AVG_MAX_NUM=7
limits = [0, 0] # xmin and ymin limits
limits.append((ORIGINAL_SIZE[0] - SCENE_SIZE[0]) / ORIGINAL_SIZE[0]) # xmax limit
limits.append((ORIGINAL_SIZE[1] - SCENE_SIZE[1]) / ORIGINAL_SIZE[1]) # ymax limit
cfg = ImageManipConfig()
ctrl = CameraControl()
def average_filter(x, y):
x_arr.append(x)
y_arr.append(y)
if AVG_MAX_NUM < len(x_arr): x_arr.pop(0)
if AVG_MAX_NUM < len(y_arr): y_arr.pop(0)
x_avg = 0
y_avg = 0
for i in range(len(x_arr)):
x_avg += x_arr[i]
y_avg += y_arr[i]
x_avg = x_avg / len(x_arr)
y_avg = y_avg / len(y_arr)
if x_avg < limits[0]: x_avg = limits[0]
if y_avg < limits[1]: y_avg = limits[1]
if limits[2] < x_avg: x_avg = limits[2]
if limits[3] < y_avg: y_avg = limits[3]
return x_avg, y_avg
while True:
dets = node.io['dets'].get().detections
if len(dets) == 0: continue
coords = dets[0] # take first
# Get detection center
x = (coords.xmin + coords.xmax) / 2
y = (coords.ymin + coords.ymax) / 2
x -= SCENE_SIZE[0] / ORIGINAL_SIZE[0] / 2
y -= SCENE_SIZE[1] / ORIGINAL_SIZE[1] / 2
# node.warn(f"{x=} {y=}")
x_avg, y_avg = average_filter(x,y)
# node.warn(f"{x_avg=} {y_avg=}")
cfg.setCropRect(x_avg, y_avg, 0, 0)
node.io['cam_cfg'].send(cfg)
node.io['cam_ctrl'].send(ctrl)
"""
)
return pipeline
if flash_bootloader or flash_pipeline:
if flash_bootloader: flash()
if flash_pipeline: flash(get_pipeline())
print("Flashing successful. Please power-cycle the device")
quit()
if get_boot_state:
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
print(f"Device state: {bl.state.name}")
# with dai.Device(get_pipeline(), usb2Mode=True) as dev:
with dai.Device(get_pipeline()) as dev:
print(f"Connection speed: {dev.getUsbSpeed()}")
# Doing nothing here, just keeping the host feeding the watchdog
while True:
try:
time.sleep(0.1)
except KeyboardInterrupt:
break
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(0)