• DepthAI-v2
  • Challenges with DepthAI: How to Prevent video.link from stalling image.link?

I'm working on a project where I need to send H264 video frames to an RTSP stream and images to an HTTP server using DepthAI's camera control feature. Here's the situation:

Goal: To start sending H264 video frames only when prompted by a user, while allowing image retrieval from the HTTP server anytime as needed.

  • Issue: When accessing the HTTP link for the still image, the process hangs at node.io['jpeg'].get().

  • Observation: As soon as the script starts, DepthAI seems to require calling getOutputQueue() followed by get() or tryGet() on the encoded video stream after the camera connects; until that happens, still-image capture is affected.

  • Current Workaround: I've set up a separate thread to perform tryGet() on the encoded stream once. This seems to resolve the issue temporarily.

  • Is this a sustainable workaround, or are there better alternatives?

  • Could there be any negative impacts of leaving encoded1 running continuously without calling tryGet() again — specifically regarding host CPU performance, or the video streaming quality of the encoded2 queue?

  • I am hesitant to set the encoded queue's blocking behavior to False for H264 encoding. Are there reasons I should reconsider this?

Here's the code:

#!/usr/bin/env python3
import threading
import depthai as dai
import time
import signal
import os
import tkinter as tk


# Module-level state shared between the main thread, the worker threads,
# and the SIGINT handler below.
device = None            # dai.Device handle, set by initialize_camera()
streaming_thread = None  # consumer of the blocking "encoded" queue (button-started)
running = False          # loop flag for streaming(); cleared on shutdown
reading_thread = None    # one-shot thread that primes the "encoded" queue


def signal_handler(sig, frame):
    """SIGINT handler: stop the worker threads, close the device, hard-exit."""
    global device, streaming_thread, running
    print("Ctrl+C detected. Exiting...")
    if device:
        # Ask the streaming loop to stop, then give each worker a bounded
        # window to finish before the device is closed.
        running = False
        for worker in (streaming_thread, reading_thread):
            if worker:
                worker.join(timeout=5.0)
        device.close()
    # Hard exit: tkinter's mainloop would otherwise keep the process alive.
    os._exit(0)


signal.signal(signal.SIGINT, signal_handler)


def create_image_pipeline(cam=None, pipeline=None):
    """Add a still-image capture path to *pipeline*: MJPEG encoder plus an
    on-device HTTP server (Script node) that triggers and serves stills.

    Args:
        cam: Existing ColorCamera node to reuse. If None, a new one is created.
        pipeline: The dai.Pipeline to add nodes to. Required.

    Raises:
        ValueError: If *pipeline* is None. (Previously this crashed with an
            opaque ``AttributeError`` on ``pipeline.create``.)
    """
    if pipeline is None:
        raise ValueError("create_image_pipeline() requires a dai.Pipeline instance")
    if cam is None:
        # Define a source - color camera
        cam = pipeline.create(dai.node.ColorCamera)
    # VideoEncoder: turns 'still' frames into MJPEG for the HTTP response.
    jpeg = pipeline.create(dai.node.VideoEncoder)
    jpeg.setDefaultProfilePreset(cam.getFps(), dai.VideoEncoderProperties.Profile.MJPEG)

    # Script node running on the device's LEON_CSS core. It serves an HTTP
    # endpoint: GET /img sends a CameraControl still-capture trigger, then
    # blocks on node.io['jpeg'].get() for the resulting JPEG.
    script = pipeline.create(dai.node.Script)
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    script.setScript("""
        from http.server import BaseHTTPRequestHandler
        import socketserver
        import socket
        import fcntl
        import struct

        PORT = 8080
        ctrl = CameraControl()
        ctrl.setCaptureStill(True)

        def get_ip_address(ifname):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            return socket.inet_ntoa(fcntl.ioctl(
                s.fileno(),
                -1071617759,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15].encode())
            )[20:24])

        class HTTPHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path == '/':
                    self.send_response(200)
                    self.end_headers()
                    self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>')
                elif self.path == '/img':
                    node.io['out'].send(ctrl)
                    jpegImage = node.io['jpeg'].get()
                    self.send_response(200)
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', str(len(jpegImage.getData())))
                    self.end_headers()
                    self.wfile.write(jpegImage.getData())
                else:
                    self.send_response(404)
                    self.end_headers()
                    self.wfile.write(b'Url not found...')

        with socketserver.TCPServer(("", PORT), HTTPHandler) as httpd:
            node.warn(f"Serving at {get_ip_address('re0')}:{PORT}")
            httpd.serve_forever()
    """)

    # Connections
    cam.still.link(jpeg.input)                     # still frames -> MJPEG encoder
    script.outputs['out'].link(cam.inputControl)   # HTTP handler triggers the capture
    jpeg.bitstream.link(script.inputs['jpeg'])     # encoded JPEG back to the script node


