代码拉取完成,页面将自动刷新
import platform
from core import settings
settings.setup()
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError, HTTPException
from starlette.exceptions import HTTPException as StarletteHTTPException
from events import evnet_shutdown
from server_api.user import user_router
from server_api.role import role_router
from server_api.select import select_router
from server_api.login import login_router
from server_api.xterm import xterm_router
from server_api.server_instance import server_instance_router
from server_api.task import task_router
from server_api.websocket import ws_router
from server_api.mission import mission_router
from server_api.project import project_router
from server_api.software import software_router
from server_api.file_manage import file_manage_router
from server_api.service_group import service_group_router
from server_api.python_service import python_service_router
from server_api.nginx_service import nginx_service_router
from server_api.build_process import build_process_router
from common.permission_util import (
get_all_endpoints,
HttpAuthenticationMiddleware,
HttpIsAuthenticatedMiddleware,
HttpOperationPermissionMiddleware,
GetRouteMiddleware
)
from common.scheduler import SchedulerMiddleware, db_scheduler
from common.router_util import get_route_by_operation_id
from common.errors import (
handler_http_exception,
handler_validation_exception,
handler_base_exception,
handler_service_error,
handler_business_error,
handle_rpc_error,
ServiceError,
CMDError,
RpcError
)
from common.responses import SuccessResponse
from db.errors import BusinessError
# Exception class -> handler mapping passed to FastAPI below.
# All handlers are imported from common.errors.
_exception_handlers = {
    RequestValidationError: handler_validation_exception,
    # CMDError and BusinessError are routed to the same handler.
    CMDError: handler_business_error,
    BusinessError: handler_business_error,
    # The FastAPI and the Starlette HTTPException classes share one handler.
    HTTPException: handler_http_exception,
    StarletteHTTPException: handler_http_exception,
    RpcError: handle_rpc_error,
    ServiceError: handler_service_error,
    # Registered for the Exception base class, i.e. the fallback handler.
    Exception: handler_base_exception,
}

# The application object: OpenAPI metadata, one shutdown hook and the
# exception-handler table defined above.
app = FastAPI(
    title='PyDep',
    description='PyDepAPI接口文档',
    version='1.0.0',
    # "evnet_shutdown" is the spelling exported by the events module.
    on_shutdown=[evnet_shutdown],
    exception_handlers=_exception_handlers,
)

# URL prefixes used when mounting routers further down in this file:
# API_PREFIX for the HTTP routers and inline endpoints, WS_PREFIX for ws_router.
API_PREFIX = '/api'
WS_PREFIX = '/ws'
# Middleware registration.
# NOTE: Starlette's add_middleware() puts each newly added middleware at the
# front of the stack, so the middleware added LAST in this section is the
# outermost one and handles an incoming request first. Keep the call order.

# Operation ("button") permission middleware. Its `includes` argument receives
# every route of the routers listed here, in this order.
app.add_middleware(
    HttpOperationPermissionMiddleware,
    includes=[
        route
        for router in (
            user_router,
            role_router,
            xterm_router,
            server_instance_router,
            mission_router,
            project_router,
            software_router,
            file_manage_router,
            service_group_router,
            python_service_router,
            nginx_service_router,
            build_process_router,
        )
        for route in router.routes
    ],
)

# "Authentication required" middleware. Its `includes` argument receives three
# login_router routes looked up by operation id, the xterm 'test_connect'
# route, and every route of select_router.
app.add_middleware(
    HttpIsAuthenticatedMiddleware,
    includes=[
        *(
            get_route_by_operation_id(login_router, operation_id)
            for operation_id in ('logout', 'userinfo', 'refresh:token')
        ),
        get_route_by_operation_id(xterm_router, 'test_connect'),
        *select_router.routes,
    ],
)

# Current-user middleware; the 'login' route is passed as the only exclusion.
app.add_middleware(
    HttpAuthenticationMiddleware,
    excludes=[get_route_by_operation_id(login_router, 'login')],
)

# Current-route middleware.
app.add_middleware(GetRouteMiddleware)

# Async scheduled-task scheduler middleware, bound to the shared db_scheduler.
app.add_middleware(SchedulerMiddleware, scheduler=db_scheduler)
# Mount the first batch of routers under API_PREFIX, one OpenAPI tag each.
# The registration order is kept exactly as listed.
for _router, _tag in (
    (user_router, 'Users'),
    (role_router, 'Roles'),
    (server_instance_router, 'ServerInstances'),
    (mission_router, 'Missions'),
    (project_router, 'Projects'),
    (software_router, 'Softwares'),
    (file_manage_router, 'FilesManages'),
    (service_group_router, 'ServiceGroups'),
    (python_service_router, 'PythonServices'),
    (nginx_service_router, 'NginxServices'),
    (build_process_router, 'BuildProcesses'),
):
    app.include_router(_router, prefix=API_PREFIX, tags=[_tag])

# Endpoint listings built from app.routes in two variants (expand=True/False).
# NOTE(review): at the time of these calls only the routers mounted above have
# been registered; the select/login/xterm/task/ws routers and the inline
# endpoints are added later in this file. Presumably intentional, so that they
# are left out of the listings -- confirm against get_all_endpoints.
expand_endpoints = get_all_endpoints(app.routes, expand=True)
nexpand_endpoints = get_all_endpoints(app.routes, expand=False)
# GET {API_PREFIX}/select/permissions -- returns the non-expanded endpoint
# listing (nexpand_endpoints) wrapped in a SuccessResponse.
# Documented with comments rather than a docstring because FastAPI publishes a
# handler's docstring as the operation description in the OpenAPI schema.
@app.get(f'{API_PREFIX}/select/permissions', operation_id='permissions:list')
def list_permissions():
    permissions = nexpand_endpoints
    return SuccessResponse(permissions)
# GET {API_PREFIX}/system_info -- returns the server's operating-system name
# as reported by platform.system(), wrapped in a SuccessResponse.
# Documented with comments rather than a docstring because FastAPI publishes a
# handler's docstring as the operation description in the OpenAPI schema.
@app.get(f'{API_PREFIX}/system_info', operation_id='system:info')
def system_info():
    os_name = platform.system()
    return SuccessResponse(os_name)
# Mount the remaining routers. These are registered after the endpoint
# listings were computed earlier in this file. Every router here uses
# API_PREFIX except ws_router, which is mounted under WS_PREFIX.
for _router, _prefix, _tag in (
    (select_router, API_PREFIX, 'Selectors'),
    (login_router, API_PREFIX, 'Logins'),
    (xterm_router, API_PREFIX, 'Xterm'),
    (task_router, API_PREFIX, 'Tasks'),
    (ws_router, WS_PREFIX, 'WebSockets'),
):
    app.include_router(_router, prefix=_prefix, tags=[_tag])
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。