from collections import defaultdict
from tornado.ioloop import IOLoop, PeriodicCallback
try:
from crick import TDigest
except ImportError:
pass
else:
class Digest:
def __init__(self, loop=None, intervals=(5, 60, 3600)):
self.intervals = intervals
self.components = [TDigest() for i in self.intervals]
self.loop = loop or IOLoop.current()
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000)
self.loop.add_callback(self._pc.start)
def add(self, item):
self.components[0].add(item)
def update(self, seq):
self.components[0].update(seq)
def shift(self):
for i in range(len(self.intervals) - 1):
frac = 0.2 * self.intervals[0] / self.intervals[i]
part = self.components[i].scale(frac)
rest = self.components[i].scale(1 - frac)
self.components[i + 1].merge(part)
self.components[i] = rest
def size(self):
return sum(d.size() for d in self.components)
class Counter:
def __init__(self, loop=None, intervals=(5, 60, 3600)):
self.intervals = intervals
self.components = [defaultdict(lambda: 0) for i in self.intervals]
self.loop = loop or IOLoop.current()
self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000)
self.loop.add_callback(self._pc.start)
def add(self, item):
self.components[0][item] += 1
def shift(self):
for i in range(len(self.intervals) - 1):
frac = 0.2 * self.intervals[0] / self.intervals[i]
part = {k: v * frac for k, v in self.components[i].items()}
rest = {k: v * (1 - frac) for k, v in self.components[i].items()}
for k, v in part.items():
self.components[i + 1][k] += v
d = defaultdict(lambda: 0)
d.update(rest)
self.components[i] = d
def size(self):
return sum(sum(d.values()) for d in self.components)