1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143 | import collections
from functools import wraps
import sys
import time as timemod
_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())
def _psutil_caller(method_name, default=_empty_namedtuple):
"""
Return a function calling the given psutil *method_name*,
or returning *default* if psutil is not present.
"""
# Import only once to avoid the cost of a failing import at each wrapper() call
try:
import psutil
except ImportError:
return default
meth = getattr(psutil, method_name)
@wraps(meth)
def wrapper():
try:
return meth()
except RuntimeError:
# This can happen on some systems (e.g. no physical disk in worker)
return default()
return wrapper
disk_io_counters = _psutil_caller("disk_io_counters")
net_io_counters = _psutil_caller("net_io_counters")
class _WindowsTime:
"""
Combine time.time() and time.perf_counter() to get an absolute clock
with fine resolution.
"""
# Resync every N seconds, to avoid drifting
RESYNC_EVERY = 600
def __init__(self):
self.delta = None
self.last_resync = float("-inf")
perf_counter = timemod.perf_counter
def time(self):
delta = self.delta
cur = self.perf_counter()
if cur - self.last_resync >= self.RESYNC_EVERY:
delta = self.resync()
self.last_resync = cur
return delta + cur
def resync(self):
_time = timemod.time
_perf_counter = self.perf_counter
min_samples = 5
while True:
times = [(_time(), _perf_counter()) for i in range(min_samples * 2)]
abs_times = collections.Counter(t[0] for t in times)
first, nfirst = abs_times.most_common()[0]
if nfirst < min_samples:
# System too noisy? Start again
continue
else:
perf_times = [t[1] for t in times if t[0] == first][:-1]
assert len(perf_times) >= min_samples - 1, perf_times
self.delta = first - sum(perf_times) / len(perf_times)
return self.delta
# A high-resolution wall clock timer measuring the seconds since Unix epoch
if sys.platform.startswith("win"):
time = _WindowsTime().time
else:
# Under modern Unices, time.time() should be good enough
time = timemod.time
def _native_thread_time():
# Python 3.7+, not all platforms
return timemod.thread_time()
def _linux_thread_time():
# Use hardcoded CLOCK_THREAD_CPUTIME_ID on Python 3 <= 3.6
if sys.platform != "linux":
raise OSError
return timemod.clock_gettime(3)
def _native_process_time():
# Python 3, should work everywhere
return timemod.process_time()
def _native_clock_func():
# time.clock() unfortunately has different semantics depending on the
# platform. On POSIX it's a per-process CPU timer (with possibly
# poor resolution). On Windows it's a high-resolution wall clock timer.
return timemod.clock()
def _detect_process_time():
"""
Return a per-process CPU timer function if possible, otherwise
a wall-clock timer.
"""
for func in [_native_process_time]:
try:
func()
return func
except (AttributeError, OSError):
pass
# Only Python 2?
return _native_clock_func
def _detect_thread_time():
"""
Return a per-thread CPU timer function if possible, otherwise
a per-process CPU timer function, or at worse a wall-clock timer.
"""
for func in [_native_thread_time, _linux_thread_time, _native_process_time]:
try:
func()
return func
except (AttributeError, OSError):
pass
# Only Python 2?
return time
process_time = _detect_process_time()
thread_time = _detect_thread_time()
|