Resolve "Implement a Oauth provider in Funkwhale"

This commit is contained in:
Eliot Berriot 2019-03-25 17:02:51 +01:00
commit 4c13d47387
54 changed files with 2811 additions and 249 deletions

View file

View file

@ -0,0 +1,79 @@
import pytest
import uuid
from django.urls import reverse
from funkwhale_api.users.oauth import scopes
# mutations
@pytest.mark.parametrize(
"name, url_kwargs, scope, method",
[
("api:v1:search", {}, "read:libraries", "get"),
("api:v1:artists-list", {}, "read:libraries", "get"),
("api:v1:albums-list", {}, "read:libraries", "get"),
("api:v1:tracks-list", {}, "read:libraries", "get"),
("api:v1:tracks-mutations", {"pk": 42}, "read:edits", "get"),
("api:v1:tags-list", {}, "read:libraries", "get"),
("api:v1:licenses-list", {}, "read:libraries", "get"),
("api:v1:moderation:content-filters-list", {}, "read:filters", "get"),
("api:v1:listen-detail", {"uuid": uuid.uuid4()}, "read:libraries", "get"),
("api:v1:uploads-list", {}, "read:libraries", "get"),
("api:v1:playlists-list", {}, "read:playlists", "get"),
("api:v1:playlist-tracks-list", {}, "read:playlists", "get"),
("api:v1:favorites:tracks-list", {}, "read:favorites", "get"),
("api:v1:history:listenings-list", {}, "read:listenings", "get"),
("api:v1:radios:radios-list", {}, "read:radios", "get"),
("api:v1:oauth:grants-list", {}, "read:security", "get"),
("api:v1:federation:inbox-list", {}, "read:notifications", "get"),
(
"api:v1:federation:libraries-detail",
{"uuid": uuid.uuid4()},
"read:libraries",
"get",
),
("api:v1:federation:library-follows-list", {}, "read:follows", "get"),
# admin / privileged stuff
("api:v1:instance:admin-settings-list", {}, "read:instance:settings", "get"),
(
"api:v1:manage:users:invitations-list",
{},
"read:instance:invitations",
"get",
),
("api:v1:manage:users:users-list", {}, "read:instance:users", "get"),
("api:v1:manage:library:uploads-list", {}, "read:instance:libraries", "get"),
("api:v1:manage:accounts-list", {}, "read:instance:accounts", "get"),
("api:v1:manage:federation:domains-list", {}, "read:instance:domains", "get"),
(
"api:v1:manage:moderation:instance-policies-list",
{},
"read:instance:policies",
"get",
),
],
)
def test_views_permissions(
name, url_kwargs, scope, method, mocker, logged_in_api_client
):
"""
Smoke tests to ensure viewsets are correctly protected
"""
url = reverse(name, kwargs=url_kwargs)
user_scopes = scopes.get_from_permissions(
**logged_in_api_client.user.get_permissions()
)
should_allow = mocker.patch(
"funkwhale_api.users.oauth.permissions.should_allow", return_value=False
)
handler = getattr(logged_in_api_client, method)
response = handler(url)
should_allow.assert_called_once_with(
required_scope=scope, request_scopes=user_scopes
)
assert response.status_code == 403, "{} on {} is not protected correctly!".format(
method, url
)

View file

@ -0,0 +1,21 @@
import pytest
from django import forms
from funkwhale_api.users import models
@pytest.mark.parametrize(
"uri",
["urn:ietf:wg:oauth:2.0:oob", "urn:ietf:wg:oauth:2.0:oob:auto", "http://test.com"],
)
def test_redirect_uris_oob(uri, db):
app = models.Application(redirect_uris=uri)
assert app.clean() is None
@pytest.mark.parametrize("uri", ["urn:ietf:wg:oauth:2.0:invalid", "noop"])
def test_redirect_uris_invalid(uri, db):
app = models.Application(redirect_uris=uri)
with pytest.raises(forms.ValidationError):
app.clean()

View file

