"""Authorization and session manager."""
import gws
import gws.common.auth.error
import gws.common.auth.session
import gws.common.auth.stores.sqlite
import gws.common.auth.user
import gws.tools.date
import gws.tools.json2
import gws.types as t
from .error import Error
class Config(t.Config):
    """Authentication and authorization options"""

    # NOTE: the trailing ``#:`` comments are the per-field descriptions;
    # keep each one on the same line as the field it documents.
    methods: t.Optional[t.List[t.ext.auth.method.Config]]  #: authorization methods
    providers: t.List[t.ext.auth.provider.Config]  #: authorization providers
    sessionLifeTime: t.Duration = 1200  #: session life time
    sessionStorage: str = 'sqlite'  #: session storage engine
#:export IAuthManager
class Object(gws.Object, t.IAuthManager):
    """Authorization manager.

    Opens, closes and persists sessions, and dispatches login and
    authentication requests to the configured methods and providers.

    The methods below read ``self.methods``, ``self.providers``,
    ``self.store``, ``self.session_life_time`` and ``self.guest_user``.
    None of them is assigned in the code visible here, so they are
    presumably set up during object configuration -- TODO confirm.
    """

    @property
    def guest_session(self):
        """A session of type ``'guest'`` for ``self.guest_user``.

        A new session object is built on every access.
        """
        return self.new_session(type='guest', user=self.guest_user)

    # session manager

    def new_session(self, **kwargs):
        """Create a session object.

        All keyword arguments are passed unchanged to
        ``gws.common.auth.session.Session``.
        """
        return gws.common.auth.session.Session(**kwargs)

    def open_session(self, req: t.IRequest) -> t.ISession:
        """Open a session for the request.

        Each method in ``self.methods`` is asked in order; the first truthy
        session wins. If no method yields one, a guest session is returned.
        """
        for m in self.methods:
            sess = m.open_session(self, req)
            if sess:
                return sess
        return self.guest_session

    def close_session(self, sess: t.ISession, req: t.IRequest, res: t.IResponse) -> t.ISession:
        """Close a session.

        Delegates to the session's own method when it has one and returns
        whatever that method returns; otherwise returns a guest session.
        """
        if sess and sess.method:
            return sess.method.close_session(self, sess, req, res)
        return self.guest_session

    def login(self, method: t.IAuthMethod, login: str, password: str, req: t.IRequest) -> t.ISession:
        """Log in with the given method and credentials.

        Returns:
            The session produced by ``method.login``.

        Raises:
            gws.common.auth.error.LoginNotFound: if the method returns no
                session.
        """
        sess = method.login(self, login, password, req)
        if sess:
            return sess
        # Use the fully qualified module path: no top-level import binds a
        # bare ``error`` name, so ``error.LoginNotFound`` only resolved through
        # the implicit submodule attribute of the enclosing package.
        raise gws.common.auth.error.LoginNotFound()

    def logout(self, sess: t.ISession, req: t.IRequest) -> t.ISession:
        """Log out of a session.

        Delegates to the session's own method when it has one and returns
        whatever that method returns; otherwise returns a guest session.
        """
        if sess and sess.method:
            return sess.method.logout(self, sess, req)
        return self.guest_session

    # stored sessions

    def find_stored_session(self, uid):
        """Load a session from the store by its uid.

        Returns:
            The restored session, or ``None`` when there is no record for
            ``uid`` or when the record is older than
            ``self.session_life_time``. An expired record is deleted from
            the store before returning.
        """
        rec = self.store.find(uid)
        if not rec:
            return None

        # Age is measured from the record's 'updated' value, not from its creation.
        age = gws.tools.date.timestamp() - rec['updated']
        if age > self.session_life_time:
            gws.log.debug(f'sess uid={uid!r} EXPIRED age={age!r}')
            self.store.delete(uid)
            return None

        return self.new_session(
            type=rec['session_type'],
            uid=rec['uid'],
            method=self.get_method(rec['method_type']),
            user=self.unserialize_user(rec['str_user']),
            data=gws.tools.json2.from_string(rec['str_data'])
        )

    def create_stored_session(self, type: str, method: t.IAuthMethod, user: t.IUser) -> t.ISession:
        """Create a store record for the user and return it as a session.

        ``self.store.cleanup`` is invoked with the session life time before
        the new record is created. The session is then read back through
        ``find_stored_session``.
        """
        self.store.cleanup(self.session_life_time)

        uid = self.store.create(
            session_type=type,
            method_type=method.type,
            provider_uid=user.provider.uid,
            user_uid=user.uid,
            str_user=self.serialize_user(user))

        return self.find_stored_session(uid)

    def save_stored_session(self, sess: t.ISession):
        """Persist a session.

        When ``sess.changed`` is truthy the session data is written to the
        store as JSON; otherwise the record is only touched.
        """
        if sess.changed:
            self.store.update(sess.uid, str_data=gws.tools.json2.to_string(sess.data))
        else:
            self.store.touch(sess.uid)

    def destroy_stored_session(self, sess: t.ISession):
        """Delete the store record of the given session."""
        self.store.delete(sess.uid)

    def delete_stored_sessions(self):
        """Delete all records from the session store."""
        self.store.delete_all()

    def stored_session_records(self) -> t.List[dict]:
        """Return all records of the session store."""
        return self.store.get_all()

    #

    def authenticate(self, method: t.IAuthMethod, login, password, **kw) -> t.Optional[t.IUser]:
        """Authenticate credentials against the configured providers.

        Providers are tried in order. A provider with a non-empty
        ``allowed_methods`` that does not contain ``method.type`` is skipped.

        Returns:
            The first truthy user returned by a provider, or ``None`` when
            no provider accepts the credentials.
        """
        for prov in self.providers:
            if prov.allowed_methods and method.type not in prov.allowed_methods:
                continue
            gws.log.debug(f'trying provider {prov.uid!r} for login {login!r}')
            user = prov.authenticate(method, login, password, **kw)
            if user:
                return user
        return None

    def get_user(self, user_fid: str) -> t.Optional[t.IUser]:
        """Look up a user by its fid.

        The fid is split into a provider uid and a user uid by
        ``gws.common.auth.user.parse_fid``.

        Returns:
            The result of the provider's ``get_user``, or ``None`` when no
            provider has that uid.
        """
        provider_uid, user_uid = gws.common.auth.user.parse_fid(user_fid)
        prov = self.get_provider(provider_uid)
        if prov:
            return prov.get_user(user_uid)
        return None

    def get_role(self, name: str) -> t.IRole:
        """Return a ``gws.common.auth.user.Role`` object for the name."""
        return gws.common.auth.user.Role(name)

    def get_provider(self, uid: str) -> t.Optional[t.IAuthProvider]:
        """Return the first provider whose ``uid`` equals ``uid``, or ``None``."""
        for prov in self.providers:
            if prov.uid == uid:
                return prov
        return None

    def get_method(self, type: str) -> t.Optional[t.IAuthMethod]:
        """Return the first method whose ``type`` equals ``type``, or ``None``."""
        for m in self.methods:
            if m.type == type:
                return m
        return None

    def serialize_user(self, user: t.IUser) -> str:
        """Serialize a user to a JSON string via its provider's ``user_to_dict``."""
        return gws.tools.json2.to_string(user.provider.user_to_dict(user))

    def unserialize_user(self, s: str) -> t.IUser:
        """Restore a user from a JSON string.

        The decoded dict must contain a ``'provider_uid'`` key; the provider
        with that uid rebuilds the user with ``user_from_dict``.
        """
        d = gws.tools.json2.from_string(s)
        prov = self.get_provider(d['provider_uid'])
        # NOTE(review): get_provider returns None for an unknown uid, in which
        # case the next line fails with AttributeError. Confirm whether stored
        # users can outlive their provider and, if so, how that should be reported.
        return prov.user_from_dict(d)