import importlib
import gws
import gws.config
import gws.tools.json2
import gws.types as t
from . import storage
[docs]class State(t.Enum):
init = 'init' #: the job is being created
open = 'open' #: the job is just created and waiting for start
running = 'running' #: the job is running
complete = 'complete' #: the job has been completed successfully
error = 'error' #: there was an error
cancel = 'cancel' #: the job was cancelled
[docs]class Error(gws.Error):
pass
[docs]class PrematureTermination(Exception):
pass
[docs]def create(uid, user: t.IUser, worker: str, project_uid=None, args=None):
if user:
fid = user.fid
str_user = gws.config.root().application.auth.serialize_user(user)
else:
fid = str_user = ''
gws.log.debug('creating job', worker, fid)
storage.create(uid)
storage.update(
uid,
user_fid=fid,
str_user=str_user,
project_uid=project_uid,
worker=worker,
args=gws.tools.json2.to_string(args),
steps=0,
step=0,
state=State.open,
)
return get(uid)
[docs]def get(uid):
rec = storage.find(uid)
if rec:
return Job(rec)
[docs]def remove(uid):
storage.remove(uid)
[docs]def get_for(user, uid):
job = get(uid)
if not job:
gws.log.error(f'job={uid!r}: not found')
return
if job.user_fid != user.fid:
gws.log.error(f'job={uid!r} wrong user (job={job.user_fid!r} user={user.fid!r})')
return
return job
[docs]class Job:
def __init__(self, rec):
self.uid = ''
self.user_fid = ''
self.str_user = ''
self.project_uid = ''
self.worker = ''
self.args = ''
self.steps = 0
self.step = 0
self.state = ''
self.steptype = ''
self.stepname = ''
self.error = ''
self.result = ''
self.created = 0
self.updated = 0
for k, v in rec.items():
setattr(self, k, v)
self.args = gws.tools.json2.from_string(self.args)
self.result = gws.tools.json2.from_string(self.result)
@property
def user(self) -> t.Optional[t.IUser]:
if self.str_user:
return gws.config.root().application.auth.unserialize_user(self.str_user)
@property
def progress(self) -> int:
if self.state == State.complete:
return 100
if self.steps and self.state == State.running:
return min(100, int((self.step or 0) * 100 / self.steps))
return 0
[docs] def run(self):
if self.state != State.open:
gws.log.error(f'job={self.uid!r} invalid state for run={self.state!r}')
return
mod_name, _, fn_name = self.worker.rpartition('.')
mod = importlib.import_module(mod_name)
fn = getattr(mod, fn_name)
root = gws.config.root()
try:
fn(root, self)
except Exception as e:
gws.log.error('job: FAILED', self.uid)
self.update(state=State.error, error=repr(e))
raise
[docs] def update(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
if 'result' in kwargs:
kwargs['result'] = gws.tools.json2.to_string(kwargs['result'])
storage.update(self.uid, **kwargs)
[docs] def cancel(self):
self.update(state=State.cancel)
[docs] def remove(self):
storage.remove(self.uid)