Source code distributed/tests/test_counter.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import pytest

from distributed.counter import Counter
from distributed.utils_test import loop  # noqa F401

try:
    from distributed.counter import Digest
except ImportError:
    Digest = None


@pytest.mark.parametrize(
    "CD,size",
    [
        (Counter, lambda d: sum(d.values())),
        pytest.param(
            Digest,
            lambda x: x.size(),
            marks=pytest.mark.skipif(not Digest, reason="no crick library"),
        ),
    ],
)
def test_digest(loop, CD, size):
    c = CD(loop=loop)
    c.add(1)
    c.add(2)
    assert size(c.components[0]) == 2

    c.shift()
    assert 0 < size(c.components[0]) < 2
    assert 0 < size(c.components[1]) < 1
    assert sum(size(d) for d in c.components) == 2

    for i in range(len(c.components) - 1):
        assert size(c.components[i]) >= size(c.components[i + 1])

    c.add(3)

    assert sum(size(d) for d in c.components) == c.size()


def test_counter(loop):
    c = Counter(loop=loop)
    c.add(1)

    for i in range(5):
        c.shift()
        assert abs(sum(cc[1] for cc in c.components) - 1) < 1e-13