普通文本  |  219行  |  7.19 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import subprocess

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import site_eap_certs

class HostapdServer(object):
    """Hostapd server instance wrapped in a context manager.

    Simple interface to starting and controlling a hsotapd instance.
    This can be combined with a virtual-ethernet setup to test 802.1x
    on a wired interface.

    Example usage:
        with hostapd_server.HostapdServer(interface='veth_master') as hostapd:
            hostapd.send_eap_packets()

    """
    CONFIG_TEMPLATE = """
interface=%(interface)s
driver=%(driver)s
logger_syslog=-1
logger_syslog_level=2
logger_stdout=-1
logger_stdout_level=2
dump_file=%(config_directory)s/hostapd.dump
ctrl_interface=%(config_directory)s/%(control_directory)s
ieee8021x=1
eapol_key_index_workaround=0
eap_server=1
eap_user_file=%(config_directory)s/%(user_file)s
ca_cert=%(config_directory)s/%(ca_cert)s
server_cert=%(config_directory)s/%(server_cert)s
private_key=%(config_directory)s/%(server_key)s
use_pae_group_addr=1
eap_reauth_period=10
"""
    CA_CERTIFICATE_FILE = 'ca.crt'
    CONFIG_FILE = 'hostapd.conf'
    CONTROL_DIRECTORY = 'hostapd.ctl'
    EAP_PASSWORD = 'password'
    EAP_PHASE2 = 'MSCHAPV2'
    EAP_TYPE = 'PEAP'
    EAP_USERNAME = 'test'
    HOSTAPD_EXECUTABLE = 'hostapd'
    HOSTAPD_CLIENT_EXECUTABLE = 'hostapd_cli'
    SERVER_CERTIFICATE_FILE = 'server.crt'
    SERVER_PRIVATE_KEY_FILE = 'server.key'
    USER_AUTHENTICATION_TEMPLATE = """* %(type)s
"%(username)s"\t%(phase2)s\t"%(password)s"\t[2]
"""
    USER_FILE = 'hostapd.eap_user'
    # This is the default group MAC address to which EAP challenges
    # are sent, absent any prior knowledge of a specific client on
    # the link.
    PAE_NEAREST_ADDRESS = '01:80:c2:00:00:03'

    def __init__(self,
                 interface=None,
                 driver='wired',
                 config_directory='/tmp/hostapd-test'):
        super(HostapdServer, self).__init__()
        self._interface = interface
        self._config_directory = config_directory
        self._control_directory = '%s/%s' % (self._config_directory,
                                             self.CONTROL_DIRECTORY)
        self._driver = driver
        self._process = None


    def __enter__(self):
        self.start()
        return self


    def __exit__(self, exception, value, traceback):
        self.stop()


    def write_config(self):
        """Write out a hostapd configuration file-set based on the caller
        supplied parameters.

        @return the file name of the top-level configuration file written.

        """
        if not os.path.exists(self._config_directory):
            os.mkdir(self._config_directory)
        config_params = {
            'ca_cert': self.CA_CERTIFICATE_FILE,
            'config_directory' : self._config_directory,
            'control_directory': self.CONTROL_DIRECTORY,
            'driver': self._driver,
            'interface': self._interface,
            'server_cert': self.SERVER_CERTIFICATE_FILE,
            'server_key': self.SERVER_PRIVATE_KEY_FILE,
            'user_file': self.USER_FILE
        }
        authentication_params = {
            'password': self.EAP_PASSWORD,
            'phase2': self.EAP_PHASE2,
            'username': self.EAP_USERNAME,
            'type': self.EAP_TYPE
        }
        for filename, contents in (
                ( self.CA_CERTIFICATE_FILE, site_eap_certs.ca_cert_1 ),
                ( self.CONFIG_FILE, self.CONFIG_TEMPLATE % config_params),
                ( self.SERVER_CERTIFICATE_FILE, site_eap_certs.server_cert_1 ),
                ( self.SERVER_PRIVATE_KEY_FILE,
                  site_eap_certs.server_private_key_1 ),
                ( self.USER_FILE,
                  self.USER_AUTHENTICATION_TEMPLATE % authentication_params )):
            config_file = '%s/%s' % (self._config_directory, filename)
            with open(config_file, 'w') as f:
                f.write(contents)
        return '%s/%s' % (self._config_directory, self.CONFIG_FILE)


    def start(self):
        """Start the hostap server."""
        config_file = self.write_config()
        self._process = subprocess.Popen(
                 [self.HOSTAPD_EXECUTABLE, '-dd', config_file])


    def stop(self):
        """Stop the hostapd server."""
        if self._process:
            self._process.terminate()
            self._process.wait()
            self._process = None


    def running(self):
        """Tests whether the hostapd process is still running.

        @return True if the hostapd process is still running, False otherwise.

        """
        if not self._process:
            return False

        if self._process.poll() != None:
            # We have essentially reaped the proces, and it is no more.
            self._process = None
            return False

        return True


    def send_eap_packets(self):
        """Start sending EAP packets to the nearest neighbor."""
        self.send_command('new_sta %s' % self.PAE_NEAREST_ADDRESS)


    def get_client_mib(self, client_mac_address):
        """Get a dict representing the MIB properties for |client_mac_address|.

        @param client_mac_address string MAC address of the client.
        @return dict containing mib properties.

        """
        # Expected output of "hostapd cli <client_mac_address>":
        #
        #     Selected interface 'veth_master'
        #     b6:f1:39:1d:ad:10
        #     dot1xPaePortNumber=0
        #     dot1xPaePortProtocolVersion=2
        #     [...]
        result = self.send_command('sta %s' % client_mac_address)
        client_mib = {}
        found_client = False
        for line in result.splitlines():
            if found_client:
                parts = line.split('=', 1)
                if len(parts) == 2:
                    client_mib[parts[0]] = parts[1]
            elif line == client_mac_address:
                found_client = True
        return client_mib


    def send_command(self, command):
        """Send a command to the hostapd instance.

        @param command string containing the command to run on hostapd.
        @return string output of the command.

        """
        return utils.system_output('%s -p %s %s' %
                                   (self.HOSTAPD_CLIENT_EXECUTABLE,
                                    self._control_directory, command))


    def client_has_authenticated(self, client_mac_address):
        """Return whether |client_mac_address| has successfully authenticated.

        @param client_mac_address string MAC address of the client.
        @return True if client is authenticated.

        """
        mib = self.get_client_mib(client_mac_address)
        return mib.get('dot1xAuthAuthSuccessesWhileAuthenticating', '') == '1'


    def client_has_logged_off(self, client_mac_address):
        """Return whether |client_mac_address| has logged-off.

        @param client_mac_address string MAC address of the client.
        @return True if client has logged off.

        """
        mib = self.get_client_mib(client_mac_address)
        return mib.get('dot1xAuthAuthEapLogoffWhileAuthenticated', '') == '1'