普通文本  |  199行  |  8.63 KB

# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dpkt
import logging
import re

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.tendo import peerd_config
from autotest_lib.client.cros import chrooted_avahi
from autotest_lib.client.cros.netprotos import interface_host
from autotest_lib.client.cros.netprotos import zeroconf
from autotest_lib.client.cros.tendo import peerd_dbus_helper


class peerd_AdvertiseServices(test.test):
    """Test that peerd can correctly advertise services over mDNS."""
    version = 1

    ANY_VALUE = object()  # Use reference equality for wildcard.
    FAKE_HOST_HOSTNAME = 'test-host'
    TEST_TIMEOUT_SECONDS = 30
    TEST_SERVICE_ID = 'test-service-0'
    TEST_SERVICE_INFO = {'some_data': 'a value',
                          'other_data': 'another value'}
    TEST_SERVICE_PORT = 8080
    SERBUS_SERVICE_ID = 'serbus'
    SERBUS_SERVICE_INFO = {
            'ver': '1.0',
            'id': ANY_VALUE,
            'services': r'(.+\.)?' + TEST_SERVICE_ID + r'(\..+)?',
    }
    SERBUS_SERVICE_PORT = 0


    def initialize(self):
        # Make sure these are initiallized to None in case we throw
        # during self.initialize().
        self._chrooted_avahi = None
        self._peerd = None
        self._host = None
        self._zc_listener = None
        self._chrooted_avahi = chrooted_avahi.ChrootedAvahi()
        self._chrooted_avahi.start()
        # Start up a cleaned up peerd with really verbose logging.
        self._peerd = peerd_dbus_helper.make_helper(
                peerd_config.PeerdConfig(verbosity_level=3))
        # Listen on our half of the interface pair for mDNS advertisements.
        self._host = interface_host.InterfaceHost(
                self._chrooted_avahi.unchrooted_interface_name)
        self._zc_listener = zeroconf.ZeroconfDaemon(self._host,
                                                    self.FAKE_HOST_HOSTNAME)
        # The queries for hostname/dns_domain are IPCs and therefor relatively
        # expensive.  Do them just once.
        hostname = self._chrooted_avahi.hostname
        dns_domain = self._chrooted_avahi.dns_domain
        if not hostname or not dns_domain:
            raise error.TestFail('Failed to get hostname/domain from avahi.')
        self._dns_domain = dns_domain
        self._hostname = '%s.%s' % (hostname, dns_domain)


    def cleanup(self):
        for obj in (self._chrooted_avahi,
                    self._host,
                    self._peerd):
            if obj is not None:
                obj.close()


    def _check_txt_record_data(self, expected_data, actual_data):
        # Labels in the TXT record should be 1:1 with our service info.
        expected_entries = expected_data.copy()
        for entry in actual_data:
            # All labels should be key/value pairs.
            if entry.find('=') < 0:
                raise error.TestFail('All TXT entries should have = separator, '
                                     'but got: %s' % entry)
            k, v = entry.split('=', 1)
            if k not in expected_entries:
                raise error.TestFail('Unexpected TXT entry key: %s' % k)
            if (expected_entries[k] != self.ANY_VALUE and
                    not re.match(expected_entries[k], v)):
                # We're going to return False here rather than fail the test
                # for one tricky reason: in the root serbus record, we may
                # find that the service list does not match our expectation
                # since other daemons may be advertising services via peerd.
                # We need to basically wait for our test service to show up.
                logging.warning('Expected TXT value to match %s for '
                                'entry=%s but got value=%r instead.',
                                expected_entries[k], k, v)
                return False
            expected_entries.pop(k)
        if expected_entries:
            # Raise a detailed exception here, rather than return false.
            raise error.TestFail('Missing entries from TXT: %r' %
                                 expected_entries)
        return True


    def _ask_for_record(self, record_name, record_type):
        """Ask for a record, and query for it if we don't have it.

        @param record_name: string name of record (e.g. the complete host name
                            for A records.
        @param record_type: one of dpkt.dns.DNS_*.
        @return list of matching records.

        """
        found_records = self._zc_listener.cached_results(
                record_name, record_type)
        if len(found_records) > 1:
            logging.warning('Found multiple records with name=%s and type=%r',
                            record_name, record_type)
        if found_records:
            logging.debug('Found record with name=%s, type=%r, value=%r.',
                          record_name, record_type, found_records[0].data)
            return found_records[0]
        logging.debug('Did not see record with name=%s and type=%r',
                      record_name, record_type)
        desired_records = [(record_name, record_type)]
        self._zc_listener.send_request(desired_records)
        return None


    def _found_service_records(self, service_id, service_info, service_port):
        PTR_name = '_%s._tcp.%s' % (service_id, self._dns_domain)
        record_PTR = self._ask_for_record(PTR_name, dpkt.dns.DNS_PTR)
        if not record_PTR:
            return False
        # Great, we know the PTR, make sure that we can also get the SRV and
        # TXT entries.
        TXT_name = SRV_name = record_PTR.data
        record_SRV = self._ask_for_record(SRV_name, dpkt.dns.DNS_SRV)
        if record_SRV is None:
            return False
        if (record_SRV.data[0] != self._hostname or
                record_SRV.data[3] != service_port):
            raise error.TestFail('Expected SRV record data %r but got %r' %
                                 ((self._hostname, service_port),
                                  record_SRV.data))
        # TXT should exist.
        record_TXT = self._ask_for_record(TXT_name, dpkt.dns.DNS_TXT)
        if (record_TXT is None or
                not self._check_txt_record_data(service_info, record_TXT.data)):
            return False
        return True


    def _found_desired_records(self):
        """Verifies that avahi has all the records we care about.

        Asks the |self._zc_listener| for records we expect to correspond
        to our test service.  Will trigger queries if we don't find the
        expected records.

        @return True if we have all expected records, False otherwise.

        """
        logging.debug('Looking for records for %s.', self._hostname)
        # First, check that Avahi is doing the simple things and publishing
        # an A record.
        record_A = self._ask_for_record(self._hostname, dpkt.dns.DNS_A)
        if (record_A is None or
                record_A.data != self._chrooted_avahi.avahi_interface_addr):
            return False
        logging.debug('Found A record, looking for serbus records.')
        # If we can see Avahi publishing that it's there, check that it has
        # appropriate entries for its serbus master record.
        if not self._found_service_records(self.SERBUS_SERVICE_ID,
                                           self.SERBUS_SERVICE_INFO,
                                           self.SERBUS_SERVICE_PORT):
            return False
        logging.debug('Found serbus records, looking for service records.')
        # We also expect the subservices we've added to exist.
        if not self._found_service_records(self.TEST_SERVICE_ID,
                                           self.TEST_SERVICE_INFO,
                                           self.TEST_SERVICE_PORT):
            return False
        logging.debug('Found all desired records.')
        return True


    def run_once(self):
        # Tell peerd about this exciting new service we have.
        self._peerd.expose_service(
                self.TEST_SERVICE_ID,
                self.TEST_SERVICE_INFO,
                mdns_options={'port': self.TEST_SERVICE_PORT})
        # Wait for advertisements of that service to appear from avahi.
        logging.info('Waiting to receive mDNS advertisements of '
                     'peerd services.')
        success, duration = self._host.run_until(self._found_desired_records,
                                                 self.TEST_TIMEOUT_SECONDS)
        if not success:
            raise error.TestFail('Did not receive mDNS advertisements in time.')