diff --git a/core/package_manager.py b/core/package_manager.py index e005c6b0ad364848daab956b3e198ee2e88484fd..561cc0e94383b32ac9e70067be6a18fc592e3851 100755 --- a/core/package_manager.py +++ b/core/package_manager.py @@ -46,7 +46,9 @@ class OBSPkgManager(object): self.patch_file_path = os.path.join(self.work_dir, "diff_patch") self.giteeUserName = self.kwargs["gitee_user"] self.giteeUserPwd = self.kwargs["gitee_pwd"] + self.sync_code = self.kwargs["sync_code"] self.import_list = [] + self.parent_dir = '' def _pre_env(self): """ @@ -86,7 +88,7 @@ class OBSPkgManager(object): os.system("osc co %s %s &>/dev/null" % (proj, pkg)) else: os.system("osc co %s `osc ls %s | sed -n '1p'` &>/dev/null" % (proj, proj)) - pkg_path = os.path.join(self.obs_meta_path, '%s/%s/%s' % (branch_name, proj, pkg)) + pkg_path = os.path.join(self.obs_meta_path, self.parent_dir, '%s/%s/%s' % (branch_name, proj, pkg)) os.chdir(proj) if os.path.exists(pkg): os.system("cp -rf %s ." % pkg_path) @@ -111,14 +113,14 @@ class OBSPkgManager(object): write and push _service file in obs_meta return 0 or -1 """ - proj_path = os.path.join(self.obs_meta_path, branch_name, proj) + proj_path = os.path.join(self.obs_meta_path, self.parent_dir, branch_name, proj) service_file = os.path.join(proj_path, pkg, "_service") if not os.path.exists(proj_path): - log.warning("obs_meta do not have %s %s" % (branch_name, proj)) + log.warning("obs_meta do not have %s %s %s" % (self.parent_dir, branch_name, proj)) return -1 if os.system("test -f %s" % service_file) == 0: - log.warning("obs_meta haved %s %s %s _service file, no need to add." - % (branch_name, proj, pkg)) + log.warning("obs_meta haved %s %s %s %s _service file, no need to add." + % (self.parent_dir, branch_name, proj, pkg)) return -1 os.chdir(proj_path) if not os.path.exists(pkg): @@ -130,12 +132,14 @@ class OBSPkgManager(object): f.write(' repo\n') if branch_name == "master": f.write(' next/openEuler/%s\n' % pkg) - else: + if self.parent_dir == "": f.write(' next/%s/%s\n' % (branch_name, pkg)) + else: + f.write(' next/%s/%s/%s\n' % (self.parent_dir, branch_name, pkg)) f.write(' \n') f.write('\n') f.close() - os.chdir("%s/%s/%s" % (self.obs_meta_path, branch_name, proj)) + os.chdir("%s/%s/%s/%s" % (self.obs_meta_path, self.parent_dir, branch_name, proj)) os.system("git add %s" % pkg) os.system("git commit -m 'add _service file by %s'" % self.giteeUserName) log.info("add %s %s _service file by %s" % (proj, pkg, self.giteeUserName)) @@ -191,15 +195,15 @@ class OBSPkgManager(object): delete the obs_meta project pkg service file return 0 or -1 """ - proj_path = os.path.join(self.obs_meta_path, branch, proj) + proj_path = os.path.join(self.obs_meta_path, self.parent_dir, branch, proj) service_file = os.path.join(proj_path, pkg, "_service") if os.system("test -f %s" % service_file) != 0: - log.warning("obs_meta not have %s %s %s _service file" % (branch, proj, pkg)) + log.warning("obs_meta not have %s %s %s %s _service file" % (self.parent_dir, branch, proj, pkg)) return -1 os.chdir(proj_path) os.system("rm -rf %s" % pkg) os.system("git add -A && git commit -m 'delete %s by %s'" % (pkg, self.giteeUserName)) - log.info("delete obs_meta %s %s %s by %s" % (branch, proj, pkg, self.giteeUserName)) + log.info("delete obs_meta %s %s %s %s by %s" % (self.parent_dir, branch, proj, pkg, self.giteeUserName)) for i in range(5): if os.system("git push") == 0: break @@ -217,7 +221,7 @@ class OBSPkgManager(object): os.chdir(self.work_dir) proj_path = os.path.join(self.work_dir, proj) pkg_path = os.path.join(proj_path, pkg) - service_file_path = os.path.join(self.obs_meta_path, "%s/%s/%s/_service" + service_file_path = os.path.join(self.obs_meta_path, self.parent_dir, "%s/%s/%s/_service" % (branch_name, proj, pkg)) if os.path.exists(proj_path): shutil.rmtree(proj) @@ -238,7 +242,7 @@ class OBSPkgManager(object): log.warning("%s %s not found" % (proj, pkg)) return -1 os.chdir(self.work_dir) - file_path = os.path.join(self.obs_meta_path, "%s/%s/%s/.osc/_meta" + file_path = os.path.join(self.obs_meta_path, self.parent_dir, "%s/%s/%s/.osc/_meta" % (branch_name, proj, pkg)) cmd = "osc meta pkg %s %s --file=%s | grep ^Done." % (proj, pkg, file_path) if os.system(cmd) != 0: @@ -272,14 +276,23 @@ class OBSPkgManager(object): log.info("line:%s" % line) new_proj = '' new_file_path = '' + parent_dir = '' log_list = list(line.split()) temp_log_type = log_list[0] file_path = log_list[1] if len(log_list) == 3: new_file_path = list(line.split())[2] - branch_name, proj, pkg, file_name = str(file_path).split('/') + tmp_str = str(file_path).split('/') + if len(tmp_str) == 5: + parent_dir, branch_name, proj, pkg, file_name = tmp_str + else: + branch_name, proj, pkg, file_name = str(file_path).split('/') if len(new_file_path) != 0: - new_proj = new_file_path.split('/')[1] + new_file_path_list = new_file_path.split('/') + if len(new_file_path_list) == 5: + new_proj = new_file_path_list[2] + else: + new_proj = new_file_path_list[1] log_type = "Change-pkg-prj" elif file_name == "_meta": if temp_log_type == "A" or temp_log_type == "M": @@ -290,7 +303,7 @@ class OBSPkgManager(object): if temp_log_type == "A": log_type = "Add-pkg" elif temp_log_type == "D": - pkg_path = os.path.join(self.obs_meta_path, '%s/%s/%s' % (branch_name, proj, pkg)) + pkg_path = os.path.join(self.obs_meta_path, parent_dir, '%s/%s/%s' % (branch_name, proj, pkg)) if os.path.exists(pkg_path): log_type = "Del-pkg-service" else: @@ -304,7 +317,7 @@ class OBSPkgManager(object): log.error("%s failed" % line) else: log.error("%s failed" % line) - mesg_list = [log_type, branch_name, proj, pkg, new_proj] + mesg_list = [log_type, branch_name, proj, pkg, new_proj, parent_dir] return mesg_list def _deal_some_param(self): @@ -321,13 +334,14 @@ class OBSPkgManager(object): if os.popen(cmd1).read(): tmp = {} line = line.strip('\n') - log_type, branch_name, proj, pkg, new_proj = self._parse_git_log(line) + log_type, branch_name, proj, pkg, new_proj, parent_dir = self._parse_git_log(line) tmp["log_type"] = log_type tmp["branch_name"] = branch_name tmp["proj"] = proj tmp["pkg"] = pkg tmp["new_proj"] = new_proj tmp["exist_flag"] = 0 + tmp["parent_dir"] = parent_dir for p in proj_list: cmd3 = "osc ls %s 2>&1 | grep -q -Fx %s" % (p, pkg) if os.system(cmd3) == 0: @@ -353,10 +367,12 @@ class OBSPkgManager(object): self._deal_some_param() log.info(self.import_list) for msg in self.import_list: + self.parent_dir = msg["parent_dir"] if msg["log_type"] == "Add-pkg": if msg["exist_flag"] == 0: self._add_pkg(msg["proj"], msg["pkg"], msg["branch_name"]) - self._sync_pkg_code(msg["proj"], msg["pkg"], msg["branch_name"]) + if self.sync_code: + self._sync_pkg_code(msg["proj"], msg["pkg"], msg["branch_name"]) elif msg["log_type"] == "Del-pkg": self._del_pkg(msg["proj"], msg["pkg"]) elif msg["log_type"] == "Del-pkg-service": @@ -367,6 +383,8 @@ class OBSPkgManager(object): self._modify_pkg_meta(msg["proj"], msg["pkg"], msg["branch_name"]) elif msg["log_type"] == "Change-pkg-prj": self._change_pkg_prj(msg["proj"], msg["new_proj"], msg["pkg"], msg["branch_name"]) + if self.sync_code: + self._sync_pkg_code(msg["new_proj"], msg["pkg"], msg["branch_name"]) return 0 def _parse_yaml_data(self): @@ -446,7 +464,8 @@ class OBSPkgManager(object): if len(need_add): for pkgname in list(need_add): self._add_pkg(proj, pkgname, meta_pb_dict[proj]) - self._sync_pkg_code(proj, pkgname, meta_pb_dict[proj]) + if self.sync_code: + self._sync_pkg_code(proj, pkgname, meta_pb_dict[proj]) if len(need_del): for pkgname in list(need_del): self._del_pkg(proj, pkgname) diff --git a/openeuler_obs.py b/openeuler_obs.py index 262f6624f24c1a7f505968bf08563656941925f8..614e4d230ad21ba52f8fea11e899d346b2656c50 100644 --- a/openeuler_obs.py +++ b/openeuler_obs.py @@ -85,6 +85,9 @@ par.add_argument("-cps", "--check_pkg_service", default=False, help="check if there are any problems with the content of the _service file in the rpm package", required=False) par.add_argument("-prid", "--pr_id", default=False, help="use the pr_id to get this pullrequest", required=False) +par.add_argument("-sc", "--sync_code", default=True, + help="when adding package to project or changing package project, \ + the code should be synchronized. type bool, default True", required=False) par.add_argument("-a", "--ALL_", default=False, help="update all obs repo rpms", required=False) @@ -119,6 +122,7 @@ kw = { "check_codes": args.check_codes, "check_pkg_service": args.check_pkg_service, "pr_id": args.pr_id, + "sync_code": args.sync_code, "all":args.ALL_ } diff --git a/test/test_package.py b/test/test_package.py new file mode 100644 index 0000000000000000000000000000000000000000..baabdfa5754757d440da058555ed566199811a87 --- /dev/null +++ b/test/test_package.py @@ -0,0 +1,210 @@ +#/bin/env python3 +# -*- encoding=utf8 -*- +#****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# Author: wangchong +# Create: 2021-04-23 +# ****************************************************************************** +""" +test all: python3 -m pytest -s test_package.py +test one class: python3 -m pytest -s test_package.py::TestCase +test one class one case: python3 -m pytest -s test_package.py::TestCase::test_1 +""" +import os +import sys +import pytest +current_path = os.path.join(os.path.split(os.path.realpath(__file__))[0]) +sys.path.append(os.path.join(current_path, "..")) +from func import SetEnv, CommonFunc +from core.package_manager import OBSPkgManager + + +S = SetEnv() +C = CommonFunc() +obs_meta_path = None +P = None + + +class TestCase(object): + def setup_class(self): + S.set_oscrc() + S.set_gitee() + global obs_meta_path + global P + obs_meta_path = C.pull_from_gitee_repo(S.gitee_info["user"], S.gitee_info["passwd"], \ + "https://gitee.com/{0}/obs_meta".format(S.gitee_info["user"]), "master", "obs_meta") + kw = {"obs_meta_path": obs_meta_path, + "gitee_user": S.gitee_info["user"], + "gitee_pwd": S.gitee_info["passwd"], + "sync_code": False, + "branch2": "", "project2": "", + "check_yaml": "", "check_meta": "", + } + P = OBSPkgManager(**kw) + for i in range(1, 3): + cmd = "osc list | grep home:{0}:test{1}".format(S.obs_info["user"], i) + if os.system(cmd) == 0: + cmd = "osc api -X DELETE /source/home:{0}:test{1}".format(S.obs_info["user"], i) + ret = os.system(cmd) + file_msg = """ + + + <description/> + <person userid=\\"{2}\\" role=\\"maintainer\\"/> +</project> + """.format(S.obs_info["user"], i, S.obs_info["user"]) + cmd = "echo \"{0}\" > {1}/_meta_test".format(file_msg, obs_meta_path) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + cmd = "osc api -X PUT /source/home:{0}:test{1}/_meta -T {2}/_meta_test".format(S.obs_info["user"], \ + i, obs_meta_path) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + cmd = "cd {0} && mkdir -p multi-version/test-rock/home:{1}:test{2}".format(obs_meta_path, S.obs_info["user"], i) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, "multi-version/test-rock/home:{0}:test{1}".format(S.obs_info["user"], i)) + + def teardown_class(self): + S.unset_oscrc() + S.unset_gitee() + for i in range(1, 3): + cmd = "osc api -X DELETE /source/home:{0}:test{1}".format(S.obs_info["user"], i) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to delete home:{0}:test{1} after testing".format(S.obs_info["user"], i) + cmd = "rm -fr {0}".format(obs_meta_path) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + + def test_1(self): + """ + test for creating package for multi-version + """ + assert os.path.exists(obs_meta_path), "{0} not exist".format(obs_meta_path) + for i in range(1, 3): + file_msg = """ +<services> + <service name=\\"tar_scm_kernel_repo\\"> + <param name=\\"scm\\">repo</param> + <param name=\\"url\\">next/multi-version/test-rock/mytest{0}</param> + </service> +</services> + """.format(i) + prj_path = os.path.join(obs_meta_path, "multi-version/test-rock/home:{0}:test{1}".format(S.obs_info["user"], i)) + cmd = "cd {0} && mkdir mytest{1} && echo \"{2}\" > mytest{3}/_service".format(prj_path, i, file_msg, i) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, \ + "multi-version/test-rock/home:{0}:test1/mytest1/_service".format(S.obs_info["user"]), \ + "multi-version/test-rock/home:{0}:test2/mytest2/_service".format(S.obs_info["user"])) + P.obs_pkg_admc() + for i in range(1, 3): + cmd = "osc list home:{0}:test{1} mytest{2} _service".format(S.obs_info["user"], i, i) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to create package mytest{0} in project home:{1}:test{2}".format(i, S.obs_info["user"], i) + + def test_2(self): + """ + test for modify package _service for multi-version + """ + assert os.path.exists(obs_meta_path), "{0} not exist".format(obs_meta_path) + cmd = "cd {0} && sed -i 's/mytest1/mytest1-new/g' \ + multi-version/test-rock/home:{1}:test1/mytest1/_service".format(obs_meta_path, S.obs_info["user"]) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, "multi-version/test-rock/home:{0}:test1/mytest1/_service".format(S.obs_info["user"])) + P.obs_pkg_admc() + cmd = "osc api -X GET /source/home:{0}:test1/mytest1/_service".format(S.obs_info["user"]) + ret = os.popen(cmd).read() + if "mytest1-new" in ret: + assert True + else: + assert False, "fail to modify package _service" + + def test_3(self): + """ + test for change package project for multi-version + """ + assert os.path.exists(obs_meta_path), "{0} not exist".format(obs_meta_path) + cmd = "cd {0} && mv multi-version/test-rock/home:{1}:test1/mytest1 \ + multi-version/test-rock/home:{2}:test2/".format(obs_meta_path, S.obs_info["user"], S.obs_info["user"]) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, \ + "multi-version/test-rock/home:{0}:test1/mytest1".format(S.obs_info["user"]), \ + "multi-version/test-rock/home:{0}:test2/mytest1".format(S.obs_info["user"])) + P.obs_pkg_admc() + cmd = "osc list home:{0}:test1 | grep ^mytest1$".format(S.obs_info["user"]) + ret1 = os.popen(cmd).read() + cmd = "osc list home:{0}:test2 | grep ^mytest1$".format(S.obs_info["user"]) + ret2 = os.popen(cmd).read() + if "mytest1" not in ret1 and "mytest1" in ret2: + assert True + else: + assert False, "fail to change package project" + + def test_4(self): + """ + test for delete package _service for multi-version + """ + assert os.path.exists(obs_meta_path), "{0} not exist".format(obs_meta_path) + cmd = "cd {0} && rm -f multi-version/test-rock/home:{1}:test2/mytest1/_service".format( + obs_meta_path, S.obs_info["user"]) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, \ + "multi-version/test-rock/home:{0}:test2/mytest1/_service".format(S.obs_info["user"])) + P.obs_pkg_admc() + cmd = "osc api -X GET /source/home:{0}:test2/mytest1/_service".format(S.obs_info["user"]) + if os.system(cmd) != 0: + assert True + else: + assert False, "fail to delete package _service" + + def test_5(self): + """ + test for delete package for multi-version + """ + assert os.path.exists(obs_meta_path), "{0} not exist".format(obs_meta_path) + cmd = "cd {0} && rm -rf multi-version/test-rock/home:{1}:test2/mytest1".format( + obs_meta_path, S.obs_info["user"]) + if os.system(cmd) == 0: + assert True + else: + assert False, "fail to exec cmd:{0}".format(cmd) + C.commit_to_gitee_repo(obs_meta_path, \ + "multi-version/test-rock/home:{0}:test2/mytest1".format(S.obs_info["user"])) + P.obs_pkg_admc() + cmd = "osc list home:{0}:test2 | grep mytest1".format(S.obs_info["user"]) + if os.system(cmd) != 0: + assert True + else: + assert False, "fail to delete package"