• DepthAI
  • uploading a pipeline within a function causes X_LINK_ERROR

I'd want to make a video stream using depthai, asyncio, and rxpy. The code below performs an adequate job. The 'timer' triggers 'get_frame' every 100ms, which polls a frame and pushes it down the stream. There is just one thread in use.

The code, however, would only operate with one camera. I'd want to upload the pipeline within the camera_stream() method to make it work with multiple cameras (see the code fragment below).

import asyncio
import cv2
import depthai
from reactivex import timer, operators as op
from reactivex.scheduler.eventloop import AsyncIOScheduler

loop = asyncio.new_event_loop()
aio_scheduler = AsyncIOScheduler(loop=loop)

pipeline = depthai.Pipeline()

cam_rgb = pipeline.create(depthai.node.ColorCamera)
cam_rgb.setPreviewSize(300, 300)
cam_rgb.setInterleaved(False)

xout_rgb = pipeline.create(depthai.node.XLinkOut)
xout_rgb.setStreamName("rgb")
cam_rgb.preview.link(xout_rgb.input)

device = depthai.Device(pipeline, usb2Mode=True)

q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)

def camera_stream():
    def get_frame(i):
        frame = None
        in_rgb = q_rgb.tryGet()

        if in_rgb is not None:
            frame = in_rgb.getCvFrame()
        return frame

    return timer(0, 0.1, scheduler=aio_scheduler).pipe(
        op.map(get_frame),
        op.filter(lambda frame: frame is not None),
        op.share(),
    )

def show_video(frame: cv2.Mat):
    cv2.imshow("preview", frame)
    cv2.waitKey(1) # no preview without this

subscription = camera_stream().subscribe(
    on_next=show_video,
)

loop.run_forever()

This modification moves the pipeline upload code within the camera_stream() function:

def camera_stream():
+    pipeline = depthai.Pipeline()
+
+    cam_rgb = pipeline.create(depthai.node.ColorCamera)
+    cam_rgb.setPreviewSize(300, 300)
+    cam_rgb.setInterleaved(False)
+
+    xout_rgb = pipeline.create(depthai.node.XLinkOut)
+    xout_rgb.setStreamName("rgb")
+    cam_rgb.preview.link(xout_rgb.input)
+
+    device = depthai.Device(pipeline, usb2Mode=True)
+
+    q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)

    def get_frame(i):
        ...

    return timer(0, 0.1, scheduler=aio_scheduler).pipe(
        ...
    )

When I run this code, I get the following error: RuntimeError: Communication exception - possible device error/misconfiguration. Original message 'Couldn't read data from stream: 'rgb' (X_LINK_ERROR)'

I would have expected the code to function normally. Unfortunately, I am unable to identify the issue at this time. My script is rather straightforward, and I did not find any relevant details in the luxonis docs. I would appreciate it if you could help me.

Camera: OAK-1

system log

