# Robot dog locked-target tracking.
# Import the modules required by this scene.
import os
from multiprocessing import Process, Queue
import numpy as np
from src.actions import SetServo, Stop, MoveForward
from src.models import YoloV5
from src.models.yolov7 import YoloV7
from src.scenes.base_scene import BaseScene
from src.utils import log
from src.utils.cv_utils import cal_iou, xyxy_to_xywh, xywh_to_xyxy, cal_inter_small
from src.utils.constant import STATE_OBSERVATION_MATRIX, STATE_TRANSITION_MATRIX, PROCESS_NOISE_COVARIANCE_MATRIX, \
OBSERVATION_NOISE_COVARIANCE_MATRIX
# Definition of the locked-target tracking feature.
class Tracking(BaseScene):
    """Scene that locks onto a person via a hand gesture and then follows them."""

    def __init__(self, memory_name, camera_info, msg_queue):
        super().__init__(memory_name, camera_info, msg_queue)
        # One-slot queues: frames in / detections out for the two inference workers.
        # Pair 1 serves the YOLOv5 body detector, pair 2 the YOLOv7 hand detector
        # (wired up in init_state).
        self.in_queue1 = Queue(1)
        self.in_queue2 = Queue(1)
        self.out_queue1 = Queue(1)
        self.out_queue2 = Queue(1)
