• DepthAI-v2
  • Script MQTT subscription - how do we extract payload for further processing?

Hi, I have the OAK-D Pro PoE and need to use MQTT-based communication. I can successfully publish from camera based on this example.

However, I'm facing a strange issue when trying communication the other way i.e., getting messages to the camera via MQTT. I can successfully subscribe to a topic in a script and see messages correctly coming in inside the on_message callback. However, I can't get the payload value to the surrounding context, which I have tried using a global variable. A minimum reproducible example code below. The global keyword doesn't seem to do anything in on_message as the value of msg inside the while loop of the script is simply the initialised blank value.

import depthai as dai
import time

pipeline = dai.Pipeline()

script = pipeline.createScript()
script.setProcessor(dai.ProcessorType.LEON_CSS)

# Change the IP to your MQTT broker!
MQTT_BROKER = "192.168.0.38"
MQTT_BROKER_PORT = 1883
MQTT_TOPIC = "oak_d/detection/m"
script_text = f"""
    import time

    msg = ''

    def on_message(client, userdata, message, tmp=None):
        global msg
        msg = message.payload.decode('utf8')
        node.warn(' In on_message, message: ' + msg)

    mqttc = Client()
    node.warn('Connecting to MQTT broker...')
    mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
    node.warn('Successfully connected to MQTT broker!')
    mqttc.subscribe("{MQTT_TOPIC}", 2);
    mqttc.on_message = on_message
    mqttc.loop_start()

    while True:
        time.sleep(1)
        node.warn(' In main loop, message: ' + msg)
"""
with open("paho-mqtt.py", "r") as f:
    paho_script = f.read()
    script.setScript(f"{paho_script}\n{script_text}")

with dai.Device(pipeline) as device:
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break

Output of running this code:

Appreciate any help. 🙂

  • erik replied to this.
  • Turns out the problem was due to the indentation in the script defined by script_text in my original code. 🤦‍♂️

    Thanks to the dev who figured it out - Issue #892 on depthai-core. Removing the indentation makes the code work as expected.

    script = pipeline.createScript()
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    
    # Change the IP to your MQTT broker!
    MQTT_BROKER = "192.168.0.38"
    MQTT_BROKER_PORT = 1883
    MQTT_TOPIC = "oak_d/detection/m"
    script_text = f"""
    import time
    
    msg = ''
    
    def on_message(client, userdata, message, tmp=None):
        global msg
        msg = message.payload.decode('utf8')
        node.warn(' In on_message, message: ' + msg)
    
    mqttc = Client()
    node.warn('Connecting to MQTT broker...')
    mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
    node.warn('Successfully connected to MQTT broker!')
    mqttc.subscribe("{MQTT_TOPIC}", 2);
    mqttc.on_message = on_message
    mqttc.loop_start()
    
    while True:
        time.sleep(1)
        node.warn(' In main loop, message: ' + msg)
    """
    with open("paho-mqtt.py", "r") as f:
        paho_script = f.read()
        script.setScript(f"{paho_script}\n{script_text}")
    
    with dai.Device(pipeline) as device:
        while True:
            try:
                time.sleep(1)
            except KeyboardInterrupt:

    Hi zepman ,
    This does seem like a depthai issue. Could you create an issue on depthai-core about this, and just copy paste your paste there?
    Thanks, Erik

    14 days later

    Turns out the problem was due to the indentation in the script defined by script_text in my original code. 🤦‍♂️

    Thanks to the dev who figured it out - Issue #892 on depthai-core. Removing the indentation makes the code work as expected.

    script = pipeline.createScript()
    script.setProcessor(dai.ProcessorType.LEON_CSS)
    
    # Change the IP to your MQTT broker!
    MQTT_BROKER = "192.168.0.38"
    MQTT_BROKER_PORT = 1883
    MQTT_TOPIC = "oak_d/detection/m"
    script_text = f"""
    import time
    
    msg = ''
    
    def on_message(client, userdata, message, tmp=None):
        global msg
        msg = message.payload.decode('utf8')
        node.warn(' In on_message, message: ' + msg)
    
    mqttc = Client()
    node.warn('Connecting to MQTT broker...')
    mqttc.connect("{MQTT_BROKER}", {MQTT_BROKER_PORT}, 120)
    node.warn('Successfully connected to MQTT broker!')
    mqttc.subscribe("{MQTT_TOPIC}", 2);
    mqttc.on_message = on_message
    mqttc.loop_start()
    
    while True:
        time.sleep(1)
        node.warn(' In main loop, message: ' + msg)
    """
    with open("paho-mqtt.py", "r") as f:
        paho_script = f.read()
        script.setScript(f"{paho_script}\n{script_text}")
    
    with dai.Device(pipeline) as device:
        while True:
            try:
                time.sleep(1)
            except KeyboardInterrupt: