Complete pipeline code:
import sys
import threading
import gi
import os
import random
import colorsys
import collections
import queue
import time
import traceback
import cv2
import base64
import numpy as np
from camera_state import *
from datetime import datetime, timedelta
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib

import depthai as dai
import pyds
# --- Helper Functions ---

def intersects(p1, p2, p3, p4):
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
    return ccw(p1, p3, p4) != ccw(p2, p3, p4) and ccw(p1, p2, p3) != ccw(p1, p2, p4)


def get_color_for_class(class_id, dynamic_class_colors):
    if class_id not in dynamic_class_colors:
        hue = random.random()
        rgb_color = colorsys.hsv_to_rgb(hue, 0.9, 0.9)
        dynamic_class_colors[class_id] = (rgb_color[0], rgb_color[1], rgb_color[2], 1.0)
    return dynamic_class_colors[class_id]
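
# Illustrative example (not part of the original listing): intersects() is the CCW
# segment-intersection test used below for line-crossing counting. A trajectory segment
# from (0, 5) to (10, 5) crosses a vertical counting line from (5, 0) to (5, 10), so
# intersects((0, 5), (10, 5), (5, 0), (5, 10)) returns True, while a segment that stays
# on one side of the line, e.g. intersects((0, 5), (3, 5), (5, 0), (5, 10)), returns False.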

# --- OAK Camera Class ---
class OakCameraManager:
    def __init__(self, device_info: dai.DeviceInfo, camera_state: CameraState, shared_metadata, metadata_lock):
        self.device_info_obj = device_info
        self.camera_state = camera_state
        self.is_running = True
        self.shared_metadata = shared_metadata
        self.metadata_lock = metadata_lock
        self.last_ui_update_time = 0
        self.last_frame_update_time = 0
        self.stereo_matcher = None

    def _apply_controls(self, ctrl: dai.CameraControl):
        # CHANGED: reads the settings directly from the state object
        controls = self.camera_state.get_all_controls()
        # Focus (example logic, can be adjusted)
        if 'autofocus_mode' in controls:  # If you add this control in the future
            ctrl.setAutoFocusMode(controls['autofocus_mode'])
        else:
            ctrl.setAutoFocusMode(dai.CameraControl.AutoFocusMode.OFF)
            ctrl.setManualFocus(int(controls['focus_manual']))

        if controls['autoexposure_mode']:
            ctrl.setAutoExposureEnable()
            ctrl.setAntiBandingMode(ANTIBANDING_MODE_MAP.get(controls['antibanding_mode']))
        else:
            ctrl.setManualExposure(int(controls['exposure_us']), int(controls['iso']))
        awb_mode = AWB_MODE_MAP.get(controls['awb_mode'])
        ctrl.setAutoWhiteBalanceMode(awb_mode)
        if awb_mode == dai.CameraControl.AutoWhiteBalanceMode.OFF:
            ctrl.setManualWhiteBalance(int(controls['wb_manual']))

        ctrl.setBrightness(int(controls['brightness']))
        ctrl.setContrast(int(controls['contrast']))
        ctrl.setSaturation(int(controls['saturation']))
        ctrl.setSharpness(int(controls['sharpness']))
        return ctrl

    def _process_commands(self, dai_control_queue: dai.InputQueue):
        # CHANGED: the command handling is now much simpler
        command = self.camera_state.get_command()
        if command:
            cmd_type = command.get('type')
            if cmd_type == 'apply_changes':
                ctrl = dai.CameraControl()
                self._apply_controls(ctrl)
                dai_control_queue.send(ctrl)
            elif cmd_type in ['apply_af_region', 'apply_ae_region', 'apply_live_ae_region']:
                controls = self.camera_state.get_all_controls()
                x = int(controls.get('selection_region_x', 0))
                y = int(controls.get('selection_region_y', 0))
                w = int(controls.get('selection_region_w', 1))
                h = int(controls.get('selection_region_h', 1))
                ctrl = dai.CameraControl()
                if cmd_type == 'apply_af_region':
                    print(f"INFO: Applying autofocus region: x={x}, y={y}, w={w}, h={h}")
                    ctrl.setAutoFocusMode(dai.CameraControl.AutoFocusMode.AUTO)
                    ctrl.setAutoFocusRegion(x, y, w, h)
                else:
                    print(f"INFO: Applying auto-exposure region (mode: {cmd_type}): x={x}, y={y}, w={w}, h={h}")
                    ctrl.setAutoExposureEnable()
                    ctrl.setAutoExposureRegion(x, y, w, h)
                dai_control_queue.send(ctrl)
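
    # For reference (illustrative, derived from the command types handled above): the UI
    # is expected to push commands such as {'type': 'apply_changes'} or
    # {'type': 'apply_af_region'} into the CameraState; the region itself is read from
    # the 'selection_region_x/y/w/h' controls rather than from the command payload.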

    def run(self, appsrc):
        print("DEBUG: OAK camera thread (V3) started.")
        try:
            device = dai.Device(self.device_info_obj)
            print(f"INFO: Device {device.getMxId()} connected successfully.")

            controls = self.camera_state.get_all_controls()
            # 1. Get the preset keys selected in the UI
            res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
            crop_key = controls.get('tamanho_corte', "1024x1024")

            # 2. Look up the corresponding values in the preset dictionaries
            SENSOR_WIDTH, SENSOR_HEIGHT, FPS = PRESETS_RESOLUCAO[res_key]
            CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']

            print(f"INFO: Using sensor preset: {res_key} ({SENSOR_WIDTH}x{SENSOR_HEIGHT} @ {FPS} FPS)")
            print(f"INFO: Using crop preset: {crop_key} ({CROP_WIDTH}x{CROP_HEIGHT})")

            # Tell the PipelineManager (indirectly) the crop size for the streammux.
            # This goes through the shared state, which the PipelineManager reads.
            self.camera_state.update_from_ui('update_crop_size_for_ds', {'w': CROP_WIDTH, 'h': CROP_HEIGHT})

            # The 'with' block now manages the pipeline's lifecycle.
            with dai.Pipeline(defaultDevice=device) as pipeline:
                device = pipeline.getDefaultDevice()
                print('Connected cameras:', device.getConnectedCameras())

                # The node definitions remain the same
                cam_rgb = pipeline.create(dai.node.Camera).build(
                    boardSocket=dai.CameraBoardSocket.CAM_A,
                    sensorResolution=[SENSOR_WIDTH, SENSOR_HEIGHT],
                    sensorFps=FPS,
                )

                isp_out = cam_rgb.requestOutput((SENSOR_WIDTH, SENSOR_HEIGHT), dai.ImgFrame.Type.YUV420p, fps=FPS)

                # 1. Create the ImageManip node.
                manip = pipeline.create(dai.node.ImageManip)

                # 2. Read the initial crop coordinates from the centralized state.
                controls = self.camera_state.get_all_controls()
                is_depth_enabled = controls.get('depth_enabled', False)
                q_depth = None
                default_crop_x = (SENSOR_WIDTH - CROP_WIDTH) // 2
                default_crop_y = (SENSOR_HEIGHT - CROP_HEIGHT) // 2
                crop_x = controls.get('crop_x', default_crop_x)
                crop_y = controls.get('crop_y', default_crop_y)

                print(f"INFO: Adding crop operation at ({crop_x}, {crop_y}) with size {CROP_WIDTH}x{CROP_HEIGHT}")
                manip.initialConfig.addCrop(crop_x, crop_y, CROP_WIDTH, CROP_HEIGHT)
                # 4. Ensure the ImageManip output has the format expected by the VideoEncoder.
                manip.initialConfig.setFrameType(dai.ImgFrame.Type.NV12)
                video_enc = pipeline.create(dai.node.VideoEncoder)
                video_enc.setDefaultProfilePreset(fps=FPS, profile=dai.VideoEncoderProperties.Profile.H265_MAIN)

                if is_depth_enabled:
                    print("INFO: Depth enabled. Configuring the stereo pipeline...")

                    mono_left = pipeline.create(dai.node.Camera).build(
                        boardSocket=dai.CameraBoardSocket.CAM_B,
                    )

                    mono_right = pipeline.create(dai.node.Camera).build(
                        boardSocket=dai.CameraBoardSocket.CAM_C,
                    )

                    stereo = pipeline.create(dai.node.StereoDepth)
                    # The default preset is a good balance of accuracy and performance
                    stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.DEFAULT)
                    stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
                    # LeftRightCheck is KEPT ENABLED because depth alignment requires it
                    stereo.setLeftRightCheck(True)
                    stereo.setSubpixel(False)

                    mono_left_out = mono_left.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=FPS)
                    mono_right_out = mono_right.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=FPS)

                    mono_left_out.link(stereo.left)
                    mono_right_out.link(stereo.right)

                    # The queue is only created when depth is enabled
                    q_depth = stereo.depth.createOutputQueue(maxSize=4, blocking=False)
                else:
                    print("INFO: Depth disabled. Skipping creation of the stereo nodes.")

                isp_out.link(manip.inputImage)  # ISP output feeds the ImageManip crop

                manip.setMaxOutputFrameSize(CROP_WIDTH * CROP_HEIGHT * 3 // 2)  # NV12 uses 1.5 bytes per pixel
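                # Worked example: for the 1024x1024 crop preset this reserves
                # 1024 * 1024 * 3 // 2 = 1,572,864 bytes per frame (1,048,576 bytes for
                # the Y plane plus 524,288 bytes for the interleaved UV plane).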
                manip.out.link(video_enc.input)  # Cropped NV12 frames feed the H.265 encoder
print("INFO: Aplicando configurações iniciais da câmera...") self._apply_controls(cam_rgb.initialControl)
q_h265_out = video_enc.out.createOutputQueue(maxSize=30, blocking=False) control_in = cam_rgb.inputControl.createInputQueue() sys_logger = pipeline.create(dai.node.SystemLogger) sys_logger.setRate(1) # Envia dados a cada 1 segundo q_sys_info = sys_logger.out.createOutputQueue(maxSize=4, blocking=False) pipeline.start() print("INFO: Pipeline V3 iniciado no dispositivo.")
pts = 0 duration_ns = 10**9 / FPS

                while self.is_running and pipeline.isRunning():
                    self._process_commands(control_in)
                    h265_packet = q_h265_out.tryGet()
                    sys_info = q_sys_info.tryGet()

                    if q_depth:
                        depth_frame = q_depth.tryGet()
                        if depth_frame is not None:
                            depth_cv_frame = depth_frame.getFrame()
                            # Get the region of interest (ROI) from the central state
                            controls = self.camera_state.get_all_controls()
                            roi_x = int(controls['selection_region_x'])
                            roi_y = int(controls['selection_region_y'])
                            roi_w = int(controls['selection_region_w'])
                            roi_h = int(controls['selection_region_h'])

                            # Compute the ROI center
                            center_x = roi_x + roi_w // 2
                            center_y = roi_y + roi_h // 2
                            # Make sure the coordinates are within the frame bounds
                            if 0 <= center_y < CROP_HEIGHT and 0 <= center_x < CROP_WIDTH:
                                # Read the distance value (in mm) at the center pixel
                                dist_mm = depth_cv_frame[center_y, center_x]
                                # Update the central state with the new value
                                self.camera_state.update_depth_info({'center_depth_mm': dist_mm})

                    if sys_info:
                        m = 1024 * 1024  # MiB
                        temp = sys_info.chipTemperature.average
                        css_cpu = sys_info.leonCssCpuUsage.average
                        mss_cpu = sys_info.leonMssCpuUsage.average
                        ddr_mem_info = {'used': sys_info.ddrMemoryUsage.used / m, 'total': sys_info.ddrMemoryUsage.total / m}
                        cmx_mem_info = {'used': sys_info.cmxMemoryUsage.used / m, 'total': sys_info.cmxMemoryUsage.total / m}
                        css_mem_info = {'used': sys_info.leonCssMemoryUsage.used / m, 'total': sys_info.leonCssMemoryUsage.total / m}
                        mss_mem_info = {'used': sys_info.leonMssMemoryUsage.used / m, 'total': sys_info.leonMssMemoryUsage.total / m}

                        self.camera_state.update_system_info({
                            'temp': temp,
                            'css_cpu': css_cpu,
                            'mss_cpu': mss_cpu,
                            'ddr_memory': ddr_mem_info,
                            'cmx_memory': cmx_mem_info,
                            'css_memory': css_mem_info,
                            'mss_memory': mss_mem_info
                        })

                    if h265_packet:
                        metadata = {
                            "lens_pos": h265_packet.getLensPosition(),
                            "lens_pos_raw": h265_packet.getLensPositionRaw(),
                            "exposure_us": h265_packet.getExposureTime().microseconds,
                            "iso": h265_packet.getSensitivity(),
                            "color_temp": h265_packet.getColorTemperature()
                        }
                        with self.metadata_lock:
                            self.shared_metadata.update(metadata)

                        current_time = time.time()
                        if current_time - self.last_ui_update_time > 0.5:  # Throttle UI updates to 2 Hz
                            self.camera_state.update_from_camera_metadata(metadata)
                            self.last_ui_update_time = current_time

                        buf = Gst.Buffer.new_wrapped(h265_packet.getData().tobytes())
                        buf.pts = pts
                        buf.duration = duration_ns
                        pts += duration_ns
                        if appsrc.emit('push-buffer', buf) != Gst.FlowReturn.OK:
                            print("WARNING: Appsrc (GStreamer) rejected the buffer. Stopping.")
                            break

                # When the 'with pipeline' block exits, the pipeline is stopped.
                print("INFO: The pipeline has been stopped.")

        except Exception as e:
            print(f"CRITICAL ERROR in camera thread: {e}")
            traceback.print_exc()
        finally:
            self.is_running = False
            if appsrc:
                appsrc.emit('end-of-stream')
            print("INFO: Camera thread finished and EOS sent.")

    def stop(self):
        print("INFO: Stop signal received by OakCameraManager.")
        self.is_running = False
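
# Note: CameraState is provided by the local camera_state module (imported with * above)
# and is not shown in this listing. From its usage in this file it is assumed to expose at
# least get_all_controls(), get_command(), update_from_ui(key, value), update_depth_info(dict),
# update_system_info(dict) and update_from_camera_metadata(dict), all safe to call from
# multiple threads.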

# --- DeepStream Pipeline Class ---
class PipelineManager:
    def __init__(self, camera_state: CameraState, counter_callback):
        print("INFO: PipelineManager initialized...")
        Gst.init(None)
        self.camera_state = camera_state
        self.counter_callback = counter_callback
        self.loop = GLib.MainLoop()
        self.pipeline = None
        self.oak_camera = None
        self.camera_thread = None
        self.is_running = False
        self.current_source_type = None

        # State for the OSD
        self.trajectories = {}
        self.line_crossing_counter = 0
        self.counted_object_ids = set()
        self.dynamic_class_colors = {}
        self.MAX_TRAJECTORY_LENGTH = 2
        self.latest_metadata = {}
        self.metadata_lock = threading.Lock()
        self.line_crossing_class_counter = collections.defaultdict(int)
        self.pipeline_start_time = None
        self.frame_count_for_fps = 0
        self.last_fps_calc_time = 0
        self.calculated_fps = 0.0

        self.glib_thread = threading.Thread(target=self.loop.run)
        self.glib_thread.daemon = True
        self.glib_thread.start()

    def on_pad_added(self, src, pad, target_sink_pad):
        print(f"INFO: Dynamic pad '{pad.get_name()}' created on '{src.get_name()}'. Trying to link...")
        if pad.can_link(target_sink_pad):
            pad.link(target_sink_pad)
            print("INFO: Dynamic pad linked successfully.")
        else:
            print("ERROR: Could not link the dynamic pad.")

    def start(self, pgie_config_path: str, source_type: str, source_address: str):
        if self.is_running:
            print("WARNING: Pipeline is already running.")
            return
        self.current_source_type = source_type
        self.pipeline_start_time = time.time()
        self.last_fps_calc_time = time.time()  # Start the FPS counter
        self.frame_count_for_fps = 0
        self.calculated_fps = 0.0

        print(f"INFO: Starting pipeline with source '{source_type}' at address '{source_address}'")
        self.pipeline = Gst.Pipeline.new("deepstream-pipeline")
        common_elements = {
            "streammux": Gst.ElementFactory.make("nvstreammux", "stream-muxer"),
            "pgie": Gst.ElementFactory.make("nvinfer", "primary-inference"),
            "tracker": Gst.ElementFactory.make("nvtracker", "object-tracker"),
            "nvconv": Gst.ElementFactory.make("nvvideoconvert", "nvvid-converter"),
            "nvosd": Gst.ElementFactory.make("nvdsosd", "onscreen-display"),
            "sink": Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        }

        for name, elem in common_elements.items():
            if not elem:
                raise Exception(f"Failed to create element: {name}")
            self.pipeline.add(elem)

        controls = self.camera_state.get_all_controls()

        if source_type == 'Câmera OAK':
            source_elements = {
                "appsrc": Gst.ElementFactory.make("appsrc", "oak-video-source"),
                "parser": Gst.ElementFactory.make("h265parse", "h265-parser"),
                "decoder": Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder"),
            }
            for name, elem in source_elements.items():
                if not elem:
                    raise Exception(f"Failed to create OAK source element: {name}")
                self.pipeline.add(elem)

            appsrc = source_elements['appsrc']
            caps = Gst.Caps.from_string("video/x-h265, stream-format=(string)byte-stream, alignment=(string)au")
            appsrc.set_property("caps", caps)
            appsrc.set_property("format", Gst.Format.TIME)
            appsrc.set_property("is-live", True)
            appsrc.set_property("do-timestamp", True)
            appsrc.set_property("max-buffers", 3)
            appsrc.set_property("leaky-type", 2)  # Drop old buffers if the pipeline falls behind

            source_elements['appsrc'].link(source_elements['parser'])
            source_elements['parser'].link(source_elements['decoder'])
            sinkpad = common_elements['streammux'].get_request_pad("sink_0")
            srcpad = source_elements['decoder'].get_static_pad("src")
            srcpad.link(sinkpad)

            try:
                device_info = dai.DeviceInfo(source_address)
                self.oak_camera = OakCameraManager(device_info, self.camera_state, self.latest_metadata, self.metadata_lock)
                self.camera_thread = threading.Thread(target=self.oak_camera.run, args=(appsrc,))
                self.camera_thread.start()
            except RuntimeError as e:
                print(f"ERROR: Failed to connect to OAK camera {source_address}. {e}")
                return

        elif source_type == 'Stream RTSP':
            source_elements = {
                "rtspsrc": Gst.ElementFactory.make("rtspsrc", "rtsp-source"),
                "depay": Gst.ElementFactory.make("rtph265depay", "rtp-h265-depay"),
                "parser": Gst.ElementFactory.make("h265parse", "h265-parser"),
                "decoder": Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder"),
            }
            for name, elem in source_elements.items():
                if not elem:
                    raise Exception(f"Failed to create RTSP source element: {name}")
                self.pipeline.add(elem)

            # Configure rtspsrc
            source_elements['rtspsrc'].set_property('location', source_address)
            source_elements['rtspsrc'].set_property('latency', 3000)
            source_elements['rtspsrc'].set_property('protocols', 'tcp')

            # Link the RTSP source (with a dynamic pad-added callback)
            depay_sink_pad = source_elements['depay'].get_static_pad("sink")
            source_elements['rtspsrc'].connect("pad-added", self.on_pad_added, depay_sink_pad)
            source_elements['depay'].link(source_elements['parser'])
            source_elements['parser'].link(source_elements['decoder'])

            # Link the source to the streammux
            sinkpad = common_elements['streammux'].get_request_pad("sink_0")
            srcpad = source_elements['decoder'].get_static_pad("src")
            srcpad.link(sinkpad)

        # --- Link the common part (the same for both sources) ---
        common_elements['streammux'].link(common_elements['pgie'])
        common_elements['pgie'].link(common_elements['tracker'])
        common_elements['tracker'].link(common_elements['nvconv'])

        # common_elements['streammux'].link(common_elements['nvconv'])

        common_elements['nvconv'].link(common_elements['nvosd'])
        common_elements['nvosd'].link(common_elements['sink'])

        crop_key = controls.get('tamanho_corte', "1280x720 (720p)")
        CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']

        # --- Configure the common elements ---
        if source_type == 'Câmera OAK':
            res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
            _, _, FPS = PRESETS_RESOLUCAO[res_key]
        else:  # RTSP
            FPS = 30.0

        # Configure the elements that depend on the start parameters
        common_elements['streammux'].set_property("width", CROP_WIDTH)
        common_elements['streammux'].set_property("height", CROP_HEIGHT)
        common_elements['streammux'].set_property("batch-size", 1)
        common_elements['streammux'].set_property("live-source", True)
        common_elements['streammux'].set_property("batched-push-timeout", 40000)
        common_elements['streammux'].set_property("nvbuf-memory-type", 1)
        common_elements['streammux'].set_property("compute-hw", 1)

        if FPS > 0:
            frame_duration_ns = int(1_000_000_000 / FPS)
        else:
            frame_duration_ns = 33333333  # Default to 30 FPS if FPS is 0
        print(f"INFO: Configuring streammux for {FPS:.2f} FPS (frame duration: {frame_duration_ns} ns)")
        common_elements['streammux'].set_property("frame-duration", frame_duration_ns)

        common_elements['pgie'].set_property("config-file-path", pgie_config_path)
        common_elements['tracker'].set_property("compute-hw", 1)  # 0 = CPU, 1 = GPU, 2 = VIC (Jetson)
        common_elements['tracker'].set_property('user-meta-pool-size', 200)  # Size of the user metadata pool
        common_elements['tracker'].set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream-7.1/lib/libnvds_nvmultiobjecttracker.so')
        common_elements['tracker'].set_property('ll-config-file', '/opt/nvidia/deepstream/deepstream-7.1/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml')
        common_elements['nvosd'].set_property("process-mode", 1)
        common_elements['nvosd'].set_property("qos", True)

        common_elements['sink'].set_property("sync", True)
        common_elements['sink'].set_property("qos", True)

        osdsinkpad = common_elements['nvosd'].get_static_pad("sink")
        osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, self.osd_sink_pad_buffer_probe, 0)

        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

        print("INFO: Setting pipeline state to PLAYING...")
        self.pipeline.set_state(Gst.State.PLAYING)
        self.is_running = True

    def stop(self):
        if not self.is_running:
            return
        print("INFO: Stopping pipeline...")
        # Stop the camera thread first, if it exists
        if self.oak_camera:
            self.oak_camera.stop()
            self.camera_thread.join(timeout=2)
            self.oak_camera = None
            self.camera_thread = None

        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None

        # Reset the state for the next run
        self.trajectories.clear()
        self.counted_object_ids.clear()
        self.line_crossing_counter = 0
        if self.counter_callback:
            self.counter_callback(0)
        self.is_running = False
        self.pipeline_start_time = None
        print("INFO: Pipeline stopped.")

    def cleanup(self):
        # Method to be called when the application closes
        self.stop()
        self.loop.quit()
        print("INFO: GStreamer main loop finished.")

    def bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS or t == Gst.MessageType.ERROR:
            err, debug = (message.parse_error() if t == Gst.MessageType.ERROR else (None, "End of stream"))
            sys.stderr.write(f"Bus message: {err} ({debug})\n")
            self.stop()
        return True

    def osd_sink_pad_buffer_probe(self, pad, info, u_data):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            return Gst.PadProbeReturn.OK

        current_time = time.time()

        # Recompute the FPS once per second
        self.frame_count_for_fps += 1
        if current_time - self.last_fps_calc_time >= 1.0:
            self.calculated_fps = self.frame_count_for_fps / (current_time - self.last_fps_calc_time)
            self.last_fps_calc_time = current_time
            self.frame_count_for_fps = 0

        # Compute the running time (uptime)
        if self.pipeline_start_time:
            uptime_seconds = int(current_time - self.pipeline_start_time)
            uptime_str = str(timedelta(seconds=uptime_seconds))
        else:
            uptime_str = "0:00:00"

        # Current wall-clock time, formatted
        now_str = datetime.now().strftime("%H:%M:%S")

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break

            current_frame_object_count = 0
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            controls = self.camera_state.get_all_controls()

            crop_key = controls.get('tamanho_corte', "1024x1024")
            CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']
            _, _, FPS = PRESETS_RESOLUCAO[controls.get('resolucao_fps')]  # FPS for the on-screen info

            line_x1 = controls.get('line_coord_x1', 0)
            line_y1 = controls.get('line_coord_y1', 0)
            line_x2 = controls.get('line_coord_x2', 0)
            line_y2 = controls.get('line_coord_y2', 0)
            lines_to_draw = []
            crossing_line_color = (1.0, 0.0, 0.0, 1.0)  # Red (alpha must be in [0, 1])
            lines_to_draw.append(((line_x1, line_y1), (line_x2, line_y2), crossing_line_color, 3))

            total_detections_in_frame = 0

            l_obj = frame_meta.obj_meta_list
            while l_obj is not None:
                try:
                    obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                    total_detections_in_frame += 1
                    obj_color = get_color_for_class(obj_meta.class_id, self.dynamic_class_colors)
                    current_frame_object_count += 1
                    center_x = int(obj_meta.rect_params.left + obj_meta.rect_params.width / 2)
                    center_y = int(obj_meta.rect_params.top + obj_meta.rect_params.height / 2)
                    if obj_meta.object_id not in self.trajectories:
                        self.trajectories[obj_meta.object_id] = collections.deque(maxlen=self.MAX_TRAJECTORY_LENGTH)
                    self.trajectories[obj_meta.object_id].append((center_x, center_y))
                    traj_points = self.trajectories[obj_meta.object_id]

                    for i in range(len(traj_points) - 1):
                        lines_to_draw.append((traj_points[i], traj_points[i + 1], obj_color, 2))

                    if len(traj_points) > 1 and obj_meta.object_id not in self.counted_object_ids:
                        p1_line = (line_x1, line_y1)
                        p2_line = (line_x2, line_y2)
                        if intersects(traj_points[-2], traj_points[-1], p1_line, p2_line):
                            self.line_crossing_counter += 1
                            self.counted_object_ids.add(obj_meta.object_id)
                            self.line_crossing_class_counter[obj_meta.obj_label] += 1
                            if self.counter_callback:
                                self.counter_callback(self.line_crossing_counter)

                    obj_meta.text_params.display_text = f"ID: {obj_meta.object_id} {obj_meta.obj_label} {obj_meta.confidence:.2%}"
                    obj_meta.rect_params.border_color.set(obj_color[0], obj_color[1], obj_color[2], obj_color[3])
                    obj_meta.rect_params.border_width = 2
                except StopIteration:
                    break
                l_obj = l_obj.next

            # --- ADDED: draw the interactive selection rectangle ---
            # 1. Get the most recent coordinates from the state
            if self.current_source_type == 'Câmera OAK':
                controls = self.camera_state.get_all_controls()
                rect_x = controls.get('selection_region_x', 0)
                rect_y = controls.get('selection_region_y', 0)
                rect_w = controls.get('selection_region_w', 0)
                rect_h = controls.get('selection_region_h', 0)

                if rect_w > 0 and rect_h > 0 and display_meta.num_rects < len(display_meta.rect_params):
                    rect_params = display_meta.rect_params[display_meta.num_rects]
                    rect_params.left = int(rect_x)
                    rect_params.top = int(rect_y)
                    rect_params.width = int(rect_w)
                    rect_params.height = int(rect_h)
                    rect_params.border_width = 2
                    rect_params.border_color.set(1.0, 1.0, 0.0, 0.5)  # Yellow
                    rect_params.has_bg_color = 0
                    display_meta.num_rects += 1

            max_lines = len(display_meta.line_params)
            display_meta.num_lines = min(len(lines_to_draw), max_lines)
            for i in range(display_meta.num_lines):
                start, end, color, width = lines_to_draw[i]
                line = display_meta.line_params[i]
                line.x1, line.y1, line.x2, line.y2 = start[0], start[1], end[0], end[1]
                line.line_width = width
                line.line_color.set(color[0], color[1], color[2], color[3])
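            # Note (added for clarity): a single NvDsDisplayMeta typically holds at most
            # 16 entries per array (MAX_ELEMENTS_IN_DISPLAY_META in DeepStream), so
            # trajectory segments beyond that limit are silently dropped by the min()
            # above; acquiring extra display metas from the pool would lift that cap.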

            # ### NEW: logic for drawing the camera metadata ###
            # 1. Take a thread-safe copy of the latest metadata
            with self.metadata_lock:
                current_metadata = self.latest_metadata.copy()

            controls = self.camera_state.get_all_controls()
            show_metadata_flag = controls.get('show_metadata', True)  # Value of the UI checkbox

            # Get the current crop resolution for the size check
            crop_key = controls.get('tamanho_corte', default_oak_crop_key)
            CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']
            _, _, FPS = PRESETS_RESOLUCAO[controls.get('resolucao_fps')]

            # 2. Prepare the texts to be displayed
            metadata_texts = []
            if show_metadata_flag and (CROP_WIDTH >= 500 and CROP_HEIGHT >= 500):
                if current_metadata:
                    metadata_texts.append(f"Focus: {current_metadata.get('lens_pos', 'N/A')} | Raw: {current_metadata.get('lens_pos_raw', -1.0):.2f}")
                    metadata_texts.append(f"Exposure: {current_metadata.get('exposure_us', 'N/A')} us")
                    metadata_texts.append(f"ISO: {current_metadata.get('iso', 'N/A')}")
                    metadata_texts.append(f"White balance: {current_metadata.get('color_temp', 'N/A')} K")
                metadata_texts.append(f"Resolution: {CROP_WIDTH}x{CROP_HEIGHT} @ {FPS:.0f} FPS")

            # 3. Add the texts to the DeepStream display_meta
            # Start with the counter label that already existed
            display_meta.num_labels = 1
            text_params = display_meta.text_params[0]
            text_params.display_text = f"Total detected: {current_frame_object_count} | Count: {self.line_crossing_counter}"
            text_params.x_offset, text_params.y_offset = 10, 12
            text_params.font_params.font_name, text_params.font_params.font_size = "Serif", 12
            text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
            text_params.set_bg_clr = 1
            text_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.5)

            # 1. Sort the GLOBAL per-class crossing counter to get the top 3
            sorted_counts = sorted(self.line_crossing_class_counter.items(), key=lambda item: item[1], reverse=True)
            top_3_crossings = sorted_counts[:3]

            # 2. Build the multi-line display string
            display_text_lines = []
            display_text_lines.append(f"Total line crossings: {self.line_crossing_counter}")
            display_text_lines.append(f"Total in scene: {total_detections_in_frame}")
            if top_3_crossings:
                display_text_lines.append("--- Top 3 crossings ---")
                for i, (label, count) in enumerate(top_3_crossings):
                    display_text_lines.append(f"{i + 1}. {label}: {count}")

            info_text = "\n".join(display_text_lines)

            display_meta.num_labels = 1  # Start with 1 label (which holds several lines)
            text_params = display_meta.text_params[0]

            text_params.display_text = info_text
            text_params.x_offset, text_params.y_offset = 10, 12
            text_params.font_params.font_name, text_params.font_params.font_size = "Serif", 12
            text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)  # White text
            text_params.set_bg_clr = 1
            text_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.4)

            # Add the new metadata texts
            line_height = 22  # Approximate height of each text line
            bottom_padding = 10  # Space from the bottom edge
            if metadata_texts:
                # <--- CHANGE: compute the initial Y position from the bottom of the image
                # Formula: total height - (number of lines * height per line) - padding
                y_offset = CROP_HEIGHT - (len(metadata_texts) * line_height) - bottom_padding
                for text in metadata_texts:
                    if display_meta.num_labels < len(display_meta.text_params):
                        text_params = display_meta.text_params[display_meta.num_labels]
                        text_params.display_text = text
                        text_params.x_offset, text_params.y_offset = 10, y_offset
                        text_params.font_params.font_name, text_params.font_params.font_size = "Serif", 12
                        text_params.font_params.font_color.set(0.0, 1.0, 0.0, 1.0)  # Green
                        text_params.set_bg_clr = 1
                        text_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.4)
                        display_meta.num_labels += 1
                        y_offset += line_height

            # NEW: add the telemetry text (bottom-right corner)
            if display_meta.num_labels < len(display_meta.text_params):
                telemetry_text = (
                    f"FPS: {self.calculated_fps:.1f}\n"
                    f"Time: {now_str}\n"
                    f"Uptime: {uptime_str}"
                )
                text_params = display_meta.text_params[display_meta.num_labels]
                text_params.display_text = telemetry_text
                # Position in the bottom-right corner
                # The x_offset may need adjustment depending on your output resolution
                text_params.x_offset = CROP_WIDTH - 180  # Adjust as needed
                text_params.y_offset = CROP_HEIGHT - (3 * line_height) - 10  # 3 lines of text + padding
                text_params.font_params.font_name, text_params.font_params.font_size = "Serif", 12
                text_params.font_params.font_color.set(0.0, 1.0, 1.0, 1.0)  # Cyan
                text_params.set_bg_clr = 1
                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.1)
                display_meta.num_labels += 1

            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            l_frame = l_frame.next

        return Gst.PadProbeReturn.OK
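
For completeness, a minimal launch sketch follows. It is not part of the original listing: the pgie config path, the OAK device address and the CameraState constructor call are placeholders, and camera_state is assumed to provide the CameraState class and preset dictionaries imported above.

if __name__ == "__main__":
    # Hypothetical wiring, for illustration only: adjust the config path, the OAK
    # device address (MXID or IP) and the CameraState construction to your project.
    state = CameraState()

    def on_counter_update(total):
        # Simple stand-in for the UI callback that receives the line-crossing total
        print(f"Line crossings so far: {total}")

    manager = PipelineManager(state, on_counter_update)
    try:
        manager.start(
            pgie_config_path="config_infer_primary.txt",  # placeholder path
            source_type="Câmera OAK",                     # or 'Stream RTSP'
            source_address="14442C10D13EABCE00",          # placeholder MXID / RTSP URL
        )
        while manager.is_running:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        manager.cleanup()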