Source code distributed/http/proxy.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import html
import logging

from tornado import web

logger = logging.getLogger(__name__)

try:
    from jupyter_server_proxy.handlers import ProxyHandler

    class GlobalProxyHandler(ProxyHandler):
        """
        A tornado request handler that proxies HTTP and websockets
        from a port to any valid endpoint.

        Incoming requests use the URI layout
        ``/proxy/{port}/{host}/{proxied_path}``.  GET requests are only
        forwarded when ``{host}:{port}`` matches the dashboard of a worker
        known to the scheduler.
        """

        def initialize(self, dask_server=None, extra=None):
            """Store the objects handed over by the route configuration.

            Parameters
            ----------
            dask_server : object, optional
                Kept as ``self.scheduler``; ``http_get`` passes it to
                ``check_worker_dashboard_exits`` to validate the target.
            extra : dict, optional
                Kept as ``self.extra``; a falsy value becomes an empty dict.
            """
            self.scheduler = dask_server
            self.extra = extra or {}

        async def http_get(self, port, host, proxied_path):
            """Proxy a GET request to ``host:port`` if it is a known worker.

            Parameters
            ----------
            port : str
                Port captured from the request path.
            host : str
                Host captured from the request path.
            proxied_path : str
                Remaining path, without its leading slash.

            Returns
            -------
            The awaited result of ``self.proxy`` on success, or ``None`` after
            a 400 response has been finished for an unknown worker.
            """
            # route here first
            # incoming URI /proxy/{port}/{host}/{proxied_path}

            self.host = host

            # rewrite uri for jupyter-server-proxy handling
            uri = "/proxy/%s/%s" % (str(port), proxied_path)
            self.request.uri = uri

            # slash is removed during regex in handler
            proxied_path = "/%s" % proxied_path

            worker = "%s:%s" % (self.host, str(port))
            if not check_worker_dashboard_exits(self.scheduler, worker):
                # ``worker`` is built from the request URL, so escape it before
                # echoing it back: otherwise the text is interpreted as markup
                # by a browser (and "<host:port>" is swallowed as a tag).
                msg = html.escape("Worker <%s> does not exist" % worker)
                self.set_status(400)
                self.finish(msg)
                return
            return await self.proxy(port, proxied_path)

        async def open(self, port, host, proxied_path):
            """Open a proxied websocket connection to ``host:port``."""
            # NOTE(review): unlike ``http_get`` no worker check is made
            # here -- confirm this is intended.
            # finally, proxy to other address/port
            return await self.proxy_open(host, port, proxied_path)

        # NOTE(review): the module's route pattern captures three groups
        # (port, host, path) while the verbs below accept two, and they rely
        # on ``self.host`` which is only assigned in ``http_get`` -- confirm
        # how these are dispatched before depending on them.

        def post(self, port, proxied_path):
            """Forward a POST request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def put(self, port, proxied_path):
            """Forward a PUT request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def delete(self, port, proxied_path):
            """Forward a DELETE request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def head(self, port, proxied_path):
            """Forward a HEAD request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def patch(self, port, proxied_path):
            """Forward a PATCH request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def options(self, port, proxied_path):
            """Forward an OPTIONS request through ``proxy``."""
            return self.proxy(port, proxied_path)

        def proxy(self, port, proxied_path):
            """Delegate to ``ProxyHandler.proxy`` using the stored host.

            Returns whatever the parent implementation returns (the original
            author notes this is a coroutine).
            """
            # router here second
            # returns ProxyHandler coroutine
            return super().proxy(self.host, port, proxied_path)


except ImportError:
    # jupyter-server-proxy could not be imported: tell the user how to
    # install it, then define a minimal fallback handler below.
    logger.info(
        "To route to workers diagnostics web server "
        "please install jupyter-server-proxy: "
        "python -m pip install jupyter-server-proxy"
    )

    class GlobalProxyHandler(web.RequestHandler):
        """Minimal Proxy handler when jupyter-server-proxy is not installed

        Instead of proxying, GET requests receive an HTML page that links
        directly to the requested worker address and explains how to install
        jupyter-server-proxy.
        """

        def initialize(self, dask_server=None, extra=None):
            """Store the objects handed over by the route configuration.

            Parameters
            ----------
            dask_server : object, optional
                Kept as ``self.server``.
            extra : dict, optional
                Kept as ``self.extra``; a falsy value becomes an empty dict.
            """
            self.server = dask_server
            self.extra = extra or {}

        def get(self, port, host, proxied_path):
            """Write a help page pointing at ``host:port/proxied_path``.

            Parameters
            ----------
            port : str
                Port captured from the request path.
            host : str
                Host captured from the request path.
            proxied_path : str
                Remaining path captured from the request path.
            """
            # host and proxied_path come straight from the request URL, so the
            # address is HTML-escaped (quotes included) before being placed in
            # the page, and the href attribute is quoted.
            worker_url = html.escape("%s:%s/%s" % (host, str(port), proxied_path))
            msg = """
                <p> Try navigating to <a href="http://%s">%s</a> for your worker dashboard </p>

                <p>
                Dask tried to proxy you to that page through your
                Scheduler's dashboard connection, but you don't have
                jupyter-server-proxy installed.  You may want to install it
                with either conda or pip, and then restart your scheduler.
                </p>

                <p><pre> conda install jupyter-server-proxy -c conda-forge </pre></p>
                <p><pre> python -m pip install jupyter-server-proxy</pre></p>

                <p>
                The link above should work though if your workers are on a
                sufficiently open network.  This is common on single machines,
                but less common in production clusters.  Your IT administrators
                will know more
                </p>
            """ % (
                worker_url,
                worker_url,
            )
            self.write(msg)


def check_worker_dashboard_exits(scheduler, worker):
    """Check addr:port exists as a worker in scheduler list

    Parameters
    ----------
    scheduler : object
        Object exposing a ``workers`` mapping whose values provide a ``host``
        attribute and a ``services`` mapping.
    worker : str
        addr:port

    Returns
    -------
    bool
        True when some worker has ``host == addr`` and a ``"dashboard"``
        service whose port, as a string, equals ``port``.
    """
    # Split on the LAST colon only: the address part may itself contain
    # colons (e.g. an IPv6 literal), and a string without any colon should
    # simply fail to match instead of raising on unpacking.
    addr, _, port = worker.rpartition(":")
    return any(
        addr == w.host and port == str(w.services.get("dashboard", ""))
        for w in scheduler.workers.values()
    )


# Route table entry: (URL pattern, handler class, handler init kwargs).
# The pattern's capture groups are, in order: port (digits), host, and the
# remaining path -- matching the (port, host, proxied_path) parameters of the
# handlers' GET methods.
routes = [(r"proxy/(\d+)/(.*?)/(.*)", GlobalProxyHandler, {})]