250 lines
8.5 KiB
Python
250 lines
8.5 KiB
Python
import simplematrixbotlib as botlib
|
|
import os
|
|
import sys
|
|
from baichat_py import Completion
|
|
|
|
import traceback
|
|
|
|
# --- Runtime configuration, read once from the environment at import time ---

# Environment values that switch a boolean flag OFF (compared case-insensitively
# after stripping whitespace). Any other value switches the flag ON, which keeps
# every value that used to enable a command working.
_FALSE_VALUES = frozenset({"", "0", "false", "no", "off", "n"})


def _env_flag(name, default):
    """Return the boolean value of environment variable *name*.

    ``os.environ`` only holds strings, so a plain truthiness test treats
    ``"false"`` or ``"0"`` as enabled. This helper returns *default* when the
    variable is unset and otherwise ``False`` only for the spellings listed
    in ``_FALSE_VALUES``.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_VALUES


def _env_list(name, default):
    """Return environment variable *name* parsed as a comma-separated list.

    *default* is the comma-separated string used when the variable is unset.
    Whitespace around items is ignored and empty items are dropped, so
    ``"a, b"``, ``"a,b"`` and ``"a , b,"`` all yield ``["a", "b"]``.
    """
    raw = os.environ.get(name, default)
    return [item.strip() for item in raw.split(",") if item.strip()]


# Character(s) that must start a message for it to be treated as a command.
PREFIX = "!"

# Matrix account credentials; only the password has no default.
USERNAME = os.environ.get("MATRIX_USERNAME", "ai")
SERVER = os.environ.get("MATRIX_SERVER", "https://matrix.projectsegfau.lt")
PASSWORD = os.environ.get("MATRIX_PASSWORD", None)

# Per-command feature switches.
ENABLE_AI_COMMAND = _env_flag("ENABLE_AI_COMMAND", False)
ENABLE_IMAGE_COMMAND = _env_flag("ENABLE_IMAGE_COMMAND", True)
ENABLE_INSPIRE_COMMAND = _env_flag("ENABLE_INSPIRE_COMMAND", True)

# Command names (without PREFIX) that trigger each handler.
AI_COMMAND_ALIASES = _env_list("AI_COMMAND_ALIASES", "ask, ai, gpt")
IMAGE_COMMAND_ALIASES = _env_list("IMAGE_COMMAND_ALIASES", "img, i")
INSPIRE_COMMAND_ALIASES = _env_list("INSPIRE_COMMAND_ALIASES", "inspire, insp")

# URL that receives a GET request from the command handlers.
PING_URL = os.environ.get(
    "PING_URL",
    "https://healthchecks.projectsegfau.lt/ping/2f7f2cd3-2a8d-4fff-b5ca-f4d17d47f75b",
)

# systemd user unit restarted when the bot's main loop raises.
SYSTEMD_SERVICE_NAME = os.environ.get("SYSTEMD_SERVICE_NAME", "matrixai")
|
|
|
|
import io
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
import os
|
|
from langdetect import detect
|
|
|
|
from enum import Enum
|
|
import uuid
|
|
import subprocess
|
|
|
|
from pyimagine import Imagine
|
|
from pyimagine.constants import Inspiration, Style, Ratio
|
|
|
|
def _matches_any(match, aliases):
    """Return True if ``match.command(alias)`` is truthy for any alias."""
    return any(match.command(alias) for alias in aliases)


def _parse_image_args(args):
    """Split image-command arguments into prompt text and options.

    Arguments of the form ``neg=...``, ``style=...`` and ``ratio=...`` are
    options; every other argument is a prompt word. Only the leading
    ``key=`` is stripped from an option, and repeated ``neg=`` values are
    joined with spaces so that multi-word negative prompts stay separated.

    Returns:
        A ``(prompt, negative, style, ratio)`` tuple of strings. ``style``
        and ``ratio`` default to ``"IMAGINE_V3"`` and ``"RATIO_1X1"``.
    """
    prompt_words = []
    negative_words = []
    style = "IMAGINE_V3"
    ratio = "RATIO_1X1"
    for arg in args:
        key, sep, value = arg.partition("=")
        if sep and key == "neg":
            negative_words.append(value)
        elif sep and key == "style":
            style = value
        elif sep and key == "ratio":
            ratio = value
        else:
            prompt_words.append(arg)
    return " ".join(prompt_words), " ".join(negative_words), style, ratio


def _enum_member(enum_cls, name, kind):
    """Look up *name* in *enum_cls*, raising a readable ValueError if absent.

    A bare ``KeyError`` stringifies to just the quoted key, which is what
    would otherwise be echoed back to the chat room.
    """
    try:
        return enum_cls[name]
    except KeyError:
        raise ValueError(
            f"unknown {kind} {name!r}; see {PREFIX}help for valid values"
        ) from None


def _save_image(img_data):
    """Write *img_data* to a uniquely named ``.png`` file and return its name.

    Raises:
        RuntimeError: if *img_data* is empty or ``None``.
        OSError: if the file cannot be written.
    """
    if not img_data:
        raise RuntimeError("the image backend returned no data")
    filename = str(uuid.uuid4()) + ".png"
    with open(filename, mode="wb") as img_file:
        img_file.write(img_data)
    return filename


def _remove_file(path):
    """Delete *path*, reporting (not raising) any failure to do so."""
    try:
        os.remove(path)
    except OSError as e:
        print(f"Could not remove temporary image {path}: {e}")


async def _ping_healthcheck():
    """Send a GET request to ``PING_URL`` and read the response body."""
    async with aiohttp.ClientSession() as session:
        async with session.get(PING_URL) as resp:
            await resp.read()


def run():
    """Log in to Matrix, register the command handlers and run the bot.

    Returns:
        ``None`` if the credentials are incomplete (after printing a hint) or
        if ``bot.run()`` returns normally; ``1`` if ``bot.run()`` raised, in
        which case a restart of the systemd user unit
        ``SYSTEMD_SERVICE_NAME`` is requested first.
    """
    if not USERNAME or not SERVER or not PASSWORD:
        print(
            "Please set the environment variables MATRIX_USERNAME(optional), MATRIX_SERVER(optional), and MATRIX_PASSWORD(required)"
        )
        return

    creds = botlib.Creds(SERVER, USERNAME, PASSWORD)
    bot = botlib.Bot(creds)

    # NOTE(review): in every handler below the healthcheck ping is treated as
    # the success branch of the try block (try/except/else) -- confirm this
    # matches the intended layout.

    @bot.listener.on_message_event
    async def ask(room, message):
        """Answer a text prompt with the completion backend."""
        match = botlib.MessageMatch(room, message, bot, PREFIX)
        if not (match.is_not_from_this_bot() and match.prefix() and ENABLE_AI_COMMAND):
            return
        if not _matches_any(match, AI_COMMAND_ALIASES):
            return

        # Bound before the try so the error handler can always quote it.
        prompt = ""
        try:
            prompt = " ".join(match.args())
            response = "".join(Completion.create(prompt))
            await bot.api.send_markdown_message(
                room.room_id, f"> {prompt}\n\n{response}"
            )
        except Exception as e:
            print(e)
            await bot.api.send_markdown_message(room.room_id, f"> {prompt}\n\n{e}")
        else:
            await _ping_healthcheck()

    @bot.listener.on_message_event
    async def image(room, message):
        """Generate an image from a prompt and post it to the room."""
        match = botlib.MessageMatch(room, message, bot, PREFIX)
        if not (match.is_not_from_this_bot() and match.prefix() and ENABLE_IMAGE_COMMAND):
            return
        if not _matches_any(match, IMAGE_COMMAND_ALIASES):
            return

        # Bound before the try so the error handler can always quote it.
        prompt = ""
        try:
            print(match.args())
            prompt, negative, style, ratio = _parse_image_args(match.args())

            # NOTE(review): sdprem() is called synchronously from a coroutine;
            # presumably it blocks the event loop while the image is generated.
            img_data = Imagine().sdprem(
                prompt=prompt,
                style=_enum_member(Style, style, "style"),
                ratio=_enum_member(Ratio, ratio, "ratio"),
                negative=negative,
            )

            filename = _save_image(img_data)
            try:
                await bot.api.send_image_message(
                    room_id=room.room_id, image_filepath=filename
                )
            finally:
                # The file only exists to be uploaded; do not let them pile up.
                _remove_file(filename)
        except Exception as e:
            print(e)
            traceback.print_exc()
            await bot.api.send_markdown_message(room.room_id, f"> {prompt}\n\n{e}")
        else:
            await _ping_healthcheck()

    @bot.listener.on_message_event
    async def inspire(room, message):
        """Generate an image from a named inspiration preset and post it."""
        match = botlib.MessageMatch(room, message, bot, PREFIX)
        if not (match.is_not_from_this_bot() and match.prefix() and ENABLE_INSPIRE_COMMAND):
            return
        if not _matches_any(match, INSPIRE_COMMAND_ALIASES):
            return

        # Bound before the try so the error handler can always quote it.
        inspiration_name = "INSPIRATION_01"
        try:
            args = match.args()
            print(args)
            if args:
                inspiration_name = args[0]
            print(inspiration_name)

            try:
                inspiration = Inspiration[inspiration_name]
            except KeyError:
                await bot.api.send_markdown_message(
                    room.room_id,
                    f"{inspiration_name} cannot be found, using default one",
                )
                inspiration = Inspiration.INSPIRATION_01

            img_data = Imagine().sdinsp(inspiration=inspiration)

            filename = _save_image(img_data)
            try:
                await bot.api.send_image_message(
                    room_id=room.room_id, image_filepath=filename
                )
            finally:
                # The file only exists to be uploaded; do not let them pile up.
                _remove_file(filename)
        except Exception as e:
            print(e)
            traceback.print_exc()
            # This handler has no prompt; quote the requested inspiration.
            await bot.api.send_markdown_message(
                room.room_id, f"> {inspiration_name}\n\n{e}"
            )
        else:
            await _ping_healthcheck()

    @bot.listener.on_message_event
    async def bot_help(room, message):
        """Post the help text for `!help`, `!?` or `!h`."""
        match = botlib.MessageMatch(room, message, bot, PREFIX)
        if not (
            match.is_not_from_this_bot()
            and match.prefix()
            and (match.command("help") or match.command("?") or match.command("h"))
        ):
            return

        # Built only once the command matched, not for every message.
        styles = ", ".join([style.name for style in Style])
        ratios = ", ".join([ratio.name for ratio in Ratio])
        # The `ask` command (AI_COMMAND_ALIASES) is not listed in this text.
        bot_help_message = f"""
# Help of `@ai:psf.lt`

- `!help`, `!?`, `!h`: display help command
- `{"`, `".join(IMAGE_COMMAND_ALIASES)}`: generate an image from a prompt
options:
- `neg`: negative prompt
example: `neg=low`
- `style`: style of image
example: `style=IMAGINE_V3`
values: {styles}
- `ratio`: ratio of image
example: `ratio=RATIO_1X1`
values: {ratios}

For text generation, invite `@ai:psf.lt`
"""
        await bot.api.send_markdown_message(room.room_id, bot_help_message)

    try:
        bot.run()
    except Exception as e:
        # Report first: restarting our own unit may stop this process before
        # anything after the systemctl call gets to run.
        print(f"Restarting bot due to {e}", flush=True)
        traceback.print_exc()
        subprocess.run(["systemctl", "restart", "--user", SYSTEMD_SERVICE_NAME])
        return 1
|
|
|
|
|
|
# Script entry point: run the bot and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = run()
    sys.exit(exit_status)
|