@ -0,0 +1,241 @@
import pytest
from funkwhale_api.users.oauth import scopes
from funkwhale_api.users.oauth import permissions
@pytest.mark.parametrize(
"required_scope, request_scopes, expected",
[
(None, {}, True),
("write:profile", {"write"}, True),
("write:profile", {"read"}, False),
("write:profile", {"read:profile"}, False),
("write:profile", {"write:profile"}, True),
("read:profile", {"read"}, True),
("read:profile", {"write"}, False),
("read:profile", {"read:profile"}, True),
("read:profile", {"write:profile"}, False),
("write:profile", {"write"}, True),
("write:profile", {"read:profile"}, False),
("write:profile", {"write:profile"}, True),
("write:profile", {"write"}, True),
("write:profile", {"read:profile"}, False),
("write:profile", {"write:profile"}, True),
("write:profile", {"write"}, True),
("write:profile", {"read:profile"}, False),
("write:profile", {"write:profile"}, True),
],
)
def test_should_allow(required_scope, request_scopes, expected):
assert (
permissions.should_allow(
required_scope=required_scope, request_scopes=request_scopes
)
is expected
)
@pytest.mark.parametrize("method", ["OPTIONS", "HEAD"])
def test_scope_permission_safe_methods(method, mocker, factories):
view = mocker.Mock(required_scope="write:profile", anonymous_policy=False)
request = mocker.Mock(method=method)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is True
@pytest.mark.parametrize(
"policy, preference, expected",
[
(True, False, True),
(False, False, False),
("setting", True, False),
("setting", False, True),
],
)
def test_scope_permission_anonymous_policy(
policy, preference, expected, preferences, mocker, anonymous_user
):
preferences["common__api_authentication_required"] = preference
view = mocker.Mock(required_scope="libraries", anonymous_policy=policy)
request = mocker.Mock(method="GET", user=anonymous_user, actor=None)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is expected
def test_scope_permission_dict_no_required(mocker, anonymous_user):
view = mocker.Mock(
required_scope={"read": None, "write": "write:profile"},
anonymous_policy=True,
action="read",
)
request = mocker.Mock(method="GET", user=anonymous_user, actor=None)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is True
@pytest.mark.parametrize(
"required_scope, method, action, expected_scope",
[
("profile", "GET", "read", "read:profile"),
("profile", "POST", "write", "write:profile"),
({"read": "read:profile"}, "GET", "read", "read:profile"),
({"write": "write:profile"}, "POST", "write", "write:profile"),
],
)
def test_scope_permission_user(
required_scope, method, action, expected_scope, mocker, factories
):
user = factories["users.User"]()
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method=method, user=user, actor=None)
view = mocker.Mock(
required_scope=required_scope, anonymous_policy=False, action=action
)
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope=expected_scope,
request_scopes=scopes.get_from_permissions(**user.get_permissions()),
)
def test_scope_permission_token(mocker, factories):
token = factories["users.AccessToken"](
scope="write:profile read:playlists",
application__scope="write:profile read:playlists",
)
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", auth=token)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope="write:profile",
request_scopes={"write:profile", "read:playlists"},
)
def test_scope_permission_actor(mocker, factories, anonymous_user):
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(
method="POST", actor=factories["federation.Actor"](), user=anonymous_user
)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope="write:profile", request_scopes=scopes.FEDERATION_REQUEST_SCOPES
)
def test_scope_permission_token_anonymous_user_auth_required(
mocker, factories, anonymous_user, preferences
):
preferences["common__api_authentication_required"] = True
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", user=anonymous_user, actor=None)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is False
should_allow.assert_not_called()
def test_scope_permission_token_anonymous_user_auth_not_required(
mocker, factories, anonymous_user, preferences
):
preferences["common__api_authentication_required"] = False
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", user=anonymous_user, actor=None)
view = mocker.Mock(required_scope="profile", anonymous_policy="setting")
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope="write:profile", request_scopes=scopes.ANONYMOUS_SCOPES
)
def test_scope_permission_token_expired(mocker, factories, now):
token = factories["users.AccessToken"](
scope="profile:write playlists:read", expires=now
)
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", auth=token)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is False
should_allow.assert_not_called()
def test_scope_permission_token_no_user(mocker, factories, now):
token = factories["users.AccessToken"](
scope="profile:write playlists:read", user=None
)
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", auth=token)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) is False
should_allow.assert_not_called()
def test_scope_permission_token_honor_app_scopes(mocker, factories, now):
# token contains read access, but app scope only allows profile:write
token = factories["users.AccessToken"](
scope="write:profile read", application__scope="write:profile"
)
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", auth=token)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope="write:profile", request_scopes={"write:profile"}
)
def test_scope_permission_token_honor_allowed_app_scopes(mocker, factories, now):
mocker.patch.object(scopes, "OAUTH_APP_SCOPES", {"read:profile"})
token = factories["users.AccessToken"](
scope="write:profile read:profile read",
application__scope="write:profile read:profile read",
)
should_allow = mocker.patch.object(permissions, "should_allow")
request = mocker.Mock(method="POST", auth=token)
view = mocker.Mock(required_scope="profile", anonymous_policy=False)
p = permissions.ScopePermission()
assert p.has_permission(request, view) == should_allow.return_value
should_allow.assert_called_once_with(
required_scope="write:profile", request_scopes={"read:profile"}
)

