場合によっては、ある人物が見つかった時に行動を起こし、その人物に集中し続けることが必要な場合があります。
完全にエンゲージされたモードは、誰でも検出して引き合うことができるため、このニーズに適しています。
ALBasicAwareness / HumanTrackedイベントとALBasicAwareness / PeopleLeft(イベント処理チュートリアルを参照)をキャッチして、ロボットが誰かを見つけた(または見失った)ことがわかっています。この例では、アクションをトリガーすることができます。
以下の例では、音声認識を開始または停止し、PeoplePerceptionから追跡された人物に関する情報を取得して出力します。
albasicawareness_human_found.py
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
"""Example: A Simple class to Find Human with BasicAwareness"""
import qi
import argparse
import sys
import time
class HumanTrackedEventWatcher(object):
""" A class to react to HumanTracked and PeopleLeft events """
def __init__(self, app):
"""
Initialisation of qi framework and event detection.
"""
super(HumanTrackedEventWatcher, self).__init__()
try:
app.start()
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " +
str(args.port) + ".\n")
sys.exit(1)
session = app.session
self.subscribers_list = []
self.is_speech_reco_started = False
self.memory = session.service("ALMemory")
self.speech_reco = session.service("ALSpeechRecognition")
self.basic_awareness = session.service("ALBasicAwareness")
self.motion = session.service("ALMotion")
self.connect_callback("ALBasicAwareness/HumanTracked",
self.on_human_tracked)
self.connect_callback("ALBasicAwareness/HumanLost",
self.on_people_left)
def connect_callback(self, event_name, callback_func):
""" connect a callback for a given event """
subscriber = self.memory.subscriber(event_name)
subscriber.signal.connect(callback_func)
self.subscribers_list.append(subscriber)
def on_human_tracked(self, value):
""" callback for event HumanTracked """
print "Got HumanTracked: detected person with ID:", str(value)
if value >= 0: # found a new person
self.start_speech_reco()
position_human = self.get_people_perception_data(value)
[x, y, z] = position_human
print "The tracked person with ID", value, "is at the position:", \
"x=", x, "/ y=", y, "/ z=", z
def on_people_left(self, value):
""" callback for event PeopleLeft """
print "Got PeopleLeft: lost person", str(value)
self.stop_speech_reco()
def start_speech_reco(self):
""" start ASR when someone's detected in event handler class """
if not self.is_speech_reco_started:
try:
self.speech_reco.setVocabulary(["yes", "no"], False)
except RuntimeError:
print "ASR already started"
print "Starting speech recognition"
self.speech_reco.subscribe("BasicAwareness_Test")
self.is_speech_reco_started = True
def stop_speech_reco(self):
""" stop ASR when someone's detected in event handler class """
if self.is_speech_reco_started:
print "Stopping speech recognition"
self.speech_reco.unsubscribe("BasicAwareness_Test")
self.is_speech_reco_started = False
def get_people_perception_data(self, id_person_tracked):
"""
return information related to the person who has the id
"id_person_tracked" from People Perception
"""
memory_key = "PeoplePerception/Person/" + str(id_person_tracked) + \
"/PositionInWorldFrame"
return self.memory.getData(memory_key)
def run(self):
"""
this example uses the setEngagementMode, startAwareness and
stopAwareness methods
"""
# start
print "Waiting for the robot to be in wake up position"
self.motion.wakeUp()
print "Starting BasicAwareness with the fully engaged mode"
self.basic_awareness.setEngagementMode("FullyEngaged")
self.basic_awareness.setEnabled(True)
# loop on, wait for events until manual interruption
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print "Interrupted by user, shutting down"
# stop
print "Stopping BasicAwareness"
self.basic_awareness.setEnabled(False)
self.stop_speech_reco()
print "Waiting for the robot to be in rest position"
self.motion.rest()
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="127.0.0.1",
help="Robot IP address. On robot or Local Naoqi: use \
'127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
# Initialize qi framework.
connection_url = "tcp://" + args.ip + ":" + str(args.port)
app = qi.Application(["HumanTrackedEventWatcher",
"--qi-url=" + connection_url])
human_tracked_event_watcher = HumanTrackedEventWatcher(app)
human_tracked_event_watcher.run()