@jakaskerl thanks for the reply.
I am working from this example with a hand gesture recognition model, and I would like to save the 4K full frame plus a cropped bounding box once, for instance, the gesture "two" is detected. The model is yolov8n with a 416x416 input.
from auxiliary import config
from auxiliary import utils
from imutils.video import FPS
import argparse
import time
import cv2
import depthai as dai
import json
from pathlib import Path
import numpy as np
import os
import csv
import datetime
print("[INFO] initializing a depthai camera pipeline...")
def create_camera_pipeline(config_path, model_path):
    # initialize a depthai pipeline
    pipeline = dai.Pipeline()
    # load model config file and fetch nn_config parameters
    print("[INFO] loading model config...")
    configPath = Path(config_path)
    model_config = load_config(configPath)
    nnConfig = model_config.get("nn_config", {})
    print("[INFO] extracting metadata from model config...")
    # using nnConfig extract metadata like classes,
    # iou and confidence threshold, number of coordinates
    metadata = nnConfig.get("NN_specific_metadata", {})
    classes = metadata.get("classes", {})
    coordinates = metadata.get("coordinates", {})
    anchors = metadata.get("anchors", {})
    anchorMasks = metadata.get("anchor_masks", {})
    iouThreshold = metadata.get("iou_threshold", {})
    confidenceThreshold = metadata.get("confidence_threshold", {})
    # output of metadata - feel free to tweak the threshold parameters
    # {'classes': 5, 'coordinates': 4, 'anchors': [], 'anchor_masks': {},
    #  'iou_threshold': 0.5, 'confidence_threshold': 0.5}
    print(metadata)
    print("[INFO] configuring source and outputs...")
    # define sources and outputs
    # since OAK's camera is used in this pipeline
    # a color camera node is defined
    camRgb = pipeline.create(dai.node.ColorCamera)
    # create a Yolo detection node
    detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
    xoutRgb = pipeline.create(dai.node.XLinkOut)
    # create a XLinkOut node for getting the detection results to host
    nnOut = pipeline.create(dai.node.XLinkOut)
    print("[INFO] setting stream names for queues...")
    # set stream names used in queue to fetch data when the pipeline is started
    xoutRgb.setStreamName("rgb")
    nnOut.setStreamName("nn")
    print("[INFO] setting camera properties...")
    # setting camera properties like the output preview size,
    # camera resolution, color channel ordering and FPS
    camRgb.setPreviewSize(config.CAMERA_PREV_DIM)
    camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    camRgb.setInterleaved(False)
    camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
    camRgb.setFps(40)
    print("[INFO] setting YOLO network properties...")
    # network specific settings - parameters read from config file
    # confidence and iou threshold, classes, coordinates are set
    # most important the model .blob file is used to load weights
    detectionNetwork.setConfidenceThreshold(confidenceThreshold)
    detectionNetwork.setNumClasses(classes)
    detectionNetwork.setCoordinateSize(coordinates)
    detectionNetwork.setAnchors(anchors)
    detectionNetwork.setAnchorMasks(anchorMasks)
    detectionNetwork.setIouThreshold(iouThreshold)
    detectionNetwork.setBlobPath(model_path)
    detectionNetwork.setNumInferenceThreads(2)
    detectionNetwork.input.setBlocking(False)
    print("[INFO] creating links...")
    camRgb.preview.link(detectionNetwork.input)
    detectionNetwork.passthrough.link(xoutRgb.input)
    detectionNetwork.out.link(nnOut.input)
    # return the pipeline to the calling function
    return pipeline
def load_config(config_path):
    # open the config file and load using json module
    with config_path.open() as f:
        config = json.load(f)
    return config
def annotateFrame(frame, detections, model_name):
    # loops over all detections in a given frame,
    # annotates the frame with model name, class label,
    # confidence score, and draws a bounding box on the object
    color = (0, 0, 255)
    for detection in detections:
        bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, model_name, (20, 40), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.putText(frame, config.LABELS[detection.label], (bbox[0] + 10, bbox[1] + 25), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60),
                    cv2.FONT_HERSHEY_TRIPLEX, 1, color)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
    return frame
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    # resize the image array and modify the channel dimensions
    resized = cv2.resize(arr, shape)
    return resized.transpose(2, 0, 1)
def frameNorm(frame, bbox):
    # nn data (the bounding box locations) are in the <0..1> range;
    # normalize them with the frame width/height
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
pipeline = create_camera_pipeline(config_path=config.YOLOV8N_CONFIG,
                                  model_path=config.YOLOV8N_MODEL)
output_video = config.OUTPUT_VIDEO_YOLOv8n
fourcc = cv2.VideoWriter_fourcc(*'MJPG')
out = cv2.VideoWriter(
    output_video,
    fourcc,
    20.0,
    config.CAMERA_PREV_DIM
)
collected_data = []
# pipeline defined, now the device is assigned and pipeline is started
with dai.Device(pipeline) as device:
    qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
    qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
    frame = None
    startTime = time.monotonic()
    fps = FPS().start()
    counter = 0
    color2 = (255, 255, 255)
    print("[INFO] starting inference with OAK camera...")
    while True:
        # fetch the RGB frames and YOLO detections for the frame
        inRgb = qRgb.get()
        inDet = qDet.get()
        if inRgb is not None:
            # convert inRgb output to a format the OpenCV library can work with
            frame = inRgb.getCvFrame()
            frame_non = inRgb.getCvFrame()
            # annotate the frame with FPS information
            cv2.putText(frame, "NN fps: {:.2f}".format(counter / (time.monotonic() - startTime)),
                        (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.8, color2)
            # update the FPS counter
            fps.update()
        if inDet is not None:
            # if inDet is not None, fetch all the detections for a frame
            detections = inDet.detections
            counter += 1
        if frame is not None:
            # annotate frame with detection results
            # frame = annotateFrame(frame, detections, "yolov8n")
            color = (0, 0, 255)
            # frame_non = frame
            for detection in detections:
                bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
                # cv2.putText(frame, model_name, (20, 40), cv2.FONT_HERSHEY_TRIPLEX, 1, color)
                cv2.putText(frame, config.LABELS[detection.label], (bbox[0] + 10, bbox[1] + 25),
                            cv2.FONT_HERSHEY_TRIPLEX, 1, color)
                cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 60),
                            cv2.FONT_HERSHEY_TRIPLEX, 1, color)
                cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
                class_name = config.LABELS[detection.label]
                x, y, w, h = bbox[0], bbox[1], bbox[2], bbox[3]
                conf = int(detection.confidence * 100)
                current_time = datetime.datetime.now()
                collected_data.append({
                    'timestamp': current_time.strftime('%Y-%m-%d %H:%M:%S.%f'),
                    'Confidence': conf,
                    'label': str(class_name),
                    'detections': [
                        {
                            'x1': int(x),
                            'y1': int(y),
                            'x2': int(w),
                            'y2': int(h)
                        }
                    ]
                })
                if str(class_name) == "Two":
                    cropped_object = frame_non[int(y):int(h), int(x):int(w)]
                    # Create a folder based on the current timestamp
                    timestamp_folder = time.strftime("%Y%m%d_%H%M%S", time.localtime())
                    os.makedirs("Records/" + timestamp_folder, exist_ok=True)
                    # Save the cropped object and the full frame, with the class name in the filename
                    filename = f"{class_name}_.jpg"
                    filename_fm = f"{class_name}_fullframe.jpg"
                    filepath = os.path.join("Records/" + timestamp_folder, filename)
                    filepath_fm = os.path.join("Records/" + timestamp_folder, filename_fm)
                    cv2.imwrite(filepath, cropped_object)
                    cv2.imwrite(filepath_fm, frame_non)
                    print(f"Saved: {filepath}")
                    print(f"Saved: {filepath_fm}")
                    cv2.putText(frame, "Image Saved ", (10, 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
                    # last_capture_time_crp = current_time_crp
            # display the frame with gesture output on the screen
            cv2.imshow("Inference", frame)
            # write the annotated frame to the file
            out.write(frame)
        # break out of the while loop if `q` key is pressed
        if cv2.waitKey(1) == ord('q'):
            break
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# do a bit of cleanup
out.release()
cv2.destroyAllWindows()
data = collected_data
current_time_d = datetime.datetime.now()
timec = current_time_d.strftime('%Y-%m-%d %H:%M:%S.%f')
timestamp_for_filename = timec.replace(':', '_').replace('.', '_')
csv_file_path = f'{timestamp_for_filename}_output.csv'
try:
    with open(csv_file_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
    print(f'CSV file "{csv_file_path}" created successfully.')
except OSError as e:
    print(f'Error creating CSV file: {e}')
My first idea was to run the NN on the grayscale frames and somehow grab the 4K frame from the RGB camera. However, when I came across this example, I started testing different camRgb resolutions,
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
and
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
to see how the FPS changed, and got these results:
4K: (screenshot of the FPS output)
1080p: (screenshot of the FPS output)
So this means I am actually getting the 4K frames at some point, correct? How can I get that full frame saved, or even displayed, while still running the 416x416 NN? At the moment I am only able to save the 416x416 detection frame.
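To make it more concrete, this is the direction I was thinking of trying next. It is only a minimal, untested sketch based on other preview/video examples I have seen; the "preview"/"video" stream names and the setVideoSize call are my guesses, and the 416x416 preview would still feed the YOLO network exactly as in the code above, while a second stream would carry the full 4K frames to the host:

import cv2
import depthai as dai

pipeline = dai.Pipeline()

# same color camera, but with the sensor set to 4K:
# the small preview would feed the NN, the video output carries full frames
camRgb = pipeline.create(dai.node.ColorCamera)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
camRgb.setPreviewSize(416, 416)
camRgb.setVideoSize(3840, 2160)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)

# 416x416 preview stream (this is what would go into the YoloDetectionNetwork)
xoutPreview = pipeline.create(dai.node.XLinkOut)
xoutPreview.setStreamName("preview")
camRgb.preview.link(xoutPreview.input)

# full-resolution stream for saving/displaying the 4K frame
# (note: pushing raw 4K frames over XLink may limit the achievable FPS)
xoutVideo = pipeline.create(dai.node.XLinkOut)
xoutVideo.setStreamName("video")
camRgb.video.link(xoutVideo.input)

with dai.Device(pipeline) as device:
    qPreview = device.getOutputQueue(name="preview", maxSize=4, blocking=False)
    qVideo = device.getOutputQueue(name="video", maxSize=4, blocking=False)
    while True:
        inPreview = qPreview.tryGet()
        inVideo = qVideo.tryGet()
        if inPreview is not None:
            cv2.imshow("preview 416x416", inPreview.getCvFrame())
        if inVideo is not None:
            fullFrame = inVideo.getCvFrame()
            # fullFrame should be the 3840x2160 BGR image I want to save;
            # since the detections are normalized 0..1, frameNorm(fullFrame, bbox)
            # should map a box onto it (modulo the preview's square crop)
            cv2.imshow("video 4K (downscaled)", cv2.resize(fullFrame, (960, 540)))
        if cv2.waitKey(1) == ord('q'):
            break

Would that be the right way to get the full frame out alongside the 416x416 detections, or is there a recommended pattern that does not hurt the NN FPS?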


Appreciate your help, thanks a lot.