From dc16fc541445940bad55742c10cc22af25a1279b Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 7 Apr 2019 01:41:33 +0100 Subject: [PATCH 01/11] [AIRFLOW-4255] Replaces Discovery based api with client based for GCS --- airflow/contrib/hooks/gcs_hook.py | 509 ++++++++++-------------------- setup.py | 1 + 2 files changed, 173 insertions(+), 337 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 825c82cbef44f..9569727b79267 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -18,17 +18,13 @@ # under the License. # -from googleapiclient.discovery import build -from googleapiclient.http import MediaFileUpload -from googleapiclient.errors import HttpError +from google.cloud import storage from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.exceptions import AirflowException import gzip as gz import shutil -import re -import os class GoogleCloudStorageHook(GoogleCloudBaseHook): @@ -37,6 +33,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): connection. """ + _conn = None + def __init__(self, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None): @@ -47,9 +45,11 @@ def get_conn(self): """ Returns a Google Cloud Storage service object. """ - http_authorized = self._authorize() - return build( - 'storage', 'v1', http=http_authorized, cache_discovery=False) + if not self._conn: + self._conn = storage.Client(credentials=self._get_credentials(), + project=self.project_id) + + return self._conn # pylint:disable=redefined-builtin def copy(self, source_bucket, source_object, destination_bucket=None, @@ -83,19 +83,18 @@ def copy(self, source_bucket, source_object, destination_bucket=None, if not source_bucket or not source_object: raise ValueError('source_bucket and source_object cannot be empty.') - service = self.get_conn() - try: - service \ - .objects() \ - .copy(sourceBucket=source_bucket, sourceObject=source_object, - destinationBucket=destination_bucket, - destinationObject=destination_object, body='') \ - .execute() - return True - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise + client = self.get_conn() + source_bucket = client.get_bucket(source_bucket) + source_object = source_bucket.blob(source_object) + destination_bucket = client.get_bucket(destination_bucket) + destination_object = source_bucket.copy_blob( + blob=source_object, + destination_bucket=destination_bucket, + new_name=destination_object) + + self.log.info('Object %s in bucket %s copied to object %s in bucket %s', + source_object.name, source_bucket.name, + destination_object.name, destination_bucket.name) def rewrite(self, source_bucket, source_object, destination_bucket, destination_object=None): @@ -126,29 +125,30 @@ def rewrite(self, source_bucket, source_object, destination_bucket, if not source_bucket or not source_object: raise ValueError('source_bucket and source_object cannot be empty.') - service = self.get_conn() - request_count = 1 - try: - result = service.objects() \ - .rewrite(sourceBucket=source_bucket, sourceObject=source_object, - destinationBucket=destination_bucket, - destinationObject=destination_object, body='') \ - .execute() - self.log.info('Rewrite request #%s: %s', request_count, result) - while not result['done']: - request_count += 1 - result = service.objects() \ - .rewrite(sourceBucket=source_bucket, sourceObject=source_object, - destinationBucket=destination_bucket, - destinationObject=destination_object, - rewriteToken=result['rewriteToken'], body='') \ - .execute() - self.log.info('Rewrite request #%s: %s', request_count, result) - return True - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise + client = self.get_conn() + source_bucket = client.get_bucket(bucket_name=source_bucket) + source_object = source_bucket.blob(blob_name=source_object) + destination_bucket = client.get_bucket(bucket_name=destination_bucket) + + token, bytes_rewritten, total_bytes = destination_bucket.blob( + blob_name=destination_object).rewrite( + source=source_object + ) + + self.log.info('Total Bytes: %s | Bytes Written: %s', + total_bytes, bytes_rewritten) + + while token is not None: + token, bytes_rewritten, total_bytes = destination_bucket.blob( + blob_name=destination_object).rewrite( + source=source_object, token=token + ) + + self.log.info('Total Bytes: %s | Bytes Written: %s', + total_bytes, bytes_rewritten) + self.log.info('Object %s in bucket %s copied to object %s in bucket %s', + source_object.name, source_bucket.name, + destination_object, destination_bucket.name) # pylint:disable=redefined-builtin def download(self, bucket, object, filename=None): @@ -162,24 +162,20 @@ def download(self, bucket, object, filename=None): :param filename: If set, a local file path where the file should be written to. :type filename: str """ - service = self.get_conn() - downloaded_file_bytes = service \ - .objects() \ - .get_media(bucket=bucket, object=object) \ - .execute() + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) - # Write the file to local file path, if requested. if filename: - write_argument = 'wb' if isinstance(downloaded_file_bytes, bytes) else 'w' - with open(filename, write_argument) as file_fd: - file_fd.write(downloaded_file_bytes) - - return downloaded_file_bytes + blob.download_to_filename(filename) + self.log.info('File downloaded to %s', filename) + # MAX XCOM Size is 48KB + # https://github.com/apache/airflow/pull/1618#discussion_r68249677 + if blob.size < 49344: + return blob.download_as_string() # pylint:disable=redefined-builtin - def upload(self, bucket, object, filename, - mime_type='application/octet-stream', gzip=False, - multipart=False, num_retries=0): + def upload(self, bucket, object, filename, gzip=False): """ Uploads a local file to Google Cloud Storage. @@ -189,20 +185,9 @@ def upload(self, bucket, object, filename, :type object: str :param filename: The local file path to the file to be uploaded. :type filename: str - :param mime_type: The MIME type to set when uploading the file. - :type mime_type: str :param gzip: Option to compress file for upload :type gzip: bool - :param multipart: If True, the upload will be split into multiple HTTP requests. The - default size is 256MiB per request. Pass a number instead of True to - specify the request size, which must be a multiple of 262144 (256KiB). - :type multipart: bool or int - :param num_retries: The number of times to attempt to re-upload the file (or individual - chunks, in the case of multipart uploads). Retries are attempted - with exponential backoff. - :type num_retries: int """ - service = self.get_conn() if gzip: filename_gz = filename + '.gz' @@ -212,44 +197,11 @@ def upload(self, bucket, object, filename, shutil.copyfileobj(f_in, f_out) filename = filename_gz - try: - if multipart: - if multipart is True: - chunksize = 256 * 1024 * 1024 - else: - chunksize = multipart - - if chunksize % (256 * 1024) > 0 or chunksize < 0: - raise ValueError("Multipart size is not a multiple of 262144 (256KiB)") - - media = MediaFileUpload(filename, mimetype=mime_type, - chunksize=chunksize, resumable=True) - - request = service.objects().insert(bucket=bucket, name=object, media_body=media) - response = None - while response is None: - status, response = request.next_chunk(num_retries=num_retries) - if status: - self.log.info("Upload progress %.1f%%", status.progress() * 100) - - else: - media = MediaFileUpload(filename, mime_type) - - service \ - .objects() \ - .insert(bucket=bucket, name=object, media_body=media) \ - .execute(num_retries=num_retries) - - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise - - finally: - if gzip: - os.remove(filename) - - return True + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.upload_from_filename(filename=filename) + self.log.info('File %s uploaded to %s in %s bucket', filename, object, bucket) # pylint:disable=redefined-builtin def exists(self, bucket, object): @@ -262,17 +214,10 @@ def exists(self, bucket, object): storage bucket. :type object: str """ - service = self.get_conn() - try: - service \ - .objects() \ - .get(bucket=bucket, object=object) \ - .execute() - return True - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + return blob.exists() # pylint:disable=redefined-builtin def is_updated_after(self, bucket, object, ts): @@ -287,57 +232,41 @@ def is_updated_after(self, bucket, object, ts): :param ts: The timestamp to check against. :type ts: datetime.datetime """ - service = self.get_conn() - try: - response = (service - .objects() - .get(bucket=bucket, object=object) - .execute()) - - if 'updated' in response: - import dateutil.parser - import dateutil.tz + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.reload() - if not ts.tzinfo: - ts = ts.replace(tzinfo=dateutil.tz.tzutc()) + blob_update_time = blob.updated - updated = dateutil.parser.parse(response['updated']) - self.log.info("Verify object date: %s > %s", updated, ts) + if blob_update_time is not None: + import dateutil.tz - if updated > ts: - return True + if not ts.tzinfo: + ts = ts.replace(tzinfo=dateutil.tz.tzutc()) - except HttpError as ex: - if ex.resp['status'] != '404': - raise + self.log.info("Verify object date: %s > %s", blob_update_time, ts) - return False + if blob_update_time > ts: + return True + else: + return False - def delete(self, bucket, object, generation=None): + def delete(self, bucket, object): """ - Delete an object if versioning is not enabled for the bucket, or if generation - parameter is used. + Deletes an object from the bucket. :param bucket: name of the bucket, where the object resides :type bucket: str :param object: name of the object to delete :type object: str - :param generation: if present, permanently delete the object of this generation - :type generation: str - :return: True if succeeded - """ - service = self.get_conn() - - try: - service \ - .objects() \ - .delete(bucket=bucket, object=object, generation=generation) \ - .execute() - return True - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise + """ + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.delete() + + self.log.info('Blob %s deleted.', object) def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=None): """ @@ -356,38 +285,32 @@ def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=No :type delimiter: str :return: a stream of object names matching the filtering criteria """ - service = self.get_conn() + client = self.get_conn() + bucket = client.get_bucket(bucket) - ids = list() + ids = [] pageToken = None while True: - response = service.objects().list( - bucket=bucket, - versions=versions, - maxResults=maxResults, - pageToken=pageToken, + blobs = bucket.list_blobs( + max_results=maxResults, + page_token=pageToken, prefix=prefix, - delimiter=delimiter - ).execute() + delimiter=delimiter, + versions=versions + ) - if 'prefixes' not in response: - if 'items' not in response: - self.log.info("No items found for prefix: %s", prefix) - break + blob_names = [] + for blob in blobs: + blob_names.append(blob.name) - for item in response['items']: - if item and 'name' in item: - ids.append(item['name']) + prefixes = blobs.prefixes + if prefixes: + ids += list(prefixes) else: - for item in response['prefixes']: - ids.append(item) + ids += blob_names - if 'nextPageToken' not in response: - # no further pages of results, so stop the loop - break - - pageToken = response['nextPageToken'] - if not pageToken: + pageToken = blobs.next_page_token + if pageToken is None: # empty next page token break return ids @@ -405,23 +328,13 @@ def get_size(self, bucket, object): self.log.info('Checking the file size of object: %s in bucket: %s', object, bucket) - service = self.get_conn() - try: - response = service.objects().get( - bucket=bucket, - object=object - ).execute() - - if 'name' in response and response['name'][-1] != '/': - # Remove Directories & Just check size of files - size = response['size'] - self.log.info('The file size of %s is %s bytes.', object, size) - return size - else: - raise ValueError('Object is not a file') - except HttpError as ex: - if ex.resp['status'] == '404': - raise ValueError('Object Not Found') + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.reload() + blob_size = blob.size + self.log.info('The file size of %s is %s bytes.', object, blob_size) + return blob_size def get_crc32c(self, bucket, object): """ @@ -435,20 +348,13 @@ def get_crc32c(self, bucket, object): """ self.log.info('Retrieving the crc32c checksum of ' 'object: %s in bucket: %s', object, bucket) - service = self.get_conn() - try: - response = service.objects().get( - bucket=bucket, - object=object - ).execute() - - crc32c = response['crc32c'] - self.log.info('The crc32c checksum of %s is %s', object, crc32c) - return crc32c - - except HttpError as ex: - if ex.resp['status'] == '404': - raise ValueError('Object Not Found') + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.reload() + blob_crc32c = blob.crc32c + self.log.info('The crc32c checksum of %s is %s', object, blob_crc32c) + return blob_crc32c def get_md5hash(self, bucket, object): """ @@ -462,21 +368,16 @@ def get_md5hash(self, bucket, object): """ self.log.info('Retrieving the MD5 hash of ' 'object: %s in bucket: %s', object, bucket) - service = self.get_conn() - try: - response = service.objects().get( - bucket=bucket, - object=object - ).execute() - - md5hash = response['md5Hash'] - self.log.info('The md5Hash of %s is %s', object, md5hash) - return md5hash - - except HttpError as ex: - if ex.resp['status'] == '404': - raise ValueError('Object Not Found') - + client = self.get_conn() + bucket = client.get_bucket(bucket) + blob = bucket.blob(blob_name=object) + blob.reload() + blob_md5hash = blob.md5_hash + self.log.info('The md5Hash of %s is %s', object, blob_md5hash) + return blob_md5hash + + @GoogleCloudBaseHook.catch_http_exception + @GoogleCloudBaseHook.fallback_to_default_project_id def create_bucket(self, bucket_name, resource=None, @@ -526,57 +427,23 @@ def create_bucket(self, :return: If successful, it returns the ``id`` of the bucket. """ - project_id = project_id if project_id is not None else self.project_id - storage_classes = [ - 'MULTI_REGIONAL', - 'REGIONAL', - 'NEARLINE', - 'COLDLINE', - 'STANDARD', # alias for MULTI_REGIONAL/REGIONAL, based on location - ] - self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s', bucket_name, location, storage_class) - if storage_class not in storage_classes: - raise ValueError( - 'Invalid value ({}) passed to storage_class. Value should be ' - 'one of {}'.format(storage_class, storage_classes)) - if not re.match('[a-zA-Z0-9]+', bucket_name[0]): - raise ValueError('Bucket names must start with a number or letter.') - - if not re.match('[a-zA-Z0-9]+', bucket_name[-1]): - raise ValueError('Bucket names must end with a number or letter.') - - service = self.get_conn() + client = self.get_conn() + bucket = client.bucket(bucket_name=bucket_name) bucket_resource = resource or {} - bucket_resource.update({ - 'name': bucket_name, - 'location': location, - 'storageClass': storage_class - }) - - self.log.info('The Default Project ID is %s', self.project_id) - - if labels is not None: - bucket_resource['labels'] = labels - - try: - response = service.buckets().insert( - project=project_id, - body=bucket_resource - ).execute() - - self.log.info('Bucket: %s created successfully.', bucket_name) - return response['id'] + for item in bucket_resource: + if item != "name": + bucket._patch_property(item, resource[item]) - except HttpError as ex: - raise AirflowException( - 'Bucket creation failed. Error was: {}'.format(ex.content) - ) + bucket.storage_class = storage_class + bucket.labels = labels or {} + bucket.create(project=project_id, location=location) + return bucket.id - def insert_bucket_acl(self, bucket, entity, role, user_project): + def insert_bucket_acl(self, bucket, entity, role, user_project=None): """ Creates a new ACL entry on the specified bucket. See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert @@ -596,25 +463,17 @@ def insert_bucket_acl(self, bucket, entity, role, user_project): :type user_project: str """ self.log.info('Creating a new ACL entry in bucket: %s', bucket) - service = self.get_conn() - try: - response = service.bucketAccessControls().insert( - bucket=bucket, - body={ - "entity": entity, - "role": role - }, - userProject=user_project - ).execute() - if response: - self.log.info('A new ACL entry created in bucket: %s', bucket) - except HttpError as ex: - raise AirflowException( - 'Bucket ACL entry creation failed. Error was: {}'.format(ex.content) - ) + client = self.get_conn() + bucket = client.bucket(bucket_name=bucket) + bucket.acl.reload() + bucket.acl.entity_from_dict(entity_dict={"entity": entity, "role": role}) + if user_project: + bucket.acl.user_project = user_project + bucket.acl.save() + + self.log.info('A new ACL entry created in bucket: %s', bucket) - def insert_object_acl(self, bucket, object_name, entity, role, generation, - user_project): + def insert_object_acl(self, bucket, object_name, entity, role, user_project=None): """ Creates a new ACL entry on the specified object. See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert @@ -633,36 +492,26 @@ def insert_object_acl(self, bucket, object_name, entity, role, generation, :param role: The access permission for the entity. Acceptable values are: "OWNER", "READER". :type role: str - :param generation: (Optional) If present, selects a specific revision of this - object (as opposed to the latest version, the default). - :type generation: str :param user_project: (Optional) The project to be billed for this request. Required for Requester Pays buckets. :type user_project: str """ self.log.info('Creating a new ACL entry for object: %s in bucket: %s', object_name, bucket) - service = self.get_conn() - try: - response = service.objectAccessControls().insert( - bucket=bucket, - object=object_name, - body={ - "entity": entity, - "role": role - }, - generation=generation, - userProject=user_project - ).execute() - if response: - self.log.info('A new ACL entry created for object: %s in bucket: %s', - object_name, bucket) - except HttpError as ex: - raise AirflowException( - 'Object ACL entry creation failed. Error was: {}'.format(ex.content) - ) + client = self.get_conn() + bucket = client.bucket(bucket_name=bucket) + blob = bucket.blob(object_name) + # Reload fetches the current ACL from Cloud Storage. + blob.acl.reload() + blob.acl.entity_from_dict(entity_dict={"entity": entity, "role": role}) + if user_project: + blob.acl.user_project = user_project + blob.acl.save() + + self.log.info('A new ACL entry created for object: %s in bucket: %s', + object_name, bucket) - def compose(self, bucket, source_objects, destination_object, num_retries=5): + def compose(self, bucket, source_objects, destination_object): """ Composes a list of existing object into a new object in the same storage bucket @@ -684,31 +533,17 @@ def compose(self, bucket, source_objects, destination_object, num_retries=5): if not source_objects or not len(source_objects): raise ValueError('source_objects cannot be empty.') - if not bucket or not destination_object: - raise ValueError('bucket and destination_object cannot be empty.') - - service = self.get_conn() - - dict_source_objects = [{'name': source_object} - for source_object in source_objects] - body = { - 'sourceObjects': dict_source_objects - } - - try: - self.log.info("Composing %s to %s in the bucket %s", - source_objects, destination_object, bucket) - service \ - .objects() \ - .compose(destinationBucket=bucket, - destinationObject=destination_object, - body=body) \ - .execute(num_retries=num_retries) - return True - except HttpError as ex: - if ex.resp['status'] == '404': - return False - raise + self.log.info("Composing %s to %s in the bucket %s", + source_objects, destination_object, bucket) + client = self.get_conn() + bucket = client.get_bucket(bucket) + destination_blob = bucket.blob(destination_object) + destination_blob.compose( + sources=[ + bucket.blob(blob_name=source_object) for source_object in source_objects + ]) + + self.log.info("Completed successfully.") def _parse_gcs_url(gsurl): diff --git a/setup.py b/setup.py index f6ff21577de2e..965601cdc60fb 100644 --- a/setup.py +++ b/setup.py @@ -181,6 +181,7 @@ def write_version(filename=os.path.join(*['airflow', 'google-cloud-container>=0.1.1', 'google-cloud-language>=1.1.1', 'google-cloud-spanner>=1.7.1', + 'google-cloud-storage~=1.14.0', 'google-cloud-translate>=1.3.3', 'google-cloud-vision>=0.35.2', 'google-cloud-texttospeech>=0.4.0', From bca3d0b5ce54dd679c86b98075f819de201f5bc7 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 02:41:02 +0100 Subject: [PATCH 02/11] Add tests --- airflow/contrib/hooks/gcs_hook.py | 49 +-- tests/contrib/hooks/test_gcs_hook.py | 443 ++++++++------------------- 2 files changed, 163 insertions(+), 329 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 9569727b79267..9afb4ae7e1e69 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -17,15 +17,15 @@ # specific language governing permissions and limitations # under the License. # +import gzip as gz +import os +import shutil from google.cloud import storage from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.exceptions import AirflowException -import gzip as gz -import shutil - class GoogleCloudStorageHook(GoogleCloudBaseHook): """ @@ -46,8 +46,7 @@ def get_conn(self): Returns a Google Cloud Storage service object. """ if not self._conn: - self._conn = storage.Client(credentials=self._get_credentials(), - project=self.project_id) + self._conn = storage.Client(credentials=self._get_credentials()) return self._conn @@ -175,7 +174,8 @@ def download(self, bucket, object, filename=None): return blob.download_as_string() # pylint:disable=redefined-builtin - def upload(self, bucket, object, filename, gzip=False): + def upload(self, bucket, object, filename, + mime_type='application/octet-stream', gzip=False): """ Uploads a local file to Google Cloud Storage. @@ -185,6 +185,8 @@ def upload(self, bucket, object, filename, gzip=False): :type object: str :param filename: The local file path to the file to be uploaded. :type filename: str + :param mime_type: The MIME type to set when uploading the file. + :type mime_type: str :param gzip: Option to compress file for upload :type gzip: bool """ @@ -198,9 +200,13 @@ def upload(self, bucket, object, filename, gzip=False): filename = filename_gz client = self.get_conn() - bucket = client.get_bucket(bucket) + bucket = client.get_bucket(bucket_name=bucket) blob = bucket.blob(blob_name=object) - blob.upload_from_filename(filename=filename) + blob.upload_from_filename(filename=filename, + content_type=mime_type) + + if gzip: + os.remove(filename) self.log.info('File %s uploaded to %s in %s bucket', filename, object, bucket) # pylint:disable=redefined-builtin @@ -215,7 +221,7 @@ def exists(self, bucket, object): :type object: str """ client = self.get_conn() - bucket = client.get_bucket(bucket) + bucket = client.get_bucket(bucket_name=bucket) blob = bucket.blob(blob_name=object) return blob.exists() @@ -233,8 +239,8 @@ def is_updated_after(self, bucket, object, ts): :type ts: datetime.datetime """ client = self.get_conn() - bucket = client.get_bucket(bucket) - blob = bucket.blob(blob_name=object) + bucket = storage.Bucket(client=client, name=bucket) + blob = bucket.get_blob(blob_name=object) blob.reload() blob_update_time = blob.updated @@ -262,8 +268,8 @@ def delete(self, bucket, object): :type object: str """ client = self.get_conn() - bucket = client.get_bucket(bucket) - blob = bucket.blob(blob_name=object) + bucket = storage.Bucket(client=client, name=bucket) + blob = bucket.get_blob(blob_name=object) blob.delete() self.log.info('Blob %s deleted.', object) @@ -329,8 +335,8 @@ def get_size(self, bucket, object): object, bucket) client = self.get_conn() - bucket = client.get_bucket(bucket) - blob = bucket.blob(blob_name=object) + bucket = storage.Bucket(client=client, name=bucket) + blob = bucket.get_blob(blob_name=object) blob.reload() blob_size = blob.size self.log.info('The file size of %s is %s bytes.', object, blob_size) @@ -349,8 +355,8 @@ def get_crc32c(self, bucket, object): self.log.info('Retrieving the crc32c checksum of ' 'object: %s in bucket: %s', object, bucket) client = self.get_conn() - bucket = client.get_bucket(bucket) - blob = bucket.blob(blob_name=object) + bucket = storage.Bucket(client=client, name=bucket) + blob = bucket.get_blob(blob_name=object) blob.reload() blob_crc32c = blob.crc32c self.log.info('The crc32c checksum of %s is %s', object, blob_crc32c) @@ -369,8 +375,8 @@ def get_md5hash(self, bucket, object): self.log.info('Retrieving the MD5 hash of ' 'object: %s in bucket: %s', object, bucket) client = self.get_conn() - bucket = client.get_bucket(bucket) - blob = bucket.blob(blob_name=object) + bucket = storage.Bucket(client=client, name=bucket) + blob = bucket.get_blob(blob_name=object) blob.reload() blob_md5hash = blob.md5_hash self.log.info('The md5Hash of %s is %s', object, blob_md5hash) @@ -436,7 +442,7 @@ def create_bucket(self, for item in bucket_resource: if item != "name": - bucket._patch_property(item, resource[item]) + bucket._patch_property(name=item, value=resource[item]) bucket.storage_class = storage_class bucket.labels = labels or {} @@ -533,6 +539,9 @@ def compose(self, bucket, source_objects, destination_object): if not source_objects or not len(source_objects): raise ValueError('source_objects cannot be empty.') + if not bucket or not destination_object: + raise ValueError('bucket and destination_object cannot be empty.') + self.log.info("Composing %s to %s in the bucket %s", source_objects, destination_object, bucket) client = self.get_conn() diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index fbd1a82b31490..12bee53d76977 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -25,11 +25,15 @@ from airflow.exceptions import AirflowException from googleapiclient.errors import HttpError from tests.compat import mock +from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id +from google.cloud import storage +from google.cloud import exceptions BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}' GCS_STRING = 'airflow.contrib.hooks.gcs_hook.{}' EMPTY_CONTENT = ''.encode('utf8') +PROJECT_ID_TEST = 'project-id' class TestGCSHookHelperFunctions(unittest.TestCase): @@ -56,128 +60,73 @@ def test_parse_gcs_url(self): gcs_hook._parse_gcs_url('gs://bucket/'), ('bucket', '')) -class TestGCSBucket(unittest.TestCase): - def test_bucket_name_value(self): - - bad_start_bucket_name = '/testing123' - with self.assertRaises(ValueError): - - gcs_hook.GoogleCloudStorageHook().create_bucket( - bucket_name=bad_start_bucket_name - ) - - bad_end_bucket_name = 'testing123/' - with self.assertRaises(ValueError): - gcs_hook.GoogleCloudStorageHook().create_bucket( - bucket_name=bad_end_bucket_name - ) - - class TestGoogleCloudStorageHook(unittest.TestCase): def setUp(self): - with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__')): + with mock.patch( + GCS_STRING.format('GoogleCloudBaseHook.__init__'), + new=mock_base_gcp_hook_default_project_id, + ): self.gcs_hook = gcs_hook.GoogleCloudStorageHook( - google_cloud_storage_conn_id='test' - ) + google_cloud_storage_conn_id='test') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) def test_exists(self, mock_service): - test_bucket = 'test_bucket' test_object = 'test_object' - (mock_service.return_value.objects.return_value - .get.return_value.execute.return_value) = { - "kind": "storage#object", - # The ID of the object, including the bucket name, - # object name, and generation number. - "id": "{}/{}/1521132662504504".format(test_bucket, test_object), - "name": test_object, - "bucket": test_bucket, - "generation": "1521132662504504", - "contentType": "text/csv", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "89", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "metadata": { - "md5-hash": "95e614241516ad1b64b3551e50538d25" - }, - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - } + # Given + get_bucket_mock = mock_service.return_value.get_bucket + blob_object = get_bucket_mock.return_value.blob + exists_method = blob_object.return_value.exists + exists_method.return_value = True + # When response = self.gcs_hook.exists(bucket=test_bucket, object=test_object) + # Then self.assertTrue(response) + get_bucket_mock.assert_called_once_with(bucket_name=test_bucket) + blob_object.assert_called_once_with(blob_name=test_object) + exists_method.assert_called_once() @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) def test_exists_nonexisting_object(self, mock_service): - test_bucket = 'test_bucket' test_object = 'test_object' - (mock_service.return_value.objects.return_value - .get.return_value.execute.side_effect) = HttpError( - resp={'status': '404'}, content=EMPTY_CONTENT) + # Given + get_bucket_mock = mock_service.return_value.get_bucket + blob_object = get_bucket_mock.return_value.blob + exists_method = blob_object.return_value.exists + exists_method.return_value = False + # When response = self.gcs_hook.exists(bucket=test_bucket, object=test_object) + # Then self.assertFalse(response) + @mock.patch('google.cloud.storage.Bucket') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_copy(self, mock_service): + def test_copy(self, mock_service, mock_bucket): source_bucket = 'test-source-bucket' source_object = 'test-source-object' destination_bucket = 'test-dest-bucket' destination_object = 'test-dest-object' - (mock_service.return_value.objects.return_value - .get.return_value.execute.return_value) = { - "kind": "storage#object", - # The ID of the object, including the bucket name, object name, - # and generation number. - "id": "{}/{}/1521132662504504".format( - destination_bucket, destination_object), - "name": destination_object, - "bucket": destination_bucket, - "generation": "1521132662504504", - "contentType": "text/csv", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "89", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "metadata": { - "md5-hash": "95e614241516ad1b64b3551e50538d25" - }, - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - } - - response = self.gcs_hook.copy( - source_bucket=source_bucket, - source_object=source_object, - destination_bucket=destination_bucket, - destination_object=destination_object - ) + destination_bucket_instance = mock_bucket + source_blob = mock_bucket.blob(source_object) + destination_blob = storage.Blob( + bucket=destination_bucket_instance, + name=destination_object) - self.assertTrue(response) - - @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_copy_failedcopy(self, mock_service): - source_bucket = 'test-source-bucket' - source_object = 'test-source-object' - destination_bucket = 'test-dest-bucket' - destination_object = 'test-dest-object' - - (mock_service.return_value.objects.return_value - .copy.return_value.execute.side_effect) = HttpError( - resp={'status': '404'}, content=EMPTY_CONTENT) + # Given + get_bucket_mock = mock_service.return_value.get_bucket + get_bucket_mock.return_value = mock_bucket + copy_method = get_bucket_mock.return_value.copy_blob + copy_method.return_value = destination_blob + # When response = self.gcs_hook.copy( source_bucket=source_bucket, source_object=source_object, @@ -185,10 +134,15 @@ def test_copy_failedcopy(self, mock_service): destination_object=destination_object ) - self.assertFalse(response) + # Then + self.assertEqual(response, None) + copy_method.assert_called_once_with( + blob=source_blob, + destination_bucket=destination_bucket_instance, + new_name=destination_object + ) def test_copy_fail_same_source_and_destination(self): - source_bucket = 'test-source-bucket' source_object = 'test-source-object' destination_bucket = 'test-source-bucket' @@ -208,7 +162,6 @@ def test_copy_fail_same_source_and_destination(self): ) def test_copy_empty_source_bucket(self): - source_bucket = None source_object = 'test-source-object' destination_bucket = 'test-dest-bucket' @@ -226,7 +179,6 @@ def test_copy_empty_source_bucket(self): ) def test_copy_empty_source_object(self): - source_bucket = 'test-source-object' source_object = None destination_bucket = 'test-dest-bucket' @@ -243,109 +195,76 @@ def test_copy_empty_source_object(self): 'source_bucket and source_object cannot be empty.' ) + @mock.patch('google.cloud.storage.Bucket') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_rewrite(self, mock_service): + def test_rewrite(self, mock_service, mock_bucket): source_bucket = 'test-source-bucket' source_object = 'test-source-object' destination_bucket = 'test-dest-bucket' destination_object = 'test-dest-object' - # First response has `done` equals False has it has not completed copying - # It also has `rewriteToken` which would be passed to the second call - # to the api. - first_response = { - "kind": "storage#rewriteResponse", - "totalBytesRewritten": "9111", - "objectSize": "9111", - "done": False, - "rewriteToken": "testRewriteToken" - } - - second_response = { - "kind": "storage#rewriteResponse", - "totalBytesRewritten": "9111", - "objectSize": "9111", - "done": True, - "resource": { - "kind": "storage#object", - # The ID of the object, including the bucket name, - # object name, and generation number. - "id": "{}/{}/1521132662504504".format( - destination_bucket, destination_object), - "name": destination_object, - "bucket": destination_bucket, - "generation": "1521132662504504", - "contentType": "text/csv", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "9111", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "metadata": { - "md5-hash": "95e614241516ad1b64b3551e50538d25" - }, - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - } - } - - (mock_service.return_value.objects.return_value - .rewrite.return_value.execute.side_effect) = [first_response, second_response] - - result = self.gcs_hook.rewrite( + source_blob = mock_bucket.blob(source_object) + + # Given + get_bucket_mock = mock_service.return_value.get_bucket + get_bucket_mock.return_value = mock_bucket + get_blob_method = get_bucket_mock.return_value.blob + rewrite_method = get_blob_method.return_value.rewrite + rewrite_method.side_effect = [(None, mock.ANY, mock.ANY), (mock.ANY, mock.ANY, mock.ANY)] + + # When + response = self.gcs_hook.rewrite( source_bucket=source_bucket, source_object=source_object, destination_bucket=destination_bucket, - destination_object=destination_object - ) + destination_object=destination_object) - self.assertTrue(result) - mock_service.return_value.objects.return_value.rewrite.assert_called_with( - sourceBucket=source_bucket, - sourceObject=source_object, - destinationBucket=destination_bucket, - destinationObject=destination_object, - rewriteToken=first_response['rewriteToken'], - body='' - ) + # Then + self.assertEqual(response, None) + rewrite_method.assert_called_once_with( + source=source_blob) + @mock.patch('google.cloud.storage.Bucket') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_delete(self, mock_service): + def test_delete(self, mock_service, mock_bucket): test_bucket = 'test_bucket' test_object = 'test_object' + blob_to_be_deleted = storage.Blob(name=test_object, bucket=mock_bucket) - (mock_service.return_value.objects.return_value - .delete.return_value.execute.return_value) = {} + get_bucket_method = mock_service.return_value.get_bucket + get_blob_method = get_bucket_method.return_value.get_blob + delete_method = get_blob_method.return_value.delete + delete_method.return_value = blob_to_be_deleted response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) + self.assertIsNone(response) - self.assertTrue(response) - - @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_delete_nonexisting_object(self, mock_service): + @mock.patch('google.cloud.storage.Bucket') + def test_delete_nonexisting_object(self, mock_bucket): test_bucket = 'test_bucket' test_object = 'test_object' - (mock_service.return_value.objects.return_value - .delete.return_value.execute.side_effect) = HttpError( - resp={'status': '404'}, content=EMPTY_CONTENT) - - response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) + get_blob_method = mock_bucket.return_value.get_blob + delete_method = get_blob_method.return_value.delete + delete_method.side_effect = exceptions.NotFound(message="Not Found") - self.assertFalse(response) + with self.assertRaises(exceptions.NotFound): + self.gcs_hook.delete(bucket=test_bucket, object=test_object) + @mock.patch('google.cloud.storage.Bucket') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_create_bucket(self, mock_service): + def test_create_bucket(self, mock_service, mock_bucket): test_bucket = 'test_bucket' test_project = 'test-project' - test_location = 'EU', + test_location = 'EU' test_labels = {'env': 'prod'} test_storage_class = 'MULTI_REGIONAL' - test_response_id = "{}/0123456789012345".format(test_bucket) - (mock_service.return_value.buckets.return_value - .insert.return_value.execute.return_value) = {"id": test_response_id} + mock_service.return_value.bucket.return_value.create.return_value = None + mock_bucket.return_value.storage_class = test_storage_class + mock_bucket.return_value.labels = test_labels + + sample_bucket = mock_service().bucket(bucket_name=test_bucket) response = self.gcs_hook.create_bucket( bucket_name=test_bucket, @@ -355,59 +274,63 @@ def test_create_bucket(self, mock_service): project_id=test_project ) - self.assertEqual(response, test_response_id) - mock_service.return_value.buckets.return_value.insert.assert_called_with( - project=test_project, - body={ - 'name': test_bucket, - 'location': test_location, - 'storageClass': test_storage_class, - 'labels': test_labels - } + self.assertEquals(response, sample_bucket.id) + + self.assertEquals(sample_bucket.storage_class, test_storage_class) + self.assertEquals(sample_bucket.labels, test_labels) + + mock_service.return_value.bucket.return_value.create.assert_called_with( + project=test_project, location=test_location ) + @mock.patch('google.cloud.storage.Bucket') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_create_bucket_with_resource(self, mock_service): + def test_create_bucket_with_resource(self, mock_service, mock_bucket): test_bucket = 'test_bucket' test_project = 'test-project' - test_location = 'EU', + test_location = 'EU' test_labels = {'env': 'prod'} test_storage_class = 'MULTI_REGIONAL' - test_response_id = "{}/0123456789012345".format(test_bucket) - test_lifecycle = {"rule": [{"action": {"type": "Delete"}, "condition": {"age": 7}}]} + test_versioning_enabled = {"enabled": True} + + mock_service.return_value.bucket.return_value.create.return_value = None + mock_bucket.return_value.storage_class = test_storage_class + mock_bucket.return_value.labels = test_labels + mock_bucket.return_value.versioning_enabled = True - (mock_service.return_value.buckets.return_value - .insert.return_value.execute.return_value) = {"id": test_response_id} + sample_bucket = mock_service().bucket(bucket_name=test_bucket) + # sample_bucket = storage.Bucket(client=mock_service, name=test_bucket) # Assert for resource other than None. response = self.gcs_hook.create_bucket( bucket_name=test_bucket, - resource={"lifecycle": {"rule": [{"action": {"type": "Delete"}, "condition": {"age": 7}}]}}, + resource={"versioning": test_versioning_enabled}, storage_class=test_storage_class, location=test_location, labels=test_labels, project_id=test_project ) + self.assertEquals(response, sample_bucket.id) - self.assertEqual(response, test_response_id) - mock_service.return_value.buckets.return_value.insert.assert_called_with( - project=test_project, - body={ - "lifecycle": test_lifecycle, - 'name': test_bucket, - 'location': test_location, - 'storageClass': test_storage_class, - 'labels': test_labels - } + mock_service.return_value.bucket.return_value._patch_property.assert_called_with( + name='versioning', value=test_versioning_enabled ) + mock_service.return_value.bucket.return_value.create.assert_called_with( + project=test_project, location=test_location + ) + + @mock.patch('google.cloud.storage.Bucket.blob') @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_compose(self, mock_service): + def test_compose(self, mock_service, mock_blob): test_bucket = 'test_bucket' test_source_objects = ['test_object_1', 'test_object_2', 'test_object_3'] test_destination_object = 'test_object_composed' - method = (mock_service.return_value.objects.return_value.compose) + mock_service.return_value.get_bucket.return_value\ + .blob.return_value = mock_blob(blob_name=mock.ANY) + method = mock_service.return_value.get_bucket.return_value.blob\ + .return_value.compose self.gcs_hook.compose( bucket=test_bucket, @@ -415,19 +338,10 @@ def test_compose(self, mock_service): destination_object=test_destination_object ) - body = { - 'sourceObjects': [ - {'name': 'test_object_1'}, - {'name': 'test_object_2'}, - {'name': 'test_object_3'} - ] - } - method.assert_called_once_with( - destinationBucket=test_bucket, - destinationObject=test_destination_object, - body=body - ) + sources=[ + mock_blob(blob_name=source_object) for source_object in test_source_objects + ]) @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) def test_compose_with_empty_source_objects(self, mock_service): @@ -504,121 +418,32 @@ def test_upload(self, mock_service): test_bucket = 'test_bucket' test_object = 'test_object' - (mock_service.return_value.objects.return_value - .insert.return_value.execute.return_value) = { - "kind": "storage#object", - "id": "{}/{}/0123456789012345".format(test_bucket, test_object), - "name": test_object, - "bucket": test_bucket, - "generation": "0123456789012345", - "contentType": "application/octet-stream", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "393216", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - } + upload_method = mock_service.return_value.get_bucket.return_value\ + .blob.return_value.upload_from_filename + upload_method.return_value = None response = self.gcs_hook.upload(test_bucket, test_object, self.testfile.name) - self.assertTrue(response) + self.assertIsNone(response) + upload_method.assert_called_once_with( + filename=self.testfile.name, + content_type='application/octet-stream' + ) @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) def test_upload_gzip(self, mock_service): test_bucket = 'test_bucket' test_object = 'test_object' - (mock_service.return_value.objects.return_value - .insert.return_value.execute.return_value) = { - "kind": "storage#object", - "id": "{}/{}/0123456789012345".format(test_bucket, test_object), - "name": test_object, - "bucket": test_bucket, - "generation": "0123456789012345", - "contentType": "application/octet-stream", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "393216", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - } + upload_method = mock_service.return_value.get_bucket.return_value \ + .blob.return_value.upload_from_filename + upload_method.return_value = None response = self.gcs_hook.upload(test_bucket, test_object, self.testfile.name, gzip=True) self.assertFalse(os.path.exists(self.testfile.name + '.gz')) - self.assertTrue(response) - - @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_upload_gzip_error(self, mock_service): - test_bucket = 'test_bucket' - test_object = 'test_object' - - (mock_service.return_value.objects.return_value - .insert.return_value.execute.side_effect) = HttpError( - resp={'status': '404'}, content=EMPTY_CONTENT) - - response = self.gcs_hook.upload(test_bucket, - test_object, - self.testfile.name, - gzip=True) - self.assertFalse(os.path.exists(self.testfile.name + '.gz')) - self.assertFalse(response) - - @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_upload_multipart(self, mock_service): - test_bucket = 'test_bucket' - test_object = 'test_object' - - class MockProgress: - def __init__(self, value): - self.value = value - - def progress(self): - return self.value - - (mock_service.return_value.objects.return_value - .insert.return_value.next_chunk.side_effect) = [ - (MockProgress(0.66), None), - (MockProgress(1.0), { - "kind": "storage#object", - "id": "{}/{}/0123456789012345".format(test_bucket, test_object), - "name": test_object, - "bucket": test_bucket, - "generation": "0123456789012345", - "contentType": "application/octet-stream", - "timeCreated": "2018-03-15T16:51:02.502Z", - "updated": "2018-03-15T16:51:02.502Z", - "storageClass": "MULTI_REGIONAL", - "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z", - "size": "393216", - "md5Hash": "leYUJBUWrRtks1UeUFONJQ==", - "crc32c": "xgdNfQ==", - "etag": "CLf4hODk7tkCEAE=" - }) - ] - - response = self.gcs_hook.upload(test_bucket, - test_object, - self.testfile.name, - multipart=True) - - self.assertTrue(response) - - @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) - def test_upload_multipart_wrong_chunksize(self, mock_service): - test_bucket = 'test_bucket' - test_object = 'test_object' - - with self.assertRaises(ValueError): - self.gcs_hook.upload(test_bucket, test_object, - self.testfile.name, multipart=123) + self.assertIsNone(response) From 60e0d01c1e945c4dbf5cf4a00388bee512e9df4b Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 02:48:03 +0100 Subject: [PATCH 03/11] GCS Download corrections --- airflow/contrib/hooks/gcs_hook.py | 6 ++---- airflow/contrib/operators/gcs_download_operator.py | 3 ++- airflow/models/xcom.py | 4 ++++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 9afb4ae7e1e69..0719b958a9625 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -168,10 +168,8 @@ def download(self, bucket, object, filename=None): if filename: blob.download_to_filename(filename) self.log.info('File downloaded to %s', filename) - # MAX XCOM Size is 48KB - # https://github.com/apache/airflow/pull/1618#discussion_r68249677 - if blob.size < 49344: - return blob.download_as_string() + + return blob.download_as_string() # pylint:disable=redefined-builtin def upload(self, bucket, object, filename, diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py index 1d168d4660722..4c0d117d994a9 100644 --- a/airflow/contrib/operators/gcs_download_operator.py +++ b/airflow/contrib/operators/gcs_download_operator.py @@ -21,6 +21,7 @@ from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.models import BaseOperator +from airflow.models.xcom import MAX_XCOM_SIZE from airflow.utils.decorators import apply_defaults @@ -82,7 +83,7 @@ def execute(self, context): object=self.object, filename=self.filename) if self.store_to_xcom_key: - if sys.getsizeof(file_bytes) < 48000: + if sys.getsizeof(file_bytes) < MAX_XCOM_SIZE: context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes) else: raise RuntimeError( diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index f097b49a8c1ba..8fd2f711bc906 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -31,6 +31,10 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.sqlalchemy import UtcDateTime +# MAX XCOM Size is 48KB +# https://github.com/apache/airflow/pull/1618#discussion_r68249677 +MAX_XCOM_SIZE = 49344 + class XCom(Base, LoggingMixin): """ From c66ee8626ce197b961d4a66a4bdd462da26bc969 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 02:54:42 +0100 Subject: [PATCH 04/11] Update test_gcs_hook.py --- tests/contrib/hooks/test_gcs_hook.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index 12bee53d76977..03a01cd89787b 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -23,7 +23,6 @@ from airflow.contrib.hooks import gcs_hook from airflow.exceptions import AirflowException -from googleapiclient.errors import HttpError from tests.compat import mock from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id from google.cloud import storage From 681cfc86a62abb985a41fba13f3db40c7927cf22 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 09:14:00 +0100 Subject: [PATCH 05/11] Fix test --- tests/contrib/hooks/test_gcs_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index 03a01cd89787b..7e6214a76ada5 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -86,7 +86,7 @@ def test_exists(self, mock_service): self.assertTrue(response) get_bucket_mock.assert_called_once_with(bucket_name=test_bucket) blob_object.assert_called_once_with(blob_name=test_object) - exists_method.assert_called_once() + exists_method.assert_called_once_with() @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) def test_exists_nonexisting_object(self, mock_service): From dbbf54a623ea47bdcd8a1f1202734ead0daefafb Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 10:45:07 +0100 Subject: [PATCH 06/11] Update Bucket -> get_bucket --- airflow/contrib/hooks/gcs_hook.py | 10 +++++----- tests/contrib/hooks/test_gcs_hook.py | 9 +++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 0719b958a9625..068ba7cb4ea9d 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -266,8 +266,8 @@ def delete(self, bucket, object): :type object: str """ client = self.get_conn() - bucket = storage.Bucket(client=client, name=bucket) - blob = bucket.get_blob(blob_name=object) + bucket = client.get_bucket(bucket_name=bucket) + blob = bucket.blob(blob_name=object) blob.delete() self.log.info('Blob %s deleted.', object) @@ -333,7 +333,7 @@ def get_size(self, bucket, object): object, bucket) client = self.get_conn() - bucket = storage.Bucket(client=client, name=bucket) + bucket = client.get_bucket(bucket_name=bucket) blob = bucket.get_blob(blob_name=object) blob.reload() blob_size = blob.size @@ -353,7 +353,7 @@ def get_crc32c(self, bucket, object): self.log.info('Retrieving the crc32c checksum of ' 'object: %s in bucket: %s', object, bucket) client = self.get_conn() - bucket = storage.Bucket(client=client, name=bucket) + bucket = client.get_bucket(bucket_name=bucket) blob = bucket.get_blob(blob_name=object) blob.reload() blob_crc32c = blob.crc32c @@ -373,7 +373,7 @@ def get_md5hash(self, bucket, object): self.log.info('Retrieving the MD5 hash of ' 'object: %s in bucket: %s', object, bucket) client = self.get_conn() - bucket = storage.Bucket(client=client, name=bucket) + bucket = client.get_bucket(bucket_name=bucket) blob = bucket.get_blob(blob_name=object) blob.reload() blob_md5hash = blob.md5_hash diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index 7e6214a76ada5..40590564d8fb1 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -238,13 +238,14 @@ def test_delete(self, mock_service, mock_bucket): response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) self.assertIsNone(response) - @mock.patch('google.cloud.storage.Bucket') - def test_delete_nonexisting_object(self, mock_bucket): + @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) + def test_delete_nonexisting_object(self, mock_service): test_bucket = 'test_bucket' test_object = 'test_object' - get_blob_method = mock_bucket.return_value.get_blob - delete_method = get_blob_method.return_value.delete + get_bucket_method = mock_service.return_value.get_bucket + blob = get_bucket_method.return_value.blob + delete_method = blob.return_value.delete delete_method.side_effect = exceptions.NotFound(message="Not Found") with self.assertRaises(exceptions.NotFound): From e3b6bdf16a5150f3bbc69def8deeac84e5075ade Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 11:00:45 +0100 Subject: [PATCH 07/11] Update UPDATING.md --- UPDATING.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index 1d5472c920321..e8df90b87db6c 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -24,6 +24,26 @@ assists users migrating to a new version. ## Airflow Master +### Changes to GoogleCloudStorageHook + +* replaced the discovery-based api (`googleapiclient.discovery`) to the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054) +* as a part of this replacement, the `multipart` & `num_retries` parameters for `GoogleCloudStorageHook.upload` method has been removed: + + **Old**: + ```python + def upload(self, bucket, object, filename, + mime_type='application/octet-stream', gzip=False, + multipart=False, num_retries=0): + ``` + + **New**: + ```python + def upload(self, bucket, object, filename, + mime_type='application/octet-stream', gzip=False): + ``` + +* the `generation` parameter is no longer supported in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`. + ### Changes to CloudantHook * upgraded cloudant version from `>=0.5.9,<2.0` to `>=2.0` From 81d883776691f76017ea259674ec708ab9dd674a Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 11:02:23 +0100 Subject: [PATCH 08/11] Update UPDATING.md --- UPDATING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/UPDATING.md b/UPDATING.md index e8df90b87db6c..e5063e28f6ba6 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -26,7 +26,7 @@ assists users migrating to a new version. ### Changes to GoogleCloudStorageHook -* replaced the discovery-based api (`googleapiclient.discovery`) to the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054) +* the discovery-based api (`googleapiclient.discovery`) used in `GoogleCloudStorageHook` is now replaced by the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054) * as a part of this replacement, the `multipart` & `num_retries` parameters for `GoogleCloudStorageHook.upload` method has been removed: **Old**: From 10615ffa98bc48f51964bda69189bbb68e7dcd31 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 13:13:33 +0100 Subject: [PATCH 09/11] Update version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 965601cdc60fb..2c9d13b23cb26 100644 --- a/setup.py +++ b/setup.py @@ -181,7 +181,7 @@ def write_version(filename=os.path.join(*['airflow', 'google-cloud-container>=0.1.1', 'google-cloud-language>=1.1.1', 'google-cloud-spanner>=1.7.1', - 'google-cloud-storage~=1.14.0', + 'google-cloud-storage~=1.14', 'google-cloud-translate>=1.3.3', 'google-cloud-vision>=0.35.2', 'google-cloud-texttospeech>=0.4.0', From 772944b3ff596a38d1817a853cb5981281cc9bbe Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 13:38:36 +0100 Subject: [PATCH 10/11] Update gcs_hook.py --- airflow/contrib/hooks/gcs_hook.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 068ba7cb4ea9d..3cc6efd36aa61 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -253,8 +253,8 @@ def is_updated_after(self, bucket, object, ts): if blob_update_time > ts: return True - else: - return False + + return False def delete(self, bucket, object): """ From 44b5e446404a24e02ce4ace70adcf4c5b17cc8bf Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Mon, 8 Apr 2019 23:55:22 +0100 Subject: [PATCH 11/11] Update UPDATING.md --- UPDATING.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index e5063e28f6ba6..84145a32242ae 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -41,6 +41,8 @@ assists users migrating to a new version. def upload(self, bucket, object, filename, mime_type='application/octet-stream', gzip=False): ``` + + The client library uses multipart upload automatically if the object/blob size is more than 8 MB - [source code](https://github.com/googleapis/google-cloud-python/blob/11c543ce7dd1d804688163bc7895cf592feb445f/storage/google/cloud/storage/blob.py#L989-L997). * the `generation` parameter is no longer supported in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`.