From 4698799101b5847d55edc8267db85257a74c3119 Mon Sep 17 00:00:00 2001 From: MiaCY <97990237+MiaCY@users.noreply.github.com> Date: Tue, 2 May 2023 14:41:01 -0700 Subject: [PATCH] docs: add sample and sample test for transfer manager (#1027) * add sample and sample test for transfer manager download blob as chunks concurrently method * chore: modify format for int * chore: refactor transfer manager sample names and tests --------- Co-authored-by: Andrew Gorcester --- samples/snippets/snippets_test.py | 34 +++- samples/snippets/storage_transfer_manager.py | 184 ------------------ ...age_transfer_manager_download_all_blobs.py | 65 +++++++ ...er_manager_download_chunks_concurrently.py | 44 +++++ ...orage_transfer_manager_upload_directory.py | 79 ++++++++ ...rage_transfer_manager_upload_many_blobs.py | 66 +++++++ 6 files changed, 284 insertions(+), 188 deletions(-) delete mode 100644 samples/snippets/storage_transfer_manager.py create mode 100644 samples/snippets/storage_transfer_manager_download_all_blobs.py create mode 100644 samples/snippets/storage_transfer_manager_download_chunks_concurrently.py create mode 100644 samples/snippets/storage_transfer_manager_upload_directory.py create mode 100644 samples/snippets/storage_transfer_manager_upload_many_blobs.py diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index ee6f790f2..6be8e1767 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -72,7 +72,10 @@ import storage_set_bucket_default_kms_key import storage_set_client_endpoint import storage_set_metadata -import storage_transfer_manager +import storage_transfer_manager_download_all_blobs +import storage_transfer_manager_download_chunks_concurrently +import storage_transfer_manager_upload_directory +import storage_transfer_manager_upload_many_blobs import storage_upload_file import storage_upload_from_memory import storage_upload_from_stream @@ -686,7 +689,7 @@ def 
test_transfer_manager_snippets(test_bucket, capsys): with open(os.path.join(uploads, name), "w") as f: f.write(name) - storage_transfer_manager.upload_many_blobs_with_transfer_manager( + storage_transfer_manager_upload_many_blobs.upload_many_blobs_with_transfer_manager( test_bucket.name, BLOB_NAMES, source_directory="{}/".format(uploads), @@ -699,7 +702,7 @@ def test_transfer_manager_snippets(test_bucket, capsys): with tempfile.TemporaryDirectory() as downloads: # Download the files. - storage_transfer_manager.download_all_blobs_with_transfer_manager( + storage_transfer_manager_download_all_blobs.download_all_blobs_with_transfer_manager( test_bucket.name, destination_directory=os.path.join(downloads, ""), threads=2, @@ -729,7 +732,7 @@ def test_transfer_manager_directory_upload(test_bucket, capsys): with open(os.path.join(uploads, name), "w") as f: f.write(name) - storage_transfer_manager.upload_directory_with_transfer_manager( + storage_transfer_manager_upload_directory.upload_directory_with_transfer_manager( test_bucket.name, source_directory="{}/".format(uploads) ) out, _ = capsys.readouterr() @@ -737,3 +740,26 @@ def test_transfer_manager_directory_upload(test_bucket, capsys): assert "Found {}".format(len(BLOB_NAMES)) in out for name in BLOB_NAMES: assert "Uploaded {}".format(name) in out + + +def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys): + BLOB_NAME = "test_file.txt" + + with tempfile.NamedTemporaryFile() as file: + file.write(b"test") + + storage_upload_file.upload_blob( + test_bucket.name, file.name, BLOB_NAME + ) + + with tempfile.TemporaryDirectory() as downloads: + # Download the file. 
+ storage_transfer_manager_download_chunks_concurrently.download_chunks_concurrently( + test_bucket.name, + BLOB_NAME, + os.path.join(downloads, BLOB_NAME), + processes=8, + ) + out, _ = capsys.readouterr() + + assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py deleted file mode 100644 index 0a02b96e3..000000000 --- a/samples/snippets/storage_transfer_manager.py +++ /dev/null @@ -1,184 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -def upload_many_blobs_with_transfer_manager( - bucket_name, filenames, source_directory="", threads=4 -): - """Upload every file in a list to a bucket, concurrently in a thread pool. - - Each blob name is derived from the filename, not including the - `source_directory` parameter. For complete control of the blob name for each - file (and other aspects of individual blob metadata), use - transfer_manager.upload_many() instead. - """ - - # The ID of your GCS bucket - # bucket_name = "your-bucket-name" - - # A list (or other iterable) of filenames to upload. - # filenames = ["file_1.txt", "file_2.txt"] - - # The directory on your computer that is the root of all of the files in the - # list of filenames. This string is prepended (with os.path.join()) to each - # filename to get the full path to the file. Relative paths and absolute - # paths are both accepted. 
This string is not included in the name of the - # uploaded blob; it is only used to find the source files. An empty string - # means "the current working directory". Note that this parameter allows - # directory traversal (e.g. "/", "../") and is not intended for unsanitized - # end user input. - # source_directory="" - - # The number of threads to use for the operation. The performance impact of - # this value depends on the use case, but generally, smaller files benefit - # from more threads and larger files don't benefit from more threads. Too - # many threads can slow operations, especially with large files, due to - # contention over the Python GIL. - # threads=4 - - from google.cloud.storage import Client, transfer_manager - - storage_client = Client() - bucket = storage_client.bucket(bucket_name) - - results = transfer_manager.upload_many_from_filenames( - bucket, filenames, source_directory=source_directory, threads=threads - ) - - for name, result in zip(filenames, results): - # The results list is either `None` or an exception for each filename in - # the input list, in order. - - if isinstance(result, Exception): - print("Failed to upload {} due to exception: {}".format(name, result)) - else: - print("Uploaded {} to {}.".format(name, bucket.name)) - - -def upload_directory_with_transfer_manager(bucket_name, source_directory, threads=4): - """Upload every file in a directory, including all files in subdirectories. - - Each blob name is derived from the filename, not including the `directory` - parameter itself. For complete control of the blob name for each file (and - other aspects of individual blob metadata), use - transfer_manager.upload_many() instead. - """ - - # The ID of your GCS bucket - # bucket_name = "your-bucket-name" - - # The directory on your computer to upload. Files in the directory and its - # subdirectories will be uploaded. An empty string means "the current - # working directory". 
- # source_directory="" - - # The number of threads to use for the operation. The performance impact of - # this value depends on the use case, but generally, smaller files benefit - # from more threads and larger files don't benefit from more threads. Too - # many threads can slow operations, especially with large files, due to - # contention over the Python GIL. - # threads=4 - - from pathlib import Path - - from google.cloud.storage import Client, transfer_manager - - storage_client = Client() - bucket = storage_client.bucket(bucket_name) - - # Generate a list of paths (in string form) relative to the `directory`. - # This can be done in a single list comprehension, but is expanded into - # multiple lines here for clarity. - - # First, recursively get all files in `directory` as Path objects. - directory_as_path_obj = Path(source_directory) - paths = directory_as_path_obj.rglob("*") - - # Filter so the list only includes files, not directories themselves. - file_paths = [path for path in paths if path.is_file()] - - # These paths are relative to the current working directory. Next, make them - # relative to `directory` - relative_paths = [path.relative_to(source_directory) for path in file_paths] - - # Finally, convert them all to strings. - string_paths = [str(path) for path in relative_paths] - - print("Found {} files.".format(len(string_paths))) - - # Start the upload. - results = transfer_manager.upload_many_from_filenames( - bucket, string_paths, source_directory=source_directory, threads=threads - ) - - for name, result in zip(string_paths, results): - # The results list is either `None` or an exception for each filename in - # the input list, in order. 
- - if isinstance(result, Exception): - print("Failed to upload {} due to exception: {}".format(name, result)) - else: - print("Uploaded {} to {}.".format(name, bucket.name)) - - -def download_all_blobs_with_transfer_manager( - bucket_name, destination_directory="", threads=4 -): - """Download all of the blobs in a bucket, concurrently in a thread pool. - - The filename of each blob once downloaded is derived from the blob name and - the `destination_directory `parameter. For complete control of the filename - of each blob, use transfer_manager.download_many() instead. - - Directories will be created automatically as needed, for instance to - accommodate blob names that include slashes. - """ - - # The ID of your GCS bucket - # bucket_name = "your-bucket-name" - - # The directory on your computer to which to download all of the files. This - # string is prepended (with os.path.join()) to the name of each blob to form - # the full path. Relative paths and absolute paths are both accepted. An - # empty string means "the current working directory". Note that this - # parameter allows accepts directory traversal ("../" etc.) and is not - # intended for unsanitized end user input. - # destination_directory = "" - - # The number of threads to use for the operation. The performance impact of - # this value depends on the use case, but generally, smaller files benefit - # from more threads and larger files don't benefit from more threads. Too - # many threads can slow operations, especially with large files, due to - # contention over the Python GIL. 
-    # threads=4
-
-    from google.cloud.storage import Client, transfer_manager
-
-    storage_client = Client()
-    bucket = storage_client.bucket(bucket_name)
-
-    blob_names = [blob.name for blob in bucket.list_blobs()]
-
-    results = transfer_manager.download_many_to_path(
-        bucket, blob_names, destination_directory=destination_directory, threads=threads
-    )
-
-    for name, result in zip(blob_names, results):
-        # The results list is either `None` or an exception for each blob in
-        # the input list, in order.
-
-        if isinstance(result, Exception):
-            print("Failed to download {} due to exception: {}".format(name, result))
-        else:
-            print("Downloaded {} to {}.".format(name, destination_directory + name))
diff --git a/samples/snippets/storage_transfer_manager_download_all_blobs.py b/samples/snippets/storage_transfer_manager_download_all_blobs.py
new file mode 100644
index 000000000..b07739d20
--- /dev/null
+++ b/samples/snippets/storage_transfer_manager_download_all_blobs.py
@@ -0,0 +1,65 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def download_all_blobs_with_transfer_manager(
+    bucket_name, destination_directory="", threads=4
+):
+    """Download all of the blobs in a bucket, concurrently in a thread pool.
+
+    The filename of each blob once downloaded is derived from the blob name and
+    the `destination_directory` parameter. For complete control of the filename
+    of each blob, use transfer_manager.download_many() instead.
+
+    Directories will be created automatically as needed, for instance to
+    accommodate blob names that include slashes.
+    """
+
+    # The ID of your GCS bucket
+    # bucket_name = "your-bucket-name"
+
+    # The directory on your computer to which to download all of the files. This
+    # string is prepended (with os.path.join()) to the name of each blob to form
+    # the full path. Relative paths and absolute paths are both accepted. An
+    # empty string means "the current working directory". Note that this
+    # parameter allows directory traversal ("../" etc.) and is not
+    # intended for unsanitized end user input.
+    # destination_directory = ""
+
+    # The number of threads to use for the operation. The performance impact of
+    # this value depends on the use case, but generally, smaller files benefit
+    # from more threads and larger files don't benefit from more threads. Too
+    # many threads can slow operations, especially with large files, due to
+    # contention over the Python GIL.
+    # threads=4
+
+    from google.cloud.storage import Client, transfer_manager
+
+    storage_client = Client()
+    bucket = storage_client.bucket(bucket_name)
+
+    blob_names = [blob.name for blob in bucket.list_blobs()]
+
+    results = transfer_manager.download_many_to_path(
+        bucket, blob_names, destination_directory=destination_directory, threads=threads
+    )
+
+    for name, result in zip(blob_names, results):
+        # The results list is either `None` or an exception for each blob in
+        # the input list, in order.
+
+        if isinstance(result, Exception):
+            print("Failed to download {} due to exception: {}".format(name, result))
+        else:
+            print("Downloaded {} to {}.".format(name, destination_directory + name))
diff --git a/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py
new file mode 100644
index 000000000..633c5ae65
--- /dev/null
+++ b/samples/snippets/storage_transfer_manager_download_chunks_concurrently.py
@@ -0,0 +1,44 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
+    """Download a single file in chunks, concurrently."""
+
+    # The ID of your GCS bucket
+    # bucket_name = "your-bucket-name"
+
+    # The file to be downloaded
+    # blob_name = "target-file"
+
+    # The destination filename or path
+    # filename = ""
+
+    # The maximum number of worker processes that should be used to handle the
+    # workload of downloading the blob concurrently. PROCESS worker type uses more
+    # system resources (both memory and CPU) and can result in faster operations
+    # when working with large files. The optimal number of workers depends heavily
+    # on the specific use case. Refer to the docstring of the underlying method
+    # for more details.
+ # processes=8 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + + transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes) + + print("Downloaded {} to {}.".format(blob_name, filename)) diff --git a/samples/snippets/storage_transfer_manager_upload_directory.py b/samples/snippets/storage_transfer_manager_upload_directory.py new file mode 100644 index 000000000..6f5171c54 --- /dev/null +++ b/samples/snippets/storage_transfer_manager_upload_directory.py @@ -0,0 +1,79 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def upload_directory_with_transfer_manager(bucket_name, source_directory, threads=4): + """Upload every file in a directory, including all files in subdirectories. + + Each blob name is derived from the filename, not including the `directory` + parameter itself. For complete control of the blob name for each file (and + other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The directory on your computer to upload. Files in the directory and its + # subdirectories will be uploaded. An empty string means "the current + # working directory". + # source_directory="" + + # The number of threads to use for the operation. 
The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + + from pathlib import Path + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + # Generate a list of paths (in string form) relative to the `directory`. + # This can be done in a single list comprehension, but is expanded into + # multiple lines here for clarity. + + # First, recursively get all files in `directory` as Path objects. + directory_as_path_obj = Path(source_directory) + paths = directory_as_path_obj.rglob("*") + + # Filter so the list only includes files, not directories themselves. + file_paths = [path for path in paths if path.is_file()] + + # These paths are relative to the current working directory. Next, make them + # relative to `directory` + relative_paths = [path.relative_to(source_directory) for path in file_paths] + + # Finally, convert them all to strings. + string_paths = [str(path) for path in relative_paths] + + print("Found {} files.".format(len(string_paths))) + + # Start the upload. + results = transfer_manager.upload_many_from_filenames( + bucket, string_paths, source_directory=source_directory, threads=threads + ) + + for name, result in zip(string_paths, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. 
+ + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name)) diff --git a/samples/snippets/storage_transfer_manager_upload_many_blobs.py b/samples/snippets/storage_transfer_manager_upload_many_blobs.py new file mode 100644 index 000000000..995571b22 --- /dev/null +++ b/samples/snippets/storage_transfer_manager_upload_many_blobs.py @@ -0,0 +1,66 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def upload_many_blobs_with_transfer_manager( + bucket_name, filenames, source_directory="", threads=4 +): + """Upload every file in a list to a bucket, concurrently in a thread pool. + + Each blob name is derived from the filename, not including the + `source_directory` parameter. For complete control of the blob name for each + file (and other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # A list (or other iterable) of filenames to upload. + # filenames = ["file_1.txt", "file_2.txt"] + + # The directory on your computer that is the root of all of the files in the + # list of filenames. This string is prepended (with os.path.join()) to each + # filename to get the full path to the file. Relative paths and absolute + # paths are both accepted. 
This string is not included in the name of the + # uploaded blob; it is only used to find the source files. An empty string + # means "the current working directory". Note that this parameter allows + # directory traversal (e.g. "/", "../") and is not intended for unsanitized + # end user input. + # source_directory="" + + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + results = transfer_manager.upload_many_from_filenames( + bucket, filenames, source_directory=source_directory, threads=threads + ) + + for name, result in zip(filenames, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name))