- Edited
Hi Erik,
Thank you for responding. From my previous example, I deleted a few unnecessary details (cv2 preview). The code is complete, so you may copy and execute it to test it. The only other thing you need is pip install reactivex
. There is no multithreading involved. (As shown by the two print
calls).
import asyncio
import depthai
from reactivex import timer, operators as op
from reactivex.scheduler.eventloop import AsyncIOScheduler
import threading
def camera_stream():
pipeline = depthai.Pipeline()
cam_rgb = pipeline.create(depthai.node.ColorCamera)
cam_rgb.setPreviewSize(300, 300)
cam_rgb.setInterleaved(False)
xout_rgb = pipeline.create(depthai.node.XLinkOut)
xout_rgb.setStreamName("rgb")
cam_rgb.preview.link(xout_rgb.input)
device = depthai.Device(pipeline, usb2Mode=True)
q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
print('finished uploading')
print(threading.current_thread().name)
def get_frame(i):
print('getting frame:')
print(threading.current_thread().name)
in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
frame = None
if in_rgb is not None:
frame = in_rgb.getCvFrame()
return frame
return timer(1, 0.1).pipe(
op.map(get_frame),
op.filter(lambda frame: frame is not None),
)
loop = asyncio.new_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)
subscription = camera_stream().subscribe(
on_next=lambda frame: print(frame.shape),
scheduler=aio_scheduler,
)
loop.run_forever()
As previously said, if I move the pipeline, device creation, and queue setting outside of the function camera_stream()
, the script works well. However, this would not fulfill my needs for eventually creating asynchronous streams with several OAK-1 devices:
import asyncio
import depthai
from reactivex import timer, operators as op
from reactivex.scheduler.eventloop import AsyncIOScheduler
import threading
+ pipeline = depthai.Pipeline()
+
+ cam_rgb = pipeline.create(depthai.node.ColorCamera)
+ cam_rgb.setPreviewSize(300, 300)
+ cam_rgb.setInterleaved(False)
+
+ xout_rgb = pipeline.create(depthai.node.XLinkOut)
+ xout_rgb.setStreamName("rgb")
+ cam_rgb.preview.link(xout_rgb.input)
+
+ device = depthai.Device(pipeline, usb2Mode=True)
+
+ q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
+
+ print('finished uploading')
+ print(threading.current_thread().name)
def camera_stream():
- pipeline = depthai.Pipeline()
-
- cam_rgb = pipeline.create(depthai.node.ColorCamera)
- cam_rgb.setPreviewSize(300, 300)
- cam_rgb.setInterleaved(False)
-
- xout_rgb = pipeline.create(depthai.node.XLinkOut)
- xout_rgb.setStreamName("rgb")
- cam_rgb.preview.link(xout_rgb.input)
-
- device = depthai.Device(pipeline, usb2Mode=True)
-
- q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
-
- print('finished uploading')
- print(threading.current_thread().name)
def get_frame(i):
print('getting frame:')
print(threading.current_thread().name)
in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
frame = None
if in_rgb is not None:
frame = in_rgb.getCvFrame()
return frame
return timer(1, 0.1).pipe(
op.map(get_frame),
op.filter(lambda frame: frame is not None),
)
loop = asyncio.new_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)
subscription = camera_stream().subscribe(
on_next=lambda frame: print(frame.shape),
scheduler=aio_scheduler,
)
loop.run_forever()