- I have a code written with the API and mediapipe to get skeletal depth data using left/right mono cameras on the Oak-D.
- I want to simply replace the monoLeft and monoRight cameras with video footage recorded from those same cameras.
- What's the easiest way to go about this? (Some of the example code I found in the documentation is a bit too confusing for me.)
Here is my code:
#!/usr/bin/env python3
import csv
import os
import time
from datetime import date

import cv2
import depthai as dai
import mediapipe as mp

from calc import HostSpatialsCalc
from Jae_HHI_PoseEstimation.archive.utility import *
# ---------------------------------------------------------------------------
# DepthAI pipeline: two 720p mono sensors feed a StereoDepth node, and two
# XLink streams are exported to the host -- the raw right frame (input for
# MediaPipe) and the depth map (input for the spatial calculator).
# ---------------------------------------------------------------------------
pipeline = dai.Pipeline()

# Create every node up front, then configure and wire them below.
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)
xoutRight = pipeline.create(dai.node.XLinkOut)
xoutDepth = pipeline.create(dai.node.XLinkOut)

# Sensor configuration: 720p on the left and right sockets.
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)

# Depth configuration: confidence threshold 255 (keep all disparities),
# left/right consistency check on, no subpixel refinement.
stereo.initialConfig.setConfidenceThreshold(255)
stereo.setLeftRightCheck(True)
stereo.setSubpixel(False)

# Host-visible stream names.
xoutRight.setStreamName('right')
xoutDepth.setStreamName("depth")

# Wiring: both mono feeds drive the stereo node; the right feed is also
# mirrored straight to the host, and the computed depth goes out as well.
monoRight.out.link(xoutRight.input)
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)
stereo.depth.link(xoutDepth.input)
# MediaPipe pose-estimation setup: a single Pose() instance is created once
# and reused for every incoming frame in the main loop.
mpPose = mp.solutions.pose
mpDraw = mp.solutions.drawing_utils
pose = mpPose.Pose()
# Choose a CSV output name that does not collide with an earlier run:
# "OakLandMark<YYYY-MM-DD>_<n>", incrementing <n> until the name is unused.
today = str(date.today())
i = 0
while os.path.exists(f"OakLandMark{today}_{i}"):
    i += 1
filename = f"OakLandMark{today}_{i}"
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
# Output queues will be used to get the grayscale frames from the outputs defined above
qRight = device.getOutputQueue(name="right", maxSize=4, blocking=False)
depthQueue = device.getOutputQueue(name="depth")
device.setIrLaserDotProjectorBrightness(200) # in mA, 0..1200
device.setIrFloodLightBrightness(1000) # in mA, 0..1500
#spatials
hostSpatials = HostSpatialsCalc(device)
delta = 1
hostSpatials.setDeltaRoi(delta)
#CSV WRITING
with open(filename, 'a', newline='') as csvfile:
# SET UP DICTIONARY FOR CSV WRITING
fieldnames = [.....(it's really long so I took it out to declutter).....]
list_of_zeroes = [0] * 100
landmark_dict = dict(zip(fieldnames, list_of_zeroes))
dataWriter = csv.DictWriter(csvfile, fieldnames=fieldnames)
dataWriter.writeheader()
#LANDMARK DETECTION, DISPLAY, CSV DATA WRITING
while True:
# Instead of get (blocking), we use tryGet (non-blocking) which will return the available data or None otherwise
inRight = qRight.tryGet()
depthData = depthQueue.get()
if inRight is not None:
rgb_img = cv2.cvtColor(inRight.getCvFrame(), cv2.COLOR_GRAY2RGB)
results = pose.process(rgb_img)
if results.pose_landmarks:
#draw landmark onto image
mpDraw.draw_landmarks(rgb_img, results.pose_landmarks, mpPose.POSE_CONNECTIONS)
cv2.imshow("Image", rgb_img)
i=1
for id, lm in enumerate(results.pose_landmarks.landmark):
h, w, c = rgb_img.shape
cx, cy = int(lm.x * w), int(lm.y * h)
spatials, centroid = hostSpatials.calc_spatials(depthData, (cx, cy)) # centroid == x/y in our case
#append spatials to dictionary and then write to CSV
landmark_dict[fieldnames[i]] = spatials['x']
landmark_dict[fieldnames[i+1]] = spatials['y']
landmark_dict[fieldnames[i+2]] = spatials['z']
i+=3
landmark_dict[fieldnames[0]] = int(round(time.time() * 1000)%1000000)
dataWriter.writerow(landmark_dict)
if cv2.waitKey(1) == ord('q'):
break