123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
import asyncio
import json
from html import escape
from time import time
from typing import Optional, Type

import openai
from maubot import MessageEvent, Plugin
from maubot.handlers import command
from mautrix.types import (
    Format,
    MediaMessageEventContent,
    MessageType,
    RelatesTo,
    RelationType,
    RoomID,
    TextMessageEventContent,
)
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
class Config(BaseProxyConfig):
    """Plugin configuration listing every ``gpt_*`` option the bot supports."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Pass each supported option key to ``helper.copy``.

        ``gpt_max_tokens`` is included because ``EchoBot.gpt_handler`` reads
        it from the config; it was previously missing from this list.
        """
        option_keys = (
            "gpt_on",
            "gpt_room_id",
            "gpt_apikey",
            "gpt_model_engine",
            "gpt_max_tokens",
            "gpt_temperature",
            "gpt_top_p",
            "gpt_presence_penalty",
            "gpt_frequency_penalty",
            "gpt_echo",
            "gpt_stop",
            "gpt_n",
            "gpt_stream",
            "gpt_logprobs",
            "gpt_best_of",
            "gpt_logit_bias",
        )
        for key in option_keys:
            helper.copy(key)
class EchoBot(Plugin):
    """Maubot plugin exposing the ``ping``, ``echo`` and ``gpt`` commands."""

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    async def start(self) -> None:
        """Start the plugin, reload the config and keep the client's HTTP session."""
        await super().start()
        self.config.load_and_update()
        self.http = self.client.api.session

    async def stop(self) -> None:
        """Stop the plugin (delegates to the parent implementation)."""
        await super().stop()

    @staticmethod
    def plural(num: float, unit: str, decimals: Optional[int] = None) -> str:
        """Format ``num`` with ``unit``, appending ``s`` unless the rounded value is 1.

        :param num: quantity to display; rounded to ``decimals`` places first.
        :param unit: singular unit name, e.g. ``"second"``.
        :param decimals: digits kept by ``round``; ``None`` rounds to an integer.
        """
        num = round(num, decimals)
        suffix = "" if num == 1 else "s"
        return f"{num} {unit}{suffix}"

    @classmethod
    def prettify_diff(cls, diff: int) -> str:
        """Render a millisecond difference as a human-readable duration.

        Below 10 s the raw millisecond count is shown, below 60 s seconds with
        one decimal, and above that a minutes/hours/days breakdown.
        """
        if abs(diff) < 10 * 1_000:
            return f"{diff} ms"
        if abs(diff) < 60 * 1_000:
            return cls.plural(diff / 1_000, "second", decimals=1)

        # Round to whole seconds *before* splitting into larger units;
        # rounding the remainder afterwards could yield "... and 60 seconds".
        minutes, seconds = divmod(round(diff / 1_000), 60)
        if abs(minutes) < 60:
            return f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}"

        hours, minutes = divmod(minutes, 60)
        if abs(hours) < 24:
            return (
                f"{cls.plural(hours, 'hour')}, {cls.plural(minutes, 'minute')}"
                f" and {cls.plural(seconds, 'second')}"
            )

        days, hours = divmod(hours, 24)
        return (
            f"{cls.plural(days, 'day')}, {cls.plural(hours, 'hour')}, "
            f"{cls.plural(minutes, 'minute')} and {cls.plural(seconds, 'second')}"
        )

    @staticmethod
    def _plain_to_html(text: str) -> str:
        """Escape plain text for an HTML body and keep its line breaks."""
        return escape(text).replace("\n", "<br/>")

    def _completion_params(self, prompt: str) -> dict:
        """Build the keyword arguments for ``openai.Completion.create`` from the config."""
        cfg = self.config
        return {
            "engine": cfg["gpt_model_engine"],
            "prompt": prompt,
            "max_tokens": int(cfg["gpt_max_tokens"]),
            "temperature": float(cfg["gpt_temperature"]),
            # These three are fractional settings; int() would truncate e.g. 0.9 to 0.
            "top_p": float(cfg["gpt_top_p"]),
            "presence_penalty": float(cfg["gpt_presence_penalty"]),
            "frequency_penalty": float(cfg["gpt_frequency_penalty"]),
            "echo": cfg["gpt_echo"],
            "stop": cfg["gpt_stop"],
            "n": int(cfg["gpt_n"]),
            # NOTE(review): the reply below reads ``resp.choices``; a streamed
            # response presumably does not offer that, so ``gpt_stream`` likely
            # has to stay false -- confirm against the OpenAI client in use.
            "stream": cfg["gpt_stream"],
            # ``gpt_logprobs`` and ``gpt_logit_bias`` are intentionally not
            # forwarded, matching the previous behaviour.
            "best_of": int(cfg["gpt_best_of"]),
            "logit_bias": {},
        }

    @command.new("ping", help="Ping")
    @command.argument("message", pass_raw=True, required=False)
    async def ping_handler(self, evt: MessageEvent, message: str = "") -> None:
        """Reply with a "Pong!" notice reporting how long the ping took to arrive.

        The delay is the current time in milliseconds minus ``evt.timestamp``.
        At most the first 20 characters of ``message`` are quoted in the reply.
        """
        diff = int(time() * 1000) - evt.timestamp
        pretty_diff = self.prettify_diff(diff)
        text_message = f'"{message[:20]}" took' if message else "took"
        html_message = f'"{escape(message[:20])}" took' if message else "took"
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=f"{evt.sender}: Pong! (ping {text_message} {pretty_diff} to arrive)",
            formatted_body=(
                f"<a href='https://matrix.example.pl/#/{evt.sender}'>{evt.sender}</a>: Pong! "
                f"(<a href='https://matrix.example.pl/#/{evt.room_id}/{evt.event_id}'>ping</a> {html_message} "
                f"{pretty_diff} to arrive)"
            ),
            relates_to=RelatesTo(
                rel_type=RelationType("xyz.maubot.gpt.echo"),
                event_id=evt.event_id,
            ),
        )
        # Everything after the first ":" of the sender ID.
        pong_from = evt.sender.split(":", 1)[1]
        content.relates_to["from"] = pong_from
        content.relates_to["ms"] = diff
        content["pong"] = {
            "ms": diff,
            "from": pong_from,
            "ping": evt.event_id,
        }
        await evt.respond(content)

    @command.new("echo", help="Repeat a message")
    @command.argument("message", pass_raw=True)
    async def echo_handler(self, evt: MessageEvent, message: str) -> None:
        """Respond with the received message unchanged."""
        await evt.respond(message)

    @command.new("gpt", help="ChatGPT response")
    @command.argument("message", pass_raw=True, required=False)
    async def gpt_handler(self, evt: MessageEvent, message: str = "") -> None:
        """Send ``message`` to the OpenAI completion API and reply with the result.

        The request is only made when ``gpt_on`` is set and the event's room is
        contained in ``gpt_room_id``; otherwise a fixed "disabled" notice is sent.
        When several choices are returned they are all included, one per line.
        """
        if self.config["gpt_on"] and evt.room_id in self.config["gpt_room_id"]:
            openai.api_key = self.config["gpt_apikey"]
            params = self._completion_params(message)
            # The OpenAI call is blocking; run it in the default executor so
            # the event loop keeps serving other events while waiting.
            loop = asyncio.get_running_loop()
            resp = await loop.run_in_executor(
                None, lambda: openai.Completion.create(**params)
            )
            texts = [choice.text for choice in resp.choices]
            text_message = "\n".join(texts)
            # Model output is plain text: escape it before embedding in HTML.
            html_message = "<br/>".join(self._plain_to_html(text) for text in texts)
        else:
            text_message = "chatGPT jest wyłączony."
            html_message = "chatGPT jest wyłączony."

        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=f"{evt.sender}: {text_message}",
            formatted_body=f"{evt.sender}: {html_message}",
            relates_to=RelatesTo(
                rel_type=RelationType("xyz.maubot.gpt.echo"),
                event_id=evt.event_id,
            ),
        )
        # Everything after the first ":" of the sender ID.
        pong_from = evt.sender.split(":", 1)[1]
        content.relates_to["from"] = pong_from
        await evt.respond(content)
|