普通文本  |  230行  |  7.04 KB

# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test times."""

import collections
import logging
import threading
import time as _time

import grpc
import grpc_testing

logging.basicConfig()
_LOGGER = logging.getLogger(__name__)


def _call(behaviors):
    for behavior in behaviors:
        try:
            behavior()
        except Exception:  # pylint: disable=broad-except
            _LOGGER.exception('Exception calling behavior "%r"!', behavior)


def _call_in_thread(behaviors):
    calling = threading.Thread(target=_call, args=(behaviors,))
    calling.start()
    # NOTE(nathaniel): Because this function is called from "strict" Time
    # implementations, it blocks until after all behaviors have terminated.
    calling.join()


class _State(object):

    def __init__(self):
        self.condition = threading.Condition()
        self.times_to_behaviors = collections.defaultdict(list)


class _Delta(
        collections.namedtuple('_Delta', (
            'mature_behaviors',
            'earliest_mature_time',
            'earliest_immature_time',
        ))):
    pass


def _process(state, now):
    mature_behaviors = []
    earliest_mature_time = None
    while state.times_to_behaviors:
        earliest_time = min(state.times_to_behaviors)
        if earliest_time <= now:
            if earliest_mature_time is None:
                earliest_mature_time = earliest_time
            earliest_mature_behaviors = state.times_to_behaviors.pop(
                earliest_time)
            mature_behaviors.extend(earliest_mature_behaviors)
        else:
            earliest_immature_time = earliest_time
            break
    else:
        earliest_immature_time = None
    return _Delta(mature_behaviors, earliest_mature_time,
                  earliest_immature_time)


class _Future(grpc.Future):

    def __init__(self, state, behavior, time):
        self._state = state
        self._behavior = behavior
        self._time = time
        self._cancelled = False

    def cancel(self):
        with self._state.condition:
            if self._cancelled:
                return True
            else:
                behaviors_at_time = self._state.times_to_behaviors.get(
                    self._time)
                if behaviors_at_time is None:
                    return False
                else:
                    behaviors_at_time.remove(self._behavior)
                    if not behaviors_at_time:
                        self._state.times_to_behaviors.pop(self._time)
                        self._state.condition.notify_all()
                    self._cancelled = True
                    return True

    def cancelled(self):
        with self._state.condition:
            return self._cancelled

    def running(self):
        raise NotImplementedError()

    def done(self):
        raise NotImplementedError()

    def result(self, timeout=None):
        raise NotImplementedError()

    def exception(self, timeout=None):
        raise NotImplementedError()

    def traceback(self, timeout=None):
        raise NotImplementedError()

    def add_done_callback(self, fn):
        raise NotImplementedError()


class StrictRealTime(grpc_testing.Time):

    def __init__(self):
        self._state = _State()
        self._active = False
        self._calling = None

    def _activity(self):
        while True:
            with self._state.condition:
                while True:
                    now = _time.time()
                    delta = _process(self._state, now)
                    self._state.condition.notify_all()
                    if delta.mature_behaviors:
                        self._calling = delta.earliest_mature_time
                        break
                    self._calling = None
                    if delta.earliest_immature_time is None:
                        self._active = False
                        return
                    else:
                        timeout = max(0, delta.earliest_immature_time - now)
                        self._state.condition.wait(timeout=timeout)
            _call(delta.mature_behaviors)

    def _ensure_called_through(self, time):
        with self._state.condition:
            while ((self._state.times_to_behaviors and
                    min(self._state.times_to_behaviors) < time) or
                   (self._calling is not None and self._calling < time)):
                self._state.condition.wait()

    def _call_at(self, behavior, time):
        with self._state.condition:
            self._state.times_to_behaviors[time].append(behavior)
            if self._active:
                self._state.condition.notify_all()
            else:
                activity = threading.Thread(target=self._activity)
                activity.start()
                self._active = True
            return _Future(self._state, behavior, time)

    def time(self):
        return _time.time()

    def call_in(self, behavior, delay):
        return self._call_at(behavior, _time.time() + delay)

    def call_at(self, behavior, time):
        return self._call_at(behavior, time)

    def sleep_for(self, duration):
        time = _time.time() + duration
        _time.sleep(duration)
        self._ensure_called_through(time)

    def sleep_until(self, time):
        _time.sleep(max(0, time - _time.time()))
        self._ensure_called_through(time)


class StrictFakeTime(grpc_testing.Time):

    def __init__(self, time):
        self._state = _State()
        self._time = time

    def time(self):
        return self._time

    def call_in(self, behavior, delay):
        if delay <= 0:
            _call_in_thread((behavior,))
        else:
            with self._state.condition:
                time = self._time + delay
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def call_at(self, behavior, time):
        with self._state.condition:
            if time <= self._time:
                _call_in_thread((behavior,))
            else:
                self._state.times_to_behaviors[time].append(behavior)
        return _Future(self._state, behavior, time)

    def sleep_for(self, duration):
        if 0 < duration:
            with self._state.condition:
                self._time += duration
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)

    def sleep_until(self, time):
        with self._state.condition:
            if self._time < time:
                self._time = time
                delta = _process(self._state, self._time)
                _call_in_thread(delta.mature_behaviors)