1 Star 1 Fork 0

single_yang/FastApi-Demo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
app.py 2.44 KB
一键复制 编辑 原始数据 按行查看 历史
杨宇航 提交于 2020-12-12 17:08 +08:00 . 添加代码
import uvicorn
from fastapi import FastAPI
from conf import Item, JenkinsConf
from message import message
from jenkins import Jenkins
app = FastAPI()
@app.post('/code_review')
async def code_review(item: Item):
if item.action is None:
return {"success": False, "errorText": "只支持「合并请求相关」操作"}
# 获取接口数据
merge_user = item.mergeRequest["user"] # 合并请求创建人信息
merge_title, merge_title_url = item.mergeRequest['title'], item.mergeRequest['html_url'] # 合并请求标题、前端地址
user_name, user_url = merge_user['name'], merge_user['html_url'] # 合并请求创建人姓名、coding主页
origin_branch, target_branch = item.mergeRequest['head']['ref'], item.mergeRequest['base']['ref'] # 源分支、目标分支
project_name, project_url = item.repository['name'], item.repository['html_url'] # 项目名称、项目coding主页
kwargs = {"merge_title": merge_title, "merge_title_url": merge_title_url, "user_name": user_name,
"user_url": user_url, "origin_branch": origin_branch, "target_branch": target_branch,
"project_name": project_name, "project_url": project_url}
if item.action == "create":
# 发送review通知到钉钉群
message(**kwargs)
return {"success": True, "errorText": ""}
if item.action == "merge":
# 需要传递给jenkins的数据
git_url = item.repository.get("ssh_url") # git clone仓库地址
job_name = "fvs-common" if target_branch == "master" else "fvs-common-test" # Jenkins任务名称
# 初始化Jenkins
server = Jenkins(url=JenkinsConf.URL, username=JenkinsConf.USER_ID, password=JenkinsConf.API_TOKEN)
# 组合Jenkins任务参数字典
params = {"GIT_URL": git_url, "PROJECT_NAME": project_name, "FVS_NAME": "-".join(project_name.split("_")),
"USER_NAME": user_name, "MERGE_TITLE": merge_title, "MERGE_URL": merge_title_url}
# 触发构建任务
output = server.build_job(name=job_name, parameters=params)
return {"success": True, "errorText": output}
else:
return {"success": False, "errorText": "只支持「合并、创建合并请求」操作"}
if __name__ == '__main__':
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["access"]["fmt"] = "%(asctime)s - %(levelname)s - %(message)s"
uvicorn.run(app, log_config=log_config, host="0.0.0.0", port=5000)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/single_yang/fast-api-demo.git
git@gitee.com:single_yang/fast-api-demo.git
single_yang
fast-api-demo
FastApi-Demo
master

搜索帮助