普通文本  |  672行  |  21.97 KB

#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import cmd
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import gobject
import threading

from functools import wraps


DBUS_ERROR = 'org.freedesktop.DBus.Error'
NEARD_PATH = '/org/neard/'
PROMPT = 'NFC> '

class NfcClientException(Exception):
    """Exception class for exceptions thrown by NfcClient."""


def print_message(message, newlines=2):
    """
    Prints the given message with extra wrapping newline characters.

    @param message: Message to print.
    @param newlines: Integer, specifying the number of '\n' characters that
            should be padded at the beginning and end of |message| before
            being passed to "print".

    """
    padding = newlines * '\n'
    message = padding + message + padding
    print message


def handle_errors(func):
    """
    Decorator for handling exceptions that are commonly raised by many of the
    methods in NfcClient.

    @param func: The function this decorator is wrapping.

    """
    @wraps(func)
    def _error_handler(*args):
        try:
            return func(*args)
        except dbus.exceptions.DBusException as e:
            if e.get_dbus_name() == DBUS_ERROR + '.ServiceUnknown':
                print_message('neard may have crashed or disappeared. '
                              'Check if neard is running and run "initialize" '
                              'from this shell.')
                return
            if e.get_dbus_name() == DBUS_ERROR + '.UnknownObject':
                print_message('Could not find object.')
                return
            print_message(str(e))
        except Exception as e:
            print_message(str(e))
    return _error_handler


