angel/main.py

251 lines
6.7 KiB
Python

from angel import AngelBot, RegexCmd, CommandContext
from configparser import ConfigParser
from PythonSed import Sed
import re
import io
from urllib.parse import urlparse, urlunparse
from pantomime import normalize_mimetype
import cgi
import ipaddress
import bs4
import requests
import os
import gusmobile
import yt_dlp as youtube_dl
import random
# Splits a sed expression on "/" or "#" delimiters that are not preceded
# by a backslash.
sed_parse = re.compile('(?<!\\\\)[/#]')
# Recognises messages that look like "s/old/new/" (or "s#old#new#").
sed_cmd = re.compile('^s[/#].*[/#].*[/#]')
# Recognises messages containing a gemini or http(s) link.
url_cmd = re.compile(r'gemini://|https?://')

# All bot settings are read from the [angel] section of config.ini.
config = ConfigParser()
config.read('config.ini')
_angel_cfg = config['angel']

# Mandatory settings: indexing raises KeyError when one is missing.
jid = _angel_cfg['jid']
password = _angel_cfg['password']
nick = _angel_cfg['nick']

# Optional whitespace-separated lists; a missing key gives an empty list.
autojoin = _angel_cfg.get('autojoin', '').split()
youtube_links = _angel_cfg.get('youtube_links', '').split()
invidious_instances = _angel_cfg.get('invidious_instances', '').split()

bot = AngelBot(jid, password, nick=nick, autojoin=autojoin)
def default_matcher(ctx: CommandContext) -> bool:
    """Decide whether a message may trigger a command.

    Messages flagged ``is_oob`` are rejected, as is any message whose body
    contains ``nsfw`` or ``nsfl`` in any letter case.
    """
    if ctx.is_oob:
        return False
    lowered = ctx.body.lower()
    return not any(marker in lowered for marker in ('nsfw', 'nsfl'))
@RegexCmd(bot, sed_cmd, block=True)
def sed_command(ctx: CommandContext):
    """Apply a sed substitution to an earlier message.

    The message body is loaded as a sed script. The first entry of
    ``ctx.message_history`` that matches the script's search expression is
    run through sed and the result is sent back as a reply. Any failure is
    printed and otherwise ignored.
    """
    try:
        script = ctx.body
        pieces = sed_parse.split(script)
        editor = Sed()
        editor.load_string(script)
        # pieces[1] is the text between the first two delimiters, i.e. the
        # search expression of the substitution.
        wanted = re.compile(pieces[1])
        for earlier in ctx.message_history:
            if wanted.search(earlier):
                rewritten = editor.apply(io.StringIO(earlier), None)
                return ctx.reply('\n'.join(rewritten))
    except Exception as e:
        print(e)
@RegexCmd(bot, re.compile(r'^ping$'))
def ping_command(ctx: CommandContext):
    """Answer a message that is exactly ``ping`` with ``pong``."""
    ctx.reply('pong')
@RegexCmd(bot, url_cmd, matcher=default_matcher)
def url_command(ctx: CommandContext):
    """Preview every http(s) and gemini link found in the message."""
    web_links = get_urls(ctx.body)
    gemini = get_gemini_urls(ctx.body)
    found = web_links + gemini
    if found:
        parse_urls(ctx, found)
# URL parsing
# Substrings that mark a whitespace-separated token as an http(s) link.
req_list = ('http://', 'https://')
# Substrings that mark a token as a gemini link.
gemini_links = ('gemini://',)
# Content types that get a title preview instead of a file embed.
html_files = ('text/html', 'application/xhtml+xml', 'text/xml')
# BeautifulSoup parser names for HTML and XML documents respectively.
html_parser = 'html.parser'
xml_parser = 'xml'
# Parenthesised so the two literals are concatenated into one string; as
# two separate lines the second half was a no-op expression statement and
# never became part of the header value.
user_agent = (
    'Mozilla/5.0 (X11; Linux x86_64; rv:10.0)'
    ' Gecko/20100101 Firefox/10.0'
)
accept_lang = 'en-US'
data_limit = 100000000  # 100MB
# Headers sent with every outgoing HTTP request.
headers = {
    'user-agent': user_agent,
    'Accept-Language': accept_lang,
    'Cache-Control': 'no-cache',
}
def get_urls(body):
    """Return the http(s) links mentioned in a message body.

    The body is split on whitespace; a token is kept when any ``req_list``
    entry occurs anywhere inside it. Message order is preserved.
    """
    tokens = body.strip().split()
    return [tok for tok in tokens if any(mark in tok for mark in req_list)]
