Compare commits

...

24 Commits

Author SHA1 Message Date
9f3f79f51d 自动同意小团体的好友请求 2025-10-17 01:04:34 +08:00
92048aeff7 让 wzq 东西在 wzq 群不可用 2025-10-17 00:54:14 +08:00
81aac10665 添加文档并修复问题 2025-10-16 23:27:42 +08:00
3ce230adfe 优化卵总展示光影 2025-10-16 22:43:54 +08:00
4f885554ca 添加卵总展示 2025-10-16 22:29:07 +08:00
7ebcb8add4 Merge branch 'master' of ssh://gitea.service.jazzwhom.top:2221/mttu-developers/konabot 2025-10-16 19:13:51 +08:00
e18cc82792 修复 av/bv 号无法直接被筛选读取的问题 2025-10-16 19:13:36 +08:00
eb28cd0a0c 更正 Giftool 错误的文档 2025-10-16 18:44:22 +08:00
2d688a6ed6 new 2025-10-14 12:43:25 +08:00
e9aac52200 chengyu update 2025-10-14 01:23:49 +00:00
4305548ab5 submodule 2025-10-13 22:53:44 +08:00
99382a3bf5 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-10-13 22:48:17 +08:00
92e43785bf submodule 2025-10-13 22:46:30 +08:00
fc5b11c5e8 调整 notify 的强制退出 2025-10-13 22:16:50 +08:00
0ec66988fa 更新投票存储位置 2025-10-13 22:05:21 +08:00
e5c3081c22 Merge branch 'master' of https://gitea.service.jazzwhom.top/mttu-developers/konabot 2025-10-13 22:02:44 +08:00
14b356120a 成语接龙 2025-10-13 22:02:33 +08:00
a208302cb9 添加依赖 2025-10-13 21:35:44 +08:00
01ffa451bb Merge pull request '投票功能和二维码生成(从 testpilot 移植)' (#26) from wzq02/konabot:master into master
Reviewed-on: mttu-developers/konabot#26
2025-10-13 21:33:03 +08:00
2b6c2e84bd Merge branch 'master' into master 2025-10-13 21:31:40 +08:00
4f0a9af2dc 成语接龙 2025-10-13 21:10:18 +08:00
4a4aa6b243 Add submodule: THUOCL 2025-10-13 21:10:05 +08:00
4c8625ae02 小完善(添加对应的 man) 2025-10-13 21:08:32 +08:00
c5f820a1f9 投票功能和二维码生成(从 testpilot 移植) 2025-10-13 20:49:56 +08:00
31 changed files with 701547 additions and 51 deletions

View File

@ -10,6 +10,10 @@ trigger:
- master
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建 Docker 镜像
image: plugins/docker:latest
privileged: true
@ -50,6 +54,10 @@ trigger:
- tag
steps:
- name: submodules
image: alpine/git
commands:
- git submodule update --init --recursive
- name: 构建并推送 Release Docker 镜像
image: plugins/docker:latest
privileged: true

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "assets/lexicon/THUOCL"]
path = assets/lexicon/THUOCL
url = https://github.com/thunlp/THUOCL.git

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

BIN
assets/img/meme/snaur_1_base.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

BIN
assets/img/meme/snaur_1_top.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1008 KiB

1
assets/json/poll.json Normal file
View File

@ -0,0 +1 @@
{"poll": {"0": {"create": 1760357553, "expiry": 1760443953, "options": {"0": "此方bot", "1": "testpilot", "2": "小镜bot", "3": "可怜bot"}, "polldata": {}, "qq": "2975499623", "title": "我~是~谁~"}}}

1
assets/lexicon/THUOCL Submodule

Submodule assets/lexicon/THUOCL added at a30ce79d89

1
assets/lexicon/ci.json Normal file

File diff suppressed because one or more lines are too long

360393
assets/lexicon/common.txt Normal file

File diff suppressed because it is too large Load Diff

