Source code for gws.core.spec

"""Validate values according to specs"""

import os
import re
import gws.tools.units
import gws.tools.date

import gws.types as t


[docs]class Error(Exception): def __init__(self, *args): super().__init__(*args) self.message = args[0] if args else ''
#:export SpecValidator
[docs]class SpecValidator: def __init__(self, spec): self.spec = spec
[docs] def method_spec(self, name): return self.spec.get('method:' + name)
[docs] def read_value(self, val, type_name, path='', strict=True): reader = _Reader(self.spec, path, strict) return reader.read(val, type_name)
## class _Reader: def __init__(self, spec, path, strict): self.spec = spec self.path = path self.keys = [] self.strict = strict self.handlers = _HANDLERS def error(self, code, msg, val): val = repr(val) if len(val) > 600: val = val[:600] + '...' raise Error(code + ': ' + msg, self.path, '.'.join(str(k) for k in self.keys), val) def read(self, val, type_name): if type_name in self.handlers: return self.handlers[type_name](self, val, None) s = self.spec[type_name] return self.handlers[s['type']](self, val, s) # type handlers def _read_any(rd, val, spec): return val def _read_bool(rd, val, spec): if rd.strict: return _ensure(rd, val, bool) try: return bool(val) except: rd.error('ERR_MUST_BE_BOOL', 'must be true or false', val) def _read_str(rd, val, spec): if rd.strict: return _ensure(rd, val, str) try: return _to_string(val) except: rd.error('ERR_MUST_BE_STRING', 'must be a string', val) def _read_literal(rd, val, spec): return _read_str(rd, val, spec) def _read_bytes(rd, val, spec): try: if isinstance(val, str): return val.encode('utf8', errors='strict') return bytes(val) except: rd.error('ERR_MUST_BE_BYTES', 'must be a byte buffer', val) def _read_int(rd, val, spec): if rd.strict: return _ensure(rd, val, int) try: return int(val) except: rd.error('ERR_MUST_BE_INT', 'must be an integer', val) def _read_float(rd, val, spec): if rd.strict: if isinstance(val, int): return float(val) return _ensure(rd, val, float) try: return float(val) except: rd.error('ERR_MUST_BE_FLOAT', 'must be a float', val) def _read_dict(rd, val, spec): return _ensure(rd, val, dict) def _read_object(rd, val, spec): val = _ensure(rd, val, dict) if not rd.strict: val = {k.lower(): v for k, v in val.items()} res = {} for p in spec['props']: name = p['name'] rd.keys.append(name) try: res[name] = _property_value(rd, val.get(name if rd.strict else name.lower()), p) finally: rd.keys.pop() if rd.strict: names = set(p['name'] for p in spec['props']) unknown = [key for key in val if key not in names] if unknown: return rd.error('ERR_UNKNOWN_PROP', f"unknown properties: {_comma(unknown)}, expected {_comma(names)}", val) return t.Data(res) def _read_taggedunion(rd, val, spec): if not hasattr(val, 'get'): return rd.error('ERR_UNEXPECTED', 'unexpected value', val) # tagged unions are discriminated by the "tag" prop # the 'parts' spec is a dict tag_value => class_name type_name = val.get(spec['tag']) base = spec['parts'].get(type_name) if base: return rd.read(val, base) return rd.error('ERR_BAD_TYPE', f"illegal type: {type_name!r}, expected {_comma(spec['parts'])}", val) def _read_union(rd, val, spec): # @TODO no untyped unions yet return rd.error('ERR_BAD_TYPE', 'not supported', val) def _read_list(rd, val, spec): if not rd.strict and isinstance(val, str): val = val.strip() val = [v.strip() for v in val.split(',')] if val else [] val = _ensure(rd, val, list) res = [] for n, v in enumerate(val): rd.keys.append(n) try: res.append(rd.read(v, spec['bases'][0])) finally: rd.keys.pop() return res def _read_tuple(rd, val, spec): if not rd.strict and isinstance(val, str): val = val.strip() val = [v.strip() for v in val.split(',')] if val else [] val = _ensure(rd, val, list) if len(val) != len(spec['bases']): rd.error('ERR_BAD_TYPE', f"expected {spec['name']!r}", val) res = [] for n, v in enumerate(val): rd.keys.append(n) try: res.append(rd.read(v, spec['bases'][n])) finally: rd.keys.pop() return res def _read_enum(rd, val, spec): # NB: our Enums (see __init__) accept both names (for configs) and values (for api calls) # this blocks silly things like Enum {foo=bar bar=123} but we don't care for k, v in spec['values'].items(): if val == k or val == v: return v rd.error('ERR_BAD_ENUM', f"invalid value, expected {_comma(spec['values'])}", val) def _read_dirpath(rd, val, spec): path = os.path.join(os.path.dirname(rd.path), val) if not os.path.isdir(path): rd.error('ERR_DIR_NOT_FOUND', 'directory not found', path) return path def _read_filepath(rd, val, spec): path = os.path.join(os.path.dirname(rd.path), val) if not os.path.isfile(path): rd.error('ERR_FILE_NOT_FOUND', 'file not found', path) return path def _read_duration(rd, val, spec): try: return gws.tools.units.parse_duration(val) except ValueError: rd.error('ERR_BAD_DURATION', 'invalid duration', val) def _read_regex(rd, val, spec): try: re.compile(val) return val except re.error as e: rd.error('ERR_BAD_REGEX', f"invalid regular expression: {e!r}", val) def _read_formatstr(rd, val, spec): # @TODO return _read_str(rd, val, spec) def _read_crs(rd, val, spec): # @TODO: crs validation return _read_str(rd, val, spec) def _read_color(rd, val, spec): # @TODO: color validation return _read_str(rd, val, spec) def _read_date(rd, val, spec): d = gws.tools.date.from_iso(str(val)) if not d: return rd.error('ERR_INVALID_DATE', 'invalid date', val) return gws.tools.date.to_iso_date(d) def _read_datetime(rd, val, spec): d = gws.tools.date.from_iso(str(val)) if not d: return rd.error('ERR_INVALID_DATE', 'invalid date', val) return gws.tools.date.to_iso(d) def _read_url(rd, val, spec): # @TODO: url validation return _read_str(rd, val, spec) ## utils def _property_value(rd, prop_val, spec): default = spec['default'] # no value? if prop_val is None: if not spec['optional']: return rd.error('ERR_MISSING_PROP', f"required property missing: {spec['name']!r}", 'nothing') # no default as well if default is None: return None # the default, if given, must match the type # NB, for Data objects, default={} will create an objects with defaults return rd.read(default, spec['type']) return rd.read(prop_val, spec['type']) def _ensure(rd, val, klass): if isinstance(val, klass): return val if klass == list and isinstance(val, tuple): return list(val) if klass == dict and gws.is_data_object(val): return vars(val) rd.error('ERR_WRONG_TYPE', f"wrong type {_classname(type(val))!r}, expected {_classname(klass)!r}", val) def _to_string(x): if isinstance(x, str): return x if isinstance(x, (bytes, bytearray)): return x.decode('utf8') raise ValueError() def _classname(cls): try: return cls.__name__ except: return str(cls) def _comma(ls): return ', '.join(sorted(repr(x) for x in ls)) ## _HANDLERS = { 'any': _read_any, 'bool': _read_bool, 'bytes': _read_bytes, 'dict': _read_dict, 'enum': _read_enum, 'float': _read_float, 'int': _read_int, 'list': _read_list, 'object': _read_object, 'str': _read_str, 'literal': _read_literal, 'tuple': _read_tuple, 'taggedunion': _read_taggedunion, 'gws.types.Crs': _read_crs, 'gws.types.Color': _read_color, 'gws.types.Date': _read_date, 'gws.types.DateTime': _read_datetime, 'gws.types.DirPath': _read_dirpath, 'gws.types.Duration': _read_duration, 'gws.types.FilePath': _read_filepath, 'gws.types.FormatStr': _read_formatstr, 'gws.types.Regex': _read_regex, 'gws.types.Url': _read_url, 'gws.types.Any': _read_any, }