View file

@ -0,0 +1,156 @@
import pytest
from funkwhale_api.users.oauth import scopes
@pytest.mark.parametrize(
"user_perms, expected",
[
(
# All permissions, so all scopes
{"moderation": True, "library": True, "settings": True},
{
"read:profile",
"write:profile",
"read:libraries",
"write:libraries",
"read:playlists",
"write:playlists",
"read:favorites",
"write:favorites",
"read:notifications",
"write:notifications",
"read:radios",
"write:radios",
"read:follows",
"write:follows",
"read:edits",
"write:edits",
"read:filters",
"write:filters",
"read:listenings",
"write:listenings",
"read:security",
"write:security",
"read:instance:policies",
"write:instance:policies",
"read:instance:accounts",
"write:instance:accounts",
"read:instance:domains",
"write:instance:domains",
"read:instance:settings",
"write:instance:settings",
"read:instance:users",
"write:instance:users",
"read:instance:invitations",
"write:instance:invitations",
"read:instance:edits",
"write:instance:edits",
"read:instance:libraries",
"write:instance:libraries",
},
),
(
{"moderation": True, "library": False, "settings": True},
{
"read:profile",
"write:profile",
"read:libraries",
"write:libraries",
"read:playlists",
"write:playlists",
"read:favorites",
"write:favorites",
"read:notifications",
"write:notifications",
"read:radios",
"write:radios",
"read:follows",
"write:follows",
"read:edits",
"write:edits",
"read:filters",
"write:filters",
"read:listenings",
"write:listenings",
"read:security",
"write:security",
"read:instance:policies",
"write:instance:policies",
"read:instance:accounts",
"write:instance:accounts",
"read:instance:domains",
"write:instance:domains",
"read:instance:settings",
"write:instance:settings",
"read:instance:users",
"write:instance:users",
"read:instance:invitations",
"write:instance:invitations",
},
),
(
{"moderation": True, "library": False, "settings": False},
{
"read:profile",
"write:profile",
"read:libraries",
"write:libraries",
"read:playlists",
"write:playlists",
"read:favorites",
"write:favorites",
"read:notifications",
"write:notifications",
"read:radios",
"write:radios",
"read:follows",
"write:follows",
"read:edits",
"write:edits",
"read:filters",
"write:filters",
"read:listenings",
"write:listenings",
"read:security",
"write:security",
"read:instance:policies",
"write:instance:policies",
"read:instance:accounts",
"write:instance:accounts",
"read:instance:domains",
"write:instance:domains",
},
),
(
{"moderation": False, "library": False, "settings": False},
{
"read:profile",
"write:profile",
"read:libraries",
"write:libraries",
"read:playlists",
"write:playlists",
"read:favorites",
"write:favorites",
"read:notifications",
"write:notifications",
"read:radios",
"write:radios",
"read:follows",
"write:follows",
"read:edits",
"write:edits",
"read:filters",
"write:filters",
"read:listenings",
"write:listenings",
"read:security",
"write:security",
},
),
],
)
def test_get_scopes_from_user_permissions(user_perms, expected):
assert scopes.get_from_permissions(**user_perms) == expected

