Source code for gws.ext.action.storage

"""Storage API."""

import gws
import gws.common.action
import gws.ext.helper.storage
import gws.web.error

import gws.types as t


class WriteParams(t.Params):
    """Request parameters accepted by ``api_write``."""

    # Storage entry the data is written to.
    entry: t.StorageEntry
    # Payload handed to the storage helper's ``write``.
    data: dict
class WriteResponse(t.Response):
    """Response returned by ``api_write``."""

    # Entry as returned by the storage helper's ``write``.
    entry: t.StorageEntry
class ReadParams(t.Params):
    """Request parameters accepted by ``api_read``."""

    # Storage entry to read.
    entry: t.StorageEntry
class ReadResponse(t.Response):
    """Response returned by ``api_read``.

    ``api_read`` constructs it directly from the element that the storage
    helper's ``read`` returns.
    """

    # Storage entry that was read.
    entry: t.StorageEntry
    # Data associated with the entry.
    data: dict
class DirParams(t.Params):
    """Request parameters accepted by ``api_dir``."""

    # Category passed to the storage helper's ``dir`` and permission checks.
    category: str
class DirResponse(t.Response):
    """Response returned by ``api_dir``."""

    # Entries listed for the category; empty when the category is not readable.
    entries: t.List[t.StorageEntry]
    # Whether the requesting user may read the category.
    readable: bool
    # Whether the requesting user may write the category.
    writable: bool
class DeleteParams(t.Params):
    """Request parameters accepted by ``api_delete``."""

    # Storage entry to delete.
    entry: t.StorageEntry
class DeleteResponse(t.Response):
    """Response returned by ``api_delete``; carries no fields of its own.

    Derives from ``t.Response`` like every other ``*Response`` type in this
    module (it was previously based on ``t.Params``, the base used for
    request parameter types).
    """
class Config(t.WithTypeAndAccess):
    """Storage action"""
    # No options beyond those inherited from ``t.WithTypeAndAccess``.
class Object(gws.common.action.Object):
    """Storage action: forwards ``api_*`` requests to the storage helper."""

    @gws.cached_property
    def storage(self) -> gws.ext.helper.storage.Object:
        """Return the first ``gws.ext.helper.storage`` object found from ``self.root``."""
        helper: gws.ext.helper.storage.Object = self.root.find_first('gws.ext.helper.storage')
        return helper

    def api_write(self, req: t.IRequest, p: WriteParams) -> WriteResponse:
        """Write ``p.data`` to ``p.entry`` on behalf of ``req.user``.

        Returns:
            A ``WriteResponse`` holding the entry returned by the helper.

        Raises:
            gws.web.error.Forbidden: the helper raised ``AccessDenied``.
        """
        try:
            written = self.storage.write(p.entry, req.user, p.data)
        except gws.ext.helper.storage.AccessDenied:
            raise gws.web.error.Forbidden()
        else:
            return WriteResponse(entry=written)

    def api_delete(self, req: t.IRequest, p: DeleteParams) -> DeleteResponse:
        """Delete ``p.entry`` on behalf of ``req.user``.

        Returns:
            An empty ``DeleteResponse``.

        Raises:
            gws.web.error.Forbidden: the helper raised ``AccessDenied``.
        """
        try:
            self.storage.delete(p.entry, req.user)
        except gws.ext.helper.storage.AccessDenied:
            raise gws.web.error.Forbidden()
        else:
            return DeleteResponse()

    def api_read(self, req: t.IRequest, p: ReadParams) -> ReadResponse:
        """Read ``p.entry`` on behalf of ``req.user``.

        Returns:
            A ``ReadResponse`` constructed from the element the helper returns.

        Raises:
            gws.web.error.NotFound: the helper raised ``NotFound``.
            gws.web.error.Forbidden: the helper raised ``AccessDenied``.
        """
        try:
            found = self.storage.read(p.entry, req.user)
        except gws.ext.helper.storage.NotFound:
            raise gws.web.error.NotFound()
        except gws.ext.helper.storage.AccessDenied:
            raise gws.web.error.Forbidden()
        else:
            return ReadResponse(found)

    def api_dir(self, req: t.IRequest, p: DirParams) -> DirResponse:
        """List the entries of ``p.category`` together with the user's permissions.

        A category the user cannot read yields an empty listing with both
        flags set to ``False`` rather than an error.
        """
        if self.storage.can_read_category(p.category, req.user):
            # Same call order as before: list first, then check write access.
            listing = self.storage.dir(p.category, req.user)
            may_write = self.storage.can_write_category(p.category, req.user)
            return DirResponse(entries=listing, readable=True, writable=may_write)

        return DirResponse(entries=[], readable=False, writable=False)