{
  "architecture": "64bit",
  "machine": "arm64",
  "platform": "macOS-13.2.1-arm64-arm-64bit",
  "processor": "arm",
  "python_build": "main Apr  5 2022 01:52:34",
  "python_compiler": "Clang 12.0.0 ",
  "python_implementation": "CPython",
  "python_version": "3.9.12",
  "release": "22.3.0",
  "system": "Darwin",
  "version": "Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000",
  "win32_ver": "",
  "packages": [
    "autopep8==2.0.2",
    "blinker==1.6.2",
    "blobconverter==1.3.0",
    "boto3==1.26.126",
    "botocore==1.29.126",
    "certifi==2022.12.7",
    "charset-normalizer==3.1.0",
    "click==8.1.3",
    "depthai==2.21.2.0",
    "depthai-sdk==1.9.5",
    "distinctipy==1.2.2",
    "Flask==2.3.2",
    "idna==3.4",
    "importlib-metadata==6.6.0",
    "itsdangerous==2.1.2",
    "Jinja2==3.1.2",
    "jmespath==1.0.1",
    "MarkupSafe==2.1.2",
    "marshmallow==3.17.0",
    "numpy==1.24.3",
    "opencv-contrib-python==4.7.0.72",
    "opencv-python==4.7.0.72",
    "packaging==23.1",
    "pip==22.0.4",
    "pycodestyle==2.10.0",
    "python-dateutil==2.8.2",
    "pytube==12.1.3",
    "PyTurboJPEG==1.6.4",
    "PyYAML==6.0",
    "reactivex==4.0.4",
    "requests==2.30.0",
    "s3transfer==0.6.0",
    "setuptools==58.1.0",
    "six==1.16.0",
    "tomli==2.0.1",
    "typing_extensions==4.5.0",
    "urllib3==1.26.15",
    "Werkzeug==2.3.3",
    "xmltodict==0.13.0",
    "zipp==3.15.0"
  ],
  "usb": ["NoLib"],
  "uname": [
    "Darwin 22.3.0 Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000 arm64 arm"
  ]
}
  • erik replied to this.

    Hi kruschid ,
    Could you provide full (minimal) code, so we can have a look? Note that initializing the camera (so dai.Device()) has to run from the main thread.
    Thanks, Erik

    Hi Erik,

    Thank you for responding. From my previous example, I deleted a few unnecessary details (cv2 preview). The code is complete, so you may copy and execute it to test it. The only other thing you need is pip install reactivex. There is no multithreading involved. (As shown by the two print calls).

    import asyncio
    import depthai
    from reactivex import timer, operators as op
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    import threading
    
    def camera_stream():
        pipeline = depthai.Pipeline()
    
        cam_rgb = pipeline.create(depthai.node.ColorCamera)
        cam_rgb.setPreviewSize(300, 300)
        cam_rgb.setInterleaved(False)
    
        xout_rgb = pipeline.create(depthai.node.XLinkOut)
        xout_rgb.setStreamName("rgb")
        cam_rgb.preview.link(xout_rgb.input)
    
        device = depthai.Device(pipeline, usb2Mode=True)
    
        q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    
        print('finished uploading')
        print(threading.current_thread().name)
    
        def get_frame(i):
            print('getting frame:')
            print(threading.current_thread().name)
    
            in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
            frame = None
            if in_rgb is not None:
                frame = in_rgb.getCvFrame()
    
            return frame
    
        return timer(1, 0.1).pipe(
            op.map(get_frame),
            op.filter(lambda frame: frame is not None),
        )
    
    
    loop = asyncio.new_event_loop()
    aio_scheduler = AsyncIOScheduler(loop=loop)
    
    subscription = camera_stream().subscribe(
        on_next=lambda frame: print(frame.shape),
        scheduler=aio_scheduler,
    )
    
    loop.run_forever()

    As previously said, if I move the pipeline, device creation, and queue setting outside of the function camera_stream(), the script works well. However, this would not fulfill my needs for eventually creating asynchronous streams with several OAK-1 devices:

    import asyncio
    import depthai
    from reactivex import timer, operators as op
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    import threading
    
    + pipeline = depthai.Pipeline()
    +
    + cam_rgb = pipeline.create(depthai.node.ColorCamera)
    + cam_rgb.setPreviewSize(300, 300)
    + cam_rgb.setInterleaved(False)
    +
    + xout_rgb = pipeline.create(depthai.node.XLinkOut)
    + xout_rgb.setStreamName("rgb")
    + cam_rgb.preview.link(xout_rgb.input)
    +
    + device = depthai.Device(pipeline, usb2Mode=True)
    +
    + q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    +
    + print('finished uploading')
    + print(threading.current_thread().name)
    
    def camera_stream():
    -    pipeline = depthai.Pipeline()
    -
    -    cam_rgb = pipeline.create(depthai.node.ColorCamera)
    -    cam_rgb.setPreviewSize(300, 300)
    -    cam_rgb.setInterleaved(False)
    -
    -    xout_rgb = pipeline.create(depthai.node.XLinkOut)
    -    xout_rgb.setStreamName("rgb")
    -    cam_rgb.preview.link(xout_rgb.input)
    -
    -    device = depthai.Device(pipeline, usb2Mode=True)
    -
    -    q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    -
    -    print('finished uploading')
    -    print(threading.current_thread().name)
    
        def get_frame(i):
            print('getting frame:')
            print(threading.current_thread().name)
    
            in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
            frame = None
            if in_rgb is not None:
                frame = in_rgb.getCvFrame()
    
            return frame
    
        return timer(1, 0.1).pipe(
            op.map(get_frame),
            op.filter(lambda frame: frame is not None),
        )
    
    
    loop = asyncio.new_event_loop()
    aio_scheduler = AsyncIOScheduler(loop=loop)
    
    subscription = camera_stream().subscribe(
        on_next=lambda frame: print(frame.shape),
        scheduler=aio_scheduler,
    )
    
    loop.run_forever()
    • erik replied to this.

      Hi kruschid ,
      Here's response from gpt4, which works on my side - I am not familiar enough with rx/python to elaborate on any of this though🙂


      The issue you're experiencing may be due to the fact that ReactiveX is trying to access the q_rgb queue from a different thread than the one where it was created. In many libraries, including depthai, certain objects are not thread-safe and must only be accessed from the thread where they were created.

      In your case, the q_rgb queue is created in the main thread, but the ReactiveX pipeline may be trying to pull from it in a different thread. When it tries to do this, it results in an error.

      One possible solution would be to use a thread-safe queue, like those provided by the queue module in the Python standard library. You could have your main thread pull from the q_rgb queue and push to the thread-safe queue, then have your ReactiveX pipeline pull from the thread-safe queue.

      Here's a rough example of how you might do this:

      import queue
      import threading
      import asyncio
      import depthai
      from reactivex import timer, operators as op
      from reactivex.scheduler.eventloop import AsyncIOScheduler
      
      def camera_stream(thread_safe_queue):
          def get_frame(i):
              print('getting frame:')
              print(threading.current_thread().name)
      
              frame = None
              while not thread_safe_queue.empty():
                  frame = thread_safe_queue.get()
              return frame
      
          return timer(1, 0.1).pipe(
              op.map(get_frame),
              op.filter(lambda frame: frame is not None),
          )
      
      def start_depthai(thread_safe_queue):
          pipeline = depthai.Pipeline()
      
          cam_rgb = pipeline.create(depthai.node.ColorCamera)
          cam_rgb.setPreviewSize(300, 300)
          cam_rgb.setInterleaved(False)
      
          xout_rgb = pipeline.create(depthai.node.XLinkOut)
          xout_rgb.setStreamName("rgb")
          cam_rgb.preview.link(xout_rgb.input)
      
          device = depthai.Device(pipeline, usb2Mode=True)
      
          q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
      
          print('finished uploading')
          print(threading.current_thread().name)
      
          while True:
              in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
              if in_rgb is not None:
                  frame = in_rgb.getCvFrame()
                  thread_safe_queue.put(frame)
      
      thread_safe_queue = queue.Queue()
      
      threading.Thread(target=start_depthai, args=(thread_safe_queue,)).start()
      
      loop = asyncio.new_event_loop()
      aio_scheduler = AsyncIOScheduler(loop=loop)
      
      subscription = camera_stream(thread_safe_queue).subscribe(
          on_next=lambda frame: print(frame.shape),
          scheduler=aio_scheduler,
      )
      
      loop.run_forever()