From aaa08fbb7464361a15e1b14d3fe2bbc4b53c05b4 Mon Sep 17 00:00:00 2001
From: Christoph Kuhnke
Date: Wed, 31 Jan 2024 22:19:24 +0100
Subject: [PATCH] #150: Used multipart upload for VM images (#154)

---
 doc/changes/changes_0.1.0.md                  |  1 +
 .../ds/sandbox/lib/aws_access/aws_access.py   | 81 ++++++++++++++++++-
 .../lib/export_vm/rename_s3_objects.py        |  2 +-
 .../lib/update_release/run_update_release.py  |  2 +-
 .../test_ci_s3_transfer_multipart.py          | 52 ++++++++++++
 test/integration/aws/test_export_vm.py        |  2 +-
 6 files changed, 136 insertions(+), 4 deletions(-)
 create mode 100644 test/codebuild/test_ci_s3_transfer_multipart.py

diff --git a/doc/changes/changes_0.1.0.md b/doc/changes/changes_0.1.0.md
index 2e4ead15..01092c79 100644
--- a/doc/changes/changes_0.1.0.md
+++ b/doc/changes/changes_0.1.0.md
@@ -32,6 +32,7 @@ Version: 0.1.0
 * #75: Changed default port of Jupyter server to 49494
 * #145: Add Docker Test Library to prepare Notebook tests
 * #255: Renamed data science sandbox to exasol-ai-lab
+* #150: Used multipart upload for VM images
 * #145: Added Docker Test Library to prepare Notebook tests
 * #151: Setup SageMaker Credentials for notebook testing in the CI
 * #155: Added a Notebook Link Checker to Github Actions
diff --git a/exasol/ds/sandbox/lib/aws_access/aws_access.py b/exasol/ds/sandbox/lib/aws_access/aws_access.py
index a022c40c..a5ade7b1 100644
--- a/exasol/ds/sandbox/lib/aws_access/aws_access.py
+++ b/exasol/ds/sandbox/lib/aws_access/aws_access.py
@@ -1,8 +1,12 @@
 from functools import wraps
-from typing import Optional, Any, List, Dict, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple
 
 import boto3
 import botocore
+import os
+import humanfriendly
+
+from tempfile import NamedTemporaryFile
 
 from exasol.ds.sandbox.lib.aws_access.ami import Ami
 from exasol.ds.sandbox.lib.aws_access.cloudformation_stack import CloudformationStack
@@ -36,6 +40,26 @@ def wrapper(self, *args, **kwargs):
     return wrapper
 
 
+class Progress:
+    def __init__(self, report_every: str = "50 MB"):
+        self._report_every = humanfriendly.parse_size(report_every)
+        self._processed: int = 0
+        self._unreported: int = 0
+
+    def report(self, chunk: int):
+        self._unreported += chunk
+        if self._unreported < self._report_every:
+            return
+        self._processed += self._unreported
+        self._unreported = 0
+        display = round(self._processed / 1024 / 1024)
+        LOG.info(f'Transferred {display} MB')
+
+    def reset(self):
+        self._processed = 0
+        self._unreported = 0
+
+
 class AwsAccess(object):
     def __init__(self, aws_profile: Optional[str], region: Optional[str] = None):
         self._aws_profile = aws_profile
@@ -419,6 +443,61 @@ def copy_s3_object(self, bucket: str, source: str, dest: str):
         copy_source = {'Bucket': bucket, 'Key': source}
         cloud_client.copy_object(Bucket=bucket, CopySource=copy_source, Key=dest)
 
+    @_log_function_start
+    def upload_large_s3_object(
+            self,
+            bucket: str,
+            source: str,
+            dest: str,
+            progress: Optional[Progress] = None,
+    ):
+        """
+        :param source: path in the local filesystem.
+        """
+        cloud_client = self._get_aws_client("s3")
+        config = boto3.s3.transfer.TransferConfig()
+        if progress is None:
+            progress = Progress("50 MB")
+        cloud_client.upload_file(
+            source,
+            bucket,
+            dest,
+            Config=config,
+            Callback=progress.report,
+        )
+
+    @_log_function_start
+    def copy_large_s3_object(
+            self,
+            bucket: str,
+            source: str,
+            dest: str,
+            progress: Optional[Progress] = None,
+    ):
+        """
+        Copies an S3 object within a bucket, using a TransferConfig to
+        process files larger than 5 GB as multipart.
+
+        The copy operation requires first downloading the file from S3 to
+        the local filesystem and then uploading it again.
+        """
+        cloud_client = self._get_aws_client("s3")
+        config = boto3.s3.transfer.TransferConfig()
+
+        if progress is None:
+            progress = Progress("50 MB")
+
+        callback = progress.report
+        tmpfile = NamedTemporaryFile(delete=False).name
+        try:
+            LOG.info(f"Downloading (large) S3 object {source} to temp file")
+            cloud_client.download_file(bucket, source, tmpfile, Callback=callback, Config=config)
+            progress.reset()
+            LOG.info(f"Uploading (large) temp file to S3 {dest}")
+            cloud_client.upload_file(tmpfile, bucket, dest, Config=config, Callback=callback)
+        finally:
+            os.unlink(tmpfile)
+
     @_log_function_start
     def delete_s3_object(self, bucket: str, source: str):
         """
diff --git a/exasol/ds/sandbox/lib/export_vm/rename_s3_objects.py b/exasol/ds/sandbox/lib/export_vm/rename_s3_objects.py
index 4c1ebd3c..9d11d030 100644
--- a/exasol/ds/sandbox/lib/export_vm/rename_s3_objects.py
+++ b/exasol/ds/sandbox/lib/export_vm/rename_s3_objects.py
@@ -39,5 +39,5 @@ def rename_image_in_s3(aws_access: AwsAccess, export_image_task: ExportImageTask
                                     vm_image_format=vm_image_format)
     dest = build_image_destination(prefix=export_image_task.s3_prefix, asset_id=asset_id,
                                    vm_image_format=vm_image_format)
-    aws_access.copy_s3_object(bucket=export_image_task.s3_bucket, source=source, dest=dest)
+    aws_access.copy_large_s3_object(bucket=export_image_task.s3_bucket, source=source, dest=dest)
     aws_access.delete_s3_object(bucket=export_image_task.s3_bucket, source=source)
diff --git a/exasol/ds/sandbox/lib/update_release/run_update_release.py b/exasol/ds/sandbox/lib/update_release/run_update_release.py
index 84823506..da2a280b 100644
--- a/exasol/ds/sandbox/lib/update_release/run_update_release.py
+++ b/exasol/ds/sandbox/lib/update_release/run_update_release.py
@@ -15,7 +15,7 @@ def run_update_release(aws_access: AwsAccess, gh_access: GithubReleaseAccess,
     additional_release_notes = render_template("additional_release_notes.jinja")
     with tempfile.TemporaryDirectory() as temp_dir:
         artifacts_file = f"{temp_dir}/artifacts.md"
-        asset_types = tuple(
+        asset_types = (
             AssetTypes.DOCKER,
             AssetTypes.AMI,
             AssetTypes.VM_S3,
diff --git a/test/codebuild/test_ci_s3_transfer_multipart.py b/test/codebuild/test_ci_s3_transfer_multipart.py
new file mode 100644
index 00000000..0635cff4
--- /dev/null
+++ b/test/codebuild/test_ci_s3_transfer_multipart.py
@@ -0,0 +1,52 @@
+import os
+import pytest
+
+from exasol.ds.sandbox.lib.asset_id import AssetId
+from exasol.ds.sandbox.lib.aws_access.aws_access import (
+    AwsAccess,
+    Progress,
+)
+from exasol.ds.sandbox.lib.vm_bucket.vm_dss_bucket import find_vm_bucket
+
+
+@pytest.fixture
+def sample_size_kb():
+    return 1024 * 16
+
+
+@pytest.fixture
+def sample_file(tmp_path, sample_size_kb):
+    """
+    Create a sample file (16 MB by default) for transfer to the S3 bucket.
+    """
+    file = tmp_path / "sample-file.txt"
+    one_kb = "123456789 " * 102 + "1234"
+    file.write_text(one_kb * sample_size_kb)
+    yield file
+    file.unlink()
+
+@pytest.mark.skipif(os.environ.get('DSS_RUN_CI_TEST') != 'true',
+                    reason="CI tests need to be activated by env variable DSS_RUN_CI_TEST")
+def test_s3_transfer_multipart(sample_file):
+    aws = AwsAccess(None)
+    bucket = find_vm_bucket(aws)
+    s3_key = f"{AssetId.BUCKET_PREFIX}-itest-sample-file"
+    s3_key2 = f"{s3_key}-copy"
+    progress = Progress("4 MB")
+    try:
+        aws.upload_large_s3_object(
+            bucket,
+            source=str(sample_file),
+            dest=s3_key,
+            progress=progress,
+        )
+        progress.reset()
+        aws.copy_large_s3_object(
+            bucket=bucket,
+            source=s3_key,
+            dest=s3_key2,
+            progress=progress,
+        )
+    finally:
+        aws.delete_s3_object(bucket, s3_key)
+        aws.delete_s3_object(bucket, s3_key2)
diff --git a/test/integration/aws/test_export_vm.py b/test/integration/aws/test_export_vm.py
index 01e00fc1..88e24ce0 100644
--- a/test/integration/aws/test_export_vm.py
+++ b/test/integration/aws/test_export_vm.py
@@ -112,5 +112,5 @@ def test_export_vm(aws_vm_export_mock, default_asset_id, vm_formats_to_test, tes
         expected_calls_copy.append(call(bucket=TEST_BUCKET_ID, source=source, dest=dest))
         expected_calls_delete.append(call(bucket=TEST_BUCKET_ID, source=source))
 
-    assert mock_cast(aws_vm_export_mock.copy_s3_object).call_args_list == expected_calls_copy
+    assert mock_cast(aws_vm_export_mock.copy_large_s3_object).call_args_list == expected_calls_copy
    assert mock_cast(aws_vm_export_mock.delete_s3_object).call_args_list == expected_calls_delete
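
Usage note (illustrative sketch, not part of the patch): the snippet below shows how the new multipart helpers introduced above might be called from application code. The AWS profile name, bucket name, local file path, and S3 keys are placeholders, not values taken from this change.

    from exasol.ds.sandbox.lib.aws_access.aws_access import AwsAccess, Progress

    # Log a progress message after every 50 MB transferred (the same default
    # the new methods use when no Progress instance is passed in).
    progress = Progress("50 MB")
    aws = AwsAccess(aws_profile="my-profile")  # placeholder AWS profile

    # Multipart upload of a local VM image to S3.
    aws.upload_large_s3_object(
        bucket="my-vm-bucket",                 # placeholder bucket name
        source="/tmp/exasol-ai-lab.vmdk",      # placeholder local file
        dest="vm/exasol-ai-lab.vmdk",          # placeholder S3 key
        progress=progress,
    )

    # Copy the object to another key; internally this downloads to a temporary
    # file and re-uploads it, so objects larger than 5 GB are handled as well.
    progress.reset()
    aws.copy_large_s3_object(
        bucket="my-vm-bucket",
        source="vm/exasol-ai-lab.vmdk",
        dest="vm/exasol-ai-lab-copy.vmdk",
        progress=progress,
    )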