Compare commits

...

10 Commits

Author SHA1 Message Date
8f560ce1ba 新成语接龙
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-19 01:25:34 +08:00
9f3f79f51d 自动同意小团体的好友请求
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-17 01:04:34 +08:00
92048aeff7 让 wzq 东西在 wzq 群不可用 2025-10-17 00:54:14 +08:00
81aac10665 添加文档并修复问题
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 23:27:42 +08:00
3ce230adfe 优化卵总展示光影
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:43:54 +08:00
4f885554ca 添加卵总展示
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 22:29:07 +08:00
7ebcb8add4 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-16 19:13:51 +08:00
e18cc82792 修复 av/bv 号无法直接被筛选读取的问题 2025-10-16 19:13:36 +08:00
eb28cd0a0c 更正 Giftool 错误的文档 2025-10-16 18:44:22 +08:00
2d688a6ed6 new
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2025-10-14 12:43:25 +08:00
13 changed files with 580 additions and 173 deletions

BIN
assets/img/meme/snaur_1_base.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

BIN
assets/img/meme/snaur_1_top.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1008 KiB

View File

@ -0,0 +1,34 @@
from typing import cast
from nonebot import get_bot, get_plugin_config, logger
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot.rule import Rule
from pydantic import BaseModel
class WZQConflictConfig(BaseModel):
wzq_bot_qq: int = 0
config = get_plugin_config(WZQConflictConfig)
async def no_wzqbot(evt: BaseEvent):
if config.wzq_bot_qq <= 0:
return True
if not isinstance(evt, GroupMessageEvent):
return True
gid = evt.group_id
sid = evt.self_id
bot = cast(OnebotBot, get_bot(str(sid)))
members = await bot.get_group_member_list(group_id=gid)
members = set((m.get("user_id", -1) for m in members))
if config.wzq_bot_qq in members:
return False
return True
no_wzqbot_rule = Rule(no_wzqbot)

View File

@ -45,7 +45,7 @@
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
--speed <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
使用方式

View File

@ -0,0 +1,20 @@
指令介绍
卵总展示 - 让卵总举起你的图片
格式
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
选项
`--whiteness <number>` 白度
将原图进行指数变换,以调整它的白的程度,默认为 0.0
`--black-level <number>` 黑色等级
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
`--opacity <number>` 不透明度
将你的图片叠放在图片上的不透明度,默认为 0.8
`--saturation <number>` 饱和度
调整原图的饱和度,应该要大于 0.0,默认为 0.85

View File

@ -0,0 +1,25 @@
import asyncio
import random
from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from konabot.common.nb.is_admin import cfg as adminConfig
add_request = on_request()
@add_request.handle()
async def _(req: FriendRequestEvent):
bot = cast(OnebotBot, get_bot(str(req.self_id)))
ok_member_ls: set[int] = set()
for group in adminConfig.admin_qq_group:
members = await bot.get_group_member_list(group_id=group)
ok_member_ls |= cast(set[int], set((m.get("user_id") for m in members)))
if req.user_id in ok_member_ls:
await asyncio.sleep(random.randint(5, 10))
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")

View File

@ -0,0 +1,39 @@
import re
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
return
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)
# b_url: str
# b_page: str | None
# b_time: str | None
#
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
#
# b_url, b_page, b_time = bilibili_extract(to_search)
# if b_url is None:
# return
#
# await matcher_fix.send(await UniMessage().text(b_url).export())

View File

@ -1,25 +0,0 @@
import re
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
if not re.search(pattern, to_search):
return
logger.info("检测到有 Bilibili 相关的消息,直接进行一个调用")
_module = __import__("nonebot_plugin_analysis_bilibili")
await _module.handle_analysis(event)

View File

