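def getIdentityProvider(provider, appID, appSecret, roleARN):
    """Return the IdentityProvider implementation registered for *provider*.

    Args:
        provider: one of ``'google'``, ``'facebook'`` or ``'amazon'``
            (compared exactly, case-sensitively, as in the original).
        appID: OAuth client/application id, passed through to the provider.
        appSecret: OAuth client secret, passed through to the provider.
        roleARN: the IAM role the provider will assume via STS, passed through.

    Returns:
        A ``GoogleIdentityProvider``, ``FacebookIdentityProvider`` or
        ``AmazonIdentityProvider`` instance built with the three arguments.

    Raises:
        ValueError: if *provider* is not one of the supported names. The
            original guarded this with ``assert ip != None``, which is
            stripped under ``python -O`` and would then return ``None`` to
            the caller; an explicit exception cannot be compiled away.
    """
    # The provider modules are imported inside the function, as in the
    # original (presumably they import this module for the base class;
    # confirm). Each import is now done only on the branch that needs it.
    if provider == 'google':
        from GoogleIdentityProvider import GoogleIdentityProvider
        return GoogleIdentityProvider(appID, appSecret, roleARN)
    if provider == 'facebook':
        from FacebookIdentityProvider import FacebookIdentityProvider
        return FacebookIdentityProvider(appID, appSecret, roleARN)
    if provider == 'amazon':
        from AmazonIdentityProvider import AmazonIdentityProvider
        return AmazonIdentityProvider(appID, appSecret, roleARN)
    # Unknown provider names fail loudly instead of yielding None.
    raise ValueError('unsupported identity provider: %r' % (provider,))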
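class IdentityProvider:
    """Base class for the OAuth / web-identity login providers.

    Subclasses must implement ``loginURL()``, ``doGetToken()``,
    ``doGetUserProfile()`` and ``getRoleARN()``; this class drives the
    callback flow (authorization code -> provider token -> user profile ->
    temporary AWS credentials via STS AssumeRoleWithWebIdentity).
    """

    # Class-level defaults; __init__ sets the per-instance values.
    APP_ID = None
    APP_SECRET = None
    ROLE_ARN = None

    # ProviderId values STS needs for providers whose tokens are not
    # self-describing. Any other ``profile['provider']`` value sends no
    # provider_id, exactly as the original if/elif chain did.
    WEB_IDENTITY_PROVIDER_IDS = {
        'Facebook': 'graph.facebook.com',
        'Amazon': 'www.amazon.com',
    }

    def __init__(self, appID, appSecret, roleARN):
        """Store the OAuth application credentials and the IAM role ARN."""
        self.APP_ID = appID
        self.APP_SECRET = appSecret
        self.ROLE_ARN = roleARN

    def oauthCallback(self, code):
        """Complete the OAuth callback for an authorization *code*.

        Args:
            code: the authorization code the provider redirected back with.

        Returns:
            ``(credentials, profile)``: the dict returned by
            ``doGetAccessCredentials()`` and the provider's user profile.

        Raises:
            NotImplementedError: from the abstract hooks when a subclass has
                not overridden them.

        The original printed the authorization code, the full provider token
        and the returned AWS credentials to stdout. Those are all reusable
        secrets, so the progress messages below no longer include them; the
        profile (non-secret) is still echoed for debugging.
        """
        print('--- exchanging authorization code for token')
        token = self.doGetToken(code)
        print('--- received token')

        # Each accessor is called once and the result reused.
        accessToken = self.getAccessToken(token)
        print('--- getting user profile with access token')
        profile = self.doGetUserProfile(accessToken)
        print('--- received profile : ' + str(profile))

        idToken = self.getIDToken(token)
        print('--- getting AWS temporary credentials for web identity token')
        credentials = self.doGetAccessCredentials(idToken, profile)
        print('--- received AWS temporary credentials')
        return credentials, profile

    def getAccessToken(self, token):
        """Return the access token from the provider's token response.

        Default for every provider except Facebook (which overrides it).
        """
        return token['access_token']

    def getIDToken(self, token):
        """Return the token to hand to STS as the web identity token.

        Default for every provider except Google and Facebook (which
        override it); by default the access token doubles as the identity
        token.
        """
        return token['access_token']

    def loginURL(self):
        """Return the provider login URL; subclasses must implement."""
        raise NotImplementedError("Please Implement this method in subclasses")

    def doGetToken(self, code):
        """Exchange *code* for the provider's token; subclasses must implement."""
        raise NotImplementedError("Please Implement this method in subclasses")

    def doGetUserProfile(self, token):
        """Fetch the user profile for *token*; subclasses must implement."""
        raise NotImplementedError("Please Implement this method in subclasses")

    def getRoleARN(self):
        """Return the IAM role ARN to assume; subclasses must implement."""
        raise NotImplementedError("Please Implement this method in subclasses")

    def doGetAccessCredentials(self, token, profile):
        """Exchange a web identity *token* for temporary AWS credentials.

        Args:
            token: the token accepted by STS AssumeRoleWithWebIdentity
                (whatever ``getIDToken()`` returned for this provider).
            profile: user profile dict; ``profile['email']`` and
                ``profile['provider']`` are read.

        Returns:
            dict: ``assumedRole.credentials.to_dict()`` from boto.

        Raises:
            KeyError: if the profile lacks ``'email'`` or ``'provider'``.
        """
        from boto.sts.connection import STSConnection

        # AssumeRoleWithWebIdentity is an unsigned call: the token is the
        # proof of identity, hence anon=True.
        # NOTE(review): debug=1 is kept from the original; if boto's debug
        # output includes request/response bodies it would put the token and
        # the returned credentials into logs — confirm before deploying.
        conn = STSConnection(anon=True, debug=1)

        roleARN = self.getRoleARN()
        # The session name is what appears in CloudTrail and in the
        # assumed-role ARN, so the user's e-mail is used; truncated to 32
        # characters as in the original ("Max 32 characters").
        sessionName = profile['email'][:32]

        callArgs = {
            'role_arn': roleARN,
            'role_session_name': sessionName,
            'web_identity_token': token,
        }
        providerID = self.WEB_IDENTITY_PROVIDER_IDS.get(profile['provider'], '')
        if providerID:
            callArgs['provider_id'] = providerID

        assumedRole = conn.assume_role_with_web_identity(**callArgs)
        return assumedRole.credentials.to_dict()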
# def doGetAccessCredentials_NO_BOTO(self, token, profile):
# import urllib, urllib2, json
#
# # Let the subclass give us the role ARN
# roleARN = self.getRoleARN()
# email = profile['email']
#
# url = 'https://sts.amazonaws.com?Action=AssumeRoleWithWebIdentity'
# url = url + '&DurationSeconds=3600'
# url = url + '&RoleSessionName=' + email
# url = url + '&Version=2011-06-15'
# url = url + '&RoleArn=' + roleARN
# url = url + '&WebIdentityToken=' + token
#
# if profile['provider'] == 'Facebook':
# url = url + '&ProviderId=graph.facebook.com'
# elif profile['provider'] == 'Amazon':
# url = url + '&ProviderId=www.amazon.com'
#
# request = urllib2.Request(url, headers= {'Accept' : 'application/json'} )
# response = urllib2.urlopen(request)
# assumedRole = response.read()
# assumedRole = json.loads(assumedRole)
# return assumedRole['AssumeRoleWithWebIdentityResponse']['AssumeRoleWithWebIdentityResult']['Credentials']