Source code distributed/diagnostics/graph_layout.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from .plugin import SchedulerPlugin


class GraphLayout(SchedulerPlugin):
    """Dynamic graph layout during computation

    This assigns (x, y) locations to all tasks quickly and dynamically as new
    tasks are added.  This scales to a few thousand nodes.

    It is commonly used with distributed/bokeh/scheduler.py::TaskGraph, which
    is rendered at /graph on the diagnostic dashboard.
    """

    def __init__(self, scheduler):
        self.x = {}
        self.y = {}
        self.collision = {}
        self.scheduler = scheduler
        self.index = {}
        self.index_edge = {}
        self.next_y = 0
        self.next_index = 0
        self.next_edge_index = 0
        self.new = []
        self.new_edges = []
        self.state_updates = []
        self.visible_updates = []
        self.visible_edge_updates = []

        scheduler.add_plugin(self)

        if self.scheduler.tasks:
            dependencies = {
                k: [ds.key for ds in ts.dependencies]
                for k, ts in scheduler.tasks.items()
            }
            priority = {k: ts.priority for k, ts in scheduler.tasks.items()}
            self.update_graph(
                self.scheduler,
                tasks=self.scheduler.tasks,
                dependencies=dependencies,
                priority=priority,
            )

    def update_graph(
        self, scheduler, dependencies=None, priority=None, tasks=None, **kwargs
    ):
        stack = sorted(tasks, key=lambda k: priority.get(k, 0), reverse=True)
        while stack:
            key = stack.pop()
            if key in self.x or key not in scheduler.tasks:
                continue
            deps = dependencies.get(key, ())
            if deps:
                if not all(dep in self.y for dep in deps):
                    stack.append(key)
                    stack.extend(
                        sorted(deps, key=lambda k: priority.get(k, 0), reverse=True)
                    )
                    continue
                else:
                    total_deps = sum(
                        len(scheduler.tasks[dep].dependents) for dep in deps
                    )
                    y = sum(
                        self.y[dep] * len(scheduler.tasks[dep].dependents) / total_deps
                        for dep in deps
                    )
                    x = max(self.x[dep] for dep in deps) + 1
            else:
                x = 0
                y = self.next_y
                self.next_y += 1

            if (x, y) in self.collision:
                old_x, old_y = x, y
                x, y = self.collision[(x, y)]
                y += 0.1
                self.collision[old_x, old_y] = (x, y)
            else:
                self.collision[(x, y)] = (x, y)

            self.x[key] = x
            self.y[key] = y
            self.index[key] = self.next_index
            self.next_index = self.next_index + 1
            self.new.append(key)
            for dep in deps:
                edge = (dep, key)
                self.index_edge[edge] = self.next_edge_index
                self.next_edge_index += 1
                self.new_edges.append(edge)

    def transition(self, key, start, finish, *args, **kwargs):
        if finish != "forgotten":
            self.state_updates.append((self.index[key], finish))
        else:
            self.visible_updates.append((self.index[key], "False"))
            task = self.scheduler.tasks[key]
            for dep in task.dependents:
                edge = (key, dep.key)
                self.visible_edge_updates.append(
                    (self.index_edge.pop((key, dep.key)), "False")
                )
            for dep in task.dependencies:
                self.visible_edge_updates.append(
                    (self.index_edge.pop((dep.key, key)), "False")
                )

            try:
                del self.collision[(self.x[key], self.y[key])]
            except KeyError:
                pass

            for collection in [self.x, self.y, self.index]:
                del collection[key]

    def reset_index(self):
        """Reset the index and refill new and new_edges

        From time to time TaskGraph wants to remove invisible nodes and reset
        all of its indices.  This helps.
        """
        self.new = []
        self.new_edges = []
        self.visible_updates = []
        self.state_updates = []
        self.visible_edge_updates = []

        self.index = {}
        self.next_index = 0
        self.index_edge = {}
        self.next_edge_index = 0

        for key in self.x:
            self.index[key] = self.next_index
            self.next_index += 1
            self.new.append(key)
            for dep in self.scheduler.tasks[key].dependencies:
                edge = (dep.key, key)
                self.index_edge[edge] = self.next_edge_index
                self.next_edge_index += 1
                self.new_edges.append(edge)