K
kruschid

  • May 16, 2023
  • Joined Aug 9, 2022
  • 0 best answers
  • Hi Erik,

    Thank you for responding. I removed a few unnecessary details (the cv2 preview) from my previous example. The code below is complete, so you can copy and run it as-is; the only extra requirement is pip install reactivex. There is no multithreading involved, as the two print calls of the current thread name show.

    import asyncio
    import depthai
    from reactivex import timer, operators as op
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    import threading
    
    def camera_stream():
        """Build a DepthAI pipeline, open the device, and return an observable of frames.

        Everything (pipeline, device, output queue) is created locally inside this
        function. The returned observable is ``timer(1, 0.1)`` mapped through
        ``get_frame`` with ``None`` results filtered out.
        """
        pipeline = depthai.Pipeline()
    
        # Colour camera node with a 300x300 preview output.
        cam_rgb = pipeline.create(depthai.node.ColorCamera)
        cam_rgb.setPreviewSize(300, 300)
        cam_rgb.setInterleaved(False)
    
        # XLink output node; the camera preview is linked to the stream named "rgb".
        xout_rgb = pipeline.create(depthai.node.XLinkOut)
        xout_rgb.setStreamName("rgb")
        cam_rgb.preview.link(xout_rgb.input)
    
        # NOTE(review): `device` is a local of camera_stream() and is not referenced
        # by get_frame() (only `q_rgb` is captured), so nothing holds it once this
        # function returns. If depthai closes the connection when the Device object
        # is released, that would explain the X_LINK_ERROR below -- confirm.
        device = depthai.Device(pipeline, usb2Mode=True)
    
        # Queue for the "rgb" stream, created with maxSize=1 and blocking=False.
        q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    
        # Diagnostic output: thread on which the setup ran.
        print('finished uploading')
        print(threading.current_thread().name)
    
        def get_frame(i):
            """Poll the "rgb" queue once and return a frame, or None if tryGet() gave nothing.

            ``i`` is the value passed in by ``op.map`` and is not used.
            """
            # Diagnostic output: thread on which the poll runs.
            print('getting frame:')
            print(threading.current_thread().name)
    
            in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
            frame = None
            if in_rgb is not None:
                frame = in_rgb.getCvFrame()
    
            return frame
    
        # Timer-driven polling; frames that came back as None are dropped.
        return timer(1, 0.1).pipe(
            op.map(get_frame),
            op.filter(lambda frame: frame is not None),
        )
    
    
    # Create a dedicated asyncio event loop and a reactivex scheduler bound to it.
    loop = asyncio.new_event_loop()
    aio_scheduler = AsyncIOScheduler(loop=loop)
    
    # camera_stream() is called here, so the device setup runs before subscribing.
    # Each frame that reaches on_next has its shape printed.
    subscription = camera_stream().subscribe(
        on_next=lambda frame: print(frame.shape),
        scheduler=aio_scheduler,
    )
    
    # Runs the loop indefinitely; nothing in this script stops it.
    loop.run_forever()

    As mentioned previously, if I move the pipeline construction, device creation, and queue setup outside the camera_stream() function, the script works. However, that does not meet my eventual need to create asynchronous streams for several OAK-1 devices:

    import asyncio
    import depthai
    from reactivex import timer, operators as op
    from reactivex.scheduler.eventloop import AsyncIOScheduler
    import threading
    
    + pipeline = depthai.Pipeline()
    +
    + cam_rgb = pipeline.create(depthai.node.ColorCamera)
    + cam_rgb.setPreviewSize(300, 300)
    + cam_rgb.setInterleaved(False)
    +
    + xout_rgb = pipeline.create(depthai.node.XLinkOut)
    + xout_rgb.setStreamName("rgb")
    + cam_rgb.preview.link(xout_rgb.input)
    +
    + device = depthai.Device(pipeline, usb2Mode=True)
    +
    + q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    +
    + print('finished uploading')
    + print(threading.current_thread().name)
    
    def camera_stream():
    -    pipeline = depthai.Pipeline()
    -
    -    cam_rgb = pipeline.create(depthai.node.ColorCamera)
    -    cam_rgb.setPreviewSize(300, 300)
    -    cam_rgb.setInterleaved(False)
    -
    -    xout_rgb = pipeline.create(depthai.node.XLinkOut)
    -    xout_rgb.setStreamName("rgb")
    -    cam_rgb.preview.link(xout_rgb.input)
    -
    -    device = depthai.Device(pipeline, usb2Mode=True)
    -
    -    q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
    -
    -    print('finished uploading')
    -    print(threading.current_thread().name)
    
        def get_frame(i):
            print('getting frame:')
            print(threading.current_thread().name)
    
            in_rgb = q_rgb.tryGet() # <<< this line throws X_LINK_ERROR
            frame = None
            if in_rgb is not None:
                frame = in_rgb.getCvFrame()
    
            return frame
    
        return timer(1, 0.1).pipe(
            op.map(get_frame),
            op.filter(lambda frame: frame is not None),
        )
    
    
    loop = asyncio.new_event_loop()
    aio_scheduler = AsyncIOScheduler(loop=loop)
    
    subscription = camera_stream().subscribe(
        on_next=lambda frame: print(frame.shape),
        scheduler=aio_scheduler,
    )
    
    loop.run_forever()
    • erik replied to this.
    • I'd like to build a video stream using depthai, asyncio, and rxpy. The code below works: the 'timer' triggers 'get_frame' every 100ms, which polls a frame and pushes it down the stream. Only one thread is in use.

      However, this code only works with one camera. To make it work with multiple cameras, I'd like to upload the pipeline inside the camera_stream() function (see the code fragment below).

      import asyncio
      import cv2
      import depthai
      from reactivex import timer, operators as op
      from reactivex.scheduler.eventloop import AsyncIOScheduler
      
      # Dedicated asyncio event loop and a reactivex scheduler bound to it.
      loop = asyncio.new_event_loop()
      aio_scheduler = AsyncIOScheduler(loop=loop)
      
      # Pipeline, device and queue are created at module level in this version,
      # so `device` and `q_rgb` remain referenced for the life of the script.
      pipeline = depthai.Pipeline()
      
      # Colour camera node with a 300x300 preview output.
      cam_rgb = pipeline.create(depthai.node.ColorCamera)
      cam_rgb.setPreviewSize(300, 300)
      cam_rgb.setInterleaved(False)
      
      # XLink output node; the camera preview is linked to the stream named "rgb".
      xout_rgb = pipeline.create(depthai.node.XLinkOut)
      xout_rgb.setStreamName("rgb")
      cam_rgb.preview.link(xout_rgb.input)
      
      device = depthai.Device(pipeline, usb2Mode=True)
      
      # Queue for the "rgb" stream, created with maxSize=1 and blocking=False.
      q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
      
      def camera_stream():
          """Return an observable of frames polled from the module-level ``q_rgb`` queue.

          Uses the module-level ``aio_scheduler`` for the timer. The pipeline is
          map(get_frame) -> filter out None -> share().
          """
          def get_frame(i):
              """Poll ``q_rgb`` once; return the frame, or None if tryGet() gave nothing.

              ``i`` is the value passed in by ``op.map`` and is not used.
              """
              frame = None
              in_rgb = q_rgb.tryGet()
      
              if in_rgb is not None:
                  frame = in_rgb.getCvFrame()
              return frame
      
          # Timer-driven polling on the asyncio scheduler; None frames are dropped.
          return timer(0, 0.1, scheduler=aio_scheduler).pipe(
              op.map(get_frame),
              op.filter(lambda frame: frame is not None),
              op.share(),
          )
      
      def show_video(frame: cv2.Mat):
          """Show ``frame`` in the OpenCV window named "preview"."""
          cv2.imshow("preview", frame)
          cv2.waitKey(1) # no preview without this
      
      # Subscribe show_video to the frame stream; no scheduler is passed here
      # because camera_stream() already gives one to timer().
      subscription = camera_stream().subscribe(
          on_next=show_video,
      )
      
      # Runs the loop indefinitely; nothing in this script stops it.
      loop.run_forever()

      The following modification moves the pipeline upload code into the camera_stream() function:

      def camera_stream():
      +    pipeline = depthai.Pipeline()
      +
      +    cam_rgb = pipeline.create(depthai.node.ColorCamera)
      +    cam_rgb.setPreviewSize(300, 300)
      +    cam_rgb.setInterleaved(False)
      +
      +    xout_rgb = pipeline.create(depthai.node.XLinkOut)
      +    xout_rgb.setStreamName("rgb")
      +    cam_rgb.preview.link(xout_rgb.input)
      +
      +    device = depthai.Device(pipeline, usb2Mode=True)
      +
      +    q_rgb = device.getOutputQueue("rgb", maxSize=1, blocking=False)
      
          def get_frame(i):
              ...
      
          return timer(0, 0.1, scheduler=aio_scheduler).pipe(
              ...
          )

      When I run this code, I get the following error: RuntimeError: Communication exception - possible device error/misconfiguration. Original message 'Couldn't read data from stream: 'rgb' (X_LINK_ERROR)'

      I would have expected the code to function normally. Unfortunately, I am unable to identify the issue at this time. My script is rather straightforward, and I did not find any relevant details in the luxonis docs. I would appreciate it if you could help me.

      Camera: OAK-1

      system log

      {
        "architecture": "64bit",
        "machine": "arm64",
        "platform": "macOS-13.2.1-arm64-arm-64bit",
        "processor": "arm",
        "python_build": "main Apr  5 2022 01:52:34",
        "python_compiler": "Clang 12.0.0 ",
        "python_implementation": "CPython",
        "python_version": "3.9.12",
        "release": "22.3.0",
        "system": "Darwin",
        "version": "Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000",
        "win32_ver": "",
        "packages": [
          "autopep8==2.0.2",
          "blinker==1.6.2",
          "blobconverter==1.3.0",
          "boto3==1.26.126",
          "botocore==1.29.126",
          "certifi==2022.12.7",
          "charset-normalizer==3.1.0",
          "click==8.1.3",
          "depthai==2.21.2.0",
          "depthai-sdk==1.9.5",
          "distinctipy==1.2.2",
          "Flask==2.3.2",
          "idna==3.4",
          "importlib-metadata==6.6.0",
          "itsdangerous==2.1.2",
          "Jinja2==3.1.2",
          "jmespath==1.0.1",
          "MarkupSafe==2.1.2",
          "marshmallow==3.17.0",
          "numpy==1.24.3",
          "opencv-contrib-python==4.7.0.72",
          "opencv-python==4.7.0.72",
          "packaging==23.1",
          "pip==22.0.4",
          "pycodestyle==2.10.0",
          "python-dateutil==2.8.2",
          "pytube==12.1.3",
          "PyTurboJPEG==1.6.4",
          "PyYAML==6.0",
          "reactivex==4.0.4",
          "requests==2.30.0",
          "s3transfer==0.6.0",
          "setuptools==58.1.0",
          "six==1.16.0",
          "tomli==2.0.1",
          "typing_extensions==4.5.0",
          "urllib3==1.26.15",
          "Werkzeug==2.3.3",
          "xmltodict==0.13.0",
          "zipp==3.15.0"
        ],
        "usb": ["NoLib"],
        "uname": [
          "Darwin 22.3.0 Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000 arm64 arm"
        ]
      }
      • erik replied to this.
      • This is what worked for me:

        1. Open a terminal, navigate to depthai-python folder
        2. Execute git checkout mac_arm64_wheels
        3. Execute python examples/install_requirements.py
        4. python -mpip install --extra-index-url https://artifacts.luxonis.com/artifactory/luxonis-python-snapshot-local/ depthai==2.16.0.0.dev0+fc472bd13f9c853148e7a35fc956f4bc0dbf49ae
        5. Execute python examples/ColorCamera/rgb_preview.py