# Plain text  |  103 lines  |  4.07 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
from dbus.mainloop.glib import DBusGMainLoop

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import policy, session_manager
from autotest_lib.client.cros import cryptohome, ownership


class login_MultiUserPolicy(test.test):
    """Checks user policy storage and retrieval with two signed-in profiles.

    The test starts a session for each of two users in turn and, through
    the session_manager D-Bus interface, verifies that:
      * a user has no policy right after the session starts;
      * policy can be stored for a user whose session is running;
      * storing policy for a user whose session has not started fails;
      * policy that was stored can be retrieved again.
    """

    version = 1

    _user1 = 'user1@somewhere.com'
    _user2 = 'user2@somewhere.com'


    def initialize(self):
        """Installs the policy protobufs and wipes both users' state."""
        super(login_MultiUserPolicy, self).initialize()
        policy.install_protobufs(self.autodir, self.job)
        self._bus_loop = DBusGMainLoop(set_as_default=True)

        # No session is running and nobody is signed in at this point, so
        # clear the ownership files and each user's vault to guarantee that
        # no stale policy or key is left over from an earlier run.
        ownership.restart_ui_to_clear_ownership_files()
        vault_cleaner = cryptohome.CryptohomeProxy(
            self._bus_loop, self.autodir, self.job)
        for account in (self._user1, self._user2):
            vault_cleaner.ensure_clean_cryptohome_for(account)


    def _fetch_user_policy(self, sm, account):
        """Returns whatever RetrievePolicyEx yields for |account|.

        @param sm: session_manager connection to query.
        @param account: user whose policy descriptor is requested.
        """
        return sm.RetrievePolicyEx(
                session_manager.make_user_policy_descriptor(account),
                byte_arrays=True)


    def _push_user_policy(self, sm, account, response):
        """Calls StorePolicyEx for |account|; D-Bus errors propagate.

        @param sm: session_manager connection to use.
        @param account: user whose policy descriptor is targeted.
        @param response: policy blob to send, wrapped in a dbus.ByteArray.
        """
        sm.StorePolicyEx(
            session_manager.make_user_policy_descriptor(account),
            dbus.ByteArray(response))


    def _push_user_policy_or_fail(self, sm, account, response):
        """Stores policy for |account|, failing the test on a D-Bus error.

        @param sm: session_manager connection to use.
        @param account: user whose policy descriptor is targeted.
        @param response: policy blob to send.

        @raises error.TestFail: if StorePolicyEx raises a DBusException.
        """
        try:
            self._push_user_policy(sm, account, response)
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Call to StorePolicyEx failed', e)


    def _begin_session_expecting_no_policy(self, sm, account):
        """Starts a session for |account| and checks no policy exists yet.

        @param sm: session_manager connection to use.
        @param account: user to start a session for.

        @raises error.TestError: if a policy is already present.
        """
        sm.StartSession(account, '')
        if self._fetch_user_policy(sm, account):
            raise error.TestError('session_manager already has user policy!')


    def run_once(self):
        """Runs the multi-user policy store/retrieve scenario."""
        sm = session_manager.connect(self._bus_loop)

        # First user: the session starts out with no policy.
        self._begin_session_expecting_no_policy(sm, self._user1)

        # Build the policy to store. The payload is a device policy
        # protobuf, which the session_manager does not mind; the public_key
        # lives in the outer PolicyFetchResponse.
        public_key = ownership.known_pubkey()
        private_key = ownership.known_privkey()
        policy_response = policy.generate_policy(
            private_key, public_key, policy.build_policy_data())

        self._push_user_policy_or_fail(sm, self._user1, policy_response)

        # The second user has no session yet, so a store attempt on that
        # user's behalf has to be rejected with a D-Bus error.
        store_rejected = False
        try:
            self._push_user_policy(sm, self._user2, policy_response)
        except dbus.exceptions.DBusException:
            store_rejected = True
        if not store_rejected:
            raise error.TestFail('Storing policy should fail before the '
                                 'session is started')

        # Second user: the session starts out with no policy either.
        self._begin_session_expecting_no_policy(sm, self._user2)

        # With the session running, the store must go through.
        self._push_user_policy_or_fail(sm, self._user2, policy_response)

        # Finally, the policy just stored must be retrievable.
        if not self._fetch_user_policy(sm, self._user2):
            raise error.TestError('Failed to retrieve stored policy')