智能小车控制逻辑入口
“main.py”是小车控制代码的主入口,定义了小车控制模式及对应模式的后续调用实现。
首先导入一系列与参数有关的模块,以及启动摄像头的关键模块。
import os from argparse import ArgumentParser from multiprocessing import Process, Queue from src.scenes import Manual, scene_initiator from src.utils import getkey, log, CameraBroadcaster, CAMERA_INFO
定义出可选参数更改小车的控制模式,可选命令行cmd控制,手动控制,声音控制(正在规划中)以及测试等。
def parse_args():
parser = ArgumentParser()
parser.add_argument('--mode', type=str, required=False, default='manual',
choices=['cmd', 'voice', 'manual', 'test'])
return parser.parse_args()
遵循简单性的原则,在程序的主入口需判断模式的设定后,进入到对应模式的实现中,包含摄像头端口的调用及行驶过程中各类消息队列的处理。
if __name__ == '__main__':
args = parse_args()
log.info('start')
msg_queue = Queue(maxsize=1)
camera = CameraBroadcaster(CAMERA_INFO)
shared_memory_name = camera.memory_name
camera_process = Process(target=camera.run)
camera_process.start()
if args.mode == 'manual':
task = Manual(shared_memory_name, CAMERA_INFO, msg_queue)
process = Process(target=task.loop)
process.start()
try:
while True:
key = getkey()
if key == 'esc':
process.join()
camera.stop_sign.value = True
camera_process.join()
break
else:
msg_queue.put(key)
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
os.system('stty sane')
log.info('stopping.')
elif args.mode == 'cmd':
process_list = []
record_map = {}
try:
log.info(f'start reading cmd')
while True:
command = input().strip()
if command == 'stop':
for p in process_list:
p.kill()
log.info(f'start put stop sign')
camera.stop_sign.value = True
camera_process.join()
break
elif command == 'clear':
for p in process_list:
p.kill()
process_list.clear()
log.info(f'clear succ')
continue
elif command == 'Manual':
log.error(f'Does not support switching from cmd mode to manual mode')
continue
log.info(f'building scene {command}')
scene = scene_initiator(command)
log.info(f'{scene}')
if scene is not None:
scene_obj = scene(shared_memory_name, CAMERA_INFO, msg_queue)
process = Process(target=scene_obj.loop)
process.start()
process_list.append(process)
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
for process in process_list:
process.kill()
log.info('stopping.')
elif args.mode == 'voice':
raise NotImplementedError('voice control is not currently supported.')
elif args.mode == 'test':
test_list = []
test1 = Manual(shared_memory_name, CAMERA_INFO, msg_queue)
test_list.append(Process(target=test1.loop))
test2 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue)
test_list.append(Process(target=test2.loop))
test3 = scene_initiator('Tracking')(shared_memory_name, CAMERA_INFO, msg_queue)
test_list.append(Process(target=test3.loop))
for process in test_list:
process.start()
try:
while True:
key = getkey()
if key == 'esc':
for process in test_list:
process.kill()
camera.stop_sign.value = True
camera_process.join()
break
else:
msg_queue.put(key)
except (KeyboardInterrupt, SystemExit):
camera.stop_sign.value = True
camera_process.join()
os.system('stty sane')
log.info('stopping.')
父主题: 代码实现