Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 03900f4416 | |||
| 62f4195e46 | |||
| 751297e3bc | |||
| b450998f3f |
@ -21,10 +21,13 @@ from nonebot_plugin_alconna import (
|
||||
from konabot.common.longtask import DepLongTaskTarget
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
|
||||
DATA_FILE_PATH = (
|
||||
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
|
||||
)
|
||||
from konabot.common.llm import get_llm
|
||||
|
||||
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
|
||||
|
||||
DATA_FILE_PATH = (
|
||||
DATA_DIR / "idiom_banned.json"
|
||||
)
|
||||
|
||||
def load_banned_ids() -> list[str]:
|
||||
if not DATA_FILE_PATH.exists():
|
||||
@ -75,6 +78,27 @@ class TryVerifyState(Enum):
|
||||
BUT_NO_NEXT = 5
|
||||
GAME_END = 6
|
||||
|
||||
class IdiomGameLLM:
|
||||
@classmethod
|
||||
async def verify_idiom_with_llm(cls, idiom: str) -> bool:
|
||||
if len(idiom) != 4:
|
||||
return False
|
||||
llm = get_llm()
|
||||
system_prompt = "请判断用户的输入是否为一个合理的成语,或者这四个字在中文环境下是否说得通。如果是请回答「T」,否则回答「F」。请注意,即使这个词不是成语,如果说得通(也就是能念起来很通顺),你也该输出「T」。请不要包含任何解释,也不要包含任何标点符号。"
|
||||
message = await llm.chat([{"role": "system", "content": system_prompt}, {"role": "user", "content": idiom}])
|
||||
answer = message.content
|
||||
logger.info(f"LLM 对成语 {idiom} 的判断结果是 {answer}")
|
||||
if answer == "T":
|
||||
await cls.storage_idiom(idiom)
|
||||
return answer == "T"
|
||||
|
||||
@classmethod
|
||||
async def storage_idiom(cls, idiom: str):
|
||||
# 将 idiom 存入本地文件以备后续分析
|
||||
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f:
|
||||
f.write(idiom + "\n")
|
||||
IdiomGame.append_into_word_list(idiom)
|
||||
|
||||
|
||||
class IdiomGame:
|
||||
ALL_WORDS = [] # 所有四字词语
|
||||
@ -101,6 +125,17 @@ class IdiomGame:
|
||||
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
|
||||
IdiomGame.INSTANCE_LIST[group_id] = self
|
||||
|
||||
@classmethod
|
||||
def append_into_word_list(cls, word: str):
|
||||
'''
|
||||
将一个新词加入到词语列表中
|
||||
'''
|
||||
if word not in cls.ALL_WORDS:
|
||||
cls.ALL_WORDS.append(word)
|
||||
if word[0] not in cls.IDIOM_FIRST_CHAR:
|
||||
cls.IDIOM_FIRST_CHAR[word[0]] = []
|
||||
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
|
||||
|
||||
def be_able_to_play(self) -> bool:
|
||||
if self.last_play_date != datetime.date.today():
|
||||
self.last_play_date = datetime.date.today()
|
||||
@ -186,7 +221,7 @@ class IdiomGame:
|
||||
用户发送成语
|
||||
"""
|
||||
async with self.lock:
|
||||
state = self._verify_idiom(idiom, user_id)
|
||||
state = await self._verify_idiom(idiom, user_id)
|
||||
return state
|
||||
|
||||
def is_nextable(self, last_char: str) -> bool:
|
||||
@ -218,16 +253,24 @@ class IdiomGame:
|
||||
result.append(" -> ".join(chain))
|
||||
return result
|
||||
|
||||
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
|
||||
async def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
|
||||
state = []
|
||||
# 新成语的首字应与上一条成语的尾字相同
|
||||
if idiom[0] != self.last_char:
|
||||
state.append(TryVerifyState.WRONG_FIRST_CHAR)
|
||||
return state
|
||||
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
|
||||
self.add_score(user_id, -0.1)
|
||||
state.append(TryVerifyState.NOT_IDIOM)
|
||||
return state
|
||||
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
|
||||
try:
|
||||
if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
|
||||
self.add_score(user_id, -0.1)
|
||||
state.append(TryVerifyState.NOT_IDIOM)
|
||||
return state
|
||||
except Exception as e:
|
||||
logger.error(f"LLM 验证成语 {idiom} 时出现错误:{e}")
|
||||
self.add_score(user_id, -0.1)
|
||||
state.append(TryVerifyState.NOT_IDIOM)
|
||||
return state
|
||||
# 成语合法,更新状态
|
||||
self.add_history_idiom(idiom)
|
||||
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
|
||||
@ -335,6 +378,16 @@ class IdiomGame:
|
||||
logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files")
|
||||
logger.debug(f"Sample words: {THUOCL_WORDS[:5]}")
|
||||
|
||||
# 读取本地的 idiom_llm_storage.txt 文件,补充词语表
|
||||
LOCAL_LLM_WORDS = []
|
||||
if (DATA_DIR / "idiom_llm_storage.txt").exists():
|
||||
with open(DATA_DIR / "idiom_llm_storage.txt", "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
word = line.strip()
|
||||
if len(word) == 4:
|
||||
LOCAL_LLM_WORDS.append(word)
|
||||
logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt")
|
||||
|
||||
# 只有成语的大表
|
||||
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
|
||||
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
|
||||
@ -344,6 +397,7 @@ class IdiomGame:
|
||||
[word for word in cls.ALL_WORDS if len(word) == 4]
|
||||
+ THUOCL_WORDS
|
||||
+ COMMON_WORDS
|
||||
+ LOCAL_LLM_WORDS
|
||||
)
|
||||
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
|
||||
|
||||
|
||||
76
konabot/plugins/markdown/__init__.py
Normal file
76
konabot/plugins/markdown/__init__.py
Normal file
@ -0,0 +1,76 @@
|
||||
from loguru import logger
|
||||
import nonebot
|
||||
from nonebot.adapters import Event as BaseEvent
|
||||
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
|
||||
from nonebot_plugin_alconna import (
|
||||
UniMessage,
|
||||
UniMsg
|
||||
)
|
||||
|
||||
from playwright.async_api import ConsoleMessage, Page
|
||||
|
||||
from konabot.common.web_render import konaweb
|
||||
from konabot.common.web_render.core import WebRenderer
|
||||
from konabot.plugins.markdown.core import MarkDownCore
|
||||
|
||||
def is_markdown_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
txt = msg.extract_plain_text()
|
||||
if "markdown" not in txt[:10] or "md" not in txt[:3]:
|
||||
return False
|
||||
return True
|
||||
|
||||
evt = nonebot.on_message(rule=is_markdown_mentioned)
|
||||
|
||||
@evt.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent):
|
||||
if isinstance(event, DiscordMessageEvent):
|
||||
content = msg.extract_plain_text()
|
||||
else:
|
||||
content = msg.extract_plain_text()
|
||||
|
||||
logger.debug(f"Received markdown command with content: {content}")
|
||||
if "md" in content[:3]:
|
||||
message = content.replace("md", "", 1).strip()
|
||||
else:
|
||||
message = content.replace("markdown", "", 1).strip()
|
||||
# 如果回复了消息,则转换回复的内容
|
||||
if(len(message) == 0):
|
||||
if event.reply:
|
||||
message = event.reply.message.extract_plain_text()
|
||||
else:
|
||||
return
|
||||
logger.debug(f"Markdown content to render: {message}")
|
||||
|
||||
out = await MarkDownCore.render_markdown(message, theme="dark")
|
||||
|
||||
await evt.send(await UniMessage().image(raw=out).export())
|
||||
|
||||
|
||||
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
|
||||
txt = msg.extract_plain_text()
|
||||
if "latex" not in txt[:8]:
|
||||
return False
|
||||
return True
|
||||
|
||||
evt = nonebot.on_message(rule=is_latex_mentioned)
|
||||
|
||||
@evt.handle()
|
||||
async def _(msg: UniMsg, event: BaseEvent):
|
||||
if isinstance(event, DiscordMessageEvent):
|
||||
content = msg.extract_plain_text()
|
||||
else:
|
||||
content = msg.extract_plain_text()
|
||||
|
||||
logger.debug(f"Received markdown command with content: {content}")
|
||||
message = content.replace("latex", "", 1).strip()
|
||||
# 如果回复了消息,则转换回复的内容
|
||||
if(len(message) == 0):
|
||||
if event.reply:
|
||||
message = event.reply.message.extract_plain_text()
|
||||
else:
|
||||
return
|
||||
logger.debug(f"Latex content to render: {message}")
|
||||
|
||||
out = await MarkDownCore.render_latex(message, theme="dark")
|
||||
|
||||
await evt.send(await UniMessage().image(raw=out).export())
|
||||
57
konabot/plugins/markdown/core.py
Normal file
57
konabot/plugins/markdown/core.py
Normal file
@ -0,0 +1,57 @@
|
||||
from loguru import logger
|
||||
from playwright.async_api import ConsoleMessage, Page
|
||||
|
||||
from konabot.common.web_render import konaweb
|
||||
from konabot.common.web_render.core import WebRenderer
|
||||
|
||||
class MarkDownCore:
|
||||
@staticmethod
|
||||
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
|
||||
async def page_function(page: Page):
|
||||
async def on_console(msg: ConsoleMessage):
|
||||
logger.debug(f"WEB CONSOLE {msg.text}")
|
||||
|
||||
await page.emulate_media(color_scheme=theme)
|
||||
|
||||
page.on('console', on_console)
|
||||
|
||||
await page.locator('textarea[name=content]').fill(markdown_text)
|
||||
await page.wait_for_timeout(200)
|
||||
await page.locator('#button').click()
|
||||
await page.wait_for_timeout(200)
|
||||
|
||||
out = await WebRenderer.render(
|
||||
konaweb('markdown'),
|
||||
target='#main',
|
||||
other_function=page_function,
|
||||
params=params
|
||||
)
|
||||
|
||||
return out
|
||||
|
||||
@staticmethod
|
||||
async def render_latex(text: str, theme: str = "dark") -> bytes:
|
||||
params = {
|
||||
"size": "2em",
|
||||
}
|
||||
async def page_function(page: Page):
|
||||
async def on_console(msg: ConsoleMessage):
|
||||
logger.debug(f"WEB CONSOLE {msg.text}")
|
||||
|
||||
await page.emulate_media(color_scheme=theme)
|
||||
|
||||
page.on('console', on_console)
|
||||
|
||||
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
|
||||
await page.wait_for_timeout(200)
|
||||
await page.locator('#button').click()
|
||||
await page.wait_for_timeout(200)
|
||||
|
||||
out = await WebRenderer.render(
|
||||
konaweb('latex'),
|
||||
target='#main',
|
||||
other_function=page_function,
|
||||
params=params
|
||||
)
|
||||
|
||||
return out
|
||||
@ -4,12 +4,17 @@ import PIL.Image
|
||||
from konabot.common.path import ASSETS_PATH
|
||||
from konabot.common.utils.to_async import make_async
|
||||
|
||||
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA").resize((140, 40))
|
||||
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA")
|
||||
|
||||
|
||||
@make_async
|
||||
def draw_doubao_watermark(base: PIL.Image.Image) -> PIL.Image.Image:
|
||||
base = base.copy().convert("RGBA")
|
||||
base.alpha_composite(doubao_watermark, (base.size[0] - 160, base.size[1] - 60))
|
||||
w = base.size[0] / 768 * 140
|
||||
h = base.size[0] / 768 * 40
|
||||
x = base.size[0] / 768 * 160
|
||||
y = base.size[0] / 768 * 60
|
||||
w, h, x, y = map(int, (w, h, x, y))
|
||||
base.alpha_composite(doubao_watermark.resize((w, h)), (base.size[0] - x, base.size[1] - y))
|
||||
return base
|
||||
|
||||
|
||||
Reference in New Issue
Block a user