1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 | import collections
import enum
import io
import os
import pickle
import tarfile
from orco.consts import MIME_PICKLE, MIME_TEXT
class JobState(enum.Enum):
NONE = "n"
ANNOUNCED = "a"
RUNNING = "r"
FINISHED = "f"
ERROR = "e"
FREED = "d"
A_FINISHED = "F" # ARCHIVED ITEM
A_FREED = "D" # ARCHIVED ITEM
ACTIVE_STATES = (
JobState.ANNOUNCED,
JobState.RUNNING,
JobState.FINISHED,
JobState.FREED,
)
JobMetadata = collections.namedtuple(
"EntryMetadata", ["created_date", "computation_time", "finished_date", "job_setup"]
)
class _NoValue:
pass
_NO_VALUE = _NoValue()
class Job:
"""
A single computation with its configuration and result
Attributes
* config - `OrderedDict` of function parameters (use with `Builder.run_with_config`)
* value - resulting value of the computation
* job_setup - setup for the job
* created - datetime when job was created
* comp_time - time of computation when job was created, or None if job was inserted
"""
__slots__ = ("builder_name", "key", "config", "state", "_value", "_job_id", "_db")
def __init__(self, builder_name, key, config):
self.builder_name = builder_name
self.key = key
self.config = config
self.state = None
self._job_id = None
self._db = None
self._value = _NO_VALUE
@property
def value(self):
self._check_attached()
if self.state != JobState.FINISHED:
raise Exception("Job is not finished")
value = self._value
if value is not _NO_VALUE:
return value
value, mime = self._db.get_blob(self._job_id, None)
if value is None:
self._value = None
return None
value = pickle.loads(value)
self._value = value
return value
def is_attached(self):
return self._job_id is not None
def detach(self):
self._job_id = None
self._db = None
self.state = None
def set_job_id(self, job_id, db, state):
assert self._job_id is None
self.state = state
self._job_id = job_id
self._db = db
def metadata(self):
self._check_attached()
return self._db.read_metadata(self._job_id)
def get_object(self, name, default=_NO_VALUE):
value, mime = self.get_blob(name, default)
if mime != MIME_PICKLE:
raise Exception(
"Blob exists, but is not pickled object, but {}".format(mime)
)
return pickle.loads(value)
def get_text(self, name):
value, mime = self.get_blob(name)
if mime != MIME_TEXT:
raise Exception("Blob exists, but is not text, but {}".format(mime))
return value.decode()
def get_names(self):
self._check_attached()
return self._db.get_blob_names(self._job_id)
def get_blob(self, name, default=_NO_VALUE):
self._check_attached()
value, mime = self._db.get_blob(self._job_id, name)
if value is None:
if default is _NO_VALUE:
raise Exception("Blob '{}' not found".format(name))
return default
return value, mime
def get_blob_as_file(self, name, target=None):
value, _ = self.get_blob(name)
if target is None:
target = name
with open(target, "wb") as f:
f.write(value)
def extract_tar(self, name, target=None):
value, mime = self.get_blob(name)
if mime != "application/tar":
raise Exception("Blob is not tar archive")
if target is None:
target = name
if not os.path.isdir(target):
os.makedirs(target)
with tarfile.TarFile(fileobj=io.BytesIO(value)) as tf:
tf.extractall(target)
def _check_attached(self):
if self._job_id is None:
raise Exception("Job is not attached")
def __repr__(self):
return "<Job {}/{}>".format(self.builder_name, self.config)
|