代码拉取完成,页面将自动刷新
import os
import uuid # 唯一标识符模块
import hashlib # 哈希算法模块
from fastapi import (
Cookie,
Header,
Depends,
Security,
HTTPException,
Response,
UploadFile,
Request,
Query,
Form,
APIRouter,
)
from fastapi.responses import JSONResponse, FileResponse
from fastapi.security import SecurityScopes
from config import settings
from FakeDB import file_db
router = APIRouter()
# 生成唯一命名
def unique_generator(*, length=8):
unique_name = hashlib.md5(str(uuid.uuid4()).encode("utf-8")).hexdigest()[:length]
return unique_name
# 保存上传文件,并返回分享名
async def save_file(file):
if not os.path.exists(settings.UPLOAD_DIR):
os.makedirs(settings.UPLOAD_DIR, exist_ok=True)
res = await file.read()
unique_name = unique_generator()
file_name = f"{unique_name}.{file.filename.split('.')[-1]}"
file_path = f"{settings.UPLOAD_DIR}/{file_name}"
with open(file_path, "wb") as f:
f.write(res)
return unique_name
def get_user_token(pan_token: str | None = Cookie(None)):
# print(pan_token)
if pan_token is None:
raise HTTPException(status_code=401, detail="请先登录Login获取权限")
return pan_token
def get_user_permissions(pan_token: str = Security(get_user_token)):
token = pan_token.split("-")[0]
# print("token", token)
if token == "Admin":
return "admin"
if token == "User":
return "user"
return "guest"
# 获取请求头授权信息要使用Header,使用Cookie只获取本地浏览器存储的cookie信息
def check_user(
security_scopes: SecurityScopes, user_permission: str = Depends(get_user_permissions)
): # pan_token: str | None = Header(None)
# print("security_scopes", security_scopes.scopes)
if user_permission not in security_scopes.scopes:
raise HTTPException(status_code=401, detail=f"{user_permission}权限不足")
return user_permission
# pan_token = request.cookies.get("pan_token", None)
# if pan_token is None:
# raise HTTPException(status_code=401, detail="请先登录Login获取权限")
# role = pan_token.split("-")[0]
# if role not in ["admin", "user"]:
# return "guest"
# return role
# 上传文件
@router.post("/upload_file", summary="上传文件", dependencies=[Security(check_user, scopes=["admin"])])
async def upload_file(
*, file: UploadFile, request: Request, user_permission: str = Security(check_user, scopes=["admin"])
):
print("user_permission:", user_permission)
unique_name = await save_file(file)
file_db.create_file(unique_name, file.filename)
share_code = unique_generator(length=6)
file_db.create_share_code(unique_name, share_code)
return {
"file_name": file.filename,
"unique_name": unique_name,
"code": share_code,
"url": request.url_for("file_page", unique_name=unique_name).path,
}
@router.get("/share", summary="全部文件页面")
async def share_file(request: Request):
all_files = file_db.get_all_files()
return request.app.state.templates.TemplateResponse(
"share.html", {"request": request, "all_files": all_files}
)
@router.get("/file/{unique_name}", summary="文件下载页面")
async def file_page(request: Request, unique_name: str, share_code: str | None = Query(None, min_length=6)):
file_name = file_db.get_file(unique_name)
if file_name is None:
return JSONResponse(status_code=404, content={"message": "文件不存在"})
if share_code is None:
share_code = ""
data = {
"unique_name": unique_name,
"file_name": str(file_name),
"share_code": share_code,
}
return request.app.state.templates.TemplateResponse("file.html", {"request": request, "data": data})
@router.post(
"/download/{unique_name}",
summary="文件下载",
dependencies=[Security(check_user, scopes=["user", "admin"])],
)
async def download_file(unique_name: str, share: str = Form()):
code = str(file_db.get_share_code(unique_name))
if code is None:
return {"验证码错误": "请检查分享码是否正确, 你无权限下载此文件"}
file_name = str(file_db.get_file(unique_name))
download_file = f"{unique_name}.{file_name.split('.',1)[-1]}"
file_path = f"{settings.UPLOAD_DIR}/{download_file}"
if not os.path.exists(file_path):
return JSONResponse(status_code=404, content={"message": "文件不存在"})
return FileResponse(file_path, media_type="application/octet-stream", filename=file_name)
@router.get("/login", summary="模拟登录")
async def user_login(resp: Response, user: str, token: str):
resp.set_cookie(key="pan_token", value=f"{user}-{token}", expires=600)
return {"message": f"Cookie设置{user}-{token}成功"}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。