# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test times."""
import collections
import logging
import threading
import time as _time
import grpc
import grpc_testing
logging.basicConfig()
_LOGGER = logging.getLogger(__name__)
def _call(behaviors):
for behavior in behaviors:
try:
behavior()
except Exception: # pylint: disable=broad-except
_LOGGER.exception('Exception calling behavior "%r"!', behavior)
def _call_in_thread(behaviors):
calling = threading.Thread(target=_call, args=(behaviors,))
calling.start()
# NOTE(nathaniel): Because this function is called from "strict" Time
# implementations, it blocks until after all behaviors have terminated.
calling.join()
class _State(object):
def __init__(self):
self.condition = threading.Condition()
self.times_to_behaviors = collections.defaultdict(list)
class _Delta(
collections.namedtuple('_Delta', (
'mature_behaviors',
'earliest_mature_time',
'earliest_immature_time',
))):
pass
def _process(state, now):
mature_behaviors = []
earliest_mature_time = None
while state.times_to_behaviors:
earliest_time = min(state.times_to_behaviors)
if earliest_time <= now:
if earliest_mature_time is None:
earliest_mature_time = earliest_time
earliest_mature_behaviors = state.times_to_behaviors.pop(
earliest_time)
mature_behaviors.extend(earliest_mature_behaviors)
else:
earliest_immature_time = earliest_time
break
else:
earliest_immature_time = None
return _Delta(mature_behaviors, earliest_mature_time,
earliest_immature_time)
class _Future(grpc.Future):
def __init__(self, state, behavior, time):
self._state = state
self._behavior = behavior
self._time = time
self._cancelled = False
def cancel(self):
with self._state.condition:
if self._cancelled:
return True
else:
behaviors_at_time = self._state.times_to_behaviors.get(
self._time)
if behaviors_at_time is None:
return False
else:
behaviors_at_time.remove(self._behavior)
if not behaviors_at_time:
self._state.times_to_behaviors.pop(self._time)
self._state.condition.notify_all()
self._cancelled = True
return True
def cancelled(self):
with self._state.condition:
return self._cancelled
def running(self):
raise NotImplementedError()
def done(self):
raise NotImplementedError()
def result(self, timeout=None):
raise NotImplementedError()
def exception(self, timeout=None):
raise NotImplementedError()
def traceback(self, timeout=None):
raise NotImplementedError()
def add_done_callback(self, fn):
raise NotImplementedError()
class StrictRealTime(grpc_testing.Time):
def __init__(self):
self._state = _State()
self._active = False
self._calling = None
def _activity(self):
while True:
with self._state.condition:
while True:
now = _time.time()
delta = _process(self._state, now)
self._state.condition.notify_all()
if delta.mature_behaviors:
self._calling = delta.earliest_mature_time
break
self._calling = None
if delta.earliest_immature_time is None:
self._active = False
return
else:
timeout = max(0, delta.earliest_immature_time - now)
self._state.condition.wait(timeout=timeout)
_call(delta.mature_behaviors)
def _ensure_called_through(self, time):
with self._state.condition:
while ((self._state.times_to_behaviors and
min(self._state.times_to_behaviors) < time) or
(self._calling is not None and self._calling < time)):
self._state.condition.wait()
def _call_at(self, behavior, time):
with self._state.condition:
self._state.times_to_behaviors[time].append(behavior)
if self._active:
self._state.condition.notify_all()
else:
activity = threading.Thread(target=self._activity)
activity.start()
self._active = True
return _Future(self._state, behavior, time)
def time(self):
return _time.time()
def call_in(self, behavior, delay):
return self._call_at(behavior, _time.time() + delay)
def call_at(self, behavior, time):
return self._call_at(behavior, time)
def sleep_for(self, duration):
time = _time.time() + duration
_time.sleep(duration)
self._ensure_called_through(time)
def sleep_until(self, time):
_time.sleep(max(0, time - _time.time()))
self._ensure_called_through(time)
class StrictFakeTime(grpc_testing.Time):
def __init__(self, time):
self._state = _State()
self._time = time
def time(self):
return self._time
def call_in(self, behavior, delay):
if delay <= 0:
_call_in_thread((behavior,))
else:
with self._state.condition:
time = self._time + delay
self._state.times_to_behaviors[time].append(behavior)
return _Future(self._state, behavior, time)
def call_at(self, behavior, time):
with self._state.condition:
if time <= self._time:
_call_in_thread((behavior,))
else:
self._state.times_to_behaviors[time].append(behavior)
return _Future(self._state, behavior, time)
def sleep_for(self, duration):
if 0 < duration:
with self._state.condition:
self._time += duration
delta = _process(self._state, self._time)
_call_in_thread(delta.mature_behaviors)
def sleep_until(self, time):
with self._state.condition:
if self._time < time:
self._time = time
delta = _process(self._state, self._time)
_call_in_thread(delta.mature_behaviors)