reintroduce commands

This commit is contained in:
Czar 2025-04-07 15:57:52 -03:00
commit acaad7b5a1
2 changed files with 404 additions and 366 deletions

353
angel.py Normal file
View file

@ -0,0 +1,353 @@
import requests
import bs4
import yt_dlp as youtube_dl
import random
import re
import os
import asyncio
from collections import defaultdict
from slixmpp import ClientXMPP
from urllib.parse import urlparse, parse_qs, urlunparse
from pantomime import normalize_mimetype
import cgi
import ipaddress
import io
# Parser backend handed to BeautifulSoup when extracting page titles.
parser = "html.parser"
# The two literals must live inside one parenthesised expression: as two
# separate lines the second literal is a no-op statement and the user agent
# would stop after "rv:10.0)".
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
# Upper bound on the amount of data read from a single link.
data_limit = 100000000  # 100MB
# Request headers sent with every link fetch.
headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}
# Host name that is always treated as a YouTube short link.
youtube_link = "youtu.be"
# Shared yt-dlp client, created once at import time with default options.
ydl = youtube_dl.YoutubeDL()
# URL scheme prefixes used to spot links inside a message body.
req_list = ("http://", "https://")
# Content types that get a title preview instead of a file embed.
html_files = ("text/html", "application/xhtml+xml")
class Lifo(list):
    """Bounded list that keeps the newest entry first and drops the oldest."""

    def __init__(self, size):
        """Create an empty buffer that holds at most *size* items."""
        super().__init__()
        self.size = size

    def add(self, item):
        """Put *item* at index 0 and evict the last element on overflow."""
        self[:0] = [item]
        if len(self) > self.size:
            del self[-1]
def get_youtube_title(url):
    """Return the title yt-dlp reports for *url*, or None when the lookup fails."""
    try:
        return ydl.extract_info(url, download=False)["title"]
    except Exception as err:
        # Best effort: any extractor failure is printed and reported as "no title".
        print(err)
    return None
def get_yurl(path):
    """Build a youtu.be short link from a video id or a URL path.

    Leading slashes are removed so that a URL path such as ``/abc`` yields
    ``https://youtu.be/abc`` rather than ``https://youtu.be//abc``.
    """
    return f"https://youtu.be/{path.lstrip('/')}"
# decorator to define a regex command
class RegexCmd:
    """Decorator that registers a function as a regex-triggered bot command.

    ``RegexCmd(bot, pattern)`` applied to a function stores the function on
    the decorator object and appends that object to ``bot.regex_cmds``.
    """

    def __init__(self, bot, pattern):
        """Remember the bot and the pattern that triggers the command.

        *pattern* may be a compiled regular expression or a pattern string;
        strings are compiled so that ``self.pattern.match`` always works.
        """
        if isinstance(pattern, (str, bytes)):
            pattern = re.compile(pattern)
        self.pattern = pattern
        self.bot = bot
        # Set by __call__; defined here so the attribute always exists.
        self.func = None

    def __call__(self, func):
        """Attach *func*, register this command on the bot and return self.

        Note that the decorated name is bound to this RegexCmd object, not
        to the original function; the function stays reachable as ``.func``.
        """
        # Store the function first so the registered object is complete.
        self.func = func
        self.bot.regex_cmds.append(self)
        return self
