import cv2
import depthai
import numpy as np
import time
import collections
# Initialize the pipeline
pipeline = depthai.Pipeline()
# Set up the RGB camera
cam_rgb = pipeline.createColorCamera()
cam_rgb.setResolution(depthai.ColorCameraProperties.SensorResolution.THE_1080_P) # Set camera resolution
cam_rgb.setPreviewSize(640, 640) # Must match the NN blob's input size (640x640 here)
cam_rgb.setInterleaved(False) # Planar (non-interleaved) frames, as the NN expects
# Set up the YOLOv6 detection network
detection_nn = pipeline.createYoloDetectionNetwork()
detection_nn.setBlobPath('C:/Users/Administrator/Desktop/best_ckpt_openvino_2022.1_6shave.blob') # Specify your YOLOv6 blob file path
detection_nn.setConfidenceThreshold(0.6)
detection_nn.setNumClasses(5) # Set to 5 classes as per your model
detection_nn.setCoordinateSize(4)
# Define default anchors as a flat list
default_anchors = [
10, 13, 16, 30, 33, 23, # First scale
30, 61, 62, 45, 59, 119, # Second scale
116, 90, 156, 198, 373, 326 # Third scale
]
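# Each consecutive pair above is one anchor (width, height) in input pixels,
# three anchors per scale; these are the standard COCO anchors from YOLOv3/YOLOv5.
# An anchor mask such as [0, 1, 2] assigns pairs (10,13), (16,30), (33,23) to one
# output grid.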
# Set the anchors and masks. YOLOv6 heads are anchor-free, so DepthAI generally
# ignores these values for such blobs (they can usually be left empty); they are
# kept here for compatibility with anchor-based models. Also note "side26"/"side13"
# correspond to a 416x416 input; a 640x640 input yields different grid sizes.
detection_nn.setAnchors(default_anchors)
detection_nn.setAnchorMasks({"side26": [0, 1, 2], "side13": [3, 4, 5]}) # Adjust if your model uses different masks
detection_nn.setIouThreshold(0.5)
# Set up XLinkOut for camera and neural network outputs
xout_rgb = pipeline.createXLinkOut()
xout_rgb.setStreamName("rgb")
xout_nn = pipeline.createXLinkOut()
xout_nn.setStreamName("nn")
# Link the camera preview to the outputs
cam_rgb.preview.link(xout_rgb.input)
cam_rgb.preview.link(detection_nn.input)
detection_nn.out.link(xout_nn.input)
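# Resulting topology:
#   ColorCamera.preview -> XLinkOut("rgb")                         (frames to host)
#   ColorCamera.preview -> YoloDetectionNetwork -> XLinkOut("nn")  (detections to host)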
# Variables for detection aggregation and analysis
frame_detection_counts = collections.defaultdict(lambda: collections.defaultdict(int)) # To store counts of each type of defect per frame
detection_active = False # Flag to indicate active detection window
start_time = None # To mark the start of the detection window
time_window = 2 # 2-second time window for detection aggregation
frame_threshold = 50 # Minimum number of frames to consider a defect valid
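# Sanity check on the threshold (assuming the NN runs near 30 FPS): a 2-second
# window yields roughly 60 NN results, so frame_threshold = 50 requires a given
# defect count to appear in about 83% of frames before it is trusted.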
# Variable to simulate sensor input
SensorIn = False
# Map numerical class labels to human-readable defect names (placeholders here).
# NOTE: decide_rejection() below matches on names such as "Crushed Tea Bag" and
# "Tag Out", so the real class names must appear in this map for those rules to
# fire; with setNumClasses(5), a fifth entry (4: ...) is also expected.
label_map = {
    0: "x",
    1: "y",
    2: "z",
    3: "q"
}
def frameNorm(frame, bbox):
    # Convert a bbox in normalized [0, 1] coordinates to pixel coordinates:
    # even indices (x values) scale by frame width, odd indices (y) by height.
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
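# Example: on a 640x640 frame, a normalized bbox of (0.25, 0.5, 0.75, 1.0)
# maps to pixel coordinates (160, 320, 480, 640).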
def start_detection():
global detection_active, start_time
detection_active = True
start_time = time.perf_counter()
def collect_detections(detections):
global frame_detection_counts
defect_counter = collections.Counter() # Counter for current detection cycle
# Count detections for each type
for detection in detections:
defect_label = label_map.get(detection.label, "Unknown")
defect_counter[defect_label] += 1
    # Record, for each defect type, that this exact per-frame count was observed
    for defect, count in defect_counter.items():
        key = f"{count} {defect}"  # e.g. "3 Tag Out"
        frame_detection_counts[defect][key] += 1
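# After a few frames, frame_detection_counts might look like (hypothetical values):
#   {"Tag Out": {"3 Tag Out": 42, "2 Tag Out": 7}}
# i.e. exactly three "Tag Out" detections were seen together in 42 frames.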
def analyze_detections():
global frame_detection_counts
final_defects = {}
# Determine the most frequent count for each defect type
for defect, counts in frame_detection_counts.items():
# Filter out defect types that don't meet the frame threshold
valid_counts = {k: v for k, v in counts.items() if v >= frame_threshold}
if valid_counts:
max_occurrence = max(valid_counts, key=valid_counts.get) # Get the key with the highest count
final_defects[defect] = max_occurrence
return final_defects
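# Example output (hypothetical): {"Tag Out": "3 Tag Out"} means the most common
# per-frame occurrence of "Tag Out" that cleared frame_threshold was three.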
def decide_rejection(final_defects):
# Apply rejection rules based on the final analysis of defect occurrences
for defect, occurrence in final_defects.items():
count, _ = occurrence.split(' ', 1) # Split to get the count
count = int(count)
if defect == "Crushed Tea Bag" and count >= 1:
return True
if defect == "Tag Out" and count >= 3:
return True
# Add more rules as needed
return False
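# Example: with the rules above, decide_rejection({"Tag Out": "3 Tag Out"})
# returns True, while decide_rejection({"Tag Out": "2 Tag Out"}) returns False.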
for device in depthai.Device.getAllAvailableDevices():
print(f"{device.getMxId()} {device.state}")
# Start the pipeline with a specific device ID
# device_id = '14442C1091B39ECF00' # Replace with your specific device ID
# device_id = '14442C10E149D6D600'
device_id = '14442C10B17DEBCF00'
device_info = depthai.DeviceInfo(device_id)
# Manually set the device state to BOOTLOADER
device_info.state = depthai.XLinkDeviceState.X_LINK_BOOTLOADER
# Start the pipeline
with depthai.Device(pipeline, device_info) as device:
    # Re-list available devices and their states after connecting
    # (a fresh variable name avoids shadowing the device_info used above)
    for dev in depthai.Device.getAllAvailableDevices():
        print(f"{dev.getMxId()} {dev.state}")
    # Non-blocking queues so a slow host loop never stalls the device
    q_rgb = device.getOutputQueue("rgb", maxSize=4, blocking=False)
    q_nn = device.getOutputQueue("nn", maxSize=4, blocking=False)
frame = None
detections = []
# Variables for FPS calculation
last_fps_update_time = time.time()
cam_frame_count = 0
nn_frame_count = 0
cam_fps = 0
nn_fps = 0
while True:
in_rgb = q_rgb.tryGet()
in_nn = q_nn.tryGet()
current_time = time.time()
# if in_rgb is not None:
# frame = in_rgb.getCvFrame()
# cam_frame_count += 1
if in_nn is not None:
detections = in_nn.detections
nn_frame_count += 1
# Calculate FPS every second
if current_time - last_fps_update_time >= 1.0:
cam_fps = cam_frame_count / (current_time - last_fps_update_time)
nn_fps = nn_frame_count / (current_time - last_fps_update_time)
cam_frame_count = 0
nn_frame_count = 0
last_fps_update_time = current_time
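        # Note: cam_fps stays at 0 while the RGB frame fetch above remains
        # commented out, because cam_frame_count is never incremented.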
        # Check for the space bar to toggle the simulated sensor input.
        # Note: cv2.waitKey only delivers key events while an OpenCV window
        # (e.g. the commented-out "preview" imshow below) is open.
        key = cv2.waitKey(1) & 0xFF
        if key == ord(' '):  # Space bar pressed
            SensorIn = not SensorIn
# if SensorIn:
# print("SensorIn triggered: ON")
# else:
# print("SensorIn triggered: OFF")
if SensorIn and not detection_active:
start_detection()
if detection_active:
if (time.perf_counter() - start_time) <= time_window:
collect_detections(detections)
else:
final_defects = analyze_detections()
print("Final Defects Count:", final_defects)
if decide_rejection(final_defects):
print("Reject the product.")
else:
print("Accept the product.")
# Reset for the next detection cycle
detection_active = False
frame_detection_counts.clear()
# if frame is not None:
# for detection in detections:
# bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
# cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (255, 0, 0), 2)
# cv2.putText(frame, f"ID: {label_map.get(detection.label, 'Unknown')}", (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
# # Display FPS on the frame
# cv2.putText(frame, f"Camera FPS: {cam_fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# cv2.putText(frame, f"NN FPS: {nn_fps:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# cv2.imshow("preview", frame)
if key == ord('q'): # Quit the loop
break
cv2.destroyAllWindows()