"""Angel: an XMPP bot that posts link previews, embeds files and applies sed fixes."""

import asyncio
import cgi  # NOTE(review): removed from the stdlib in Python 3.13.
import configparser
import io
import mimetypes
import os
import random
import re
from collections import defaultdict
from urllib.parse import urlparse, parse_qs, urlunparse

import bs4
import requests
import youtube_dl
from pantomime import normalize_mimetype
from PythonSed import Sed
from slixmpp import ClientXMPP

# ---------------------------------------------------------------------------
# NOTE(review): everything between the start of the `sed_parse` pattern and
# the tail of `Lifo.add` was lost from the text this file was recovered from
# (only `re.compile("(?` and `self.size: self.pop()` survived).  The
# definitions in this section are reconstructed purely from how the rest of
# the module uses each name.  Every value here is an assumption/placeholder:
# restore the originals from version control before deploying.
# ---------------------------------------------------------------------------

# Splits "s/old/new/" on delimiters that are not backslash-escaped.
sed_parse = re.compile(r"(?<!\\)/")
# Recognises a message that looks like a sed substitution command.
sed_cmd = re.compile(r"^s/.*/.*/")

ydl = youtube_dl.YoutubeDL()

youtube_link = "youtu.be"
youtube_links = ["youtube.com", "www.youtube.com", "m.youtube.com"]
invidious_instances = ["yewtu.be"]

# A whitespace-separated token counts as a URL if it contains one of these.
req_list = ("http://", "https://")
# Hosts (port stripped) that are never fetched.
block_list = ("localhost", "127.0.0.1", "0.0.0.0")

html_files = ("text/html", "application/xhtml+xml")
# Upper bound, in bytes, on how much of a response body is read.
data_limit = 10 * 1024 * 1024
parser = "html.parser"
headers = {
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0",
    "Accept-Language": "en-US",
}


class Lifo(list):
    """Bounded list that keeps only the most recent ``size`` items.

    NOTE(review): the class header was lost; only the overflow check at the
    end of ``add`` survived.  Newest-first ordering is assumed.
    """

    def __init__(self, size):
        super().__init__()
        self.size = size

    def add(self, item):
        """Insert ``item`` at the front, dropping the oldest item on overflow."""
        self.insert(0, item)
        if len(self) > self.size:
            self.pop()


# ---------------------------------------------------------------------------
# End of reconstructed section.
# ---------------------------------------------------------------------------


def get_youtube_title(url):
    """Get the title of a youtube video.

    Returns ``None`` (after printing the error) when extraction fails.
    """
    try:
        info = ydl.extract_info(url, download=False)
        return info["title"]
    except Exception as e:
        print(e)
        return None


def get_invidious_link(yurl):
    """Get an invidious link from a youtube link.

    The video id is taken to be the last ``/``-separated component of
    ``yurl``; the instance is picked at random from ``invidious_instances``.
    """
    video = yurl.split("/")[-1]
    instance = random.choice(invidious_instances)
    return f"https://{instance}/watch?v={video}"


def get_yurl(path):
    """Get a youtube link from a path.

    ``path`` may be a bare video id or a URL path such as ``/<id>``; leading
    slashes are stripped so the result never contains ``youtu.be//``.
    """
    return f"https://youtu.be/{path.lstrip('/')}"


