# NOTE: file-viewer banner captured with the source (plain text | 174 lines | 5.4 KB); not part of the module.

# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Return information about routing table entries

Read and parse the system routing table. There are
four classes defined here: NetworkRoutes, which contains
information about all routes; IPv4Route, which describes
a single IPv4 routing table entry; IPv6Route, which
does the same for IPv6; and Route, which has common code
for IPv4Route and IPv6Route.
"""

# Files that NetworkRoutes reads when the caller does not supply the
# routing table lines itself.
ROUTES_V4_FILE = "/proc/net/route"
ROUTES_V6_FILE = "/proc/net/ipv6_route"

# The following constants are from <net/route.h>.  Each one is a bit mask
# that is tested against the parsed "flags" field of a routing entry.
RTF_UP      = 0x0001  # tested by Route.isUsable()
RTF_GATEWAY = 0x0002  # tested by Route.isGatewayRoute()/isInterfaceRoute()
RTF_HOST    = 0x0004  # tested by Route.isHostRoute()
# IPv6 constants from <net/route.h>
RTF_DEFAULT = 0x10000  # tested by IPv6Route.isDefaultRoute()

import socket
import struct

class Route(object):
    """Address-family independent view of one routing table entry.

    Addresses are stored as plain integers.  Subclasses decide how those
    integers map to and from textual addresses by overriding _intToIp()
    and _ipToInt().

    Attributes:
        interface: interface name taken from the routing table entry.
        destination: destination address as an integer.
        gateway: gateway address as an integer.
        flagbits: integer bit mask of RTF_* flags.
        netmask: network mask as an integer.
    """

    def __init__(self, iface, dest, gway, flags, mask):
        """Store the already-converted fields of a routing entry.

        Args:
            iface: interface name.
            dest: destination address as an integer.
            gway: gateway address as an integer.
            flags: RTF_* flag bits as an integer.
            mask: network mask as an integer.
        """
        self.interface = iface
        self.destination = dest
        self.gateway = gway
        self.flagbits = flags
        self.netmask = mask

    def _intToIp(self, addr):
        """Return the textual form of integer address |addr|.

        Must be overridden by address-family specific subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError(
            "%s must implement _intToIp()" % type(self).__name__)

    def _ipToInt(self, ip):
        """Return the integer form of textual address |ip|.

        Must be overridden by address-family specific subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError(
            "%s must implement _ipToInt()" % type(self).__name__)

    def __str__(self):
        """Return a one-line summary with addresses and flag letters."""
        flag_letters = (
            (RTF_UP, "U"),
            (RTF_GATEWAY, "G"),
            (RTF_HOST, "H"),
            (RTF_DEFAULT, "D"),
        )
        flags = "".join(
            letter for bit, letter in flag_letters if self.flagbits & bit)
        return "<%s dest: %s gway: %s mask: %s flags: %s>" % (
                self.interface,
                self._intToIp(self.destination),
                self._intToIp(self.gateway),
                self._intToIp(self.netmask),
                flags)

    def isUsable(self):
        """Return True if the RTF_UP flag is set."""
        return bool(self.flagbits & RTF_UP)

    def isHostRoute(self):
        """Return True if the RTF_HOST flag is set."""
        return bool(self.flagbits & RTF_HOST)

    def isGatewayRoute(self):
        """Return True if the RTF_GATEWAY flag is set."""
        return bool(self.flagbits & RTF_GATEWAY)

    def isInterfaceRoute(self):
        """Return True if the RTF_GATEWAY flag is NOT set."""
        return not self.flagbits & RTF_GATEWAY

    def matches(self, ip):
        """Return True if textual address |ip| falls inside this route.

        The address is masked with the route's netmask and compared with the
        destination.  An address the subclass converter rejects with
        socket.error (e.g. wrong address family) simply does not match.
        """
        try:
            return (self._ipToInt(ip) & self.netmask) == self.destination
        except socket.error:
            return False


class IPv4Route(Route):
    """Routing entry built from the hexadecimal fields of an IPv4 route line.

    The integer addresses kept by this class are whatever int(field, 16)
    yields.  They are converted to and from dotted-quad text by packing them
    as a native-order unsigned 32-bit word.
    NOTE(review): this assumes the routing table prints each address as the
    native-endian rendering of the network-order bytes -- confirm if this is
    ever run against a table from a different host.
    """

    def __init__(self, iface, dest, gway, flags, mask):
        """Parse the hexadecimal string fields and store them as integers.

        Args:
            iface: interface name.
            dest: destination address, hexadecimal string.
            gway: gateway address, hexadecimal string.
            flags: RTF_* flag bits, hexadecimal string.
            mask: network mask, hexadecimal string.

        Raises:
            ValueError: if any numeric field is not valid hexadecimal.
        """
        super(IPv4Route, self).__init__(
            iface, int(dest, 16), int(gway, 16), int(flags, 16), int(mask, 16))

    def _intToIp(self, addr):
        """Return the dotted-quad string for integer address |addr|."""
        return socket.inet_ntoa(struct.pack('@I', addr))

    def _ipToInt(self, ip):
        """Return the integer address for dotted-quad string |ip|.

        Uses the same native-order format as _intToIp() so the two are exact
        inverses of each other.
        """
        return struct.unpack('@I', socket.inet_aton(ip))[0]

    def isDefaultRoute(self):
        """Return True for a gateway route whose destination is 0."""
        return bool(self.flagbits & RTF_GATEWAY) and self.destination == 0

def parseIPv4Routes(routelist):
    """Build IPv4Route objects from the lines of an IPv4 routing table.

    Args:
        routelist: list of text lines.  The first line is a header naming
            the columns; every following non-blank line is one route.

    Returns:
        A list of IPv4Route objects, in the order the routes were listed.
        An empty |routelist| yields an empty list.

    Raises:
        KeyError: if a route is present but the header lacks one of the
            required column names.
        IndexError: if a route line has fewer fields than the header needs.
    """
    if not routelist:
        return []

    # The first line is headers that will allow us to correctly interpret
    # the values in the following lines.
    headers = routelist[0].split()
    col_map = {token: pos for (pos, token) in enumerate(headers)}

    # Column names in the order IPv4Route.__init__ expects its arguments.
    wanted = ("Iface", "Destination", "Gateway", "Flags", "Mask")

    routes = []
    for routeline in routelist[1:]:
        fields = routeline.split()
        if not fields:
            # Blank or whitespace-only line, e.g. a trailing newline left
            # over from splitting captured text.
            continue
        routes.append(IPv4Route(*[fields[col_map[name]] for name in wanted]))

    return routes


class IPv6Route(Route):
    """Routing entry built from the hexadecimal fields of an IPv6 route line.

    Addresses are kept as 128-bit integers whose most significant byte is
    the first byte of the address.
    """

    def __init__(self, iface, dest, gway, flags, plen):
        """Parse the hexadecimal string fields and store them as integers.

        Args:
            iface: interface name.
            dest: destination address, hexadecimal string.
            gway: gateway address, hexadecimal string.
            flags: RTF_* flag bits, hexadecimal string.
            plen: destination prefix length, hexadecimal string.

        Raises:
            ValueError: if any numeric field is not valid hexadecimal.
        """
        # int() handles arbitrarily large values on both Python 2 (where it
        # promotes to long automatically) and Python 3 (which has no long).
        prefix_bits = int(plen, 16)
        super(IPv6Route, self).__init__(
            iface,
            int(dest, 16),
            int(gway, 16),
            int(flags, 16),
            # netmask = set first plen bits to 1, all following to 0
            (1 << 128) - (1 << (128 - prefix_bits)))

    def _intToIp(self, addr):
        """Return the textual IPv6 address for 128-bit integer |addr|."""
        # Split into two big-endian 64-bit halves; this replaces the
        # Python 2 only str.decode("hex") conversion.
        packed = struct.pack('!QQ', addr >> 64, addr & 0xFFFFFFFFFFFFFFFF)
        return socket.inet_ntop(socket.AF_INET6, packed)

    def _ipToInt(self, ip):
        """Return the 128-bit integer for textual IPv6 address |ip|."""
        high, low = struct.unpack(
            '!QQ', socket.inet_pton(socket.AF_INET6, ip))
        return (high << 64) | low

    def isDefaultRoute(self):
        """Return True if the RTF_DEFAULT flag is set."""
        return bool(self.flagbits & RTF_DEFAULT)

def parseIPv6Routes(routelist):
    """Build IPv6Route objects from the lines of an IPv6 routing table.

    Args:
        routelist: list of text lines, one route per non-blank line.  There
            is no header line.

    Returns:
        A list of IPv6Route objects, in the order the routes were listed.

    Raises:
        IndexError: if a non-blank line has fewer than ten fields.
    """
    # ipv6_route has no headers, so the routing table looks like the following:
    # Dest DestPrefix Src SrcPrefix Gateway Metric RefCnt UseCnt Flags Iface
    routes = []
    for routeline in routelist:
        fields = routeline.split()
        if not fields:
            # Blank or whitespace-only line, e.g. a trailing newline left
            # over from splitting captured text.
            continue
        destination = fields[0]
        prefix = fields[1]
        gateway = fields[4]
        flags = fields[8]
        interface = fields[9]
        routes.append(IPv6Route(interface, destination, gateway, flags, prefix))

    return routes


class NetworkRoutes(object):
    """The combined IPv4 and IPv6 routing tables.

    Attributes:
        routes: list of route objects, all IPv4 entries followed by all IPv6
            entries, each group in the order it was listed.
    """

    def __init__(self, routelist_v4=None, routelist_v6=None):
        """Parse the routing tables.

        Args:
            routelist_v4: lines of the IPv4 table (header line first), or
                None to read them from ROUTES_V4_FILE.
            routelist_v6: lines of the IPv6 table, or None to read them
                from ROUTES_V6_FILE.
        """
        if routelist_v4 is None:
            with open(ROUTES_V4_FILE) as routef_v4:
                routelist_v4 = routef_v4.readlines()

        self.routes = parseIPv4Routes(routelist_v4)

        if routelist_v6 is None:
            with open(ROUTES_V6_FILE) as routef_v6:
                routelist_v6 = routef_v6.readlines()

        self.routes += parseIPv6Routes(routelist_v6)

    def _filterUsableRoutes(self):
        """Return a generator over the routes whose isUsable() is true."""
        return (rr for rr in self.routes if rr.isUsable())

    def hasDefaultRoute(self, interface):
        """Return True if |interface| has a usable default route."""
        return any(rr.interface == interface and rr.isDefaultRoute()
                   for rr in self._filterUsableRoutes())

    def getDefaultRoutes(self):
        """Return the list of usable default routes."""
        return [rr for rr in self._filterUsableRoutes() if rr.isDefaultRoute()]

    def hasInterfaceRoute(self, interface):
        """Return True if |interface| has a usable non-gateway route."""
        return any(rr.interface == interface and rr.isInterfaceRoute()
                   for rr in self._filterUsableRoutes())

    def getRouteFor(self, ip):
        """Return the most specific usable route matching |ip|, or None.

        A default route has an all-zero mask and therefore matches every
        address, so returning the first match would hide any more specific
        route listed after it.  Instead the match with the most mask bits
        set (the longest prefix) wins; among equally specific matches the
        one listed first is kept.
        """
        best = None
        best_prefix_len = -1
        for rr in self._filterUsableRoutes():
            if not rr.matches(ip):
                continue
            # The number of set bits is the prefix length no matter which
            # byte order the mask integer is stored in.
            prefix_len = bin(rr.netmask).count("1")
            if prefix_len > best_prefix_len:
                best = rr
                best_prefix_len = prefix_len
        return best