普通文本  |  141行  |  5.84 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus, grp, os, pwd, stat
from dbus.mainloop.glib import DBusGMainLoop

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import policy, session_manager
from autotest_lib.client.cros import cros_ui, cryptohome, ownership


class login_UserPolicyKeys(test.test):
    """Verifies that, after user policy is pushed, the user policy key winds
       up stored in the right place.
    """
    version = 1

    def _can_read(self, uid, gid, info):
        """Returns true if uid or gid can read a file with the info stat."""
        if uid == info.st_uid:
            return info.st_mode & stat.S_IRUSR
        if gid == info.st_gid:
            return info.st_mode & stat.S_IRGRP
        return info.st_mode & stat.S_IROTH


    def _can_execute(self, uid, gid, info):
        """Returns true if uid or gid can execute a file with the info stat."""
        if uid == info.st_uid:
            return info.st_mode & stat.S_IXUSR
        if gid == info.st_gid:
            return info.st_mode & stat.S_IXGRP
        return info.st_mode & stat.S_IXOTH


    def _verify_key_file(self, key_file):
        """Verifies that the key file has been created and is readable."""
        if not os.path.isfile(key_file):
            raise error.TestFail('%s does not exist!' % key_file)
        # And is readable by chronos.
        chronos_uid = pwd.getpwnam('chronos').pw_uid
        chronos_gid = grp.getgrnam('chronos').gr_gid
        info = os.stat(key_file)
        if not stat.S_ISREG(info.st_mode):
            raise error.TestFail('%s is not a regular file' % key_file)
        if not self._can_read(chronos_uid, chronos_gid, info):
            raise error.TestFail('chronos can\' read %s, mode is %s' %
                                 (key_file, oct(info.st_mode)))
        # All the parent directories must be executable by chronos.
        current = key_file
        parent = os.path.dirname(current)
        while current != parent:
            current = parent
            parent = os.path.dirname(parent)
            info = os.stat(current)
            mode = stat.S_IMODE(info.st_mode)
            if not self._can_execute(chronos_uid, chronos_gid, info):
                raise error.TestFail('chronos can\'t execute %s, mode is %s' %
                                     (current, oct(info.st_mode)))


    def setup(self):
        os.chdir(self.srcdir)
        utils.make('OUT_DIR=.')


    def initialize(self):
        super(login_UserPolicyKeys, self).initialize()
        self._bus_loop = DBusGMainLoop(set_as_default=True)
        self._cryptohome_proxy = cryptohome.CryptohomeProxy(self._bus_loop)

        # Clear the user's vault, to make sure the test starts without any
        # policy or key lingering around. At this stage the session isn't
        # started and there's no user signed in.
        ownership.restart_ui_to_clear_ownership_files()
        self._cryptohome_proxy.remove(ownership.TESTUSER)


    def run_once(self):
        # Mount the vault, connect to session_manager and start the session.
        self._cryptohome_proxy.mount(ownership.TESTUSER,
                                     ownership.TESTPASS,
                                     create=True)
        sm = session_manager.connect(self._bus_loop)
        sm.StartSession(ownership.TESTUSER, '')

        # No policy stored yet.
        retrieved_policy = sm.RetrievePolicyForUser(ownership.TESTUSER,
                                                    byte_arrays=True)
        if retrieved_policy:
            raise error.TestError('session_manager already has user policy!')

        # And no user key exists.
        key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER)
        if os.path.exists(key_file):
            raise error.TestFail('%s exists before storing user policy!' %
                                 key_file)

        # Now store a policy. This is building a device policy protobuf, but
        # that's fine as far as the session_manager is concerned; it's the
        # outer PolicyFetchResponse that contains the public_key.
        public_key = ownership.known_pubkey()
        private_key = ownership.known_privkey()
        policy_data = policy.build_policy_data(self.srcdir)
        policy_response = policy.generate_policy(self.srcdir,
                                                 private_key,
                                                 public_key,
                                                 policy_data)
        try:
            sm.StorePolicyForUser(ownership.TESTUSER,
                                  dbus.ByteArray(policy_response))
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Failed to store user policy', e)

        # The policy key should have been created now.
        self._verify_key_file(key_file)

        # Restart the ui; the key should be deleted.
        self._cryptohome_proxy.unmount(ownership.TESTUSER)
        cros_ui.restart()
        if os.path.exists(key_file):
            raise error.TestFail('%s exists after restarting ui!' %
                                 key_file)

        # Starting a new session will restore the key that was previously
        # stored. Reconnect to the session_manager, since the restart killed it.
        self._cryptohome_proxy.mount(ownership.TESTUSER,
                                     ownership.TESTPASS,
                                     create=True)
        sm = session_manager.connect(self._bus_loop)
        sm.StartSession(ownership.TESTUSER, '')
        self._verify_key_file(key_file)


    def cleanup(self):
        cros_ui.restart()
        self._cryptohome_proxy.remove(ownership.TESTUSER)
        super(login_UserPolicyKeys, self).cleanup()