1 Star 1 Fork 0

runler/FastApi20240815

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
pan.py 4.74 KB
一键复制 编辑 原始数据 按行查看 历史
runler 提交于 2024-08-18 15:57 +08:00 . Security权限控制
import os
import uuid # 唯一标识符模块
import hashlib # 哈希算法模块
from fastapi import (
Cookie,
Header,
Depends,
Security,
HTTPException,
Response,
UploadFile,
Request,
Query,
Form,
APIRouter,
)
from fastapi.responses import JSONResponse, FileResponse
from fastapi.security import SecurityScopes
from config import settings
from FakeDB import file_db
router = APIRouter()
# 生成唯一命名
def unique_generator(*, length=8):
unique_name = hashlib.md5(str(uuid.uuid4()).encode("utf-8")).hexdigest()[:length]
return unique_name
# 保存上传文件,并返回分享名
async def save_file(file):
if not os.path.exists(settings.UPLOAD_DIR):
os.makedirs(settings.UPLOAD_DIR, exist_ok=True)
res = await file.read()
unique_name = unique_generator()
file_name = f"{unique_name}.{file.filename.split('.')[-1]}"
file_path = f"{settings.UPLOAD_DIR}/{file_name}"
with open(file_path, "wb") as f:
f.write(res)
return unique_name
def get_user_token(pan_token: str | None = Cookie(None)):
# print(pan_token)
if pan_token is None:
raise HTTPException(status_code=401, detail="请先登录Login获取权限")
return pan_token
def get_user_permissions(pan_token: str = Security(get_user_token)):
token = pan_token.split("-")[0]
# print("token", token)
if token == "Admin":
return "admin"
if token == "User":
return "user"
return "guest"
# 获取请求头授权信息要使用Header,使用Cookie只获取本地浏览器存储的cookie信息
def check_user(
security_scopes: SecurityScopes, user_permission: str = Depends(get_user_permissions)
): # pan_token: str | None = Header(None)
# print("security_scopes", security_scopes.scopes)
if user_permission not in security_scopes.scopes:
raise HTTPException(status_code=401, detail=f"{user_permission}权限不足")
return user_permission
# pan_token = request.cookies.get("pan_token", None)
# if pan_token is None:
# raise HTTPException(status_code=401, detail="请先登录Login获取权限")
# role = pan_token.split("-")[0]
# if role not in ["admin", "user"]:
# return "guest"
# return role
# 上传文件
@router.post("/upload_file", summary="上传文件", dependencies=[Security(check_user, scopes=["admin"])])
async def upload_file(
*, file: UploadFile, request: Request, user_permission: str = Security(check_user, scopes=["admin"])
):
print("user_permission:", user_permission)
unique_name = await save_file(file)
file_db.create_file(unique_name, file.filename)
share_code = unique_generator(length=6)
file_db.create_share_code(unique_name, share_code)
return {
"file_name": file.filename,
"unique_name": unique_name,
"code": share_code,
"url": request.url_for("file_page", unique_name=unique_name).path,
}
@router.get("/share", summary="全部文件页面")
async def share_file(request: Request):
all_files = file_db.get_all_files()
return request.app.state.templates.TemplateResponse(
"share.html", {"request": request, "all_files": all_files}
)
@router.get("/file/{unique_name}", summary="文件下载页面")
async def file_page(request: Request, unique_name: str, share_code: str | None = Query(None, min_length=6)):
file_name = file_db.get_file(unique_name)
if file_name is None:
return JSONResponse(status_code=404, content={"message": "文件不存在"})
if share_code is None:
share_code = ""
data = {
"unique_name": unique_name,
"file_name": str(file_name),
"share_code": share_code,
}
return request.app.state.templates.TemplateResponse("file.html", {"request": request, "data": data})
@router.post(
"/download/{unique_name}",
summary="文件下载",
dependencies=[Security(check_user, scopes=["user", "admin"])],
)
async def download_file(unique_name: str, share: str = Form()):
code = str(file_db.get_share_code(unique_name))
if code is None:
return {"验证码错误": "请检查分享码是否正确, 你无权限下载此文件"}
file_name = str(file_db.get_file(unique_name))
download_file = f"{unique_name}.{file_name.split('.',1)[-1]}"
file_path = f"{settings.UPLOAD_DIR}/{download_file}"
if not os.path.exists(file_path):
return JSONResponse(status_code=404, content={"message": "文件不存在"})
return FileResponse(file_path, media_type="application/octet-stream", filename=file_name)
@router.get("/login", summary="模拟登录")
async def user_login(resp: Response, user: str, token: str):
resp.set_cookie(key="pan_token", value=f"{user}-{token}", expires=600)
return {"message": f"Cookie设置{user}-{token}成功"}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/runler/fast-api20240815.git
git@gitee.com:runler/fast-api20240815.git
runler
fast-api20240815
FastApi20240815
master

搜索帮助