Files
konabot/konabot/common/web_render/host_images.py

67 lines
1.9 KiB
Python

import asyncio
import tempfile
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import cast
from fastapi import HTTPException
from fastapi.responses import FileResponse
import nanoid
import nonebot
from nonebot.drivers.fastapi import Driver as FastAPIDriver
from .config import web_render_config
# ASGI application object taken from the NoneBot driver. The cast assumes the
# configured driver is the FastAPI one; routes below are registered on it.
app = cast(FastAPIDriver, nonebot.get_driver()).asgi
# Registry of the temporary directories currently being hosted, keyed by the
# random id that appears in their "/tempdir/{nid}/..." URLs.
hosted_tempdirs: dict[str, Path] = {}
# Taken around every read and write of hosted_tempdirs.
hosted_tempdirs_lock = asyncio.Lock()
@dataclass
class TempDir:
    """A directory on disk paired with the URL prefix that maps onto it."""

    # Directory whose files are addressable through url_base.
    path: Path
    # URL prefix for `path`; url_of appends "/<relative path>" to it.
    url_base: str

    def url_of(self, file: Path):
        """Return the URL that addresses *file* inside this directory.

        The path of *file* relative to ``self.path`` is rendered with forward
        slashes and percent-encoded, so names containing spaces, ``#``, ``?``
        or ``%`` still produce a valid URL path.

        Args:
            file: A path located under ``self.path``.

        Returns:
            ``url_base`` followed by ``/`` and the encoded relative path.

        Raises:
            ValueError: If *file* is not located under ``self.path``.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        # relative_to() itself rejects paths outside self.path, and unlike an
        # assert it is not stripped when Python runs with -O.
        relative_path = file.relative_to(self.path)
        # as_posix() yields "/" separators on every platform; quote() leaves
        # "/" untouched and escapes everything that is unsafe in a URL path.
        return f"{self.url_base}/{quote(relative_path.as_posix())}"
@asynccontextmanager
async def host_tempdir():
    """Create a temporary directory and serve it over HTTP while in use.

    A fresh temporary directory is created and registered in
    ``hosted_tempdirs`` under a random 10-character id, which makes its files
    reachable through the ``/tempdir/{nid}/...`` route. On exit the id is
    unregistered and the directory is deleted.

    Yields:
        TempDir: The directory path together with the base URL it is served
        under.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        fp = Path(tempdir)
        nid = nanoid.generate(size=10)
        async with hosted_tempdirs_lock:
            hosted_tempdirs[nid] = fp
        try:
            yield TempDir(
                path=fp,
                url_base=f"{web_render_config.get_instance_baseurl()}/tempdir/{nid}",
            )
        finally:
            # Unregister even when the caller's block raises or is cancelled;
            # otherwise a stale id would stay in the registry pointing at a
            # directory that TemporaryDirectory has already removed.
            async with hosted_tempdirs_lock:
                hosted_tempdirs.pop(nid, None)
@app.get("/tempdir/{nid}/{file_path:path}")
async def _(nid: str, file_path: str):
    """Serve one file out of a currently hosted temporary directory.

    Args:
        nid: Id of the hosted directory, as registered in ``hosted_tempdirs``.
        file_path: Path of the requested file relative to that directory.

    Returns:
        A ``FileResponse`` for the resolved file.

    Raises:
        HTTPException: 404 when *nid* is not registered or the target is not
            a regular file; 403 when the path cannot be resolved or resolves
            to a location outside the hosted directory.
    """
    async with hosted_tempdirs_lock:
        base_path = hosted_tempdirs.get(nid)
    if base_path is None:
        raise HTTPException(404)

    # Resolve once so the path that is validated is the path that is served.
    try:
        root = base_path.resolve()
        target = (base_path / file_path).resolve()
    except Exception as err:
        # Fail closed: a request path that cannot even be resolved is treated
        # as denied rather than surfacing as a server error.
        raise HTTPException(status_code=403, detail="Access denied.") from err

    # Rejects "..", absolute paths and symlinks that escape the directory.
    if not target.is_relative_to(root):
        raise HTTPException(status_code=403, detail="Access denied.")
    if not target.is_file():
        raise HTTPException(status_code=404, detail="File not found.")
    return FileResponse(target)