# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import dbus import logging import pipes import re import shlex from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error # Represents the result of a dbus-send call. |sender| refers to the temporary # bus name of dbus-send, |responder| to the remote process, and |response| # contains the parsed response. DBusSendResult = collections.namedtuple('DBusSendResult', ['sender', 'responder', 'response']) # Used internally. DictEntry = collections.namedtuple('DictEntry', ['key', 'value']) def _build_token_stream(headerless_dbus_send_output): """A tokenizer for dbus-send output. The output is basically just like splitting on whitespace, except that strings are kept together by " characters. @param headerless_dbus_send_output: list of lines of dbus-send output without the meta-information prefix. @return list of tokens in dbus-send output. """ return shlex.split(' '.join(headerless_dbus_send_output)) def _parse_value(token_stream): """Turn a stream of tokens from dbus-send output into native python types. @param token_stream: output from _build_token_stream() above. """ # Assumes properly tokenized output (strings with spaces handled). # Assumes tokens are pre-stripped token_type = token_stream.pop(0) if token_type == 'variant': token_type = token_stream.pop(0) if token_type == 'object': token_type = token_stream.pop(0) # Should be 'path' token_value = token_stream.pop(0) INT_TYPES = ('int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'byte') if token_type in INT_TYPES: return int(token_value) if token_type == 'string' or token_type == 'path': return token_value # shlex removed surrounding " chars. if token_type == 'boolean': return token_value == 'true' if token_type == 'double': return float(token_value) if token_type == 'array': values = [] while token_stream[0] != ']': values.append(_parse_value(token_stream)) token_stream.pop(0) if values and all([isinstance(x, DictEntry) for x in values]): values = dict(values) return values if token_type == 'dict': assert token_value == 'entry(' key = _parse_value(token_stream) value = _parse_value(token_stream) assert token_stream.pop(0) == ')' return DictEntry(key=key, value=value) raise error.TestError('Unhandled DBus type found: %s' % token_type) def _parse_dbus_send_output(dbus_send_stdout): """Turn dbus-send output into usable Python types. This looks like: localhost ~ # dbus-send --system --dest=org.chromium.flimflam \ --print-reply --reply-timeout=2000 / \ org.chromium.flimflam.Manager.GetProperties method return sender=:1.12 -> dest=:1.37 reply_serial=2 array [ dict entry( string "ActiveProfile" variant string "/profile/default" ) dict entry( string "ArpGateway" variant boolean true ) ... ] @param dbus_send_output: string stdout from dbus-send @return a DBusSendResult. """ lines = dbus_send_stdout.strip().splitlines() # The first line contains meta-information about the response header = lines[0] lines = lines[1:] dbus_address_pattern = '[:\d\\.]+' match = re.match('method return sender=(%s) -> dest=(%s) reply_serial=\d+' % (dbus_address_pattern, dbus_address_pattern), header) if match is None: raise error.TestError('Could not parse dbus-send header: %s' % header) sender = match.group(1) responder = match.group(2) token_stream = _build_token_stream(lines) ret_val = _parse_value(token_stream) # Note that DBus permits multiple response values, and this is not handled. logging.debug('Got DBus response: %r', ret_val) return DBusSendResult(sender=sender, responder=responder, response=ret_val) def _build_arg_string(raw_args): """Construct a string of arguments to a DBus method as dbus-send expects. @param raw_args list of dbus.* type objects to seriallize. @return string suitable for dbus-send. """ dbus.Boolean int_map = { dbus.Int16: 'int16:', dbus.Int32: 'int32:', dbus.Int64: 'int64:', dbus.UInt16: 'uint16:', dbus.UInt32: 'uint32:', dbus.UInt64: 'uint64:', dbus.Double: 'double:', dbus.Byte: 'byte:', } arg_list = [] for arg in raw_args: if isinstance(arg, dbus.String): arg_list.append(pipes.quote('string:%s' % arg.replace('"', r'\"'))) continue if isinstance(arg, dbus.Boolean): if arg: arg_list.append('boolean:true') else: arg_list.append('boolean:false') continue for prim_type, prefix in int_map.iteritems(): if isinstance(arg, prim_type): arg_list.append(prefix + str(arg)) continue raise error.TestError('No support for serializing %r' % arg) return ' '.join(arg_list) def dbus_send(bus_name, interface, object_path, method_name, args=None, host=None, timeout_seconds=2, tolerate_failures=False): """Call dbus-send without arguments. @param bus_name: string identifier of DBus connection to send a message to. @param interface: string DBus interface of object to call method on. @param object_path: string DBus path of remote object to call method on. @param method_name: string name of method to call. @param args: optional list of arguments. Arguments must be of types from the python dbus module. @param host: An optional host object if running against a remote host. @param timeout_seconds: number of seconds to wait for a response. @param tolerate_failures: boolean True to ignore problems receiving a response. """ run = utils.run if host is None else host.run cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s ' '%s %s.%s' % (int(timeout_seconds * 1000), bus_name, object_path, interface, method_name)) if args is not None: cmd = cmd + ' ' + _build_arg_string(args) result = run(cmd, ignore_status=tolerate_failures) if result.exit_status != 0: logging.debug('%r', result.stdout) return None return _parse_dbus_send_output(result.stdout) def get_property(bus_name, interface, object_path, property_name, host=None): """A helpful wrapper that extracts the value of a DBus property. @param bus_name: string identifier of DBus connection to send a message to. @param interface: string DBus interface exposing the property. @param object_path: string DBus path of remote object to call method on. @param property_name: string name of property to get. @param host: An optional host object if running against a remote host. """ return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get', args=[dbus.String(interface), dbus.String(property_name)], host=host)