Skip to content

Commit

Permalink
Add oauth cc method
Browse files Browse the repository at this point in the history
  • Loading branch information
davidesner committed May 21, 2024
1 parent 20fc680 commit baa9981
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
3 changes: 2 additions & 1 deletion python-sync-actions/src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ def convert(cls, config_parameters: dict) -> Authentication | None:
'bearer': cls._convert_bearer,
'api-key': cls._convert_api_key,
'query': cls._convert_query,
'login': cls._convert_login
'login': cls._convert_login,
'oauth2': cls._convert_login
}

func = methods.get(auth_method)
Expand Down
49 changes: 48 additions & 1 deletion python-sync-actions/src/http_generic/auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import inspect
import json
from abc import ABC, abstractmethod
from typing import Callable, Union, Dict
from typing import Callable, Union, Dict, Literal
from urllib.parse import urlencode

import requests
Expand Down Expand Up @@ -264,3 +264,50 @@ def __call__(self, r):
r.url = f"{r.url}?{urlencode(self.api_request_query_parameters)}"
r.headers.update(self.api_request_headers)
return r


class OAuth20ClientCredentials(AuthMethodBase, AuthBase):
def __init__(self, login_endpoint: str,
client_secret: str,
client_id: str,
method: Literal['client_secret_post', 'client_secret_basic'] = 'client_secret_basic',
scopes: list[str] = None):
"""
Args:
login_endpoint:
client_secret:
client_id:
method: 'client_secret_post' or 'client_secret_basic'
scopes:
"""
self.login_endpoint = login_endpoint
self.method = method
self.client_secret = client_secret
self.client_id = client_id
self.scopes = scopes or []
self.auth_header = {}

def login(self) -> Union[AuthBase, Callable]:
data = {"grant_type": "client_credentials"}
auth = None
if self.scopes:
data['scope'] = ' '.join(self.scopes)

if self.method == 'client_secret_post':
data['client_id'] = self.client_id
data['client_secret'] = self.client_secret
elif self.method == 'client_secret_basic':
auth = (self.client_id, self.client_secret)

response = requests.request('POST', self.login_endpoint, data=data, auth=auth)

response.raise_for_status()

self.auth_header = {'Authorization': f"Bearer {response.json()['access_token']}"}

return self

def __call__(self, r):
r.headers.update(self.auth_header)
return r

0 comments on commit baa9981

Please sign in to comment.