音楽で楽しみましょう!-Let's have fun with music!-

Signed-off-by: Shin'ya Minazuki <shinyoukai@laidback.moe>
This commit is contained in:
Shin'ya Minazuki 2026-01-24 16:16:49 -03:00
commit 54c6d22102
517 changed files with 637 additions and 639 deletions

View file

@ -1,121 +0,0 @@
from django.core.exceptions import ImproperlyConfigured
from rest_framework import permissions
from funkwhale_api.common import preferences
from .. import models
from . import scopes
def normalize(*scope_ids):
"""
Given an iterable containing scopes ids such as {read, write:playlists}
will return a set containing all the leaf scopes (and no parent scopes)
"""
final = set()
for scope_id in scope_ids:
try:
scope_obj = scopes.SCOPES_BY_ID[scope_id]
except KeyError:
continue
if scope_obj.children:
final = final | {s.id for s in scope_obj.children}
else:
final.add(scope_obj.id)
return final
def should_allow(required_scope, request_scopes):
if not required_scope:
return True
if not request_scopes:
return False
return required_scope in normalize(*request_scopes)
METHOD_SCOPE_MAPPING = {
"get": "read",
"post": "write",
"patch": "write",
"put": "write",
"delete": "write",
}
class ScopePermission(permissions.BasePermission):
def has_permission(self, request, view):
if request.method.lower() in ["options", "head"]:
return True
scope_config = getattr(view, "required_scope", "noopscope")
anonymous_policy = getattr(view, "anonymous_policy", False)
if anonymous_policy not in [True, False, "setting"]:
raise ImproperlyConfigured(
f"{anonymous_policy} is not a valid value for anonymous_policy"
)
if isinstance(scope_config, str):
scope_config = {
"read": f"read:{scope_config}",
"write": f"write:{scope_config}",
}
action = METHOD_SCOPE_MAPPING[request.method.lower()]
required_scope = scope_config[action]
else:
# we have a dict with explicit viewset actions / scopes
required_scope = scope_config[view.action]
token = request.auth
if isinstance(token, models.AccessToken):
return self.has_permission_token(token, required_scope)
elif getattr(request, "scopes", None):
return should_allow(
required_scope=required_scope, request_scopes=set(request.scopes)
)
elif request.user.is_authenticated:
user_scopes = scopes.get_from_permissions(**request.user.get_permissions())
return should_allow(
required_scope=required_scope, request_scopes=user_scopes
)
elif hasattr(request, "actor") and request.actor:
# we use default anonymous scopes
user_scopes = scopes.FEDERATION_REQUEST_SCOPES
return should_allow(
required_scope=required_scope, request_scopes=user_scopes
)
else:
if anonymous_policy is False:
return False
if anonymous_policy == "setting" and preferences.get(
"common__api_authentication_required"
):
return False
user_scopes = (
getattr(view, "anonymous_scopes", set()) | scopes.ANONYMOUS_SCOPES
)
return should_allow(
required_scope=required_scope, request_scopes=user_scopes
)
def has_permission_token(self, token, required_scope):
if token.is_expired():
return False
if not token.user:
return False
user = token.user
user_scopes = scopes.get_from_permissions(**user.get_permissions())
token_scopes = set(token.scopes.keys())
final_scopes = (
user_scopes
& normalize(*token_scopes)
& token.application.normalized_scopes
& scopes.OAUTH_APP_SCOPES
)
return should_allow(required_scope=required_scope, request_scopes=final_scopes)

View file

