普通文本  |  138行  |  4.99 KB

#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import unittest

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import rdb_testing_utils
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler.shard import shard_client


class ShardClientIntegrationTest(rdb_testing_utils.AbstractBaseRDBTester,
                                 unittest.TestCase):
    """Integration tests for the shard_client."""


    def setup_global_config(self):
        """Mock out global_config for shard client creation."""
        global_config.global_config.override_config_value(
                'SHARD', 'is_slave_shard', 'True')
        global_config.global_config.override_config_value(
                'SHARD', 'shard_hostname', 'host1')


    def initialize_shard_client(self):
        self.setup_global_config()
        return shard_client.get_shard_client()


    def testCompleteStatusBasic(self):
        """Test that complete jobs are uploaded properly."""

        client = self.initialize_shard_client()
        job = self.create_job(deps=set(['a']), shard_hostname=client.hostname)
        scheduler_models.initialize()
        hqe = scheduler_models.HostQueueEntry.fetch(
                where='job_id = %s' % job.id)[0]

        # This should set both the shard_id and the complete bit.
        hqe.set_status('Completed')

        # Only incomplete jobs should be in known ids.
        job_ids, host_ids, _ = client._get_known_jobs_and_hosts()
        assert(job_ids == [])

        # Jobs that have successfully gone through a set_status should
        # be ready for upload.
        jobs = client._get_jobs_to_upload()
        assert(job.id in [j.id for j in jobs])


    def testOnlyShardId(self):
        """Test that setting only the shardid prevents the job from upload."""

        client = self.initialize_shard_client()
        job = self.create_job(deps=set(['a']), shard_hostname=client.hostname)
        scheduler_models.initialize()
        hqe = scheduler_models.HostQueueEntry.fetch(
                where='job_id = %s' % job.id)[0]

        def _local_update_field(hqe, field_name, value):
            """Turns update_field on the complete field into a no-op."""
            if field_name == 'complete':
                return
            models.HostQueueEntry.objects.filter(id=hqe.id).update(
                    **{field_name: value})
            setattr(hqe, field_name, value)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'update_field',
                _local_update_field)

        # This should only update the shard_id.
        hqe.set_status('Completed')

        # Retrieve the hqe along an independent code path so we're assured of
        # freshness, then make sure it has shard=None and an unset complete bit.
        modified_hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        assert(modified_hqe.id == hqe.id and
               modified_hqe.complete == 0 and
               modified_hqe.job.shard == None)

        # Make sure the job with a shard but without complete is still
        # in known_ids.
        job_ids, host_ids, _ = client._get_known_jobs_and_hosts()
        assert(set(job_ids) == set([job.id]))

        # Make sure the job with a shard but without complete is not
        # in uploaded jobs.
        jobs = client._get_jobs_to_upload()
        assert(jobs == [])


    def testHostSerialization(self):
        """Test simple host serialization."""
        client = self.initialize_shard_client()
        host = self.db_helper.create_host(name='test_host')
        serialized_host = host.serialize()
        models.Host.objects.all().delete()
        models.Host.deserialize(serialized_host)
        models.Host.objects.get(hostname='test_host')


    def testUserExists(self):
        """Test user related race conditions."""
        client = self.initialize_shard_client()
        user = self.db_helper.create_user(name='test_user')
        serialized_user = user.serialize()

        # Master sends a user with the same login but different id
        serialized_user['id'] = '3'
        models.User.deserialize(serialized_user)
        models.User.objects.get(id=3, login='test_user')

        # Master sends a user with the same id, different login
        serialized_user['login'] = 'fake_user'
        models.User.deserialize(serialized_user)
        models.User.objects.get(id=3, login='fake_user')

        # Master sends a new user
        user = self.db_helper.create_user(name='new_user')
        serialized_user = user.serialize()
        models.User.objects.all().delete()
        models.User.deserialize(serialized_user)
        models.User.objects.get(login='new_user')


if __name__ == '__main__':
    unittest.main()