"""Core of the Angel XMPP bot: link previews, file embeds and regex commands."""
import asyncio
import cgi
import codecs
import io
import ipaddress
import os
import random
import re
from collections import defaultdict
from urllib.parse import parse_qs, unquote, urlparse, urlunparse

import bs4
import requests
import yt_dlp as youtube_dl
from pantomime import normalize_mimetype
from slixmpp import ClientXMPP

parser = "html.parser"
# Parenthesised so both halves form ONE string; previously the second half
# was a stand-alone expression statement and never reached the header.
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
data_limit = 100000000  # 100MB

headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}

youtube_link = "youtu.be"

ydl = youtube_dl.YoutubeDL()

req_list = ("http://", "https://")

html_files = ("text/html", "application/xhtml+xml")

# Marker that ends the part of an HTML document needed for a preview.
_title_end = "</title>"


class Lifo(list):
    """Bounded, newest-first list used to store messages and urls.

    New items are inserted at index 0; once more than ``size`` items are
    held, the item at the end (the oldest one) is discarded.
    """

    def __init__(self, size):
        """Create an empty buffer that keeps at most ``size`` items."""
        super().__init__()
        self.size = size

    def add(self, item):
        """Insert ``item`` at the front and evict the oldest item if full."""
        self.insert(0, item)
        if len(self) > self.size:
            self.pop()


def get_youtube_title(url):
    """Return the title yt-dlp reports for ``url``, or None on any failure."""
    try:
        info = ydl.extract_info(url, download=False)
        return info["title"]
    except Exception as e:
        # Best effort: a failed lookup only means that no preview is sent.
        print(e)
        return None


def get_yurl(path):
    """Build a ``https://youtu.be/...`` link from a video id or path."""
    return f"https://youtu.be/{path}"


# decorator to define a regex command
class RegexCmd:
    """Decorator that registers a function as a regex-triggered command.

    The decorated name is bound to the ``RegexCmd`` instance itself; the
    wrapped function stays reachable through ``.func``.
    """

    def __init__(self, bot, pattern):
        """Remember the target bot and the compiled ``pattern``."""
        self.pattern = pattern
        self.bot = bot

    def __call__(self, func):
        """Attach ``func`` and append this command to ``bot.regex_cmds``."""
        self.bot.regex_cmds.append(self)
        self.func = func
        return self


