Hello everyone,
I'm experiencing an issue with a TCP connection.
I have two separate systems, each one consists of 10 OAK-1 PoE cameras, all running the same software in standalone mode, and all connected to the same PoE switch.
A script continuously acquires and processes images using an object detection model. This script acts as a TCP server: an external client connects and requests the result of the processing.
Everything works fine for several hours, but eventually one of the cameras stops responding over TCP. However, I can still ping the camera, and reflashing the code temporarily restores functionality. Each time, a different camera is affected.
I need help understanding what might be causing this issue.
I'm attaching the script with the code flashed on the camera where the tcp server is defined.
Thank you to anyone who can help,
Stefano
`from paho.mqtt.client import Client
import threading
import socket
import node
import time
import base64
import json
import hashlib
CAMERA_ID = '127.0.0.1'
ID_MACHINE = '1800'
MQTT_LIFEBIT = 'LifeBit'
MQTT_TOPIC_SEND = 'ElabResult'
MQTT_TOPIC_RECV = 'Trigger'
MQTT_TOPIC_CHECKMQTT = 'CheckMqtt'
MQTT_BROKER = '127.0.0.1'
TCP_PORT = '12341'
class TcpResult():
def init(self) -> None:
self.trigger_code = None
self.result = None
REQUESTS_RECEIVED = threading.Event()
TCP_RESULT = TcpResult()
last_img_hash = ""
class TcpServer:
def init(self, tcp_ip: str, tcp_port: int) -> None:
self.tcp_ip = tcp_ip
self.tcp_port = tcp_port
TCP_RESULT.trigger_code = None
TCP_RESULT.result = None
threading.Thread(target=self.elaboration_loop).start()
threading.Thread(target=self.check_tcp).start()
def check_tcp(self) -> None:
while True:
self.start_tcp_server()
time.sleep(1)
def start_tcp_server(self) -> None:
""" Initialize socket and wait for connections
"""
node.warn(f"INFO - TcpServer - {CAMERA_ID} started")
try:
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_socket.bind((self.tcp_ip, self.tcp_port))
tcp_socket.listen(5)
except Exception as e:
node.warn(f"ERROR - TCP {CAMERA_ID}: Exception in starting tcp socket: {e}")
while True:
node.warn(f"INFO - TCP {CAMERA_ID}: Is listening on {self.tcp_ip}:{self.tcp_port}")
try:
conn, addr = tcp_socket.accept()
except Exception as e:
node.warn(f"ERROR - TCP {CAMERA_ID}: Failed to accept connection: {e}")
continue
with conn:
node.warn(f"INFO - TcpServer - {CAMERA_ID}: Connected to {addr}")
try:
TCP_RESULT.trigger_code = -1
data = conn.recv(1024)
if data:
data = data.decode("utf-8")
TCP_RESULT.trigger_code = data
node.warn(f"INFO - TcpServer - {CAMERA_ID}: Message received from {addr} : {data}")
REQUESTS_RECEIVED.set()
payload = self.compose_message()
conn.sendall(payload)
else:
node.warn(f"INFO - TCP {CAMERA_ID}: No data received from {addr}")
except Exception as e:
node.warn(f"ERROR - TCP {CAMERA_ID}: Exception in tcp connection: {e}")
def compose_message(self) -> bytes:
""" Compose the payload containing Esito and Status info to send to the client.
If not Esito and Status are available, wait
"""
try:
counter = 0
payload = ""
while (TCP_RESULT.result is None) and counter < 20:
node.warn(f'INFO - TCP {CAMERA_ID}: is waiting for data to be available...')
time.sleep(0.5)
counter += 1
payload = TCP_RESULT.result
TCP_RESULT.result = None
return payload
except Exception as e:
node.warn(f"ERROR - TCP {CAMERA_ID}: Exception while composing tcp message: {e}")
def send_results(self, results) -> None:
global TCP_RESULT
node.warn(f"INFO - Camera {CAMERA_ID}: Mqtt Results Message Thread Started")
try:
s = time.time()
dets = results['dets']
data_im = results['data_im']
n_detections = len(dets.detections)
esito = max(0, min(1, n_detections))
is_new_image = self.hash_image(data_im)
if data_im is not None:
encoded_im = base64.b64encode(data_im).decode('utf-8')
else:
encoded_im = ''
coords = []
for i in range(n_detections):
elt = [round(dets.detections[i].xmin, 5),
round(dets.detections[i].ymin, 5),
round(dets.detections[i].xmax, 5),
round(dets.detections[i].ymax, 5)]
coords.append(elt)
result = json.dumps({
"CAMERA_ID": CAMERA_ID,
"Result": esito,
"Coordinates": coords,
"Detections": str(n_detections),
"Trigger_ID": TCP_RESULT.trigger_code,
"IsNewImage": is_new_image,
"Image": encoded_im}).encode("utf-8")
TCP_RESULT.result = result
node.warn(f"Elaboration time: {time.time()-s}s")
except Exception as e:
node.warn(f"ERROR - Camera {CAMERA_ID}: Exception while sending mqtt results: {e}")
def hash_image(self,image_data):
global last_img_hash
hasher = hashlib.new("sha256")
hasher.update(image_data)
img_hash = hasher.hexdigest()
is_new_image = True if (img_hash != last_img_hash) else False
last_img_hash = img_hash
return is_new_image
def elaboration_loop(self) -> None:
results = {}
while True:
try:
data_im = None
dets = None
counter = 0
if REQUESTS_RECEIVED.is_set():
while (data_im is None or not dets) and counter < 50:
dets = node.io['detections'].get()
pck = node.io['frame'].get()
if pck:
data_im = pck.getData()
if data_im is None or not dets:
counter += 1
time.sleep(0.2)
results['data_im'] = data_im if data_im else None
results['dets'] = dets if dets else []
threading.Thread(target=self.send_results, args=[results,]).start()
REQUESTS_RECEIVED.clear()
else:
node.io['detections'].get()
node.io['frame'].get()
except Exception as e:
node.warn(f"WARING - Camera {CAMERA_ID}: Exception in the elaboration loop: {e}")
tcp_server = TcpServer(CAMERA_ID, TCP_PORT)
`