1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 | import logging
from tornado import web
logger = logging.getLogger(__name__)
try:
from jupyter_server_proxy.handlers import ProxyHandler
class GlobalProxyHandler(ProxyHandler):
"""
A tornado request handler that proxies HTTP and websockets
from a port to any valid endpoint'.
"""
def initialize(self, dask_server=None, extra=None):
self.scheduler = dask_server
self.extra = extra or {}
async def http_get(self, port, host, proxied_path):
# route here first
# incoming URI /proxy/{port}/{host}/{proxied_path}
self.host = host
# rewrite uri for jupyter-server-proxy handling
uri = "/proxy/%s/%s" % (str(port), proxied_path)
self.request.uri = uri
# slash is removed during regex in handler
proxied_path = "/%s" % proxied_path
worker = "%s:%s" % (self.host, str(port))
if not check_worker_dashboard_exits(self.scheduler, worker):
msg = "Worker <%s> does not exist" % worker
self.set_status(400)
self.finish(msg)
return
return await self.proxy(port, proxied_path)
async def open(self, port, host, proxied_path):
# finally, proxy to other address/port
return await self.proxy_open(host, port, proxied_path)
def post(self, port, proxied_path):
return self.proxy(port, proxied_path)
def put(self, port, proxied_path):
return self.proxy(port, proxied_path)
def delete(self, port, proxied_path):
return self.proxy(port, proxied_path)
def head(self, port, proxied_path):
return self.proxy(port, proxied_path)
def patch(self, port, proxied_path):
return self.proxy(port, proxied_path)
def options(self, port, proxied_path):
return self.proxy(port, proxied_path)
def proxy(self, port, proxied_path):
# router here second
# returns ProxyHandler coroutine
return super().proxy(self.host, port, proxied_path)
except ImportError:
logger.info(
"To route to workers diagnostics web server "
"please install jupyter-server-proxy: "
"python -m pip install jupyter-server-proxy"
)
class GlobalProxyHandler(web.RequestHandler):
"""Minimal Proxy handler when jupyter-server-proxy is not installed"""
def initialize(self, dask_server=None, extra=None):
self.server = dask_server
self.extra = extra or {}
def get(self, port, host, proxied_path):
worker_url = "%s:%s/%s" % (host, str(port), proxied_path)
msg = """
<p> Try navigating to <a href=http://%s>%s</a> for your worker dashboard </p>
<p>
Dask tried to proxy you to that page through your
Scheduler's dashboard connection, but you don't have
jupyter-server-proxy installed. You may want to install it
with either conda or pip, and then restart your scheduler.
</p>
<p><pre> conda install jupyter-server-proxy -c conda-forge </pre></p>
<p><pre> python -m pip install jupyter-server-proxy</pre></p>
<p>
The link above should work though if your workers are on a
sufficiently open network. This is common on single machines,
but less common in production clusters. Your IT administrators
will know more
</p>
""" % (
worker_url,
worker_url,
)
self.write(msg)
def check_worker_dashboard_exits(scheduler, worker):
"""Check addr:port exists as a worker in scheduler list
Parameters
----------
worker : str
addr:port
Returns
-------
bool
"""
addr, port = worker.split(":")
workers = list(scheduler.workers.values())
for w in workers:
bokeh_port = w.services.get("dashboard", "")
if addr == w.host and port == str(bokeh_port):
return True
return False
routes = [(r"proxy/(\d+)/(.*?)/(.*)", GlobalProxyHandler, {})]
|