class AngelBot(ClientXMPP):
    """XMPP client that answers links with previews/embeds and runs regex commands.

    For every chat or groupchat message the bot looks for http(s) links and
    replies with the page title (HTML), a re-uploaded copy of the file (any
    other content type) or an Invidious link (YouTube hosts). Afterwards the
    message is offered to the commands registered in ``regex_cmds``.
    """

    # Per-conversation history keyed by the sender's bare JID. This is a
    # class attribute, so it is shared by every AngelBot instance.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )
    # RegexCmd objects appended by the RegexCmd decorator (also class-level).
    regex_cmds = []

    def get_urls(self, msg):
        """Return the whitespace-separated tokens of the body that are URLs.

        A token only counts when it *starts* with ``http://`` or
        ``https://``; a scheme buried inside a token cannot be parsed into a
        usable URL and would make the later fetch raise.
        """
        tokens = (msg["body"] or "").split()
        return [token for token in tokens if token.startswith(req_list)]

    def get_invidious_link(self, yurl):
        """Turn a youtube link into a watch link on a random invidious instance."""
        video = yurl.split("/")[-1]
        instance = random.choice(self.invidious_instances)
        return f"https://{instance}/watch?v={video}"

    def send_youtube_info(self, uri, sender, mtype):
        """Send the video title and an invidious link for a youtube URI.

        Nothing is sent when no video id can be found, when the title lookup
        fails, or when the same title was already previewed for *sender*.
        """
        if uri.netloc == youtube_link:
            # Short links carry the id as the URL path.
            video = uri.path.strip("/")
        else:
            video = (parse_qs(uri.query).get("v") or [""])[0]
        if not video:
            return
        invidious = self.get_invidious_link(get_yurl(video))
        # NOTE: get_youtube_title is synchronous and runs on the event loop.
        output = get_youtube_title(invidious)
        if not output:
            return
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
        self.send_message(mto=sender, mbody=invidious, mtype=mtype)

    async def parse_uri(self, uri, sender, mtype):
        """Route a parsed URI to the youtube handler or the generic link handler.

        Links whose host is a private IP address literal are ignored.
        """
        netloc = uri.netloc
        if self.invidious_instances and netloc in (self.youtube_links + [youtube_link]):
            self.send_youtube_info(uri, sender, mtype)
            return
        try:
            # ``hostname`` strips userinfo, port and IPv6 brackets, which a
            # manual split on ":" gets wrong (e.g. "[::1]:80", "u@10.0.0.1").
            address = ipaddress.ip_address(uri.hostname or "")
        except ValueError:
            # Not an IP literal. NOTE: host names are not resolved here, so a
            # name pointing at a private address is not filtered.
            address = None
        if address is not None and address.is_private:
            return
        await self.process_link(uri, sender, mtype)

    async def process_link(self, uri, sender, mtype):
        """Fetch a link and reply with its title (HTML) or a re-uploaded copy.

        Exceptions raised by the HTTP request itself propagate to the caller.
        """
        url = urlunparse(uri)
        # NOTE: requests is synchronous, so the fetch blocks the event loop
        # for up to the timeout. The ``with`` block releases the streamed
        # connection even when the body is only partially read.
        with requests.get(url, stream=True, headers=headers, timeout=6) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get("content-type"))
            if not ftype:
                return
            if ftype in html_files:
                self._send_html_title(r, sender, mtype)
            else:
                await self._embed_response(r, uri, url, sender, mtype, ftype)

    def _send_html_title(self, r, sender, mtype):
        """Read the page up to ``</head>`` and send its <title> to the sender."""
        marker = b"</head>"
        raw = bytearray()
        for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
            # Only re-scan the new bytes plus a tail long enough to catch a
            # marker that straddles two chunks.
            scan_from = max(0, len(raw) - len(marker) + 1)
            raw += chunk
            if len(raw) > data_limit or marker in raw[scan_from:].lower():
                break
        # Decode once, so multi-byte characters split across chunk
        # boundaries are not dropped.
        data = raw.decode("utf-8", errors="ignore")
        title = bs4.BeautifulSoup(data, parser).find("title")
        if not title:
            return
        output = title.text.strip()
        if not output:
            return
        if "\n" not in output:
            output = f"*{output}*"
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        if r.history:
            # The request was redirected: show where it ended up.
            self.send_message(mto=sender, mbody=r.url, mtype=mtype)
        self.send_message(mto=sender, mbody=output, mtype=mtype)

    async def _embed_response(self, r, uri, url, sender, mtype, ftype):
        """Download a non-HTML body into memory and hand it to embed_file."""
        try:
            received = 0
            outfile = io.BytesIO()
            for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
                # Count the bytes actually received, not the requested size.
                received += len(chunk)
                if received >= data_limit:
                    return
                outfile.write(chunk)
            # Rewind so a consumer that reads from the current position sees
            # the whole payload.
            outfile.seek(0)
            filename = None
            content_disposition = r.headers.get("content-disposition")
            if content_disposition:
                _, params = cgi.parse_header(content_disposition)
                filename = params.get("filename")
                if params.get("filename*"):
                    # RFC 5987 form: charset'lang'value -> keep the value part.
                    filename = params["filename*"].split("''")[-1]
            if not filename:
                # No usable header name (e.g. "inline"): use the URL path.
                filename = uri.path
            # The name comes from the remote server; keep only its last component.
            fname = os.path.basename(filename) or "file.txt"
            await self.embed_file(url, sender, mtype, ftype, fname, outfile)
        except Exception as e:
            print(e)

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Upload *outfile* via xep_0363 and send the resulting URL as an OOB message.

        *url* is accepted for interface compatibility but is not used.
        """
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        # Remember our own upload so the bot does not process it again.
        self.messages[sender]["links"].add(furl)
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Process every URL of a message that was not handled recently.

        Messages mentioning "nsfw" or "nsfl" are skipped entirely.
        """
        body = msg["body"].lower()
        if "nsfw" in body or "nsfl" in body:
            return
        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            try:
                await self.parse_uri(urlparse(u), sender, mtype)
            except Exception as e:
                # One broken link must not stop the remaining ones.
                print(e)

    def __init__(self, jid, password, nick="angel", autojoin=None,
                 youtube_links=None,
                 invidious_instances=None):
        """Initialize the bot.

        *autojoin* lists the rooms joined at session start, *youtube_links*
        the extra host names treated as youtube, and *invidious_instances*
        the hosts used for replacement links. Each defaults to an empty list.
        """
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        self.nick = nick
        self.autojoin = autojoin or []
        self.invidious_instances = invidious_instances or []
        self.youtube_links = youtube_links or []
        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # self.add_event_handler("vcard_avatar_update", self.debug_event)
        # self.add_event_handler("stream_error", self.debug_event)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    async def session_start(self, event):
        """Announce presence, fetch the roster, publish the profile and join rooms."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                print(e)

    async def update_info(self):
        """Publish the bot's avatar and vCard.

        Every publish is awaited, and a failure is printed instead of being
        left as a never-retrieved task exception. A missing or unreadable
        ``angel.png`` only skips the avatar.
        """
        jobs = []
        avatar_type = "image/png"
        try:
            with open("angel.png", "rb") as avatar_file:
                avatar = avatar_file.read()
        except OSError as e:
            print(e)
            avatar = None
        if avatar is not None:
            jobs.append(self._publish_pep_avatar(avatar, avatar_type))
            jobs.append(
                self.plugin["xep_0153"].set_avatar(
                    avatar=avatar,
                    mtype=avatar_type,
                )
            )
        vcard = self.plugin["xep_0054"].make_vcard()
        vcard["URL"] = "https://wiki.kalli.st/Angel"
        vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
        vcard["NICKNAME"] = "Angel"
        vcard["FN"] = "Angel"
        jobs.append(self.plugin["xep_0054"].publish_vcard(vcard))
        results = await asyncio.gather(*jobs, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(result)

    async def _publish_pep_avatar(self, avatar, avatar_type):
        """Publish the avatar data, then the metadata that refers to it."""
        pep = self.plugin["xep_0084"]
        info = {
            "id": pep.generate_id(avatar),
            "type": avatar_type,
            "bytes": len(avatar),
        }
        await pep.publish_avatar(avatar)
        await pep.publish_avatar_metadata([info])

    async def message(self, msg):
        """Process a direct ("chat" or "normal") message."""
        if msg["type"] in ("chat", "normal"):
            await self._handle_message(msg)

    async def muc_message(self, msg):
        """Process a groupchat message, ignoring the bot's own messages."""
        if msg["type"] not in ("groupchat", "normal"):
            return
        if msg["mucnick"] == self.nick:
            return
        await self._handle_message(msg)

    async def _handle_message(self, msg):
        """Run link handling and then command handling for one message."""
        # Message corrections (edits) are ignored.
        if "urn:xmpp:message-correct:0" in str(msg):
            return
        mtype = msg["type"]
        sender = msg["from"].bare
        try:
            # Messages that already carry an OOB url are not previewed again.
            if not msg["oob"]["url"]:
                urls = self.get_urls(msg)
                if urls:
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            print(e)
        self.process_commands(msg, sender, mtype)

    def process_commands(self, msg, sender, mtype):
        """Run the first registered command whose pattern matches the body.

        Returns that command's result. Bodies matching no command are
        stored in the sender's message history instead.
        """
        body = msg["body"]
        for cmd in self.regex_cmds:
            if cmd.pattern.match(body):
                return cmd.func(self, msg, sender, mtype)
        self.messages[sender]["messages"].add(body)

