Source code for gws.ext.action.georisks.aartelink

"""Utilities for EASD AarteLink push notifications"""

import hashlib

import gws
import gws.tools.date
import gws.tools.net
import gws.tools.json2
import gws.common.db
import gws.gis.shape

_DEVICE_STATE_VERB = 'receiveDeviceState'
_ALARM_MESSAGE_VERB = 'receiveAlarmMessage'


[docs]def service_request(action): url = action.var('aarteLink.serviceUrl') + '/' + 'overview' auth = (action.var('aarteLink.serviceLogin'), action.var('aarteLink.servicePassword')) resp = gws.tools.net.http_request(url, auth=auth) r = gws.tools.json2.from_string(resp.text) # "version": 1, # "timestamp": 1561541113, # "time": "2019-06-26T11:25:13+02:00", # "messageType": "system_overview", # "data": { # "version": 1, # "customerId": ..., # "systemId": "...", # "name": "...", # "comAddr": "...", # "devices": { # "...": { # "name": ..., # "serial": "...", # "state": "...", # "errorLevel": 0, # "errorLevelName": "...", # "type": "...", # "typeName": "...", # "location": "...", recs = [] for id, dev in r['data']['devices'].items(): rec = { 'id': id, 'name': dev.get('name', ''), 'state': dev.get('state', ''), 'errorlevel': int(dev.get('errorLevel', 0)), 'errorlevelname': dev.get('errorLevelName', ''), 'type': dev.get('type', ''), 'typename': dev.get('typeName', ''), } if dev.get('location'): x, y = dev.get('location').split(':') rec['geom'] = gws.gis.shape.from_geometry({ 'type': 'Point', 'coordinates': [float(x), float(y)] }, 'EPSG:4326') recs.append(rec) with action.db.connect() as conn: conn.exec(f'TRUNCATE TABLE {action.DEVICE_TABLE_NAME}') tbl = gws.common.db.SqlTableConfig({ 'name': action.DEVICE_TABLE_NAME, 'keyColumn': 'id', 'geometryColumn': 'geom' }) action.db.insert(tbl, recs)
[docs]def handle(action, req): # because of the encoding issues, we use the raw request_uri and parse it manually uri = req.env('REQUEST_URI', '') p = uri.split('/') for n, s in enumerate(p): if s == _DEVICE_STATE_VERB: r = _parse_device_state(action, p[n:]) _save_device_state(action, r) return if s == _ALARM_MESSAGE_VERB: r = _parse_alarm_message(action, p[n:]) _save_alarm_message(action, r) return raise ValueError(f'unknown verb: {uri!r}')
def _parse_device_state(action, p): # 4.2.2 the url format is # <BaseAddress>/receiveDeviceState/<custormerId>/<systemId>/<deviceId>/<values>/<timestamp>/< checksum> r = { 'cmd': p[0], 'customer_id': p[1], 'system_id': p[2], 'device_id': p[3], 'payload': p[4], 'values': _parse_device_state_values(p[4]), 'timestamp': p[5], 'checksum': p[6], } # <systemKey><custormerId><systemId><deviceId><values> _validate_checksum(action, r, 'customer_id', 'system_id', 'device_id', 'payload') return r def _parse_device_state_values(payload): vs = [] for kv in gws.tools.net.unquote(payload).split(','): kv = kv.strip() if not kv: continue m = kv.split('=') if len(m) == 2: k, v = m u = '' elif len(m) == 3: k, v, u = m else: raise ValueError(f'unexpected key-value pair {kv!r}') # @TODO convert types vs.append({ 'name': k.strip(), 'value': v.strip(), 'unit': u.strip(), }) return vs def _parse_alarm_message(action, p): # spec 4.2.3 # <BaseAddress>/receiveAlarmMessage/<custormerId>/<systemId>/<type>/<message>/<timestamp>/<checksum> r = { 'cmd': p[0], 'customer_id': p[1], 'system_id': p[2], 'type': p[3], 'payload': p[4], 'message': gws.tools.net.unquote(p[4]), 'timestamp': p[5], 'checksum': p[6], } # md5($systemKey.$customerId.$systemId.$type.$messageEncoded.$timestamp) _validate_checksum(action, r, 'customer_id', 'system_id', 'type', 'payload', 'timestamp') return r def _save_device_state(action, r): data = [ gws.merge(v, { 'customer_id': r['customer_id'], 'system_id': r['system_id'], 'device_id': r['device_id'], 'time_created': _to_date(r), }) for v in r['values'] ] with action.db.connect() as conn: conn.batch_insert(action.MESSAGE_TABLE_NAME, data) def _save_alarm_message(action, r): rec = { 'customer_id': r['customer_id'], 'system_id': r['system_id'], 'type': r['type'], 'message': r['message'], 'time_created': _to_date(r), } with action.db.connect() as conn: conn.insert_one(action.ALARM_TABLE_NAME, 'id', rec) def _validate_checksum(action, r, *keys): system_key = action.var('aarteLink.systemKey', default='') h = system_key + ''.join(r[k] for k in keys) md5 = _md5(h) if md5 != r['checksum']: gws.log.warn(f"checksum mismatch: h={h!r} cs={r['checksum']!r}") # raise ValueError(f"checksum mismatch: h={h!r} cs={r['checksum']!r}") def _to_date(r): return gws.tools.date.to_isotz( gws.tools.date.utc_from_timestamp( int(r['timestamp']))) def _md5(s): return hashlib.md5(gws.as_bytes(s)).hexdigest()