"""Manage chunked uploads."""
import os
import shutil
import gws
import gws.tools.json2
import gws.tools.os2
import gws.web.error
import gws.types as t
class Error(gws.Error):
    """Raised by this module when an upload cannot be stored or assembled."""
class UploadRecord(t.Data):
    """Description of a completed upload, as returned by ``get``."""

    # Identifier of the upload.
    uid: str
    # File name taken from the stored upload status.
    name: str
    # Path of the assembled file inside the upload directory.
    path: str
class UploadChunkParams(t.Params):
    """Request parameters for uploading a single chunk."""

    # Upload identifier; replaced by a freshly generated one for chunk 1.
    uid: str
    # File name, stored in the upload status when the first chunk arrives.
    name: str
    # Expected size of the assembled file; checked once all chunks are joined.
    totalSize: int
    # Raw bytes of this chunk.
    content: bytes
    # 1-based index of this chunk.
    chunkNumber: int
    # Total number of chunks in the upload.
    chunkCount: int
class UploadChunkResponse(t.Response):
    """Response to a chunk upload."""

    # Identifier under which the chunk was stored.
    uid: str
# Directory holding status files, chunks and assembled uploads.
_UPLOAD_DIR = gws.TMP_DIR + '/uploads'
def upload_chunk(p: UploadChunkParams) -> UploadChunkResponse:
    """Store one chunk described by request parameters.

    Maps the camelCase request fields onto ``save_chunk`` and wraps the
    resulting uid in a response object.

    Args:
        p: Parameters of the chunk being uploaded.

    Returns:
        A response carrying the uid under which the chunk was stored.

    Raises:
        gws.web.error.BadRequest: If ``save_chunk`` rejects the chunk with
            an ``Error``; the original error is logged first.
    """
    chunk_args = dict(
        uid=p.uid,
        name=p.name,
        content=p.content,
        total_size=p.totalSize,
        chunk_number=p.chunkNumber,
        chunk_count=p.chunkCount,
    )

    try:
        stored_uid = save_chunk(**chunk_args)
    except Error as exc:
        gws.log.error(exc)
        raise gws.web.error.BadRequest()

    return UploadChunkResponse(uid=stored_uid)
def save_chunk(uid: str, name: str, content: bytes, total_size: int, chunk_number: int, chunk_count: int) -> str:
    """Store a single chunk of an upload.

    The first chunk (``chunk_number == 1``) starts a new upload: a fresh uid
    is generated (the ``uid`` argument is ignored) and a status file holding
    ``name``, ``total_size`` and ``chunk_count`` is written. Later chunks must
    pass the uid returned for the first one.

    Args:
        uid: Upload identifier; only used when ``chunk_number`` is not 1.
        name: File name to record in the upload status.
        content: Raw bytes of this chunk.
        total_size: Expected size of the assembled file.
        chunk_number: 1-based index of this chunk.
        chunk_count: Total number of chunks in the upload.

    Returns:
        The uid of the upload the chunk was stored under.

    Raises:
        Error: If the uid is missing or not alphanumeric, the status file
            cannot be read, or the chunk number is out of range.
    """
    upload_dir = gws.ensure_dir(_UPLOAD_DIR)
    is_first = chunk_number == 1

    if is_first:
        uid = gws.random_string(64)
        status = t.Data(
            name=name,
            total_size=total_size,
            chunk_count=chunk_count,
        )
    elif not uid or not uid.isalnum():
        # The uid becomes part of a file name; rejecting anything that is not
        # alphanumeric (including a missing uid) keeps separators out of it.
        raise Error(f'upload {uid!r}: invalid uid')
    else:
        try:
            status = t.Data(gws.tools.json2.from_path(f'{upload_dir}/{uid}.json'))
        except gws.tools.json2.Error:
            status = None

    if not status:
        raise Error(f'upload {uid!r}: invalid status')

    if chunk_number < 1 or chunk_number > status.chunk_count:
        raise Error(f'upload {uid!r}: invalid chunk number')

    if is_first:
        # Persist the status only after validation, so a rejected first chunk
        # does not leave an orphaned status file behind.
        gws.tools.json2.to_path(f'{upload_dir}/{uid}.json', status)

    gws.write_file_b(f'{upload_dir}/{uid}.{chunk_number}', content)
    return uid
def get(uid: str) -> UploadRecord:
    """Return the record of a completed upload, assembling it if necessary.

    If the assembled file already exists it is returned as is. Otherwise all
    chunks are concatenated into a temporary file, its size is checked
    against the recorded total size, it is renamed to its final name and the
    chunk files are removed.

    Args:
        uid: Upload identifier as returned by ``save_chunk``.

    Returns:
        The record with the uid, the recorded file name and the path of the
        assembled file.

    Raises:
        Error: If the uid is invalid, the upload is unknown or incomplete, a
            chunk cannot be read, the assembled size does not match, or the
            assembled file cannot be moved into place.
    """
    # Same rule as in save_chunk: the uid is interpolated into file names, so
    # anything that is not alphanumeric (e.g. path separators) is rejected.
    if not uid or not uid.isalnum():
        raise Error(f'upload {uid!r}: invalid uid')

    upload_dir = gws.ensure_dir(_UPLOAD_DIR)

    try:
        status = t.Data(gws.tools.json2.from_path(f'{upload_dir}/{uid}.json'))
    except gws.tools.json2.Error:
        status = None

    if not status:
        raise Error(f'upload {uid!r}: not found')

    path = f'{upload_dir}/{uid}.all'

    if os.path.isfile(path):
        return UploadRecord(uid=uid, path=path, name=status.name)

    # @TODO this should use a system lock

    chunks = [f'{upload_dir}/{uid}.{n}' for n in range(1, status.chunk_count + 1)]

    if not all(os.path.isfile(c) for c in chunks):
        raise Error(f'upload {uid!r}: incomplete')

    # Assemble under a random temporary name and rename afterwards, so a
    # partially written file is never visible under the final name.
    tmp_path = path + '.' + gws.random_string(6)

    try:
        with open(tmp_path, 'wb') as fp_all:
            for c in chunks:
                try:
                    with open(c, 'rb') as fp:
                        shutil.copyfileobj(fp, fp_all)
                except OSError as e:
                    raise Error(f'upload {uid!r}: read error') from e

        if gws.tools.os2.file_size(tmp_path) != status.total_size:
            raise Error(f'upload {uid!r}: invalid file size')

        try:
            os.rename(tmp_path, path)
        except OSError as e:
            raise Error(f'upload {uid!r}: move error') from e
    except Exception:
        # Best-effort cleanup: do not leave the temporary file behind when
        # assembling fails. The original failure is what gets reported.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    for c in chunks:
        gws.tools.os2.unlink(c)

    return UploadRecord(uid=uid, path=path, name=status.name)
def delete(uid: str):
    """Remove all files that belong to an upload.

    Every file in the upload directory whose name starts with ``uid`` followed
    by a dot (status file, chunks, assembled file) is unlinked.

    Args:
        uid: Upload identifier; an empty uid deletes nothing.
    """
    upload_dir = gws.ensure_dir(_UPLOAD_DIR)

    if not uid:
        # An empty uid would turn the prefix into '.', i.e. match dotfiles.
        return

    prefix = uid + '.'

    for p in gws.tools.os2.find_files(upload_dir):
        # NOTE(review): find_files results are handed straight to unlink, so
        # they are presumably paths rather than bare names; match the prefix
        # against the base name so it does not depend on the directory part.
        if os.path.basename(p).startswith(prefix):
            gws.tools.os2.unlink(p)