class NfcClient(object):
    """
    neard D-Bus client

    """
    NEARD_SERVICE_NAME = 'org.neard'
    IMANAGER = NEARD_SERVICE_NAME + '.Manager'
    IADAPTER = NEARD_SERVICE_NAME + '.Adapter'
    ITAG = NEARD_SERVICE_NAME + '.Tag'
    IRECORD = NEARD_SERVICE_NAME + '.Record'
    IDEVICE = NEARD_SERVICE_NAME + '.Device'

    def __init__(self):
        self._mainloop = None
        self._mainloop_thread = None
        self._adapters = {}
        self._adapter_property_handler_matches = {}

    def begin(self):
        """
        Starts the D-Bus client.

        """
        # Here we run a GLib MainLoop in its own thread, so that the client can
        # listen to D-Bus signals while keeping the console interactive.
        self._dbusmainloop = dbus.mainloop.glib.DBusGMainLoop(
                set_as_default=True)
        dbus.mainloop.glib.threads_init()
        gobject.threads_init()

        def _mainloop_thread_func():
            self._mainloop = gobject.MainLoop()
            context = self._mainloop.get_context()
            self._run_loop = True
            while self._run_loop:
                context.iteration(True)
        self._mainloop_thread = threading.Thread(None, _mainloop_thread_func)
        self._mainloop_thread.start()

        self._bus = dbus.SystemBus()
        self.setup_manager()

    def end(self):
        """
        Stops the D-Bus client.

        """
        self._run_loop = False
        self._mainloop.quit()
        self._mainloop_thread.join()

    def restart(self):
        """Reinitializes the NFC client."""
        self.setup_manager()

    @handle_errors
    def _get_manager_proxy(self):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, '/'),
                self.IMANAGER)

    @handle_errors
    def _get_adapter_proxy(self, adapter):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, adapter),
                self.IADAPTER)

    def _get_cached_adapter_proxy(self, adapter):
        adapter_proxy = self._adapters.get(adapter, None)
        if not adapter_proxy:
            raise NfcClientException('Adapter "' + adapter + '" not found.')
        return adapter_proxy


    @handle_errors
    def _get_tag_proxy(self, tag):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, tag),
                self.ITAG)

    @handle_errors
    def _get_device_proxy(self, device):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, device),
                self.IDEVICE)

    @handle_errors
    def _get_record_proxy(self, record):
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, record),
                self.IRECORD)

    @handle_errors
    def _get_adapter_properties(self, adapter):
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        return adapter_proxy.GetProperties()

    def _get_adapters(self):
        props = self._manager.GetProperties()
        return props.get('Adapters', None)

    def setup_manager(self):
        """
        Creates a manager proxy and subscribes to adapter signals. This method
        will also initialize proxies for adapters if any are available.

        """
        # Create the manager proxy.
        self._adapters.clear()
        self._manager = self._get_manager_proxy()
        if not self._manager:
            print_message('Failed to create a proxy to the Manager interface.')
            return

        # Listen to the adapter added and removed signals.
        self._manager.connect_to_signal(
                'AdapterAdded',
                lambda adapter: self.register_adapter(str(adapter)))
        self._manager.connect_to_signal(
                'AdapterRemoved',
                lambda adapter: self.unregister_adapter(str(adapter)))

        # See if there are any adapters and create proxies for each.
        adapters = self._get_adapters()
        if adapters:
            for adapter in adapters:
                self.register_adapter(adapter)

    def register_adapter(self, adapter):
        """
        Registers an adapter proxy with the given object path and subscribes to
        adapter signals.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        print_message('Added adapter: ' + adapter)
        adapter_proxy = self._get_adapter_proxy(adapter)
        self._adapters[adapter] = adapter_proxy

        # Tag found/lost currently don't get fired. Monitor property changes
        # instead.
        if self._adapter_property_handler_matches.get(adapter, None) is None:
            self._adapter_property_handler_matches[adapter] = (
                    adapter_proxy.connect_to_signal(
                            'PropertyChanged',
                            (lambda name, value:
                                    self._adapter_property_changed_signal(
                                            adapter, name, value))))

    def unregister_adapter(self, adapter):
        """
        Removes the adapter proxy for the given object path from the internal
        cache of adapters.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        print_message('Removed adapter: ' + adapter)
        match = self._adapter_property_handler_matches.get(adapter, None)
        if match is not None:
            match.remove()
            self._adapter_property_handler_matches.pop(adapter)
        self._adapters.pop(adapter)

    def _adapter_property_changed_signal(self, adapter, name, value):
        if name == 'Tags' or name == 'Devices':
            print_message('Found ' + name + ': ' +
                          self._dbus_array_to_string(value))

    @handle_errors
    def show_adapters(self):
        """
        Prints the D-Bus object paths of all adapters that are available.

        """
        adapters = self._get_adapters()
        if not adapters:
            print_message('No adapters found.')
            return
        for adapter in adapters:
            print_message('  ' + str(adapter), newlines=0)
        print

    def _dbus_array_to_string(self, array):
        string = '[ '
        for value in array:
            string += ' ' + str(value) + ', '
        string += ' ]'
        return string

    def print_adapter_status(self, adapter):
        """
        Prints the properties of the given adapter.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        props = self._get_adapter_properties(adapter)
        if not props:
            return
        print_message('Status ' + adapter + ': ', newlines=0)
        for key, value in props.iteritems():
            if type(value) == dbus.Array:
                value = self._dbus_array_to_string(value)
            else:
                value = str(value)
            print_message('  ' + key + ' = ' + value, newlines=0)
        print

    @handle_errors
    def set_powered(self, adapter, powered):
        """
        Enables or disables the adapter.

        @param adapter: string, containing the adapter's D-Bus object path.
        @param powered: boolean that dictates whether the adapter will be
                enabled or disabled.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        if not adapter_proxy:
            return
        adapter_proxy.SetProperty('Powered', powered)

    @handle_errors
    def start_polling(self, adapter):
        """
        Starts polling for nearby tags and devices in "Initiator" mode.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StartPollLoop('Initiator')
        print_message('Started polling.')

    @handle_errors
    def stop_polling(self, adapter):
        """
        Stops polling for nearby tags and devices.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StopPollLoop()
        self._polling_stopped = True
        print_message('Stopped polling.')

    @handle_errors
    def show_tag_data(self, tag):
        """
        Prints the properties of the given tag, as well as the contents of any
        records associated with it.

        @param tag: string, containing the tag's D-Bus object path.

        """
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        props = tag_proxy.GetProperties()
        print_message('Tag ' + tag + ': ', newlines=1)
        for key, value in props.iteritems():
            if key != 'Records':
                print_message('  ' + key + ' = ' + str(value), newlines=0)
        records = props['Records']
        if not records:
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print

    @handle_errors
    def show_device_data(self, device):
        """
        Prints the properties of the given device, as well as the contents of
        any records associated with it.

        @param device: string, containing the device's D-Bus object path.

        """
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        records = device_proxy.GetProperties()['Records']
        if not records:
            print_message('No records on device.')
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print

    @handle_errors
    def show_record_data(self, record):
        """
        Prints the contents of the given record.

        @param record: string, containing the record's D-Bus object path.

        """
        record_proxy = self._get_record_proxy(record)
        if not record_proxy:
            print_message('Record "' + record + '" not found.')
            return
        props = record_proxy.GetProperties()
        print_message('Record ' + record + ': ', newlines=1)
        for key, value in props.iteritems():
            print '  ' + key + ' = ' + value
        print

    def _create_record_data(self, record_type, params):
        if record_type == 'Text':
            possible_keys = [ 'Encoding', 'Language', 'Representation' ]
            tag_data = { 'Type': 'Text' }
        elif record_type == 'URI':
            possible_keys = [ 'URI' ]
            tag_data = { 'Type': 'URI' }
        else:
            print_message('Writing record type "' + record_type +
                          '" currently not supported.')
            return None
        for key, value in params.iteritems():
            if key in possible_keys:
                tag_data[key] = value
        return tag_data

    @handle_errors
    def write_tag(self, tag, record_type, params):
        """
        Writes an NDEF record to the given tag.

        @param tag: string, containing the tag's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        tag_data = self._create_record_data(record_type, params)
        if not tag_data:
            return
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        tag_proxy.Write(tag_data)
        print_message('Tag written!')

    @handle_errors
    def push_to_device(self, device, record_type, params):
        """
        Pushes an NDEF record to the given device.

        @param device: string, containing the device's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        record_data = self._create_record_data(record_type, params)
        if not record_data:
            return
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        device_proxy.Push(record_data)
        print_message('NDEF pushed to device!')


