diff --git a/models/product_model.py b/models/product_model.py new file mode 100644 index 0000000000000000000000000000000000000000..7364a3cb8fa05224c33aa1f2dab62c65bafac8f6 --- /dev/null +++ b/models/product_model.py @@ -0,0 +1,27 @@ +from sqlalchemy import Column, String, Integer +from models import CommonModel + + +class TestProduct(CommonModel): + __tablename__ = 'test_product' + + name = Column(String(64), nullable=False, comment='测试产品名称') + category_id = Column(Integer(), nullable=False, default=0, comment='大类id') + origin_plan_id = Column(Integer(), nullable=False, default=0, comment='测试产品对应的plan id') + desc = Column(String(256), nullable=True, comment='产品描述') + test_method = Column(String(256), nullable=True, comment='测试方法') + test_requirement = Column(String(256), nullable=True, comment='测试要求') + owner = Column(String(256), nullable=False, comment='测试产品执行人') + + +class ProductCategory(CommonModel): + __tablename__ = 'product_category' + + name = Column(String(64), nullable=False, comment='大类名称') + + +class ProductPlan(CommonModel): + __tablename__ = 'product_plan' + + test_product_id = Column(Integer(), nullable=False, default=0, comment='测试产品id') + plan_id = Column(Integer(), nullable=False, default=0, comment='测试产品实际复制执行的plan id') diff --git a/services/const.py b/services/const.py index 2d8c1f29a72114a696234a5fcd2d3cc0c7eced49..6e1dcf8a3158a3536743edb9368b9be3efe83aca 100644 --- a/services/const.py +++ b/services/const.py @@ -12,6 +12,7 @@ ERROR_UN_EXISTED_DEVICE = '设备不存在' ERROR_UN_EXISTED_TASK = '任务不存在' ERROR_UN_EXISTED_REQUIREMENT = '任务不存在' ERROR_UN_EXISTED_OUTLINE = '测试大纲不存在' +ERROR_UN_EXISTED_PRODUCT = '测试产品不存在' ERROR_NO_OP_PERMISSION = '缺少操作权限' ERROR_UN_EXISTED_USER = '用户不存在' ERROR_LACK_PERMISSION = '缺少权限' diff --git a/services/product_service.py b/services/product_service.py new file mode 100644 index 0000000000000000000000000000000000000000..c20b0cf0695d21b8bed297c3feb729d625a802c0 --- /dev/null +++ b/services/product_service.py @@ -0,0 +1,123 @@ +from datetime import datetime + +from common.enums import User_Role +from models.product_model import TestProduct, ProductCategory, ProductPlan +from models.plan_model import Plan +from services.plan_service import run_task, make_a_copy +from services.const import ERROR_NO_OP_PERMISSION, ERROR_UN_EXISTED_PRODUCT, ERROR_UN_EXISTED_TASK,\ + ERROR_DUPLICATED_NAME + +ERROR_UN_EXISTED_CASE_IN_PLAN = '该用例不存在于当前方案中,禁止关联' + + +async def get_products(name=None, is_paginate='', page_num=1, page_size=10): + conditions_list, conditions_dict = [], dict() + if name: + conditions_list.append(TestProduct.name.startswith(name)) + conditions_dict['name'] = f'{name}%' + product_list = await TestProduct.query_dict_all(*conditions_list) + for product in product_list: + category = await ProductCategory.query_obj_one(ProductCategory.id == product.get('category_id')) + if category: + product['category_name'] = category.name + plan = await Plan.query_obj_one(Plan.id == product.get('origin_plan_id')) + if plan: + product['plan_name'] = plan.title + return product_list + + +async def get_run_record(page_num=1, page_size=10): + product_list = await TestProduct.query_page(page_num=page_num, page_size=page_size) + for product in product_list.get('data'): + category = await ProductCategory.query_obj_one(ProductCategory.id == product.get('category_id')) + if category: + product['category_name'] = category.name + plan = await Plan.query_obj_one(Plan.id == product.get('origin_plan_id')) + if plan: + product['plan_name'] = plan.title + run_plan = list() + product_plans = await ProductPlan.query_dict_all(ProductPlan.test_product_id == product.get('id')) + for product_plan in product_plans: + plan = await Plan.query_dict_one(Plan.id == product_plan.get('plan_id')) + if plan: + run_plan.append(plan) + product['plans'] = run_plan + product['run_count'] = len(run_plan) + return product_list + + +async def get_product_by_id(test_product_id): + test_product = await TestProduct.query_obj_one(TestProduct.id == test_product_id) + if not test_product: + return ERROR_UN_EXISTED_PRODUCT, False + return test_product.to_dict(), True + + +async def create_product(data, creator): + exist_product = await TestProduct.query_obj_one(TestProduct.name == data['name']) + if exist_product: + return ERROR_DUPLICATED_NAME, False + data.update(dict({'owner': creator})) + test_product = await TestProduct().save(data) + return test_product.to_dict(), True + + +async def run_check(test_product_id, user_infos): + test_product = await TestProduct.query_obj_one(TestProduct.id == test_product_id) + if not test_product: + return ERROR_UN_EXISTED_PRODUCT, False + origin_plan = await Plan.query_obj_one(Plan.id == test_product.origin_plan_id) + if not origin_plan: + return ERROR_UN_EXISTED_TASK, False + result, ok = await make_a_copy(origin_plan.id, user_infos) + await ProductPlan().save({'test_product_id': test_product.id, 'plan_id': result['id']}) + return await run_task(result['id'], user_infos) + + +async def modify_product(data, test_product_id, user): + test_product = await TestProduct.query_obj_one(TestProduct.id == test_product_id) + if not test_product: + return ERROR_UN_EXISTED_PRODUCT, False + if test_product.owner != user['user_name'] and user['role'] == User_Role.JUNIOR.value: + return ERROR_NO_OP_PERMISSION, False + if 'name' in data: + test_product.name = data['name'] + if 'category_id' in data: + test_product.category_id = data['category_id'] + if 'origin_plan_id' in data: + test_product.origin_plan_id = data['origin_plan_id'] + if 'desc' in data: + test_product.desc = data['desc'] + if 'test_method' in data: + test_product.test_method = data['test_method'] + if 'test_requirement' in data: + test_product.test_requirement = data['test_requirement'] + test_product.gmt_modified = datetime.now() + await test_product.update() + return test_product.to_dict(), True + + +async def remove_product(test_product_id, user): + test_product = await TestProduct.query_obj_one(TestProduct.id == test_product_id) + if not test_product: + return ERROR_UN_EXISTED_PRODUCT, False + if test_product.owner != user['user_name'] and user['role'] == User_Role.JUNIOR.value: + return ERROR_NO_OP_PERMISSION, False + await test_product.remove() + return None, True + + +async def get_categories(name=None): + conditions_list, conditions_dict = [], dict() + if name: + conditions_list.append(ProductCategory.name.startswith(name)) + conditions_dict['name'] = f'{name}%' + return await ProductCategory.query_dict_all(*conditions_list) + + +async def create_category(data): + exist_category = await ProductCategory.query_obj_one(ProductCategory.name == data['name']) + if exist_category: + return ERROR_DUPLICATED_NAME, False + category = await ProductCategory().save(data) + return category.to_dict(), True diff --git a/views/product_view.py b/views/product_view.py new file mode 100644 index 0000000000000000000000000000000000000000..2621dbd394c5d0bc2d216221eb6880ada746a429 --- /dev/null +++ b/views/product_view.py @@ -0,0 +1,97 @@ +from sanic import Blueprint + +from common.http import rsp +from services.auth_service import login_auth, read_auth +from services.product_service import * +from views.utils import check_args + +bp = Blueprint('product', url_prefix='api/product') + + +@bp.route('') +@login_auth +async def get_product_list(request, user_infos): + name = request.args.get('name', None) + return rsp(data=await get_products(name)) + + +@bp.get('/test_record') +@login_auth +async def get_test_record_list(request, user_infos): + page_num = int(request.args.get('page_num')) if request.args.get('page_num') else 1 + page_size = int(request.args.get('page_size')) if request.args.get('page_size') else 50 + result = await get_run_record(page_num, page_size) + return rsp(paginate=result) + + +@bp.post('/create') +@read_auth +async def create(request, user_infos): + check_list = ['name', 'category_id', 'origin_plan_id'] + response, ok = check_args(check_list, request.json) + if not ok: + return response + result, ok = await create_product(request.json, user_infos['user_name']) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result) + + +@bp.get('/') +@login_auth +async def get_product(_, test_product_id, user_infos): + result, ok = await get_product_by_id(test_product_id) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result) + + +@bp.post('/edit/') +@read_auth +async def edit(request, test_product_id, user_infos): + check_list = ['name', 'category_id', 'origin_plan_id'] + response, ok = check_args(check_list, request.json) + if not ok: + return response + result, ok = await modify_product(request.json, test_product_id, user_infos) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result) + + +@bp.delete('/') +@read_auth +async def delete(_, test_product_id, user_infos): + result, ok = await remove_product(test_product_id, user_infos) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result) + + +@bp.get('/run/') +@read_auth +async def run(_, test_product_id, user_infos): + result, ok = await run_check(test_product_id, user_infos) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result) + + +@bp.get('/get_category') +@login_auth +async def get_category_list(request, user_infos): + name = request.args.get('name', None) + return rsp(data=await get_categories(name)) + + +@bp.post('/create_category') +@read_auth +async def create(request, user_infos): + check_list = ['name'] + response, ok = check_args(check_list, request.json) + if not ok: + return response + result, ok = await create_category(request.json) + if not ok: + return rsp(code=500, msg=result) + return rsp(data=result)