This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

feat: Auto-generate processing table name
Work around <aws/aws-cdk#10952> by injecting
`AWS_DEFAULT_REGION`.

Set up production table using
<pynamodb/PynamoDB#922 (comment)>.
l0b0 committed Mar 29, 2021
1 parent 90c9709 commit 5bfecb6
Showing 17 changed files with 189 additions and 89 deletions.
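
In short, the commit replaces the module-level `ProcessingAssetsModel`, whose `Meta` fixed the table name and hard-coded the region at import time, with a factory that builds the model class at run time, reading the table name from SSM Parameter Store and the region from the injected `AWS_DEFAULT_REGION` variable. A minimal self-contained sketch of that pattern follows; the names are illustrative, and the real implementation is in `backend/processing_assets_model.py` below.

```python
# Illustrative sketch only: the real factory reads the table name via
# get_param(ParameterName.PROCESSING_ASSETS_TABLE_NAME) instead of taking it
# as an argument.
from os import environ
from typing import Type

from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model


class ExampleModelBase(Model):
    pk = UnicodeAttribute(hash_key=True)
    sk = UnicodeAttribute(range_key=True)


def example_model_with_meta(name: str) -> Type[ExampleModelBase]:
    class ExampleModel(ExampleModelBase):
        class Meta:  # pylint:disable=too-few-public-methods
            table_name = name  # resolved when the factory is called, not at import
            region = environ["AWS_DEFAULT_REGION"]  # injected into the task environment

    return ExampleModel
```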
8 changes: 5 additions & 3 deletions backend/check_files_checksums/utils.py
@@ -9,7 +9,7 @@
from multihash import FUNCS, decode # type: ignore[import]

from ..check import Check
from ..processing_assets_model import ProcessingAssetsModel
from ..processing_assets_model import processing_assets_model_with_meta
from ..types import JsonObject
from ..validation_results_model import ValidationResult, ValidationResultFactory

@@ -43,14 +43,16 @@ def __init__(
self.validation_result_factory = validation_result_factory
self.logger = logger

self.processing_assets_model = processing_assets_model_with_meta()

def log_failure(self, content: JsonObject) -> None:
self.logger.error(dumps({"success": False, **content}))

def validate(self, hash_key: str, range_key: str) -> None:

try:
item = ProcessingAssetsModel.get(hash_key, range_key=range_key)
except ProcessingAssetsModel.DoesNotExist:
item = self.processing_assets_model.get(hash_key, range_key=range_key)
except self.processing_assets_model.DoesNotExist:
self.log_failure(
{
"error": {"message": "Item does not exist"},
8 changes: 5 additions & 3 deletions backend/check_stac_metadata/utils.py
@@ -17,7 +17,7 @@

from ..check import Check
from ..log import set_up_logging
from ..processing_assets_model import ProcessingAssetType, ProcessingAssetsModel
from ..processing_assets_model import ProcessingAssetType, processing_assets_model_with_meta
from ..types import JsonObject
from ..validation_results_model import ValidationResult, ValidationResultFactory

@@ -73,6 +73,8 @@ def __init__(

self.validator = STACSchemaValidator()

self.processing_assets_model = processing_assets_model_with_meta()

def run(self, metadata_url: str) -> None:
s3_url_prefix = "s3://"

@@ -133,14 +135,14 @@ def get_object(self, url: str) -> Any:

def save(self, key: str) -> None:
for index, metadata_file in enumerate(self.dataset_metadata):
ProcessingAssetsModel(
self.processing_assets_model(
hash_key=key,
range_key=f"{ProcessingAssetType.METADATA.value}#{index}",
url=metadata_file["url"],
).save()

for index, asset in enumerate(self.dataset_assets):
ProcessingAssetsModel(
self.processing_assets_model(
hash_key=key,
range_key=f"{ProcessingAssetType.DATA.value}#{index}",
url=asset["url"],
8 changes: 5 additions & 3 deletions backend/content_iterator/task.py
@@ -1,6 +1,6 @@
from jsonschema import validate # type: ignore[import]

from ..processing_assets_model import ProcessingAssetType, ProcessingAssetsModel
from ..processing_assets_model import ProcessingAssetType, processing_assets_model_with_meta
from ..types import JsonObject

# From https://docs.aws.amazon.com/batch/latest/userguide/array_jobs.html
@@ -51,9 +51,11 @@ def lambda_handler(event: JsonObject, _context: bytes) -> JsonObject:
dataset_id = event["dataset_id"]
version_id = event["version_id"]

asset_count = ProcessingAssetsModel.count(
processing_assets_model = processing_assets_model_with_meta()

asset_count = processing_assets_model.count(
hash_key=f"DATASET#{dataset_id}#VERSION#{version_id}",
range_key_condition=ProcessingAssetsModel.sk.startswith(
range_key_condition=processing_assets_model.sk.startswith(
f"{ProcessingAssetType.DATA.value}#"
),
)
6 changes: 4 additions & 2 deletions backend/import_dataset/task.py
@@ -10,7 +10,7 @@
from ..import_dataset_keys import NEW_KEY_KEY, ORIGINAL_KEY_KEY
from ..log import set_up_logging
from ..parameter_store import ParameterName, get_param
from ..processing_assets_model import ProcessingAssetsModel
from ..processing_assets_model import processing_assets_model_with_meta
from ..types import JsonObject

STS_CLIENT = boto3.client("sts")
@@ -54,7 +54,9 @@ def lambda_handler(event: JsonObject, _context: bytes) -> JsonObject:
manifest_key = f"manifests/{dataset_version_id}.csv"

with smart_open(f"s3://{storage_bucket_name}/{manifest_key}", "w") as s3_manifest:
for item in ProcessingAssetsModel.query(
processing_assets_model = processing_assets_model_with_meta()

for item in processing_assets_model.query(
f"DATASET#{dataset_id}#VERSION#{dataset_version_id}"
):
logger.debug(dumps({"Adding file to manifest": item.url}))
1 change: 1 addition & 0 deletions backend/parameter_store.py
@@ -22,6 +22,7 @@ def _generate_next_value_( # type: ignore[misc,override] # pylint:disable=no-se
DATASET_VERSION_CREATION_STEP_FUNCTION_ARN = auto()
IMPORT_DATASET_FILE_FUNCTION_TASK_ARN = auto()
IMPORT_DATASET_ROLE_ARN = auto()
PROCESSING_ASSETS_TABLE_NAME = auto()
STAGING_BUCKET_NAME = auto()
STORAGE_BUCKET_NAME = auto()

19 changes: 13 additions & 6 deletions backend/processing_assets_model.py
@@ -1,23 +1,30 @@
"""Dataset object DynamoDB model."""
from enum import Enum
from os import environ
from typing import Type

from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model

from .resources import ResourceName
from .parameter_store import ParameterName, get_param


class ProcessingAssetType(Enum):
DATA = "DATA_ITEM_INDEX"
METADATA = "METADATA_ITEM_INDEX"


class ProcessingAssetsModel(Model):
class Meta: # pylint:disable=too-few-public-methods
table_name = ResourceName.PROCESSING_ASSETS_TABLE_NAME.value
region = "ap-southeast-2" # TODO: don't hardcode region

class ProcessingAssetsModelBase(Model):
pk = UnicodeAttribute(hash_key=True)
sk = UnicodeAttribute(range_key=True)
url = UnicodeAttribute()
multihash = UnicodeAttribute(null=True)


def processing_assets_model_with_meta() -> Type[ProcessingAssetsModelBase]:
class ProcessingAssetsModel(ProcessingAssetsModelBase):
class Meta: # pylint:disable=too-few-public-methods
table_name = get_param(ParameterName.PROCESSING_ASSETS_TABLE_NAME)
region = environ["AWS_DEFAULT_REGION"]

return ProcessingAssetsModel
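
For context, here is a hedged sketch of how the task code consumes this factory at run time; the dataset and version IDs are placeholders, and actually running it would need AWS credentials, the SSM parameter, and the DynamoDB table to exist.

```python
from backend.processing_assets_model import (
    ProcessingAssetType,
    processing_assets_model_with_meta,
)

# Meta (table name and region) is resolved here rather than at import time.
processing_assets_model = processing_assets_model_with_meta()

item = processing_assets_model.get(
    "DATASET#some-dataset#VERSION#some-version",  # placeholder hash key
    range_key=f"{ProcessingAssetType.DATA.value}#0",
)
print(item.url)
```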
1 change: 0 additions & 1 deletion backend/resources.py
@@ -8,6 +8,5 @@ class ResourceName(Enum):
DATASETS_TABLE_NAME = f"{ENV}-datasets"
DATASET_VERSIONS_ENDPOINT_FUNCTION_NAME = f"{ENV}-dataset-versions-endpoint"
IMPORT_STATUS_ENDPOINT_FUNCTION_NAME = f"{ENV}-import-status-endpoint"
PROCESSING_ASSETS_TABLE_NAME = f"{ENV}-processing-assets"
USERS_ROLE_NAME = f"{ENV}-data-lake-users"
VALIDATION_RESULTS_TABLE_NAME = f"{ENV}-validation-results"
28 changes: 28 additions & 0 deletions infrastructure/constructs/parameter.py
@@ -0,0 +1,28 @@
from typing import Iterable

import constructs
from aws_cdk import aws_ssm
from aws_cdk.aws_iam import IGrantable


class Parameter(aws_ssm.StringParameter):
def __init__(
self,
scope: constructs.Construct,
construct_id: str,
*,
string_value: str,
description: str,
parameter_name: str,
readers: Iterable[IGrantable],
) -> None:
super().__init__(
scope,
construct_id,
string_value=string_value,
description=description,
parameter_name=parameter_name,
)

for reader in readers:
self.grant_read(reader)
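
A minimal usage sketch of the new construct; the stack, function, and parameter values below are placeholders, and the real instantiations appear in the `infrastructure/processing_stack.py` diff further down.

```python
from aws_cdk import aws_lambda, core

from infrastructure.constructs.parameter import Parameter


class ExampleStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str) -> None:
        super().__init__(scope, construct_id)

        reader = aws_lambda.Function(
            self,
            "example-function",
            runtime=aws_lambda.Runtime.PYTHON_3_8,
            handler="index.handler",
            code=aws_lambda.Code.from_inline("def handler(event, context): ..."),
        )

        # Creates the SSM string parameter and grants read access to each reader.
        Parameter(
            self,
            "example-parameter",
            string_value="example-value",
            description="Example parameter",
            parameter_name="/example/parameter",
            readers=[reader],
        )


app = core.App()
ExampleStack(app, "example-stack")
app.synth()
```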
15 changes: 12 additions & 3 deletions infrastructure/constructs/task_job_definition.py
@@ -1,11 +1,17 @@
from typing import TYPE_CHECKING, Any

from aws_cdk import aws_batch, aws_ecs, aws_iam
from aws_cdk.core import Construct

if TYPE_CHECKING:
from .batch_submit_job_task import BatchSubmitJobTask
else:
BatchSubmitJobTask = Any


class TaskJobDefinition(aws_batch.JobDefinition):
def __init__(
self,
scope: Construct,
scope: BatchSubmitJobTask,
construct_id: str,
*,
deploy_env: str,
@@ -28,7 +34,10 @@ def __init__(
job_role=job_role, # type: ignore[arg-type]
memory_limit_mib=batch_job_definition_memory_limit,
vcpus=1,
environment={"DEPLOY_ENV": deploy_env},
environment={
"AWS_DEFAULT_REGION": scope.job_role.stack.region,
"DEPLOY_ENV": deploy_env,
},
)

super().__init__(scope, construct_id, container=container)
54 changes: 37 additions & 17 deletions infrastructure/processing_stack.py
@@ -6,6 +6,7 @@
from aws_cdk import aws_dynamodb, aws_iam, aws_s3, aws_ssm, aws_stepfunctions
from aws_cdk.core import Construct, Stack

from backend.environment import ENV
from backend.parameter_store import ParameterName
from backend.resources import ResourceName
from backend.validation_results_model import ValidationOutcomeIdx
@@ -14,6 +15,7 @@
from .constructs.batch_submit_job_task import BatchSubmitJobTask
from .constructs.bundled_lambda_function import BundledLambdaFunction
from .constructs.lambda_task import LambdaTask
from .constructs.parameter import Parameter
from .constructs.table import Table


@@ -38,7 +40,7 @@ def __init__( # pylint: disable=too-many-arguments
# PROCESSING ASSETS TABLE
processing_assets_table = Table(
self,
ResourceName.PROCESSING_ASSETS_TABLE_NAME.value,
f"{ENV}-processing-assets",
deploy_env=deploy_env,
application_layer=application_layer,
)
@@ -207,13 +209,6 @@ def __init__( # pylint: disable=too-many-arguments
"batchoperations.s3.amazonaws.com"
),
)
import_dataset_role_arn_parameter = aws_ssm.StringParameter(
self,
"import-dataset-role-arn",
description=f"Import dataset role ARN for {deploy_env}",
parameter_name=ParameterName.IMPORT_DATASET_ROLE_ARN.value,
string_value=import_dataset_role.role_arn,
)

import_dataset_file_function = BundledLambdaFunction(
self,
@@ -222,13 +217,6 @@ def __init__( # pylint: disable=too-many-arguments
application_layer=application_layer,
extra_environment={"DEPLOY_ENV": deploy_env},
)
import_dataset_file_function_arn_parameter = aws_ssm.StringParameter(
self,
"import-dataset-file-function-arn",
description=f"Import dataset file function ARN for {deploy_env}",
parameter_name=ParameterName.IMPORT_DATASET_FILE_FUNCTION_TASK_ARN.value,
string_value=import_dataset_file_function.function_arn,
)

assert import_dataset_file_function.role is not None
for storage_writer in [import_dataset_role, import_dataset_file_function.role]:
@@ -260,8 +248,6 @@ def __init__( # pylint: disable=too-many-arguments
import_dataset_task.lambda_function.role.add_to_policy(
aws_iam.PolicyStatement(resources=["*"], actions=["s3:CreateJob"])
)
import_dataset_role_arn_parameter.grant_read(import_dataset_task.lambda_function)
import_dataset_file_function_arn_parameter.grant_read(import_dataset_task.lambda_function)

import_dataset_file_function.grant_invoke(import_dataset_role) # type: ignore[arg-type]

@@ -273,6 +259,40 @@ def __init__( # pylint: disable=too-many-arguments
processing_assets_table.grant_read_data(import_dataset_task.lambda_function)
processing_assets_table.grant(import_dataset_task.lambda_function, "dynamodb:DescribeTable")

# Parameters
Parameter(
self,
"import-dataset-file-function-arn",
string_value=import_dataset_file_function.function_arn,
description=f"Import dataset file function ARN for {deploy_env}",
parameter_name=ParameterName.IMPORT_DATASET_FILE_FUNCTION_TASK_ARN.value,
readers=[import_dataset_task.lambda_function],
)

Parameter(
self,
"import-dataset-role-arn",
string_value=import_dataset_role.role_arn,
description=f"Import dataset role ARN for {deploy_env}",
parameter_name=ParameterName.IMPORT_DATASET_ROLE_ARN.value,
readers=[import_dataset_task.lambda_function],
)

Parameter(
self,
"processing-assets-table-name-parameter",
string_value=processing_assets_table.table_name,
description=f"Processing Assets Table name for {deploy_env}",
parameter_name=ParameterName.PROCESSING_ASSETS_TABLE_NAME.value,
readers=[
check_files_checksums_array_task.job_role, # type: ignore[list-item]
check_files_checksums_single_task.job_role, # type: ignore[list-item]
check_stac_metadata_job_task.job_role, # type: ignore[list-item]
content_iterator_task.lambda_function,
import_dataset_task.lambda_function,
],
)

success_task = aws_stepfunctions.Succeed(self, "success")

############################################################################################
10 changes: 7 additions & 3 deletions tests/aws_utils.py
@@ -16,7 +16,7 @@

from backend.content_iterator.task import MAX_ITERATION_SIZE
from backend.dataset_model import DatasetModel
from backend.processing_assets_model import ProcessingAssetsModel
from backend.processing_assets_model import (
ProcessingAssetsModelBase,
processing_assets_model_with_meta,
)
from backend.types import JsonObject
from backend.validation_results_model import ValidationResult, ValidationResultsModel

@@ -169,15 +172,16 @@ def __init__(
):
prefix = "METADATA" if multihash is None else "DATA"

self._item = ProcessingAssetsModel(
self.processing_assets_model = processing_assets_model_with_meta()
self._item = self.processing_assets_model(
hash_key=asset_id,
range_key=f"{prefix}_ITEM_INDEX#{self.index}",
url=url,
multihash=multihash,
)
ProcessingAsset.index += 1

def __enter__(self) -> ProcessingAssetsModel:
def __enter__(self) -> ProcessingAssetsModelBase:
self._item.save()
return self._item

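
A hedged sketch of how a test might use the updated helper; the constructor arguments are inferred from the visible diff, the IDs and URL are placeholders, and running it would need deployed infrastructure plus AWS credentials.

```python
from tests.aws_utils import ProcessingAsset

with ProcessingAsset(
    asset_id="DATASET#some-dataset#VERSION#some-version",  # placeholder
    url="s3://example-bucket/example.tif",  # placeholder
) as processing_asset:
    # __enter__ saves the item, so it can be read back through the generated model.
    assert processing_asset.url == "s3://example-bucket/example.tif"
```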
(The remaining six changed files are not shown here.)
