I am using a branch of depthai 2.17.0.0 that has UVC support:
depthai-2.17.0.0.dev0+5d55eb3a801210bc69aab3580ccdc032b92ee380-cp310-cp310-win_amd64.whl
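For anyone else following along: the wheel targets Python 3.10 on Windows (cp310/win_amd64) and installs straight with pip, assuming it sits in the current directory:

python -m pip install depthai-2.17.0.0.dev0+5d55eb3a801210bc69aab3580ccdc032b92ee380-cp310-cp310-win_amd64.whl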
import os
import sys
import time
import blobconverter
import click
import depthai as dai
#import GPIO
if sys.version_info[0] < 3:
    raise Exception("Doesn't work with Py2")
MJPEG = False
os.environ["DEPTHAI_LEVEL"] = "debug"
progressCalled = False
# TODO move this under flash(), will need to handle `progressCalled` differently
def progress(p):
    global progressCalled
    progressCalled = True
    print(f"Flashing progress: {p*100:.1f}%")
# Will flash the bootloader if no pipeline is provided as argument
def flash(pipeline=None):
    (f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
    bootloader = dai.DeviceBootloader(bl, True)

    startTime = time.monotonic()
    if pipeline is None:
        print("Flashing bootloader...")
        bootloader.flashBootloader(progress)
    else:
        print("Flashing application pipeline...")
        bootloader.flash(progress, pipeline)

    if not progressCalled:
        raise RuntimeError("Flashing failed, please try again")
    elapsedTime = round(time.monotonic() - startTime, 2)
    print("Done in", elapsedTime, "seconds")
@click.command()
@click.option(
    "-fb",
    "--flash-bootloader",
    is_flag=True,
    help="Updates device bootloader prior to running",
)
@click.option(
    "-fp",
    "--flash-pipeline",
    is_flag=True,
    help="Flashes pipeline. If bootloader flash is also requested, this will be flashed after",
)
@click.option(
    "-gbs",
    "--get-boot-state",
    is_flag=True,
    help="Prints out the boot state of the connected MX",
)
def main(flash_bootloader, flash_pipeline, get_boot_state):
    # colorCam.video (setVideoSize(1920,1080)) --> UVC
    # colorCam.isp --> ImageManip (scale) --> face_detect --> Script --> colorCam.inputConfig
    def get_pipeline():
        pipeline = dai.Pipeline()

        # Define a source - color camera
        cam = pipeline.createColorCamera()
        cam.setBoardSocket(dai.CameraBoardSocket.RGB)
        # cam.setInterleaved(False)
        # cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_48_MP)
        # cam.initialControl.setManualExposure(20000, 400)
        # cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_12_MP)
        cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
        cam.setVideoSize(1920, 1080)
        cam.initialControl.setSceneMode(dai.CameraControl.SceneMode.FACE_PRIORITY)

        # Plan: first rotate the frame 90 deg, do face detection, crop the rotated
        # 4K image into 1080x1920 (as in the lossless zooming example), then rotate
        # that back to 1920x1080 to feed into the UVC node. See the commented
        # sketch below.
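        # A rough sketch of that rotation step (not wired in yet), assumed to work
        # like the depthai ImageManip rotate example (crop via a 90-deg RotatedRect):
        # rot_manip = pipeline.create(dai.node.ImageManip)
        # rr = dai.RotatedRect()
        # rr.center.x, rr.center.y = 1920 // 2, 1080 // 2
        # rr.size.width, rr.size.height = 1080, 1920  # width/height swapped for 90 deg
        # rr.angle = 90
        # rot_manip.initialConfig.setCropRotatedRect(rr, False)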
        # Create MobileNet detection network
        mobilenet = pipeline.create(dai.node.MobileNetDetectionNetwork)
        mobilenet.setBlobPath(
            blobconverter.from_zoo(name="face-detection-retail-0004", shaves=3)
        )
        mobilenet.setConfidenceThreshold(0.7)

        # Downscale ISP frames to the 300x300 BGR input the detector expects
        crop_manip = pipeline.create(dai.node.ImageManip)
        crop_manip.initialConfig.setResize(300, 300)
        crop_manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)
        cam.isp.link(crop_manip.inputImage)
        crop_manip.out.link(mobilenet.input)

        # Create a UVC (USB Video Class) output node. It needs 1920x1080, NV12 input
        uvc = pipeline.createUVC()
        cam.video.link(uvc.input)

        # Script node: consumes detections, sends crop configs back to the camera
        script = pipeline.create(dai.node.Script)
        mobilenet.out.link(script.inputs["dets"])
        script.outputs["cam_cfg"].link(cam.inputConfig)
        script.outputs["cam_ctrl"].link(cam.inputControl)
        script.setScript(
"""
ORIGINAL_SIZE = (5312, 6000) # 48MP with size constraints described on IMX582 luxonis page
SCENE_SIZE = (1920, 1080) # 1080P
x_arr = []
y_arr = []
AVG_MAX_NUM=7
limits = [0, 0] # xmin and ymin limits
limits.append((ORIGINAL_SIZE[0] - SCENE_SIZE[0]) / ORIGINAL_SIZE[0]) # xmax limit
limits.append((ORIGINAL_SIZE[1] - SCENE_SIZE[1]) / ORIGINAL_SIZE[1]) # ymax limit
cfg = ImageManipConfig()
ctrl = CameraControl()
def average_filter(x, y):
x_arr.append(x)
y_arr.append(y)
if AVG_MAX_NUM < len(x_arr): x_arr.pop(0)
if AVG_MAX_NUM < len(y_arr): y_arr.pop(0)
x_avg = 0
y_avg = 0
for i in range(len(x_arr)):
x_avg += x_arr[i]
y_avg += y_arr[i]
x_avg = x_avg / len(x_arr)
y_avg = y_avg / len(y_arr)
if x_avg < limits[0]: x_avg = limits[0]
if y_avg < limits[1]: y_avg = limits[1]
if limits[2] < x_avg: x_avg = limits[2]
if limits[3] < y_avg: y_avg = limits[3]
return x_avg, y_avg
while True:
dets = node.io['dets'].get().detections
if len(dets) == 0: continue
coords = dets[0] # take first
width = (coords.xmax - coords.xmin) * ORIGINAL_SIZE[0]
height = (coords.ymax - coords.ymin) * ORIGINAL_SIZE[1]
x_pixel = int(max(0, coords.xmin * ORIGINAL_SIZE[0]))
y_pixel = int(max(0, coords.ymin * ORIGINAL_SIZE[1]))
# ctrl.setAutoFocusRegion(x_pixel, y_pixel, int(width), int(height))
# ctrl.setAutoExposureRegion(x_pixel, y_pixel, int(width), int(height))
# Get detection center
x = (coords.xmin + coords.xmax) / 2
y = (coords.ymin + coords.ymax) / 2
x -= SCENE_SIZE[0] / ORIGINAL_SIZE[0] / 2
y -= SCENE_SIZE[1] / ORIGINAL_SIZE[1] / 2
# node.warn(f"{x=} {y=}")
x_avg, y_avg = average_filter(x,y)
# node.warn(f"{x_avg=} {y_avg=}")
cfg.setCropRect(x_avg, y_avg, 0, 0)
node.io['cam_cfg'].send(cfg)
node.io['cam_ctrl'].send(ctrl)
# import GPIO
# MX_PIN = 60
#
# ret = GPIO.setup(MX_PIN, GPIO.OUT, GPIO.PULL_DOWN)
# toggleVal = True
#
# while True:
# data = node.io['in'].get() # Wait for a message from the host computer
#
# node.warn('GPIO toggle: ' + str(toggleVal))
# toggleVal = not toggleVal
# ret = GPIO.write(MX_PIN, toggleVal) # Toggle the GPIO
"""
)
        # pipeline.setXLinkChunkSize(0)  # experimental throughput change
        return pipeline
    if flash_bootloader or flash_pipeline:
        if flash_bootloader: flash()
        if flash_pipeline: flash(get_pipeline())
        print("Flashing successful. Please power-cycle the device")
        quit()

    if get_boot_state:
        (f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
        print(f"Device state: {bl.state.name}")
    # with dai.Device(get_pipeline(), usb2Mode=True) as dev:
    with dai.Device(get_pipeline()) as dev:
        print(f"Connection speed: {dev.getUsbSpeed()}")
        # Doing nothing here, just keeping the host feeding the watchdog
        while True:
            try:
                time.sleep(0.1)
            except KeyboardInterrupt:
                break
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)
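In case it helps anyone trying this: with the script saved as main.py, running python main.py -fb updates the bootloader, python main.py -fp flashes the pipeline for standalone boot, and running it with no flags boots the pipeline over USB, after which the OAK should enumerate as a regular UVC webcam. A quick host-side sanity check with OpenCV (the capture index 0 is hypothetical; it depends on what other cameras the host has):

import cv2

cap = cv2.VideoCapture(0)  # hypothetical index; pick the OAK's UVC device
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    cv2.imshow("OAK UVC", frame)
    if cv2.waitKey(1) == ord("q"):  # press q to quit
        break
cap.release()
cv2.destroyAllWindows()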