class AngelBot(ClientXMPP):
    """AngelBot class."""

    # Per-conversation state, keyed by the bare JID of the sender/room:
    # recent message bodies (for sed), recently seen links and recently
    # sent previews (both used to avoid repeating ourselves).
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    def get_urls(self, msg):
        """Get urls from a message.

        Returns every whitespace-separated token of the body that contains
        one of the markers in ``req_list``.
        """
        str_list = msg["body"].strip().split()
        return [u for u in str_list if any(r in u for r in req_list)]

    def send_youtube_info(self, uri, sender, mtype):
        """Send youtube info to the sender.

        Sends the video title followed by an invidious link, unless the
        title could not be fetched or was already previewed for ``sender``.
        """
        if uri.netloc == youtube_link:
            yurl = get_yurl(uri.path)
        else:
            video_ids = parse_qs(uri.query).get("v")
            if not video_ids:
                # No video id to look up; nothing to preview.
                return
            yurl = get_yurl(video_ids[0])

        output = get_youtube_title(yurl)
        if not output:
            return
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        invidious = get_invidious_link(yurl)
        self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
        self.send_message(mto=sender, mbody=invidious, mtype=mtype)

    async def parse_uri(self, uri, sender, mtype):
        """Parse a uri and send the result to the sender."""
        netloc = uri.netloc
        if netloc in (youtube_links + [youtube_link]):
            self.send_youtube_info(uri, sender, mtype)
        elif netloc.split(":")[0] in block_list:
            return
        else:
            await self.process_link(uri, sender, mtype)

    async def process_link(self, uri, sender, mtype):
        """Process a link and send the result to the sender.

        HTML pages get a title preview; any other content type is downloaded
        (up to ``data_limit`` bytes) and re-uploaded as an embedded file.
        """
        url = urlunparse(uri)
        # NOTE(review): requests is blocking and runs on the event loop here.
        # The `with` guarantees the streamed connection is released even when
        # we stop reading early.
        with requests.get(url, stream=True, headers=headers, timeout=6) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get("content-type"))
            if not ftype:
                return
            if ftype in html_files:
                self._send_html_preview(r, sender, mtype)
            else:
                await self._embed_response(r, uri, url, sender, mtype, ftype)

    def _send_html_preview(self, r, sender, mtype):
        """Read just enough of an HTML response to send its <title>."""
        raw = bytearray()
        for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
            raw += chunk
            # Stop as soon as the title has been fully received.
            if len(raw) > data_limit or b"</title>" in raw.lower():
                break
        # Decode once so multi-byte characters split across chunks survive.
        soup = bs4.BeautifulSoup(raw.decode("utf-8", errors="ignore"), parser)
        title = soup.find("title")
        if not title:
            return
        output = title.text.strip()
        if not output:
            return
        if "\n" not in output:
            output = f"*{output}*"
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        if r.history:
            # The request was redirected: show where it ended up.
            self.send_message(mto=sender, mbody=r.url, mtype=mtype)
        self.send_message(mto=sender, mbody=output, mtype=mtype)

    async def _embed_response(self, r, uri, url, sender, mtype, ftype):
        """Download a non-HTML response and re-upload it as an embed."""
        try:
            received = 0
            outfile = io.BytesIO()
            for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
                received += len(chunk)
                if received >= data_limit:
                    # Too large to embed; give up silently.
                    return
                outfile.write(chunk)

            content_disposition = r.headers.get("content-disposition")
            if content_disposition:
                _, params = cgi.parse_header(content_disposition)
                filename = params.get("filename")
            else:
                filename = os.path.basename(uri.path)
            if filename:
                fname = filename
            else:
                # No usable name: derive the extension from the content type.
                ext = mimetypes.guess_extension(ftype) or ".txt"
                fname = f"file{ext}"
            await self.embed_file(url, sender, mtype, ftype, fname, outfile)
        except Exception as e:
            print(e)

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Embed a file and send the result to the sender.

        Uploads ``outfile`` through the xep_0363 plugin and sends the
        resulting URL both as the message body and as its out-of-band URL.
        ``url`` is accepted for interface compatibility and is not used.
        """
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Parse urls and send the result to the sender.

        Messages tagged nsfw/nsfl are skipped entirely, as are links already
        seen recently in this conversation.
        """
        body = msg["body"].lower()
        if "nsfw" in body or "nsfl" in body:
            return
        links = self.messages[sender]["links"]
        for u in urls:
            if u in links:
                continue
            links.add(u)
            await self.parse_uri(urlparse(u), sender, mtype)

    def sed_command(self, msg, sender, mtype):
        """Process sed command.

        A body that is not a sed command is recorded in the conversation
        history.  A sed command is applied to the first recorded message
        containing its search text; the result is recorded and sent back.
        """
        try:
            text = msg["body"]
            history = self.messages[sender]["messages"]
            if not sed_cmd.match(text):
                history.add(text)
                return
            sed_args = sed_parse.split(text)
            if len(sed_args) < 4:
                return
            sed = Sed()
            sed.load_string(text)
            for message in history:
                if sed_args[1] not in message:
                    continue
                res = "\n".join(sed.apply(io.StringIO(message), None))
                history.add(res)
                return self.send_message(
                    mto=sender,
                    mbody=res,
                    mtype=mtype,
                )
        except Exception as e:
            print(e)

    def __init__(self, jid, password, nick="angel", autojoin=None):
        """Initialize the bot.

        ``autojoin`` is an optional list of rooms to join once the session
        has started.
        """
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        self.nick = nick
        self.autojoin = autojoin or []
        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # self.add_event_handler("vcard_avatar_update", self.debug_event)
        # self.add_event_handler("stream_error", self.debug_event)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    async def session_start(self, event):
        """Start the bot."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                print(e)

    async def update_info(self):
        """Update the bot info.

        Publishes the vCard and the avatar read from ``angel.png``.
        """
        with open("angel.png", "rb") as avatar_file:
            avatar = avatar_file.read()

        avatar_type = "image/png"
        info = {
            "id": self.plugin["xep_0084"].generate_id(avatar),
            "type": avatar_type,
            "bytes": len(avatar),
        }

        vcard = self.plugin["xep_0054"].make_vcard()
        vcard["URL"] = "https://gt.kalli.st/czar/angel"
        vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
        vcard["NICKNAME"] = "Angel"
        vcard["FN"] = "Angel"
        vcard["PHOTO"] = info

        # NOTE(review): these are scheduled but not awaited, so failures are
        # not reported here -- presumably deliberate fire-and-forget; confirm.
        asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
        asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
        asyncio.gather(
            self.plugin["xep_0153"].set_avatar(
                avatar=avatar,
                mtype=avatar_type,
            )
        )
        asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))

    async def _handle_message(self, msg, accepted_types, skip_own_nick=False):
        """Run link previews and sed handling for one incoming message."""
        if msg["type"] not in accepted_types:
            return
        # Ignore message corrections (XEP-0308 namespace in the stanza).
        if "urn:xmpp:message-correct:0" in str(msg):
            return
        if skip_own_nick and msg["mucnick"] == self.nick:
            return
        mtype = msg["type"]
        sender = msg["from"].bare
        try:
            # Messages that already carry an out-of-band URL are not previewed.
            if not msg["oob"]["url"]:
                if urls := self.get_urls(msg):
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            print(e)
        self.sed_command(msg, sender, mtype)

    async def message(self, msg):
        """Process a message."""
        await self._handle_message(msg, ("chat", "normal"))

    async def muc_message(self, msg):
        """Process a groupchat message."""
        await self._handle_message(
            msg, ("groupchat", "normal"), skip_own_nick=True
        )


if __name__ == "__main__":
    config = configparser.ConfigParser()
    config.read("config.ini")
    jid = config["angel"]["jid"]
    password = config["angel"]["password"]
    autojoin = config["angel"]["autojoin"].split()
    nick = config["angel"]["nick"]
    bot = AngelBot(jid, password, nick=nick, autojoin=autojoin)
    bot.connect()
    bot.process(forever=True)