diff --git a/common/common.py b/common/common.py index b85c22384ec0740241bc21b13275bee7af61ed94..8d6391c51f8590483a4b2c4140986269cd2e9708 100644 --- a/common/common.py +++ b/common/common.py @@ -18,7 +18,15 @@ function for all """ import os +import re import pexpect +import requests +import jenkins +from requests.auth import HTTPBasicAuth +try: + from urllib import urlencode +except ImportError: + from urllib.parse import urlencode def str_to_bool(s): @@ -117,6 +125,160 @@ class Pexpect(object): return msg +class Comment(object): + """ + gitee comments process + :param owner: 仓库属于哪个组织 + :param repo: 仓库名 + :param token: gitee 账户token + """ + + def __init__(self, owner, repo, token): + self._owner = owner + self._repo = repo + self._token = token + + + def comment_pr(self, pr, comment): + """ + 评论pull request + :param pr: 本仓库PR id + :param comment: 评论内容 + :return: 0成功,其它失败 + """ + comment_pr_url = "https://gitee.com/api/v5/repos/{}/{}/pulls/{}/comments".format(self._owner, self._repo, pr) + data = {"access_token": self._token, "body": comment} + rs = self.do_requests("post", comment_pr_url, body=data, timeout=10) + if rs == 0: + return True + else: + return False + + def parse_comment_to_table(self, pr, results, tips, details): + """ + :param pr: 仓库PR id + :param results: 门禁检查返回结果 + :return: none + """ + comment_state = {"success":":white_check_mark:", "warning":":bug:", "failed":":x:"} + comments = ["", ""] + for check_item, check_result in results.items(): + emoji_result = comment_state[check_result] + word_result = check_result.upper() + info_str = ''''''.format(check_item, emoji_result, word_result, details[check_item]) + comments.append(info_str) + comments.append("
Check Item Check Result Description
{} {}{} {}
") + comments.extend(tips) + self.comment_pr(pr, "\n".join(comments)) + + + def do_requests(self, method, url, querystring=None, body=None, auth=None, timeout=30, obj=None): + """ + http request + :param method: http method + :param url: http[s] schema + :param querystring: dict + :param body: json + :param auth: dict, basic auth with user and password + :param timeout: second + :param obj: callback object, support list/dict/object + :return: + """ + try: + if method.lower() not in ["get", "post", "put", "delete"]: + return -1 + if querystring: + url = "{}?{}".format(url, urlencode(querystring)) + func = getattr(requests, method.lower()) + if body: + if auth: + rs = func(url, json=body, timeout=timeout, auth=HTTPBasicAuth(auth["user"], auth["password"])) + else: + rs = func(url, json=body, timeout=timeout) + else: + if auth: + rs = func(url, timeout=timeout, auth=HTTPBasicAuth(auth["user"], auth["password"])) + else: + rs = func(url, timeout=timeout) + if rs.status_code not in [requests.codes.ok, requests.codes.created, requests.codes.no_content]: + return 1 + # return response + if obj is not None: + if isinstance(obj, list): + obj.extend(rs.json()) + elif isinstance(obj, dict): + obj.update(rs.json()) + elif callable(obj): + obj(rs) + elif hasattr(obj, "cb"): + getattr(obj, "cb")(rs.json()) + return 0 + except requests.exceptions.SSLError as e: + return -2 + except requests.exceptions.Timeout as e: + return 2 + except requests.exceptions.RequestException as e: + return 3 + +class JenkinsProxy(object): + """ + Jenkins 代理,实现jenkins一些操作 + """ + + def __init__(self, base_url, username, token, timeout=10): + """ + + :param base_url: + :param username: 用户名 + :param token: + :param timeout: + """ + self._username = username + self._token = token + self._timeout = timeout + self._jenkins = jenkins.Jenkins(base_url, username=username, password=token, timeout=timeout) + + def get_job_info(self, job_path): + """ + 获取任务信息 + :param job_path: job路径 + :return: None if job_path not exist + """ + try: + return self._jenkins.get_job_info(job_path) + except jenkins.JenkinsException as e: + return None + + @classmethod + def get_job_path_from_job_url(cls, job_url): + """ + 从url中解析job路径 + :param job_url: 当前工程url, for example https://domain/job/A/job/B/job/C + :return: for example, A/B/C + """ + jenkins_first_level_dir_index = 2 + jenkins_dir_interval_with_level = 2 + job_path = re.sub(r"/$", "", job_url) + job_path = re.sub(r"http[s]?://", "", job_path) + sp = job_path.split("/")[jenkins_first_level_dir_index:: + jenkins_dir_interval_with_level] + sp = [item for item in sp if item != ""] + job_path = "/".join(sp) + return job_path + + @staticmethod + def get_job_path_build_no_from_build_url(build_url): + """ + 从url中解析job路径 + :param build_url: 当前构建url, for example https://domain/job/A/job/B/job/C/number/ + :return: for example A/B/C/number + """ + job_build_no = re.sub(r"/$", "", build_url) + job_url = os.path.dirname(job_build_no) + build_no = os.path.basename(job_build_no) + job_path = JenkinsProxy.get_job_path_from_job_url(job_url) + return job_path, build_no + if __name__ == "__main__": res = git_repo_src("https://gitee.com/src-openeuler/zip", "xxxxx", "xxxxx") diff --git a/core/check_release_management.py b/core/check_release_management.py index 8cdff6273aa1564352e4b6f44ff94c91b947d20d..8c5def2cc159d7ca1e41c30aa8b0ff1655763616 100644 --- a/core/check_release_management.py +++ b/core/check_release_management.py @@ -17,6 +17,7 @@ check the software package for the corresponding project of thecorresponding branch of source """ import os +import re import sys import yaml import requests @@ -864,15 +865,16 @@ class CheckReleaseManagement(object): ''' log.info("internal move pkgs check") error_flag = False + internal_move_pkgs = {} for branch,new_msgs in new_msg.items(): if old_msg.get(branch, []): temp_new = {} temp_old = {} old_msgs = old_msg[branch] for new_pkg in new_msgs: - temp_new[new_pkg['name']] = {'obs_to':new_pkg['obs_to'],'obs_from':new_pkg['obs_from'],'source_dir':new_pkg['source_dir'],'destination_dir':new_pkg['destination_dir']} + temp_new[new_pkg['name']] = {'name':new_pkg['name'],'obs_to':new_pkg['obs_to'],'obs_from':new_pkg['obs_from'],'source_dir':new_pkg['source_dir'],'destination_dir':new_pkg['destination_dir']} for old_pkg in old_msgs: - temp_old[old_pkg['name']] = {'obs_to':old_pkg['obs_to'],'obs_from':old_pkg['obs_from'],'source_dir':old_pkg['source_dir'],'destination_dir':old_pkg['destination_dir']} + temp_old[old_pkg['name']] = {'name':old_pkg['name'],'obs_to':old_pkg['obs_to'],'obs_from':old_pkg['obs_from'],'source_dir':old_pkg['source_dir'],'destination_dir':old_pkg['destination_dir']} for pkgname,obsinfo in temp_new.items(): if temp_old.get(pkgname,''): old_obsto = temp_old[pkgname]['obs_to'] @@ -886,9 +888,55 @@ class CheckReleaseManagement(object): error_flag = True log.error("{}:{}".format(pkgname, obsinfo)) log.error("internal move pkg:{} source_dir must same with destination_dir and obs_from must same with before obs_to".format(pkgname)) + else: + if internal_move_pkgs.get(branch, []): + internal_move_pkgs[branch].append(obsinfo) + else: + internal_move_pkgs[branch] = [obsinfo] + if internal_move_pkgs: + self._check_move_pkg_depends(internal_move_pkgs) if error_flag: raise SystemExit("ERROR: Please check your PR") + def rpm_name(self, rpm): + """ + :param rpm:complete rpm name + :return:only rpm name + """ + m = re.match(r"^(.+)-.+-.+", rpm) + if m: + return m.group(1) + else: + return rpm + + def _check_move_pkg_depends(self, add_infos): + """ + check move pkgs depends pkgs + """ + for branch,pkgs in add_infos.items(): + log.info('check branch:{} pkgs depends check running...'.format(branch)) + for pkg in pkgs: + if pkg['obs_from'] and 'Multi-Version' not in branch: + project = pkg['obs_from'] + self.get_pkg_depends(project, pkg) + + def get_pkg_depends(self, project, pkg): + """ + :param project:obs project name + :param pkgs: internal move pkgs name + :return none + """ + packages = list(set(os.popen("osc list {}".format(project)).read().split("\n")) - set([])) + pkg = pkg['name'] + repo_name = project.replace(':','-') + cmd = 'dnf repoquery --repo {} --whatdepends {}'.format(repo_name, pkg) + rpm_depends = [x.strip() for x in list(set((os.popen(cmd)).read().split("\n"))) if x.strip() != ''] + log.info("depend pkgs: {}".format(rpm_depends)) + rpm_depends_name = list(map(self.rpm_name, rpm_depends)) + require_rpms = list(set(rpm_depends_name).intersection(set(packages))) + if require_rpms: + log.warning("as follow pkgs {} in project {} are require by pkg {}!!!".format(require_rpms, project, pkg)) + def _get_new_version_yaml_msg(self, yaml_path_list, manage_path,vtype='master'): ''' get new version yaml msg content