"""XMPP bot that previews links, relays YouTube titles and applies sed fixes.

For every chat / group-chat message the bot:

* looks for URLs and, per URL, either posts the YouTube title plus an
  Invidious mirror link, posts the HTML ``<title>`` of the page, or
  re-uploads the (size-capped) file through XEP-0363 and posts it inline;
* treats ``s/old/new/``-style messages as corrections applied to the recent
  message history of that conversation.
"""
import asyncio
import configparser
import io
import logging
import mimetypes
import random
import re
from collections import defaultdict
from urllib.parse import parse_qs, urlparse, urlunparse

import bs4
import requests
import youtube_dl
from pantomime import normalize_mimetype
from PythonSed import Sed
from slixmpp import ClientXMPP

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# NOTE(review): RECONSTRUCTED SECTION -- verify against the real file.
#
# The copy of this file that was reviewed had a hole in it.  The surviving
# fragment read, verbatim:
#
#     sed_parse = re.compile("(? self.size: self.pop()
#
# i.e. everything between the start of the ``sed_parse`` pattern and the end
# of ``Lifo.add`` was lost.  The names below are all referenced by the code
# that survived, so they must exist, but their VALUES are placeholders
# unless stated otherwise.  Restore the original values before deploying.
# ---------------------------------------------------------------------------

# Splits a sed expression on its delimiter; callers require >= 4 parts.
# NOTE(review): only the leading ``(?`` of the pattern survived -- confirm.
sed_parse = re.compile(r"(?<!\\)/")

# Decides whether a message is a sed command at all.
# NOTE(review): original pattern lost -- confirm.
sed_cmd = re.compile(r"^s/")

# Used as ``ydl.extract_info(url, download=False)``.
# NOTE(review): original constructor options lost -- confirm.
ydl = youtube_dl.YoutubeDL()

# Host whose *path* is the video id (see ``send_youtube_info``/``get_yurl``).
youtube_link = "youtu.be"
# Hosts whose ``v`` query parameter is the video id.  PLACEHOLDER values.
youtube_links = ["www.youtube.com", "youtube.com", "m.youtube.com"]
# Hostnames picked at random for the mirror link.  PLACEHOLDER value.
invidious_instances = ["yewtu.be"]
# A whitespace-separated token counts as a URL if it contains any of these.
# PLACEHOLDER values.
req_list = ("http://", "https://")
# Hosts (port stripped) that are never fetched.  PLACEHOLDER values.
block_list = ("localhost", "127.0.0.1")
# Extra HTTP request headers.  PLACEHOLDER value.
headers = {}
# Normalised MIME types handled as HTML pages.  PLACEHOLDER values.
html_files = ("text/html", "application/xhtml+xml")
# Upper bound, in bytes, on how much of a response is read.  PLACEHOLDER.
data_limit = 5 * 1024 * 1024
# Parser name handed to BeautifulSoup.  PLACEHOLDER value.
parser = "html.parser"


class Lifo(list):
    """Bounded newest-first history.

    NOTE(review): reconstructed.  Only the tail of ``add`` (``... self.size:
    self.pop()``) survived; the rest is inferred from how the class is used
    (``Lifo(100)``, ``.add(x)``, ``in`` tests and iteration).
    """

    def __init__(self, size):
        super().__init__()
        self.size = size

    def add(self, el):
        """Insert *el* as the newest entry, evicting the oldest when full."""
        self.insert(0, el)
        if len(self) > self.size:
            self.pop()


# ---------------------------------------------------------------------------
# End of reconstructed section.
# ---------------------------------------------------------------------------


def get_youtube_title(url):
    """Return the title of the video at *url*, or ``""`` if lookup fails."""
    try:
        info = ydl.extract_info(url, download=False)
        return info["title"]
    except Exception:
        # Best effort: an unavailable video must not break message handling.
        logger.debug("Could not extract a title for %s", url, exc_info=True)
        return ""


def get_invidious_link(yurl):
    """Return a watch link for *yurl* on a randomly chosen Invidious host."""
    video = yurl.split("/")[-1]
    instance = random.choice(invidious_instances)
    return f"https://{instance}/watch?v={video}"


def get_yurl(path):
    """Return the short ``youtu.be`` URL for the video id *path*."""
    return f"https://youtu.be/{path}"


def _read_html_head(response):
    """Read *response* until the closing title tag (or the size cap) is seen.

    Bytes are accumulated and decoded once so that a multi-byte character
    split across two chunks is not lost.
    """
    raw = bytearray()
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=False):
        raw += chunk
        # NOTE(review): the reviewed copy tested for the empty string here
        # (always true, so only the first chunk was ever read).  The literal
        # appears to have been an HTML tag removed by the same damage as
        # above; ``</title>`` is the best guess -- confirm.
        if len(raw) > data_limit or b"</title>" in raw.lower():
            break
    return raw.decode("utf-8", errors="ignore")