def initialize_camera():
    """Discover a DepthAI device, build the H264 + still-image pipeline, connect.

    Retries up to ``max_retries`` times with ``retry_delay`` seconds between
    attempts. On success the module-level ``device`` global is set.

    Returns:
        bool: True if the device was initialized successfully, False after
        exhausting all retries.
    """
    global device
    max_retries = 5  # Maximum number of initialization retries
    retry_delay = 3  # Seconds to wait between retries

    for _ in range(max_retries):
        try:
            device_infos = dai.Device.getAllAvailableDevices()
            if not device_infos:
                print("No DepthAI device found! Retrying...")
                time.sleep(retry_delay)  # Add a delay before retrying
                continue
            print("Available devices:")
            for i, info in enumerate(device_infos):
                print(f"[{i}] {info.getMxId()} [{info.state.name}]")
            if len(device_infos) == 1:
                device_info = device_infos[0]
            else:
                val = input("Which DepthAI Device you want to use: ")
                try:
                    device_info = device_infos[int(val)]
                # Narrowed from a bare `except`, which also swallowed
                # KeyboardInterrupt/SystemExit during the prompt.
                except (ValueError, IndexError):
                    raise ValueError("Incorrect value supplied: {}".format(val))

            if device_info.protocol != dai.XLinkProtocol.X_LINK_USB_VSC:
                print("Running RTSP stream may be unstable due to connection... (protocol: {})".format(device_info.protocol))

            # Build the pipeline fresh on every attempt (the pre-loop pipeline
            # in the original was dead code, overwritten here).
            pipeline = dai.Pipeline()
            FPS = 30
            colorCam = pipeline.create(dai.node.ColorCamera)
            colorCam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
            colorCam.setInterleaved(False)
            colorCam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
            colorCam.setFps(FPS)

            # H264 encoder streaming continuously over XLink ("encoded").
            videnc = pipeline.create(dai.node.VideoEncoder)
            videnc.setDefaultProfilePreset(FPS, dai.VideoEncoderProperties.Profile.H264_BASELINE)
            colorCam.video.link(videnc.input)
            veOut = pipeline.create(dai.node.XLinkOut)
            veOut.setStreamName("encoded")
            videnc.bitstream.link(veOut.input)

            # Reuse the same camera for the still-image/HTTP path.
            create_image_pipeline(cam=colorCam, pipeline=pipeline)

            device = dai.Device(pipeline, device_info)

            # device.setLogLevel(dai.LogLevel.DEBUG)
            # device.setLogOutputLevel(dai.LogLevel.DEBUG)
            return True  # Device initialized successfully

        except Exception as e:
            print(f"Error initializing Oak device: {e} | Retrying..")
            time.sleep(retry_delay)  # Add a delay between retries

    print("Maximum retries reached. Exiting... | Try running the script again")
    return False  # Device initialization failed


