jakaskerl
Hi, here is my code
# USAGE
# python recognize_camera.py --m yolov8n
# python recognize_camera.py --m yolov8s
# python recognize_camera.py --m langsigne06
# python recognize_camera.py --m r153tello
# python recognize_camera.py --m r153telloyolov8
# import the necessary packages
# from pyimagesearch import config
import os
import time
import cv2
import depthai as dai
from pathlib import Path
import json
import numpy as np
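# NOTE: the USAGE lines above mention a "--m" flag, but nothing in this script
# actually parses it. A minimal argparse sketch (assuming --m names the
# <model>.json / <model>.blob pair under the Json/ folder) could be:
#
#   import argparse
#   ap = argparse.ArgumentParser()
#   ap.add_argument("--m", default="yolov8n",
#                   help="model name, expects Json/<name>.json and Json/<name>.blob")
#   model_name = ap.parse_args().m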
# initialize a depthai camera pipeline (create config file and model weights)
print("[INFO] initializing a depthai camera pipeline...")
model_name = "yolov8n"
print(model_name)
# build the paths to the model config (.json) and weights (.blob)
config_path = os.path.join("Json", model_name + ".json")
model_path = os.path.join("Json", model_name + ".blob")
# load the model config file
print("[INFO] loading model config...")
with open(config_path, "r") as f:
    config = json.load(f)
label_map = config["mappings"]["labels"]
# CAMERA_PREV_DIM = (512, 512)
FRAME_SIZE = (512, 512)
DET_INPUT_SIZE = (512, 512)
syncNN = True
# initialize a depthai pipeline
pipeline = dai.Pipeline()
# fetch nn_config parameters from the model config
nnConfig = config.get("nn_config", {})
print("[INFO] extracting metadata from model config...")
# using nnConfig extract metadata like classes,
# iou and confidence threshold, number of coordinates
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", {})
confidenceThreshold = metadata.get("confidence_threshold", {})
print(metadata)
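# For reference, a config exported by the Luxonis conversion tools typically looks
# roughly like this (values below are illustrative, not taken from the actual file):
#
#   {
#     "nn_config": {
#       "NN_specific_metadata": {
#         "classes": 80,
#         "coordinates": 4,
#         "anchors": [],
#         "anchor_masks": {},
#         "iou_threshold": 0.5,
#         "confidence_threshold": 0.5
#       }
#     },
#     "mappings": {"labels": ["person", "bicycle", "..."]}
#   }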
print("[INFO] configuring source and outputs...")
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.YoloSpatialDetectionNetwork)
xoutRgb = pipeline.create(dai.node.XLinkOut)
nnOut = pipeline.create(dai.node.XLinkOut)
nnNetworkOut = pipeline.create(dai.node.XLinkOut)
xoutDepth = pipeline.create(dai.node.XLinkOut)
print("[INFO] setting stream names for queues...")
xoutRgb.setStreamName("preview")
nnOut.setStreamName("det_out")
xoutDepth.setStreamName("depth")
nnNetworkOut.setStreamName("nnNetwork")
print("[INFO] setting camera properties...")
camRgb.setPreviewSize(FRAME_SIZE[0], FRAME_SIZE[1])
# camRgb.setPreviewSize(CAMERA_PREV_DIM)
#camRgb.setSensorCrop(.1, .2)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(40)
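# NOTE: the preview size fed to the detection network has to match the input
# resolution the .blob was compiled for (512x512 here), otherwise the on-device
# network will complain about a size mismatch.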
print("[INFO] setting YOLO network properties...")
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setBlobPath(model_path)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
detectionNetwork.setDepthLowerThreshold(100)
detectionNetwork.setDepthUpperThreshold(5000)
detectionNetwork.setBoundingBoxScaleFactor(0.5)
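# NOTE: the depth thresholds above are in millimetres (depth values outside
# 100 mm - 5000 mm are ignored when computing the spatial coordinates), and the
# bounding-box scale factor shrinks the detection ROI used for that averaging.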
print("[INFO] creating links...")
camRgb.preview.link(detectionNetwork.input)
detectionNetwork.passthrough.link(xoutRgb.input)
detectionNetwork.out.link(nnOut.input)
print(model_name.lower())
# Define mono camera sources for stereo depth
mono_left = pipeline.createMonoCamera()
mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
mono_left.setBoardSocket(dai.CameraBoardSocket.CAM_B)
mono_right = pipeline.createMonoCamera()
mono_right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
mono_right.setBoardSocket(dai.CameraBoardSocket.CAM_C)
# Create stereo depth node
stereo = pipeline.createStereoDepth()
stereo.setLeftRightCheck(True)
# Linking
mono_left.out.link(stereo.left)
mono_right.out.link(stereo.right)
# setting node configs
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
# Align depth map to the perspective of RGB camera, on which inference is done
stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
stereo.setOutputSize(mono_left.getResolutionWidth(), mono_left.getResolutionHeight())
stereo.setSubpixel(True)
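# NOTE: subpixel mode improves depth precision (mainly at longer distances)
# at the cost of extra computation on the device.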
stereo.depth.link(detectionNetwork.inputDepth)
detectionNetwork.passthroughDepth.link(xoutDepth.input)
#detectionNetwork.outNetwork.link(nnNetworkOut.input)
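# NOTE: with the line above commented out, the "nnNetwork" XLinkOut is never fed,
# so the networkQueue opened later will simply stay empty (harmless as long as
# inNN stays commented out in the main loop).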
# Create preview output
disparity_out = pipeline.createXLinkOut()
disparity_out.setStreamName("disparity")
stereo.disparity.link(disparity_out.input)
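# NOTE: the "disparity" stream is exposed here but never read in the loop below;
# it could be consumed with device.getOutputQueue("disparity", maxSize=4, blocking=False)
# if you want to visualize it.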
def display_info(frame, bbox, detection, fps):
    # Display bounding box
    cv2.rectangle(frame, bbox, (0, 255, 0), 2)
    # Create background for showing details
    cv2.rectangle(frame, (5, 5, 175, 100), (50, 0, 0), -1)
    # Display class name and confidence on the frame
    class_name = label_map[detection.label]
    cv2.putText(frame, class_name, (bbox[0] + 10, bbox[1] + 25),
                cv2.FONT_HERSHEY_TRIPLEX, 0.6, (0, 0, 255))
    cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60),
                cv2.FONT_HERSHEY_TRIPLEX, 1, (0, 0, 255))
    # Display FPS and the spatial coordinates (in millimetres) on the frame
    cv2.putText(frame, f"FPS: {fps:.2f}", (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255))
    cv2.putText(frame, f"X: {int(detection.spatialCoordinates.x)} mm", (bbox[0] + 10, bbox[1] + 80),
                cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
    cv2.putText(frame, f"Y: {int(detection.spatialCoordinates.y)} mm", (bbox[0] + 10, bbox[1] + 95),
                cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
    cv2.putText(frame, f"Z: {int(detection.spatialCoordinates.z)} mm", (bbox[0] + 10, bbox[1] + 110),
                cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
def bbox_calc(det):
    # Clamp the normalized bounding box to the [0, 1] range
    xmin = max(0, det.xmin)
    ymin = max(0, det.ymin)
    xmax = min(det.xmax, 1)
    ymax = min(det.ymax, 1)
    # Convert to pixel coordinates (x, y, w, h)
    x = int(xmin * FRAME_SIZE[0])
    y = int(ymin * FRAME_SIZE[1])
    w = int((xmax - xmin) * FRAME_SIZE[0])
    h = int((ymax - ymin) * FRAME_SIZE[1])
    return (x, y, w, h)
# Frame count
frame_count = 0
# Placeholder fps value
fps = 0
# Used to record the time when we processed last frames
prev_frame_time = 0
# Used to record the time at which we processed current frames
new_frame_time = 0
# Set status colors
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Output queues will be used to get the rgb frames and nn data from the outputs defined above
    print("[INFO] extracting Queues data...")
    previewQueue = device.getOutputQueue(name="preview", maxSize=4, blocking=False)
    detectionNNQueue = device.getOutputQueue(name="det_out", maxSize=4, blocking=False)
    depthQueue = device.getOutputQueue(name="depth", maxSize=4, blocking=False)
    networkQueue = device.getOutputQueue(name="nnNetwork", maxSize=4, blocking=False)
    startTime = time.monotonic()
    counter = 0
    fps = 0
    color = (255, 255, 255)
    while True:
        try:
            print("[INFO] RGB Frames...")
            inPreview = previewQueue.get()
        except Exception:
            # skip this iteration if the preview frame could not be fetched
            print("[INFO] Error: extracting RGB frames failed...")
            continue
        print("[INFO] Detections Frames...")
        inDet = detectionNNQueue.get()
        print("[INFO] Depth Frames...")
        depth = depthQueue.get()
        # inNN = networkQueue.get()
        print("[INFO] OpenCV format Frames...")
        frame = inPreview.getCvFrame()
        depthFrame = depth.getFrame()  # depthFrame values are in millimeters
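        # (optional) visualize the aligned depth map as well; a minimal sketch using
        # the colorization pattern from the DepthAI spatial-detection examples
        depthFrameColor = cv2.normalize(depthFrame, None, 255, 0, cv2.NORM_INF, cv2.CV_8UC1)
        depthFrameColor = cv2.equalizeHist(depthFrameColor)
        depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_HOT)
        cv2.imshow("depth", depthFrameColor)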
        counter += 1
        current_time = time.monotonic()
        if (current_time - startTime) > 1:
            fps = counter / (current_time - startTime)
            counter = 0
            startTime = current_time
        detections = inDet.detections
        # If the frame is available, draw bounding boxes on it and show the frame
        print("[INFO] Detections...")
        for detection in detections:
            bbox_r = bbox_calc(detection)
            display_info(frame, bbox_r, detection, fps)
        print("[INFO] end of detections...")
        cv2.putText(frame, "NN fps: {:.2f}".format(fps), (2, frame.shape[0] - 4),
                    cv2.FONT_HERSHEY_TRIPLEX, 0.4, color)
        cv2.imshow("rgb", frame)
        if cv2.waitKey(1) == ord('q'):
            break
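# once the loop exits (the user pressed 'q'), close any OpenCV windows
cv2.destroyAllWindows()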