339847
assets/lexicon/idiom.json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
from typing import cast
from nonebot import get_bot, get_plugin_config, logger
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from nonebot.rule import Rule
from pydantic import BaseModel
class WZQConflictConfig(BaseModel):
wzq_bot_qq: int = 0
config = get_plugin_config(WZQConflictConfig)
async def no_wzqbot(evt: BaseEvent):
if config.wzq_bot_qq <= 0:
return True
if not isinstance(evt, GroupMessageEvent):
return True
gid = evt.group_id
sid = evt.self_id
bot = cast(OnebotBot, get_bot(str(sid)))
members = await bot.get_group_member_list(group_id=gid)
members = set((m.get("user_id", -1) for m in members))
if config.wzq_bot_qq in members:
return False
return True
no_wzqbot_rule = Rule(no_wzqbot)

View File

@ -12,3 +12,10 @@ DOCS_PATH_MAN1 = DOCS_PATH / "user"
DOCS_PATH_MAN3 = DOCS_PATH / "lib"
DOCS_PATH_MAN7 = DOCS_PATH / "concepts"
DOCS_PATH_MAN8 = DOCS_PATH / "sys"
if not DATA_PATH.exists():
DATA_PATH.mkdir()
if not LOG_PATH.exists():
LOG_PATH.mkdir()

View File

@ -45,7 +45,7 @@
- 帧数必须为正整数(> 0
- 若原始帧数 ≤ 指定帧数,则保留全部帧。
--s <速度>(可选)
--speed <速度>(可选)
- 调整 gif 图的速度。若为负数,则代表倒放
使用方式

View File

@ -0,0 +1,20 @@
指令介绍
卵总展示 - 让卵总举起你的图片
格式
<引用图片> 卵总展示 [选项]
卵总展示 [选项] <图片>
选项
`--whiteness <number>` 白度
将原图进行指数变换,以调整它的白的程度,默认为 0.0
`--black-level <number>` 黑色等级
将原图减淡,数值越大越淡,范围 0.0-1.0,默认 0.2
`--opacity <number>` 不透明度
将你的图片叠放在图片上的不透明度,默认为 0.8
`--saturation <number>` 饱和度
调整原图的饱和度,应该要大于 0.0,默认为 0.85

View File

@ -0,0 +1,11 @@
指令介绍
发起投票 - 发起一个投票
格式
发起投票 <投票标题> <选项1> <选项2> ...
示例
`发起投票 这是一个投票 A B C` 发起标题为“这是一个投票”选项为“A”、“B”、“C”的投票
说明
投票各个选项之间用空格分隔选项数量为2-15项。投票的默认有效期为24小时。

View File

@ -0,0 +1,12 @@
指令介绍
投票 - 参与已发起的投票
格式
投票 <投票ID/标题> <选项文本>
示例
`投票 1 A` 在ID为1的投票中投给“A”
`投票 这是一个投票 B` 在标题为“这是一个投票”的投票中投给“B”
说明
目前不支持单人多投,每个人只能投一项。

View File

@ -0,0 +1,12 @@
指令介绍
查看投票 - 查看已发起的投票
格式
查看投票 <投票ID或标题>
示例
`查看投票 1` 查看ID为1的投票
`查看投票 这是一个投票` 查看标题为“这是一个投票”的投票
说明
投票在进行时,使用此命令可以看到投票的各个选项;投票结束后,则可以看到各项的票数。

View File

@ -0,0 +1,8 @@
指令介绍
生成二维码 - 将文本内容转换为二维码
格式
生成二维码 <文本内容>
示例
`生成二维码 嗨嗨嗨` 生成扫描结果为“嗨嗨嗨”的二维码图片

View File

@ -0,0 +1,25 @@
import asyncio
import random
from typing import cast
from loguru import logger
from nonebot import get_bot, on_request
from nonebot.adapters.onebot.v11.event import FriendRequestEvent
from nonebot.adapters.onebot.v11.bot import Bot as OnebotBot
from konabot.common.nb.is_admin import cfg as adminConfig
add_request = on_request()
@add_request.handle()
async def _(req: FriendRequestEvent):
bot = cast(OnebotBot, get_bot(str(req.self_id)))
ok_member_ls: set[int] = set()
for group in adminConfig.admin_qq_group:
members = await bot.get_group_member_list(group_id=group)
ok_member_ls |= cast(set[int], set((m.get("user_id") for m in members)))
if req.user_id in ok_member_ls:
await asyncio.sleep(random.randint(5, 10))
await req.approve(bot)
logger.info(f"已经自动同意 {req.user_id} 的好友请求")

View File

@ -0,0 +1,39 @@
import re
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
to_search2 = msg.exclude(Reply, Reference).extract_plain_text()
if not re.search(pattern, to_search) and not re.search(pattern, to_search2):
return
from nonebot_plugin_analysis_bilibili import handle_analysis
await handle_analysis(event)
# b_url: str
# b_page: str | None
# b_time: str | None
#
# from nonebot_plugin_analysis_bilibili.analysis_bilibili import extract as bilibili_extract
#
# b_url, b_page, b_time = bilibili_extract(to_search)
# if b_url is None:
# return
#
# await matcher_fix.send(await UniMessage().text(b_url).export())

View File

@ -1,25 +0,0 @@
import re
from loguru import logger
from nonebot import on_message
from nonebot_plugin_alconna import Reference, Reply, UniMsg
from nonebot.adapters import Event
matcher_fix = on_message()
pattern = (
r"^(?:(?:av|cv)\d+|BV[a-zA-Z0-9]{10})|"
r"(?:b23\.tv|bili(?:22|23|33|2233)\.cn|\.bilibili\.com|QQ小程序(?:&amp;#93;|&#93;|\])哔哩哔哩).{0,500}"
)
@matcher_fix.handle()
async def _(msg: UniMsg, event: Event):
to_search = msg.exclude(Reply, Reference).dump(json=True)
if not re.search(pattern, to_search):
return
logger.info("检测到有 Bilibili 相关的消息,直接进行一个调用")
_module = __import__("nonebot_plugin_analysis_bilibili")
await _module.handle_analysis(event)

View File

@ -0,0 +1,71 @@
import qrcode
# from pyzbar.pyzbar import decode
# from PIL import Image
import requests
from io import BytesIO
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage,
on_alconna)
from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
async def download_img(url):
resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错
img_bytes = BytesIO()
with open(img_bytes,"wb") as f:
f.write(resp.content)
return img_bytes
def genqr(data):
qr = qrcode.QRCode(version=1,error_correction=qrcode.constants.ERROR_CORRECT_L,box_size=8,border=4)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
return img_bytes
"""
async def recqr(url):
im_path = "assets/img/qrcode/2.jpg"
data = await download_img(url)
img = Image.open(im_path)
decoded_objects = decode(img)
data = ""
for obj in decoded_objects:
data += obj.data.decode('utf-8')
return data
"""
gqrc = on_alconna(Alconna(
"genqr",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请输入你要转换为二维码的文字!"
)],
# UniMessage[]
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}, rule=no_wzqbot_rule)
@gqrc.handle()
async def _(saying: list):
"""
img = await draw_pt("\n".join(saying))
img_bytes = BytesIO()
img.save(img_bytes, format="PNG")
await pt.send(await UniMessage().image(raw=img_bytes).export())
# print(saying)
# 二维码识别
if type(saying[0]) == 'image':
data = await recqr(saying[0].data['url'])
if data == "":
await gqrc.send("二维码图片解析失败!")
else:
await gqrc.send(recqr(saying[0].data['url']))
# 二维码生成
else:
"""
# genqr("\n".join(saying))
await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export())

