Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Multi Mosaic in DynamoDB Table #127

Merged
merged 18 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions cogeo_mosaic/backends/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cogeo_mosaic.backends.base import BaseBackend
from cogeo_mosaic.backends.utils import find_quadkeys
from cogeo_mosaic.cache import lru_cache
from cogeo_mosaic.errors import _HTTP_EXCEPTIONS, MosaicError, MosaicExists
from cogeo_mosaic.errors import _HTTP_EXCEPTIONS, MosaicError, MosaicExistsError
from cogeo_mosaic.logger import logger
from cogeo_mosaic.mosaic import MosaicJSON
from cogeo_mosaic.utils import bbox_union
Expand Down Expand Up @@ -99,22 +99,34 @@ def _quadkeys(self) -> List[str]:
ProjectionExpression="quadkey",
)
return [
qk["quadkey"]
for qk in resp["Items"]
if qk["quadkey"] != self._metadata_quadkey
item["quadkey"]
for item in resp["Items"]
if item["quadkey"] != self._metadata_quadkey
]

def write(self, overwrite: bool = False, **kwargs: Any):
"""Write mosaicjson document to AWS DynamoDB."""
"""Write mosaicjson document to AWS DynamoDB.

Args:
overwrite (bool): delete old mosaic items inthe Table.
**kwargs (any): Options forwarded to `dynamodb.create_table`

Returns:
dict: dictionary with metadata constructed from the sceneid.

Raises:
MosaicExistsError: If mosaic already exists in the Table.

"""
if not self._table_exists():
self._create_table(**kwargs)

if self._mosaic_exists():
if not overwrite:
raise MosaicExists(
raise MosaicExistsError(
f"Mosaic already exists in {self.table_name}, use `overwrite=True`."
)
self.clean()
self.delete()

items = self._create_items()
self._write_items(items)
Expand All @@ -126,7 +138,11 @@ def _update_quadkey(self, quadkey: str, dataset: List[str]):
)

def _update_metadata(self):
"""Update bounds and center."""
"""Update bounds and center.

Note: `parse_float=Decimal` is required because DynamoDB requires all numbers to be in Decimal type

"""
meta = json.loads(json.dumps(self.metadata), parse_float=Decimal)
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
meta["quadkey"] = self._metadata_quadkey
meta["mosaicId"] = self.mosaic_name
Expand Down Expand Up @@ -176,7 +192,13 @@ def update(
self._update_metadata()

def _create_table(self, billing_mode: str = "PAY_PER_REQUEST", **kwargs: Any):
"""Create DynamoDB Table."""
"""Create DynamoDB Table.

Args:
billing_mode (str): DynamoDB billing mode (default set to PER_REQUEST).
**kwargs (any): Options forwarded to `dynamodb.create_table`

"""
logger.debug(f"Creating {self.table_name} Table.")

# Define schema for primary key
Expand Down Expand Up @@ -208,14 +230,15 @@ def _create_table(self, billing_mode: str = "PAY_PER_REQUEST", **kwargs: Any):
return

def _create_items(self) -> List[Dict]:
"""Create DynamoDB items from Mosaic defintion.

Note: `parse_float=Decimal` is required because DynamoDB requires all numbers to be
in Decimal type (ref: https://blog.ruanbekker.com/blog/2019/02/05/convert-float-to-decimal-data-types-for-boto3-dynamodb-using-python/)

"""
items = []
# Create one metadata item with quadkey=-1
# Convert float to decimal
# https://blog.ruanbekker.com/blog/2019/02/05/convert-float-to-decimal-data-types-for-boto3-dynamodb-using-python/
meta = json.loads(json.dumps(self.metadata), parse_float=Decimal)

meta["quadkey"] = self._metadata_quadkey
meta["mosaicId"] = self.mosaic_name
meta = {"quakdey": self._metadata_quadkey, "mosaicId": self.mosaic_name, **meta}
items.append(meta)

for quadkey, assets in self.mosaic_def.tiles.items():
Expand Down Expand Up @@ -294,9 +317,9 @@ def _mosaic_exists(self) -> bool:

return True if item else False
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved

def clean(self):
"""clean MosaicID from dynamoDB Table."""
logger.debug(f"Deleting items for mosaic {self.mosaic_name}...")
def delete(self):
"""Delete every items for a specific mosaic in the dynamoDB Table."""
vincentsarago marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Deleting all items for mosaic {self.mosaic_name}...")

quadkey_list = self._quadkeys + [self._metadata_quadkey]
with self.table.batch_writer() as batch_writer:
Expand Down
4 changes: 2 additions & 2 deletions cogeo_mosaic/backends/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_assets_from_json,
)
from cogeo_mosaic.cache import lru_cache
from cogeo_mosaic.errors import _FILE_EXCEPTIONS, MosaicError, MosaicExists
from cogeo_mosaic.errors import _FILE_EXCEPTIONS, MosaicError, MosaicExistsError
from cogeo_mosaic.mosaic import MosaicJSON


