from typing import Type, Optional
from time import time
from html import escape
from mautrix.types import MediaMessageEventContent, TextMessageEventContent, MessageType, Format, RelatesTo, RelationType, RoomID
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin, MessageEvent
from maubot.handlers import command
import json
import openai
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("gpt_on")
helper.copy("gpt_room_id")
helper.copy("gpt_apikey")
helper.copy("gpt_model_engine")
helper.copy("gpt_temperature")
helper.copy("gpt_top_p")
helper.copy("gpt_presence_penalty")
helper.copy("gpt_frequency_penalty")
helper.copy("gpt_echo")
helper.copy("gpt_stop")
helper.copy("gpt_n")
helper.copy("gpt_stream")
helper.copy("gpt_logprobs")
helper.copy("gpt_best_of")
helper.copy("gpt_logit_bias")
class EchoBot(Plugin):
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
async def start(self) -> None:
await super().start()
self.config.load_and_update()
self.http = self.client.api.session
async def stop(self) -> None:
await super().stop()
@staticmethod
def plural(num: float, unit: str, decimals: Optional[int] = None) -> str:
num = round(num, decimals)
if num == 1:
return f"{num} {unit}"
else:
return f"{num} {unit}s"
@classmethod
def prettify_diff(cls, diff: int) -> str:
if abs(diff) < 10 * 1_000:
return f"{diff} ms"
elif abs(diff) < 60 * 1_000:
return cls.plural(diff / 1_000, 'second', decimals=1)
minutes, seconds = divmod(diff / 1_000, 60)
if abs(minutes) < 60:
return f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}"
hours, minutes = divmod(minutes, 60)
if abs(hours) < 24:
return (f"{cls.plural(hours, 'hour')}, {cls.plural(minutes, 'minute')}"
f" and {cls.plural(seconds, 'second')}")
days, hours = divmod(hours, 24)
return (f"{cls.plural(days, 'day')}, {cls.plural(hours, 'hour')}, "
f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}")
@command.new("ping", help="Ping")
@command.argument("message", pass_raw=True, required=False)
async def ping_handler(self, evt: MessageEvent, message: str = "") -> None:
diff = int(time() * 1000) - evt.timestamp
pretty_diff = self.prettify_diff(diff)
text_message = f'"{message[:20]}" took' if message else "took"
html_message = f'"{escape(message[:20])}" took' if message else "took"
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body=f"{evt.sender}: Pong! (ping {text_message} {pretty_diff} to arrive)",
formatted_body=f"{evt.sender}: Pong! "
f"(ping {html_message} "
f"{pretty_diff} to arrive)",
relates_to=RelatesTo(
rel_type=RelationType("xyz.maubot.gpt.echo"),
event_id=evt.event_id,
))
pong_from = evt.sender.split(":", 1)[1]
content.relates_to["from"] = pong_from
content.relates_to["ms"] = diff
content["pong"] = {
"ms": diff,
"from": pong_from,
"ping": evt.event_id,
}
await evt.respond(content)
@command.new("echo", help="Repeat a message")
@command.argument("message", pass_raw=True)
async def echo_handler(self, evt: MessageEvent, message: str) -> None:
await evt.respond(message)
@command.new("gpt", help="ChatGPT response")
@command.argument("message", pass_raw=True, required=False)
async def gpt_handler(self, evt: MessageEvent, message: str = "") -> None:
if self.config["gpt_on"] and evt.room_id in self.config["gpt_room_id"]:
openai.api_key = self.config["gpt_apikey"]
resp = openai.Completion.create(engine=self.config["gpt_model_engine"],
prompt=message,
max_tokens=int(self.config["gpt_max_tokens"]),
temperature=float(self.config["gpt_temperature"]),
top_p=int(self.config["gpt_top_p"]),
presence_penalty=int(self.config["gpt_presence_penalty"]),
frequency_penalty=int(self.config["gpt_frequency_penalty"]),
echo=self.config["gpt_echo"],
stop=self.config["gpt_stop"],
n=int(self.config["gpt_n"]),
stream=self.config["gpt_stream"],
# logprobs=self.config["gpt_logprobs"],
best_of=int(self.config["gpt_best_of"]),
logit_bias={}
)
n = len(resp.choices)
if n == 1:
html_message = resp.choices[0].text
text_message = resp.choices[0].text
else:
texts = []
for idx in range(0, n):
html_message.append(resp.choices[idx].text)
text_message.append(resp.choices[idx].text)
else:
html_message = f'chatGPT jest wyłączony.'
text_message = 'chatGPT jest wyłączony.'
content = TextMessageEventContent(
msgtype=MessageType.NOTICE, format=Format.HTML,
body=f"{evt.sender}: {text_message}",
formatted_body=f"{evt.sender}: "
f"{html_message}",
relates_to=RelatesTo(
rel_type=RelationType("xyz.maubot.gpt.echo"),
event_id=evt.event_id,
))
pong_from = evt.sender.split(":", 1)[1]
content.relates_to["from"] = pong_from
await evt.respond(content)