405
main.py
View file

@ -1,369 +1,13 @@
import requests
import bs4
import yt_dlp as youtube_dl
import random
import configparser
from angel import AngelBot, RegexCmd
from configparser import ConfigParser
from PythonSed import Sed
import re
import io
import os
import asyncio
from collections import defaultdict
from PythonSed import Sed
from slixmpp import ClientXMPP
from urllib.parse import urlparse, parse_qs, urlunparse
from pantomime import normalize_mimetype
import cgi
import ipaddress
# Splits a sed expression on "/" or "#" delimiters that are not backslash-escaped.
sed_parse = re.compile(r"(?<!\\)[/#]")
# Recognises a message that looks like s<d>pattern<d>replacement<d>...
sed_cmd = re.compile(r"^s[/#].*[/#].*[/#]")
# Parser backend handed to BeautifulSoup when extracting page titles.
parser = "html.parser"
# The two literals must live inside one parenthesised expression: as two
# separate lines the second literal is a no-op statement and the user agent
# would stop after "rv:10.0)".
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
# Upper bound on the amount of data read from a single link.
data_limit = 100000000  # 100MB
# Request headers sent with every link fetch.
headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}
# Host name that is always treated as a YouTube short link.
youtube_link = "youtu.be"
# Shared yt-dlp client, created once at import time with default options.
ydl = youtube_dl.YoutubeDL()
# URL scheme prefixes used to spot links inside a message body.
req_list = ("http://", "https://")
# Content types that get a title preview instead of a file embed.
html_files = ("text/html", "application/xhtml+xml")
class Lifo(list):
    """Bounded list that keeps the newest entry first and drops the oldest."""

    def __init__(self, size):
        """Create an empty buffer that holds at most *size* items."""
        super().__init__()
        self.size = size

    def add(self, item):
        """Put *item* at index 0 and evict the last element on overflow."""
        self[:0] = [item]
        if len(self) > self.size:
            del self[-1]
