angel/main.py
2025-04-09 21:31:37 -03:00

187 lines
5.1 KiB
Python

from angel import AngelBot, RegexCmd, CommandContext
from configparser import ConfigParser
from PythonSed import Sed
import re
import io
from urllib.parse import urlparse, parse_qs, urlunparse
from pantomime import normalize_mimetype
import cgi
import ipaddress
import bs4
import requests
import os
# Matches a "/" or "#" delimiter that is not preceded by a backslash.
sed_parse = re.compile("(?<!\\\\)[/#]")
# Matches message bodies that start with "s" followed by three delimiters.
sed_cmd = re.compile("^s[/#].*[/#].*[/#]")

# Every setting is read from the [angel] section of config.ini.
config = ConfigParser()
config.read("config.ini")
_angel_section = config["angel"]

# Required keys: a missing one raises KeyError at start-up.
jid, password = _angel_section["jid"], _angel_section["password"]
nick = _angel_section["nick"]

# Optional whitespace-separated lists; an absent key yields an empty list.
autojoin = _angel_section.get("autojoin", "").split()
youtube_links = _angel_section.get("youtube_links", "").split()
invidious_instances = _angel_section.get("invidious_instances", "").split()

bot = AngelBot(
    jid,
    password,
    nick=nick,
    autojoin=autojoin,
    youtube_links=youtube_links,
    invidious_instances=invidious_instances,
)
def default_matcher(ctx: CommandContext) -> bool:
    """Decide whether a message may be handled.

    Returns False when ``ctx.is_oob`` is truthy or when the body contains
    "nsfw" or "nsfl" (compared case-insensitively); True otherwise.
    """
    if ctx.is_oob:
        return False
    lowered = ctx.body.lower()
    return not any(tag in lowered for tag in ("nsfw", "nsfl"))
@RegexCmd(bot, sed_cmd, block=True)
def sed_command(ctx: CommandContext):
    """Apply a sed substitution to an earlier message and reply with it.

    The message body is loaded as a sed script. The first entry of
    ``ctx.message_history`` that the script's search pattern matches is run
    through the script, and the result is sent with ``ctx.reply``. Any
    exception is printed and otherwise ignored.
    """
    try:
        script = ctx.body
        # sed_parse splits on unescaped delimiters, so field 1 is the
        # search pattern of the s/pattern/replacement/ command.
        fields = sed_parse.split(script)
        editor = Sed()
        editor.load_string(script)
        matcher = re.compile(fields[1])
        for previous in ctx.message_history:
            if matcher.search(previous):
                edited_lines = editor.apply(io.StringIO(previous), None)
                return ctx.reply("\n".join(edited_lines))
    except Exception as e:
        print(e)
@RegexCmd(bot, re.compile(r"^ping$"))
def ping_command(ctx: CommandContext):
    """Reply "pong" to a message matched by the ``^ping$`` pattern."""
    answer = "pong"
    ctx.reply(answer)
@RegexCmd(bot, re.compile(r"^https?://"), matcher=default_matcher)
def url_command(ctx: CommandContext):
    """Preview the links found in a message that begins with an http(s) URL."""
    found = get_urls(ctx.body)
    if found:
        parse_urls(ctx, found)
# URL parsing
# Prefixes that mark a message token as a link.
req_list = ("http://", "https://")
# Content types that are previewed by page title instead of as a file.
html_files = ("text/html", "application/xhtml+xml")
# Parser name handed to BeautifulSoup.
parser = "html.parser"
# The parentheses are required: without them the second literal is a separate
# no-op expression statement and the User-Agent is cut off after "rv:10.0)".
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
# Download cap used by the preview functions.
data_limit = 100000000  # 100MB
# Headers sent with every outgoing preview request.
headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}
def get_urls(body):
    """Return the whitespace-separated tokens of *body* that are http(s) URLs.

    A token qualifies only when it *starts* with one of the prefixes in
    ``req_list``; the scheme is compared case-insensitively. Tokens that
    merely contain "http://" somewhere in the middle (for example
    "see:http://host" or "(http://host)") are skipped, because they cannot
    be fetched as-is.
    """
    return [
        token for token in body.split()
        if token.lower().startswith(req_list)
    ]
def is_private(uri):
    """Return True when the URL's host is a literal IP in a private range.

    *uri* is a parsed URL as returned by ``urllib.parse.urlparse``. The host
    is read from ``uri.hostname`` so that userinfo ("user@"), the port and
    the square brackets of IPv6 literals are stripped before the address is
    parsed. Splitting ``netloc`` on ":" by hand mis-parses "[::1]:8080" and
    "user@10.0.0.1", letting such hosts through.

    Host names are not resolved: anything that is not a literal IP address
    (including a missing host) yields False.
    """
    host = uri.hostname
    if not host:
        return False
    try:
        return ipaddress.ip_address(host).is_private
    except ValueError:
        # Not an IP literal, i.e. a DNS name.
        return False
