This repository has been archived by the owner on Feb 22, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 177
/
__init__.py
362 lines (264 loc) · 12 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# -*- coding: utf-8 -*-
"""
flask_jwt
~~~~~~~~~
Flask-JWT module
"""
import logging
import warnings
from collections import OrderedDict
from datetime import datetime, timedelta
from functools import wraps
import jwt
from flask import current_app, request, jsonify, _request_ctx_stack
from werkzeug.local import LocalProxy
__version__ = '0.3.2'
# Module-level logger; used by the default error handler to log JWTError instances.
logger = logging.getLogger(__name__)
# Proxy to the ``current_identity`` attribute stored on the top of the request context stack
# by ``_jwt_required``; resolves to None when that attribute has not been set.
current_identity = LocalProxy(lambda: getattr(_request_ctx_stack.top, 'current_identity', None))
# Proxy to the JWT extension instance registered under ``extensions['jwt']`` on the
# current application (see ``JWT.init_app``).
_jwt = LocalProxy(lambda: current_app.extensions['jwt'])
# Default configuration values; ``JWT.init_app`` applies each one with
# ``app.config.setdefault`` so values already present in the app config win.
CONFIG_DEFAULTS = {
    # Realm used in the WWW-Authenticate header when ``jwt_required`` is given no realm.
    'JWT_DEFAULT_REALM': 'Login Required',
    # URL rule / endpoint of the built-in authentication resource (a falsy rule disables it).
    'JWT_AUTH_URL_RULE': '/auth',
    'JWT_AUTH_ENDPOINT': 'jwt',
    # Keys looked up in the JSON body of an authentication request.
    'JWT_AUTH_USERNAME_KEY': 'username',
    'JWT_AUTH_PASSWORD_KEY': 'password',
    # Algorithm passed to ``jwt.encode`` / ``jwt.decode``.
    'JWT_ALGORITHM': 'HS256',
    # Passed as ``leeway`` to ``jwt.decode``.
    'JWT_LEEWAY': timedelta(seconds=10),
    # Expected first word of the Authorization header (compared case-insensitively).
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
    # Offsets added to the issue time to compute the ``exp`` and ``nbf`` claims.
    'JWT_EXPIRATION_DELTA': timedelta(seconds=300),
    'JWT_NOT_BEFORE_DELTA': timedelta(seconds=0),
    # Each entry becomes a ``verify_<claim>`` option for ``jwt.decode``.
    'JWT_VERIFY_CLAIMS': ['signature', 'exp', 'nbf', 'iat'],
    # Each entry becomes a ``require_<claim>`` option for ``jwt.decode`` and must be present
    # in the payload at encode time.
    'JWT_REQUIRED_CLAIMS': ['exp', 'iat', 'nbf']
}
def _default_jwt_headers_handler(identity):
return None
def _default_jwt_payload_handler(identity):
    """Build the default JWT payload for ``identity``.

    The payload carries the issue time (``iat``), the expiry (``exp``), the not-before
    time (``nbf``) and the identity's id under the ``identity`` key.

    :param identity: an object exposing an ``id`` attribute, or a mapping with an
                     ``'id'`` key
    :returns: dict with the keys ``exp``, ``iat``, ``nbf`` and ``identity``
    :raises KeyError: if a mapping identity has no ``'id'`` key
    :raises TypeError: if the identity has no truthy ``id`` attribute and is not subscriptable
    """
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA')
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    # getattr() needs an explicit default here: without one, a mapping identity raised
    # AttributeError and the item-lookup fallback below could never run.
    identity = getattr(identity, 'id', None) or identity['id']
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}
def _default_jwt_encode_handler(identity):
    """Encode ``identity`` into a signed JWT using the application's configuration.

    The payload and headers come from the extension's payload and headers callbacks.

    :param identity: the identity to encode
    :returns: the value returned by ``jwt.encode``
    :raises RuntimeError: if the payload lacks any claim listed in ``JWT_REQUIRED_CLAIMS``
    """
    config = current_app.config
    secret = config['JWT_SECRET_KEY']
    algorithm = config['JWT_ALGORITHM']
    required_claims = config['JWT_REQUIRED_CLAIMS']

    payload = _jwt.jwt_payload_callback(identity)

    # Refuse to issue a token that the decode side would reject for missing claims.
    absent = set(required_claims) - set(payload.keys())
    if absent:
        raise RuntimeError('Payload is missing required claims: %s' % ', '.join(absent))

    extra_headers = _jwt.jwt_headers_callback(identity)
    return jwt.encode(payload, secret, algorithm=algorithm, headers=extra_headers)
