Skip to content

Commit

Permalink
Chunked uploads handling (danielfrg#80)
Browse files Browse the repository at this point in the history
Added basic pruning of stale chunked uploads
  • Loading branch information
pvanliefland committed Aug 31, 2020
1 parent f7111b9 commit 62ad519
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
21 changes: 18 additions & 3 deletions s3contents/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CHUNKS_HANDLING = False

import base64
import time


# Used as a "registry" for uploads.
Expand All @@ -28,9 +29,11 @@ def store_content_chunk(path: str, content: str):
current_value = content_chunks.get()

if path not in current_value:
current_value[path] = []
current_value[path] = {"started_at": time.time(), "chunks": []}

current_value[path].append(base64.b64decode(content.encode("ascii"), validate=True))
current_value[path]["chunks"].append(
base64.b64decode(content.encode("ascii"), validate=True)
)


def assemble_chunks(path: str) -> str:
Expand All @@ -41,11 +44,23 @@ def assemble_chunks(path: str) -> str:
if path not in current_value:
raise ValueError(f"No chunk for path {path}")

return base64.b64encode(b"".join(current_value[path])).decode("ascii")
return base64.b64encode(b"".join(current_value[path]["chunks"])).decode("ascii")


def delete_chunks(path):
    """Drop the registry entry held for ``path``.

    Intended to be called once the upload is complete, so that the memory
    used by its stored chunks can be freed.
    """
    uploads = content_chunks.get()
    del uploads[path]


def prune_stale_chunks(max_age: float = 3600) -> None:
    """Remove chunked uploads that were started too long ago.

    Called periodically to avoid keeping large objects in memory
    when a chunked upload does not finish.

    Args:
        max_age: Number of seconds after which an upload is considered
            stale, measured from its ``"started_at"`` timestamp.
            Defaults to 3600 (one hour).
    """
    current_value = content_chunks.get()
    now = time.time()

    # Gather the stale paths before deleting anything: the registry must be
    # walked with .items() to get (path, info) pairs, and removing entries
    # while iterating it would raise
    # "RuntimeError: dictionary changed size during iteration".
    stale_paths = [
        path
        for path, chunk_info in current_value.items()
        if now - chunk_info["started_at"] > max_age
    ]

    for path in stale_paths:
        del current_value[path]
3 changes: 3 additions & 0 deletions s3contents/genericmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CHUNKS_HANDLING,
assemble_chunks,
delete_chunks,
prune_stale_chunks,
store_content_chunk,
)
from s3contents.genericfs import GenericFSError, NoSuchFile
Expand Down Expand Up @@ -278,6 +279,8 @@ def save(self, model, path):
return model

def _save_chunked(self, chunk, model, path):
prune_stale_chunks()

if not CHUNKS_HANDLING:
self.log.error(
"S3contents.GenericManager._save_chunked: not available with Python < 3.7"
Expand Down
9 changes: 9 additions & 0 deletions s3contents/tests/test_gcsmanager_chunked.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time
from tornado.web import HTTPError

from s3contents import GCSContentsManager
Expand Down Expand Up @@ -33,13 +34,21 @@ def make_dir(self, api_path):
)

def test_save(self):
    """Run the inherited save test and check that stale uploads are pruned.

    An upload entry dated 4000 seconds in the past (older than the
    3600-second limit used by ``prune_stale_chunks``) is planted in the
    chunk registry before saving; it must be gone once the save is done.
    """
    if not CHUNKS_HANDLING:
        # Chunked file upload is not available for Python < 3.7: the save is
        # expected to fail with an HTTP error.  Stop here, otherwise the
        # unguarded call below would raise that same error and fail the test.
        with self.assertRaises(HTTPError):
            super().test_save()
        return

    # NOTE(review): the visible imports of this module do not bring in
    # content_chunks, hence the local import -- drop it if the name is
    # already imported at module level.
    from s3contents.chunks import content_chunks

    current_value = content_chunks.get()
    current_value["stale_file.txt"] = {
        "started_at": time.time() - 4000,
        "chunks": [],
    }

    super().test_save()

    self.assertNotIn("stale_file.txt", current_value)


# Delete the TestLargeFileManager name from this module: if it stayed
# reachable here, the main IPython tests would be run as well.
del TestLargeFileManager
11 changes: 10 additions & 1 deletion s3contents/tests/test_s3manager_chunked.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest
import time
from tornado.web import HTTPError

from s3contents import S3ContentsManager
from s3contents.chunks import CHUNKS_HANDLING
from s3contents.chunks import CHUNKS_HANDLING, content_chunks
from s3contents.ipycompat import TestLargeFileManager


Expand Down Expand Up @@ -37,13 +38,21 @@ def make_dir(self, api_path):
)

def test_save(self):
    """Run the inherited save test and check that stale uploads are pruned.

    An upload entry dated 4000 seconds in the past (older than the
    3600-second limit used by ``prune_stale_chunks``) is planted in the
    chunk registry before saving; it must be gone once the save is done.
    """
    if not CHUNKS_HANDLING:
        # Chunked file upload is not available for Python < 3.7: the save is
        # expected to fail with an HTTP error.  Stop here, otherwise the
        # unguarded call below would raise that same error and fail the test.
        with self.assertRaises(HTTPError):
            super().test_save()
        return

    current_value = content_chunks.get()
    current_value["stale_file.txt"] = {
        "started_at": time.time() - 4000,
        "chunks": [],
    }

    super().test_save()

    self.assertNotIn("stale_file.txt", current_value)


# Delete the imported TestLargeFileManager name from this module: if it stayed
# reachable here, the main IPython tests would be run as well.
del TestLargeFileManager

0 comments on commit 62ad519

Please sign in to comment.