From 6bbbe3148a7914228705f3a98912870508c38b2a Mon Sep 17 00:00:00 2001 From: rabbitali Date: Sat, 6 Jul 2024 16:00:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=E6=BC=8F=E6=B4=9E=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ceres/cli/apollo.py | 15 +- .../manages/vulnerability_manage/__init__.py | 12 + .../vulnerability_manage/fix_cve_manage.py | 434 +++++++++++ .../remove_hotpatch_manage.py | 138 ++++ .../rollback_manage.py | 35 +- .../scan_cve_vulnerability.py} | 732 ++---------------- .../vulnerability_manage/set_repo_manage.py | 98 +++ .../test_vulnerability_manage/__init__.py | 12 + .../test_set_repo.py | 90 +++ .../test_vulnerability_manage.py | 0 ceres/tests/test_cli/__init__.py | 12 + ceres/tests/test_cli/test_collect.py | 12 + ceres/tests/test_cli/test_plugin.py | 13 + 13 files changed, 921 insertions(+), 682 deletions(-) create mode 100644 ceres/manages/vulnerability_manage/__init__.py create mode 100644 ceres/manages/vulnerability_manage/fix_cve_manage.py create mode 100644 ceres/manages/vulnerability_manage/remove_hotpatch_manage.py rename ceres/manages/{ => vulnerability_manage}/rollback_manage.py (93%) rename ceres/manages/{vulnerability_manage.py => vulnerability_manage/scan_cve_vulnerability.py} (40%) create mode 100644 ceres/manages/vulnerability_manage/set_repo_manage.py create mode 100644 ceres/tests/manages/test_vulnerability_manage/__init__.py create mode 100644 ceres/tests/manages/test_vulnerability_manage/test_set_repo.py rename ceres/tests/manages/{ => test_vulnerability_manage}/test_vulnerability_manage.py (100%) diff --git a/ceres/cli/apollo.py b/ceres/cli/apollo.py index d547a14..0fc2315 100644 --- a/ceres/cli/apollo.py +++ b/ceres/cli/apollo.py @@ -25,8 +25,11 @@ from ceres.function.schema import ( from ceres.function.status import StatusCode from ceres.function.util import validate_data from ceres.manages.collect_manage import Collect -from ceres.manages.rollback_manage import RollbackManage -from ceres.manages.vulnerability_manage import VulnerabilityManage +from ceres.manages.vulnerability_manage.rollback_manage import RollbackManage +from ceres.manages.vulnerability_manage.set_repo_manage import SetRepoManage +from ceres.manages.vulnerability_manage.fix_cve_manage import CveFixManage +from ceres.manages.vulnerability_manage.scan_cve_vulnerability import CveScanManage +from ceres.manages.vulnerability_manage.remove_hotpatch_manage import HotpatchRemoveManage class VulnerabilityCommand(BaseCommand): @@ -79,7 +82,7 @@ class VulnerabilityCommand(BaseCommand): if not result: sys.exit(1) - res = StatusCode.make_response_body(VulnerabilityManage().repo_set(data)) + res = StatusCode.make_response_body(SetRepoManage.set_repo(data)) print(json.dumps(res)) @staticmethod @@ -91,7 +94,7 @@ class VulnerabilityCommand(BaseCommand): if not result: sys.exit(1) kernel = data.get("kernel", True) - _, cve_scan_info = VulnerabilityManage().cve_scan(data) + _, cve_scan_info = CveScanManage().cve_scan(data) kernel_check, _ = PreCheck.kernel_consistency_check() print( json.dumps( @@ -114,7 +117,7 @@ class VulnerabilityCommand(BaseCommand): result, data = validate_data(arguments, CVE_FIX_SCHEMA) if not result: sys.exit(1) - cve_fix_result = VulnerabilityManage().cve_fix(data) + cve_fix_result = CveFixManage.cve_fix(data) print(json.dumps(cve_fix_result)) @staticmethod @@ -125,7 +128,7 @@ class VulnerabilityCommand(BaseCommand): result, data = validate_data(arguments, REMOVE_HOTPATCH_SCHEMA) if not result: sys.exit(1) - print(json.dumps(VulnerabilityManage().remove_hotpatch(data.get("cves")))) + print(json.dumps(HotpatchRemoveManage.remove_hotpatch(data.get("cves")))) @staticmethod def rollback_handle(arguments): diff --git a/ceres/manages/vulnerability_manage/__init__.py b/ceres/manages/vulnerability_manage/__init__.py new file mode 100644 index 0000000..41673ec --- /dev/null +++ b/ceres/manages/vulnerability_manage/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ diff --git a/ceres/manages/vulnerability_manage/fix_cve_manage.py b/ceres/manages/vulnerability_manage/fix_cve_manage.py new file mode 100644 index 0000000..7975082 --- /dev/null +++ b/ceres/manages/vulnerability_manage/fix_cve_manage.py @@ -0,0 +1,434 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ +import re +import os +from collections import defaultdict +from typing import Tuple, List, Set, Optional + +from ceres.conf.constant import CommandExitCode, TaskExecuteRes +from ceres.function.log import LOGGER +from ceres.function.util import execute_shell_command +from ceres.function.check import PreCheck +from ceres.function.status import SUCCESS, COMMAND_EXEC_ERROR + + +class CveFixTaskType: + HOTPATCH = "hotpatch" + COLDPATCH = "coldpatch" + + +class CveFixManage: + def cve_fix(self, task_info: dict) -> dict: + """ + fix cves by upgrading packages + + Args: + task_info(dict): cve info which need to fix and check_items,e.g. + { + "fix_type": "coldpatch", + "check_items": [], + "rpms": [ + { + "installed_rpm": "xxxxx", + "available_rpm": "unzip-6.0-50.oe2203.x86_64", + } + ], + "accepted": False, + } + + Returns: + dict: cve fix result e.g + { + "check_items":[ + { + "item":"network", + "result":true, + "log":"xxxx" + } + ], + "rpms":[ + { + "installed_rpm":"kernel-4.19xxx", + "result": "succeed", + "log": "fix succeed" + } + ], + "dnf_event_start": 1, + "dnf_event_end": 5, + "status": succeed + } + """ + result = {} + + rpms = [rpm.get("available_rpm") for rpm in task_info["rpms"]] + check_result, items_check_log = PreCheck.execute_check(task_info["check_items"]) + result["check_items"] = items_check_log + if not check_result: + LOGGER.warning("The pre-check is failed before execute command!") + result["rpms"] = [ + { + "available_rpm": rpm, + "result": TaskExecuteRes.FAIL, + "log": "pre-check items check failed", + } + for rpm in rpms + ] + result["status"] = TaskExecuteRes.FAIL + return result + + if task_info["fix_type"] == CveFixTaskType.COLDPATCH: + result["status"], result["rpms"] = self._update_coldpatch_by_dnf_plugin(rpms) + else: + # The implementation of the hotpatch upgrade and rollback plan relies on the dnf transaction, + # so the dnf transaction ID information needs to be returned after the repair is completed. + result["dnf_event_start"] = self._query_latest_dnf_transaction_id() + result["status"], result["rpms"], transaction_count = self._update_hotpatch_by_dnf_plugin( + rpms, task_info["accepted"] + ) + result["dnf_event_end"] = self._query_latest_dnf_transaction_id() + if result["dnf_event_end"] - result["dnf_event_start"] != transaction_count: + result["dnf_event_start"] = result["dnf_event_end"] = None + return result + + def _update_coldpatch_by_dnf_plugin(self, rpms: List[str]) -> Tuple[str, list]: + """ + update rpm of list and return their upgrade log + + Args: + rpms(list): List of packages that need to be upgraded + + Returns: + Tuple[str, List[dict]] + a tuple containing two elements (update result, Information about each package upgrade log). + """ + + def gen_fail_result(rpms: List[str], log: str): + return [ + { + "available_rpm": rpm, + "result": TaskExecuteRes.FAIL, + "log": log, + } + for rpm in rpms + ] + + status, fixable_cve_info = self._query_fixable_cve_info() + if status != SUCCESS: + return TaskExecuteRes.FAIL, gen_fail_result( + rpms, "Execution of CVE comparison failed due to failure to query fixable CVE information." + ) + status, fixed_cve_info = self._query_fixed_cve_info_by_hotpatch() + if status != SUCCESS: + return TaskExecuteRes.FAIL, gen_fail_result( + rpms, "Execution of CVE comparison failed due to failure to query fixed CVE information." + ) + + final_fix_result, package_update_info = TaskExecuteRes.SUCCEED, [] + for rpm in rpms: + rpm_fix_info = {"available_rpm": rpm, "result": TaskExecuteRes.SUCCEED, "log": ""} + compare_result, log = self.compare_cve(rpm, fixable_cve_info, fixed_cve_info) + if compare_result: + rpm_fix_info["result"], rpm_fix_info["log"] = self.__update_coldpatch(rpm) + else: + rpm_fix_info["result"] = TaskExecuteRes.FAIL + rpm_fix_info["log"] = log + + if rpm_fix_info["result"] == TaskExecuteRes.FAIL: + final_fix_result = TaskExecuteRes.FAIL + + package_update_info.append(rpm_fix_info) + return final_fix_result, package_update_info + + def __update_coldpatch(self, rpm: str) -> Tuple[str, str]: + """ + upgrade rpm by dnf plugin (coldpatch) + + Args: + rpm(str): package that need to be upgraded + + Returns: + Tuple[str, str] + a tuple containing two elements (upgrade result, package upgrade log). + """ + code, stdout, stderr = execute_shell_command([f"dnf upgrade-en {rpm} -y"]) + if code != CommandExitCode.SUCCEED: + LOGGER.error(stderr) + return TaskExecuteRes.FAIL, stdout + stderr + elif rpm.rsplit("-", 2)[0] == "kernel": + if not self.set_default_grub_kernel_version(rpm): + return TaskExecuteRes.FAIL, stdout + stderr + "\nerror: set default kernel failed!" + return TaskExecuteRes.SUCCEED, stdout + stderr + + def _update_hotpatch_by_dnf_plugin(self, rpms: List[str], accepted: bool) -> Tuple[str, list, int]: + """ + upgrade rpm by dnf plugin (hotpatch) + + Args: + rpms(list): List of packages that need to be upgraded + + Returns: + Tuple[str, List[dict], int] + a tuple containing three elements (update result, Information about each package upgrade, upgrade count). + """ + upgrade_count = 0 + check_result, check_log = PreCheck.kernel_consistency_check() + if not check_result: + return ( + TaskExecuteRes.FAIL, + [ + { + "available_rpm": rpm, + "result": TaskExecuteRes.FAIL, + "log": f"kernel consistency check failed\n{check_log}", + } + for rpm in rpms + ], + upgrade_count, + ) + + final_fix_result, package_update_info = TaskExecuteRes.SUCCEED, [] + + for rpm in rpms: + code, stdout, stderr = execute_shell_command([f"dnf hotupgrade {rpm} -y"]) + tmp = { + "available_rpm": rpm, + "result": TaskExecuteRes.SUCCEED, + "log": stdout + stderr, + } + if code != CommandExitCode.SUCCEED or "Apply hot patch succeed" not in stdout: + tmp["result"] = TaskExecuteRes.FAIL + final_fix_result = TaskExecuteRes.FAIL + elif "Nothing to do" not in stdout: + upgrade_count += 1 + + if tmp["result"] == TaskExecuteRes.SUCCEED and accepted: + try: + hotpatch_name = rpm.rsplit(".", 1)[0].split("-", 1)[1] + _, hotpatch_status_set_log = self._set_hotpatch_status_by_dnf_plugin(hotpatch_name, "accept") + tmp["log"] += f"\n\n{hotpatch_status_set_log}" + except IndexError as error: + LOGGER.error(error) + tmp["log"] += f"\n\nhotpatch status set failed due to can't get correct hotpatch name!" + package_update_info.append(tmp) + return final_fix_result, package_update_info, upgrade_count + + @staticmethod + def _query_fixable_cve_info() -> Tuple[str, dict]: + """ + Query the CVEs fixed by the upgradeable version of each package + + Returns: + Tuple[status, dict] + a tuple containing two elements (status code, fixed_cve_info). + + Example: + "Succeed", {"kernel": { + "kernel-5.10.0-60.91.0.115.oe2203.x86_64": ["CVE-2023-1829"], + "kernel-5.10.0-60.91.0.116.oe2203.x86_64": ["CVE-2023-2006"] + }} + """ + code, stdout, stderr = execute_shell_command(["dnf updateinfo list cves"]) + if code != CommandExitCode.SUCCEED: + LOGGER.error("Failed to query update info by dnf!") + LOGGER.error(stderr) + return COMMAND_EXEC_ERROR, defaultdict() + + all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)", stdout) + rpm_update_info = defaultdict(lambda: defaultdict(list)) + for cve_id, _, coldpatch in all_cve_info: + rpm_name = coldpatch.rsplit("-", 2)[0] + rpm_update_info[rpm_name][coldpatch].append(cve_id) + + return SUCCESS, rpm_update_info + + @staticmethod + def _query_fixed_cve_info_by_hotpatch() -> Tuple[str, dict]: + """ + Statistics CVE data that will be fixed by hotpatch + + Returns: + Tuple[status, dict] + a tuple containing two elements (status code, fixed_cve_info). + + Example: + "Succeed", {"kernel": {"CVE-2023-XXXX","CVE-2022-XXXX"}} + """ + code, stdout, stderr = execute_shell_command(["dnf hot-updateinfo list cves --installed"]) + if code != CommandExitCode.SUCCEED: + LOGGER.error("Failed to query fixed cves by hotpatch!") + LOGGER.error(stderr) + return COMMAND_EXEC_ERROR, set() + + hotpatch_fixed_info = defaultdict(set) + all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", stdout) + for cve_id, _, _, hotpatch in all_cve_info: + rpm_name = hotpatch.rsplit("-", 5)[0][6:] + hotpatch_fixed_info[rpm_name].add(cve_id) + + return SUCCESS, hotpatch_fixed_info + + def compare_cve(self, rpm: str, updated_info: dict, hotpatch_fixed_info: dict) -> Tuple[bool, str]: + """ + Determine whether the packages to be upgraded covers the vulnerabilities fixed by the hotpatch + + Args: + rpms(list): List of packages that need to be upgraded + + Returns: + Tuple[bool, str] + a tuple containing two elements (compare result, compare log). + """ + compare_info = dict() + upgraded_packages: set = self._query_upgraded_packages(rpm) + if not upgraded_packages: + return False, "Execution of CVE comparison failed due to failure to query upgraded_packages." + for rpm in upgraded_packages: + fixed_cve_by_coldpatch = set() + rpm_name = rpm.rsplit("-", 2)[0] + + for update_rpm, cve_list in updated_info.get(rpm_name, {}).items(): + if rpm >= update_rpm: + fixed_cve_by_coldpatch.update(cve_list) + cve_difference_set = hotpatch_fixed_info.get(rpm_name, set()) - fixed_cve_by_coldpatch + if cve_difference_set: + compare_info[rpm_name] = cve_difference_set + + if not compare_info.values(): + return True, "" + + log = ( + "After upgrading the package, vulnerabilities in the package or in its dependent software package " + "may be re-exposed. \nHere are some specific vulnerabilities that could potentially re-exposed:\n" + ) + for rpm_name, cve_info in compare_info.items(): + for cve_id in cve_info: + log += f"{rpm_name}\t{cve_id}\n" + return False, log + + @staticmethod + def _query_upgraded_packages(package: str) -> Set[str]: + """ + Resolve all packages to be upgraded and their dependencies, store them in a set and + return it + + Args: + packages(list): List of package that need to be upgraded + + Returns: + set + + """ + package_set = set() + if package.rsplit("-", 2)[0] == "kernel": + package_set.add(package) + return package_set + + # The exit code of the command is 1 when input parameters contains assumeno + _, stdout, _ = execute_shell_command([f"dnf upgrade-en {package} --assumeno"]) + + installed_rpm_info = re.findall(r"(Upgrading|Installing):(.*?)Transaction Summary", stdout, re.S) + if not installed_rpm_info: + return package_set + + installed_rpm_info_list = installed_rpm_info[0][1].strip().split("\n") + for single_rpm_info in installed_rpm_info_list: + # info_list example: + # ['aops-ceres', 'aarch64', 'v1.3.4-5.oe2203sp2', '@commandline', '107 k] + pkg_info_list = re.split(r'\s+', single_rpm_info.strip()) + if len(pkg_info_list) < 5: + break + package_set.add(f"{pkg_info_list[0]}-{pkg_info_list[2]}.{pkg_info_list[1]}") + return package_set + + @staticmethod + def set_default_grub_kernel_version(kernel_rpm_name: str) -> bool: + """ + Set the boot kernel + + Args: + kernel_rpm_name(str): The name of the installed kernel package + + Returns: + bool + """ + boot_kernel_path = os.path.join("/boot/", f"vmlinuz-{kernel_rpm_name[7:]}") + if not os.path.exists(boot_kernel_path): + LOGGER.error("Can't find target kernel in /boot when set default kernel") + return False + + LOGGER.info("The Linux boot kernel is about to be changed") + code, _, stderr = execute_shell_command([f"grubby --set-default={boot_kernel_path}"]) + + if code != CommandExitCode.SUCCEED: + LOGGER.info("The Linux boot kernel change failed") + LOGGER.error(stderr) + return False + LOGGER.info("The Linux boot kernel change successful") + return True + + @staticmethod + def _query_latest_dnf_transaction_id() -> Optional[int]: + """Query latest yum transaction id + + Returns: + int + """ + # Example of command execution result: + # [root@localhost ~]# dnf history + # ID | Command line | Date and time | Action(s) | Altered + # --------------------------------------------------------------------- + # 3 | rm aops-ceres | 2023-11-30 09:57 | Removed | 1 + # 2 | install gcc | 2023-11-30 09:57 | Install | 1 + code, stdout, stderr = execute_shell_command( + ["dnf history", "grep -E '^\s*[0-9]+'", "head -1", "awk '{print $1}'"] + ) + if code != CommandExitCode.SUCCEED: + LOGGER.error(stderr) + return None + + return int(stdout) + + @staticmethod + def _set_hotpatch_status_by_dnf_plugin(hotpatch: str, operation: str) -> Tuple[bool, str]: + """ + change hotpatch status by dnf plugin + + Args: + hotpatch(str): hotpatch name which you want to change its status + operation(str): the action that needs to be performed on this hot patch. + supported actions: apply,deactive,remove,active,accept + Returns: + Tuple[bool, str] + a tuple containing two elements (operation result, operation log). + """ + + # replace -ACC to /ACC or -SGL to /SGL + # Example: kernel-5.10.0-153.12.0.92.oe2203sp2-ACC-1-1 >> kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 + wait_to_remove_patch = re.sub(r'-(ACC|SGL)', r'/\1', hotpatch) + # Example of command execution result: + # Succeed: + # [root@openEuler ~]# dnf hotpatch --remove kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 + # Last metadata expiration check: 3:24:16 ago on Wed 13 Sep 2023 08:16:17 AM CST. + # Gonna remove this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 + # remove hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' succeed + # Fail: + # [root@openEuler ~]# dnf hotpatch --accept kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 + # Last metadata expiration check: 3:25:24 ago on Wed 13 Sep 2023 08:16:17 AM CST. + # Gonna accept this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 + # accept hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' failed, remain original status + code, stdout, stderr = execute_shell_command([f"dnf hotpatch --{operation} {wait_to_remove_patch}"]) + if code != CommandExitCode.SUCCEED or 'failed' in stdout: + LOGGER.error(f"hotpatch {hotpatch} set status failed!") + return False, stdout + stderr + + return True, stdout + stderr diff --git a/ceres/manages/vulnerability_manage/remove_hotpatch_manage.py b/ceres/manages/vulnerability_manage/remove_hotpatch_manage.py new file mode 100644 index 0000000..810e3a1 --- /dev/null +++ b/ceres/manages/vulnerability_manage/remove_hotpatch_manage.py @@ -0,0 +1,138 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ +import re +from collections import defaultdict +from typing import List, Tuple + +from ceres.conf.constant import CommandExitCode, TaskExecuteRes +from ceres.function.log import LOGGER +from ceres.function.util import execute_shell_command + + +class HotpatchRemoveManage: + def remove_hotpatch(self, cves: List[str]) -> dict: + """ + remove hotpatch + + Args: + cves(list): List of CVE IDs fixed by hotpatch,e.g. + ["CVE-XXXX-XXXX"] + + Returns: + dict e.g + { + "status": "fail/succeed", + "cves": [{ + "cve_id": cve, + "result": "succeed", + "log": "rollback succeed" + }] + } + """ + hotpatch_list = self._hotpatch_list_cve() + if not hotpatch_list: + return { + "status": TaskExecuteRes.FAIL, + "cves": [ + dict(cve_id=cve, log="No valid hotpatch is matched.", result=TaskExecuteRes.FAIL) for cve in cves + ], + } + + wait_to_remove_patch = set() + for cve in cves: + wait_to_remove_patch = wait_to_remove_patch.union(hotpatch_list.get(cve, set())) + + hotpatch_remove_res = {} + for patch in set(wait_to_remove_patch): + remove_result, log = self._hotpatch_remove(patch) + hotpatch_remove_res[patch] = { + "result": TaskExecuteRes.SUCCEED if remove_result else TaskExecuteRes.FAIL, + "log": log, + } + + cve_hotpatch_remove_result = [] + + for cve in cves: + if cve not in hotpatch_list: + fail_result = { + "cve_id": cve, + "log": "No valid hot patch is matched.", + "result": TaskExecuteRes.FAIL, + } + cve_hotpatch_remove_result.append(fail_result) + else: + tmp_result_list = [] + tmp_log = [] + + for patch in hotpatch_list.get(cve): + tmp_result_list.append(hotpatch_remove_res[patch]["result"] == TaskExecuteRes.SUCCEED) + tmp_log.append(hotpatch_remove_res[patch]["log"]) + + cve_hotpatch_remove_result.append( + { + "cve_id": cve, + "log": "\n".join(tmp_log), + "result": TaskExecuteRes.SUCCEED if all(tmp_result_list) else TaskExecuteRes.FAIL, + } + ) + + return {"status": TaskExecuteRes.SUCCEED, "cves": cve_hotpatch_remove_result} + + @staticmethod + def _hotpatch_list_cve() -> dict: + """ + Run the dnf hotpatch list cve command to query the hotpatch list corresponding to the cve + + Returns: + dict: e.g + { + "CVE-XXXX-XXX": {"patch 1", "patch 2"} + } + """ + code, stdout, _ = execute_shell_command([f"dnf hot-updateinfo list cves --installed", "grep patch"]) + if code != CommandExitCode.SUCCEED: + LOGGER.error(f"Failed to query the hotpatch list.") + return None + + all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", stdout) + if not all_cve_info: + LOGGER.error(f"Failed to query the hotpatch list.") + return None + + applied_hotpatch_info = {} + hotpatch_dic = {} + for cve_id, _, _, hotpatch in all_cve_info: + applied_hotpatch_info[cve_id] = hotpatch + hotpatch_dic_key = hotpatch.rsplit("-", 2)[0] + if hotpatch_dic_key.endswith("ACC"): + hotpatch_dic[hotpatch_dic_key] = max(hotpatch, hotpatch_dic.get(hotpatch_dic_key, hotpatch)) + + for cve_id, cmd_output_hotpatch in applied_hotpatch_info.items(): + applied_hotpatch_info[cve_id] = hotpatch_dic.get(cmd_output_hotpatch.rsplit("-", 2)[0], cmd_output_hotpatch) + + hotpatch_list = defaultdict(set) + for cve_id, hotpatch in applied_hotpatch_info.items(): + hotpatch_list[cve_id].add(hotpatch) + + return hotpatch_list + + def _hotpatch_remove(self, hotpatch: str) -> Tuple[bool, str]: + """ + remove hotpatch package + + Args: + hotpatch: hotpatch package which needs to remove + """ + cmd = [f"dnf remove {hotpatch} -y"] + _, stdout, stderr = execute_shell_command(cmd) + return True, f"Command:{cmd}\n\n{stdout}\n{stderr}\n" diff --git a/ceres/manages/rollback_manage.py b/ceres/manages/vulnerability_manage/rollback_manage.py similarity index 93% rename from ceres/manages/rollback_manage.py rename to ceres/manages/vulnerability_manage/rollback_manage.py index 090c0bf..2869f95 100644 --- a/ceres/manages/rollback_manage.py +++ b/ceres/manages/vulnerability_manage/rollback_manage.py @@ -18,7 +18,6 @@ from ceres.conf.constant import CommandExitCode, CveFixTaskType, TaskExecuteRes from ceres.function.check import PreCheck from ceres.function.log import LOGGER from ceres.function.util import execute_shell_command -from ceres.manages.vulnerability_manage import VulnerabilityManage class RollbackManage: @@ -28,6 +27,28 @@ class RollbackManage: BOOT_FILE = "/boot/vmlinuz-%s" + @staticmethod + def _query_latest_dnf_transaction_id() -> Optional[int]: + """Query latest yum transaction id + + Returns: + int + """ + # Example of command execution result: + # [root@localhost ~]# dnf history + # ID | Command line | Date and time | Action(s) | Altered + # --------------------------------------------------------------------- + # 3 | rm aops-ceres | 2023-11-30 09:57 | Removed | 1 + # 2 | install gcc | 2023-11-30 09:57 | Install | 1 + code, stdout, stderr = execute_shell_command( + ["dnf history", "grep -E '^\s*[0-9]+'", "head -1", "awk '{print $1}'"] + ) + if code != CommandExitCode.SUCCEED: + LOGGER.error(stderr) + return None + + return int(stdout) + def rollback(self, rollback_info: dict) -> dict: """ Rollback for hotpatch/coldpatch transaction. @@ -110,7 +131,7 @@ class RollbackManage: return self._rollback_for_hotpatch(dnf_event_start) elif rollback_type == CveFixTaskType.COLDPATCH: - check_result, check_log = self._check_if_rpm_str_vaild(installed_rpm, target_rpm) + check_result, check_log = self._check_if_rpm_str_valid(installed_rpm, target_rpm) if check_result != TaskExecuteRes.SUCCEED: return TaskExecuteRes.FAIL, check_log return self._rollback_for_coldpatch(installed_rpm, target_rpm) @@ -121,7 +142,7 @@ class RollbackManage: self, dnf_event_start: Optional[int] = None, dnf_event_end: Optional[int] = None ) -> Tuple[str, str]: """ - Check if the dnf_event_start id and the dnf_event_end id are vaild. + Check if the dnf_event_start id and the dnf_event_end id are valid. Args: dnf_event_start(int): dnf event start transaction-id @@ -138,8 +159,8 @@ class RollbackManage: LOGGER.error(tmp_log) return TaskExecuteRes.FAIL, tmp_log - if dnf_event_end != VulnerabilityManage._query_latest_dnf_transaction_id(): - tmp_log = f"Not the last executed dnf trasnaction, failed to process rollback operation." + if dnf_event_end != self._query_latest_dnf_transaction_id(): + tmp_log = f"Not the last executed dnf transaction, failed to process rollback operation." LOGGER.error(tmp_log) return TaskExecuteRes.FAIL, tmp_log @@ -173,9 +194,9 @@ class RollbackManage: return TaskExecuteRes.SUCCEED, f"Command:{cmd}{os.linesep}{stdout}{os.linesep}" - def _check_if_rpm_str_vaild(self, installed_rpm: str, target_rpm: str) -> Tuple[str, str]: + def _check_if_rpm_str_valid(self, installed_rpm: str, target_rpm: str) -> Tuple[str, str]: """ - Check if the rpm str is vaild. + Check if the rpm str is valid. Args: installed_rpm(str): the installed kernel in executed fix task diff --git a/ceres/manages/vulnerability_manage.py b/ceres/manages/vulnerability_manage/scan_cve_vulnerability.py similarity index 40% rename from ceres/manages/vulnerability_manage.py rename to ceres/manages/vulnerability_manage/scan_cve_vulnerability.py index a1cde50..81ef0e3 100644 --- a/ceres/manages/vulnerability_manage.py +++ b/ceres/manages/vulnerability_manage/scan_cve_vulnerability.py @@ -10,94 +10,25 @@ # PURPOSE. # See the Mulan PSL v2 for more details. # ******************************************************************************/ -import os import re from collections import defaultdict -from typing import Dict, List, Tuple, Optional, Set +from typing import Tuple, Dict -from ceres.conf.constant import CommandExitCode, TaskExecuteRes, CveFixTaskType -from ceres.function.check import PreCheck +from ceres.conf.constant import CommandExitCode from ceres.function.log import LOGGER -from ceres.function.status import * from ceres.function.util import execute_shell_command +from ceres.function.check import PreCheck +from ceres.function.status import PRE_CHECK_ERROR, SUCCESS from ceres.manages.collect_manage import Collect +__all__ = ["CveScanManage"] -class VulnerabilityManage: - def repo_set(self, data: dict) -> int: - """ - Save the repo source to local, and do simple verification. - - Args: - data (dict): e.g - { - "repo_info": { - "name": "string", - "dest": "save location", - "repo_content": "repo source info" - }, - "check_items": ["string"], - "check": false - } - - Returns: - int: status code - """ - repo_path = data.get("repo_info").get("dest") - if re.match(r"/etc/yum.repos.d/[\w-]+.repo$", repo_path) is None: - LOGGER.debug('Incorrect repo save path.') - return PARAM_ERROR - - content = data.get("repo_info").get("repo_content") - repo_id_list = re.findall(r'\[([^\]]+)\]', re.sub(r'^\s*#.*$', '', content, flags=re.MULTILINE)) - if not repo_id_list: - LOGGER.warning("Failed to extract repo id information, please check the repo content.") - return REPO_CONTENT_INCORRECT - - with open(repo_path, 'w', encoding='utf8') as repo_file: - repo_file.write(content) - LOGGER.info(f'Repo source {data.get("repo_info").get("name")} has been saved to {repo_path}.') - - if self._validate_repo_source(repo_id_list): - LOGGER.info('Repo source set succeed.') - return SUCCESS - os.remove(repo_path) - LOGGER.warning("Repo source can't be used, it has been deleted.") - return REPO_CONTENT_INCORRECT - - @staticmethod - def _validate_repo_source(repo_id_list: List[str]) -> bool: - """ - A sample validate which repo can used by yum. - - Args: - repo_id(list): repo id list - - Returns: - bool - """ - - def query_repo_info(repo_id: str) -> bool: - """ - Verify the validity of the repo source by querying the repo source information. - - Args: - repo_id(str) - Returns: - bool - """ - code, _, stderr = execute_shell_command([f"dnf repoinfo --repo {repo_id}"]) - if code == CommandExitCode.SUCCEED: - return True - LOGGER.warning(f"Failed to query repo information with repo id {repo_id}.") - LOGGER.warning(stderr) - return False - - validate_result = True - for repo_id in repo_id_list: - validate_result = validate_result and query_repo_info(repo_id) - return validate_result +class CveScanManage: + def __init__(self) -> None: + self.kernel_filter = None + self.installed_rpm_info = None + self.available_hotpatch_key_set = set() def cve_scan(self, cve_scan_args: dict) -> Tuple[str, dict]: """ @@ -142,34 +73,29 @@ class VulnerabilityManage: } """ cve_scan_result = {} - check_result, items_check_log = PreCheck.execute_check(cve_scan_args.get("check_items")) - kernel = cve_scan_args.get("kernel", True) cve_scan_result["check_items"] = items_check_log if not check_result: LOGGER.info("The pre-check is failed before execute command!") return PRE_CHECK_ERROR, cve_scan_result - self.installed_rpm_info = self._query_installed_rpm(kernel) + self.installed_rpm_info = self._query_installed_rpm() self.available_hotpatch_key_set = set() + self.kernel_filter = cve_scan_args.get("kernel") cve_scan_result.update( { "check_items": items_check_log, - "unfixed_cves": self._query_unfixed_cves_by_dnf_plugin(kernel) or self._query_unfixed_cves_by_dnf(kernel), - "fixed_cves": self._query_fixed_cves_by_dnf_plugin(kernel) or self._query_fixed_cves_by_dnf(kernel), + "unfixed_cves": self._query_unfixed_cves_by_dnf_plugin(), + "fixed_cves": self._query_fixed_cves_by_dnf_plugin(), } ) return SUCCESS, cve_scan_result - @staticmethod - def _query_installed_rpm(kernel=True): + def _query_installed_rpm(self): """ query installed rpm package info - Args: - kernel(bool): only for kernel package filtering - Returns: dict: all rpm info. e.g { @@ -184,10 +110,11 @@ class VulnerabilityManage: # perl-Net-SSLeay:perl-Net-SSLeay-1.88-5.oe1.x86_64 # powertop:powertop-2.9-12.oe1.x86_64 # libusbx:libusbx-1.0.23-1.oe1.x86_64 - command = ["rpm -qa --queryformat '%{NAME}:%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}\n'"] - if kernel: - command.append("grep kernel") - code, stdout, _ = execute_shell_command(command) + commands = ["rpm -qa --queryformat '%{NAME}:%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}\n'"] + if self.kernel_filter: + commands.append("grep kernel") + + code, stdout, _ = execute_shell_command(commands) if code != CommandExitCode.SUCCEED or not stdout: LOGGER.error("query installed packages info failed!") return rpm_info_dict @@ -203,13 +130,10 @@ class VulnerabilityManage: LOGGER.debug("query installed rpm package info succeed!") return rpm_info_dict - def _query_unfixed_cves_by_dnf(self, kernel=True) -> list: + def _query_unfixed_cves_by_dnf(self) -> list: """ parse unfixed kernel vulnerability info by dnf (coldpatch) - Args: - kernel(bool): only for kernel package filtering - Return: str: command execute result list: cve info e.g @@ -228,10 +152,11 @@ class VulnerabilityManage: # CVE-2021-45469 Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64 # CVE-2021-44733 Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64 unfixed_cves = [] - command = ["dnf updateinfo list cves"] - if kernel: - command.append("grep kernel") - code, stdout, stderr = execute_shell_command(command) + commands = ["dnf updateinfo list cves"] + if self.kernel_filter: + commands.append("grep kernel") + code, stdout, stderr = execute_shell_command(commands) + if code != CommandExitCode.SUCCEED: LOGGER.error("query unfixed cve info failed by dnf!") LOGGER.error(stderr) @@ -242,10 +167,9 @@ class VulnerabilityManage: # ("CVE-2021-43976", "Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64"), # ("CVE-2021-0941", "Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64") # ] - if kernel: + pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)" + if self.kernel_filter: pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+)" - else: - pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)" all_cve_info = re.findall(pattern, stdout) if not all_cve_info: return unfixed_cves @@ -262,13 +186,10 @@ class VulnerabilityManage: ) return unfixed_cves - def _query_unfixed_cves_by_dnf_plugin(self, kernel=True) -> list: + def _query_unfixed_cves_by_dnf_plugin(self) -> list: """ parse unfixed kernel vulnerability info by dnf hotpatch plugin (hotpatch and coldpatch) - Args: - kernel(bool): only for kernel package filtering - Return: str: command execute result list: cve info e.g @@ -303,10 +224,10 @@ class VulnerabilityManage: # CVE-2021-42574 Important/Sec. binutils-2.34-19.oe1.x86_64 - # CVE-2023-1513 Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64 patch-kernel-4.19.90-2112... cve_info_list = [] - command = ["dnf hot-updateinfo list cves"] - if kernel: - command.append("grep kernel") - code, stdout, stderr = execute_shell_command(command) + commands = ["dnf hot-updateinfo list cves"] + if self.kernel_filter: + commands.append("grep kernel") + code, stdout, stderr = execute_shell_command(commands) if code != CommandExitCode.SUCCEED: LOGGER.error("query unfixed cve info failed by dnf!") LOGGER.error(stderr) @@ -317,10 +238,9 @@ class VulnerabilityManage: # ("CVE-2023-1513", "Important/Sec.", "kernel-4.19.90-2304.1.0.0196.oe1.x86_64", "patch-kernel-4.19.90-2112.."), # ("CVE-2021-xxxx", "Important/Sec.", "-", "patch-redis-6.2.5-1-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64") # ] - if kernel: + pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(\S+|-)" + if self.kernel_filter: pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+|-)\s+(patch-kernel-\d\S+|-)" - else: - pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch-\d\S+|-)" all_cve_info = re.findall(pattern, stdout) if not all_cve_info: return cve_info_list @@ -353,13 +273,10 @@ class VulnerabilityManage: return cve_info_list - def _query_fixed_cves_by_dnf(self, kernel=True) -> list: + def _query_fixed_cves_by_dnf(self) -> list: """ parse the fixed kernel vulnerability info by dnf - Args: - kernel(bool): only for kernel package filtering - Return: str: command execute result list: cve info e.g @@ -379,10 +296,12 @@ class VulnerabilityManage: if not current_kernel_version: return fixed_cves current_kernel_rpm_name = f"kernel-{current_kernel_version}" - command = ["dnf updateinfo list cves --installed"] - if kernel: - command.append("grep kernel") - code, stdout, stderr = execute_shell_command(command) + + commands = ["dnf updateinfo list cves --installed"] + if self.kernel_filter: + commands.append("grep kernel") + code, stdout, stderr = execute_shell_command(commands) + if code != CommandExitCode.SUCCEED: LOGGER.error("query fixed cve info failed!") LOGGER.error(stderr) @@ -393,10 +312,10 @@ class VulnerabilityManage: # ("CVE-2021-43976","Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64"), # ("CVE-2021-0941","Important/Sec.", "kernel-4.19.90-2201.1.0.0132.oe1.x86_64") # ] - if kernel: - pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+)" - else: + pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+)" + if self.kernel_filter: pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)" + fixed_cves_info = re.findall(pattern, stdout) if not fixed_cves_info: @@ -404,23 +323,14 @@ class VulnerabilityManage: for cve_id, _, coldpatch in fixed_cves_info: install_rpm = self.installed_rpm_info.get(coldpatch.rsplit("-", 2)[0]) - if (kernel and coldpatch <= current_kernel_rpm_name) or coldpatch.rsplit(".", 2)[0] <= install_rpm.rsplit(".", 2)[0]: - fixed_cves.append( - { - "cve_id": cve_id, - "installed_rpm": install_rpm, - "fix_way": "coldpatch", - } - ) + if coldpatch <= current_kernel_rpm_name or coldpatch.rsplit(".", 2)[0] <= install_rpm.rsplit(".", 2)[0]: + fixed_cves.append({"cve_id": cve_id, "installed_rpm": install_rpm, "fix_way": "coldpatch"}) return fixed_cves - def _query_fixed_cves_by_dnf_plugin(self, kernel=True) -> list: + def _query_fixed_cves_by_dnf_plugin(self) -> list: """ parse the fixed kernel vulnerability info by dnf plugin - Args: - kernel(bool): only for kernel package filtering - Return: list: hotpatch info list. e.g [{"cve_id": "CVE-XXXX-XXXX", "fix_way": "hotpatch", "hp_status": "ACCEPTED", "installed_rpm":"xxxx"}] @@ -435,10 +345,11 @@ class VulnerabilityManage: if not current_kernel_version: return [] current_kernel_rpm_name = f"kernel-{current_kernel_version}" - command = ["dnf hot-updateinfo list cves --installed"] - if kernel: - command.append("grep kernel") - code, stdout, stderr = execute_shell_command(command) + + commands = ["dnf hot-updateinfo list cves --installed"] + if self.kernel_filter: + commands.append("grep kernel") + code, stdout, stderr = execute_shell_command(commands) if code != CommandExitCode.SUCCEED: LOGGER.error("query unfixed cve info failed by dnf!") LOGGER.error(stderr) @@ -450,17 +361,18 @@ class VulnerabilityManage: # ("CVE-2021-xxxx", "Important/Sec.", "-", "patch-redis-6.2.5-1-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64") # ] hotpatch_status = self._query_applied_hotpatch_status() - if kernel: + pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch-\S+|-)" + if self.kernel_filter: pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(kernel-\d\S+|-)\s+(patch-kernel-\d\S+|-)" - else: - pattern = r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch-\d\S+|-)" all_cve_info = re.findall(pattern, stdout) cve_info_fixed_by_coldpatch, cve_info_fixed_by_hotpatch, hotpatch_dic = [], [], defaultdict(str) for cve_id, _, coldpatch, hotpatch in all_cve_info: if hotpatch == "-": installed_rpm = self.installed_rpm_info.get(coldpatch.rsplit("-", 2)[0]) - if (kernel and coldpatch > current_kernel_rpm_name) or coldpatch.rsplit(".", 2)[0] > installed_rpm.rsplit(".", 2)[0]: + if (self.kernel_filter and coldpatch > current_kernel_rpm_name) or ( + coldpatch.rsplit(".", 2)[0] > installed_rpm.rsplit(".", 2)[0] + ): continue cve_info_fixed_by_coldpatch.append( { @@ -522,7 +434,11 @@ class VulnerabilityManage: # ("CVE-2023-1112", "redis-6.2.5-1/SGL_CVE_2023_1111_CVE_2023_1112-1-1/redis-server", "NOT-APPLIED"), # ("CVE-2023-1111", "redis-6.2.5-1/ACC-1-1/redis-benchmark", "ACTIVED") # ] - applied_hotpatch_info_list = re.findall(r"(CVE-\d{4}-\d+)\s+(kernel-\d[\w\-/.]+)\s+([A-W]+)", stdout) + pattern = r"(CVE-\d{4}-\d+)\s+([\w\-/.]+)\s+([A-W]+)" + if self.kernel_filter: + pattern = r"(CVE-\d{4}-\d+)\s+(kernel-\d[\w\-/.]+)\s+([A-W]+)" + + applied_hotpatch_info_list = re.findall(pattern, stdout) if not applied_hotpatch_info_list: return result @@ -544,525 +460,3 @@ class VulnerabilityManage: result[f"patch-{patch_name.rsplit('/',1)[0].replace('/','-')}"] = hotpatch_status record_key_set.add(record_key) return result - - def cve_fix(self, task_info: dict) -> dict: - """ - fix cves by upgrading packages - - Args: - task_info(dict): cve info which need to fix and check_items,e.g. - { - "fix_type": "coldpatch", - "check_items": [], - "rpms": [ - { - "installed_rpm": "xxxxx", - "available_rpm": "unzip-6.0-50.oe2203.x86_64", - } - ], - "accepted": False, - } - - Returns: - dict: cve fix result e.g - { - "check_items":[ - { - "item":"network", - "result":true, - "log":"xxxx" - } - ], - "rpms":[ - { - "installed_rpm":"kernel-4.19xxx", - "result": "succeed", - "log": "fix succeed" - } - ], - "dnf_event_start": 1, - "dnf_event_end": 5, - "status": succeed - } - """ - result = {} - - rpms = [rpm.get("available_rpm") for rpm in task_info["rpms"]] - check_result, items_check_log = PreCheck.execute_check(task_info["check_items"]) - result["check_items"] = items_check_log - if not check_result: - LOGGER.warning("The pre-check is failed before execute command!") - result["rpms"] = [ - { - "available_rpm": rpm, - "result": TaskExecuteRes.FAIL, - "log": "pre-check items check failed", - } - for rpm in rpms - ] - result["status"] = TaskExecuteRes.FAIL - return result - - if task_info["fix_type"] == CveFixTaskType.COLDPATCH: - result["status"], result["rpms"] = self._update_coldpatch_by_dnf_plugin(rpms) - else: - # The implementation of the hotpatch upgrade and rollback plan relies on the dnf transaction, - # so the dnf transaction ID information needs to be returned after the repair is completed. - result["dnf_event_start"] = self._query_latest_dnf_transaction_id() - result["status"], result["rpms"], transaction_count = self._update_hotpatch_by_dnf_plugin( - rpms, task_info["accepted"] - ) - result["dnf_event_end"] = self._query_latest_dnf_transaction_id() - if result["dnf_event_end"] - result["dnf_event_start"] != transaction_count: - result["dnf_event_start"] = result["dnf_event_end"] = None - return result - - def _update_coldpatch_by_dnf_plugin(self, rpms: List[str]) -> Tuple[str, list]: - """ - update rpm of list and return their upgrade log - - Args: - rpms(list): List of packages that need to be upgraded - - Returns: - Tuple[str, List[dict]] - a tuple containing two elements (update result, Information about each package upgrade log). - """ - - def gen_fail_result(rpms: List[str], log: str): - return [ - { - "available_rpm": rpm, - "result": TaskExecuteRes.FAIL, - "log": log, - } - for rpm in rpms - ] - - status, fixable_cve_info = self._query_fixable_cve_info() - if status != SUCCESS: - return TaskExecuteRes.FAIL, gen_fail_result( - rpms, "Execution of CVE comparison failed due to failure to query fixable CVE information." - ) - status, fixed_cve_info = self._query_fixed_cve_info_by_hotpatch() - if status != SUCCESS: - return TaskExecuteRes.FAIL, gen_fail_result( - rpms, "Execution of CVE comparison failed due to failure to query fixed CVE information." - ) - - final_fix_result, package_update_info = TaskExecuteRes.SUCCEED, [] - for rpm in rpms: - rpm_fix_info = {"available_rpm": rpm, "result": TaskExecuteRes.SUCCEED, "log": ""} - compare_result, log = self.compare_cve(rpm, fixable_cve_info, fixed_cve_info) - if compare_result: - rpm_fix_info["result"], rpm_fix_info["log"] = self.__update_coldpatch(rpm) - else: - rpm_fix_info["result"] = TaskExecuteRes.FAIL - rpm_fix_info["log"] = log - - if rpm_fix_info["result"] == TaskExecuteRes.FAIL: - final_fix_result = TaskExecuteRes.FAIL - - package_update_info.append(rpm_fix_info) - return final_fix_result, package_update_info - - def __update_coldpatch(self, rpm: str) -> Tuple[str, str]: - """ - upgrade rpm by dnf plugin (coldpatch) - - Args: - rpm(str): package that need to be upgraded - - Returns: - Tuple[str, str] - a tuple containing two elements (upgrade result, package upgrade log). - """ - code, stdout, stderr = execute_shell_command([f"dnf upgrade-en {rpm} -y"]) - if code != CommandExitCode.SUCCEED: - LOGGER.error(stderr) - return TaskExecuteRes.FAIL, stdout + stderr - elif rpm.rsplit("-", 2)[0] == "kernel": - if not self.set_default_grub_kernel_version(rpm): - return TaskExecuteRes.FAIL, stdout + stderr + "\nerror: set default kernel failed!" - return TaskExecuteRes.SUCCEED, stdout + stderr - - def _update_hotpatch_by_dnf_plugin(self, rpms: List[str], accepted: bool) -> Tuple[str, list, int]: - """ - upgrade rpm by dnf plugin (hotpatch) - - Args: - rpms(list): List of packages that need to be upgraded - - Returns: - Tuple[str, List[dict], int] - a tuple containing three elements (update result, Information about each package upgrade, upgrade count). - """ - upgrade_count = 0 - check_result, check_log = PreCheck.kernel_consistency_check() - if not check_result: - return ( - TaskExecuteRes.FAIL, - [ - { - "available_rpm": rpm, - "result": TaskExecuteRes.FAIL, - "log": f"kernel consistency check failed\n{check_log}", - } - for rpm in rpms - ], - upgrade_count, - ) - - final_fix_result, package_update_info = TaskExecuteRes.SUCCEED, [] - - for rpm in rpms: - code, stdout, stderr = execute_shell_command([f"dnf hotupgrade {rpm} -y"]) - tmp = { - "available_rpm": rpm, - "result": TaskExecuteRes.SUCCEED, - "log": stdout + stderr, - } - if code != CommandExitCode.SUCCEED or "Apply hot patch succeed" not in stdout: - tmp["result"] = TaskExecuteRes.FAIL - final_fix_result = TaskExecuteRes.FAIL - elif "Nothing to do" not in stdout: - upgrade_count += 1 - - if tmp["result"] == TaskExecuteRes.SUCCEED and accepted: - try: - hotpatch_name = rpm.rsplit(".", 1)[0].split("-", 1)[1] - _, hotpatch_status_set_log = self._set_hotpatch_status_by_dnf_plugin(hotpatch_name, "accept") - tmp["log"] += f"\n\n{hotpatch_status_set_log}" - except IndexError as error: - LOGGER.error(error) - tmp["log"] += f"\n\nhotpatch status set failed due to can't get correct hotpatch name!" - package_update_info.append(tmp) - return final_fix_result, package_update_info, upgrade_count - - @staticmethod - def _query_fixable_cve_info() -> Tuple[str, dict]: - """ - Query the CVEs fixed by the upgradeable version of each package - - Retunrs: - Tuple[status, dict] - a tuple containing two elements (status code, fixed_cve_info). - - Example: - "Succeed", {"kernel": { - "kernel-5.10.0-60.91.0.115.oe2203.x86_64": ["CVE-2023-1829"], - "kernel-5.10.0-60.91.0.116.oe2203.x86_64": ["CVE-2023-2006"] - }} - """ - code, stdout, stderr = execute_shell_command(["dnf updateinfo list cves"]) - if code != CommandExitCode.SUCCEED: - LOGGER.error("Failed to query update info by dnf!") - LOGGER.error(stderr) - return COMMAND_EXEC_ERROR, defaultdict() - - all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+)", stdout) - rpm_update_info = defaultdict(lambda: defaultdict(list)) - for cve_id, _, coldpatch in all_cve_info: - rpm_name = coldpatch.rsplit("-", 2)[0] - rpm_update_info[rpm_name][coldpatch].append(cve_id) - - return SUCCESS, rpm_update_info - - @staticmethod - def _query_fixed_cve_info_by_hotpatch() -> Tuple[str, dict]: - """ - Statistics CVE data that will be fixed by hotpatch - - Returns: - Tuple[status, dict] - a tuple containing two elements (status code, fixed_cve_info). - - Example: - "Succeed", {"kernel": {"CVE-2023-XXXX","CVE-2022-XXXX"}} - """ - code, stdout, stderr = execute_shell_command(["dnf hot-updateinfo list cves --installed"]) - if code != CommandExitCode.SUCCEED: - LOGGER.error("Failed to query fixed cves by hotpatch!") - LOGGER.error(stderr) - return COMMAND_EXEC_ERROR, set() - - hotpatch_fixed_info = defaultdict(set) - all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", stdout) - for cve_id, _, _, hotpatch in all_cve_info: - rpm_name = hotpatch.rsplit("-", 5)[0][6:] - hotpatch_fixed_info[rpm_name].add(cve_id) - - return SUCCESS, hotpatch_fixed_info - - def compare_cve(self, rpm: str, updated_info: dict, hotpatch_fixed_info: dict) -> Tuple[bool, str]: - """ - Determine whether the packages to be upgraded covers the vulnerabilities fixed by the hotpatch - - Args: - rpms(list): List of packages that need to be upgraded - - Returns: - Tuple[bool, str] - a tuple containing two elements (compare result, compare log). - """ - compare_info = dict() - upgraded_packages: set = self._query_upgraded_packages(rpm) - if not upgraded_packages: - return False, "Execution of CVE comparison failed due to failure to query upgraded_packages." - for rpm in upgraded_packages: - fixed_cve_by_coldpatch = set() - rpm_name = rpm.rsplit("-", 2)[0] - - for update_rpm, cve_list in updated_info.get(rpm_name, {}).items(): - if rpm >= update_rpm: - fixed_cve_by_coldpatch.update(cve_list) - cve_difference_set = hotpatch_fixed_info.get(rpm_name, set()) - fixed_cve_by_coldpatch - if cve_difference_set: - compare_info[rpm_name] = cve_difference_set - - if not compare_info.values(): - return True, "" - - log = ( - "After upgrading the package, vulnerabilities in the package or in its dependent software package " - "may be re-exposed. \nHere are some specific vulnerabilities that could potentially re-exposed:\n" - ) - for rpm_name, cve_info in compare_info.items(): - for cve_id in cve_info: - log += f"{rpm_name}\t{cve_id}\n" - return False, log - - @staticmethod - def _query_upgraded_packages(package: str) -> Set[str]: - """ - Resolve all packages to be upgraded and their dependencies, store them in a set and - return it - - Args: - packages(list): List of package that need to be upgraded - - Returns: - set - - """ - package_set = set() - if package.rsplit("-", 2)[0] == "kernel": - package_set.add(package) - return package_set - - # The exit code of the command is 1 when input parameters contains assumeno - _, stdout, _ = execute_shell_command([f"dnf upgrade-en {package} --assumeno"]) - - installed_rpm_info = re.findall(r"(Upgrading|Installing):(.*?)Transaction Summary", stdout, re.S) - if not installed_rpm_info: - return package_set - - installed_rpm_info_list = installed_rpm_info[0][1].strip().split("\n") - for single_rpm_info in installed_rpm_info_list: - # info_list example: - # ['aops-ceres', 'aarch64', 'v1.3.4-5.oe2203sp2', '@commandline', '107 k] - pkg_info_list = re.split(r'\s+', single_rpm_info.strip()) - if len(pkg_info_list) < 5: - break - package_set.add(f"{pkg_info_list[0]}-{pkg_info_list[2]}.{pkg_info_list[1]}") - return package_set - - @staticmethod - def set_default_grub_kernel_version(kernel_rpm_name: str) -> bool: - """ - Set the boot kernel - - Args: - kernel_rpm_name(str): The name of the installed kernel package - - Returns: - bool - """ - boot_kernel_path = os.path.join("/boot/", f"vmlinuz-{kernel_rpm_name[7:]}") - if not os.path.exists(boot_kernel_path): - LOGGER.error("Can't find target kernel in /boot when set default kernel") - return False - - LOGGER.info("The Linux boot kernel is about to be changed") - code, _, stderr = execute_shell_command([f"grubby --set-default={boot_kernel_path}"]) - - if code != CommandExitCode.SUCCEED: - LOGGER.info("The Linux boot kernel change failed") - LOGGER.error(stderr) - return False - LOGGER.info("The Linux boot kernel change successful") - return True - - @staticmethod - def _query_latest_dnf_transaction_id() -> Optional[int]: - """Query latest yum transaction id - - Returns: - int - """ - # Example of command execution result: - # [root@localhost ~]# dnf history - # ID | Command line | Date and time | Action(s) | Altered - # --------------------------------------------------------------------- - # 3 | rm aops-ceres | 2023-11-30 09:57 | Removed | 1 - # 2 | install gcc | 2023-11-30 09:57 | Install | 1 - code, stdout, stderr = execute_shell_command( - ["dnf history", "grep -E '^\s*[0-9]+'", "head -1", "awk '{print $1}'"] - ) - if code != CommandExitCode.SUCCEED: - LOGGER.error(stderr) - return None - - return int(stdout) - - @staticmethod - def _set_hotpatch_status_by_dnf_plugin(hotpatch: str, operation: str) -> Tuple[bool, str]: - """ - change hotpatch status by dnf plugin - - Args: - hotpatch(str): hotpatch name which you want to change its status - operation(str): the action that needs to be performed on this hot patch. - supported actions: apply,deactive,remove,active,accept - Returns: - Tuple[bool, str] - a tuple containing two elements (operation result, operation log). - """ - - # replace -ACC to /ACC or -SGL to /SGL - # Example: kernel-5.10.0-153.12.0.92.oe2203sp2-ACC-1-1 >> kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 - wait_to_remove_patch = re.sub(r'-(ACC|SGL)', r'/\1', hotpatch) - # Example of command execution result: - # Succeed: - # [root@openEuler ~]# dnf hotpatch --remove kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 - # Last metadata expiration check: 3:24:16 ago on Wed 13 Sep 2023 08:16:17 AM CST. - # Gonna remove this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 - # remove hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' succeed - # Fail: - # [root@openEuler ~]# dnf hotpatch --accept kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 - # Last metadata expiration check: 3:25:24 ago on Wed 13 Sep 2023 08:16:17 AM CST. - # Gonna accept this hot patch: kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1 - # accept hot patch 'kernel-5.10.0-153.12.0.92.oe2203sp2/ACC-1-1' failed, remain original status - code, stdout, stderr = execute_shell_command([f"dnf hotpatch --{operation} {wait_to_remove_patch}"]) - if code != CommandExitCode.SUCCEED or 'failed' in stdout: - LOGGER.error(f"hotpatch {hotpatch} set status failed!") - return False, stdout + stderr - - return True, stdout + stderr - - def remove_hotpatch(self, cves: List[str]) -> dict: - """ - remove hotpatch - - Args: - cves(list): List of CVE IDs fixed by hotpatch,e.g. - ["CVE-XXXX-XXXX"] - - Returns: - dict e.g - { - "status": "fail/succeed", - "cves": [{ - "cve_id": cve, - "result": "succeed", - "log": "rollback succeed" - }] - } - """ - hotpatch_list = self._hotpatch_list_cve() - if not hotpatch_list: - return { - "status": TaskExecuteRes.FAIL, - "cves": [ - dict(cve_id=cve, log="No valid hotpatch is matched.", result=TaskExecuteRes.FAIL) for cve in cves - ], - } - - wait_to_remove_patch = set() - for cve in cves: - wait_to_remove_patch = wait_to_remove_patch.union(hotpatch_list.get(cve, set())) - - hotpatch_remove_res = {} - for patch in set(wait_to_remove_patch): - remove_result, log = self._hotpatch_remove(patch) - hotpatch_remove_res[patch] = { - "result": TaskExecuteRes.SUCCEED if remove_result else TaskExecuteRes.FAIL, - "log": log, - } - - cve_hotpatch_remove_result = [] - - for cve in cves: - if cve not in hotpatch_list: - fail_result = { - "cve_id": cve, - "log": "No valid hot patch is matched.", - "result": TaskExecuteRes.FAIL, - } - cve_hotpatch_remove_result.append(fail_result) - else: - tmp_result_list = [] - tmp_log = [] - - for patch in hotpatch_list.get(cve): - tmp_result_list.append(hotpatch_remove_res[patch]["result"] == TaskExecuteRes.SUCCEED) - tmp_log.append(hotpatch_remove_res[patch]["log"]) - - cve_hotpatch_remove_result.append( - { - "cve_id": cve, - "log": "\n".join(tmp_log), - "result": TaskExecuteRes.SUCCEED if all(tmp_result_list) else TaskExecuteRes.FAIL, - } - ) - - return {"status": TaskExecuteRes.SUCCEED, "cves": cve_hotpatch_remove_result} - - @staticmethod - def _hotpatch_list_cve() -> dict: - """ - Run the dnf hotpatch list cve command to query the hotpatch list corresponding to the cve - - Returns: - dict: e.g - { - "CVE-XXXX-XXX": {"patch 1", "patch 2"} - } - """ - code, stdout, _ = execute_shell_command([f"dnf hot-updateinfo list cves --installed", "grep patch"]) - if code != CommandExitCode.SUCCEED: - LOGGER.error(f"Failed to query the hotpatch list.") - return None - - all_cve_info = re.findall(r"(CVE-\d{4}-\d+)\s+([\w+/.]+)\s+(\S+|-)\s+(patch\S+)", stdout) - if not all_cve_info: - LOGGER.error(f"Failed to query the hotpatch list.") - return None - - applied_hotpatch_info = {} - hotpatch_dic = {} - for cve_id, _, _, hotpatch in all_cve_info: - applied_hotpatch_info[cve_id] = hotpatch - hotpatch_dic_key = hotpatch.rsplit("-", 2)[0] - if hotpatch_dic_key.endswith("ACC"): - hotpatch_dic[hotpatch_dic_key] = max(hotpatch, hotpatch_dic.get(hotpatch_dic_key, hotpatch)) - - for cve_id, cmd_output_hotpatch in applied_hotpatch_info.items(): - applied_hotpatch_info[cve_id] = hotpatch_dic.get(cmd_output_hotpatch.rsplit("-", 2)[0], cmd_output_hotpatch) - - hotpatch_list = defaultdict(set) - for cve_id, hotpatch in applied_hotpatch_info.items(): - hotpatch_list[cve_id].add(hotpatch) - - return hotpatch_list - - def _hotpatch_remove(self, hotpatch: str) -> Tuple[bool, str]: - """ - remove hotpatch package - - Args: - hotpatch: hotpatch package which needs to remove - """ - cmd = [f"dnf remove {hotpatch} -y"] - _, stdout, stderr = execute_shell_command(cmd) - return True, f"Command:{cmd}\n\n{stdout}\n{stderr}\n" diff --git a/ceres/manages/vulnerability_manage/set_repo_manage.py b/ceres/manages/vulnerability_manage/set_repo_manage.py new file mode 100644 index 0000000..5c7ae51 --- /dev/null +++ b/ceres/manages/vulnerability_manage/set_repo_manage.py @@ -0,0 +1,98 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ +import re +import os + +from typing import List +from ceres.conf.constant import CommandExitCode +from ceres.function.log import LOGGER +from ceres.function.util import execute_shell_command + +__all__ = ["SetRepoManage"] + + +class SetRepoManage: + def set_repo(self, data: dict) -> int: + """ + Save the repo source to local, and do simple verification. + + Args: + data (dict): e.g + { + "repo_info": { + "name": "string", + "dest": "save location", + "repo_content": "repo source info" + }, + "check_items": ["string"], + "check": false + } + + Returns: + bool: operation result of repo set + """ + repo_path = data.get("repo_info").get("dest") + if re.match(r"/etc/yum.repos.d/[\w-]+.repo$", repo_path) is None: + LOGGER.debug('Incorrect repo save path.') + return False + + content = data.get("repo_info").get("repo_content") + repo_id_list = re.findall(r'\[([^\]]+)\]', re.sub(r'^\s*#.*$', '', content, flags=re.MULTILINE)) + if not repo_id_list: + LOGGER.warning("Failed to extract repo id information, please check the repo content.") + return False + + with open(repo_path, 'w', encoding='utf8') as repo_file: + repo_file.write(content) + LOGGER.info(f'Repo source {data.get("repo_info").get("name")} has been saved to {repo_path}.') + + if self._validate_repo_source(repo_id_list): + LOGGER.info('Repo source set succeed.') + return True + os.remove(repo_path) + LOGGER.warning("Repo source can't be used, it has been deleted.") + return False + + @staticmethod + def _validate_repo_source(repo_id_list: List[str]) -> bool: + """ + A sample validate which repo can used by yum. + + Args: + repo_id(list): repo id list + + Returns: + bool + """ + + def query_repo_info(repo_id: str) -> bool: + """ + Verify the validity of the repo source by querying the repo source information. + + Args: + repo_id(str) + + Returns: + bool + """ + code, _, stderr = execute_shell_command([f"dnf repoinfo --repo {repo_id}"]) + if code == CommandExitCode.SUCCEED: + return True + LOGGER.warning(f"Failed to query repo information with repo id {repo_id}.") + LOGGER.warning(stderr) + return False + + validate_result = True + for repo_id in repo_id_list: + validate_result = validate_result and query_repo_info(repo_id) + return validate_result diff --git a/ceres/tests/manages/test_vulnerability_manage/__init__.py b/ceres/tests/manages/test_vulnerability_manage/__init__.py new file mode 100644 index 0000000..47e07ff --- /dev/null +++ b/ceres/tests/manages/test_vulnerability_manage/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ \ No newline at end of file diff --git a/ceres/tests/manages/test_vulnerability_manage/test_set_repo.py b/ceres/tests/manages/test_vulnerability_manage/test_set_repo.py new file mode 100644 index 0000000..f85663d --- /dev/null +++ b/ceres/tests/manages/test_vulnerability_manage/test_set_repo.py @@ -0,0 +1,90 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ +import os +import unittest +from unittest import mock + +from ceres.conf.constant import CommandExitCode +from ceres.manages.vulnerability_manage.set_repo_manage import SetRepoManage + + +class TestSetRepo(unittest.TestCase): + + @mock.patch("sys.exit") + @mock.patch("builtins.open", mock.mock_open()) + @mock.patch.object(SetRepoManage, "_validate_repo_source") + def test_repo_set_should_return_true_when_input_repo_content_can_be_used_by_yum( + self, mock_validate_source, mock_remove + ): + mock_validate_source.return_value = True + mock_remove.return_value = '' + mock_args = { + "repo_info": { + "repo_name": "mock_name", + "dest": "/etc/yum.repos.d/mock.repo", + "repo_content": "[repo_id]mock_content", + }, + "check_items": [], + } + result = SetRepoManage().set_repo(mock_args) + self.assertEqual(True, result) + + @mock.patch.object(os, "remove") + @mock.patch("builtins.open", mock.mock_open()) + @mock.patch.object(SetRepoManage, "_validate_repo_source") + def test_repo_set_should_return_repo_content_is_incorrect_when_repo_content_cannot_be_used_by_yum( + self, mock_validate_source, mock_remove + ): + mock_validate_source.return_value = False + mock_remove.return_value = '' + mock_args = { + "repo_info": { + "repo_name": "mock_name", + "dest": "/etc/yum.repos.d/mock.repo", + "repo_content": "[repo_id]mock_content", + "check": False, + }, + "check_items": [], + } + result = SetRepoManage().set_repo(mock_args) + self.assertEqual(False, result) + + def test_repo_set_should_return_param_error_when_repo_save_path_is_incorrect(self): + mock_args = { + "repo_info": { + "repo_name": "mock_name", + "dest": "mock_dest", + "repo_content": "[repo_id]mock_content", + "check": False, + }, + "check_items": [], + } + result = SetRepoManage().set_repo(mock_args) + self.assertEqual(False, result) + + @mock.patch('ceres.manages.vulnerability_manage.set_repo_manage.execute_shell_command') + def test_validate_repo_source_should_return_true_when_query_repo_info_succeed(self, mock_execute_shell_command): + mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, "", "" + result = SetRepoManage._validate_repo_source(["repo_id"]) + self.assertEqual(True, result) + + @mock.patch('ceres.manages.vulnerability_manage.set_repo_manage.execute_shell_command') + def test_validate_repo_source_should_return_false_when_shell_command_execute_failed( + self, mock_execute_shell_command + ): + mock_execute_shell_command.return_value = CommandExitCode.FAIL, "", "" + self.assertEqual(False, SetRepoManage._validate_repo_source(['update'])) + + +if __name__ == "__main__": + unittest.main() diff --git a/ceres/tests/manages/test_vulnerability_manage.py b/ceres/tests/manages/test_vulnerability_manage/test_vulnerability_manage.py similarity index 100% rename from ceres/tests/manages/test_vulnerability_manage.py rename to ceres/tests/manages/test_vulnerability_manage/test_vulnerability_manage.py diff --git a/ceres/tests/test_cli/__init__.py b/ceres/tests/test_cli/__init__.py index e69de29..d457eda 100644 --- a/ceres/tests/test_cli/__init__.py +++ b/ceres/tests/test_cli/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2021-2024. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ diff --git a/ceres/tests/test_cli/test_collect.py b/ceres/tests/test_cli/test_collect.py index f411604..5a3a986 100644 --- a/ceres/tests/test_cli/test_collect.py +++ b/ceres/tests/test_cli/test_collect.py @@ -1,3 +1,15 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2021-2024. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ import unittest import json from unittest import mock diff --git a/ceres/tests/test_cli/test_plugin.py b/ceres/tests/test_cli/test_plugin.py index 9f6ecfe..19abf2f 100644 --- a/ceres/tests/test_cli/test_plugin.py +++ b/ceres/tests/test_cli/test_plugin.py @@ -1,3 +1,16 @@ +#!/usr/bin/python3 +# ****************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2021-2024. All rights reserved. +# licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# ******************************************************************************/ + import unittest import json from unittest import mock -- Gitee