I'm working on a project where I need to send H264 video frames to an RTSP stream and images to an HTTP server using DepthAI's camera control feature. Here's the situation:
Goal: To start sending H264 video frames only when prompted by a user, while allowing image retrieval from the HTTP server anytime as needed.
Issue: When accessing the HTTP link for the still image, the process hangs at node.io['jpeg'].get()
.
Observation: As soon as the script starts, DepthAI requires getOutputQueue()
followed by get()
or tryGet()
post-camera connection for the encoded
video stream, which seems to affect image capture.
Current Workaround: I've set up a separate thread to perform tryGet()
on the encoded
stream once. This seems to resolve the issue temporarily.
Is this a sustainable workaround, or are there better alternatives?
Could there be any negative impacts of leaving encoded1
running continuously without doing tryGet() again, specifically regarding host CPU performance or video streaming quality on the encoded2
?
I am hesitant to set the encoded
queue's blocking behavior to False for H264 encoding. Are there reasons I should reconsider this?
Here's the code
#!/usr/bin/env python3
import threading
import depthai as dai
import time
import signal
import os
import tkinter as tk
device = None
streaming_thread = None
running = False
reading_thread = None
def signal_handler(sig, frame):
global device
global streaming_thread
global running
print("Ctrl+C detected. Exiting...")
if device:
running = False
if streaming_thread:
streaming_thread.join(timeout=5.0)
if reading_thread:
reading_thread.join(timeout=5.0)
device.close()
os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
def create_image_pipeline(cam=None, pipeline=None):
if cam is None:
# Define a source - color camera
cam = pipeline.create(dai.node.ColorCamera)
# VideoEncoder
jpeg = pipeline.create(dai.node.VideoEncoder)
jpeg.setDefaultProfilePreset(cam.getFps(), dai.VideoEncoderProperties.Profile.MJPEG)
# Script node
script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)
script.setScript("""
from http.server import BaseHTTPRequestHandler
import socketserver
import socket
import fcntl
import struct
PORT = 8080
ctrl = CameraControl()
ctrl.setCaptureStill(True)
def get_ip_address(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
-1071617759, # SIOCGIFADDR
struct.pack('256s', ifname[:15].encode())
)[20:24])
class HTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.end_headers()
self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>')
elif self.path == '/img':
node.io['out'].send(ctrl)
jpegImage = node.io['jpeg'].get()
self.send_response(200)
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(jpegImage.getData())))
self.end_headers()
self.wfile.write(jpegImage.getData())
else:
self.send_response(404)
self.end_headers()
self.wfile.write(b'Url not found...')
with socketserver.TCPServer(("", PORT), HTTPHandler) as httpd:
node.warn(f"Serving at {get_ip_address('re0')}:{PORT}")
httpd.serve_forever()
""")
# Connections
cam.still.link(jpeg.input)
script.outputs['out'].link(cam.inputControl)
jpeg.bitstream.link(script.inputs['jpeg'])
def initialize_camera():
global device
max_retries = 5 # Maximum number of initialization retries
retry_delay = 3
pipeline = dai.Pipeline()
for _ in range(max_retries):
try:
device_infos = dai.Device.getAllAvailableDevices()
if len(device_infos) == 0:
print("No DepthAI device found! Retrying...")
time.sleep(retry_delay) # Add a delay before retrying
continue
print("Available devices:")
for i, info in enumerate(device_infos):
print(f"[{i}] {info.getMxId()} [{info.state.name}]")
if len(device_infos) == 1:
device_info = device_infos[0]
else:
val = input("Which DepthAI Device you want to use: ")
try:
device_info = device_infos[int(val)]
except:
raise ValueError("Incorrect value supplied: {}".format(val))
if device_info.protocol != dai.XLinkProtocol.X_LINK_USB_VSC:
print("Running RTSP stream may be unstable due to connection... (protocol: {})".format(device_info.protocol))
pipeline = dai.Pipeline()
FPS = 30
colorCam = pipeline.create(dai.node.ColorCamera)
colorCam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
colorCam.setInterleaved(False)
colorCam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
colorCam.setFps(FPS)
videnc = pipeline.create(dai.node.VideoEncoder)
videnc.setDefaultProfilePreset(FPS, dai.VideoEncoderProperties.Profile.H264_BASELINE)
colorCam.video.link(videnc.input)
veOut = pipeline.create(dai.node.XLinkOut)
veOut.setStreamName("encoded")
videnc.bitstream.link(veOut.input)
create_image_pipeline(cam=colorCam, pipeline=pipeline)
device = dai.Device(pipeline, device_info)
# device.setLogLevel(dai.LogLevel.DEBUG)
# device.setLogOutputLevel(dai.LogLevel.DEBUG)
return True # Device initialized successfully
except Exception as e:
print(f"Error initializing Oak device: {e} | Retrying..")
time.sleep(retry_delay) # Add a delay between retries
print("Maximum retries reached. Exiting... | Try running the script again")
return False # Device initialization failed
if __name__ == "__main__":
if initialize_camera():
running = True
def read_in_parallel():
encoded1 = device.getOutputQueue("encoded", maxSize=1, blocking=False)
encoded1.tryGet()
def streaming():
global running
print("starting streaming thread")
try:
encoded2 = device.getOutputQueue("encoded", maxSize=30, blocking=True)
while running:
data = encoded2.get().getData()
except Exception as e:
print(f"Exception: {e}")
def on_button_press():
global streaming_thread
if streaming_thread is None:
streaming_thread = threading.Thread(target=streaming)
streaming_thread.start()
time.sleep(1)
reading_thread = threading.Thread(target=read_in_parallel, daemon=True)
reading_thread.start()
# Set up the GUI
root = tk.Tk()
root.title("Streaming Control")
# Create a button widget
button = tk.Button(root, text="Start Streaming", command=on_button_press)
button.pack(pady=20)
# Run the GUI
root.mainloop()
else:
os._exit(1) # Exit with an error code if initialization fails