普通文本  |  684行  |  23.96 KB

#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# Description:
#
# Class for handling linux 'evdev' input devices.
#
# Provides evtest-like functionality if run from the command line:
# $ input_device.py -d /dev/input/event6

""" Read properties and events of a linux input device. """

import array
import copy
import fcntl
import os.path
import select
import struct
import time

from collections import OrderedDict

from linux_input import *


class Valuator:
    """ A Valuator just stores a value """
    def __init__(self):
        self.value = 0

class SwValuator(Valuator):
    """ A Valuator used for EV_SW (switch) events """
    def __init__(self, value):
        self.value = value

class AbsValuator(Valuator):
    """
    An AbsValuator, used for EV_ABS events stores a value as well as other
    properties of the corresponding absolute axis.
    """
    def __init__(self, value, min_value, max_value, fuzz, flat, resolution):
        self.value = value
        self.min = min_value
        self.max = max_value
        self.fuzz = fuzz
        self.flat = flat
        self.resolution = resolution


class InputEvent:
    """
    Linux evdev input event

    An input event has the following fields which can be accessed as public
    properties of this class:
        tv_sec
        tv_usec
        type
        code
        value
    """
    def __init__(self, tv_sec=0, tv_usec=0, type=0, code=0, value=0):
        self.format = input_event_t
        self.format_size = struct.calcsize(self.format)
        (self.tv_sec, self.tv_usec, self.type, self.code,
         self.value) = (tv_sec, tv_usec, type, code, value)

    def read(self, stream):
        """ Read an input event from the provided stream and unpack it. """
        packed = stream.read(self.format_size)
        (self.tv_sec, self.tv_usec, self.type, self.code,
         self.value) = struct.unpack(self.format, packed)

    def write(self, stream):
        """ Pack an input event and write it to the provided stream. """
        packed = struct.pack(self.format, self.tv_sec, self.tv_usec, self.type,
                             self.code, self.value)
        stream.write(packed)
        stream.flush()

    def __str__(self):
        t = EV_TYPES.get(self.type, self.type)
        if self.type in EV_STRINGS:
            c = EV_STRINGS[self.type].get(self.code, self.code)
        else:
            c = self.code
        return ('%d.%06d: %s[%s] = %d' %
                (self.tv_sec, self.tv_usec, t, c, self.value))


class InputDevice:
    """
    Linux evdev input device

    A linux kernel "evdev" device sends a stream of "input events".
    These events are grouped together into "input reports", which is a set of
    input events ending in a single EV_SYN/SYN_REPORT event.

    Each input event is tagged with a type and a code.
    A given input device supports a subset of the possible types, and for
    each type it supports a subset of the possible codes for that type.

    The device maintains a "valuator" for each supported type/code pairs.
    There are two types of "valuators":
       Normal valuators represent just a value.
       Absolute valuators are only for EV_ABS events. They have more fields:
           value, minimum, maximum, resolution, fuzz, flatness
    Note: Relative and Absolute "Valuators" are also often called relative
          and absolute axis, respectively.

    The evdev protocol is stateful.  Input events are only sent when the values
    of a valuator actually changes.  This dramatically reduces the stream of
    events emenating from the kernel.

    Multitouch devices are a special case.  There are two types of multitouch
    devices defined in the kernel:
        Multitouch type "A" (MT-A) devices:
            In each input report, the device sends an unordered list of all
            active contacts.  The data for each active contact is separated
            in the input report by an EV_SYN/SYN_MT_REPORT event.
            Thus, the MT-A contact event stream is not stateful.
            Note: MT-A is not currently supported by this class.

        Multitouch type "B" (MT-B) devices:
            The device maintains a list of slots, where each slot contains a
            single contact.  In each input report, the device only sends
            information about the slots that have changed.
            Thus, the MT-B contact event stream is stateful.
            When reporting multiple slots, the EV_ABS/MT_SLOT valuator is used
            to indicate the 'current' slot for which subsequent EV_ABS/ABS_MT_*
            valuator events apply.
            An inactive slot has EV_ABS/ABS_MT_TRACKING_ID == -1
            Active slots have EV_ABS/ABS_MT_TRACKING_ID >= 0

    Besides maintaining the set of supported ABS_MT valuators in the supported
    valuator list, a array of slots is also maintained.  Each slot has its own
    unique copy of just the supported ABS_MT valuators.  This represents the
    current state of that slot.
    """

    def __init__(self, path, ev_syn_cb=None):
        """
        Constructor opens the device file and probes its properties.

        Note: The device file is left open when the constructor exits.
        """
        self.path = path
        self.ev_syn_cb = ev_syn_cb
        self.events = {}     # dict { ev_type : dict { ev_code : Valuator } }
        self.mt_slots = []   # [ dict { mt_code : AbsValuator } ] * |MT-B slots|

        # Open the device node, and use ioctls to probe its properties
        self.f = None
        self.f = open(path, 'rb+', buffering=0)
        self._ioctl_version()
        self._ioctl_id()
        self._ioctl_name()
        for t in self._ioctl_types():
            self._ioctl_codes(t)
        self._setup_mt_slots()

    def __del__(self):
        """
        Deconstructor closes the device file, if it is open.
        """
        if self.f and not self.f.closed:
            self.f.close()

    def process_event(self, ev):
        """
        Processes an incoming input event.

        Returns True for EV_SYN/SYN_REPORT events to indicate that a complete
        input report has been received.

        Returns False for other events.

        Events not supported by this device are silently ignored.

        For MT events, updates the slot valuator value for the current slot.
        If current slot is the 'primary' slot, also updates the events entry.

        For all other events, updates the corresponding valuator value.
        """
        if ev.type == EV_SYN and ev.code == SYN_REPORT:
            return True
        elif ev.type not in self.events or ev.code not in self.events[ev.type]:
            return False
        elif self.is_mt_b() and ev.type == EV_ABS and ev.code in ABS_MT_RANGE:
            # TODO: Handle MT-A
            slot = self._get_current_slot()
            slot[ev.code].value = ev.value
            # if the current slot is the "primary" slot,
            # update the events[] entry, too.
            if slot == self._get_mt_primary_slot():
                self.events[ev.type][ev.code].value = ev.value
        else:
            self.events[ev.type][ev.code].value = ev.value
        return False

    def _ioctl_version(self):
        """ Queries device file for version information. """
        # Version is a 32-bit integer, which encodes 8-bit major version,
        # 8-bit minor version and 16-bit revision.
        version = array.array('I', [0])
        fcntl.ioctl(self.f, EVIOCGVERSION, version, 1)
        self.version = (version[0] >> 16, (version[0] >> 8) & 0xff,
                        version[0] & 0xff)

    def _ioctl_id(self):
        """ Queries device file for input device identification. """
        # struct input_id is 4 __u16
        gid = array.array('H', [0] * 4)
        fcntl.ioctl(self.f, EVIOCGID, gid, 1)
        self.id_bus = gid[ID_BUS]
        self.id_vendor = gid[ID_VENDOR]
        self.id_product = gid[ID_PRODUCT]
        self.id_version = gid[ID_VERSION]

    def _ioctl_name(self):
        """ Queries device file for the device name. """
        # Device name is a C-string up to 255 bytes in length.
        name_len = 255
        name = array.array('B', [0] * name_len)
        name_len = fcntl.ioctl(self.f, EVIOCGNAME(name_len), name, 1)
        self.name = name[0:name_len-1].tostring()

    def _ioctl_get_switch(self, sw):
        """
        Queries device file for current value of all switches and returns
        a boolean indicating whether the switch sw is set.
        """
        size = SW_CNT / 8    # Buffer size of one __u16
        buf = array.array('H', [0])
        fcntl.ioctl(self.f, EVIOCGSW(size), buf)
        return SwValuator(((buf[0] >> sw) & 0x01) == 1)

    def _ioctl_absinfo(self, axis):
        """
        Queries device file for absinfo structure for given absolute axis.
        """
        # struct input_absinfo is 6 __s32
        a = array.array('i', [0] * 6)
        fcntl.ioctl(self.f, EVIOCGABS(axis), a, 1)
        return AbsValuator(a[0], a[1], a[2], a[3], a[4], a[5])

    def _ioctl_codes(self, ev_type):
        """
        Queries device file for supported event codes for given event type.
        """
        self.events[ev_type] = {}
        if ev_type not in EV_SIZES:
            return

        size = EV_SIZES[ev_type] / 8    # Convert bits to bytes
        ev_code = array.array('B', [0] * size)
        try:
            count = fcntl.ioctl(self.f, EVIOCGBIT(ev_type, size), ev_code, 1)
            for c in range(count * 8):
                if test_bit(c, ev_code):
                    if ev_type == EV_ABS:
                        self.events[ev_type][c] = self._ioctl_absinfo(c)
                    elif ev_type == EV_SW:
                        self.events[ev_type][c] = self._ioctl_get_switch(c)
                    else:
                        self.events[ev_type][c] = Valuator()
        except IOError as (errno, strerror):
            # Errno 22 signifies that this event type has no event codes.
            if errno != 22:
                raise

    def _ioctl_types(self):
        """ Queries device file for supported event types. """
        ev_types = array.array('B', [0] * (EV_CNT / 8))
        fcntl.ioctl(self.f, EVIOCGBIT(EV_SYN, EV_CNT / 8), ev_types, 1)
        types  = []
        for t in range(EV_CNT):
            if test_bit(t, ev_types):
                types.append(t)
        return types

    def _convert_slot_index_to_slot_id(self, index):
        """ Convert a slot index in self.mt_slots to its slot id. """
        return self.abs_mt_slot.min + index

    def _ioctl_mt_slots(self):
        """Query mt slots values using ioctl.

        The ioctl buffer argument should be binary equivalent to
        struct input_mt_request_layout {
            __u32 code;
            __s32 values[num_slots];

        Note that the slots information returned by EVIOCGMTSLOTS
        corresponds to the slot ids ranging from abs_mt_slot.min to
        abs_mt_slot.max which is not necessarily the same as the
        slot indexes ranging from 0 to num_slots - 1 in self.mt_slots.
        We need to map between the slot index and the slot id correctly.
        };
        """
        # Iterate through the absolute mt events that are supported.
        for c in range(ABS_MT_FIRST, ABS_MT_LAST):
            if c not in self.events[EV_ABS]:
                continue
            # Sync with evdev kernel driver for the specified code c.
            mt_slot_info = array.array('i', [c] + [0] * self.num_slots)
            mt_slot_info_len = (self.num_slots + 1) * mt_slot_info.itemsize
            fcntl.ioctl(self.f, EVIOCGMTSLOTS(mt_slot_info_len), mt_slot_info)
            values = mt_slot_info[1:]
            for slot_index in range(self.num_slots):
                slot_id = self._convert_slot_index_to_slot_id(slot_index)
                self.mt_slots[slot_index][c].value = values[slot_id]

    def _setup_mt_slots(self):
        """
        Sets up the device's mt_slots array.

        Each element of the mt_slots array is initialized as a deepcopy of a
        dict containing all of the MT valuators from the events dict.
        """
        # TODO(djkurtz): MT-A
        if not self.is_mt_b():
            return
        ev_abs = self.events[EV_ABS]
        # Create dict containing just the MT valuators
        mt_abs_info = dict((axis, ev_abs[axis])
                           for axis in ev_abs
                           if axis in ABS_MT_RANGE)

        # Initialize TRACKING_ID to -1
        mt_abs_info[ABS_MT_TRACKING_ID].value = -1

        # Make a copy of mt_abs_info for each MT slot
        self.abs_mt_slot = ev_abs[ABS_MT_SLOT]
        self.num_slots = self.abs_mt_slot.max - self.abs_mt_slot.min + 1
        for s in range(self.num_slots):
            self.mt_slots.append(copy.deepcopy(mt_abs_info))

        self._ioctl_mt_slots()

    def get_current_slot_id(self):
        """
        Return the current slot id.
        """
        if not self.is_mt_b():
            return None
        return self.events[EV_ABS][ABS_MT_SLOT].value

    def _get_current_slot(self):
        """
        Returns the current slot, as indicated by the last ABS_MT_SLOT event.
        """
        current_slot_id = self.get_current_slot_id()
        if current_slot_id is None:
            return None
        return self.mt_slots[current_slot_id]

    def _get_tid(self, slot):
        """ Returns the tracking_id for the given MT slot. """
        return slot[ABS_MT_TRACKING_ID].value

    def _get_mt_valid_slots(self):
        """
        Returns a list of valid slots.

        A valid slot is a slot whose tracking_id != -1.
        """
        return [s for s in self.mt_slots if self._get_tid(s) != -1]

    def _get_mt_primary_slot(self):
        """
        Returns the "primary" MT-B slot.

        The "primary" MT-B slot is arbitrarily chosen as the slot with lowest
        tracking_id (> -1).  It is used to make an MT-B device look like
        single-touch (ST) device.
        """
        slot = None
        for s in self.mt_slots:
            tid = self._get_tid(s)
            if tid < 0:
                continue
            if not slot or tid < self._get_tid(slot):
                slot = s
        return slot

    def _code_if_mt(self, type, code):
        """
        Returns MT-equivalent event code for certain specific event codes
        """
        if type != EV_ABS:
            return code
        elif code == ABS_X:
            return  ABS_MT_POSITION_X
        elif code == ABS_Y:
            return ABS_MT_POSITION_Y
        elif code == ABS_PRESSURE:
            return ABS_MT_PRESSURE
        elif code == ABS_TOOL_WIDTH:
            return ABS_TOUCH_MAJOR
        else:
            return code

    def _get_valuator(self, type, code):
        """ Returns Valuator for given event type and code """
        if (not type in self.events) or (not code in self.events[type]):
            return None
        if type == EV_ABS:
            code = self._code_if_mt(type, code)
        return self.events[type][code]

    def _get_value(self, type, code):
        """
        Returns the value of the valuator with the give event (type, code).
        """
        axis = self._get_valuator(type, code)
        if not axis:
            return None
        return axis.value

    def _get_min(self, type, code):
        """
        Returns the min value of the valuator with the give event (type, code).

        Note: Only AbsValuators (EV_ABS) have max values.
        """
        axis = self._get_valuator(type, code)
        if not axis:
            return None
        return axis.min

    def _get_max(self, type, code):
        """
        Returns the min value of the valuator with the give event (type, code).

        Note: Only AbsValuators (EV_ABS) have max values.
        """
        axis = self._get_valuator(type, code)
        if not axis:
            return None
        return axis.max

    """ Public accessors """

    def get_num_fingers(self):
        if self.is_mt_b():
            return len(self._get_mt_valid_slots())
        elif self.is_mt_a():
            return 0  # TODO(djkurtz): MT-A
        else:  # Single-Touch case
            if not self._get_value(EV_KEY, BTN_TOUCH) == 1:
                return 0
            elif self._get_value(EV_KEY, BTN_TOOL_TRIPLETAP) == 1:
                return 3
            elif self._get_value(EV_KEY, BTN_TOOL_DOUBLETAP) == 1:
                return 2
            elif self._get_value(EV_KEY, BTN_TOOL_FINGER) == 1:
                return 1
            else:
                return 0

    def get_x(self):
        return self._get_value(EV_ABS, ABS_X)

    def get_x_min(self):
        return self._get_min(EV_ABS, ABS_X)

    def get_x_max(self):
        return self._get_max(EV_ABS, ABS_X)

    def get_y(self):
        return self._get_value(EV_ABS, ABS_Y)

    def get_y_min(self):
        return self._get_min(EV_ABS, ABS_Y)

    def get_y_max(self):
        return self._get_max(EV_ABS, ABS_Y)

    def get_pressure(self):
        return self._get_value(EV_ABS, ABS_PRESSURE)

    def get_pressure_min(self):
        return self._get_min(EV_ABS, ABS_PRESSURE)

    def get_pressure_max(self):
        return self._get_max(EV_ABS, ABS_PRESSURE)

    def get_left(self):
        return int(self._get_value(EV_KEY, BTN_LEFT) == 1)

    def get_right(self):
        return int(self._get_value(EV_KEY, BTN_RIGHT) == 1)

    def get_middle(self):
        return int(self._get_value(EV_KEY, BTN_MIDDLE) == 1)

    def get_microphone_insert(self):
        return self._get_value(EV_SW, SW_MICROPHONE_INSERT)

    def get_headphone_insert(self):
        return self._get_value(EV_SW, SW_HEADPHONE_INSERT)

    def get_lineout_insert(self):
        return self._get_value(EV_SW, SW_LINEOUT_INSERT)

    def is_touchpad(self):
        return ((EV_KEY in self.events) and
                (BTN_TOOL_FINGER in self.events[EV_KEY]) and
                (EV_ABS in self.events))

    def is_keyboard(self):
        return ((EV_KEY in self.events) and
                (KEY_F2 in self.events[EV_KEY]))

    def is_touchscreen(self):
        return ((EV_KEY in self.events) and
                (BTN_TOUCH in self.events[EV_KEY]) and
                (not BTN_TOOL_FINGER in self.events[EV_KEY]) and
                (EV_ABS in self.events))

    def is_mt_b(self):
        return self.is_mt() and ABS_MT_SLOT in self.events[EV_ABS]

    def is_mt_a(self):
        return self.is_mt() and ABS_MT_SLOT not in self.events[EV_ABS]

    def is_mt(self):
        return (EV_ABS in self.events and
                (set(self.events[EV_ABS]) & set(ABS_MT_RANGE)))

    def is_hp_jack(self):
        return (EV_SW in self.events and
                SW_HEADPHONE_INSERT in self.events[EV_SW])

    def is_mic_jack(self):
        return (EV_SW in self.events and
                SW_MICROPHONE_INSERT in self.events[EV_SW])

    def is_audio_jack(self):
        return (EV_SW in self.events and
                ((SW_HEADPHONE_INSERT in self.events[EV_SW]) or
                 (SW_MICROPHONE_INSERT in self.events[EV_SW] or
                 (SW_LINEOUT_INSERT in self.events[EV_SW]))))

    """ Debug helper print functions """

    def print_abs_info(self, axis):
        if EV_ABS in self.events and axis in self.events[EV_ABS]:
            a = self.events[EV_ABS][axis]
            print '      Value       %6d' % a.value
            print '      Min         %6d' % a.min
            print '      Max         %6d' % a.max
            if a.fuzz != 0:
                print '      Fuzz        %6d' % a.fuzz
            if a.flat != 0:
                print '      Flat        %6d' % a.flat
            if a.resolution != 0:
                print '      Resolution  %6d' % a.resolution

    def print_props(self):
        print ('Input driver Version: %d.%d.%d' %
               (self.version[0], self.version[1], self.version[2]))
        print ('Input device ID: bus %x vendor %x product %x version %x' %
               (self.id_bus, self.id_vendor, self.id_product, self.id_version))
        print 'Input device name: "%s"' % (self.name)
        for t in self.events:
            print '  Event type %d (%s)' % (t, EV_TYPES.get(t, '?'))
            for c in self.events[t]:
                if (t in EV_STRINGS):
                    code = EV_STRINGS[t].get(c, '?')
                    print '    Event code %s (%d)' % (code, c)
                else:
                    print '    Event code (%d)' % (c)
                self.print_abs_info(c)

    def get_slots(self):
        """ Get those slots with positive tracking IDs. """
        slot_dict = OrderedDict()
        for slot_index in range(self.num_slots):
            slot = self.mt_slots[slot_index]
            if self._get_tid(slot) == -1:
                continue
            slot_id = self._convert_slot_index_to_slot_id(slot_index)
            slot_dict[slot_id] = slot
        return slot_dict

    def print_slots(self):
        slot_dict = self.get_slots()
        for slot_id, slot in slot_dict.items():
            print 'slot #%d' % slot_id
            for a in slot:
                abs = EV_STRINGS[EV_ABS].get(a, '?')
                print '  %s = %6d' % (abs, slot[a].value)


def print_report(device):
    print '----- EV_SYN -----'
    if device.is_touchpad():
        f = device.get_num_fingers()
        if f == 0:
            return
        x = device.get_x()
        y = device.get_y()
        z = device.get_pressure()
        l = device.get_left()
        print 'Left=%d Fingers=%d X=%d Y=%d Pressure=%d' % (l, f, x, y, z)
        if device.is_mt():
            device.print_slots()


if __name__ == "__main__":
    from optparse import OptionParser
    import glob
    parser = OptionParser()

    parser.add_option("-a", "--audio_jack", action="store_true",
                      dest="audio_jack", default=False,
                      help="Find and use all audio jacks")
    parser.add_option("-d", "--devpath", dest="devpath",
                      default="/dev/input/event0",
                      help="device path (/dev/input/event0)")
    parser.add_option("-q", "--quiet", action="store_false", dest="verbose",
                      default=True, help="print less messages to stdout")
    parser.add_option("-t", "--touchpad", action="store_true", dest="touchpad",
                      default=False, help="Find and use first touchpad device")
    (options, args) = parser.parse_args()

    # TODO: Use gudev to detect touchpad
    devices = []
    if options.touchpad:
        for path in glob.glob('/dev/input/event*'):
            device = InputDevice(path)
            if device.is_touchpad():
                print 'Using touchpad %s.' % path
                options.devpath = path
                devices.append(device)
                break
        else:
            print 'No touchpad found!'
            exit()
    elif options.audio_jack:
        for path in glob.glob('/dev/input/event*'):
            device = InputDevice(path)
            if device.is_audio_jack():
                devices.append(device)
        device = None
    elif os.path.exists(options.devpath):
        print 'Using %s.' % options.devpath
        devices.append(InputDevice(options.devpath))
    else:
        print '%s does not exist.' % options.devpath
        exit()

    for device in devices:
        device.print_props()
        if device.is_touchpad():
            print ('x: (%d,%d), y: (%d,%d), z: (%d, %d)' %
                   (device.get_x_min(), device.get_x_max(),
                    device.get_y_min(), device.get_y_max(),
                    device.get_pressure_min(), device.get_pressure_max()))
            device.print_slots()
            print 'Number of fingers: %d' % device.get_num_fingers()
            print 'Current slot id: %d' % device.get_current_slot_id()
    print '------------------'
    print

    ev = InputEvent()
    while True:
        _rl, _, _ = select.select([d.f for d in devices], [], [])
        for fd in _rl:
            # Lookup for the device which owns fd.
            device = [d for d in devices if d.f == fd][0]
            try:
                ev.read(fd)
            except KeyboardInterrupt:
                exit()
            is_syn = device.process_event(ev)
            print ev
            if is_syn:
                print_report(device)