# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utility functions for tests that generate, push and fetch policies.

As the python bindings for the protobufs used in policies are built as a part
of tests that use them, callers must pass in their location at call time."""

import dbus
import gobject
import sys

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import session_manager
from autotest_lib.client.cros import ownership


def _add_proto_bindings_to_path(proto_binding_location):
    """Make the generated policy protobuf bindings importable.

    Appends |proto_binding_location| to sys.path unless it is already
    present, so repeated calls do not keep growing sys.path.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    """
    if proto_binding_location not in sys.path:
        sys.path.append(proto_binding_location)


def compare_policy_response(proto_binding_location, policy_response,
                            owner=None, guests=None, new_users=None,
                            roaming=None, whitelist=None, proxies=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Arguments left as None are not checked.  The boolean arguments are
    checked whenever they are not None, so an explicit False is verified too.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in.
    @param proxies: dictionary - { 'proxy_mode': <string> }

    This function has no return value; every check is delegated to the
    ownership.assert_* helpers, which are presumably what signal a mismatch
    (confirm against the ownership module).
    """
    # Pull in protobuf bindings.
    _add_proto_bindings_to_path(proto_binding_location)
    from device_management_backend_pb2 import PolicyFetchResponse
    from device_management_backend_pb2 import PolicyData
    from chrome_device_policy_pb2 import ChromeDeviceSettingsProto

    response_proto = PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    data_proto = PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    if owner:
        ownership.assert_username(data_proto, owner)

    settings = ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    # Compare against None rather than relying on truthiness: False is a
    # legitimate expected value for each of these boolean settings.
    if guests is not None:
        ownership.assert_guest_setting(settings, guests)
    if new_users is not None:
        ownership.assert_show_users(settings, new_users)
    if roaming is not None:
        ownership.assert_roaming(settings, roaming)
    if whitelist:
        # A whitelist implies that new users are not allowed.
        ownership.assert_new_users(settings, False)
        ownership.assert_users_on_whitelist(settings, whitelist)
    if proxies:
        ownership.assert_proxy_settings(settings, proxies)


def build_policy_data(proto_binding_location, owner=None, guests=None,
                      new_users=None, roaming=None, whitelist=None,
                      proxies=None):
    """Generate and serialize a populated device policy protobuffer.

    Creates a PolicyData protobuf, with an embedded
    ChromeDeviceSettingsProto, containing the information passed in.

    Arguments left as None are not written.  The boolean arguments are
    written whenever they are not None, so an explicit False is stored too.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    @param whitelist: list of accounts that are allowed to log in.
    @param proxies: dictionary - { 'proxy_mode': <string> }

    @return serialization of the PolicyData proto that we build.
    """
    # Pull in protobuf bindings.
    _add_proto_bindings_to_path(proto_binding_location)
    from device_management_backend_pb2 import PolicyData
    from chrome_device_policy_pb2 import ChromeDeviceSettingsProto

    data_proto = PolicyData()
    data_proto.policy_type = ownership.POLICY_TYPE
    if owner:
        data_proto.username = owner

    settings = ChromeDeviceSettingsProto()
    # Compare against None rather than relying on truthiness: callers must
    # be able to store an explicit False for each of these settings.
    if guests is not None:
        settings.guest_mode_enabled.guest_mode_enabled = guests
    if new_users is not None:
        settings.show_user_names.show_user_names = new_users
    if roaming is not None:
        settings.data_roaming_enabled.data_roaming_enabled = roaming
    if whitelist:
        # A whitelist implies that new users are not allowed.
        settings.allow_new_users.allow_new_users = False
        for user in whitelist:
            settings.user_whitelist.user_whitelist.append(user)
    if proxies:
        settings.device_proxy_settings.proxy_mode = proxies['proxy_mode']

    data_proto.policy_value = settings.SerializeToString()
    return data_proto.SerializeToString()


def generate_policy(proto_binding_location, key, pubkey, policy,
                    old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|.  Also includes the public key |pubkey|, signed with |old_key|
    if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    is serialized to a string and returned.

    @param proto_binding_location: the location of generated python bindings
                                   for policy protobufs.
    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated
                   PolicyFetchResponse.
    @param old_key: if provided, this implies the generated
                    PolicyFetchResponse is intended to represent a key
                    rotation.  pubkey will be signed with this key before
                    embedding.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    # Pull in protobuf bindings.
    _add_proto_bindings_to_path(proto_binding_location)
    from device_management_backend_pb2 import PolicyFetchResponse

    # Without a key rotation, the new public key is signed by |key| itself.
    if old_key is None:
        old_key = key

    policy_proto = PolicyFetchResponse()
    policy_proto.policy_data = policy
    policy_proto.policy_data_signature = ownership.sign(key, policy)
    policy_proto.new_public_key = pubkey
    policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    return policy_proto.SerializeToString()


def push_policy_and_verify(policy_string, sm):
    """Push a device policy to the session manager over DBus.

    The serialized device policy |policy_string| is sent to the session
    manager with the StorePolicy DBus call.  Success of the store is
    validated by fetching the policy again and comparing.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if policy push failed.
    """
    # Start listening before storing, so the resulting signal is not missed.
    listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
    listener.listen_for_new_policy()
    sm.StorePolicy(dbus.ByteArray(policy_string), byte_arrays=True)
    listener.wait_for_signals(desc='Policy push.')

    retrieved_policy = sm.RetrievePolicy(byte_arrays=True)
    if retrieved_policy != policy_string:
        raise error.TestFail('Policy should not be %s' % retrieved_policy)


def get_policy(sm):
    """Get a device policy from the session manager over DBus.

    Provided mainly for symmetry with push_policy_and_verify().

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse.
    """
    return sm.RetrievePolicy(byte_arrays=True)