Hi @jakaskerl, thanks for the quick response!
When using the depthai-python example rgb_camera_control.py, the frame rate hovers around 5 fps with no drops. I have also tried running main.py from gen2-face-detection in depthai-experiments, and there was no fps drop either.
Here is my MRE.
from pathlib import Path
import os
import cv2
import depthai as dai
import numpy as np
import time
import json
# Import config file options
conf={}
try:
    with Path(os.environ.get('BASE_PATH'), 'config.json').open('r') as configFile:
        conf = json.load(configFile)
except Exception as e:
    print('Error loading `config.json` file.')
    print(e)
    exit()
# parse config
modelConfigPath = Path(os.environ.get('BASE_PATH'), 'json', conf.get('model_name', {}) + '.json')
if not modelConfigPath.exists():
    raise ValueError("Path {} does not exist!".format(modelConfigPath))
modelConfig={}
with modelConfigPath.open('r') as f:
    modelConfig = json.load(f)
nnConfig = modelConfig.get("nn_config", {})
# parse input shape
if "input_size" in nnConfig:
model_width, model_height = tuple(map(int, nnConfig.get("input_size").split('x')))
# extract metadata
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", {})
confidenceThreshold = metadata.get("confidence_threshold", {})
# parse labels
nnMappings = modelConfig.get("mappings", {})
labels = nnMappings.get("labels", {})
# get model path
nnPath = Path(os.environ.get('BASE_PATH'), 'model', conf.get('model_name', {}) + '.blob')
if not nnPath.exists():
    print("No blob found at {}.".format(nnPath))
# Create pipeline
pipeline = dai.Pipeline()
wide_angle = conf.get('wide_angle', False)
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
if wide_angle:
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_800_P)
else:
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
objectTracker = pipeline.create(dai.node.ObjectTracker)
# camRgb.setIspScale(2,3) # 1080P -> 720P
previewOut = pipeline.create(dai.node.XLinkOut)
trackerOut = pipeline.create(dai.node.XLinkOut)
configIn = pipeline.create(dai.node.XLinkIn)
previewOut.setStreamName("preview")
trackerOut.setStreamName("tracklets")
configIn.setStreamName('config')
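# Square crop size (pixels) used for the RGB video/preview output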
cropped_resolution = int(conf.get('resolution', 640))
# Properties
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(20)
camRgb.setVideoSize(cropped_resolution,cropped_resolution)
camRgb.setPreviewSize(cropped_resolution,cropped_resolution)
# Use ImageManip to resize to the model input size with letterboxing
manip = pipeline.create(dai.node.ImageManip)
if conf.get('letterbox', {}) == True:
    if wide_angle:
        camRgb.setVideoSize(1280, 800)
        camRgb.setPreviewSize(1280, 800)
    else:
        camRgb.setVideoSize(1920, 1080)
        camRgb.setPreviewSize(1920, 1080)
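# Feed the resized frames to both the detection network and the object tracker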
manip.setMaxOutputFrameSize(model_width * model_height * 3)
manip.initialConfig.setResizeThumbnail(model_width, model_height)
camRgb.preview.link(manip.inputImage)
manip.out.link(detectionNetwork.input)
manip.out.link(objectTracker.inputTrackerFrame)
# Network specific settings
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
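# Only track detections whose class label is 0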
objectTracker.setDetectionLabelsToTrack([0])
# possible tracking types: ZERO_TERM_COLOR_HISTOGRAM, ZERO_TERM_IMAGELESS, SHORT_TERM_IMAGELESS, SHORT_TERM_KCF
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
# take the smallest ID when new object is tracked, possible options: SMALLEST_ID, UNIQUE_ID
objectTracker.setTrackerIdAssignmentPolicy(dai.TrackerIdAssignmentPolicy.UNIQUE_ID)
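# Detections below this confidence are ignored by the tracker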
objectTracker.setTrackerThreshold(0.5)
# Linking
objectTracker.passthroughTrackerFrame.link(previewOut.input)
configIn.out.link(camRgb.inputConfig)
detectionNetwork.passthrough.link(objectTracker.inputDetectionFrame)
detectionNetwork.out.link(objectTracker.inputDetections)
objectTracker.out.link(trackerOut.input)
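# Keeps per-object centroid history so each object is counted only once when it crosses the ROI line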
class TrackableObject:
    def __init__(self, objectID, centroid):
        # store the object ID, then initialize a list of centroids
        # using the current centroid
        self.objectID = objectID
        self.centroids = [centroid]
        # initialize a boolean used to indicate if the object has
        # already been counted or not
        self.counted = False
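# Cumulative counts indexed as [left, right, up, down]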
objectCounter = [0, 0, 0, 0]
counter_multiplier = int(conf.get('counter_multiplier', 1))
config_direction = conf.get('direction', None)
if config_direction == 'left' or config_direction == 'right':
    axis = 'vertical'
elif config_direction == 'up' or config_direction == 'down':
    axis = 'horizontal'
else:
    print('No direction specified, check config file.')
    exit()
while True:
    try:
        # Connect to device and start pipeline
        with dai.Device(pipeline) as device:
            #preview = device.getOutputQueue("preview", 4, False)
            preview = device.getOutputQueue("preview", 12, False)
            tracklets = device.getOutputQueue("tracklets", 12, False)
            configQueue = device.getInputQueue("config", 12, False)
            # Max cropX & cropY
            maxCropX = (camRgb.getIspWidth() - camRgb.getVideoWidth()) / camRgb.getIspWidth()
            maxCropY = (camRgb.getIspHeight() - camRgb.getVideoHeight()) / camRgb.getIspHeight()
            # Default crop
            crops = conf.get('crops', {})
            cropX = float(crops.get('x', {}))
            cropY = float(crops.get('y', {}))
            sendCamConfig = True
            roi_position = float(conf.get('roi', {}))
            if sendCamConfig:
                cfg = dai.ImageManipConfig()
                cfg.setCropRect(cropX, cropY, maxCropX, maxCropY)
                cfg.setKeepAspectRatio(False)
                configQueue.send(cfg)
                sendCamConfig = False
            startTime = time.monotonic()
            counter = 0
            fps = 0
            frame = None
            trackableObjects = {}
            try:
                while True:
                    try:
                        imgFrame = preview.get()
                        track = tracklets.get()
                    except RuntimeError as e:
                        print(f"Error reading data from depthAI device: {e}")
                        continue  # Continue to the next iteration of the loop
                    counter += 1
                    current_time = time.monotonic()
                    if (current_time - startTime) > 1:
                        fps = counter / (current_time - startTime)
                        counter = 0
                        startTime = current_time
                    color = (255, 0, 0)
                    frame = imgFrame.getCvFrame()
                    height = frame.shape[0]
                    width = frame.shape[1]
                    trackletsData = track.tracklets
                    tracklets_counter = 0
                    for t in trackletsData:
                        to = trackableObjects.get(t.id, None)
                        # calculate centroid
                        roi = t.roi.denormalize(frame.shape[1], frame.shape[0])
                        x1 = int(roi.topLeft().x)
                        y1 = int(roi.topLeft().y)
                        x2 = int(roi.bottomRight().x)
                        y2 = int(roi.bottomRight().y)
                        centroid = (int((x2 - x1) / 2 + x1), int((y2 - y1) / 2 + y1))
                        newX = int((x2 - x1) / 2 + x1)
                        newY = int((y2 - y1) / 2 + y1)
                        # If new tracklet, save its centroid
                        if t.status == dai.Tracklet.TrackingStatus.NEW:
                            to = TrackableObject(t.id, centroid)
                        elif to is not None and t.status == dai.Tracklet.TrackingStatus.TRACKED:
                            if axis == "vertical" and not to.counted:
                                x = [c[0] for c in to.centroids]
                                direction = centroid[0] - np.mean(x)
                                if centroid[0] > roi_position * width and direction > 0 and np.mean(x) < roi_position * width:
                                    objectCounter[1] += counter_multiplier
                                    to.counted = True
                                elif centroid[0] < roi_position * width and direction < 0 and np.mean(x) > roi_position * width:
                                    objectCounter[0] += counter_multiplier
                                    to.counted = True
                            elif axis == "horizontal" and not to.counted:
                                y = [c[1] for c in to.centroids]
                                direction = centroid[1] - np.mean(y)
                                if centroid[1] > roi_position * height and direction > 0 and np.mean(y) < roi_position * height:
                                    objectCounter[3] += counter_multiplier
                                    to.counted = True
                                elif centroid[1] < roi_position * height and direction < 0 and np.mean(y) > roi_position * height:
                                    objectCounter[2] += counter_multiplier
                                    to.counted = True
                            to.centroids.append(centroid)
                        trackableObjects[t.id] = to
                        if os.environ.get('cam', None) == 'true':
                            if t.status != dai.Tracklet.TrackingStatus.LOST and t.status != dai.Tracklet.TrackingStatus.REMOVED:
                                tracklets_counter += 1
                                text = "ID {}".format(t.id)
                                cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
                                cv2.circle(
                                    frame, (centroid[0], centroid[1]), 4, (255, 255, 255), -1)
                                label = t.label
                                cv2.putText(frame, str(label), (x1 + 10, y1 + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                                cv2.putText(frame, f"ID: {[t.id]}", (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                                cv2.putText(frame, t.status.name, (x1 + 10, y1 + 50), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    if os.environ.get('cam', None) == 'true':
                        # Draw ROI line
                        if axis == "vertical":
                            cv2.line(frame, (int(roi_position * width), 0),
                                     (int(roi_position * width), height), (0xFF, 0, 0), 5)
                        elif axis == "horizontal":
                            cv2.line(frame, (0, int(roi_position * height)),
                                     (width, int(roi_position * height)), (0xFF, 0, 0), 5)
                        # display count and status
                        font = cv2.FONT_HERSHEY_SIMPLEX
                        cv2.putText(frame, str(fps), (20, 20), font, 0.5, 255)
                        if config_direction == "left":
                            cv2.putText(frame, f'Left: {objectCounter[0]}', (
                                10, 35), font, 0.8, (0, 0, 0xFF), 2, font)
                        elif config_direction == "right":
                            cv2.putText(frame, f'Right: {objectCounter[1]}', (
                                10, 35), font, 0.8, (0, 0, 0xFF), 2, font)
                        elif config_direction == "up":
                            cv2.putText(frame, f'Up: {objectCounter[2]}', (
                                10, 35), font, 0.8, (0, 0, 0xFF), 2, font)
                        elif config_direction == "down":
                            cv2.putText(frame, f'Down: {objectCounter[3]}', (
                                10, 35), font, 0.8, (0, 0, 0xFF), 2, font)
                    if os.environ.get('cam', None) == 'true':
                        cv2.imshow("tracker", frame)
                        if cv2.waitKey(1) == ord('q'):
                            exit()
            except Exception as e:
                print(f"Inner Loop Break. {str(e)}")
                # cam_disconnect_post('Inner Loop Failed.', str(e))
                continue
    except Exception as e:
        print(f"Outer Loop Break. {str(e)}")
        # cam_disconnect_post('Outer Loop Failed.', str(e))
        continue