class AngelBot(ClientXMPP):
    """XMPP client that previews links, re-uploads files and runs commands."""

    # Per-conversation history, keyed by the sender's bare JID.
    # Class-level, therefore shared by every AngelBot instance.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    # Registered RegexCmd objects (class-level, shared as well).
    regex_cmds = []

    def get_urls(self, msg):
        """Return the whitespace-separated tokens of the body that contain a URL scheme."""
        tokens = msg["body"].strip().split()
        return [tok for tok in tokens if any(scheme in tok for scheme in req_list)]

    def get_invidious_link(self, yurl):
        """Turn a youtu.be link into a watch link on a random invidious instance."""
        video = yurl.split("/")[-1]
        instance = random.choice(self.invidious_instances)
        return f"https://{instance}/watch?v={video}"

    def send_youtube_info(self, uri, sender, mtype):
        """Send the video title and an invidious link for a YouTube ``uri``.

        Nothing is sent when no video id can be extracted, when the title
        lookup fails, or when the same title was previewed recently.
        """
        if uri.netloc == youtube_link:
            video = uri.path.strip("/")
        else:
            video = (parse_qs(uri.query).get("v") or [""])[0]
        if not video:
            # Guard: without an id there is nothing to build a link from.
            return

        invidious = self.get_invidious_link(get_yurl(video))

        output = get_youtube_title(invidious)
        if not output:
            return
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)

        self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
        self.send_message(mto=sender, mbody=invidious, mtype=mtype)

    async def parse_uri(self, uri, sender, mtype):
        """Dispatch ``uri`` to the YouTube handler or the generic link handler.

        URLs whose host is a private, loopback or link-local IP literal are
        ignored. Host NAMES that resolve to such addresses are not detected.
        """
        netloc = uri.netloc
        if self.invidious_instances and netloc in (self.youtube_links + [youtube_link]):
            self.send_youtube_info(uri, sender, mtype)
            return
        try:
            # ``hostname`` strips userinfo, port and IPv6 brackets, which a
            # plain ``netloc.split(":")`` does not.
            address = ipaddress.ip_address(uri.hostname or "")
            if address.is_private or address.is_loopback or address.is_link_local:
                return
        except ValueError:
            # Not an IP literal (ordinary host name): carry on.
            pass

        await self.process_link(uri, sender, mtype)

    @staticmethod
    def _read_title(response):
        """Return the stripped ``<title>`` text of a streamed HTML response ("" if none)."""
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        pieces = []
        received = 0
        carry = ""
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=False):
            text = decoder.decode(chunk)
            pieces.append(text)
            received += len(text)
            # ``carry`` holds the tail of the previous chunk so that a closing
            # tag straddling a chunk boundary is still detected.
            window = (carry + text).lower()
            if received > data_limit or _title_end in window:
                break
            carry = window[-(len(_title_end) - 1):]
        soup = bs4.BeautifulSoup("".join(pieces), parser)
        title = soup.find("title")
        return title.text.strip() if title else ""

    @staticmethod
    def _download(response):
        """Read the response body into a BytesIO; return None once ``data_limit`` is reached."""
        buffer = io.BytesIO()
        received = 0
        for chunk in response.iter_content(chunk_size=512, decode_unicode=False):
            received += len(chunk)  # real size: chunks may be shorter than 512
            if received >= data_limit:
                return None
            buffer.write(chunk)
        return buffer

    @staticmethod
    def _pick_filename(response, uri):
        """Choose an upload file name from Content-Disposition or the URL path."""
        filename = None
        content_disposition = response.headers.get("content-disposition")
        if content_disposition:
            # NOTE(review): ``cgi`` is deprecated (PEP 594) and removed in
            # Python 3.13; this call needs replacing before upgrading.
            _, params = cgi.parse_header(content_disposition)
            filename = params.get("filename")
            if extended := params.get("filename*"):
                # RFC 5987 form: charset'language'percent-encoded-value
                filename = unquote(extended.split("'")[-1])
        if not filename:
            filename = os.path.basename(uri.path)
        # The name comes from the remote side: keep the last component only.
        filename = os.path.basename(filename) if filename else ""
        return filename or "file.txt"

    async def process_link(self, uri, sender, mtype):
        """Fetch ``uri`` and send a title preview (HTML) or re-upload the file.

        NOTE: ``requests`` is synchronous, so the event loop is blocked for
        the duration of the download.
        """
        url = urlunparse(uri)
        title = ""
        outfile = None
        fname = None
        # ``with`` guarantees the streamed connection is released on every path.
        with requests.get(url, stream=True, headers=headers, timeout=6) as r:
            if not r.ok:
                return

            ftype = normalize_mimetype(r.headers.get("content-type"))
            if not ftype:
                return

            is_html = ftype in html_files
            if is_html:
                title = self._read_title(r)
            else:
                try:
                    outfile = self._download(r)
                    fname = self._pick_filename(r, uri)
                except Exception as e:
                    print(e)
                    return

        if is_html:
            if not title:
                return
            # Single-line titles are emphasised; multi-line ones are sent raw.
            output = f"*{title}*" if ("\n" not in title) else title
            previews = self.messages[sender]["previews"]
            if output in previews:
                return
            previews.add(output)
            if r.history:
                # The request was redirected: show where it ended up.
                self.send_message(mto=sender, mbody=r.url, mtype=mtype)
            self.send_message(mto=sender, mbody=output, mtype=mtype)
            return

        if outfile is None:
            # Body exceeded ``data_limit``.
            return
        try:
            await self.embed_file(url, sender, mtype, ftype, fname, outfile)
        except Exception as e:
            print(e)

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Upload ``outfile`` via XEP-0363 and send the resulting URL as an OOB message.

        ``url`` is accepted for interface compatibility and is not used.
        """
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        # Remember our own upload so it is not processed again as a new link.
        self.messages[sender]["links"].add(furl)
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Process every not-yet-seen URL unless the message is tagged nsfw/nsfl."""
        body = msg["body"].lower()
        if "nsfw" in body or "nsfl" in body:
            return
        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            await self.parse_uri(urlparse(u), sender, mtype)

    def __init__(self, jid, password, nick="angel", autojoin=None,
                 youtube_links=None,
                 invidious_instances=None):
        """Configure the client, load the XEP plugins and register handlers."""
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        self.nick = nick
        self.autojoin = autojoin or []
        self.invidious_instances = invidious_instances or []
        self.youtube_links = youtube_links or []

        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # self.add_event_handler("vcard_avatar_update", self.debug_event)
        # self.add_event_handler("stream_error", self.debug_event)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    async def session_start(self, event):
        """Announce presence, fetch the roster, publish profile data, join rooms."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                # One failing room must not prevent joining the others.
                print(e)

    async def update_info(self):
        """Publish the avatar (XEP-0084 and XEP-0153) and the vCard.

        The publish calls are handed to ``asyncio.gather`` without being
        awaited: they are scheduled and run in the background, and their
        outcome is not checked here.
        """
        with open("angel.png", "rb") as avatar_file:
            avatar = avatar_file.read()

        avatar_type = "image/png"
        avatar_id = self.plugin["xep_0084"].generate_id(avatar)
        avatar_bytes = len(avatar)

        asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))

        asyncio.gather(
            self.plugin["xep_0153"].set_avatar(
                avatar=avatar,
                mtype=avatar_type,
            )
        )

        info = {
            "id": avatar_id,
            "type": avatar_type,
            "bytes": avatar_bytes,
        }

        asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))

        vcard = self.plugin["xep_0054"].make_vcard()

        vcard["URL"] = "https://wiki.kalli.st/Angel"
        vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
        vcard["NICKNAME"] = "Angel"
        vcard["FN"] = "Angel"

        asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))

    async def _handle_incoming(self, msg, accepted_types, ignore_own=False):
        """Shared pipeline for direct and groupchat messages.

        Skips unwanted types, message corrections and (optionally) the bot's
        own groupchat messages, then previews links and runs commands.
        """
        if msg["type"] not in accepted_types:
            return
        if "urn:xmpp:message-correct:0" in str(msg):
            # Edited messages are ignored.
            return
        if ignore_own and msg["mucnick"] == self.nick:
            return

        mtype = msg["type"]
        sender = msg["from"].bare

        try:
            # Messages that already carry an OOB url are not previewed.
            if not msg["oob"]["url"]:
                if urls := self.get_urls(msg):
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            # A failing preview must not stop command processing.
            print(e)

        self.process_commands(msg, sender, mtype)

    async def message(self, msg):
        """Process a direct ("chat" or "normal") message."""
        await self._handle_incoming(msg, ("chat", "normal"))

    async def muc_message(self, msg):
        """Process a groupchat message, ignoring the bot's own messages."""
        await self._handle_incoming(msg, ("groupchat", "normal"), ignore_own=True)

    def process_commands(self, msg, sender, mtype):
        """Run the first command whose pattern matches; otherwise record the body."""
        body = msg["body"]
        for cmd in self.regex_cmds:
            if cmd.pattern.match(body):
                return cmd.func(self, msg, sender, mtype)
        self.messages[sender]["messages"].add(body)