class NfcConsole(cmd.Cmd):
    """
    Interactive console to interact with the NFC daemon.

    """
    def __init__(self):
        cmd.Cmd.__init__(self)
        self.prompt = PROMPT

    def begin(self):
        """
        Starts the interactive shell.

        """
        print_message('NFC console! Run "help" for a list of commands.',
                      newlines=1)
        self._nfc_client = NfcClient()
        self._nfc_client.begin()
        self.cmdloop()

    def can_exit(self):
        """Override"""
        return True

    def do_initialize(self, args):
        """Handles "initialize"."""
        if args:
            print_message('Command "initialize" expects no arguments.')
            return
        self._nfc_client.restart()

    def help_initialize(self):
        """Prints the help message for "initialize"."""
        print_message('Initializes the neard D-Bus client. This can be '
                      'run many times to restart the client in case of '
                      'neard failures or crashes.')

    def do_adapters(self, args):
        """Handles "adapters"."""
        if args:
            print_message('Command "adapters" expects no arguments.')
            return
        self._nfc_client.show_adapters()

    def help_adapters(self):
        """Prints the help message for "adapters"."""
        print_message('Displays the D-Bus object paths of the available '
                      'adapter objects.')

    def do_adapter_status(self, args):
        """Handles "adapter_status"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage: adapter_status <adapter>')
            return
        self._nfc_client.print_adapter_status(NEARD_PATH + args[0])

    def help_adapter_status(self):
        """Prints the help message for "adapter_status"."""
        print_message('Returns the properties of the given NFC adapter.\n\n'
                      '    Ex: "adapter_status nfc0"')

    def do_enable_adapter(self, args):
        """Handles "enable_adapter"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage: enable_adapter <adapter>')
            return
        self._nfc_client.set_powered(NEARD_PATH + args[0], True)

    def help_enable_adapter(self):
        """Prints the help message for "enable_adapter"."""
        print_message('Powers up the adapter. Ex: "enable_adapter nfc0"')

    def do_disable_adapter(self, args):
        """Handles "disable_adapter"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage: disable_adapter <adapter>')
            return
        self._nfc_client.set_powered(NEARD_PATH + args[0], False)

    def help_disable_adapter(self):
        """Prints the help message for "disable_adapter"."""
        print_message('Powers down the adapter. Ex: "disable_adapter nfc0"')

    def do_start_poll(self, args):
        """Handles "start_poll"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage: start_poll <adapter>')
            return
        self._nfc_client.start_polling(NEARD_PATH + args[0])

    def help_start_poll(self):
        """Prints the help message for "start_poll"."""
        print_message('Initiates a poll loop.\n\n    Ex: "start_poll nfc0"')

    def do_stop_poll(self, args):
        """Handles "stop_poll"."""
        args = args.split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage: stop_poll <adapter>')
            return
        self._nfc_client.stop_polling(NEARD_PATH + args[0])

    def help_stop_poll(self):
        """Prints the help message for "stop_poll"."""
        print_message('Stops a poll loop.\n\n    Ex: "stop_poll nfc0"')

    def do_read_tag(self, args):
        """Handles "read_tag"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage read_tag <tag>')
            return
        self._nfc_client.show_tag_data(NEARD_PATH + args[0])

    def help_read_tag(self):
        """Prints the help message for "read_tag"."""
        print_message('Reads the contents of a tag.  Ex: read_tag nfc0/tag0')

    def _parse_record_args(self, record_type, args):
        if record_type == 'Text':
            if len(args) < 5:
                print_message('Usage: write_tag <tag> Text <encoding> '
                              '<language> <representation>')
                return None
            if args[2] not in [ 'UTF-8', 'UTF-16' ]:
                print_message('Encoding must be one of "UTF-8" or "UTF-16".')
                return None
            return {
                'Encoding': args[2],
                'Language': args[3],
                'Representation': ' '.join(args[4:])
            }
        if record_type == 'URI':
            if len(args) != 3:
                print_message('Usage: write_tag <tag> URI <uri>')
                return None
            return {
                'URI': args[2]
            }
        print_message('Only types "Text" and "URI" are supported by this '
                      'script.')
        return None

    def do_write_tag(self, args):
        """Handles "write_tag"."""
        args = args.strip().split(' ')
        if len(args) < 3:
            print_message('Usage: write_tag <tag> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args)
        if not params:
            return
        self._nfc_client.write_tag(NEARD_PATH + args[0],
                                   record_type, params)

    def help_write_tag(self):
        """Prints the help message for "write_tag"."""
        print_message('Writes the given data to a tag. Usage:\n'
                      '  write_tag <tag> Text <encoding> <language> '
                      '<representation>\n  write_tag <tag> URI <uri>')

    def do_read_device(self, args):
        """Handles "read_device"."""
        args = args.strip().split(' ')
        if len(args) != 1 or not args[0]:
            print_message('Usage read_device <device>')
            return
        self._nfc_client.show_device_data(NEARD_PATH + args[0])

    def help_read_device(self):
        """Prints the help message for "read_device"."""
        print_message('Reads the contents of a device.  Ex: read_device '
                      'nfc0/device0')

    def do_push_to_device(self, args):
        """Handles "push_to_device"."""
        args = args.strip().split(' ')
        if len(args) < 3:
            print_message('Usage: push_to_device <device> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args)
        if not params:
            return
        self._nfc_client.push_to_device(NEARD_PATH + args[0],
                                        record_type, params)

    def help_push_to_device(self):
        """Prints the help message for "push_to_device"."""
        print_message('Pushes the given data to a device. Usage:\n'
                      '  push_to_device <device> Text <encoding> <language> '
                      '<representation>\n  push_to_device <device> URI <uri>')

    def do_exit(self, args):
        """
        Handles the 'exit' command.

        @param args: Arguments to the command. Unused.

        """
        if args:
            print_message('Command "exit" expects no arguments.')
            return
        resp = raw_input('Are you sure? (yes/no): ')
        if resp == 'yes':
            print_message('Goodbye!')
            self._nfc_client.end()
            return True
        if resp != 'no':
            print_message('Did not understand: ' + resp)
        return False

    def help_exit(self):
        """Handles the 'help exit' command."""
        print_message('Exits the console.')

    do_EOF = do_exit
    help_EOF = help_exit


def main():
    """Main function."""
    NfcConsole().begin()


if __name__ == '__main__':
    main()