Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4255] Replace Discovery based api with client based for GCS #5054

Merged
merged 12 commits into from
Apr 9, 2019

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Apr 7, 2019

Make sure you have checked all steps below.

Jira

  • My PR addresses the following Airflow Jira issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"

Description

Google Cloud Client Libraries use our latest client library model and are our recommended option for accessing Cloud APIs programmatically, where available.

https://pypi.org/project/google-cloud-storage/ library is available and we should be using that.

This is Part 1 of probably 3 parts. I am trying to not break any changes in this PR and keep it backward compatible so that we could include it in a patch or minor version release.

The 2nd & 3rd PR would contain some breaking changes and will contain notes in Updating.md

Tests

  • [] My PR adds the following unit tests OR does not need testing for this extremely good reason:
    The current tests already cover some and will add few more tests

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to a appropriate release

Code Quality

  • Passes flake8

cc @fenglu-g

@kaxil kaxil requested a review from Fokko April 7, 2019 16:06
@kaxil kaxil force-pushed the replace-gcs-client-library branch from 6786d39 to 707e6ba Compare April 7, 2019 16:11
@kaxil kaxil force-pushed the replace-gcs-client-library branch from 707e6ba to dc16fc5 Compare April 7, 2019 16:13
@kaxil kaxil changed the title [AIRFLOW-4255] Replaces Discovery based api with client based for GCS [AIRFLOW-4255] Replace Discovery based api with client based for GCS Apr 7, 2019
@mik-laj
Copy link
Member

mik-laj commented Apr 7, 2019

Does this change require a note in file UPDATING.md? This hook is used by many custom operators

'storage', 'v1', http=http_authorized, cache_discovery=False)
if not self._conn:
self._conn = storage.Client(credentials=self._get_credentials(),
project=self.project_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other hooks, project_id is a method parameter. In this implementation, user can only pass project_id as a connection configuration. This introduces inconsistencies. What steps should we take to unify these situations for all GCP operator?

We have a 3 options:

  1. Specifying project_id in connection configuration.
  2. Specifying project_id in a method parameter with fallback to connection configuration
  3. Specifying project_id in a hook constructor parameter with fallback to connection configuration.

The third variant does not appear anywhere, but it seems to me most expected. Initalizing parameters are not mixed with execution time parameters. project_id is a parameter that initialize client library. It don't execute a API call.

Probably the wrong place for this discussion, but we should take steps to use each operator and hook for GCP to be identical.

CC: @potiuk @antonimaciej

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, let's discuss this and decide on this on the mailing list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we discuss this? region_name on various AWS hooks/operators have the same pattern (some take them as kwargs, some just from the connection)

pageToken=pageToken,
blobs = bucket.list_blobs(
max_results=maxResults,
page_token=pageToken,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is deprecated. Could you use a new way?

page_token (str) – (Optional) If present, return the next batch of blobs, using the value, which must correspond to the nextPageToken value returned in the previous response. Deprecated: use the pages property of the returned iterator instead of manually passing the token.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is in my todo list. As I wrote in the description of this PR, I want to keep this PR as backwards-compatible as possible, hence there is no note in Updating.md. I will add the note however as I think even though the function input and output are same, it still adds an extra dependency of google-cloud-storage, so I will do that.

I will take care of the Deprecated nextPageToken in the upcoming PR.

raise ValueError('Object Not Found')
client = self.get_conn()
bucket = client.get_bucket(bucket)
blob = bucket.blob(blob_name=object)
Copy link
Member

@mik-laj mik-laj Apr 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code looks like the get_blob method code.
Is this duplication intentional?
https://github.com/googleapis/google-cloud-python/blob/master/storage/google/cloud/storage/bucket.py#L691-L706

But in this case in not effective. It's do 2 calls to external API.

I write a sample script:

client = storage.Client()

bucket = client.get_bucket("instance-mb-test-1")
blob = bucket.get_blob('file-1.bin')
print("Blob size: ", blob.size)

On the screen a have a message:

DEBUG:urllib3.util.retry:Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None)
DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.googleapis.com:443
DEBUG:urllib3.connectionpool:https://www.googleapis.com:443 "GET /storage/v1/b/instance-mb-test-1?projection=noAcl HTTP/1.1" 200 447
DEBUG:urllib3.connectionpool:https://www.googleapis.com:443 "GET /storage/v1/b/instance-mb-test-1/o/file-1.bin HTTP/1.1" 200 753
Blob size:  104960000

It's confirm that you implementation do a two API calls (plus one call for authorization).
I am proposing that you use the code:

client = storage.Client()
bucket = storage.Bucket(client, "instance-mb-test-1")
blob = bucket.get_blob('file-1.bin')
print("Blob size: ", blob.size)

It's do one API call:

DEBUG:urllib3.util.retry:Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None)
DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.googleapis.com:443
DEBUG:urllib3.connectionpool:https://www.googleapis.com:443 "GET /storage/v1/b/instance-mb-test-1/o/file-1.bin HTTP/1.1" 200 753
Blob size:  104960000

It is important to optimize this methos, because it is often used in a loop, and therefore the number of queries is significant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inspired from Google's code and examples:

https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/storage/cloud-client/snippets.py

I don't get why there are 2 calls for one and not the other, may be I am missing something. Because it can either first get bucket object and then create a blob or create a bucket object and then get blob, looks same to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it in few places, though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an additional advantage in using both get_bucket and get_blob. The get_bucket method raises an error if the Bucket does not exists. The get_blob just returns None if the object doesn't exist.

