#!/usr/bin/env python
"""Command-line tester for the 'org.bluez' D-Bus Manager and Adapter interfaces.

A single command named on the command line is sent either to the manager
object ('/org/bluez') or to an adapter object (chosen with -i, otherwise
the manager's default adapter).  With -l the script also subscribes to the
manager and adapter signals and prints them until interrupted.
"""

import dbus
import dbus.decorators
import dbus.glib
import gobject
import sys
import getopt
# Only signal(), SIGINT and SIGTERM are used below.
from signal import *

# Methods of the 'org.bluez.Manager' interface that can be invoked.
mgr_cmds = ["InterfaceVersion", "ListAdapters", "DefaultAdapter"]

# Signals of the 'org.bluez.Manager' interface printed in listen mode.
mgr_signals = ["AdapterAdded", "AdapterRemoved"]

# Methods of the 'org.bluez.Adapter' interface that can be invoked.  The
# order is the order in which usage() lists them.
dev_cmds = [
    "GetAddress", "GetVersion", "GetRevision", "GetManufacturer",
    "GetCompany", "GetMode", "SetMode", "GetDiscoverableTimeout",
    "SetDiscoverableTimeout", "IsConnectable", "IsDiscoverable",
    "IsConnected", "ListConnections", "GetMajorClass",
    "ListAvailableMinorClasses", "GetMinorClass", "SetMinorClass",
    "GetServiceClasses", "GetName", "SetName", "GetRemoteVersion",
    "GetRemoteRevision", "GetRemoteManufacturer", "GetRemoteCompany",
    "GetRemoteMajorClass", "GetRemoteMinorClass", "GetRemoteServiceClasses",
    "GetRemoteClass", "GetRemoteName", "GetRemoteAlias", "SetRemoteAlias",
    "ClearRemoteAlias", "LastSeen", "LastUsed", "DisconnectRemoteDevice",
    "CreateBonding", "CancelBondingProcess", "RemoveBonding", "HasBonding",
    "ListBondings", "GetPinCodeLength", "GetEncryptionKeySize",
    "DiscoverDevices", "DiscoverDevicesWithoutNameResolving",
    "CancelDiscovery", "ListRemoteDevices", "ListRecentRemoteDevices",
]

# Signals of the 'org.bluez.Adapter' interface printed in listen mode.
# (A comma was missing after "RemoteAliasChanged", which silently merged it
# with "RemoteAliasCleared" into one non-existent signal name.)
dev_signals = [
    "ModeChanged", "NameChanged", "MinorClassChanged", "DiscoveryStarted",
    "DiscoveryCompleted", "RemoteDeviceFound", "RemoteNameUpdated",
    "RemoteNameFailed", "RemoteAliasChanged", "RemoteAliasCleared",
    "RemoteDeviceConnected", "RemoteDeviceDisconnectRequested",
    "RemoteDeviceDisconnected", "BondingCreated", "BondingRemoved",
]

# Adapter object paths on which adapter signals are subscribed.
dev_signals_filter = [
    "/org/bluez/hci0", "/org/bluez/hci1", "/org/bluez/hci2",
    "/org/bluez/hci3", "/org/bluez/hci4", "/org/bluez/hci5",
    "/org/bluez/hci6", "/org/bluez/hci7",
]

# How call_dev_dbus_func() shows the reply of an adapter method.
_OUT_NONE = 'none'    # invoke only, print nothing
_OUT_VALUE = 'value'  # print the reply as returned
_OUT_UINT = 'uint'    # print the reply formatted with '%u'
_OUT_ROW = 'row'      # print every element of the reply on one line


def _build_dev_cmd_specs():
    """Return {command: (argument names, output mode)} for every dev_cmds entry.

    The argument names are the words shown in the per-command usage message
    and their count is the number of command-line arguments required.
    """
    specs = {}
    for name in ("GetAddress", "GetVersion", "GetRevision",
                 "GetManufacturer", "GetCompany", "GetMode", "IsConnectable",
                 "IsDiscoverable", "ListConnections", "GetMajorClass",
                 "ListAvailableMinorClasses", "GetMinorClass", "GetName",
                 "DiscoverDevices", "DiscoverDevicesWithoutNameResolving",
                 "CancelDiscovery"):
        specs[name] = ((), _OUT_VALUE)
    for name in ("GetServiceClasses", "ListBondings", "ListRemoteDevices"):
        specs[name] = ((), _OUT_ROW)
    for name in ("IsConnected", "GetRemoteVersion", "GetRemoteRevision",
                 "GetRemoteManufacturer", "GetRemoteCompany",
                 "GetRemoteMajorClass", "GetRemoteMinorClass",
                 "GetRemoteServiceClasses", "GetRemoteClass", "GetRemoteName",
                 "GetRemoteAlias", "ClearRemoteAlias", "LastSeen", "LastUsed",
                 "DisconnectRemoteDevice", "CreateBonding",
                 "CancelBondingProcess", "RemoveBonding", "HasBonding",
                 "GetPinCodeLength", "GetEncryptionKeySize"):
        specs[name] = (("address",), _OUT_VALUE)
    specs["GetDiscoverableTimeout"] = ((), _OUT_UINT)
    specs["SetMode"] = (("scan_mode",), _OUT_NONE)
    specs["SetDiscoverableTimeout"] = (("timeout",), _OUT_NONE)
    specs["SetMinorClass"] = (("minor",), _OUT_NONE)
    specs["SetName"] = (("newname",), _OUT_NONE)
    specs["SetRemoteAlias"] = (("address", "alias"), _OUT_NONE)
    specs["ListRecentRemoteDevices"] = (("date",), _OUT_ROW)
    return specs