View File

@ -0,0 +1,229 @@
import base64
import secrets
import json
from typing import Literal
from nonebot import on_message
from nonebot.adapters import Event as BaseEvent
from nonebot.adapters.console.event import MessageEvent as ConsoleMessageEvent
from nonebot.adapters.discord.event import MessageEvent as DiscordMessageEvent
from nonebot_plugin_alconna import (Alconna, Args, Field, Subcommand,
UniMessage, UniMsg, on_alconna)
from konabot.common.path import ASSETS_PATH
ALL_WORDS = [] # 所有四字词语
ALL_IDIOMS = [] # 所有成语
IDIOM_FIRST_CHAR = {} # 成语首字字典
INITED = False
def init_lexicon():
global ALL_WORDS, ALL_IDIOMS, IDIOM_FIRST_CHAR
# 成语大表
with open(ASSETS_PATH / "lexicon" / "idiom.json", "r", encoding="utf-8") as f:
ALL_IDIOMS_INFOS = json.load(f)
# 词语大表
with open(ASSETS_PATH / "lexicon" / "ci.json", "r", encoding="utf-8") as f:
ALL_WORDS = json.load(f)
COMMON_WORDS = []
# 读取 COMMON 词语大表
with open(ASSETS_PATH / "lexicon" / "common.txt", "r", encoding="utf-8") as f:
for line in f:
word = line.strip()
if len(word) == 4:
COMMON_WORDS.append(word)
# 读取 THUOCL 成语库
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / "THUOCL_chengyu.txt", "r", encoding="utf-8") as f:
THUOCL_IDIOMS = [line.split(" ")[0].strip() for line in f]
# 读取 THUOCL 剩下的所有 txt 文件,只保留四字词
THUOCL_WORDS = []
import os
for filename in os.listdir(ASSETS_PATH / "lexicon" / "THUOCL" / "data"):
if filename.endswith(".txt") and filename != "THUOCL_chengyu.txt":
with open(ASSETS_PATH / "lexicon" / "THUOCL" / "data" / filename, "r", encoding="utf-8") as f:
for line in f:
word = line.lstrip().split(" ")[0].strip()
if len(word) == 4:
THUOCL_WORDS.append(word)
# 只有成语的大表
ALL_IDIOMS = [idiom["word"] for idiom in ALL_IDIOMS_INFOS] + THUOCL_IDIOMS
ALL_IDIOMS = list(set(ALL_IDIOMS)) # 去重
# 其他四字词语表,仅表示可以有这个词
ALL_WORDS = [word for word in ALL_WORDS if len(word) == 4] + THUOCL_WORDS + COMMON_WORDS
ALL_WORDS = list(set(ALL_WORDS)) # 去重
# 根据成语大表,划分出成语首字字典
IDIOM_FIRST_CHAR = {}
for idiom in ALL_IDIOMS + ALL_WORDS:
if idiom[0] not in IDIOM_FIRST_CHAR:
IDIOM_FIRST_CHAR[idiom[0]] = []
IDIOM_FIRST_CHAR[idiom[0]].append(idiom)
NOW_PLAYING = False
SCORE_BOARD = {}
LAST_CHAR = ""
USER_NAME_CACHE = {} # 缓存用户名称,避免多次获取
REMAIN_PLAYING_TIMES = 1
LAST_PLAY_DATE = ""
LOCK = False
ALL_BUFF_SCORE = 0 # 全体分数
import datetime
def be_able_to_play():
global REMAIN_PLAYING_TIMES, LAST_PLAY_DATE
if(LAST_PLAY_DATE != datetime.date.today()):
LAST_PLAY_DATE = datetime.date.today()
REMAIN_PLAYING_TIMES = 1
if(REMAIN_PLAYING_TIMES > 0):
REMAIN_PLAYING_TIMES -= 1
return True
return False
evt = on_alconna(Alconna(
"我要玩成语接龙"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def play_game(event: BaseEvent, force = False):
global NOW_PLAYING, LAST_CHAR, INITED
if NOW_PLAYING:
await evt.send(await UniMessage().text("当前已有成语接龙游戏在进行中,请稍后再试!").export())
return
if not be_able_to_play() and not force:
await evt.send(await UniMessage().text("玩玩玩,就知道玩,快去睡觉!").export())
return
if not INITED:
init_lexicon()
INITED = True
NOW_PLAYING = True
await evt.send(await UniMessage().text("你小子,还真有意思!\n好,成语接龙游戏开始!我说一个成语,请大家接下去!").export())
# 选择一个随机成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
# 发布成语
await evt.send(await UniMessage().text(f"第一个成语:「{idiom}」,请接!").export())
evt = on_alconna(Alconna(
"老子就是要玩成语接龙!!!"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def force_play_game(event: BaseEvent):
await play_game(event, force=True)
evt = on_alconna(Alconna(
"不玩了"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, SCORE_BOARD, LAST_CHAR, ALL_BUFF_SCORE
if NOW_PLAYING:
NOW_PLAYING = False
# 发送好吧狗图片
# 打开好吧狗本地文件
with open(ASSETS_PATH / "img" / "dog" / "haoba_dog.jpg", "rb") as f:
img_data = f.read()
await evt.send(await UniMessage().image(raw=img_data).export())
result_text = UniMessage().text("游戏结束!\n最终得分榜:\n")
if len(SCORE_BOARD) == 0:
result_text += "无人得分!"
else:
# 按分数排序,名字用 at 的方式
sorted_score = sorted(SCORE_BOARD.items(), key=lambda x: x[1]["score"], reverse=True)
for i, (user_id, info) in enumerate(sorted_score):
result_text += f"{i+1}. " + UniMessage().at(user_id) + f": {info['score'] + ALL_BUFF_SCORE}\n"
await evt.send(await result_text.export())
# 重置分数板
SCORE_BOARD = {}
LAST_CHAR = ""
else:
await evt.send(await UniMessage().text("当前没有成语接龙游戏在进行中!").export())
# 跳过
evt = on_alconna(Alconna(
"跳过成语"
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=True)
@evt.handle()
async def _(event: BaseEvent):
global NOW_PLAYING, LAST_CHAR, ALL_BUFF_SCORE
if not NOW_PLAYING:
return
await evt.send(await UniMessage().text("你们太菜了全部扣100分").export())
ALL_BUFF_SCORE -= 100
# 选择下一个成语
idiom = secrets.choice(ALL_IDIOMS)
LAST_CHAR = idiom[-1]
await evt.send(await UniMessage().text(f"重新开始,下一个成语是「{idiom}").export())
# 直接读取消息
evt = on_message()
@evt.handle()
async def _(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, LOCK
if not NOW_PLAYING:
return
user_idiom = msg.extract_plain_text().strip()
if(user_idiom[0] != LAST_CHAR):
return
if LOCK:
return
LOCK = True
await handle_send_info(event, msg)
LOCK = False
async def handle_send_info(event: BaseEvent, msg: UniMsg):
global NOW_PLAYING, LAST_CHAR, SCORE_BOARD, ALL_BUFF_SCORE
user_idiom = msg.extract_plain_text().strip()
if(user_idiom not in ALL_IDIOMS and user_idiom not in ALL_WORDS):
# 扣0.1分
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] -= 0.1
await evt.send(await UniMessage().at(user_id).text("接不上!这个不一样!你被扣了 0.1 分!").export())
return
# 成功接上
if isinstance(event, DiscordMessageEvent):
user_id = str(event.author.id)
user_name = str(event.author.name)
else:
user_id = str(event.get_user_id())
user_name = str(event.get_user_id())
if user_id not in SCORE_BOARD:
SCORE_BOARD[user_id] = {
"name": user_name,
"score": 0
}
SCORE_BOARD[user_id]["score"] += 1
# at 指定玩家
await evt.send(await UniMessage().at(user_id).text(f"接对了!你有 {SCORE_BOARD[user_id]['score'] + ALL_BUFF_SCORE} 分!").export())
LAST_CHAR = user_idiom[-1]
await evt.send(await UniMessage().text(f"下一个成语请以「{LAST_CHAR}」开头!").export())

View File

@ -0,0 +1,3 @@
from pathlib import Path
ASSETS = Path(__file__).parent.parent / "assets"

View File

@ -2,11 +2,11 @@ from io import BytesIO
from typing import Iterable, cast
from nonebot import on_message
from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, Text,
from nonebot_plugin_alconna import (Alconna, Args, Field, Image, MultiVar, Option, Text,
UniMessage, UniMsg, on_alconna)
from konabot.common.nb.extract_image import extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display
from konabot.common.nb.extract_image import PIL_Image, extract_image_from_message
from konabot.plugins.memepack.drawing.display import draw_cao_display, draw_snaur_display
from konabot.plugins.memepack.drawing.saying import (draw_cute_ten,
draw_geimao, draw_mnk,
draw_pt, draw_suan)
@ -139,3 +139,24 @@ async def _(msg: UniMsg, evt: Event, bot: Bot):
.text(err)
.export()
)
snaur_display_cmd = on_alconna(Alconna(
"卵总展示",
Option("--whiteness", Args["whiteness", float], alias=["-w"]),
Option("--black-level", Args["black_level", float], alias=["-b"]),
Option("--opacity", Args["opacity", float], alias=["-o"]),
Option("--saturation", Args["saturation", float], alias=["-s"]),
Args["image", Image | None],
))
@snaur_display_cmd.handle()
async def _(img: PIL_Image, whiteness: float = 0.0, black_level: float = 0.2,
opacity: float = 0.8, saturation: float = 0.85):
img_processed = await draw_snaur_display(
img, whiteness, black_level, opacity, saturation,
)
img_data = BytesIO()
img_processed.save(img_data, "PNG")
await snaur_display_cmd.send(await UniMessage().image(raw=img_data).export())

View File

@ -4,10 +4,12 @@ from typing import Any, cast
import cv2
import numpy as np
import PIL.Image
import PIL.ImageChops
import PIL.ImageEnhance
from konabot.common.path import ASSETS_PATH
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
cao_image = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "caoimg1.png")
CAO_QUAD_POINTS = np.float32(cast(Any, [
[392, 540],
[577, 557],
@ -15,6 +17,16 @@ CAO_QUAD_POINTS = np.float32(cast(Any, [
[381, 687],
]))
snaur_image_base = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_base.png")
snaur_image_top = PIL.Image.open(ASSETS_PATH / "img" / "meme" / "snaur_1_top.png")
SNAUR_RATIO = (1 / 2) ** .5
SNAUR_QUAD_POINTS = np.float32(cast(Any, [
[0, 466 ],
[673, 471 ],
[640, 1196],
[106, 1280],
]))
def _draw_cao_display(image: PIL.Image.Image):
src = np.array(image.convert("RGB"))
h, w = src.shape[:2]
@ -43,3 +55,87 @@ def _draw_cao_display(image: PIL.Image.Image):
async def draw_cao_display(image: PIL.Image.Image):
return await asyncio.to_thread(_draw_cao_display, image)
def _draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
):
src = np.array(image.convert("RGBA"))
_h, _w = src.shape[:2]
if _w / _h < SNAUR_RATIO:
_w_target = _w
_h_target = int(_w / SNAUR_RATIO)
else:
_w_target = int(_h * SNAUR_RATIO)
_h_target = _h
x_center = _w / 2
y_center = _h / 2
x1 = int(x_center - _w_target / 2)
x2 = int(x_center + _w_target / 2)
y1 = int(y_center - _h_target / 2)
y2 = int(y_center + _h_target / 2)
src = src[y1:y2, x1:x2, :]
h, w = src.shape[:2]
src_points = np.float32(cast(Any, [
[0, 0],
[w, 0],
[w, h],
[0, h],
]))
dst_points = SNAUR_QUAD_POINTS
M = cv2.getPerspectiveTransform(cast(Any, src_points), cast(Any, dst_points))
output_size = snaur_image_top.size
output_w, output_h = output_size
warped = cv2.warpPerspective(
src,
M,
(output_w, output_h),
flags=cv2.INTER_LINEAR,
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0)
)
result = PIL.Image.fromarray(warped, 'RGBA')
r, g, b, a = result.split()
a = a.point(lambda p: int(p * opacity))
f2 = lambda p: int(
((p / 255) ** (2 ** whiteness)) * 255 * (1 - black_level)
+ 255 * black_level
)
r = r.point(f2)
g = g.point(f2)
b = b.point(f2)
result = PIL.Image.merge('RGBA', (r, g, b, a))
enhancer = PIL.ImageEnhance.Color(result)
result = enhancer.enhance(saturation)
result = PIL.ImageChops.multiply(result, snaur_image_base)
result = PIL.Image.alpha_composite(snaur_image_base, result)
result = PIL.Image.alpha_composite(result, snaur_image_top)
return result
async def draw_snaur_display(
image : PIL.Image.Image,
whiteness : float = 0.0 ,
black_level: float = 0.2 ,
opacity : float = 0.8 ,
saturation : float = 0.85 ,
) -> PIL.Image.Image:
return await asyncio.to_thread(
_draw_snaur_display, image, whiteness, black_level,
opacity, saturation,
)

View File

@ -0,0 +1,168 @@
import json, time
from nonebot.rule import Rule
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
from nonebot.adapters.onebot.v11 import Event
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
from konabot.common.path import ASSETS_PATH, DATA_PATH
POLL_TEMPLATE_FILE = ASSETS_PATH / "json" / "poll.json"
POLL_DATA_FILE = DATA_PATH / "poll.json"
if not POLL_DATA_FILE.exists():
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
async def createpoll(title,qqid,options):
polllength = len(poll_list)
pollid = str(polllength)
poll_create = int(time.time())
poll_expiry = poll_create + 24*3600
polljson = {"title":title,"qq":qqid,"create":poll_create,"expiry":poll_expiry,"options":options,"polldata":{}}
poll_list[pollid] = polljson
writeback()
return pollid
def getpolldata(pollid_or_title):
# 初始化“被指定的投票项目”
thepoll = {}
polnum = -1
# 判断是ID还是标题
if str.isdigit(pollid_or_title):
if pollid_or_title in poll_list:
thepoll = poll_list[pollid_or_title]
polnum = pollid_or_title
else:
return [{},-1]
else:
for i in poll_list:
if poll_list[i]["title"] == pollid_or_title:
thepoll = poll_list[i]
polnum = i
break
if polnum == -1:
return [{},-1]
return [thepoll,polnum]
def writeback():
# file = open(poll_json_path,"w",encoding="utf-8")
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
POLL_DATA_FILE.write_text(json.dumps({
'poll': poll_list,
}, ensure_ascii=False, sort_keys=True))
async def pollvote(polnum,optionnum,qqnum):
optiond = poll_list[polnum]["polldata"]
if optionnum not in optiond:
poll_list[polnum]["polldata"][optionnum] = []
poll_list[polnum]["polldata"][optionnum].append(qqnum)
writeback()
return
poll = on_alconna(Alconna(
"poll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
@poll.handle()
async def _(saying: list, event: Event):
if (len(saying) < 3):
await poll.send("请提供至少两个投票选项!")
elif (len(saying) < 17):
title = saying[0]
saying.remove(title)
options = {}
for i in saying:
options[saying.index(i)] = i
qqid = event.get_user_id()
result = await createpoll(title,qqid,options)
await poll.send("已创建投票。回复 查看投票 "+str(result)+" 查看该投票。")
else:
await poll.send("投票选项太多了请减少到15个选项以内。")
viewpoll = on_alconna(Alconna(
"viewpoll",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "请指定投票ID或标题。用法查看投票 <投票ID或标题>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
@viewpoll.handle()
async def _(saying: list):
# 参数投票ID或者标题
# pollid_or_title = params[0]
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.send("该投票不存在!")
else:
# 检查投票是否已结束
pollended = 0
if time.time() > thepoll["expiry"]:
pollended = 1
# 回复内容
reply = "投票:"+thepoll["title"]+" [ID: "+str(polnum)+"]"
# 如果投票已结束
if pollended:
for i in thepoll["options"]:
reply += "\n"
# 检查该选项是否有人投票
if i in thepoll["polldata"]:
reply += "["+str(len(thepoll["polldata"][i]))+" 票]"
else:
reply += "[0 票]"
reply += " "+thepoll["options"][i]
reply += "\n\n此投票已结束。"
else:
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
# reply += "\n\n小提示向bot私聊发送 /viewpoll "+str(polnum)+" 可查看已投票数哦!"
reply += "\n\n发送 投票 "+str(polnum)+" <选项文本> 即可参与投票!"
await viewpoll.send(reply)
vote = on_alconna(Alconna(
"vote",
Args["saying", MultiVar(str, '+'), Field(
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
)],
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
@vote.handle()
async def _(saying: list, event: Event):
if (len(saying) < 2):
await vote.send("请指定投给哪一项!")
else:
polldata = getpolldata(saying[0])
# 被指定的投票项目
thepoll = polldata[0]
polnum = polldata[1]
if polnum == -1:
await viewpoll.finish("没有找到这个投票!")
# thepolldata = thepoll["polldata"]
# 查找对应的投票项
optionnum = -1
for i in thepoll["options"]:
if saying[1] == thepoll["options"][i]:
optionnum = i
break
if optionnum == -1:
reply = "此投票里面没有这一项!可用的选项有:"
for i in thepoll["options"]:
reply += "\n"
reply += "- "+thepoll["options"][i]
await viewpoll.send(reply)
# 检查是否符合投票条件该qq号是否已参与过投票、投票是否过期
elif time.time() > thepoll["expiry"]:
await viewpoll.send("此投票已经结束!请发送 查看投票 "+polnum+" 查看结果。")
elif str(event.get_user_id()) in str(thepoll["polldata"]):
await viewpoll.send("你已参与过此投票!请在投票结束后发送 查看投票 "+polnum+" 查看结果。")
# 写入项目
else:
await pollvote(polnum,optionnum,event.get_user_id())
await viewpoll.send("投票成功!你投给了 "+saying[1])

View File

@ -309,7 +309,7 @@ async def generate_dice_image(number: str) -> BytesIO:
if(len(text) > 50):
output = BytesIO()
push_image = Image.open(ASSETS_PATH / "img" / "dice" / "stick.png")
push_image.save(output,format='PNG')
push_image.save(output,format='GIF')
output.seek(0)
return output

View File

@ -1,8 +1,10 @@
import asyncio
import asyncio as asynkio
import datetime
import functools
from pathlib import Path
from typing import Any, Literal, cast
import signal
import nonebot
import ptimeparse
from loguru import logger
@ -24,7 +26,9 @@ evt = on_message()
(Path(__file__).parent.parent.parent.parent / "data").mkdir(exist_ok=True)
DATA_FILE_PATH = Path(__file__).parent.parent.parent.parent / "data" / "notify.json"
DATA_FILE_LOCK = asyncio.Lock()
DATA_FILE_LOCK = asynkio.Lock()
ASYNK_TASKS: set[asynkio.Task[Any]] = set()
class Notify(BaseModel):
@ -111,7 +115,11 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
async def mission():
begin_time = datetime.datetime.now()
if begin_time < notify.notify_time:
await asyncio.sleep((notify.notify_time - begin_time).total_seconds())
try:
await asynkio.sleep((notify.notify_time - begin_time).total_seconds())
except asynkio.CancelledError:
logger.debug("代办提醒被信号中止,任务退出")
return
else:
logger.warning(
f"期望在 {notify.notify_time} 在平台 {notify.platform} {notify.target_env}"
@ -128,7 +136,7 @@ def create_notify_task(notify: Notify, fail2remove: bool = True):
DATA_FILE_LOCK.release()
else:
pass
return asyncio.create_task(mission())
return asynkio.create_task(mission())
@evt.handle()
@ -214,11 +222,11 @@ async def _():
DELTA = 2
logger.info(f"第一次探测到 Bot 连接,等待 {DELTA} 秒后开始通知")
await asyncio.sleep(DELTA)
await asynkio.sleep(DELTA)
await DATA_FILE_LOCK.acquire()
tasks: set[asyncio.Task[Any]] = set()
# tasks: set[asynkio.Task[Any]] = set()
cfg = load_notify_config()
if cfg.version == 1:
logger.info("将配置文件的版本升级为 2")
@ -227,11 +235,26 @@ async def _():
counter = 0
for notify in [*cfg.notifies]:
task = create_notify_task(notify, fail2remove=False)
tasks.add(task)
task.add_done_callback(lambda self: tasks.remove(self))
ASYNK_TASKS.add(task)
task.add_done_callback(lambda self: ASYNK_TASKS.remove(self))
counter += 1
logger.info(f"成功创建了 {counter} 条代办事项")
save_notify_config(cfg)
DATA_FILE_LOCK.release()
await asyncio.gather(*tasks)
loop = asynkio.get_running_loop()
# 解决 asynk task 没有被 cancel 的问题
async def shutdown(sig: signal.Signals):
logger.info(f"收到 {sig.name} 指令,正在关闭所有的东西")
for task in ASYNK_TASKS:
task.cancel()
await asynkio.gather(*ASYNK_TASKS, return_exceptions=True)
logger.info("所有的代办提醒 Task 都已经退出了")
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, functools.partial(
asynkio.create_task, shutdown(sig)
))
await asynkio.gather(*ASYNK_TASKS)

504
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,7 @@ dependencies = [
"ptimeparse (>=0.1.1,<0.2.0)",
"skia-python (>=138.0,<139.0)",
"nonebot-plugin-analysis-bilibili (>=2.8.1,<3.0.0)",
"qrcode (>=8.2,<9.0)",
]
[build-system]
@ -36,5 +37,10 @@ name = "pt-gitea-pypi"
url = "https://gitea.service.jazzwhom.top/api/packages/Passthem/pypi/simple/"
priority = "supplemental"
[[tool.poetry.source]]
name = "mirrors"
url = "https://pypi.tuna.tsinghua.edu.cn/simple/"
priority = "primary"
[tool.poetry.dependencies]
ptimeparse = {source = "pt-gitea-pypi"}