KlemenSkrlj Thank you for the response. The README for this repo says that real-time applications will only work on RVC4. I tried the following on RVC2 and only saw detections. The Jupyter notebook's kernel also crashes occasionally, which suggests to me that the device is not capable of running both models at the same time:
import depthai as dai
from depthai_nodes.node import (
    ParsingNeuralNetwork,
    ImgDetectionsBridge,
    GatherData,
    ImgDetectionsFilter,
)

# Your model paths
DETECTION_MODEL_PATH = r"C:\Users\ssharm21\depthai-core\depthai-ml-training\conversion\best.rvc2.tar.xz"
CLASSIFICATION_MODEL_PATH = r"C:\Users\ssharm21\depthai-core\depthai-ml-training\conversion\MobileNetV2 RVC2 Compatible Attribute Classifier.rvc2.tar.xz"
DEVICE = "169.254.1.222"
PADDING = 0.1 # Add padding around detected objects
device = dai.Device(dai.DeviceInfo(DEVICE)) if DEVICE else dai.Device()
platform = device.getPlatform()
img_frame_type = dai.ImgFrame.Type.BGR888i if platform.name == "RVC4" else dai.ImgFrame.Type.BGR888p
visualizer = dai.RemoteConnection(httpPort=8082)
print(f"Platform: {platform.name}")

with dai.Pipeline(device) as pipeline:
    print("Creating detection + classification pipeline...")

    # === CAMERA ===
    cam = pipeline.create(dai.node.Camera).build()
    camera_out = cam.requestOutput((512, 288), type=img_frame_type, fps=15)

    # === DETECTION MODEL ===
    detection_archive = dai.NNArchive(DETECTION_MODEL_PATH)
    detection_nn = pipeline.create(ParsingNeuralNetwork).build(
        camera_out,
        detection_archive,
    )
    detection_nn.input.setBlocking(False)
    detection_nn.input.setMaxSize(1)

    # Detection bridge
    detection_label_encoding = {k: v for k, v in enumerate(detection_archive.getConfig().model.heads[0].metadata.classes)}
    detection_bridge = pipeline.create(ImgDetectionsBridge).build(detection_nn.out)
    detection_bridge.setLabelEncoding(detection_label_encoding)

    # === OPTIONAL: FILTER SPECIFIC CLASSES ===
    # If you want to classify only specific detected objects:
    # valid_labels = [0, 1, 2]  # Bear_nest, Lot_box, cassette - adjust as needed
    # detections_filter = pipeline.create(ImgDetectionsFilter).build(
    #     detection_nn.out, labels_to_keep=valid_labels
    # )

    # === SCRIPT NODE FOR CROPPING ===
    script_node = pipeline.create(dai.node.Script)
    detection_nn.out.link(script_node.inputs["det_in"])
    detection_nn.passthrough.link(script_node.inputs["preview"])
    # Script to generate crop configurations (runs on-device)
    script_content = f"""
def generate_crops(detections, img_width, img_height, target_width, target_height, padding):
    crops = []
    for detection in detections.detections:
        # Expand the normalized bounding box by the padding and clamp to [0, 1]
        x1 = max(0, detection.xmin - padding)
        y1 = max(0, detection.ymin - padding)
        x2 = min(1, detection.xmax + padding)
        y2 = min(1, detection.ymax + padding)
        # Create a crop config for this detection
        cfg = ImageManipConfig()
        cfg.setCropRect(x1, y1, x2, y2)
        cfg.setResize(target_width, target_height)  # Classification model input size
        crops.append(cfg)
    return crops

while True:
    try:
        detections = node.io['det_in'].get()
        frame = node.io['preview'].get()
        if detections is not None and frame is not None:
            crops = generate_crops(
                detections,
                frame.getWidth(),
                frame.getHeight(),
                224, 224,  # Classification input size
                {PADDING}
            )
            # Send one crop config per detection, paired with the source frame
            for crop_cfg in crops:
                node.io['manip_cfg'].send(crop_cfg)
                node.io['manip_img'].send(frame)
    except Exception:
        pass  # Keep the loop alive if a message is malformed
"""
    script_node.setScript(script_content)

    # === IMAGE CROPPER ===
    crop_node = pipeline.create(dai.node.ImageManip)
    crop_node.initialConfig.setOutputSize(224, 224)  # Classification input size
    crop_node.inputConfig.setWaitForMessage(True)  # Wait for a crop config before processing each frame
    script_node.outputs["manip_cfg"].link(crop_node.inputConfig)
    script_node.outputs["manip_img"].link(crop_node.inputImage)

    # === CLASSIFICATION MODEL ===
    classification_archive = dai.NNArchive(CLASSIFICATION_MODEL_PATH)
    classification_nn = pipeline.create(ParsingNeuralNetwork).build(
        crop_node.out,
        classification_archive,
    )
    classification_nn.input.setBlocking(False)
    classification_nn.input.setMaxSize(1)

    # === SYNCHRONIZATION ===
    # Sync classification results with detections
    gather_data_node = pipeline.create(GatherData).build(camera_fps=15)
    classification_nn.out.link(gather_data_node.input_data)
    detection_bridge.out.link(gather_data_node.input_reference)

    # === VISUALIZATION ===
    visualizer.addTopic("Video", detection_nn.passthrough, "images")
    visualizer.addTopic("Detections", detection_bridge.out, "detections")
    visualizer.addTopic("Cropped_Objects", crop_node.out, "images")
    visualizer.addTopic("Classifications", classification_nn.out, "classifications")
    visualizer.addTopic("Synced_Results", gather_data_node.out, "detections")

    print(f"Detection classes: {detection_label_encoding}")
    print(f"Classification classes: {list(classification_archive.getConfig().model.heads[0].metadata.classes)}")
    print("Open http://localhost:8082 to view results")

    pipeline.start()
    visualizer.registerPipeline(pipeline)
    while pipeline.isRunning():
        key = visualizer.waitKey(1)
        if key == ord("q"):
            print("Got q key. Exiting...")
            break
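
To isolate where the kernel crashes come from, my next idea is to drop the RemoteConnection entirely and read the synced output with a plain host-side queue, run as a standalone .py script outside Jupyter. This is just a minimal sketch using DepthAI v3's createOutputQueue (node names as in the pipeline above); it replaces everything from pipeline.start() onward:

    # Inside the same `with dai.Pipeline(device) as pipeline:` block as above
    results_queue = gather_data_node.out.createOutputQueue(maxSize=4, blocking=False)
    pipeline.start()
    while pipeline.isRunning():
        synced = results_queue.get()  # blocks until the next synced packet arrives
        print("Synced detections + classifications:", synced)

If this version also stalls or never produces classifications, that would point at the RVC2 being overloaded by the two models rather than at the notebook or visualizer.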