From 3189ac4f8cb4aec8a90f41d7f910a19d5a798bf0 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Wed, 17 Nov 2021 17:50:53 -0800 Subject: [PATCH 01/14] fix: add offset of last byte received to retry a streaming download --- google/resumable_media/_download.py | 18 +++++++++++++ google/resumable_media/requests/download.py | 29 ++++++++++++++++----- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index b2bac98c..6358be08 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -139,6 +139,24 @@ def __init__( media_url, stream=stream, start=start, end=end, headers=headers ) self.checksum = checksum + self._bytes_downloaded = 0 + self._expected_checksum = None + self._checksum_object = None + + @property + def bytes_downloaded(self): + """int: Number of bytes that have been downloaded.""" + return self._bytes_downloaded + + @property + def expected_checksum(self): + """str: The expected checksum of the response detected from the ``X-Goog-Hash`` header.""" + return self._expected_checksum + + @property + def checksum_object(self): + """object: The checksum object for the appropriate checksum type.""" + return self._checksum_object def _prepare_request(self): """Prepare the contents of an HTTP request. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index bcedc569..b5f9e5d7 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -86,12 +86,22 @@ def _write_to_stream(self, response): checksum doesn't agree with server-computed checksum. """ - # `_get_expected_checksum()` may return None even if a checksum was - # requested, in which case it will emit an info log _MISSING_CHECKSUM. - # If an invalid checksum type is specified, this will raise ValueError. - expected_checksum, checksum_object = _helpers._get_expected_checksum( - response, self._get_headers, self.media_url, checksum_type=self.checksum - ) + # Retrieve the expected checksum only once for the download request, + # then compute and validate the checksum when the full download completes. + # Retried requests are range requests, and there's no way to detect + # data corruption for that byte range alone. + if self.expected_checksum is None and self.checksum_object is None: + # `_get_expected_checksum()` may return None even if a checksum was + # requested, in which case it will emit an info log _MISSING_CHECKSUM. + # If an invalid checksum type is specified, this will raise ValueError. + expected_checksum, checksum_object = _helpers._get_expected_checksum( + response, self._get_headers, self.media_url, checksum_type=self.checksum + ) + self._expected_checksum = expected_checksum + self._checksum_object = checksum_object + else: + expected_checksum = self.expected_checksum + checksum_object = self.checksum_object with response: # NOTE: In order to handle compressed streams gracefully, we try @@ -104,6 +114,7 @@ def _write_to_stream(self, response): ) for chunk in body_iter: self._stream.write(chunk) + self._bytes_downloaded += len(chunk) local_checksum_object.update(chunk) if expected_checksum is not None: @@ -162,6 +173,12 @@ def consume( # Wrap the request business logic in a function to be retried. def retriable_request(): + # To restart an interrupted download, read from the offset of last byte + # received using a range request, and set object generation query param. + if self.bytes_downloaded > 0: + _download.add_bytes_range(self.bytes_downloaded, self.end, self._headers) + request_kwargs["headers"] = self._headers + result = transport.request(method, url, **request_kwargs) self._process_response(result) From 4a37b8282c41cc3eaec0fcc2523fd7bb3cc04160 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Wed, 12 Jan 2022 12:50:40 -0800 Subject: [PATCH 02/14] add helper method _parse_generation_header --- google/resumable_media/_helpers.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index d4747bf9..1563bd39 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -34,6 +34,7 @@ "which will be used if it is installed." ) _HASH_HEADER = "x-goog-hash" +_GENERATION_HEADER = "x-goog-generation" _MISSING_CHECKSUM = """\ No {checksum_type} checksum was returned from the service while downloading {} (which happens for composite objects), so client-side content integrity @@ -302,6 +303,27 @@ def _get_checksum_object(checksum_type): raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``") +def _parse_generation_header(response, get_headers): + """Parses the generation header from an ``X-Goog-Generation`` value. + + Args: + response (~requests.Response): The HTTP response object. + get_headers (callable: response->dict): returns response headers. + + + Returns: + Optional[long]: The object generation from the response, if it + can be detected from the ``X-Goog-Generation`` header; otherwise, None. + """ + headers = get_headers(response) + object_generation = headers.get(_GENERATION_HEADER, None) + + if object_generation: + return int(object_generation) + + return object_generation + + class _DoNothingHash(object): """Do-nothing hash object. From 2b14f448fb4ff66ba6bf2593e7a17d3914b792bb Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Wed, 12 Jan 2022 12:52:00 -0800 Subject: [PATCH 03/14] add some object generation related logic --- google/resumable_media/_download.py | 44 +++++++++++++++++++++ google/resumable_media/requests/download.py | 22 +++++++++++ 2 files changed, 66 insertions(+) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index 6358be08..5511b92a 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -18,6 +18,13 @@ import http.client import re +from urllib.parse import parse_qs +from urllib.parse import parse_qsl +from urllib.parse import urlencode +from urllib.parse import urlsplit +from urllib.parse import urlunsplit + + from google.resumable_media import _helpers from google.resumable_media import common @@ -142,6 +149,7 @@ def __init__( self._bytes_downloaded = 0 self._expected_checksum = None self._checksum_object = None + self._object_generation = None @property def bytes_downloaded(self): @@ -571,3 +579,39 @@ def _check_for_zero_content_range(response, get_status_code, get_headers): if content_range == _ZERO_CONTENT_RANGE_HEADER: return True return False + + +def query_param_in_media_url(media_url, query_param): + """Retrieve query parameter specified in media url. + + Args: + media_url (str): The URL containing the media to be downloaded. + query_param (str): The query parameter name to retrieve. + + Returns: + str: The query parameter value from the media url if exists, else None. + """ + + _, _, _, query, _ = urlsplit(media_url) + queries = parse_qs(query) + return queries.get(query_param, None) + + +def add_query_parameters(media_url, name_value_pairs): + """Add query parameters to a base url. + + Args: + media_url (str): The URL containing the media to be downloaded. + name_value_pairs (list[tuple[str, str]): Names and values of the query parameters to add. + + Returns: + str: URL with additional query strings appended. + """ + + if len(name_value_pairs) == 0: + return media_url + + scheme, netloc, path, query, frag = urlsplit(media_url) + query = parse_qsl(query) + query.extend(name_value_pairs) + return urlunsplit((scheme, netloc, path, urlencode(query), frag)) diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index b5f9e5d7..5694e3fe 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -171,6 +171,12 @@ def consume( if self._stream is not None: request_kwargs["stream"] = True + # Check if object generation is specified in the media_url request + # to delete: call helper_method and assign self._object_generation + generation_from_url = _download.query_param_in_media_url(url, "generation") + if generation_from_url: + self._object_generation = int(generation_from_url) + # Wrap the request business logic in a function to be retried. def retriable_request(): # To restart an interrupted download, read from the offset of last byte @@ -179,8 +185,24 @@ def retriable_request(): _download.add_bytes_range(self.bytes_downloaded, self.end, self._headers) request_kwargs["headers"] = self._headers + # Set object generation query param to ensure the same object content is requested + if self._object_generation and not generation_from_url: + # do something: call helper method to generate new url + query_param = [("generation", self._object_generation)] + base_url = url + url = _download.add_query_parameters(base_url, query_param) + # if self._object_generation is not None: + # generation_query = "&generation={}".format(self._object_generation) + # url += generation_query + result = transport.request(method, url, **request_kwargs) + # If a generation hasn't been specified, and this is the first response we get, let's record the + # generation. In future requests we'll use this generation as a precondition to avoid data races. + if self._object_generation is None: + self._object_generation = _helpers._parse_generation_header(result, self._get_headers) + # import pdb; pdb.set_trace() + self._process_response(result) if self._stream is not None: From b1895f9d1f88420fd406a5e879143535de2751ef Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Wed, 12 Jan 2022 13:53:02 -0800 Subject: [PATCH 04/14] revise object genertion helper method --- google/resumable_media/_download.py | 16 ++++++++------ google/resumable_media/requests/download.py | 24 ++++++++------------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index 5511b92a..bf8584f6 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -581,20 +581,24 @@ def _check_for_zero_content_range(response, get_status_code, get_headers): return False -def query_param_in_media_url(media_url, query_param): - """Retrieve query parameter specified in media url. +def generation_in_media_url(media_url): + """Retrieve the object generation query param specified in media url. Args: media_url (str): The URL containing the media to be downloaded. - query_param (str): The query parameter name to retrieve. Returns: - str: The query parameter value from the media url if exists, else None. + long: The object generation from the media url if exists, else None. """ _, _, _, query, _ = urlsplit(media_url) - queries = parse_qs(query) - return queries.get(query_param, None) + query_params = parse_qs(query) + object_generation = query_params.get("generation", None) + + if object_generation is not None: + return int(object_generation) + + return object_generation def add_query_parameters(media_url, name_value_pairs): diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 5694e3fe..a50b2250 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -171,11 +171,8 @@ def consume( if self._stream is not None: request_kwargs["stream"] = True - # Check if object generation is specified in the media_url request - # to delete: call helper_method and assign self._object_generation - generation_from_url = _download.query_param_in_media_url(url, "generation") - if generation_from_url: - self._object_generation = int(generation_from_url) + # Assign object generation if generation is specified in the media url + self._object_generation = _download.generation_in_media_url(url) # Wrap the request business logic in a function to be retried. def retriable_request(): @@ -185,23 +182,20 @@ def retriable_request(): _download.add_bytes_range(self.bytes_downloaded, self.end, self._headers) request_kwargs["headers"] = self._headers - # Set object generation query param to ensure the same object content is requested - if self._object_generation and not generation_from_url: - # do something: call helper method to generate new url + # Set object generation query param to ensure the same object content is requested. + if ( + self._object_generation is not None + and _download.generation_in_media_url(self.media_url) is None + ): query_param = [("generation", self._object_generation)] - base_url = url - url = _download.add_query_parameters(base_url, query_param) - # if self._object_generation is not None: - # generation_query = "&generation={}".format(self._object_generation) - # url += generation_query + url = _download.add_query_parameters(self.media_url, query_param) result = transport.request(method, url, **request_kwargs) # If a generation hasn't been specified, and this is the first response we get, let's record the - # generation. In future requests we'll use this generation as a precondition to avoid data races. + # generation. In future requests we'll specify the generation query param to avoid data races. if self._object_generation is None: self._object_generation = _helpers._parse_generation_header(result, self._get_headers) - # import pdb; pdb.set_trace() self._process_response(result) From c04d8ba01381b98fc54ed512b66fa5dc208502ad Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Wed, 12 Jan 2022 15:01:15 -0800 Subject: [PATCH 05/14] fix url variable scope --- google/resumable_media/requests/download.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index a50b2250..d929535a 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -161,7 +161,7 @@ def consume( ValueError: If the current :class:`Download` has already finished. """ - method, url, payload, headers = self._prepare_request() + method, _, payload, headers = self._prepare_request() # NOTE: We assume "payload is None" but pass it along anyway. request_kwargs = { "data": payload, @@ -172,10 +172,12 @@ def consume( request_kwargs["stream"] = True # Assign object generation if generation is specified in the media url - self._object_generation = _download.generation_in_media_url(url) + self._object_generation = _download.generation_in_media_url(self.media_url) # Wrap the request business logic in a function to be retried. def retriable_request(): + url = self.media_url + # To restart an interrupted download, read from the offset of last byte # received using a range request, and set object generation query param. if self.bytes_downloaded > 0: From fee2696318b3231d8a2a6aab682f2142294eb253 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Thu, 13 Jan 2022 13:36:48 -0800 Subject: [PATCH 06/14] handle special cases with decompressive transcoding --- google/resumable_media/_helpers.py | 16 ++++++++++++++++ google/resumable_media/requests/download.py | 13 +++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index 1563bd39..554abed8 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -33,6 +33,7 @@ "implementation. Python 3 has a faster implementation, `google-crc32c`, " "which will be used if it is installed." ) +_CONTENT_ENCODING_HEADER = "Content-Encoding" _HASH_HEADER = "x-goog-hash" _GENERATION_HEADER = "x-goog-generation" _MISSING_CHECKSUM = """\ @@ -324,6 +325,21 @@ def _parse_generation_header(response, get_headers): return object_generation +def _is_decompressive_transcoding(response, get_headers): + """Returns True if the object was served decompressed and the "Content-Encoding" header is "gzip". + See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip + + Args: + response (~requests.Response): The HTTP response object. + get_headers (callable: response->dict): returns response headers. + + Returns: + bool: Returns True if the "Content-Encoding" header is "gzip"; otherwise, False. + """ + headers = get_headers(response) + return headers.get(_CONTENT_ENCODING_HEADER) == "gzip" + + class _DoNothingHash(object): """Do-nothing hash object. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index d929535a..c2d62315 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -181,7 +181,9 @@ def retriable_request(): # To restart an interrupted download, read from the offset of last byte # received using a range request, and set object generation query param. if self.bytes_downloaded > 0: - _download.add_bytes_range(self.bytes_downloaded, self.end, self._headers) + _download.add_bytes_range( + self.bytes_downloaded, self.end, self._headers + ) request_kwargs["headers"] = self._headers # Set object generation query param to ensure the same object content is requested. @@ -197,11 +199,18 @@ def retriable_request(): # If a generation hasn't been specified, and this is the first response we get, let's record the # generation. In future requests we'll specify the generation query param to avoid data races. if self._object_generation is None: - self._object_generation = _helpers._parse_generation_header(result, self._get_headers) + self._object_generation = _helpers._parse_generation_header( + result, self._get_headers + ) self._process_response(result) + # With decompressive transcoding, GCS serves back the whole file regardless of the range request, + # thus we reset the stream position to the start of the stream. + # See more: https://cloud.google.com/storage/docs/transcoding#range, if self._stream is not None: + if _helpers._is_decompressive_transcoding(result, self._get_headers): + self._stream.seek(0) self._write_to_stream(result) return result From 1e81eb7a454ae4222f34ce89a611baa6f25ea2e8 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Thu, 13 Jan 2022 14:33:28 -0800 Subject: [PATCH 07/14] fix helper method --- google/resumable_media/_download.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index bf8584f6..0c4a1ecf 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -596,7 +596,7 @@ def generation_in_media_url(media_url): object_generation = query_params.get("generation", None) if object_generation is not None: - return int(object_generation) + return int(object_generation[0]) return object_generation From 5dea6abf8f53f5fd65356b99ca936ed4a7586e55 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Thu, 13 Jan 2022 15:05:59 -0800 Subject: [PATCH 08/14] move to _helpers --- google/resumable_media/_download.py | 47 --------------------- google/resumable_media/_helpers.py | 46 ++++++++++++++++++++ google/resumable_media/requests/download.py | 6 +-- 3 files changed, 49 insertions(+), 50 deletions(-) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index 0c4a1ecf..1a0ae427 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -18,13 +18,6 @@ import http.client import re -from urllib.parse import parse_qs -from urllib.parse import parse_qsl -from urllib.parse import urlencode -from urllib.parse import urlsplit -from urllib.parse import urlunsplit - - from google.resumable_media import _helpers from google.resumable_media import common @@ -579,43 +572,3 @@ def _check_for_zero_content_range(response, get_status_code, get_headers): if content_range == _ZERO_CONTENT_RANGE_HEADER: return True return False - - -def generation_in_media_url(media_url): - """Retrieve the object generation query param specified in media url. - - Args: - media_url (str): The URL containing the media to be downloaded. - - Returns: - long: The object generation from the media url if exists, else None. - """ - - _, _, _, query, _ = urlsplit(media_url) - query_params = parse_qs(query) - object_generation = query_params.get("generation", None) - - if object_generation is not None: - return int(object_generation[0]) - - return object_generation - - -def add_query_parameters(media_url, name_value_pairs): - """Add query parameters to a base url. - - Args: - media_url (str): The URL containing the media to be downloaded. - name_value_pairs (list[tuple[str, str]): Names and values of the query parameters to add. - - Returns: - str: URL with additional query strings appended. - """ - - if len(name_value_pairs) == 0: - return media_url - - scheme, netloc, path, query, frag = urlsplit(media_url) - query = parse_qsl(query) - query.extend(name_value_pairs) - return urlunsplit((scheme, netloc, path, urlencode(query), frag)) diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index 554abed8..d7a5a849 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -22,6 +22,12 @@ import random import warnings +from urllib.parse import parse_qs +from urllib.parse import parse_qsl +from urllib.parse import urlencode +from urllib.parse import urlsplit +from urllib.parse import urlunsplit + from google.resumable_media import common @@ -325,6 +331,46 @@ def _parse_generation_header(response, get_headers): return object_generation +def _get_generation_from_url(media_url): + """Retrieve the object generation query param specified in the media url. + + Args: + media_url (str): The URL containing the media to be downloaded. + + Returns: + long: The object generation from the media url if exists; otherwise, None. + """ + + _, _, _, query, _ = urlsplit(media_url) + query_params = parse_qs(query) + object_generation = query_params.get("generation", None) + + if object_generation is not None: + return int(object_generation[0]) + + return object_generation + + +def add_query_parameters(media_url, name_value_pairs): + """Add query parameters to a base url. + + Args: + media_url (str): The URL containing the media to be downloaded. + name_value_pairs (list[tuple[str, str]]): Names and values of the query parameters to add. + + Returns: + str: URL with additional query strings appended. + """ + + if len(name_value_pairs) == 0: + return media_url + + scheme, netloc, path, query, frag = urlsplit(media_url) + query = parse_qsl(query) + query.extend(name_value_pairs) + return urlunsplit((scheme, netloc, path, urlencode(query), frag)) + + def _is_decompressive_transcoding(response, get_headers): """Returns True if the object was served decompressed and the "Content-Encoding" header is "gzip". See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index c2d62315..910993b8 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -172,7 +172,7 @@ def consume( request_kwargs["stream"] = True # Assign object generation if generation is specified in the media url - self._object_generation = _download.generation_in_media_url(self.media_url) + self._object_generation = _helpers._get_generation_from_url(self.media_url) # Wrap the request business logic in a function to be retried. def retriable_request(): @@ -189,10 +189,10 @@ def retriable_request(): # Set object generation query param to ensure the same object content is requested. if ( self._object_generation is not None - and _download.generation_in_media_url(self.media_url) is None + and _helpers._get_generation_from_url(self.media_url) is None ): query_param = [("generation", self._object_generation)] - url = _download.add_query_parameters(self.media_url, query_param) + url = _helpers.add_query_parameters(self.media_url, query_param) result = transport.request(method, url, **request_kwargs) From 8b094342e846f18baf5eeb66d27d5386c98fe563 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Fri, 14 Jan 2022 14:46:27 -0800 Subject: [PATCH 09/14] add support to safely resume interrupted raw downloads --- google/resumable_media/_download.py | 15 ---- google/resumable_media/_helpers.py | 15 ++-- google/resumable_media/requests/download.py | 90 +++++++++++++++------ 3 files changed, 73 insertions(+), 47 deletions(-) diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index 1a0ae427..7958e3c0 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -144,21 +144,6 @@ def __init__( self._checksum_object = None self._object_generation = None - @property - def bytes_downloaded(self): - """int: Number of bytes that have been downloaded.""" - return self._bytes_downloaded - - @property - def expected_checksum(self): - """str: The expected checksum of the response detected from the ``X-Goog-Hash`` header.""" - return self._expected_checksum - - @property - def checksum_object(self): - """object: The checksum object for the appropriate checksum type.""" - return self._checksum_object - def _prepare_request(self): """Prepare the contents of an HTTP request. diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index d7a5a849..a9ce3111 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -40,8 +40,8 @@ "which will be used if it is installed." ) _CONTENT_ENCODING_HEADER = "Content-Encoding" -_HASH_HEADER = "x-goog-hash" _GENERATION_HEADER = "x-goog-generation" +_HASH_HEADER = "x-goog-hash" _MISSING_CHECKSUM = """\ No {checksum_type} checksum was returned from the service while downloading {} (which happens for composite objects), so client-side content integrity @@ -317,7 +317,6 @@ def _parse_generation_header(response, get_headers): response (~requests.Response): The HTTP response object. get_headers (callable: response->dict): returns response headers. - Returns: Optional[long]: The object generation from the response, if it can be detected from the ``X-Goog-Generation`` header; otherwise, None. @@ -325,11 +324,11 @@ def _parse_generation_header(response, get_headers): headers = get_headers(response) object_generation = headers.get(_GENERATION_HEADER, None) - if object_generation: + if object_generation is None: + return None + else: return int(object_generation) - return object_generation - def _get_generation_from_url(media_url): """Retrieve the object generation query param specified in the media url. @@ -345,11 +344,11 @@ def _get_generation_from_url(media_url): query_params = parse_qs(query) object_generation = query_params.get("generation", None) - if object_generation is not None: + if object_generation is None: + return None + else: return int(object_generation[0]) - return object_generation - def add_query_parameters(media_url, name_value_pairs): """Add query parameters to a base url. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 910993b8..e310b567 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -90,7 +90,7 @@ def _write_to_stream(self, response): # then compute and validate the checksum when the full download completes. # Retried requests are range requests, and there's no way to detect # data corruption for that byte range alone. - if self.expected_checksum is None and self.checksum_object is None: + if self._expected_checksum is None and self._checksum_object is None: # `_get_expected_checksum()` may return None even if a checksum was # requested, in which case it will emit an info log _MISSING_CHECKSUM. # If an invalid checksum type is specified, this will raise ValueError. @@ -100,8 +100,8 @@ def _write_to_stream(self, response): self._expected_checksum = expected_checksum self._checksum_object = checksum_object else: - expected_checksum = self.expected_checksum - checksum_object = self.checksum_object + expected_checksum = self._expected_checksum + checksum_object = self._checksum_object with response: # NOTE: In order to handle compressed streams gracefully, we try @@ -171,7 +171,7 @@ def consume( if self._stream is not None: request_kwargs["stream"] = True - # Assign object generation if generation is specified in the media url + # Assign object generation if generation is specified in the media url. self._object_generation = _helpers._get_generation_from_url(self.media_url) # Wrap the request business logic in a function to be retried. @@ -180,9 +180,9 @@ def retriable_request(): # To restart an interrupted download, read from the offset of last byte # received using a range request, and set object generation query param. - if self.bytes_downloaded > 0: + if self._bytes_downloaded > 0: _download.add_bytes_range( - self.bytes_downloaded, self.end, self._headers + self._bytes_downloaded, self.end, self._headers ) request_kwargs["headers"] = self._headers @@ -207,7 +207,7 @@ def retriable_request(): # With decompressive transcoding, GCS serves back the whole file regardless of the range request, # thus we reset the stream position to the start of the stream. - # See more: https://cloud.google.com/storage/docs/transcoding#range, + # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: if _helpers._is_decompressive_transcoding(result, self._get_headers): self._stream.seek(0) @@ -267,13 +267,22 @@ def _write_to_stream(self, response): ~google.resumable_media.common.DataCorruption: If the download's checksum doesn't agree with server-computed checksum. """ - - # `_get_expected_checksum()` may return None even if a checksum was - # requested, in which case it will emit an info log _MISSING_CHECKSUM. - # If an invalid checksum type is specified, this will raise ValueError. - expected_checksum, checksum_object = _helpers._get_expected_checksum( - response, self._get_headers, self.media_url, checksum_type=self.checksum - ) + # Retrieve the expected checksum only once for the download request, + # then compute and validate the checksum when the full download completes. + # Retried requests are range requests, and there's no way to detect + # data corruption for that byte range alone. + if self._expected_checksum is None and self._checksum_object is None: + # `_get_expected_checksum()` may return None even if a checksum was + # requested, in which case it will emit an info log _MISSING_CHECKSUM. + # If an invalid checksum type is specified, this will raise ValueError. + expected_checksum, checksum_object = _helpers._get_expected_checksum( + response, self._get_headers, self.media_url, checksum_type=self.checksum + ) + self._expected_checksum = expected_checksum + self._checksum_object = checksum_object + else: + expected_checksum = self._expected_checksum + checksum_object = self._checksum_object with response: body_iter = response.raw.stream( @@ -281,6 +290,7 @@ def _write_to_stream(self, response): ) for chunk in body_iter: self._stream.write(chunk) + self._bytes_downloaded += len(chunk) checksum_object.update(chunk) response._content_consumed = True @@ -329,23 +339,55 @@ def consume( ValueError: If the current :class:`Download` has already finished. """ - method, url, payload, headers = self._prepare_request() + method, _, payload, headers = self._prepare_request() + # NOTE: We assume "payload is None" but pass it along anyway. + request_kwargs = { + "data": payload, + "headers": headers, + "timeout": timeout, + "stream": True, + } + + # Assign object generation if generation is specified in the media url. + self._object_generation = _helpers._get_generation_from_url(self.media_url) # Wrap the request business logic in a function to be retried. def retriable_request(): - # NOTE: We assume "payload is None" but pass it along anyway. - result = transport.request( - method, - url, - data=payload, - headers=headers, - stream=True, - timeout=timeout, - ) + url = self.media_url + + # To restart an interrupted download, read from the offset of last byte + # received using a range request, and set object generation query param. + if self._bytes_downloaded > 0: + _download.add_bytes_range( + self._bytes_downloaded, self.end, self._headers + ) + request_kwargs["headers"] = self._headers + + # Set object generation query param to ensure the same object content is requested. + if ( + self._object_generation is not None + and _helpers._get_generation_from_url(self.media_url) is None + ): + query_param = [("generation", self._object_generation)] + url = _helpers.add_query_parameters(self.media_url, query_param) + + result = transport.request(method, url, **request_kwargs) + + # If a generation hasn't been specified, and this is the first response we get, let's record the + # generation. In future requests we'll specify the generation query param to avoid data races. + if self._object_generation is None: + self._object_generation = _helpers._parse_generation_header( + result, self._get_headers + ) self._process_response(result) + # With decompressive transcoding, GCS serves back the whole file regardless of the range request, + # thus we reset the stream position to the start of the stream. + # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: + if _helpers._is_decompressive_transcoding(result, self._get_headers): + self._stream.seek(0) self._write_to_stream(result) return result From c29efb22c7192782cd914835f1987cdec9364524 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Fri, 14 Jan 2022 16:44:03 -0800 Subject: [PATCH 10/14] add unit tests --- tests/unit/requests/test_download.py | 124 +++++++++++++++++++++++++++ tests/unit/test__helpers.py | 77 +++++++++++++++++ 2 files changed, 201 insertions(+) diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index fd430ef7..bf74259f 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -42,6 +42,7 @@ def test__write_to_stream_no_hash_check(self): assert ret_val is None assert stream.getvalue() == chunk1 + chunk2 + assert download._bytes_downloaded == len(chunk1 + chunk2) # Check mocks. response.__enter__.assert_called_once_with() @@ -66,6 +67,8 @@ def test__write_to_stream_with_hash_check_success(self, checksum): assert ret_val is None assert stream.getvalue() == chunk1 + chunk2 + chunk3 + assert download._bytes_downloaded == len(chunk1 + chunk2 + chunk3) + assert download._checksum_object is not None # Check mocks. response.__enter__.assert_called_once_with() @@ -273,6 +276,64 @@ def test_consume_with_headers(self): # Make sure the headers have been modified. assert headers == {"range": range_bytes} + def test_consume_gets_generation_from_url(self): + GENERATION_VALUE = 1641590104888641 + url = EXAMPLE_URL + f"&generation={GENERATION_VALUE}" + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + + download = download_mod.Download( + url, stream=stream, end=65536, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_response(chunks=chunks, headers=None) + + assert not download.finished + assert download._object_generation is None + + ret_val = download.consume(transport) + + assert download._object_generation == GENERATION_VALUE + assert ret_val is transport.request.return_value + assert stream.getvalue() == b"".join(chunks) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", url, **called_kwargs) + + def test_consume_gets_generation_from_headers(self): + GENERATION_VALUE = 1641590104888641 + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=65536, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + headers = {_helpers._GENERATION_HEADER: GENERATION_VALUE} + transport.request.return_value = _mock_response(chunks=chunks, headers=headers) + + assert not download.finished + assert download._object_generation is None + + ret_val = download.consume(transport) + + assert download._object_generation == GENERATION_VALUE + assert ret_val is transport.request.return_value + assert stream.getvalue() == b"".join(chunks) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + class TestRawDownload(object): def test__write_to_stream_no_hash_check(self): @@ -287,6 +348,7 @@ def test__write_to_stream_no_hash_check(self): assert ret_val is None assert stream.getvalue() == chunk1 + chunk2 + assert download._bytes_downloaded == len(chunk1 + chunk2) # Check mocks. response.__enter__.assert_called_once_with() @@ -313,6 +375,8 @@ def test__write_to_stream_with_hash_check_success(self, checksum): assert ret_val is None assert stream.getvalue() == chunk1 + chunk2 + chunk3 + assert download._bytes_downloaded == len(chunk1 + chunk2 + chunk3) + assert download._checksum_object is not None # Check mocks. response.__enter__.assert_called_once_with() @@ -526,6 +590,66 @@ def test_consume_with_headers(self): # Make sure the headers have been modified. assert headers == {"range": range_bytes} + def test_consume_gets_generation_from_url(self): + GENERATION_VALUE = 1641590104888641 + url = EXAMPLE_URL + f"&generation={GENERATION_VALUE}" + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + + download = download_mod.RawDownload( + url, stream=stream, end=65536, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_raw_response(chunks=chunks, headers=None) + + assert not download.finished + assert download._object_generation is None + + ret_val = download.consume(transport) + + assert download._object_generation == GENERATION_VALUE + assert ret_val is transport.request.return_value + assert stream.getvalue() == b"".join(chunks) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", url, **called_kwargs) + + def test_consume_gets_generation_from_headers(self): + GENERATION_VALUE = 1641590104888641 + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=65536, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + headers = {_helpers._GENERATION_HEADER: GENERATION_VALUE} + transport.request.return_value = _mock_raw_response( + chunks=chunks, headers=headers + ) + + assert not download.finished + assert download._object_generation is None + + ret_val = download.consume(transport) + + assert download._object_generation == GENERATION_VALUE + assert ret_val is transport.request.return_value + assert stream.getvalue() == b"".join(chunks) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + class TestChunkedDownload(object): @staticmethod diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index fed70baf..c4908078 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -408,6 +408,83 @@ def test_md5_multiple_matches(self): assert error.args[2] == [self.MD5_CHECKSUM, another_checksum] +class Test__parse_generation_header(object): + + GENERATION_VALUE = 1641590104888641 + + def test_empty_value(self): + headers = {} + response = _mock_response(headers=headers) + generation_header = _helpers._parse_generation_header(response, _get_headers) + assert generation_header is None + + def test_header_value(self): + headers = {_helpers._GENERATION_HEADER: self.GENERATION_VALUE} + response = _mock_response(headers=headers) + generation_header = _helpers._parse_generation_header(response, _get_headers) + assert generation_header == self.GENERATION_VALUE + + +class Test__is_decompressive_transcoding(object): + def test_empty_value(self): + headers = {} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is False + + def test_gzip_in_header(self): + headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is True + + def test_gzip_not_in_header(self): + headers = {_helpers._CONTENT_ENCODING_HEADER: "br"} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is False + + +class Test__get_generation_from_url(object): + + GENERATION_VALUE = 1641590104888641 + MEDIA_URL = ( + "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object?alt=media" + ) + MEDIA_URL_W_GENERATION = MEDIA_URL + f"&generation={GENERATION_VALUE}" + + def test_empty_value(self): + generation = _helpers._get_generation_from_url(self.MEDIA_URL) + assert generation is None + + def test_generation_in_url(self): + generation = _helpers._get_generation_from_url(self.MEDIA_URL_W_GENERATION) + assert generation == self.GENERATION_VALUE + + +class Test__add_query_parameters(object): + def test_w_empty_list(self): + query_params = [] + MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object" + new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) + assert new_url == MEDIA_URL + + def test_wo_existing_qs(self): + query_params = [("one", "One"), ("two", "Two")] + MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object" + expected = "&".join( + ["{}={}".format(name, value) for name, value in query_params] + ) + new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) + assert new_url == "{}?{}".format(MEDIA_URL, expected) + + def test_w_existing_qs(self): + query_params = [("one", "One"), ("two", "Two")] + MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object?alt=media" + expected = "&".join( + ["{}={}".format(name, value) for name, value in query_params] + ) + new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) + assert new_url == "{}&{}".format(MEDIA_URL, expected) + + def _mock_response(headers): return mock.Mock( headers=headers, From 4967a4e6adc4e416473646131db2db918bae37ea Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Tue, 18 Jan 2022 10:55:29 -0800 Subject: [PATCH 11/14] add more tests --- google/resumable_media/requests/download.py | 8 +- tests/unit/requests/test_download.py | 164 ++++++++++++++++++++ 2 files changed, 170 insertions(+), 2 deletions(-) diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index e310b567..e4baf89f 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -172,7 +172,8 @@ def consume( request_kwargs["stream"] = True # Assign object generation if generation is specified in the media url. - self._object_generation = _helpers._get_generation_from_url(self.media_url) + if self._object_generation is None: + self._object_generation = _helpers._get_generation_from_url(self.media_url) # Wrap the request business logic in a function to be retried. def retriable_request(): @@ -211,6 +212,7 @@ def retriable_request(): if self._stream is not None: if _helpers._is_decompressive_transcoding(result, self._get_headers): self._stream.seek(0) + self._bytes_downloaded = 0 self._write_to_stream(result) return result @@ -349,7 +351,8 @@ def consume( } # Assign object generation if generation is specified in the media url. - self._object_generation = _helpers._get_generation_from_url(self.media_url) + if self._object_generation is None: + self._object_generation = _helpers._get_generation_from_url(self.media_url) # Wrap the request business logic in a function to be retried. def retriable_request(): @@ -388,6 +391,7 @@ def retriable_request(): if self._stream is not None: if _helpers._is_decompressive_transcoding(result, self._get_headers): self._stream.seek(0) + self._bytes_downloaded = 0 self._write_to_stream(result) return result diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index bf74259f..1e0fe8af 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -334,6 +334,87 @@ def test_consume_gets_generation_from_headers(self): } transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + def test_consume_w_object_generation(self): + GENERATION_VALUE = 1641590104888641 + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_response(chunks=chunks, headers=None) + + assert download._object_generation is None + + # Mock a retry operation with object generation retrieved and bytes already downloaded in the stream + download._object_generation = GENERATION_VALUE + offset = 256 + download._bytes_downloaded = offset + download.consume(transport) + + expected_url = EXAMPLE_URL + f"&generation={GENERATION_VALUE}" + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", expected_url, **called_kwargs) + range_bytes = "bytes={:d}-{:d}".format(offset, end) + assert download._headers["range"] == range_bytes + + def test_consume_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_response(chunks=chunks, headers=None) + + assert download._bytes_downloaded == 0 + + # Mock a retry operation with bytes already downloaded in the stream and checksum stored + offset = 256 + download._bytes_downloaded = offset + download._expected_checksum = None + download._checksum_object = _helpers._DoNothingHash() + download.consume(transport) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + range_bytes = "bytes={:d}-{:d}".format(offset, end) + assert download._headers["range"] == range_bytes + + def test_consume_gzip_reset_stream_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream + headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_response(chunks=chunks, headers=headers) + offset = 16 + download._bytes_downloaded = offset + download.consume(transport) + + assert stream.getvalue() == b"".join(chunks) + assert download._bytes_downloaded == len(b"".join(chunks)) + class TestRawDownload(object): def test__write_to_stream_no_hash_check(self): @@ -650,6 +731,89 @@ def test_consume_gets_generation_from_headers(self): } transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + def test_consume_w_object_generation(self): + GENERATION_VALUE = 1641590104888641 + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_raw_response(chunks=chunks, headers=None) + + assert download._object_generation is None + + # Mock a retry operation with object generation retrieved and bytes already downloaded in the stream + download._object_generation = GENERATION_VALUE + offset = 256 + download._bytes_downloaded = offset + download.consume(transport) + + expected_url = EXAMPLE_URL + f"&generation={GENERATION_VALUE}" + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", expected_url, **called_kwargs) + range_bytes = "bytes={:d}-{:d}".format(offset, end) + assert download._headers["range"] == range_bytes + + def test_consume_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _mock_raw_response(chunks=chunks, headers=None) + + assert download._bytes_downloaded == 0 + + # Mock a retry operation with bytes already downloaded in the stream and checksum stored + offset = 256 + download._bytes_downloaded = offset + download._expected_checksum = None + download._checksum_object = _helpers._DoNothingHash() + download.consume(transport) + + called_kwargs = { + "data": None, + "headers": download._headers, + "timeout": EXPECTED_TIMEOUT, + "stream": True, + } + transport.request.assert_called_once_with("GET", EXAMPLE_URL, **called_kwargs) + range_bytes = "bytes={:d}-{:d}".format(offset, end) + assert download._headers["range"] == range_bytes + + def test_consume_gzip__reset_stream_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream + headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_raw_response( + chunks=chunks, headers=headers + ) + offset = 16 + download._bytes_downloaded = offset + download.consume(transport) + + assert stream.getvalue() == b"".join(chunks) + assert download._bytes_downloaded == len(b"".join(chunks)) + class TestChunkedDownload(object): @staticmethod From e28d237037a9993c8f116e49cdfee1b9a6021d1f Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Thu, 27 Jan 2022 21:45:24 -0800 Subject: [PATCH 12/14] update helper method per comments --- google/resumable_media/_helpers.py | 14 +++++++------- google/resumable_media/requests/download.py | 4 ++-- tests/unit/test__helpers.py | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index db09c175..cbbd246a 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -23,7 +23,6 @@ import warnings from urllib.parse import parse_qs -from urllib.parse import parse_qsl from urllib.parse import urlencode from urllib.parse import urlsplit from urllib.parse import urlunsplit @@ -350,24 +349,25 @@ def _get_generation_from_url(media_url): return int(object_generation[0]) -def add_query_parameters(media_url, name_value_pairs): +def add_query_parameters(media_url, query_params): """Add query parameters to a base url. Args: media_url (str): The URL containing the media to be downloaded. - name_value_pairs (list[tuple[str, str]]): Names and values of the query parameters to add. + query_params (dict): Names and values of the query parameters to add. Returns: str: URL with additional query strings appended. """ - if len(name_value_pairs) == 0: + if len(query_params) == 0: return media_url scheme, netloc, path, query, frag = urlsplit(media_url) - query = parse_qsl(query) - query.extend(name_value_pairs) - return urlunsplit((scheme, netloc, path, urlencode(query), frag)) + params = parse_qs(query) + new_params = {**params, **query_params} + query = urlencode(new_params, doseq=True) + return urlunsplit((scheme, netloc, path, query, frag)) def _is_decompressive_transcoding(response, get_headers): diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 75656fda..be6968a8 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -192,7 +192,7 @@ def retriable_request(): self._object_generation is not None and _helpers._get_generation_from_url(self.media_url) is None ): - query_param = [("generation", self._object_generation)] + query_param = {"generation": self._object_generation} url = _helpers.add_query_parameters(self.media_url, query_param) result = transport.request(method, url, **request_kwargs) @@ -371,7 +371,7 @@ def retriable_request(): self._object_generation is not None and _helpers._get_generation_from_url(self.media_url) is None ): - query_param = [("generation", self._object_generation)] + query_param = {"generation": self._object_generation} url = _helpers.add_query_parameters(self.media_url, query_param) result = transport.request(method, url, **request_kwargs) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index e7585571..89ef74a6 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -461,25 +461,25 @@ def test_generation_in_url(self): class Test__add_query_parameters(object): def test_w_empty_list(self): - query_params = [] + query_params = {} MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object" new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) assert new_url == MEDIA_URL def test_wo_existing_qs(self): - query_params = [("one", "One"), ("two", "Two")] + query_params = {"one": "One", "two": "Two"} MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object" expected = "&".join( - ["{}={}".format(name, value) for name, value in query_params] + ["{}={}".format(name, value) for name, value in query_params.items()] ) new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) assert new_url == "{}?{}".format(MEDIA_URL, expected) def test_w_existing_qs(self): - query_params = [("one", "One"), ("two", "Two")] + query_params = {"one": "One", "two": "Two"} MEDIA_URL = "https://storage.googleapis.com/storage/v1/b/my-bucket/o/my-object?alt=media" expected = "&".join( - ["{}={}".format(name, value) for name, value in query_params] + ["{}={}".format(name, value) for name, value in query_params.items()] ) new_url = _helpers.add_query_parameters(MEDIA_URL, query_params) assert new_url == "{}&{}".format(MEDIA_URL, expected) From d57991844e940c7c2196ad87218406729c8be4f7 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Fri, 28 Jan 2022 17:01:35 -0800 Subject: [PATCH 13/14] address comments on handling stream seek error --- google/resumable_media/requests/download.py | 22 ++++++++++- tests/unit/requests/test_download.py | 42 ++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index be6968a8..0bf0803d 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -36,6 +36,16 @@ {} """ +_STREAM_SEEK_ERROR = """\ +Incomplete download for: + +{} + +Error writing to stream while handling a gzip-compressed file download. + +Please restart the download. +""" + class Download(_request_helpers.RequestsMixin, _download.Download): """Helper to manage downloading a resource from a Google API. @@ -211,7 +221,11 @@ def retriable_request(): # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: if _helpers._is_decompressive_transcoding(result, self._get_headers): - self._stream.seek(0) + try: + self._stream.seek(0) + except Exception as exc: + msg = _STREAM_SEEK_ERROR.format(url) + raise Exception(msg) from exc self._bytes_downloaded = 0 self._write_to_stream(result) @@ -390,7 +404,11 @@ def retriable_request(): # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: if _helpers._is_decompressive_transcoding(result, self._get_headers): - self._stream.seek(0) + try: + self._stream.seek(0) + except Exception as exc: + msg = _STREAM_SEEK_ERROR.format(url) + raise Exception(msg) from exc self._bytes_downloaded = 0 self._write_to_stream(result) diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index 38ded2cd..0da02417 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -415,6 +415,25 @@ def test_consume_gzip_reset_stream_w_bytes_downloaded(self): assert stream.getvalue() == b"".join(chunks) assert download._bytes_downloaded == len(b"".join(chunks)) + def test_consume_gzip_reset_stream_error(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a stream seek error while resuming a decompressive transcoding download + stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) + headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_response(chunks=chunks, headers=headers) + offset = 16 + download._bytes_downloaded = offset + with pytest.raises(Exception): + download.consume(transport) + class TestRawDownload(object): def test__write_to_stream_no_hash_check(self): @@ -792,7 +811,7 @@ def test_consume_w_bytes_downloaded(self): range_bytes = "bytes={:d}-{:d}".format(offset, end) assert download._headers["range"] == range_bytes - def test_consume_gzip__reset_stream_w_bytes_downloaded(self): + def test_consume_gzip_reset_stream_w_bytes_downloaded(self): stream = io.BytesIO() chunks = (b"up down ", b"charlie ", b"brown") end = 65536 @@ -814,6 +833,27 @@ def test_consume_gzip__reset_stream_w_bytes_downloaded(self): assert stream.getvalue() == b"".join(chunks) assert download._bytes_downloaded == len(b"".join(chunks)) + def test_consume_gzip_reset_stream_error(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a stream seek error while resuming a decompressive transcoding download + stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) + headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_raw_response( + chunks=chunks, headers=headers + ) + offset = 16 + download._bytes_downloaded = offset + with pytest.raises(Exception): + download.consume(transport) + class TestChunkedDownload(object): @staticmethod From 9a04788bb81691a854a9598574186750e168d584 Mon Sep 17 00:00:00 2001 From: Cathy Ouyang Date: Thu, 10 Feb 2022 16:52:49 -0800 Subject: [PATCH 14/14] address comments on moving transcoding feature --- google/resumable_media/_helpers.py | 16 ---- google/resumable_media/requests/download.py | 30 -------- tests/unit/requests/test_download.py | 82 --------------------- tests/unit/test__helpers.py | 17 ----- 4 files changed, 145 deletions(-) diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index cbbd246a..2043d19d 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -38,7 +38,6 @@ "implementation. Python 3 has a faster implementation, `google-crc32c`, " "which will be used if it is installed." ) -_CONTENT_ENCODING_HEADER = "Content-Encoding" _GENERATION_HEADER = "x-goog-generation" _HASH_HEADER = "x-goog-hash" _MISSING_CHECKSUM = """\ @@ -370,21 +369,6 @@ def add_query_parameters(media_url, query_params): return urlunsplit((scheme, netloc, path, query, frag)) -def _is_decompressive_transcoding(response, get_headers): - """Returns True if the object was served decompressed and the "Content-Encoding" header is "gzip". - See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip - - Args: - response (~requests.Response): The HTTP response object. - get_headers (callable: response->dict): returns response headers. - - Returns: - bool: Returns True if the "Content-Encoding" header is "gzip"; otherwise, False. - """ - headers = get_headers(response) - return headers.get(_CONTENT_ENCODING_HEADER) == "gzip" - - class _DoNothingHash(object): """Do-nothing hash object. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 0bf0803d..58de0100 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -36,16 +36,6 @@ {} """ -_STREAM_SEEK_ERROR = """\ -Incomplete download for: - -{} - -Error writing to stream while handling a gzip-compressed file download. - -Please restart the download. -""" - class Download(_request_helpers.RequestsMixin, _download.Download): """Helper to manage downloading a resource from a Google API. @@ -216,17 +206,7 @@ def retriable_request(): self._process_response(result) - # With decompressive transcoding, GCS serves back the whole file regardless of the range request, - # thus we reset the stream position to the start of the stream. - # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: - if _helpers._is_decompressive_transcoding(result, self._get_headers): - try: - self._stream.seek(0) - except Exception as exc: - msg = _STREAM_SEEK_ERROR.format(url) - raise Exception(msg) from exc - self._bytes_downloaded = 0 self._write_to_stream(result) return result @@ -399,17 +379,7 @@ def retriable_request(): self._process_response(result) - # With decompressive transcoding, GCS serves back the whole file regardless of the range request, - # thus we reset the stream position to the start of the stream. - # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: - if _helpers._is_decompressive_transcoding(result, self._get_headers): - try: - self._stream.seek(0) - except Exception as exc: - msg = _STREAM_SEEK_ERROR.format(url) - raise Exception(msg) from exc - self._bytes_downloaded = 0 self._write_to_stream(result) return result diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index 0da02417..210973d7 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -395,45 +395,6 @@ def test_consume_w_bytes_downloaded(self): range_bytes = "bytes={:d}-{:d}".format(offset, end) assert download._headers["range"] == range_bytes - def test_consume_gzip_reset_stream_w_bytes_downloaded(self): - stream = io.BytesIO() - chunks = (b"up down ", b"charlie ", b"brown") - end = 65536 - - download = download_mod.Download( - EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" - ) - transport = mock.Mock(spec=["request"]) - - # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream - headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} - transport.request.return_value = _mock_response(chunks=chunks, headers=headers) - offset = 16 - download._bytes_downloaded = offset - download.consume(transport) - - assert stream.getvalue() == b"".join(chunks) - assert download._bytes_downloaded == len(b"".join(chunks)) - - def test_consume_gzip_reset_stream_error(self): - stream = io.BytesIO() - chunks = (b"up down ", b"charlie ", b"brown") - end = 65536 - - download = download_mod.Download( - EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" - ) - transport = mock.Mock(spec=["request"]) - - # Mock a stream seek error while resuming a decompressive transcoding download - stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) - headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} - transport.request.return_value = _mock_response(chunks=chunks, headers=headers) - offset = 16 - download._bytes_downloaded = offset - with pytest.raises(Exception): - download.consume(transport) - class TestRawDownload(object): def test__write_to_stream_no_hash_check(self): @@ -811,49 +772,6 @@ def test_consume_w_bytes_downloaded(self): range_bytes = "bytes={:d}-{:d}".format(offset, end) assert download._headers["range"] == range_bytes - def test_consume_gzip_reset_stream_w_bytes_downloaded(self): - stream = io.BytesIO() - chunks = (b"up down ", b"charlie ", b"brown") - end = 65536 - - download = download_mod.RawDownload( - EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" - ) - transport = mock.Mock(spec=["request"]) - - # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream - headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} - transport.request.return_value = _mock_raw_response( - chunks=chunks, headers=headers - ) - offset = 16 - download._bytes_downloaded = offset - download.consume(transport) - - assert stream.getvalue() == b"".join(chunks) - assert download._bytes_downloaded == len(b"".join(chunks)) - - def test_consume_gzip_reset_stream_error(self): - stream = io.BytesIO() - chunks = (b"up down ", b"charlie ", b"brown") - end = 65536 - - download = download_mod.RawDownload( - EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" - ) - transport = mock.Mock(spec=["request"]) - - # Mock a stream seek error while resuming a decompressive transcoding download - stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) - headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} - transport.request.return_value = _mock_raw_response( - chunks=chunks, headers=headers - ) - offset = 16 - download._bytes_downloaded = offset - with pytest.raises(Exception): - download.consume(transport) - class TestChunkedDownload(object): @staticmethod diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 89ef74a6..feedeb18 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -425,23 +425,6 @@ def test_header_value(self): assert generation_header == self.GENERATION_VALUE -class Test__is_decompressive_transcoding(object): - def test_empty_value(self): - headers = {} - response = _mock_response(headers=headers) - assert _helpers._is_decompressive_transcoding(response, _get_headers) is False - - def test_gzip_in_header(self): - headers = {_helpers._CONTENT_ENCODING_HEADER: "gzip"} - response = _mock_response(headers=headers) - assert _helpers._is_decompressive_transcoding(response, _get_headers) is True - - def test_gzip_not_in_header(self): - headers = {_helpers._CONTENT_ENCODING_HEADER: "br"} - response = _mock_response(headers=headers) - assert _helpers._is_decompressive_transcoding(response, _get_headers) is False - - class Test__get_generation_from_url(object): GENERATION_VALUE = 1641590104888641