-
Notifications
You must be signed in to change notification settings - Fork 0
/
backends.py
105 lines (80 loc) · 3.97 KB
/
backends.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import jwt
from django.conf import settings
from rest_framework import (
authentication,
exceptions
)
from users.models import User
SECRET_KEY = getattr(settings, 'SECRET_KEY', None)
class JWTAuthentication(authentication.BaseAuthentication):
authentication_header_prefix = 'Bearer'
def authenticate(self, request):
"""
The `authenticate` method is called on every request regardless of
whether the endpoint requires authentication.
`authenticate` has two possible return values:
1) `None` - We return `None` if we do not wish to authenticate. Usually
this means we know authentication will fail. An example of
this is when the request does not include a token in the
headers.
2) `(user, token)` - We return a user/token combination when
authentication is successful.
If neither case is met, that means there's an error
and we do not return anything.
We simple raise the `AuthenticationFailed`
exception and let Django REST Framework
handle the rest.
"""
request.user = None
# print('\033[32m' + 'start' + '\033[0m')
# `auth_header` should be an array with two elements: 1) the name of
# the authentication header (in this case, "Bearer") and 2) the JWT
# that we should authenticate against.
auth_header = authentication.get_authorization_header(request).split()
auth_header_prefix = self.authentication_header_prefix.lower()
if not auth_header:
return None
if len(auth_header) == 1:
# Invalid token header. No credentials provided. Do not attempt to authenticate.
return None
elif len(auth_header) > 2:
# Invalid token header. The Token string should not contain spaces. Do
# not attempt to authenticate.
return None
# The JWT library we're using can't handle the `byte` type, which is
# commonly used by standard libraries in Python 3. To get around this,
# we simply have to decode `prefix` and `token`. This does not make for
# clean code, but it is a good decision because we would get an error
# if we didn't decode these values.
prefix = auth_header[0].decode('utf-8')
token = auth_header[1].decode('utf-8')
prefix = auth_header[0].decode('utf-8') if isinstance(auth_header[0], bytes) else auth_header[0]
token = auth_header[1].decode('utf-8') if isinstance(auth_header[1], bytes) else auth_header[1]
if prefix.lower() != auth_header_prefix:
# The auth header prefix is not what we expected. Do not attempt to
# authenticate.
return None
# By now, we are sure there is a *chance* that authentication will
# succeed. We delegate the actual credentials authentication to the
# method below.
return self._authenticate_credentials(request, token)
def _authenticate_credentials(self, request, token):
"""
Try to authenticate the given credentials. If authentication is
successful, return the user and token. If not, throw an error.
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except Exception as e:
msg = 'Invalid authentication. Could not decode token.'
raise exceptions.AuthenticationFailed()
try:
user = User.objects.get(pk=payload['id'])
except User.DoesNotExist:
msg = 'No user matching this token was found.'
raise exceptions.AuthenticationFailed(msg)
if not user.is_active:
msg = 'This user has been deactivated.'
raise exceptions.AuthenticationFailed(msg)
return (user, token)