# Source: distributed/tests/test_metrics.py
import sys
import threading
import time

from distributed import metrics
from distributed.utils_test import run_for


def test_wall_clock():
    """Check ``metrics.time()`` for ordering, resolution and agreement with ``time.time()``.

    Three rounds are run. In each round 50 consecutive samples are taken and
    the differences between neighbouring samples are checked:

    * no difference is negative (the clock never goes backwards between
      consecutive samples),
    * no difference exceeds 1.0,
    * at least one difference is strictly between 0 and 0.0001, i.e. the
      clock visibly ticks at a fine resolution.

    Finally the first sample must lie within 0.5 of ``time.time()`` taken
    just before sampling.
    """
    for _ in range(3):
        time.sleep(0.01)
        t = time.time()
        samples = [metrics.time() for _ in range(50)]
        # Resolution
        deltas = [later - earlier for earlier, later in zip(samples, samples[1:])]
        assert min(deltas) >= 0.0, deltas
        assert max(deltas) <= 1.0, deltas
        # Evaluate the predicate for each delta.  Wrapping it in a ``lambda``
        # would hand ``any()`` a stream of function objects, which are always
        # truthy, so the check could never fail.
        assert any(0.0 < d < 0.0001 for d in deltas), deltas
        # Close to time.time()
        assert t - 0.5 < samples[0] < t + 0.5


def test_process_time():
    """Exercise ``metrics.process_time()`` in three scenarios.

    1. ``run_for(0.05)`` on the calling thread must register between 0.03
       and 0.2 of process time.
    2. ``run_for(0.1)`` executed in a separate thread must register at least
       0.05, showing that work done by other threads is included.
    3. ``time.sleep(0.1)`` must register at most 0.05, showing that sleeping
       is not counted.

    NOTE(review): ``run_for`` comes from ``distributed.utils_test`` and is
    presumably a busy loop lasting the given duration -- confirm there.
    """
    # Work on the calling thread
    before = metrics.process_time()
    run_for(0.05)
    elapsed = metrics.process_time() - before
    assert 0.03 <= elapsed <= 0.2

    # All threads counted
    worker = threading.Thread(target=run_for, args=(0.1,))
    before = metrics.process_time()
    worker.start()
    worker.join()
    elapsed = metrics.process_time() - before
    assert elapsed >= 0.05

    # Sleep time not counted
    before = metrics.process_time()
    time.sleep(0.1)
    elapsed = metrics.process_time() - before
    assert elapsed <= 0.05


def test_thread_time():
    """Exercise ``metrics.thread_time()`` on the calling thread.

    1. ``run_for(0.05)`` must register between 0.03 and 0.2.
    2. ``time.sleep(0.1)`` must register at most 0.05 (sleep is not counted).
    3. On Linux only: ``run_for(0.1)`` executed in *another* thread must
       register at most 0.05 on the calling thread, i.e. the measurement is
       per-thread there.

    NOTE(review): ``run_for`` comes from ``distributed.utils_test`` and is
    presumably a busy loop lasting the given duration -- confirm there.
    """
    # Work on the calling thread
    before = metrics.thread_time()
    run_for(0.05)
    elapsed = metrics.thread_time() - before
    assert 0.03 <= elapsed <= 0.2

    # Sleep time not counted
    before = metrics.thread_time()
    time.sleep(0.1)
    elapsed = metrics.thread_time() - before
    assert elapsed <= 0.05

    # The remaining check only applies on Linux
    if sys.platform != "linux":
        return

    # Always per-thread on Linux
    worker = threading.Thread(target=run_for, args=(0.1,))
    before = metrics.thread_time()
    worker.start()
    worker.join()
    elapsed = metrics.thread_time() - before
    assert elapsed <= 0.05