Skip to content
This repository was archived by the owner on May 26, 2020. It is now read-only.

Added a JWT verification view #75

Merged
merged 4 commits into from
Mar 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ Refresh with tokens can be repeated (token1 -> token2 -> token3), but this chain

A typical use case might be a web app where you'd like to keep the user "logged in" the site without having to re-enter their password, or get kicked out by surprise before their token expired. Imagine they had a 1-hour token and are just at the last minute while they're still doing something. With mobile you could perhaps store the username/password to get a new token, but this is not a great idea in a browser. Each time the user loads the page, you can check if there is an existing non-expired token and if it's close to being expired, refresh it to extend their session. In other words, if a user is actively using your site, they can keep their "session" alive.

## Verify Token

In some microservice architectures, authentication is handled by a single service. Other services delegate the responsibility of confirming that a user is logged in to this authentication service. This usually means that a service will pass a JWT received from the user to the authentication service, and wait for a confirmation that the JWT is valid before returning protected resources to the user.

This setup is supported in this package using a verification endpoint. Add the following URL pattern:
```python
url(r'^api-token-verify/', 'rest_framework_jwt.views.verify_jwt_token'),
```

Passing a token to the verification endpoint will return a 200 response and the token if it is valid. Otherwise, it will return a 400 Bad Request as well as an error identifying why the token was invalid.

```bash
$ curl -X POST -H "Content-Type: application/json" -d '{"token":"<EXISTING_TOKEN>"}' http://localhost:8000/api-token-verify/
```

## Additional Settings
There are some additional settings that you can override similar to how you'd do it with Django REST framework itself. Here are all the available defaults.

Expand Down
45 changes: 41 additions & 4 deletions rest_framework_jwt/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,17 @@ def validate(self, attrs):
raise serializers.ValidationError(msg)


class RefreshJSONWebTokenSerializer(Serializer):
class VerificationBaseSerializer(Serializer):
"""
Check an access token
Abstract serializer used for verifying and refreshing JWTs.
"""
token = serializers.CharField()

def validate(self, attrs):
User = utils.get_user_model()
token = attrs['token']
msg = 'Please define a validate method.'
raise NotImplementedError(msg)

def _check_payload(self, token):
# Check payload valid (based off of JSONWebTokenAuthentication,
# may want to refactor)
try:
Expand All @@ -102,6 +103,10 @@ def validate(self, attrs):
msg = _('Error decoding signature.')
raise serializers.ValidationError(msg)

return payload

def _check_user(self, payload):
User = utils.get_user_model()
# Make sure user exists (may want to refactor this)
try:
user_id = jwt_get_user_id_from_payload(payload)
Expand All @@ -115,6 +120,38 @@ def validate(self, attrs):
msg = _("User doesn't exist.")
raise serializers.ValidationError(msg)

return user


class VerifyJSONWebTokenSerializer(VerificationBaseSerializer):
"""
Check the veracity of an access token.
"""

def validate(self, attrs):
token = attrs['token']

payload = self._check_payload(token=token)
user = self._check_user(payload=payload)

new_payload = jwt_payload_handler(user)

return {
'token': jwt_encode_handler(new_payload),
'user': user
}


class RefreshJSONWebTokenSerializer(VerificationBaseSerializer):
"""
Refresh an access token.
"""

def validate(self, attrs):
token = attrs['token']

payload = self._check_payload(token=token)
user = self._check_user(payload=payload)
# Get and check 'orig_iat'
orig_iat = payload.get('orig_iat')

Expand Down
49 changes: 25 additions & 24 deletions rest_framework_jwt/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@

from rest_framework_jwt.settings import api_settings

from .serializers import JSONWebTokenSerializer, RefreshJSONWebTokenSerializer
from .serializers import (
JSONWebTokenSerializer, RefreshJSONWebTokenSerializer,
VerifyJSONWebTokenSerializer
)

jwt_response_payload_handler = api_settings.JWT_RESPONSE_PAYLOAD_HANDLER


class ObtainJSONWebToken(APIView):
class JSONWebTokenAPIView(APIView):
"""
API View that receives a POST with a user's username and password.

Returns a JSON Web Token that can be used for authenticated requests.
Base API View that various JWT interactions inherit from.
"""
throttle_classes = ()
permission_classes = ()
authentication_classes = ()
parser_classes = (parsers.FormParser, parsers.JSONParser,)
renderer_classes = (renderers.JSONRenderer,)
serializer_class = JSONWebTokenSerializer

def post(self, request):
serializer = self.serializer_class(data=request.DATA)
Expand All @@ -37,33 +37,34 @@ def post(self, request):
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class RefreshJSONWebToken(APIView):
class ObtainJSONWebToken(JSONWebTokenAPIView):
"""
API View that receives a POST with a user's username and password.

Returns a JSON Web Token that can be used for authenticated requests.
"""
serializer_class = JSONWebTokenSerializer


class VerifyJSONWebToken(JSONWebTokenAPIView):
"""
API View that checks the veracity of a token, returning the token if it
is valid.
"""
serializer_class = VerifyJSONWebTokenSerializer


