普通文本  |  767行  |  29.99 KB

from django.db import models as dbmodels, connection
from autotest_lib.frontend.afe import model_logic, readonly_connection

_quote_name = connection.ops.quote_name

class TempManager(model_logic.ExtendedManager):
    """A Temp Manager."""
    _GROUP_COUNT_NAME = 'group_count'

    def _get_key_unless_is_function(self, field):
        if '(' in field:
            return field
        return self.get_key_on_this_table(field)


    def _get_field_names(self, fields, extra_select_fields={}):
        field_names = []
        for field in fields:
            if field in extra_select_fields:
                field_names.append(extra_select_fields[field][0])
            else:
                field_names.append(self._get_key_unless_is_function(field))
        return field_names


    def _get_group_query_sql(self, query, group_by):
        compiler = query.query.get_compiler(using=query.db)
        sql, params = compiler.as_sql()


        # insert GROUP BY clause into query
        group_fields = self._get_field_names(group_by, query.query.extra_select)
        group_by_clause = ' GROUP BY ' + ', '.join(group_fields)
        group_by_position = sql.rfind('ORDER BY')
        if group_by_position == -1:
            group_by_position = len(sql)
        sql = (sql[:group_by_position] +
               group_by_clause + ' ' +
               sql[group_by_position:])

        return sql, params


    def _get_column_names(self, cursor):
        """Gets the column names from the cursor description.

        This method exists so that it can be mocked in the unit test for
        sqlite3 compatibility.

        """
        return [column_info[0] for column_info in cursor.description]


    def execute_group_query(self, query, group_by):
        """Performs the given query grouped by the specified fields.

        The given query's extra select fields are added.

        @param query: The query to perform.
        @param group_by: The fields by which to group.

        @return A list of dicts, where each dict corresponds to single row and
            contains a key for each grouped field as well as all of the extra
            select fields.

        """
        sql, params = self._get_group_query_sql(query, group_by)
        cursor = readonly_connection.cursor()
        cursor.execute(sql, params)
        field_names = self._get_column_names(cursor)
        row_dicts = [dict(zip(field_names, row)) for row in cursor.fetchall()]
        return row_dicts


    def get_count_sql(self, query):
        """Get SQL to select a per-group count of unique matches for a query.

        @param query: The query to use.

        @return A tuple (field alias, field SQL).

        """
        if query.query.distinct:
            pk_field = self.get_key_on_this_table()
            count_sql = 'COUNT(DISTINCT %s)' % pk_field
        else:
            count_sql = 'COUNT(1)'
        return self._GROUP_COUNT_NAME, count_sql


    def _get_num_groups_sql(self, query, group_by):
        group_fields = self._get_field_names(group_by, query.query.extra_select)
        query = query.order_by() # this can mess up the query and isn't needed

        compiler = query.query.get_compiler(using=query.db)
        sql, params = compiler.as_sql()
        from_ = sql[sql.find(' FROM'):]
        return ('SELECT DISTINCT %s %s' % (','.join(group_fields),
                                                  from_),
                params)


    def _cursor_rowcount(self, cursor):
        """To be stubbed by tests"""
        return cursor.rowcount


    def get_num_groups(self, query, group_by):
        """Gets the number of distinct groups for a query.

        @param query: The query to use.
        @param group_by: The fields by which to group.

        @return The number of distinct groups for the given query grouped by
            the fields in group_by.

        """
        sql, params = self._get_num_groups_sql(query, group_by)
        cursor = readonly_connection.cursor()
        cursor.execute(sql, params)
        return self._cursor_rowcount(cursor)


class Machine(dbmodels.Model):
    """Models a machine."""
    machine_idx = dbmodels.AutoField(primary_key=True)
    hostname = dbmodels.CharField(unique=True, max_length=255)
    machine_group = dbmodels.CharField(blank=True, max_length=240)
    owner = dbmodels.CharField(blank=True, max_length=240)

    class Meta:
        """Metadata for class Machine."""
        db_table = 'tko_machines'


class Kernel(dbmodels.Model):
    """Models a kernel."""
    kernel_idx = dbmodels.AutoField(primary_key=True)
    kernel_hash = dbmodels.CharField(max_length=105, editable=False)
    base = dbmodels.CharField(max_length=90)
    printable = dbmodels.CharField(max_length=300)

    class Meta:
        """Metadata for class Kernel."""
        db_table = 'tko_kernels'


