- Edited
I am looking for a way to grab frames from a camera whether it is the OAK-D, or some other camera.
I would like to run something as simple as:
import cv2
from CameraStream import CameraStream
if __name__ == "__main__":
cam_1 = CameraStream("cam_onn").start()
cam_2 = CameraStream("cam_oak", is_oak=True).start()
while True:
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
img_1 = cam_1.read()
img_2 = cam_2.read()
cv2.imshow(cam_1.cam_name, img_1)
cv2.imshow(cam_2.cam_name, img_2)
cv2.destroyAllWindows()
I have a CameraStream
class implemented where I try to simplify grabbing frames and using threads. Notably, the stream
property of the class, and get_stream()
is where I am trying to perform setup of the cameras to simplify the grabbing of frames, but to no avail. The error I get is:
RuntimeError: Communication exception - possible device error/misconfiguration. Original message 'Couldn't read data from stream: 'rgb' (X_LINK_ERROR)'
I will post the CameraStream
class below, but I wonder if I am on the right track, or if can I fix my issue?
import cv2, threading, pickle
import depthai as dai
class CameraStream:
def __init__(self, cam_name, src=0, is_oak=False):
# initialize the video camera stream
# and read the first frame from the stream
self.cam_name = cam_name
self.is_oak = is_oak
self.src = src
self.cam_mtx, self.dist_mtx = self.get_camera_matricies()
self.stream = self.get_stream()
if (self.is_oak):
preview = self.stream.get()
self.frame = preview.getCvFrame()
else:
_, self.frame = self.stream.read()
# initialize the variable used to indicate if the thread should
# be stopped
self.stopped = False
def start(self):
# start the thread to read frames from the video stream
threading.Thread(target=self.update, args=()).start()
return self
def update(self):
# keep looping infinitely until the thread is stopped
while True:
# if the thread indicator variable is set, stop the thread
if self.stopped:
return
# otherwise, read the next frame from the stream
if (self.is_oak):
preview = self.stream.get()
self.frame = preview.getCvFrame()
else:
_, self.frame = self.stream.read()
def read(self):
# return the frame most recently read
return self.frame
def stop(self):
# indicate that the thread should be stopped
self.stopped = True
def get_camera_matricies(self):
with open(f'./calibration/cameras/{self.cam_name}/calibration.pkl', 'rb') as f:
cc = pickle.load(f)
camera_matrix = cc[0]
distortion_matrix = cc[1]
return (camera_matrix, distortion_matrix)
def get_stream(self):
if (self.is_oak):
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
cam_rgb = pipeline.create(dai.node.ColorCamera)
xout_rgb = pipeline.create(dai.node.XLinkOut)
xout_rgb.setStreamName("rgb")
# Properties
cam_rgb.setPreviewSize(640, 480)
cam_rgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
cam_rgb.preview.link(xout_rgb.input)
with dai.Device(pipeline) as device:
q = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
return q
else:
cap = cv2.VideoCapture(self.src)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
return cap