Source code for pytoolbox.collections

import collections, math

from . import module
from .datetime import total_seconds

# Hand the current module globals to `module.All` before the classes and functions below are
# defined; the resulting helper is used on the last line of the file to build `__all__`.
# NOTE(review): presumably this snapshots the names that already exist -- confirm in `module.All`.
_all = module.All(globals())


class EventsTable(object):
    """Scan a sparse events table and replace missing entry by previous (non empty) entry."""

    def __init__(self, sparse_events_table, time_range, time_speedup, sleep_factor=1.0):
        """
        Build the dense table ``self.events`` for the indexes ``0 .. time_range - 1``.

        An index missing from `sparse_events_table` receives the event that was stored for the
        previous index. The lookup of ``sparse_events_table[0]`` is unconditional, so the sparse
        table must define an entry for index 0.
        """
        self.time_range = time_range
        self.time_speedup = time_speedup
        self.sleep_factor = sleep_factor
        # Seed the carry-over value with the mandatory first entry.
        current = sparse_events_table[0]
        dense = self.events = {}
        for slot in range(self.time_range):  # noqa
            current = sparse_events_table.get(slot, current)
            dense[slot] = current

    def get(self, time, time_speedup=None, default_value=None):
        """
        Return a tuple ``(index, event)`` with the table index matching `time` and its event.

        `time` is converted with ``total_seconds``, scaled by ``speedup / 3600``, wrapped with a
        modulo on ``self.time_range`` and truncated to an integer. `time_speedup` replaces
        ``self.time_speedup`` when it is truthy. `default_value` is returned as the event if the
        computed index is not in ``self.events``.
        """
        # Disabled doctest kept from the original implementation:
        # """
        # >>> def test_get_index(time_range, time_speedup):
        # ...     table = EventsTable({0: 'salut'}, time_range, time_speedup)
        # ...     modulo = previous = 0
        # ...     for t in range(24 * 3600 + 1):
        # ...         index = table.get(t+2*60, time_range, time_speedup)[0]
        # ...         if previous > index:
        # ...             modulo += 1
        # ...         assert 0 <= index < time_range
        # ...         previous = index
        # ...     assert modulo == time_speedup
        #
        # Test get_index with a speedup of 1440 (maps 1 minute to 24 hours):
        # >>> test_get_index(24, 24 * 60)
        #
        # Test get_index with a speedup of 12 (maps 2 hours to 24 hours):
        # >>> test_get_index(24, 12)
        # """
        speedup = self.time_speedup if not time_speedup else time_speedup
        scaled = total_seconds(time) * (speedup / 3600)
        position = int(scaled % self.time_range)
        return position, self.events.get(position, default_value)

    def sleep_time(self, time, time_speedup=None, sleep_factor=None):
        """
        Return required sleep time to wait for next scheduled event.

        `time_speedup` and `sleep_factor` replace the values given to the constructor when they
        are truthy.

        **Example usage**

        >>> table = EventsTable({0: 'salut'}, 24, 60)
        >>> table.sleep_time(1)
        59
        >>> table.sleep_time(58)
        2
        >>> table.sleep_time(60)
        60
        >>> table.sleep_time(62)
        58
        >>> table.sleep_time(3590, time_speedup=1)
        10
        >>> table.sleep_time(12543, time_speedup=1)
        1857
        >>> table.sleep_time(12543, time_speedup=1, sleep_factor=2)
        57
        >>> table.sleep_time(12600, time_speedup=1, sleep_factor=2)
        1800
        >>> table.sleep_time(1, time_speedup=60, sleep_factor=1)
        59
        >>> table.sleep_time(1, time_speedup=60, sleep_factor=2)
        29
        >>> table.sleep_time(30, time_speedup=60, sleep_factor=2)
        30
        >>> table.time_range = 1
        >>> table.sleep_time(1, time_speedup=1)
        149
        """
        speedup = self.time_speedup if not time_speedup else time_speedup
        factor = self.sleep_factor if not sleep_factor else sleep_factor
        # 150 = 3600 / 24
        period = self.time_range * 150 / (speedup * factor)
        elapsed = total_seconds(time) % period
        return math.ceil(period - elapsed)
