# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dpkt
import logging
import re
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.tendo import peerd_config
from autotest_lib.client.cros import chrooted_avahi
from autotest_lib.client.cros.netprotos import interface_host
from autotest_lib.client.cros.netprotos import zeroconf
from autotest_lib.client.cros.tendo import peerd_dbus_helper
class peerd_AdvertiseServices(test.test):
    """Test that peerd can correctly advertise services over mDNS.

    The test exposes a service through peerd and then listens on our half
    of the interface pair created by ChrootedAvahi until the expected A,
    PTR, SRV and TXT records have been observed, or until
    TEST_TIMEOUT_SECONDS has elapsed.
    """
    version = 1

    # Wildcard marker for expected TXT values; compared by identity.
    ANY_VALUE = object()
    FAKE_HOST_HOSTNAME = 'test-host'
    TEST_TIMEOUT_SECONDS = 30
    TEST_SERVICE_ID = 'test-service-0'
    # Expected TXT values are treated as regular expressions which must
    # match the whole advertised value (see _check_txt_record_data).
    TEST_SERVICE_INFO = {'some_data': 'a value',
                         'other_data': 'another value'}
    TEST_SERVICE_PORT = 8080
    SERBUS_SERVICE_ID = 'serbus'
    SERBUS_SERVICE_INFO = {
            'ver': '1.0',
            'id': ANY_VALUE,
            # Our service may be surrounded by other advertised services.
            'services': r'(.+\.)?' + TEST_SERVICE_ID + r'(\..+)?',
    }
    SERBUS_SERVICE_PORT = 0


    def initialize(self):
        """Start avahi and peerd, and begin listening for mDNS traffic.

        @raises error.TestFail if avahi does not report a hostname and a
                DNS domain.

        """
        # Make sure these are initialized to None in case we throw
        # during self.initialize().
        self._chrooted_avahi = None
        self._peerd = None
        self._host = None
        self._zc_listener = None
        self._chrooted_avahi = chrooted_avahi.ChrootedAvahi()
        self._chrooted_avahi.start()
        # Start up a cleaned up peerd with really verbose logging.
        self._peerd = peerd_dbus_helper.make_helper(
                peerd_config.PeerdConfig(verbosity_level=3))
        # Listen on our half of the interface pair for mDNS advertisements.
        self._host = interface_host.InterfaceHost(
                self._chrooted_avahi.unchrooted_interface_name)
        self._zc_listener = zeroconf.ZeroconfDaemon(self._host,
                                                    self.FAKE_HOST_HOSTNAME)
        # The queries for hostname/dns_domain are IPCs and therefore
        # relatively expensive.  Do them just once.
        hostname = self._chrooted_avahi.hostname
        dns_domain = self._chrooted_avahi.dns_domain
        if not hostname or not dns_domain:
            raise error.TestFail('Failed to get hostname/domain from avahi.')
        self._dns_domain = dns_domain
        self._hostname = '%s.%s' % (hostname, dns_domain)


    def cleanup(self):
        """Close every helper that initialize() managed to create.

        Attributes are looked up with a None default so that this method
        does not raise AttributeError when initialize() never assigned them.

        """
        # NOTE(review): self._zc_listener is not closed here; confirm that
        # it holds no resources of its own.
        for attr_name in ('_chrooted_avahi', '_host', '_peerd'):
            obj = getattr(self, attr_name, None)
            if obj is not None:
                obj.close()


    def _check_txt_record_data(self, expected_data, actual_data):
        """Compare the entries of a TXT record against our expectations.

        @param expected_data: dict mapping each expected TXT key to either
                ANY_VALUE or a regular expression that the entire value of
                that key must match.
        @param actual_data: iterable of 'key=value' strings taken from the
                TXT record.
        @return True if every entry matched, False if some value did not
                match its expected pattern.
        @raises error.TestFail if an entry has no '=' separator, carries a
                key we did not expect (or a repeated key), or if an
                expected key is missing from the record.

        """
        # Labels in the TXT record should be 1:1 with our service info.
        remaining = expected_data.copy()
        for entry in actual_data:
            # All labels should be key/value pairs.
            if '=' not in entry:
                raise error.TestFail('All TXT entries should have = separator, '
                                     'but got: %s' % entry)
            k, v = entry.split('=', 1)
            if k not in remaining:
                raise error.TestFail('Unexpected TXT entry key: %s' % k)
            pattern = remaining.pop(k)
            if pattern is self.ANY_VALUE:
                continue
            # re.match() only anchors at the start of the string, so anchor
            # the end explicitly; otherwise a value which merely starts
            # with the expected text would be accepted.
            if not re.match('(?:' + pattern + r')\Z', v):
                # We're going to return False here rather than fail the test
                # for one tricky reason: in the root serbus record, we may
                # find that the service list does not match our expectation
                # since other daemons may be advertising services via peerd.
                # We need to basically wait for our test service to show up.
                logging.warning('Expected TXT value to match %s for '
                                'entry=%s but got value=%r instead.',
                                pattern, k, v)
                return False
        if remaining:
            # Raise a detailed exception here, rather than return false.
            raise error.TestFail('Missing entries from TXT: %r' %
                                 remaining)
        return True


    def _ask_for_record(self, record_name, record_type):
        """Ask for a record, and query for it if we don't have it.

        @param record_name: string name of record (e.g. the complete host
                name for A records).
        @param record_type: one of dpkt.dns.DNS_*.
        @return the first cached record matching the name and type, or None
                if nothing is cached yet (in which case a request for the
                record has been sent).

        """
        found_records = self._zc_listener.cached_results(
                record_name, record_type)
        if len(found_records) > 1:
            logging.warning('Found multiple records with name=%s and type=%r',
                            record_name, record_type)
        if found_records:
            logging.debug('Found record with name=%s, type=%r, value=%r.',
                          record_name, record_type, found_records[0].data)
            return found_records[0]
        logging.debug('Did not see record with name=%s and type=%r',
                      record_name, record_type)
        # Nothing cached; send a request so that a later call may find it.
        desired_records = [(record_name, record_type)]
        self._zc_listener.send_request(desired_records)
        return None


    def _found_service_records(self, service_id, service_info, service_port):
        """Check for the PTR, SRV and TXT records of a single service.

        @param service_id: string service id, used to build the PTR name.
        @param service_info: dict of expected TXT entries, as accepted by
                _check_txt_record_data().
        @param service_port: port expected in the SRV record.
        @return True if all three records were found and the TXT data
                matched, False if something is still missing.
        @raises error.TestFail if the SRV record has an unexpected hostname
                or port, or if the TXT record is malformed.

        """
        PTR_name = '_%s._tcp.%s' % (service_id, self._dns_domain)
        record_PTR = self._ask_for_record(PTR_name, dpkt.dns.DNS_PTR)
        if not record_PTR:
            return False
        # Great, we know the PTR, make sure that we can also get the SRV and
        # TXT entries.
        TXT_name = SRV_name = record_PTR.data
        record_SRV = self._ask_for_record(SRV_name, dpkt.dns.DNS_SRV)
        if record_SRV is None:
            return False
        # Index 0 is compared with the hostname and index 3 with the port.
        # NOTE(review): presumably the data is laid out as
        # (target, priority, weight, port) -- confirm against zeroconf.
        if (record_SRV.data[0] != self._hostname or
                record_SRV.data[3] != service_port):
            raise error.TestFail('Expected SRV record data %r but got %r' %
                                 ((self._hostname, service_port),
                                  record_SRV.data))
        # TXT should exist.
        record_TXT = self._ask_for_record(TXT_name, dpkt.dns.DNS_TXT)
        if (record_TXT is None or
                not self._check_txt_record_data(service_info,
                                                record_TXT.data)):
            return False
        return True


    def _found_desired_records(self):
        """Verifies that avahi has all the records we care about.

        Asks the |self._zc_listener| for records we expect to correspond
        to our test service.  Will trigger queries if we don't find the
        expected records.

        @return True if we have all expected records, False otherwise.

        """
        logging.debug('Looking for records for %s.', self._hostname)
        # First, check that Avahi is doing the simple things and publishing
        # an A record.
        record_A = self._ask_for_record(self._hostname, dpkt.dns.DNS_A)
        if (record_A is None or
                record_A.data != self._chrooted_avahi.avahi_interface_addr):
            return False
        logging.debug('Found A record, looking for serbus records.')
        # If we can see Avahi publishing that it's there, check that it has
        # appropriate entries for its serbus master record.
        if not self._found_service_records(self.SERBUS_SERVICE_ID,
                                           self.SERBUS_SERVICE_INFO,
                                           self.SERBUS_SERVICE_PORT):
            return False
        logging.debug('Found serbus records, looking for service records.')
        # We also expect the subservices we've added to exist.
        if not self._found_service_records(self.TEST_SERVICE_ID,
                                           self.TEST_SERVICE_INFO,
                                           self.TEST_SERVICE_PORT):
            return False
        logging.debug('Found all desired records.')
        return True


    def run_once(self):
        """Expose the test service and wait for its mDNS advertisements.

        @raises error.TestFail if the expected records are not all seen
                within TEST_TIMEOUT_SECONDS.

        """
        # Tell peerd about this exciting new service we have.
        self._peerd.expose_service(
                self.TEST_SERVICE_ID,
                self.TEST_SERVICE_INFO,
                mdns_options={'port': self.TEST_SERVICE_PORT})
        # Wait for advertisements of that service to appear from avahi.
        logging.info('Waiting to receive mDNS advertisements of '
                     'peerd services.')
        # The second element of the result is not needed here.
        success, _duration = self._host.run_until(
                self._found_desired_records, self.TEST_TIMEOUT_SECONDS)
        if not success:
            raise error.TestFail('Did not receive mDNS advertisements in time.')