diff --git a/core/check_meta_service.py b/core/check_meta_service.py index 41e0ec65fcdf57177fd4704bd183e770b401e4c7..a097c16679a709ce0ab5e2ee9853985490f26aec 100644 --- a/core/check_meta_service.py +++ b/core/check_meta_service.py @@ -18,8 +18,12 @@ check the format and URL of the service file """ import os import sys +Now_path = os.path.join(os.path.split(os.path.realpath(__file__))[0]) +sys.path.append(os.path.join(Now_path, "..")) from common.log_obs import log from xml.etree import ElementTree as ET +from bs4 import BeautifulSoup +import xml.dom.minidom class CheckMetaPull(object): """ @@ -32,6 +36,9 @@ class CheckMetaPull(object): """ self.kwargs = kwargs self.prid = self.kwargs['pr_id'] + self.branch = self.kwargs['branch'] + self.current_path = os.getcwd() + self.check_error = [] def _clean(self): """ @@ -57,18 +64,20 @@ class CheckMetaPull(object): if "Already" in pull_result[0]: log.info(pull_result) log.info("Success to clone obs_meta") - break + return else: os.popen("if [ -d obs_meta ];then rm -rf obs_meta") + raise SystemExit("*******obs_meta-clone-error:please check your net*******") - def _get_new_pkg(self): + def _get_change_file(self): """ Gets a list of newly added files in obs_meta """ + current_path = os.getcwd() fetch_cmd = "git fetch origin pull/%s/head:thispr" % self.prid branch_cmd = "git branch -a | grep 'thispr'" checkout_cmd = "git checkout thispr" - changed_file_cmd = "git log --name-status -1" + changed_file_cmd = "git diff --name-status HEAD~1 HEAD~0" for x in range(5): os.chdir("obs_meta") fetch_result = os.popen(fetch_cmd).read() @@ -83,90 +92,302 @@ class CheckMetaPull(object): self._get_latest_obs_meta() show_result = os.popen(checkout_cmd).readlines() log.info(show_result) - changed_file_result = os.popen(changed_file_cmd).readlines() - log.info(changed_file_result) - new_pkg_path = [] - for pkg in changed_file_result: - if "A\t" in pkg: - all_msg = pkg.replace("\n", "") - log.info(all_msg) - pkg_path = all_msg.replace("A\t", "") - new_pkg_path.append(pkg_path) - log.info("All_change_pkg:%s" % new_pkg_path) - if not new_pkg_path: - log.info("There have no Package add") - sys.exit() + changed_file = os.popen(changed_file_cmd).readlines() + log.info(changed_file) + os.chdir("../") + return changed_file + + def _parse_git_log(self, line): + """ + deal diff_patch line mesg + """ + log.info("line:%s" % line) + new_file_path = '' + log_list = list(line.split()) + temp_log_type = log_list[0] + if len(log_list) == 3: + if log_list[1].split('/')[0] != log_list[2].split('/')[0] and \ + log_list[1].split('/')[-2] == log_list[2].split('/')[-2]: + log.error("ERROR_COMMIT: %s" % line) + log.error("FAILED:Your PR contains the movement of same package between branches") + raise SystemExit("*******PLEASE CHECK YOUR PR*******") + else: + new_file_path = list(line.split())[2] + elif len(log_list) == 2: + if temp_log_type != "D": + new_file_path = list(line.split())[1] + log.info(new_file_path) + return new_file_path + + def _check_file_format(self, file_path): + """ + Function modules: Check the format of the service file + """ + try: + ET.parse(file_path) + log.info("**************FORMAT CORRECT***************") + log.info("The %s has a nice format" % file_path) + log.info("**************FORMAT CORRECT***************") + except Exception as e: + log.error("**************FORMAT ERROR*****************") + log.error("MAY be %s has a bad format" % file_path) + log.error("%s format bad Because:%s" % (file_path, e)) + log.error("**************FORMAT ERROR*****************") + return "yes" + + def _check_pkg_services(self, pkg_path): + """ + Check the format of the service file and the correctness of its URL + """ + os.chdir("%s/obs_meta" % self.current_path) + error_flag = self._check_file_format(pkg_path) + service_path = pkg_path + url_list = [] + service_name = [] + service_branch = [] + service_next = [] + with open(service_path, "r") as f: + log.info('\n' + f.read()) + dom = xml.dom.minidom.parse(pkg_path) + root = dom.documentElement + param_lines = root.getElementsByTagName('param') + for url in param_lines: + if url.getAttribute("name") == "url": + url_list.append(url.firstChild.data.strip('/')) + log.info("ALL_URL:%s" % url_list) + for all_url in url_list: + service_next.append(all_url.split('/')[0]) + service_name.append(all_url.split('/')[-1]) + service_branch.append(all_url.split('/')[-2]) + pkg_name = pkg_path.split('/')[-2] + #Support Multi_Version + if "Multi" in pkg_path.split('/')[0] or "multi" in pkg_path.split('/')[0]: + pkg_url = pkg_path.split('/')[1] else: - return new_pkg_path + pkg_url = pkg_path.split('/')[0] + log.info("Url_Next:%s" % service_next) + log.info("Pkgname_in_Service:%s" % service_name) + log.info("pkgbranch_in_Service:%s" % service_branch) + log.info("Pkgname_in_Obe_meta_path:%s" % pkg_name) + log.info("Pkgbranch_in_Obs_meta_path:%s" % pkg_url) + if list(set(service_branch))[0] == "openEuler" and pkg_url == "master" \ + and len(list(set(service_branch))) == 1 and list(set(service_next))[0] == "next" \ + and len(list(set(service_next))) == 1: + if pkg_name in service_name: + log.info("SUCCESS_CHECK:The %s in _service url is correct" % pkg_name) + else: + log.error("**************_Service URL ERROR*****************") + log.error("FAILED:The %s in _service pkgname is not same" % pkg_name) + error_flag = "yes" + elif list(set(service_branch))[0] == pkg_url and len(list(set(service_branch))) == 1 \ + and list(set(service_next))[0] == "next" and len(list(set(service_next))) == 1: + if pkg_name in service_name: + log.info("SUCCESS_CHECk:The %s in _service url is correct" % pkg_name) + else: + log.error("**************_Service URL ERROR*****************") + log.error("FAILED:The %s in _service pkgname is not same" % pkg_name) + error_flag = "yes" + else: + log.error("**************_Service URL ERROR*****************") + log.error("FAILED:You need to check the branch in your %s _service again" % pkg_name) + error_flag = "yes" + if error_flag == "yes": + self.check_error.append(pkg_path) + return error_flag - def _check_pkg_services(self, pkg_path_list): - """ - Function modules: Check the format of the service file and the correctness of its URL - """ - error_flag = "" - service_flag = "" - for pkg_path in pkg_path_list: - if "_service" in pkg_path: - service_flag = "exist" - try: - ET.parse(pkg_path) - log.info("**************FORMAT CORRECT***************") - log.info("The %s has a nice _service" % pkg_path) - log.info("**************FORMAT CORRECT***************") - except Exception as e: - log.error("**************FORMAT ERROR*****************") - log.error("MAY be %s has a bad _service format" % pkg_path) - log.error("%s/_service format bad Because:%s" % (pkg_path, e)) - log.error("**************FORMAT ERROR*****************") - error_flag = "yes" - service_path = pkg_path - url_result = "" - with open(service_path, "r") as f: - log.info('\n' + f.read()) - f.seek(0, 0) - for url in f.readlines(): - if "name=\"url\"" in url.replace("\n", ""): - all_url = url.replace("\n", "").split('/') - #log.info(all_url) - spkg_name = all_url[-2].replace("<", "") - spkg_url = all_url[-3] - pkg_name = pkg_path.split('/')[-2] - pkg_url = pkg_path.split('/')[0] - log.info("Service_pkgname:%s" % spkg_name) - log.info("Service_pkgurl:%s" % spkg_url) - log.info("Pkgname:%s" % pkg_name) - log.info("Pkg_url:%s" % pkg_url) - if spkg_url == "openEuler" and pkg_url == "master": - if spkg_name == pkg_name: - log.info("Yes:The %s in _service url is correct" % pkg_name) - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED The %s in _service pkgname is not same" % pkg_name) - error_flag = "yes" - elif spkg_url == pkg_url: - if spkg_name == pkg_name: - log.info("Yes:The %s in _service url is correct" % pkg_name) - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED The %s in _service pkgname is not same" % pkg_name) - error_flag = "yes" - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED You need to check your %s _service again" % pkg_name) - error_flag = "yes" - break - os.chdir("../") - self._clean() + def _check_pro_meta(self, pkg_path): + """ + check the _meta have the maintainer or path is or not + """ + os.chdir("%s/obs_meta" % self.current_path) + error_flag1 = self._check_file_format(pkg_path) + error_flag2 = "" + error_flag3 = "" + with open(pkg_path, "r") as f: + parse = f.read() + log.info('\n' + parse) + soup = BeautifulSoup(parse, "html.parser") + maintainers = soup.find_all('person') + pro_repo = soup.find_all('path') + log.info("maintainers:%s" % maintainers) + log.info("pro_repo:%s" % pro_repo) + if not maintainers: + error_flag2 = "yes" + log.error("The _meta file does not have the maintainers section!!!!!!") + for msg in pro_repo: + log.info("msg:%s" % msg) + msglist = str(msg).split() + for key in msglist: + if "project=" in key: + project = key.split("\"")[1] + if "repository=" in key: + repository = key.split("\"")[1] + log.info(project) + log.info(repository) + exit_cmd = "osc r %s --xml 2>/dev/null | grep %s" % (project, repository) + exit_result = os.popen(exit_cmd).read() + log.info(exit_result) + if not exit_result: + error_flag3 = "yes" + log.error("Failed:The %s Not exit in %s" % (repository, project)) + else: + log.info("Success:The %s exit in %s" % (repository, project)) + error_flag = error_flag1 or error_flag2 or error_flag3 if error_flag == "yes": - raise SystemExit("*******PLEASE CHECK AGAIN*******") - if service_flag == "": - log.info("There has no pkg add to project") + self.check_error.append(pkg_path) + return error_flag - def do_all(self): + def _multi_name_check(self, change_file): """ - Assemble all inspection processes and provide external interfaces + check the multi_version format + """ + path_list = change_file.split('/') + path_list_len = len(path_list) + log.info("multi_name_check:%s" % path_list) + error_flag2 = "" + error_flag1 = "" + if path_list_len >= 3: + multi_branch = path_list[2] + multi_branch_list = multi_branch.split('_') + multi_branch_head = multi_branch_list[0] + multi_branch_mid = multi_branch_list[1] + multi_branch_tail = multi_branch_list[2] + branch_list = os.listdir(os.path.join(self.current_path, "obs_meta")) + if path_list_len == 4: + multi_project = path_list[3] + multi_pro_head = multi_project.split(':Multi-Version:')[0] + multi_pro_tail = multi_project.split(':Multi-Version:')[1] + if os.path.exists(os.path.join(self.current_path, "obs_meta", multi_branch_tail)): + project_list = os.listdir(os.path.join(self.current_path, "obs_meta", multi_branch_tail)) + else: + project_list = None + log.error("The %s is not exists!,please check your %s!!!" % (multi_branch_tail, multi_branch)) + error_flag1 = "yes" + if branch_list: + log.info("multi_branch_head:%s" % multi_branch_head) + log.info("multi_branch_tail:%s" % multi_branch_tail) + log.info("branch_list:%s" % branch_list) + if multi_branch_head == "Multi-Version" and multi_branch_tail in branch_list: + log.info("Success to check branch:%s" % multi_branch) + else: + log.error("Failed!!! you need to make sure your Mukti-branch:%s" % multi_branch) + error_flag2 = "yes" + if branch_list and project_list: + log.info("multi_pro_head:%s" % multi_pro_head) + log.info("multi_pro_tail:%s" % multi_pro_tail.replace(':', '-')) + log.info("multi_branch_mid:%s" % multi_branch_mid) + log.info("project_list:%s" % project_list) + if multi_pro_head in project_list and multi_pro_tail.replace(':', '-') == multi_branch_mid and \ + error_flag2 != "yes": + log.info("Success to check project:%s/%s" % (multi_branch, multi_project)) + else: + log.error("Failed!!! you need to make sure your Multi_branch and project:%s/%s" \ + % (multi_branch, multi_project)) + error_flag2 = "yes" + error_flag = error_flag1 or error_flag2 + return error_flag + + def _get_all_change_file(self): + """ + get all change file_path """ self._clean() self._get_latest_obs_meta() - changelist = self._get_new_pkg() - self._check_pkg_services(changelist) + changefile = self._get_change_file() + changelist = [] + for msg in changefile: + parse_git = self._parse_git_log(msg) + if parse_git: + changelist.append(parse_git) + if changelist: + return changelist + else: + log.info("Finish!!There are no file need to check") + sys.exit() + + def _check_service_meta(self, change_list): + """ + check the _service or _meta file in commit + """ + flag_list = [] + for change in change_list: + if "_service" in change: + if "multi" in change.split('/')[0] or "Multi" in change.split('/')[0] and \ + len(change.split('/')) == 5: + pro_dir = "multi_version/%s" % change.split('/')[1] + branchs = os.listdir(os.path.join(self.current_path, \ + "obs_meta", "OBS_PRJ_meta/multi_version")) + projects = os.listdir(os.path.join(self.current_path, \ + "obs_meta", "OBS_PRJ_meta", pro_dir)) + log.info("multi_branch_list:%s" % branchs) + log.info("multi_project_list:%s" % projects) + if change.split('/')[1] in branchs and change.split('/')[2] in projects: + error_flag = self._check_pkg_services(change) + else: + log.error("Please check your commit dir %s!!!" % change) + error_flag = "yes" + elif len(change.split('/')) == 4: + error_flag = self._check_pkg_services(change) + else: + log.error("Please check your commit dir %s!!!" % change) + error_flag = "yes" + flag_list.append(error_flag) + elif "OBS_PRJ_meta" in change: + error_flag1 = None + if "multi" in change or "Multi" in change: + error_flag1 = self._multi_name_check(change) + error_flag2 = self._check_pro_meta(change) + error_flag = error_flag1 or error_flag2 + flag_list.append(error_flag) + else: + log.info("There are no _service or _meta need to check") + os.chdir(self.current_path) + self._clean() + if "yes" in flag_list: + print("error_check_list:") + for error_service in self.check_error: + print(error_service) + raise SystemExit("*******PLEASE CHECK YOUR PR*******") + + def _check_branch_service(self, branch): + """ + check the file for the corresponding branch + """ + if "multi" in branch or "Multi" in branch: + branchlist = os.listdir(os.path.join(self.current_path, "obs_meta", "OBS_PRJ_meta", "multi_version")) + else: + branchlist = os.listdir(os.path.join(self.current_path, "obs_meta", "OBS_PRJ_meta")) + meta_path = os.path.join(self.current_path, "obs_meta") + os.chdir(meta_path) + if branch in branchlist: + if "multi" in branch or "Multi" in branch: + service = os.popen("find multi_version/%s | grep _service" % branch).read() + else: + service = os.popen("find %s | grep _service" % branch).read() + elif branch == "all": + service = os.popen("find | grep _service").read() + service_list = list(service.split('\n')) + service_path_list = [x.lstrip("./") for x in service_list if x and "openEuler-EPOL-LTS" not in x] + log.info(service_path_list) + self._check_service_meta(service_path_list) + + def do_all(self): + """ + make the get obs_meta change fuction and check fuction doing + """ + if self.prid: + change_result = self._get_all_change_file() + check_result = self._check_service_meta(change_result) + elif not self.prid and self.branch: + self._clean() + self._get_latest_obs_meta() + self._check_branch_service(self.branch) + else: + log.error("ERROR_INPUT:PLEASE CHECK YOU INPUT") + +if __name__ == "__main__": + kw = {} + kw['pr_id'] = "715" + kw['branch'] = "" + check = CheckMetaPull(**kw) + check.do_all()