Expand All @@ -39,7 +39,7 @@ def assets_for_point(self, lng: float, lat: float) -> List[str]:
def write(self, overwrite: bool = False, gzip: bool = None):
"""Write mosaicjson document to a file."""
if not overwrite and os.path.exists(self.path):
raise MosaicExists("Mosaic file already exist, use `overwrite=True`.")
raise MosaicExistsError("Mosaic file already exist, use `overwrite=True`.")

body = self.mosaic_def.dict(exclude_none=True)
with open(self.path, "wb") as f:
Expand Down
4 changes: 2 additions & 2 deletions cogeo_mosaic/backends/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
get_assets_from_json,
)
from cogeo_mosaic.cache import lru_cache
from cogeo_mosaic.errors import _HTTP_EXCEPTIONS, MosaicError, MosaicExists
from cogeo_mosaic.errors import _HTTP_EXCEPTIONS, MosaicError, MosaicExistsError
from cogeo_mosaic.mosaic import MosaicJSON


Expand Down Expand Up @@ -59,7 +59,7 @@ def write(self, overwrite: bool = False, gzip: bool = None, **kwargs: Any):
body = json.dumps(mosaic_doc).encode("utf-8")

if not overwrite and _aws_head_object(self.key, self.bucket):
raise MosaicExists("Mosaic file already exist, use `overwrite=True`.")
raise MosaicExistsError("Mosaic file already exist, use `overwrite=True`.")

_aws_put_data(self.key, self.bucket, body, client=self.client, **kwargs)

Expand Down
4 changes: 4 additions & 0 deletions cogeo_mosaic/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class MosaicExists(MosaicError):
"""MosaicJSON already exists."""


class MosaicExistsError(MosaicError):
"""MosaicJSON already exists."""


_HTTP_EXCEPTIONS = {
401: MosaicAuthError,
403: MosaicAuthError,
Expand Down
6 changes: 3 additions & 3 deletions tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from cogeo_mosaic.backends.stac import _fetch as stac_search
from cogeo_mosaic.backends.stac import default_stac_accessor as stac_accessor
from cogeo_mosaic.backends.utils import _decompress_gz
from cogeo_mosaic.errors import MosaicError, MosaicExists, NoAssetFoundError
from cogeo_mosaic.errors import MosaicError, MosaicExistsError, NoAssetFoundError
from cogeo_mosaic.mosaic import MosaicJSON

mosaic_gz = os.path.join(os.path.dirname(__file__), "fixtures", "mosaic.json.gz")
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_file_backend():
with open("mosaic.json") as f:
m = json.loads(f.read())
assert m["quadkey_zoom"] == 7
with pytest.raises(MosaicExists):
with pytest.raises(MosaicExistsError):
mosaic.write()
mosaic.write(overwrite=True)

Expand Down Expand Up @@ -267,7 +267,7 @@ def test_s3_backend(session):
session.return_value.client.return_value.head_object.return_value = True
with MosaicBackend("s3://mybucket/00000", mosaic_def=mosaic_content) as mosaic:
assert isinstance(mosaic, S3Backend)
with pytest.raises(MosaicExists):
with pytest.raises(MosaicExistsError):
mosaic.write()
session.return_value.client.return_value.get_object.assert_not_called()
session.return_value.client.return_value.head_object.assert_called_once()
Expand Down