机械狗控制逻辑入口
“main.py”是机械狗控制代码的主入口,定义了控制模式及对应模式的后续调用实现。
首先导入一系列与参数有关的模块。
import os from argparse import ArgumentParser from multiprocessing import Process, Queue from src.actions import Stop from src.scenes import Manual, scene_initiator from src.utils import getkey, log, CameraBroadcaster, SystemInfo, Controller, get_port, STM32_NAME, ESP32_NAME
定义出可选参数更改小车的控制模式,可选命令行cmd控制(预留接口),手动控制,简易模式(自动巡线行走),声音控制(预留接口)、手动控制行走等。
def parse_args():
parser = ArgumentParser()
parser.add_argument('--mode', type=str, required=False, default='manual',
choices=['cmd', 'voice', 'tracking', 'easy', 'manual'])
return parser.parse_args()
遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中。
if __name__ == '__main__':
# 获取STM32和ESP32设备的端口
stm32_port = get_port(STM32_NAME)
esp32_port = get_port(ESP32_NAME)
# 使用获得的端口创建SystemInfo实例
system_info = SystemInfo(stm32_port=stm32_port, esp32_port=esp32_port)
# 初始化Controller
ctrl = Controller()
# 解析命令行参数
args = parse_args()
log.info('start')
# 创建一个最大大小为1的消息队列
msg_queue = Queue(maxsize=1)
camera = CameraBroadcaster(system_info)
shared_memory_name = camera.memory_name
# 在后台启动摄像头进程
camera_process = Process(target=camera.run)
camera_process.start()
# 检查选择的模式并执行相应的操作
if args.mode == 'manual':
task = Manual(shared_memory_name, system_info, msg_queue)
process = Process(target=task.loop)
process.start()
try:
while True:
key = getkey()
if key == 'esc':
process.join()
camera.stop_sign.value = True
camera_process.join()
break
else:
msg_queue.put(key)
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
os.system('stty sane')
log.info('stopping.')
elif args.mode == 'cmd':
process_list = []
record_map = {}
try:
log.info(f'start reading cmd')
while True:
command = input().strip()
if command == 'stop':
# 如果命令是'stop',终止所有进程,包括摄像头,并退出循环
for p in process_list:
p.kill()
log.info(f'start put stop sign')
ctrl.execute(Stop())
camera.stop_sign.value = True
camera_process.join()
break
elif command == 'clear':
# 如果命令是'clear',清空进程列表,重置控制器,并继续
for p in process_list:
p.kill()
process_list.clear()
ctrl = Controller()
ctrl.execute(Stop())
log.info(f'clear succ')
continue
elif command == 'Manual':
log.error(f'Does not support switching from cmd mode to manual mode')
continue
# 根据命令构建场景,并在单独的进程中启动它
log.info(f'building scene {command}')
scene = scene_initiator(command)
log.info(f'{scene}')
if scene is not None:
scene_obj = scene(shared_memory_name, system_info, msg_queue)
process = Process(target=scene_obj.loop)
process.start()
process_list.append(process)
# 处理键盘中断或系统退出,停止摄像头进程和所有场景进程,并记录事件
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
for process in process_list:
process.kill()
log.info('stopping.')
elif args.mode == 'voice':
raise NotImplementedError('voice control is not currently supported.')
elif args.mode == 'easy':
process_list = []
task2 = scene_initiator('LF')(shared_memory_name, system_info, msg_queue)
process_list.append(Process(target=task2.loop))
for process in process_list:
process.start()
try:
while True:
key = getkey()
if key == 'esc':
for process in process_list:
process.kill()
camera.stop_sign.value = True
camera_process.join()
break
else:
# 将按键放入消息队列供场景处理
msg_queue.put(key)
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
os.system('stty sane')
log.info('stopping.')
elif args.mode == 'tracking':
# 对于跟踪模式,启动对应场景(Tracking)的单独进程
process_list = []
task1 = scene_initiator('Tracking')(shared_memory_name, system_info, msg_queue)
process_list.append(Process(target=task1.loop))
for process in process_list:
process.start()
try:
while True:
key = getkey()
if key == 'esc':
for process in process_list:
process.kill()
camera.stop_sign.value = True
camera_process.join()
break
else:
msg_queue.put(key)
except (KeyboardInterrupt, SystemExit):
# 处理键盘中断或系统退出,停止摄像头进程,并记录事件
camera.stop_sign.value = True
camera_process.join()
os.system('stty sane')
log.info('stopping.')
父主题: 代码实现