########################
% Pipetool related tests
########################

+ Basic tests

= Test default test case

# Build the pipeline: a PeriodicSource linked to a Drain, which is linked both
# to a ConsoleSink and, through a TransformDrain, to a TermSink.
s = PeriodicSource("hello", 1, name="src")
d1 = Drain(name="d1")
c = ConsoleSink(name="c")
tf = TransformDrain(lambda x: "Got %s" % x)
t = TermSink(name="PipeToolsPeriodicTest", keepterm=False)
s > d1 > c
d1 > tf > t

# Let the engine run for three seconds, then replace the source's message
# list with an empty one before stopping the engine.
p = PipeEngine(s)
p.start()
time.sleep(3)
s.msg = []
p.stop()

# Best-effort cleanup: a missing "test.png" is not treated as an error.
try:
    os.remove("test.png")
except OSError:
    pass

= Test add_pipe

# An engine built from one source holds two active pipes once another Pipe
# has been added.
s = AutoSource()
p = PipeEngine(s)
p.add(Pipe())
assert len(p.active_pipes) == 2

# spawn_Pipe() must return a Pipe instance and raise the active count to three.
x = p.spawn_Pipe()
assert len(p.active_pipes) == 3
assert isinstance(x, Pipe)

= Test exhausted source

# Queue a single message on the source and flag it as exhausted before the
# engine is started.
s = AutoSource()
s._gen_data("hello")
s.is_exhausted = True
d1 = Drain(name="d1")
c = ConsoleSink(name="c")
s > d1 > c

# Nothing is asserted; the test only requires start() and wait_and_stop() to
# complete.
# NOTE(review): presumably wait_and_stop() returns once the exhausted source
# has been drained -- confirm against PipeEngine.
p = PipeEngine(s)
p.start()
p.wait_and_stop()

= Test add_pipe on running instance

# Start an engine with no pipes; the pipeline is attached afterwards.
p = PipeEngine()
p.start()

s = CLIFeeder()

d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c

# Add the source while the engine is already running.
p.add(s)

s.send("hello")
s.send("hi")

# Both messages must reach the sink's queue, in sending order, within 5 s each.
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "hi"

p.stop()

= Test Operators

# An engine compares equal to itself.
s = AutoSource()
p = PipeEngine(s)
assert p == p

# `a >> b`: a gains one high sink and b gains one high source.
a = AutoSource()
b = AutoSource()
a >> b
assert len(a.high_sinks) == 1
assert len(a.high_sources) == 0
assert len(b.high_sinks) == 0
assert len(b.high_sources) == 1
# Bare expressions; presumably here to exercise the objects' repr in the
# interactive test session.
a
b

# `a << b`: the mirror image -- b gains one high sink and a one high source.
a = AutoSource()
b = AutoSource()
a << b
assert len(a.high_sinks) == 0
assert len(a.high_sources) == 1
assert len(b.high_sinks) == 1
assert len(b.high_sources) == 0
# Bare expressions again (see above).
a
b

# `a == b` is used as a linking operator here, not as a comparison: afterwards
# each pipe has exactly one sink and one source.
a = AutoSource()
b = AutoSource()
a == b
assert len(a.sinks) == 1
assert len(a.sources) == 1
assert len(b.sinks) == 1
assert len(b.sources) == 1

# `a//b`: each pipe ends up with one high sink and one high source.
a = AutoSource()
b = AutoSource()
a//b
assert len(a.high_sinks) == 1
assert len(a.high_sources) == 1
assert len(b.high_sinks) == 1
assert len(b.high_sources) == 1

# `a^b`: a gains one trigger sink and b gains one trigger source.
a = AutoSource()
b = AutoSource()
a^b
assert len(b.trigger_sources) == 1
assert len(a.trigger_sinks) == 1

= Test doc

# Smoke test: the two listing helpers are only called; nothing is asserted
# about what they produce.
s = AutoSource()
p = PipeEngine(s)
p.list_pipes()
p.list_pipes_detailed()

= Test RawConsoleSink with CLIFeeder

p = PipeEngine()

# Feed one message and flag the feeder as exhausted before the engine starts.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

# OS-level pipe used to capture what the sink writes.
r, w = os.pipe()

d1 = Drain(name="d1")
c = RawConsoleSink(name="c")
# Point the sink's write descriptor at the write end of the pipe.
c._write_pipe = w
s > d1 > c