@RegexCmd(bot, sed_cmd)
def sed_command(bot, msg, sender, mtype):
    """Apply a sed-style command to the newest history entry it matches.

    ``bot`` is the client instance handed in by the command dispatcher,
    ``msg`` the incoming stanza, ``sender`` the bare JID used as history
    key and ``mtype`` the message type used for the reply. The corrected
    text is stored in the history and sent back. Any error is printed and
    otherwise ignored, so a malformed expression never breaks the bot.
    """
    try:
        text = msg["body"]
        if not sed_cmd.match(text):
            # Not a sed expression after all: treat it as an ordinary message.
            bot.messages[sender]["messages"].add(text)
            return
        sed_args = sed_parse.split(text)

        if len(sed_args) < 4:
            return

        sed = Sed()
        sed.load_string(text)

        # Compile the search part once instead of once per history entry.
        needle = re.compile(sed_args[1])
        history = bot.messages[sender]["messages"]

        for message in history:
            if not needle.search(message):
                continue
            # Separate name for the stream: the ``msg`` parameter is no
            # longer shadowed.
            source = io.StringIO(message)
            res = "\n".join(sed.apply(source, None))
            history.add(res)
            return bot.send_message(
                mto=sender,
                mbody=res,
                mtype=mtype,
            )

    except Exception as e:
        print(e)


# ping command
@RegexCmd(bot, re.compile(r"^ping$"))
def ping_command(bot, msg, sender, mtype):
    """Reply "pong" to a message that consists of the single word "ping"."""
    bot.send_message(
        mto=sender,
        mbody="pong",
        mtype=mtype,
    )


bot.connect()
bot.process(forever=True)