def _default_jwt_decode_handler(token):
    """Decode and verify ``token`` using the application's configuration.

    Every claim in ``JWT_VERIFY_CLAIMS`` is turned into a ``verify_<claim>`` option and
    every claim in ``JWT_REQUIRED_CLAIMS`` into a ``require_<claim>`` option, all set to
    ``True``, before the token is handed to ``jwt.decode``.

    :param token: the encoded JWT
    :returns: the value returned by ``jwt.decode``
    """
    config = current_app.config
    secret = config['JWT_SECRET_KEY']
    algorithm = config['JWT_ALGORITHM']
    leeway = config['JWT_LEEWAY']
    verify_claims = config['JWT_VERIFY_CLAIMS']
    required_claims = config['JWT_REQUIRED_CLAIMS']

    options = {}
    for claim in verify_claims:
        options['verify_' + claim] = True
    for claim in required_claims:
        options['require_' + claim] = True

    return jwt.decode(token, secret, options=options, algorithms=[algorithm], leeway=leeway)
def _default_request_handler():
    """Extract the JWT from the current request's ``Authorization`` header.

    The header is expected to look like ``<prefix> <token>``, where the prefix is the
    ``JWT_AUTH_HEADER_PREFIX`` config value compared case-insensitively.

    :returns: the token string, or ``None`` when the header is absent or blank
    :raises JWTError: if the prefix does not match, the token is missing, or the header
                      has more than two whitespace-separated parts
    """
    auth_header_value = request.headers.get('Authorization', None)
    auth_header_prefix = current_app.config['JWT_AUTH_HEADER_PREFIX']

    # A whitespace-only header is truthy but splits into an empty list; treat it the same
    # as a missing header instead of failing with IndexError on parts[0].
    parts = auth_header_value.split() if auth_header_value else []
    if not parts:
        return None

    if parts[0].lower() != auth_header_prefix.lower():
        raise JWTError('Invalid JWT header', 'Unsupported authorization type')
    elif len(parts) == 1:
        raise JWTError('Invalid JWT header', 'Token missing')
    elif len(parts) > 2:
        raise JWTError('Invalid JWT header', 'Token contains spaces')

    return parts[1]
def _default_auth_request_handler():
    """View function of the built-in authentication resource.

    Reads the JSON body of the current request, which must be an object containing
    exactly the username and password keys (``JWT_AUTH_USERNAME_KEY`` /
    ``JWT_AUTH_PASSWORD_KEY``), authenticates those credentials through the extension's
    authentication callback and returns the auth response for the encoded token.

    :returns: the value returned by the extension's auth response callback
    :raises JWTError: if the body is not a JSON object, the credentials are missing or
                      empty, extra keys are present, or authentication fails
    """
    data = request.get_json()

    # get_json() may yield None (no JSON body) or a non-object value such as a list;
    # report those as invalid credentials rather than crashing on ``data.get``.
    if not isinstance(data, dict):
        raise JWTError('Bad Request', 'Invalid credentials')

    username = data.get(current_app.config.get('JWT_AUTH_USERNAME_KEY'), None)
    password = data.get(current_app.config.get('JWT_AUTH_PASSWORD_KEY'), None)
    criterion = [username, password, len(data) == 2]

    if not all(criterion):
        raise JWTError('Bad Request', 'Invalid credentials')

    identity = _jwt.authentication_callback(username, password)

    if identity:
        access_token = _jwt.jwt_encode_callback(identity)
        return _jwt.auth_response_callback(access_token, identity)
    else:
        raise JWTError('Bad Request', 'Invalid credentials')
def _default_auth_response_handler(access_token, identity):
    """Build the JSON response returned after a successful authentication.

    :param access_token: the encoded JWT, as ``bytes`` or text
    :param identity: the authenticated identity (unused)
    :returns: a JSON response of the form ``{"access_token": "<token>"}``
    """
    # jwt.encode returns bytes in PyJWT 1.x but str from 2.0 on; only decode when needed
    # so the handler works with either.
    if isinstance(access_token, bytes):
        access_token = access_token.decode('utf-8')
    return jsonify({'access_token': access_token})
def _default_jwt_error_handler(error):
    """Log ``error`` and render it as a JSON error response.

    :param error: the error being handled; its ``status_code``, ``error``,
                  ``description`` and ``headers`` attributes are read
    :returns: a ``(response, status_code, headers)`` tuple
    """
    logger.error(error)

    # Ordered so the keys are serialized as status_code, error, description.
    body = OrderedDict()
    body['status_code'] = error.status_code
    body['error'] = error.error
    body['description'] = error.description

    return jsonify(body), error.status_code, error.headers