def get_gemini_urls(body) -> list[str]:
    """Return the gemini links mentioned in a message body.

    The body is split on whitespace; a token is kept when any
    ``gemini_links`` entry occurs anywhere inside it. Message order is
    preserved.
    """
    tokens = body.strip().split()
    return [tok for tok in tokens if any(mark in tok for mark in gemini_links)]
def is_private(uri):
    """Return True when *uri* addresses a private IP literal.

    *uri* is a parsed URL as produced by ``urllib.parse.urlparse``. The host
    is taken from ``uri.hostname`` rather than by splitting ``netloc`` on
    ":", so bracketed IPv6 literals (``[::1]:8080``) and URLs carrying
    userinfo (``user@127.0.0.1``) are checked too instead of slipping
    through as "not an IP".

    Hosts that are not IP literals (ordinary DNS names) are reported as
    not private; no name resolution is performed.
    """
    host = uri.hostname
    if not host:
        return False
    try:
        return ipaddress.ip_address(host).is_private
    except ValueError:
        # The host is a name, not an IP literal.
        return False
def preview_page(ctx: CommandContext, r, ftype):
    """Reply with the ``<title>`` of the page being streamed in *r*.

    The response body is read in 1 KiB chunks until the closing ``</head>``
    tag has been seen or more than ``data_limit`` characters were decoded.
    The text is parsed with the XML parser when *ftype* is ``text/xml`` and
    with the HTML parser otherwise.

    A non-empty title is wrapped in asterisks when it is a single line.
    Nothing is sent if that text is already in ``ctx.preview_history``;
    otherwise it is recorded there and sent as a reply. When the request
    was redirected (``r.history`` is non-empty) the final URL is sent
    first.

    Args:
        ctx: Command context used for history lookups and replies.
        r: Streaming HTTP response exposing ``iter_content``, ``history``
            and ``url``.
        ftype: Normalised content type of the response.
    """
    import codecs  # local import so the block does not rely on file imports

    closing_head = '</head>'
    # An incremental decoder keeps multi-byte characters that straddle a
    # chunk boundary; decoding every chunk on its own silently dropped them.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
    pieces = []
    received = 0
    tail = ''
    for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
        text = decoder.decode(chunk)
        pieces.append(text)
        received += len(text)
        # Only the new text plus a short carry-over can contain a tag that
        # was not present before, so the whole buffer is not rescanned.
        window = tail + text
        if received > data_limit or closing_head in window.lower():
            break
        tail = window[-(len(closing_head) - 1):]
    data = ''.join(pieces)

    parser = xml_parser if ftype == 'text/xml' else html_parser
    soup = bs4.BeautifulSoup(data, parser)
    title = soup.find('title')
    if not title:
        return
    output = title.text.strip()
    if not output:
        return
    if '\n' not in output:
        output = f'*{output}*'
    if output in ctx.preview_history:
        return
    ctx.save_preview_history(output)
    if r.history and r.url:
        ctx.raw_reply(r.url)
    ctx.reply(output)
def preview_file(ctx: CommandContext, uri, ftype, r):
    """Download a non-HTML resource and embed it in the conversation.

    The body of *r* is streamed into memory; the download is abandoned
    without embedding anything once ``data_limit`` bytes have been
    received. The file name comes from the ``Content-Disposition`` header
    when one is present (an extended ``filename*`` parameter wins over
    ``filename``), otherwise from the last path segment of *uri*. If no
    name can be determined, ``file.txt`` is used.

    Any exception is printed and otherwise ignored.

    Args:
        ctx: Command context used to embed the file.
        uri: Parsed URL of the resource.
        ftype: Normalised content type passed on to ``ctx.embed_file``.
        r: Streaming HTTP response exposing ``iter_content`` and
            ``headers``.
    """
    from urllib.parse import unquote  # local import: not a file-level name

    try:
        received = 0
        outfile = io.BytesIO()
        for chunk in r.iter_content(
            chunk_size=512,
            decode_unicode=False,
        ):
            # Count the bytes actually delivered; chunks may be shorter
            # than the requested chunk size.
            received += len(chunk)
            if received >= data_limit:
                return
            outfile.write(chunk)

        filename = None
        content_disposition = r.headers.get('content-disposition')
        if content_disposition:
            _, params = cgi.parse_header(content_disposition)
            filename = params.get('filename')
            extended = params.get('filename*')
            if extended:
                # Extended form is charset'lang'percent-encoded-value;
                # keep the value part and undo the percent-encoding.
                filename = unquote(extended.split("''")[-1])
        else:
            filename = os.path.basename(uri.path)

        fname = filename if filename else 'file.txt'
        ctx.embed_file(ftype, fname, outfile)
    except Exception as e:
        print(e)
