from typing import Type, Optional from time import time from html import escape from mautrix.types import MediaMessageEventContent, TextMessageEventContent, MessageType, Format, RelatesTo, RelationType, RoomID from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from maubot import Plugin, MessageEvent from maubot.handlers import command import json import openai class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("gpt_on") helper.copy("gpt_room_id") helper.copy("gpt_apikey") helper.copy("gpt_model_engine") helper.copy("gpt_temperature") helper.copy("gpt_top_p") helper.copy("gpt_presence_penalty") helper.copy("gpt_frequency_penalty") helper.copy("gpt_echo") helper.copy("gpt_stop") helper.copy("gpt_n") helper.copy("gpt_stream") helper.copy("gpt_logprobs") helper.copy("gpt_best_of") helper.copy("gpt_logit_bias") class EchoBot(Plugin): @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: return Config async def start(self) -> None: await super().start() self.config.load_and_update() self.http = self.client.api.session async def stop(self) -> None: await super().stop() @staticmethod def plural(num: float, unit: str, decimals: Optional[int] = None) -> str: num = round(num, decimals) if num == 1: return f"{num} {unit}" else: return f"{num} {unit}s" @classmethod def prettify_diff(cls, diff: int) -> str: if abs(diff) < 10 * 1_000: return f"{diff} ms" elif abs(diff) < 60 * 1_000: return cls.plural(diff / 1_000, 'second', decimals=1) minutes, seconds = divmod(diff / 1_000, 60) if abs(minutes) < 60: return f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}" hours, minutes = divmod(minutes, 60) if abs(hours) < 24: return (f"{cls.plural(hours, 'hour')}, {cls.plural(minutes, 'minute')}" f" and {cls.plural(seconds, 'second')}") days, hours = divmod(hours, 24) return (f"{cls.plural(days, 'day')}, {cls.plural(hours, 'hour')}, " f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}") @command.new("ping", help="Ping") @command.argument("message", pass_raw=True, required=False) async def ping_handler(self, evt: MessageEvent, message: str = "") -> None: diff = int(time() * 1000) - evt.timestamp pretty_diff = self.prettify_diff(diff) text_message = f'"{message[:20]}" took' if message else "took" html_message = f'"{escape(message[:20])}" took' if message else "took" content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"{evt.sender}: Pong! (ping {text_message} {pretty_diff} to arrive)", formatted_body=f"{evt.sender}: Pong! " f"(ping {html_message} " f"{pretty_diff} to arrive)", relates_to=RelatesTo( rel_type=RelationType("xyz.maubot.gpt.echo"), event_id=evt.event_id, )) pong_from = evt.sender.split(":", 1)[1] content.relates_to["from"] = pong_from content.relates_to["ms"] = diff content["pong"] = { "ms": diff, "from": pong_from, "ping": evt.event_id, } await evt.respond(content) @command.new("echo", help="Repeat a message") @command.argument("message", pass_raw=True) async def echo_handler(self, evt: MessageEvent, message: str) -> None: await evt.respond(message) @command.new("gpt", help="ChatGPT response") @command.argument("message", pass_raw=True, required=False) async def gpt_handler(self, evt: MessageEvent, message: str = "") -> None: if self.config["gpt_on"] and evt.room_id in self.config["gpt_room_id"]: openai.api_key = self.config["gpt_apikey"] resp = openai.Completion.create(engine=self.config["gpt_model_engine"], prompt=message, max_tokens=int(self.config["gpt_max_tokens"]), temperature=float(self.config["gpt_temperature"]), top_p=int(self.config["gpt_top_p"]), presence_penalty=int(self.config["gpt_presence_penalty"]), frequency_penalty=int(self.config["gpt_frequency_penalty"]), echo=self.config["gpt_echo"], stop=self.config["gpt_stop"], n=int(self.config["gpt_n"]), stream=self.config["gpt_stream"], # logprobs=self.config["gpt_logprobs"], best_of=int(self.config["gpt_best_of"]), logit_bias={} ) n = len(resp.choices) if n == 1: html_message = resp.choices[0].text text_message = resp.choices[0].text else: texts = [] for idx in range(0, n): html_message.append(resp.choices[idx].text) text_message.append(resp.choices[idx].text) else: html_message = f'chatGPT jest wyłączony.' text_message = 'chatGPT jest wyłączony.' content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"{evt.sender}: {text_message}", formatted_body=f"{evt.sender}: " f"{html_message}", relates_to=RelatesTo( rel_type=RelationType("xyz.maubot.gpt.echo"), event_id=evt.event_id, )) pong_from = evt.sender.split(":", 1)[1] content.relates_to["from"] = pong_from await evt.respond(content)