funquail/api/funkwhale_api/users/serializers.py
Georg Krause 02bb83e3f4 fix(api): Fix password reset via email
The original serializes assumes if allauth is installed, its also used
for the token generation. Thats not the case for us, so we need to
overwrite this behavior

Part-of: <https://dev.funkwhale.audio/funkwhale/funkwhale/-/merge_requests/2565>
2023-08-31 17:51:46 +00:00

343 lines
11 KiB
Python

import re
from allauth.account import models as allauth_models
from dj_rest_auth.registration.serializers import RegisterSerializer as RS
from dj_rest_auth.registration.serializers import get_adapter
from dj_rest_auth.serializers import PasswordResetConfirmSerializer as PRCS
from dj_rest_auth.serializers import PasswordResetSerializer as PRS
from django.contrib import auth
from django.contrib.auth.forms import PasswordResetForm
from django.core import validators
from django.utils.deconstruct import deconstructible
from django.utils.encoding import force_str
from django.utils.translation import gettext_lazy as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from funkwhale_api.activity import serializers as activity_serializers
from funkwhale_api.common import models as common_models
from funkwhale_api.common import preferences
from funkwhale_api.common import serializers as common_serializers
from funkwhale_api.common import utils as common_utils
from funkwhale_api.federation import models as federation_models
from funkwhale_api.moderation import models as moderation_models
from funkwhale_api.moderation import tasks as moderation_tasks
from funkwhale_api.moderation import utils as moderation_utils
from . import adapters
from . import authentication as users_authentication
from . import models
@deconstructible
class ASCIIUsernameValidator(validators.RegexValidator):
regex = r"^[\w]+$"
message = _(
"Enter a valid username. This value may contain only English letters, "
"numbers, and _ characters."
)
flags = re.ASCII
username_validators = [ASCIIUsernameValidator()]
NOOP = object()
class RegisterSerializer(RS):
invitation = serializers.CharField(
required=False, allow_null=True, allow_blank=True
)
def __init__(self, *args, **kwargs):
self.approval_enabled = preferences.get("moderation__signup_approval_enabled")
super().__init__(*args, **kwargs)
if self.approval_enabled:
customization = preferences.get("moderation__signup_form_customization")
self.fields[
"request_fields"
] = moderation_utils.get_signup_form_additional_fields_serializer(
customization
)
def validate_invitation(self, value):
if not value:
return
try:
return models.Invitation.objects.open().get(code__iexact=value)
except models.Invitation.DoesNotExist:
raise serializers.ValidationError("Invalid invitation code")
def validate(self, validated_data):
data = super().validate(validated_data)
# we create a fake user obj with validated data so we can validate
# password properly (we have a password validator that requires
# a user object)
user = models.User(username=data["username"], email=data["email"])
get_adapter().clean_password(data["password1"], user)
return data
def validate_username(self, value):
username = super().validate_username(value)
duplicates = federation_models.Actor.objects.local().filter(
preferred_username__iexact=username
)
if duplicates.exists():
raise serializers.ValidationError(
"A user with that username already exists."
)
return username
def save(self, request):
user = super().save(request)
update_fields = ["actor"]
user.actor = models.create_actor(user)
user_request = None
if self.approval_enabled:
# manually approve users
user.is_active = False
user_request = moderation_models.UserRequest.objects.create(
submitter=user.actor,
type="signup",
metadata=self.validated_data.get("request_fields", None) or None,
)
update_fields.append("is_active")
if self.validated_data.get("invitation"):
user.invitation = self.validated_data.get("invitation")
update_fields.append("invitation")
user.save(update_fields=update_fields)
if user_request:
common_utils.on_commit(
moderation_tasks.user_request_handle.delay,
user_request_id=user_request.pk,
new_status=user_request.status,
)
return user
class UserActivitySerializer(activity_serializers.ModelSerializer):
type = serializers.SerializerMethodField()
name = serializers.CharField(source="username")
local_id = serializers.CharField(source="username")
class Meta:
model = models.User
fields = ["id", "local_id", "name", "type"]
def get_type(self, obj):
return "Person"
class UserBasicSerializer(serializers.ModelSerializer):
avatar = common_serializers.AttachmentSerializer(
source="get_avatar", allow_null=True
)
class Meta:
model = models.User
fields = ["id", "username", "name", "date_joined", "avatar"]
class UserWriteSerializer(serializers.ModelSerializer):
summary = common_serializers.ContentSerializer(required=False, allow_null=True)
avatar = common_serializers.RelatedField(
"uuid",
queryset=common_models.Attachment.objects.all().local().attached(False),
serializer=None,
queryset_filter=lambda qs, context: qs.filter(
actor=context["request"].user.actor
),
write_only=True,
)
class Meta:
model = models.User
fields = [
"name",
"privacy_level",
"avatar",
"instance_support_message_display_date",
"funkwhale_support_message_display_date",
"summary",
]
def update(self, obj, validated_data):
if not obj.actor:
obj.create_actor()
summary = validated_data.pop("summary", NOOP)
avatar = validated_data.pop("avatar", NOOP)
obj = super().update(obj, validated_data)
if summary != NOOP:
common_utils.attach_content(obj.actor, "summary_obj", summary)
if avatar != NOOP:
obj.actor.attachment_icon = avatar
obj.actor.save(update_fields=["attachment_icon"])
return obj
def to_representation(self, instance):
r = super().to_representation(instance)
r["avatar"] = common_serializers.AttachmentSerializer(
instance.get_avatar()
).data
return r
class UserReadSerializer(serializers.ModelSerializer):
permissions = serializers.SerializerMethodField()
full_username = serializers.SerializerMethodField()
avatar = common_serializers.AttachmentSerializer(source="get_avatar")
class Meta:
model = models.User
fields = [
"id",
"username",
"full_username",
"name",
"email",
"is_staff",
"is_superuser",
"permissions",
"date_joined",
"privacy_level",
"avatar",
]
def get_permissions(self, o):
return o.get_permissions()
@extend_schema_field(OpenApiTypes.STR)
def get_full_username(self, o):
if o.actor:
return o.actor.full_username
class MeSerializer(UserReadSerializer):
quota_status = serializers.SerializerMethodField()
summary = serializers.SerializerMethodField()
tokens = serializers.SerializerMethodField()
class Meta(UserReadSerializer.Meta):
fields = UserReadSerializer.Meta.fields + [
"quota_status",
"instance_support_message_display_date",
"funkwhale_support_message_display_date",
"summary",
"tokens",
"settings",
]
def get_quota_status(self, o):
return o.get_quota_status() if o.actor else 0
def get_summary(self, o):
if not o.actor or not o.actor.summary_obj:
return
return common_serializers.ContentSerializer(o.actor.summary_obj).data
def get_tokens(self, o):
return {
"listen": users_authentication.generate_scoped_token(
user_id=o.pk, user_secret=o.secret_key, scopes=["read:libraries"]
)
}
class PasswordResetSerializer(PRS):
password_reset_form_class = PasswordResetForm
def get_email_options(self):
return {"extra_email_context": adapters.get_email_context()}
class PasswordResetConfirmSerializer(PRCS):
def validate(self, attrs):
from allauth.account.forms import default_token_generator
from django.utils.http import urlsafe_base64_decode as uid_decoder
UserModel = auth.get_user_model()
# Decode the uidb64 (allauth use base36) to uid to get User object
try:
uid = force_str(uid_decoder(attrs["uid"]))
self.user = UserModel._default_manager.get(pk=uid)
except (TypeError, ValueError, OverflowError, UserModel.DoesNotExist):
raise ValidationError({"uid": [_("Invalid value")]})
if not default_token_generator.check_token(self.user, attrs["token"]):
raise ValidationError({"token": [_("Invalid value")]})
self.custom_validation(attrs)
# Construct SetPasswordForm instance
self.set_password_form = self.set_password_form_class(
user=self.user,
data=attrs,
)
if not self.set_password_form.is_valid():
raise serializers.ValidationError(self.set_password_form.errors)
return attrs
class UserDeleteSerializer(serializers.Serializer):
password = serializers.CharField()
confirm = serializers.BooleanField()
def validate_password(self, value):
if not self.instance.check_password(value):
raise serializers.ValidationError("Invalid password")
def validate_confirm(self, value):
if not value:
raise serializers.ValidationError("Please confirm deletion")
return value
class LoginSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField()
def validate(self, data):
user = auth.authenticate(request=self.context.get("request"), **data)
if not user:
raise serializers.ValidationError(
"Unable to log in with provided credentials"
)
if not user.is_active:
raise serializers.ValidationError("This account was disabled")
return user
def save(self, request):
return auth.login(request, self.validated_data)
class UserChangeEmailSerializer(serializers.Serializer):
password = serializers.CharField()
email = serializers.EmailField()
def validate_password(self, value):
if not self.instance.check_password(value):
raise serializers.ValidationError("Invalid password")
def validate_email(self, value):
if (
allauth_models.EmailAddress.objects.filter(email__iexact=value)
.exclude(user=self.context["user"])
.exists()
):
raise serializers.ValidationError("This e-mail address is already in use")
return value
def save(self, request):
current, _ = allauth_models.EmailAddress.objects.get_or_create(
user=request.user,
email=request.user.email,
defaults={"verified": False, "primary": True},
)
current.change(request, self.validated_data["email"], confirm=True)