@ -1,105 +0,0 @@
class Scope:
def __init__(self, id, label="", children=None):
self.id = id
self.label = ""
self.children = children or []
def copy(self, prefix):
return Scope(f"{prefix}:{self.id}")
BASE_SCOPES = [
Scope(
"profile", "Access profile data (e-mail, username, avatar, subsonic password…)"
),
Scope("libraries", "Access uploads, libraries, and audio metadata"),
Scope("edits", "Browse and submit edits on audio metadata"),
Scope("follows", "Access library follows"),
Scope("favorites", "Access favorites"),
Scope("filters", "Access content filters"),
Scope("listenings", "Access listening history"),
Scope("radios", "Access radios"),
Scope("playlists", "Access playlists"),
Scope("notifications", "Access personal notifications"),
Scope("security", "Access security settings"),
Scope("reports", "Access reports"),
Scope("plugins", "Access plugins"),
# Privileged scopes that require specific user permissions
Scope("instance:settings", "Access instance settings"),
Scope("instance:users", "Access local user accounts"),
Scope("instance:invitations", "Access invitations"),
Scope("instance:edits", "Access instance metadata edits"),
Scope(
"instance:libraries", "Access instance uploads, libraries and audio metadata"
),
Scope("instance:accounts", "Access instance federated accounts"),
Scope("instance:domains", "Access instance domains"),
Scope("instance:policies", "Access instance moderation policies"),
Scope("instance:reports", "Access instance moderation reports"),
Scope("instance:requests", "Access instance moderation requests"),
Scope("instance:notes", "Access instance moderation notes"),
]
SCOPES = [
Scope("read", children=[s.copy("read") for s in BASE_SCOPES]),
Scope("write", children=[s.copy("write") for s in BASE_SCOPES]),
]
def flatten(*scopes):
for scope in scopes:
yield scope
yield from flatten(*scope.children)
SCOPES_BY_ID = {s.id: s for s in flatten(*SCOPES)}
FEDERATION_REQUEST_SCOPES = {"read:libraries"}
ANONYMOUS_SCOPES = {
"read:libraries",
"read:playlists",
"read:listenings",
"read:favorites",
"read:radios",
"read:edits",
}
COMMON_SCOPES = ANONYMOUS_SCOPES | {
"read:profile",
"write:profile",
"write:libraries",
"write:playlists",
"read:follows",
"write:follows",
"write:favorites",
"read:notifications",
"write:notifications",
"write:radios",
"write:edits",
"read:filters",
"write:filters",
"read:reports",
"write:reports",
"write:listenings",
}
LOGGED_IN_SCOPES = COMMON_SCOPES | {
"read:security",
"write:security",
"read:plugins",
"write:plugins",
}
# We don't allow admin access for oauth apps yet
OAUTH_APP_SCOPES = COMMON_SCOPES
def get_from_permissions(**permissions):
from funkwhale_api.users import models
final = LOGGED_IN_SCOPES
for permission_name, value in permissions.items():
if not value:
continue
config = models.PERMISSIONS_CONFIGURATION[permission_name]
final = final | config["scopes"]
return final

View file

@ -1,41 +0,0 @@
from rest_framework import serializers
from .. import models
class ApplicationSerializer(serializers.ModelSerializer):
scopes = serializers.CharField(source="scope")
class Meta:
model = models.Application
fields = ["client_id", "name", "scopes", "created", "updated"]
def to_representation(self, obj):
repr = super().to_representation(obj)
if obj.user_id:
repr["token"] = obj.token
return repr
class CreateApplicationSerializer(serializers.ModelSerializer):
name = serializers.CharField(required=True, max_length=255)
scopes = serializers.CharField(source="scope", default="read")
class Meta:
model = models.Application
fields = [
"client_id",
"name",
"scopes",
"client_secret",
"created",
"updated",
"redirect_uris",
]
read_only_fields = ["client_id", "created", "updated"]
def to_representation(self, obj):
repr = super().to_representation(obj)
if obj.user_id:
repr["token"] = obj.token
return repr

View file