def _jwt_required(realm):
    """Verify the JWT carried by the current request and record its identity.

    ``jwt_required()`` calls this automatically; calling it by hand is useful for views
    where JWT access is optional.

    :param realm: realm reported in the ``WWW-Authenticate`` header when no token is present
    :raises JWTError: if the request has no token, the token fails to decode, or the
                      identity callback returns ``None``
    """
    token = _jwt.request_callback()

    if token is None:
        challenge = {'WWW-Authenticate': 'JWT realm="%s"' % realm}
        raise JWTError('Authorization Required', 'Request does not contain an access token',
                       headers=challenge)

    try:
        payload = _jwt.jwt_decode_callback(token)
    except jwt.InvalidTokenError as exc:
        raise JWTError('Invalid token', str(exc))

    # The identity is stored on the request context before the None check, so
    # ``current_identity`` reflects the lookup result even when it is rejected below.
    identity = _jwt.identity_callback(payload)
    _request_ctx_stack.top.current_identity = identity

    if identity is None:
        raise JWTError('Invalid JWT', 'User does not exist')
def jwt_required(realm=None):
    """Return a view decorator that demands a valid JWT in the request.

    :param realm: optional realm; when falsy, the ``JWT_DEFAULT_REALM`` config value is
                  looked up each time the decorated view is called
    """
    def wrapper(fn):
        @wraps(fn)
        def guarded_view(*args, **kwargs):
            # Resolve the realm per request so that it needs no application context at
            # decoration time.
            effective_realm = realm or current_app.config['JWT_DEFAULT_REALM']
            _jwt_required(effective_realm)
            return fn(*args, **kwargs)
        return guarded_view
    return wrapper
class JWTError(Exception):
    """Error raised for authentication and token failures.

    :param error: short error title
    :param description: longer explanation of the failure
    :param status_code: HTTP status code for the error response (401 by default)
    :param headers: optional headers for the error response
    """

    def __init__(self, error, description, status_code=401, headers=None):
        self.error, self.description = error, description
        self.status_code, self.headers = status_code, headers

    def __repr__(self):
        # Only the short title is shown in the repr.
        title = self.error
        return 'JWTError: %s' % title

    def __str__(self):
        title_and_detail = (self.error, self.description)
        return '%s. %s' % title_and_detail
def encode_token():
    """Encode a token from the extension's header and payload callbacks.

    NOTE(review): this reads ``encode_callback``, ``header_callback`` and
    ``payload_callback`` from the extension, while the ``JWT`` class in this module
    defines ``jwt_encode_callback``, ``jwt_headers_callback`` and
    ``jwt_payload_callback``. Unless those attributes are set elsewhere, calling this
    raises ``AttributeError`` -- confirm the intent before relying on it.
    """
    # Attribute lookups are kept in the order the one-line expression evaluates them.
    encode = _jwt.encode_callback
    headers = _jwt.header_callback()
    payload = _jwt.payload_callback()
    return encode(headers, payload)
