If you're interested in the code, check draw_horizon_line.py
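The full listing follows. The dependency list below is inferred from the script's imports; the exact packages and versions are assumptions, not pinned anywhere in the original.

# Environment sketch, assuming a Luxonis OAK-series device is attached:
#   pip install depthai opencv-python numpy scipy matplotlib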
import depthai as dai
import json
import time
import copy
import cv2
from queue import Queue, Empty
import numpy as np
from scipy.spatial.transform import Rotation as R
from scipy.signal import butter, filtfilt
import matplotlib.pyplot as plt
from datetime import timedelta
from threading import Event, Thread
class MadgwickFilter:
    def __init__(self, beta=0.1):
        self.beta = beta
        self.q = np.array([1.0, 0.0, 0.0, 0.0])  # initial quaternion (w, x, y, z)
    def update(self, gyro, accel, beta, dt):
        self.beta = beta
        q = self.q
        # Gyro rates are used as-is: DepthAI's GYROSCOPE_RAW reports rad/s, so no
        # degree-to-radian conversion is needed here
        gx, gy, gz = gyro
        ax, ay, az = accel
        # Normalize the accelerometer measurement (skip the update if it is zero)
        norm_accel = np.sqrt(ax**2 + ay**2 + az**2)
        if norm_accel == 0:
            return
        ax, ay, az = ax / norm_accel, ay / norm_accel, az / norm_accel
        # Quaternion components (only the products used below are precomputed)
        q1, q2, q3, q4 = q
        _2q1 = 2 * q1
        _2q2 = 2 * q2
        _2q3 = 2 * q3
        _2q4 = 2 * q4
        q2q2 = q2 * q2
        q3q3 = q3 * q3
        # Objective function: error between the gravity direction predicted by q
        # and the measured (normalized) accelerometer vector
        f1 = 2 * (q2 * q4 - q1 * q3) - ax
        f2 = 2 * (q1 * q2 + q3 * q4) - ay
        f3 = 1 - 2 * (q2q2 + q3q3) - az
        # Jacobian of the objective with respect to (q1, q2, q3, q4)
        J = np.array([
            [-_2q3, _2q4, -_2q1, _2q2],
            [_2q2, _2q1, _2q4, _2q3],
            [0, -4 * q2, -4 * q3, 0],
        ])
        # Normalized gradient-descent step (guard against a zero gradient)
        step = J.T @ np.array([f1, f2, f3])
        step_norm = np.linalg.norm(step)
        if step_norm > 0:
            step = step / step_norm
        # Quaternion rate: gyro term (using the pre-update components q1..q4, not
        # values already modified this step) minus the beta-weighted correction,
        # per Madgwick's formulation
        q_dot = 0.5 * np.array([
            -q2 * gx - q3 * gy - q4 * gz,
            q1 * gx + q3 * gz - q4 * gy,
            q1 * gy - q2 * gz + q4 * gx,
            q1 * gz + q2 * gy - q3 * gx,
        ]) - self.beta * step
        # Integrate over dt and renormalize
        q = q + q_dot * dt
        self.q = q / np.linalg.norm(q)
def get_quaternion(self):
return self.q
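# Offline sanity check for the filter (a hypothetical helper, not called by the
# script): with zero gyro rates and gravity tilted 10 degrees about x, the
# estimated roll should settle near 10 degrees.
def _demo_madgwick_static():
    f = MadgwickFilter(beta=0.1)
    tilt = np.radians(10.0)
    accel = np.array([0.0, np.sin(tilt) * 9.81, np.cos(tilt) * 9.81])
    for _ in range(1000):
        f.update(gyro=np.zeros(3), accel=accel, beta=0.1, dt=0.005)
    w, x, y, z = f.get_quaternion()
    print(np.degrees(2 * np.arctan2(x, w)))  # approaches ~10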
def stabilize_image(image, roll, pitch):
    """Counter-rotate for roll and warp for pitch. Unused by the main loop below,
    which stabilizes via rotate_image and shift_for_pitch instead."""
    (h, w) = image.shape[:2]
    center = (w / 2, h / 2)
    # Compute the rotation matrix for roll (angle negated to counteract the roll)
    M_roll = cv2.getRotationMatrix2D(center, -roll, 1.0)
    # Pitch: a 3D rotation about the horizontal axis; using its top two rows as a
    # 2x3 affine below is only a crude small-angle approximation
    pitch_radians = -np.radians(pitch)
    M_pitch = np.array([[1, 0, 0],
                        [0, np.cos(pitch_radians), -np.sin(pitch_radians)],
                        [0, np.sin(pitch_radians), np.cos(pitch_radians)]])
    # Apply both transformations to the image
    rotated_image_roll = cv2.warpAffine(image, M_roll, (w, h))
    rotated_image = cv2.warpAffine(rotated_image_roll, M_pitch[:2], (w, h))
return rotated_image
def video_writer_thread(frame_queue, video_path, stop_event):
    """Drain frames from frame_queue into video_path until stop_event is set and
    the queue is empty, or a None sentinel arrives."""
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = None
    while not stop_event.is_set() or not frame_queue.empty():
        try:
            # Block briefly instead of busy-polling the queue
            frame = frame_queue.get(timeout=0.1)
        except Empty:
            continue
        if frame is None:
            break
        # Initialize the video writer once the frame size is known
        if out is None:
            frame_height, frame_width = frame.shape[:2]
            out = cv2.VideoWriter(video_path, fourcc, 24, (frame_width, frame_height))
        out.write(frame)
    if out:
        out.release()
def calculate_frequency(timestamps):
    """Return the average rate (Hz) implied by a list of timestamps in seconds."""
    if len(timestamps) < 2:
        return 0
    # Timestamps are floats (seconds), so the intervals are plain floats too
    intervals = np.diff(timestamps)
    average_interval = np.mean(intervals)
    return 1 / average_interval if average_interval > 0 else 0
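# Worked check (not part of the original script): timestamps 10 ms apart, e.g.
# calculate_frequency([0.00, 0.01, 0.02]), give a mean interval of 0.01 s and so
# return (up to floating point) 100.0 Hz.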
def high_pass_filter(data, cutoff, fs, order=5):
nyquist = 0.5 * fs
normal_cutoff = cutoff / nyquist
b, a = butter(order, normal_cutoff, btype='high', analog=False)
y = filtfilt(b, a, data)
return y
def low_pass_filter(data, cutoff, fs, order=5):
nyquist = 0.5 * fs
normal_cutoff = cutoff / nyquist
b, a = butter(order, normal_cutoff, btype='low', analog=False)
y = filtfilt(b, a, data)
return y
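# Worked numbers (illustrative, not from the original script): the loop below uses
# fs = 24 Hz, so the Nyquist frequency is 12 Hz and low_pass_filter(data, 0.5, 24)
# maps lp_alpha = 0.5 to a normalized cutoff of 0.5 / 12 ~= 0.042.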
def quaternion_to_euler(quaternion):
    # scipy's Rotation.from_quat expects scalar-last order (x, y, z, w)
    r = R.from_quat(quaternion)
    euler_angles = r.as_euler('xyz', degrees=True)
    return euler_angles
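# Quick check: scipy's scalar-last identity quaternion maps to zero angles, e.g.
# quaternion_to_euler([0.0, 0.0, 0.0, 1.0]) -> array([0., 0., 0.]).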
def rotate_image(image, angle):
(h, w) = image.shape[:2]
center = (w / 2, h / 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
rotated_image = cv2.warpAffine(image, M, (w, h))
return rotated_image
def shift_for_pitch(image, pitch):
(h, w) = image.shape[:2]
# Calculate how much to shift the image based on pitch
# Scale the pitch to pixel units (adjust scaling factor as needed)
pitch_shift = int(pitch * h / 90) # Assume pitch range [-90, 90] maps to full height
# Apply the vertical shift (simulate pointing the camera up/down)
if pitch_shift > 0:
shifted_image = image[pitch_shift:, :] # Crop the top part (pointing downward)
else:
shifted_image = image[:h + pitch_shift, :] # Crop the bottom part (pointing upward)
# Resize the shifted image back to original size to keep the frame consistent
shifted_image = cv2.resize(shifted_image, (w, h))
return shifted_image
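# Worked example (numbers are illustrative): a pitch of 9 degrees on a 1080-pixel
# frame gives pitch_shift = int(9 * 1080 / 90) = 108, so 108 rows are cropped from
# the top before resizing back to the full height.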
def format_euler_angles(euler_angles):
"""Format the Euler angles for display."""
roll, pitch, yaw = euler_angles
return f"Roll: {roll:.2f} Pitch: {pitch:.2f}"
def draw_horizon_line(video_frame, roll, pitch, color, f=50, sensor_width=36, sensor_height=20.25):
height, width, _ = video_frame.shape
# Convert roll and pitch to radians
roll_rad = np.radians(roll + 90)
pitch_rad = np.radians(pitch)
# Calculate rotation matrices
R_x = np.array([[1, 0, 0],
[0, np.cos(roll_rad), -np.sin(roll_rad)],
[0, np.sin(roll_rad), np.cos(roll_rad)]])
R_y = np.array([[np.cos(pitch_rad), 0, np.sin(pitch_rad)],
[0, 1, 0],
[-np.sin(pitch_rad), 0, np.cos(pitch_rad)]])
    # Combined rotation matrix (named R_wc so it does not shadow scipy's Rotation alias R)
    R_wc = R_y @ R_x
    # Camera position and transformation matrix
    t_updated_position = np.array([0, 0, 2])
    T_CW = np.eye(4)
    T_CW[:3, :3] = R_wc
    T_CW[:3, 3] = t_updated_position
    T_WC = np.linalg.inv(T_CW)  # invert once and reuse for both points
    # Define two points far ahead in world space
    P_w_camera_far_point = np.array([0, 5000, 0, 1])
    P_w_camera_second_point = np.array([1000, 5000, 0, 1])
    # Transform the points to camera coordinates
    P_c1 = T_WC @ P_w_camera_far_point
    P_c2 = T_WC @ P_w_camera_second_point
# Project the points onto the image plane
x_proj1 = (f * P_c1[0]) / -P_c1[2]
y_proj1 = (f * P_c1[1]) / -P_c1[2]
x_proj2 = (f * P_c2[0]) / -P_c2[2]
y_proj2 = (f * P_c2[1]) / -P_c2[2]
# Scaling factors for projection
alpha_u = 1 / sensor_width
alpha_v = 1 / sensor_height
# Convert to pixel coordinates
u1 = int((x_proj1 * alpha_u + 0.5) * width)
v1 = int((0.5 - y_proj1 * alpha_v) * height)
u2 = int((x_proj2 * alpha_u + 0.5) * width)
v2 = int((0.5 - y_proj2 * alpha_v) * height)
    # Slope of the line in image space (guard the degenerate vertical case)
    if u2 == u1:
        cv2.line(video_frame, (u1, 0), (u1, height), color, 2)
        return
    slope = (v2 - v1) / (u2 - u1)
    # Extend the line to the image boundaries
    u_start = 0
    u_end = width
    v_start = int(v1 + slope * (u_start - u1))
    v_end = int(v1 + slope * (u_end - u1))
    # Draw the horizon line in the caller-supplied color
    cv2.line(video_frame, (u_start, v_start), (u_end, v_end), color, 2)
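# Offline sketch (hypothetical helper, not called by the script): draw horizons at
# a few roll angles on a blank frame to eyeball the projection without a device.
def _demo_horizon_on_blank():
    blank = np.zeros((1080, 1920, 3), dtype=np.uint8)
    for roll in (-20, 0, 20):
        draw_horizon_line(blank, roll, 0, color=(0, 255, 0))
    cv2.imwrite("horizon_demo.png", blank)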
plot_angles = True
record = True
use_compl = False
stabilize = True
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
imu = pipeline.create(dai.node.IMU)
cam_rgb = pipeline.create(dai.node.ColorCamera)
sync = pipeline.create(dai.node.Sync)
xout_grp = pipeline.create(dai.node.XLinkOut)
xout_grp.setStreamName("xout")
# Configure the IMU sensors
imu.enableIMUSensor(dai.IMUSensor.ACCELEROMETER_RAW, 200)
imu.enableIMUSensor(dai.IMUSensor.GYROSCOPE_RAW, 200)
imu.enableIMUSensor(dai.IMUSensor.MAGNETOMETER_RAW, 100) # Add magnetometer raw data
imu.enableIMUSensor(dai.IMUSensor.ROTATION_VECTOR, 200)
imu.setBatchReportThreshold(1)
imu.setMaxBatchReports(10)
# Configure the color camera
cam_rgb.setBoardSocket(dai.CameraBoardSocket.CAM_D)
cam_rgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
cam_rgb.setFps(24)
# Flipping is corrected manually in the main loop via rotate_image(frame, 180)
# cam_rgb.initialControl.setRotation(180)
sync.setSyncThreshold(timedelta(milliseconds=10))
sync.setSyncAttempts(-1)
cam_rgb.video.link(sync.inputs["video"])
imu.out.link(sync.inputs["imu"])
sync.out.link(xout_grp.input)
# Connect to the device and start the pipeline
with dai.Device(pipeline) as device:
    dt = 1 / 24  # initial integration step; replaced by 1 / accel_freq once measured
    fs = 1 / dt
# synced queue
grp_q = device.getOutputQueue(name="xout", maxSize=30, blocking=True)
lp_alpha = 0.5
hp_alpha = 0.05
cf_alpha_r = 0.2
cf_alpha_p = 0.2
md_beta = 0.033
madgwick = MadgwickFilter(beta=md_beta)
if use_compl:
unfilt_gr = [0]
unfilt_gp = [0]
unfilt_gy = [0]
unfilt_ar = []
unfilt_ap = []
unfilt_ay = []
if record:
print("Recording...")
print()
        # IMU packets are accumulated per frame and written out as JSON at shutdown
imu_data_path = "imu_data.json"
imu_data_dict = {}
imu_data_count = 0
frame_counter = 0
# Initialize frame queue and video writer thread
frame_queue = Queue()
stop_event = Event()
# Start video writer thread
video_thread = Thread(target=video_writer_thread, args=(frame_queue, "recorded_vid.mp4", stop_event))
video_thread.start()
if plot_angles:
frame_rpy = []
accel_timestamps = []
gyro_timestamps = []
vid_timestamps = []
axs = []
ays = []
azs = []
acc_mean_size = 50
try:
while True:
start_time = time.time()
if record:
imu_packets_list = []
grp_msg = grp_q.get()
video_frame = grp_msg["video"]
frame = video_frame.getCvFrame()
frame = rotate_image(frame, 180)
frame_height, frame_width = frame.shape[:2]
if record:
frame_queue.put(copy.deepcopy(frame))
imuData = grp_msg["imu"]
imuPackets = imuData.packets
print(f"imu packet length: {len(imuPackets)}")
imu_ts = imuData.getTimestampDevice().total_seconds()
vid_ts = video_frame.getTimestampDevice().total_seconds()
qs = []
gyro_x = []
gyro_y = []
gyro_z = []
accel_x = []
accel_y = []
accel_z = []
for imuPacket in imuPackets:
# print(dir(imuPacket))
if record:
imu_data_count += 1
acceleroValues = imuPacket.acceleroMeter
gyroValues = imuPacket.gyroscope
                magnetoValues = imuPacket.magneticField  # magnetometer data (unused below)
rotationVector = imuPacket.rotationVector
quaternion = [rotationVector.i, rotationVector.j, rotationVector.k, rotationVector.real]
accelero_ts = acceleroValues.getTimestampDevice().total_seconds()
gyro_ts = gyroValues.getTimestampDevice().total_seconds()
if record:
packet_data = {
"video_ts": vid_ts,
"imu_ts": imu_ts,
"accelero_ts": accelero_ts,
"gyro_ts": gyro_ts,
"accel_x": acceleroValues.x,
"accel_y": acceleroValues.y,
"accel_z": acceleroValues.z,
"gyro_x": gyroValues.x,
"gyro_y": gyroValues.y,
"gyro_z": gyroValues.z,
"quaternion (i, j, k, real)": quaternion
}
imu_packets_list.append(packet_data)
qs.append(quaternion)
gyro_x.append(gyroValues.x)
gyro_y.append(gyroValues.y)
gyro_z.append(gyroValues.z)
accel_x.append(acceleroValues.x)
accel_y.append(acceleroValues.y)
accel_z.append(acceleroValues.z)
accel_timestamps.append(accelero_ts)
gyro_timestamps.append(gyro_ts)
vid_timestamps.append(vid_ts)
gyro_freq = calculate_frequency(gyro_timestamps)
accel_freq = calculate_frequency(accel_timestamps)
fps = calculate_frequency(vid_timestamps)
print(f"Gyroscope Frequency: {gyro_freq:.2f} Hz")
print(f"Accelerometer Frequency: {accel_freq:.2f} Hz")
print(f"FPS: {fps:.2f}")
print()
            # Average the rotation-vector quaternions from this batch (a naive
            # component-wise mean; fine here because the samples are close together)
            curr_q = np.mean(qs, 0)
            euler_angles = quaternion_to_euler(curr_q)
            # Empirical corrections for the sensor's mounting orientation
            euler_angles[0] -= 90
            euler_angles[1] *= -1
            # euler_angles[1] /= 2
            lr = euler_angles[0]
            lp = euler_angles[1]
            ly = euler_angles[2]
gx = np.mean(gyro_x, 0)
gy = np.mean(gyro_y, 0)
gz = np.mean(gyro_z, 0)
ax = np.mean(accel_x, 0)
ay = np.mean(accel_y, 0)
az = np.mean(accel_z, 0)
axs.append(ax)
ays.append(ay)
azs.append(az)
if use_compl:
ugr = unfilt_gr[-1] + gx * dt
ugp = unfilt_gp[-1] + gz * dt
ugy = unfilt_gy[-1] + gy * dt
unfilt_gr.append(ugr)
unfilt_gp.append(ugp)
unfilt_gy.append(ugy)
uar = np.arctan2(ay, az)
uap = np.arctan2(-ax, np.sqrt(ay ** 2 + az ** 2))
unfilt_ar.append(uar)
unfilt_ap.append(uap)
                # filtfilt's default pad length for this 5th-order filter is 18
                # samples, so wait until more than 18 samples have accumulated
                if len(unfilt_ar) > 18:
accel_roll = low_pass_filter(unfilt_ar, lp_alpha, fs)
accel_pitch = low_pass_filter(unfilt_ap, lp_alpha, fs)
gyro_roll = high_pass_filter(unfilt_gr, hp_alpha, fs)
gyro_pitch = high_pass_filter(unfilt_gp, hp_alpha, fs)
else:
accel_roll = unfilt_ar
accel_pitch = unfilt_ap
gyro_roll = unfilt_gr
gyro_pitch = unfilt_gp
                # Empirical scale and offset corrections, hand-tuned for this rig
                gr = np.rad2deg(gyro_roll[-1])  # * 10 - 10
                gp = np.rad2deg(gyro_pitch[-1]) / 2  # * 3
                ar = np.rad2deg(accel_roll[-1]) - 90
                ap = -np.rad2deg(accel_pitch[-1]) / 2  # - 5
                # Complementary filter: blend gyro (high-pass) and accel (low-pass)
                cr = cf_alpha_r * gr + (1 - cf_alpha_r) * ar
                cp = cf_alpha_p * gp + (1 - cf_alpha_p) * ap
                cp += 1
gyro_data = np.array([gx, gy, gz])
accel_data = np.array([ax, ay, az])
if accel_freq != 0:
dt = 1 / accel_freq
else:
dt = 1 / 24
madgwick.update(gyro=gyro_data, accel=accel_data, beta=md_beta, dt=dt)
mq = madgwick.get_quaternion()
            # Reorder (w, x, y, z) -> (x, y, z, w) for scipy, then apply the same
            # style of empirical mounting corrections used for the rotation vector
            new_mq = np.array([mq[1], mq[2], mq[3], mq[0]])
            mr, mp, my = quaternion_to_euler(new_mq)
            mr += 90
            mp /= 2
if len(axs) > acc_mean_size:
curr_ax = np.mean(axs[-acc_mean_size:])
curr_ay = np.mean(ays[-acc_mean_size:])
curr_az = np.mean(azs[-acc_mean_size:])
else:
curr_ax = axs[-1]
curr_ay = ays[-1]
curr_az = azs[-1]
# if curr_ax < -1.5:
# mp += 5
# elif curr_ax > 1.5:
# mp -= 5
if plot_angles:
frame_rpy.append([lr, lp, ly, mr, mp])
if not stabilize:
euler_text = format_euler_angles([lr, lp, 0])
madg_text = format_euler_angles([mr, mp, my])
if use_compl:
compl_text = format_euler_angles([cr, cp, 0])
viz_frame = cv2.putText(frame, f"Lux: {euler_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
viz_frame = cv2.putText(viz_frame, f"Madg: {madg_text}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
viz_frame = cv2.putText(viz_frame, f"FPS: {fps:.2f}", (1550, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                if use_compl:
                    # Drawn lower than the Madgwick readout so the two do not overlap
                    viz_frame = cv2.putText(viz_frame, f"Compl: {compl_text}", (10, 190), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2, cv2.LINE_AA)
                viz_frame = cv2.putText(viz_frame, f"Madg Beta: {md_beta:.3f}", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 165, 255), 2, cv2.LINE_AA)
                viz_frame = cv2.putText(viz_frame, f"Acc: ax: {curr_ax:.2f}, ay: {curr_ay:.2f}, az: {curr_az:.2f}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
draw_horizon_line(viz_frame, lp, lr, color=(0, 255, 0))
draw_horizon_line(viz_frame, mp, mr, color=(0, 0, 255))
if use_compl:
draw_horizon_line(viz_frame, cp, cr, color=(255, 0, 255))
if stabilize:
stabilized_frame = rotate_image(frame, -lr)
stabilized_frame = shift_for_pitch(stabilized_frame, -lp)
cv2.imshow("Stabilized Video", stabilized_frame)
else:
cv2.imshow("frame", viz_frame)
if record:
imu_data_dict[frame_counter] = imu_packets_list
frame_counter += 1
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
            elif key == ord('w'):
                md_beta = max(md_beta - 0.1, 0.0)  # clamp so beta stays non-negative
            elif key == ord('e'):
                md_beta += 0.1
except KeyboardInterrupt:
print("Loop terminated!")
if record:
stop_event.set()
video_thread.join()
with open(imu_data_path, 'w') as json_file:
json.dump(imu_data_dict, json_file, indent=4)
cv2.destroyAllWindows()
    if plot_angles:
        # Use a distinct name so the subplot axes do not clobber the accel list axs
        fig, plot_axs = plt.subplots(2, 1, figsize=(8, 10))  # 2 rows, 1 column
        frame_rpy = np.array(frame_rpy)
        lr = frame_rpy[:, 0]
        lp = frame_rpy[:, 1]
        ly = frame_rpy[:, 2]
        mr = frame_rpy[:, 3]
        mp = frame_rpy[:, 4]
        plot_axs[0].plot(lr, color='g', label='lux roll')
        plot_axs[0].plot(mr, color='r', label='madg roll')
        plot_axs[1].plot(lp, color='g', label='lux pitch')
        plot_axs[1].plot(mp, color='r', label='madg pitch')
        plot_axs[0].legend()
        plot_axs[1].legend()
        plt.tight_layout()  # adjust subplot spacing
        plt.savefig("rp_plot.png")
        plt.show()