@ -1,41 +0,0 @@
import urllib.parse
import oauthlib.oauth2
from funkwhale_api.common import authentication
def check(request):
user = request.user
request.user = user.__class__.objects.all().for_auth().get(pk=user.pk)
if request.user.should_verify_email():
raise authentication.UnverifiedEmail(user)
return True
class OAuth2Server(oauthlib.oauth2.Server):
def verify_request(self, uri, *args, **kwargs):
valid, request = super().verify_request(uri, *args, **kwargs)
if valid:
if not check(request):
return False, request
return valid, request
# maybe the token was given in the querystring?
query = urllib.parse.urlparse(request.uri).query
token = None
if query:
parsed_qs = urllib.parse.parse_qs(query)
token = parsed_qs.get("token", [])
if len(token) > 0:
token = token[0]
if token:
valid = self.request_validator.validate_bearer_token(
token, request.scopes, request
)
if valid:
if not check(request):
return False, request
return valid, request

View file

@ -1,8 +0,0 @@
from oauth2_provider import models as oauth2_models
from funkwhale_api.taskapp import celery
@celery.app.task(name="oauth.clear_expired_tokens")
def clear_expired_tokens():
oauth2_models.clear_expired()

View file

@ -1,16 +0,0 @@
from django.conf.urls import url
from django.views.decorators.csrf import csrf_exempt
from funkwhale_api.common import routers
from . import views
router = routers.OptionalSlashRouter()
router.register(r"apps", views.ApplicationViewSet, "apps")
router.register(r"grants", views.GrantViewSet, "grants")
urlpatterns = router.urls + [
url("^authorize/?$", csrf_exempt(views.AuthorizeView.as_view()), name="authorize"),
url("^token/?$", views.TokenView.as_view(), name="token"),
url("^revoke/?$", views.RevokeTokenView.as_view(), name="revoke"),
]

View file