def get_youtube_title(url):
    """Return the title yt-dlp reports for *url*, or None when the lookup fails."""
    try:
        return ydl.extract_info(url, download=False)["title"]
    except Exception as err:
        # Best effort: any extractor failure is printed and reported as "no title".
        print(err)
    return None
def get_yurl(path):
    """Build a youtu.be short link from a video id or a URL path.

    Leading slashes are removed so that a URL path such as ``/abc`` yields
    ``https://youtu.be/abc`` rather than ``https://youtu.be//abc``.
    """
    return f"https://youtu.be/{path.lstrip('/')}"
class AngelBot(ClientXMPP):
    """XMPP client that answers links with previews/embeds and sed-style edits.

    For every chat or groupchat message the bot looks for http(s) links and
    replies with the page title (HTML), a re-uploaded copy of the file (any
    other content type) or an Invidious link (YouTube hosts). Afterwards the
    message is passed to ``sed_command``.
    """

    # Per-conversation history keyed by the sender's bare JID. This is a
    # class attribute, so it is shared by every AngelBot instance.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    def get_urls(self, msg):
        """Return the whitespace-separated tokens of the body that are URLs.

        A token only counts when it *starts* with ``http://`` or
        ``https://``; a scheme buried inside a token cannot be parsed into a
        usable URL and would make the later fetch raise.
        """
        tokens = (msg["body"] or "").split()
        return [token for token in tokens if token.startswith(req_list)]

    def get_invidious_link(self, yurl):
        """Turn a youtube link into a watch link on a random invidious instance."""
        video = yurl.split("/")[-1]
        instance = random.choice(self.invidious_instances)
        return f"https://{instance}/watch?v={video}"

    def send_youtube_info(self, uri, sender, mtype):
        """Send the video title and an invidious link for a youtube URI.

        Nothing is sent when no video id can be found, when the title lookup
        fails, or when the same title was already previewed for *sender*.
        """
        if uri.netloc == youtube_link:
            # Short links carry the id as the URL path.
            video = uri.path.strip("/")
        else:
            video = (parse_qs(uri.query).get("v") or [""])[0]
        if not video:
            return
        invidious = self.get_invidious_link(get_yurl(video))
        # NOTE: get_youtube_title is synchronous and runs on the event loop.
        output = get_youtube_title(invidious)
        if not output:
            return
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
        self.send_message(mto=sender, mbody=invidious, mtype=mtype)

    async def parse_uri(self, uri, sender, mtype):
        """Route a parsed URI to the youtube handler or the generic link handler.

        Links whose host is a private IP address literal are ignored.
        """
        netloc = uri.netloc
        if self.invidious_instances and netloc in (self.youtube_links + [youtube_link]):
            self.send_youtube_info(uri, sender, mtype)
            return
        try:
            # ``hostname`` strips userinfo, port and IPv6 brackets, which a
            # manual split on ":" gets wrong (e.g. "[::1]:80", "u@10.0.0.1").
            address = ipaddress.ip_address(uri.hostname or "")
        except ValueError:
            # Not an IP literal. NOTE: host names are not resolved here, so a
            # name pointing at a private address is not filtered.
            address = None
        if address is not None and address.is_private:
            return
        await self.process_link(uri, sender, mtype)

    async def process_link(self, uri, sender, mtype):
        """Fetch a link and reply with its title (HTML) or a re-uploaded copy.

        Exceptions raised by the HTTP request itself propagate to the caller.
        """
        url = urlunparse(uri)
        # NOTE: requests is synchronous, so the fetch blocks the event loop
        # for up to the timeout. The ``with`` block releases the streamed
        # connection even when the body is only partially read.
        with requests.get(url, stream=True, headers=headers, timeout=6) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get("content-type"))
            if not ftype:
                return
            if ftype in html_files:
                self._send_html_title(r, sender, mtype)
            else:
                await self._embed_response(r, uri, url, sender, mtype, ftype)

    def _send_html_title(self, r, sender, mtype):
        """Read the page up to ``</head>`` and send its <title> to the sender."""
        marker = b"</head>"
        raw = bytearray()
        for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
            # Only re-scan the new bytes plus a tail long enough to catch a
            # marker that straddles two chunks.
            scan_from = max(0, len(raw) - len(marker) + 1)
            raw += chunk
            if len(raw) > data_limit or marker in raw[scan_from:].lower():
                break
        # Decode once, so multi-byte characters split across chunk
        # boundaries are not dropped.
        data = raw.decode("utf-8", errors="ignore")
        title = bs4.BeautifulSoup(data, parser).find("title")
        if not title:
            return
        output = title.text.strip()
        if not output:
            return
        if "\n" not in output:
            output = f"*{output}*"
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        if r.history:
            # The request was redirected: show where it ended up.
            self.send_message(mto=sender, mbody=r.url, mtype=mtype)
        self.send_message(mto=sender, mbody=output, mtype=mtype)

    async def _embed_response(self, r, uri, url, sender, mtype, ftype):
        """Download a non-HTML body into memory and hand it to embed_file."""
        try:
            received = 0
            outfile = io.BytesIO()
            for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
                # Count the bytes actually received, not the requested size.
                received += len(chunk)
                if received >= data_limit:
                    return
                outfile.write(chunk)
            # Rewind so a consumer that reads from the current position sees
            # the whole payload.
            outfile.seek(0)
            filename = None
            content_disposition = r.headers.get("content-disposition")
            if content_disposition:
                _, params = cgi.parse_header(content_disposition)
                filename = params.get("filename")
                if params.get("filename*"):
                    # RFC 5987 form: charset'lang'value -> keep the value part.
                    filename = params["filename*"].split("''")[-1]
            if not filename:
                # No usable header name (e.g. "inline"): use the URL path.
                filename = uri.path
            # The name comes from the remote server; keep only its last component.
            fname = os.path.basename(filename) or "file.txt"
            await self.embed_file(url, sender, mtype, ftype, fname, outfile)
        except Exception as e:
            print(e)

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Upload *outfile* via xep_0363 and send the resulting URL as an OOB message.

        *url* is accepted for interface compatibility but is not used.
        """
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        # Remember our own upload so the bot does not process it again.
        self.messages[sender]["links"].add(furl)
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Process every URL of a message that was not handled recently.

        Messages mentioning "nsfw" or "nsfl" are skipped entirely.
        """
        body = msg["body"].lower()
        if "nsfw" in body or "nsfl" in body:
            return
        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            try:
                await self.parse_uri(urlparse(u), sender, mtype)
            except Exception as e:
                # One broken link must not stop the remaining ones.
                print(e)

    def sed_command(self, msg, sender, mtype):
        """Apply an ``s/pattern/replacement/flags`` message to the history.

        The newest stored message matching the pattern is rewritten, stored
        and sent back. Bodies that are not sed expressions are recorded in
        the history instead.
        """
        try:
            text = msg["body"]
            if not sed_cmd.match(text):
                self.messages[sender]["messages"].add(text)
                return
            sed_args = sed_parse.split(text)
            # The body is untrusted chat input and is loaded as a sed script,
            # so accept exactly one substitution ("s", pattern, replacement,
            # flags) with harmless flags. This rejects extra commands and
            # flags such as a file-writing "w".
            if len(sed_args) != 4:
                return
            if not re.fullmatch(r"[gGiI0-9]*", sed_args[3].strip()):
                return
            sed = Sed()
            sed.load_string(text)
            history = self.messages[sender]["messages"]
            for message in history:
                if not re.search(sed_args[1], message):
                    continue
                res = "\n".join(sed.apply(io.StringIO(message), None))
                history.add(res)
                return self.send_message(
                    mto=sender,
                    mbody=res,
                    mtype=mtype,
                )
        except Exception as e:
            print(e)

    def __init__(self, jid, password, nick="angel", autojoin=None,
                 youtube_links=None,
                 invidious_instances=None):
        """Initialize the bot.

        *autojoin* lists the rooms joined at session start, *youtube_links*
        the extra host names treated as youtube, and *invidious_instances*
        the hosts used for replacement links. Each defaults to an empty list.
        """
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        self.nick = nick
        self.autojoin = autojoin or []
        self.invidious_instances = invidious_instances or []
        self.youtube_links = youtube_links or []
        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # self.add_event_handler("vcard_avatar_update", self.debug_event)
        # self.add_event_handler("stream_error", self.debug_event)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())

    async def session_start(self, event):
        """Announce presence, fetch the roster, publish the profile and join rooms."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                print(e)

    async def update_info(self):
        """Publish the bot's avatar and vCard.

        Every publish is awaited, and a failure is printed instead of being
        left as a never-retrieved task exception. A missing or unreadable
        ``angel.png`` only skips the avatar.
        """
        jobs = []
        avatar_type = "image/png"
        try:
            with open("angel.png", "rb") as avatar_file:
                avatar = avatar_file.read()
        except OSError as e:
            print(e)
            avatar = None
        if avatar is not None:
            jobs.append(self._publish_pep_avatar(avatar, avatar_type))
            jobs.append(
                self.plugin["xep_0153"].set_avatar(
                    avatar=avatar,
                    mtype=avatar_type,
                )
            )
        vcard = self.plugin["xep_0054"].make_vcard()
        vcard["URL"] = "https://wiki.kalli.st/Angel"
        vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
        vcard["NICKNAME"] = "Angel"
        vcard["FN"] = "Angel"
        jobs.append(self.plugin["xep_0054"].publish_vcard(vcard))
        results = await asyncio.gather(*jobs, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(result)

    async def _publish_pep_avatar(self, avatar, avatar_type):
        """Publish the avatar data, then the metadata that refers to it."""
        pep = self.plugin["xep_0084"]
        info = {
            "id": pep.generate_id(avatar),
            "type": avatar_type,
            "bytes": len(avatar),
        }
        await pep.publish_avatar(avatar)
        await pep.publish_avatar_metadata([info])

    async def message(self, msg):
        """Process a direct ("chat" or "normal") message."""
        if msg["type"] in ("chat", "normal"):
            await self._handle_message(msg)

    async def muc_message(self, msg):
        """Process a groupchat message, ignoring the bot's own messages."""
        if msg["type"] not in ("groupchat", "normal"):
            return
        if msg["mucnick"] == self.nick:
            return
        await self._handle_message(msg)

    async def _handle_message(self, msg):
        """Run link handling and then the sed command for one message."""
        # Message corrections (edits) are ignored.
        if "urn:xmpp:message-correct:0" in str(msg):
            return
        mtype = msg["type"]
        sender = msg["from"].bare
        try:
            # Messages that already carry an OOB url are not previewed again.
            if not msg["oob"]["url"]:
                urls = self.get_urls(msg)
                if urls:
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            print(e)
        self.sed_command(msg, sender, mtype)
