Source code for gws.ext.action.bplan.importer

import re
import shutil
import zipfile
import PIL.Image

import gws
import gws.ext.db.provider.postgres
import gws.gis.extent
import gws.gis.gdal2
import gws.gis.shape
import gws.qgis.project
import as os2

import gws.types as t

[docs]class Stats(t.Data): numRecords: int numPngs: int numPdfs: int
[docs]def run(action, src_path: str, replace: bool, au_uid: str = None, job: = None) -> Stats: """"Import bplan data from a file or a directory.""" tmp_dir = None if os2.is_file(src_path): # a file is given - unpack it into a temp dir tmp_dir = gws.ensure_dir(gws.TMP_DIR + '/bplan_' + gws.random_string(32)) _extract(src_path, tmp_dir) stats = None try: stats = _run2(action, tmp_dir or src_path, replace, au_uid, job) except as e: pass if tmp_dir: shutil.rmtree(tmp_dir) return stats
[docs]def update(action): with action.db.connect() as conn: rs ='SELECT DISTINCT _au FROM {conn.quote_table(}') au_uids = set(r['_au'] for r in rs) _update_pdfs(action, au_uids) _create_qgis_projects(action, au_uids)
## def _run2(action, src_dir, replace, au_uid, job): gws.log.debug(f'BEGIN {src_dir!r} au={au_uid!r}') stats = Stats(numRecords=0, numPngs=0, numPdfs=0) _update_job(job, step=0, steps=5) # iterate shape files and prepare a list of db records shp_paths = set() recs = {} for p in sorted(os2.find_files(src_dir, ext='shp')): # NB prefer '..._utf8.shp' variants if they exist if 'utf8' in p: shp_paths.discard(p.replace('utf8', '')) shp_paths.add(p) for p in sorted(shp_paths): gws.log.debug(f'read {p!r}') if au_uid and not _path_belongs_to_au(p, [au_uid]): continue with gws.gis.gdal2.from_path(p) as ds: for f in gws.gis.gdal2.features(ds,, encoding=_encoding(p)): r = {} # convert all attributes to strings for a in f.attributes: if a.type == t.AttributeType.datetime: val = else: val = str(a.value) r[] = val r['_uid'] = uid = r[action.key_col] r['_au'] = r[action.au_key_col] type_name = r.get(action.type_col, '') for ty in action.type_list: if ty.srcName == type_name: r['_type'] = ty.uid break if uid not in recs: recs[uid] = r if not f.shape: # if no geometry found, create a point from x/y coords try: f.shape = gws.gis.shape.from_geometry({ "type": "Point", "coordinates": [ float(r[action.x_coord_col]), float(r[action.y_coord_col]), ] }, except: pass if f.shape: s = f.shape.to_multi() recs[uid][_geom_name(s)] = s.ewkt _update_job(job, step=1) # insert records table: t.SqlTable = action.plan_table db: gws.ext.db.provider.postgres.Object = action.db recs = list(recs.values()) au_uids = [au_uid] if au_uid else sorted(set(r['_au'] for r in recs)) with db.connect() as conn: src = with conn.transaction(): for a in au_uids: au_recs = [r for r in recs if r['_au'] == a] if not au_recs: continue gws.log.debug(f'insert {a!r} ({len(au_recs)})') stats.numRecords += len(au_recs) if replace: conn.execute(f'DELETE FROM {conn.quote_table(src)} WHERE _au = %s', [a]) else: uids = [r['_uid'] for r in au_recs] ph = ','.join(['%s'] * len(uids)) conn.execute(f'DELETE FROM {conn.quote_table(src)} WHERE _uid IN ({ph})', uids) conn.insert_many(src, au_recs) _update_job(job, step=2) # move png/pgw files into place dd = action.data_dir if replace: for p in os2.find_files(f'{dd}/png'): if _path_belongs_to_au(p, au_uids): gws.log.debug(f'delete {p}') os2.unlink(p) for p in os2.find_files(src_dir, ext='png'): if not _path_belongs_to_au(p, au_uids): continue w = re.sub(r'\.png$', '.pgw', p) if not os2.is_file(w): continue fb = _fnbody(p) gws.log.debug(f'copy {fb}.png') shutil.copyfile(p, f'{dd}/png/{fb}.png') shutil.copyfile(w, f'{dd}/png/{fb}.pgw') stats.numPngs += 1 _update_job(job, step=3) # move pdfs into place if replace: for p in os2.find_files(f'{dd}/pdf'): if _path_belongs_to_au(p, au_uids): gws.log.debug(f'delete {p}') os2.unlink(p) for p in os2.find_files(src_dir, ext='pdf'): if not _path_belongs_to_au(p, au_uids): continue fb = _fnbody(p) gws.log.debug(f'copy {fb}.pdf') shutil.copyfile(p, f'{dd}/pdf/{fb}.pdf') stats.numPdfs += 1 _update_job(job, step=4) # _update_pdfs(action, au_uids) _update_job(job, step=5) # _create_qgis_projects(action, au_uids) _update_job(job, gws.log.debug(f'END {src_dir!r}') return stats def _update_pdfs(action, au_uids): gws.log.debug(f'update pdfs for {au_uids!r}') dd = action.data_dir by_uid = {} with action.db.connect() as conn: for r in'SELECT _au, _uid FROM {conn.quote_table(}'): if r['_au'] in au_uids: by_uid[r['_uid']] = [] for p in os2.find_files(dd + '/pdf', ext='pdf'): fn = _filename(p) for uid, names in by_uid.items(): if fn.startswith(uid): names.append(fn) break with action.db.connect() as conn: with conn.transaction(): for uid, names in by_uid.items(): if names: gws.log.debug(f'save pdfs for {uid}') names = ','.join(names) conn.execute(f'UPDATE {conn.quote_table(} SET medien=%s WHERE _uid=%s', [names, uid]) def _create_qgis_projects(action, au_uids): gws.log.debug(f'create qgis projects for {au_uids!r}') dd = action.data_dir extents = _enum_extents(action, au_uids) layers = _enum_layers(action, au_uids) for au_uid in au_uids: path = f'{dd}/qgs/{au_uid}.qgs' ls = [la for la in layers if la['au_uid'] == au_uid] if not ls: os2.unlink(path) return ext = extents.get(au_uid) if not ext: continue res = action.qgis_template.render({ 'extent': ext, 'layers': ls }) gws.write_file(path, res.content) gws.log.debug(f'created {path!r}') def _enum_extents(action, au_uids): extents = {} with action.db.connect() as conn: tab = conn.quote_table( rs ='SELECT _au, ST_Extent(_geom_p) AS p FROM {tab} GROUP BY _au') for rec in rs: if rec['_au'] not in au_uids: continue extents[rec['_au']] = gws.gis.extent.from_box(rec['p']) return extents def _enum_layers(action, au_uids): layers = {} images = _enum_images(action) au_index = {au.uid: au for au in action.au_list} type_index = {ty.uid: ty for ty in action.type_list} def _layer_uid(rec, geom_type): return rec['_type'].lower() + '_' + geom_type + '_' + rec['_au'] def _new_layer(rec, geom_type): au = au_index.get(rec['_au']) au_name = if au else '' ty = type_index.get(rec['_type']) type_name = if ty else '' color = ty.color if ty else '' return { 'uid': _layer_uid(rec, geom_type), 'geom': geom_type, 'type': rec['_type'], 'type_name': type_name, 'au_uid': rec['_au'], 'au_name': au_name, 'color': color, 'images': [], } with action.db.connect() as conn: tab = conn.quote_table( rs ='SELECT _uid, _au, _type, _geom_p, _geom_l, _geom_x FROM {tab} ORDER BY _uid') for rec in rs: if rec['_au'] not in au_uids: continue for g in 'plx': if rec['_geom_' + g]: layer_uid = _layer_uid(rec, g) if layer_uid not in layers: layers[layer_uid] = _new_layer(rec, g) imgs = [img for img in images if img['fname'].startswith(rec['_uid'])] if imgs: layer_uid = _layer_uid(rec, 'r') if layer_uid not in layers: layers[layer_uid] = _new_layer(rec, 'r') layers[layer_uid]['images'].extend(imgs) ls = sorted(layers.values(), key=lambda la: la['type_name']) for la in ls: la['images'].sort(key=lambda img: img['fname'], reverse=True) return ls def _enum_images(action): dd = action.data_dir images = [] for path in os2.find_files(f'{dd}/png', ext='png'): fn = _fnbody(path) converted_path = f'{dd}/cnv/{fn}.png' if os2.file_mtime(converted_path) < os2.file_mtime(path): try: # reduce the image palette (20-30 colors work just fine for scanned plans) gws.log.debug(f'converting {path!r}') img = img = img.convert('RGBA') img = img.convert('P', palette=PIL.Image.ADAPTIVE, colors=action.image_quality) # copy the pgw along pgw = gws.read_file(f'{dd}/png/{fn}.pgw') gws.write_file(f'{dd}/cnv/{fn}.pgw', pgw) except Exception as e: gws.log.error(f'error converting {path!r}: {e}') continue images.append({ 'uid': '_r_' + fn, 'fname': fn, 'path': converted_path, 'palette': _image_palette(converted_path) }) return images def _image_palette(path): colors = [] img = palette = img.getpalette() # transparency is either a 256-bytes array for each entry or an integer index transparency ='transparency', None) if isinstance(transparency, bytes) and len(transparency) < 256: transparency = None for n in range(255): r = palette[n * 3 + 0] g = palette[n * 3 + 1] b = palette[n * 3 + 2] if isinstance(transparency, int): alpha = 0 if n == transparency else 0xFF elif isinstance(transparency, bytes): alpha = transparency[n] else: alpha = 0xFF colors.append(['#%02x%02x%02x' % (r, g, b), alpha]) return colors def _extract(zip_path, target_dir): zf = zipfile.ZipFile(zip_path) for fi in zf.infolist(): fn = _filename(fi.filename) if not fn or fn.startswith('.'): continue with as src, open(target_dir + '/' + fn, 'wb') as dst: gws.log.debug(f'unzip {fn!r}') shutil.copyfileobj(src, dst) def _encoding(path): if os2.is_file(path.replace('.shp', '.cpg')): # have a cpg file, let gdal handle the encoding return return 'utf8' if 'utf8' in path else 'ISO-8859–1' def _geom_name(s: t.IShape): if s.type == t.GeometryType.multipoint: return '_geom_x' if s.type == t.GeometryType.multilinestring: return '_geom_l' if s.type == t.GeometryType.multipolygon: return '_geom_p' raise ValueError(f'invalid geometry type: {s.type!r}') def _filename(path): return os2.parse_path(path)['filename'] def _fnbody(path): return os2.parse_path(path)['name'] def _path_belongs_to_au(path, au_uids): fn = _filename(path) # filename is like AAAAnnn.png or Shapes_AAAA_xxx.shp, where AAAA = au uid for a in au_uids: if fn.startswith(a) or (a + '_' in fn) or ('_' + a in fn): return True return False def _diff(a, b): d = {} for k in a.keys() | b.keys(): if a.get(k) != b.get(k): d[k] = a[k], b[k] return d def _update_job(job, **kwargs): if not job: return j = if not j: raise'NOT_FOUND') if j.state != raise'WRONG_STATE={j.state}') j.update(**kwargs)