if __name__ == "__main__":
    if initialize_camera():
        # Flag polled by the streaming thread; cleared by signal_handler.
        running = True

        def read_in_parallel():
            # Workaround for the still-capture hang: open the "encoded"
            # output queue and drain it once with tryGet() so the H264 stream
            # does not stall the Script node's node.io['jpeg'].get().
            # NOTE(review): the queue is never read again afterwards --
            # presumably maxSize=1 + blocking=False lets the device drop old
            # packets; confirm this holds for long runs.
            encoded1 = device.getOutputQueue("encoded", maxSize=1, blocking=False)
            encoded1.tryGet()

        def streaming():
            # Blocking consumer of the H264 stream; started on button press
            # and stopped by clearing `running`.
            global running
            print("starting streaming thread")
            try:
                encoded2 = device.getOutputQueue("encoded", maxSize=30, blocking=True)
                while running:
                    # Retrieved bytes are currently discarded (the RTSP push
                    # is not shown in this snippet).
                    data = encoded2.get().getData()
            except Exception as e:
                print(f"Exception: {e}")

        def on_button_press():
            # Start the streaming thread at most once.
            global streaming_thread
            if streaming_thread is None:
                streaming_thread = threading.Thread(target=streaming)
                streaming_thread.start()
                time.sleep(1)

        # Daemon thread so it never blocks interpreter shutdown.
        reading_thread = threading.Thread(target=read_in_parallel, daemon=True)
        reading_thread.start()

        # Set up the GUI
        root = tk.Tk()
        root.title("Streaming Control")

        # Create a button widget
        button = tk.Button(root, text="Start Streaming", command=on_button_press)
        button.pack(pady=20)

        # Run the GUI
        root.mainloop()
    else:
        os._exit(1)  # Exit with an error code if initialization fails
*— 8 days later —*

Hi @Anas_Khan
It works for me in this configuration:

#!/usr/bin/env python3
import threading
import depthai as dai
import time
import signal
import os
import tkinter as tk


# Module-level state. Only `device` is used in this simplified version;
# the other three are leftovers from the original script and are never
# referenced below.
device = None
streaming_thread = None
running = False
reading_thread = None



def create_image_pipeline(cam=None, pipeline=None):
    """Add a still-image capture path to *pipeline*: MJPEG encoder plus an
    on-device HTTP server (Script node) that triggers and serves stills.

    Args:
        cam: Existing ColorCamera node to reuse. If None, a new one is created.
        pipeline: The dai.Pipeline to add nodes to. Required.

    Raises:
        ValueError: If *pipeline* is None. (Previously this crashed with an
            opaque ``AttributeError`` on ``pipeline.create``.)
    """
    if pipeline is None:
        raise ValueError("create_image_pipeline() requires a dai.Pipeline instance")
    if cam is None:
        # Define a source - color camera
        cam = pipeline.create(dai.node.ColorCamera)
    # VideoEncoder: turns 'still' frames into MJPEG for the HTTP response.
    jpeg = pipeline.create(dai.node.VideoEncoder)
    jpeg.setDefaultProfilePreset(cam.getFps(), dai.VideoEncoderProperties.Profile.MJPEG)

    # Script node running on the device's LEON_CSS core. It serves an HTTP
    # endpoint: GET /img sends a CameraControl still-capture trigger, then
    # blocks on node.io['jpeg'].get() for the resulting JPEG.
    script = pipeline.create(dai.node.Script)
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    script.setScript("""
        from http.server import BaseHTTPRequestHandler
        import socketserver
        import socket
        import fcntl
        import struct

        PORT = 8080
        ctrl = CameraControl()
        ctrl.setCaptureStill(True)

        def get_ip_address(ifname):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            return socket.inet_ntoa(fcntl.ioctl(
                s.fileno(),
                -1071617759,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15].encode())
            )[20:24])

        class HTTPHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                if self.path == '/':
                    self.send_response(200)
                    self.end_headers()
                    self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>')
                elif self.path == '/img':
                    node.io['out'].send(ctrl)
                    jpegImage = node.io['jpeg'].get()
                    self.send_response(200)
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', str(len(jpegImage.getData())))
                    self.end_headers()
                    self.wfile.write(jpegImage.getData())
                else:
                    self.send_response(404)
                    self.end_headers()
                    self.wfile.write(b'Url not found...')

        with socketserver.TCPServer(("", PORT), HTTPHandler) as httpd:
            node.warn(f"Serving at {get_ip_address('re0')}:{PORT}")
            httpd.serve_forever()
    """)

    # Connections
    cam.still.link(jpeg.input)                     # still frames -> MJPEG encoder
    script.outputs['out'].link(cam.inputControl)   # HTTP handler triggers the capture
    jpeg.bitstream.link(script.inputs['jpeg'])     # encoded JPEG back to the script node


