Source code distributed/counter.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from collections import defaultdict

from tornado.ioloop import IOLoop, PeriodicCallback


try:
    from crick import TDigest
except ImportError:
    pass
else:

    class Digest:
        """Time-decayed stack of ``crick.TDigest`` objects.

        Only defined when the optional ``crick`` package can be imported.
        One digest is kept per entry of ``intervals``.  New data goes into
        the first digest.  Every ``intervals[0]`` seconds :meth:`shift`
        runs on ``loop`` and moves part of each digest into the next one.
        The last digest only ever receives data.
        """

        def __init__(self, loop=None, intervals=(5, 60, 3600)):
            self.intervals = intervals
            self.components = [TDigest() for _ in self.intervals]

            self.loop = loop if loop else IOLoop.current()
            period_ms = self.intervals[0] * 1000
            self._pc = PeriodicCallback(self.shift, period_ms)
            # Defer start() so it executes on ``self.loop`` rather than
            # synchronously in the constructor.
            self.loop.add_callback(self._pc.start)

        def add(self, item):
            """Add ``item`` to the most recent digest."""
            newest = self.components[0]
            newest.add(item)

        def update(self, seq):
            """Add every element of ``seq`` to the most recent digest."""
            newest = self.components[0]
            newest.update(seq)

        def shift(self):
            """Move a fraction of each digest's contents into the next one.

            For level ``lvl`` (every level except the last), the digest is
            split into a part scaled by ``0.2 * intervals[0] / intervals[lvl]``
            and a part scaled by the complement.  The first part is merged
            into level ``lvl + 1`` and the second replaces level ``lvl``.
            """
            base = self.intervals[0]
            last = len(self.intervals) - 1
            for lvl in range(last):
                fraction = 0.2 * base / self.intervals[lvl]
                current = self.components[lvl]
                # Compute both halves from the same source before merging.
                moved = current.scale(fraction)
                kept = current.scale(1 - fraction)
                self.components[lvl + 1].merge(moved)
                self.components[lvl] = kept

        def size(self):
            """Return the sum of ``size()`` over all digests."""
            total = 0
            for digest in self.components:
                total += digest.size()
            return total


class Counter:
    """Approximate, time-decayed frequency counter.

    Counts are kept in one bucket (a ``defaultdict``) per entry of
    ``intervals``.  New observations land in the first bucket.  Every
    ``intervals[0]`` seconds :meth:`shift` moves a fraction of each bucket's
    counts into the next bucket.  Early buckets therefore reflect recent
    activity, and later buckets accumulate older activity.  The last bucket
    never gives anything up.

    Parameters
    ----------
    loop : IOLoop, optional
        Loop on which the periodic shift is started; defaults to
        ``IOLoop.current()``.
    intervals : tuple of numbers
        One entry per bucket.  ``intervals[0]`` (in seconds) is also the
        period of :meth:`shift`.
    """

    def __init__(self, loop=None, intervals=(5, 60, 3600)):
        self.intervals = intervals
        # ``int`` (instead of ``lambda: 0``) yields the same default of 0
        # but keeps the buckets picklable.
        self.components = [defaultdict(int) for _ in self.intervals]

        self.loop = loop or IOLoop.current()
        self._pc = PeriodicCallback(self.shift, self.intervals[0] * 1000)
        # Defer start() so it executes on ``self.loop`` rather than
        # synchronously in the constructor.
        self.loop.add_callback(self._pc.start)

    def add(self, item):
        """Record one occurrence of ``item`` in the most recent bucket."""
        self.components[0][item] += 1

    def update(self, seq):
        """Record one occurrence of each element of ``seq``.

        Equivalent to calling :meth:`add` once per element.
        """
        newest = self.components[0]
        for item in seq:
            newest[item] += 1

    def shift(self):
        """Move a fraction of each bucket's counts into the next bucket.

        For bucket ``i`` (every bucket except the last), a fraction
        ``0.2 * intervals[0] / intervals[i]`` of each count is added to
        bucket ``i + 1``.  The remainder becomes the new bucket ``i``.  The
        total returned by :meth:`size` is therefore preserved, up to
        floating-point rounding.

        Buckets are processed first to last.  Counts moved into bucket
        ``i + 1`` during this call are themselves split when bucket
        ``i + 1`` is processed.
        """
        for i in range(len(self.intervals) - 1):
            frac = 0.2 * self.intervals[0] / self.intervals[i]
            keep = 1 - frac
            src = self.components[i]
            dst = self.components[i + 1]
            rest = defaultdict(int)
            # Single pass: split each count between the next bucket and the
            # replacement for this one, instead of materialising two
            # temporary dicts and then copying one of them again.
            for k, v in src.items():
                dst[k] += v * frac
                rest[k] = v * keep
            self.components[i] = rest

    def size(self):
        """Return the total (possibly fractional) count across all buckets."""
        return sum(sum(d.values()) for d in self.components)