1 Star 0 Fork 0

吴宇航/PythonServiceDeployment

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
server_main.py 4.91 KB
一键复制 编辑 原始数据 按行查看 历史
吴宇航 提交于 2024-04-11 00:06 +08:00 . 标题: 0410同步代码
import platform
from core import settings
settings.setup()
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError, HTTPException
from starlette.exceptions import HTTPException as StarletteHTTPException
from events import evnet_shutdown
from server_api.user import user_router
from server_api.role import role_router
from server_api.select import select_router
from server_api.login import login_router
from server_api.xterm import xterm_router
from server_api.server_instance import server_instance_router
from server_api.task import task_router
from server_api.websocket import ws_router
from server_api.mission import mission_router
from server_api.project import project_router
from server_api.software import software_router
from server_api.file_manage import file_manage_router
from server_api.service_group import service_group_router
from server_api.python_service import python_service_router
from server_api.nginx_service import nginx_service_router
from server_api.build_process import build_process_router
from common.permission_util import (
get_all_endpoints,
HttpAuthenticationMiddleware,
HttpIsAuthenticatedMiddleware,
HttpOperationPermissionMiddleware,
GetRouteMiddleware
)
from common.scheduler import SchedulerMiddleware, db_scheduler
from common.router_util import get_route_by_operation_id
from common.errors import (
handler_http_exception,
handler_validation_exception,
handler_base_exception,
handler_service_error,
handler_business_error,
handle_rpc_error,
ServiceError,
CMDError,
RpcError
)
from common.responses import SuccessResponse
from db.errors import BusinessError
# The ASGI application object for the PyDep service.
#
# `on_shutdown` registers the project's shutdown hook, and `exception_handlers`
# maps each exception class to the callable that handles it.
app = FastAPI(
    title='PyDep',
    description='PyDepAPI接口文档',
    version='1.0.0',
    on_shutdown=[evnet_shutdown],
    exception_handlers={
        # Request validation failures.
        RequestValidationError: handler_validation_exception,
        # Command and business-rule failures share one handler.
        **dict.fromkeys((CMDError, BusinessError), handler_business_error),
        # Both the FastAPI and the Starlette HTTPException classes are
        # registered against the same handler.
        **dict.fromkeys(
            (HTTPException, StarletteHTTPException),
            handler_http_exception,
        ),
        RpcError: handle_rpc_error,
        ServiceError: handler_service_error,
        # Handler registered for the base `Exception` class.
        Exception: handler_base_exception,
    },
)
# URL prefixes passed to `app.include_router(...)` further down in this module.
WS_PREFIX = '/ws'    # used when mounting the WebSocket router
API_PREFIX = '/api'  # used when mounting every other router
# NOTE(review): the order of the add_middleware() calls below is significant
# (Starlette wraps later-added middleware around earlier-added ones) - keep
# the sequence as is.

# Operation ("button") permission middleware.
# `includes` receives every route object of the routers listed here.
app.add_middleware(
    HttpOperationPermissionMiddleware,
    includes=[
        route
        for router in (
            user_router,
            role_router,
            xterm_router,
            server_instance_router,
            mission_router,
            project_router,
            software_router,
            file_manage_router,
            service_group_router,
            python_service_router,
            nginx_service_router,
            build_process_router,
        )
        for route in router.routes
    ],
)

# "Authentication required" middleware.
# `includes` receives selected login routes (looked up by operation id), the
# xterm `test_connect` route and every route of the select router.
app.add_middleware(
    HttpIsAuthenticatedMiddleware,
    includes=[
        *(
            get_route_by_operation_id(login_router, operation_id)
            for operation_id in ('logout', 'userinfo', 'refresh:token')
        ),
        get_route_by_operation_id(xterm_router, 'test_connect'),
        *select_router.routes,
    ],
)

# "Resolve current user" middleware; the login route is passed as an exclude.
app.add_middleware(
    HttpAuthenticationMiddleware,
    excludes=[get_route_by_operation_id(login_router, 'login')],
)

# "Resolve current route" middleware.
app.add_middleware(GetRouteMiddleware)

# Asynchronous scheduled-job (scheduler) middleware.
app.add_middleware(SchedulerMiddleware, scheduler=db_scheduler)
# Mount the first group of routers under API_PREFIX, one OpenAPI tag each.
# The tuple order is the registration order.
for _router, _tag in (
    (user_router, 'Users'),
    (role_router, 'Roles'),
    (server_instance_router, 'ServerInstances'),
    (mission_router, 'Missions'),
    (project_router, 'Projects'),
    (software_router, 'Softwares'),
    (file_manage_router, 'FilesManages'),
    (service_group_router, 'ServiceGroups'),
    (python_service_router, 'PythonServices'),
    (nginx_service_router, 'NginxServices'),
    (build_process_router, 'BuildProcesses'),
):
    app.include_router(_router, prefix=API_PREFIX, tags=[_tag])
# Drop the loop helpers so the module namespace stays unchanged.
del _router, _tag

# Endpoint listings built from `app.routes` as it stands at this point: the
# routes registered further down in this module have not been added yet when
# get_all_endpoints() is called.
expand_endpoints = get_all_endpoints(app.routes, expand=True)
nexpand_endpoints = get_all_endpoints(app.routes, expand=False)
# GET {API_PREFIX}/select/permissions (operation id 'permissions:list').
# Wraps the module-level `nexpand_endpoints` value in a SuccessResponse.
@app.get(f'{API_PREFIX}/select/permissions', operation_id='permissions:list')
def list_permissions():
    permissions = nexpand_endpoints
    return SuccessResponse(permissions)
# GET {API_PREFIX}/system_info (operation id 'system:info').
# Wraps the host OS name reported by platform.system() in a SuccessResponse.
@app.get(f'{API_PREFIX}/system_info', operation_id='system:info')
def system_info():
    os_name = platform.system()
    return SuccessResponse(os_name)
# Mount the remaining routers. All of them use API_PREFIX except the
# WebSocket router, which is mounted under WS_PREFIX. The tuple order is the
# registration order.
for _router, _prefix, _tag in (
    (select_router, API_PREFIX, 'Selectors'),
    (login_router, API_PREFIX, 'Logins'),
    (xterm_router, API_PREFIX, 'Xterm'),
    (task_router, API_PREFIX, 'Tasks'),
    (ws_router, WS_PREFIX, 'WebSockets'),
):
    app.include_router(_router, prefix=_prefix, tags=[_tag])
# Drop the loop helpers so the module namespace stays unchanged.
del _router, _prefix, _tag
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wuyuhang409_user/PythonServiceDeployment.git
git@gitee.com:wuyuhang409_user/PythonServiceDeployment.git
wuyuhang409_user
PythonServiceDeployment
PythonServiceDeployment
master

搜索帮助