#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Description:
#
# Class for handling linux 'evdev' input devices.
#
# Provides evtest-like functionality if run from the command line:
# $ input_device.py -d /dev/input/event6
""" Read properties and events of a linux input device. """
import array
import copy
import fcntl
import os.path
import select
import struct
import time
from collections import OrderedDict
from linux_input import *
class Valuator:
""" A Valuator just stores a value """
def __init__(self):
self.value = 0
class SwValuator(Valuator):
""" A Valuator used for EV_SW (switch) events """
def __init__(self, value):
self.value = value
class AbsValuator(Valuator):
"""
An AbsValuator, used for EV_ABS events stores a value as well as other
properties of the corresponding absolute axis.
"""
def __init__(self, value, min_value, max_value, fuzz, flat, resolution):
self.value = value
self.min = min_value
self.max = max_value
self.fuzz = fuzz
self.flat = flat
self.resolution = resolution
class InputEvent:
"""
Linux evdev input event
An input event has the following fields which can be accessed as public
properties of this class:
tv_sec
tv_usec
type
code
value
"""
def __init__(self, tv_sec=0, tv_usec=0, type=0, code=0, value=0):
self.format = input_event_t
self.format_size = struct.calcsize(self.format)
(self.tv_sec, self.tv_usec, self.type, self.code,
self.value) = (tv_sec, tv_usec, type, code, value)
def read(self, stream):
""" Read an input event from the provided stream and unpack it. """
packed = stream.read(self.format_size)
(self.tv_sec, self.tv_usec, self.type, self.code,
self.value) = struct.unpack(self.format, packed)
def write(self, stream):
""" Pack an input event and write it to the provided stream. """
packed = struct.pack(self.format, self.tv_sec, self.tv_usec, self.type,
self.code, self.value)
stream.write(packed)
stream.flush()
def __str__(self):
t = EV_TYPES.get(self.type, self.type)
if self.type in EV_STRINGS:
c = EV_STRINGS[self.type].get(self.code, self.code)
else:
c = self.code
return ('%d.%06d: %s[%s] = %d' %
(self.tv_sec, self.tv_usec, t, c, self.value))
class InputDevice:
"""
Linux evdev input device
A linux kernel "evdev" device sends a stream of "input events".
These events are grouped together into "input reports", which is a set of
input events ending in a single EV_SYN/SYN_REPORT event.
Each input event is tagged with a type and a code.
A given input device supports a subset of the possible types, and for
each type it supports a subset of the possible codes for that type.
The device maintains a "valuator" for each supported type/code pairs.
There are two types of "valuators":
Normal valuators represent just a value.
Absolute valuators are only for EV_ABS events. They have more fields:
value, minimum, maximum, resolution, fuzz, flatness
Note: Relative and Absolute "Valuators" are also often called relative
and absolute axis, respectively.
The evdev protocol is stateful. Input events are only sent when the values
of a valuator actually changes. This dramatically reduces the stream of
events emenating from the kernel.
Multitouch devices are a special case. There are two types of multitouch
devices defined in the kernel:
Multitouch type "A" (MT-A) devices:
In each input report, the device sends an unordered list of all
active contacts. The data for each active contact is separated
in the input report by an EV_SYN/SYN_MT_REPORT event.
Thus, the MT-A contact event stream is not stateful.
Note: MT-A is not currently supported by this class.
Multitouch type "B" (MT-B) devices:
The device maintains a list of slots, where each slot contains a
single contact. In each input report, the device only sends
information about the slots that have changed.
Thus, the MT-B contact event stream is stateful.
When reporting multiple slots, the EV_ABS/MT_SLOT valuator is used
to indicate the 'current' slot for which subsequent EV_ABS/ABS_MT_*
valuator events apply.
An inactive slot has EV_ABS/ABS_MT_TRACKING_ID == -1
Active slots have EV_ABS/ABS_MT_TRACKING_ID >= 0
Besides maintaining the set of supported ABS_MT valuators in the supported
valuator list, a array of slots is also maintained. Each slot has its own
unique copy of just the supported ABS_MT valuators. This represents the
current state of that slot.
"""
def __init__(self, path, ev_syn_cb=None):
"""
Constructor opens the device file and probes its properties.
Note: The device file is left open when the constructor exits.
"""
self.path = path
self.ev_syn_cb = ev_syn_cb
self.events = {} # dict { ev_type : dict { ev_code : Valuator } }
self.mt_slots = [] # [ dict { mt_code : AbsValuator } ] * |MT-B slots|
# Open the device node, and use ioctls to probe its properties
self.f = None
self.f = open(path, 'rb+', buffering=0)
self._ioctl_version()
self._ioctl_id()
self._ioctl_name()
for t in self._ioctl_types():
self._ioctl_codes(t)
self._setup_mt_slots()
def __del__(self):
"""
Deconstructor closes the device file, if it is open.
"""
if self.f and not self.f.closed:
self.f.close()
def process_event(self, ev):
"""
Processes an incoming input event.
Returns True for EV_SYN/SYN_REPORT events to indicate that a complete
input report has been received.
Returns False for other events.
Events not supported by this device are silently ignored.
For MT events, updates the slot valuator value for the current slot.
If current slot is the 'primary' slot, also updates the events entry.
For all other events, updates the corresponding valuator value.
"""
if ev.type == EV_SYN and ev.code == SYN_REPORT:
return True
elif ev.type not in self.events or ev.code not in self.events[ev.type]:
return False
elif self.is_mt_b() and ev.type == EV_ABS and ev.code in ABS_MT_RANGE:
# TODO: Handle MT-A
slot = self._get_current_slot()
slot[ev.code].value = ev.value
# if the current slot is the "primary" slot,
# update the events[] entry, too.
if slot == self._get_mt_primary_slot():
self.events[ev.type][ev.code].value = ev.value
else:
self.events[ev.type][ev.code].value = ev.value
return False
def _ioctl_version(self):
""" Queries device file for version information. """
# Version is a 32-bit integer, which encodes 8-bit major version,
# 8-bit minor version and 16-bit revision.
version = array.array('I', [0])
fcntl.ioctl(self.f, EVIOCGVERSION, version, 1)
self.version = (version[0] >> 16, (version[0] >> 8) & 0xff,
version[0] & 0xff)
def _ioctl_id(self):
""" Queries device file for input device identification. """
# struct input_id is 4 __u16
gid = array.array('H', [0] * 4)
fcntl.ioctl(self.f, EVIOCGID, gid, 1)
self.id_bus = gid[ID_BUS]
self.id_vendor = gid[ID_VENDOR]
self.id_product = gid[ID_PRODUCT]
self.id_version = gid[ID_VERSION]
def _ioctl_name(self):
""" Queries device file for the device name. """
# Device name is a C-string up to 255 bytes in length.
name_len = 255
name = array.array('B', [0] * name_len)
name_len = fcntl.ioctl(self.f, EVIOCGNAME(name_len), name, 1)
self.name = name[0:name_len-1].tostring()
def _ioctl_get_switch(self, sw):
"""
Queries device file for current value of all switches and returns
a boolean indicating whether the switch sw is set.
"""
size = SW_CNT / 8 # Buffer size of one __u16
buf = array.array('H', [0])
fcntl.ioctl(self.f, EVIOCGSW(size), buf)
return SwValuator(((buf[0] >> sw) & 0x01) == 1)
def _ioctl_absinfo(self, axis):
"""
Queries device file for absinfo structure for given absolute axis.
"""
# struct input_absinfo is 6 __s32
a = array.array('i', [0] * 6)
fcntl.ioctl(self.f, EVIOCGABS(axis), a, 1)
return AbsValuator(a[0], a[1], a[2], a[3], a[4], a[5])
def _ioctl_codes(self, ev_type):
"""
Queries device file for supported event codes for given event type.
"""
self.events[ev_type] = {}
if ev_type not in EV_SIZES:
return
size = EV_SIZES[ev_type] / 8 # Convert bits to bytes
ev_code = array.array('B', [0] * size)
try:
count = fcntl.ioctl(self.f, EVIOCGBIT(ev_type, size), ev_code, 1)
for c in range(count * 8):
if test_bit(c, ev_code):
if ev_type == EV_ABS:
self.events[ev_type][c] = self._ioctl_absinfo(c)
elif ev_type == EV_SW:
self.events[ev_type][c] = self._ioctl_get_switch(c)
else:
self.events[ev_type][c] = Valuator()
except IOError as (errno, strerror):
# Errno 22 signifies that this event type has no event codes.
if errno != 22:
raise
def _ioctl_types(self):
""" Queries device file for supported event types. """
ev_types = array.array('B', [0] * (EV_CNT / 8))
fcntl.ioctl(self.f, EVIOCGBIT(EV_SYN, EV_CNT / 8), ev_types, 1)
types = []
for t in range(EV_CNT):
if test_bit(t, ev_types):
types.append(t)
return types
def _convert_slot_index_to_slot_id(self, index):
""" Convert a slot index in self.mt_slots to its slot id. """
return self.abs_mt_slot.min + index
def _ioctl_mt_slots(self):
"""Query mt slots values using ioctl.
The ioctl buffer argument should be binary equivalent to
struct input_mt_request_layout {
__u32 code;
__s32 values[num_slots];
Note that the slots information returned by EVIOCGMTSLOTS
corresponds to the slot ids ranging from abs_mt_slot.min to
abs_mt_slot.max which is not necessarily the same as the
slot indexes ranging from 0 to num_slots - 1 in self.mt_slots.
We need to map between the slot index and the slot id correctly.
};
"""
# Iterate through the absolute mt events that are supported.
for c in range(ABS_MT_FIRST, ABS_MT_LAST):
if c not in self.events[EV_ABS]:
continue
# Sync with evdev kernel driver for the specified code c.
mt_slot_info = array.array('i', [c] + [0] * self.num_slots)
mt_slot_info_len = (self.num_slots + 1) * mt_slot_info.itemsize
fcntl.ioctl(self.f, EVIOCGMTSLOTS(mt_slot_info_len), mt_slot_info)
values = mt_slot_info[1:]
for slot_index in range(self.num_slots):
slot_id = self._convert_slot_index_to_slot_id(slot_index)
self.mt_slots[slot_index][c].value = values[slot_id]
def _setup_mt_slots(self):
"""
Sets up the device's mt_slots array.
Each element of the mt_slots array is initialized as a deepcopy of a
dict containing all of the MT valuators from the events dict.
"""
# TODO(djkurtz): MT-A
if not self.is_mt_b():
return
ev_abs = self.events[EV_ABS]
# Create dict containing just the MT valuators
mt_abs_info = dict((axis, ev_abs[axis])
for axis in ev_abs
if axis in ABS_MT_RANGE)
# Initialize TRACKING_ID to -1
mt_abs_info[ABS_MT_TRACKING_ID].value = -1
# Make a copy of mt_abs_info for each MT slot
self.abs_mt_slot = ev_abs[ABS_MT_SLOT]
self.num_slots = self.abs_mt_slot.max - self.abs_mt_slot.min + 1
for s in range(self.num_slots):
self.mt_slots.append(copy.deepcopy(mt_abs_info))
self._ioctl_mt_slots()
def get_current_slot_id(self):
"""
Return the current slot id.
"""
if not self.is_mt_b():
return None
return self.events[EV_ABS][ABS_MT_SLOT].value
def _get_current_slot(self):
"""
Returns the current slot, as indicated by the last ABS_MT_SLOT event.
"""
current_slot_id = self.get_current_slot_id()
if current_slot_id is None:
return None
return self.mt_slots[current_slot_id]
def _get_tid(self, slot):
""" Returns the tracking_id for the given MT slot. """
return slot[ABS_MT_TRACKING_ID].value
def _get_mt_valid_slots(self):
"""
Returns a list of valid slots.
A valid slot is a slot whose tracking_id != -1.
"""
return [s for s in self.mt_slots if self._get_tid(s) != -1]
def _get_mt_primary_slot(self):
"""
Returns the "primary" MT-B slot.
The "primary" MT-B slot is arbitrarily chosen as the slot with lowest
tracking_id (> -1). It is used to make an MT-B device look like
single-touch (ST) device.
"""
slot = None
for s in self.mt_slots:
tid = self._get_tid(s)
if tid < 0:
continue
if not slot or tid < self._get_tid(slot):
slot = s
return slot
def _code_if_mt(self, type, code):
"""
Returns MT-equivalent event code for certain specific event codes
"""
if type != EV_ABS:
return code
elif code == ABS_X:
return ABS_MT_POSITION_X
elif code == ABS_Y:
return ABS_MT_POSITION_Y
elif code == ABS_PRESSURE:
return ABS_MT_PRESSURE
elif code == ABS_TOOL_WIDTH:
return ABS_TOUCH_MAJOR
else:
return code
def _get_valuator(self, type, code):
""" Returns Valuator for given event type and code """
if (not type in self.events) or (not code in self.events[type]):
return None
if type == EV_ABS:
code = self._code_if_mt(type, code)
return self.events[type][code]
def _get_value(self, type, code):
"""
Returns the value of the valuator with the give event (type, code).
"""
axis = self._get_valuator(type, code)
if not axis:
return None
return axis.value
def _get_min(self, type, code):
"""
Returns the min value of the valuator with the give event (type, code).
Note: Only AbsValuators (EV_ABS) have max values.
"""
axis = self._get_valuator(type, code)
if not axis:
return None
return axis.min
def _get_max(self, type, code):
"""
Returns the min value of the valuator with the give event (type, code).
Note: Only AbsValuators (EV_ABS) have max values.
"""
axis = self._get_valuator(type, code)
if not axis:
return None
return axis.max
""" Public accessors """
def get_num_fingers(self):
if self.is_mt_b():
return len(self._get_mt_valid_slots())
elif self.is_mt_a():
return 0 # TODO(djkurtz): MT-A
else: # Single-Touch case
if not self._get_value(EV_KEY, BTN_TOUCH) == 1:
return 0
elif self._get_value(EV_KEY, BTN_TOOL_TRIPLETAP) == 1:
return 3
elif self._get_value(EV_KEY, BTN_TOOL_DOUBLETAP) == 1:
return 2
elif self._get_value(EV_KEY, BTN_TOOL_FINGER) == 1:
return 1
else:
return 0
def get_x(self):
return self._get_value(EV_ABS, ABS_X)
def get_x_min(self):
return self._get_min(EV_ABS, ABS_X)
def get_x_max(self):
return self._get_max(EV_ABS, ABS_X)
def get_y(self):
return self._get_value(EV_ABS, ABS_Y)
def get_y_min(self):
return self._get_min(EV_ABS, ABS_Y)
def get_y_max(self):
return self._get_max(EV_ABS, ABS_Y)
def get_pressure(self):
return self._get_value(EV_ABS, ABS_PRESSURE)
def get_pressure_min(self):
return self._get_min(EV_ABS, ABS_PRESSURE)
def get_pressure_max(self):
return self._get_max(EV_ABS, ABS_PRESSURE)
def get_left(self):
return int(self._get_value(EV_KEY, BTN_LEFT) == 1)
def get_right(self):
return int(self._get_value(EV_KEY, BTN_RIGHT) == 1)
def get_middle(self):
return int(self._get_value(EV_KEY, BTN_MIDDLE) == 1)
def get_microphone_insert(self):
return self._get_value(EV_SW, SW_MICROPHONE_INSERT)
def get_headphone_insert(self):
return self._get_value(EV_SW, SW_HEADPHONE_INSERT)
def get_lineout_insert(self):
return self._get_value(EV_SW, SW_LINEOUT_INSERT)
def is_touchpad(self):
return ((EV_KEY in self.events) and
(BTN_TOOL_FINGER in self.events[EV_KEY]) and
(EV_ABS in self.events))
def is_keyboard(self):
return ((EV_KEY in self.events) and
(KEY_F2 in self.events[EV_KEY]))
def is_touchscreen(self):
return ((EV_KEY in self.events) and
(BTN_TOUCH in self.events[EV_KEY]) and
(not BTN_TOOL_FINGER in self.events[EV_KEY]) and
(EV_ABS in self.events))
def is_mt_b(self):
return self.is_mt() and ABS_MT_SLOT in self.events[EV_ABS]
def is_mt_a(self):
return self.is_mt() and ABS_MT_SLOT not in self.events[EV_ABS]
def is_mt(self):
return (EV_ABS in self.events and
(set(self.events[EV_ABS]) & set(ABS_MT_RANGE)))
def is_hp_jack(self):
return (EV_SW in self.events and
SW_HEADPHONE_INSERT in self.events[EV_SW])
def is_mic_jack(self):
return (EV_SW in self.events and
SW_MICROPHONE_INSERT in self.events[EV_SW])
def is_audio_jack(self):
return (EV_SW in self.events and
((SW_HEADPHONE_INSERT in self.events[EV_SW]) or
(SW_MICROPHONE_INSERT in self.events[EV_SW] or
(SW_LINEOUT_INSERT in self.events[EV_SW]))))
""" Debug helper print functions """
def print_abs_info(self, axis):
if EV_ABS in self.events and axis in self.events[EV_ABS]:
a = self.events[EV_ABS][axis]
print ' Value %6d' % a.value
print ' Min %6d' % a.min
print ' Max %6d' % a.max
if a.fuzz != 0:
print ' Fuzz %6d' % a.fuzz
if a.flat != 0:
print ' Flat %6d' % a.flat
if a.resolution != 0:
print ' Resolution %6d' % a.resolution
def print_props(self):
print ('Input driver Version: %d.%d.%d' %
(self.version[0], self.version[1], self.version[2]))
print ('Input device ID: bus %x vendor %x product %x version %x' %
(self.id_bus, self.id_vendor, self.id_product, self.id_version))
print 'Input device name: "%s"' % (self.name)
for t in self.events:
print ' Event type %d (%s)' % (t, EV_TYPES.get(t, '?'))
for c in self.events[t]:
if (t in EV_STRINGS):
code = EV_STRINGS[t].get(c, '?')
print ' Event code %s (%d)' % (code, c)
else:
print ' Event code (%d)' % (c)
self.print_abs_info(c)
def get_slots(self):
""" Get those slots with positive tracking IDs. """
slot_dict = OrderedDict()
for slot_index in range(self.num_slots):
slot = self.mt_slots[slot_index]
if self._get_tid(slot) == -1:
continue
slot_id = self._convert_slot_index_to_slot_id(slot_index)
slot_dict[slot_id] = slot
return slot_dict
def print_slots(self):
slot_dict = self.get_slots()
for slot_id, slot in slot_dict.items():
print 'slot #%d' % slot_id
for a in slot:
abs = EV_STRINGS[EV_ABS].get(a, '?')
print ' %s = %6d' % (abs, slot[a].value)
def print_report(device):
print '----- EV_SYN -----'
if device.is_touchpad():
f = device.get_num_fingers()
if f == 0:
return
x = device.get_x()
y = device.get_y()
z = device.get_pressure()
l = device.get_left()
print 'Left=%d Fingers=%d X=%d Y=%d Pressure=%d' % (l, f, x, y, z)
if device.is_mt():
device.print_slots()
if __name__ == "__main__":
from optparse import OptionParser
import glob
parser = OptionParser()
parser.add_option("-a", "--audio_jack", action="store_true",
dest="audio_jack", default=False,
help="Find and use all audio jacks")
parser.add_option("-d", "--devpath", dest="devpath",
default="/dev/input/event0",
help="device path (/dev/input/event0)")
parser.add_option("-q", "--quiet", action="store_false", dest="verbose",
default=True, help="print less messages to stdout")
parser.add_option("-t", "--touchpad", action="store_true", dest="touchpad",
default=False, help="Find and use first touchpad device")
(options, args) = parser.parse_args()
# TODO: Use gudev to detect touchpad
devices = []
if options.touchpad:
for path in glob.glob('/dev/input/event*'):
device = InputDevice(path)
if device.is_touchpad():
print 'Using touchpad %s.' % path
options.devpath = path
devices.append(device)
break
else:
print 'No touchpad found!'
exit()
elif options.audio_jack:
for path in glob.glob('/dev/input/event*'):
device = InputDevice(path)
if device.is_audio_jack():
devices.append(device)
device = None
elif os.path.exists(options.devpath):
print 'Using %s.' % options.devpath
devices.append(InputDevice(options.devpath))
else:
print '%s does not exist.' % options.devpath
exit()
for device in devices:
device.print_props()
if device.is_touchpad():
print ('x: (%d,%d), y: (%d,%d), z: (%d, %d)' %
(device.get_x_min(), device.get_x_max(),
device.get_y_min(), device.get_y_max(),
device.get_pressure_min(), device.get_pressure_max()))
device.print_slots()
print 'Number of fingers: %d' % device.get_num_fingers()
print 'Current slot id: %d' % device.get_current_slot_id()
print '------------------'
print
ev = InputEvent()
while True:
_rl, _, _ = select.select([d.f for d in devices], [], [])
for fd in _rl:
# Lookup for the device which owns fd.
device = [d for d in devices if d.f == fd][0]
try:
ev.read(fd)
except KeyboardInterrupt:
exit()
is_syn = device.process_event(ev)
print ev
if is_syn:
print_report(device)