@ -8,6 +8,8 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
async def download_img(url):
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
img_bytes = BytesIO()
@ -42,7 +44,7 @@ gqrc = on_alconna(Alconna(
missing_tips=lambda: "请输入你要转换为二维码的文字!"
)],
# UniMessage[]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}, rule=no_wzqbot_rule)
@gqrc.handle()
async def _(saying: list):
@ -66,4 +68,4 @@ async def _(saying: list):
else:
"""
# genqr("\n".join(saying))
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())

View File

@ -1,8 +1,13 @@
import asyncio as asynkio
import base64
from pathlib import Path
import secrets
import json
from typing import Literal
import datetime
from typing import Literal, Optional
from enum import Enum
from loguru import logger
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
@ -12,111 +17,302 @@ from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
from konabot.common.path import ASSETS_PATH
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
IDIOM_FIRST_CHAR = {} # 成语首字字典
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "idiom_banned.json"
INITED = False
def init_lexicon():
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
ALL_WORDS = json.load(f)
def load_banned_ids() -> list[str]:
if not DATA_FILE_PATH.exists():
return []
try:
return json.loads(DATA_FILE_PATH.read_text())
except Exception as e:
logger.warning(f"在解析成语接龙封禁文件时遇到问题:{e}")
return []
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
def is_idiom_game_banned(group_id: str) -> bool:
banned_ids = load_banned_ids()
return group_id in banned_ids
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
def add_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id not in banned_ids:
banned_ids.append(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
def remove_banned_id(group_id: str):
banned_ids = load_banned_ids()
if group_id in banned_ids:
banned_ids.remove(group_id)
DATA_FILE_PATH.write_text(json.dumps(banned_ids, ensure_ascii=False, indent=4))
class TryStartState(Enum):
STARTED = 0
ALREADY_PLAYING = 1
NO_REMAINING_TIMES = 2
class TryStopState(Enum):
STOPPED = 0
NOT_PLAYING = 1
class TryVerifyState(Enum):
VERIFIED = 0
NOT_IDIOM = 1
WRONG_FIRST_CHAR = 2
VERIFIED_BUT_NO_NEXT = 3
VERIFIED_GAME_END = 4
class IdiomGame:
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
INSTANCE_LIST : dict[str, "IdiomGame"] = {} # 群号对应的游戏实例
IDIOM_FIRST_CHAR = {} # 成语首字字典
__inited = False
def __init__(self, group_id: str):
# 初始化一局游戏
self.group_id = ""
self.now_playing = False
self.score_board = {}
self.last_idiom = ""
self.last_char = ""
self.remain_playing_times = 3
self.last_play_date = ""
self.all_buff_score = 0
self.lock = asynkio.Lock()
self.remain_rounds = 0 # 剩余回合数
IdiomGame.INSTANCE_LIST[group_id] = self
def be_able_to_play(self) -> bool:
if(self.last_play_date != datetime.date.today()):
self.last_play_date = datetime.date.today()
self.remain_playing_times = 1
if(self.remain_playing_times > 0):
self.remain_playing_times -= 1
return True
return False
def choose_start_idiom(self) -> str:
'''
随机选择一个成语作为起始成语
'''
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
if not self.is_nextable(self.last_char):
self.choose_start_idiom()
return self.last_idiom
@classmethod
def try_start_game(cls, group_id: str, force: bool = False) -> TryStartState:
cls.init_lexicon()
if not cls.INSTANCE_LIST.get(group_id):
cls(group_id)
instance = cls.INSTANCE_LIST[group_id]
if instance.now_playing:
return TryStartState.ALREADY_PLAYING
if not instance.be_able_to_play() and not force:
return TryStartState.NO_REMAINING_TIMES
instance.now_playing = True
return TryStartState.STARTED
def start_game(self, rounds: int = 100):
self.now_playing = True
self.remain_rounds = rounds
self.choose_start_idiom()
@classmethod
def try_stop_game(cls, group_id: str) -> TryStopState:
if not cls.INSTANCE_LIST.get(group_id):
return TryStopState.NOT_PLAYING
instance = cls.INSTANCE_LIST[group_id]
if not instance.now_playing:
return TryStopState.NOT_PLAYING
instance.now_playing = False
return TryStopState.STOPPED
def clear_score_board(self):
self.score_board = {}
self.last_char = ""
def get_score_board(self) -> dict:
return self.score_board
def get_all_buff_score(self) -> int:
return self.all_buff_score
async def skip_idiom(self, buff_score: int = -100) -> str:
'''
跳过当前成语,选择下一个成语
'''
await self.lock.acquire()
self._skip_idiom_async(buff_score)
self.lock.release()
return self.last_idiom
def _skip_idiom_async(self, buff_score: int = -100) -> str:
self.last_idiom = secrets.choice(IdiomGame.ALL_IDIOMS)
self.last_char = self.last_idiom[-1]
self.add_buff_score(buff_score)
return self.last_idiom
async def try_verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
'''
用户发送成语
'''
await self.lock.acquire()
state = self._verify_idiom(idiom, user_id)
self.lock.release()
return state
def is_nextable(self, last_char: str) -> bool:
'''
判断是否有成语可以接
'''
return last_char in IdiomGame.IDIOM_FIRST_CHAR
# 只有成语的大表
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
def _verify_idiom(self, idiom: str, user_id: str) -> TryVerifyState:
# 新成语的首字应与上一条成语的尾字相同
if idiom[0] != self.last_char:
return TryVerifyState.WRONG_FIRST_CHAR
if(idiom not in IdiomGame.ALL_IDIOMS and idiom not in IdiomGame.ALL_WORDS):
self.add_score(user_id, -0.1)
return TryVerifyState.NOT_IDIOM
self.last_idiom = idiom
self.last_char = idiom[-1]
self.add_score(user_id, 1)
self.remain_rounds -= 1
if(self.remain_rounds <= 0):
self.now_playing = False
return TryVerifyState.VERIFIED_GAME_END
if(not self.is_nextable(self.last_char)):
# 没有成语可以接了,自动跳过
self._skip_idiom_async()
return TryVerifyState.VERIFIED_BUT_NO_NEXT
return TryVerifyState.VERIFIED
def get_user_score(self, user_id: str) -> int:
if user_id not in self.score_board:
return 0
return self.score_board[user_id]["score"]
def add_score(self, user_id: str, score: int):
if user_id not in self.score_board:
self.score_board[user_id] = {
"name": user_id,
"score": 0
}
self.score_board[user_id]["score"] += score
def add_buff_score(self, score: int):
self.all_buff_score += score
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
ALL_WORDS = list(set(ALL_WORDS)) # 去重
def get_playing_state(self) -> bool:
return self.now_playing
def get_last_char(self) -> str:
return self.last_char
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = {}
for idiom in ALL_IDIOMS + ALL_WORDS:
if idiom[0] not in IDIOM_FIRST_CHAR:
IDIOM_FIRST_CHAR[idiom[0]] = []
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
@classmethod
def init_lexicon(cls):
if(cls.__inited):
return
cls.__inited = True
NOW_PLAYING = False
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
SCORE_BOARD = {}
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
cls.ALL_WORDS = json.load(f)
LAST_CHAR = ""
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
REMAIN_PLAYING_TIMES = 1
LAST_PLAY_DATE = ""
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
LOCK = False
ALL_BUFF_SCORE = 0 # 全体分数
# 只有成语的大表
cls.ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
cls.ALL_IDIOMS = list(set(cls.ALL_IDIOMS)) # 去重
import datetime
# 其他四字词语表,仅表示可以有这个词
cls.ALL_WORDS = [word for word in cls.ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
cls.ALL_WORDS = list(set(cls.ALL_WORDS)) # 去重
def be_able_to_play():
global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE
if(LAST_PLAY_DATE != datetime.date.today()):
LAST_PLAY_DATE = datetime.date.today()
REMAIN_PLAYING_TIMES = 1
if(REMAIN_PLAYING_TIMES > 0):
REMAIN_PLAYING_TIMES -= 1
return True
return False
# 根据成语大表,划分出成语首字字典
for idiom in cls.ALL_IDIOMS + cls.ALL_WORDS:
if idiom[0] not in cls.IDIOM_FIRST_CHAR:
cls.IDIOM_FIRST_CHAR[idiom[0]] = []
cls.IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
evt = on_alconna(Alconna(
"我要玩成语接龙"
"我要玩成语接龙",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, INITED
if not be_able_to_play():
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉").export())
async def play_game(event: BaseEvent, force = False, rounds: Optional[int] = 100):
group_id = str(event.get_session_id())
if is_idiom_game_banned(group_id):
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能").export())
return
if not INITED:
init_lexicon()
INITED = True
if NOW_PLAYING:
if rounds <= 0:
await evt.send(await UniMessage().text("干什么!你想玩负数局吗?").export())
return
state = IdiomGame.try_start_game(group_id, force)
if state == TryStartState.ALREADY_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
return
NOW_PLAYING = True
if state == TryStartState.NO_REMAINING_TIMES:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
# 选择一个随机成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
instance = IdiomGame.INSTANCE_LIST[group_id]
instance.start_game(rounds)
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
await evt.send(await UniMessage().text(f"第一个成语:「{instance.last_idiom}」,请接!").export())
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!",
Args['rounds?', int],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent, rounds: Optional[int] = 100):
await play_game(event, force=True, rounds=rounds)
async def end_game(event: BaseEvent, group_id: str):
instance = IdiomGame.INSTANCE_LIST[group_id]
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
score_board = instance.get_score_board()
if len(score_board) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(score_board.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + instance.get_all_buff_score()}\n"
await evt.send(await result_text.export())
instance.clear_score_board()
evt = on_alconna(Alconna(
"不玩了"
@ -124,26 +320,15 @@ evt = on_alconna(Alconna(
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE
if NOW_PLAYING:
NOW_PLAYING = False
group_id = str(event.get_session_id())
state = IdiomGame.try_stop_game(group_id)
if state == TryStopState.STOPPED:
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
if len(SCORE_BOARD) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + ALL_BUFF_SCORE}\n"
await evt.send(await result_text.export())
# 重置分数板
SCORE_BOARD = {}
LAST_CHAR = ""
await end_game(event, group_id)
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
@ -154,58 +339,66 @@ evt = on_alconna(Alconna(
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE
if not NOW_PLAYING:
group_id = str(event.get_session_id())
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
ALL_BUFF_SCORE -= 100
# 选择下一个成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
idiom = await instance.skip_idiom(-100)
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, LOCK
if not NOW_PLAYING:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom[0] != LAST_CHAR):
return
if LOCK:
return
LOCK = True
await handle_send_info(event, msg)
LOCK = False
async def handle_send_info(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD, ALL_BUFF_SCORE
if not NOW_PLAYING:
return
if LOCK:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
await evt.send(await UniMessage().text("接不上!这个不一样!").export())
return
# 成功接上
def get_user_info(event: BaseEvent):
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
return user_id, user_name
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] += 1
# at 指定玩家
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score'] + ALL_BUFF_SCORE} 分!").export())
LAST_CHAR = user_idiom[-1]
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
group_id = str(event.get_session_id())
instance = IdiomGame.INSTANCE_LIST.get(group_id)
if not instance or not instance.get_playing_state():
return
user_idiom = msg.extract_plain_text().strip()
user_id , user_name = get_user_info(event)
state = await instance.try_verify_idiom(user_idiom, user_id)
if(state == TryVerifyState.WRONG_FIRST_CHAR):
return
if(state == TryVerifyState.NOT_IDIOM):
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
return
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {instance.get_user_score(user_id)} 分!").export())
if(state == TryVerifyState.VERIFIED_GAME_END):
await evt.send(await UniMessage().text("全部回合结束!").export())
await end_game(event, group_id)
return
if(state == TryVerifyState.VERIFIED_BUT_NO_NEXT):
await evt.send(await UniMessage().text("但是,这是条死路!你们全部都要扣 100 分!").export())
await evt.send(await UniMessage().text(f"重新抽取成语「{instance.last_idiom}").export())
await evt.send(await UniMessage().text(f"下一个成语请以「{instance.get_last_char()}」开头!").export())
evt = on_alconna(Alconna(
"禁止成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
add_banned_id(group_id)
await evt.send(await UniMessage().text("本群已被禁止使用成语接龙功能!").export())
evt = on_alconna(Alconna(
"开启成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
group_id = str(event.get_session_id())
remove_banned_id(group_id)
await evt.send(await UniMessage().text("本群已开启成语接龙功能!").export())

View File

@ -2,11 +2,11 @@ from io import BytesIO
from typing import Iterable, cast
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text,
UniMessage, UniMsg, on_alconna)
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
@ -139,3 +139,24 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
.text(err)
.export()
)
snaur_display_cmd = on_alconna(Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
))
@snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2,
opacity: float = 0.8, saturation: float = 0.85):
img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation,
)
img_data = BytesIO()
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())

View File

@ -4,10 +4,12 @@ from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
import PIL.ImageChops
import PIL.ImageEnhance
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
@ -15,6 +17,16 @@ CAO_QUAD_POINTS = np.float32(cast(Any, [
[381, 687],
]))
snaur_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_base.png")
snaur_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_top.png")
SNAUR_RATIO = (1 / 2) ** .5
SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[0, 466 ],
[673, 471 ],
[640, 1196],
[106, 1280],
]))
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
@ -43,3 +55,87 @@ def _draw_cao_display(image: PIL.Image.Image):
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)
def _draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
):
src = np.array(image.convert("RGBA"))
_h, _w = src.shape[:2]
if _w / _h < SNAUR_RATIO:
_w_target = _w
_h_target = int(_w / SNAUR_RATIO)
else:
_w_target = int(_h * SNAUR_RATIO)
_h_target = _h
x_center = _w / 2
y_center = _h / 2
x1 = int(x_center - _w_target / 2)
x2 = int(x_center + _w_target / 2)
y1 = int(y_center - _h_target / 2)
y2 = int(y_center + _h_target / 2)
src = src[y1:y2, x1:x2, :]
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h],
]))
dst_points = SNAUR_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = snaur_image_top.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGBA')
r, g, b, a = result.split()
a = a.point(lambda p: int(p * opacity))
f2 = lambda p: int(
((p / 255) ** (2 ** whiteness)) * 255 * (1 - black_level)
+ 255 * black_level
)
r = r.point(f2)
g = g.point(f2)
b = b.point(f2)
result = PIL.Image.merge('RGBA', (r, g, b, a))
enhancer = PIL.ImageEnhance.Color(result)
result = enhancer.enhance(saturation)
result = PIL.ImageChops.multiply(result, snaur_image_base)
result = PIL.Image.alpha_composite(snaur_image_base, result)
result = PIL.Image.alpha_composite(result, snaur_image_top)
return result
async def draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
) -> PIL.Image.Image:
return await asyncio.to_thread(
_draw_snaur_display, image, whiteness, black_level,
opacity, saturation,
)

View File

@ -1,8 +1,10 @@
import json, time
from nonebot.rule import Rule
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
from nonebot.adapters.onebot.v11 import Event
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
from konabot.common.path import ASSETS_PATH, DATA_PATH
@ -66,7 +68,7 @@ poll = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
@poll.handle()
async def _(saying: list, event: Event):
if (len(saying) < 3):
@ -88,7 +90,7 @@ viewpoll = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请指定投票ID或标题。用法查看投票 <投票ID或标题>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
@viewpoll.handle()
async def _(saying: list):
# 参数投票ID或者标题
@ -130,7 +132,7 @@ vote = on_alconna(Alconna(
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"})
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
@vote.handle()
async def _(saying: list, event: Event):
if (len(saying) < 2):