p.add(s)
p.start()

# The read end must yield the message followed by a newline, as bytes.
assert os.read(r, 20) == b"hello\n"
p.wait_and_stop()

= Test QueueSink with CLIFeeder

p = PipeEngine()

# Feed one message and flag the feeder as exhausted before the engine starts.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c

p.add(s)
p.start()

# After the engine has finished, the sink's recv() must return the message.
p.wait_and_stop()
assert c.recv() == "hello"

= Test UpDrain

# Module-level slot in which the test sink records the message it receives.
test_val = None

class TestSink(Sink):
    # Sink that stores whatever reaches high_push() in the global test_val.
    def high_push(self, msg):
        global test_val
        test_val = msg

p = PipeEngine()

# Feed one message and flag the feeder as exhausted before the engine starts.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

# The feeder is linked to the UpDrain with `>`, the UpDrain to the sink with
# `>>`; the sink only implements high_push().
d1 = UpDrain(name="d1")
c = TestSink(name="c")
s > d1
d1 >> c

p.add(s)
p.start()

# The message must have reached the sink's high_push().
p.wait_and_stop()
assert test_val == "hello"

= Test DownDrain

# Module-level slot in which the test sink records the message it receives.
test_val = None

class TestSink(Sink):
    # Sink that stores whatever reaches push() in the global test_val.
    def push(self, msg):
        global test_val
        test_val = msg

p = PipeEngine()

# Feed one message and flag the feeder as exhausted before the engine starts.
s = CLIHighFeeder()
s.send("hello")
s.is_exhausted = True

# The feeder is linked to the DownDrain with `>>`, the DownDrain to the sink
# with `>`; the sink only implements push().
d1 = DownDrain(name="d1")
c = TestSink(name="c")
s >> d1
d1 > c

p.add(s)
p.start()

# The message must have reached the sink's push().
p.wait_and_stop()
assert test_val == "hello"

+ Advanced ScapyPipes pipetools tests

= Test SniffSource
~ netaccess

# Run a SniffSource -> Drain -> QueueSink pipeline while traffic is present.
p = PipeEngine()

s = SniffSource()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c

p.add(s)
p.start()
# Independent sniff() call with count=3, made while the engine is running.
# NOTE(review): presumably used to wait until some traffic has been seen.
sniff(count=3)
p.stop()
# Bound the wait like every other queue read in this file, so that an empty
# queue fails the test instead of blocking the whole run forever.
assert c.q.get(timeout=5)

= Test exhausted AutoSource and SniffSource

# Prefer the standard-library mock; fall back to the external backport only
# where unittest.mock does not exist.
try:
    from unittest import mock
except ImportError:
    import mock

from scapy.error import Scapy_Exception

def _fail():
    # Side effect installed on the mocks below: always raises.
    raise Scapy_Exception()

# AutoSource whose _send is replaced by a mock that raises.
a = AutoSource()
a._send = mock.MagicMock(side_effect=_fail)
a._wake_up()
# Errors from deliver() are deliberately ignored, but only ordinary
# exceptions: KeyboardInterrupt and SystemExit must still propagate.
try:
    a.deliver()
except Exception:
    pass

# SniffSource whose socket is a mock with a recv() that raises.
s = SniffSource()
s.s = mock.MagicMock()
s.s.recv = mock.MagicMock(side_effect=_fail)
# Same best-effort handling as above.
try:
    s.deliver()
except Exception:
    pass

= Test RdpcapSource and WrpcapSink
~ needs_root

# Two frames: a built Ether/IP/ICMP packet, and a second one whose IP layer is
# constructed from a raw string.
req = Ether()/IP()/ICMP()
rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')

# Write both frames to the input capture file.
wrpcap("t.pcap", [req, rpy])

p = PipeEngine()

# Pipeline from an RdpcapSource on "t.pcap" to a WrpcapSink on "t2.pcap".
s = RdpcapSource("t.pcap")
d1 = Drain(name="d1")
c = WrpcapSink("t2.pcap", name="c")
s > d1 > c
p.add(s)
p.start()
p.wait_and_stop()

# Read the output capture back and compare it with the input frames.
results = rdpcap("t2.pcap")

assert raw(results[0]) == raw(req)
assert raw(results[1]) == raw(rpy)

