発散解像度 -divergence resolution-
Signed-off-by: Shin'ya Minazuki <shinyoukai@laidback.moe>
This commit is contained in:
parent
1ff613dee6
commit
01bb65f8da
457 changed files with 929 additions and 602 deletions
805
api/funkwhale_api/music/metadata.py
Normal file
805
api/funkwhale_api/music/metadata.py
Normal file
|
|
@ -0,0 +1,805 @@
|
|||
import base64
|
||||
import datetime
|
||||
import logging
|
||||
from collections.abc import Mapping
|
||||
|
||||
import arrow
|
||||
import mutagen._util
|
||||
import mutagen.flac
|
||||
import mutagen.oggtheora
|
||||
import mutagen.oggvorbis
|
||||
from rest_framework import serializers
|
||||
|
||||
from funkwhale_api.tags import models as tags_models
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
NODEFAULT = object()
|
||||
# default title used when imported tracks miss the `Album` tag, see #122
|
||||
UNKNOWN_ALBUM = "[Unknown Album]"
|
||||
|
||||
|
||||
class TagNotFound(KeyError):
|
||||
pass
|
||||
|
||||
|
||||
class UnsupportedTag(KeyError):
|
||||
pass
|
||||
|
||||
|
||||
class ParseError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
def get_id3_tag(f, k):
|
||||
if k == "pictures":
|
||||
return f.tags.getall("APIC")
|
||||
# First we try to grab the standard key
|
||||
possible_attributes = [("text", True), ("url", False)]
|
||||
for attr, select_first in possible_attributes:
|
||||
try:
|
||||
v = getattr(f.tags[k], attr)
|
||||
if select_first:
|
||||
v = v[0]
|
||||
return v
|
||||
except KeyError:
|
||||
break
|
||||
except IndexError:
|
||||
break
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
# then we fallback on parsing non standard tags
|
||||
all_tags = f.tags.getall("TXXX")
|
||||
try:
|
||||
matches = [t for t in all_tags if t.desc.lower() == k.lower()]
|
||||
return matches[0].text[0]
|
||||
except (KeyError, IndexError):
|
||||
raise TagNotFound(k)
|
||||
|
||||
|
||||
def clean_id3_pictures(apic):
|
||||
pictures = []
|
||||
for p in list(apic):
|
||||
pictures.append(
|
||||
{
|
||||
"mimetype": p.mime,
|
||||
"content": p.data,
|
||||
"description": p.desc,
|
||||
"type": p.type.real,
|
||||
}
|
||||
)
|
||||
return pictures
|
||||
|
||||
|
||||
def get_mp4_tag(f, k):
|
||||
if k == "pictures":
|
||||
return f.get("covr", [])
|
||||
raw_value = f.get(k, None)
|
||||
|
||||
if not raw_value:
|
||||
raise TagNotFound(k)
|
||||
|
||||
value = raw_value[0]
|
||||
try:
|
||||
return value.decode()
|
||||
except AttributeError:
|
||||
return value
|
||||
|
||||
|
||||
def get_mp4_position(raw_value):
|
||||
return raw_value[0]
|
||||
|
||||
|
||||
def clean_mp4_pictures(raw_pictures):
|
||||
pictures = []
|
||||
for p in list(raw_pictures):
|
||||
if p.imageformat == p.FORMAT_JPEG:
|
||||
mimetype = "image/jpeg"
|
||||
elif p.imageformat == p.FORMAT_PNG:
|
||||
mimetype = "image/png"
|
||||
else:
|
||||
continue
|
||||
pictures.append(
|
||||
{
|
||||
"mimetype": mimetype,
|
||||
"content": bytes(p),
|
||||
"description": "",
|
||||
"type": mutagen.id3.PictureType.COVER_FRONT,
|
||||
}
|
||||
)
|
||||
return pictures
|
||||
|
||||
|
||||
def get_flac_tag(f, k):
|
||||
if k == "pictures":
|
||||
return f.pictures
|
||||
try:
|
||||
return f.get(k, [])[0]
|
||||
except (KeyError, IndexError):
|
||||
raise TagNotFound(k)
|
||||
|
||||
|
||||
def clean_flac_pictures(apic):
|
||||
pictures = []
|
||||
for p in list(apic):
|
||||
pictures.append(
|
||||
{
|
||||
"mimetype": p.mime,
|
||||
"content": p.data,
|
||||
"description": p.desc,
|
||||
"type": p.type.real,
|
||||
}
|
||||
)
|
||||
return pictures
|
||||
|
||||
|
||||
def clean_ogg_pictures(metadata_block_picture):
|
||||
pictures = []
|
||||
for b64_data in [metadata_block_picture]:
|
||||
try:
|
||||
data = base64.b64decode(b64_data)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
try:
|
||||
picture = mutagen.flac.Picture(data)
|
||||
except mutagen.flac.FLACError:
|
||||
continue
|
||||
|
||||
pictures.append(
|
||||
{
|
||||
"mimetype": picture.mime,
|
||||
"content": picture.data,
|
||||
"description": "",
|
||||
"type": picture.type.real,
|
||||
}
|
||||
)
|
||||
return pictures
|
||||
|
||||
|
||||
def get_mp3_recording_id(f, k):
|
||||
try:
|
||||
return [t for t in f.tags.getall("UFID") if "musicbrainz.org" in t.owner][
|
||||
0
|
||||
].data.decode("utf-8")
|
||||
except IndexError:
|
||||
raise TagNotFound(k)
|
||||
|
||||
|
||||
def get_mp3_comment(f, k):
|
||||
keys_to_try = ["COMM", "COMM::eng"]
|
||||
for key in keys_to_try:
|
||||
try:
|
||||
return get_id3_tag(f, key)
|
||||
except TagNotFound:
|
||||
pass
|
||||
|
||||
raise TagNotFound("COMM")
|
||||
|
||||
|
||||
VALIDATION = {}
|
||||
|
||||
CONF = {
|
||||
"OggOpus": {
|
||||
"getter": lambda f, k: f[k][0],
|
||||
"fields": {
|
||||
"position": {"field": "TRACKNUMBER"},
|
||||
"disc_number": {"field": "DISCNUMBER"},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"artists": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date"},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"mbid": {"field": "musicbrainz_trackid"},
|
||||
"license": {},
|
||||
"copyright": {},
|
||||
"genre": {},
|
||||
"pictures": {
|
||||
"field": "metadata_block_picture",
|
||||
"to_application": clean_ogg_pictures,
|
||||
},
|
||||
"comment": {"field": "comment"},
|
||||
},
|
||||
},
|
||||
"OggVorbis": {
|
||||
"getter": lambda f, k: f[k][0],
|
||||
"fields": {
|
||||
"position": {"field": "TRACKNUMBER"},
|
||||
"disc_number": {"field": "DISCNUMBER"},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"artists": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date"},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"mbid": {"field": "musicbrainz_trackid"},
|
||||
"license": {},
|
||||
"copyright": {},
|
||||
"genre": {},
|
||||
"pictures": {
|
||||
"field": "metadata_block_picture",
|
||||
"to_application": clean_ogg_pictures,
|
||||
},
|
||||
"comment": {"field": "comment"},
|
||||
},
|
||||
},
|
||||
"OggTheora": {
|
||||
"getter": lambda f, k: f[k][0],
|
||||
"fields": {
|
||||
"position": {"field": "TRACKNUMBER"},
|
||||
"disc_number": {"field": "DISCNUMBER"},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"artists": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date"},
|
||||
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
|
||||
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
|
||||
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
|
||||
"mbid": {"field": "MusicBrainz Track Id"},
|
||||
"license": {},
|
||||
"copyright": {},
|
||||
"genre": {},
|
||||
"comment": {"field": "comment"},
|
||||
},
|
||||
},
|
||||
"ID3": {
|
||||
"getter": get_id3_tag,
|
||||
"clean_pictures": clean_id3_pictures,
|
||||
"fields": {
|
||||
"position": {"field": "TRCK"},
|
||||
"disc_number": {"field": "TPOS"},
|
||||
"title": {"field": "TIT2"},
|
||||
"artist": {"field": "TPE1"},
|
||||
"artists": {"field": "ARTISTS"},
|
||||
"album_artist": {"field": "TPE2"},
|
||||
"album": {"field": "TALB"},
|
||||
"date": {"field": "TDRC"},
|
||||
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
|
||||
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
|
||||
"genre": {"field": "TCON"},
|
||||
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
|
||||
"mbid": {"field": "UFID", "getter": get_mp3_recording_id},
|
||||
"pictures": {},
|
||||
"license": {"field": "WCOP"},
|
||||
"copyright": {"field": "TCOP"},
|
||||
"comment": {"field": "COMM", "getter": get_mp3_comment},
|
||||
},
|
||||
},
|
||||
"MP4": {
|
||||
"getter": get_mp4_tag,
|
||||
"clean_pictures": clean_mp4_pictures,
|
||||
"fields": {
|
||||
"position": {"field": "trkn", "to_application": get_mp4_position},
|
||||
"disc_number": {"field": "disk", "to_application": get_mp4_position},
|
||||
"title": {"field": "©nam"},
|
||||
"artist": {"field": "©ART"},
|
||||
"artists": {"field": "----:com.apple.iTunes:ARTISTS"},
|
||||
"album_artist": {"field": "aART"},
|
||||
"album": {"field": "©alb"},
|
||||
"date": {"field": "©day"},
|
||||
"musicbrainz_albumid": {
|
||||
"field": "----:com.apple.iTunes:MusicBrainz Album Id"
|
||||
},
|
||||
"musicbrainz_artistid": {
|
||||
"field": "----:com.apple.iTunes:MusicBrainz Artist Id"
|
||||
},
|
||||
"genre": {"field": "©gen"},
|
||||
"musicbrainz_albumartistid": {
|
||||
"field": "----:com.apple.iTunes:MusicBrainz Album Artist Id"
|
||||
},
|
||||
"mbid": {"field": "----:com.apple.iTunes:MusicBrainz Track Id"},
|
||||
"pictures": {},
|
||||
"license": {"field": "----:com.apple.iTunes:LICENSE"},
|
||||
"copyright": {"field": "cprt"},
|
||||
"comment": {"field": "©cmt"},
|
||||
},
|
||||
},
|
||||
"FLAC": {
|
||||
"getter": get_flac_tag,
|
||||
"clean_pictures": clean_flac_pictures,
|
||||
"fields": {
|
||||
"position": {"field": "tracknumber"},
|
||||
"disc_number": {"field": "discnumber"},
|
||||
"title": {},
|
||||
"artist": {},
|
||||
"artists": {},
|
||||
"album_artist": {"field": "albumartist"},
|
||||
"album": {},
|
||||
"date": {"field": "date"},
|
||||
"musicbrainz_albumid": {},
|
||||
"musicbrainz_artistid": {},
|
||||
"musicbrainz_albumartistid": {},
|
||||
"genre": {},
|
||||
"mbid": {"field": "musicbrainz_trackid"},
|
||||
"test": {},
|
||||
"pictures": {},
|
||||
"license": {},
|
||||
"copyright": {},
|
||||
"comment": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
CONF["MP3"] = CONF["ID3"]
|
||||
CONF["AIFF"] = CONF["ID3"]
|
||||
|
||||
ALL_FIELDS = [
|
||||
"position",
|
||||
"disc_number",
|
||||
"title",
|
||||
"artist",
|
||||
"album_artist",
|
||||
"album",
|
||||
"date",
|
||||
"musicbrainz_albumid",
|
||||
"musicbrainz_artistid",
|
||||
"musicbrainz_albumartistid",
|
||||
"mbid",
|
||||
"license",
|
||||
"copyright",
|
||||
"comment",
|
||||
]
|
||||
|
||||
|
||||
class Metadata(Mapping):
|
||||
def __init__(self, filething, kind=mutagen.File):
|
||||
self._file = kind(filething)
|
||||
if self._file is None:
|
||||
raise ValueError(f"Cannot parse metadata from {filething}")
|
||||
if len(self._file) == 0:
|
||||
raise ValueError(f"No tags found in {filething}")
|
||||
self.fallback = self.load_fallback(filething, self._file)
|
||||
ft = self.get_file_type(self._file)
|
||||
try:
|
||||
self._conf = CONF[ft]
|
||||
except KeyError:
|
||||
raise ValueError(f"Unsupported format {ft}")
|
||||
|
||||
def get_file_type(self, f):
|
||||
return f.__class__.__name__
|
||||
|
||||
def load_fallback(self, filething, parent):
|
||||
"""
|
||||
In some situations, such as Ogg Theora files tagged with MusicBrainz Picard,
|
||||
part of the tags are only available in the ogg vorbis comments
|
||||
"""
|
||||
try:
|
||||
filething.seek(0)
|
||||
except AttributeError:
|
||||
pass
|
||||
if isinstance(parent, mutagen.oggtheora.OggTheora):
|
||||
try:
|
||||
return Metadata(filething, kind=mutagen.oggvorbis.OggVorbis)
|
||||
except (ValueError, mutagen._util.MutagenError):
|
||||
raise
|
||||
pass
|
||||
|
||||
def get(self, key, default=NODEFAULT):
|
||||
try:
|
||||
return self._get_from_self(key)
|
||||
except TagNotFound:
|
||||
if not self.fallback:
|
||||
if default != NODEFAULT:
|
||||
return default
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
return self.fallback.get(key, default=default)
|
||||
except UnsupportedTag:
|
||||
if not self.fallback:
|
||||
raise
|
||||
else:
|
||||
return self.fallback.get(key, default=default)
|
||||
|
||||
def all(self):
|
||||
"""
|
||||
Return a dict with all support metadata fields, if they are available
|
||||
"""
|
||||
final = {}
|
||||
for field in self._conf["fields"]:
|
||||
if field in ["pictures"]:
|
||||
continue
|
||||
value = self.get(field, None)
|
||||
if value is None:
|
||||
continue
|
||||
final[field] = str(value)
|
||||
|
||||
return final
|
||||
|
||||
def _get_from_self(self, key, default=NODEFAULT):
|
||||
try:
|
||||
field_conf = self._conf["fields"][key]
|
||||
except KeyError:
|
||||
raise UnsupportedTag(f"{key} is not supported for this file format")
|
||||
real_key = field_conf.get("field", key)
|
||||
try:
|
||||
getter = field_conf.get("getter", self._conf["getter"])
|
||||
v = getter(self._file, real_key)
|
||||
except KeyError:
|
||||
if default == NODEFAULT:
|
||||
raise TagNotFound(real_key)
|
||||
return default
|
||||
|
||||
converter = field_conf.get("to_application")
|
||||
if converter:
|
||||
v = converter(v)
|
||||
field = VALIDATION.get(key)
|
||||
if field:
|
||||
v = field.to_python(v)
|
||||
return v
|
||||
|
||||
def get_picture(self, *picture_types):
|
||||
if not picture_types:
|
||||
raise ValueError("You need to request at least one picture type")
|
||||
ptypes = [
|
||||
getattr(mutagen.id3.PictureType, picture_type.upper())
|
||||
for picture_type in picture_types
|
||||
]
|
||||
|
||||
try:
|
||||
pictures = self.get("pictures")
|
||||
except (UnsupportedTag, TagNotFound):
|
||||
return
|
||||
|
||||
cleaner = self._conf.get("clean_pictures", lambda v: v)
|
||||
pictures = cleaner(pictures)
|
||||
if not pictures:
|
||||
return
|
||||
for ptype in ptypes:
|
||||
for p in pictures:
|
||||
if p["type"] == ptype:
|
||||
return p
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.get(key)
|
||||
|
||||
def __len__(self):
|
||||
return 1
|
||||
|
||||
def __iter__(self):
|
||||
yield from self._conf["fields"]
|
||||
|
||||
|
||||
class ArtistField(serializers.Field):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.for_album = kwargs.pop("for_album", False)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_value(self, data):
|
||||
if self.for_album:
|
||||
keys = [
|
||||
("artists", "album_artist"),
|
||||
("names", "artists"),
|
||||
("mbids", "musicbrainz_albumartistid"),
|
||||
]
|
||||
else:
|
||||
keys = [
|
||||
("artists", "artists"),
|
||||
("names", "artist"),
|
||||
("mbids", "musicbrainz_artistid"),
|
||||
]
|
||||
|
||||
final = {}
|
||||
for field, key in keys:
|
||||
final[field] = data.get(key, None)
|
||||
|
||||
return final
|
||||
|
||||
def to_internal_value(self, data):
|
||||
# we have multiple values that can be separated by various separators
|
||||
separators = [";", ","]
|
||||
# we get a list like that if tagged via musicbrainz
|
||||
# ae29aae4-abfb-4609-8f54-417b1f4d64cc; 3237b5a8-ae44-400c-aa6d-cea51f0b9074;
|
||||
raw_mbids = data["mbids"]
|
||||
used_separator = None
|
||||
mbids = [raw_mbids]
|
||||
if raw_mbids:
|
||||
if "/" in raw_mbids:
|
||||
# it's a featuring, we can't handle this now
|
||||
mbids = []
|
||||
else:
|
||||
for separator in separators:
|
||||
if separator in raw_mbids:
|
||||
used_separator = separator
|
||||
mbids = [m.strip() for m in raw_mbids.split(separator)]
|
||||
break
|
||||
|
||||
# now, we split on artist names, using the same separator as the one used
|
||||
# by mbids, if any
|
||||
names = []
|
||||
|
||||
if data.get("artists", None):
|
||||
for separator in separators:
|
||||
if separator in data["artists"]:
|
||||
names = [n.strip() for n in data["artists"].split(separator)]
|
||||
break
|
||||
# corner case: 'album artist' field with only one artist but multiple names in 'artits' field
|
||||
if (
|
||||
not names
|
||||
and data.get("names", None)
|
||||
and any(separator in data["names"] for separator in separators)
|
||||
):
|
||||
names = [n.strip() for n in data["names"].split(separators[0])]
|
||||
elif not names:
|
||||
names = [data["artists"]]
|
||||
elif used_separator and mbids:
|
||||
names = [n.strip() for n in data["names"].split(used_separator)]
|
||||
else:
|
||||
names = [data["names"]]
|
||||
|
||||
final = []
|
||||
for i, name in enumerate(names):
|
||||
try:
|
||||
mbid = mbids[i]
|
||||
except IndexError:
|
||||
mbid = None
|
||||
artist = {"name": name, "mbid": mbid}
|
||||
final.append(artist)
|
||||
field = serializers.ListField(
|
||||
child=ArtistSerializer(strict=self.context.get("strict", True)),
|
||||
min_length=1,
|
||||
)
|
||||
return field.to_internal_value(final)
|
||||
|
||||
|
||||
class AlbumField(serializers.Field):
|
||||
def get_value(self, data):
|
||||
return data
|
||||
|
||||
def to_internal_value(self, data):
|
||||
try:
|
||||
title = data.get("album") or ""
|
||||
except TagNotFound:
|
||||
title = ""
|
||||
|
||||
title = title.strip() or UNKNOWN_ALBUM
|
||||
final = {
|
||||
"title": title,
|
||||
"release_date": data.get("date", None),
|
||||
"mbid": data.get("musicbrainz_albumid", None),
|
||||
}
|
||||
artists_field = ArtistField(for_album=True)
|
||||
payload = artists_field.get_value(data)
|
||||
try:
|
||||
artists = artists_field.to_internal_value(payload)
|
||||
except serializers.ValidationError as e:
|
||||
artists = []
|
||||
logger.debug("Ignoring validation error on album artists: %s", e)
|
||||
album_serializer = AlbumSerializer(data=final)
|
||||
album_serializer.is_valid(raise_exception=True)
|
||||
album_serializer.validated_data["artists"] = artists
|
||||
return album_serializer.validated_data
|
||||
|
||||
|
||||
class CoverDataField(serializers.Field):
|
||||
def get_value(self, data):
|
||||
return data
|
||||
|
||||
def to_internal_value(self, data):
|
||||
return data.get_picture("cover_front", "other")
|
||||
|
||||
|
||||
class PermissiveDateField(serializers.CharField):
|
||||
def to_internal_value(self, value):
|
||||
if not value:
|
||||
return None
|
||||
value = super().to_internal_value(str(value))
|
||||
ADDITIONAL_FORMATS = [
|
||||
"%Y-%d-%m %H:%M", # deezer date format
|
||||
"%Y-%W", # weird date format based on week number, see #718
|
||||
]
|
||||
|
||||
for date_format in ADDITIONAL_FORMATS:
|
||||
try:
|
||||
parsed = datetime.datetime.strptime(value, date_format)
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
return datetime.date(parsed.year, parsed.month, parsed.day)
|
||||
|
||||
try:
|
||||
parsed = arrow.get(str(value))
|
||||
return datetime.date(parsed.year, parsed.month, parsed.day)
|
||||
except (arrow.parser.ParserError, ValueError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def extract_tags_from_genre(string):
|
||||
tags = []
|
||||
delimiter = "@@@@@"
|
||||
for d in [" - ", ",", ";", "/"]:
|
||||
# Replace common tags separators by a custom delimiter
|
||||
string = string.replace(d, delimiter)
|
||||
|
||||
# loop on the parts (splitting on our custom delimiter)
|
||||
for tag in string.split(delimiter):
|
||||
tag = tag.strip()
|
||||
for d in ["-"]:
|
||||
# preparation for replacement so that Pop-Rock becomes Pop Rock, then PopRock
|
||||
# (step 1, step 2 happens below)
|
||||
tag = tag.replace(d, " ")
|
||||
if not tag:
|
||||
continue
|
||||
final_tag = ""
|
||||
if not tags_models.TAG_REGEX.match(tag.replace(" ", "")):
|
||||
# the string contains some non words chars ($, €, etc.), right now
|
||||
# we simply skip such tags
|
||||
continue
|
||||
# concatenate the parts and uppercase them so that 'pop rock' becomes 'PopRock'
|
||||
if len(tag.split(" ")) == 1:
|
||||
# we append the tag "as is", because it doesn't contain any space
|
||||
tags.append(tag)
|
||||
continue
|
||||
for part in tag.split(" "):
|
||||
# the tag contains space, there's work to do to have consistent case
|
||||
# 'pop rock' -> 'PopRock'
|
||||
# (step 2)
|
||||
if not part:
|
||||
continue
|
||||
final_tag += part[0].upper() + part[1:]
|
||||
if final_tag:
|
||||
tags.append(final_tag)
|
||||
return tags
|
||||
|
||||
|
||||
class TagsField(serializers.CharField):
|
||||
def get_value(self, data):
|
||||
return data
|
||||
|
||||
def to_internal_value(self, data):
|
||||
try:
|
||||
value = data.get("genre") or ""
|
||||
except TagNotFound:
|
||||
return []
|
||||
value = super().to_internal_value(str(value))
|
||||
|
||||
return extract_tags_from_genre(value)
|
||||
|
||||
|
||||
class MBIDField(serializers.UUIDField):
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs.setdefault("allow_null", True)
|
||||
kwargs.setdefault("required", False)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def to_internal_value(self, v):
|
||||
if v in ["", None]:
|
||||
return None
|
||||
return super().to_internal_value(v)
|
||||
|
||||
|
||||
class ArtistSerializer(serializers.Serializer):
|
||||
name = serializers.CharField(required=False, allow_null=True, allow_blank=True)
|
||||
mbid = MBIDField()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.strict = kwargs.pop("strict", True)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def validate_name(self, v):
|
||||
if self.strict and not v:
|
||||
raise serializers.ValidationError("This field is required.")
|
||||
return v
|
||||
|
||||
|
||||
class AlbumSerializer(serializers.Serializer):
|
||||
title = serializers.CharField(required=False, allow_null=True)
|
||||
mbid = MBIDField()
|
||||
release_date = PermissiveDateField(
|
||||
required=False, allow_null=True, allow_blank=True
|
||||
)
|
||||
|
||||
def validate_title(self, v):
|
||||
if self.context.get("strict", True) and not v:
|
||||
raise serializers.ValidationError("This field is required.")
|
||||
return v
|
||||
|
||||
|
||||
def get_valid_position(v):
|
||||
if v <= 0:
|
||||
v = 1
|
||||
return v
|
||||
|
||||
|
||||
class PositionField(serializers.CharField):
|
||||
def to_internal_value(self, v):
|
||||
v = super().to_internal_value(v)
|
||||
if not v:
|
||||
return v
|
||||
|
||||
try:
|
||||
return get_valid_position(int(v))
|
||||
except ValueError:
|
||||
# maybe the position is of the form "1/4"
|
||||
pass
|
||||
|
||||
try:
|
||||
return get_valid_position(int(v.split("/")[0]))
|
||||
except (ValueError, AttributeError, IndexError):
|
||||
return
|
||||
|
||||
|
||||
class DescriptionField(serializers.CharField):
|
||||
def get_value(self, data):
|
||||
return data
|
||||
|
||||
def to_internal_value(self, data):
|
||||
try:
|
||||
value = data.get("comment") or None
|
||||
except TagNotFound:
|
||||
return None
|
||||
if not value:
|
||||
return None
|
||||
value = super().to_internal_value(value)
|
||||
return {"text": value, "content_type": "text/plain"}
|
||||
|
||||
|
||||
class TrackMetadataSerializer(serializers.Serializer):
|
||||
title = serializers.CharField(required=False, allow_null=True)
|
||||
position = PositionField(allow_blank=True, allow_null=True, required=False)
|
||||
disc_number = PositionField(allow_blank=True, allow_null=True, required=False)
|
||||
copyright = serializers.CharField(allow_blank=True, allow_null=True, required=False)
|
||||
license = serializers.CharField(allow_blank=True, allow_null=True, required=False)
|
||||
mbid = MBIDField()
|
||||
tags = TagsField(allow_blank=True, allow_null=True, required=False)
|
||||
description = DescriptionField(allow_null=True, allow_blank=True, required=False)
|
||||
|
||||
album = AlbumField()
|
||||
artists = ArtistField()
|
||||
cover_data = CoverDataField(required=False)
|
||||
|
||||
remove_blank_null_fields = [
|
||||
"copyright",
|
||||
"description",
|
||||
"license",
|
||||
"position",
|
||||
"disc_number",
|
||||
"mbid",
|
||||
"tags",
|
||||
]
|
||||
|
||||
def validate_title(self, v):
|
||||
if self.context.get("strict", True) and not v:
|
||||
raise serializers.ValidationError("This field is required.")
|
||||
return v
|
||||
|
||||
def validate(self, validated_data):
|
||||
validated_data = super().validate(validated_data)
|
||||
for field in self.remove_blank_null_fields:
|
||||
try:
|
||||
v = validated_data[field]
|
||||
except KeyError:
|
||||
continue
|
||||
if v in ["", None, []]:
|
||||
validated_data.pop(field)
|
||||
validated_data["album"]["cover_data"] = validated_data.pop("cover_data", None)
|
||||
return validated_data
|
||||
|
||||
|
||||
class FakeMetadata(Mapping):
|
||||
def __init__(self, data, picture=None):
|
||||
self.data = data
|
||||
self.picture = None
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.data[key]
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
|
||||
def __iter__(self):
|
||||
yield from self.data
|
||||
|
||||
def get_picture(self, *args):
|
||||
return self.picture
|
||||
Loading…
Add table
Add a link
Reference in a new issue