From 29e8089eb9b58b3adaba8501c760e0d1c6f3f32d Mon Sep 17 00:00:00 2001 From: Fernando Marino Date: Wed, 20 Dec 2023 17:18:38 +0100 Subject: [PATCH 1/5] fix for https://github.com/datahub-project/datahub/issues/7914 adding http proxies support for authentication scheme other than bearer --- .../src/datahub/ingestion/source/openapi.py | 17 +++++++------ .../ingestion/source/openapi_parser.py | 24 ++++++++++++------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 3925ba51c16dd9..9a40fea69647cb 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -52,6 +52,7 @@ class OpenApiConfig(ConfigModel): ignore_endpoints: list = Field(default=[], description="") username: str = Field(default="", description="") password: str = Field(default="", description="") + proxies: dict = Field(default={"http": None, "https": None}, description="") forced_examples: dict = Field(default={}, description="") token: Optional[str] = Field(default=None, description="") get_token: dict = Field(default={}, description="") @@ -87,9 +88,10 @@ def get_swagger(self) -> Dict: password=self.password, tok_url=url4req, method=self.get_token["request_type"], + proxies=self.proxies ) sw_dict = get_swag_json( - self.url, token=self.token, swagger_file=self.swagger_file + self.url, token=self.token, swagger_file=self.swagger_file, proxies=self.proxies ) # load the swagger file else: # using basic auth for accessing endpoints @@ -98,6 +100,7 @@ def get_swagger(self) -> Dict: username=self.username, password=self.password, swagger_file=self.swagger_file, + proxies=self.proxies ) return sw_dict @@ -258,10 +261,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 tot_url = clean_url(config.url + self.url_basepath + endpoint_k) if config.token: - response = request_call(tot_url, token=config.token) + response = request_call(tot_url, token=config.token, proxies=config.proxies) else: response = request_call( - tot_url, username=config.username, password=config.password + tot_url, username=config.username, password=config.password, proxies=config.proxies ) if response.status_code == 200: fields2add, root_dataset_samples[dataset_name] = extract_fields( @@ -281,10 +284,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 url_guess = try_guessing(endpoint_k, root_dataset_samples) tot_url = clean_url(config.url + self.url_basepath + url_guess) if config.token: - response = request_call(tot_url, token=config.token) + response = request_call(tot_url, token=config.token, proxies=config.proxies) else: response = request_call( - tot_url, username=config.username, password=config.password + tot_url, username=config.username, password=config.password, proxies=config.proxies ) if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) @@ -304,10 +307,10 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 ) tot_url = clean_url(config.url + self.url_basepath + composed_url) if config.token: - response = request_call(tot_url, token=config.token) + response = request_call(tot_url, token=config.token, proxies=config.proxies) else: response = request_call( - tot_url, username=config.username, password=config.password + tot_url, username=config.username, password=config.password, proxies=config.proxies ) if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index 1ab40bc8be73d4..8e7e9a96f58727 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -51,6 +51,7 @@ def request_call( token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, + proxies: Optional[dict] = None ) -> requests.Response: headers = {"accept": "application/json"} @@ -60,8 +61,8 @@ def request_call( ) elif token is not None: - headers["Authorization"] = f"Bearer {token}" - return requests.get(url, headers=headers) + headers["Authorization"] = f"{token}" + return requests.get(url, proxies=proxies, headers=headers) else: return requests.get(url, headers=headers) @@ -72,12 +73,13 @@ def get_swag_json( username: Optional[str] = None, password: Optional[str] = None, swagger_file: str = "", + proxies: Optional[dict] = None ) -> Dict: tot_url = url + swagger_file if token is not None: - response = request_call(url=tot_url, token=token) + response = request_call(url=tot_url, token=token, proxies=proxies) else: - response = request_call(url=tot_url, username=username, password=password) + response = request_call(url=tot_url, username=username, password=password, proxies=proxies) if response.status_code != 200: raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}") @@ -251,7 +253,7 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str: attr_list=["2",]) asd2 == "http://asd.com/2" """ - splitted = re.split(r"\{[^}]+\}", raw_url) + splitted = re.split(r"\{[^}]+}", raw_url) if splitted[-1] == "": # it can happen that the last element is empty splitted = splitted[:-1] composed_url = "" @@ -265,7 +267,7 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str: def maybe_theres_simple_id(url: str) -> str: - dets = re.findall(r"(\{[^}]+\})", url) # searching the fields between parenthesis + dets = re.findall(r"(\{[^}]+})", url) # searching the fields between parenthesis if len(dets) == 0: return url dets_w_id = [det for det in dets if "id" in det] # the fields containing "id" @@ -349,6 +351,7 @@ def get_tok( password: str = "", tok_url: str = "", method: str = "post", + proxies: Optional[dict] = None ) -> str: """ Trying to post username/password to get auth. @@ -357,12 +360,15 @@ def get_tok( url4req = url + tok_url if method == "post": # this will make a POST call with username and password - data = {"username": username, "password": password} + data = {"username": username, "password": password, "maxDuration": True} # url2post = url + "api/authenticate/" - response = requests.post(url4req, data=data) + response = requests.post(url4req, proxies=proxies, json=data) if response.status_code == 200: cont = json.loads(response.content) - token = cont["tokens"]["access"] + if "token" in cont: # other authentication scheme + token = cont["token"] + else: # works only for bearer authentication scheme + token = "Bearer" + cont["tokens"]["access"] elif method == "get": # this will make a GET call with username and password response = requests.get(url4req) From 2768175f0e22f2847905e53829c386cf66328af9 Mon Sep 17 00:00:00 2001 From: Fernando Marino Date: Thu, 21 Dec 2023 08:34:58 +0100 Subject: [PATCH 2/5] incorporating PR comments --- metadata-ingestion/src/datahub/ingestion/source/openapi.py | 6 +++++- .../src/datahub/ingestion/source/openapi_parser.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 9a40fea69647cb..825b41c43a16b7 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -52,7 +52,11 @@ class OpenApiConfig(ConfigModel): ignore_endpoints: list = Field(default=[], description="") username: str = Field(default="", description="") password: str = Field(default="", description="") - proxies: dict = Field(default={"http": None, "https": None}, description="") + proxies: dict = Field(default=None, description="Eg. " + "'http': 'http://10.10.1.10:3128'" + "'https': 'http://10.10.1.10:1080'" + "If authentication is required, add it to the Url itself as " + "http://user:pass@10.10.1.10:3128/") forced_examples: dict = Field(default={}, description="") token: Optional[str] = Field(default=None, description="") get_token: dict = Field(default={}, description="") diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index 8e7e9a96f58727..ff1754262c0e8f 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -368,7 +368,7 @@ def get_tok( if "token" in cont: # other authentication scheme token = cont["token"] else: # works only for bearer authentication scheme - token = "Bearer" + cont["tokens"]["access"] + token = f"Bearer {cont['tokens']['access']}" elif method == "get": # this will make a GET call with username and password response = requests.get(url4req) From 2a9f1a42023cf576b7c5a33c2e567082b0915669 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 21 Dec 2023 13:54:59 -0500 Subject: [PATCH 3/5] Apply suggestions from code review --- .../src/datahub/ingestion/source/openapi.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 825b41c43a16b7..b80bc462760dca 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -52,11 +52,11 @@ class OpenApiConfig(ConfigModel): ignore_endpoints: list = Field(default=[], description="") username: str = Field(default="", description="") password: str = Field(default="", description="") - proxies: dict = Field(default=None, description="Eg. " + proxies: dict = Field(default=None, description="Eg. `{" "'http': 'http://10.10.1.10:3128'" - "'https': 'http://10.10.1.10:1080'" - "If authentication is required, add it to the Url itself as " - "http://user:pass@10.10.1.10:3128/") + "'https': 'http://10.10.1.10:1080'}`" + "If authentication is required, add it to the proxy url directly e.g. " + "`http://user:pass@10.10.1.10:3128/`.") forced_examples: dict = Field(default={}, description="") token: Optional[str] = Field(default=None, description="") get_token: dict = Field(default={}, description="") From a02ac484e0203e2b467787298f02adc810d56e96 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 21 Dec 2023 13:56:50 -0500 Subject: [PATCH 4/5] format --- .../src/datahub/ingestion/source/openapi.py | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index b80bc462760dca..ad62ef7362aebd 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -52,11 +52,13 @@ class OpenApiConfig(ConfigModel): ignore_endpoints: list = Field(default=[], description="") username: str = Field(default="", description="") password: str = Field(default="", description="") - proxies: dict = Field(default=None, description="Eg. `{" - "'http': 'http://10.10.1.10:3128'" - "'https': 'http://10.10.1.10:1080'}`" - "If authentication is required, add it to the proxy url directly e.g. " - "`http://user:pass@10.10.1.10:3128/`.") + proxies: Optional[dict] = Field( + default=None, + description="Eg. " + "`{'http': 'http://10.10.1.10:3128', 'https': 'http://10.10.1.10:1080'}`." + "If authentication is required, add it to the proxy url directly e.g. " + "`http://user:pass@10.10.1.10:3128/`.", + ) forced_examples: dict = Field(default={}, description="") token: Optional[str] = Field(default=None, description="") get_token: dict = Field(default={}, description="") @@ -92,10 +94,13 @@ def get_swagger(self) -> Dict: password=self.password, tok_url=url4req, method=self.get_token["request_type"], - proxies=self.proxies + proxies=self.proxies, ) sw_dict = get_swag_json( - self.url, token=self.token, swagger_file=self.swagger_file, proxies=self.proxies + self.url, + token=self.token, + swagger_file=self.swagger_file, + proxies=self.proxies, ) # load the swagger file else: # using basic auth for accessing endpoints @@ -104,7 +109,7 @@ def get_swagger(self) -> Dict: username=self.username, password=self.password, swagger_file=self.swagger_file, - proxies=self.proxies + proxies=self.proxies, ) return sw_dict @@ -265,10 +270,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 tot_url = clean_url(config.url + self.url_basepath + endpoint_k) if config.token: - response = request_call(tot_url, token=config.token, proxies=config.proxies) + response = request_call( + tot_url, token=config.token, proxies=config.proxies + ) else: response = request_call( - tot_url, username=config.username, password=config.password, proxies=config.proxies + tot_url, + username=config.username, + password=config.password, + proxies=config.proxies, ) if response.status_code == 200: fields2add, root_dataset_samples[dataset_name] = extract_fields( @@ -288,10 +298,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 url_guess = try_guessing(endpoint_k, root_dataset_samples) tot_url = clean_url(config.url + self.url_basepath + url_guess) if config.token: - response = request_call(tot_url, token=config.token, proxies=config.proxies) + response = request_call( + tot_url, token=config.token, proxies=config.proxies + ) else: response = request_call( - tot_url, username=config.username, password=config.password, proxies=config.proxies + tot_url, + username=config.username, + password=config.password, + proxies=config.proxies, ) if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) @@ -311,10 +326,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 ) tot_url = clean_url(config.url + self.url_basepath + composed_url) if config.token: - response = request_call(tot_url, token=config.token, proxies=config.proxies) + response = request_call( + tot_url, token=config.token, proxies=config.proxies + ) else: response = request_call( - tot_url, username=config.username, password=config.password, proxies=config.proxies + tot_url, + username=config.username, + password=config.password, + proxies=config.proxies, ) if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) From 63dff7cb3f5433f5549bf1c34d4ac5e69e0d877e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 27 Dec 2023 14:50:46 -0500 Subject: [PATCH 5/5] fix lint --- .../src/datahub/ingestion/source/openapi_parser.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index ff1754262c0e8f..84bb3ad4526117 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -51,7 +51,7 @@ def request_call( token: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, - proxies: Optional[dict] = None + proxies: Optional[dict] = None, ) -> requests.Response: headers = {"accept": "application/json"} @@ -73,13 +73,15 @@ def get_swag_json( username: Optional[str] = None, password: Optional[str] = None, swagger_file: str = "", - proxies: Optional[dict] = None + proxies: Optional[dict] = None, ) -> Dict: tot_url = url + swagger_file if token is not None: response = request_call(url=tot_url, token=token, proxies=proxies) else: - response = request_call(url=tot_url, username=username, password=password, proxies=proxies) + response = request_call( + url=tot_url, username=username, password=password, proxies=proxies + ) if response.status_code != 200: raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}") @@ -351,7 +353,7 @@ def get_tok( password: str = "", tok_url: str = "", method: str = "post", - proxies: Optional[dict] = None + proxies: Optional[dict] = None, ) -> str: """ Trying to post username/password to get auth.