#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
from errno import * # pylint: disable=wildcard-import
import os
import random
import re
from scapy import all as scapy
from socket import * # pylint: disable=wildcard-import
import struct
import subprocess
import time
import unittest
import multinetwork_base
import net_test
import netlink
import packets
import xfrm
XFRM_ADDR_ANY = 16 * "\x00"
LOOPBACK = 15 * "\x00" + "\x01"
ENCRYPTED_PAYLOAD = ("b1c74998efd6326faebe2061f00f2c750e90e76001664a80c287b150"
"59e74bf949769cc6af71e51b539e7de3a2a14cb05a231b969e035174"
"d98c5aa0cef1937db98889ec0d08fa408fecf616")
ENCRYPTION_KEY = ("308146eb3bd84b044573d60f5a5fd159"
"57c7d4fe567a2120f35bae0f9869ec22".decode("hex"))
AUTH_TRUNC_KEY = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
TEST_ADDR1 = "2001:4860:4860::8888"
TEST_ADDR2 = "2001:4860:4860::8844"
TEST_SPI = 0x1234
ALL_ALGORITHMS = 0xffffffff
ALGO_CBC_AES_256 = xfrm.XfrmAlgo(("cbc(aes)", 256))
ALGO_HMAC_SHA1 = xfrm.XfrmAlgoAuth(("hmac(sha1)", 128, 96))
class XfrmTest(multinetwork_base.MultiNetworkBaseTest):
@classmethod
def setUpClass(cls):
super(XfrmTest, cls).setUpClass()
cls.xfrm = xfrm.Xfrm()
def setUp(self):
# TODO: delete this when we're more diligent about deleting our SAs.
super(XfrmTest, self).setUp()
subprocess.call("ip xfrm state flush".split())
def expectIPv6EspPacketOn(self, netid, spi, seq, length):
packets = self.ReadAllPacketsOn(netid)
self.assertEquals(1, len(packets))
packet = packets[0]
self.assertEquals(IPPROTO_ESP, packet.nh)
spi_seq = struct.pack("!II", spi, seq)
self.assertEquals(spi_seq, str(packet.payload)[:len(spi_seq)])
self.assertEquals(length, len(packet.payload))
def assertIsUdpEncapEsp(self, packet, spi, seq, length):
self.assertEquals(IPPROTO_UDP, packet.proto)
self.assertEquals(4500, packet.dport)
# Skip UDP header. TODO: isn't there a better way to do this?
payload = str(packet.payload)[8:]
self.assertEquals(length, len(payload))
spi_seq = struct.pack("!II", ntohl(spi), seq)
self.assertEquals(spi_seq, str(payload)[:len(spi_seq)])
def testAddSa(self):
self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
xfrm.XFRM_MODE_TRANSPORT, 3320,
ALGO_CBC_AES_256, ENCRYPTION_KEY,
ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
expected = (
"src :: dst 2001:4860:4860::8888\n"
"\tproto esp spi 0x00001234 reqid 3320 mode transport\n"
"\treplay-window 4 \n"
"\tauth-trunc hmac(sha1) 0x%s 96\n"
"\tenc cbc(aes) 0x%s\n"
"\tsel src ::/0 dst ::/0 \n" % (
AUTH_TRUNC_KEY.encode("hex"), ENCRYPTION_KEY.encode("hex")))
actual = subprocess.check_output("ip xfrm state".split())
try:
self.assertMultiLineEqual(expected, actual)
finally:
self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)
@unittest.skipUnless(net_test.LINUX_VERSION < (4, 4, 0), "regression")
def testSocketPolicy(self):
# Open an IPv6 UDP socket and connect it.
s = socket(AF_INET6, SOCK_DGRAM, 0)
netid = random.choice(self.NETIDS)
self.SelectInterface(s, netid, "mark")
s.connect((TEST_ADDR1, 53))
saddr, sport = s.getsockname()[:2]
daddr, dport = s.getpeername()[:2]
# Create a selector that matches all UDP packets. It's not actually used to
# select traffic, that will be done by the socket policy, which selects the
# SA entry (i.e., xfrm state) via the SPI and reqid.
sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
AF_INET6, 0, 0, IPPROTO_UDP, 0, 0))
# Create a user policy that specifies that all outbound packets matching the
# (essentially no-op) selector should be encrypted.
info = xfrm.XfrmUserpolicyInfo((sel,
xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
100, 0,
xfrm.XFRM_POLICY_OUT,
xfrm.XFRM_POLICY_ALLOW,
xfrm.XFRM_POLICY_LOCALOK,
xfrm.XFRM_SHARE_UNIQUE))
# Create a template that specifies the SPI and the protocol.
xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, htonl(TEST_SPI), IPPROTO_ESP))
tmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET6, XFRM_ADDR_ANY, 0,
xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE,
0, # require
ALL_ALGORITHMS, # auth algos
ALL_ALGORITHMS, # encryption algos
ALL_ALGORITHMS)) # compression algos
# Set the policy and template on our socket.
data = info.Pack() + tmpl.Pack()
s.setsockopt(IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, data)
# Because the policy has level set to "require" (the default), attempting
# to send a packet results in an error, because there is no SA that
# matches the socket policy we set.
self.assertRaisesErrno(
EAGAIN,
s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
# Adding a matching SA causes the packet to go out encrypted. The SA's
# SPI must match the one in our template, and the destination address must
# match the packet's destination address (in tunnel mode, it has to match
# the tunnel destination).
reqid = 0
self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
xfrm.XFRM_MODE_TRANSPORT, reqid,
ALGO_CBC_AES_256, ENCRYPTION_KEY,
ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
s.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
self.expectIPv6EspPacketOn(netid, TEST_SPI, 1, 84)
# Sending to another destination doesn't work: again, no matching SA.
self.assertRaisesErrno(
EAGAIN,
s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR2, 53))
# Sending on another socket without the policy applied results in an
# unencrypted packet going out.
s2 = socket(AF_INET6, SOCK_DGRAM, 0)
self.SelectInterface(s2, netid, "mark")
s2.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
packets = self.ReadAllPacketsOn(netid)
self.assertEquals(1, len(packets))
packet = packets[0]
self.assertEquals(IPPROTO_UDP, packet.nh)
# Deleting the SA causes the first socket to return errors again.
self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)
self.assertRaisesErrno(
EAGAIN,
s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
def testUdpEncapWithSocketPolicy(self):
# TODO: test IPv6 instead of IPv4.
netid = random.choice(self.NETIDS)
myaddr = self.MyAddress(4, netid)
remoteaddr = self.GetRemoteAddress(4)
# Reserve a port on which to receive UDP encapsulated packets. Sending
# packets works without this (and potentially can send packets with a source
# port belonging to another application), but receiving requires the port to
# be bound and the encapsulation socket option enabled.
encap_socket = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
encap_socket.bind((myaddr, 0))
encap_port = encap_socket.getsockname()[1]
encap_socket.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP,
xfrm.UDP_ENCAP_ESPINUDP)
# Open a socket to send traffic.
s = socket(AF_INET, SOCK_DGRAM, 0)
self.SelectInterface(s, netid, "mark")
s.connect((remoteaddr, 53))
# Create a UDP encap policy and template inbound and outbound and apply
# them to s.
sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
AF_INET, 0, 0, IPPROTO_UDP, 0, 0))
# Use the same SPI both inbound and outbound because this lets us receive
# encrypted packets by simply replaying the packets the kernel sends.
in_reqid = 123
in_spi = htonl(TEST_SPI)
out_reqid = 456
out_spi = htonl(TEST_SPI)
# Start with the outbound policy.
# TODO: what happens without XFRM_SHARE_UNIQUE?
info = xfrm.XfrmUserpolicyInfo((sel,
xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
100, 0,
xfrm.XFRM_POLICY_OUT,
xfrm.XFRM_POLICY_ALLOW,
xfrm.XFRM_POLICY_LOCALOK,
xfrm.XFRM_SHARE_UNIQUE))
xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, out_spi, IPPROTO_ESP))
usertmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET, XFRM_ADDR_ANY, out_reqid,
xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE,
0, # require
ALL_ALGORITHMS, # auth algos
ALL_ALGORITHMS, # encryption algos
ALL_ALGORITHMS)) # compression algos
data = info.Pack() + usertmpl.Pack()
s.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)
# Uncomment for debugging.
# subprocess.call("ip xfrm policy".split())
# Create inbound and outbound SAs that specify UDP encapsulation.
encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
htons(4500), 16 * "\x00"))
self.xfrm.AddMinimalSaInfo(myaddr, remoteaddr, out_spi, IPPROTO_ESP,
xfrm.XFRM_MODE_TRANSPORT, out_reqid,
ALGO_CBC_AES_256, ENCRYPTION_KEY,
ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)
# Add an encap template that's the mirror of the outbound one.
encaptmpl.sport, encaptmpl.dport = encaptmpl.dport, encaptmpl.sport
self.xfrm.AddMinimalSaInfo(remoteaddr, myaddr, in_spi, IPPROTO_ESP,
xfrm.XFRM_MODE_TRANSPORT, in_reqid,
ALGO_CBC_AES_256, ENCRYPTION_KEY,
ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)
# Uncomment for debugging.
# subprocess.call("ip xfrm state".split())
# Now send a packet.
s.sendto("foo", (remoteaddr, 53))
srcport = s.getsockname()[1]
# s.send("foo") # TODO: WHY DOES THIS NOT WORK?
# Expect to see an UDP encapsulated packet.
packets = self.ReadAllPacketsOn(netid)
self.assertEquals(1, len(packets))
packet = packets[0]
self.assertIsUdpEncapEsp(packet, out_spi, 1, 52)
# Now test the receive path. Because we don't know how to decrypt packets,
# we just play back the encrypted packet that kernel sent earlier. We swap
# the addresses in the IP header to make the packet look like it's bound for
# us, but we can't do that for the port numbers because the UDP header is
# part of the integrity protected payload, which we can only replay as is.
# So the source and destination ports are swapped and the packet appears to
# be sent from srcport to port 53. Open another socket on that port, and
# apply the inbound policy to it.
twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
net_test.SetSocketTimeout(twisted_socket, 100)
twisted_socket.bind(("0.0.0.0", 53))
# TODO: why does this work even without the per-socket policy applied? The
# received packet obviously matches an SA, but don't inbound packets need to
# match a policy as well?
info.dir = xfrm.XFRM_POLICY_IN
xfrmid.spi = in_spi
usertmpl.reqid = in_reqid
data = info.Pack() + usertmpl.Pack()
twisted_socket.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)
# Save the payload of the packet so we can replay it back to ourselves.
payload = str(packet.payload)[8:]
spi_seq = struct.pack("!II", ntohl(in_spi), 1)
payload = spi_seq + payload[len(spi_seq):]
# Tamper with the packet and check that it's dropped and counted as invalid.
sainfo = self.xfrm.FindSaInfo(in_spi)
self.assertEquals(0, sainfo.stats.integrity_failed)
broken = payload[:25] + chr((ord(payload[25]) + 1) % 256) + payload[26:]
incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
scapy.UDP(sport=4500, dport=encap_port) / broken)
self.ReceivePacketOn(netid, incoming)
sainfo = self.xfrm.FindSaInfo(in_spi)
self.assertEquals(1, sainfo.stats.integrity_failed)
# Now play back the valid packet and check that we receive it.
incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
scapy.UDP(sport=4500, dport=encap_port) / payload)
self.ReceivePacketOn(netid, incoming)
data, src = twisted_socket.recvfrom(4096)
self.assertEquals("foo", data)
self.assertEquals((remoteaddr, srcport), src)
# Check that unencrypted packets are not received.
unencrypted = (scapy.IP(src=remoteaddr, dst=myaddr) /
scapy.UDP(sport=srcport, dport=53) / "foo")
self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
if __name__ == "__main__":
unittest.main()