% send, sniff, sr* tests for Scapy ~ netaccess ############ ############ + Test bridge_and_sniff() using tap sockets ~ tap linux = Create two tap interfaces import subprocess from threading import Thread tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)] if LINUX: for i in range(2): assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"]) == 0 else: for i in range(2): assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"]) == 0 = Run a sniff thread on the tap1 **interface** * It will terminate when 5 IP packets from 1.2.3.4 have been sniffed t_sniff = Thread( target=sniff, kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} ) t_sniff.start() = Run a bridge_and_sniff thread between the taps **sockets** * It will terminate when 5 IP packets from 1.2.3.4 have been forwarded t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), kwargs={"store": False, "count": 5, 'prn': Packet.summary, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) t_bridge.start() = Send five IP packets from 1.2.3.4 to the tap0 **interface** time.sleep(1) sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", count=5) = Wait for the threads t_bridge.join() t_sniff.join() = Run a sniff thread on the tap1 **interface** * It will terminate when 5 IP packets from 2.3.4.5 have been sniffed t_sniff = Thread( target=sniff, kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} ) t_sniff.start() = Run a bridge_and_sniff thread between the taps **sockets** * It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded def nat_1_2(pkt): if IP in pkt and pkt[IP].src == "1.2.3.4": pkt[IP].src = "2.3.4.5" del pkt[IP].chksum return pkt return False t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), kwargs={"store": False, "count": 5, 'prn': Packet.summary, "xfrm12": nat_1_2, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) t_bridge.start() = Send five IP packets from 1.2.3.4 to the tap0 **interface** time.sleep(1) sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", count=5) = Wait for the threads t_bridge.join() t_sniff.join() = Delete the tap interfaces del tap0, tap1 ############ ############ + Test bridge_and_sniff() using tun sockets ~ tun linux not_pcapdnet = Create two tun interfaces import subprocess from threading import Thread tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)] if LINUX: for i in range(2): assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"]) == 0 else: for i in range(2): assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"]) == 0 = Run a sniff thread on the tun1 **interface** * It will terminate when 5 IP packets from 1.2.3.4 have been sniffed t_sniff = Thread( target=sniff, kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} ) t_sniff.start() = Run a bridge_and_sniff thread between the tuns **sockets** * It will terminate when 5 IP packets from 1.2.3.4 have been forwarded. t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), kwargs={"store": False, "count": 5, 'prn': Packet.summary, "xfrm12": lambda pkt: pkt, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) t_bridge.start() = Send five IP packets from 1.2.3.4 to the tun0 **interface** time.sleep(1) conf.route.add(net="1.2.3.4/32", dev="tun0") send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) conf.route.delt(net="1.2.3.4/32", dev="tun0") = Wait for the threads t_bridge.join() t_sniff.join() = Run a sniff thread on the tun1 **interface** * It will terminate when 5 IP packets from 2.3.4.5 have been sniffed t_sniff = Thread( target=sniff, kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} ) t_sniff.start() = Run a bridge_and_sniff thread between the tuns **sockets** * It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded def nat_1_2(pkt): if IP in pkt and pkt[IP].src == "1.2.3.4": pkt[IP].src = "2.3.4.5" del pkt[IP].chksum return pkt return False t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), kwargs={"store": False, "count": 5, 'prn': Packet.summary, "xfrm12": nat_1_2, "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) t_bridge.start() = Send five IP packets from 1.2.3.4 to the tun0 **interface** time.sleep(1) conf.route.add(net="1.2.3.4/32", dev="tun0") send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) conf.route.delt(net="1.2.3.4/32", dev="tun0") = Wait for the threads t_bridge.join() t_sniff.join() = Delete the tun interfaces del tun0, tun1