#!/usr/bin/env python3
"""
Mono Camera Object Detection using DepthAI YOLO
Reads configuration from JSON and runs inference.
"""
import cv2
import depthai as dai
import numpy as np
import json
import time
import os
from pathlib import Path
from datetime import datetime
import blobconverter
def run_inference(model_path, config_path):
# Load configuration file
configPath = Path(config_path)
if not configPath.exists():
raise ValueError(f"Path {configPath} does not exist!")
with configPath.open() as f:
config = json.load(f)
nnConfig = config.get("nn_config", {})
# Parse input size
if "input_size" in nnConfig:
W, H = tuple(map(int, nnConfig.get("input_size").split('x')))
# Extract YOLO-specific metadata
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", 80) # Default to 80 if missing
coordinates = metadata.get("coordinates", 4)
anchors = metadata.get("anchors", [])
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", 0.5)
confidenceThreshold = metadata.get("confidence_threshold", 0.5)
# Parse labels
nnMappings = config.get("mappings", {})
labels = nnMappings.get("labels", {})
# Check if model exists, else download from DepthAI zoo
nnPath = model_path
if not Path(nnPath).exists():
print(f"No blob found at {nnPath}. Downloading from DepthAI Model Zoo...")
nnPath = str(blobconverter.from_zoo(model_path, shaves=6, zoo_type="depthai", use_cache=True))
# Create DepthAI Pipeline
pipeline = dai.Pipeline()
# Define Mono Camera
mono = pipeline.create(dai.node.MonoCamera)
mono.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
mono.setFps(30)
# Image Manipulation to Resize Input to NN
manip = pipeline.create(dai.node.ImageManip)
manip.initialConfig.setResize(W, H)
manip.initialConfig.setFrameType(dai.ImgFrame.Type.BGR888p)
manip.setMaxOutputFrameSize(W*H*3) # Convert mono to RGB
mono.out.link(manip.inputImage)
# Define YOLO Detection Network
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
# Linking Mono Camera to NN
manip.out.link(detectionNetwork.input)
# Define Output Streams
xoutMono = pipeline.create(dai.node.XLinkOut)
xoutNN = pipeline.create(dai.node.XLinkOut)
xoutMono.setStreamName("mono")
xoutNN.setStreamName("nn")
# Linking Outputs
detectionNetwork.passthrough.link(xoutMono.input)
detectionNetwork.out.link(xoutNN.input)
# Create output directory with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = "output_mono"
os.makedirs(output_dir, exist_ok=True)
raw_video_path = os.path.join(output_dir, f"raw_{timestamp}.avi")
detected_video_path = os.path.join(output_dir, f"detected_{timestamp}.avi")
# Define Video Writers
fourcc = cv2.VideoWriter_fourcc(*'XVID')
fps = 30
raw_writer = cv2.VideoWriter(raw_video_path, fourcc, fps, (W, H))
detected_writer = cv2.VideoWriter(detected_video_path, fourcc, fps, (W, H))
# Helper function for normalization
def frameNorm(frame, bbox):
normVals = np.full(len(bbox), frame.shape[0])
normVals[::2] = frame.shape[1]
return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
def displayFrame(name, frame, detections):
color = (255, 0, 0)
for detection in detections:
bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20),
cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40),
cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
detected_writer.write(frame)
cv2.imshow(name, frame)
# Start Pipeline Execution
with dai.Device(pipeline) as device:
ir_emitter_brightness = 1200 # Set between 0 and 1500 (max)
device.setIrLaserDotProjectorBrightness(ir_emitter_brightness)
device.setIrFloodLightBrightness(ir_emitter_brightness)
qMono = device.getOutputQueue(name="mono", maxSize=4, blocking=False)
qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
frame = None
detections = []
startTime = time.monotonic()
counter = 0
while True:
inMono = qMono.get()
inDet = qDet.get()
if inMono is not None:
frame = inMono.getCvFrame()
raw_writer.write(frame)
cv2.putText(frame, "NN FPS: {:.2f}".format(counter / (time.monotonic() - startTime)),
(2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, (255, 255, 255))
if inDet is not None:
detections = inDet.detections
counter += 1
if frame is not None:
displayFrame("Mono Camera Detection", frame, detections)
if cv2.waitKey(1) == ord('q'):
break
# Release Resources
raw_writer.release()
detected_writer.release()
cv2.destroyAllWindows()
model_path = './model/drone_rc_birds_1000epoch/best_dorne_rc_birds_1000_openvino_2022.1_6shave.blob'
config_path = './model/drone_rc_birds_1000epoch/best_dorne_rc_birds_1000.json'
# Example usage
# model_path = './model/blob_640_nano_rc_b_d_1k/best_openvino_2022.1_6shave.blob'
# config_path = './model/blob_640_nano_rc_b_d_1k/best.json'
run_inference(model_path, config_path)
Unknown

- 5 days ago
- Joined Sep 2, 2024
- 0 best answers
hi @jakaskerl whether the code given below for detection using the mono camera is correct?
is it activating the ir flood light to detect in the night?thanks @jakaskerl , Actually my aim is to use color camera and process the detections on the grayscale frame, the model i am using is trained on 3 channel images.
hi @jakaskerl ,Thanks for the support.
I want to use the model trained with the 3 channels images, can we convert the grayscale frame to 3-channel before passing to the BLOB model for detection, if yes , how?Can you please give me correct code with all the fixes done.
hi @jakaskerl , even on doing "Instead of
camRgb.setPreviewSize(W, H)
, docamRgb.setVideoSize(W, H)".
I m getting error for the above fix - [1844301051C738F500] [1.3] [4.342] [DetectionNetwork(2)] [error] Input tensor 'images' (0) exceeds available data range. Data size (802816B), tensor offset (0), size (2408448B) - skipping inference
@jakaskerl getting this error continuously…
[1844301051C738F500] [1.3] [3.097] [ImageManip(1)] [error] Output image is bigger (2073600B) than maximum frame size specified in properties (802816B) - skipping frame. Please use the setMaxOutputFrameSize API to explicitly config the [maximum] output size.please note: the model is trained on 896*896 resolution.
please give the code which can run such models without any errors.
#!/usr/bin/env python3 """ The code is edited from docs (https://docs.luxonis.com/projects/api/en/latest/samples/Yolo/tiny_yolo/) We add parsing from JSON files that contain configuration and process detections on a grayscale frame. """ from pathlib import Path import cv2 import depthai as dai import numpy as np import time import json import blobconverter import os def run_inference(model_path, config_path): # parse config configPath = Path(config_path) if not configPath.exists(): raise ValueError("Path {} does not exist!".format(configPath)) with configPath.open() as f: config = json.load(f) nnConfig = config.get("nn_config", {}) # parse input shape if "input_size" in nnConfig: W, H = tuple(map(int, nnConfig.get("input_size").split('x'))) # extract metadata metadata = nnConfig.get("NN_specific_metadata", {}) classes = metadata.get("classes", {}) coordinates = metadata.get("coordinates", {}) anchors = metadata.get("anchors", {}) anchorMasks = metadata.get("anchor_masks", {}) iouThreshold = metadata.get("iou_threshold", {}) confidenceThreshold = metadata.get("confidence_threshold", {}) # parse labels nnMappings = config.get("mappings", {}) labels = nnMappings.get("labels", {}) # get model path nnPath = model_path if not Path(nnPath).exists(): print("No blob found at {}. Looking into DepthAI model zoo.".format(nnPath)) nnPath = str(blobconverter.from_zoo(model_path, shaves=6, zoo_type="depthai", use_cache=True)) # Create pipeline pipeline = dai.Pipeline() # Define sources and outputs camRgb = pipeline.create(dai.node.ColorCamera) manip = pipeline.create(dai.node.ImageManip) detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork) xoutRgb = pipeline.create(dai.node.XLinkOut) nnOut = pipeline.create(dai.node.XLinkOut) xoutRgb.setStreamName("rgb") nnOut.setStreamName("nn") # Camera properties camRgb.setPreviewSize(W, H) camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P) camRgb.setInterleaved(False) camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB) camRgb.setFps(30) # Convert to grayscale manip.initialConfig.setFrameType(dai.ImgFrame.Type.GRAY8) manip.setMaxOutputFrameSize(W * H ) # Ensure correct output size allocation for grayscale camRgb.video.link(manip.inputImage) # Network specific settings detectionNetwork.setConfidenceThreshold(confidenceThreshold) detectionNetwork.setNumClasses(classes) detectionNetwork.setCoordinateSize(coordinates) detectionNetwork.setAnchors(anchors) detectionNetwork.setAnchorMasks(anchorMasks) detectionNetwork.setIouThreshold(iouThreshold) detectionNetwork.setBlobPath(nnPath) detectionNetwork.setNumInferenceThreads(2) detectionNetwork.input.setBlocking(False) # Linking grayscale image to the detection network manip.out.link(detectionNetwork.input) detectionNetwork.passthrough.link(xoutRgb.input) detectionNetwork.out.link(nnOut.input) # Connect to device and start pipeline with dai.Device(pipeline) as device: qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False) qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False) frame = None detections = [] startTime = time.monotonic() counter = 0 while True: inRgb = qRgb.get() inDet = qDet.get() if inRgb is not None: frame = inRgb.getCvFrame() cv2.putText(frame, "NN fps: {:.2f}".format(counter / (time.monotonic() - startTime)), (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, (255, 255, 255)) print(counter / (time.monotonic() - startTime)) if inDet is not None: detections = inDet.detections counter += 1 if frame is not None: cv2.imshow("Grayscale Detection", frame) if cv2.waitKey(1) == ord('q'): break # Example usage model_path = '../model/896_yolov5n6_blob/best_openvino_2022.1_6shave.blob' config_path = '../model/896_yolov5n6_blob/best.json' run_inference(model_path, config_path)
hi @jakaskerl
the above code is not working if the model is trained on img size 896*896.i am not able to detect oakd camera on jetson orin nano with jetpack 6.1, can you please give me sample code with dependencies required.
Hi @jakaskerl its a .pt file only. Please click on the black pop out icon above "file size", a new tab will open then you will be able to download the .pt file.
Thanks.
Hey @jakaskerl here is the link to download the model. it will be great if custom model trained using yolov6n6 can also be converted to blob.
hey @jakaskerl , thankyou.
i face error while converting yolovn6 pretrained to blob. the error is "Error while converting to openvino". i am using tools.luxonis.com
Hey @erik
I have trained a custom model using YOLOv5n. While converting the model to a blob format using RVC2, I encountered an error. However, when I used RVC3 for the conversion, it successfully generated the blob. Unfortunately, when running this blob on DepthAI, I received the following error:"RuntimeError: BlobReader error: File does not seem to be a supported neural network blob."
It's worth noting that a pre-trained YOLOv5n model was converted using RVC2 without issues and ran successfully as a blob. The issue only arises with the custom-trained model.
Could you please assist in troubleshooting this problem?