Source code orco/globals.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from .builder import Builder
from .runtime import Runtime


# Registry of builders created through this module, keyed by builder name.
_global_builders = {}
# The currently installed global Runtime, or None when none has been started.
_global_runtime = None


def builder(*, name=None, job_setup=None, is_frozen=False):
    """
    Decorator factory that turns a function into a globally registered builder.

    The keyword arguments are passed unchanged to ``Builder``. The decorated
    function is replaced by the proxy returned from ``Builder.make_proxy()``.
    """

    def _register(fn):
        created = Builder(
            fn,
            name=name,
            job_setup=job_setup,
            is_frozen=is_frozen,
        )
        # Registration happens before the proxy is built, so a duplicate name
        # fails before anything is handed back to the caller.
        _register_builder(created)
        proxy = created.make_proxy()
        return proxy

    return _register


def _register_builder(b):
    """
    Add builder ``b`` to the global registry.

    Raises ``Exception`` when a builder with the same name is already
    registered. If a global runtime exists, the builder is also registered
    in that runtime.
    """
    builder_name = b.name
    if builder_name in _global_builders:
        raise Exception(
            "Builder {!r} is already globally registered.".format(builder_name)
        )
    _global_builders[builder_name] = b

    runtime = _global_runtime
    if runtime is None:
        return
    runtime.register_builder(b)


def clear_global_builders():
    """
    Remove every builder from the module-level registry.

    Only the registry dictionary is emptied, in place; the global runtime
    (if any) is not touched by this function.
    """
    registry = _global_builders
    registry.clear()


def _get_global_builders():
    """Return the ``values()`` view of the global builder registry."""
    registry = _global_builders
    return registry.values()


def start_runtime(db_url, *, n_processes=None):
    """
    Create and start a global runtime.

    The runtime manages the database with results and starts computations.
    If a global runtime already exists, it is stopped before the new one
    is created.

    For SQLite:

    >>> start_runtime("sqlite:///path/to/dbfile.db")

    For PostgreSQL:

    >>> start_runtime("postgresql://<USERNAME>:<PASSWORD>@<HOSTNAME>/<DATABASE>")
    """

    global _global_runtime
    previous = _global_runtime
    if previous is not None:
        # Stop the old runtime first; the new one is only built afterwards.
        previous.stop()
    _global_runtime = Runtime(db_url, n_processes=n_processes)


def stop_global_runtime():
    """
    Stop the global runtime, if one exists.

    The module keeps its reference to the stopped runtime; this function
    does not reset it. Does nothing when no runtime was started.
    """
    runtime = _global_runtime
    if runtime is None:
        return
    runtime.stop()


def get_global_runtime() -> Runtime:
    """
    Return the global runtime.

    Raises ``Exception`` when no global runtime has been started.
    """
    runtime = _global_runtime
    if runtime is not None:
        return runtime
    raise Exception("No runtime was started")


def has_global_runtime():
    """Return ``True`` when a global runtime is set, ``False`` otherwise."""
    if _global_runtime is None:
        return False
    return True


def serve(port=8550, debug=False, nonblocking=False):
    """
    Call ``serve`` on the global runtime and return its result.

    ``port``, ``debug`` and ``nonblocking`` are forwarded positionally, in
    that order. Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.serve(port, debug, nonblocking)


def read(job, *, reattach=False):
    """
    Call ``read`` on the global runtime for ``job`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    result = runtime.read(job, reattach=reattach)
    return result


def try_read(job, *, reattach=False):
    """
    Call ``try_read`` on the global runtime for ``job`` and return its result.

    Previously this delegated to ``Runtime.read``, which made it an exact
    duplicate of ``read()``; it now delegates to ``Runtime.try_read``.

    NOTE(review): presumably ``Runtime.try_read`` tolerates a missing job
    where ``Runtime.read`` does not -- confirm against the Runtime class.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.try_read(job, reattach=reattach)


def read_jobs(job):
    """
    Call ``read_jobs`` on the global runtime for ``job`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.read_jobs(job)


def read_many(jobs, *, reattach=False, drop_missing=False):
    """
    Call ``read_many`` on the global runtime for ``jobs`` and return its result.

    ``reattach`` and ``drop_missing`` are forwarded as keyword arguments.
    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    options = {"reattach": reattach, "drop_missing": drop_missing}
    return runtime.read_many(jobs, **options)


def drop(job, *, drop_inputs=False):
    """
    Call ``drop`` on the global runtime for ``job`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.drop(job, drop_inputs=drop_inputs)


def drop_many(jobs, *, drop_inputs=False):
    """
    Call ``drop_many`` on the global runtime for ``jobs`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.drop_many(jobs, drop_inputs=drop_inputs)


def archive(job, *, archive_inputs=False):
    """
    Call ``archive`` on the global runtime for ``job`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.archive(job, archive_inputs=archive_inputs)


def archive_many(jobs, *, archive_inputs=False):
    """
    Call ``archive_many`` on the global runtime for ``jobs`` and return its
    result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.archive_many(jobs, archive_inputs=archive_inputs)


def free(job):
    """
    Call ``free`` on the global runtime for ``job`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.free(job)


def free_many(jobs):
    """
    Call ``free_many`` on the global runtime for ``jobs`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.free_many(jobs)


def insert(job, value):
    """
    Call ``insert`` on the global runtime with ``job`` and ``value`` and
    return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.insert(job, value)


def drop_builder(builder_name, *, drop_inputs=False):
    """
    Call ``drop_builder`` on the global runtime for ``builder_name`` and
    return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.drop_builder(builder_name, drop_inputs=drop_inputs)


def compute(job, *, reattach=False, continue_on_error=False):
    """
    Call ``compute`` on the global runtime for ``job`` and return its result.

    ``reattach`` and ``continue_on_error`` are forwarded as keyword
    arguments. Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    options = {"reattach": reattach, "continue_on_error": continue_on_error}
    return runtime.compute(job, **options)


def compute_many(jobs, *, reattach=False, continue_on_error=False):
    """
    Call ``compute_many`` on the global runtime for ``jobs`` and return its
    result.

    ``reattach`` and ``continue_on_error`` are forwarded as keyword
    arguments. Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    options = {"reattach": reattach, "continue_on_error": continue_on_error}
    return runtime.compute_many(jobs, **options)


def upgrade_builder(builder, upgrade_fn):
    """
    Call ``upgrade_builder`` on the global runtime with ``builder`` and
    ``upgrade_fn`` and return its result.

    Raises ``Exception`` when no global runtime was started.
    """
    runtime = get_global_runtime()
    return runtime.upgrade_builder(builder, upgrade_fn)