diff --git a/AUTHORS b/AUTHORS index a1591b6da..c6e66453d 100644 --- a/AUTHORS +++ b/AUTHORS @@ -64,6 +64,7 @@ Jadiel Teófilo pySilver Łukasz Skarżyński Shaheed Haque +Peter Karman Vinay Karanam Eduardo Oliveira Andrea Greco diff --git a/CHANGELOG.md b/CHANGELOG.md index b66e0822d..da07b6cab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> +## [unreleased] + +## [2.0.0] unreleased + +### Changed +* #1093 (**Breaking**) Changed to implement [hashed](https://docs.djangoproject.com/en/stable/topics/auth/passwords/) + client_secret values. This is a **breaking change** that will migrate all your existing + cleartext `application.client_secret` values to be hashed with Django's default password hashing algorithm + and can not be reversed. When adding or modifying an Application in the Admin console, you must copy the + auto-generated or manually-entered `client_secret` before hitting Save. + + ## [1.7.0] 2022-01-23 ### Added diff --git a/docs/settings.rst b/docs/settings.rst index 01baaaf4b..0ba12df11 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -97,19 +97,19 @@ of those three can be a callable) must be passed here directly and classes must be instantiated (callables should accept request as their only argument). GRANT_MODEL -~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~ The import string of the class (model) representing your grants. Overwrite this value if you wrote your own implementation (subclass of ``oauth2_provider.models.Grant``). APPLICATION_ADMIN_CLASS -~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~ The import string of the class (model) representing your application admin class. Overwrite this value if you wrote your own implementation (subclass of ``oauth2_provider.admin.ApplicationAdmin``). ACCESS_TOKEN_ADMIN_CLASS -~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~ The import string of the class (model) representing your access token admin class. Overwrite this value if you wrote your own implementation (subclass of ``oauth2_provider.admin.AccessTokenAdmin``). @@ -121,7 +121,7 @@ Overwrite this value if you wrote your own implementation (subclass of ``oauth2_provider.admin.GrantAdmin``). REFRESH_TOKEN_ADMIN_CLASS -~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ The import string of the class (model) representing your refresh token admin class. Overwrite this value if you wrote your own implementation (subclass of ``oauth2_provider.admin.RefreshTokenAdmin``). @@ -154,7 +154,7 @@ If you don't change the validator code and don't run cleartokens all refresh tokens will last until revoked or the end of time. You should change this. REFRESH_TOKEN_GRACE_PERIOD_SECONDS -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The number of seconds between when a refresh token is first used when it is expired. The most common case of this for this is native mobile applications that run into issues of network connectivity during the refresh cycle and are @@ -178,7 +178,7 @@ See also: validator's rotate_refresh_token method can be overridden to make this when close to expiration, theoretically). REFRESH_TOKEN_GENERATOR -~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~ See `ACCESS_TOKEN_GENERATOR`. This is the same but for refresh tokens. Defaults to access token generator if not provided. @@ -265,7 +265,7 @@ Default: ``""`` The RSA private key used to sign OIDC ID tokens. If not set, OIDC is disabled. OIDC_RSA_PRIVATE_KEYS_INACTIVE -~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Default: ``[]`` An array of *inactive* RSA private keys. These keys are not used to sign tokens, @@ -276,7 +276,7 @@ This is useful for providing a smooth transition during key rotation. should be retained in this inactive list. OIDC_JWKS_MAX_AGE_SECONDS -~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ Default: ``3600`` The max-age value for the Cache-Control header on jwks_uri. @@ -354,9 +354,9 @@ load when clearing large batches of expired tokens. Settings imported from Django project --------------------------- +------------------------------------- USE_TZ -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~ Used to determine whether or not to make token expire dates timezone aware. diff --git a/oauth2_provider/migrations/0006_alter_application_client_secret.py b/oauth2_provider/migrations/0006_alter_application_client_secret.py new file mode 100644 index 000000000..88e148274 --- /dev/null +++ b/oauth2_provider/migrations/0006_alter_application_client_secret.py @@ -0,0 +1,31 @@ +from django.db import migrations +from django.contrib.auth.hashers import identify_hasher, make_password +import logging +import oauth2_provider.generators +import oauth2_provider.models + + +def forwards_func(apps, schema_editor): + """ + Forward migration touches every application.client_secret which will cause it to be hashed if not already the case. + """ + Application = apps.get_model('oauth2_provider', 'application') + applications = Application.objects.all() + for application in applications: + application.save(update_fields=['client_secret']) + + +class Migration(migrations.Migration): + + dependencies = [ + ('oauth2_provider', '0005_auto_20211222_2352'), + ] + + operations = [ + migrations.AlterField( + model_name='application', + name='client_secret', + field=oauth2_provider.models.ClientSecretField(blank=True, db_index=True, default=oauth2_provider.generators.generate_client_secret, help_text='Hashed on Save. Copy it now if this is a new secret.', max_length=255), + ), + migrations.RunPython(forwards_func), + ] diff --git a/oauth2_provider/models.py b/oauth2_provider/models.py index 2c9747ce8..1ded7a4e2 100644 --- a/oauth2_provider/models.py +++ b/oauth2_provider/models.py @@ -6,6 +6,7 @@ from django.apps import apps from django.conf import settings +from django.contrib.auth.hashers import identify_hasher, make_password from django.core.exceptions import ImproperlyConfigured from django.db import models, transaction from django.urls import reverse @@ -24,6 +25,20 @@ logger = logging.getLogger(__name__) +class ClientSecretField(models.CharField): + def pre_save(self, model_instance, add): + secret = getattr(model_instance, self.attname) + try: + hasher = identify_hasher(secret) + logger.debug(f"{model_instance}: {self.attname} is already hashed with {hasher}.") + except ValueError: + logger.debug(f"{model_instance}: {self.attname} is not hashed; hashing it now.") + hashed_secret = make_password(secret) + setattr(model_instance, self.attname, hashed_secret) + return hashed_secret + return super().pre_save(model_instance, add) + + class AbstractApplication(models.Model): """ An Application instance represents a Client on the Authorization server. @@ -90,8 +105,12 @@ class AbstractApplication(models.Model): ) client_type = models.CharField(max_length=32, choices=CLIENT_TYPES) authorization_grant_type = models.CharField(max_length=32, choices=GRANT_TYPES) - client_secret = models.CharField( - max_length=255, blank=True, default=generate_client_secret, db_index=True + client_secret = ClientSecretField( + max_length=255, + blank=True, + default=generate_client_secret, + db_index=True, + help_text=_("Hashed on Save. Copy it now if this is a new secret."), ) name = models.CharField(max_length=255, blank=True) skip_authorization = models.BooleanField(default=False) diff --git a/oauth2_provider/oauth2_validators.py b/oauth2_provider/oauth2_validators.py index 4d9480be1..00c5e7de0 100644 --- a/oauth2_provider/oauth2_validators.py +++ b/oauth2_provider/oauth2_validators.py @@ -12,6 +12,7 @@ import requests from django.conf import settings from django.contrib.auth import authenticate, get_user_model +from django.contrib.auth.hashers import check_password from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.db.models import Q @@ -123,7 +124,7 @@ def _authenticate_basic_auth(self, request): elif request.client.client_id != client_id: log.debug("Failed basic auth: wrong client id %s" % client_id) return False - elif request.client.client_secret != client_secret: + elif not check_password(client_secret, request.client.client_secret): log.debug("Failed basic auth: wrong client secret %s" % client_secret) return False else: @@ -148,7 +149,7 @@ def _authenticate_request_body(self, request): if self._load_application(client_id, request) is None: log.debug("Failed body auth: Application %s does not exists" % client_id) return False - elif request.client.client_secret != client_secret: + elif not check_password(client_secret, request.client.client_secret): log.debug("Failed body auth: wrong client secret %s" % client_secret) return False else: diff --git a/tests/conftest.py b/tests/conftest.py index a3274aa33..520b6cbac 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,8 @@ Application = get_application_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + class OAuthSettingsWrapper: """ @@ -101,12 +103,14 @@ def application(): client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, algorithm=Application.RS256_ALGORITHM, + client_secret=CLEARTEXT_SECRET, ) @pytest.fixture def hybrid_application(application): application.authorization_grant_type = application.GRANT_OPENID_HYBRID + application.client_secret = CLEARTEXT_SECRET application.save() return application @@ -141,7 +145,7 @@ def oidc_tokens(oauth2_settings, application, test_user, client): "code": code, "redirect_uri": "http://example.org", "client_id": application.client_id, - "client_secret": application.client_secret, + "client_secret": CLEARTEXT_SECRET, "scope": "openid", }, ) diff --git a/tests/test_authorization_code.py b/tests/test_authorization_code.py index c9bef0f5c..91fd06bd1 100644 --- a/tests/test_authorization_code.py +++ b/tests/test_authorization_code.py @@ -34,6 +34,7 @@ URI_OOB = "urn:ietf:wg:oauth:2.0:oob" URI_OOB_AUTO = "urn:ietf:wg:oauth:2.0:oob:auto" +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" # mocking a protected resource view @@ -60,6 +61,7 @@ def setUp(self): user=self.dev_user, client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, + client_secret=CLEARTEXT_SECRET, ) def tearDown(self): @@ -677,7 +679,7 @@ def test_basic_auth(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -699,7 +701,7 @@ def test_refresh(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -744,7 +746,7 @@ def test_refresh_with_grace_period(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -795,7 +797,7 @@ def test_refresh_invalidates_old_tokens(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -827,7 +829,7 @@ def test_refresh_no_scopes(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -855,7 +857,7 @@ def test_refresh_bad_scopes(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -881,7 +883,7 @@ def test_refresh_fail_repeating_requests(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -911,7 +913,7 @@ def test_refresh_repeating_requests(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -948,7 +950,7 @@ def test_refresh_repeating_requests_non_rotating_tokens(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -977,7 +979,7 @@ def test_basic_auth_bad_authcode(self): "code": "BLAH", "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -989,7 +991,7 @@ def test_basic_auth_bad_granttype(self): self.client.login(username="test_user", password="123456") token_request_data = {"grant_type": "UNKNOWN", "code": "BLAH", "redirect_uri": "http://example.org"} - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -1014,7 +1016,7 @@ def test_basic_auth_grant_expired(self): "code": "BLAH", "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -1049,7 +1051,7 @@ def test_basic_auth_wrong_auth_type(self): "redirect_uri": "http://example.org", } - user_pass = "{0}:{1}".format(self.application.client_id, self.application.client_secret) + user_pass = "{0}:{1}".format(self.application.client_id, CLEARTEXT_SECRET) auth_string = base64.b64encode(user_pass.encode("utf-8")) auth_headers = { "HTTP_AUTHORIZATION": "Wrong " + auth_string.decode("utf-8"), @@ -1070,7 +1072,7 @@ def test_request_body_params(self): "code": authorization_code, "redirect_uri": "http://example.org", "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, } response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) @@ -1445,7 +1447,7 @@ def test_code_exchange_succeed_when_redirect_uri_match(self): "code": authorization_code, "redirect_uri": "http://example.org?foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1480,7 +1482,7 @@ def test_code_exchange_fails_when_redirect_uri_does_not_match(self): "code": authorization_code, "redirect_uri": "http://example.org?foo=baraa", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -1520,7 +1522,7 @@ def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_param "code": authorization_code, "redirect_uri": "http://example.com?bar=baz&foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1565,7 +1567,7 @@ def test_oob_as_html(self): "code": authorization_code, "redirect_uri": URI_OOB, "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, } response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) @@ -1605,7 +1607,7 @@ def test_oob_as_json(self): "code": authorization_code, "redirect_uri": URI_OOB_AUTO, "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, } response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) @@ -1681,7 +1683,7 @@ def test_id_token_code_exchange_succeed_when_redirect_uri_match_with_multiple_qu "code": authorization_code, "redirect_uri": "http://example.com?bar=baz&foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1715,7 +1717,7 @@ def test_id_token(self): "code": authorization_code, "redirect_uri": "http://example.org", "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "scope": "openid", } @@ -1761,7 +1763,7 @@ def test_resource_access_allowed(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -1819,7 +1821,7 @@ def test_id_token_resource_access_allowed(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) diff --git a/tests/test_client_credential.py b/tests/test_client_credential.py index 8159d55db..38265c3d9 100644 --- a/tests/test_client_credential.py +++ b/tests/test_client_credential.py @@ -1,6 +1,5 @@ import json from unittest.mock import patch -from urllib.parse import quote_plus import pytest from django.contrib.auth import get_user_model @@ -24,6 +23,8 @@ AccessToken = get_access_token_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "abcdefghijklmnopqrstuvwxyz1234567890" + # mocking a protected resource view class ResourceView(ProtectedResourceView): @@ -44,6 +45,7 @@ def setUp(self): user=self.dev_user, client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_CLIENT_CREDENTIALS, + client_secret=CLEARTEXT_SECRET, ) def tearDown(self): @@ -55,35 +57,28 @@ def tearDown(self): class TestClientCredential(BaseTest): def test_client_credential_access_allowed(self): """ - Request an access token using Client Credential Flow + Request an access token using Client Credential Flow with hashed secrets """ + self.assertNotEqual(self.application.client_secret, CLEARTEXT_SECRET) + token_request_data = { "grant_type": "client_credentials", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) - content = json.loads(response.content.decode("utf-8")) - access_token = content["access_token"] - - # use token to access the resource - auth_headers = { - "HTTP_AUTHORIZATION": "Bearer " + access_token, - } - request = self.factory.get("/fake-resource", **auth_headers) - request.user = self.test_user - - view = ResourceView.as_view() - response = view(request) - self.assertEqual(response, "This is a protected resource") + # secret mismatch should return a 401 + auth_headers = get_basic_auth_header(self.application.client_id, "not-the-secret") + response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) + self.assertEqual(response.status_code, 401) def test_client_credential_does_not_issue_refresh_token(self): token_request_data = { "grant_type": "client_credentials", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -93,7 +88,7 @@ def test_client_credential_does_not_issue_refresh_token(self): def test_client_credential_user_is_none_on_access_token(self): token_request_data = {"grant_type": "client_credentials"} - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -122,7 +117,7 @@ def test_extended_request(self): token_request_data = { "grant_type": "client_credentials", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -174,12 +169,11 @@ def test_client_resource_password_based(self): user=self.dev_user, client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_PASSWORD, + client_secret=CLEARTEXT_SECRET, ) token_request_data = {"grant_type": "password", "username": "test_user", "password": "123456"} - auth_headers = get_basic_auth_header( - quote_plus(self.application.client_id), quote_plus(self.application.client_secret) - ) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) diff --git a/tests/test_commands.py b/tests/test_commands.py index ff5deba4e..13b0eeb3d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,6 +1,7 @@ from io import StringIO from django.contrib.auth import get_user_model +from django.contrib.auth.hashers import check_password from django.core.management import call_command from django.core.management.base import CommandError from django.test import TestCase @@ -83,7 +84,7 @@ def test_application_created_with_client_secret(self): ) app = Application.objects.get() - self.assertEqual(app.client_secret, "SECRET") + self.assertTrue(check_password("SECRET", app.client_secret)) def test_application_created_with_client_id(self): call_command( diff --git a/tests/test_hybrid.py b/tests/test_hybrid.py index 4f9753979..3f4048698 100644 --- a/tests/test_hybrid.py +++ b/tests/test_hybrid.py @@ -30,6 +30,8 @@ RefreshToken = get_refresh_token_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + # mocking a protected resource view class ResourceView(ProtectedResourceView): @@ -62,6 +64,7 @@ def setUp(self): client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_OPENID_HYBRID, algorithm=Application.RS256_ALGORITHM, + client_secret=CLEARTEXT_SECRET, ) self.application.save() @@ -829,7 +832,7 @@ def test_basic_auth(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -850,7 +853,7 @@ def test_basic_auth_bad_authcode(self): "code": "BLAH", "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -862,7 +865,7 @@ def test_basic_auth_bad_granttype(self): self.client.login(username="hy_test_user", password="123456") token_request_data = {"grant_type": "UNKNOWN", "code": "BLAH", "redirect_uri": "http://example.org"} - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -887,7 +890,7 @@ def test_basic_auth_grant_expired(self): "code": "BLAH", "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -922,7 +925,7 @@ def test_basic_auth_wrong_auth_type(self): "redirect_uri": "http://example.org", } - user_pass = "{0}:{1}".format(self.application.client_id, self.application.client_secret) + user_pass = "{0}:{1}".format(self.application.client_id, CLEARTEXT_SECRET) auth_string = base64.b64encode(user_pass.encode("utf-8")) auth_headers = { "HTTP_AUTHORIZATION": "Wrong " + auth_string.decode("utf-8"), @@ -943,7 +946,7 @@ def test_request_body_params(self): "code": authorization_code, "redirect_uri": "http://example.org", "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, } response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data) @@ -1056,7 +1059,7 @@ def test_code_exchange_succeed_when_redirect_uri_match(self): "code": authorization_code, "redirect_uri": "http://example.org?foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1091,7 +1094,7 @@ def test_code_exchange_fails_when_redirect_uri_does_not_match(self): "code": authorization_code, "redirect_uri": "http://example.org?foo=baraa", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -1126,7 +1129,7 @@ def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_param "code": authorization_code, "redirect_uri": "http://example.com?bar=baz&foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1163,7 +1166,7 @@ def test_id_token_code_exchange_succeed_when_redirect_uri_match_with_multiple_qu "code": authorization_code, "redirect_uri": "http://example.com?bar=baz&foo=bar", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -1200,7 +1203,7 @@ def test_resource_access_allowed(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -1239,7 +1242,7 @@ def test_id_token_resource_access_allowed(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -1351,7 +1354,7 @@ def test_id_token_nonce_in_token_response(oauth2_settings, test_user, hybrid_app "code": code, "redirect_uri": "http://example.org", "client_id": hybrid_application.client_id, - "client_secret": hybrid_application.client_secret, + "client_secret": CLEARTEXT_SECRET, "scope": "openid", }, ) @@ -1422,7 +1425,7 @@ def test_claims_passed_to_code_generation( "code": code, "redirect_uri": "http://example.org", "client_id": hybrid_application.client_id, - "client_secret": hybrid_application.client_secret, + "client_secret": CLEARTEXT_SECRET, "scope": "openid", }, ) diff --git a/tests/test_introspection_view.py b/tests/test_introspection_view.py index 95374cda5..b19c521d5 100644 --- a/tests/test_introspection_view.py +++ b/tests/test_introspection_view.py @@ -17,6 +17,8 @@ AccessToken = get_access_token_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + @pytest.mark.usefixtures("oauth2_settings") @pytest.mark.oauth2_settings(presets.INTROSPECTION_SETTINGS) @@ -35,6 +37,7 @@ def setUp(self): user=self.test_user, client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, + client_secret=CLEARTEXT_SECRET, ) self.resource_server_token = AccessToken.objects.create( @@ -281,7 +284,7 @@ def test_view_post_notexisting_token(self): def test_view_post_valid_client_creds_basic_auth(self): """Test HTTP basic auth working""" - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post( reverse("oauth2_provider:introspect"), {"token": self.valid_token.token}, **auth_headers ) @@ -301,9 +304,7 @@ def test_view_post_valid_client_creds_basic_auth(self): def test_view_post_invalid_client_creds_basic_auth(self): """Must fail for invalid client credentials""" - auth_headers = get_basic_auth_header( - self.application.client_id, self.application.client_secret + "_so_wrong" - ) + auth_headers = get_basic_auth_header(self.application.client_id, f"{CLEARTEXT_SECRET}_so_wrong") response = self.client.post( reverse("oauth2_provider:introspect"), {"token": self.valid_token.token}, **auth_headers ) @@ -316,7 +317,7 @@ def test_view_post_valid_client_creds_plaintext(self): { "token": self.valid_token.token, "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, }, ) self.assertEqual(response.status_code, 200) @@ -340,7 +341,7 @@ def test_view_post_invalid_client_creds_plaintext(self): { "token": self.valid_token.token, "client_id": self.application.client_id, - "client_secret": self.application.client_secret + "_so_wrong", + "client_secret": f"{CLEARTEXT_SECRET}_so_wrong", }, ) self.assertEqual(response.status_code, 403) diff --git a/tests/test_oauth2_validators.py b/tests/test_oauth2_validators.py index 7997d3bca..fd06a1eda 100644 --- a/tests/test_oauth2_validators.py +++ b/tests/test_oauth2_validators.py @@ -15,6 +15,7 @@ from oauth2_provider.oauth2_validators import OAuth2Validator from . import presets +from .utils import get_basic_auth_header try: @@ -28,6 +29,8 @@ AccessToken = get_access_token_model() RefreshToken = get_refresh_token_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + @contextlib.contextmanager def always_invalid_token(): @@ -51,7 +54,7 @@ def setUp(self): self.validator = OAuth2Validator() self.application = Application.objects.create( client_id="client_id", - client_secret="client_secret", + client_secret=CLEARTEXT_SECRET, user=self.user, client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_PASSWORD, @@ -69,7 +72,7 @@ def test_authenticate_request_body(self): self.request.client_secret = "wrong_client_secret" self.assertFalse(self.validator._authenticate_request_body(self.request)) - self.request.client_secret = "client_secret" + self.request.client_secret = CLEARTEXT_SECRET self.assertTrue(self.validator._authenticate_request_body(self.request)) def test_extract_basic_auth(self): @@ -86,26 +89,22 @@ def test_extract_basic_auth(self): def test_authenticate_basic_auth(self): self.request.encoding = "utf-8" - # client_id:client_secret - self.request.headers = {"HTTP_AUTHORIZATION": "Basic Y2xpZW50X2lkOmNsaWVudF9zZWNyZXQ=\n"} + self.request.headers = get_basic_auth_header("client_id", CLEARTEXT_SECRET) self.assertTrue(self.validator._authenticate_basic_auth(self.request)) def test_authenticate_basic_auth_default_encoding(self): self.request.encoding = None - # client_id:client_secret - self.request.headers = {"HTTP_AUTHORIZATION": "Basic Y2xpZW50X2lkOmNsaWVudF9zZWNyZXQ=\n"} + self.request.headers = get_basic_auth_header("client_id", CLEARTEXT_SECRET) self.assertTrue(self.validator._authenticate_basic_auth(self.request)) def test_authenticate_basic_auth_wrong_client_id(self): self.request.encoding = "utf-8" - # wrong_id:client_secret - self.request.headers = {"HTTP_AUTHORIZATION": "Basic d3JvbmdfaWQ6Y2xpZW50X3NlY3JldA==\n"} + self.request.headers = get_basic_auth_header("wrong_id", CLEARTEXT_SECRET) self.assertFalse(self.validator._authenticate_basic_auth(self.request)) def test_authenticate_basic_auth_wrong_client_secret(self): self.request.encoding = "utf-8" - # client_id:wrong_secret - self.request.headers = {"HTTP_AUTHORIZATION": "Basic Y2xpZW50X2lkOndyb25nX3NlY3JldA==\n"} + self.request.headers = get_basic_auth_header("client_id", "wrong_secret") self.assertFalse(self.validator._authenticate_basic_auth(self.request)) def test_authenticate_basic_auth_not_b64_auth_string(self): @@ -116,7 +115,6 @@ def test_authenticate_basic_auth_not_b64_auth_string(self): def test_authenticate_basic_auth_invalid_b64_string(self): self.request.encoding = "utf-8" - # client_id:wrong_secret self.request.headers = {"HTTP_AUTHORIZATION": "Basic ZHVtbXk=:ZHVtbXk=\n"} self.assertFalse(self.validator._authenticate_basic_auth(self.request)) @@ -140,7 +138,7 @@ def test_client_authentication_required(self): self.assertTrue(self.validator.client_authentication_required(self.request)) self.request.headers = {} self.request.client_id = "client_id" - self.request.client_secret = "client_secret" + self.request.client_secret = CLEARTEXT_SECRET self.assertTrue(self.validator.client_authentication_required(self.request)) self.request.client_secret = "" self.assertFalse(self.validator.client_authentication_required(self.request)) @@ -327,7 +325,7 @@ def setUp(self): self.validator = OAuth2Validator() self.application = Application.objects.create( client_id="client_id", - client_secret="client_secret", + client_secret=CLEARTEXT_SECRET, user=self.user, client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_PASSWORD, diff --git a/tests/test_password.py b/tests/test_password.py index 953b076e2..ab0f49228 100644 --- a/tests/test_password.py +++ b/tests/test_password.py @@ -14,6 +14,8 @@ Application = get_application_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + # mocking a protected resource view class ResourceView(ProtectedResourceView): @@ -33,6 +35,7 @@ def setUp(self): user=self.dev_user, client_type=Application.CLIENT_PUBLIC, authorization_grant_type=Application.GRANT_PASSWORD, + client_secret=CLEARTEXT_SECRET, ) def tearDown(self): @@ -51,7 +54,7 @@ def test_get_token(self): "username": "test_user", "password": "123456", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 200) @@ -70,7 +73,7 @@ def test_bad_credentials(self): "username": "test_user", "password": "NOT_MY_PASS", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) self.assertEqual(response.status_code, 400) @@ -83,7 +86,7 @@ def test_password_resource_access_allowed(self): "username": "test_user", "password": "123456", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) diff --git a/tests/test_scopes.py b/tests/test_scopes.py index a310e223a..39601ed3b 100644 --- a/tests/test_scopes.py +++ b/tests/test_scopes.py @@ -18,6 +18,8 @@ Grant = get_grant_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + # mocking a protected resource view class ScopeResourceView(ScopedProtectedResourceView): @@ -67,6 +69,7 @@ def setUp(self): user=self.dev_user, client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, + client_secret=CLEARTEXT_SECRET, ) def tearDown(self): @@ -123,7 +126,7 @@ def test_scopes_save_in_access_token(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -159,7 +162,7 @@ def test_scopes_protection_valid(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -201,7 +204,7 @@ def test_scopes_protection_fail(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -243,7 +246,7 @@ def test_multi_scope_fail(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -285,7 +288,7 @@ def test_multi_scope_valid(self): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) @@ -326,7 +329,7 @@ def get_access_token(self, scopes): "code": authorization_code, "redirect_uri": "http://example.org", } - auth_headers = get_basic_auth_header(self.application.client_id, self.application.client_secret) + auth_headers = get_basic_auth_header(self.application.client_id, CLEARTEXT_SECRET) response = self.client.post(reverse("oauth2_provider:token"), data=token_request_data, **auth_headers) content = json.loads(response.content.decode("utf-8")) diff --git a/tests/test_token_revocation.py b/tests/test_token_revocation.py index 1ed1c9119..b4f5af7dd 100644 --- a/tests/test_token_revocation.py +++ b/tests/test_token_revocation.py @@ -13,6 +13,8 @@ RefreshToken = get_refresh_token_model() UserModel = get_user_model() +CLEARTEXT_SECRET = "1234567890abcdefghijklmnopqrstuvwxyz" + class BaseTest(TestCase): def setUp(self): @@ -26,6 +28,7 @@ def setUp(self): user=self.dev_user, client_type=Application.CLIENT_CONFIDENTIAL, authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, + client_secret=CLEARTEXT_SECRET, ) def tearDown(self): @@ -46,7 +49,7 @@ def test_revoke_access_token(self): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": tok.token, } url = reverse("oauth2_provider:revoke-token") @@ -93,7 +96,7 @@ def test_revoke_access_token_with_hint(self): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": tok.token, "token_type_hint": "access_token", } @@ -115,7 +118,7 @@ def test_revoke_access_token_with_invalid_hint(self): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": tok.token, "token_type_hint": "bad_hint", } @@ -139,7 +142,7 @@ def test_revoke_refresh_token(self): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": rtok.token, } @@ -164,7 +167,7 @@ def test_revoke_refresh_token_with_revoked_access_token(self): for token in (tok.token, rtok.token): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": token, } @@ -194,7 +197,7 @@ def test_revoke_token_with_wrong_hint(self): data = { "client_id": self.application.client_id, - "client_secret": self.application.client_secret, + "client_secret": CLEARTEXT_SECRET, "token": tok.token, "token_type_hint": "refresh_token", }