class pygal_deque(collections.deque):  # pylint:disable=invalid-name
    """
    A deque None'ing duplicated values to produce nicer `pygal <pygal.org>`_ line charts
    (e.g. 5555322211111 -> 5__532_21___1).

    Only the first value of a run of equal values is stored, the following duplicates are stored
    as ``None``. The end of a run is written back when a different value is appended and, for the
    run still in progress, when the content is exported with :meth:`list`.

    .. warning::

        Not yet implemented:

        * appendleft(x)
        * clear()
        * count(x)
        * extend(iterable)
        * extendleft(iterable)
        * pop()
        * popleft()
        * remove(value)
        * reverse()
        * rotate(n)
    """

    # Most recent non-None value that started a run; shadowed per instance by `append`.
    last = None

    def append(self, value):  # pylint:disable=arguments-renamed
        """
        Append `value`, or ``None`` if `value` is ``None`` or equal to the current run's value.

        When a bounded deque (``maxlen``) drops its first item and the new first item is a
        ``None`` placeholder, the dropped item is copied into it. A deque that cannot hold any
        item (``maxlen=0``) is left empty instead of raising :class:`IndexError`.
        """
        if value is not None and value != self.last:
            if self:
                # A new run starts: make the end of the previous run visible.
                self[-1] = self.last
            self.last = value
        else:
            value = None
        first = self[0] if self else None
        super().append(value)
        # The emptiness check matters for maxlen=0, where the append above stores nothing.
        if self and self[0] is None:
            self[0] = first

    def list(self, fill=False):
        """
        Return the content as a list whose last item is the value of the run in progress.

        If `fill` is true then every ``None`` is replaced by the closest previous value that is
        not ``None`` (leading ``None`` items are left untouched).
        """
        values = list(self)
        if values and self.last is not None:
            values[-1] = self.last
        if fill:
            previous = None
            for index, item in enumerate(values):
                if item is None:
                    values[index] = previous
                else:
                    previous = item
        return values
def flatten_dict(the_dict, key_template='{0}.{1}'):
    """
    Flatten the keys of a nested dictionary.

    The key of a nested dictionary is combined with each of its (already flattened) keys using
    `key_template`, where ``{0}`` is the outer key and ``{1}`` the inner key. Values that are not
    dictionaries are kept under their own key.

    **Example usage**

    >>> flatten_dict({'a': 'b', 'c': 'd'})
    {'a': 'b', 'c': 'd'}
    >>> flatten_dict({'a': {'b': {'c': ['d', 'e']}, 'f': 'g'}})
    {'a.b.c': ['d', 'e'], 'a.f': 'g'}
    >>> flatten_dict({'a': {'b': {'c': ['d', 'e']}, 'f': 'g'}}, '{1}-{0}')
    {'c-b-a': ['d', 'e'], 'f-a': 'g'}
    """
    flat = {}
    for key, value in the_dict.items():
        if not isinstance(value, dict):
            flat[key] = value
            continue
        # Flatten the nested level first, then prefix its keys with the current one.
        for sub_key, sub_value in flatten_dict(value, key_template).items():
            flat[key_template.format(key, sub_key)] = sub_value
    return flat
def merge_dicts(*dicts):
    """
    Return a new dictionary updated with every dictionary of `dicts`, from left to right.

    .. warning::

        This operation is not commutative: for a key present in several dictionaries the value
        of the rightmost one is kept.

    **Example usage**

    >>> merge_dicts({'a': 1, 'b': 2}, {'b': 3, 'c': 4}, {'c': 5})
    {'a': 1, 'b': 3, 'c': 5}
    >>> merge_dicts({'c': 5}, {'b': 3, 'c': 4}, {'a': 1, 'b': 2})
    {'c': 4, 'b': 2, 'a': 1}
    """
    combined = {}
    for source in dicts:
        combined.update(source)
    return combined
def swap_dict_of_values(the_dict, type=set, method=set.add):  # pylint:disable=redefined-builtin
    """
    Return a dictionary with keys and values swapped.

    The values of `the_dict` must be containers of objects, not single objects: every object
    found in a container becomes a key of the result.

    * With `type` set to None the result is a plain :class:`dict` mapping every object to the
      (last) key whose container holds it. Use it when the objects are unique.
    * Otherwise the result is a :class:`collections.defaultdict` of `type` and every key is
      added to the container of the object with `method`, given either as a callable or as the
      name of an attribute of `type`.

    **Example usage**

    Simple swap:

    >>> swap_dict_of_values({'odd': [1, 3], 'even': (0, 2)}, type=None)
    {1: 'odd', 3: 'odd', 0: 'even', 2: 'even'}

    Complex swap:

    >>> def S(value):
    ...     return {k: sorted(v) for k, v in sorted(value.items())}
    ...
    >>> S(swap_dict_of_values(
    ...     {'odd': [1, 3], 'even': (0, 2), 'fib': {1, 2, 3}},
    ...     type=list,
    ...     method=list.append))
    {0: ['even'], 1: ['fib', 'odd'], 2: ['even', 'fib'], 3: ['fib', 'odd']}
    >>> swap_dict_of_values({'o': [1, 3], 'e': (0, 2), 'f': {2, 3}}, method='add')[2] == {'f', 'e'}
    True
    >>> swap_dict_of_values({'bad': 'ab', 'example': 'ab'})['a'] == {'bad', 'example'}
    True
    """
    if type is None:
        return {item: key for key, items in the_dict.items() for item in items}

    if isinstance(method, str):
        # A method name is looked up on the container type.
        method = getattr(type, method)
    swapped = collections.defaultdict(type)
    for key, items in the_dict.items():
        for item in items:
            method(swapped[item], key)
    return swapped
