#!/usr/bin/env python3
"""
Mono Camera Object Detection using DepthAI YOLO
Reads configuration from JSON and runs inference.
"""
import cv2
import depthai as dai
import numpy as np
import json
import time
import os
from pathlib import Path
from datetime import datetime
import blobconverter
def run_inference(model_path, config_path):
# Load configuration file
configPath = Path(config_path)
if not configPath.exists():
raise ValueError(f"Path {configPath} does not exist!")
with configPath.open() as f:
config = json.load(f)
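    # The config JSON is expected to look roughly like the sketch below
    # (key names inferred from the lookups in this function; real exports
    # may carry additional fields):
    # {
    #   "nn_config": {
    #     "input_size": "416x416",
    #     "NN_specific_metadata": {
    #       "classes": 80,
    #       "coordinates": 4,
    #       "anchors": [...],
    #       "anchor_masks": {...},
    #       "iou_threshold": 0.5,
    #       "confidence_threshold": 0.5
    #     }
    #   },
    #   "mappings": {"labels": ["label0", "label1", ...]}
    # }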
nnConfig = config.get("nn_config", {})
    # Parse input size (e.g. "416x416"); fail fast so W and H are always defined
    if "input_size" in nnConfig:
        W, H = tuple(map(int, nnConfig.get("input_size").split('x')))
    else:
        raise ValueError("nn_config.input_size is missing from the config JSON (expected e.g. '416x416')")
# Extract YOLO-specific metadata
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", 80) # Default to 80 if missing
coordinates = metadata.get("coordinates", 4)
anchors = metadata.get("anchors", [])
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", 0.5)
confidenceThreshold = metadata.get("confidence_threshold", 0.5)
    # Parse labels: a list indexed by class id, used to annotate detections
    nnMappings = config.get("mappings", {})
    labels = nnMappings.get("labels", [])
    # Use the local blob if it exists; otherwise treat model_path as a model
    # name and fetch the blob from the DepthAI model zoo via blobconverter
    nnPath = model_path
    if not Path(nnPath).exists():
        print(f"No blob found at {nnPath}. Downloading from DepthAI Model Zoo...")
        nnPath = str(blobconverter.from_zoo(model_path, shaves=6, zoo_type="depthai", use_cache=True))
# Create DepthAI Pipeline
pipeline = dai.Pipeline()
    # Define Mono Camera (grayscale sensor)
    mono = pipeline.create(dai.node.MonoCamera)
    mono.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    mono.setFps(30)
    # ImageManip resizes the mono frames to the NN input size and converts the
    # single-channel GRAY8 frames to the 3-channel planar BGR the NN expects
    manip = pipeline.create(dai.node.ImageManip)
    manip.initialConfig.setResize(W, H)
    manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)
    manip.setMaxOutputFrameSize(W * H * 3)  # 3 bytes per pixel after conversion
    mono.out.link(manip.inputImage)
# Define YOLO Detection Network
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
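    # YoloDetectionNetwork decodes the YOLO output on-device using the
    # metadata configured above, so no host-side decoding of raw tensors is needed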
# Linking Mono Camera to NN
manip.out.link(detectionNetwork.input)
# Define Output Streams
xoutMono = pipeline.create(dai.node.XLinkOut)
xoutNN = pipeline.create(dai.node.XLinkOut)
xoutMono.setStreamName("mono")
xoutNN.setStreamName("nn")
# Linking Outputs
detectionNetwork.passthrough.link(xoutMono.input)
detectionNetwork.out.link(xoutNN.input)
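    # Pipeline topology created above:
    #   MonoCamera -> ImageManip (resize to WxH, GRAY8 -> BGR888p) -> YoloDetectionNetwork
    #   detectionNetwork.passthrough -> XLinkOut "mono"  (the frames the NN actually saw)
    #   detectionNetwork.out         -> XLinkOut "nn"    (decoded detections)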
# Create output directory with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = "output_mono"
os.makedirs(output_dir, exist_ok=True)
raw_video_path = os.path.join(output_dir, f"raw_{timestamp}.avi")
detected_video_path = os.path.join(output_dir, f"detected_{timestamp}.avi")
# Define Video Writers
fourcc = cv2.VideoWriter_fourcc(*'XVID')
fps = 30
raw_writer = cv2.VideoWriter(raw_video_path, fourcc, fps, (W, H))
detected_writer = cv2.VideoWriter(detected_video_path, fourcc, fps, (W, H))
    # Map normalized bbox coordinates (0..1) to pixel coordinates: even
    # indices (x values) scale by frame width, odd indices (y values) by height
    def frameNorm(frame, bbox):
        normVals = np.full(len(bbox), frame.shape[0])
        normVals[::2] = frame.shape[1]
        return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
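    # Example, assuming a 416x416 NN input: a detection with normalized
    # (xmin, ymin, xmax, ymax) = (0.25, 0.25, 0.75, 0.75) maps to the
    # pixel bbox (104, 104, 312, 312)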
    # Draw boxes, labels and confidences, then record and display the frame
    def displayFrame(name, frame, detections):
        color = (255, 0, 0)  # BGR: blue boxes
        for detection in detections:
            bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20),
                        cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
            cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40),
                        cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
        # Write the annotated frame once per frame, not once per detection
        detected_writer.write(frame)
        cv2.imshow(name, frame)
    # Start Pipeline Execution
    with dai.Device(pipeline) as device:
        # These settings only take effect on devices with IR emitters (e.g.
        # OAK-D Pro); the dot projector accepts 0-1200 mA, the flood light 0-1500 mA
        ir_emitter_brightness = 1200
        device.setIrLaserDotProjectorBrightness(ir_emitter_brightness)
        device.setIrFloodLightBrightness(ir_emitter_brightness)
qMono = device.getOutputQueue(name="mono", maxSize=4, blocking=False)
qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
frame = None
detections = []
startTime = time.monotonic()
counter = 0
        while True:
            # Blocking gets: wait until the next frame and detection packet arrive
            inMono = qMono.get()
            inDet = qDet.get()
if inMono is not None:
frame = inMono.getCvFrame()
                raw_writer.write(frame)  # record the clean frame before drawing overlays
                cv2.putText(frame, "NN FPS: {:.2f}".format(counter / (time.monotonic() - startTime)),
                            (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, (255, 255, 255))
if inDet is not None:
detections = inDet.detections
counter += 1
if frame is not None:
displayFrame("Mono Camera Detection", frame, detections)
if cv2.waitKey(1) == ord('q'):
break
# Release Resources
raw_writer.release()
detected_writer.release()
cv2.destroyAllWindows()
# Example usage with an alternative model:
# model_path = './model/blob_640_nano_rc_b_d_1k/best_openvino_2022.1_6shave.blob'
# config_path = './model/blob_640_nano_rc_b_d_1k/best.json'
if __name__ == "__main__":
    model_path = './model/drone_rc_birds_1000epoch/best_dorne_rc_birds_1000_openvino_2022.1_6shave.blob'
    config_path = './model/drone_rc_birds_1000epoch/best_dorne_rc_birds_1000.json'
    run_inference(model_path, config_path)