import json
import re
import pytest
pytest.importorskip("bokeh")
from tornado.escape import url_escape
from tornado.httpclient import AsyncHTTPClient, HTTPClientError
from dask.sizeof import sizeof
from distributed.utils import is_valid_xml
from distributed.utils_test import gen_cluster, slowinc, inc
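
# Fetch every informational HTTP route on the scheduler and check that each
# returns 200 with well-formed JSON or XML and contains no absolute links.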
@gen_cluster(client=True)
async def test_connect(c, s, a, b):
future = c.submit(lambda x: x + 1, 1)
x = c.submit(slowinc, 1, delay=1, retries=5)
await future
http_client = AsyncHTTPClient()
for suffix in [
"info/main/workers.html",
"info/worker/" + url_escape(a.address) + ".html",
"info/task/" + url_escape(future.key) + ".html",
"info/main/logs.html",
"info/logs/" + url_escape(a.address) + ".html",
"info/call-stack/" + url_escape(x.key) + ".html",
"info/call-stacks/" + url_escape(a.address) + ".html",
"json/counts.json",
"json/identity.json",
"json/index.html",
"individual-plots.json",
"sitemap.json",
]:
response = await http_client.fetch(
"http://localhost:%d/%s" % (s.http_server.port, suffix)
)
assert response.code == 200
body = response.body.decode()
if suffix.endswith(".json"):
json.loads(body)
else:
assert is_valid_xml(body)
assert not re.search("href=./", body) # no absolute links
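
# Info pages for unknown workers and tasks should return 404 rather than crash.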
@gen_cluster(client=True, nthreads=[])
async def test_worker_404(c, s):
http_client = AsyncHTTPClient()
with pytest.raises(HTTPClientError) as err:
await http_client.fetch(
"http://localhost:%d/info/worker/unknown" % s.http_server.port
)
assert err.value.code == 404
with pytest.raises(HTTPClientError) as err:
await http_client.fetch(
"http://localhost:%d/info/task/unknown" % s.http_server.port
)
assert err.value.code == 404
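
# With http_prefix set, routes should be served under the configured prefix.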
@gen_cluster(client=True, scheduler_kwargs={"http_prefix": "/foo", "dashboard": True})
async def test_prefix(c, s, a, b):
http_client = AsyncHTTPClient()
for suffix in ["foo/info/main/workers.html", "foo/json/index.html", "foo/system"]:
response = await http_client.fetch(
"http://localhost:%d/%s" % (s.http_server.port, suffix)
)
assert response.code == 200
body = response.body.decode()
if suffix.endswith(".json"):
json.loads(body)
else:
assert is_valid_xml(body)
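
# The /metrics endpoint should expose scheduler metrics in the Prometheus
# text exposition format.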
@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_prometheus(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families
http_client = AsyncHTTPClient()
    # Request the data twice since there once was a case where metrics got
    # registered multiple times, resulting in prometheus_client errors.
for _ in range(2):
response = await http_client.fetch(
"http://localhost:%d/metrics" % s.http_server.port
)
assert response.code == 200
assert response.headers["Content-Type"] == "text/plain; version=0.0.4"
txt = response.body.decode("utf8")
families = {
family.name: family for family in text_string_to_metric_families(txt)
}
assert "dask_scheduler_workers" in families
client = families["dask_scheduler_clients"]
assert client.samples[0].value == 1.0
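
# Task-state gauges should track tasks through submission, completion, and
# forgetting, and report zeros for all states on an idle scheduler.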
@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_prometheus_collect_task_states(c, s, a, b):
pytest.importorskip("prometheus_client")
from prometheus_client.parser import text_string_to_metric_families
http_client = AsyncHTTPClient()
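    # Helper: scrape /metrics and split the samples into per-state task
    # gauges and the forgotten-task counter.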
async def fetch_metrics():
port = s.http_server.port
response = await http_client.fetch(f"http://localhost:{port}/metrics")
txt = response.body.decode("utf8")
families = {
family.name: family for family in text_string_to_metric_families(txt)
}
active_metrics = {
sample.labels["state"]: sample.value
for sample in families["dask_scheduler_tasks"].samples
}
forgotten_tasks = [
sample.value
for sample in families["dask_scheduler_tasks_forgotten"].samples
]
return active_metrics, forgotten_tasks
expected = {"memory", "released", "processing", "waiting", "no-worker", "erred"}
    # Ensure that we get zero-valued metrics for every state even though the
    # scheduler has done nothing yet.
assert not s.tasks
active_metrics, forgotten_tasks = await fetch_metrics()
assert active_metrics.keys() == expected
assert sum(active_metrics.values()) == 0.0
assert sum(forgotten_tasks) == 0.0
    # Submit a task, which should show up in the Prometheus scrape.
future = c.submit(slowinc, 1, delay=0.5)
active_metrics, forgotten_tasks = await fetch_metrics()
assert active_metrics.keys() == expected
assert sum(active_metrics.values()) == 1.0
assert sum(forgotten_tasks) == 0.0
res = await c.gather(future)
assert res == 2
del future
active_metrics, forgotten_tasks = await fetch_metrics()
assert active_metrics.keys() == expected
assert sum(active_metrics.values()) == 0.0
assert sum(forgotten_tasks) == 0.0
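
# The /health endpoint should answer with a plain-text "ok".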
@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_health(c, s, a, b):
http_client = AsyncHTTPClient()
response = await http_client.fetch(
"http://localhost:%d/health" % s.http_server.port
)
assert response.code == 200
assert response.headers["Content-Type"] == "text/plain"
txt = response.body.decode("utf8")
assert txt == "ok"
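
# /sitemap.json should list the HTTP paths the scheduler serves.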
@gen_cluster()
async def test_sitemap(s, a, b):
http_client = AsyncHTTPClient()
response = await http_client.fetch(
"http://localhost:%d/sitemap.json" % s.http_server.port
)
out = json.loads(response.body.decode())
assert "paths" in out
assert "/sitemap.json" in out["paths"]
assert "/health" in out["paths"]
assert "/statics/css/base.css" in out["paths"]
@gen_cluster(client=True)
async def test_task_page(c, s, a, b):
future = c.submit(lambda x: x + 1, 1, workers=a.address)
x = c.submit(inc, 1)
await future
http_client = AsyncHTTPClient()
"info/task/" + url_escape(future.key) + ".html",
response = await http_client.fetch(
"http://localhost:%d/info/task/" % s.http_server.port
+ url_escape(future.key)
+ ".html"
)
assert response.code == 200
body = response.body.decode()
assert str(sizeof(1)) in body
assert "int" in body
assert a.address in body
assert "memory" in body
@gen_cluster(
client=True,
scheduler_kwargs={"dashboard": True},
config={
"distributed.scheduler.dashboard.bokeh-application.allow_websocket_origin": [
"good.invalid"
]
},
)
async def test_allow_websocket_origin(c, s, a, b):
from tornado.httpclient import HTTPRequest
from tornado.websocket import websocket_connect
url = (
"ws://localhost:%d/status/ws?bokeh-protocol-version=1.0&bokeh-session-id=1"
% s.http_server.port
)
with pytest.raises(HTTPClientError) as err:
await websocket_connect(
HTTPRequest(url, headers={"Origin": "http://evil.invalid"})
)
assert err.value.code == 403
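
# Connecting to the /eventstream WebSocket should register the corresponding
# scheduler plugin.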
@gen_cluster(client=True)
async def test_eventstream(c, s, a, b):
from tornado.websocket import websocket_connect
ws_client = await websocket_connect(
"ws://localhost:%d/%s" % (s.http_server.port, "eventstream")
)
assert "websocket" in str(s.plugins).lower()
ws_client.close()