# Remove both capture files.
os.unlink("t.pcap")
os.unlink("t2.pcap")

= Test InjectSink and Inject3Sink
~ needs_root

# Prefer the standard-library mock; fall back to the external backport only
# where unittest.mock does not exist.
try:
    from unittest import mock
except ImportError:
    import mock

# Packet fed into the sinks, and the list in which the fake socket records
# everything passed to send().
a = IP(dst="192.168.0.1")/ICMP()
msgs = []

class FakeSocket(object):
    # Stand-in socket: accepts any constructor arguments, ignores close(),
    # and appends every sent message to the global msgs list.
    def __init__(self, *arg, **karg):
        pass
    def close(self):
        pass
    def send(self, msg):
        global msgs
        msgs.append(msg)

@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
def _inject_sink(i3):
    # Run packet `a` through CLIFeeder -> Drain -> sink with both configured
    # socket classes patched to FakeSocket.
    # i3: truthy selects Inject3Sink, falsy selects InjectSink.
    s = CLIFeeder()
    s.send(a)
    s.is_exhausted = True
    d1 = Drain(name="d1")
    c = Inject3Sink() if i3 else InjectSink()
    s > d1 > c
    p = PipeEngine(s)
    p.start()
    p.wait_and_stop()

_inject_sink(False) # InjectSink
_inject_sink(True) # Inject3Sink

# Each run must have sent the packet exactly once through the fake socket.
assert msgs == [a,a]

= TriggerDrain and TriggeredValve with CLIFeeder

# CLIFeeder -> TriggerDrain -> TriggeredValve -> QueueSink, all linked with `>`.
# The TriggerDrain is built with a predicate matching the string "trigger".
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s > d1 > d2 > c
# Trigger link from the TriggerDrain to the valve.
d1 ^ d2

p = PipeEngine(s)
p.start()

s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")

# Of the five messages, the sink must receive these three in this order;
# "hello2" is not among the first three.
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"

p.stop()

= TriggerDrain and TriggeredValve with CLIHighFeeder

# CLIHighFeeder -> TriggerDrain -> TriggeredValve -> QueueSink, all linked
# with `>>`. The TriggerDrain predicate matches the string "trigger".
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s >> d1
d1 >> d2
d2 >> c
# Trigger link from the TriggerDrain to the valve.
d1 ^ d2

p = PipeEngine(s)
p.start()

s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")

# Of the five messages, the sink must receive these three in this order;
# "hello2" is not among the first three.
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"

p.stop()

= TriggerDrain and TriggeredQueueingValve with CLIFeeder

# NOTE(review): this test's title mentions TriggeredQueueingValve, but the
# code below instantiates TriggeredValve, which makes the body identical to the
# "TriggerDrain and TriggeredValve with CLIFeeder" test -- confirm which one
# was intended.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s > d1 > d2 > c
# Trigger link from the TriggerDrain to the valve.
d1 ^ d2

p = PipeEngine(s)
p.start()

s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")

# Of the five messages, the sink must receive these three in this order;
# "hello2" is not among the first three.
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"

p.stop()

= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel

# CLIFeeder -> TriggerDrain -> TriggeredSwitch linked with `>`; the sink is
# attached to the switch with `>>` only.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()

s > d1 > d2
d2 >> c
# Trigger link from the TriggerDrain to the switch.
d1 ^ d2

p = PipeEngine(s)
p.start()

s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")

# The first two messages to arrive at the sink must be "trigger" and then
# "hello2"; the initial "hello" is not expected before them.
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello2"

p.stop()

= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel

# CLIHighFeeder -> TriggerDrain -> TriggeredSwitch linked with `>>`; the sink
# is attached to the switch with `>` only.
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()

s >> d1
d1 >> d2
d2 > c
# Trigger link from the TriggerDrain to the switch.
d1 ^ d2

p = PipeEngine(s)
p.start()

s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")

# Of the five messages, the sink must receive these three in this order;
# "hello2" is not among the first three.
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"

p.stop()

= TriggerDrain and TriggeredMessage

# CLIFeeder -> TriggerDrain -> TriggeredMessage("hello") -> QueueSink, linked
# with `>`, plus a trigger link from the TriggerDrain to the TriggeredMessage.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredMessage("hello")
c = QueueSink()

s > d1 > d2 > c
d1 ^ d2

