Source code for gws.core.util

"""Core utilities

Most common function which are needed everywhere. These function are exported in `gws` and can be used as gws.function().
"""

import hashlib
import os
import pickle
import random
import re
import sys
import threading
import time

import gws.core.const
import gws.types as t


[docs]def exit(code: int = 255): """Exit the application. Args: code: Exit code. """ sys.exit(code)
[docs]def is_data_object(x): return isinstance(x, t.Data)
[docs]def get(x, key, default=None): """Get a nested value/attribute from a structure. Args: x: A dict, list or Data. key: A list or a dot separated string of nested keys. default: The default value. Returns: The value if it exists and the default otherwise. """ if not x: return default if isinstance(key, str): key = key.split('.') val, ok = _get(x, key) return val if ok else default
[docs]def has(x, key) -> bool: """True if a nested value/attribute exists in a structure. Args: x: A dict, list or Data. key: A list or a dot separated string of nested keys. Returns: True if a key exists """ if not x: return False if isinstance(key, str): key = key.split('.') _, ok = _get(x, key) return ok
[docs]def merge(x, *args, **kwargs): """Create a dict/Data object with the values from dicts/Datas or kwargs, overwriting keys. Args: x: A dict or a Data. *args: Dicts or Datas. **kwargs: Keyword args. Returns: A new object (dict or Data). """ x = x or {} d = dict(as_dict(x)) for a in args: d.update(as_dict(a)) d.update(kwargs) if isinstance(x, dict): return d return type(x)(d)
[docs]def extend(x, *args, **kwargs): """Create a dict/Data object with the values from dicts/Datas or kwargs, do not overwrite keys unless they're None. Args: x: A dict or a Data. *args: Dicts or Datas. **kwargs: Keyword args. Returns: A new object (dict or Data). """ x = x or {} d = {} for a in args: d.update(as_dict(a)) d.update(kwargs) e = dict(as_dict(x)) for k, v in d.items(): if e.get(k) is None: e[k] = v if isinstance(x, dict): return e return type(x)(e)
[docs]def filter(x, fn=None): """Apply a filter to a collection. Args: x: A dict/Data or an iterable. fn: Filtering function, if omitted, blank strings and empty values are removed. Returns: A filtered object. """ def _is_not_empty_or_blank(x): if isinstance(x, (str, bytes, bytearray)): x = x.strip() return not is_empty(x) fn = fn or _is_not_empty_or_blank if isinstance(x, dict): return {k: v for k, v in x.items() if fn(v)} if is_data_object(x): d = {k: v for k, v in vars(x).items() if fn(v)} return type(x)(d) return [v for v in x if fn(v)]
[docs]def compact(x): """Remove all None values from a collection.""" return filter(x, lambda v: v is not None)
[docs]def deep_merge(x, *args, **kwargs): """Deeply merge dicts/Datas into a Data object. Args: x: A dict or a Data. *args: Dicts or Datas. **kwargs: Keyword args. Returns: A new object (dict or Data). """ def flatten(o, keys, f): if is_data_object(o): o = vars(o) if isinstance(o, dict): for k, v in o.items(): flatten(v, keys + (k,), f) return f[keys] = o def unflatten(o, f): for keys, v in f.items(): p = o for k in keys[:-1]: if getattr(p, k, None) is None: setattr(p, k, t.Data()) p = getattr(p, k) setattr(p, keys[-1], v) flat = {} flatten(x, (), flat) for a in args: flatten(a, (), flat) for k, v in kwargs.items(): flat[(k,)] = v d = t.Data() unflatten(d, flat) return d
[docs]def map(x, fn): """Apply a function to a collection. Args: x: A dict/Data or an iterable. fn: A function. Returns: A mapped object. """ if isinstance(x, dict): return {k: fn(v) for k, v in x.items()} if is_data_object(x): d = {k: fn(v) for k, v in vars(x).items()} return type(x)(d) return [fn(v) for v in x]
[docs]def strip(x): """Strip all str values in a collection and remove empty values. Args: x: A dict/Data or an iterable. Returns: The stripped object. """ def _strip(v): if isinstance(v, (str, bytes, bytearray)): return v.strip() return v return filter(map(x, _strip))
[docs]def is_empty(x) -> bool: """Check if the value is empty (None, empty list/dict/object).""" if x is None: return True try: return len(x) == 0 except TypeError: pass try: return not vars(x) except TypeError: pass return False
[docs]def as_int(x) -> int: """Convert a value to an int or 0 if this fails.""" try: return int(x) except: return 0
[docs]def as_float(x) -> float: """Convert a value to a float or 0.0 if this fails.""" try: return float(x) except: return 0.0
[docs]def as_str(x, encodings: t.List[str] = None) -> str: """Convert a value to a string. Args: x: Value. encodings: A list of acceptable encodings. If the value is bytes, try each encoding, and return the first one which passes without errors. Returns: A string. """ if isinstance(x, str): return x if not _is_bytes(x): return str(x) if encodings: for enc in encodings: try: return x.decode(encoding=enc, errors='strict') except UnicodeDecodeError: pass return x.decode(encoding='utf-8', errors='ignore')
[docs]def as_bytes(x) -> bytes: """Convert a value to bytes by converting it to string and encoding in utf8.""" if _is_bytes(x): return bytes(x) if not isinstance(x, str): x = str(x) return x.encode('utf8')
[docs]def as_list(x, delimiter: str = ',') -> list: """Convert a value to a list. Args: x: A value. Is it's a string, split it by the delimiter delimiter: Returns: A list. """ if isinstance(x, list): return x if is_empty(x): return [] if _is_bytes(x): x = as_str(x) if isinstance(x, str): if delimiter: ls = [s.strip() for s in x.split(delimiter)] return [s for s in ls if s] return [x] if isinstance(x, (int, float, bool)): return [x] try: return [s for s in x] except TypeError: return []
[docs]def as_dict(x) -> dict: """Convert a value to a dict. If the argument is a Data object, return its `dict`.""" if isinstance(x, dict): return x if isinstance(x, t.Data): return vars(x) return {}
_UID_DE_TRANS = { ord('ä'): 'ae', ord('ö'): 'oe', ord('ü'): 'ue', ord('ß'): 'ss', }
[docs]def as_uid(x) -> str: """Convert a value to an uid (alphanumeric string).""" if not x: return '' x = as_str(x).lower().strip().translate(_UID_DE_TRANS) x = re.sub(r'[^a-z0-9]+', '_', x) return x.strip('_')
[docs]def as_query_string(x) -> str: """Convert a dict/list to a query string. For each item in x, if it's a list, join it with a comma, stringify and in utf8. Args: x: Value, which can be a dict'able or a list of key,value pairs. Returns: The query string. """ p = [] items = x if _is_list(x) else as_dict(x).items() for k, v in sorted(items): k = _qs_quote(k) v = _qs_quote(v) p.append(k + b'=' + v) return (b'&'.join(p)).decode('ascii')
[docs]def lines(txt: str, comment: str = None) -> t.List[str]: """Convert a multiline string into a list of strings. Strip each line, skip empty lines, if `comment` is given, also remove lines starting with it. """ ls = [] for s in txt.splitlines(): if comment: s = s.split(comment)[0] s = s.strip() if s: ls.append(s) return ls
[docs]def read_file(path: str) -> str: with open(path, 'rt', encoding='utf8') as fp: return fp.read()
[docs]def read_file_b(path: str) -> bytes: with open(path, 'rb') as fp: return fp.read()
[docs]def write_file(path: str, s: str, user: int = None, group: int = None): with open(path, 'wt', encoding='utf8') as fp: fp.write(s) _chown(path, user, group)
[docs]def write_file_b(path: str, s: bytes, user: int = None, group: int = None): with open(path, 'wb') as fp: fp.write(s) _chown(path, user, group)
[docs]def ensure_dir(dir_path: str, base_dir: str = None, mode: int = 0o755, user: int = None, group: int = None) -> str: """Check if a (possibly nested) directory exists and create if it does not. Args: dir_path: Path to a directory. base_dir: Base directory. mode: Directory creation mode. user: Directory user (defaults to gws.UID) group: Directory group (defaults to gws.GID) Retruns: The absolute path to the directory. """ if base_dir: if os.path.isabs(dir_path): raise ValueError(f'cannot use an absolute path {dir_path!r} with a base dir') bpath = os.path.join(base_dir.encode('utf8'), dir_path.encode('utf8')) else: if not os.path.isabs(dir_path): raise ValueError(f'cannot use a relative path {dir_path!r} without a base dir') bpath = dir_path.encode('utf8') parts = [] for p in bpath.split(b'/'): parts.append(p) path = b'/'.join(parts) if path and not os.path.isdir(path): os.mkdir(path, mode) _chown(bpath, user, group) return bpath.decode('utf8')
def _chown(path, user, group): try: os.chown(path, user or gws.core.const.UID, group or gws.core.const.GID) except OSError: pass
[docs]def random_string(size: int) -> str: """Generate a random string of length `size`. """ a = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' r = random.SystemRandom() return ''.join(r.choice(a) for _ in range(size))
[docs]def sha256(s): return hashlib.sha256(as_bytes(s)).hexdigest()
[docs]class cached_property: """Decorator for a cached property.""" def __init__(self, fn): self._fn = fn self.__doc__ = getattr(fn, '__doc__') def __get__(self, obj, objtype=None): value = self._fn(obj) setattr(obj, self._fn.__name__, value) return value
_global_lock = threading.RLock() _global_vars = {}
[docs]def global_lock(): return _global_lock
[docs]def get_global(name, init_fn): """Get a global variable in a thread-safe way. Args: name: Variable name init_fn: Function that returns the value if the name doesn't exist. Returns: The variable value """ global _global_vars if name in _global_vars: return _global_vars[name] with global_lock(): if name in _global_vars: return _global_vars[name] _global_vars[name] = init_fn() return _global_vars[name]
[docs]def set_global(name, value): """Set a global variable in a thread-safe way. Args: name: Variable name. value: Variable value. Returns: The value """ global _global_vars with global_lock(): _global_vars[name] = value return _global_vars[name]
[docs]def get_cached_object(name, init_fn, max_age: int): """Return a cached object pickled in gws.OBJECT_CACHE_DIR. Args: name: Object name init_fn: Function that returns the value if the cache doesn't exist or is too old max_age: Cache max age in seconds. Returns: The value. """ path = gws.core.const.OBJECT_CACHE_DIR + '/' + as_uid(name) with global_lock(): try: age = time.time() - os.stat(path).st_mtime except: age = -1 if 0 <= age < max_age: try: with open(path, 'rb') as fp: return pickle.load(fp) except: pass try: os.unlink(path) except: pass obj = init_fn() if obj: with open(path, 'wb') as fp: pickle.dump(obj, fp) return obj
#################################################################################################### def _get(x, keys): try: for k in keys: if isinstance(x, dict): x = x[k] elif _is_list(x): x = x[int(k)] elif is_data_object(x): v = getattr(x, k) if v is None: # special case: raise a KeyError if the attribute is truly missing in a Data # (and not just equals to None) v = vars(x)[k] x = v else: x = getattr(x, k) return x, True except (KeyError, IndexError, AttributeError, ValueError): return None, False def _is_list(x): return isinstance(x, (tuple, list)) def _is_bytes(x): return isinstance(x, (bytes, bytearray)) # @TODO how to handle bytes-alikes? # return hasattr(x, 'decode') def _is_dict(x): return isinstance(x, dict) _QS_SAFE = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_.-' _QS_MAP = { c: bytes([c]) if c in _QS_SAFE else b'%%%02X' % c for c in range(256) } def _qs_quote(x): return b''.join(_QS_MAP[c] for c in _qs_bytes(x)) def _qs_bytes(x): if _is_bytes(x): return x if isinstance(x, str): return x.encode('utf8') if x is True: return b'true' if x is False: return b'false' try: return b','.join(_qs_bytes(y) for y in x) except TypeError: return str(x).encode('utf8')