From cecf31720c63472ad56d6fcffc797bfd56b68557 Mon Sep 17 00:00:00 2001 From: Kyle Czar Date: Wed, 9 Apr 2025 21:31:37 -0300 Subject: [PATCH] gardening --- angel.py | 353 +++++++++++++++++++++++-------------------------------- main.py | 177 +++++++++++++++++++++++----- 2 files changed, 293 insertions(+), 237 deletions(-) diff --git a/angel.py b/angel.py index cc9ebb1..a98036d 100644 --- a/angel.py +++ b/angel.py @@ -1,37 +1,7 @@ -import requests -import bs4 -import yt_dlp as youtube_dl -import random -import re -import os import asyncio from collections import defaultdict from slixmpp import ClientXMPP -from urllib.parse import urlparse, parse_qs, urlunparse -from pantomime import normalize_mimetype -import cgi -import ipaddress -import io - -parser = "html.parser" -user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)" -" Gecko/20100101 Firefox/10.0" -accept_lang = "en-US" -data_limit = 100000000 # 100MB - -headers = { - "user-agent": user_agent, - "Accept-Language": accept_lang, - "Cache-Control": "no-cache", -} - -youtube_link = "youtu.be" - -ydl = youtube_dl.YoutubeDL() - -req_list = ("http://", "https://") - -html_files = ("text/html", "application/xhtml+xml") +from slixmpp.stanza import Message class Lifo(list): """Limited size LIFO array to store messages and urls.""" @@ -47,30 +17,24 @@ class Lifo(list): if len(self) > self.size: self.pop() +def create_messages_dict(): + return defaultdict( + lambda: { + "messages": Lifo(100), + "links": Lifo(10), + "previews": Lifo(10), + } + ) -def get_youtube_title(url): - """Get the title of a youtube video.""" - try: - info = ydl.extract_info(url, download=False) - return info["title"] - except Exception as e: - print(e) - return None - - -def get_yurl(path): - """Get a youtube link from a path.""" - yurl = f"https://youtu.be/{path}" - return yurl - -# decorator to define a regex command class RegexCmd: """Regex command decorator.""" - def __init__(self, bot, pattern): + def __init__(self, bot, pattern, block=False, matcher=None): """Initialize the decorator.""" self.pattern = pattern self.bot = bot + self.block = block + self.matcher = matcher def __call__(self, func): """Call the decorator.""" @@ -81,126 +45,66 @@ class RegexCmd: class AngelBot(ClientXMPP): """AngelBot class.""" - messages = defaultdict( - lambda: { - "messages": Lifo(100), - "links": Lifo(10), - "previews": Lifo(10), - } - ) + def __init__(self, jid, password, nick="angel", autojoin=None, + youtube_links=None, + invidious_instances=None): + """Initialize the bot.""" + super().__init__(jid, password) + self.jid = jid + self.nick = nick + self.autojoin = autojoin or [] + self.invidious_instances = invidious_instances or [] + self.youtube_links = youtube_links or [] + self.messages = create_messages_dict() + self.register_plugins() + self.add_handlers() + + def reply(self, msg, body): + """Reply to a message.""" + self.save_message_history(msg) + self.raw_reply(msg, body) + + def raw_reply(self, msg, body): + """Reply to a message without saving history.""" + self.send_message( + mto=msg["from"].bare, + mbody=body, + mtype=msg["type"], + ) + + def save_message_history(self, msg): + """Save the history of messages.""" + sender = msg["from"].bare + self.messages[sender]["messages"].add(msg["body"]) + + def get_message_history(self, msg): + """Get the messages from the sender.""" + sender = msg["from"].bare + return self.messages[sender]["messages"] + + def save_link_history(self, msg, url): + """Save the history of links.""" + sender = msg["from"].bare + self.messages[sender]["links"].add(url) + + def get_link_history(self, msg): + """Get the links from the sender.""" + sender = msg["from"].bare + return self.messages[sender]["links"] + + def save_preview_history(self, msg, preview): + """Save the history of previews.""" + sender = msg["from"].bare + self.messages[sender]["previews"].add(preview) + + def get_preview_history(self, msg): + """Get the previews from the sender.""" + sender = msg["from"].bare + return self.messages[sender]["previews"] regex_cmds = [] - def get_urls(self, msg): - """Get urls from a message.""" - str_list = msg["body"].strip().split() - urls = [u for u in str_list if any(r in u for r in req_list)] - return urls - - def get_invidious_link(self, yurl): - """Get an invidious link from a youtube link.""" - video = yurl.split("/")[-1] - instance = random.choice(self.invidious_instances) - return f"https://{instance}/watch?v={video}" - - - def send_youtube_info(self, uri, sender, mtype): - """Send youtube info to the sender.""" - yurl = None - if uri.netloc == youtube_link: - yurl = get_yurl(uri.path) - elif "v" in (query := parse_qs(uri.query)): - if v := query["v"]: - yurl = get_yurl(v[0]) - else: - return - - invidious = self.get_invidious_link(yurl) - - if output := get_youtube_title(invidious): - if output in self.messages[sender]["previews"]: - return - self.messages[sender]["previews"].add(output) - - self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype) - self.send_message(mto=sender, mbody=invidious, mtype=mtype) - - async def parse_uri(self, uri, sender, mtype): - """Parse a uri and send the result to the sender.""" - netloc = uri.netloc - if self.invidious_instances and netloc in (self.youtube_links + [youtube_link]): - self.send_youtube_info(uri, sender, mtype) - return - try: - if ipaddress.ip_address(netloc.split(":")[0]).is_private: - return - except ValueError: - pass - - await self.process_link(uri, sender, mtype) - - async def process_link(self, uri, sender, mtype): - """Process a link and send the result to the sender.""" - url = urlunparse(uri) - r = requests.get(url, stream=True, headers=headers, timeout=6) - if not r.ok: - return - - ftype = normalize_mimetype(r.headers.get("content-type")) - - if not ftype: - return - - if ftype in html_files: - data = "" - for i in r.iter_content(chunk_size=1024, decode_unicode=False): - data += i.decode("utf-8", errors="ignore") - if len(data) > data_limit or "" in data.lower(): - break - soup = bs4.BeautifulSoup(data, parser) - if title := soup.find("title"): - output = title.text.strip() - if output: - output = f"*{output}*" if ("\n" not in output) else output - if output in self.messages[sender]["previews"]: - return - - self.messages[sender]["previews"].add(output) - if r.history: - self.send_message(mto=sender, mbody=r.url, mtype=mtype) - self.send_message(mto=sender, mbody=output, mtype=mtype) - - else: - try: - lenght = 0 - outfile = io.BytesIO() - for chunk in r.iter_content( - chunk_size=512, - decode_unicode=False, - ): - lenght += 512 - if lenght >= data_limit: - return - outfile.write(chunk) - - content_disposition = r.headers.get("content-disposition") - filename = None - if content_disposition: - _, params = cgi.parse_header(content_disposition) - filename = params.get("filename") - if params.get("filename*"): - filename = params.get("filename*") - filename = filename.split("''")[-1] - else: - filename = os.path.basename(uri.path) - - ext = os.path.splitext(filename)[1] if filename else ".txt" - fname = filename if filename else f"file{ext}" - await self.embed_file(url, sender, mtype, ftype, fname, outfile) - except Exception as e: - print(e) - - async def embed_file(self, url, sender, mtype, ftype, fname, outfile): + async def embed_file(self, sender, mtype, ftype, fname, outfile): """Embed a file and send the result to the sender.""" furl = await self.plugin["xep_0363"].upload_file( fname, content_type=ftype, input_file=outfile @@ -212,32 +116,7 @@ class AngelBot(ClientXMPP): message["oob"]["url"] = furl message.send() - async def parse_urls(self, msg, urls, sender, mtype): - """Parse urls and send the result to the sender.""" - body = msg["body"].lower() - if "nsfw" in body: return - if "nsfl" in body: return - for u in urls: - if u in self.messages[sender]["links"]: - continue - else: - self.messages[sender]["links"].add(u) - - uri = urlparse(u) - await self.parse_uri(uri, sender, mtype) - - def __init__(self, jid, password, nick="angel", autojoin=None, - youtube_links=None, - invidious_instances=None): - """Initialize the bot.""" - ClientXMPP.__init__(self, jid, password) - self.jid = jid - self.nick = nick - self.autojoin = autojoin or [] - self.invidious_instances = invidious_instances or [] - self.youtube_links = youtube_links or [] - - + def register_plugins(self): self.register_plugin("xep_0030") self.register_plugin("xep_0060") self.register_plugin("xep_0054") @@ -247,6 +126,7 @@ class AngelBot(ClientXMPP): self.register_plugin("xep_0153") self.register_plugin("xep_0363") + def add_handlers(self): self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) self.add_event_handler("groupchat_message", self.muc_message) @@ -310,14 +190,6 @@ class AngelBot(ClientXMPP): mtype = msg["type"] sender = msg["from"].bare - - try: - if not msg["oob"]["url"]: - if urls := self.get_urls(msg): - await self.parse_urls(msg, urls, sender, mtype) - except Exception as e: - print(e) - self.process_commands(msg, sender, mtype) async def muc_message(self, msg): @@ -334,14 +206,6 @@ class AngelBot(ClientXMPP): mtype = msg["type"] sender = msg["from"].bare - - try: - if not msg["oob"]["url"]: - if urls := self.get_urls(msg): - await self.parse_urls(msg, urls, sender, mtype) - except Exception as e: - print(e) - self.process_commands(msg, sender, mtype) @@ -349,5 +213,78 @@ class AngelBot(ClientXMPP): """Process commands.""" for cmd in self.regex_cmds: if cmd.pattern.match(msg["body"]): - return cmd.func(self, msg, sender, mtype) + ctx = CommandContext(self, msg) + if cmd.matcher and not cmd.matcher(ctx): + continue + cmd.func(ctx) + if(cmd.block): + return self.messages[sender]["messages"].add(msg["body"]) + +class CommandContext: + """Command context.""" + + def __init__(self, bot: AngelBot, msg: Message): + """Initialize the command context.""" + self.bot = bot + self.msg = msg + + def reply(self, body): + """Get the reply function.""" + return self.bot.reply(self.msg, body) + + @property + def sender(self): + """Get the sender of the message.""" + return self.msg["from"].bare + + @property + def mtype(self): + """Get the message type.""" + return self.msg["type"] + + @property + def body(self): + """Get the message body.""" + return self.msg["body"] + + @property + def raw_reply(self, body): + """Get the raw reply function.""" + return self.bot.raw_reply(self.msg, body) + + @property + def message_history(self): + """Get the message history.""" + return self.bot.get_message_history(self.msg) + + @property + def link_history(self): + """Get the link history.""" + return self.bot.get_link_history(self.msg) + + @property + def preview_history(self): + """Get the preview history.""" + return self.bot.get_preview_history(self.msg) + + def save_link_history(self, url): + """Save the link history.""" + self.bot.save_link_history(self.msg, url) + + def save_message_history(self): + """Save the message history.""" + self.bot.save_message_history(self.msg) + + def save_preview_history(self, preview): + """Save the preview history.""" + self.bot.save_preview_history(self.msg, preview) + + @property + def is_oob(self): + """Check if the message is OOB.""" + return bool(self.msg["oob"]["url"]) + + def embed_file(self, ftype, fname, outfile): + """Embed a file and send the result to the sender.""" + asyncio.gather(self.bot.embed_file(self.sender, self.mtype, ftype, fname, outfile)) diff --git a/main.py b/main.py index ef5785d..92d0657 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,15 @@ -from angel import AngelBot, RegexCmd +from angel import AngelBot, RegexCmd, CommandContext from configparser import ConfigParser from PythonSed import Sed import re import io +from urllib.parse import urlparse, parse_qs, urlunparse +from pantomime import normalize_mimetype +import cgi +import ipaddress +import bs4 +import requests +import os sed_parse = re.compile("(? bool: + if ctx.is_oob: + return False + body = ctx.body.lower() + return "nsfw" not in body and "nsfl" not in body + +@RegexCmd(bot, sed_cmd, block=True) +def sed_command(ctx: CommandContext): """Process sed command.""" try: - text = msg["body"] - if not sed_cmd.match(text): - bot.messages[sender]["messages"].add(text) - return + text = ctx.body sed_args = sed_parse.split(text) - - if len(sed_args) < 4: - return - sed = Sed() sed.load_string(text) - - for message in bot.messages[sender]["messages"]: - if not re.search(sed_args[1], message): + pattern = re.compile(sed_args[1]) + for history_message in ctx.message_history: + if not pattern.search(history_message): continue - msg = io.StringIO(message) - res = "\n".join(sed.apply(msg, None)) - bot.messages[sender]["messages"].add(res) - return bot.send_message( - mto=sender, - mbody=res, - mtype=mtype, - ) + msg = io.StringIO(history_message) + response = "\n".join(sed.apply(msg, None)) + return ctx.reply(response) except Exception as e: print(e) -# ping command @RegexCmd(bot, re.compile(r"^ping$")) -def ping_command(bot, msg, sender, mtype): +def ping_command(ctx: CommandContext): """Process ping command.""" - bot.send_message( - mto=sender, - mbody="pong", - mtype=mtype, - ) + ctx.reply("pong") +@RegexCmd(bot, re.compile(r"^https?://"), matcher=default_matcher) +def url_command(ctx: CommandContext): + """Process url command.""" + urls = get_urls(ctx.body) + if not urls: + return + parse_urls(ctx, urls) + + +# URL parsing + +req_list = ("http://", "https://") + +html_files = ("text/html", "application/xhtml+xml") + +parser = "html.parser" +user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)" +" Gecko/20100101 Firefox/10.0" +accept_lang = "en-US" +data_limit = 100000000 # 100MB + +headers = { + "user-agent": user_agent, + "Accept-Language": accept_lang, + "Cache-Control": "no-cache", +} + +def get_urls(body): + """Get urls from a message.""" + str_list = body.strip().split() + urls = [u for u in str_list if any(r in u for r in req_list)] + return urls + +def is_private(uri): + """Check if a uri is private.""" + netloc = uri.netloc + try: + if ipaddress.ip_address(netloc.split(":")[0]).is_private: + return True + except ValueError: + pass + return False + +def preview_page(ctx: CommandContext, r): + data = "" + + for i in r.iter_content(chunk_size=1024, decode_unicode=False): + data += i.decode("utf-8", errors="ignore") + if len(data) > data_limit or "" in data.lower(): + break + soup = bs4.BeautifulSoup(data, parser) + if title := soup.find("title"): + output = title.text.strip() + if output: + output = f"*{output}*" if ("\n" not in output) else output + if output in ctx.preview_history: + return + + ctx.save_preview_history(output) + + if r.history: + ctx.raw_reply(r.url) + + ctx.reply(output) + +def preview_file(ctx: CommandContext, uri, ftype, r): + try: + lenght = 0 + outfile = io.BytesIO() + for chunk in r.iter_content( + chunk_size=512, + decode_unicode=False, + ): + lenght += 512 + if lenght >= data_limit: + return + outfile.write(chunk) + + content_disposition = r.headers.get("content-disposition") + filename = None + if content_disposition: + _, params = cgi.parse_header(content_disposition) + filename = params.get("filename") + if params.get("filename*"): + filename = params.get("filename*") + filename = filename.split("''")[-1] + else: + filename = os.path.basename(uri.path) + + ext = os.path.splitext(filename)[1] if filename else ".txt" + fname = filename if filename else f"file{ext}" + ctx.embed_file(ftype, fname, outfile) + except Exception as e: + print(e) + +def process_link(ctx: CommandContext, uri): + """Process a link and send the result to the sender.""" + url = urlunparse(uri) + r = requests.get(url, stream=True, headers=headers, timeout=6) + if not r.ok: + return + + ftype = normalize_mimetype(r.headers.get("content-type")) + + if not ftype: + return + + if ftype in html_files: + preview_page(ctx, r) + else: + preview_file(ctx, uri, ftype, r) + +def parse_urls(ctx: CommandContext, urls): + """Parse urls and send the result to the sender.""" + for u in urls: + if u in ctx.link_history: + continue + ctx.save_link_history(u) + uri = urlparse(u) + if is_private(uri): + continue + process_link(ctx, uri) bot.connect() bot.process(forever=True)