Skip to content

Commit

Permalink
docs: Add snippets for upload_chunks_concurrently and add chunk_size (#…
Browse files Browse the repository at this point in the history
…1135)

* docs: Add snippets for upload_chunks_concurrently and add chunk_size

* switch from 'processes' to 'workers' in sample nomenclature

* copyright

* tests
  • Loading branch information
andrewsg authored Oct 9, 2023
1 parent a3a1159 commit 3a0f551
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 34 deletions.
61 changes: 47 additions & 14 deletions samples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import storage_transfer_manager_download_bucket
import storage_transfer_manager_download_chunks_concurrently
import storage_transfer_manager_download_many
import storage_transfer_manager_upload_chunks_concurrently
import storage_transfer_manager_upload_directory
import storage_transfer_manager_upload_many
import storage_upload_file
Expand Down Expand Up @@ -243,7 +244,10 @@ def test_upload_blob_with_kms(test_bucket):
with tempfile.NamedTemporaryFile() as source_file:
source_file.write(b"test")
storage_upload_with_kms_key.upload_blob_with_kms(
test_bucket.name, source_file.name, blob_name, KMS_KEY,
test_bucket.name,
source_file.name,
blob_name,
KMS_KEY,
)
bucket = storage.Client().bucket(test_bucket.name)
kms_blob = bucket.get_blob(blob_name)
Expand Down Expand Up @@ -396,7 +400,10 @@ def test_move_blob(test_bucket_create, test_blob):
print(f"test_move_blob not found in bucket {test_bucket_create.name}")

storage_move_file.move_blob(
bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob",
bucket.name,
test_blob.name,
test_bucket_create.name,
"test_move_blob",
)

assert test_bucket_create.get_blob("test_move_blob") is not None
Expand All @@ -412,7 +419,10 @@ def test_copy_blob(test_blob):
pass

storage_copy_file.copy_blob(
bucket.name, test_blob.name, bucket.name, "test_copy_blob",
bucket.name,
test_blob.name,
bucket.name,
"test_copy_blob",
)

assert bucket.get_blob("test_copy_blob") is not None
Expand Down Expand Up @@ -551,7 +561,10 @@ def test_define_bucket_website_configuration(test_bucket):
def test_object_get_kms_key(test_bucket):
with tempfile.NamedTemporaryFile() as source_file:
storage_upload_with_kms_key.upload_blob_with_kms(
test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY,
test_bucket.name,
source_file.name,
"test_upload_blob_encrypted",
KMS_KEY,
)
kms_key = storage_object_get_kms_key.object_get_kms_key(
test_bucket.name, "test_upload_blob_encrypted"
Expand All @@ -568,7 +581,10 @@ def test_storage_compose_file(test_bucket):

with tempfile.NamedTemporaryFile() as dest_file:
destination = storage_compose_file.compose_file(
test_bucket.name, source_files[0], source_files[1], dest_file.name,
test_bucket.name,
source_files[0],
source_files[1],
dest_file.name,
)
composed = destination.download_as_string()

Expand Down Expand Up @@ -608,7 +624,8 @@ def test_change_default_storage_class(test_bucket, capsys):

def test_change_file_storage_class(test_blob, capsys):
blob = storage_change_file_storage_class.change_file_storage_class(
test_blob.bucket.name, test_blob.name,
test_blob.bucket.name,
test_blob.name,
)
out, _ = capsys.readouterr()
assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out
Expand Down Expand Up @@ -694,7 +711,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
test_bucket.name,
BLOB_NAMES,
source_directory="{}/".format(uploads),
processes=8,
workers=8,
)
out, _ = capsys.readouterr()

Expand All @@ -706,7 +723,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager(
test_bucket.name,
destination_directory=os.path.join(downloads, ""),
processes=8,
workers=8,
max_results=10000,
)
out, _ = capsys.readouterr()
Expand All @@ -720,7 +737,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
test_bucket.name,
blob_names=BLOB_NAMES,
destination_directory=os.path.join(downloads, ""),
processes=8,
workers=8,
)
out, _ = capsys.readouterr()

Expand Down Expand Up @@ -763,18 +780,34 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
with tempfile.NamedTemporaryFile() as file:
file.write(b"test")

storage_upload_file.upload_blob(
test_bucket.name, file.name, BLOB_NAME
)
storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)

with tempfile.TemporaryDirectory() as downloads:
# Download the file.
storage_transfer_manager_download_chunks_concurrently.download_chunks_concurrently(
test_bucket.name,
BLOB_NAME,
os.path.join(downloads, BLOB_NAME),
processes=8,
workers=8,
)
out, _ = capsys.readouterr()

assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out
assert (
"Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME))
in out
)


def test_transfer_manager_upload_chunks_concurrently(test_bucket, capsys):
BLOB_NAME = "test_file.txt"

with tempfile.NamedTemporaryFile() as file:
file.write(b"test")
file.flush()

storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently(
test_bucket.name, file.name, BLOB_NAME
)

