Hi @Irena
Made some adjustments:
import depthai as dai
import time
import blobconverter
from pathlib import Path
# modelOL = blobconverter.from_zoo(name="mobile_object_localizer_192x192", zoo_type="depthai", shaves=6)
modelSSD = blobconverter.from_zoo(name="mobilenet-ssd", shaves=6)
fire_model_path = "/Users/jaka/Desktop/tests/multimodel_scripts/models/fire-detection_openvino_2021.2_5shave.blob"
SW_VERSION = "SSD_FireDet"
# Start defining a pipeline
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(version = dai.OpenVINO.VERSION_2021_4)
# Color camera
cam = pipeline.create(dai.node.ColorCamera)
cam.setInterleaved(False)
cam.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam.setIspScale(1,3)
# cam.setPreviewSize(640, 640)
# cam.setVideoSize(640, 640)
# cam.setFps(40)
# Define a neural network Mobilenet-ssd
nn_mssd = pipeline.create(dai.node.MobileNetDetectionNetwork)
nn_mssd.setConfidenceThreshold(0.5)
nn_mssd.setBlobPath(modelSSD)
nn_mssd.setNumInferenceThreads(2)
nn_mssd.input.setBlocking(False)
# Define input image to nn_MobilenetSSD
manip_mssd = pipeline.createImageManip()
manip_mssd.setResize(300,300)
manip_mssd.setMaxOutputFrameSize(270000) # 300x300x3
manip_mssd.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
# Define a neural network Fire Detection
nn_fire = pipeline.create(dai.node.NeuralNetwork)
# nn_fire.setConfidenceThreshold(0.5)
nn_fire.setBlobPath(fire_model_path)
nn_fire.input.setBlocking(False)
# Define input image to nn_FireDetection
manip_fire = pipeline.createImageManip()
manip_fire.setResize(224,224)
manip_fire.setMaxOutputFrameSize(150528) # 224x224x2
manip_fire.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
#define a script node
script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)
#Define a video encoder
videoEnc = pipeline.create(dai.node.VideoEncoder)
videoEnc.setDefaultProfilePreset(30, dai.VideoEncoderProperties.Profile.MJPEG)
# Links
cam.preview.link(manip_mssd.inputImage)
manip_mssd.out.link(nn_mssd.input)
cam.preview.link(manip_fire.inputImage)
manip_fire.out.link(nn_fire.input)
script.inputs['detSSD'].setBlocking(False)
script.inputs['detSSD'].setQueueSize(1)
nn_mssd.out.link(script.inputs["detSSD"])
script.inputs['detFire'].setBlocking(False)
script.inputs['detFire'].setQueueSize(1)
nn_fire.out.link(script.inputs["detFire"])
script.inputs['frame'].setBlocking(False)
script.inputs['frame'].setQueueSize(1)
videoEnc.bitstream.link(script.inputs['frame'])
cam.video.link(videoEnc.input)
script.setScript("""
import socket
import time
import threading
serverFrame = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverFrame.bind(("0.0.0.0", 5010))
serverFrame.listen()
serverSSD = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverSSD.bind(("0.0.0.0", 5011))
serverSSD.listen()
serverFire = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverFire.bind(("0.0.0.0", 5012))
serverFire.listen()
node.warn("Server up")
labelMap_SSD = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
"diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
label_fire = ["fire", "normal", "smoke"]
client_connections = {
"frame": [],
"ssd": [],
"fire": []
}
def send_frame_thread():
try:
while True:
pck = node.io["frame"].tryGet()
if not pck:
continue
data = pck.getData()
ts = pck.getTimestamp()
header = f"ABCDE " + str(ts.total_seconds()).ljust(18) + str(len(data)).ljust(8)
for conn in client_connections["frame"]:
try:
conn.send(bytes(header, encoding='ascii'))
conn.send(data)
except Exception as e:
node.warn(f"Frame client disconnected: {e}")
client_connections["frame"].remove(conn)
conn.close()
except Exception as e:
node.warn(f"Error oak: {e}")
def send_result_nn(type):
try:
while True:
pck = node.io["frame"].tryGet()
if not pck:
continue
data = pck.getData()
ts = pck.getTimestamp()
data_to_send = []
if type == 1: # SSD
detections_ssd = node.io["detSSD"].tryGet()
if detections_ssd:
dets = detections_ssd.detections
for det in dets:
label = labelMap_SSD[det.label]
if label == "person":
det_bb = [det.label, det.xmin, det.ymin, det.xmax, det.ymax]
data_to_send.append(det_bb)
for conn in client_connections["ssd"]:
try:
header = f"ABCDE " + str(ts.total_seconds()).ljust(18) + str(len(data)).ljust(8) + str(data_to_send).ljust(224)
conn.send(bytes(header, encoding='ascii'))
conn.send(data)
except Exception as e:
node.warn(f"SSD client disconnected: {e}")
client_connections["ssd"].remove(conn)
conn.close()
elif type == 2: # Fire Detection
data_fire = node.io["detFire"].tryGet()
if data_fire:
data_to_send = data_fire.getLayerFp16("final_result")
for conn in client_connections["fire"]:
try:
header = f"ABCDE " + str(ts.total_seconds()).ljust(18) + str(len(data)).ljust(8) + str(data_to_send).ljust(224)
conn.send(bytes(header, encoding='ascii'))
conn.send(data)
except Exception as e:
node.warn(f"Fire Detection client disconnected: {e}")
client_connections["fire"].remove(conn)
conn.close()
except Exception as e:
node.warn(f"Error oak: {e}")
def get_thread(server, type):
try:
while True:
conn, client = server.accept()
node.warn(f"Connected to client IP: {client}, type: {type}")
if type == 0:
client_connections["frame"].append(conn)
threading.Thread(target=send_frame_thread).start()
elif type == 1:
client_connections["ssd"].append(conn)
threading.Thread(target=send_result_nn, args=(type, )).start()
elif type == 2:
client_connections["fire"].append(conn)
threading.Thread(target=send_result_nn, args=(type, )).start()
except Exception as e:
node.warn("Server error:", e)
threading.Thread(target=get_thread, args=(serverFrame, 0)).start()
threading.Thread(target=get_thread, args=(serverSSD, 1)).start()
threading.Thread(target=get_thread, args=(serverFire, 2)).start()
""")
# By default, you would boot device with:
with dai.Device(pipeline) as device:
while True:
time.sleep(1)
# But for this example, we want to flash the device with the pipeline
# device_infos = dai.DeviceBootloader.getAllAvailableDevices()
# print(f'Found {len(device_infos)} devices')
# for device in device_infos:
# print(f"Start flashing SW_version_{SW_VERSION} on device: {device}")
# bootloader = dai.DeviceBootloader(device)
# progress = lambda p : print(f'Flashing progress: {p*100:.1f}%')
# # (r, errmsg) = bootloader.flash(progress, pipeline)
# (r, errmsg) = bootloader.flash(progress, pipeline, compress=True, applicationName=SW_VERSION)
# if r: print("Flash OK")
# else: print("Flash ERROR:", errmsg)
The server should now be able to accept multiple clients connecting to the same port. Added a tryGet inside the script to prevent hanging. Make sure to handle the changes on client side.
Another important consideration: Lower the fps. The device can capture and process frames quickly, but the script node is not so performant since the CPU is really slow.
Thanks,
Jaka