# Plain text  |  312 lines  |  12.44 KB

#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import iproute
import multinetwork_base
import multinetwork_test
import net_test

# Setsockopt values.
# IPV6_ADDR_PREFERENCES is the IPPROTO_IPV6-level socket option used below to
# express source address preferences, and IPV6_PREFER_SRC_PUBLIC is the flag
# asking for public (non-temporary) addresses. They are spelled out here
# because the test passes them to setsockopt() by value.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002


# Path of the per-interface-default "use_optimistic" sysctl. Its presence is
# used as a feature probe for kernels that support the option.
USE_OPTIMISTIC_SYSCTL = "/proc/sys/net/ipv6/conf/default/use_optimistic"

# True when the running kernel exposes use_optimistic; tests that depend on
# the option are skipped (and setUp leaves the sysctl alone) otherwise.
HAVE_USE_OPTIMISTIC = os.path.isfile(USE_OPTIMISTIC_SYSCTL)


class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Shared helpers for the IPv6 source address selection tests.

  Provides sysctl setters for the DAD-related knobs, helpers to query which
  source address the kernel picks, and assertions about whether a given
  address is present, usable (bind/sendmsg) or selected.
  """

  def SetDAD(self, ifname, value):
    """Writes value to both accept_dad and dad_transmits of ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)

  def SetOptimisticDAD(self, ifname, value):
    """Writes value to the optimistic_dad sysctl of ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)

  def SetUseTempaddrs(self, ifname, value):
    """Writes value to the use_tempaddr sysctl of ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)

  def SetUseOptimistic(self, ifname, value):
    """Writes value to the use_optimistic sysctl of ifname."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the source address chosen for a UDP connect() on netid.

    Args:
      netid: The network to select, passed through to BuildSocket.
      mode: How the socket selects the network, passed through to BuildSocket.

    Returns:
      The local address string reported by getsockname() after connecting to
      net_test.IPV6_ADDR. Asserts that it is non-empty.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Ask for public addresses: temporary address behaviour is a separate
      # concern and must not influence which address is picked here.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)

      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Release the descriptor deterministically rather than relying on GC.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts the family, prefix length, scope, ifindex and flags of address.

    Args:
      address: The address string to look up.
      expected_ifindex: The interface index the address must be assigned to.
      expected_flags: Bitmask of address flags that must all be set; other
        flags on the address are ignored.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns a truthy value iff address has IFA_F_TENTATIVE set.

    The return value is the masked flag word, not a bool.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a fresh IPv6 UDP socket to address, then closes it.

    Any exception raised by bind() propagates to the caller.
    """
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends a UDP datagram to dest:53 with address forced via IPV6_PKTINFO.

    Args:
      address: The source address to request in the pktinfo control message.
      netid: The network whose mark is set on the sending socket.
      dest: The destination address.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts that address can be bound to and used as a sendmsg source."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind() fails with EADDRNOTAVAIL and sendmsg with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts that address is the source address chosen on netid."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts that address is not the source address chosen on netid."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks every 0.1 seconds for up to 2 seconds.

    Raises:
      AssertionError: If address is still tentative after the last check.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    # One last look so that DAD finishing during the final sleep is not
    # reported as a failure.
    if not self.AddressIsTentative(address):
      return
    raise AssertionError("%s did not complete DAD after 2 seconds" % address)


class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Base fixture that strips the IPv6 address from one random interface.

  After setUp, self.test_netid / test_ip / test_ifindex / test_ifname describe
  the chosen interface, whose global IPv6 address has been deleted so that
  each test can bring it back under the DAD settings it wants to exercise.
  """

  def setUp(self):
    """Disables the DAD-related sysctls and removes the test address."""
    # [0]  Make sure DAD, optimistic DAD, and the use_optimistic option
    # are all consistently disabled at the outset.
    for netid in self.tuns:
      ifname = self.GetInterfaceName(netid)
      self.SetDAD(ifname, 0)
      self.SetOptimisticDAD(ifname, 0)
      self.SetUseTempaddrs(ifname, 0)
      if HAVE_USE_OPTIMISTIC:
        self.SetUseOptimistic(ifname, 0)

    # [1]  Pick an interface on which to test.
    # random.choice needs a sequence; a Python 3 dict view is not one, so
    # materialize the keys first (a no-op cost on Python 2).
    self.test_netid = random.choice(list(self.tuns.keys()))
    self.test_ip = self.MyAddress(6, self.test_netid)
    self.test_ifindex = self.ifindices[self.test_netid]
    self.test_ifname = self.GetInterfaceName(self.test_netid)

    # [2]  Delete the test interface's IPv6 address.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)

    # With the address gone, neither bind() nor sendmsg() may use it.
    self.assertAddressNotUsable(self.test_ip, self.test_netid)


class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a tentative address is ignored until DAD has completed."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    addr = self.test_ip

    # [3]  Bring the IPv6 address back while DAD is enabled: the RA starts
    # SLAAC, which in turn starts DAD for the new address.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, 0)
    # The address must now exist and be flagged tentative.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # While tentative, the address can be neither used explicitly nor
    # picked by source address selection.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD is done (expected to take less than 1 second).
    self.WaitForDad(addr)

    # After DAD the address is fully usable and is the selected source.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)


class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that an optimistic address is usable but not selected."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    addr = self.test_ip

    # [3]  Bring the IPv6 address back with DAD and optimistic DAD enabled;
    # the RA starts SLAAC and the subsequent DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SendRA(netid, 0)
    # The address must now exist and be flagged optimistic.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # An optimistic address may be used explicitly, but must not be chosen
    # by source address selection.
    usable_without_sysctl = net_test.LinuxVersion() >= (3, 18, 0)
    if usable_without_sysctl:
      # The version checked in to android kernels <= 3.10 requires the
      # use_optimistic sysctl to be turned on, so only check newer kernels.
      self.assertAddressUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD is done (expected to take less than 1 second).
    self.WaitForDad(addr)

    # After DAD the address is usable and is the selected source.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)


class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that use_optimistic lets an optimistic address be selected."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    addr = self.test_ip

    # [3]  Bring the IPv6 address back with DAD, optimistic DAD and
    # use_optimistic all enabled; the RA starts SLAAC and the subsequent DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The address must now exist and be flagged optimistic.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is only optimistic, use_optimistic makes it
    # eligible for source address selection as well as explicit use.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)


class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a valid address wins over an optimistic one."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex
    optimistic_ip = self.test_ip

    # [3]  Give the interface a valid IPv6 address first and check that it
    # becomes the selected source address.
    preferred_ip = self.IPv6Prefix(netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertAddressSelected(preferred_ip, netid)

    # [4]  Now obtain a second address that starts out optimistic; the RA
    # starts SLAAC and the subsequent DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The new address must exist and be flagged optimistic.
    self.assertAddressHasExpectedAttributes(
        optimistic_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address can be used explicitly, but because a valid
    # address is also present, selection must pick the valid one instead.
    self.assertAddressUsable(optimistic_ip, netid)
    self.assertAddressNotSelected(optimistic_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)


class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that an optimistic address stops being used once DAD fails."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  def testDadFailure(self):
    netid = self.test_netid
    ifname = self.test_ifname
    addr = self.test_ip

    # [3]  Bring the IPv6 address back with DAD, optimistic DAD and
    # use_optimistic all enabled; the RA starts SLAAC and the subsequent DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # Inject a Neighbor Advertisement for the optimistic address coming from
    # a different MAC address, i.e. an address conflict ("DAD defense").
    # NOTE(review): the usual Ethernet mapping of ff02::1 would be
    # 33:33:00:00:00:01; the destination MAC below is kept as found --
    # confirm whether the extra 0x33 octet is intentional.
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    ether = scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01")
    ipv6 = scapy.IPv6(src=addr, dst="ff02::1")
    advert = scapy.ICMPv6ND_NA(tgt=addr, R=0, S=0, O=1)
    lladdr_opt = scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)
    dad_defense = ether / ipv6 / advert / lladdr_opt
    self.ReceiveEtherPacketOn(netid, dad_defense)

    # DAD has now failed, so the address must be neither usable nor selected.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.


class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that Neighbor Solicitations are not sourced from an optimistic IP."""

  @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
  @unittest.skipUnless(net_test.LinuxVersion() >= (3, 18, 0),
                       "correct optimistic bind() not supported")
  def testSendToOnlinkDestination(self):
    netid = self.test_netid
    ifname = self.test_ifname
    addr = self.test_ip

    # [3]  Bring the IPv6 address back with DAD, optimistic DAD and
    # use_optimistic all enabled; the RA starts SLAAC and the subsequent DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, 0)
    # The address must be optimistic, usable and selected at this point.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # [4]  Send to an on-link destination. The resulting Neighbor
    # Solicitation must NOT carry the optimistic address as its source; in
    # this setup the only other usable address is the link-local one.
    onlink_dest = self.GetRandomDestination(self.IPv6Prefix(netid))
    self.SendWithSourceAddress(addr, netid, onlink_dest)

    link_local = net_test.GetLinkAddress(ifname, True)
    my_mac = self.MyMacAddress(netid)
    ns = multinetwork_test.Packets.NS(link_local, onlink_dest, my_mac)
    # Element 1 of the NS() result is what ExpectPacketOn compares against.
    expected_ns = ns[1]
    self.ExpectPacketOn(netid, "link-local NS", expected_ns)


# TODO(ek): add tests listening for netlink events.


# Run every test case in this module when executed as a script.
if __name__ == "__main__":
  unittest.main()