p = PipeEngine(s)
p.start()

# A single "trigger" message is sent.
s.send("trigger")

# Two messages must arrive at the sink: "hello" and "trigger". Their relative
# order is not asserted.
r = [c.q.get(timeout=5), c.q.get(timeout=5)]
assert "hello" in r
assert "trigger" in r

p.stop()

= TriggerDrain and TriggeredQueueingValve on low channel

# CLIFeeder -> TriggerDrain -> TriggeredQueueingValve -> QueueSink, linked
# with `>`, plus a trigger link from the TriggerDrain to the valve.
# (The unused os.pipe() pair previously created here was dropped: it was
# never read, written or closed, so it only leaked two file descriptors.)
p = PipeEngine()

s = CLIFeeder()

d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s > d1 > d2 > c
d1 ^ d2

p.add(s)
p.start()

s.send("trigger")
s.send("hello")
s.send("trigger")
# All three messages must reach the sink: "trigger" first, then "hello" and
# "trigger" in either order.
assert c.q.get(timeout=3) == "trigger"
assert c.q.get(timeout=3) in ['hello', 'trigger']
assert c.q.get(timeout=3) in ['hello', 'trigger']
# Nothing may be left in the valve's internal queue.
assert d2.q.qsize() == 0

p.stop()

= TriggerDrain and TriggeredQueueingValve on high channel

# CLIHighFeeder -> TriggerDrain -> TriggeredQueueingValve -> QueueSink, linked
# with `>>`, plus a trigger link from the TriggerDrain to the valve.
# (The unused os.pipe() pair previously created here was dropped: it was
# never read, written or closed, so it only leaked two file descriptors.)
p = PipeEngine()

s = CLIHighFeeder()

d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s >> d1 >> d2 >> c
d1 ^ d2

p.add(s)
p.start()

s.send("trigger")
s.send("hello")
s.send("trigger")
# The first two messages at the sink must be "trigger" and then "hello".
assert c.q.get(timeout=3) == "trigger"
assert c.q.get(timeout=3) == "hello"
# Nothing may be left in the valve's internal queue.
assert d2.q.qsize() == 0

p.stop()

= UDPDrain

p = PipeEngine()

# Two feeders share one UDPDrain: s is linked with `>`, s2 with `>>`, and the
# drain is linked to the sink with both operators.
s = CLIFeeder()
s2 = CLIHighFeeder()
d1 = UDPDrain()
c = QueueSink()

s > d1 > c
s2 >> d1 >> c

p.add(s)
p.add(s2)
p.start()

# s gets a full IP/UDP/DNS packet, s2 a bare DNS layer.
s.send(IP(src="127.0.0.1")/UDP()/DNS())
s2.send(DNS())

# Arrival order is not assumed: one result must be the 12 raw bytes below; it
# is removed, and the remaining result must contain DNS with UDP sport 1234.
res = [c.q.get(timeout=2), c.q.get(timeout=2)]
assert b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' in res
res.remove(b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00')
assert DNS in res[0] and res[0][UDP].sport == 1234

p.stop()

= FDSourceSink on a Bunch object

class Bunch:
    # Minimal attribute bag: the keyword arguments become the instance __dict__.
    __init__ = lambda self, **kw: setattr(self, '__dict__', kw)

# Fake file-like object: write() discards its argument, read() always returns
# "hello", and fileno() returns None.
fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None)

s = FDSourceSink(fd)
d = Drain()
c = QueueSink()
s > d > c

# Identity check against the None singleton instead of `== None`.
assert s.fileno() is None
# No engine is used: push() and deliver() are called directly, and the value
# returned by the fake read() must end up in the sink's queue.
s.push("data")
s.deliver()
assert c.q.get(timeout=1) == "hello"

= TCPConnectPipe networking test
~ networking needs_root

# Live network test: CLIFeeder -> TCPConnectPipe(www.google.fr:80) -> QueueSink.
p = PipeEngine()

s = CLIFeeder()
d1 = TCPConnectPipe(addr="www.google.fr", port=80)
c = QueueSink()

s > d1 > c

p.add(s)
p.start()

# Send one request line as bytes and wait up to 10 s for the first message
# delivered to the sink.
s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n")
result = c.q.get(timeout=10)
p.stop()

# The received data must begin with this exact status line.
assert result.startswith(b"HTTP/1.0 200 OK")