169 lines
6.3 KiB
Python
169 lines
6.3 KiB
Python
import json, time
|
||
|
||
from nonebot.rule import Rule
|
||
from nonebot_plugin_alconna import Alconna, Args, Field, MultiVar, on_alconna
|
||
from nonebot.adapters.onebot.v11 import Event
|
||
|
||
from konabot.common.nb.wzq_conflict import no_wzqbot_rule
|
||
from konabot.common.path import ASSETS_PATH, DATA_PATH
|
||
|
||
|
||
POLL_TEMPLATE_FILE = ASSETS_PATH / "json" / "poll.json"
|
||
POLL_DATA_FILE = DATA_PATH / "poll.json"
|
||
|
||
if not POLL_DATA_FILE.exists():
|
||
POLL_DATA_FILE.write_bytes(POLL_TEMPLATE_FILE.read_bytes())
|
||
|
||
|
||
poll_list = json.loads(POLL_DATA_FILE.read_text())['poll']
|
||
|
||
async def createpoll(title,qqid,options):
|
||
polllength = len(poll_list)
|
||
pollid = str(polllength)
|
||
poll_create = int(time.time())
|
||
poll_expiry = poll_create + 24*3600
|
||
polljson = {"title":title,"qq":qqid,"create":poll_create,"expiry":poll_expiry,"options":options,"polldata":{}}
|
||
poll_list[pollid] = polljson
|
||
writeback()
|
||
return pollid
|
||
|
||
def getpolldata(pollid_or_title):
|
||
# 初始化“被指定的投票项目”
|
||
thepoll = {}
|
||
polnum = -1
|
||
# 判断是ID还是标题
|
||
if str.isdigit(pollid_or_title):
|
||
if pollid_or_title in poll_list:
|
||
thepoll = poll_list[pollid_or_title]
|
||
polnum = pollid_or_title
|
||
else:
|
||
return [{},-1]
|
||
else:
|
||
for i in poll_list:
|
||
if poll_list[i]["title"] == pollid_or_title:
|
||
thepoll = poll_list[i]
|
||
polnum = i
|
||
break
|
||
if polnum == -1:
|
||
return [{},-1]
|
||
return [thepoll,polnum]
|
||
|
||
def writeback():
|
||
# file = open(poll_json_path,"w",encoding="utf-8")
|
||
# json.dump({'poll':poll_list},file,ensure_ascii=False,sort_keys=True)
|
||
POLL_DATA_FILE.write_text(json.dumps({
|
||
'poll': poll_list,
|
||
}, ensure_ascii=False, sort_keys=True))
|
||
|
||
async def pollvote(polnum,optionnum,qqnum):
|
||
optiond = poll_list[polnum]["polldata"]
|
||
if optionnum not in optiond:
|
||
poll_list[polnum]["polldata"][optionnum] = []
|
||
poll_list[polnum]["polldata"][optionnum].append(qqnum)
|
||
writeback()
|
||
return
|
||
|
||
poll = on_alconna(Alconna(
|
||
"poll",
|
||
Args["saying", MultiVar(str, '+'), Field(
|
||
missing_tips=lambda: "参数错误。用法:发起投票 <投票标题> <选项1> <选项2> ..."
|
||
)],
|
||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"发起投票","createpoll"}, rule=no_wzqbot_rule)
|
||
@poll.handle()
|
||
async def _(saying: list, event: Event):
|
||
if (len(saying) < 3):
|
||
await poll.send("请提供至少两个投票选项!")
|
||
elif (len(saying) < 17):
|
||
title = saying[0]
|
||
saying.remove(title)
|
||
options = {}
|
||
for i in saying:
|
||
options[saying.index(i)] = i
|
||
qqid = event.get_user_id()
|
||
result = await createpoll(title,qqid,options)
|
||
await poll.send("已创建投票。回复 查看投票 "+str(result)+" 查看该投票。")
|
||
else:
|
||
await poll.send("投票选项太多了!请减少到15个选项以内。")
|
||
|
||
viewpoll = on_alconna(Alconna(
|
||
"viewpoll",
|
||
Args["saying", MultiVar(str, '+'), Field(
|
||
missing_tips=lambda: "请指定投票ID或标题!。用法:查看投票 <投票ID或标题>"
|
||
)],
|
||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"查看投票"}, rule=no_wzqbot_rule)
|
||
@viewpoll.handle()
|
||
async def _(saying: list):
|
||
# 参数,投票ID或者标题
|
||
# pollid_or_title = params[0]
|
||
polldata = getpolldata(saying[0])
|
||
# 被指定的投票项目
|
||
thepoll = polldata[0]
|
||
polnum = polldata[1]
|
||
if polnum == -1:
|
||
await viewpoll.send("该投票不存在!")
|
||
else:
|
||
# 检查投票是否已结束
|
||
pollended = 0
|
||
if time.time() > thepoll["expiry"]:
|
||
pollended = 1
|
||
# 回复内容
|
||
reply = "投票:"+thepoll["title"]+" [ID: "+str(polnum)+"]"
|
||
# 如果投票已结束
|
||
if pollended:
|
||
for i in thepoll["options"]:
|
||
reply += "\n"
|
||
# 检查该选项是否有人投票
|
||
if i in thepoll["polldata"]:
|
||
reply += "["+str(len(thepoll["polldata"][i]))+" 票]"
|
||
else:
|
||
reply += "[0 票]"
|
||
reply += " "+thepoll["options"][i]
|
||
reply += "\n\n此投票已结束。"
|
||
else:
|
||
for i in thepoll["options"]:
|
||
reply += "\n"
|
||
reply += "- "+thepoll["options"][i]
|
||
# reply += "\n\n小提示:向bot私聊发送 /viewpoll "+str(polnum)+" 可查看已投票数哦!"
|
||
reply += "\n\n发送 投票 "+str(polnum)+" <选项文本> 即可参与投票!"
|
||
await viewpoll.send(reply)
|
||
|
||
vote = on_alconna(Alconna(
|
||
"vote",
|
||
Args["saying", MultiVar(str, '+'), Field(
|
||
missing_tips=lambda: "参数错误。用法:投票 <投票ID/标题> <选项文本>"
|
||
)],
|
||
), use_cmd_start=True, use_cmd_sep=False, skip_for_unmatch=False, aliases={"投票","参与投票"}, rule=no_wzqbot_rule)
|
||
@vote.handle()
|
||
async def _(saying: list, event: Event):
|
||
if (len(saying) < 2):
|
||
await vote.send("请指定投给哪一项!")
|
||
else:
|
||
polldata = getpolldata(saying[0])
|
||
# 被指定的投票项目
|
||
thepoll = polldata[0]
|
||
polnum = polldata[1]
|
||
if polnum == -1:
|
||
await viewpoll.finish("没有找到这个投票!")
|
||
# thepolldata = thepoll["polldata"]
|
||
# 查找对应的投票项
|
||
optionnum = -1
|
||
for i in thepoll["options"]:
|
||
if saying[1] == thepoll["options"][i]:
|
||
optionnum = i
|
||
break
|
||
if optionnum == -1:
|
||
reply = "此投票里面没有这一项!可用的选项有:"
|
||
for i in thepoll["options"]:
|
||
reply += "\n"
|
||
reply += "- "+thepoll["options"][i]
|
||
await viewpoll.send(reply)
|
||
# 检查是否符合投票条件(该qq号是否已参与过投票、投票是否过期)
|
||
elif time.time() > thepoll["expiry"]:
|
||
await viewpoll.send("此投票已经结束!请发送 查看投票 "+polnum+" 查看结果。")
|
||
elif str(event.get_user_id()) in str(thepoll["polldata"]):
|
||
await viewpoll.send("你已参与过此投票!请在投票结束后发送 查看投票 "+polnum+" 查看结果。")
|
||
# 写入项目
|
||
else:
|
||
await pollvote(polnum,optionnum,event.get_user_id())
|
||
await viewpoll.send("投票成功!你投给了 "+saying[1])
|