I have tried a lot of versions of this so far, but every time the performance of either the YOLO detection or the feature tracking declines. Is there something wrong with the pipeline or with one of my conditions? Run separately, both work well. Also, the images displayed from the camera look a little delayed.
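To quantify the display delay, one quick thing to check is how old each packet is when it reaches the host loop. Here is a minimal probe, kept separate from the script below, assuming dai.Clock.now() and getTimestamp() report against the same clock (which I believe is the case in depthai v2):

import depthai as dai

def packet_age_ms(pkt):
    # Age of a device packet when it arrives on the host, in milliseconds
    return (dai.Clock.now() - pkt.getTimestamp()).total_seconds() * 1000.0

Printing packet_age_ms(inRgb) once per synced frame should show whether the lag comes from the device/queues or from the host-side sync buffer.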
import sys
from pathlib import Path
import depthai as dai
import cv2
import numpy as np
import math
from collections import deque, Counter, defaultdict
# === CONFIG ===
nnPath = r"C:\Users\320286674\PycharmProjects\PythonProject\depthai-python\Philips\bst.blob"
labelMap = ["battery icon", "foam", "intense", "regular", "sensitive", "star"]
FPS = 10
# Thresholds are **per frame**
MOTION_THRESHOLD = 0.05
MIN_MOVING_FEATURES = 2
ROTATION_THRESHOLD = 0.5
ZOOM_THRESHOLD = 0.015 # ~1.5% per frame (tuned)
CHARACTERIZE_WINDOW = 3
ASSIGN_IOU_THRESHOLD = 0.4
TRACK_DROPOUT_FRAMES = 3
MAX_PAIRED_FEATS = 120
if not Path(nnPath).exists():
print(f"[ERROR] Model blob not found at: {nnPath}")
sys.exit(1)
pipeline = dai.Pipeline()
# Create nodes
cam = pipeline.createColorCamera()
yolo = pipeline.createYoloDetectionNetwork()
ftrk = pipeline.createFeatureTracker()
# Output nodes
xRgb = pipeline.createXLinkOut(); xRgb.setStreamName("rgb")
xDet = pipeline.createXLinkOut(); xDet.setStreamName("det")
xFeat = pipeline.createXLinkOut(); xFeat.setStreamName("feat")
# Camera properties
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setFps(FPS)  # keep in sync with the per-frame thresholds above
cam.setPreviewSize(640, 640)
cam.setPreviewKeepAspectRatio(False)
cam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
cam.setInterleaved(False)
cam.setVideoSize(640, 640)
# YOLO properties
yolo.setBlobPath(nnPath) # Path to your .blob model
yolo.setNumClasses(len(labelMap))
yolo.setCoordinateSize(4)
yolo.setConfidenceThreshold(0.5)
yolo.setIouThreshold(0.5)
yolo.setNumInferenceThreads(2)
yolo.input.setQueueSize(1)
yolo.input.setBlocking(False)
# FeatureTracker properties
ftrk.setHardwareResources(1, 2)
ftrk.initialConfig.setNumTargetFeatures(400)
ftrk.inputImage.setQueueSize(1)
ftrk.inputImage.setBlocking(False)
# Linking
cam.preview.link(yolo.input) # BGR for YOLO
cam.isp.link(ftrk.inputImage) # NV12 for FeatureTracker
yolo.passthrough.link(xRgb.input) # passthrough preview frame (the exact image YOLO ran on)
yolo.out.link(xDet.input) # Detection output
ftrk.outputFeatures.link(xFeat.input) # Feature output
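# Note on coordinate spaces: the 640x640 preview (keepAspectRatio=False -> full FOV,
# stretched) feeds YOLO and is what gets displayed, while the full-resolution ISP
# stream feeds the FeatureTracker, so tracked feature positions arrive in 1080p pixel
# coordinates and are rescaled into preview space in the host loop below.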
# --- Helpers ---
def draw_det(img, box, label, moving, movement_text, color=None):
    x1, y1, x2, y2 = box
    # Use the caller-supplied color if given; otherwise red for moving, green for static
    if color is None:
        color = (0, 0, 255) if moving else (0, 255, 0)
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    cv2.putText(img, label, (x1, max(0, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
    cv2.putText(img, movement_text, (x1, min(img.shape[0] - 5, y2 + 18)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2, cv2.LINE_AA)
def direction_name(dx, dy):
ang = (math.degrees(math.atan2(-dy, dx)) + 360) % 360
dirs = ["right","up-right","up","up-left","left","down-left","down","down-right"]
return dirs[int(((ang + 22.5) % 360) // 45)]
def robust_rot_scale(prev_rel, curr_rel):
    # Procrustes/Kabsch fit: the in-plane rotation (degrees) and relative scale change
    # that best map the centered previous points onto the centered current points.
    if len(prev_rel) < 3:
        return 0.0, 0.0
    H = prev_rel.T @ curr_rel              # 2x2 cross-covariance of prev vs curr
    U, _, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T                         # rotation taking prev -> curr
    if np.linalg.det(R) < 0:               # reflection case: flip the last singular vector
        Vt[-1, :] *= -1
        R = Vt.T @ U.T
    Rp = prev_rel @ R.T                    # previous points rotated into the current frame
    denom = (prev_rel * prev_rel).sum()
    scale = (Rp * curr_rel).sum() / denom if denom > 1e-9 else 1.0
    # Note: image coordinates are y-down, so a positive theta appears clockwise on screen
    theta = math.atan2(R[1, 0], R[0, 0])
    return math.degrees(theta), (scale - 1.0)
def characterize_motion(curr_pts, prev_pts):
n = len(curr_pts)
if n < 3 or len(prev_pts) != n:
return "static", "no significant motion", 0.0, 0.0, 0.0
curr = np.asarray(curr_pts, np.float32)
prev = np.asarray(prev_pts, np.float32)
# 1) translation by median flow
flows = curr - prev
med_dx = float(np.median(flows[:,0])); med_dy = float(np.median(flows[:,1]))
vmag_pf = math.hypot(med_dx, med_dy)
# Optional downsample to cap cost
if n > MAX_PAIRED_FEATS:
idx = np.linspace(0, n-1, MAX_PAIRED_FEATS, dtype=int)
curr = curr[idx]; prev = prev[idx]; flows = flows[idx]
# 2) rotation & scale on centered points
pc = prev - prev.mean(axis=0, keepdims=True)
cc = curr - curr.mean(axis=0, keepdims=True)
rot_deg_pf, scale_pf = robust_rot_scale(pc, cc)
# score vs thresholds
mix_margin = 1.2
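    # A motion mode only wins outright if it beats the runner-up by at least 20%;
    # otherwise the motion is reported as "mixed"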
scores = [
("translate", vmag_pf / max(1e-6, MOTION_THRESHOLD)),
("rotate", abs(rot_deg_pf) / max(1e-6, ROTATION_THRESHOLD)),
("zoom", abs(scale_pf) / max(1e-6, ZOOM_THRESHOLD)),
]
scores.sort(key=lambda x: x[1], reverse=True)
best_mode, best_score = scores[0]
second_score = scores[1][1]
if best_score < 1.0:
return "static", "no significant motion", vmag_pf, rot_deg_pf, scale_pf
elif second_score >= best_score / mix_margin:
txt = f"mixed: move~{vmag_pf*FPS:.2f}px/s, rot~{abs(rot_deg_pf)*FPS:.2f}°/s, zoom~{abs(scale_pf)*100*FPS:.2f}%/s"
return "mixed", txt, vmag_pf, rot_deg_pf, scale_pf
elif best_mode == "translate":
return "translate", f"translating ~{vmag_pf*FPS:.2f}px/s {direction_name(med_dx, med_dy)}", vmag_pf, rot_deg_pf, scale_pf
elif best_mode == "rotate":
return "rotate", f"rotating ~{abs(rot_deg_pf)*FPS:.2f}°/s {'ccw' if rot_deg_pf>0 else 'cw'}", vmag_pf, rot_deg_pf, scale_pf
else:
return "zoom", f"zooming {'in' if scale_pf>0 else 'out'} ~{abs(scale_pf)*100*FPS:.2f}%/s", vmag_pf, rot_deg_pf, scale_pf
def compute_iou(boxA, boxB):
xA = max(boxA[0], boxB[0]); yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2]); yB = min(boxA[3], boxB[3])
interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
return interArea / float(boxAArea + boxBArea - interArea + 1e-6)
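# A Track accumulates the per-frame motion label of one detected icon. Detections are
# matched to existing tracks of the same class by highest IoU above ASSIGN_IOU_THRESHOLD;
# a track unmatched for TRACK_DROPOUT_FRAMES frames is retired and its lifetime summary printed.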
class Track:
def __init__(self, tid, label, box, frame_idx):
self.tid = tid; self.label = label; self.box = box
self.char_history = deque(maxlen=CHARACTERIZE_WINDOW)
self.mode_history = []
self.first_frame = frame_idx
self.last_frame = frame_idx
self.last_result = None
def add(self, mode, text, vmag_pf, rot_pf, scale_pf, box, frame_idx):
self.mode_history.append(mode)
self.char_history.append((abs(vmag_pf), abs(rot_pf), abs(scale_pf)))
self.box = box; self.last_frame = frame_idx
self.last_result = (mode, text, vmag_pf, rot_pf, scale_pf, box, frame_idx)
def summarize(self):
if not self.mode_history: return "no behavior"
c = Counter(self.mode_history); total = len(self.mode_history)
top, cnt = c.most_common(1)[0]
percent = cnt / total * 100
details = ", ".join(f"{m}({n})" for m, n in c.items())
return f"lifetime: {total}f, dominant: {top} ({percent:.0f}%), all: {details}"
def summary_str(self):
if self.last_result:
mode, text, *_ = self.last_result
return f"{self.label} (id:{self.tid}) vanished: mode={mode}, {text} | {self.summarize()}"
return f"{self.label} (id:{self.tid}) vanished: no last state."
prev_feature_positions = {}
tracks = []
next_tid = 1
# --- Sequence number sync ---
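# The RGB passthrough, the detections and the tracked features each carry the sequence
# number of the sensor frame they were produced from. Packets are buffered per sequence
# number and a frame is only processed once all three packets for that number are present,
# so boxes and features always refer to the same image.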
class PacketBuffer:
def __init__(self, maxlen=8):
self.bufs = defaultdict(dict)
self.maxlen = maxlen
def add(self, kind, pkt):
if hasattr(pkt, "getSequenceNum"):
seq = pkt.getSequenceNum()
self.bufs[seq][kind] = pkt
drop = [k for k in self.bufs if k < seq - self.maxlen]
for k in drop: del self.bufs[k]
    def pop_synced(self):
        for seq in sorted(self.bufs):
            d = self.bufs[seq]
            if "rgb" in d and "det" in d and "feat" in d:
                out = (d["rgb"], d["det"], d["feat"])
                # This frame is complete; older entries can never complete now, so purge
                # them as well instead of letting them sit until add() ages them out.
                for k in [k for k in self.bufs if k <= seq]:
                    del self.bufs[k]
                return out
        return None, None, None
with dai.Device(pipeline) as device:
qRgb = device.getOutputQueue("rgb", maxSize=4, blocking=False)
qFeat = device.getOutputQueue("feat", maxSize=4, blocking=False)
qDet = device.getOutputQueue("det", maxSize=4, blocking=False)
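    # Non-blocking host queues with a small maxSize: if the host loop falls behind, the
    # oldest packets are dropped instead of stalling the device, keeping latency bounded.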
buf = PacketBuffer(maxlen=8)
frame_i = 0
try:
while True:
for q, kind in [(qRgb,"rgb"), (qDet,"det"), (qFeat,"feat")]:
pkt = q.tryGet()
while pkt is not None:
buf.add(kind, pkt)
pkt = q.tryGet()
inRgb, inDet, inFeat = buf.pop_synced()
if None in (inRgb, inDet, inFeat):
key = cv2.waitKey(1)
if key == ord('q'): break
continue
tracked = inFeat.trackedFeatures
            if not tracked:
                # Still show the synced frame (and clear stale positions) so the preview
                # does not freeze whenever the tracker momentarily returns no features
                cv2.imshow("YOLO + FeatureTracker - Icon Motion", inRgb.getCvFrame())
                prev_feature_positions = {}
                key = cv2.waitKey(1)
                if key == ord('q'): break
                continue
frame = inRgb.getCvFrame()
H, W = frame.shape[:2]
xs = np.fromiter((float(f.position.x) for f in tracked), count=len(tracked), dtype=np.float32)
ys = np.fromiter((float(f.position.y) for f in tracked), count=len(tracked), dtype=np.float32)
ids = np.fromiter((int(f.id) for f in tracked), count=len(tracked), dtype=np.int64)
            # FeatureTracker positions are pixel coordinates of its input image (the 1080p
            # ISP stream); rescale them into the 640x640 preview space used by the frame
            # and the YOLO boxes (keepAspectRatio=False, so both cover the same full FOV).
            ISP_W, ISP_H = 1920.0, 1080.0  # matches THE_1080_P set above
            xs = xs * (W / ISP_W)
            ys = ys * (H / ISP_H)
curr_feature_positions = {int(i): (float(x), float(y)) for i, x, y in zip(ids, xs, ys)}
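            # Keep this frame's feature positions keyed by feature ID; on the next frame the
            # same IDs are looked up in prev_feature_positions to get per-feature displacement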
detections = []
if hasattr(inDet, "detections"):
for det in inDet.detections:
label = labelMap[det.label] if hasattr(det, "label") and det.label < len(labelMap) else str(det.label)
box = (int(det.xmin*W), int(det.ymin*H), int(det.xmax*W), int(det.ymax*H))
x1,y1,x2,y2 = box
mask = (xs >= x1) & (xs <= x2) & (ys >= y1) & (ys <= y2)
if not np.any(mask):
detections.append({"label": label, "box": box, "moving": False, "pct80": 0.0,
"mode":"static", "text":"no significant motion",
"vmag_pf":0.0, "rot_pf":0.0, "scale_pf":0.0})
continue
ids_sel = ids[mask]
curr_arr = np.column_stack((xs[mask], ys[mask])).astype(np.float32)
prev_list = [prev_feature_positions.get(int(fid)) for fid in ids_sel]
pair_mask = np.array([p is not None for p in prev_list], dtype=bool)
if pair_mask.sum() < 3:
detections.append({"label": label, "box": box, "moving": False, "pct80": 0.0,
"mode":"static", "text":"no significant motion",
"vmag_pf":0.0, "rot_pf":0.0, "scale_pf":0.0})
continue
prev_arr = np.array([prev_list[i] for i in range(len(prev_list)) if pair_mask[i]], dtype=np.float32)
curr_arr = curr_arr[pair_mask]
if len(curr_arr) > MAX_PAIRED_FEATS:
idx = np.linspace(0, len(curr_arr)-1, MAX_PAIRED_FEATS, dtype=int)
curr_arr = curr_arr[idx]; prev_arr = prev_arr[idx]
disps = np.linalg.norm(curr_arr - prev_arr, axis=1)
pct80 = float(np.percentile(disps, 80))
num_moving = int((disps > MOTION_THRESHOLD).sum())
moving = (pct80 > MOTION_THRESHOLD) and (num_moving >= MIN_MOVING_FEATURES)
mode, text, vmag_pf, rot_pf, scale_pf = characterize_motion(curr_arr, prev_arr)
detections.append({
"label": label, "box": box, "moving": moving, "pct80": pct80,
"mode": mode, "text": text,
"vmag_pf": vmag_pf, "rot_pf": rot_pf, "scale_pf": scale_pf
})
updated_tracks = []
matched_old_tids = set()
for det in detections:
best_iou, best_j = 0.0, None
for j, tr in enumerate(tracks):
if tr.label != det["label"]: continue
iou = compute_iou(det["box"], tr.box)
if iou > best_iou:
best_iou, best_j = iou, j
if best_iou > ASSIGN_IOU_THRESHOLD and best_j is not None:
tr = tracks[best_j]
tr.add(det["mode"], det["text"], det["vmag_pf"], det["rot_pf"], det["scale_pf"], det["box"], frame_i)
updated_tracks.append(tr)
matched_old_tids.add(tr.tid)
draw_det(frame, det["box"], f"{det['label']} (id:{tr.tid})", det["moving"], det["text"])
else:
tr = Track(next_tid, det["label"], det["box"], frame_i)
tr.add(det["mode"], det["text"], det["vmag_pf"], det["rot_pf"], det["scale_pf"], det["box"], frame_i)
updated_tracks.append(tr)
draw_det(frame, det["box"], f"{det['label']} (id:{tr.tid})", det["moving"], det["text"], color=(255,128,0))
next_tid += 1
# Bring forward unmatched-but-not-expired tracks; drop expired (and print)
for tr in tracks:
if tr.tid in matched_old_tids:
continue
age = frame_i - tr.last_frame
if age >= TRACK_DROPOUT_FRAMES:
print(tr.summary_str())
else:
updated_tracks.append(tr)
tracks = updated_tracks
cv2.imshow("YOLO + FeatureTracker - Icon Motion", frame)
prev_feature_positions = curr_feature_positions
key = cv2.waitKey(1)
if key == ord('q'):
break
frame_i += 1
except KeyboardInterrupt:
pass
cv2.destroyAllWindows()