@ -1,238 +0,0 @@
import json
import secrets
import urllib.parse
from django import http
from django.db.models import Q
from django.utils import timezone
from drf_spectacular.utils import extend_schema
from oauth2_provider import exceptions as oauth2_exceptions
from oauth2_provider import views as oauth_views
from oauth2_provider.settings import oauth2_settings
from rest_framework import mixins, permissions, response, views, viewsets
from rest_framework.decorators import action
from funkwhale_api.common import throttling
from .. import models
from . import serializers
from .permissions import ScopePermission
class ApplicationViewSet(
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
mixins.RetrieveModelMixin,
viewsets.GenericViewSet,
):
anonymous_policy = True
required_scope = {
"retrieve": None,
"create": None,
"destroy": "write:security",
"update": "write:security",
"partial_update": "write:security",
"refresh_token": "write:security",
"list": "read:security",
}
lookup_field = "client_id"
queryset = models.Application.objects.all().order_by("-created")
serializer_class = serializers.ApplicationSerializer
throttling_scopes = {
"create": {
"anonymous": "anonymous-oauth-app",
"authenticated": "authenticated-oauth-app",
}
}
def create(self, request, *args, **kwargs):
request_data = request.data.copy()
secret = secrets.token_hex(64)
request_data["client_secret"] = secret
serializer = self.get_serializer(data=request_data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
data = serializer.data
# Since the serializer returns a hashed secret, we need to override it for the response.
data["client_secret"] = secret
return response.Response(data, status=201, headers=headers)
def get_serializer_class(self):
if self.request.method.lower() == "post":
return serializers.CreateApplicationSerializer
return super().get_serializer_class()
def perform_create(self, serializer):
return serializer.save(
client_type=models.Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=models.Application.GRANT_AUTHORIZATION_CODE,
user=self.request.user if self.request.user.is_authenticated else None,
token=models.get_token() if self.request.user.is_authenticated else None,
)
def get_serializer(self, *args, **kwargs):
serializer_class = self.get_serializer_class()
try:
owned = args[0].user == self.request.user
except (IndexError, AttributeError):
owned = False
if owned:
serializer_class = serializers.CreateApplicationSerializer
kwargs["context"] = self.get_serializer_context()
return serializer_class(*args, **kwargs)
def get_queryset(self):
qs = super().get_queryset()
if self.action in [
"list",
"destroy",
"update",
"partial_update",
"refresh_token",
]:
qs = qs.filter(user=self.request.user)
return qs
@extend_schema(operation_id="refresh_oauth_token")
@action(
detail=True,
methods=["post"],
url_name="refresh_token",
url_path="refresh-token",
)
def refresh_token(self, request, *args, **kwargs):
app = self.get_object()
if not app.user_id or request.user != app.user:
return response.Response(status=404)
app.token = models.get_token()
app.save(update_fields=["token"])
serializer = serializers.CreateApplicationSerializer(app)
return response.Response(serializer.data, status=200)
class GrantViewSet(
mixins.RetrieveModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
viewsets.GenericViewSet,
):
"""
This is a viewset that list applications that have access to the request user
account, to allow revoking tokens easily.
"""
permission_classes = [permissions.IsAuthenticated, ScopePermission]
required_scope = "security"
lookup_field = "client_id"
queryset = models.Application.objects.all().order_by("-created")
serializer_class = serializers.ApplicationSerializer
pagination_class = None
def get_queryset(self):
now = timezone.now()
queryset = super().get_queryset()
grants = models.Grant.objects.filter(user=self.request.user, expires__gt=now)
access_tokens = models.AccessToken.objects.filter(user=self.request.user)
refresh_tokens = models.RefreshToken.objects.filter(
user=self.request.user, revoked=None
)
return queryset.filter(
Q(pk__in=access_tokens.values("application"))
| Q(pk__in=refresh_tokens.values("application"))
| Q(pk__in=grants.values("application"))
).distinct()
def perform_create(self, serializer):
return serializer.save(
client_type=models.Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=models.Application.GRANT_AUTHORIZATION_CODE,
)
def perform_destroy(self, instance):
application = instance
access_tokens = application.accesstoken_set.filter(user=self.request.user)
for token in access_tokens:
token.revoke()
refresh_tokens = application.refreshtoken_set.filter(user=self.request.user)
for token in refresh_tokens:
try:
token.revoke()
except models.AccessToken.DoesNotExist:
token.access_token = None
token.revoked = timezone.now()
token.save(update_fields=["access_token", "revoked"])
grants = application.grant_set.filter(user=self.request.user)
grants.delete()
class AuthorizeView(views.APIView, oauth_views.AuthorizationView):
permission_classes = [permissions.IsAuthenticated]
server_class = oauth2_settings.OAUTH2_SERVER_CLASS
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
oauthlib_backend_class = oauth2_settings.OAUTH2_BACKEND_CLASS
skip_authorization_completely = False
oauth2_data = {}
def form_invalid(self, form):
"""
Return a JSON response instead of a template one
"""
errors = form.errors
return self.json_payload(errors, status_code=400)
def post(self, request, *args, **kwargs):
throttling.check_request(request, "oauth-authorize")
return super().post(request, *args, **kwargs)
def form_valid(self, form):
try:
return super().form_valid(form)
except models.Application.DoesNotExist:
return self.json_payload({"non_field_errors": ["Invalid application"]}, 400)
def redirect(self, redirect_to, application):
if self.request.is_ajax():
# Web client need this to be able to redirect the user
query = urllib.parse.urlparse(redirect_to).query
code = urllib.parse.parse_qs(query)["code"][0]
return self.json_payload(
{"redirect_uri": redirect_to, "code": code}, status_code=200
)
return super().redirect(redirect_to, application)
def error_response(self, error, application):
if isinstance(error, oauth2_exceptions.FatalClientError):
return self.json_payload({"detail": error.oauthlib_error.description}, 400)
return super().error_response(error, application)
def json_payload(self, payload, status_code):
return http.HttpResponse(
json.dumps(payload), status=status_code, content_type="application/json"
)
def handle_no_permission(self):
return self.json_payload(
{"detail": "Authentication credentials were not provided."}, 401
)
class TokenView(oauth_views.TokenView):
def post(self, request, *args, **kwargs):
throttling.check_request(request, "oauth-token")
return super().post(request, *args, **kwargs)
class RevokeTokenView(oauth_views.RevokeTokenView):
def post(self, request, *args, **kwargs):
throttling.check_request(request, "oauth-revoke-token")
return super().post(request, *args, **kwargs)