普通文本  |  364行  |  11.66 KB

#    Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from trappy.stats.Topology import Topology
from trappy.stats.Trigger import Trigger
from trappy.stats.Aggregator import MultiTriggerAggregator

import collections
import trappy
from trappy.base import Base
import pandas as pd
from pandas.util.testing import assert_series_equal


class TestTopology(unittest.TestCase):

    def test_add_to_level(self):
        """Test level creation"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        check_groups = topology.get_level(level)

        self.assertTrue(topology.has_level(level))
        self.assertEqual(level_groups, check_groups)

    def test_flatten(self):
        """Test Topology: flatten"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)
        flattened = [0, 1, 2, 3, 4, 5]

        self.assertEqual(flattened, topology.flatten())

    def test_cpu_topology_construction(self):
        """Test CPU Topology Construction"""

        cluster_0 = [0, 3, 4, 5]
        cluster_1 = [1, 2]
        clusters = [cluster_0, cluster_1]
        topology = Topology(clusters=clusters)

        # Check cluster level creation
        cluster_groups = [[0, 3, 4, 5], [1, 2]]
        self.assertTrue(topology.has_level("cluster"))
        self.assertEqual(cluster_groups, topology.get_level("cluster"))

        # Check cpu level creation
        cpu_groups = [[0], [1], [2], [3], [4], [5]]
        self.assertTrue(topology.has_level("cpu"))
        self.assertEqual(cpu_groups, topology.get_level("cpu"))

        # Check "all" level
        all_groups = [[0, 1, 2, 3, 4, 5]]
        self.assertEqual(all_groups, topology.get_level("all"))

    def test_level_span(self):
        """TestTopology: level_span"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        self.assertEqual(topology.level_span(level), 2)

    def test_group_index(self):
        """TestTopology: get_index"""

        level_groups = [[1, 2], [0, 3, 4, 5]]
        level = "test_level"
        topology = Topology()
        topology.add_to_level(level, level_groups)

        self.assertEqual(topology.get_index(level, [1, 2]), 0)
        self.assertEqual(topology.get_index(level, [0, 3, 4, 5]), 1)

class BaseTestStats(unittest.TestCase):
    def setUp(self):
        trace = trappy.BareTrace()
        data = {

            "identifier": [
                0,
                0,
                0,
                1,
                1,
                1,
            ],
            "result": [
                "fire",
                "blank",
                "fire",
                "blank",
                "fire",
                "blank",
            ],
        }

        index = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
        data_frame = pd.DataFrame(data, index=index)
        trace.add_parsed_event("aim_and_fire", data_frame)
        self._trace = trace
        self.topology = Topology(clusters=[[0], [1]])


class TestTrigger(BaseTestStats):

    def test_trigger_generation(self):
        """TestTrigger: generate"""

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        expected = pd.Series([1, 1], index=pd.Index([0.1, 0.3], name="Time"))
        assert_series_equal(expected, trigger.generate(0))

        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_func(self):
        """Trigger works with a function or lambda as filter"""

        def my_filter(val):
            return val.startswith("fi")

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": my_filter}, value=1,
                          pivot="identifier")

        expected = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

        my_filters = {"result": lambda x: x.startswith("bl")}
        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters=my_filters, value=1, pivot="identifier")

        expected = pd.Series([1, 1], index=pd.Index([0.4, 0.6], name="Time"))
        assert_series_equal(expected, trigger.generate(1))

    def test_trigger_with_callable_class(self):
        """Trigger works with a callable class as filter"""

        class my_filter(object):
            def __init__(self, val_out):
                self.prev_val = 0
                self.val_out = val_out

            def __call__(self, val):
                ret = self.prev_val == self.val_out
                self.prev_val = val

                return ret

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"identifier": my_filter(1)}, value=1,
                          pivot="result")

        expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
        assert_series_equal(expected, trigger.generate("blank"))

    def test_filter_prev_values(self):
        """Trigger works with a filter that depends on previous values of the same pivot"""

        # We generate an example in which we want a trigger whenever the
        # identifier is no longer 1 for blank

        class my_filter(object):
            def __init__(self, val_out):
                self.prev_val = 0
                self.val_out = val_out

            def __call__(self, val):
                ret = self.prev_val == self.val_out
                self.prev_val = val

                return ret

        trace = trappy.BareTrace()
        data = collections.OrderedDict([
            (0.1, ["blank", 1]),
            (0.2, ["fire",  1]),
            (0.3, ["blank", 0]), # value is no longer 1, trigger
            (0.4, ["blank", 1]),
            (0.5, ["fire",  0]), # This should NOT trigger
            (0.6, ["blank", 0]), # value is no longer 1 for blank, trigger
        ])
        data_frame = pd.DataFrame.from_dict(data, orient="index", )
        data_frame.columns = ["result", "identifier"]
        trace.add_parsed_event("aim_and_fire", data_frame)

        trigger = Trigger(trace, trace.aim_and_fire,
                          filters={"identifier": my_filter(1)}, value=-1,
                          pivot="result")

        expected = pd.Series([-1, -1], index=[0.3, 0.6])
        assert_series_equal(expected, trigger.generate("blank"))



class TestAggregator(BaseTestStats):

    def test_scalar_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger scalar aggfunc"""

        def aggfunc(series):
            return series.sum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        aggregator = MultiTriggerAggregator([trigger],
                        self.topology,
                        aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        self.assertEqual(result, [3.0])

        # There are two "fire" on the first node group and a
        # a single "fire" on the second node group at the cluster
        # level which looks like
        # [[0], [1]]
        result = aggregator.aggregate(level="cluster")
        self.assertEqual(result, [2.0, 1.0])

    def test_vector_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace, event_class, filters, value, pivot)

        aggregator = MultiTriggerAggregator([trigger],
                        self.topology,
                        aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
                                      index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                          )
        assert_series_equal(result[0], expected_result)

    def test_vector_aggfunc_multiple_trigger(self):
        """TestAggregator: multi trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger_fire = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        filters = {
            "result": "blank"
        }
        value = -1
        trigger_blank = Trigger(self._trace, event_class, filters, value,
                                pivot)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                        self.topology,
                        aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series([1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
                                      index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
                          )
        assert_series_equal(result[0], expected_result)

    def test_default_aggfunc_multiple_trigger(self):
        """MultiTriggerAggregator with the default aggfunc"""

        trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": "fire"},
                          pivot="identifier", value=1)

        trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": "blank"},
                          pivot="identifier", value=2)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology)

        results = aggregator.aggregate(level="cpu")
        expected_results = [
            pd.Series([1., 2., 1., 0., 0., 0.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
            pd.Series([0., 0., 0., 2., 1., 2.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
            ]

        self.assertEquals(len(results), len(expected_results))
        for result, expected in zip(results, expected_results):
            assert_series_equal(result, expected)