I like this:

client = storage.Client()

bucket = client.get_bucket("instance-mb-test-1")
blob = bucket.get_blob('file-1.bin')
print("Blob size: ", blob.size)

Let me know what you think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case (1):

image

Case (2):
image

I would like to sticket with get_bucket so that we get a meaningful error, rather than just None

Copy link
Member Author

@kaxil kaxil Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, get_blob() doesn't contain blob.reload in the latest stable google-cloud-storage release (1.14.0)

https://github.com/googleapis/google-cloud-python/blob/storage-1.14.0/storage/google/cloud/storage/bucket.py#L642

The link you pasted is for master branch and has not yet made through in release :) Hopefully they release it soon and we can remove blob.reload from our code

blob.reload()
blob_crc32c = blob.crc32c
self.log.info('The crc32c checksum of %s is %s', object, blob_crc32c)
return blob_crc32c
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

@@ -193,16 +187,7 @@ def upload(self, bucket, object, filename,
:type mime_type: str
:param gzip: Option to compress file for upload
:type gzip: bool
:param multipart: If True, the upload will be split into multiple HTTP requests. The
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that multipart support is gone?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add that as well in Updating.md if you think other users might think the same as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a change in API of the operator so should go in UPDATING, yes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko Added a comment on Multipart in Updating.md.

@@ -82,7 +83,7 @@ def execute(self, context):
object=self.object,
filename=self.filename)
if self.store_to_xcom_key:
if sys.getsizeof(file_bytes) < 48000:
if sys.getsizeof(file_bytes) < MAX_XCOM_SIZE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one

@codecov-io
Copy link

codecov-io commented Apr 8, 2019

Codecov Report

Merging #5054 into master will decrease coverage by 0.07%.
The diff coverage is 43.6%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #5054      +/-   ##
=========================================
- Coverage   76.98%   76.9%   -0.08%     
=========================================
  Files         463     455       -8     
  Lines       29806   29667     -139     
=========================================
- Hits        22945   22816     -129     
+ Misses       6861    6851      -10
Impacted Files Coverage Δ
airflow/models/xcom.py 80% <100%> (ø) ⬆️
airflow/contrib/hooks/gcs_hook.py 53.64% <43.07%> (-1.02%) ⬇️
airflow/contrib/operators/gcs_download_operator.py 88.46% <50%> (+0.46%) ⬆️
airflow/lineage/backend/atlas/__init__.py 72.41% <0%> (-15.09%) ⬇️
airflow/models/__init__.py 93% <0%> (-7%) ⬇️
airflow/operators/check_operator.py 91.79% <0%> (-0.86%) ⬇️
airflow/models/connection.py 65.53% <0%> (-0.2%) ⬇️
airflow/settings.py 84.25% <0%> (-0.13%) ⬇️
airflow/jobs.py 78.77% <0%> (-0.04%) ⬇️
... and 16 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update da1be99...d0739e1. Read the comment docs.

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two small comments

if blob_update_time > ts:
return True
else:
return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'return False' is missing for if blob_udate_time is None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

return True
if gzip:
os.remove(filename)
self.log.info('File %s uploaded to %s in %s bucket', filename, object, bucket)

# pylint:disable=redefined-builtin
def exists(self, bucket, object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: maybe we can change the "object" name in the signature of functions. Since we are introducing backwards-incompatible changes anyway, that might be good time to get rid of the "object" redefinition and remove the pylint disable warnings.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, I have that PR ready. I am trying to keep the changes in this PR to be more on a backwards-compatible side.

The next PR will contain some breaking changes which will contain these name changes.

Copy link
Member

@mik-laj mik-laj Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what are the intentions of sharing one refactorization for a few PR's? This makes changes much more difficult to review. I see a reason if this change was backwards compatible, but it is not. We have a note in fleUpdating.md

Copy link
Member Author

@kaxil kaxil Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maintain intention is so that we can cherry-pick this one in 1.10.4.

If you look at this PR and check for "breaking-changes" - the one's that are there are not widely used (also in Updating.md).

I wouldn't want to change the name of something like the object parameter (or even bucket) and just put a note in Updating.md. We wont cherry-pick the 2nd PR for 1.10.4 and would target 2.0 instead.

They are fundamentally 2 separate pieces: This PR focuses on "Replacing discovery api with client api" and not on "updating parameter name". Also more readable in Changelog.

None of the changes in this PR remove or change any required parameter of any method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes perfect sense @kaxil 👍 . Thanks for explanation.

else:
return False

return False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you approve this PR if you are ok with it :) ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! done. I like it :)

@kaxil kaxil merged commit ec7c67f into apache:master Apr 9, 2019
@ashb
Copy link
Member

ashb commented Apr 12, 2019

@kaxil We probably shouldn't pull this in to 1.10.4 since it changes the function sig, should we?

cthenderson pushed a commit to cthenderson/apache-airflow that referenced this pull request Apr 16, 2019
andriisoldatenko pushed a commit to andriisoldatenko/airflow that referenced this pull request Jul 26, 2019
wmorris75 pushed a commit to modmed/incubator-airflow that referenced this pull request Jul 29, 2019
dharamsk pushed a commit to postmates/airflow that referenced this pull request Aug 8, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants