This is an AI object detection routine for the Oak-D camera. It worked perfectly before I tried to add ROS to the program so that the camera AI image could be adjusted according to the machine speed. The camera will be instructed to use one of three slices of the image according to the number in "distance_value". This uses Ubuntu 20.04, Python 3.8, OpenCV, and ROS Noetic. It's to run on a Raspberry Pi but this version will run on a laptop. I don't think I'm calling my functions properly or at the right time. When I run it, it's stuck in a loop and doesn't show an image until I hit CTRL-C, then must be ended with CTRL-Z. Can an expert please help!
"""
The code is edited from docs (https://docs.luxonis.com/projects/api/en/latest/samples/Yolo/tiny_yolo/)
We add parsing from JSON files that contain configuration.
"""
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
import time
import argparse
import json
import blobconverter
import rospy
from std_msgs.msg import String
from time import sleep
# import RPi.GPIO as GPIO
# from gpiozero import Servo
z = ([0,0,0,0,0,0,0,0])
r = ([0,0,0,0,0,0,0,0])
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model", help="Provide model name or model path for inference",
default='/home/steve/wide-weeds/openvino/result_openvino_2021.4_6shave.blob', type=str)
parser.add_argument("-c", "--config", help="Provide config path for inference",
default='/home/steve/wide-weeds/openvino/result.json', type=str)
args = parser.parse_args()
# parse config
configPath = Path(args.config)
if not configPath.exists():
raise ValueError("Path {} does not exist!".format(configPath))
with configPath.open() as f:
config = json.load(f)
nnConfig = config.get("nn_config", {})
# parse input shape
if "input_size" in nnConfig:
W, H = tuple(map(int, nnConfig.get("input_size").split('x')))
# extract metadata
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", {})
confidenceThreshold = metadata.get("confidence_threshold", {})
print(metadata)
# parse labels
nnMappings = config.get("mappings", {})
labels = nnMappings.get("labels", {})
# get model path
nnPath = args.model
if not Path(nnPath).exists():
print("No blob found at {}. Looking into DepthAI model zoo.".format(nnPath))
nnPath = str(blobconverter.from_zoo(args.model, shaves = 6, zoo_type = "depthai", use_cache=True))
# sync outputs
syncNN = True
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
# Properties
camRgb.setPreviewSize(1280, 672)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
# camRgb.setVideoSize(1920, 320)
camRgb.setFps(15)
# Faster video not needed, I think, at this point.
manip = pipeline.create(dai.node.ImageManip)
camRgb.preview.link(manip.inputImage)
configIn = pipeline.create(dai.node.XLinkIn)
xout = pipeline.create(dai.node.XLinkOut)
configIn.setStreamName('config')
xout.setStreamName("out1")
manip.out.link(xout.input)
# choose from one of three slices of preview image, according to machine speed
manip.initialConfig.setCropRect(0, 0.66666, 1, 1)
manip.setMaxOutputFrameSize(1280 * 672 * 3)
# Linking
configIn.out.link(manip.inputConfig)
# xoutRgb = pipeline.create(dai.node.XLinkOut)
nnOut = pipeline.create(dai.node.XLinkOut)
# xoutRgb.setStreamName("rgb")
nnOut.setStreamName("nn")
# Network specific settings
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
# Linking
manip.out.link(detectionNetwork.input) # line changed
detectionNetwork.passthrough.link(xout.input) # line changed
detectionNetwork.out.link(nnOut.input)
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
# Output queues will be used to get the rgb frames and nn data from the outputs defined above
t0 = time.time()
tm = str(t0)
print('Time = ',tm)
# qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
q = device.getOutputQueue(name="out1", maxSize=4, blocking=False)
configQueue = device.getInputQueue(configIn.getStreamName())
sendCamConfig = False
frame = None
detections = []
startTime = time.monotonic()
counter = 0
color2 = (255, 255, 255)
def callback(msg):
rospy.loginfo(rospy.get_caller_id() + "distance_value= %s", msg.data)
y1 = 0.333333
y2 = 0.666666
if msg.data > 5:
y1 = 0
y2 = 0.333333
elif msg.data < 3:
y1 = 0.6666666
y2 = 1
sendCamConfig = True
cfg = dai.ImageManipConfig()
cfg.setCropRect(0, y1, 1, y2)
configQueue.send(cfg)
sendCamConfig = False
def listener():
rospy.init_node('listener', anonymous=False)
sub = rospy.Subscriber("distance_value", Int32, callback)
rospy.spin()
# nn data, being the bounding box locations, are in <0..1> range - they need to be normalized with frame width/height
def frameNorm(frame, bbox):
normVals = np.full(len(bbox), frame.shape[0])
normVals[::2] = frame.shape[1]
return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
def displayFrame(name, frame, detections):
color = (255, 0, 0)
global z
global r
z = ([0,0,0,0,0,0,0,0])
r = ([0,0,0,0,0,0,0,0])
tz = 0
for detection in detections:
bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
aim = (detection.xmin + detection.xmax) / 2
aim = int(aim * 19) - 10
aim = int(aim) /10
if aim < -1: aim = -1
if aim > 0.8: aim = 0.8
z[tz] = aim
cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
r[tz] = (detection.confidence)
tz = tz + 1 # get eight values for confidence, select largest one to decide aim value
if tz > 7 :
print(" Slow down!")
break
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
# Show the frame
cv2.imshow(name, frame)
while True:
# inRgb = qRgb.get()
inDet = qDet.get()
in1 =q.tryGet()
listener()
if in1 is not None:
frame = in1.getCvFrame()
cv2.putText(frame, "NN fps: {:.2f}".format(counter / (time.monotonic() - startTime)),
(2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, color2)
if inDet is not None:
detections = inDet.detections
counter += 1
# print("Counter =",counter)
max_value = None
max_idx = None
for idx, num in enumerate(r):
if (max_value is None or num > max_value):
max_value = num
max_idx = idx
confidence = max_value
aim = z[max_idx] # Pluck proper aim value from list
if confidence > 0.6 :
# servo.value = aim
print("Servo aim= ", aim, " Spraying... ", "Counter= ", counter)
else :
print("No weeds found... ")
# rospy.spin()
if frame is not None:
displayFrame("out1", frame, detections)
# print("Only grass... ")
if cv2.waitKey(1) == ord('q'):
t0 = time.time()
tm = str(t0)
print('Time = ',tm)
break`