Source code for gws.core.tree

import importlib

from . import util, error, log, debug
import gws.types as t

_UIDS = set()


#:export IObject
[docs]class Object(t.IObject): access: t.Access config: t.Config parent: t.IObject root: t.IRootObject def __init__(self): self.children: t.List[t.IObject] = [] self.klass = _class_name(self.__class__) self.uid: str = '' for a in 'access', 'config', 'parent', 'root': setattr(self, a, None) @property def props(self) -> t.Props: return t.cast(t.Props, None)
[docs] def is_a(self, klass): if isinstance(klass, type): return isinstance(self, klass) return self.klass == klass or self.klass.startswith(klass + '.')
def _new_uid(self, uid): global _UIDS n = 0 u = uid while u in _UIDS: n += 1 u = uid + str(n) _UIDS.add(u) return u def _auto_uid(self): u = self.var('uid') if u: return u u = self.var('title') if u: return util.as_uid(u) return self.klass.replace('.', '_')
[docs] def set_uid(self, uid): global _UIDS if not uid or uid == self.uid: return with util.global_lock(): if self.uid: _UIDS.discard(self.uid) self.uid = self._new_uid(uid)
[docs] def initialize(self, cfg): self.config = cfg self.access = self.var('access') self.set_uid(self._auto_uid()) try: self.configure() except Exception as e: raise _error(self, e) log.debug(f'configured {self.klass} uid={self.uid}')
[docs] def configure(self): # this is intended to be overridden pass
[docs] def post_initialize(self): try: for obj in reversed(self.root.all_objects): obj.post_configure() except Exception as e: raise _error(self, e)
[docs] def post_configure(self): # this is intended to be overridden pass
[docs] def var(self, key, default=None, parent=False): v = util.get(self.config, key) if v is not None: return v if parent and self.parent: return self.parent.var(key, default, parent=True) return default
[docs] def create_child(self, klass, cfg) -> t.IObject: return self.append_child(self.root.create_object(klass, cfg, parent=self))
[docs] def append_child(self, obj: t.IObject) -> t.IObject: obj.parent = self self.children.append(obj) return obj
[docs] def get_children(self, klass) -> t.List[t.IObject]: return list(_find_all(self.children, klass))
[docs] def get_closest(self, klass) -> t.IObject: if self.parent: if self.parent.is_a(klass): return self.parent return self.parent.get_closest(klass)
[docs] def props_for(self, user) -> t.Optional[dict]: if not user.can_use(self): return None return _make_props(self.props, user)
#:export IRootObject
[docs]class RootObject(Object, t.IRootObject): application: t.IApplication validator: t.SpecValidator def __init__(self): super().__init__() self.all_types = {} self.all_objects = [] self.shared_objects = {} self.root = self for a in 'application', 'validator': setattr(self, a, None)
[docs] def create(self, klass, cfg=None): cfg = _to_config(cfg) oo = self._create(klass, cfg) oo.root = self return oo
[docs] def create_object(self, klass, cfg, parent=None): cfg = _to_config(cfg) obj = self.create(klass, cfg) obj.parent = parent obj.initialize(cfg) self.all_objects.append(obj) return obj
[docs] def create_unbound_object(self, klass, cfg): cfg = _to_config(cfg) obj = self.create(klass, cfg) obj.initialize(cfg) return obj
[docs] def create_shared_object(self, klass, uid, cfg): cfg = _to_config(cfg) uid = _class_name(klass).replace('.', '_') + '_' + util.as_uid(uid) if uid in self.shared_objects: # log.debug(f'SHARED: FOUND {klass} {uid}') return self.shared_objects[uid] with util.global_lock(): log.debug(f'SHARED: create {klass} {uid}') obj = self.create_object(klass, util.merge(cfg, {'uid': uid})) self.shared_objects[uid] = obj return obj
[docs] def find_all(self, klass=None) -> t.List[t.IObject]: return list(_find_all(self.all_objects, klass))
[docs] def find_first(self, klass) -> t.IObject: for p in _find_all(self.all_objects, klass): return p
[docs] def find(self, klass, uid=None) -> t.IObject: return _find(self.all_objects, klass, uid)
[docs] def find_by_uid(self, uid) -> t.IObject: return _find_by_uid(self.all_objects, uid)
def _create(self, klass, cfg): if isinstance(klass, type): return klass() if cfg.type: klass += '.' + cfg.type if klass not in self.all_types: self.all_types[klass] = _load_class(klass) return self.all_types[klass]()
def _to_config(cfg): if util.is_data_object(cfg): return cfg if isinstance(cfg, dict): return t.Data(cfg) if not cfg: return t.Data() return cfg def _load_class(klass): try: mod = importlib.import_module(klass) except Exception as e: raise error.Error(f'import of {klass!r} failed') from e try: return mod.Object except Exception as e: raise error.Error(f'object not found in {klass!r}') from e def _find(nodes, klass, uid): if not uid: return for obj in nodes: if obj.uid == uid and obj.is_a(klass): return obj def _find_by_uid(nodes, uid): if not uid: return for obj in nodes: if obj.uid == uid: return obj def _find_all(nodes, klass): if not klass: yield from nodes else: for obj in nodes: if obj.is_a(klass): yield obj def _class_name(s): if isinstance(s, str): return s if not isinstance(s, type): s = s.__class__ mod = s.__module__ name = s.__name__ if name == 'Object': return mod return mod + '.' + name def _exc_name_for_error(e): cls = 'Error' try: cls = e.__class__.__name__ except: pass a = '?' try: a = e.args[0] except: pass if isinstance(e, error.Error): return a return '%s: %s' % (cls, a) def _object_name_for_error(x): cls = getattr(x, 'klass', '') if not cls: try: cls = x.__class__.__name__ except: cls = 'object' uid = getattr(x, 'uid', '') if uid: return '%s(%s)' % (cls, uid) return cls def _error(obj, exc): msg = '%s\nin %s' % (_exc_name_for_error(exc), _object_name_for_error(obj)) raise error.Error(msg) def _make_props(obj, user): if obj is None or isinstance(obj, (int, float, bool, str, bytes)): return obj if isinstance(obj, t.IObject): return _make_props(obj.props_for(user), user) if util.is_data_object(obj): return _make_props(vars(obj), user) if isinstance(obj, dict): ls = {} for k, v in obj.items(): v = _make_props(v, user) if v is not None: ls[k] = v return ls if isinstance(obj, (list, tuple)): ls = [] for v in obj: v = _make_props(v, user) if v is not None: ls.append(v) return ls if util.has(obj, 'props'): return _make_props(util.get(obj, 'props'), user) if obj: return str(obj)