def process_http_url(ctx: CommandContext, uri):
    """Fetch an http(s) link and preview it for the sender.

    The URL is requested in streaming mode with the module ``headers`` and
    a 6 second timeout. Responses that are not OK, or whose content type
    normalises to nothing, are ignored. Content types listed in
    ``html_files`` get a title preview; everything else is handed to
    ``preview_file``.

    The response is used as a context manager so the streamed connection
    is always released, even when only part of the body is read. Network
    errors are printed and otherwise ignored, matching the other handlers
    in this module.

    Args:
        ctx: Command context passed on to the preview helpers.
        uri: Parsed URL to fetch.
    """
    url = urlunparse(uri)
    try:
        with requests.get(
            url, stream=True, headers=headers, timeout=6
        ) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get('content-type'))
            if not ftype:
                return
            if ftype in html_files:
                preview_page(ctx, r, ftype)
            else:
                preview_file(ctx, uri, ftype, r)
    except requests.RequestException as e:
        print(e)
def process_gemini_url(ctx: CommandContext, uri):
    """Reply with the first line of a gemini page.

    Nothing is sent when the fetch yields no response or a status other
    than ``'20'``. The first line of the stripped content is sent wrapped
    in asterisks, with surrounding ``#`` characters and whitespace removed.
    """
    response = gusmobile.fetch(urlunparse(uri))
    if not response or response.status != '20':
        return
    content: str = response.content
    first_line: str = content.strip().partition('\n')[0].strip()
    if not first_line:
        return
    heading = first_line.strip("#").strip()
    ctx.reply(f'*{heading}*')
def process_youtube_url(ctx: CommandContext, uri):
    """Reply with the title of a YouTube video.

    Metadata is extracted without downloading the video. When any
    ``invidious_instances`` are configured, a watch link on a randomly
    chosen instance is sent first. Any failure is printed and otherwise
    ignored.
    """
    video_url = urlunparse(uri)
    with youtube_dl.YoutubeDL() as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            title = info.get('title', 'No title')
            if invidious_instances:
                mirror = random.choice(invidious_instances)
                ctx.raw_reply(f'{mirror}/watch?v={info["id"]}')
            ctx.reply(f'*{title}*')
        except Exception as e:
            print(e)
def parse_urls(ctx: CommandContext, urls):
    """Preview each link in *urls* that has not been handled before.

    Links already in ``ctx.link_history`` are skipped; every other link is
    recorded there before processing. Links that cannot be parsed or that
    point at a private IP address are ignored. Gemini links go to
    ``process_gemini_url``. http(s) links go to ``process_youtube_url``
    when they contain one of the configured ``youtube_links`` substrings,
    and to ``process_http_url`` otherwise.

    Args:
        ctx: Command context used for history and replies.
        urls: Link strings extracted from a message.
    """
    for u in urls:
        if u in ctx.link_history:
            continue
        ctx.save_link_history(u)
        try:
            uri = urlparse(u)
        except ValueError as e:
            # urlparse rejects some malformed input (e.g. an unbalanced
            # "[" in the host); skip it so the remaining links still run.
            print(e)
            continue
        if is_private(uri):
            continue
        if uri.scheme == 'gemini':
            process_gemini_url(ctx, uri)
        elif uri.scheme in ('http', 'https'):
            if any(youtube in u for youtube in youtube_links):
                process_youtube_url(ctx, uri)
            else:
                process_http_url(ctx, uri)
# Start the bot: runs at import time, after all command handlers above
# have been registered through their decorators.
bot.connect()
# NOTE(review): forever=True presumably keeps the event loop running until
# the process is stopped — confirm against AngelBot.process.
bot.process(forever=True)