Pepper NAO Choregraphe Naoqi wiki - ALExpressionWatcher

このページについて

NAOqi Core - 概要 | API | ExpressionObject API


What it does


ALExpressionWatcher module allows you to be notified or query the validity of a condition expression.

This module is useful when you want to create an higher ordered event based on a combination of others events.

Especially, with various time-based operators (~, #, @, bang(), etc...), ALExpressionWatcher lets you create complex events combinations without needing to implement complex asynchronous time-based code.

How it works

  • A condition expression mixes several ALMemory events and/or ALValues using Conditions expression language.
  • Adding a condition expression into ALExpressionWatcher, provides a unique ExpressionObject embedding methods and a also a qi::Signal (see C++ qi API Reference or Python qi API reference).
  • This signal is triggered with the condition expression result value when the expression is valid relative to the selected report mode.
  • ALExpressionWatcher is usable with 3 different report modes:
Report ModesEmit ExpressionObject signal...
REPORT_CHANGEAny time the Value of the condition expression changes.
REPORT_EDGEOnly when the Boolean casted value of the condition expression changes.
REPORT_EDGE_TRUEOnly when the Boolean casted value of the condition expression goes from False to True.

Getting started

StepAction
1.Create a condition expression.
Mix several ALMemory events and/or ALValues using Conditions expression language.
2.Add the condition expression using ALExpressionWatcher::add.
Result:
An ExpressionObject object, with an embedded qi::Signal, returning the condition expression value, triggered according to the chosen report mode.

Example

Suppose you want to be warned when the currently tracked human is not looking at the robot and stands in zone 2 for at least 5 seconds. With ALExpressionWatcher you can do something like:

sample_alexpressionwatcher.py
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: Use the ALExpressionWatcher Module"""

import qi
import argparse
import sys
import time


class onTrackedHuman(object):
    """
    A simple class to react to expression detection events.
    """

    def __init__(self, app):
        """
        Initialisation of qi framework and event detection.
        """
        super(onTrackedHuman, self).__init__()
        app.start()
        session = app.session
        # Connect the event callback.
        self.memory = session.service("ALMemory")
        self.subscriber.signal.connect(self.on_human_tracked)
        # Get the service ALExpressionWatcher.
        self.expression_service = session.service("ALExpressionWatcher")
        # expression setup
        reportMode = 2
        conditionExpression = "('Launchpad/TrackedHumanIsLookingAtRobot' == 0 && 'Launchpad/ZoneOfTrackedHuman' == 2) ~ 5"
        # add condition to ALExpressionWatcher
        expressionObject = self.expression_service.add(conditionExpression, reportMode)
        # connect expressionObject signal to callback
        signalId = expressionObject.signal.connect(self.onTrackedHumanDontCareAboutMe)
        self.expression_service.subscribe("onTrackedHuman")
        self.got_event = False

    def onTrackedHumanDontCareAboutMe(self, expressionValue):
        """
        'ALConditionChecker/Condition/TrackedHumanDontCareAboutMe' event callback
        """
        if expressionValue == []:  # empty value when the face disappears
            self.got_event = False
        elif not self.got_event:  # only speak the first time the event is raised.
            print "Got ALConditionChecker event with value:", expressionValue, "so tracked human didn't look at me and stood in zone 2 since 5 seconds!"

    def run(self):
        """
        Loop on, wait for events until manual interruption.
        """
        print "Starting HumanGreeter"
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print "Interrupted by user, stopping HumanGreeter"
            self.expression_service.unsubscribe("onTrackedHuman")
            #stop
            sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    try:
        # Initialize qi framework.
        connection_url = "tcp://" + args.ip + ":" + str(args.port)
        app = qi.Application(["onTrackedHuman", "--qi-url=" + connection_url])
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
               "Please check your script arguments. Run with -h option for help.")
        sys.exit(1)
    react_to_human_expression = onTrackedHuman(app)
    app.run() # will exit when the connection is over.
    react_to_human_expression.clean_up()