Source code distributed/diskutils.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import errno
import glob
import logging
import os
import shutil
import stat
import tempfile
import weakref

import dask

from . import locket


logger = logging.getLogger(__name__)

DIR_LOCK_EXT = ".dirlock"


def is_locking_enabled():
    return dask.config.get("distributed.worker.use-file-locking")


def safe_unlink(path):
    try:
        os.unlink(path)
    except EnvironmentError as e:
        # Perhaps it was removed by someone else?
        if e.errno != errno.ENOENT:
            logger.error("Failed to remove %r", str(e))


class WorkDir:
    """
    A temporary work directory inside a WorkSpace.
    """

    def __init__(self, workspace, name=None, prefix=None):
        assert name is None or prefix is None

        if name is None:
            self.dir_path = tempfile.mkdtemp(prefix=prefix, dir=workspace.base_dir)
        else:
            self.dir_path = os.path.join(workspace.base_dir, name)
            os.mkdir(self.dir_path)  # it shouldn't already exist

        if is_locking_enabled():
            try:
                self._lock_path = os.path.join(self.dir_path + DIR_LOCK_EXT)
                assert not os.path.exists(self._lock_path)
                logger.debug("Locking %r...", self._lock_path)
                # Avoid a race condition before locking the file
                # by taking the global lock
                try:
                    with workspace._global_lock():
                        self._lock_file = locket.lock_file(self._lock_path)
                        self._lock_file.acquire()
                except OSError as e:
                    logger.exception(
                        "Could not acquire workspace lock on "
                        "path: %s ."
                        "Continuing without lock. "
                        "This may result in workspaces not being "
                        "cleaned up",
                        self._lock_path,
                        exc_info=True,
                    )
                    self._lock_file = None
            except Exception:
                shutil.rmtree(self.dir_path, ignore_errors=True)
                raise
            workspace._known_locks.add(self._lock_path)

            self._finalizer = weakref.finalize(
                self,
                self._finalize,
                workspace,
                self._lock_path,
                self._lock_file,
                self.dir_path,
            )
        else:
            self._finalizer = weakref.finalize(
                self, self._finalize, workspace, None, None, self.dir_path
            )

    def release(self):
        """
        Dispose of this directory.
        """
        self._finalizer()

    @classmethod
    def _finalize(cls, workspace, lock_path, lock_file, dir_path):
        try:
            workspace._purge_directory(dir_path)
        finally:
            if lock_file is not None:
                lock_file.release()
            if lock_path is not None:
                workspace._known_locks.remove(lock_path)
                safe_unlink(lock_path)


class WorkSpace:
    """
    An on-disk workspace that tracks disposable work directories inside it.
    If a process crashes or another event left stale directories behind,
    this will be detected and the directories purged.
    """

    # Keep track of all locks known to this process, to avoid several
    # WorkSpaces to step on each other's toes
    _known_locks = set()

    def __init__(self, base_dir):
        self.base_dir = os.path.abspath(base_dir)
        self._init_workspace()
        self._global_lock_path = os.path.join(self.base_dir, "global.lock")
        self._purge_lock_path = os.path.join(self.base_dir, "purge.lock")

    def _init_workspace(self):
        try:
            os.mkdir(self.base_dir)
        except EnvironmentError as e:
            if e.errno != errno.EEXIST:
                raise

    def _global_lock(self, **kwargs):
        return locket.lock_file(self._global_lock_path, **kwargs)

    def _purge_lock(self, **kwargs):
        return locket.lock_file(self._purge_lock_path, **kwargs)

    def _purge_leftovers(self):
        if not is_locking_enabled():
            return []

        # List candidates with the global lock taken, to avoid purging
        # a lock file that was just created but not yet locked
        # (see WorkDir.__init__)
        lock = self._global_lock(timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # No need to waste time here if someone else is busy doing
            # something on this workspace
            return []
        else:
            try:
                candidates = list(self._list_unknown_locks())
            finally:
                lock.release()

        # No need to hold the global lock here, especially as purging
        # can take time.  Instead take the purge lock to avoid two
        # processes purging at once.
        purged = []
        lock = self._purge_lock(timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # No need for two processes to purge one after another
            pass
        else:
            try:
                for path in candidates:
                    if self._check_lock_or_purge(path):
                        purged.append(path)
            finally:
                lock.release()
        return purged

    def _list_unknown_locks(self):
        for p in glob.glob(os.path.join(self.base_dir, "*" + DIR_LOCK_EXT)):
            try:
                st = os.stat(p)
            except EnvironmentError:
                # May have been removed in the meantime
                pass
            else:
                # XXX restrict to files owned by current user?
                if stat.S_ISREG(st.st_mode):
                    yield p

    def _purge_directory(self, dir_path):
        shutil.rmtree(dir_path, onerror=self._on_remove_error)

    def _check_lock_or_purge(self, lock_path):
        """
        Try locking the given path, if it fails it's in use,
        otherwise the corresponding directory is deleted.

        Return True if the lock was stale.
        """
        assert lock_path.endswith(DIR_LOCK_EXT)
        if lock_path in self._known_locks:
            # Avoid touching a lock that we know is already taken
            return False
        logger.debug("Checking lock file %r...", lock_path)
        lock = locket.lock_file(lock_path, timeout=0)
        try:
            lock.acquire()
        except locket.LockError:
            # Lock file still in use, ignore
            return False
        try:
            # Lock file is stale, therefore purge corresponding directory
            dir_path = lock_path[: -len(DIR_LOCK_EXT)]
            if os.path.exists(dir_path):
                logger.info("Found stale lock file and directory %r, purging", dir_path)
                self._purge_directory(dir_path)
        finally:
            lock.release()
        # Clean up lock file after we released it
        safe_unlink(lock_path)
        return True

    def _on_remove_error(self, func, path, exc_info):
        typ, exc, tb = exc_info
        logger.error("Failed to remove %r (failed in %r): %s", path, func, str(exc))

    def new_work_dir(self, **kwargs):
        """
        Create and return a new WorkDir in this WorkSpace.
        Either the *prefix* or *name* parameter should be given
        (*prefix* is preferred as it avoids potential collisions)

        Parameters
        ----------
        prefix: str (optional)
            The prefix of the temporary subdirectory name for the workdir
        name: str (optional)
            The subdirectory name for the workdir
        """
        try:
            self._purge_leftovers()
        except OSError:
            logger.error(
                "Failed to clean up lingering worker directories in path: %s ",
                exc_info=True,
            )
        return WorkDir(self, **kwargs)