if __name__ == "__main__":
config = configparser.ConfigParser()
config = ConfigParser()
config.read("config.ini")
jid = config["angel"]["jid"]
password = config["angel"]["password"]
@ -379,5 +23,46 @@ if __name__ == "__main__":
youtube_links=youtube_links,
invidious_instances=invidious_instances)
@RegexCmd(bot, sed_cmd)
def sed_command(bot, msg, sender, mtype):
    """Apply an ``s/pattern/replacement/flags`` message to the history.

    The newest stored message of *sender* matching the pattern is rewritten,
    stored and sent back. Returns the result of ``bot.send_message`` in that
    case and None otherwise. Errors are printed, not raised.
    """
    try:
        text = msg["body"]
        if not sed_cmd.match(text):
            bot.messages[sender]["messages"].add(text)
            return
        sed_args = sed_parse.split(text)
        # The body is untrusted chat input and is loaded as a sed script, so
        # accept exactly one substitution ("s", pattern, replacement, flags)
        # with harmless flags. This rejects extra commands and flags such as
        # a file-writing "w".
        if len(sed_args) != 4:
            return
        if not re.fullmatch(r"[gGiI0-9]*", sed_args[3].strip()):
            return
        sed = Sed()
        sed.load_string(text)
        history = bot.messages[sender]["messages"]
        for message in history:
            # Pre-filter with the raw pattern to find the newest candidate.
            if not re.search(sed_args[1], message):
                continue
            # A separate name keeps the ``msg`` parameter from being shadowed.
            source = io.StringIO(message)
            res = "\n".join(sed.apply(source, None))
            history.add(res)
            return bot.send_message(
                mto=sender,
                mbody=res,
                mtype=mtype,
            )
    except Exception as e:
        print(e)
# ping command
@RegexCmd(bot, re.compile(r"^ping$"))
def ping_command(bot, msg, sender, mtype):
    """Reply "pong" to the conversation the ping came from."""
    reply = {"mto": sender, "mbody": "pong", "mtype": mtype}
    bot.send_message(**reply)
bot.connect()
bot.process(forever=True)