Source code distributed/metrics.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import collections
from functools import wraps
import sys
import time as timemod


_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())


def _psutil_caller(method_name, default=_empty_namedtuple):
    """
    Return a function calling the given psutil *method_name*,
    or returning *default* if psutil is not present.
    """
    # Import only once to avoid the cost of a failing import at each wrapper() call
    try:
        import psutil
    except ImportError:
        return default

    meth = getattr(psutil, method_name)

    @wraps(meth)
    def wrapper():
        try:
            return meth()
        except RuntimeError:
            # This can happen on some systems (e.g. no physical disk in worker)
            return default()

    return wrapper


disk_io_counters = _psutil_caller("disk_io_counters")

net_io_counters = _psutil_caller("net_io_counters")


class _WindowsTime:
    """
    Combine time.time() and time.perf_counter() to get an absolute clock
    with fine resolution.
    """

    # Resync every N seconds, to avoid drifting
    RESYNC_EVERY = 600

    def __init__(self):
        self.delta = None
        self.last_resync = float("-inf")

    perf_counter = timemod.perf_counter

    def time(self):
        delta = self.delta
        cur = self.perf_counter()
        if cur - self.last_resync >= self.RESYNC_EVERY:
            delta = self.resync()
            self.last_resync = cur
        return delta + cur

    def resync(self):
        _time = timemod.time
        _perf_counter = self.perf_counter
        min_samples = 5
        while True:
            times = [(_time(), _perf_counter()) for i in range(min_samples * 2)]
            abs_times = collections.Counter(t[0] for t in times)
            first, nfirst = abs_times.most_common()[0]
            if nfirst < min_samples:
                # System too noisy? Start again
                continue
            else:
                perf_times = [t[1] for t in times if t[0] == first][:-1]
                assert len(perf_times) >= min_samples - 1, perf_times
                self.delta = first - sum(perf_times) / len(perf_times)
                return self.delta


# A high-resolution wall clock timer measuring the seconds since Unix epoch
if sys.platform.startswith("win"):
    time = _WindowsTime().time
else:
    # Under modern Unices, time.time() should be good enough
    time = timemod.time


def _native_thread_time():
    # Python 3.7+, not all platforms
    return timemod.thread_time()


def _linux_thread_time():
    # Use hardcoded CLOCK_THREAD_CPUTIME_ID on Python 3 <= 3.6
    if sys.platform != "linux":
        raise OSError
    return timemod.clock_gettime(3)


def _native_process_time():
    # Python 3, should work everywhere
    return timemod.process_time()


def _native_clock_func():
    # time.clock() unfortunately has different semantics depending on the
    # platform.  On POSIX it's a per-process CPU timer (with possibly
    # poor resolution).  On Windows it's a high-resolution wall clock timer.
    return timemod.clock()


def _detect_process_time():
    """
    Return a per-process CPU timer function if possible, otherwise
    a wall-clock timer.
    """
    for func in [_native_process_time]:
        try:
            func()
            return func
        except (AttributeError, OSError):
            pass
    # Only Python 2?
    return _native_clock_func


def _detect_thread_time():
    """
    Return a per-thread CPU timer function if possible, otherwise
    a per-process CPU timer function, or at worse a wall-clock timer.
    """
    for func in [_native_thread_time, _linux_thread_time, _native_process_time]:
        try:
            func()
            return func
        except (AttributeError, OSError):
            pass
    # Only Python 2?
    return time


process_time = _detect_process_time()
thread_time = _detect_thread_time()