class RefreshJSONWebToken(JSONWebTokenAPIView):
"""
API View that returns a refreshed token (with new expiration) based on
existing token

If 'orig_iat' field (original issued-at-time) is found, will first check
if it's within expiration window, then copy it to the new token
"""
throttle_classes = ()
permission_classes = ()
authentication_classes = ()
parser_classes = (parsers.FormParser, parsers.JSONParser,)
renderer_classes = (renderers.JSONRenderer,)
serializer_class = RefreshJSONWebTokenSerializer

def post(self, request):
serializer = self.serializer_class(data=request.DATA)

if serializer.is_valid():
user = serializer.object.get('user') or request.user
token = serializer.object.get('token')
response_data = jwt_response_payload_handler(token, user, request)

return Response(response_data)

return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


obtain_jwt_token = ObtainJSONWebToken.as_view()
refresh_jwt_token = RefreshJSONWebToken.as_view()
verify_jwt_token = VerifyJSONWebToken.as_view()
111 changes: 85 additions & 26 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
'',
(r'^auth-token/$', 'rest_framework_jwt.views.obtain_jwt_token'),
(r'^auth-token-refresh/$', 'rest_framework_jwt.views.refresh_jwt_token'),
(r'^auth-token-verify/$', 'rest_framework_jwt.views.verify_jwt_token'),

)

orig_datetime = datetime
Expand Down Expand Up @@ -204,20 +206,18 @@ def test_jwt_login_json_bad_creds(self):
self.assertEqual(response.status_code, 400)


class RefreshJSONWebTokenTests(BaseTestCase):
urls = 'tests.test_views'

def setUp(self):
super(RefreshJSONWebTokenTests, self).setUp()
api_settings.JWT_ALLOW_REFRESH = True
class TokenTestCase(BaseTestCase):
"""
Handlers for getting tokens from the API, or creating arbitrary ones.
"""

def get_token(self):
client = APIClient(enforce_csrf_checks=True)
response = client.post('/auth-token/', self.data, format='json')
return response.data['token']

def create_token(self, user, exp=None, orig_iat=None):
payload = utils.jwt_payload_handler(self.user)
payload = utils.jwt_payload_handler(user)
if exp:
payload['exp'] = exp

Expand All @@ -227,6 +227,84 @@ def create_token(self, user, exp=None, orig_iat=None):
token = utils.jwt_encode_handler(payload)
return token


class VerifyJSONWebTokenTests(TokenTestCase):

def test_verify_jwt(self):
"""
Test that a valid, non-expired token will return a 200 response
and itself when passed to the validation endpoint.
"""
client = APIClient(enforce_csrf_checks=True)

orig_token = self.get_token()

# Now try to get a refreshed token
response = client.post('/auth-token-verify/', {'token': orig_token},
format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)

self.assertEqual(response.data['token'], orig_token)

def test_verify_jwt_fails_with_expired_token(self):
"""
Test that an expired token will fail with the correct error.
"""
client = APIClient(enforce_csrf_checks=True)

# Make an expired token..
token = self.create_token(
self.user,
exp=datetime.utcnow() - timedelta(seconds=5),
orig_iat=datetime.utcnow() - timedelta(hours=1)
)

response = client.post('/auth-token-verify/', {'token': token},
format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegexpMatches(response.data['non_field_errors'][0],
'Signature has expired')

def test_verify_jwt_fails_with_bad_token(self):
"""
Test that an invalid token will fail with the correct error.
"""
client = APIClient(enforce_csrf_checks=True)

token = "i am not a correctly formed token"

response = client.post('/auth-token-verify/', {'token': token},
format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegexpMatches(response.data['non_field_errors'][0],
'Error decoding signature')

def test_verify_jwt_fails_with_missing_user(self):
"""
Test that an invalid token will fail with a user that does not exist.
"""
client = APIClient(enforce_csrf_checks=True)

user = User.objects.create_user(
email='jsmith@example.com', username='jsmith', password='password')

token = self.create_token(user)
# Delete the user used to make the token
user.delete()

response = client.post('/auth-token-verify/', {'token': token},
format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegexpMatches(response.data['non_field_errors'][0],
"User doesn't exist")


class RefreshJSONWebTokenTests(TokenTestCase):

def setUp(self):
super(RefreshJSONWebTokenTests, self).setUp()
api_settings.JWT_ALLOW_REFRESH = True

def test_refresh_jwt(self):
"""
Test getting a refreshed token from original token works
Expand Down Expand Up @@ -257,25 +335,6 @@ def test_refresh_jwt(self):
self.assertEquals(new_token_decoded['orig_iat'], orig_iat)
self.assertGreater(new_token_decoded['exp'], orig_token_decoded['exp'])

def test_refresh_jwt_fails_with_expired_token(self):
"""
Test that using an expired token to refresh won't work
"""
client = APIClient(enforce_csrf_checks=True)

# Make an expired token..
token = self.create_token(
self.user,
exp=datetime.utcnow() - timedelta(seconds=5),
orig_iat=datetime.utcnow() - timedelta(hours=1)
)

response = client.post('/auth-token-refresh/', {'token': token},
format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegexpMatches(response.data['non_field_errors'][0],
'Signature has expired')

def test_refresh_jwt_after_refresh_expiration(self):
"""
Test that token can't be refreshed after token refresh limit
Expand Down