jakaskerl
Hey thanks for your response. I am able to solve the issue.
Below is the code attached you can check & also let me know if the approach is right or not
import os
import time
import cv2
import depthai as dai
import subprocess
class VideoCapture:
def __init__(self, save_directory="./captured_videos", image_directory="./captured_images"):
self.save_directory = save_directory # Directory to save videos
os.makedirs(self.save_directory, exist_ok=True)
self.image_directory = image_directory # Directory to save images
os.makedirs(self.image_directory, exist_ok=True)
self.is_recording = False
self.h265_filename = None
self.h265_filepath = None
def start_capture(self):
# Create a pipeline
pipeline = dai.Pipeline()
# Create color camera node
color_camera = pipeline.create(dai.node.ColorCamera)
color_camera.setBoardSocket(dai.CameraBoardSocket.CAM_A)
color_camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
color_camera.setInterleaved(False)
color_camera.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
color_camera.setFps(18) # Lower frame rate for testing
# Create video encoder node
video_encoder = pipeline.create(dai.node.VideoEncoder)
video_encoder.setDefaultProfilePreset(18, dai.VideoEncoderProperties.Profile.H265_MAIN)
# Create XLinkOut for encoded stream
xout_h265 = pipeline.create(dai.node.XLinkOut)
xout_h265.setStreamName("h265")
# Create XLinkOut for preview stream
xout_preview = pipeline.create(dai.node.XLinkOut)
xout_preview.setStreamName("video")
# Link nodes
color_camera.video.link(video_encoder.input) # Link camera to encoder
video_encoder.bitstream.link(xout_h265.input) # Link encoder output to XLinkOut for recording
color_camera.video.link(xout_preview.input) # Link camera output to XLinkOut for displaying
# Start device with the defined pipeline
with dai.Device(pipeline) as device:
print("Device connected successfully.")
q_h265 = device.getOutputQueue(name="h265", maxSize=30, blocking=False)
q_preview = device.getOutputQueue(name="video", maxSize=30, blocking=False)
while True:
try:
# Try to get a frame from the preview queue without blocking
preview_frame = q_preview.tryGet()
if preview_frame is not None:
frame = preview_frame.getCvFrame()
cv2.imshow("Live Feed", frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'): # Quit on 'q'
break
elif key == ord('c'): # Capture image on 'c'
self.capture_image(frame)
elif key == ord('r'): # Toggle recording on 'r'
if not self.is_recording:
self.start_recording()
else:
self.stop_recording()
if self.is_recording:
h265_packet = q_h265.tryGet()
if h265_packet is not None:
data = h265_packet.getData()
with open(self.h265_filepath, 'ab') as f: # Append binary data to the file
f.write(data)
print(f"Captured {len(data)} bytes") # Log size of captured data
except Exception as e:
print(f"Error in main loop: {e}") # Print any errors encountered
cv2.destroyAllWindows()
def start_recording(self):
timestamp = time.strftime("%Y%m%d_%H%M%S")
self.h265_filename = f"captured_video_{timestamp}.h265"
self.h265_filepath = os.path.join(self.save_directory, self.h265_filename)
print(f"Recording started: {self.h265_filepath}")
self.is_recording = True
def stop_recording(self):
print("Recording stopped.")
if self.h265_filepath:
self.convert_to_mp4()
self.is_recording = False
def capture_image(self, frame):
timestamp = time.strftime("%Y%m%d_%H%M%S")
image_filename = os.path.join(self.image_directory, f"captured_image_{timestamp}.jpg")
cv2.imwrite(image_filename, frame) # Save the captured image
print(f"Image captured: {image_filename}")
def convert_to_mp4(self):
mp4_filename = os.path.splitext(self.h265_filename)[0] + ".mp4"
mp4_filepath = os.path.join(self.save_directory, mp4_filename)
ffmpeg_command = [
'ffmpeg',
'-i', self.h265_filepath,
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '23', # Adjust quality (lower is better quality)
mp4_filepath
]
try:
subprocess.run(ffmpeg_command, check=True)
print(f"Conversion completed: {mp4_filepath}")
os.remove(self.h265_filepath) # Optionally delete the original H.265 file after conversion
print(f"Deleted original file: {self.h265_filepath}")
except subprocess.CalledProcessError as e:
print(f"Error during conversion: {e}")
if __name__ == "__main__":
video_capture = VideoCapture()
video_capture.start_capture()
Its working fine on macos, but on Jetson nano the video capture gets freeze, I want to run it on Jetson nano linux environment. I am stil figuring it out.