Compare commits

...

4 Commits

Author SHA1 Message Date
03900f4416 成语接龙接入 LLM 和 MarkDown、LaTeX 接入
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-11-09 23:12:04 +08:00
62f4195e46 Merge pull request '让豆包水印使用相对大小' (#47) from feature/doubao-watermark into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #47
2025-11-07 21:17:16 +08:00
751297e3bc Merge branch 'master' into feature/doubao-watermark 2025-11-07 21:17:09 +08:00
b450998f3f 让豆包水印使用相对大小 2025-11-07 21:15:19 +08:00
4 changed files with 202 additions and 10 deletions

View File

@ -21,10 +21,13 @@ from nonebot_plugin_alconna import (
from konabot.common.longtask import DepLongTaskTarget
from konabot.common.path import ASSETS_PATH
DATA_FILE_PATH = (
Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
)
from konabot.common.llm import get_llm
DATA_DIR = Path(__file__).parent.parent.parent.parent / "data"
DATA_FILE_PATH = (
DATA_DIR / "idiom_banned.json"
)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
@ -75,6 +78,27 @@ class TryVerifyState(Enum):
BUT_NO_NEXT = 5
GAME_END = 6
class IdiomGameLLM:
@classmethod
async def verify_idiom_with_llm(cls, idiom: str) -> bool:
if len(idiom) != 4:
return False
llm = get_llm()
system_prompt = "请判断用户的输入是否为一个合理的成语或者这四个字在中文环境下是否说得通。如果是请回答「T」否则回答「F」。请注意即使这个词不是成语如果说得通也就是能念起来很通顺你也该输出「T」。请不要包含任何解释也不要包含任何标点符号。"
message = await llm.chat([{"role": "system", "content": system_prompt}, {"role": "user", "content": idiom}])
answer = message.content
logger.info(f"LLM 对成语 {idiom} 的判断结果是 {answer}")
if answer == "T":
await cls.storage_idiom(idiom)
return answer == "T"
@classmethod
async def storage_idiom(cls, idiom: str):
# 将 idiom 存入本地文件以备后续分析
with open(DATA_DIR / "idiom_llm_storage.txt", "a", encoding="utf-8") as f:
f.write(idiom + "\n")
IdiomGame.append_into_word_list(idiom)
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
@ -101,6 +125,17 @@ class IdiomGame:
self.idiom_history: list[list[str]] = [] # 成语使用历史记录,多个数组以存储不同成语链
IdiomGame.INSTANCE_LIST[group_id] = self
@classmethod
def append_into_word_list(cls, word: str):
'''
将一个新词加入到词语列表中
'''
if word not in cls.ALL_WORDS:
cls.ALL_WORDS.append(word)
if word[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[word[0]] = []
cls.IDIOM_FIRST_CHAR[word[0]].append(word)
def be_able_to_play(self) -> bool:
if self.last_play_date != datetime.date.today():
self.last_play_date = datetime.date.today()
@ -186,7 +221,7 @@ class IdiomGame:
用户发送成语
"""
async with self.lock:
state = self._verify_idiom(idiom, user_id)
state = await self._verify_idiom(idiom, user_id)
return state
def is_nextable(self, last_char: str) -> bool:
@ -218,16 +253,24 @@ class IdiomGame:
result.append(" -> ".join(chain))
return result
def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
async def _verify_idiom(self, idiom: str, user_id: str) -> list[TryVerifyState]:
state = []
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
state.append(TryVerifyState.WRONG_FIRST_CHAR)
return state
if idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS:
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
logger.info(f"用户 {user_id} 发送了未知词语 {idiom},正在使用 LLM 进行验证")
try:
if not await IdiomGameLLM.verify_idiom_with_llm(idiom):
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
except Exception as e:
logger.error(f"LLM 验证成语 {idiom} 时出现错误:{e}")
self.add_score(user_id, -0.1)
state.append(TryVerifyState.NOT_IDIOM)
return state
# 成语合法,更新状态
self.add_history_idiom(idiom)
score_k = 0.5 ** self.get_already_used_num(idiom) # 每被使用过一次,得分减半
@ -335,6 +378,16 @@ class IdiomGame:
logger.debug(f"Loaded {len(THUOCL_WORDS)} words from THUOCL txt files")
logger.debug(f"Sample words: {THUOCL_WORDS[:5]}")
# 读取本地的 idiom_llm_storage.txt 文件,补充词语表
LOCAL_LLM_WORDS = []
if (DATA_DIR / "idiom_llm_storage.txt").exists():
with open(DATA_DIR / "idiom_llm_storage.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
LOCAL_LLM_WORDS.append(word)
logger.debug(f"Loaded additional {len(LOCAL_LLM_WORDS)} words from idiom_llm_storage.txt")
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
@ -344,6 +397,7 @@ class IdiomGame:
[word for word in cls.ALL_WORDS if len(word) == 4]
+ THUOCL_WORDS
+ COMMON_WORDS
+ LOCAL_LLM_WORDS
)
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重

View File

@ -0,0 +1,76 @@
from loguru import logger
import nonebot
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (
UniMessage,
UniMsg
)
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
from konabot.plugins.markdown.core import MarkDownCore
def is_markdown_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "markdown" not in txt[:10] or "md" not in txt[:3]:
return False
return True
evt = nonebot.on_message(rule=is_markdown_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
if "md" in content[:3]:
message = content.replace("md", "", 1).strip()
else:
message = content.replace("markdown", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Markdown content to render: {message}")
out = await MarkDownCore.render_markdown(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())
def is_latex_mentioned(evt: BaseEvent, msg: UniMsg) -> bool:
txt = msg.extract_plain_text()
if "latex" not in txt[:8]:
return False
return True
evt = nonebot.on_message(rule=is_latex_mentioned)
@evt.handle()
async def _(msg: UniMsg, event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
content = msg.extract_plain_text()
else:
content = msg.extract_plain_text()
logger.debug(f"Received markdown command with content: {content}")
message = content.replace("latex", "", 1).strip()
# 如果回复了消息,则转换回复的内容
if(len(message) == 0):
if event.reply:
message = event.reply.message.extract_plain_text()
else:
return
logger.debug(f"Latex content to render: {message}")
out = await MarkDownCore.render_latex(message, theme="dark")
await evt.send(await UniMessage().image(raw=out).export())

View File

@ -0,0 +1,57 @@
from loguru import logger
from playwright.async_api import ConsoleMessage, Page
from konabot.common.web_render import konaweb
from konabot.common.web_render.core import WebRenderer
class MarkDownCore:
@staticmethod
async def render_markdown(markdown_text: str, theme: str = "dark", params: dict = {}) -> bytes:
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(markdown_text)
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
out = await WebRenderer.render(
konaweb('markdown'),
target='#main',
other_function=page_function,
params=params
)
return out
@staticmethod
async def render_latex(text: str, theme: str = "dark") -> bytes:
params = {
"size": "2em",
}
async def page_function(page: Page):
async def on_console(msg: ConsoleMessage):
logger.debug(f"WEB CONSOLE {msg.text}")
await page.emulate_media(color_scheme=theme)
page.on('console', on_console)
await page.locator('textarea[name=content]').fill(f"$$ {text} $$")
await page.wait_for_timeout(200)
await page.locator('#button').click()
await page.wait_for_timeout(200)
out = await WebRenderer.render(
konaweb('latex'),
target='#main',
other_function=page_function,
params=params
)
return out

View File

@ -4,12 +4,17 @@ import PIL.Image
from konabot.common.path import ASSETS_PATH
from konabot.common.utils.to_async import make_async
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA").resize((140, 40))
doubao_watermark = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "doubao.png").convert("RGBA")
@make_async
def draw_doubao_watermark(base: PIL.Image.Image) -> PIL.Image.Image:
base = base.copy().convert("RGBA")
base.alpha_composite(doubao_watermark, (base.size[0] - 160, base.size[1] - 60))
w = base.size[0] / 768 * 140
h = base.size[0] / 768 * 40
x = base.size[0] / 768 * 160
y = base.size[0] / 768 * 60
w, h, x, y = map(int, (w, h, x, y))
base.alpha_composite(doubao_watermark.resize((w, h)), (base.size[0] - x, base.size[1] - y))
return base