From 9801202e3981a79a13972250331bda65f5bd6c50 Mon Sep 17 00:00:00 2001 From: Czar Date: Thu, 14 Dec 2023 21:53:51 +0100 Subject: [PATCH] Update invidious instance Closes #8 --- main.py | 736 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 368 insertions(+), 368 deletions(-) diff --git a/main.py b/main.py index 25beda8..319dba5 100644 --- a/main.py +++ b/main.py @@ -1,368 +1,368 @@ -import requests -import bs4 -import yt_dlp as youtube_dl -import random -import configparser -import re -import io -import os -import asyncio -from collections import defaultdict -from PythonSed import Sed -from slixmpp import ClientXMPP -from urllib.parse import urlparse, parse_qs, urlunparse -from pantomime import normalize_mimetype -import cgi - -sed_parse = re.compile("(? self.size: - self.pop() - - -def get_youtube_title(url): - """Get the title of a youtube video.""" - try: - info = ydl.extract_info(url, download=False) - return info["title"] - except Exception as e: - print(e) - return None - - -def get_invidious_link(yurl): - """Get an invidious link from a youtube link.""" - video = yurl.split("/")[-1] - instance = random.choice(invidious_instances) - return f"https://{instance}/watch?v={video}" - - -def get_yurl(path): - """Get a youtube link from a path.""" - yurl = f"https://youtu.be/{path}" - return yurl - - -class AngelBot(ClientXMPP): - """AngelBot class.""" - - messages = defaultdict( - lambda: { - "messages": Lifo(100), - "links": Lifo(10), - "previews": Lifo(10), - } - ) - - def get_urls(self, msg): - """Get urls from a message.""" - str_list = msg["body"].strip().split() - urls = [u for u in str_list if any(r in u for r in req_list)] - return urls - - def send_youtube_info(self, uri, sender, mtype): - """Send youtube info to the sender.""" - yurl = None - if uri.netloc == youtube_link: - yurl = get_yurl(uri.path) - elif "v" in (query := parse_qs(uri.query)): - if v := query["v"]: - yurl = get_yurl(v[0]) - else: - return - - invidious = get_invidious_link(yurl) - - if output := get_youtube_title(invidious): - if output in self.messages[sender]["previews"]: - return - self.messages[sender]["previews"].add(output) - - self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype) - self.send_message(mto=sender, mbody=invidious, mtype=mtype) - - async def parse_uri(self, uri, sender, mtype): - """Parse a uri and send the result to the sender.""" - netloc = uri.netloc - if netloc in (youtube_links + [youtube_link]): - self.send_youtube_info(uri, sender, mtype) - elif netloc.split(":")[0] in block_list: - return - else: - await self.process_link(uri, sender, mtype) - - async def process_link(self, uri, sender, mtype): - """Process a link and send the result to the sender.""" - url = urlunparse(uri) - r = requests.get(url, stream=True, headers=headers, timeout=6) - if not r.ok: - return - - ftype = normalize_mimetype(r.headers.get("content-type")) - - if not ftype: - return - - if ftype in html_files: - data = "" - for i in r.iter_content(chunk_size=1024, decode_unicode=False): - data += i.decode("utf-8", errors="ignore") - if len(data) > data_limit or "" in data.lower(): - break - soup = bs4.BeautifulSoup(data, parser) - if title := soup.find("title"): - output = title.text.strip() - if output: - output = f"*{output}*" if ("\n" not in output) else output - if output in self.messages[sender]["previews"]: - return - - self.messages[sender]["previews"].add(output) - if r.history: - self.send_message(mto=sender, mbody=r.url, mtype=mtype) - self.send_message(mto=sender, mbody=output, mtype=mtype) - - else: - try: - lenght = 0 - outfile = io.BytesIO() - for chunk in r.iter_content( - chunk_size=512, - decode_unicode=False, - ): - lenght += 512 - if lenght >= data_limit: - return - outfile.write(chunk) - - content_disposition = r.headers.get("content-disposition") - filename = None - if content_disposition: - _, params = cgi.parse_header(content_disposition) - filename = params.get("filename") - else: - filename = os.path.basename(uri.path) - - ext = os.path.splitext(filename)[1] if filename else ".txt" - fname = filename if filename else f"file{ext}" - await self.embed_file(url, sender, mtype, ftype, fname, outfile) - except Exception as e: - print(e) - - async def embed_file(self, url, sender, mtype, ftype, fname, outfile): - """Embed a file and send the result to the sender.""" - furl = await self.plugin["xep_0363"].upload_file( - fname, content_type=ftype, input_file=outfile - ) - message = self.make_message(sender) - message["body"] = furl - message["type"] = mtype - message["oob"]["url"] = furl - message.send() - - async def parse_urls(self, msg, urls, sender, mtype): - """Parse urls and send the result to the sender.""" - body = msg["body"].lower() - if "nsfw" in body: return - if "nsfl" in body: return - for u in urls: - if u in self.messages[sender]["links"]: - continue - else: - self.messages[sender]["links"].add(u) - - uri = urlparse(u) - await self.parse_uri(uri, sender, mtype) - - def sed_command(self, msg, sender, mtype): - """Process sed command.""" - try: - text = msg["body"] - if not sed_cmd.match(text): - self.messages[sender]["messages"].add(text) - return - sed_args = sed_parse.split(text) - - if len(sed_args) < 4: - return - - sed = Sed() - sed.load_string(text) - - for message in self.messages[sender]["messages"]: - if sed_args[1] not in message: - continue - msg = io.StringIO(message) - res = "\n".join(sed.apply(msg, None)) - self.messages[sender]["messages"].add(res) - return self.send_message( - mto=sender, - mbody=res, - mtype=mtype, - ) - - except Exception as e: - print(e) - - def __init__(self, jid, password, nick="angel", autojoin=None): - """Initialize the bot.""" - ClientXMPP.__init__(self, jid, password) - self.jid = jid - self.nick = nick - self.autojoin = autojoin or [] - self.register_plugin("xep_0030") - self.register_plugin("xep_0060") - self.register_plugin("xep_0054") - self.register_plugin("xep_0045") - self.register_plugin("xep_0066") - self.register_plugin("xep_0084") - self.register_plugin("xep_0153") - self.register_plugin("xep_0363") - - self.add_event_handler("session_start", self.session_start) - self.add_event_handler("message", self.message) - self.add_event_handler("groupchat_message", self.muc_message) - # self.add_event_handler("vcard_avatar_update", self.debug_event) - # self.add_event_handler("stream_error", self.debug_event) - self.add_event_handler("disconnected", lambda _: self.connect()) - - async def session_start(self, event): - """Start the bot.""" - self.send_presence() - await self.get_roster() - await self.update_info() - for channel in self.autojoin: - try: - self.plugin["xep_0045"].join_muc(channel, self.nick) - except Exception as e: - print(e) - - async def update_info(self): - """Update the bot info.""" - with open("angel.png", "rb") as avatar_file: - avatar = avatar_file.read() - - avatar_type = "image/png" - avatar_id = self.plugin["xep_0084"].generate_id(avatar) - avatar_bytes = len(avatar) - - asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar)) - - asyncio.gather( - self.plugin["xep_0153"].set_avatar( - avatar=avatar, - mtype=avatar_type, - ) - ) - - info = { - "id": avatar_id, - "type": avatar_type, - "bytes": avatar_bytes, - } - - asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info])) - - vcard = self.plugin["xep_0054"].make_vcard() - - vcard["URL"] = "https://wiki.kalli.st/Angel" - vcard["DESC"] = "Angel is a bot that can do link previews and embeds." - vcard["NICKNAME"] = "Angel" - vcard["FN"] = "Angel" - - asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard)) - - async def message(self, msg): - """Process a message.""" - if msg["type"] in ("chat", "normal"): - edit = "urn:xmpp:message-correct:0" in str(msg) - if edit: - return - - mtype = msg["type"] - sender = msg["from"].bare - - try: - if not msg["oob"]["url"]: - if urls := self.get_urls(msg): - await self.parse_urls(msg, urls, sender, mtype) - except Exception as e: - print(e) - - self.sed_command(msg, sender, mtype) - - async def muc_message(self, msg): - """Process a groupchat message.""" - if msg["type"] in ("groupchat", "normal"): - edit = "urn:xmpp:message-correct:0" in str(msg) - - if edit: - return - - if msg["mucnick"] == self.nick: - return - - mtype = msg["type"] - sender = msg["from"].bare - - - try: - if not msg["oob"]["url"]: - if urls := self.get_urls(msg): - await self.parse_urls(msg, urls, sender, mtype) - except Exception as e: - print(e) - - self.sed_command(msg, sender, mtype) - - -if __name__ == "__main__": - config = configparser.ConfigParser() - config.read("config.ini") - jid = config["angel"]["jid"] - password = config["angel"]["password"] - autojoin = config["angel"]["autojoin"].split() - nick = config["angel"]["nick"] - bot = AngelBot(jid, password, nick=nick, autojoin=autojoin) - - bot.connect() - bot.process(forever=True) +import requests +import bs4 +import yt_dlp as youtube_dl +import random +import configparser +import re +import io +import os +import asyncio +from collections import defaultdict +from PythonSed import Sed +from slixmpp import ClientXMPP +from urllib.parse import urlparse, parse_qs, urlunparse +from pantomime import normalize_mimetype +import cgi + +sed_parse = re.compile("(? self.size: + self.pop() + + +def get_youtube_title(url): + """Get the title of a youtube video.""" + try: + info = ydl.extract_info(url, download=False) + return info["title"] + except Exception as e: + print(e) + return None + + +def get_invidious_link(yurl): + """Get an invidious link from a youtube link.""" + video = yurl.split("/")[-1] + instance = random.choice(invidious_instances) + return f"https://{instance}/watch?v={video}" + + +def get_yurl(path): + """Get a youtube link from a path.""" + yurl = f"https://youtu.be/{path}" + return yurl + + +class AngelBot(ClientXMPP): + """AngelBot class.""" + + messages = defaultdict( + lambda: { + "messages": Lifo(100), + "links": Lifo(10), + "previews": Lifo(10), + } + ) + + def get_urls(self, msg): + """Get urls from a message.""" + str_list = msg["body"].strip().split() + urls = [u for u in str_list if any(r in u for r in req_list)] + return urls + + def send_youtube_info(self, uri, sender, mtype): + """Send youtube info to the sender.""" + yurl = None + if uri.netloc == youtube_link: + yurl = get_yurl(uri.path) + elif "v" in (query := parse_qs(uri.query)): + if v := query["v"]: + yurl = get_yurl(v[0]) + else: + return + + invidious = get_invidious_link(yurl) + + if output := get_youtube_title(invidious): + if output in self.messages[sender]["previews"]: + return + self.messages[sender]["previews"].add(output) + + self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype) + self.send_message(mto=sender, mbody=invidious, mtype=mtype) + + async def parse_uri(self, uri, sender, mtype): + """Parse a uri and send the result to the sender.""" + netloc = uri.netloc + if netloc in (youtube_links + [youtube_link]): + self.send_youtube_info(uri, sender, mtype) + elif netloc.split(":")[0] in block_list: + return + else: + await self.process_link(uri, sender, mtype) + + async def process_link(self, uri, sender, mtype): + """Process a link and send the result to the sender.""" + url = urlunparse(uri) + r = requests.get(url, stream=True, headers=headers, timeout=6) + if not r.ok: + return + + ftype = normalize_mimetype(r.headers.get("content-type")) + + if not ftype: + return + + if ftype in html_files: + data = "" + for i in r.iter_content(chunk_size=1024, decode_unicode=False): + data += i.decode("utf-8", errors="ignore") + if len(data) > data_limit or "" in data.lower(): + break + soup = bs4.BeautifulSoup(data, parser) + if title := soup.find("title"): + output = title.text.strip() + if output: + output = f"*{output}*" if ("\n" not in output) else output + if output in self.messages[sender]["previews"]: + return + + self.messages[sender]["previews"].add(output) + if r.history: + self.send_message(mto=sender, mbody=r.url, mtype=mtype) + self.send_message(mto=sender, mbody=output, mtype=mtype) + + else: + try: + lenght = 0 + outfile = io.BytesIO() + for chunk in r.iter_content( + chunk_size=512, + decode_unicode=False, + ): + lenght += 512 + if lenght >= data_limit: + return + outfile.write(chunk) + + content_disposition = r.headers.get("content-disposition") + filename = None + if content_disposition: + _, params = cgi.parse_header(content_disposition) + filename = params.get("filename") + else: + filename = os.path.basename(uri.path) + + ext = os.path.splitext(filename)[1] if filename else ".txt" + fname = filename if filename else f"file{ext}" + await self.embed_file(url, sender, mtype, ftype, fname, outfile) + except Exception as e: + print(e) + + async def embed_file(self, url, sender, mtype, ftype, fname, outfile): + """Embed a file and send the result to the sender.""" + furl = await self.plugin["xep_0363"].upload_file( + fname, content_type=ftype, input_file=outfile + ) + message = self.make_message(sender) + message["body"] = furl + message["type"] = mtype + message["oob"]["url"] = furl + message.send() + + async def parse_urls(self, msg, urls, sender, mtype): + """Parse urls and send the result to the sender.""" + body = msg["body"].lower() + if "nsfw" in body: return + if "nsfl" in body: return + for u in urls: + if u in self.messages[sender]["links"]: + continue + else: + self.messages[sender]["links"].add(u) + + uri = urlparse(u) + await self.parse_uri(uri, sender, mtype) + + def sed_command(self, msg, sender, mtype): + """Process sed command.""" + try: + text = msg["body"] + if not sed_cmd.match(text): + self.messages[sender]["messages"].add(text) + return + sed_args = sed_parse.split(text) + + if len(sed_args) < 4: + return + + sed = Sed() + sed.load_string(text) + + for message in self.messages[sender]["messages"]: + if sed_args[1] not in message: + continue + msg = io.StringIO(message) + res = "\n".join(sed.apply(msg, None)) + self.messages[sender]["messages"].add(res) + return self.send_message( + mto=sender, + mbody=res, + mtype=mtype, + ) + + except Exception as e: + print(e) + + def __init__(self, jid, password, nick="angel", autojoin=None): + """Initialize the bot.""" + ClientXMPP.__init__(self, jid, password) + self.jid = jid + self.nick = nick + self.autojoin = autojoin or [] + self.register_plugin("xep_0030") + self.register_plugin("xep_0060") + self.register_plugin("xep_0054") + self.register_plugin("xep_0045") + self.register_plugin("xep_0066") + self.register_plugin("xep_0084") + self.register_plugin("xep_0153") + self.register_plugin("xep_0363") + + self.add_event_handler("session_start", self.session_start) + self.add_event_handler("message", self.message) + self.add_event_handler("groupchat_message", self.muc_message) + # self.add_event_handler("vcard_avatar_update", self.debug_event) + # self.add_event_handler("stream_error", self.debug_event) + self.add_event_handler("disconnected", lambda _: self.connect()) + + async def session_start(self, event): + """Start the bot.""" + self.send_presence() + await self.get_roster() + await self.update_info() + for channel in self.autojoin: + try: + self.plugin["xep_0045"].join_muc(channel, self.nick) + except Exception as e: + print(e) + + async def update_info(self): + """Update the bot info.""" + with open("angel.png", "rb") as avatar_file: + avatar = avatar_file.read() + + avatar_type = "image/png" + avatar_id = self.plugin["xep_0084"].generate_id(avatar) + avatar_bytes = len(avatar) + + asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar)) + + asyncio.gather( + self.plugin["xep_0153"].set_avatar( + avatar=avatar, + mtype=avatar_type, + ) + ) + + info = { + "id": avatar_id, + "type": avatar_type, + "bytes": avatar_bytes, + } + + asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info])) + + vcard = self.plugin["xep_0054"].make_vcard() + + vcard["URL"] = "https://wiki.kalli.st/Angel" + vcard["DESC"] = "Angel is a bot that can do link previews and embeds." + vcard["NICKNAME"] = "Angel" + vcard["FN"] = "Angel" + + asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard)) + + async def message(self, msg): + """Process a message.""" + if msg["type"] in ("chat", "normal"): + edit = "urn:xmpp:message-correct:0" in str(msg) + if edit: + return + + mtype = msg["type"] + sender = msg["from"].bare + + try: + if not msg["oob"]["url"]: + if urls := self.get_urls(msg): + await self.parse_urls(msg, urls, sender, mtype) + except Exception as e: + print(e) + + self.sed_command(msg, sender, mtype) + + async def muc_message(self, msg): + """Process a groupchat message.""" + if msg["type"] in ("groupchat", "normal"): + edit = "urn:xmpp:message-correct:0" in str(msg) + + if edit: + return + + if msg["mucnick"] == self.nick: + return + + mtype = msg["type"] + sender = msg["from"].bare + + + try: + if not msg["oob"]["url"]: + if urls := self.get_urls(msg): + await self.parse_urls(msg, urls, sender, mtype) + except Exception as e: + print(e) + + self.sed_command(msg, sender, mtype) + + +if __name__ == "__main__": + config = configparser.ConfigParser() + config.read("config.ini") + jid = config["angel"]["jid"] + password = config["angel"]["password"] + autojoin = config["angel"]["autojoin"].split() + nick = config["angel"]["nick"] + bot = AngelBot(jid, password, nick=nick, autojoin=autojoin) + + bot.connect() + bot.process(forever=True)