diff --git a/ascend_deployer/jobs.py b/ascend_deployer/jobs.py index 123b95967faba28326e8f40cf41ae9c3f5e09858..c209b40f3ac1a5f4a6de8f85fe70dac9e27051cb 100644 --- a/ascend_deployer/jobs.py +++ b/ascend_deployer/jobs.py @@ -34,6 +34,7 @@ from utils import compare_version import utils from module_utils.common_info import get_os_and_arch from module_utils.inventory_file import inventory_file +from module_utils.path_manager import CompressedFileCheckUtils from scripts import nexus from scripts.pkg_utils import filter_pkg, search_paths, get_run_dir, get_config_dir, need_nexus, tags_map @@ -549,6 +550,9 @@ class ResourcePkg(object): def extract(self, file, path): if not os.path.exists(path): os.makedirs(path, 0o750) + ret, err_msg = CompressedFileCheckUtils.check_compressed_file_valid(file) + if not ret: + raise Exception(err_msg) if file.endswith('.zip'): if "faultdiag" in file or "mcu" in file: return self.extract_zip(file, path) diff --git a/ascend_deployer/library/process_framework.py b/ascend_deployer/library/process_framework.py index 488ae3f8e2324a0b4eb4ae12c45db6048abac7d4..854e4a920e80434dc566c4c6032b73e9dbef8446 100644 --- a/ascend_deployer/library/process_framework.py +++ b/ascend_deployer/library/process_framework.py @@ -26,6 +26,7 @@ from ansible.module_utils.basic import AnsibleModule from ansible.module_utils import common_info, common_utils from ansible.module_utils.common_info import DeployStatus from ansible.module_utils.common_utils import compare_version +from ansible.module_utils.path_manager import CompressedFileCheckUtils class Installation: @@ -171,7 +172,11 @@ class Installation: if glob.glob("{}/lib/libprotobuf.so.*".format(local_path)): return build_dir = os.path.join(os.path.expanduser("~"), "build") - with tarfile.open(os.path.join(self.resource_dir, "sources/protobuf-python-3.13.0.tar.gz"), "r") as tf: + src = os.path.join(self.resource_dir, "sources/protobuf-python-3.13.0.tar.gz") + ret, err_msg = CompressedFileCheckUtils.check_compressed_file_valid(src) + if not ret: + self.module.fail_json(msg=err_msg, rc=1, changed=False) + with tarfile.open(src, "r") as tf: tf.extractall(build_dir) for member in tf.getmembers(): os.chown(os.path.join(build_dir, member.name), os.getuid(), os.getgid()) diff --git a/ascend_deployer/library/uncompress_resources.py b/ascend_deployer/library/uncompress_resources.py index ea0c41712f7ec4fc77e786e487aca753c9be09e0..21acd79b0336134b93688ab78c8aef0131e3b5f9 100644 --- a/ascend_deployer/library/uncompress_resources.py +++ b/ascend_deployer/library/uncompress_resources.py @@ -24,6 +24,7 @@ import shutil import tarfile from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.path_manager import CompressedFileCheckUtils class UncompressResources(object): @@ -93,6 +94,9 @@ class UncompressResources(object): src = os.path.expanduser('~/resources_{}.tar'.format(self.arch)) if not os.path.exists(src): self.module.fail_json(msg='{} is not existed'.format(src)) + ret, err_msg = CompressedFileCheckUtils.check_compressed_file_valid(src) + if not ret: + self.module.fail_json(msg=err_msg, rc=1, changed=False) with tarfile.open(src) as f: members = [] uname = getpass.getuser() diff --git a/ascend_deployer/module_utils/path_manager.py b/ascend_deployer/module_utils/path_manager.py index 121e5898e91cb40f3f564f5d8b3dbf2177ddf66e..82ee261cd30502b39c90c37224d4205ae66797dd 100644 --- a/ascend_deployer/module_utils/path_manager.py +++ b/ascend_deployer/module_utils/path_manager.py @@ -2,6 +2,8 @@ import errno import os.path import shutil import string +import tarfile +import zipfile _CUR_DIR = os.path.dirname(__file__) PATH_WHITE_LIST_LIN = string.digits + string.ascii_letters + '~-+_./ ' @@ -149,3 +151,103 @@ class PathManager: for file in os.listdir(remote_info_path): if file != ProjectPath.INVENTORY_FILE: os.remove(os.path.join(remote_info_path, file)) + + +class CompressedFileCheckUtils: + """ + Utility class for checking compressed files (ZIP and TAR) for security issues. + + This class provides methods to validate compressed files by checking for: + - Symbolic links (which can be a security risk) + - Path traversal attempts (e.g., files containing ../ sequences) + - Absolute paths (which can be a security risk) + + The class supports both ZIP and TAR file formats and provides comprehensive + security checks to prevent potential security vulnerabilities when extracting + compressed files. + """ + @staticmethod + def check_tar_file_symbolic_link(file_info): + if file_info.issym(): + err_msg = "[ASCEND][ERROR] The file:{} is a symbolic link, please check it.".format(file_info.path) + return False, err_msg + return True, '' + + @staticmethod + def check_zip_file_symbolic_link(file_info): + # external_attr表示zip中该文件的外部属性,包括目录,符号链接,文件权限,形如:lrwxrwxrwx,drwxr-xr-x,-rw-rw-r-- + # 0o120000为符号链接的权限模式前缀,加上文件权限就是0o120777,使用os.lstat(符号链接路径).st_mode查看符号链接权限模式 + # 0o40000为目录的权限模式前缀,加上文件权限就是0o40755,使用os.stat(目录路径).st_mode查看目录权限模式 + # 0o100000为普通文件的权限模式前缀,加上文件权限就是0o100664,使用os.stat(普通文件路径).st_mode查看普通文件权限模式 + # external_attr=0,然后分文件和目录处理不同 + # external_attr |= (权限模式) << 16,然后目录为了兼容ms-dos会再来一下:external_attr |= 0x10 + # 所以判断zip文件中的文件是否为符号链接,只需要external_attr > 0o120000 << 16即可,前提是文件类型是ZIP_STORED + if file_info.compress_type == zipfile.ZIP_STORED and file_info.external_attr > 0o120000 << 16: + err_msg = "[ASCEND][ERROR] The file:{} is a symbolic link, please check it.".format(file_info.path) + return False, err_msg + return True, '' + + @staticmethod + def check_package_inner_file_name(file_name): + check_str_list = ["../", "..\\", ".\\", "./", "~/"] + for check_str in check_str_list: + if check_str in file_name: + err_msg = "[ASCEND][ERROR] check compressed file:{} failed ,inner file has special string".format( + file_name) + return False, err_msg + if os.path.isabs(file_name): + err_msg = "[ASCEND][ERROR] check compressed file:{} failed ,inner file cannot be abspath".format( + file_name) + return False, err_msg + return True, '' + + @staticmethod + def check_zip_file_info(filepath): + with zipfile.ZipFile(filepath, 'r') as file_list: + for file in file_list.infolist(): + checks = [ + CompressedFileCheckUtils.check_zip_file_symbolic_link(file), + CompressedFileCheckUtils.check_package_inner_file_name(file.filename) + ] + + for ret, err_msg in checks: + if not ret: + return False, err_msg + return True, '' + + @staticmethod + def check_tar_file_info(filepath): + try: + with tarfile.open(filepath, 'r') as file_list: + for file in file_list: + checks = [ + CompressedFileCheckUtils.check_tar_file_symbolic_link(file), + CompressedFileCheckUtils.check_package_inner_file_name(file.name) + ] + + for ret, err_msg in checks: + if not ret: + return False, err_msg + return True, '' + except Exception as e: + return False, "[ASCEND][ERROR] Failed to check tar file {}: {}".format(filepath, str(e)) + + @staticmethod + def check_compressed_file_valid(filepath): + try: + if filepath.endswith((".tar.gz", ".tar")): + ret, err_msg = CompressedFileCheckUtils.check_tar_file_info(filepath) + if not ret: + return False, err_msg + return True, "" + elif filepath.endswith(".zip"): + ret, err_msg = CompressedFileCheckUtils.check_zip_file_info(filepath) + if not ret: + return False, err_msg + return True, "" + else: + err_msg = "[ASCEND][ERROR] unsupported compressed file format {}".format(filepath) + return False, err_msg + except Exception as e: + err_msg = "[ASCEND][ERROR] {}".format(str(e)) + return False, err_msg