class JWT(object):
    """Flask extension object that wires JWT authentication into an application.

    The instance holds the callbacks used to authenticate credentials, build, encode and
    decode tokens, extract tokens from requests and render errors. Every callback other
    than the authentication and identity callbacks starts out as one of this module's
    default handlers and can be replaced with the ``*_handler`` decorator methods.

    :param app: optional application; when given, :meth:`init_app` is called on it
    :param authentication_handler: callable ``(username, password)`` returning an identity
    :param identity_handler: callable ``(payload)`` returning an identity
    """

    def __init__(self, app=None, authentication_handler=None, identity_handler=None):
        self.authentication_callback = authentication_handler
        self.identity_callback = identity_handler

        self.auth_response_callback = _default_auth_response_handler
        self.auth_request_callback = _default_auth_request_handler
        self.jwt_encode_callback = _default_jwt_encode_handler
        self.jwt_decode_callback = _default_jwt_decode_handler
        self.jwt_headers_callback = _default_jwt_headers_handler
        self.jwt_payload_callback = _default_jwt_payload_handler
        self.jwt_error_callback = _default_jwt_error_handler
        self.request_callback = _default_request_handler

        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Configure ``app`` for JWT authentication and register this extension on it.

        Applies ``CONFIG_DEFAULTS`` (without overriding existing values), defaults
        ``JWT_SECRET_KEY`` to the app's ``SECRET_KEY``, registers the authentication URL
        rule when ``JWT_AUTH_URL_RULE`` is truthy, installs the ``JWTError`` error handler
        and stores the extension under ``app.extensions['jwt']``.

        :param app: the application to configure
        :raises AssertionError: if the built-in authentication resource is enabled with
                                the default request handler but no authentication handler
                                has been set
        """
        for k, v in CONFIG_DEFAULTS.items():
            app.config.setdefault(k, v)
        app.config.setdefault('JWT_SECRET_KEY', app.config['SECRET_KEY'])

        auth_url_rule = app.config.get('JWT_AUTH_URL_RULE', None)

        if auth_url_rule:
            # Raised explicitly (rather than via ``assert``) so the check still runs when
            # Python is started with -O; the exception type is unchanged.
            if (self.auth_request_callback is _default_auth_request_handler
                    and self.authentication_callback is None):
                raise AssertionError(
                    'an authentication_handler function must be defined when using the built in '
                    'authentication resource')

            # Work on a copy: adding ``view_func`` to the dict held in app.config would
            # leak this instance's view function into configuration that may be shared.
            auth_url_options = dict(
                app.config.get('JWT_AUTH_URL_OPTIONS', {'methods': ['POST']}))
            auth_url_options.setdefault('view_func', self.auth_request_callback)
            app.add_url_rule(auth_url_rule, **auth_url_options)

        app.errorhandler(JWTError)(self._jwt_error_callback)

        if not hasattr(app, 'extensions'):  # pragma: no cover
            app.extensions = {}

        app.extensions['jwt'] = self

    def _jwt_error_callback(self, error):
        """Dispatch ``error`` to the currently configured error callback.

        Registered as the app's ``JWTError`` handler so that replacing
        ``jwt_error_callback`` after :meth:`init_app` still takes effect.
        """
        return self.jwt_error_callback(error)

    def authentication_handler(self, callback):
        """Specifies the authentication handler function. This function receives two
        positional arguments. The first being the username the second being the password.
        It should return an object representing an authenticated identity. Example::

            @jwt.authentication_handler
            def authenticate(username, password):
                user = User.query.filter(User.username == username).scalar()
                if bcrypt.check_password_hash(user.password, password):
                    return user

        :param callback: the authentication handler function
        """
        self.authentication_callback = callback
        return callback

    def identity_handler(self, callback):
        """Specifies the identity handler function. This function receives one positional
        argument being the JWT payload. For example::

            @jwt.identity_handler
            def identify(payload):
                return User.query.filter(User.id == payload['identity']).scalar()

        :param callback: the identity handler function
        """
        self.identity_callback = callback
        return callback

    def jwt_error_handler(self, callback):
        """Specifies the error handler function. Example::

            @jwt.jwt_error_handler
            def error_handler(e):
                return "Something bad happened", 400

        :param callback: the error handler function
        """
        self.jwt_error_callback = callback
        return callback

    def auth_response_handler(self, callback):
        """Specifies the authentication response handler function. This function receives
        the access token and the authenticated identity.

        :param callable callback: the auth response handler function
        """
        self.auth_response_callback = callback
        return callback

    def auth_request_handler(self, callback):
        """Specifies the authentication request handler function, used as the view
        function of the built-in authentication resource.

        :param callable callback: the auth request handler function

        .. deprecated
        """
        warnings.warn("This handler is deprecated. The recommended approach to have control over "
                      "the authentication resource is to disable the built-in resource by "
                      "setting JWT_AUTH_URL_RULE=None and registering your own authentication "
                      "resource directly on your application.", DeprecationWarning, stacklevel=2)
        self.auth_request_callback = callback
        return callback

    def request_handler(self, callback):
        """Specifies the request handler function. This function returns a JWT from the
        current request.

        :param callable callback: the request handler function
        """
        self.request_callback = callback
        return callback

    def jwt_encode_handler(self, callback):
        """Specifies the encoding handler function. This function receives an identity and
        returns the encoded token for it.

        :param callable callback: the encoding handler function
        """
        self.jwt_encode_callback = callback
        return callback

    def jwt_decode_handler(self, callback):
        """Specifies the decoding handler function. This function receives an encoded
        token and returns the decoded payload.

        :param callable callback: the decoding handler function
        """
        self.jwt_decode_callback = callback
        return callback

    def jwt_payload_handler(self, callback):
        """Specifies the JWT payload handler function. This function receives the identity
        being encoded (with the default handlers, the return value of the
        ``authentication_handler`` function) and returns the payload dict.

        Example::

            @jwt.jwt_payload_handler
            def make_payload(identity):
                return {'user_id': identity.id}

        :param callable callback: the payload handler function
        """
        self.jwt_payload_callback = callback
        return callback

    def jwt_headers_handler(self, callback):
        """Specifies the JWT headers handler function. This function receives the identity
        being encoded (with the default handlers, the return value of the
        ``authentication_handler`` function); its return value is passed to ``jwt.encode``
        as ``headers``.

        Example::

            @jwt.jwt_headers_handler
            def make_headers(identity):
                return {'kid': 'my-key-id'}

        :param callable callback: the headers handler function
        """
        self.jwt_headers_callback = callback
        return callback