def preview_page(ctx: CommandContext, r):
    """Reply with the ``<title>`` of a streamed HTML response.

    The body of *r* is read in 1 KiB chunks until ``</head>`` has been seen
    or more than ``data_limit`` bytes were received. The collected bytes are
    decoded as UTF-8 (invalid sequences ignored) and parsed for a title.

    Nothing is sent when there is no title, the title is blank, or the
    formatted title is already in ``ctx.preview_history``. Single-line
    titles are wrapped in asterisks. When the request was redirected, the
    final URL is sent with ``ctx.raw_reply`` before the title.
    """
    head_end = b"</head>"
    raw = bytearray()
    for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
        # Scan only the new bytes plus enough overlap to catch a marker that
        # straddles two chunks, instead of re-lowercasing the whole buffer.
        scan_from = max(len(raw) - len(head_end) + 1, 0)
        raw += chunk
        if len(raw) > data_limit or head_end in raw[scan_from:].lower():
            break

    # Decode once at the end: decoding each chunk separately drops multi-byte
    # characters that are split across a chunk boundary.
    data = raw.decode("utf-8", errors="ignore")
    soup = bs4.BeautifulSoup(data, parser)

    title = soup.find("title")
    if not title:
        return
    output = title.text.strip()
    if not output:
        return

    if "\n" not in output:
        output = f"*{output}*"
    if output in ctx.preview_history:
        return
    ctx.save_preview_history(output)
    if r.history:
        ctx.raw_reply(r.url)
    ctx.reply(output)
def _decode_extended_filename(value):
    """Decode a ``charset'language'percent-encoded`` ``filename*`` value.

    Values that are not in that three-part form are returned unchanged.
    """
    from urllib.parse import unquote

    charset, first_quote, rest = value.partition("'")
    _language, second_quote, encoded = rest.partition("'")
    if not (first_quote and second_quote):
        return value
    try:
        return unquote(encoded, encoding=charset or "utf-8", errors="replace")
    except LookupError:
        # Unknown charset label: fall back to UTF-8.
        return unquote(encoded, errors="replace")


def preview_file(ctx: CommandContext, uri, ftype, r):
    """Download a non-HTML response and hand it to ``ctx.embed_file``.

    The body of *r* is buffered in memory; the download is abandoned without
    a reply once ``data_limit`` bytes have been received. The file name is
    taken from the Content-Disposition header (``filename*`` wins over
    ``filename`` and is percent-decoded), then from the last path segment of
    *uri*, and finally defaults to "file.txt".

    Any exception is printed and otherwise ignored.
    """
    try:
        received = 0
        outfile = io.BytesIO()
        for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
            # Count the bytes actually received; chunks are not guaranteed
            # to be exactly chunk_size long.
            received += len(chunk)
            if received >= data_limit:
                return
            outfile.write(chunk)

        filename = None
        content_disposition = r.headers.get("content-disposition")
        if content_disposition:
            # NOTE(review): the cgi module is deprecated since Python 3.11
            # and removed in 3.13; this call needs replacing before upgrading.
            _, params = cgi.parse_header(content_disposition)
            filename = params.get("filename")
            extended = params.get("filename*")
            if extended:
                filename = _decode_extended_filename(extended)
        if not filename:
            # Also covers a header that carries no filename parameter.
            filename = os.path.basename(uri.path)

        ctx.embed_file(ftype, filename or "file.txt", outfile)
    except Exception as e:
        print(e)
def process_link(ctx: CommandContext, uri):
    """Fetch *uri* and send a preview of it through *ctx*.

    *uri* is a parsed URL (as produced by ``urlparse``). Responses with an
    error status, or without a usable content type, are skipped silently.
    HTML documents are previewed by title, everything else as a file.

    Errors raised by ``requests`` (invalid URL, connection failure, timeout,
    broken stream) are printed and swallowed so one bad link does not stop
    the remaining links from being processed.
    """
    url = urlunparse(uri)
    try:
        # With stream=True the connection is not released until the body is
        # fully consumed or the response is closed; the with-block closes it
        # on every exit path.
        with requests.get(
            url, stream=True, headers=headers, timeout=6
        ) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get("content-type"))
            if not ftype:
                return
            if ftype in html_files:
                preview_page(ctx, r)
            else:
                preview_file(ctx, uri, ftype, r)
    except requests.RequestException as e:
        print(e)
def parse_urls(ctx: CommandContext, urls):
    """Preview each URL in *urls* that has not been handled before.

    A URL already in ``ctx.link_history`` is skipped. Every other URL is
    recorded in the history first and then previewed, unless ``is_private``
    rejects its host.
    """
    for link in urls:
        if link not in ctx.link_history:
            ctx.save_link_history(link)
            parsed = urlparse(link)
            if not is_private(parsed):
                process_link(ctx, parsed)
# Script entry point: runs at import time, after all handlers are registered.
# NOTE(review): process(forever=True) presumably blocks until the bot
# disconnects; confirm against AngelBot.
bot.connect()
bot.process(forever=True)