diff --git a/assets/json/poll.json b/assets/json/poll.json new file mode 100644 index 0000000..2f39493 --- /dev/null +++ b/assets/json/poll.json @@ -0,0 +1 @@ +{"poll": {"0": {"create": 1760357553, "expiry": 1760443953, "options": {"0": "此方bot", "1": "testpilot", "2": "小镜bot", "3": "可怜bot"}, "polldata": {}, "qq": "2975499623", "title": "我~是~谁~?"}}} \ No newline at end of file diff --git a/konabot/plugins/gen_qrcode/__init__.py b/konabot/plugins/gen_qrcode/__init__.py new file mode 100644 index 0000000..a65b83b --- /dev/null +++ b/konabot/plugins/gen_qrcode/__init__.py @@ -0,0 +1,69 @@ +import qrcode +# from pyzbar.pyzbar import decode +# from PIL import Image +import requests +from io import BytesIO + +from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage, + on_alconna) +from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply + +async def download_img(url): + resp = requests.get(url.replace("https://multimedia.nt.qq","http://multimedia.nt.qq")) # bim获取QQ的图片时避免SSLv3报错 + img_bytes = BytesIO() + with open(img_bytes,"wb") as f: + f.write(resp.content) + return img_bytes + +def genqr(data): + qr = qrcode.QRCode(version=1,error_correction=qrcode.constants.ERROR_CORRECT_L,box_size=8,border=4) + qr.add_data(data) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + img_bytes = BytesIO() + img.save(img_bytes, format="PNG") + return img_bytes + +""" +async def recqr(url): + im_path = "assets/img/qrcode/2.jpg" + data = await download_img(url) + img = Image.open(im_path) + decoded_objects = decode(img) + data = "" + for obj in decoded_objects: + data += obj.data.decode('utf-8') + return data +""" + +gqrc = on_alconna(Alconna( + "genqr", + Args["saying", MultiVar(str, '+'), Field( + missing_tips=lambda: "请输入你要转换为二维码的文字!" + )], + # UniMessage[] +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"生成二维码","genqrcode"}) + +@gqrc.handle() +async def _(saying: list): + """ + img = await draw_pt("\n".join(saying)) + img_bytes = BytesIO() + img.save(img_bytes, format="PNG") + + await pt.send(await UniMessage().image(raw=img_bytes).export()) + + # print(saying) + # 二维码识别 + if type(saying[0]) == 'image': + data = await recqr(saying[0].data['url']) + if data == "": + await gqrc.send("二维码图片解析失败!") + else: + await gqrc.send(recqr(saying[0].data['url'])) + + # 二维码生成 + else: + """ + # genqr("\n".join(saying)) + await gqrc.send(await UniMessage().image(raw=genqr("\n".join(saying))).export()) \ No newline at end of file diff --git a/konabot/plugins/poll/__init__.py b/konabot/plugins/poll/__init__.py new file mode 100644 index 0000000..194f6c0 --- /dev/null +++ b/konabot/plugins/poll/__init__.py @@ -0,0 +1,160 @@ +import json, time + +from nonebot_plugin_alconna import (Alconna, Args, Field, MultiVar, UniMessage, + on_alconna) +from nonebot_plugin_alconna.uniseg import UniMsg, At, Reply +from nonebot.adapters.onebot.v11 import Event + +poll_json_path = "assets/json/poll.json" + +poll_file = open(poll_json_path,"r",encoding="utf-8") +poll_list_raw = poll_file.read() +poll_file.close() +poll_list = json.loads(poll_list_raw)['poll'] + +async def createpoll(title,qqid,options): + polllength = len(poll_list) + pollid = str(polllength) + poll_create = int(time.time()) + poll_expiry = poll_create + 24*3600 + polljson = {"title":title,"qq":qqid,"create":poll_create,"expiry":poll_expiry,"options":options,"polldata":{}} + poll_list[pollid] = polljson + writeback() + return pollid + +def getpolldata(pollid_or_title): + # 初始化“被指定的投票项目” + thepoll = {} + polnum = -1 + # 判断是ID还是标题 + if str.isdigit(pollid_or_title): + if pollid_or_title in poll_list: + thepoll = poll_list[pollid_or_title] + polnum = pollid_or_title + else: + return [{},-1] + else: + for i in poll_list: + if poll_list[i]["title"] == pollid_or_title: + thepoll = poll_list[i] + polnum = i + break + if polnum == -1: + return [{},-1] + return [thepoll,polnum] + +def writeback(): + file = open(poll_json_path,"w",encoding="utf-8") + json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True) + +async def pollvote(polnum,optionnum,qqnum): + optiond = poll_list[polnum]["polldata"] + if optionnum not in optiond: + poll_list[polnum]["polldata"][optionnum] = [] + poll_list[polnum]["polldata"][optionnum].append(qqnum) + writeback() + return + +poll = on_alconna(Alconna( + "poll", + Args["saying", MultiVar(str, '+'), Field( + missing_tips=lambda: "参数错误。用法:/poll [投票标题] [选项1] [选项2]" + )], +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}) +@poll.handle() +async def _(saying: list, event: Event): + if (len(saying) < 3): + await poll.send("请提供至少两个投票选项!") + elif (len(saying) < 17): + title = saying[0] + saying.remove(title) + options = {} + for i in saying: + options[saying.index(i)] = i + qqid = event.get_user_id() + result = await createpoll(title,qqid,options) + await poll.send("已创建投票。回复 /viewpoll "+str(result)+" 查看该投票。") + else: + await poll.send("投票选项太多了!请减少到15个选项以内。") + +viewpoll = on_alconna(Alconna( + "viewpoll", + Args["saying", MultiVar(str, '+'), Field( + missing_tips=lambda: "请指定投票ID或标题!。用法:/viewpoll [投票ID/标题]" + )], +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}) +@viewpoll.handle() +async def _(saying: list): + # 参数,投票ID或者标题 + # pollid_or_title = params[0] + polldata = getpolldata(saying[0]) + # 被指定的投票项目 + thepoll = polldata[0] + polnum = polldata[1] + if polnum == -1: + await viewpoll.send("该投票不存在!") + else: + # 检查投票是否已结束 + pollended = 0 + if time.time() > thepoll["expiry"]: + pollended = 1 + # 回复内容 + reply = "投票:"+thepoll["title"]+" [ID: "+str(polnum)+"]" + # 如果投票已结束 + if pollended: + for i in thepoll["options"]: + reply += "\n" + # 检查该选项是否有人投票 + if i in thepoll["polldata"]: + reply += "["+str(len(thepoll["polldata"][i]))+" 票]" + else: + reply += "[0 票]" + reply += " "+thepoll["options"][i] + reply += "\n\n此投票已结束。" + else: + for i in thepoll["options"]: + reply += "\n" + reply += "- "+thepoll["options"][i] + # reply += "\n\n小提示:向bot私聊发送 /viewpoll "+str(polnum)+" 可查看已投票数哦!" + reply += "\n\n发送 /vote "+str(polnum)+" [选项文本] 即可参与投票!" + await viewpoll.send(reply) + +vote = on_alconna(Alconna( + "vote", + Args["saying", MultiVar(str, '+'), Field( + missing_tips=lambda: "参数错误。用法:/vote [投票ID/标题] [选项文本]" + )], +), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}) +@vote.handle() +async def _(saying: list, event: Event): + if (len(saying) < 2): + await vote.send("请指定投给哪一项!") + else: + polldata = getpolldata(saying[0]) + # 被指定的投票项目 + thepoll = polldata[0] + polnum = polldata[1] + if polnum == -1: + await viewpoll.finish("没有找到这个投票!") + # thepolldata = thepoll["polldata"] + # 查找对应的投票项 + optionnum = -1 + for i in thepoll["options"]: + if saying[1] == thepoll["options"][i]: + optionnum = i + break + if optionnum == -1: + reply = "此投票里面没有这一项!可用的选项有:" + for i in thepoll["options"]: + reply += "\n" + reply += "- "+thepoll["options"][i] + await viewpoll.send(reply) + # 检查是否符合投票条件(该qq号是否已参与过投票、投票是否过期) + elif time.time() > thepoll["expiry"]: + await viewpoll.send("此投票已经结束!请发送 /viewpoll "+polnum+" 查看结果。") + elif str(event.get_user_id()) in str(thepoll["polldata"]): + await viewpoll.send("你已参与过此投票!请在投票结束后发送 /viewpoll "+polnum+" 查看结果。") + # 写入项目 + else: + await pollvote(polnum,optionnum,event.get_user_id()) + await viewpoll.send("投票成功!你投给了 "+saying[1]) \ No newline at end of file