class Patch(dbmodels.Model):
    """Models a patch."""
    kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')
    name = dbmodels.CharField(blank=True, max_length=240)
    url = dbmodels.CharField(blank=True, max_length=900)
    the_hash = dbmodels.CharField(blank=True, max_length=105, db_column='hash')

    class Meta:
        """Metadata for class Patch."""
        db_table = 'tko_patches'


class Status(dbmodels.Model):
    """Models a status."""
    status_idx = dbmodels.AutoField(primary_key=True)
    word = dbmodels.CharField(max_length=30)

    class Meta:
        """Metadata for class Status."""
        db_table = 'tko_status'


class Job(dbmodels.Model, model_logic.ModelExtensions):
    """Models a job."""
    job_idx = dbmodels.AutoField(primary_key=True)
    tag = dbmodels.CharField(unique=True, max_length=100)
    label = dbmodels.CharField(max_length=300)
    username = dbmodels.CharField(max_length=240)
    machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')
    queued_time = dbmodels.DateTimeField(null=True, blank=True)
    started_time = dbmodels.DateTimeField(null=True, blank=True)
    finished_time = dbmodels.DateTimeField(null=True, blank=True)
    afe_job_id = dbmodels.IntegerField(null=True, default=None)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class Job."""
        db_table = 'tko_jobs'


class JobKeyval(dbmodels.Model):
    """Models a job keyval."""
    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(blank=True, max_length=300)

    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'tko_job_keyvals'


class Test(dbmodels.Model, model_logic.ModelExtensions,
           model_logic.ModelWithAttributes):
    """Models a test."""
    test_idx = dbmodels.AutoField(primary_key=True)
    job = dbmodels.ForeignKey(Job, db_column='job_idx')
    test = dbmodels.CharField(max_length=300)
    subdir = dbmodels.CharField(blank=True, max_length=300)
    kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')
    status = dbmodels.ForeignKey(Status, db_column='status')
    reason = dbmodels.CharField(blank=True, max_length=3072)
    machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')
    finished_time = dbmodels.DateTimeField(null=True, blank=True)
    started_time = dbmodels.DateTimeField(null=True, blank=True)
    invalid = dbmodels.BooleanField(default=False)
    invalidates_test = dbmodels.ForeignKey(
            'self', null=True, db_column='invalidates_test_idx',
            related_name='invalidates_test_set')

    objects = model_logic.ExtendedManager()

    def _get_attribute_model_and_args(self, attribute):
        return TestAttribute, dict(test=self, attribute=attribute,
                                   user_created=True)


    def set_attribute(self, attribute, value):
        # ensure non-user-created attributes remain immutable
        try:
            TestAttribute.objects.get(test=self, attribute=attribute,
                                      user_created=False)
            raise ValueError('Attribute %s already exists for test %s and is '
                             'immutable' % (attribute, self.test_idx))
        except TestAttribute.DoesNotExist:
            super(Test, self).set_attribute(attribute, value)

    class Meta:
        """Metadata for class Test."""
        db_table = 'tko_tests'


class TestAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Models a test attribute."""
    test = dbmodels.ForeignKey(Test, db_column='test_idx')
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(blank=True, max_length=300)
    user_created = dbmodels.BooleanField(default=False)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class TestAttribute."""
        db_table = 'tko_test_attributes'


class IterationAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Models an iteration attribute."""
    # This isn't really a primary key, but it's necessary to appease Django
    # and is harmless as long as we're careful.
    test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)
    iteration = dbmodels.IntegerField()
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(blank=True, max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IterationAttribute."""
        db_table = 'tko_iteration_attributes'


class IterationResult(dbmodels.Model, model_logic.ModelExtensions):
    """Models an iteration result."""
    # See comment on IterationAttribute regarding primary_key=True.
    test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)
    iteration = dbmodels.IntegerField()
    attribute = dbmodels.CharField(max_length=256)
    value = dbmodels.FloatField(null=True, blank=True)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IterationResult."""
        db_table = 'tko_iteration_result'


class TestLabel(dbmodels.Model, model_logic.ModelExtensions):
    """Models a test label."""
    name = dbmodels.CharField(max_length=80, unique=True)
    description = dbmodels.TextField(blank=True)
    tests = dbmodels.ManyToManyField(Test, blank=True,
                                     db_table='tko_test_labels_tests')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class TestLabel."""
        db_table = 'tko_test_labels'


class SavedQuery(dbmodels.Model, model_logic.ModelExtensions):
    """Models a saved query."""
    # TODO: change this to foreign key once DBs are merged.
    owner = dbmodels.CharField(max_length=80)
    name = dbmodels.CharField(max_length=100)
    url_token = dbmodels.TextField()

    class Meta:
        """Metadata for class SavedQuery."""
        db_table = 'tko_saved_queries'


class EmbeddedGraphingQuery(dbmodels.Model, model_logic.ModelExtensions):
    """Models an embedded graphing query."""
    url_token = dbmodels.TextField(null=False, blank=False)
    graph_type = dbmodels.CharField(max_length=16, null=False, blank=False)
    params = dbmodels.TextField(null=False, blank=False)
    last_updated = dbmodels.DateTimeField(null=False, blank=False,
                                          editable=False)
    # refresh_time shows the time at which a thread is updating the cached
    # image, or NULL if no one is updating the image. This is used so that only
    # one thread is updating the cached image at a time (see
    # graphing_utils.handle_plot_request).
    refresh_time = dbmodels.DateTimeField(editable=False)
    cached_png = dbmodels.TextField(editable=False)

    class Meta:
        """Metadata for class EmbeddedGraphingQuery."""
        db_table = 'tko_embedded_graphing_queries'


# Views.

class TestViewManager(TempManager):
    """A Test View Manager."""

    def get_query_set(self):
        query = super(TestViewManager, self).get_query_set()

        # add extra fields to selects, using the SQL itself as the "alias"
        extra_select = dict((sql, sql)
                            for sql in self.model.extra_fields.iterkeys())
        return query.extra(select=extra_select)


    def _get_include_exclude_suffix(self, exclude):
        if exclude:
            return '_exclude'
        return '_include'


    def _add_attribute_join(self, query_set, join_condition,
                            suffix=None, exclude=False):
        if suffix is None:
            suffix = self._get_include_exclude_suffix(exclude)
        return self.add_join(query_set, 'tko_test_attributes',
                             join_key='test_idx',
                             join_condition=join_condition,
                             suffix=suffix, exclude=exclude)


    def _add_label_pivot_table_join(self, query_set, suffix, join_condition='',
                                    exclude=False, force_left_join=False):
        return self.add_join(query_set, 'tko_test_labels_tests',
                             join_key='test_id',
                             join_condition=join_condition,
                             suffix=suffix, exclude=exclude,
                             force_left_join=force_left_join)


    def _add_label_joins(self, query_set, suffix=''):
        query_set = self._add_label_pivot_table_join(
                query_set, suffix=suffix, force_left_join=True)

        # since we're not joining from the original table, we can't use
        # self.add_join() again
        second_join_alias = 'tko_test_labels' + suffix
        second_join_condition = ('%s.id = %s.testlabel_id' %
                                 (second_join_alias,
                                  'tko_test_labels_tests' + suffix))
        query_set.query.add_custom_join('tko_test_labels',
                                        second_join_condition,
                                        query_set.query.LOUTER,
                                        alias=second_join_alias)
        return query_set


    def _get_label_ids_from_names(self, label_names):
        label_ids = list( # listifying avoids a double query below
                TestLabel.objects.filter(name__in=label_names)
                .values_list('name', 'id'))
        if len(label_ids) < len(set(label_names)):
            raise ValueError('Not all labels found: %s' %
                             ', '.join(label_names))
        return dict(name_and_id for name_and_id in label_ids)


    def _include_or_exclude_labels(self, query_set, label_names, exclude=False):
        label_ids = self._get_label_ids_from_names(label_names).itervalues()
        suffix = self._get_include_exclude_suffix(exclude)
        condition = ('tko_test_labels_tests%s.testlabel_id IN (%s)' %
                     (suffix,
                      ','.join(str(label_id) for label_id in label_ids)))
        return self._add_label_pivot_table_join(query_set,
                                                join_condition=condition,
                                                suffix=suffix,
                                                exclude=exclude)


    def _add_custom_select(self, query_set, select_name, select_sql):
        return query_set.extra(select={select_name: select_sql})


    def _add_select_value(self, query_set, alias):
        return self._add_custom_select(query_set, alias,
                                       _quote_name(alias) + '.value')


    def _add_select_ifnull(self, query_set, alias, non_null_value):
        select_sql = "IF(%s.id IS NOT NULL, '%s', NULL)" % (_quote_name(alias),
                                                            non_null_value)
        return self._add_custom_select(query_set, alias, select_sql)


    def _join_test_label_column(self, query_set, label_name, label_id):
        alias = 'test_label_' + label_name
        label_query = TestLabel.objects.filter(name=label_name)
        query_set = Test.objects.join_custom_field(query_set, label_query,
                                                   alias)

        query_set = self._add_select_ifnull(query_set, alias, label_name)
        return query_set


    def _join_test_label_columns(self, query_set, label_names):
        label_id_map = self._get_label_ids_from_names(label_names)
        for label_name in label_names:
            query_set = self._join_test_label_column(query_set, label_name,
                                                     label_id_map[label_name])
        return query_set


    def _join_test_attribute(self, query_set, attribute, alias=None,
                             extra_join_condition=None):
        """
        Join the given TestView QuerySet to TestAttribute.  The resulting query
        has an additional column for the given attribute named
        "attribute_<attribute name>".
        """
        if not alias:
            alias = 'test_attribute_' + attribute
        attribute_query = TestAttribute.objects.filter(attribute=attribute)
        if extra_join_condition:
            attribute_query = attribute_query.extra(
                    where=[extra_join_condition])
        query_set = Test.objects.join_custom_field(query_set, attribute_query,
                                                   alias)

        query_set = self._add_select_value(query_set, alias)
        return query_set


    def _join_machine_label_columns(self, query_set, machine_label_names):
        for label_name in machine_label_names:
            alias = 'machine_label_' + label_name
            condition = "FIND_IN_SET('%s', %s)" % (
                    label_name, _quote_name(alias) + '.value')
            query_set = self._join_test_attribute(
                    query_set, 'host-labels',
                    alias=alias, extra_join_condition=condition)
            query_set = self._add_select_ifnull(query_set, alias, label_name)
        return query_set


    def _join_one_iteration_key(self, query_set, result_key, first_alias=None):
        alias = 'iteration_result_' + result_key
        iteration_query = IterationResult.objects.filter(attribute=result_key)
        if first_alias:
            # after the first join, we need to match up iteration indices,
            # otherwise each join will expand the query by the number of
            # iterations and we'll have extraneous rows
            iteration_query = iteration_query.extra(
                    where=['%s.iteration = %s.iteration'
                           % (_quote_name(alias), _quote_name(first_alias))])

        query_set = Test.objects.join_custom_field(query_set, iteration_query,
                                                   alias, left_join=False)
        # select the iteration value and index for this join
        query_set = self._add_select_value(query_set, alias)
        if not first_alias:
            # for first join, add iteration index select too
            query_set = self._add_custom_select(
                    query_set, 'iteration_index',
                    _quote_name(alias) + '.iteration')

        return query_set, alias


    def _join_iteration_results(self, test_view_query_set, result_keys):
        """Join the given TestView QuerySet to IterationResult for one result.

        The resulting query looks like a TestView query but has one row per
        iteration.  Each row includes all the attributes of TestView, an
        attribute for each key in result_keys and an iteration_index attribute.

        We accomplish this by joining the TestView query to IterationResult
        once per result key.  Each join is restricted on the result key (and on
        the test index, like all one-to-many joins).  For the first join, this
        is the only restriction, so each TestView row expands to a row per
        iteration (per iteration that includes the key, of course).  For each
        subsequent join, we also restrict the iteration index to match that of
        the initial join.  This makes each subsequent join produce exactly one
        result row for each input row.  (This assumes each iteration contains
        the same set of keys.  Results are undefined if that's not true.)
        """
        if not result_keys:
            return test_view_query_set

        query_set, first_alias = self._join_one_iteration_key(
                test_view_query_set, result_keys[0])
        for result_key in result_keys[1:]:
            query_set, _ = self._join_one_iteration_key(query_set, result_key,
                                                        first_alias=first_alias)
        return query_set


    def _join_job_keyvals(self, query_set, job_keyvals):
        for job_keyval in job_keyvals:
            alias = 'job_keyval_' + job_keyval
            keyval_query = JobKeyval.objects.filter(key=job_keyval)
            query_set = Job.objects.join_custom_field(query_set, keyval_query,
                                                       alias)
            query_set = self._add_select_value(query_set, alias)
        return query_set


    def _join_iteration_attributes(self, query_set, iteration_attributes):
        for attribute in iteration_attributes:
            alias = 'iteration_attribute_' + attribute
            attribute_query = IterationAttribute.objects.filter(
                    attribute=attribute)
            query_set = Test.objects.join_custom_field(query_set,
                                                       attribute_query, alias)
            query_set = self._add_select_value(query_set, alias)
        return query_set


    def get_query_set_with_joins(self, filter_data):
        """Add joins for querying over test-related items.

        These parameters are supported going forward:
        * test_attribute_fields: list of attribute names.  Each attribute will
                be available as a column attribute_<name>.value.
        * test_label_fields: list of label names.  Each label will be available
                as a column label_<name>.id, non-null iff the label is present.
        * iteration_result_fields: list of iteration result names.  Each
                result will be available as a column iteration_<name>.value.
                Note that this changes the semantics to return iterations
                instead of tests -- if a test has multiple iterations, a row
                will be returned for each one.  The iteration index is also
                available as iteration_<name>.iteration.
        * machine_label_fields: list of machine label names.  Each will be
                available as a column machine_label_<name>.id, non-null iff the
                label is present on the machine used in the test.
        * job_keyval_fields: list of job keyval names. Each value will be
                available as a column job_keyval_<name>.id, non-null iff the
                keyval is present in the AFE job.
        * iteration_attribute_fields: list of iteration attribute names. Each
                attribute will be available as a column
                iteration_attribute<name>.id, non-null iff the attribute is
                present.

        These parameters are deprecated:
        * include_labels
        * exclude_labels
        * include_attributes_where
        * exclude_attributes_where

        Additionally, this method adds joins if the following strings are
        present in extra_where (this is also deprecated):
        * test_labels
        * test_attributes_host_labels

        @param filter_data: Data by which to filter.

        @return A QuerySet.

        """
        query_set = self.get_query_set()

        test_attributes = filter_data.pop('test_attribute_fields', [])
        for attribute in test_attributes:
            query_set = self._join_test_attribute(query_set, attribute)

        test_labels = filter_data.pop('test_label_fields', [])
        query_set = self._join_test_label_columns(query_set, test_labels)

        machine_labels = filter_data.pop('machine_label_fields', [])
        query_set = self._join_machine_label_columns(query_set, machine_labels)

        iteration_keys = filter_data.pop('iteration_result_fields', [])
        query_set = self._join_iteration_results(query_set, iteration_keys)

        job_keyvals = filter_data.pop('job_keyval_fields', [])
        query_set = self._join_job_keyvals(query_set, job_keyvals)

        iteration_attributes = filter_data.pop('iteration_attribute_fields', [])
        query_set = self._join_iteration_attributes(query_set,
                                                    iteration_attributes)

        # everything that follows is deprecated behavior

        joined = False

        extra_where = filter_data.get('extra_where', '')
        if 'tko_test_labels' in extra_where:
            query_set = self._add_label_joins(query_set)
            joined = True

        include_labels = filter_data.pop('include_labels', [])
        exclude_labels = filter_data.pop('exclude_labels', [])
        if include_labels:
            query_set = self._include_or_exclude_labels(query_set,
                                                        include_labels)
            joined = True
        if exclude_labels:
            query_set = self._include_or_exclude_labels(query_set,
                                                        exclude_labels,
                                                        exclude=True)
            joined = True

        include_attributes_where = filter_data.pop('include_attributes_where',
                                                   '')
        exclude_attributes_where = filter_data.pop('exclude_attributes_where',
                                                   '')
        if include_attributes_where:
            query_set = self._add_attribute_join(
                query_set,
                join_condition=self.escape_user_sql(include_attributes_where))
            joined = True
        if exclude_attributes_where:
            query_set = self._add_attribute_join(
                query_set,
                join_condition=self.escape_user_sql(exclude_attributes_where),
                exclude=True)
            joined = True

        if not joined:
            filter_data['no_distinct'] = True

        if 'tko_test_attributes_host_labels' in extra_where:
            query_set = self._add_attribute_join(
                query_set, suffix='_host_labels',
                join_condition='tko_test_attributes_host_labels.attribute = '
                               '"host-labels"')

        return query_set


    def query_test_ids(self, filter_data, apply_presentation=True):
        """Queries for test IDs.

        @param filter_data: Data by which to filter.
        @param apply_presentation: Whether or not to apply presentation
            parameters.

        @return A list of test IDs.

        """
        query = self.model.query_objects(filter_data,
                                         apply_presentation=apply_presentation)
        dicts = query.values('test_idx')
        return [item['test_idx'] for item in dicts]


    def query_test_label_ids(self, filter_data):
        """Queries for test label IDs.

        @param filter_data: Data by which to filter.

        @return A list of test label IDs.

        """
        query_set = self.model.query_objects(filter_data)
        query_set = self._add_label_joins(query_set, suffix='_list')
        rows = self._custom_select_query(query_set, ['tko_test_labels_list.id'])
        return [row[0] for row in rows if row[0] is not None]


    def escape_user_sql(self, sql):
        sql = super(TestViewManager, self).escape_user_sql(sql)
        return sql.replace('test_idx', self.get_key_on_this_table('test_idx'))


class TestView(dbmodels.Model, model_logic.ModelExtensions):
    """Models a test view."""
    extra_fields = {
            'DATE(job_queued_time)': 'job queued day',
            'DATE(test_finished_time)': 'test finished day',
    }

    group_fields = [
            'test_name',
            'status',
            'kernel',
            'hostname',
            'job_tag',
            'job_name',
            'platform',
            'reason',
            'job_owner',
            'job_queued_time',
            'DATE(job_queued_time)',
            'test_started_time',
            'test_finished_time',
            'DATE(test_finished_time)',
    ]

    test_idx = dbmodels.IntegerField('test index', primary_key=True)
    job_idx = dbmodels.IntegerField('job index', null=True, blank=True)
    test_name = dbmodels.CharField(blank=True, max_length=90)
    subdir = dbmodels.CharField('subdirectory', blank=True, max_length=180)
    kernel_idx = dbmodels.IntegerField('kernel index')
    status_idx = dbmodels.IntegerField('status index')
    reason = dbmodels.CharField(blank=True, max_length=3072)
    machine_idx = dbmodels.IntegerField('host index')
    test_started_time = dbmodels.DateTimeField(null=True, blank=True)
    test_finished_time = dbmodels.DateTimeField(null=True, blank=True)
    job_tag = dbmodels.CharField(blank=True, max_length=300)
    job_name = dbmodels.CharField(blank=True, max_length=300)
    job_owner = dbmodels.CharField('owner', blank=True, max_length=240)
    job_queued_time = dbmodels.DateTimeField(null=True, blank=True)
    job_started_time = dbmodels.DateTimeField(null=True, blank=True)
    job_finished_time = dbmodels.DateTimeField(null=True, blank=True)
    afe_job_id = dbmodels.IntegerField(null=True)
    hostname = dbmodels.CharField(blank=True, max_length=300)
    platform = dbmodels.CharField(blank=True, max_length=240)
    machine_owner = dbmodels.CharField(blank=True, max_length=240)
    kernel_hash = dbmodels.CharField(blank=True, max_length=105)
    kernel_base = dbmodels.CharField(blank=True, max_length=90)
    kernel = dbmodels.CharField(blank=True, max_length=300)
    status = dbmodels.CharField(blank=True, max_length=30)
    invalid = dbmodels.BooleanField(blank=True)
    invalidates_test_idx = dbmodels.IntegerField(null=True, blank=True)

    objects = TestViewManager()

    def save(self):
        raise NotImplementedError('TestView is read-only')


    def delete(self):
        raise NotImplementedError('TestView is read-only')


    @classmethod
    def query_objects(cls, filter_data, initial_query=None,
                      apply_presentation=True):
        if initial_query is None:
            initial_query = cls.objects.get_query_set_with_joins(filter_data)
        return super(TestView, cls).query_objects(
                filter_data, initial_query=initial_query,
                apply_presentation=apply_presentation)

    class Meta:
        """Metadata for class TestView."""
        db_table = 'tko_test_view_2'