I'd want to make a video stream using depthai, asyncio, and rxpy. The code below performs an adequate job. The 'timer' triggers 'get_frame' every 100ms, which polls a frame and pushes it down the stream. There is just one thread in use.
The code, however, would only operate with one camera. I'd want to upload the pipeline within the camera_stream()
method to make it work with multiple cameras (see the code fragment below).
import asyncio
import cv2
import depthai
from reactivex import timer, operators as op
from reactivex.scheduler.eventloop import AsyncIOScheduler
loop = asyncio.new_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)
pipeline = depthai.Pipeline()
cam_rgb = pipeline.create(depthai.node.ColorCamera)
cam_rgb.setPreviewSize(300, 300)
cam_rgb.setInterleaved(False)
xout_rgb = pipeline.create(depthai.node.XLinkOut)
xout_rgb.setStreamName("rgb")
cam_rgb.preview.link(xout_rgb.input)
device = depthai.Device(pipeline, usb2Mode=True)
q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
def camera_stream():
def get_frame(i):
frame = None
in_rgb = q_rgb.tryGet()
if in_rgb is not None:
frame = in_rgb.getCvFrame()
return frame
return timer(0, 0.1, scheduler=aio_scheduler).pipe(
op.map(get_frame),
op.filter(lambda frame: frame is not None),
op.share(),
)
def show_video(frame: cv2.Mat):
cv2.imshow("preview", frame)
cv2.waitKey(1) # no preview without this
subscription = camera_stream().subscribe(
on_next=show_video,
)
loop.run_forever()
This modification moves the pipeline upload code within the camera_stream()
function:
def camera_stream():
+ pipeline = depthai.Pipeline()
+
+ cam_rgb = pipeline.create(depthai.node.ColorCamera)
+ cam_rgb.setPreviewSize(300, 300)
+ cam_rgb.setInterleaved(False)
+
+ xout_rgb = pipeline.create(depthai.node.XLinkOut)
+ xout_rgb.setStreamName("rgb")
+ cam_rgb.preview.link(xout_rgb.input)
+
+ device = depthai.Device(pipeline, usb2Mode=True)
+
+ q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
def get_frame(i):
...
return timer(0, 0.1, scheduler=aio_scheduler).pipe(
...
)
When I run this code, I get the following error: RuntimeError: Communication exception - possible device error/misconfiguration. Original message 'Couldn't read data from stream: 'rgb' (X_LINK_ERROR)'
I would have expected the code to function normally. Unfortunately, I am unable to identify the issue at this time. My script is rather straightforward, and I did not find any relevant details in the luxonis docs. I would appreciate it if you could help me.
Camera: OAK-1
system log
{
"architecture": "64bit",
"machine": "arm64",
"platform": "macOS-13.2.1-arm64-arm-64bit",
"processor": "arm",
"python_build": "main Apr 5 2022 01:52:34",
"python_compiler": "Clang 12.0.0 ",
"python_implementation": "CPython",
"python_version": "3.9.12",
"release": "22.3.0",
"system": "Darwin",
"version": "Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000",
"win32_ver": "",
"packages": [
"autopep8==2.0.2",
"blinker==1.6.2",
"blobconverter==1.3.0",
"boto3==1.26.126",
"botocore==1.29.126",
"certifi==2022.12.7",
"charset-normalizer==3.1.0",
"click==8.1.3",
"depthai==2.21.2.0",
"depthai-sdk==1.9.5",
"distinctipy==1.2.2",
"Flask==2.3.2",
"idna==3.4",
"importlib-metadata==6.6.0",
"itsdangerous==2.1.2",
"Jinja2==3.1.2",
"jmespath==1.0.1",
"MarkupSafe==2.1.2",
"marshmallow==3.17.0",
"numpy==1.24.3",
"opencv-contrib-python==4.7.0.72",
"opencv-python==4.7.0.72",
"packaging==23.1",
"pip==22.0.4",
"pycodestyle==2.10.0",
"python-dateutil==2.8.2",
"pytube==12.1.3",
"PyTurboJPEG==1.6.4",
"PyYAML==6.0",
"reactivex==4.0.4",
"requests==2.30.0",
"s3transfer==0.6.0",
"setuptools==58.1.0",
"six==1.16.0",
"tomli==2.0.1",
"typing_extensions==4.5.0",
"urllib3==1.26.15",
"Werkzeug==2.3.3",
"xmltodict==0.13.0",
"zipp==3.15.0"
],
"usb": ["NoLib"],
"uname": [
"Darwin 22.3.0 Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000 arm64 arm"
]
}