#!/usr/bin/python
#
# Copyright 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ctypes
import errno
import socket
import unittest

from bpf import *  # pylint: disable=wildcard-import
import csocket
import net_test

libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
HAVE_EBPF_SUPPORT = net_test.LINUX_VERSION >= (4, 4, 0)

@unittest.skipUnless(HAVE_EBPF_SUPPORT,
                     "eBPF function not fully supported")
class BpfTest(net_test.NetworkTest):

  def testCreateMap(self):
    key, value = 1, 1
    map_fd = CreateMap(BPF_MAP_TYPE_HASH, 4, 4, 100)
    UpdateMap(map_fd, key, value)
    self.assertEquals(LookupMap(map_fd, key).value, value)
    DeleteMap(map_fd, key)
    self.assertRaisesErrno(errno.ENOENT, LookupMap, map_fd, key)

  def testIterateMap(self):
    map_fd = CreateMap(BPF_MAP_TYPE_HASH, 4, 4, 100)
    value = 1024
    for key in xrange(1, 100):
      UpdateMap(map_fd, key, value)
    for key in xrange(1, 100):
      self.assertEquals(LookupMap(map_fd, key).value, value)
    self.assertRaisesErrno(errno.ENOENT, LookupMap, map_fd, 101)
    key = 0
    count = 0
    while 1:
      if count == 99:
        self.assertRaisesErrno(errno.ENOENT, GetNextKey, map_fd, key)
        break
      else:
        result = GetNextKey(map_fd, key)
        key = result.value
        self.assertGreater(key, 0)
        self.assertEquals(LookupMap(map_fd, key).value, value)
        count += 1

  def testProgLoad(self):
    bpf_prog = BpfMov64Reg(BPF_REG_6, BPF_REG_1)
    bpf_prog += BpfLdxMem(BPF_W, BPF_REG_0, BPF_REG_6, 0)
    bpf_prog += BpfExitInsn()
    insn_buff = ctypes.create_string_buffer(bpf_prog)
    # Load a program that does nothing except pass every packet it receives
    # It should not block the packet transmission otherwise the test fails.
    prog_fd = BpfProgLoad(BPF_PROG_TYPE_SOCKET_FILTER,
                          ctypes.addressof(insn_buff),
                          len(insn_buff), BpfInsn._length)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    sock.settimeout(1)
    BpfProgAttach(sock.fileno(), prog_fd)
    addr = "127.0.0.1"
    sock.bind((addr, 0))
    addr = sock.getsockname()
    sockaddr = csocket.Sockaddr(addr)
    sock.sendto("foo", addr)
    data, addr = csocket.Recvfrom(sock, 4096, 0)
    self.assertEqual("foo", data)
    self.assertEqual(sockaddr, addr)

  def testPacketBlock(self):
    bpf_prog = BpfMov64Reg(BPF_REG_6, BPF_REG_1)
    bpf_prog += BpfMov64Imm(BPF_REG_0, 0)
    bpf_prog += BpfExitInsn()
    insn_buff = ctypes.create_string_buffer(bpf_prog)
    prog_fd = BpfProgLoad(BPF_PROG_TYPE_SOCKET_FILTER,
                          ctypes.addressof(insn_buff),
                          len(insn_buff), BpfInsn._length)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    sock.settimeout(1)
    BpfProgAttach(sock.fileno(), prog_fd)
    addr = "127.0.0.1"
    sock.bind((addr, 0))
    addr = sock.getsockname()
    sock.sendto("foo", addr)
    self.assertRaisesErrno(errno.EAGAIN, csocket.Recvfrom, sock, 4096, 0)

  def testPacketCount(self):
    map_fd = CreateMap(BPF_MAP_TYPE_HASH, 4, 4, 100)
    key = 0xf0f0
    bpf_prog = BpfMov64Reg(BPF_REG_6, BPF_REG_1)
    bpf_prog += BpfLoadMapFd(map_fd, BPF_REG_1)
    bpf_prog += BpfMov64Imm(BPF_REG_7, key)
    bpf_prog += BpfStxMem(BPF_W, BPF_REG_10, BPF_REG_7, -4)
    bpf_prog += BpfMov64Reg(BPF_REG_8, BPF_REG_10)
    bpf_prog += BpfAlu64Imm(BPF_ADD, BPF_REG_8, -4)
    bpf_prog += BpfMov64Reg(BPF_REG_2, BPF_REG_8)
    bpf_prog += BpfFuncLookupMap()
    bpf_prog += BpfJumpImm(BPF_AND, BPF_REG_0, 0, 10)
    bpf_prog += BpfLoadMapFd(map_fd, BPF_REG_1)
    bpf_prog += BpfMov64Reg(BPF_REG_2, BPF_REG_8)
    bpf_prog += BpfStMem(BPF_W, BPF_REG_10, -8, 1)
    bpf_prog += BpfMov64Reg(BPF_REG_3, BPF_REG_10)
    bpf_prog += BpfAlu64Imm(BPF_ADD, BPF_REG_3, -8)
    bpf_prog += BpfMov64Imm(BPF_REG_4, 0)
    bpf_prog += BpfFuncUpdateMap()
    bpf_prog += BpfLdxMem(BPF_W, BPF_REG_0, BPF_REG_6, 0)
    bpf_prog += BpfExitInsn()
    bpf_prog += BpfMov64Reg(BPF_REG_2, BPF_REG_0)
    bpf_prog += BpfMov64Imm(BPF_REG_1, 1)
    bpf_prog += BpfRawInsn(BPF_STX | BPF_XADD | BPF_W, BPF_REG_2, BPF_REG_1,
                           0, 0)
    bpf_prog += BpfLdxMem(BPF_W, BPF_REG_0, BPF_REG_6, 0)
    bpf_prog += BpfExitInsn()
    insn_buff = ctypes.create_string_buffer(bpf_prog)
    # this program loaded is used to counting the packet transmitted through
    # a target socket. It will store the packet count into the eBPF map and we
    # will verify if the counting result is correct.
    prog_fd = BpfProgLoad(BPF_PROG_TYPE_SOCKET_FILTER,
                          ctypes.addressof(insn_buff),
                          len(insn_buff), BpfInsn._length)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    sock.settimeout(1)
    BpfProgAttach(sock.fileno(), prog_fd)
    addr = "127.0.0.1"
    sock.bind((addr, 0))
    addr = sock.getsockname()
    sockaddr = csocket.Sockaddr(addr)
    packet_count = 100
    for i in xrange(packet_count):
      sock.sendto("foo", addr)
      data, retaddr = csocket.Recvfrom(sock, 4096, 0)
      self.assertEqual("foo", data)
      self.assertEqual(sockaddr, retaddr)
    self.assertEquals(LookupMap(map_fd, key).value, packet_count)


if __name__ == "__main__":
  unittest.main()