View file

@ -0,0 +1,10 @@
from oauth2_provider import models
from funkwhale_api.users.oauth import tasks
def test_clear_expired_tokens(mocker, db):
clear_expired = mocker.spy(models, "clear_expired")
tasks.clear_expired_tokens()
clear_expired.assert_called_once()

View file

@ -0,0 +1,363 @@
import json
import pytest
from django.urls import reverse
from funkwhale_api.users import models
from funkwhale_api.users.oauth import serializers
def test_apps_post(api_client, db):
url = reverse("api:v1:oauth:apps-list")
data = {
"name": "Test app",
"redirect_uris": "http://test.app",
"scopes": "read write:profile",
}
response = api_client.post(url, data)
assert response.status_code == 201
app = models.Application.objects.get(name=data["name"])
assert app.client_type == models.Application.CLIENT_CONFIDENTIAL
assert app.authorization_grant_type == models.Application.GRANT_AUTHORIZATION_CODE
assert app.redirect_uris == data["redirect_uris"]
assert response.data == serializers.CreateApplicationSerializer(app).data
assert app.scope == "read write:profile"
assert app.user is None
def test_apps_post_logged_in_user(logged_in_api_client, db):
url = reverse("api:v1:oauth:apps-list")
data = {
"name": "Test app",
"redirect_uris": "http://test.app",
"scopes": "read write:profile",
}
response = logged_in_api_client.post(url, data)
assert response.status_code == 201
app = models.Application.objects.get(name=data["name"])
assert app.client_type == models.Application.CLIENT_CONFIDENTIAL
assert app.authorization_grant_type == models.Application.GRANT_AUTHORIZATION_CODE
assert app.redirect_uris == data["redirect_uris"]
assert response.data == serializers.CreateApplicationSerializer(app).data
assert app.scope == "read write:profile"
assert app.user == logged_in_api_client.user
def test_apps_list_anonymous(api_client, db):
url = reverse("api:v1:oauth:apps-list")
response = api_client.get(url)
assert response.status_code == 401
def test_apps_list_logged_in(factories, logged_in_api_client, db):
app = factories["users.Application"](user=logged_in_api_client.user)
factories["users.Application"]()
url = reverse("api:v1:oauth:apps-list")
response = logged_in_api_client.get(url)
assert response.status_code == 200
assert response.data["results"] == [serializers.ApplicationSerializer(app).data]
def test_apps_delete_not_owner(factories, logged_in_api_client, db):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.delete(url)
assert response.status_code == 404
def test_apps_delete_owner(factories, logged_in_api_client, db):
app = factories["users.Application"](user=logged_in_api_client.user)
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.delete(url)
assert response.status_code == 204
with pytest.raises(app.DoesNotExist):
app.refresh_from_db()
def test_apps_update_not_owner(factories, logged_in_api_client, db):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.patch(url, {"name": "Hello"})
assert response.status_code == 404
def test_apps_update_owner(factories, logged_in_api_client, db):
app = factories["users.Application"](user=logged_in_api_client.user)
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.patch(url, {"name": "Hello"})
assert response.status_code == 200
app.refresh_from_db()
assert app.name == "Hello"
def test_apps_get(preferences, logged_in_api_client, factories):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.get(url)
assert response.status_code == 200
assert response.data == serializers.ApplicationSerializer(app).data
def test_apps_get_owner(preferences, logged_in_api_client, factories):
app = factories["users.Application"](user=logged_in_api_client.user)
url = reverse("api:v1:oauth:apps-detail", kwargs={"client_id": app.client_id})
response = logged_in_api_client.get(url)
assert response.status_code == 200
assert response.data == serializers.CreateApplicationSerializer(app).data
def test_authorize_view_post(logged_in_client, factories):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": app.redirect_uris,
"client_id": app.client_id,
"state": "hello",
"response_type": "code",
"scope": "read",
},
)
grant = models.Grant.objects.get(application=app)
assert response.status_code == 302
assert response["Location"] == "{}?code={}&state={}".format(
app.redirect_uris, grant.code, "hello"
)
def test_authorize_view_post_ajax_no_redirect(logged_in_client, factories):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": app.redirect_uris,
"client_id": app.client_id,
"state": "hello",
"response_type": "code",
"scope": "read",
},
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
)
assert response.status_code == 200
grant = models.Grant.objects.get(application=app)
assert json.loads(response.content.decode()) == {
"redirect_uri": "{}?code={}&state={}".format(
app.redirect_uris, grant.code, "hello"
),
"code": grant.code,
}
def test_authorize_view_post_ajax_oob(logged_in_client, factories):
app = factories["users.Application"](redirect_uris="urn:ietf:wg:oauth:2.0:oob")
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": app.redirect_uris,
"client_id": app.client_id,
"state": "hello",
"response_type": "code",
"scope": "read",
},
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
)
assert response.status_code == 200
grant = models.Grant.objects.get(application=app)
assert json.loads(response.content.decode()) == {
"redirect_uri": "{}?code={}&state={}".format(
app.redirect_uris, grant.code, "hello"
),
"code": grant.code,
}
def test_authorize_view_invalid_form(logged_in_client, factories):
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": "",
"client_id": "Noop",
"state": "hello",
"response_type": "code",
"scope": "read",
},
)
assert response.status_code == 400
assert json.loads(response.content.decode()) == {
"redirect_uri": ["This field is required."]
}
def test_authorize_view_invalid_redirect_url(logged_in_client, factories):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": "http://wrong.url",
"client_id": app.client_id,
"state": "hello",
"response_type": "code",
"scope": "read",
},
)
assert response.status_code == 400
assert json.loads(response.content.decode()) == {
"detail": "Mismatching redirect URI."
}
def test_authorize_view_invalid_oauth(logged_in_client, factories):
app = factories["users.Application"]()
url = reverse("api:v1:oauth:authorize")
response = logged_in_client.post(
url,
{
"allow": True,
"redirect_uri": app.redirect_uris,
"client_id": "wrong_id",
"state": "hello",
"response_type": "code",
"scope": "read",
},
)
assert response.status_code == 400
assert json.loads(response.content.decode()) == {
"non_field_errors": ["Invalid application"]
}
def test_authorize_view_anonymous(client, factories):
url = reverse("api:v1:oauth:authorize")
response = client.post(url, {})
assert response.status_code == 401
def test_token_view_post(api_client, factories):
grant = factories["users.Grant"]()
app = grant.application
url = reverse("api:v1:oauth:token")
response = api_client.post(
url,
{
"redirect_uri": app.redirect_uris,
"client_id": app.client_id,
"client_secret": app.client_secret,
"grant_type": "authorization_code",
"code": grant.code,
},
)
payload = json.loads(response.content.decode())
assert "access_token" in payload
assert "refresh_token" in payload
assert payload["expires_in"] == 36000
assert payload["scope"] == grant.scope
assert payload["token_type"] == "Bearer"
assert response.status_code == 200
with pytest.raises(grant.DoesNotExist):
grant.refresh_from_db()
def test_revoke_view_post(logged_in_client, factories):
token = factories["users.AccessToken"]()
url = reverse("api:v1:oauth:revoke")
response = logged_in_client.post(
url,
{
"token": token.token,
"client_id": token.application.client_id,
"client_secret": token.application.client_secret,
},
)
assert response.status_code == 200
with pytest.raises(token.DoesNotExist):
token.refresh_from_db()
def test_grants_list(factories, logged_in_api_client):
token = factories["users.AccessToken"](user=logged_in_api_client.user)
refresh_token = factories["users.RefreshToken"](user=logged_in_api_client.user)
factories["users.AccessToken"]()
url = reverse("api:v1:oauth:grants-list")
expected = [
serializers.ApplicationSerializer(refresh_token.application).data,
serializers.ApplicationSerializer(token.application).data,
]
response = logged_in_api_client.get(url)
assert response.status_code == 200
assert response.data == expected
def test_grant_delete(factories, logged_in_api_client, mocker, now):
token = factories["users.AccessToken"](user=logged_in_api_client.user)
refresh_token = factories["users.RefreshToken"](
user=logged_in_api_client.user, application=token.application
)
grant = factories["users.Grant"](
user=logged_in_api_client.user, application=token.application
)
revoke_token = mocker.spy(token.__class__, "revoke")
revoke_refresh = mocker.spy(refresh_token.__class__, "revoke")
to_keep = [
factories["users.AccessToken"](application=token.application),
factories["users.RefreshToken"](application=token.application),
factories["users.Grant"](application=token.application),
]
url = reverse(
"api:v1:oauth:grants-detail", kwargs={"client_id": token.application.client_id}
)
response = logged_in_api_client.delete(url)
assert response.status_code == 204
revoke_token.assert_called_once()
revoke_refresh.assert_called_once()
with pytest.raises(token.DoesNotExist):
token.refresh_from_db()
with pytest.raises(grant.DoesNotExist):
grant.refresh_from_db()
refresh_token.refresh_from_db()
assert refresh_token.revoked == now
for t in to_keep:
t.refresh_from_db()

