From c5823aa454880d173e7d6a0e6fef7d77f06d4933 Mon Sep 17 00:00:00 2001 From: cuican Date: Fri, 15 Dec 2023 16:54:02 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=E9=80=BB=E8=BE=91,=20=E4=BF=AE=E6=94=B9=E8=BE=93=E5=85=A5?= =?UTF-8?q?=E8=BE=93=E5=87=BA=E5=8F=82=E6=95=B0=E4=BB=A5=E5=8F=8A=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: cuican --- tools/syscap_collector.py | 455 +++++++++++++++++++++++++++++++------- 1 file changed, 378 insertions(+), 77 deletions(-) diff --git a/tools/syscap_collector.py b/tools/syscap_collector.py index c60c140..c0dd01f 100644 --- a/tools/syscap_collector.py +++ b/tools/syscap_collector.py @@ -16,7 +16,6 @@ import logging import os import json import argparse -import stat def get_args(): @@ -24,42 +23,21 @@ def get_args(): parser.add_argument( "-p", "--project_path", - default=r"./", + default=r"", type=str, help="root path of project. default: ./", ) - parser.add_argument( - "-o", - "--output_path", - default=r"./", - type=str, - help="path of output file. default: ./", - ) - parser.add_argument( - "-e", - "--error_output", - action='store_true', - help="output error_list or not. default not", - ) args = parser.parse_args() return args -def dict_to_json(output_path: str, syscaps_dict: dict, error_output: bool, error_dict: dict): - flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL - modes = stat.S_IWUSR | stat.S_IRUSR - logging.info("start generate syscap json...") - filename = os.path.join(output_path, 'component_list_syscap.json') - with os.fdopen(os.open(filename, flags, modes), 'w') as f: - json.dump(syscaps_dict, f) - logging.info("end...") - - if error_output: - logging.info("start generate error json......") - filename = os.path.join(output_path, 'error_dict.json') - with os.fdopen(os.open(filename, flags, modes), 'w') as error_f: - json.dump(error_dict, error_f) - logging.info("end...") +def dict_to_json(output_path: str, syscaps_dict: dict): + print("start generate syscap json...") + for product_name, syscaps_list in syscaps_dict.items(): + filename = os.path.join(output_path, f'{product_name}.json') + with open(filename, 'w') as f: + json.dump({'SysCaps': syscaps_list}, f) + print("end...") def check_syscap(syscap_str: str): @@ -70,15 +48,15 @@ def check_syscap(syscap_str: str): return syscap[0] -def bundle_syscap_list_handler(bundle_syscap_list: list, component_syscaps_list: list) -> list: - for i in component_syscaps_list: - i = check_syscap(i) - if i: - bundle_syscap_list.append(i) +def bundle_syscap_list_handler(bundle_syscap_list: list, component_syscaps_list: list): + for component_syscap in component_syscaps_list: + component_syscap = check_syscap(component_syscap) + if component_syscap: + bundle_syscap_list.append(component_syscap) return bundle_syscap_list -def read_json_file(bundle_json_path: str) -> tuple: +def read_json_file(bundle_json_path: str): bundle_syscap_list = list() error_list = dict() try: @@ -110,17 +88,19 @@ def path_component_to_bundle(path: str) -> str: return bundle_json_path -def handle_bundle_json_file(component_path_list: list) -> tuple: - logging.info("start collect syscap path...") - syscap_list = list() - error_dict = dict() - for path in component_path_list: - bundle_json_path = path_component_to_bundle(path) - bundle_syscap_list, error_list = read_json_file(bundle_json_path) - syscap_list.extend(bundle_syscap_list) - error_dict.update(error_list) - - return {"SysCaps": syscap_list}, error_dict +def handle_bundle_json_file(component_path_dict: dict): + print("start collect syscap path...") + syscap_dict = dict() + errors_list = list() + for product_name, path_list in component_path_dict.items(): + bundles_list = list() + for path in path_list: + bundle_json_path = path_component_to_bundle(path) + bundle_syscap_list, error_list = read_json_file(bundle_json_path) + bundles_list.extend(bundle_syscap_list) + errors_list.extend(error_list) + syscap_dict.update({product_name: bundles_list}) + return syscap_dict, errors_list def format_component_path(component_path: str): @@ -130,49 +110,370 @@ def format_component_path(component_path: str): return component_path -def traversal_path(path_dict: dict, project_path: str) -> list: - component_path_list = list() - for _, v in path_dict.items(): - component_path = os.path.join(project_path, v) - component_path = format_component_path(component_path) - component_path_list.append(component_path) - return component_path_list +def traversal_path(parts_path_info: dict, project_path: str, product_define_dict): + component_path_dict = dict() + for product_name, component_name_list in product_define_dict.items(): + component_paths = list() + for component_name in component_name_list: + component_relpath = parts_path_info.get(component_name) + if component_relpath: + component_path = os.path.join(project_path, component_relpath) + component_path = format_component_path(component_path) + component_paths.append(component_path) + else: + logging.error(f'can\'t find component_name : {component_name}') + component_path_dict.update({product_name: component_paths}) + return component_path_dict -def collect_all_syscap(components_path: str, project_path: str) -> tuple: - """ - 从各部件收集syscap并返回字典 - :param components_path: 必选部件json文件的路径 - :param project_path: 项目根路径 - :return: 所有典型品类syscap字典 - """ - components_path_dict = get_all_components_path(components_path) - if components_path_dict: - logging.info("start collect component path...") - component_path_list = traversal_path(components_path_dict, project_path) - syscap_dict, error_dict = handle_bundle_json_file(component_path_list) - return syscap_dict, error_dict +def collect_all_product_component_syscap_dict(parts_path_info: dict, project_path: str, product_define_dict): + if parts_path_info: + print("start collect component path...") + component_path_dict = traversal_path(parts_path_info, project_path, product_define_dict) + syscap_dict, errors_list = handle_bundle_json_file(component_path_dict) + return syscap_dict, errors_list else: return 0, 0 -def components_path_handler(project_path): - components_path = os.path.join(project_path, "out", "rk3568", "build_configs", "parts_info", - "parts_path_info.json") - return components_path +def get_subsystem_info(subsystem_config_file, source_root_dir): + subsystem_configs = scan(subsystem_config_file, source_root_dir) + _all_components_path = [] + for key, value in subsystem_configs.get('subsystem').items(): + for i in value.get('build_files'): + _all_components_path.append(i) + return subsystem_configs.get('subsystem') + + +def _check_path_prefix(paths): + allow_path_prefix = ['vendor', 'device'] + result = list( + filter(lambda x: x is False, + map(lambda p: p.split('/')[0] in allow_path_prefix, paths))) + return len(result) <= 1 + + +def traversal_files(subsystem_path, _files): + for item in os.scandir(subsystem_path): + if is_symlik(item.path): + continue + elif item.is_file() and item.name == 'ohos.build': + _files.append(item.path) + elif item.is_file() and item.name == 'bundle.json': + _files.append(item.path) + elif item.is_dir(): + traversal_files(item, _files) + return _files + + +def get_file_type(file_path): + if os.path.islink(file_path): + return 'symlink' + elif os.path.isfile(file_path): + return 'file' + elif os.path.isdir(file_path): + return 'directory' + else: + return 'unknown' + + +def is_symlik(file_path): + file_type = get_file_type(file_path) + if file_type == 'symlink': + link_target = os.readlink(file_path) + return link_target != file_type + return False + + +def _scan_build_file(subsystem_path): + _files = [] + _bundle_files = [] + try: + _files = traversal_files(subsystem_path, _files) + except FileNotFoundError as e: + print(e) + return _files + + +def scan(subsystem_config_file, source_root_dir): + subsystem_infos = _read_config(subsystem_config_file) + _default_subsystem = {"build": "build"} + subsystem_infos.update(_default_subsystem) + no_src_subsystem = {} + _build_configs = {} + for key, val in subsystem_infos.items(): + _all_build_config_files = [] + if not isinstance(val, list): + val = [val] + else: + if not _check_path_prefix(val): + raise Exception("subsystem '{}' path configuration is incorrect.".format(key), "2013") + _info = {'path': val} + for _path in val: + _subsystem_path = os.path.join(source_root_dir, _path) + _build_config_files = _scan_build_file(_subsystem_path) + _all_build_config_files.extend(_build_config_files) + if _all_build_config_files: + _info['build_files'] = _all_build_config_files + _build_configs[key] = _info + else: + no_src_subsystem[key] = val + + scan_result = { + 'source_path': source_root_dir, + 'subsystem': _build_configs, + } + print('subsytem config scan completed') + return scan_result + + +def _read_config(subsystem_config_file): + if not os.path.exists(subsystem_config_file): + raise Exception("config file '{}' doesn't exist.".format(subsystem_config_file), "2013") + subsystem_config = _read_json_file(subsystem_config_file) + if subsystem_config is None: + raise Exception("read file '{}' failed.".format(subsystem_config_file), "2013") + + subsystem_info = {} + for key, val in subsystem_config.items(): + if 'path' not in val: + raise Exception("subsystem '{}' not config path.".format(key), "2013") + subsystem_info[key] = val.get('path') + return subsystem_info + + +def read_build_file(ohos_build_file): + if not os.path.exists(ohos_build_file): + raise Exception("config file '{}' doesn't exist.".format(ohos_build_file), "2014") + subsystem_config = _read_json_file(ohos_build_file) + if subsystem_config is None: + raise Exception("read file '{}' failed.".format(ohos_build_file), "2014") + return subsystem_config + + +class BundlePartObj(object): + def __init__(self, bundle_config_file): + self._build_config_file = bundle_config_file + self._loading_config() + + def _loading_config(self): + if not os.path.exists(self._build_config_file): + raise Exception("file '{}' doesn't exist.".format( + self._build_config_file), "2011") + self.bundle_info = _read_json_file(self._build_config_file) + if self.bundle_info is None: + raise Exception("read file '{}' failed.".format( + self._build_config_file), "2011") + + def to_ohos_build(self): + _component_info = self.bundle_info.get('component') + _subsystem_name = _component_info.get('subsystem') + _part_name = _component_info.get('name') + _bundle_build = _component_info.get('build') + _ohos_build_info = dict() + _ohos_build_info['subsystem'] = _subsystem_name + _part_info = {} + module_list = [] + if _component_info.get('build').__contains__('sub_component'): + _part_info['module_list'] = _component_info.get('build').get( + 'sub_component') + elif _component_info.get('build').__contains__('modules'): + _part_info['module_list'] = _component_info.get( + 'build').get('modules') + elif _component_info.get('build').__contains__('group_type'): + _module_groups = _component_info.get('build').get('group_type') + for _group_type, _module_list in _module_groups.items(): + _key = '{}:{}'.format(_subsystem_name, _part_name) + _part_info['module_list'] = module_list + if 'inner_kits' in _bundle_build: + _part_info['inner_kits'] = _bundle_build.get('inner_kits') + elif 'inner_api' in _bundle_build: + _part_info['inner_kits'] = _bundle_build.get('inner_api') + if 'features' in _component_info: + _part_info['feature_list'] = _component_info.get('features') + if 'syscap' in _component_info: + _part_info['system_capabilities'] = _component_info.get('syscap') + if 'hisysevent_config' in _component_info: + _part_info['hisysevent_config'] = _component_info.get( + 'hisysevent_config') + _part_info['part_deps'] = _component_info.get('deps', {}) + _part_info['part_deps']['build_config_file'] = self._build_config_file + _ohos_build_info['parts'] = {_part_name: _part_info} + return _ohos_build_info + + +class LoadBuildConfig(object): + """load build config file and parse configuration info.""" + + def __init__(self, source_root_dir, subsystem_build_info, subsystem_name): + self._source_root_dir = source_root_dir + self._build_info = subsystem_build_info + self._is_load = False + self._parts_variants = {} + self._part_list = {} + self._part_targets_label = {} + self._subsystem_name = subsystem_name + self._parts_info_dict = {} + self._phony_targets = {} + self._parts_path_dict = {} + self._part_hisysevent_config = {} + self._parts_module_list = {} + self._parts_deps = {} + + def _merge_build_config(self): + _build_files = self._build_info.get('build_files') + is_thirdparty_subsystem = False + if _build_files[0].startswith(self._source_root_dir + 'third_party'): + is_thirdparty_subsystem = True + subsystem_name = None + parts_info = {} + parts_path_dict = {} + for _build_file in _build_files: + if _build_file.endswith('bundle.json'): + bundle_part_obj = BundlePartObj(_build_file) + _parts_config = bundle_part_obj.to_ohos_build() + else: + + _parts_config = read_build_file(_build_file) + _subsystem_name = _parts_config.get('subsystem') + if not is_thirdparty_subsystem and subsystem_name and _subsystem_name != subsystem_name: + raise Exception( + "subsystem name config incorrect in '{}'.".format( + _build_file), "2014") + subsystem_name = _subsystem_name + _curr_parts_info = _parts_config.get('parts') + for _pname in _curr_parts_info.keys(): + parts_path_dict[_pname] = os.path.relpath( + os.path.dirname(_build_file), self._source_root_dir) + parts_info.update(_curr_parts_info) + subsystem_config = dict() + subsystem_config['subsystem'] = subsystem_name + subsystem_config['parts'] = parts_info + return subsystem_config, parts_path_dict + + def parse(self): + """parse part info from build config file.""" + if self._is_load: + return + subsystem_config, parts_path_dict = self._merge_build_config() + parts_config = subsystem_config.get('parts') + self._parts_module_list.update(parts_config) + self._parts_path_dict = parts_path_dict + self._is_load = True + + def parts_path_info(self): + """parts to path info.""" + self.parse() + return self._parts_path_dict + + def parts_info_filter(self, save_part): + if save_part is None: + raise Exception + self._parts_info_dict = { + key: value for key, value in self._parts_info_dict.items() if key in save_part} + + +def get_parts_info(source_root_dir, subsystem_info, build_xts=False): + """parts info, get info from build config file. + """ + _phony_target = {} + _parts_path_info = {} + _parts_hisysevent_config = {} + _parts_modules_info = {} + _parts_deps = {} + for subsystem_name, build_config_info in subsystem_info.items(): + if not len(build_config_info.get("build_files")): + continue + build_loader = LoadBuildConfig(source_root_dir, build_config_info, subsystem_name) + if subsystem_name == 'xts' and build_xts is False: + xts_device_attest_name = ['device_attest_lite', 'device_attest'] + build_loader.parse() + build_loader.parts_info_filter(xts_device_attest_name) + _parts_path_info.update(build_loader.parts_path_info()) + return _parts_path_info + + +def _read_json_file(input_file): + if not os.path.exists(input_file): + print("file '{}' doesn't exist.".format(input_file)) + return None + try: + with open(input_file, 'r') as input_f: + data = json.load(input_f) + return data + except json.decoder.JSONDecodeError: + print("The file '{}' format is incorrect.".format(input_file)) + raise + except Exception: + print("read file '{}' failed.".format(input_file)) + raise + + +def get_product_define_path(source_root_dir): + return os.path.join(source_root_dir, 'productdefine', 'common', 'inherit') + + +def product_component_handler(product_file, product_file_path): + all_components_dict = dict() + components_list = list() + try: + with open(product_file_path, 'r', encoding='utf-8') as f: + product_file_json = json.load(f) + for subsystems in product_file_json.get('subsystems'): + for components in subsystems.get('components'): + components_list.append(components.get('component')) + except Exception as e: + print(e) + all_components_dict.update({product_file.split('.')[0]: components_list}) + return all_components_dict + + +def collect_all_product_component(product_file_dict: dict): + all_components_dict = dict() + for product_file, product_file_path in product_file_dict.items(): + product_components_dict = product_component_handler(product_file, product_file_path) + all_components_dict.update(product_components_dict) + return all_components_dict + + +def get_product_define_dict(source_root_dir): + product_define_path = get_product_define_path(source_root_dir) + product_file_dict = dict() + for file in os.scandir(product_define_path): + if file.name.split('.')[-1] == 'json': + product_file_dict.update({file.name: os.path.join(product_define_path, file.name)}) + product_define_dict = collect_all_product_component(product_file_dict) + return product_define_dict + + +def output_path_handler(project_path): + output_path = os.path.join(project_path, 'interface', 'sdk-js', 'api', 'device-define-common') + folder = os.path.exists(output_path) + if not folder: + os.mkdir(output_path) + return output_path + + +def project_path_handler(project_path): + if not project_path: + project_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + return project_path def main(): logging.basicConfig(level=logging.INFO) - args = get_args() project_path = args.project_path - output_path = args.output_path - error_output = args.error_output - components_path = components_path_handler(project_path) - syscap_dict, error_dict = collect_all_syscap(components_path, project_path) + project_path = project_path_handler(project_path) + output_path = output_path_handler(project_path) + subsystem_config_file = os.path.join(project_path, 'build', 'subsystem_config.json') + product_define_dict = get_product_define_dict(project_path) + _subsystem_info = get_subsystem_info(subsystem_config_file, project_path) + _parts_path_info = get_parts_info(project_path, _subsystem_info) + syscap_dict, errors_list = collect_all_product_component_syscap_dict(_parts_path_info, project_path, + product_define_dict) if syscap_dict: - dict_to_json(output_path, syscap_dict, error_output, error_dict) + dict_to_json(output_path, syscap_dict) if __name__ == "__main__": -- Gitee From 9d8fcad9f11b63cdb5d50026ea4e6adcd02c7f8c Mon Sep 17 00:00:00 2001 From: cuican Date: Fri, 15 Dec 2023 17:45:04 +0800 Subject: [PATCH 2/4] =?UTF-8?q?codecheck=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: cuican --- tools/syscap_collector.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tools/syscap_collector.py b/tools/syscap_collector.py index c0dd01f..b10bb13 100644 --- a/tools/syscap_collector.py +++ b/tools/syscap_collector.py @@ -16,6 +16,7 @@ import logging import os import json import argparse +import stat def get_args(): @@ -33,9 +34,11 @@ def get_args(): def dict_to_json(output_path: str, syscaps_dict: dict): print("start generate syscap json...") + flags = os.O_WRONLY | os.O_CREAT + modes = stat.S_IWUSR | stat.S_IRUSR for product_name, syscaps_list in syscaps_dict.items(): filename = os.path.join(output_path, f'{product_name}.json') - with open(filename, 'w') as f: + with os.fdopen(os.open(filename, flags, modes), 'w') as f: json.dump({'SysCaps': syscaps_list}, f) print("end...") @@ -139,7 +142,7 @@ def collect_all_product_component_syscap_dict(parts_path_info: dict, project_pat def get_subsystem_info(subsystem_config_file, source_root_dir): subsystem_configs = scan(subsystem_config_file, source_root_dir) _all_components_path = [] - for key, value in subsystem_configs.get('subsystem').items(): + for _, value in subsystem_configs.get('subsystem').items(): for i in value.get('build_files'): _all_components_path.append(i) return subsystem_configs.get('subsystem') @@ -190,8 +193,8 @@ def _scan_build_file(subsystem_path): _bundle_files = [] try: _files = traversal_files(subsystem_path, _files) - except FileNotFoundError as e: - print(e) + except FileNotFoundError: + print(f"read file {subsystem_path} failed.") return _files @@ -413,17 +416,24 @@ def get_product_define_path(source_root_dir): return os.path.join(source_root_dir, 'productdefine', 'common', 'inherit') +def components_list_handler(product_file_json): + components_list = list() + for subsystems in product_file_json.get('subsystems'): + for components in subsystems.get('components'): + components_list.append(components.get('component')) + + return components_list + + def product_component_handler(product_file, product_file_path): all_components_dict = dict() components_list = list() try: with open(product_file_path, 'r', encoding='utf-8') as f: product_file_json = json.load(f) - for subsystems in product_file_json.get('subsystems'): - for components in subsystems.get('components'): - components_list.append(components.get('component')) - except Exception as e: - print(e) + components_list = components_list_handler(product_file_json) + except FileNotFoundError: + print(f"read file {product_file_path} failed.") all_components_dict.update({product_file.split('.')[0]: components_list}) return all_components_dict -- Gitee From 9c28aa973c91de3e8b6286c131abf6faf2023bde Mon Sep 17 00:00:00 2001 From: cuican Date: Fri, 15 Dec 2023 17:59:29 +0800 Subject: [PATCH 3/4] =?UTF-8?q?codecheck=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=20,retuen=20None=20=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: cuican --- tools/syscap_collector.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/syscap_collector.py b/tools/syscap_collector.py index b10bb13..8ddb630 100644 --- a/tools/syscap_collector.py +++ b/tools/syscap_collector.py @@ -249,7 +249,7 @@ def read_build_file(ohos_build_file): if not os.path.exists(ohos_build_file): raise Exception("config file '{}' doesn't exist.".format(ohos_build_file), "2014") subsystem_config = _read_json_file(ohos_build_file) - if subsystem_config is None: + if not subsystem_config: raise Exception("read file '{}' failed.".format(ohos_build_file), "2014") return subsystem_config @@ -264,7 +264,7 @@ class BundlePartObj(object): raise Exception("file '{}' doesn't exist.".format( self._build_config_file), "2011") self.bundle_info = _read_json_file(self._build_config_file) - if self.bundle_info is None: + if not self.bundle_info: raise Exception("read file '{}' failed.".format( self._build_config_file), "2011") @@ -399,7 +399,7 @@ def get_parts_info(source_root_dir, subsystem_info, build_xts=False): def _read_json_file(input_file): if not os.path.exists(input_file): print("file '{}' doesn't exist.".format(input_file)) - return None + return 0 try: with open(input_file, 'r') as input_f: data = json.load(input_f) -- Gitee From b65f73d8e12e509dfcbf3ae238f3fbf049b3f49d Mon Sep 17 00:00:00 2001 From: cuican Date: Fri, 15 Dec 2023 18:13:59 +0800 Subject: [PATCH 4/4] =?UTF-8?q?codecheck=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: cuican --- tools/syscap_collector.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/syscap_collector.py b/tools/syscap_collector.py index 8ddb630..0902dda 100644 --- a/tools/syscap_collector.py +++ b/tools/syscap_collector.py @@ -336,7 +336,6 @@ class LoadBuildConfig(object): bundle_part_obj = BundlePartObj(_build_file) _parts_config = bundle_part_obj.to_ohos_build() else: - _parts_config = read_build_file(_build_file) _subsystem_name = _parts_config.get('subsystem') if not is_thirdparty_subsystem and subsystem_name and _subsystem_name != subsystem_name: @@ -399,7 +398,7 @@ def get_parts_info(source_root_dir, subsystem_info, build_xts=False): def _read_json_file(input_file): if not os.path.exists(input_file): print("file '{}' doesn't exist.".format(input_file)) - return 0 + return {} try: with open(input_file, 'r') as input_f: data = json.load(input_f) -- Gitee