From 1e94498d9d548cbea6466a45dafa3b919c65bd1f Mon Sep 17 00:00:00 2001
From: Benjamin Yolken
Date: Fri, 3 Feb 2017 13:32:23 -0800
Subject: [PATCH] Add initial implementation of S3Cache

---
 setup.py                          |   1 +
 superset/assets/version_info.json |   2 +-
 superset/results_backends.py      | 114 ++++++++++++++++++++++++++++--
 3 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/setup.py b/setup.py
index 20dcbc16efebd..d679cb7a32056 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,7 @@ def get_git_sha():
     zip_safe=False,
     scripts=['superset/bin/superset'],
     install_requires=[
+        'boto3==1.4.4',
         'celery==3.1.23',
         'cryptography==1.5.3',
         'flask-appbuilder==1.8.1',
diff --git a/superset/assets/version_info.json b/superset/assets/version_info.json
index cff95f6d46e0f..4be1844c207e6 100644
--- a/superset/assets/version_info.json
+++ b/superset/assets/version_info.json
@@ -1 +1 @@
-{"GIT_SHA": "2d08e240285288b71df98747ddd4b6cca3220c5a", "version": "0.15.2"}
\ No newline at end of file
+{"GIT_SHA": "0f7189b859f4a782fd43af694012029645f81b44", "version": "0.15.4"}
\ No newline at end of file
diff --git a/superset/results_backends.py b/superset/results_backends.py
index 714ed66b15c6f..ec0d5b28f0538 100644
--- a/superset/results_backends.py
+++ b/superset/results_backends.py
@@ -7,24 +7,106 @@
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import cPickle
+import logging
+import StringIO
+
+import boto3
 from werkzeug.contrib.cache import BaseCache
 
+from superset import app
+
+config = app.config
+
 
 class S3Cache(BaseCache):
 
-    """S3 cache"""
+    """S3 cache implementation.
+
+    Adapted from examples in
+    https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py.
+
+    Timeout parameters are ignored as S3 doesn't support key-level expiration. To expire
+    keys, set up an expiration policy as described in
+    https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/.
+    """
 
     def __init__(self, default_timeout=300):
         self.default_timeout = default_timeout
 
+        self.s3_client = boto3.client('s3')
+        self.bucket = config.get('S3_CACHE_BUCKET')  # bucket name, as client calls expect
+        self.key_prefix = config.get('S3_CACHE_KEY_PREFIX')
+
     def get(self, key):
-        return None
+        """Look up key in the cache and return the value for it.
+        :param key: the key to be looked up.
+        :returns: The value if it exists and is readable, else ``None``.
+        """
+        if not self._key_exists(key):
+            return None
+        else:
+            value_file = StringIO.StringIO()
+
+            try:
+                self.s3_client.download_fileobj(self.bucket, self._full_s3_key(key), value_file)
+            except Exception as e:
+                logging.warning('Exception while trying to get %s: %s', key, e)
+                return None
+            else:
+                value_file.seek(0)
+                return cPickle.load(value_file)
 
     def delete(self, key):
-        return True
+        """Delete `key` from the cache.
+        :param key: the key to delete.
+        :returns: Whether the key existed and has been deleted.
+        :rtype: boolean
+        """
+        if not self._key_exists(key):
+            return False
+        else:
+            try:
+                self.s3_client.delete_objects(
+                    Bucket=self.bucket,
+                    Delete={
+                        'Objects': [
+                            {
+                                'Key': self._full_s3_key(key)
+                            }
+                        ]
+                    }
+                )
+            except Exception as e:
+                logging.warning('Exception while trying to delete %s: %s', key, e)
+                return False
+            else:
+                return True
 
     def set(self, key, value, timeout=None):
-        return True
+        """Add a new key/value to the cache (overwrites value, if key already
+        exists in the cache).
+        :param key: the key to set
+        :param value: the value for the key
+        :param timeout: the cache timeout for the key in seconds (if not
+                        specified, it uses the default timeout). A timeout of
+                        0 indicates that the cache never expires.
+        :returns: ``True`` if key has been updated, ``False`` for backend
+                  errors. Pickling errors, however, will raise a subclass of
+                  ``pickle.PickleError``.
+        :rtype: boolean
+        """
+        value_file = StringIO.StringIO()
+        cPickle.dump(value, value_file)
+
+        try:
+            value_file.seek(0)
+            self.s3_client.upload_fileobj(value_file, self.bucket, self._full_s3_key(key))
+        except Exception as e:
+            logging.warning('Exception while trying to set %s: %s', key, e)
+            return False
+        else:
+            return True
 
     def add(self, key, value, timeout=None):
         """Works like :meth:`set` but does not overwrite the values of already
@@ -38,7 +120,10 @@ def add(self, key, value, timeout=None):
                   existing keys.
         :rtype: boolean
         """
-        return True
+        if self._key_exists(key):
+            return False
+        else:
+            return self.set(key, value, timeout=timeout)
 
     def clear(self):
         """Clears the cache.  Keep in mind that not all caches support
@@ -46,4 +131,21 @@ def clear(self):
         :returns: Whether the cache has been cleared.
         :rtype: boolean
         """
-        return True
+        return False
+
+    def _full_s3_key(self, key):
+        """Convert a cache key to a full S3 key, including the key prefix."""
+        return '%s%s' % (self.key_prefix, key)
+
+    def _key_exists(self, key):
+        """Determine whether the given key exists in the bucket."""
+        try:
+            self.s3_client.head_object(
+                Bucket=self.bucket,
+                Key=self._full_s3_key(key)
+            )
+        except Exception:
+            # head_object throws an exception when object doesn't exist
+            return False
+        else:
+            return True