def init_state(self):
log.info(f'start init {self.__class__.__name__}')
yolov5_model_path = os.path.join(os.getcwd(), 'weights', 'yolo.om')
yolov7_model_path = os.path.join(os.getcwd(), 'weights', 'hand_det.om')
if not os.path.exists(yolov5_model_path) or not os.path.exists(yolov7_model_path):
log.error(f'Cannot find the offline inference model(.om) file needed for {self.__class__.__name__} scene.')
return True
lock_process = Process(target=YoloV7(yolov7_model_path).infer, args=(self.in_queue2, self.out_queue2))
tracking_process = Process(target=YoloV5(yolov5_model_path).infer, args=(self.in_queue1, self.out_queue1))
lock_process.start()
tracking_process.start()
log.info(f'{self.__class__.__name__} model init succ.')
self.ctrl.execute(SetServo(servo=[91, 60]))
return False
def get_start_bbox(self, tracking_bboxes, lock_bboxes):
"""
判断当前是否存在锁定目标,判断条件为:人体的检测框与手势的检测框的相交面积 / 手势检测框的面积 大于0.9 且手势为锁定手势
@param tracking_bboxes: 所有的人体检测框
@param lock_bboxes: 所有的手势检测框
@return:
"""
fives = []
for i, (x1, y1, x2, y2, cls, conf) in enumerate(lock_bboxes):
if cls != 6:
continue
fives.append([x1, y1, x2, y2])
for i, (x1, y1, x2, y2, cls, conf) in enumerate(tracking_bboxes):
human_box = [x1, y1, x2, y2]
for i, (xx1, yy1, xx2, yy2) in enumerate(fives):
hand_box = [xx1, yy1, xx2, yy2]
iou = cal_inter_small(hand_box, human_box)
if iou > 0.9:
return [x1, y1, x2, y2]
return None
def get_count(self, target_box, lock_bboxes):
"""
判断当前锁定目标是否解锁,判断条件为:人体的检测框与手势的检测框的相交面积 / 手势检测框的面积 大于0.9 且手势为解锁手势
@param target_box:当前锁定目标的检测框
@param lock_bboxes:所有的手势检测框
@return:
"""
for i, (x1, y1, x2, y2, cls, conf) in enumerate(lock_bboxes):
if cls != 8:
continue
hand_box = [x1, y1, x2, y2]
iou = cal_inter_small(hand_box, target_box)
if iou > 0.9:
return 1
return 0
    def loop(self):
        """
        Main scene loop.

        Lock onto a person after the lock gesture is seen in several consecutive
        frames, follow the person with a Kalman-filter-tracked bounding box, and
        release the lock on the unlock gesture or after the target is lost.
        """
        # Initialise the loop and the inference models.
        ret = self.init_state()
        if ret:
            log.error(f'{self.__class__.__name__} init failed.')
            return
        # Initialise the servo angles.
        self.ctrl.execute(SetServo(servo=[91, 60]))
        # Create a numpy array bound to the shared-memory camera frame buffer.
        frame = np.ndarray((self.height, self.width, 3), dtype=np.uint8, buffer=self.broadcaster.buf)
        log.info(f'{self.__class__.__name__} loop start')
        locked = False          # whether a target is currently locked
        locked_count = 0        # consecutive frames the lock gesture was recognised
        unlocked_count = 0      # consecutive frames the unlock gesture was recognised
        lost_count = 0          # consecutive frames without an IOU match
        iou_threshold = 0.3     # minimum IOU for matching a detection to the prediction
        x_posterior = None      # Kalman posterior state; None means not initialised
        x_speed = 70            # forward speed
        z_speed = 40            # turning speed
        last_action = SetServo(servo=[91, 60])
        while True:
            action = Stop()
            if self.stop_sign.value:
                break
            if self.pause_sign.value:
                continue
            # Grab the current image.
            img_bgr = frame.copy()
            # Hand the image to both models through their input queues.
            self.in_queue1.put(img_bgr)
            self.in_queue2.put(img_bgr)
            # Collect the model outputs (blocks until inference finishes).
            tracking_bbox = self.out_queue1.get()
            lock_bbox = self.out_queue2.get()
            if not locked:
                # Look for a body box paired with the lock gesture.
                start_bbox = self.get_start_bbox(tracking_bbox, lock_bbox)
                # No lock candidate in this frame: reset the debounce counter.
                if start_bbox is None:
                    locked_count = 0
                    continue
                else:
                    locked_count += 1
                    log.info(
                        f'Gesture recognition successful, locked count incremented by one. locked count:{locked_count}')
                # Debounce: require the gesture for 5 consecutive frames before locking.
                if locked_count > 4:
                    locked = True
                    locked_count = 0
                    action = SetServo(servo=[91, 60])
                    log.info(f'locked')
                    # Convert the box to centre/width/height and build the initial state.
                    initial_box_state = xyxy_to_xywh(start_bbox)
                    initial_state = np.array(
                        [[initial_box_state[0], initial_box_state[1], initial_box_state[2], initial_box_state[3],
                          0, 0]]).T  # [centre x, centre y, width w, height h, dx, dy]
            if locked:
                log.info(f'======================================================')
                # Initialise the Kalman filter on the first locked frame.
                if x_posterior is None:
                    x_posterior = np.array(initial_state)
                    p_posterior = np.array(np.eye(6))
                    z = np.array(initial_state)
                max_iou = iou_threshold
                max_iou_matched = False
                # Use the maximum-IOU detection as the observation.
                for i, (x1, y1, x2, y2, cls, conf) in enumerate(tracking_bbox):
                    xyxy = [x1, y1, x2, y2]
                    iou = cal_iou(xyxy, xywh_to_xyxy(x_posterior[0:4]))
                    if iou > max_iou:
                        target_box = xyxy
                        max_iou = iou
                        max_iou_matched = True
                if max_iou_matched:
                    log.info(f'-----------------------------------------------------')
                    # The max-IOU box found above is taken as the observation.
                    xywh = xyxy_to_xywh(target_box)
                    x1, y1, x2, y2 = target_box
                    x, y, w, h = xywh
                    log.info(f'x1:{x1},y1:{y1},x2:{x2},y2:{y2}')
                    log.info(f'x:{x},y:{y},w:{w},h:{h}')
                    log.info(f'-----------------------------------------------------')
                    # Motion control: stop when the target is close (tall box) and
                    # centred; otherwise steer toward it.
                    # NOTE(review): this while loop always exits after one pass —
                    # it behaves like a plain if/elif/else.
                    while True:
                        if h >= 700 and 450 < x < 750:
                            action = Stop(servo=[91, 60])
                            break
                        if x <= 450:
                            action = MoveForward(x=x_speed, z=-z_speed, servo=[91, 60])
                        elif x >= 750:
                            action = MoveForward(x=x_speed, z=z_speed, servo=[91, 60])
                        else:
                            action = MoveForward(x=x_speed, z=0, servo=[91, 60])
                        break
                    # Compute the velocity components dx, dy of the observation.
                    dx = xywh[0] - x_posterior[0]
                    dy = xywh[1] - x_posterior[1]
                    z[0:4] = np.array([xywh]).T
                    z[4::] = np.array([dx, dy])
                    lost_count = 0
                else:
                    lost_count += 1
                if max_iou_matched:
                    # Prediction step: prior state estimate.
                    x_prior = np.dot(STATE_TRANSITION_MATRIX, x_posterior)
                    # Prior estimate covariance matrix P.
                    p_prior = np.dot(np.dot(STATE_TRANSITION_MATRIX, p_posterior),
                                     STATE_TRANSITION_MATRIX.T) + PROCESS_NOISE_COVARIANCE_MATRIX
                    # Kalman gain.
                    k1 = np.dot(p_prior, STATE_OBSERVATION_MATRIX.T)
                    k2 = np.dot(np.dot(STATE_OBSERVATION_MATRIX, p_prior),
                                STATE_OBSERVATION_MATRIX.T) + OBSERVATION_NOISE_COVARIANCE_MATRIX
                    kalman_gain = np.dot(k1, np.linalg.inv(k2))
                    # Correction step: posterior state estimate.
                    x_posterior_1 = z - np.dot(STATE_OBSERVATION_MATRIX, x_prior)
                    x_posterior = x_prior + np.dot(kalman_gain, x_posterior_1)
                    # Update the estimate covariance matrix P.
                    P_posterior_1 = np.eye(6) - np.dot(kalman_gain, STATE_OBSERVATION_MATRIX)
                    p_posterior = np.dot(P_posterior_1, p_prior)
                else:
                    # IOU matching failed, so there is no observation; use the last
                    # best estimate as the prior and iterate the transition model
                    # directly without the Kalman update.
                    x_posterior = np.dot(STATE_TRANSITION_MATRIX, x_posterior)
                    action = Stop()
                    # Release the lock after 20 consecutive unmatched frames.
                    if lost_count > 20:
                        action = Stop()
                        locked = False
                        x_posterior = None
                        log.info(f'target lost')
            if locked:
                # NOTE(review): target_box may be unbound here if no detection has
                # ever reached max_iou since locking — confirm a match is guaranteed
                # on the first locked frame.
                log.info(f'target_box: {target_box},lock_bbox: {lock_bbox}')
                if self.get_count(target_box, lock_bbox):
                    unlocked_count += 1
                    log.info(f'Gesture recognition successful, unlocked count incremented by one. '
                             f'unlocked_count:{unlocked_count}')
                else:
                    unlocked_count = 0
                # Debounce: require the unlock gesture for 6 consecutive frames.
                if unlocked_count > 5:
                    log.info(f'unlock')
                    locked = False
                    action = Stop()
                    x_posterior = None
            # Execute the action; skip execution if it equals the previous action.
            if action.speed_setting == last_action.speed_setting:
                pass
            else:
                log.info('*' * 20)
                log.info(
                    f'action: {action}, class_x:{action.x_speed}, class_z:{action.z_speed}, '
                    f'speed:{action.speed_setting}, servo:{action.servo_angle}')
                log.info(
                    f'last_action: {last_action}, class_x:{last_action.x_speed}, class_z:{last_action.z_speed}, '
                    f'speed:{last_action.speed_setting}, servo:{last_action.servo_angle}')
                log.info('*' * 20)
                self.ctrl.execute(action)
                last_action = action
# Parent topic: Code implementation