"""Manage construction plans."""
import gws
import gws.common.action
import gws.common.metadata
import gws.tools.date
import gws.common.db
import gws.common.template
import gws.ext.db.provider.postgres
import gws.server.spool
import gws.tools.job
import gws.gis.shape
import gws.tools.json2
import gws.tools.upload
import gws.web.error
import gws.types as t
from . import importer
class AdministrativeUnitConfig(t.WithAccess):
    """Configuration of a single administrative unit (AU)."""

    uid: str  #: AU identifier, compared against the ``auUid`` request parameter
    name: str  #: AU name, sent to the client as part of ``auList``
class PlanTypeConfig(t.Config):
    """Configuration of a single plan type."""

    # NOTE(review): none of these fields is read in this module; presumably
    # they are consumed by the importer -- confirm before documenting further.
    uid: str
    name: str
    srcName: str
    color: str
class Config(t.WithTypeAndAccess):
    """Construction plans action"""

    db: str = ''  #: database provider ID
    crs: t.Crs  #: CRS for the bplan data
    planTable: gws.common.db.SqlTableConfig  #: plan table configuration
    metaTable: gws.common.db.SqlTableConfig  #: meta table configuration
    dataDir: t.DirPath  #: data directory
    templates: t.List[t.ext.template.Config]  #: template configurations
    administrativeUnits: t.List[AdministrativeUnitConfig]  #: administrative unit configurations
    planTypes: t.List[PlanTypeConfig]  #: plan type configurations
    imageQuality: int = 24  #: palette size for optimized images
    uploadChunkSize: int  #: upload chunk size in mb
class AdministrativeUnit(t.Data):
    """Administrative unit as listed in ``Props.auList``."""

    uid: str  #: AU identifier
    name: str  #: AU name
class Props(t.Props):
    """Client properties of the bplan action."""

    type: t.Literal = 'bplan'
    auList: t.List[AdministrativeUnit]  #: administrative units the user can use
    uploadChunkSize: int  #: upload chunk size in bytes
class StatusParams(t.Params):
    """Parameters of the import status and import cancel requests."""

    jobUid: str  #: uid of the import job to look up
class StatusResponse(t.Response):
    """State of an import job, as returned by the import, status and cancel requests."""

    jobUid: str  #: uid of the job
    progress: int  #: job progress (only filled in by the status request)
    state: gws.tools.job.State  #: current job state
    stats: importer.Stats  #: importer statistics taken from the job result (status request only)
class ImportParams(t.Params):
    """Parameters of the import request."""

    uploadUid: str  #: uid of a previously completed upload
    auUid: str  #: administrative unit to import into; the user must have access to it
    replace: bool  #: forwarded unchanged to the importer
class GetFeaturesParams(t.Params):
    """Parameters of the "get features" request."""

    auUid: str  #: administrative unit whose plans are requested
class GetFeaturesResponse(t.Response):
    """Response of the "get features" request."""

    features: t.List[t.FeatureProps]  #: props of the selected plan features
class DeleteFeatureParams(t.Params):
    """Parameters of the "delete feature" request."""

    uid: str  #: value of the ``_uid`` column of the plan to delete
class DeleteFeatureResponse(t.Response):
    """Empty response of the "delete feature" request."""

    pass
class LoadInfoParams(t.Params):
    """Parameters of the "load info" request."""

    auUid: str  #: administrative unit, passed to the info template as ``auUid``
class LoadInfoResponse(t.Response):
    """Response of the "load info" request."""

    info: str  #: content rendered by the info template
