jakaskerl
This is the code I was using to save them. There is a lot of extra code in here but I wanted to give a reproducible example so it could be run.
def processDepthFromDevice(depth_cv2_frame, max_depth):
# Handles/cleans the depth frame coming directly from the stereo node on the device
depth = depth_cv2_frame.astype('float64')
# Filter out undefined values
depth[depth > max_depth] = np.nan # Set everything past the depth cap to null/0
depth[depth == 0] = np.nan # Set everything with zero depth to nan
# Create an image of depth for cv2
depth_display_frame = (np.nan_to_num(depth) * 255 / max_depth).astype(np.uint8)
depth_display_frame = cv2.applyColorMap(depth_display_frame, cv2.COLORMAP_HOT)
return depth, depth_display_frame
def serialReader(shared, port, baudrate=9600, timeout=0.005):
try:
ser = serial.Serial(port, baudrate=baudrate, timeout=timeout)
while True:
if ser.in_waiting > 0:
gps_timestamp = dai.Clock.now().total_seconds()
gps_msg = ser.readline().decode('ascii', errors='ignore').strip()
print(gps_msg.split(',')[0:7], end='\r')
# Save the gps message if recording is active
if shared.record_bool:
gps_msg_filename = str(int((gps_timestamp - shared.clock_shift) * 1000)) + '_gps.txt'
with open(os.path.join(shared.path_to_gps_msgs, gps_msg_filename), 'w') as gps_file:
gps_file.write(gps_msg)
except Exception as e:
print(f"[Reader] Error: {e}")
finally:
try:
ser.close()
except NameError:
pass
print("[Reader] Serial port closed")
# Main code
if __name__ == '__main__':
# Set parameters for recording frames
path_to_top_level_save = r''
# Set up the parameters that need to be shared with the serial reader process
manager = multiprocessing.Manager()
shared_data = manager.Namespace()
shared_data.record_bool = False # This stays as false and gets changed by pressing 's' to record and 'p' to stop
shared_data.clock_shift = 0
# Set parameters for the oak-d pipeline
fps = 30 # Sets the frames per second, also sets the min shutter speed as a consequence
IMAGE_WIDTH, IMAGE_HEIGHT = 1280, 800
DEPTH_CAP = 15000 # The max depth in mm that is allowed to be included
# Create pipeline
pipeline = dai.Pipeline()
device = dai.Device()
# Define the nodes
camRgb = pipeline.create(dai.node.Camera)
left = pipeline.create(dai.node.MonoCamera)
right = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)
imu = pipeline.create(dai.node.IMU)
sync = pipeline.create(dai.node.Sync)
x_out = pipeline.create(dai.node.XLinkOut)
# Set stream names
x_out.setStreamName("xout")
# Set properties for rgb node
rgbCamSocket = dai.CameraBoardSocket.CAM_A
camRgb.setBoardSocket(rgbCamSocket)
camRgb.setSize(1280, 800)
camRgb.setFps(fps)
camRgb.setMeshSource(dai.CameraProperties.WarpMeshSource.CALIBRATION)
# For now, RGB needs fixed focus to properly align with depth.
try:
calibData = device.readCalibration2()
lensPosition = calibData.getLensPosition(rgbCamSocket)
if lensPosition:
camRgb.initialControl.setManualFocus(lensPosition)
except:
raise
# Set properties for left/right nodes
# The disparity is computed at this resolution, then upscaled to RGB resolution
monoResolution = dai.MonoCameraProperties.SensorResolution.THE_400_P
# monoResolution = dai.MonoCameraProperties.SensorResolution.THE_800_P
left.setResolution(monoResolution)
left.setCamera("left")
left.setFps(fps)
right.setResolution(monoResolution)
right.setCamera("right")
right.setFps(fps)
# Set properties for stereo node
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.initialConfig.setConfidenceThreshold(253)
stereo.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_5x5)
stereo.setLeftRightCheck(True) # LR-check is required for depth alignment
stereo.setSubpixel(True) # This adds a lot of lag on 800p mono resolution
stereo.setDepthAlign(rgbCamSocket)
stereo.setDisparityToDepthUseSpecTranslation(False)
stereo.setDepthAlignmentUseSpecTranslation(False)
# Set properties for IMU node
# imu.enableIMUSensor(dai.IMUSensor.ROTATION_VECTOR, 200)
imu.enableIMUSensor([dai.IMUSensor.GAME_ROTATION_VECTOR, dai.IMUSensor.MAGNETOMETER_CALIBRATED], 100)
imu.setBatchReportThreshold(1)
imu.setMaxBatchReports(1)
# Set properties for sync node
sync_threshold = 20
sync.setSyncThreshold(timedelta(milliseconds=sync_threshold))
sync.setSyncAttempts(-1) # Default, only sends message if all input nodes are in sync
# Linking
left.out.link(stereo.left)
right.out.link(stereo.right)
stereo.disparity.link(sync.inputs["disparity"])
stereo.depth.link(sync.inputs["depth"])
camRgb.video.link(sync.inputs["video"])
imu.out.link(sync.inputs["imu"])
sync.out.link(x_out.input)
# Get max disparity to normalize the disparity images
disparityMultiplier = 255.0 / stereo.initialConfig.getMaxDisparity()
# print(stereo.initialConfig.getMaxDisparity())
# Connect to device and start pipeline
with device:
# Set the debug level
# device.setLogLevel(dai.LogLevel.INFO)
# device.setLogOutputLevel(dai.LogLevel.INFO)
# Get intrinsics
calib_data = device.readCalibration()
intrinsics_A = np.array(calib_data.getCameraIntrinsics(dai.CameraBoardSocket.CAM_A, dai.Size2f(IMAGE_WIDTH, IMAGE_HEIGHT))) # This is correct one for aligned point cloud creation
depth_hfov = calib_data.getFov(dai.CameraBoardSocket.CAM_A, useSpec=False) # This shows the correct HFOV of 96.59 degrees when aligned, still need to figure out where to get VFOV
# Start the serial reader process
serial_process = multiprocessing.Process(target=serialReader, args=(shared_data, 'COM4'))
serial_process.start()
print('[Main] serialReader process started')
# Set up the cv2 window
cv2.namedWindow('rgb')
cv2.namedWindow('depth')
# Start the pipeline
device.startPipeline(pipeline)
# Turn on/off the IR dot projector or IR illumination
device.setIrLaserDotProjectorIntensity(0.0)
device.setIrFloodLightIntensity(0.0)
# Start the queue
queue = device.getOutputQueue(name='xout')
queue.setMaxSize(1)
queue.setBlocking(False)
while True:
# Get the message group from the camera
msg_grp = queue.get()
queue_timestamp = dai.Clock.now().total_seconds()
imu_msg = msg_grp['imu']
video_msg = msg_grp['video']
disp_msg = msg_grp['disparity']
depth_msg = msg_grp['depth']
# IMU
imu_roll, imu_pitch, imu_heading = calcOrientation(imu_msg.packets[-1].rotationVector)
imu_quat = imu_msg.packets[-1].rotationVector
imu_array = np.array([imu_quat.i, imu_quat.j, imu_quat.k, imu_quat.real])
imu_timestamp = imu_quat.getTimestamp().total_seconds()
# Get the magnetometer data
imu_mag = imu_msg.packets[-1].magneticField
imu_mag_array = np.array([imu_mag.x, imu_mag.y, imu_mag.z])
# Print the orientation
imuF = '{:.02f}'
# print(f'Heading [deg]: {imuF.format(imu_heading)} Roll [deg]: {imuF.format(imu_roll)} Pitch [deg]: {imuF.format(imu_pitch)}', end='\r')
# Get color frame
rgb_frame_orig = video_msg.getCvFrame()
rgb_frame_timestamp = video_msg.getTimestamp().total_seconds()
# # Get disparity frame
# disp_frame_orig = disp_msg.getCvFrame()
# disp_frame = (disp_frame_orig * disparityMultiplier).astype(np.uint8)
# disp_frame = cv2.applyColorMap(disp_frame, cv2.COLORMAP_JET)
# Get the perpendicular depth frame directly from the device
depth_frame_device_orig = depth_msg.getCvFrame()
depth_frame_timestamp = depth_msg.getTimestamp().total_seconds()
depth_device, depth_frame_device = processDepthFromDevice(depth_frame_device_orig, DEPTH_CAP)
# Measure offset between Clock.now and device time in the first couple seconds after pipeline starts
if imu_quat.getTimestampDevice().total_seconds() < 2.0:
shared_data.clock_shift = abs(imu_timestamp - imu_quat.getTimestampDevice().total_seconds())
# Save the messages if recording is active
if shared_data.record_bool:
# Save the rgb frame
rgb_msg_filename = str(int((queue_timestamp - shared_data.clock_shift) * 1000)) + '_' + str(int((rgb_frame_timestamp - shared_data.clock_shift) * 1000)) + '_rgb.npy'
np.save(os.path.join(path_to_rgb_msgs, rgb_msg_filename), rgb_frame_orig)
# Save the rgb frame as png
cv2.imwrite(os.path.join(path_to_rgb_png_msgs, rgb_msg_filename.replace('.npy', '.png')), rgb_frame_orig)
# Save the depth frame
depth_msg_filename = str(int((queue_timestamp - shared_data.clock_shift) * 1000)) + '_' + str(int((depth_frame_timestamp - shared_data.clock_shift) * 1000)) + '_depth.npy'
np.save(os.path.join(path_to_depth_msgs, depth_msg_filename), depth_device)
# Save the depth frame as png
cv2.imwrite(os.path.join(path_to_depth_png_msgs, depth_msg_filename.replace('.npy', '.png')), depth_frame_device)
# Save the IMU array
imu_msg_filename = str(int((queue_timestamp - shared_data.clock_shift) * 1000)) + '_' + str(int((imu_timestamp - shared_data.clock_shift) * 1000)) + '_imu.csv'
np.savetxt(os.path.join(path_to_imu_msgs, imu_msg_filename), imu_array, delimiter=',', fmt='%f')
# Save the magnetometer array
imu_mag_msg_filename = str(int((queue_timestamp - shared_data.clock_shift) * 1000)) + '_' + str(int((imu_timestamp - shared_data.clock_shift) * 1000)) + '_mag.csv'
np.savetxt(os.path.join(path_to_mag_msgs, imu_mag_msg_filename), imu_mag_array, delimiter=',', fmt='%f')
# Display the rgb frame
cv2.imshow('rgb', rgb_frame_orig[::2, ::2])
cv2.imshow('depth', depth_frame_device[::2, ::2])
cv2.waitKey(1)
# Check for key presses for recording functionality
if keyboard.is_pressed('s'):
if not shared_data.record_bool:
# Get the current time and date
current_datetime = datetime.now()
date_string = current_datetime.strftime('%m%d%y')
time_string = current_datetime.strftime('%H%M%S')
# Create the new directory to hold the frames for this recording session
os.chdir(path_to_top_level_save)
path_to_recording_session = os.path.join(path_to_top_level_save, str(date_string + '_' + time_string))
os.mkdir(path_to_recording_session)
# Write the intrinsic matrix
np.savetxt(os.path.join(path_to_recording_session, 'intrinsic.csv'), intrinsics_A, delimiter=',', fmt='%f')
# Make the folders for each message type
path_to_rgb_msgs = os.path.join(path_to_recording_session, 'rgb')
path_to_rgb_png_msgs = os.path.join(path_to_recording_session, 'rgb_png')
# path_to_disp_msgs = os.path.join(path_to_recording_session, 'disparity')
path_to_depth_msgs = os.path.join(path_to_recording_session, 'depth')
path_to_depth_png_msgs = os.path.join(path_to_recording_session, 'depth_png')
path_to_imu_msgs = os.path.join(path_to_recording_session, 'imu')
path_to_mag_msgs = os.path.join(path_to_recording_session, 'mag')
shared_data.path_to_gps_msgs = os.path.join(path_to_recording_session, 'gps') # Shared with serialReader process
os.mkdir(path_to_rgb_msgs)
os.mkdir(path_to_rgb_png_msgs)
# os.mkdir(path_to_disp_msgs)
os.mkdir(path_to_depth_msgs)
os.mkdir(path_to_depth_png_msgs)
os.mkdir(path_to_imu_msgs)
os.mkdir(path_to_mag_msgs)
os.mkdir(shared_data.path_to_gps_msgs)
# Print to console
print('[Main] recording started\n')
shared_data.record_bool = True
elif keyboard.is_pressed('p'):
if shared_data.record_bool:
shared_data.record_bool = False
print('[Main] recording stopped\n')