_dev_cmd_specs = _build_dev_cmd_specs()


def _print_row(items):
    """Print the elements of items on one line, separated by single spaces.

    Nothing is printed for an empty sequence.
    """
    # '%s' % (item,) formats tuple-like values safely as a single value.
    cells = ['%s' % (item,) for item in items]
    if cells:
        print(' '.join(cells))


class Tester:
    """Parse the command line, run one D-Bus command and/or listen for signals."""

    exit_events = []
    dev_path = None      # adapter object path; None means "use the default"
    need_dev = False
    listen = False       # True when -l was given
    at_interrupt = None  # optional callable invoked by signal_cb()
    cmd = None           # command name; stays None in listen-only mode

    def __init__(self, argv):
        """Parse argv (program name first) and connect to the system bus.

        Exits with status 1 if the bus connection fails.
        """
        self.name = argv[0]
        self.parse_args(argv[1:])

        try:
            self.dbus_setup()
        except dbus.DBusException as e:
            print('Failed to do D-Bus setup: %s' % e)
            sys.exit(1)

    def parse_args(self, argv):
        """Handle the -h, -l and -i options and store the command and its arguments.

        Exits with status 1 on an invalid option or when neither a command
        nor -l is given; exits with status 0 after -h.
        """
        try:
            opts, args = getopt.getopt(argv, "hli:")
        except getopt.GetoptError:
            self.usage()
            sys.exit(1)

        for o, a in opts:
            if o == "-h":
                self.usage()
                sys.exit()
            elif o == "-l":
                self.listen = True
            elif o == "-i":
                # startswith() instead of a[0] so an empty value cannot
                # raise IndexError.
                if a.startswith('/'):
                    self.dev_path = a
                else:
                    self.dev_path = '/org/bluez/%s' % a

        if not (args or self.listen):
            self.usage()
            sys.exit(1)

        if args:
            self.cmd = args[0]
        # Always defined, so listen-only mode never hits a missing attribute.
        self.cmd_args = args[1:]

    def dbus_dev_setup(self):
        """Create self.device, the 'org.bluez.Adapter' interface proxy.

        When no adapter path was given with -i, the manager's default
        adapter is used.  Exits with status 1 on a D-Bus error.
        """
        if not self.dev_path:
            try:
                self.dbus_mgr_setup()
                self.dev_path = self.manager.DefaultAdapter()
            except dbus.DBusException as e:
                print('Failed to get default device: %s' % e)
                sys.exit(1)
        try:
            obj = self.bus.get_object('org.bluez', self.dev_path)
            self.device = dbus.Interface(obj, 'org.bluez.Adapter')
        except dbus.DBusException as e:
            print('Failed to setup device path: %s' % e)
            sys.exit(1)

    def dbus_dev_sig_setup(self):
        """Subscribe dev_signal_handler to every adapter signal on every filtered path."""
        try:
            for sig_name in dev_signals:
                for path in dev_signals_filter:
                    self.bus.add_signal_receiver(
                        self.dev_signal_handler, sig_name,
                        'org.bluez.Adapter', 'org.bluez', path,
                        message_keyword='dbus_message')
        except dbus.DBusException as e:
            print('Failed to setup signal handler for device path: %s' % e)
            sys.exit(1)

    def dbus_mgr_sig_setup(self):
        """Subscribe mgr_signal_handler to every manager signal on '/org/bluez'."""
        try:
            for sig_name in mgr_signals:
                # message_keyword is required: the handler reads the message
                # from its 'dbus_message' keyword argument.
                self.bus.add_signal_receiver(
                    self.mgr_signal_handler, sig_name,
                    'org.bluez.Manager', 'org.bluez', '/org/bluez',
                    message_keyword='dbus_message')
        except dbus.DBusException as e:
            print('Failed to setup signal handler for manager path: %s' % e)
            sys.exit(1)

    def dbus_mgr_setup(self):
        """Create self.manager, the 'org.bluez.Manager' interface proxy."""
        self.manager_obj = self.bus.get_object('org.bluez', '/org/bluez')
        self.manager = dbus.Interface(self.manager_obj, 'org.bluez.Manager')

    def dbus_setup(self):
        """Connect to the D-Bus system bus."""
        self.bus = dbus.SystemBus()

    def usage(self):
        """Print the option summary followed by all manager and adapter commands."""
        print('Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name)
        print(' -i <dev> Specify device (e.g. "hci0" or "/org/bluez/hci0")')
        print(' -l Listen for events (no command required)')
        print(' -h Show this help')
        print('Manager commands:')
        for cmd in mgr_cmds:
            print('\t%s' % cmd)
        print('Adapter commands:')
        for cmd in dev_cmds:
            print('\t%s' % cmd)

    #@dbus.decorators.explicitly_pass_message
    def dev_signal_handler(*args, **keywords):
        """Print an adapter signal as 'member - path: ' followed by its arguments.

        Registered as a bound method, so args[0] is the Tester instance and
        the signal arguments start at args[1].
        """
        dbus_message = keywords["dbus_message"]
        pieces = ['%s - %s: ' % (dbus_message.get_member(),
                                 dbus_message.get_path())]
        pieces.extend('%s ' % (arg,) for arg in args[1:])
        # Joining with ' ' reproduces the spacing of the former chained
        # "print x," statements.
        print(' '.join(pieces))

    #@dbus.decorators.explicitly_pass_message
    def mgr_signal_handler(*args, **keywords):
        """Print a manager signal: 'member: ' on one line, its arguments on the next.

        Registered as a bound method, so args[0] is the Tester instance and
        the signal arguments start at args[1].
        """
        dbus_message = keywords["dbus_message"]
        print('%s: ' % dbus_message.get_member())
        print(' '.join('%s ' % (arg,) for arg in args[1:]))

    def signal_cb(self, sig, frame):
        """SIGINT/SIGTERM handler: run at_interrupt (if set) and stop the main loop."""
        print('Caught signal, exiting')
        if self.at_interrupt:
            self.at_interrupt()
        self.main_loop.quit()

    def call_mgr_dbus_func(self):
        """Invoke the manager method named by self.cmd and print its reply.

        Exits with status 1 if the D-Bus call fails.
        """
        try:
            if self.cmd == 'InterfaceVersion':
                print(self.manager.InterfaceVersion())
            elif self.cmd == 'ListAdapters':
                for adapter in self.manager.ListAdapters():
                    print(adapter)
            elif self.cmd == 'DefaultAdapter':
                print(self.manager.DefaultAdapter())
        except dbus.DBusException as e:
            print('Sending %s failed: %s' % (self.cmd, e))
            sys.exit(1)

    def call_dev_dbus_func(self):
        """Invoke the adapter method named by self.cmd and print its reply.

        The required arguments and the way the reply is shown come from
        _dev_cmd_specs.  A wrong argument count prints the per-command
        usage line and returns; a failed D-Bus call exits with status 1.
        """
        spec = _dev_cmd_specs.get(self.cmd)
        if spec is None:
            # FIXME: remove at future version
            print('Script Error: Method %s not found. Maybe a mispelled word.'
                  % self.cmd)
            return

        arg_names, output = spec
        # Commands without arguments ignore any extra words, as before.
        if arg_names and len(self.cmd_args) != len(arg_names):
            print('Usage: %s -i <dev> %s %s'
                  % (self.name, self.cmd, ' '.join(arg_names)))
            return
        call_args = list(self.cmd_args[:len(arg_names)])

        if self.cmd == 'SetDiscoverableTimeout':
            # The timeout is sent as an unsigned 32-bit integer, not a string.
            try:
                call_args[0] = dbus.UInt32(call_args[0])
            except (ValueError, OverflowError) as e:
                print('%s failed: %s' % (self.cmd, e))
                sys.exit(1)

        try:
            reply = getattr(self.device, self.cmd)(*call_args)
        except dbus.DBusException as e:
            print('%s failed: %s' % (self.cmd, e))
            sys.exit(1)

        if output == _OUT_VALUE:
            print(reply)
        elif output == _OUT_UINT:
            print('%u' % reply)
        elif output == _OUT_ROW:
            _print_row(reply)

    def run(self):
        """Set up signal listeners if requested, run the command, then loop if listening."""
        # Manager methods
        if self.listen:
            self.dbus_mgr_sig_setup()
            self.dbus_dev_sig_setup()
            print('Listening for events...')

        if self.cmd in mgr_cmds:
            try:
                self.dbus_mgr_setup()
            except dbus.DBusException as e:
                print('Failed to setup manager interface: %s' % e)
                sys.exit(1)
            self.call_mgr_dbus_func()
        elif self.cmd in dev_cmds:
            try:
                self.dbus_dev_setup()
            except dbus.DBusException as e:
                print('Failed to setup device interface: %s' % e)
                sys.exit(1)
            self.call_dev_dbus_func()
        elif not self.listen:
            print('Unknown command: %s' % self.cmd)
            self.usage()
            sys.exit(1)

        if self.listen:
            signal(SIGINT, self.signal_cb)
            signal(SIGTERM, self.signal_cb)
            self.main_loop = gobject.MainLoop()
            self.main_loop.run()


if __name__ == '__main__':
    gobject.threads_init()
    dbus.glib.init_threads()

    tester = Tester(sys.argv)
    tester.run()