def initialize_camera():
    """Discover a DepthAI device, build the H264 + still-image pipeline, connect.

    Retries up to ``max_retries`` times with ``retry_delay`` seconds between
    attempts. On success the module-level ``device`` global is set.

    Returns:
        bool: True if the device was initialized successfully, False after
        exhausting all retries.
    """
    global device
    max_retries = 5  # Maximum number of initialization retries
    retry_delay = 3  # Seconds to wait between retries

    for _ in range(max_retries):
        try:
            device_infos = dai.Device.getAllAvailableDevices()
            if not device_infos:
                print("No DepthAI device found! Retrying...")
                time.sleep(retry_delay)  # Add a delay before retrying
                continue
            print("Available devices:")
            for i, info in enumerate(device_infos):
                print(f"[{i}] {info.getMxId()} [{info.state.name}]")
            if len(device_infos) == 1:
                device_info = device_infos[0]
            else:
                val = input("Which DepthAI Device you want to use: ")
                try:
                    device_info = device_infos[int(val)]
                # Narrowed from a bare `except`, which also swallowed
                # KeyboardInterrupt/SystemExit during the prompt.
                except (ValueError, IndexError):
                    raise ValueError("Incorrect value supplied: {}".format(val))

            if device_info.protocol != dai.XLinkProtocol.X_LINK_USB_VSC:
                print("Running RTSP stream may be unstable due to connection... (protocol: {})".format(device_info.protocol))

            # Build the pipeline fresh on every attempt (the pre-loop pipeline
            # in the original was dead code, overwritten here).
            pipeline = dai.Pipeline()
            FPS = 30
            colorCam = pipeline.create(dai.node.ColorCamera)
            colorCam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
            colorCam.setInterleaved(False)
            colorCam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
            colorCam.setFps(FPS)

            # H264 encoder streaming continuously over XLink ("encoded").
            videnc = pipeline.create(dai.node.VideoEncoder)
            videnc.setDefaultProfilePreset(FPS, dai.VideoEncoderProperties.Profile.H264_BASELINE)
            colorCam.video.link(videnc.input)
            veOut = pipeline.create(dai.node.XLinkOut)
            veOut.setStreamName("encoded")
            videnc.bitstream.link(veOut.input)

            # Reuse the same camera for the still-image/HTTP path.
            create_image_pipeline(cam=colorCam, pipeline=pipeline)

            device = dai.Device(pipeline, device_info)

            # device.setLogLevel(dai.LogLevel.DEBUG)
            # device.setLogOutputLevel(dai.LogLevel.DEBUG)
            return True  # Device initialized successfully

        except Exception as e:
            print(f"Error initializing Oak device: {e} | Retrying..")
            time.sleep(retry_delay)  # Add a delay between retries

    print("Maximum retries reached. Exiting... | Try running the script again")
    return False  # Device initialization failed


if __name__ == "__main__":

    if initialize_camera():
        # Non-blocking queue with maxSize=1: the device can always push the
        # newest packet, so the Script node's still capture is never stalled.
        qEnc = device.getOutputQueue("encoded", maxSize=1, blocking=False)

        streaming_thread = None

        def streaming():
            # Continuously drain the encoded H264 stream.
            # NOTE(review): tryGet() in a tight loop busy-waits at 100% CPU;
            # consider a blocking get() or a short sleep if that matters.
            while True:
                encoderOutput = qEnc.tryGet()
                if encoderOutput is not None:
                    print(encoderOutput.getData())

        def on_button_press():
            # BUG FIX: the original called streaming() directly on the Tk main
            # thread; its `while True` loop froze the GUI permanently. Run it
            # in a daemon thread instead, and start it at most once.
            global streaming_thread
            if streaming_thread is None:
                streaming_thread = threading.Thread(target=streaming, daemon=True)
                streaming_thread.start()

        # Set up the GUI
        root = tk.Tk()
        root.title("Streaming Control")

        # Create a button widget
        button = tk.Button(root, text="Start Streaming", command=on_button_press)
        button.pack(pady=20)

        # Run the GUI
        root.mainloop()
    else:
        os._exit(1)  # Exit with an error code if initialization fails