import contextlib
import gc
import itertools
import random
import re
import pytest
from distributed.metrics import thread_time
from distributed.utils_perf import FractionalTimer, GCDiagnosis, disable_gc_diagnosis
from distributed.utils_test import captured_logger, run_for
class RandomTimer:
    """
    A fake timer whose readings advance by seeded pseudo-random
    increments, so successive values are strictly monotonic and
    reproducible across runs.
    """

    def __init__(self):
        self.random = random.Random(42)
        self.last = 0.0
        self.timings = []
        self.durations = ([], [])
        self.i_durations = itertools.cycle((0, 1))

    def __call__(self):
        # Advance the clock by an exponentially-distributed step,
        # recording the step itself in alternating duration buckets
        # and the absolute reading in `timings`.
        step = self.random.expovariate(1.0)
        self.last = self.last + step
        bucket = next(self.i_durations)
        self.durations[bucket].append(step)
        self.timings.append(self.last)
        return self.last
def test_fractional_timer():
    N = 10

    def assert_expected_fraction(timer, ft):
        # The running fraction must equal the sum of the last N
        # "measurement" intervals over the sum of the last 2N
        # intervals — exactly 2N, not 2N - 1 or 2N + 1.
        last_measured = sum(timer.durations[1][-N:])
        last_all = sum(timer.durations[0][-N:] + timer.durations[1][-N:])
        assert ft.running_fraction == pytest.approx(last_measured / last_all)

    timer = RandomTimer()
    ft = FractionalTimer(n_samples=N, timer=timer)

    # Until N + 1 full start/stop cycles have happened, no fraction
    # is available yet.
    for _ in range(N):
        ft.start_timing()
        ft.stop_timing()
    assert len(timer.timings) == N * 2
    assert ft.running_fraction is None

    ft.start_timing()
    ft.stop_timing()
    assert len(timer.timings) == (N + 1) * 2
    assert ft.running_fraction is not None
    assert_expected_fraction(timer, ft)

    # The fraction keeps tracking the trailing window afterwards.
    for _ in range(N * 10):
        ft.start_timing()
        ft.stop_timing()
        assert_expected_fraction(timer, ft)
@contextlib.contextmanager
def enable_gc_diagnosis_and_log(diag, level="INFO"):
    """
    Enable the given GCDiagnosis *diag* with automatic GC paused and
    yield a buffer capturing the "distributed.utils_perf" log at
    *level*.  Disables the diagnosis and re-enables GC on exit.
    """
    disable_gc_diagnosis(force=True)  # just in case
    if gc.callbacks:
        print("Unexpected gc.callbacks", gc.callbacks)

    with captured_logger("distributed.utils_perf", level=level, propagate=False) as log:
        gc.disable()
        # Drain any leftover garbage from previous tests before
        # starting to observe collections.
        gc.collect()
        diag.enable()
        try:
            yield log
        finally:
            diag.disable()
            gc.enable()
def test_gc_diagnosis_cpu_time():
    diag = GCDiagnosis(warn_over_frac=0.75)
    diag.N_SAMPLES = 3  # shorten tests

    with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
        # Do nothing but full GCs: close to 100% of CPU time is spent
        # collecting, which must trigger a warning after N_SAMPLES.
        for _ in range(diag.N_SAMPLES):
            gc.collect()
        assert not sio.getvalue()
        gc.collect()
        lines = sio.getvalue().splitlines()
        assert len(lines) == 1
        # Reported percentage should land between 80% and 100%
        assert re.match(
            r"full garbage collections took (100|[89][0-9])% CPU time recently",
            lines[0],
        )

    with enable_gc_diagnosis_and_log(diag, level="WARN") as sio:
        # Interleave each full GC with an equal amount of non-GC work,
        # so roughly half the CPU time goes to collections.
        for _ in range(diag.N_SAMPLES + 1):
            start = thread_time()
            gc.collect()
            elapsed = thread_time() - start
            run_for(elapsed, timer=thread_time)
        # About 50% is below the 75% threshold, so nothing is logged.
        assert not sio.getvalue()
@pytest.mark.xfail(reason="unknown")
def test_gc_diagnosis_rss_win():
    diag = GCDiagnosis(info_over_rss_win=10e6)

    def leak_cycle(nbytes):
        # Build a self-referencing list holding *nbytes* of payload;
        # it becomes collectable garbage once the function returns.
        cyc = [b"x" * nbytes]
        cyc.append(cyc)

    with enable_gc_diagnosis_and_log(diag) as sio:
        # Too small a release: nothing should be logged.
        leak_cycle(100 * 1024)
        gc.collect()
        assert not sio.getvalue()

        # NOTE: need to allocate a very large value to make sure RSS
        # really shrinks (depending on the system memory allocator,
        # "small" memory deallocations may keep the memory in the pool)
        leak_cycle(200 * 1024 * 1024)
        gc.collect()
        lines = sio.getvalue().splitlines()
        assert len(lines) == 1
        # Several MB released, and at least one reference cycle
        assert re.match(
            r"full garbage collection released [\d\.]+ MB "
            r"from [1-9]\d* reference cycles",
            lines[0],
        )