普通文本  |  371行  |  16.61 KB

#/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import time
import os

from acts.keys import Config
from acts.utils import rand_ascii_str
from acts.test_utils.bt.GattEnum import CharacteristicValueFormat
from acts.test_utils.bt.GattEnum import GattCbStrings
from acts.test_utils.bt.GattEnum import GattCharacteristic
from acts.test_utils.bt.GattEnum import GattDescriptor
from acts.test_utils.bt.GattEnum import GattCbErr
from acts.test_utils.bt.GattEnum import GattTransport
from acts.test_utils.bt.GattEnum import GattEvent
from acts.test_utils.bt.GattEnum import GattServerResponses
from acts.test_utils.bt.GattEnum import GattService
from acts.test_utils.bt.bt_test_utils import TIMEOUT_SMALL

from gatt_test_database import STRING_512BYTES
from acts.utils import exe_cmd
from math import ceil


class GattServerLib():

    characteristic_list = []
    default_timeout = 10
    descriptor_list = []
    dut = None
    gatt_server = None
    gatt_server_callback = None
    gatt_server_list = []
    log = None
    mac_addr = None
    service_list = []
    write_mapping = {}

    def __init__(self, log, mac_addr, dut):
        self.dut = dut
        self.log = log
        self.mac_addr = mac_addr

    def list_all_uuids(self):
        """From the GATT Client, discover services and list all services,
        chars and descriptors.
        """
        self.log.info("Listing Characteristics")
        for characteristic in self.characteristic_list:
            instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId(
                characteristic)
            uuid = self.dut.droid.gattServerGetCharacteristicUuid(
                characteristic)
            self.dut.log.info("GATT Server characteristic handle uuid: {} {}".
                              format(hex(instance_id), uuid))

    def open(self):
        """Open an empty GATT Server instance"""
        self.gatt_server_callback = self.dut.droid.gattServerCreateGattServerCallback(
        )
        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
            self.gatt_server_callback)
        self.gatt_server_list.append(self.gatt_server)

    def clear_services(self):
        """Clear BluetoothGattServices from BluetoothGattServer"""
        self.dut.droid.gattServerClearServices(self.gatt_server)

    def close_bluetooth_gatt_servers(self):
        """Close Bluetooth Gatt Servers"""
        try:
            for btgs in self.gatt_server_list:
                self.dut.droid.gattServerClose(btgs)
        except Exception as err:
            self.log.error(
                "Failed to close Bluetooth GATT Servers: {}".format(err))
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []

    def characteristic_set_value_by_instance_id(self, instance_id, value):
        """Set Characteristic value by instance id"""
        self.dut.droid.gattServerCharacteristicSetValueByInstanceId(
            int(instance_id, 16), value)

    def notify_characteristic_changed(self, instance_id, confirm):
        """ Notify characteristic changed """
        self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId(
            self.gatt_server, 0, int(instance_id, 16), confirm)

    def send_response(self, user_input):
        """Send a single response to the GATT Client"""
        args = user_input.split()
        mtu = 23
        if len(args) == 2:
            user_input = args[0]
            mtu = int(args[1])
        desc_read = GattEvent.DESC_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        desc_write = GattEvent.DESC_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_read = GattEvent.CHAR_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_write = GattEvent.CHAR_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        execute_write = GattEvent.EXEC_WRITE.value['evt'].format(
            self.gatt_server_callback)
        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                          char_write, execute_write)
        events = self.dut.ed.pop_events(regex, 5, TIMEOUT_SMALL)
        status = 0
        if user_input:
            status = GattServerResponses.get(user_input)
        for event in events:
            self.log.debug("Found event: {}.".format(event))
            request_id = event['data']['requestId']
            if event['name'] == execute_write:
                if ('execute' in event['data'] and
                        event['data']['execute'] == True):
                    for key in self.write_mapping:
                        value = self.write_mapping[key]
                        self.log.info(
                            "Writing key, value: {}, {}".format(key, value))
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            key, value)
                else:
                    self.log.info("Execute result is false")
                self.write_mapping = {}
                self.dut.droid.gattServerSendResponse(
                    self.gatt_server, 0, request_id, status, 0, [])
                continue
            offset = event['data']['offset']
            instance_id = event['data']['instanceId']
            if (event['name'] == desc_write or event['name'] == char_write):
                if ('preparedWrite' in event['data'] and
                        event['data']['preparedWrite'] == True):
                    value = event['data']['value']
                    if instance_id in self.write_mapping.keys():
                        self.write_mapping[
                            instance_id] = self.write_mapping[instance_id] + value
                        self.log.info(
                            "New Prepared Write Value for {}: {}".format(
                                instance_id, self.write_mapping[instance_id]))
                    else:
                        self.log.info("write mapping key, value {}, {}".format(
                            instance_id, value))
                        self.write_mapping[instance_id] = value
                        self.log.info(
                            "current value {}, {}".format(instance_id, value))
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, value)
                    continue
                else:
                    self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                        event['data']['instanceId'], event['data']['value'])

            try:
                data = self.dut.droid.gattServerGetReadValueByInstanceId(
                    int(event['data']['instanceId']))
            except Exception as err:
                self.log.error(err)
            if not data:
                data = [1]
            self.log.info(
                "GATT Server Send Response [request_id, status, offset, data]" \
                " [{}, {}, {}, {}]".
                format(request_id, status, offset, data))
            data = data[offset:offset + mtu - 1]
            self.dut.droid.gattServerSendResponse(
                self.gatt_server, 0, request_id, status, offset, data)

    def _setup_service(self, serv):
        service = self.dut.droid.gattServerCreateService(
            serv['uuid'], serv['type'])
        if 'handles' in serv:
            self.dut.droid.gattServerServiceSetHandlesToReserve(
                service, serv['handles'])
        return service

    def _setup_characteristic(self, char):
        characteristic = \
            self.dut.droid.gattServerCreateBluetoothGattCharacteristic(
                char['uuid'], char['properties'], char['permissions'])
        if 'instance_id' in char:
            self.dut.droid.gattServerCharacteristicSetInstanceId(
                characteristic, char['instance_id'])
            set_id = self.dut.droid.gattServerCharacteristicGetInstanceId(
                characteristic)
            if set_id != char['instance_id']:
                self.log.error(
                    "Instance ID did not match up. Found {} Expected {}".
                    format(set_id, char['instance_id']))
        if 'value_type' in char:
            value_type = char['value_type']
            value = char['value']
            if value_type == CharacteristicValueFormat.STRING.value:
                self.log.info("Set String value result: {}".format(
                    self.dut.droid.gattServerCharacteristicSetStringValue(
                        characteristic, value)))
            elif value_type == CharacteristicValueFormat.BYTE.value:
                self.log.info("Set Byte Array value result: {}".format(
                    self.dut.droid.gattServerCharacteristicSetByteValue(
                        characteristic, value)))
            else:
                self.log.info("Set Int value result: {}".format(
                    self.dut.droid.gattServerCharacteristicSetIntValue(
                        characteristic, value, value_type, char['offset'])))
        return characteristic

    def _setup_descriptor(self, desc):
        descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor(
            desc['uuid'], desc['permissions'])
        if 'value' in desc:
            self.dut.droid.gattServerDescriptorSetByteValue(
                descriptor, desc['value'])
        if 'instance_id' in desc:
            self.dut.droid.gattServerDescriptorSetInstanceId(
                descriptor, desc['instance_id'])
        self.descriptor_list.append(descriptor)
        return descriptor

    def setup_gatts_db(self, database):
        """Setup GATT Server database"""
        self.gatt_server_callback = \
            self.dut.droid.gattServerCreateGattServerCallback()
        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
            self.gatt_server_callback)
        self.gatt_server_list.append(self.gatt_server)
        for serv in database['services']:
            service = self._setup_service(serv)
            self.service_list.append(service)
            if 'characteristics' in serv:
                for char in serv['characteristics']:
                    characteristic = self._setup_characteristic(char)
                    if 'descriptors' in char:
                        for desc in char['descriptors']:
                            descriptor = self._setup_descriptor(desc)
                            self.dut.droid.gattServerCharacteristicAddDescriptor(
                                characteristic, descriptor)
                    self.characteristic_list.append(characteristic)
                    self.dut.droid.gattServerAddCharacteristicToService(
                        service, characteristic)
            self.dut.droid.gattServerAddService(self.gatt_server, service)
            expected_event = GattCbStrings.SERV_ADDED.value.format(
                self.gatt_server_callback)
            self.dut.ed.pop_event(expected_event, 10)

    def send_continuous_response(self, user_input):
        """Send the same response"""
        desc_read = GattEvent.DESC_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        desc_write = GattEvent.DESC_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_read = GattEvent.CHAR_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_write = GattEvent.CHAR_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        execute_write = GattEvent.CHAR_EXEC_WRITE.value['evt'].format(
            self.gatt_server_callback)
        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                          char_write, execute_write)
        offset = 0
        status = 0
        mtu = 23
        char_value = []
        for i in range(512):
            char_value.append(i % 256)
        len_min = 470
        end_time = time.time() + 180
        i = 0
        num_packets = ceil((len(char_value) + 1) / (mtu - 1))
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL)
            for event in events:
                start_offset = i * (mtu - 1)
                i += 1
                self.log.debug("Found event: {}.".format(event))
                request_id = event['data']['requestId']
                data = char_value[start_offset:start_offset + mtu - 1]
                if not data:
                    data = [1]
                self.log.debug(
                    "GATT Server Send Response [request_id, status, offset, " \
                    "data] [{}, {}, {}, {}]".format(request_id, status, offset,
                        data))
                self.dut.droid.gattServerSendResponse(
                    self.gatt_server, 0, request_id, status, offset, data)

    def send_continuous_response_data(self, user_input):
        """Send the same response with data"""
        desc_read = GattEvent.DESC_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        desc_write = GattEvent.DESC_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_read = GattEvent.CHAR_READ_REQ.value['evt'].format(
            self.gatt_server_callback)
        char_write = GattEvent.CHAR_WRITE_REQ.value['evt'].format(
            self.gatt_server_callback)
        execute_write = GattEvent.EXEC_WRITE.value['evt'].format(
            self.gatt_server_callback)
        regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
                                          char_write, execute_write)
        offset = 0
        status = 0
        mtu = 11
        char_value = []
        len_min = 470
        end_time = time.time() + 180
        i = 0
        num_packets = ceil((len(char_value) + 1) / (mtu - 1))
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL)
            for event in events:
                self.log.info(event)
                request_id = event['data']['requestId']
                if event['name'] == execute_write:
                    if ('execute' in event['data'] and
                            event['data']['execute'] == True):
                        for key in self.write_mapping:
                            value = self.write_mapping[key]
                            self.log.debug("Writing key, value: {}, {}".format(
                                key, value))
                            self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                                key, value)
                        self.write_mapping = {}
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, [1])
                    continue
                offset = event['data']['offset']
                instance_id = event['data']['instanceId']
                if (event['name'] == desc_write or
                        event['name'] == char_write):
                    if ('preparedWrite' in event['data'] and
                            event['data']['preparedWrite'] == True):
                        value = event['data']['value']
                        if instance_id in self.write_mapping:
                            self.write_mapping[
                                instance_id] = self.write_mapping[instance_id] + value
                        else:
                            self.write_mapping[instance_id] = value
                    else:
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            event['data']['instanceId'],
                            event['data']['value'])
                try:
                    data = self.dut.droid.gattServerGetReadValueByInstanceId(
                        int(event['data']['instanceId']))
                except Exception as err:
                    self.log.error(err)
                if not data:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset, [1])
                else:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset,
                        data[offset:offset + 17])