def to_dict_of_values(iterable, type=list, method=list.append):  # pylint:disable=redefined-builtin
    """
    Return a dictionary (:class:`collections.defaultdict`) with key, value pairs merged as
    key -> values.

    `iterable` yields ``(key, value)`` pairs. The values of a key are collected into a container
    created by calling `type` and are added with `method`, called as ``method(container, value)``.
    `method` may also be given as the name of an attribute of `type` (e.g. ``'add'``), as accepted
    by :func:`swap_dict_of_values`.

    **Example usage**

    >>> from pytoolbox.unittest import asserts
    >>> asserts.dict_equal(
    ...     to_dict_of_values([('odd', 1), ('odd', 3), ('even', 0), ('even', 2)]),
    ...     {'even': [0, 2], 'odd': [1, 3]})
    >>> asserts.dict_equal(
    ...     to_dict_of_values((('a', 1), ('a', 1), ('a', 2)), type=set, method=set.add),
    ...     {'a': {1, 2}})
    >>> to_dict_of_values((('a', 1), ('a', 1), ('a', 2)), type=set, method='add') == {'a': {1, 2}}
    True
    """
    if isinstance(method, str):
        # Resolve a method name on the container type.
        method = getattr(type, method)
    dict_of_values = collections.defaultdict(type)
    for key, value in iterable:
        method(dict_of_values[key], value)
    return dict_of_values
def window(values, index, delta):
    """
    Extract 1+2*`delta` items from `values` centered at `index` and return a tuple with
    (items, left, right), `left` and `right` being the inclusive bounds of the extracted slice.

    This function tries to simulate a physical slider, meaning the number of extracted elements
    is constant but the centering at `index` is not guaranteed: near an edge the window is
    shifted to stay inside `values`, and it is shrunk if `values` is shorter than the window.

    A visual example with 6 `values` and ``delta=1``::

        index = 0  [+++]---  left = 0, right = 2
        index = 1  [+++]---  left = 0, right = 2
        index = 2  -[+++]--  left = 1, right = 3
        index = 3  --[+++]-  left = 2, right = 4
        index = 4  ---[+++]  left = 3, right = 5
        index = 5  ---[+++]  left = 3, right = 5

    **Example usage**

    >>> window(['a'], 0, 2)
    (['a'], 0, 0)
    >>> window(['a', 'b', 'c', 'd'], 2, 0)
    (['c'], 2, 2)
    >>> window(['a', 'b', 'c', 'd', 'e'], 0, 1)
    (['a', 'b', 'c'], 0, 2)
    >>> window(['a', 'b', 'c', 'd', 'e'], 1, 1)
    (['a', 'b', 'c'], 0, 2)
    >>> window(['a', 'b', 'c', 'd', 'e'], 2, 1)
    (['b', 'c', 'd'], 1, 3)
    >>> window(['a', 'b', 'c', 'd', 'e'], 3, 1)
    (['c', 'd', 'e'], 2, 4)
    >>> window(['a', 'b', 'c', 'd', 'e'], 4, 1)
    (['c', 'd', 'e'], 2, 4)
    >>> window(['a', 'b', 'c', 'd', 'e'], 3, 6)
    (['a', 'b', 'c', 'd', 'e'], 0, 4)
    >>> data = list(range(20))
    >>> window(data, 6, 6)
    ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 0, 12)
    >>> window(data, 7, 6)
    ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], 1, 13)
    >>> window(data, 10, 6)
    ([4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], 4, 16)
    >>> window(data, 19, 6)
    ([7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], 7, 19)
    """
    size = len(values)
    final = size - 1
    low = index - delta
    high = index + delta
    if low < 0:
        # Overflow on the left: slide the window to the right by the missing amount.
        high -= low
        low = 0
    elif high >= size:
        # Overflow on the right: slide the window to the left by the exceeding amount.
        low -= high - final
        high = final
    # Shrink whatever still exceeds the bounds (window larger than `values`).
    if low < 0:
        low = 0
    if high > final:
        high = final
    return values[low:high + 1], low, high
# Build the export list with the `_all` helper created near the top of the module.
# NOTE(review): `diff` presumably returns the names added to globals() since `_all` was created --
# confirm in `module.All`.
__all__ = _all.diff(globals())