parent
4a9b4ccbb4
commit
9801202e39
1 changed files with 368 additions and 368 deletions
736
main.py
736
main.py
|
|
@ -1,368 +1,368 @@
|
|||
import requests
|
||||
import bs4
|
||||
import yt_dlp as youtube_dl
|
||||
import random
|
||||
import configparser
|
||||
import re
|
||||
import io
|
||||
import os
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from PythonSed import Sed
|
||||
from slixmpp import ClientXMPP
|
||||
from urllib.parse import urlparse, parse_qs, urlunparse
|
||||
from pantomime import normalize_mimetype
|
||||
import cgi
|
||||
|
||||
sed_parse = re.compile("(?<!\\\\)[/#]")
|
||||
sed_cmd = re.compile("^s[/#].*[/#].*[/#]")
|
||||
|
||||
parser = "html.parser"
|
||||
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
|
||||
" Gecko/20100101 Firefox/10.0"
|
||||
accept_lang = "en-US"
|
||||
data_limit = 100000000 # 100MB
|
||||
|
||||
headers = {
|
||||
"user-agent": user_agent,
|
||||
"Accept-Language": accept_lang,
|
||||
"Cache-Control": "no-cache",
|
||||
}
|
||||
|
||||
youtube_links = ["www.youtube.com", "m.youtube.com"]
|
||||
|
||||
youtube_link = "youtu.be"
|
||||
|
||||
ydl = youtube_dl.YoutubeDL()
|
||||
|
||||
invidious_instances = ["invidious.snopyta.org"]
|
||||
|
||||
block_list = ("localhost", "127.0.0.1", "0.0.0.0")
|
||||
|
||||
req_list = ("http://", "https://")
|
||||
|
||||
html_files = ("text/html", "application/xhtml+xml")
|
||||
|
||||
|
||||
class Lifo(list):
|
||||
"""Limited size LIFO array to store messages and urls."""
|
||||
|
||||
def __init__(self, size):
|
||||
"""Initialize the LIFO array."""
|
||||
super().__init__()
|
||||
self.size = size
|
||||
|
||||
def add(self, item):
|
||||
"""Add an item to the LIFO array."""
|
||||
self.insert(0, item)
|
||||
if len(self) > self.size:
|
||||
self.pop()
|
||||
|
||||
|
||||
def get_youtube_title(url):
|
||||
"""Get the title of a youtube video."""
|
||||
try:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
return info["title"]
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
|
||||
def get_invidious_link(yurl):
|
||||
"""Get an invidious link from a youtube link."""
|
||||
video = yurl.split("/")[-1]
|
||||
instance = random.choice(invidious_instances)
|
||||
return f"https://{instance}/watch?v={video}"
|
||||
|
||||
|
||||
def get_yurl(path):
|
||||
"""Get a youtube link from a path."""
|
||||
yurl = f"https://youtu.be/{path}"
|
||||
return yurl
|
||||
|
||||
|
||||
class AngelBot(ClientXMPP):
|
||||
"""AngelBot class."""
|
||||
|
||||
messages = defaultdict(
|
||||
lambda: {
|
||||
"messages": Lifo(100),
|
||||
"links": Lifo(10),
|
||||
"previews": Lifo(10),
|
||||
}
|
||||
)
|
||||
|
||||
def get_urls(self, msg):
|
||||
"""Get urls from a message."""
|
||||
str_list = msg["body"].strip().split()
|
||||
urls = [u for u in str_list if any(r in u for r in req_list)]
|
||||
return urls
|
||||
|
||||
def send_youtube_info(self, uri, sender, mtype):
|
||||
"""Send youtube info to the sender."""
|
||||
yurl = None
|
||||
if uri.netloc == youtube_link:
|
||||
yurl = get_yurl(uri.path)
|
||||
elif "v" in (query := parse_qs(uri.query)):
|
||||
if v := query["v"]:
|
||||
yurl = get_yurl(v[0])
|
||||
else:
|
||||
return
|
||||
|
||||
invidious = get_invidious_link(yurl)
|
||||
|
||||
if output := get_youtube_title(invidious):
|
||||
if output in self.messages[sender]["previews"]:
|
||||
return
|
||||
self.messages[sender]["previews"].add(output)
|
||||
|
||||
self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
|
||||
self.send_message(mto=sender, mbody=invidious, mtype=mtype)
|
||||
|
||||
async def parse_uri(self, uri, sender, mtype):
|
||||
"""Parse a uri and send the result to the sender."""
|
||||
netloc = uri.netloc
|
||||
if netloc in (youtube_links + [youtube_link]):
|
||||
self.send_youtube_info(uri, sender, mtype)
|
||||
elif netloc.split(":")[0] in block_list:
|
||||
return
|
||||
else:
|
||||
await self.process_link(uri, sender, mtype)
|
||||
|
||||
async def process_link(self, uri, sender, mtype):
|
||||
"""Process a link and send the result to the sender."""
|
||||
url = urlunparse(uri)
|
||||
r = requests.get(url, stream=True, headers=headers, timeout=6)
|
||||
if not r.ok:
|
||||
return
|
||||
|
||||
ftype = normalize_mimetype(r.headers.get("content-type"))
|
||||
|
||||
if not ftype:
|
||||
return
|
||||
|
||||
if ftype in html_files:
|
||||
data = ""
|
||||
for i in r.iter_content(chunk_size=1024, decode_unicode=False):
|
||||
data += i.decode("utf-8", errors="ignore")
|
||||
if len(data) > data_limit or "</head>" in data.lower():
|
||||
break
|
||||
soup = bs4.BeautifulSoup(data, parser)
|
||||
if title := soup.find("title"):
|
||||
output = title.text.strip()
|
||||
if output:
|
||||
output = f"*{output}*" if ("\n" not in output) else output
|
||||
if output in self.messages[sender]["previews"]:
|
||||
return
|
||||
|
||||
self.messages[sender]["previews"].add(output)
|
||||
if r.history:
|
||||
self.send_message(mto=sender, mbody=r.url, mtype=mtype)
|
||||
self.send_message(mto=sender, mbody=output, mtype=mtype)
|
||||
|
||||
else:
|
||||
try:
|
||||
lenght = 0
|
||||
outfile = io.BytesIO()
|
||||
for chunk in r.iter_content(
|
||||
chunk_size=512,
|
||||
decode_unicode=False,
|
||||
):
|
||||
lenght += 512
|
||||
if lenght >= data_limit:
|
||||
return
|
||||
outfile.write(chunk)
|
||||
|
||||
content_disposition = r.headers.get("content-disposition")
|
||||
filename = None
|
||||
if content_disposition:
|
||||
_, params = cgi.parse_header(content_disposition)
|
||||
filename = params.get("filename")
|
||||
else:
|
||||
filename = os.path.basename(uri.path)
|
||||
|
||||
ext = os.path.splitext(filename)[1] if filename else ".txt"
|
||||
fname = filename if filename else f"file{ext}"
|
||||
await self.embed_file(url, sender, mtype, ftype, fname, outfile)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
|
||||
"""Embed a file and send the result to the sender."""
|
||||
furl = await self.plugin["xep_0363"].upload_file(
|
||||
fname, content_type=ftype, input_file=outfile
|
||||
)
|
||||
message = self.make_message(sender)
|
||||
message["body"] = furl
|
||||
message["type"] = mtype
|
||||
message["oob"]["url"] = furl
|
||||
message.send()
|
||||
|
||||
async def parse_urls(self, msg, urls, sender, mtype):
|
||||
"""Parse urls and send the result to the sender."""
|
||||
body = msg["body"].lower()
|
||||
if "nsfw" in body: return
|
||||
if "nsfl" in body: return
|
||||
for u in urls:
|
||||
if u in self.messages[sender]["links"]:
|
||||
continue
|
||||
else:
|
||||
self.messages[sender]["links"].add(u)
|
||||
|
||||
uri = urlparse(u)
|
||||
await self.parse_uri(uri, sender, mtype)
|
||||
|
||||
def sed_command(self, msg, sender, mtype):
|
||||
"""Process sed command."""
|
||||
try:
|
||||
text = msg["body"]
|
||||
if not sed_cmd.match(text):
|
||||
self.messages[sender]["messages"].add(text)
|
||||
return
|
||||
sed_args = sed_parse.split(text)
|
||||
|
||||
if len(sed_args) < 4:
|
||||
return
|
||||
|
||||
sed = Sed()
|
||||
sed.load_string(text)
|
||||
|
||||
for message in self.messages[sender]["messages"]:
|
||||
if sed_args[1] not in message:
|
||||
continue
|
||||
msg = io.StringIO(message)
|
||||
res = "\n".join(sed.apply(msg, None))
|
||||
self.messages[sender]["messages"].add(res)
|
||||
return self.send_message(
|
||||
mto=sender,
|
||||
mbody=res,
|
||||
mtype=mtype,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def __init__(self, jid, password, nick="angel", autojoin=None):
|
||||
"""Initialize the bot."""
|
||||
ClientXMPP.__init__(self, jid, password)
|
||||
self.jid = jid
|
||||
self.nick = nick
|
||||
self.autojoin = autojoin or []
|
||||
self.register_plugin("xep_0030")
|
||||
self.register_plugin("xep_0060")
|
||||
self.register_plugin("xep_0054")
|
||||
self.register_plugin("xep_0045")
|
||||
self.register_plugin("xep_0066")
|
||||
self.register_plugin("xep_0084")
|
||||
self.register_plugin("xep_0153")
|
||||
self.register_plugin("xep_0363")
|
||||
|
||||
self.add_event_handler("session_start", self.session_start)
|
||||
self.add_event_handler("message", self.message)
|
||||
self.add_event_handler("groupchat_message", self.muc_message)
|
||||
# self.add_event_handler("vcard_avatar_update", self.debug_event)
|
||||
# self.add_event_handler("stream_error", self.debug_event)
|
||||
self.add_event_handler("disconnected", lambda _: self.connect())
|
||||
|
||||
async def session_start(self, event):
|
||||
"""Start the bot."""
|
||||
self.send_presence()
|
||||
await self.get_roster()
|
||||
await self.update_info()
|
||||
for channel in self.autojoin:
|
||||
try:
|
||||
self.plugin["xep_0045"].join_muc(channel, self.nick)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
async def update_info(self):
|
||||
"""Update the bot info."""
|
||||
with open("angel.png", "rb") as avatar_file:
|
||||
avatar = avatar_file.read()
|
||||
|
||||
avatar_type = "image/png"
|
||||
avatar_id = self.plugin["xep_0084"].generate_id(avatar)
|
||||
avatar_bytes = len(avatar)
|
||||
|
||||
asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
|
||||
|
||||
asyncio.gather(
|
||||
self.plugin["xep_0153"].set_avatar(
|
||||
avatar=avatar,
|
||||
mtype=avatar_type,
|
||||
)
|
||||
)
|
||||
|
||||
info = {
|
||||
"id": avatar_id,
|
||||
"type": avatar_type,
|
||||
"bytes": avatar_bytes,
|
||||
}
|
||||
|
||||
asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
|
||||
|
||||
vcard = self.plugin["xep_0054"].make_vcard()
|
||||
|
||||
vcard["URL"] = "https://wiki.kalli.st/Angel"
|
||||
vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
|
||||
vcard["NICKNAME"] = "Angel"
|
||||
vcard["FN"] = "Angel"
|
||||
|
||||
asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
|
||||
|
||||
async def message(self, msg):
|
||||
"""Process a message."""
|
||||
if msg["type"] in ("chat", "normal"):
|
||||
edit = "urn:xmpp:message-correct:0" in str(msg)
|
||||
if edit:
|
||||
return
|
||||
|
||||
mtype = msg["type"]
|
||||
sender = msg["from"].bare
|
||||
|
||||
try:
|
||||
if not msg["oob"]["url"]:
|
||||
if urls := self.get_urls(msg):
|
||||
await self.parse_urls(msg, urls, sender, mtype)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
self.sed_command(msg, sender, mtype)
|
||||
|
||||
async def muc_message(self, msg):
|
||||
"""Process a groupchat message."""
|
||||
if msg["type"] in ("groupchat", "normal"):
|
||||
edit = "urn:xmpp:message-correct:0" in str(msg)
|
||||
|
||||
if edit:
|
||||
return
|
||||
|
||||
if msg["mucnick"] == self.nick:
|
||||
return
|
||||
|
||||
mtype = msg["type"]
|
||||
sender = msg["from"].bare
|
||||
|
||||
|
||||
try:
|
||||
if not msg["oob"]["url"]:
|
||||
if urls := self.get_urls(msg):
|
||||
await self.parse_urls(msg, urls, sender, mtype)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
self.sed_command(msg, sender, mtype)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config.ini")
|
||||
jid = config["angel"]["jid"]
|
||||
password = config["angel"]["password"]
|
||||
autojoin = config["angel"]["autojoin"].split()
|
||||
nick = config["angel"]["nick"]
|
||||
bot = AngelBot(jid, password, nick=nick, autojoin=autojoin)
|
||||
|
||||
bot.connect()
|
||||
bot.process(forever=True)
|
||||
import requests
|
||||
import bs4
|
||||
import yt_dlp as youtube_dl
|
||||
import random
|
||||
import configparser
|
||||
import re
|
||||
import io
|
||||
import os
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from PythonSed import Sed
|
||||
from slixmpp import ClientXMPP
|
||||
from urllib.parse import urlparse, parse_qs, urlunparse
|
||||
from pantomime import normalize_mimetype
|
||||
import cgi
|
||||
|
||||
sed_parse = re.compile("(?<!\\\\)[/#]")
|
||||
sed_cmd = re.compile("^s[/#].*[/#].*[/#]")
|
||||
|
||||
parser = "html.parser"
|
||||
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
|
||||
" Gecko/20100101 Firefox/10.0"
|
||||
accept_lang = "en-US"
|
||||
data_limit = 100000000 # 100MB
|
||||
|
||||
headers = {
|
||||
"user-agent": user_agent,
|
||||
"Accept-Language": accept_lang,
|
||||
"Cache-Control": "no-cache",
|
||||
}
|
||||
|
||||
youtube_links = ["www.youtube.com", "m.youtube.com"]
|
||||
|
||||
youtube_link = "youtu.be"
|
||||
|
||||
ydl = youtube_dl.YoutubeDL()
|
||||
|
||||
invidious_instances = ["invidious.kalli.st"]
|
||||
|
||||
block_list = ("localhost", "127.0.0.1", "0.0.0.0")
|
||||
|
||||
req_list = ("http://", "https://")
|
||||
|
||||
html_files = ("text/html", "application/xhtml+xml")
|
||||
|
||||
|
||||
class Lifo(list):
|
||||
"""Limited size LIFO array to store messages and urls."""
|
||||
|
||||
def __init__(self, size):
|
||||
"""Initialize the LIFO array."""
|
||||
super().__init__()
|
||||
self.size = size
|
||||
|
||||
def add(self, item):
|
||||
"""Add an item to the LIFO array."""
|
||||
self.insert(0, item)
|
||||
if len(self) > self.size:
|
||||
self.pop()
|
||||
|
||||
|
||||
def get_youtube_title(url):
|
||||
"""Get the title of a youtube video."""
|
||||
try:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
return info["title"]
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
|
||||
def get_invidious_link(yurl):
|
||||
"""Get an invidious link from a youtube link."""
|
||||
video = yurl.split("/")[-1]
|
||||
instance = random.choice(invidious_instances)
|
||||
return f"https://{instance}/watch?v={video}"
|
||||
|
||||
|
||||
def get_yurl(path):
|
||||
"""Get a youtube link from a path."""
|
||||
yurl = f"https://youtu.be/{path}"
|
||||
return yurl
|
||||
|
||||
|
||||
class AngelBot(ClientXMPP):
|
||||
"""AngelBot class."""
|
||||
|
||||
messages = defaultdict(
|
||||
lambda: {
|
||||
"messages": Lifo(100),
|
||||
"links": Lifo(10),
|
||||
"previews": Lifo(10),
|
||||
}
|
||||
)
|
||||
|
||||
def get_urls(self, msg):
|
||||
"""Get urls from a message."""
|
||||
str_list = msg["body"].strip().split()
|
||||
urls = [u for u in str_list if any(r in u for r in req_list)]
|
||||
return urls
|
||||
|
||||
def send_youtube_info(self, uri, sender, mtype):
|
||||
"""Send youtube info to the sender."""
|
||||
yurl = None
|
||||
if uri.netloc == youtube_link:
|
||||
yurl = get_yurl(uri.path)
|
||||
elif "v" in (query := parse_qs(uri.query)):
|
||||
if v := query["v"]:
|
||||
yurl = get_yurl(v[0])
|
||||
else:
|
||||
return
|
||||
|
||||
invidious = get_invidious_link(yurl)
|
||||
|
||||
if output := get_youtube_title(invidious):
|
||||
if output in self.messages[sender]["previews"]:
|
||||
return
|
||||
self.messages[sender]["previews"].add(output)
|
||||
|
||||
self.send_message(mto=sender, mbody=f"*{output}*", mtype=mtype)
|
||||
self.send_message(mto=sender, mbody=invidious, mtype=mtype)
|
||||
|
||||
async def parse_uri(self, uri, sender, mtype):
|
||||
"""Parse a uri and send the result to the sender."""
|
||||
netloc = uri.netloc
|
||||
if netloc in (youtube_links + [youtube_link]):
|
||||
self.send_youtube_info(uri, sender, mtype)
|
||||
elif netloc.split(":")[0] in block_list:
|
||||
return
|
||||
else:
|
||||
await self.process_link(uri, sender, mtype)
|
||||
|
||||
async def process_link(self, uri, sender, mtype):
|
||||
"""Process a link and send the result to the sender."""
|
||||
url = urlunparse(uri)
|
||||
r = requests.get(url, stream=True, headers=headers, timeout=6)
|
||||
if not r.ok:
|
||||
return
|
||||
|
||||
ftype = normalize_mimetype(r.headers.get("content-type"))
|
||||
|
||||
if not ftype:
|
||||
return
|
||||
|
||||
if ftype in html_files:
|
||||
data = ""
|
||||
for i in r.iter_content(chunk_size=1024, decode_unicode=False):
|
||||
data += i.decode("utf-8", errors="ignore")
|
||||
if len(data) > data_limit or "</head>" in data.lower():
|
||||
break
|
||||
soup = bs4.BeautifulSoup(data, parser)
|
||||
if title := soup.find("title"):
|
||||
output = title.text.strip()
|
||||
if output:
|
||||
output = f"*{output}*" if ("\n" not in output) else output
|
||||
if output in self.messages[sender]["previews"]:
|
||||
return
|
||||
|
||||
self.messages[sender]["previews"].add(output)
|
||||
if r.history:
|
||||
self.send_message(mto=sender, mbody=r.url, mtype=mtype)
|
||||
self.send_message(mto=sender, mbody=output, mtype=mtype)
|
||||
|
||||
else:
|
||||
try:
|
||||
lenght = 0
|
||||
outfile = io.BytesIO()
|
||||
for chunk in r.iter_content(
|
||||
chunk_size=512,
|
||||
decode_unicode=False,
|
||||
):
|
||||
lenght += 512
|
||||
if lenght >= data_limit:
|
||||
return
|
||||
outfile.write(chunk)
|
||||
|
||||
content_disposition = r.headers.get("content-disposition")
|
||||
filename = None
|
||||
if content_disposition:
|
||||
_, params = cgi.parse_header(content_disposition)
|
||||
filename = params.get("filename")
|
||||
else:
|
||||
filename = os.path.basename(uri.path)
|
||||
|
||||
ext = os.path.splitext(filename)[1] if filename else ".txt"
|
||||
fname = filename if filename else f"file{ext}"
|
||||
await self.embed_file(url, sender, mtype, ftype, fname, outfile)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
|
||||
"""Embed a file and send the result to the sender."""
|
||||
furl = await self.plugin["xep_0363"].upload_file(
|
||||
fname, content_type=ftype, input_file=outfile
|
||||
)
|
||||
message = self.make_message(sender)
|
||||
message["body"] = furl
|
||||
message["type"] = mtype
|
||||
message["oob"]["url"] = furl
|
||||
message.send()
|
||||
|
||||
async def parse_urls(self, msg, urls, sender, mtype):
|
||||
"""Parse urls and send the result to the sender."""
|
||||
body = msg["body"].lower()
|
||||
if "nsfw" in body: return
|
||||
if "nsfl" in body: return
|
||||
for u in urls:
|
||||
if u in self.messages[sender]["links"]:
|
||||
continue
|
||||
else:
|
||||
self.messages[sender]["links"].add(u)
|
||||
|
||||
uri = urlparse(u)
|
||||
await self.parse_uri(uri, sender, mtype)
|
||||
|
||||
def sed_command(self, msg, sender, mtype):
|
||||
"""Process sed command."""
|
||||
try:
|
||||
text = msg["body"]
|
||||
if not sed_cmd.match(text):
|
||||
self.messages[sender]["messages"].add(text)
|
||||
return
|
||||
sed_args = sed_parse.split(text)
|
||||
|
||||
if len(sed_args) < 4:
|
||||
return
|
||||
|
||||
sed = Sed()
|
||||
sed.load_string(text)
|
||||
|
||||
for message in self.messages[sender]["messages"]:
|
||||
if sed_args[1] not in message:
|
||||
continue
|
||||
msg = io.StringIO(message)
|
||||
res = "\n".join(sed.apply(msg, None))
|
||||
self.messages[sender]["messages"].add(res)
|
||||
return self.send_message(
|
||||
mto=sender,
|
||||
mbody=res,
|
||||
mtype=mtype,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def __init__(self, jid, password, nick="angel", autojoin=None):
|
||||
"""Initialize the bot."""
|
||||
ClientXMPP.__init__(self, jid, password)
|
||||
self.jid = jid
|
||||
self.nick = nick
|
||||
self.autojoin = autojoin or []
|
||||
self.register_plugin("xep_0030")
|
||||
self.register_plugin("xep_0060")
|
||||
self.register_plugin("xep_0054")
|
||||
self.register_plugin("xep_0045")
|
||||
self.register_plugin("xep_0066")
|
||||
self.register_plugin("xep_0084")
|
||||
self.register_plugin("xep_0153")
|
||||
self.register_plugin("xep_0363")
|
||||
|
||||
self.add_event_handler("session_start", self.session_start)
|
||||
self.add_event_handler("message", self.message)
|
||||
self.add_event_handler("groupchat_message", self.muc_message)
|
||||
# self.add_event_handler("vcard_avatar_update", self.debug_event)
|
||||
# self.add_event_handler("stream_error", self.debug_event)
|
||||
self.add_event_handler("disconnected", lambda _: self.connect())
|
||||
|
||||
async def session_start(self, event):
|
||||
"""Start the bot."""
|
||||
self.send_presence()
|
||||
await self.get_roster()
|
||||
await self.update_info()
|
||||
for channel in self.autojoin:
|
||||
try:
|
||||
self.plugin["xep_0045"].join_muc(channel, self.nick)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
async def update_info(self):
|
||||
"""Update the bot info."""
|
||||
with open("angel.png", "rb") as avatar_file:
|
||||
avatar = avatar_file.read()
|
||||
|
||||
avatar_type = "image/png"
|
||||
avatar_id = self.plugin["xep_0084"].generate_id(avatar)
|
||||
avatar_bytes = len(avatar)
|
||||
|
||||
asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
|
||||
|
||||
asyncio.gather(
|
||||
self.plugin["xep_0153"].set_avatar(
|
||||
avatar=avatar,
|
||||
mtype=avatar_type,
|
||||
)
|
||||
)
|
||||
|
||||
info = {
|
||||
"id": avatar_id,
|
||||
"type": avatar_type,
|
||||
"bytes": avatar_bytes,
|
||||
}
|
||||
|
||||
asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
|
||||
|
||||
vcard = self.plugin["xep_0054"].make_vcard()
|
||||
|
||||
vcard["URL"] = "https://wiki.kalli.st/Angel"
|
||||
vcard["DESC"] = "Angel is a bot that can do link previews and embeds."
|
||||
vcard["NICKNAME"] = "Angel"
|
||||
vcard["FN"] = "Angel"
|
||||
|
||||
asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
|
||||
|
||||
async def message(self, msg):
|
||||
"""Process a message."""
|
||||
if msg["type"] in ("chat", "normal"):
|
||||
edit = "urn:xmpp:message-correct:0" in str(msg)
|
||||
if edit:
|
||||
return
|
||||
|
||||
mtype = msg["type"]
|
||||
sender = msg["from"].bare
|
||||
|
||||
try:
|
||||
if not msg["oob"]["url"]:
|
||||
if urls := self.get_urls(msg):
|
||||
await self.parse_urls(msg, urls, sender, mtype)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
self.sed_command(msg, sender, mtype)
|
||||
|
||||
async def muc_message(self, msg):
|
||||
"""Process a groupchat message."""
|
||||
if msg["type"] in ("groupchat", "normal"):
|
||||
edit = "urn:xmpp:message-correct:0" in str(msg)
|
||||
|
||||
if edit:
|
||||
return
|
||||
|
||||
if msg["mucnick"] == self.nick:
|
||||
return
|
||||
|
||||
mtype = msg["type"]
|
||||
sender = msg["from"].bare
|
||||
|
||||
|
||||
try:
|
||||
if not msg["oob"]["url"]:
|
||||
if urls := self.get_urls(msg):
|
||||
await self.parse_urls(msg, urls, sender, mtype)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
self.sed_command(msg, sender, mtype)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
config = configparser.ConfigParser()
|
||||
config.read("config.ini")
|
||||
jid = config["angel"]["jid"]
|
||||
password = config["angel"]["password"]
|
||||
autojoin = config["angel"]["autojoin"].split()
|
||||
nick = config["angel"]["nick"]
|
||||
bot = AngelBot(jid, password, nick=nick, autojoin=autojoin)
|
||||
|
||||
bot.connect()
|
||||
bot.process(forever=True)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue