#!/usr/bin/python

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import socket
import sys
import time

import common

from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server

TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"

TEST_CLASSLESS_STATIC_ROUTE_DATA = \
        "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \
        "\x00\xc0\xa8\x00\xfe"

TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
        (18, "10.9.192.0", "172.31.155.10"),
        (0, "0.0.0.0", "192.168.0.254")
        ]

TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \
        "\x03eng\x06google\x03com\x00\x09marketing\xC0\x04"

TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")

# At this time, we don't support the compression allowed in the RFC.
# This is correct and sufficient for our purposes.
TEST_DOMAIN_SEARCH_LIST_EXPECTED = \
        "\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00"

def bin2hex(byte_str, justification=20):
    """
    Turn big hex strings into prettier strings of hex bytes.  Group those hex
    bytes into lines justification bytes long.
    """
    chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str]
    groups = []
    for i in xrange(0, len(chars), justification):
        groups.append("".join(chars[i:i+justification]))
    return "\n".join(groups)

def test_packet_serialization():
    log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb")
    binary_discovery_packet = log_file.read()
    log_file.close()
    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
    if not discovery_packet.is_valid:
        return False
    generated_string = discovery_packet.to_binary_string()
    if generated_string is None:
        print "Failed to generate string from packet object."
        return False
    if generated_string != binary_discovery_packet:
        print "Packets didn't match: "
        print "Generated: \n%s" % bin2hex(generated_string)
        print "Expected: \n%s" % bin2hex(binary_discovery_packet)
        return False
    print "test_packet_serialization PASSED"
    return True

def test_classless_static_route_parsing():
    parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
            TEST_CLASSLESS_STATIC_ROUTE_DATA)
    if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
        print ("Parsed binary domain list and got %s but expected %s" %
               (repr(parsed_routes),
                repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
        return False
    print "test_classless_static_route_parsing PASSED"
    return True

def test_classless_static_route_serialization():
    byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack(
            TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)
    if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA:
        # Turn the strings into printable hex strings on a single line.
        pretty_actual = bin2hex(byte_string, 100)
        pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100)
        print ("Expected to serialize %s to %s but instead got %s." %
               (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected,
                     pretty_actual))
        return False
    print "test_classless_static_route_serialization PASSED"
    return True

def test_domain_search_list_parsing():
    parsed_domains = dhcp_packet.DomainListOption.unpack(
            TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
    # Order matters too.
    parsed_domains = tuple(parsed_domains)
    if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
        print ("Parsed binary domain list and got %s but expected %s" %
               (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED))
        return False
    print "test_domain_search_list_parsing PASSED"
    return True

def test_domain_search_list_serialization():
    byte_string = dhcp_packet.DomainListOption.pack(
            TEST_DOMAIN_SEARCH_LIST_PARSED)
    if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED:
        # Turn the strings into printable hex strings on a single line.
        pretty_actual = bin2hex(byte_string, 100)
        pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100)
        print ("Expected to serialize %s to %s but instead got %s." %
               (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual))
        return False
    print "test_domain_search_list_serialization PASSED"
    return True

def receive_packet(a_socket, timeout_seconds=1.0):
    data = None
    start_time = time.time()
    while data is None and start_time + timeout_seconds > time.time():
        try:
            data, _ = a_socket.recvfrom(1024)
        except socket.timeout:
            pass # We expect many timeouts.
    if data is None:
        print "Timed out before we received a response from the server."
        return None

    print "Client received a packet of length %d from the server." % len(data)
    packet = dhcp_packet.DhcpPacket(byte_str=data)
    if not packet.is_valid:
        print "Received an invalid response from DHCP server."
        return None

    return packet

def test_simple_server_exchange(server):
    intended_ip = "127.0.0.42"
    subnet_mask = "255.255.255.0"
    server_ip = "127.0.0.1"
    lease_time_seconds = 60
    test_timeout = 3.0
    mac_addr = "\x01\x02\x03\x04\x05\x06"
    # Build up our packets and have them request some default option values,
    # like the IP we're being assigned and the address of the server assigning
    # it.
    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
    discovery_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    request_message = dhcp_packet.DhcpPacket.create_request_packet(
            discovery_message.transaction_id,
            mac_addr)
    request_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    # This is the pool of settings the DHCP server will seem to draw from to
    # answer queries from the client.  This information is written into packets
    # through the handling rules.
    dhcp_server_config = {
            dhcp_packet.OPTION_SERVER_ID : server_ip,
            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
            }
    # Build up the handling rules for the server and start the test.
    rules = []
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules[-1].is_final_handler = True
    server.start_test(rules, test_timeout)
    # Because we don't want to require root permissions to run these tests,
    # listen on the loopback device, don't broadcast, and don't use reserved
    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client_socket.bind(("127.0.0.1", 8068))
    client_socket.settimeout(0.1)
    client_socket.sendto(discovery_message.to_binary_string(),
                         (server_ip, 8067))

    offer_packet = receive_packet(client_socket)
    if offer_packet is None:
        return False

    if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER):
        print "Type of DHCP response is not offer."
        return False

    if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
        print "Server didn't offer the IP we expected."
        return False

    print "Offer looks good to the client, sending request."
    # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages.  In
    # our unit test, we have to do this ourselves.
    request_message.set_option(
            dhcp_packet.OPTION_SERVER_ID,
            offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID))
    request_message.set_option(
            dhcp_packet.OPTION_SUBNET_MASK,
            offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK))
    request_message.set_option(
            dhcp_packet.OPTION_IP_LEASE_TIME,
            offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME))
    request_message.set_option(
            dhcp_packet.OPTION_REQUESTED_IP,
            offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP))
    # Send the REQUEST message.
    client_socket.sendto(request_message.to_binary_string(),
                         (server_ip, 8067))
    ack_packet = receive_packet(client_socket)
    if ack_packet is None:
        return False

    if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK):
        print "Type of DHCP response is not acknowledgement."
        return False

    if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
        print "Server didn't give us the IP we expected."
        return False

    print "Waiting for the server to finish."
    server.wait_for_test_to_finish()
    print "Server agrees that the test is over."
    if not server.last_test_passed:
        print "Server is unhappy with the test result."
        return False

    print "test_simple_server_exchange PASSED."
    return True

def test_server_dialogue():
    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
                                             ingress_port=8067,
                                             broadcast_address="127.0.0.1",
                                             broadcast_port=8068)
    server.start()
    ret = False
    if server.is_healthy:
        ret = test_simple_server_exchange(server)
    else:
        print "Server isn't healthy, aborting."
    print "Sending server stop() signal."
    server.stop()
    print "Stop signal sent."
    return ret

def run_tests():
    logger = logging.getLogger("dhcp")
    logger.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG)
    logger.addHandler(stream_handler)
    retval = test_packet_serialization()
    retval &= test_classless_static_route_parsing()
    retval &= test_classless_static_route_serialization()
    retval &= test_domain_search_list_parsing()
    retval &= test_domain_search_list_serialization()
    retval &= test_server_dialogue()
    if retval:
        print "All tests PASSED."
        return 0
    else:
        print "Some tests FAILED"
        return -1

if __name__ == "__main__":
    sys.exit(run_tests())