gardening

This commit is contained in:
Czar 2023-02-19 23:20:20 -03:00
commit 8d3b290106
Signed by: czar
GPG key ID: 2DBA9558E9277C37
5 changed files with 914 additions and 35 deletions

98
main.py
View file

@ -44,42 +44,47 @@ req_list = ("http://", "https://")
html_files = ("text/html", "application/xhtml+xml")
class Lifo(list):
"""
Limited size LIFO array to store messages and urls
"""
"""Limited size LIFO array to store messages and urls."""
def __init__(self, size):
"""Initialize the LIFO array."""
super().__init__()
self.size = size
def add(self, item):
"""Add an item to the LIFO array."""
self.insert(0, item)
if len(self) > self.size:
self.pop()
def get_youtube_title(url):
"""Get the title of a youtube video."""
try:
info = ydl.extract_info(url, download=False)
return info["title"]
except Exception:
return ""
except Exception as e:
print(e)
return None
def get_invidious_link(yurl):
"""Get an invidious link from a youtube link."""
video = yurl.split("/")[-1]
instance = random.choice(invidious_instances)
return f"https://{instance}/watch?v={video}"
def get_yurl(path):
"""Get a youtube link from a path."""
yurl = f"https://youtu.be/{path}"
return yurl
class AngelBot(ClientXMPP):
"""AngelBot class."""
messages = defaultdict(
lambda: {
"messages": Lifo(100),
@ -89,11 +94,14 @@ class AngelBot(ClientXMPP):
)
def get_urls(self, msg):
"""Get urls from a message."""
str_list = msg["body"].strip().split()
urls = [u for u in str_list if any(r in u for r in req_list)]
return urls
def send_youtube_info(self, uri, sender, mtype):
"""Send youtube info to the sender."""
yurl = None
if uri.netloc == youtube_link:
yurl = get_yurl(uri.path)
elif "v" in (query := parse_qs(uri.query)):
@ -111,6 +119,7 @@ class AngelBot(ClientXMPP):
self.send_message(mto=sender, mbody=invidious, mtype=mtype)
async def parse_uri(self, uri, sender, mtype):
"""Parse a uri and send the result to the sender."""
netloc = uri.netloc
if netloc in (youtube_links + [youtube_link]):
self.send_youtube_info(uri, sender, mtype)
@ -120,11 +129,17 @@ class AngelBot(ClientXMPP):
await self.process_link(uri, sender, mtype)
async def process_link(self, uri, sender, mtype):
"""Process a link and send the result to the sender."""
url = urlunparse(uri)
r = requests.get(url, stream=True, headers=headers, timeout=5)
r = requests.get(url, stream=True, headers=headers, timeout=6)
if not r.ok:
return
ftype = normalize_mimetype(r.headers.get("content-type"))
if not ftype:
return
if ftype in html_files:
data = ""
for i in r.iter_content(chunk_size=1024, decode_unicode=False):
@ -158,17 +173,21 @@ class AngelBot(ClientXMPP):
outfile.write(chunk)
content_disposition = r.headers.get("content-disposition")
_, params = cgi.parse_header(content_disposition)
filename = params.get("filename")
ext = os.path.splitext(filename)[1] if filename else None or ".bin"
fname = filename or uri.path.strip("/").split("/")[-1] or f"file{ext}"
filename = None
if content_disposition:
_, params = cgi.parse_header(content_disposition)
filename = params.get("filename")
else:
filename = os.path.basename(uri.path)
ext = os.path.splitext(filename)[1] if filename else ".txt"
fname = filename if filename else f"file{ext}"
await self.embed_file(url, sender, mtype, ftype, fname, outfile)
except Exception:
...
except Exception as e:
print(e)
async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
"""Embed a file and send the result to the sender."""
furl = await self.plugin["xep_0363"].upload_file(
fname, content_type=ftype, input_file=outfile
)
@ -179,8 +198,10 @@ class AngelBot(ClientXMPP):
message.send()
async def parse_urls(self, msg, urls, sender, mtype):
if "nsfw" in msg["body"].lower():
return
"""Parse urls and send the result to the sender."""
body = msg["body"].lower()
if "nsfw" in body: return
if "nsfl" in body: return
for u in urls:
if u in self.messages[sender]["links"]:
continue
@ -191,6 +212,7 @@ class AngelBot(ClientXMPP):
await self.parse_uri(uri, sender, mtype)
def sed_command(self, msg, sender, mtype):
"""Process sed command."""
try:
text = msg["body"]
if not sed_cmd.match(text):
@ -216,10 +238,11 @@ class AngelBot(ClientXMPP):
mtype=mtype,
)
except Exception:
return
except Exception as e:
print(e)
def __init__(self, jid, password, nick="angel", autojoin=None):
"""Initialize the bot."""
ClientXMPP.__init__(self, jid, password)
self.jid = jid
self.nick = nick
@ -241,16 +264,18 @@ class AngelBot(ClientXMPP):
self.add_event_handler("disconnected", lambda _: self.connect())
async def session_start(self, event):
"""Start the bot."""
self.send_presence()
await self.get_roster()
await self.update_info()
for channel in self.autojoin:
try:
self.plugin["xep_0045"].join_muc(channel, self.nick)
except:
...
except Exception as e:
print(e)
async def update_info(self):
"""Update the bot info."""
with open("angel.png", "rb") as avatar_file:
avatar = avatar_file.read()
@ -282,40 +307,45 @@ class AngelBot(ClientXMPP):
asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
async def message(self, msg):
"""Process a message."""
if msg["type"] in ("chat", "normal"):
mtype = "chat"
sender = msg["from"].bare
edit = "urn:xmpp:message-correct:0" in str(msg)
if edit:
return
mtype = msg["type"]
sender = msg["from"].bare
try:
if not msg["oob"]["url"]:
if urls := self.get_urls(msg):
await self.parse_urls(msg, urls, sender, mtype)
except Exception:
...
except Exception as e:
print(e)
self.sed_command(msg, sender, mtype)
async def muc_message(self, msg):
"""Process a groupchat message."""
if msg["type"] in ("groupchat", "normal"):
mtype = "groupchat"
sender = msg["from"].bare
if msg["mucnick"] == self.nick:
return
edit = "urn:xmpp:message-correct:0" in str(msg)
if edit:
return
if msg["mucnick"] == self.nick:
return
mtype = msg["type"]
sender = msg["from"].bare
try:
if not msg["oob"]["url"]:
if urls := self.get_urls(msg):
await self.parse_urls(msg, urls, sender, mtype)
except Exception:
pass
except Exception as e:
print(e)
self.sed_command(msg, sender, mtype)
@ -327,7 +357,7 @@ if __name__ == "__main__":
password = config["angel"]["password"]
autojoin = config["angel"]["autojoin"].split()
bot = AngelBot(jid, password, autojoin=autojoin)
bot = AngelBot(jid, password, nick="angel-from-overworld", autojoin=autojoin)
bot.connect()
bot.process(forever=True)