"""Bot commands built on the ``angel`` package: sed-style edits, ping, link previews."""
# NOTE(review): `cgi` is deprecated (PEP 594) and removed in Python 3.13; it is
# kept because `preview_file` still relies on `cgi.parse_header`.
import cgi
import io
import ipaddress
import os
import random
import re
from configparser import ConfigParser
from urllib.parse import urlparse, urlunparse

import bs4
import gusmobile
import requests
import yt_dlp as youtube_dl
from pantomime import normalize_mimetype
from PythonSed import Sed

from angel import AngelBot, RegexCmd, CommandContext

# ---------------------------------------------------------------------------
# NOTE(review): RECONSTRUCTED SECTION.
# In the damaged copy of this file everything between the opening of the
# `sed_parse` pattern and the `-> bool` annotation of `default_matcher` was
# missing (it looks like it was stripped as if it were an HTML tag). The
# definitions in this section are inferred purely from how the rest of the
# module uses these names. Restore the originals from version control, or
# confirm every value below, before deploying.
# ---------------------------------------------------------------------------

# Splits an ``s/pattern/replacement/`` command on "/" characters that are not
# backslash-escaped; `sed_command` takes element 1 as the search pattern.
sed_parse = re.compile(r'(?<!\\)/')

# Messages that look like a sed substitute command. TODO confirm pattern.
sed_cmd = re.compile(r'^s/.+/.*/\w*$')

# Messages containing at least one http(s) or gemini link. TODO confirm pattern.
url_cmd = re.compile(r'.*(?:https?|gemini)://.*', re.DOTALL)

# TODO confirm: config file name, section name and keys are guesses.
config = ConfigParser()
config.read('config.ini')
_section = 'angel'
jid = config.get(_section, 'jid')
password = config.get(_section, 'password')
nick = config.get(_section, 'nick')
autojoin = config.get(_section, 'autojoin', fallback='').split()

# Substrings that mark a link as a YouTube video. TODO confirm values.
youtube_links = ('youtube.com/watch', 'youtu.be/')

# Base URLs of Invidious mirrors; may be empty. TODO confirm config key.
invidious_instances = config.get(
    _section, 'invidious_instances', fallback='',
).split()

# TODO confirm: the AngelBot constructor signature is not visible in this file.
bot = AngelBot(jid, password, nick=nick, autojoin=autojoin)

# ---------------------------------------------------------------------------
# End of reconstructed section.
# ---------------------------------------------------------------------------


def default_matcher(ctx: CommandContext) -> bool:
    """Decide whether a message is eligible for link previews.

    Returns False when ``ctx.is_oob`` is set or when the message body
    mentions "nsfw" or "nsfl" (case-insensitive); True otherwise.
    """
    if ctx.is_oob:
        return False
    body = ctx.body.lower()
    return 'nsfw' not in body and 'nsfl' not in body


@RegexCmd(bot, sed_cmd, block=True)
def sed_command(ctx: CommandContext):
    """Process sed command.

    Runs the sed script in ``ctx.body`` against the first entry of
    ``ctx.message_history`` that the script's search pattern matches and
    replies with the result. Nothing is sent when no entry matches.
    """
    try:
        text = ctx.body
        sed_args = sed_parse.split(text)
        sed = Sed()
        sed.load_string(text)
        pattern = re.compile(sed_args[1])
        for history_message in ctx.message_history:
            if not pattern.search(history_message):
                continue
            msg = io.StringIO(history_message)
            response = '\n'.join(sed.apply(msg, None))
            return ctx.reply(response)
    except Exception as e:
        # Best effort: the script and regex come straight from chat input and
        # can fail in many ways; a bad command must not take the bot down.
        print(e)


@RegexCmd(bot, re.compile(r'^ping$'))
def ping_command(ctx: CommandContext):
    """Process ping command."""
    ctx.reply('pong')


@RegexCmd(bot, url_cmd, matcher=default_matcher)
def url_command(ctx: CommandContext):
    """Process url command."""
    urls = get_urls(ctx.body) + get_gemini_urls(ctx.body)
    if not urls:
        return
    parse_urls(ctx, urls)


# URL parsing
req_list = ('http://', 'https://')
gemini_links = ('gemini://',)
html_files = ('text/html', 'application/xhtml+xml', 'text/xml')
html_parser = 'html.parser'
xml_parser = 'xml'
user_agent = (
    'Mozilla/5.0 (X11; Linux x86_64; rv:10.0)'
    ' Gecko/20100101 Firefox/10.0'
)
accept_lang = 'en-US'
data_limit = 100000000  # 100MB
headers = {
    'user-agent': user_agent,
    'Accept-Language': accept_lang,
    'Cache-Control': 'no-cache',
}


def get_urls(body) -> list[str]:
    """Get urls from a message."""
    str_list = body.strip().split()
    urls = [u for u in str_list if any(r in u for r in req_list)]
    return urls


def get_gemini_urls(body) -> list[str]:
    """Get gemini urls from a message."""
    str_list = body.strip().split()
    urls = [u for u in str_list if any(r in u for r in gemini_links)]
    return urls


def is_private(uri):
    """Check if a uri is private.

    ``uri`` is a result of ``urlparse``. Only literal IP addresses are
    recognised; host names are not resolved and always yield False.
    """
    try:
        # `hostname` strips userinfo, port and IPv6 brackets, so forms such as
        # "user@10.0.0.1:80" or "[::1]:8080" cannot slip past the check.
        return ipaddress.ip_address(uri.hostname or '').is_private
    except ValueError:
        return False


def preview_page(ctx: CommandContext, r, ftype):
    """Reply with the title of an HTML/XML response.

    Reads the streamed response ``r`` until the closing title tag has been
    seen or ``data_limit`` characters were read, parses what was read, and
    replies with the title unless it is already in ``ctx.preview_history``.
    When the request was redirected, the final URL is sent first.
    """
    end_tag = '</title>'
    parts = []
    size = 0
    tail = ''
    for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
        text = chunk.decode('utf-8', errors='ignore')
        parts.append(text)
        size += len(text)
        # Scan only the new chunk plus a short overlap, so a tag split across
        # two chunks is still found without re-scanning everything read so far.
        window = (tail + text).lower()
        if size > data_limit or end_tag in window:
            break
        tail = window[-(len(end_tag) - 1):]
    data = ''.join(parts)

    parser = xml_parser if ftype == 'text/xml' else html_parser
    soup = bs4.BeautifulSoup(data, parser)
    if title := soup.find('title'):
        output = title.text.strip()
        if output:
            # Single-line titles are emphasised; multi-line ones are sent raw.
            output = f'*{output}*' if ('\n' not in output) else output
            if output in ctx.preview_history:
                return
            ctx.save_preview_history(output)
            if r.history and r.url:
                ctx.raw_reply(r.url)
            ctx.reply(output)


def preview_file(ctx: CommandContext, uri, ftype, r):
    """Download a non-HTML response and embed it as a file.

    The download is abandoned once ``data_limit`` bytes have been received.
    The file name is taken from the Content-Disposition header (``filename*``
    first, then ``filename``), then from the URL path, then ``file.txt``.
    """
    try:
        length = 0
        outfile = io.BytesIO()
        for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
            # Count what was actually received; chunks may be short.
            length += len(chunk)
            if length >= data_limit:
                return
            outfile.write(chunk)

        filename = None
        content_disposition = r.headers.get('content-disposition')
        if content_disposition:
            _, params = cgi.parse_header(content_disposition)
            filename = params.get('filename')
            if params.get('filename*'):
                # RFC 5987 form is charset'lang'value; keep the value part.
                filename = params['filename*'].split("''")[-1]
        if not filename:
            filename = os.path.basename(uri.path)
        fname = filename or 'file.txt'
        ctx.embed_file(ftype, fname, outfile)
    except Exception as e:
        # Best effort: a failed preview is logged, never fatal.
        print(e)


def process_http_url(ctx: CommandContext, uri):
    """Process a link and send the result to the sender."""
    url = urlunparse(uri)
    try:
        # The context manager releases the streamed connection even when the
        # body is only partially read.
        with requests.get(
            url, stream=True, headers=headers, timeout=6,
        ) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get('content-type'))
            if not ftype:
                return
            if ftype in html_files:
                preview_page(ctx, r, ftype)
            else:
                preview_file(ctx, uri, ftype, r)
    except requests.RequestException as e:
        # Timeouts, DNS failures and broken streams are expected for
        # arbitrary user-posted links.
        print(e)


def process_gemini_url(ctx: CommandContext, uri):
    """Fetch a gemini link and reply with the first line of its content."""
    url = urlunparse(uri)
    response = gusmobile.fetch(url)
    if not response:
        return
    if response.status != '20':
        return
    content: str = response.content
    title: str = content.strip().split('\n', 1)[0].strip()
    if title:
        # Drop surrounding "#" characters (gemtext heading markers).
        ctx.reply(f'*{title.strip("#").strip()}*')


def process_youtube_url(ctx: CommandContext, uri):
    """Process a YouTube link and send the result to the sender."""
    url = urlunparse(uri)
    with youtube_dl.YoutubeDL() as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            title = info.get('title', 'No title')
            if invidious_instances:
                instance = random.choice(invidious_instances)
                invidious_url = f'{instance}/watch?v={info["id"]}'
                ctx.raw_reply(invidious_url)
            ctx.reply(f'*{title}*')
        except Exception as e:
            # Best effort: extraction failures are logged, never fatal.
            print(e)


def parse_urls(ctx: CommandContext, urls):
    """Parse urls and send the result to the sender."""
    for u in urls:
        if u in ctx.link_history:
            continue
        ctx.save_link_history(u)
        uri = urlparse(u)
        if is_private(uri):
            continue
        if uri.scheme == 'gemini':
            process_gemini_url(ctx, uri)
        elif uri.scheme in ('http', 'https'):
            if any(youtube in u for youtube in youtube_links):
                process_youtube_url(ctx, uri)
            else:
                process_http_url(ctx, uri)


bot.connect()
bot.process(forever=True)