out, _ = capsys.readouterr()
assert "File {} uploaded to {}".format(file.name, BLOB_NAME) in out
9 changes: 5 additions & 4 deletions samples/snippets/storage_transfer_manager_download_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# [START storage_transfer_manager_download_bucket]
def download_bucket_with_transfer_manager(
bucket_name, destination_directory="", processes=8, max_results=1000
bucket_name, destination_directory="", workers=8, max_results=1000
):
"""Download all of the blobs in a bucket, concurrently in a process pool.
Expand All @@ -40,8 +40,9 @@ def download_bucket_with_transfer_manager(
# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
# some CPU and memory resources until finished.
# processes=8
# some CPU and memory resources until finished. Threads can be used instead
# of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

# The maximum number of results to fetch from bucket.list_blobs(). This
# sample code fetches all of the blobs up to max_results and queues them all
Expand All @@ -60,7 +61,7 @@ def download_bucket_with_transfer_manager(
blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)]

results = transfer_manager.download_many_to_path(
bucket, blob_names, destination_directory=destination_directory, max_workers=processes
bucket, blob_names, destination_directory=destination_directory, max_workers=workers
)

for name, result in zip(blob_names, results):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# limitations under the License.

# [START storage_transfer_manager_download_chunks_concurrently]
def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
def download_chunks_concurrently(
bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, workers=8
):
"""Download a single file in chunks, concurrently in a process pool."""

# The ID of your GCS bucket
Expand All @@ -25,19 +27,29 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
# The destination filename or path
# filename = ""

# The size of each chunk. The performance impact of this value depends on
# the use case. The remote service has a minimum of 5 MiB and a maximum of
# 5 GiB.
# chunk_size = 32 * 1024 * 1024 (32 MiB)

# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
# some CPU and memory resources until finished.
# processes=8
# some CPU and memory resources until finished. Threads can be used instead
# of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)

transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes)
transfer_manager.download_chunks_concurrently(
blob, filename, chunk_size=chunk_size, max_workers=workers
)

print("Downloaded {} to {}.".format(blob_name, filename))


# [END storage_transfer_manager_download_chunks_concurrently]
9 changes: 5 additions & 4 deletions samples/snippets/storage_transfer_manager_download_many.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# [START storage_transfer_manager_download_many]
def download_many_blobs_with_transfer_manager(
bucket_name, blob_names, destination_directory="", processes=8
bucket_name, blob_names, destination_directory="", workers=8
):
"""Download blobs in a list by name, concurrently in a process pool.
Expand Down Expand Up @@ -46,16 +46,17 @@ def download_many_blobs_with_transfer_manager(
# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
# some CPU and memory resources until finished.
# processes=8
# some CPU and memory resources until finished. Threads can be used instead
# of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket(bucket_name)

results = transfer_manager.download_many_to_path(
bucket, blob_names, destination_directory=destination_directory, max_workers=processes
bucket, blob_names, destination_directory=destination_directory, max_workers=workers
)

for name, result in zip(blob_names, results):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START storage_transfer_manager_upload_chunks_concurrently]
def upload_chunks_concurrently(
bucket_name,
source_filename,
destination_blob_name,
chunk_size=32 * 1024 * 1024,
workers=8,
):
"""Upload a single file, in chunks, concurrently in a process pool."""
# The ID of your GCS bucket
# bucket_name = "your-bucket-name"

# The path to your file to upload
# source_filename = "local/path/to/file"

# The ID of your GCS object
# destination_blob_name = "storage-object-name"

# The size of each chunk. The performance impact of this value depends on
# the use case. The remote service has a minimum of 5 MiB and a maximum of
# 5 GiB.
# chunk_size = 32 * 1024 * 1024 (32 MiB)

# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case. Each additional process
# occupies some CPU and memory resources until finished. Threads can be used
# instead of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

transfer_manager.upload_chunks_concurrently(
source_filename, blob, chunk_size=chunk_size, max_workers=workers
)

print(f"File {source_filename} uploaded to {destination_blob_name}.")


# [END storage_transfer_manager_upload_chunks_concurrently]
9 changes: 5 additions & 4 deletions samples/snippets/storage_transfer_manager_upload_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# [START storage_transfer_manager_upload_directory]
def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8):
def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8):
"""Upload every file in a directory, including all files in subdirectories.
Each blob name is derived from the filename, not including the `directory`
Expand All @@ -33,8 +33,9 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces
# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
# some CPU and memory resources until finished.
# processes=8
# some CPU and memory resources until finished. Threads can be used instead
# of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

from pathlib import Path

Expand Down Expand Up @@ -65,7 +66,7 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, proces

# Start the upload.
results = transfer_manager.upload_many_from_filenames(
bucket, string_paths, source_directory=source_directory, max_workers=processes
bucket, string_paths, source_directory=source_directory, max_workers=workers
)

for name, result in zip(string_paths, results):
Expand Down
9 changes: 5 additions & 4 deletions samples/snippets/storage_transfer_manager_upload_many.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# [START storage_transfer_manager_upload_many]
def upload_many_blobs_with_transfer_manager(
bucket_name, filenames, source_directory="", processes=8
bucket_name, filenames, source_directory="", workers=8
):
"""Upload every file in a list to a bucket, concurrently in a process pool.
Expand Down Expand Up @@ -43,16 +43,17 @@ def upload_many_blobs_with_transfer_manager(
# The maximum number of processes to use for the operation. The performance
# impact of this value depends on the use case, but smaller files usually
# benefit from a higher number of processes. Each additional process occupies
# some CPU and memory resources until finished.
# processes=8
# some CPU and memory resources until finished. Threads can be used instead
# of processes by passing `worker_type=transfer_manager.THREAD`.
# workers=8

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket(bucket_name)

results = transfer_manager.upload_many_from_filenames(
bucket, filenames, source_directory=source_directory, max_workers=processes
bucket, filenames, source_directory=source_directory, max_workers=workers
)

for name, result in zip(filenames, results):
Expand Down

0 comments on commit 3a0f551

Please sign in to comment.