I am building an application that analyzes the behavior of an object on screen: whether it is moving, and what it is. My program performs object detection and, once it finds an object, starts analyzing whether that object is moving. This works for one or two animations per run, but then my Luxonis OAK-1 blocks and I have to stop the script and start it again, which is not workable for the intended use of the application. I keep swapping and modifying functions, but whenever I fix one thing, the accuracy of the YOLO model or the feature tracking degrades or stops working altogether. The full script is below.
#!/usr/bin/env python3
"""
YOLOv8 + FeatureTracker — minimal viz & minimal prints
On-screen:
• Only bounding boxes (green if moving, red if not)
Console:
• "<icon name>: moving=<True/False>"
Press 'q' to quit.
"""
import sys
from pathlib import Path
from collections import defaultdict
import cv2
import depthai as dai
import numpy as np
# ==== CONFIG =================================================================
nnPath = r"C:\Users\320286674\PycharmProjects\PythonProject\depthai-python\bst.blob"
if not Path(nnPath).exists():
    print(f"[ERROR] Model blob not found at:\n{nnPath}")
    sys.exit(1)
labelMap = ["battery icon", "foam", "star"]
FPS = 15
# Subpart motion (cell grid): tuned to be more sensitive
CELL_NX = 2
CELL_NY = 2
MOVE_PIX_THR = 0.12
EMA_ALPHA = 0.50
CELL_MIN_MOVERS = 1
DEBOUNCE_FRAMES = 2
MIN_FEATS_FOR_DRIFT = 3
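# Movement test used below ("Movers via EMA"): per-feature EMA of the
# drift-compensated step,
#   ema = (1 - EMA_ALPHA) * ema + EMA_ALPHA * |residual|
# A feature counts as moving once ema >= MOVE_PIX_THR, or when a single
# residual step exceeds 2 * MOVE_PIX_THR.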
# Dense optical flow fallback: tuned to trigger eagerly
FLOW_FALLBACK = True
FLOW_MIN_FEATS = 8
FLOW_EVERY_N = 1
FLOW_WINSIZE = 21
FLOW_LEVELS = 3
FLOW_TOP_PCT = 90
FLOW_THR = 0.22
FLOW_DELTA = 0.03
TEXTURE_GRAD_THR = 14.0
TEXTURE_MIN_FRAC = 0.02
ROI_DOWNSCALE = 0.5
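# The dense-flow fallback only runs when fewer than FLOW_MIN_FEATS tracked
# features fall inside a box; a cell is flagged when its top-percentile flow
# magnitude clears both FLOW_THR and the per-box median + FLOW_DELTA.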
# Object-level decision
MIN_MOVING_CELLS_OBJECT = 1
# YOLO thresholds
YOLO_CONF = 0.70
YOLO_IOU = 0.70
KEEP_DET_FOR = 6
# Printing control
PRINT_VALUES = True # set False to silence console output
PRINT_THROTTLE = 1 # print every frame
# ==== PIPELINE ===============================================================
pipeline = dai.Pipeline()
cam = pipeline.create(dai.node.ColorCamera)
toGray = pipeline.create(dai.node.ImageManip)
yolo = pipeline.create(dai.node.YoloDetectionNetwork)
tracker = pipeline.create(dai.node.FeatureTracker)
xoutRgb = pipeline.create(dai.node.XLinkOut)
xoutDet = pipeline.create(dai.node.XLinkOut)
xoutFeat = pipeline.create(dai.node.XLinkOut)
xoutRgb.setStreamName("rgb")
xoutDet.setStreamName("det")
xoutFeat.setStreamName("feat")
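# One pattern from the depthai-python examples that may help if XLink
# back-pressure is what stalls the device (assumption, not verified on this
# OAK-1): make the device-side XLink inputs non-blocking so they drop frames
# instead of blocking the pipeline when the host falls behind:
# xoutRgb.input.setBlocking(False); xoutRgb.input.setQueueSize(1)
# xoutDet.input.setBlocking(False); xoutDet.input.setQueueSize(1)
# xoutFeat.input.setBlocking(False); xoutFeat.input.setQueueSize(1)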
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setFps(FPS)
cam.setPreviewSize(640, 640)
cam.setPreviewKeepAspectRatio(True)
cam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
cam.setInterleaved(False)
cam.preview.link(yolo.input)
toGray.initialConfig.setFrameType(dai.ImgFrame.Type.GRAY8)
cam.preview.link(toGray.inputImage)
toGray.out.link(tracker.inputImage)
tracker.setHardwareResources(1, 2)
tracker.initialConfig.setNumTargetFeatures(200)
yolo.setBlobPath(nnPath)
yolo.setNumClasses(len(labelMap))
yolo.setCoordinateSize(4)
yolo.setConfidenceThreshold(YOLO_CONF)
yolo.setIouThreshold(YOLO_IOU)
yolo.setNumInferenceThreads(2)
yolo.input.setQueueSize(1)
yolo.input.setBlocking(False)
yolo.passthrough.link(xoutRgb.input)
yolo.out.link(xoutDet.input)
yolo.setAnchors([])
yolo.setAnchorMasks({})
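# YOLOv8 is anchor-free, so empty anchors/masks should be correct here
# (assumption: the blob really is a YOLOv8 export; a YOLOv5-style blob would
# need real anchors and masks instead).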
# Feature tracker output
tracker.outputFeatures.link(xoutFeat.input)
# ==== HELPERS ================================================================
def pixel_bboxes(dets, w, h, allow_labels=None, pad_px=1):
    out = []
    for d in dets:
        if allow_labels is not None and d.label not in allow_labels:
            continue
        x1 = int(np.clip(d.xmin, 0, 1) * w) - pad_px
        y1 = int(np.clip(d.ymin, 0, 1) * h) - pad_px
        x2 = int(np.clip(d.xmax, 0, 1) * w) + pad_px
        y2 = int(np.clip(d.ymax, 0, 1) * h) + pad_px
        out.append((x1, y1, x2, y2, d.label))
    return out
def inside(px, py, rect):
    x1, y1, x2, y2 = rect
    return (x1 <= px <= x2) and (y1 <= py <= y2)
def split_box_cells(x1, y1, x2, y2, nx=CELL_NX, ny=CELL_NY):
    w = max(1, x2 - x1); h = max(1, y2 - y1)
    cell_w = w / nx; cell_h = h / ny
    cells = []
    for j in range(ny):
        for i in range(nx):
            cx1 = int(round(x1 + i * cell_w))
            cy1 = int(round(y1 + j * cell_h))
            cx2 = int(round(x1 + (i + 1) * cell_w))
            cy2 = int(round(y1 + (j + 1) * cell_h))
            cells.append((cx1, cy1, cx2, cy2))
    def idx_of(px, py):
        i = int((px - x1) / cell_w); j = int((py - y1) / cell_h)
        if i < 0 or j < 0 or i >= nx or j >= ny:
            return -1
        return j * nx + i
    return cells, idx_of
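# Example: a 100x100 box with the default 2x2 grid yields four 50x50 cells;
# idx_of(px, py) maps a point back to its row-major cell index, or -1 if the
# point falls outside the grid.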
def hanning_window(shape):
    h, w = shape
    if h < 2 or w < 2:
        return None
    try:
        return cv2.createHanningWindow((w, h), cv2.CV_32F)
    except Exception:
        wy = np.hanning(h).astype(np.float32)
        wx = np.hanning(w).astype(np.float32)
        return np.outer(wy, wx)
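# The window tapers the ROI borders before phase correlation, which reduces
# edge artifacts; the np.hanning outer-product fallback covers OpenCV builds
# where createHanningWindow is unavailable.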
def align_prev_roi(prev_roi, curr_roi):
    if prev_roi.shape != curr_roi.shape:
        return prev_roi, (0.0, 0.0)
    pr = prev_roi.astype(np.float32)
    cr = curr_roi.astype(np.float32)
    win = hanning_window(pr.shape)
    try:
        if win is not None:
            (dx, dy), _ = cv2.phaseCorrelate(pr, cr, win)
        else:
            (dx, dy), _ = cv2.phaseCorrelate(pr, cr)
    except Exception:
        dx = dy = 0.0
    M = np.array([[1, 0, dx], [0, 1, dy]], dtype=np.float32)
    warped = cv2.warpAffine(prev_roi, M, (prev_roi.shape[1], prev_roi.shape[0]),
                            flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
    return warped, (dx, dy)
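# Phase correlation estimates the dominant global translation between the two
# ROIs; warping the previous ROI by it means the Farneback flow below should
# mostly reflect residual, object-local motion rather than camera/box drift.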
def per_cell_flow_scores(prev_roi, curr_roi, nx, ny, top_pct=FLOW_TOP_PCT, scale_factor=1.0):
    flow = cv2.calcOpticalFlowFarneback(
        prev_roi, curr_roi, None,
        pyr_scale=0.5, levels=FLOW_LEVELS, winsize=FLOW_WINSIZE,
        iterations=3, poly_n=5, poly_sigma=1.2, flags=0
    )
    mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    if scale_factor != 1.0:
        mag *= (1.0 / scale_factor)
    H, W = mag.shape
    cw = W / nx; ch = H / ny
    scores = []
    for j in range(ny):
        for i in range(nx):
            x1 = int(round(i * cw)); x2 = int(round((i + 1) * cw))
            y1 = int(round(j * ch)); y2 = int(round((j + 1) * ch))
            cell = mag[y1:y2, x1:x2]
            if cell.size == 0:
                scores.append(0.0); continue
            t = np.percentile(cell, top_pct)
            top = cell[cell >= t]
            s = float(top.mean()) if top.size else 0.0
            scores.append(s)
    return scores
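# Averaging only the top percentile of flow magnitudes per cell keeps the
# score sensitive to a small moving sub-part instead of diluting it across
# the mostly static pixels of the cell.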
def texture_fraction(img, thr=TEXTURE_GRAD_THR):
    gx = cv2.Sobel(img, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(img, cv2.CV_32F, 0, 1, ksize=3)
    mag = cv2.magnitude(gx, gy)
    return float((mag > thr).mean())
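# Farneback flow is unreliable on near-uniform patches, so the fallback is
# gated on this fraction of strong-gradient (textured) pixels.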
# ==== RUNTIME ================================================================
with dai.Device(pipeline) as device:
    qRgb = device.getOutputQueue("rgb", maxSize=3, blocking=False)
    qDet = device.getOutputQueue("det", maxSize=3, blocking=False)
    qFeat = device.getOutputQueue("feat", maxSize=3, blocking=False)
    feat_last = {}                          # fid -> (xf, yf)
    feat_ema = defaultdict(float)           # fid -> EMA residual
    cell_sticky = defaultdict(int)          # (box_key, cell_idx) -> counter
    prev_gray_full = None
    frame_i = 0
    last_box_by_class = {}                  # cls -> (x1, y1, x2, y2)
    miss_count_by_class = defaultdict(int)
    while True:
        inRgb = qRgb.tryGet()
        inDet = qDet.tryGet()
        inFeat = qFeat.tryGet()
        if inRgb is None:
            if cv2.waitKey(1) == ord('q'):
                break
            continue
        frame = inRgb.getCvFrame()
        h, w = frame.shape[:2]
        gray_full = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        dets = inDet.detections if inDet is not None else []
        feats = inFeat.trackedFeatures if inFeat is not None else []
        # Build current boxes in pixels
        cur_boxes = pixel_bboxes(dets, w, h)
        # Keep last box per class for a few frames
        seen_classes_now = set()
        smoothed_boxes = []
        for (x1, y1, x2, y2, cls) in cur_boxes:
            last_box_by_class[cls] = (x1, y1, x2, y2)
            miss_count_by_class[cls] = 0
            seen_classes_now.add(cls)
            smoothed_boxes.append((x1, y1, x2, y2, cls))
        for cls, last_box in list(last_box_by_class.items()):
            if cls not in seen_classes_now:
                miss_count_by_class[cls] += 1
                if miss_count_by_class[cls] <= KEEP_DET_FOR:
                    (x1, y1, x2, y2) = last_box
                    smoothed_boxes.append((x1, y1, x2, y2, cls))
                else:
                    last_box_by_class.pop(cls, None)
                    miss_count_by_class.pop(cls, None)
        boxes = smoothed_boxes
        # Current feature positions
        cur_feat_pos = {f.id: (float(f.position.x), float(f.position.y)) for f in feats}
        # Per-detection motion + minimal visualization
        for (x1, y1, x2, y2, cls) in boxes:
            cells, idx_of = split_box_cells(x1, y1, x2, y2)
            movers_per_cell = [0] * len(cells)
            # Feature steps inside box
            steps, inside_fids = [], []
            for fid, (xf, yf) in cur_feat_pos.items():
                if not inside(xf, yf, (x1, y1, x2, y2)):
                    continue
                prev = feat_last.get(fid)
                if prev is None:
                    continue
                dx = xf - prev[0]; dy = yf - prev[1]
                steps.append((dx, dy))
                inside_fids.append((fid, xf, yf, dx, dy))
            # Drift (median)
            if len(steps) >= MIN_FEATS_FOR_DRIFT:
                dxs = np.array([s[0] for s in steps], dtype=np.float32)
                dys = np.array([s[1] for s in steps], dtype=np.float32)
                drift_dx = float(np.median(dxs))
                drift_dy = float(np.median(dys))
            else:
                drift_dx = drift_dy = 0.0
            # Movers via EMA
            for (fid, xf, yf, dx, dy) in inside_fids:
                rdx = dx - drift_dx
                rdy = dy - drift_dy
                res_mag = (rdx * rdx + rdy * rdy) ** 0.5
                ema = feat_ema[fid] = (1.0 - EMA_ALPHA) * feat_ema[fid] + EMA_ALPHA * res_mag
                if not (ema >= MOVE_PIX_THR or res_mag >= MOVE_PIX_THR * 2.0):
                    continue
                ci = idx_of(xf, yf)
                if ci >= 0:
                    movers_per_cell[ci] += 1
            # Sticky debounce per-cell
            bx = (x1 + x2) // 2; by = (y1 + y2) // 2
            box_key = (int(bx / 16), int(by / 16), int((x2 - x1) / 16), int((y2 - y1) / 16), cls)
            for ci, count in enumerate(movers_per_cell):
                cell_key = (box_key, ci)
                if count >= CELL_MIN_MOVERS:
                    cell_sticky[cell_key] = min(cell_sticky[cell_key] + 1, DEBOUNCE_FRAMES)
                else:
                    cell_sticky[cell_key] = max(cell_sticky[cell_key] - 1, 0)
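            # box_key quantizes the box center/size to a 16 px grid so the
            # sticky counters survive small frame-to-frame detector jitter.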
            # Flow fallback (throttled)
            do_flow = (
                FLOW_FALLBACK and
                prev_gray_full is not None and
                len(inside_fids) < FLOW_MIN_FEATS and
                (frame_i % FLOW_EVERY_N == 0)
            )
            if do_flow:
                rx1, ry1 = max(0, x1), max(0, y1)
                rx2, ry2 = min(w, x2), min(h, y2)
                if rx2 > rx1 + 6 and ry2 > ry1 + 6:
                    prev_roi = prev_gray_full[ry1:ry2, rx1:rx2]
                    curr_roi = gray_full[ry1:ry2, rx1:rx2]
                    if prev_roi.shape == curr_roi.shape and texture_fraction(curr_roi) >= TEXTURE_MIN_FRAC:
                        scale = float(ROI_DOWNSCALE)
                        if scale != 1.0:
                            prev_small = cv2.resize(prev_roi, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
                            curr_small = cv2.resize(curr_roi, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
                        else:
                            prev_small, curr_small = prev_roi, curr_roi
                        Hs, Ws = curr_small.shape[:2]
                        if Hs >= 2 and Ws >= 2:
                            aligned_prev, _ = align_prev_roi(prev_small, curr_small)
                        else:
                            aligned_prev = prev_small
                        flow_scores = per_cell_flow_scores(
                            aligned_prev, curr_small, CELL_NX, CELL_NY, FLOW_TOP_PCT, scale_factor=scale
                        )
                        med_score = float(np.median(flow_scores)) if flow_scores else 0.0
                        for ci, s in enumerate(flow_scores or []):
                            cell_key = (box_key, ci)
                            if s >= max(FLOW_THR, med_score + FLOW_DELTA):
                                cell_sticky[cell_key] = min(cell_sticky[cell_key] + 1, DEBOUNCE_FRAMES)
                            else:
                                cell_sticky[cell_key] = max(cell_sticky[cell_key] - 1, 0)
            # Decide moving/not
            moving_cells = [ci for ci in range(len(cells)) if cell_sticky[(box_key, ci)] >= 1]
            any_moving = (len(moving_cells) >= MIN_MOVING_CELLS_OBJECT)
            # ---- Minimal visualization: ONLY bounding box (green/red)
            color_obj = (0, 255, 0) if any_moving else (0, 0, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color_obj, 2)
            # ---- Minimal printing: "<name>: moving=<True/False>"
            if PRINT_VALUES and (frame_i % PRINT_THROTTLE == 0):
                label = labelMap[cls] if cls < len(labelMap) else str(cls)
                print(f"{label}: moving={any_moving}")
        # Update per-feature last positions AFTER using them
        for fid, pos in cur_feat_pos.items():
            feat_last[fid] = pos
        # Display
        cv2.imshow("YOLOv8 + FeatureTracker | minimal", frame)
        if cv2.waitKey(1) == ord('q'):
            break
        prev_gray_full = gray_full
        frame_i += 1
cv2.destroyAllWindows()
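Any pointers on why the pipeline stalls after the first one or two animations, or on how to keep the device from blocking without hurting the YOLO accuracy or the feature tracking, would be appreciated.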