普通文本  |  688行  |  28.06 KB

# -*- coding: utf-8 -*-

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Guide the user to perform gestures. Record and validate the gestures."""

import fcntl
import glob
import os
import subprocess
import sys

import common_util
import firmware_log
import firmware_utils
import fuzzy
import mini_color
import mtb
import touchbotII_robot_wrapper as robot_wrapper
import test_conf as conf
import validators

from firmware_utils import GestureList

sys.path.append('../../bin/input')
import input_device

# Include some constants
from firmware_constants import DEV, GV, MODE, OPTIONS, TFK


class TestFlow:
    """Guide the user to perform gestures. Record and validate the gestures."""

    def __init__(self, device_geometry, device, keyboard, win, parser, output,
                 test_version, board, firmware_version, options):
        self.device_geometry = device_geometry
        self.device = device
        self.device_node = self.device.device_node
        self.keyboard = keyboard
        self.firmware_version = firmware_version
        self.output = output
        self.board = board
        self.test_version = test_version
        self.output.print_report('%s' % test_version)
        self._get_record_cmd()
        self.win = win
        self.parser = parser
        self.packets = None
        self.gesture_file_name = None
        self.prefix_space = self.output.get_prefix_space()
        self.scores = []
        self.mode = options[OPTIONS.MODE]
        self.fngenerator_only = options[OPTIONS.FNGENERATOR]
        self.iterations = options[OPTIONS.ITERATIONS]
        self.replay_dir = options[OPTIONS.REPLAY]
        self.resume_dir = options[OPTIONS.RESUME]
        self.recording = not any([bool(self.replay_dir), bool(self.resume_dir)])
        self.device_type = (DEV.TOUCHSCREEN if options[OPTIONS.TOUCHSCREEN]
                                            else DEV.TOUCHPAD)

        self.robot = robot_wrapper.RobotWrapper(self.board, options)
        self.robot_waiting = False

        self.gv_count = float('infinity')
        gesture_names = self._get_gesture_names()
        order = None
        if self._is_robot_mode():
            order = lambda x: conf.finger_tips_required[x.name]
        self.gesture_list = GestureList(gesture_names).get_gesture_list(order)
        self._get_all_gesture_variations(options[OPTIONS.SIMPLIFIED])

        self.init_flag = False
        self.system_device = self._non_blocking_open(self.device_node)
        self.evdev_device = input_device.InputEvent()
        self.screen_shot = firmware_utils.ScreenShot(self.geometry_str)
        self.mtb_evemu = mtb.MtbEvemu(device)

        self._rename_old_log_and_html_files()
        self._set_static_prompt_messages()
        self.gesture_image_name = None
        self.gesture_continues_flag = False
        self.use_existent_event_file_flag = False

    def __del__(self):
        self.system_device.close()

    def _rename_old_log_and_html_files(self):
        """When in replay or resume mode, rename the old log and html files."""
        if self.replay_dir or self.resume_dir:
            for file_type in ['*.log', '*.html']:
                path_names = os.path.join(self.output.log_dir, file_type)
                for old_path_name in glob.glob(path_names):
                    new_path_name = '.'.join([old_path_name, 'old'])
                    os.rename(old_path_name, new_path_name)

    def _is_robot_mode(self):
        return self.robot.is_robot_action_mode() or self.mode == MODE.ROBOT_SIM

    def _get_gesture_names(self):
        """Determine the gesture names based on the mode."""
        if self.mode == MODE.QUICKSTEP:
            return conf.gesture_names_quickstep
        elif self.mode == MODE.NOISE:
            return conf.gesture_names_noise_extended
        elif self._is_robot_mode():
            # The mode could be MODE.ROBOT or MODE.ROBOT_SIM.
            # The same gesture names list is used in both modes.
            return conf.gesture_names_robot[self.device_type]
        elif self.mode == MODE.MANUAL:
            # Define the manual list which is gesture_names_complete:
            # gesture_names_robot - gesture_names_equipment_required
            manual_set = (set(conf.gesture_names_complete[self.device_type]) -
                          set(conf.gesture_names_robot[self.device_type]))
            return list(manual_set - set(conf.gesture_names_fngenerator_required))

        elif self.mode == MODE.CALIBRATION:
            return conf.gesture_names_calibration
        else:
            # Filter out tests that need a function generator for COMPLETE mode
            # unless they've indicated that they have one
            return [n for n in conf.gesture_names_complete[self.device_type]
                    if (self.fngenerator_only or
                        n not in conf.gesture_names_fngenerator_required)]

    def _non_blocking_open(self, filename):
        """Open the file in non-blocing mode."""
        fd = open(filename)
        fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
        return fd

    def _non_blocking_read(self, dev, fd):
        """Non-blocking read on fd."""
        try:
            dev.read(fd)
            event = (dev.tv_sec, dev.tv_usec, dev.type, dev.code, dev.value)
        except Exception, e:
            event = None
        return event

    def _reopen_system_device(self):
        """Close the device and open a new one."""
        self.system_device.close()
        self.system_device = open(self.device_node)
        self.system_device = self._non_blocking_open(self.device_node)

    def _set_static_prompt_messages(self):
        """Set static prompt messages."""
        # Prompt for next gesture.
        self._prompt_next = (
                "Press SPACE to save this file and go to next test,\n"
                "      'm'   to save this file and record again,\n"
                "      'd'   to delete this file and try again,\n"
                "      'x'   to discard this file and exit.")

        # Prompt to see test result through timeout callback.
        self._prompt_result = (
                "Perform the gesture now.\n"
                "See the test result on the right after finger lifted.\n"
                "Or press 'x' to exit.")

    def _get_prompt_abnormal_gestures(self, warn_msg):
        """Prompt for next gesture."""
        prompt = '\n'.join(
                ["It is very likely that you perform a WRONG gesture!",
                 warn_msg,
                 "Press 'd'   to delete this file and try again (recommended),",
                 "      SPACE to save this file if you are sure it's correct,",
                 "      'x'   to discard this file and exit."])
        return prompt

    def _get_prompt_no_data(self):
        """Prompt to remind user of performing gestures."""
        prompt = ("You need to perform the specified gestures "
                  "before pressing SPACE.\n")
        return prompt + self._prompt_result

    def _get_record_cmd(self):
        """Get the device event record command."""
        # Run mtplot with settings to disable clearing the display if the robot
        # clicks the pad, and adding a visible click indicator in the output
        self.record_program = 'mtplot -s1 -c0 -m0'
        if not common_util.program_exists(self.record_program):
            msg = 'Error: the program "%s" does not exist in $PATH.'
            self.output.print_report(msg % self.record_program)
            exit(1)

        display_name = firmware_utils.get_display_name()
        self.geometry_str = '%dx%d+%d+%d' % self.device_geometry
        format_str = '%s %s -d %s -g %s'
        self.record_cmd = format_str % (self.record_program,
                                        self.device_node,
                                        display_name,
                                        self.geometry_str)
        self.output.print_report('Record program: %s' % self.record_cmd)

    def _span_seq(self, seq1, seq2):
        """Span sequence seq1 over sequence seq2.

        E.g., seq1 = (('a', 'b'), 'c')
              seq2 = ('1', ('2', '3'))
              res = (('a', 'b', '1'), ('a', 'b', '2', '3'),
                     ('c', '1'), ('c', '2', '3'))
        E.g., seq1 = ('a', 'b')
              seq2 = ('1', '2', '3')
              res  = (('a', '1'), ('a', '2'), ('a', '3'),
                      ('b', '1'), ('b', '2'), ('b', '3'))
        E.g., seq1 = (('a', 'b'), ('c', 'd'))
              seq2 = ('1', '2', '3')
              res  = (('a', 'b', '1'), ('a', 'b', '2'), ('a', 'b', '3'),
                      ('c', 'd', '1'), ('c', 'd', '2'), ('c', 'd', '3'))
        """
        to_list = lambda s: list(s) if isinstance(s, tuple) else [s]
        return tuple(tuple(to_list(s1) + to_list(s2)) for s1 in seq1
                                                      for s2 in seq2)

    def span_variations(self, seq):
        """Span the variations of a gesture."""
        if seq is None:
            return (None,)
        elif isinstance(seq[0], tuple):
            return reduce(self._span_seq, seq)
        else:
            return seq

    def _stop(self):
        """Terminate the recording process."""
        self.record_proc.poll()
        # Terminate the process only when it was not terminated yet.
        if self.record_proc.returncode is None:
            self.record_proc.terminate()
            self.record_proc.wait()
        self.output.print_window('')

    def _get_gesture_image_name(self):
        """Get the gesture file base name without file extension."""
        filepath = os.path.splitext(self.gesture_file_name)[0]
        self.gesture_image_name = filepath + '.png'
        return filepath

    def _close_gesture_file(self):
        """Close the gesture file."""
        if self.gesture_file.closed:
            return

        filename = self.gesture_file.name
        self.gesture_file.close()

        # Strip off the header of the gesture file.
        #
        # Input driver version is 1.0.1
        # Input device ID: bus 0x18 vendor 0x0 product 0x0 version 0x0
        # Input device name: "Atmel maXTouch Touchpad"
        # ...
        # Testing ... (interrupt to exit)
        # Event: time 519.855, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID),
        #                                       value 884
        #
        tmp_filename = filename + '.tmp'
        os.rename(filename, tmp_filename)
        with open(tmp_filename) as src_f:
            with open(filename, 'w') as dst_f:
                for line in src_f:
                    if line.startswith('Event:'):
                        dst_f.write(line)
        os.remove(tmp_filename)

    def _stop_record_and_post_image(self):
        """Terminate the recording process."""
        if self.record_new_file:
            self._close_gesture_file()
            self.screen_shot.dump_root(self._get_gesture_image_name())
            self.record_proc.terminate()
            self.record_proc.wait()
        else:
            self._get_gesture_image_name()
        self.win.set_image(self.gesture_image_name)

    def _create_prompt(self, test, variation):
        """Create a color prompt."""
        prompt = test.prompt
        if isinstance(variation, tuple):
            subprompt = reduce(lambda s1, s2: s1 + s2,
                               tuple(test.subprompt[s] for s in variation))
        elif variation is None or test.subprompt is None:
            subprompt = None
        else:
            subprompt = test.subprompt[variation]

        if subprompt is None:
            color_prompt = prompt
            monochrome_prompt = prompt
        else:
            color_prompt = mini_color.color_string(prompt, '{', '}', 'green')
            color_prompt = color_prompt.format(*subprompt)
            monochrome_prompt = prompt.format(*subprompt)

        color_msg_format = mini_color.color_string('\n<%s>:\n%s%s', '<', '>',
                                                   'blue')
        color_msg = color_msg_format % (test.name, self.prefix_space,
                                        color_prompt)
        msg = '%s: %s' % (test.name, monochrome_prompt)

        glog = firmware_log.GestureLog()
        glog.name = test.name
        glog.variation = variation
        glog.prompt = monochrome_prompt

        return (msg, color_msg, glog)

    def _choice_exit(self):
        """Procedure to exit."""
        self._stop()
        if os.path.exists(self.gesture_file_name):
            os.remove(self.gesture_file_name)
            self.output.print_report(self.deleted_msg)

    def _stop_record_and_rm_file(self):
        """Stop recording process and remove the current gesture file."""
        self._stop()
        if os.path.exists(self.gesture_file_name):
            os.remove(self.gesture_file_name)
            self.output.print_report(self.deleted_msg)

    def _create_gesture_file_name(self, gesture, variation):
        """Create the gesture file name based on its variation.

        Examples of different levels of file naming:
            Primary name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27
            Root name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510
            Base name:
                pinch_to_zoom.zoom_in-lumpy-fw_11.27-manual-20130221_050510.dat
        """
        if variation is None:
            gesture_name = gesture.name
        else:
            if type(variation) is tuple:
                name_list = [gesture.name,] + list(variation)
            else:
                name_list = [gesture.name, variation]
            gesture_name = '.'.join(name_list)

        self.primary_name = conf.filename.sep.join([
                gesture_name,
                self.board,
                conf.fw_prefix + self.firmware_version])
        root_name = conf.filename.sep.join([
                self.primary_name,
                self.mode,
                firmware_utils.get_current_time_str()])
        basename = '.'.join([root_name, conf.filename.ext])
        return basename

    def _add_scores(self, new_scores):
        """Add the new scores of a single gesture to the scores list."""
        if new_scores is not None:
            self.scores += new_scores

    def _final_scores(self, scores):
        """Print the final score."""
        # Note: conf.score_aggregator uses a function in fuzzy module.
        final_score = eval(conf.score_aggregator)(scores)
        self.output.print_report('\nFinal score: %s\n' % str(final_score))

    def _robot_action(self):
        """Control the robot to perform the action."""
        if self._is_robot_mode() or self.robot.is_manual_noise_test_mode():
            self.robot.configure_noise(self.gesture, self.variation)

        if self._is_robot_mode():
            self.robot.control(self.gesture, self.variation)
            # Once the script terminates start a timeout to clean up if one
            # hasn't already been set to keep the test suite from hanging.
            if not self.gesture_begins_flag:
                self.win.register_timeout_add(self.gesture_timeout_callback,
                                              self.gesture.timeout)

    def _handle_user_choice_save_after_parsing(self, next_gesture=True):
        """Handle user choice for saving the parsed gesture file."""
        self.output.print_window('')
        if self.use_existent_event_file_flag or self.recording:
            if self.saved_msg:
                self.output.print_report(self.saved_msg)
            if self.new_scores:
                self._add_scores(self.new_scores)
            self.output.report_html.insert_image(self.gesture_image_name)
            self.output.report_html.flush()
        # After flushing to report_html, reset the gesture_image_name so that
        # it will not be reused by next gesture variation accidentally.
        self.gesture_image_name = None

        if self._pre_setup_this_gesture_variation(next_gesture=next_gesture):
            # There are more gestures.
            self._setup_this_gesture_variation()
            self._robot_action()
        else:
            # No more gesture.
            self._final_scores(self.scores)
            self.output.stop()
            self.output.report_html.stop()
            self.win.stop()
        self.packets = None

    def _handle_user_choice_discard_after_parsing(self):
        """Handle user choice for discarding the parsed gesture file."""
        self.output.print_window('')
        self._setup_this_gesture_variation()
        self._robot_action()
        self.packets = None

    def _handle_user_choice_exit_after_parsing(self):
        """Handle user choice to exit after the gesture file is parsed."""
        self._stop_record_and_rm_file()
        self.output.stop()
        self.output.report_html.stop()
        self.win.stop()

    def check_for_wrong_number_of_fingers(self, details):
        flag_found = False
        try:
            position = details.index('CountTrackingIDValidator')
        except ValueError as e:
            return None

        # An example of the count of tracking IDs:
        #     '    count of trackid IDs: 1'
        number_tracking_ids = int(details[position + 1].split()[-1])
        # An example of the criteria_str looks like:
        #     '    criteria_str: == 2'
        criteria = int(details[position + 2].split()[-1])
        if number_tracking_ids < criteria:
            print '  CountTrackingIDValidator: '
            print '  number_tracking_ids: ', number_tracking_ids
            print '  criteria: ', criteria
            print '  number_tracking_ids should be larger!'
            msg = 'Number of Tracking IDs should be %d instead of %d'
            return msg % (criteria, number_tracking_ids)
        return None

    def _empty_packets_is_legal_result(self):
        return ('tap' in self.gesture.name and self._is_robot_mode())

    def _handle_user_choice_validate_before_parsing(self):
        """Handle user choice for validating before gesture file is parsed."""
        # Parse the device events. Make sure there are events.
        self.packets = self.parser.parse_file(self.gesture_file_name)
        if self.packets or self._empty_packets_is_legal_result():
            # Validate this gesture and get the results.
            (self.new_scores, msg_list, vlogs) = validators.validate(
                    self.packets, self.gesture, self.variation)

            # If the number of tracking IDs is less than the expected value,
            # the user probably made a wrong gesture.
            error = self.check_for_wrong_number_of_fingers(msg_list)
            if error:
                prompt = self._get_prompt_abnormal_gestures(error)
                color = 'red'
            else:
                prompt = self._prompt_next
                color = 'black'

            self.output.print_window(msg_list)
            self.output.buffer_report(msg_list)
            self.output.report_html.insert_validator_logs(vlogs)
            self.win.set_prompt(prompt, color=color)
            print prompt
            self._stop_record_and_post_image()
        else:
            self.win.set_prompt(self._get_prompt_no_data(), color='red')

    def _handle_user_choice_exit_before_parsing(self):
        """Handle user choice to exit before the gesture file is parsed."""
        self._close_gesture_file()
        self._handle_user_choice_exit_after_parsing()

    def _is_parsing_gesture_file_done(self):
        """Is parsing the gesture file done?"""
        return self.packets is not None

    def _is_arrow_key(self, choice):
        """Is this an arrow key?"""
        return (choice in TFK.ARROW_KEY_LIST)

    def user_choice_callback(self, fd, condition):
        """A callback to handle the key pressed by the user.

        This is the primary GUI event-driven method handling the user input.
        """
        choice = self.keyboard.get_key_press_event(fd)
        if choice:
            self._handle_keyboard_event(choice)
        return True

    def _handle_keyboard_event(self, choice):
        """Handle the keyboard event."""
        if self._is_arrow_key(choice):
            self.win.scroll(choice)
        elif self.robot_waiting:
            # The user wants the robot to start its action.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self.robot_waiting = False
                self._robot_action()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
        elif self._is_parsing_gesture_file_done():
            # Save this gesture file and go to next gesture.
            if choice in (TFK.SAVE, TFK.SAVE2):
                self._handle_user_choice_save_after_parsing()
            # Save this file and perform the same gesture again.
            elif choice == TFK.MORE:
                self._handle_user_choice_save_after_parsing(next_gesture=False)
            # Discard this file and perform the gesture again.
            elif choice == TFK.DISCARD:
                self._handle_user_choice_discard_after_parsing()
            # The user wants to exit.
            elif choice == TFK.EXIT:
                self._handle_user_choice_exit_after_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_next, color='red')
        else:
            if choice == TFK.EXIT:
                self._handle_user_choice_exit_before_parsing()
            # The user presses any wrong key.
            else:
                self.win.set_prompt(self._prompt_result, color='red')

    def _get_all_gesture_variations(self, simplified):
        """Get all variations for all gestures."""
        gesture_variations_list = []
        self.variations_dict = {}
        for gesture in self.gesture_list:
            variations_list = []
            variations = self.span_variations(gesture.variations)
            for variation in variations:
                gesture_variations_list.append((gesture, variation))
                variations_list.append(variation)
                if simplified:
                    break
            self.variations_dict[gesture.name] = variations_list
        self.gesture_variations = iter(gesture_variations_list)

    def gesture_timeout_callback(self):
        """A callback watching whether a gesture has timed out."""
        if self.replay_dir:
            # There are event files to replay for this gesture variation.
            if self.use_existent_event_file_flag:
                self._handle_user_choice_validate_before_parsing()
            self._handle_user_choice_save_after_parsing(next_gesture=True)
            return False

        # A gesture is stopped only when two conditions are met simultaneously:
        # (1) there are no reported packets for a timeout interval, and
        # (2) the number of tracking IDs is 0.
        elif (self.gesture_continues_flag or
            not self.mtb_evemu.all_fingers_leaving()):
            self.gesture_continues_flag = False
            return True

        else:
            self._handle_user_choice_validate_before_parsing()
            self.win.remove_event_source(self.gesture_file_watch_tag)
            if self._is_robot_mode():
                self._handle_keyboard_event(TFK.SAVE)
            return False

    def gesture_file_watch_callback(self, fd, condition, evdev_device):
        """A callback to watch the device input."""
        # Read the device node continuously until end
        event = True
        while event:
            event = self._non_blocking_read(evdev_device, fd)
            if event:
                self.mtb_evemu.process_event(event)

        self.gesture_continues_flag = True
        if (not self.gesture_begins_flag):
            self.gesture_begins_flag = True
            self.win.register_timeout_add(self.gesture_timeout_callback,
                                          self.gesture.timeout)
        return True

    def init_gesture_setup_callback(self, widget, event):
        """A callback to set up environment before a user starts a gesture."""
        if not self.init_flag:
            self.init_flag = True
            self._pre_setup_this_gesture_variation()
            self._setup_this_gesture_variation()
            self._robot_action()

    def _get_existent_event_files(self):
        """Get the existent event files that starts with the primary_name."""
        primary_pathnames = os.path.join(self.output.log_dir,
                                         self.primary_name + '*.dat')
        self.primary_gesture_files = glob.glob(primary_pathnames)
        # Reverse sorting the file list so that we could pop from the tail.
        self.primary_gesture_files.sort()
        self.primary_gesture_files.reverse()

    def _use_existent_event_file(self):
        """If the replay flag is set in the command line, and there exists a
        file(s) with the same primary name, then use the existent file(s)
        instead of recording a new one.
        """
        if self.primary_gesture_files:
            self.gesture_file_name = self.primary_gesture_files.pop()
            return True
        return False

    def _pre_setup_this_gesture_variation(self, next_gesture=True):
        """Get gesture, variation, filename, prompt, etc."""
        next_gesture_first_time = False
        if next_gesture:
            if self.gv_count < self.iterations:
                self.gv_count += 1
            else:
                self.gv_count = 1
                gesture_variation = next(self.gesture_variations, None)
                if gesture_variation is None:
                    return False
                self.gesture, self.variation = gesture_variation
                next_gesture_first_time = True

        basename = self._create_gesture_file_name(self.gesture, self.variation)
        if next_gesture_first_time:
            self._get_existent_event_files()

        if self.replay_dir or self.resume_dir:
            self.use_existent_event_file_flag = self._use_existent_event_file()

        if ((not self.replay_dir and not self.resume_dir) or
                (self.resume_dir and not self.use_existent_event_file_flag)):
            self.gesture_file_name = os.path.join(self.output.log_dir, basename)
            self.saved_msg = '(saved: %s)\n' % self.gesture_file_name
            self.deleted_msg = '(deleted: %s)\n' % self.gesture_file_name
        else:
            self.saved_msg = None
            self.deleted_msg = None
        self.new_scores = None

        if self.robot.is_robot_action_mode() or self.robot.is_manual_noise_test_mode():
            self.robot.turn_off_noise()

        (msg, color_msg, glog) = self._create_prompt(self.gesture,
                                                     self.variation)
        self.win.set_gesture_name(msg)
        self.output.report_html.insert_gesture_log(glog)
        print color_msg
        self.output.print_report(color_msg)
        return True

    def _setup_this_gesture_variation(self):
        """Set up the recording process or use an existent event data file."""
        if self.replay_dir:
            self.record_new_file = False
            self.win.register_timeout_add(self.gesture_timeout_callback, 0)
            return

        if self.resume_dir and self.use_existent_event_file_flag:
            self.record_new_file = False
            self._handle_user_choice_validate_before_parsing()
            self._handle_keyboard_event(TFK.SAVE)
            return

        # Initiate the MtbSanityValidator. Note that this should be done each
        # time just before recording the gesture file since it requires a
        # snapshot of the input device before any finger touching the device.
        self.gesture.mtb_sanity_validator = validators.MtbSanityValidator()

        # Now, we will record a new gesture event file.
        # Fork a new process for mtplot. Add io watch for the gesture file.
        self.record_new_file = True
        self.gesture_file = open(self.gesture_file_name, 'w')
        self.record_proc = subprocess.Popen(self.record_cmd.split(),
                                            stdout=self.gesture_file)

        # Watch if data come in to the monitored file.
        self.gesture_begins_flag = False
        self._reopen_system_device()
        self.gesture_file_watch_tag = self.win.register_io_add_watch(
                self.gesture_file_watch_callback, self.system_device,
                self.evdev_device)