class Object(gws.common.action.Object):
    """Server action that manages construction plans per administrative unit (AU).

    The methods below read ``self.db``, ``self.plan_table``, ``self.meta_table``,
    ``self.templates``, ``self.info_template`` and ``self.au_list``. None of
    these is assigned in this class body, so they are presumably set up during
    configuration elsewhere -- TODO confirm.
    """

    def post_configure(self):
        """Run the base post-configuration, then merge database metadata into the object tree."""
        super().post_configure()
        self._load_db_meta()

    def props_for(self, user):
        """Return the client properties of this action for ``user``.

        The configured ``uploadChunkSize`` is multiplied by 1024 * 1024,
        i.e. converted from megabytes to bytes.
        """
        return {
            'type': self.type,
            'auList': self._au_list_for(user),
            'uploadChunkSize': self.var('uploadChunkSize') * 1024 * 1024,
        }

    def api_get_features(self, req: t.IRequest, p: GetFeaturesParams) -> GetFeaturesResponse:
        """Return the plans of one administrative unit, sorted by name.

        Raises:
            gws.web.error.Forbidden: if the user has no access to ``p.auUid``.
        """
        au_uid = self._check_au(req, p.auUid)

        features = self.db.select(t.SelectArgs(
            table=self.plan_table,
            extra_where=['_au = %s', au_uid],
            sort='name',
        ))

        for f in features:
            f.apply_templates(self.templates)
            # Take the first non-empty geometry attribute.
            g = f.attr('_geom_p') or f.attr('_geom_l') or f.attr('_geom_x')
            if g:
                f.shape = gws.gis.shape.from_wkb_hex(g, self.plan_table.geometry_crs)
            # Attributes are dropped only after the templates and the geometry
            # have been derived from them.
            f.attributes = []

        return GetFeaturesResponse(features=[f.props for f in features])

    def api_delete_feature(self, req: t.IRequest, p: DeleteFeatureParams) -> DeleteFeatureResponse:
        """Delete the plan whose ``_uid`` equals ``p.uid``.

        Raises:
            gws.web.error.NotFound: if no such plan exists.
            gws.web.error.Forbidden: if the user has no access to the plan's AU.
        """
        with self.db.connect() as conn:
            r = conn.select_one(f'''
                SELECT *
                FROM {conn.quote_table(self.plan_table.name)}
                WHERE _uid = %s
            ''', [p.uid])
            if not r:
                raise gws.web.error.NotFound()

            # Access is checked against the AU stored with the record,
            # not against anything supplied by the client.
            self._check_au(req, r['_au'])

            conn.execute(f'''
                DELETE
                FROM {conn.quote_table(self.plan_table.name)}
                WHERE _uid = %s
            ''', [r['_uid']])

        return DeleteFeatureResponse()

    def api_upload_chunk(
            self,
            req: t.IRequest,
            p: gws.tools.upload.UploadChunkParams,
    ) -> gws.tools.upload.UploadChunkResponse:
        """Hand one upload chunk over to ``gws.tools.upload.upload_chunk``."""
        return gws.tools.upload.upload_chunk(p)

    def api_import(self, req: t.IRequest, p: ImportParams) -> StatusResponse:
        """Create and spool an import job for a previously uploaded file.

        Returns:
            A ``StatusResponse`` with the uid and the initial state of the new job.

        Raises:
            gws.web.error.BadRequest: if the upload cannot be retrieved.
            gws.web.error.Forbidden: if the user has no access to ``p.auUid``.
        """
        try:
            rec = gws.tools.upload.get(p.uploadUid)
        except gws.tools.upload.Error as e:
            gws.log.error(e)
            # Keep the upload error attached as the cause of the HTTP error.
            raise gws.web.error.BadRequest() from e

        job_uid = gws.random_string(64)

        # The arguments travel to the worker as a JSON string, see `_worker`.
        args = {
            'actionUid': self.uid,
            'auUid': self._check_au(req, p.auUid),
            'path': rec.path,
            'replace': p.replace,
        }

        job = gws.tools.job.create(
            uid=job_uid,
            user=req.user,
            args=gws.tools.json2.to_string(args),
            worker=__name__ + '._worker')

        gws.server.spool.add(job)

        return StatusResponse(
            jobUid=job.uid,
            state=job.state,
        )

    def api_import_status(self, req: t.IRequest, p: StatusParams) -> StatusResponse:
        """Return the state, progress and statistics of an import job.

        Raises:
            gws.web.error.NotFound: if ``gws.tools.job.get_for`` returns no job
                for this user and ``p.jobUid``.
        """
        job = gws.tools.job.get_for(req.user, p.jobUid)
        if not job:
            raise gws.web.error.NotFound()

        return StatusResponse(
            jobUid=job.uid,
            state=job.state,
            progress=job.progress,
            # The result is empty until the worker has stored its statistics.
            stats=job.result.get('stats', {}) if job.result else {},
        )

    def api_import_cancel(self, req: t.IRequest, p: StatusParams) -> StatusResponse:
        """Cancel an import job.

        Raises:
            gws.web.error.NotFound: if ``gws.tools.job.get_for`` returns no job
                for this user and ``p.jobUid``.
        """
        job = gws.tools.job.get_for(req.user, p.jobUid)
        if not job:
            raise gws.web.error.NotFound()

        job.cancel()

        return StatusResponse(
            jobUid=job.uid,
            state=job.state,
        )

    def api_load_info(self, req: t.IRequest, p: LoadInfoParams) -> LoadInfoResponse:
        """Render the info template for ``p.auUid`` and return its content."""
        # NOTE(review): unlike the other AU-bound requests, this one does not
        # call `_check_au`, so the template is rendered for any `auUid` the
        # client sends -- confirm that this is intended.
        res = self.info_template.render({'auUid': p.auUid})
        return LoadInfoResponse(info=res.content)

    def do_import(self, path, replace):
        """Run ``importer.run`` for this action with the given ``path`` and ``replace`` flag."""
        importer.run(self, path, replace)

    def do_update(self):
        """Run ``importer.update`` for this action."""
        importer.update(self)

    def _au_list_for(self, user):
        """Return the administrative units ``user`` can use, as ``uid``/``name`` data objects."""
        return [
            t.Data(uid=au.uid, name=au.name)
            for au in self.au_list
            if user.can_use(au)
        ]

    def _check_au(self, req, au_uid):
        """Return ``au_uid`` if the requesting user can use that administrative unit.

        Raises:
            gws.web.error.Forbidden: if ``au_uid`` is not among the user's units.
        """
        au_uids = {au.uid for au in self._au_list_for(req.user)}
        if au_uid not in au_uids:
            gws.log.error(f'wrong auUid={au_uid}')
            raise gws.web.error.Forbidden()
        return au_uid

    def _load_db_meta(self):
        """Collect per-AU metadata from the database and merge it into matching objects.

        Metadata is read from the meta table, then the creation and update
        dates are set from the oldest and newest ``_updated`` values in the
        plan table. Every object in the tree that has a ``uid`` ending with
        an AU uid and a non-empty ``meta`` gets the AU metadata merged in.
        """
        metas = {}

        with self.db.connect() as conn:
            rs = conn.select(f'''SELECT * FROM {conn.quote_table(self.meta_table.name)}''')
            for r in rs:
                au_uid = r['_au']
                metas[au_uid] = gws.common.metadata.from_dict(gws.tools.json2.from_string(r['meta']))

            rs = conn.select(f'''
                SELECT _au, MIN(_updated) AS mi, MAX(_updated) AS ma
                FROM {conn.quote_table(self.plan_table.name)}
                GROUP BY _au
            ''')
            for r in rs:
                au_uid = r['_au']
                # An AU may have plans but no row in the meta table.
                if au_uid not in metas:
                    metas[au_uid] = t.MetaData()
                metas[au_uid].dateCreated = gws.tools.date.to_iso(gws.tools.date.to_utc(r['mi']), with_tz='Z')
                metas[au_uid].dateUpdated = gws.tools.date.to_iso(gws.tools.date.to_utc(r['ma']), with_tz='Z')

        for obj in self.root.find_all():
            uid = gws.get(obj, 'uid') or ''
            if uid and gws.get(obj, 'meta'):
                for au_uid, meta in metas.items():
                    # Skip empty AU uids: `endswith('')` is true for every uid
                    # and `endswith(None)` raises a TypeError.
                    if au_uid and uid.endswith(au_uid):
                        obj.meta = gws.common.metadata.extend(meta, obj.meta)
                        # Only objects that already carry an update sequence get a new one.
                        if gws.get(obj, 'update_sequence'):
                            obj.update_sequence = meta.dateUpdated
def _worker(root: t.IRootObject, job: gws.tools.job.Job):
    """Job entry point: run the importer with the arguments stored in ``job.args``.

    ``api_import`` registers this function by name (``__name__ + '._worker'``)
    and stores the arguments as a JSON string. The job is marked as running
    before the importer starts, and the importer's return value is saved in
    the job result under ``stats``.
    """
    params = gws.tools.json2.from_string(job.args)
    target = root.find('gws.ext.action', params['actionUid'])

    job.update(state=gws.tools.job.State.running)

    import_stats = importer.run(
        target,
        params['path'],
        params['replace'],
        params['auUid'],
        job,
    )
    job.update(result={'stats': import_stats})