"""
Throttling classes implementing various throttling policies.
"""
import time
from . import module
from .datetime import total_seconds
from .types import Missing
_all = module.All(globals())
[docs]class TimeThrottle(object):
"""
Time based throttling class.
>>> import datetime
>>> def slow_range(*args):
... for i in range(*args):
... time.sleep(0.5)
... yield i
>>> t1, t2 = (TimeThrottle(t) for t in (datetime.timedelta(minutes=1), 0.2))
>>> list(t1.throttle_iterable((i, i) for i in range(10)))
[(0, 0), (9, 9)]
>>> list(t2.throttle_iterable(slow_range(3)))
[0, 1, 2]
"""
[docs] def __init__(self, min_time_delta):
self.min_time_delta = total_seconds(min_time_delta)
self.previous_time = None
[docs] def is_throttled(self):
"""Return a boolean indicating if you should throttle."""
if not self.previous_time:
self._update()
return False
if time.time() - self.previous_time >= total_seconds(self.min_time_delta):
self._update()
return False
return True
[docs] def throttle_iterable(self, objects, callback=lambda o: None):
"""
Consume and skips some objects to yield them at defined `min_delay`. First and last objects
are always returned.
* Set `callback` to a callable with the signature ``is_throttled_args = callback(object)``.
Used by subclasses.
"""
current_object = Missing
for obj in objects:
current_object = obj
args = callback(obj) or ()
if not self.is_throttled(*args):
current_object = Missing
yield obj
if current_object is not Missing:
yield current_object
def _update(self):
self.previous_time = time.time()
[docs]class TimeAndRatioThrottle(TimeThrottle):
"""
Time and ratio based throttling class.
>>> import datetime
>>> def slow_range(*args):
... for i in range(*args):
... time.sleep(0.5)
... yield i
>>> t1, t2 = (TimeAndRatioThrottle(0.3, t, 10*t) for t in (datetime.timedelta(minutes=1), 0.4))
>>> list(t1.throttle_iterable(list(range(9)), lambda i: [i/9]))
[0, 8]
>>> list(t2.throttle_iterable(slow_range(9), lambda i: [i/9]))
[0, 3, 6, 8]
"""
[docs] def __init__(self, min_ratio_delta, min_time_delta, max_time_delta):
super().__init__(min_time_delta)
self.min_ratio_delta = total_seconds(min_ratio_delta)
self.max_time_delta = total_seconds(max_time_delta)
self.previous_ratio = 0
[docs] def is_throttled(self, ratio): # pylint:disable=arguments-differ
"""Return a boolean indicating if you should throttle."""
if not self.previous_time:
self._update(ratio)
return False
ratio_delta = ratio - self.previous_ratio
time_delta = time.time() - self.previous_time
if (
ratio_delta > self.min_ratio_delta
and time_delta > self.min_time_delta
or time_delta > self.max_time_delta
):
self._update(ratio)
return False
return True
def _update(self, ratio): # pylint:disable=arguments-differ
super()._update()
self.previous_ratio = ratio
__all__ = _all.diff(globals())