This is a modified version of the license plate OCR example. I'm reading producer ID "tattoos" placed on pigs that are processed at the facility where I work. I use one MobileNet to detect the tattoo and pass the ROI to another MobileNet that locates the individual characters and sorts them left to right. It works great on still images, but moving objects are where I experience the problem.

```python
import argparse
import numpy as np # numpy - manipulate the packet data returned by depthai
import cv2 # opencv - display the video stream
import depthai # depthai - access the camera and its data packets
from pathlib import Path
import os # os for file operations
import time
import threading
import PyPigVision as ppv
from PyPigVision import file_handling as fh
from PyPigVision import tattoo_handling as th
from pylogix import PLC
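# Overall flow (summary of what follows): frames come from either the OAK
# camera or a video file, go through a MobileNet tattoo detector on-device,
# the detected ROI is cropped on the host and sent back to a second MobileNet
# for per-character OCR, and the decoded text is optionally saved to disk
# and/or written to a PLC via pylogix.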
parser = argparse.ArgumentParser()
parser.add_argument('-cam', '--camera', action="store_true",
                    help="Use DepthAI 4K RGB camera for inference (conflicts with -vid)")
parser.add_argument('-vid', '--video', type=str,
                    help="Path to video file to be used for inference (conflicts with -cam)")
parser.add_argument('-rot', '--rotate', action="store_true",
                    help="Rotates video 90° so the camera can be mounted normally and accommodate a network trained on horizontal images.")
parser.add_argument('-save', '--Save_Data', type=str,
                    help="Write data to file")
parser.add_argument('-plc', '--Use_PLC', action="store_true",
                    help="Write data to PLC")
parser.add_argument('-ocr_box', '--display_OCR_bboxes', action="store_true",
                    help="Display OCR images with bboxes (creates images that can't be used for training)")
args = parser.parse_args()
if not args.camera and not args.video:
    raise RuntimeError(
        "No source selected. Use either \"-cam\" to run on RGB camera as a source or \"-vid <path>\" to run on video"
    )
# Handle frames per second
class FPSHandler:
    def __init__(self, cap=None):
        self.timestamp = time.time()
        self.start = time.time()
        self.framerate = cap.get(cv2.CAP_PROP_FPS) if cap is not None else None
        self.frame_cnt = 0
        self.ticks = {}
        self.ticks_cnt = {}

    def next_iter(self):
        if not args.camera:
            frame_delay = 1.0 / self.framerate
            delay = (self.timestamp + frame_delay) - time.time()
            if delay > 0:
                time.sleep(delay)
        self.timestamp = time.time()
        self.frame_cnt += 1

    def tick(self, name):
        if name in self.ticks:
            self.ticks_cnt[name] += 1
        else:
            self.ticks[name] = time.time()
            self.ticks_cnt[name] = 0

    def tick_fps(self, name):
        if name in self.ticks:
            if (time.time() - self.ticks[name]) > 0:
                return self.ticks_cnt[name] / (time.time() - self.ticks[name])
            else:
                return 0
        else:
            return 0

    def fps(self):
        if (self.timestamp - self.start) > 0:
            return self.frame_cnt / (self.timestamp - self.start)
        else:
            return 0
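# Typical usage (matching the code below): the worker threads call
# fps.tick('detect') / fps.tick('OCR') once per processed item, the main loop
# reads fps.tick_fps('detect') / fps.tick_fps('OCR') for the on-screen
# counters, and fps.fps() reports the overall frame rate.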
def frameNorm(frame, bbox):
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
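# Worked example: for a 300x300 frame, frameNorm(frame, (0.1, 0.2, 0.5, 0.6))
# scales the normalized coords by the frame size and returns [30, 60, 150, 180],
# i.e. (xmin, ymin, xmax, ymax) in pixels.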
def to_planar(arr: np.ndarray, shape: tuple) -> list:  # Could possibly be how you have to load images into nn
    # Resize to the NN input size and convert HWC (interleaved) to CHW (planar)
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
# Handles loading images into a frame sequence queue dictionary
def get_frame():
    global frame_det_seq
    if args.video:
        read_correctly, frame = cap.read()
        if read_correctly:
            frame_seq_map[frame_det_seq] = frame
            frame_det_seq += 1
        return read_correctly, frame
    else:
        in_rgb = cam_out.get()
        frame = in_rgb.getCvFrame()
        frame_seq_map[in_rgb.getSequenceNum()] = frame
        return True, frame
if args.camera:
    fps = FPSHandler()
else:
    cap = cv2.VideoCapture(str(Path(args.video).resolve().absolute()))
    fps = FPSHandler(cap)
# Define image storage variables
frame_seq_map = {}
frame_det_seq = 0
tattoo_detections = []
rec_results = []
tat_last_seq = 0
tat_last_img = np.empty([300, 300, 3])
labels = {1: '0', 2: '1', 3: '2', 4: '3', 5: '4', 6: '6', 7: '7', 8: '8', 9: '9', 10: 'X'}
decoded_text = ""
running = True
carc_dict = {}
found = 0
read = 0
offset = 20
chars_thresh = 4
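# Notes on the two tuning knobs above (as used later in the code): `offset`
# pads the detected tattoo bbox by 20 px before cropping, and `chars_thresh`
# is the minimum number of character detections required before a read is
# accepted and decoded.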
pipeline = depthai.Pipeline()  # create blank pipeline
if args.camera:
    cam_rgb = pipeline.create(depthai.node.ColorCamera)  # create color camera object
    cam_rgb.setPreviewSize(576, 576)  # set camera preview size
    cam_rgb.setInterleaved(False)
    cam_rgb.initialControl.setManualFocus(110)
    cam_rgb.initialControl.setAutoWhiteBalanceMode(depthai.CameraControl.AutoWhiteBalanceMode.AUTO)
# --DETECTION--
det_nn = pipeline.createMobileNetDetectionNetwork()  # create tattoo detection mobilenet network
det_nn.setBlobPath(r"C:\Luxonis\DETECT_BLOBS\Detect_2_17_2022.blob")  # raw string so backslashes aren't treated as escapes
det_nn.setConfidenceThreshold(0.5)  # set confidence threshold
det_nn.input.setQueueSize(1)
det_nn.input.setBlocking(False)
if args.rotate:
    if args.camera:
        manipRgb = pipeline.createImageManip()
        rgbRr = depthai.RotatedRect()
        rgbRr.center.x, rgbRr.center.y = cam_rgb.getPreviewWidth() // 2, cam_rgb.getPreviewHeight() // 2
        rgbRr.size.width, rgbRr.size.height = cam_rgb.getPreviewHeight(), cam_rgb.getPreviewWidth()
        rgbRr.angle = 90
        manipRgb.initialConfig.setCropRotatedRect(rgbRr, False)
        cam_rgb.preview.link(manipRgb.inputImage)
        # Resize camera preview and map it to tattoo detection nn input
        manip = pipeline.createImageManip()
        manip.initialConfig.setResize(300, 300)
        manip.initialConfig.setFrameType(depthai.RawImgFrame.Type.RGB888p)
        manipRgb.out.link(manip.inputImage)
        manip.out.link(det_nn.input)
        # Create output queue for rgb camera images
        cam_xout = pipeline.createXLinkOut()
        cam_xout.setStreamName("cam_out")
        manipRgb.out.link(cam_xout.input)
    else:
        det_xin = pipeline.createXLinkIn()
        det_xin.setStreamName("det_in")
        det_xin.out.link(det_nn.input)
else:
    if args.camera:
        # Resize camera preview and map it to tattoo detection nn input
        manip = pipeline.createImageManip()
        manip.initialConfig.setResize(300, 300)
        manip.initialConfig.setFrameType(depthai.RawImgFrame.Type.RGB888p)
        cam_rgb.preview.link(manip.inputImage)
        manip.out.link(det_nn.input)
        # Create output queue for rgb camera images
        cam_xout = pipeline.createXLinkOut()
        cam_xout.setStreamName("cam_out")
        cam_rgb.preview.link(cam_xout.input)
    else:
        det_xin = pipeline.createXLinkIn()
        det_xin.setStreamName("det_in")
        det_xin.out.link(det_nn.input)
# --OCR--
rec_nn = pipeline.createMobileNetDetectionNetwork()  # create tattoo ocr mobilenet network
rec_nn.setBlobPath(r"C:\Luxonis\READ_BLOBS\read_2_16_2022.blob")  # raw string so backslashes aren't treated as escapes
rec_nn.setConfidenceThreshold(0.4)  # set confidence threshold
rec_nn.input.setQueueSize(1)
rec_nn.input.setBlocking(False)
rec_xin = pipeline.createXLinkIn()
rec_xin.setStreamName("rec_in")
rec_xin.out.link(rec_nn.input)
# Create output queue for tattoo detection nn detections
det_nn_xout = pipeline.createXLinkOut()
det_nn_xout.setStreamName("det_nn")
det_nn.out.link(det_nn_xout.input)
# Create output queue for tattoo detection nn image passthrough
det_pass = pipeline.createXLinkOut()
det_pass.setStreamName("det_pass")
det_nn.passthrough.link(det_pass.input)
# Create output queue for tattoo ocr nn detections
rec_xout = pipeline.createXLinkOut()
rec_xout.setStreamName("rec_nn")
rec_nn.out.link(rec_xout.input)
# Create output queue for tattoo ocr nn image passthrough
rec_pass = pipeline.createXLinkOut()
rec_pass.setStreamName("rec_pass")
rec_nn.passthrough.link(rec_pass.input)
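# Host-visible streams at this point: "cam_out" (or "det_in" for video input)
# carries full frames, "det_nn"/"det_pass" carry detection results and the
# matching passthrough frames, "rec_in" accepts cropped tattoo images, and
# "rec_nn"/"rec_pass" return the OCR results with their passthrough crops.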
# Unloads tattoo detection nn output q and loads results into OCR nn input
def detect_thread(det_queue, det_pass, rec_queue):
    global tattoo_detections, tat_last_seq, tat_last_img, found
    while running:
        try:
            in_det = det_queue.get().detections
            in_pass = det_pass.get()
            orig_frame = frame_seq_map.get(in_pass.getSequenceNum(), None)
            if orig_frame is None:
                continue
            tat_last_img = orig_frame  # only cache the frame once we know it exists
            tat_last_seq = in_pass.getSequenceNum()
            tattoo_detections = in_det
            for detection in tattoo_detections:
                bbox = frameNorm(orig_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
                # Clamp the padded crop to the frame so negative indices can't wrap around
                cropped_frame = orig_frame[max(0, bbox[1] - offset):bbox[3] + offset, max(0, bbox[0] - offset):bbox[2] + offset]
                found += 1
                shape = cropped_frame.shape
                if shape[0] > 0 and shape[1] > 0:
                    tstamp = time.monotonic()
                    img = depthai.ImgFrame()
                    img.setTimestamp(tstamp)
                    img.setType(depthai.RawImgFrame.Type.BGR888p)
                    img.setData(to_planar(cropped_frame, (300, 300)))
                    img.setWidth(300)
                    img.setHeight(300)
                    # Tag the crop with the source frame's sequence number so rec_thread
                    # can match the OCR result back to this frame (a fresh ImgFrame
                    # otherwise defaults to sequence number 0 for every crop)
                    img.setSequenceNum(tat_last_seq)
                    carc_dict[img.getSequenceNum()] = [bbox, orig_frame]
                    rec_queue.send(img)
            fps.tick('detect')
        except RuntimeError:
            continue
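# The carc_dict handshake: detect_thread above stores [bbox, frame] keyed by
# the crop's sequence number before sending the crop to the OCR network, and
# rec_thread below pops the entry when the OCR passthrough with the same
# sequence number comes back, so the saved detection image and the OCR result
# stay paired.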
# Loads cropped tattoo images from queue loaded by the tattoo detection nn
def rec_thread(q_rec, q_pass):
    global rec_results, decoded_text, read
    with PLC('10.166.137.120') as comm:
        box_color_rec = (205, 0, 0)
        while running:
            try:
                # Get detections from queue of cropped frames from tattoo detection nn
                rec_data = q_rec.get().detections
                # Get the passthrough message once; calling q_pass.get() twice would
                # consume two different messages and mismatch frame and sequence number
                in_pass = q_pass.get()
                rec_frame = in_pass.getCvFrame()
                seq = in_pass.getSequenceNum()
                char_detections = [detection for detection in rec_data]
            except RuntimeError:
                continue
            # Take the first four character detections
            if len(char_detections) >= chars_thresh:
                raw_results = char_detections[:4]
                # Declare storage variables
                Xmin_Char = []
                ocr = []
                decoded_text = ''
                frame_copy = rec_frame.copy()
                # Create list of detections' xmin position and detection label
                for detection in raw_results:
                    Xmin_Char.append([detection.xmin, labels[detection.label]])
                    bbox = frameNorm(frame_copy, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
                    ocr.append([bbox, labels[detection.label]])
                    if args.display_OCR_bboxes:
                        cv2.rectangle(frame_copy, (bbox[0], bbox[1]), (bbox[2], bbox[3]), box_color_rec, 2)
                        cv2.putText(frame_copy, '{} ({}%)'.format(labels[detection.label], int(detection.confidence * 100)), (bbox[0] - 10, bbox[1] - 20), cv2.FONT_HERSHEY_TRIPLEX, 0.4, box_color_rec)
                # Sort previously created list by xmin position to provide left-to-right decoded text
                for detection in sorted(Xmin_Char, reverse=False):
                    decoded_text += detection[1]
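                # e.g. detections arriving as [[0.7, '3'], [0.1, '8'], [0.4, '2'], [0.9, '1']]
                # sort by xmin to [[0.1, '8'], [0.4, '2'], [0.7, '3'], [0.9, '1']]
                # and decode left to right as "8231"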
                # Create result image to stack
                rec_results = [(cv2.resize(frame_copy, (300, 300)), decoded_text)] + rec_results[:9]
                # Extract image and annotation information to save
                current_ocr_bbox_label = ocr
                current_ocr_img = frame_copy
                # Retrieve detection image and bounding box corresponding to current ocr image
                current_detect_data = carc_dict.get(seq)
                # Delete dictionary entry containing detection image and bounding box once they have been extracted
                if current_detect_data is not None:
                    current_detect_bbox = current_detect_data[0]
                    current_detect_img = current_detect_data[1]
                    current_detect_bbox_label = [[current_detect_data[0], 'tattoo']]
                    del carc_dict[seq]
            # Save files
            try:
                if len(char_detections) >= chars_thresh:
                    read += 1
                    if args.Save_Data is not None:
                        if os.path.isdir(args.Save_Data):
                            uid = fh.save_files(args.Save_Data, decoded_text, current_detect_bbox_label, current_ocr_bbox_label, current_detect_img, current_ocr_img)
                    if args.Use_PLC:
                        # Note: `uid` is only assigned in the save branch above, so -plc
                        # without a valid -save directory raises a NameError here
                        filtered_result = th.cond_tattoo(decoded_text)
                        request = [("Program:P01_MainPeriodicProgram.DINT_HotTattoo", filtered_result), ("Program:P01_MainPeriodicProgram.Tattoo_UID", int(uid))]
                        w_ret = comm.Write(request)
            except Exception as e:
                print(e)
            fps.tick('OCR')
# Initialize device with context manager using created pipeline
with depthai.Device(pipeline) as device:
    if args.camera:
        cam_out = device.getOutputQueue("cam_out", 1, True)
    else:
        det_in = device.getInputQueue("det_in")
    rec_in = device.getInputQueue("rec_in")
    det_nn = device.getOutputQueue("det_nn", 1, False)
    det_pass = device.getOutputQueue("det_pass", 1, False)
    rec_nn = device.getOutputQueue("rec_nn", 1, False)
    rec_pass = device.getOutputQueue("rec_pass", 1, False)
    # Start tattoo detection thread
    det_t = threading.Thread(target=detect_thread, args=(det_nn, det_pass, rec_in))
    det_t.start()
    # Start tattoo ocr thread
    rec_t = threading.Thread(target=rec_thread, args=(rec_nn, rec_pass))
    rec_t.start()

    def should_run():
        return cap.isOpened() if args.video else True
    # Main loop
    try:
        fps_rgb = 0
        ticks = 0
        start = time.time()
        # start main loop
        while should_run():
            read_correctly, frame = get_frame()  # Load frame from camera into queue
            if not read_correctly:  # Could be used later if video is incorporated
                break
            # Drop frames the detection thread has already consumed
            for map_key in list(filter(lambda item: item <= tat_last_seq, frame_seq_map.keys())):
                del frame_seq_map[map_key]
            fps.next_iter()  # Increment camera fps
            if not args.camera:
                if args.rotate:
                    frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                tstamp = time.monotonic()
                tat_frame = depthai.ImgFrame()
                tat_frame.setData(to_planar(frame, (300, 300)))
                tat_frame.setTimestamp(tstamp)
                tat_frame.setSequenceNum(frame_det_seq)
                tat_frame.setType(depthai.RawImgFrame.Type.BGR888p)
                tat_frame.setWidth(300)
                tat_frame.setHeight(300)
                det_in.send(tat_frame)
            if tat_last_img is None:
                debug_frame = frame.copy()  # Copy frame to manipulate
            else:
                debug_frame = tat_last_img.copy()
            debug_frame = cv2.resize(debug_frame, (700, 900))
            box_color = (0, 0, 255)
            text_color = (0, 255, 255)
            for detection in tattoo_detections:  # Loop tattoo detections in current frame
                bbox = frameNorm(debug_frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))  # Normalize bounding box for current frame size
                cv2.rectangle(debug_frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), box_color, 2)  # Draw bounding box on current frame
                cv2.putText(debug_frame, '{} ({}%)'.format('tattoo', int(detection.confidence * 100)), (bbox[0] - 10, bbox[1] - 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, box_color)
            if ticks < 5:
                ticks += 1
            else:
                if time.time() - start > 0:
                    fps_rgb = round(ticks * (1 / (time.time() - start)), 1)
                ticks = 0
                start = time.time()
            cv2.putText(debug_frame, f"RGB FPS: {round(fps.fps(), 1)}", (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color)  # Display RGB fps on screen
            cv2.putText(debug_frame, f"DETECT FPS: {round(fps.tick_fps('detect'), 1)}", (5, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color)  # Display Detection fps on screen
            cv2.putText(debug_frame, f"OCR FPS: {round(fps.tick_fps('OCR'), 1)}", (5, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color)  # Display OCR fps on screen
            cv2.putText(debug_frame, f"FOUND: {found}", (5, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color)  # Display tattoos found count
            cv2.putText(debug_frame, f"READ: {read}", (5, 75), cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color)  # Display tattoos read count
            cv2.imshow("rgb", debug_frame)  # Display main detection frame
            rec_stacked = None  # Create variable to house ocr result queue
            for rec_img, rec_text in rec_results:  # Loop current tattoo ocr result queue
                rec_placeholder_img = np.zeros((300, 70, 3), np.uint8)
                cv2.putText(rec_placeholder_img, rec_text, (5, 25), cv2.FONT_HERSHEY_TRIPLEX, 0.5, (0, 255, 0))
                rec_combined = np.hstack((rec_img, rec_placeholder_img))
                if rec_stacked is None:
                    rec_stacked = rec_combined
                else:
                    rec_stacked = np.vstack((rec_stacked, rec_combined))
            if rec_stacked is not None:  # If result queue has contents, display them
                cv2.imshow("Recognized tattoos", rec_stacked)
            key = cv2.waitKey(1)
            if key == ord('q'):
                break
    except KeyboardInterrupt:
        pass

    # Signal the worker threads to stop before the device context closes
    running = False

det_t.join()
rec_t.join()
print("FPS: {:.2f}".format(fps.fps()))
```