View file

@ -1,92 +0,0 @@
import pytest
from rest_framework.views import APIView
from funkwhale_api.users import permissions
def test_has_user_permission_no_user(api_request):
view = APIView.as_view()
permission = permissions.HasUserPermission()
request = api_request.get("/")
assert permission.has_permission(request, view) is False
def test_has_user_permission_anonymous(anonymous_user, api_request):
view = APIView.as_view()
permission = permissions.HasUserPermission()
request = api_request.get("/")
setattr(request, "user", anonymous_user)
assert permission.has_permission(request, view) is False
@pytest.mark.parametrize("value", [True, False])
def test_has_user_permission_logged_in_single(value, factories, api_request):
user = factories["users.User"](permission_moderation=value)
class View(APIView):
required_permissions = ["moderation"]
view = View()
permission = permissions.HasUserPermission()
request = api_request.get("/")
setattr(request, "user", user)
result = permission.has_permission(request, view)
assert result == user.has_permissions("moderation") == value
@pytest.mark.parametrize(
"moderation,library,expected",
[
(True, False, False),
(False, True, False),
(False, False, False),
(True, True, True),
],
)
def test_has_user_permission_logged_in_multiple_and(
moderation, library, expected, factories, api_request
):
user = factories["users.User"](
permission_moderation=moderation, permission_library=library
)
class View(APIView):
required_permissions = ["moderation", "library"]
permission_operator = "and"
view = View()
permission = permissions.HasUserPermission()
request = api_request.get("/")
setattr(request, "user", user)
result = permission.has_permission(request, view)
assert result == user.has_permissions("moderation", "library") == expected
@pytest.mark.parametrize(
"moderation,library,expected",
[
(True, False, True),
(False, True, True),
(False, False, False),
(True, True, True),
],
)
def test_has_user_permission_logged_in_multiple_or(
moderation, library, expected, factories, api_request
):
user = factories["users.User"](
permission_moderation=moderation, permission_library=library
)
class View(APIView):
required_permissions = ["moderation", "library"]
permission_operator = "or"
view = View()
permission = permissions.HasUserPermission()
request = api_request.get("/")
setattr(request, "user", user)
result = permission.has_permission(request, view)
has_permission_result = user.has_permissions("moderation", "library", operator="or")
assert result == has_permission_result == expected