def _read_capped(response):
    """Return the body of *response*, or ``None`` if it exceeds the cap."""
    buffer = io.BytesIO()
    total = 0
    for chunk in response.iter_content(chunk_size=512, decode_unicode=False):
        total += len(chunk)  # count real bytes; chunks may be short
        if total > data_limit:
            return None
        buffer.write(chunk)
    return buffer.getvalue()


def _fetch(url):
    """Blocking download of *url*; meant to run in a worker thread.

    Returns ``None`` for a non-2xx/3xx response, otherwise a tuple
    ``(ftype, redirected_to, body)`` where *ftype* is the normalised MIME
    type, *redirected_to* is the final URL if redirects were followed (else
    ``None``) and *body* is the decoded HTML head (``str``) for HTML types,
    or the raw bytes (``None`` when over the size cap) for anything else.
    """
    with requests.get(url, stream=True, headers=headers, timeout=5) as response:
        if not response.ok:
            return None
        ftype = normalize_mimetype(response.headers.get("content-type"))
        redirected_to = response.url if response.history else None
        if ftype in html_files:
            return ftype, redirected_to, _read_html_head(response)
        return ftype, redirected_to, _read_capped(response)


class AngelBot(ClientXMPP):
    """XMPP client that reacts to links and sed-style corrections."""

    # Per-conversation state keyed by bare JID.  This is a class attribute,
    # so it is shared by every AngelBot instance in the process.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    def __init__(self, jid, password, nick="angel", autojoin=None):
        """Register plugins and event handlers.

        Args:
            jid: Account JID to log in with.
            password: Account password.
            nick: Nickname used when joining rooms.
            autojoin: Iterable of room JIDs joined on session start.
        """
        super().__init__(jid, password)
        self.jid = jid
        self.nick = nick
        self.autojoin = autojoin or []

        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)

        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # self.add_event_handler("vcard_avatar_update", self.debug_event)
        # self.add_event_handler("stream_error", self.debug_event)
        # Reconnect whenever the stream drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    def get_urls(self, msg):
        """Return the whitespace-separated tokens of the body that look like URLs."""
        tokens = msg["body"].strip().split()
        return [u for u in tokens if any(r in u for r in req_list)]

    def send_youtube_info(self, uri, sender, mtype):
        """Send the video title and an Invidious link for a YouTube *uri*.

        Nothing is sent when no video id can be found, when the title cannot
        be resolved, or when the same title was already previewed recently.
        """
        if uri.netloc == youtube_link:
            # Short links carry the id in the path ("/<id>").
            yurl = get_yurl(uri.path.lstrip("/"))
        else:
            video_ids = parse_qs(uri.query).get("v")
            if not video_ids:
                # No ``v`` parameter (channel page, playlist, ...): previously
                # this fell through with ``yurl`` unbound.
                return
            yurl = get_yurl(video_ids[0])

        output = get_youtube_title(yurl)
        if not output:
            return
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        invidious = get_invidious_link(yurl)
        self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
        self.send_message(mto=sender, mbody=invidious, mtype=mtype)

    async def parse_uri(self, uri, sender, mtype):
        """Dispatch a parsed *uri* to the YouTube or the generic handler."""
        netloc = uri.netloc
        if netloc == youtube_link or netloc in youtube_links:
            self.send_youtube_info(uri, sender, mtype)
        elif netloc.split(":")[0] in block_list:
            return
        else:
            await self.process_link(uri, sender, mtype)

    async def process_link(self, uri, sender, mtype):
        """Preview a generic link: page title for HTML, re-upload otherwise."""
        url = urlunparse(uri)
        loop = asyncio.get_running_loop()
        try:
            # requests is blocking; keep it off the XMPP event loop.
            fetched = await loop.run_in_executor(None, _fetch, url)
        except requests.RequestException as exc:
            logger.warning("Could not fetch %s: %s", url, exc)
            return
        if fetched is None:
            return

        ftype, redirected_to, body = fetched
        if ftype in html_files:
            self._send_title_preview(body, redirected_to, sender, mtype)
        elif body is not None:
            try:
                await self.embed_file(url, sender, mtype, ftype, io.BytesIO(body))
            except Exception:
                # Embedding is best effort; a failed upload is not fatal.
                logger.exception("Could not embed %s", url)

    def _send_title_preview(self, html, redirected_to, sender, mtype):
        """Send the ``<title>`` found in *html*, once per conversation."""
        title = bs4.BeautifulSoup(html, parser).find("title")
        if title is None:
            return
        output = title.text.strip()
        if not output:
            return
        if "\n" not in output:
            output = f"*{output}*"
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        if redirected_to:
            # Show where a shortened / redirected link really points.
            self.send_message(mto=sender, mbody=redirected_to, mtype=mtype)
        self.send_message(mto=sender, mbody=output, mtype=mtype)

    async def embed_file(self, url, sender, mtype, ftype, outfile):
        """Upload *outfile* via XEP-0363 and send it as an out-of-band URL."""
        # guess_extension returns None for unknown types; avoid "fileNone".
        ext = mimetypes.guess_extension(ftype) or ""
        filename = f"file{ext}"
        furl = await self.plugin["xep_0363"].upload_file(
            filename, content_type=ftype, input_file=outfile
        )
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Handle each URL in *urls* not recently seen in this conversation.

        Messages whose body mentions "nsfw" are skipped entirely.
        """
        if "nsfw" in msg["body"].lower():
            return
        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            await self.parse_uri(urlparse(u), sender, mtype)

    def sed_command(self, msg, sender, mtype):
        """Record a normal message, or apply a sed command to recent history.

        The command is tried against the stored messages newest-first; the
        first message it actually changes is sent back in corrected form and
        stored as the newest history entry.
        """
        try:
            text = msg["body"]
            logger.debug("text = %r", text)
            history = self.messages[sender]["messages"]
            if not sed_cmd.match(text):
                history.add(text)
                return
            if len(sed_parse.split(text)) < 4:
                return
            sed = Sed()
            sed.load_string(text)

            for message in history:
                res = sed.apply(io.StringIO(message), None)
                logger.debug("res = %r", res)
                out = "\n".join(res)
                logger.debug("out = %r", out)
                if out.strip() != message.strip():
                    history.add(out)
                    return self.send_message(
                        mto=sender,
                        mbody=out,
                        mtype=mtype,
                    )
        except Exception as e:
            # User-supplied expressions are frequently invalid; never crash.
            logger.warning("sed command failed: %s", e)
            return

    async def session_start(self, event):
        """Announce presence, fetch the roster and join the configured rooms."""
        self.send_presence()
        await self.get_roster()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception:
                logger.exception("Could not join %s", channel)
        # await self.update_info()
        logger.info("Session started!")

    async def update_info(self):
        """Publish ``angel.png`` as the bot avatar (XEP-0084 and XEP-0153)."""
        with open("angel.png", "rb") as avatar_file:
            avatar = avatar_file.read()

        avatar_type = "image/png"
        info = {
            "id": self.plugin["xep_0084"].generate_id(avatar),
            "type": avatar_type,
            "bytes": len(avatar),
        }

        # Await each step so failures surface instead of being dropped.
        await self.plugin["xep_0084"].publish_avatar(avatar)
        await self.plugin["xep_0153"].set_avatar(
            avatar=avatar,
            mtype=avatar_type,
        )
        await self.plugin["xep_0084"].publish_avatar_metadata([info])

    async def _handle_message(self, msg, sender, mtype):
        """Shared pipeline for direct and group-chat messages."""
        # Ignore message corrections so an edit is not processed twice.
        if "urn:xmpp:message-correct:0" in str(msg):
            return
        try:
            # Messages that already carry an out-of-band URL are left alone.
            if not msg["oob"]["url"]:
                urls = self.get_urls(msg)
                if urls:
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception:
            # Link previews are best effort; still run the sed handling.
            logger.exception("Link preview failed for %s", sender)
        self.sed_command(msg, sender, mtype)

    async def message(self, msg):
        """Handle a direct ("chat"/"normal") message."""
        if msg["type"] in ("chat", "normal"):
            await self._handle_message(msg, msg["from"].bare, "chat")

    async def muc_message(self, msg):
        """Handle a group-chat message, ignoring the bot's own messages."""
        if msg["type"] in ("groupchat", "normal"):
            if msg["mucnick"] == self.nick:
                return
            await self._handle_message(msg, msg["from"].bare, "groupchat")


def main():
    """Read ``config.ini`` and run the bot until interrupted."""
    config = configparser.ConfigParser()
    config.read("config.ini")
    logging.basicConfig(level=logging.DEBUG)
    jid = config["angel"]["jid"]
    password = config["angel"]["password"]
    autojoin = config["angel"]["autojoin"].split()
    bot = AngelBot(jid, password, autojoin=autojoin)
    bot.connect()
    bot.process(forever=True)


if __name__ == "__main__":
    main()