diff --git a/rom_ram_analyzer/README.md b/rom_ram_analyzer/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b12c6b648e2adc05349bfb63d526a7a824c8d670 --- /dev/null +++ b/rom_ram_analyzer/README.md @@ -0,0 +1,142 @@ +[toc] + +# Rom_analyzer.py + +## 功能介绍 + +基于BUILD.gn、bundle.json、编译产物system_module_info.json、out/{product_name}/packages/phone目录下的编译产物,分析各子系统及部件的rom大小。 + +结果以json与xls格式进行存储,其中,json格式是必输出的,xls格式需要-e参数控制。 + +## 使用说明 + +前置条件: + +1. 获取整个rom_ram_analyzer目录 +1. 对系统进行编译 +1. linux平台 +1. python3.8及以后 +1. 安装requirements + ```txt + xlwt==1.3.0 + ``` + +命令介绍: + +1. `-h`或`--help`命令查看帮助 + ```shell + > python3 rom_analyzer.py -h + usage: rom_analyzer.py [-h] [-v] -p PROJECT_PATH -j MODULE_INFO_JSON -n PRODUCT_NAME -d PRODUCT_DIR [-o OUTPUT_FILE] [-e EXCEL] + + analyze rom size of component. + + optional arguments: + -h, --help show this help message and exit + -v, -version show program\'s version number and exit + -p PROJECT_PATH, --project_path PROJECT_PATH + root path of openharmony. eg: -p ~/openharmony + -j MODULE_INFO_JSON, --module_info_json MODULE_INFO_JSON + path of out/{product_name}/packages/phone/system_module_info.json + -n PRODUCT_NAME, --product_name PRODUCT_NAME + product name. eg: -n rk3568 + -d PRODUCT_DIR, --product_dir PRODUCT_DIR + subdirectories of out/{product_name}/packages/phone to be counted.eg: -d system -d vendor + -o OUTPUT_FILE, --output_file OUTPUT_FILE + basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result + -e EXCEL, --excel EXCEL + if output result as excel, default: False. eg: -e True + ``` +1. 
使用示例 + ```shell + python3 rom_analyzer.py -p ~/nomodify_oh/ -j ../system_module_info.json -n rk3568 -d system -d vendor -d updater -o demo/demo -e True + # oh:rootpath of openharmony + # rk3568: product_name, same as out/{product_name} + # demo/demo: path of output file, where the second 'demo' is the basename of output file + # -e True:output result in excel format additionally + ``` + +## 输出格式说明(json) + + +```json +{ + 子系统名: { + "size": 整个子系统输出文件的总大小, + "file_count": 整个子系统产生的文件数, + 输出文件名: 本文件的大小, + ... + }, + ... +} +``` + +# ram_analyzer.py + +## 功能介绍 + +基于out/{product_name}/packages/phone下所有cfg文件、out/{product_name}/packages/phone/system/profile下所有xml文件,分析各进程及对应部件的ram占用(默认取Pss) + +结果以json与xls格式存储,其中,json格式是必输出的,xls格式需要-e参数控制。 + +## 使用说明 + +前置条件: + +1. 获取整个rom_ram_analyzer目录 +2. hdc可用 +2. 设备已连接 +3. python3.8及以后 +4. 安装requirements + ```txt + xlwt==1.3.0 + ``` +5. 准备好相关数据: + 1. out/{product_name}/packages/phone下所有cfg文件,并将其放置于同一个目录中(ps:同名文件仅保存一份即可) + 1. out/{product_name}/packages/phone/system/profile下所有xml文件 +6. 运行rom_analyzer.py产生的json结果一份(即-o参数对应的文件,默认rom_analysis_result.json) + +命令介绍: + +1. 使用`-h`或`--help`查看帮助 + ```shell + > python .\ram_analyzer.py -h + usage: ram_analyzer.py [-h] [-v] -x XML_PATH -c CFG_PATH [-j ROM_RESULT] -n DEVICE_NUM [-o OUTPUT_FILENAME] [-e EXCEL] + + analyze ram size of component + + optional arguments: + -h, --help show this help message and exit + -v, -version show program\'s version number and exit + -x XML_PATH, --xml_path XML_PATH + path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile + -c CFG_PATH, --cfg_path CFG_PATH + path of cfg files. eg: -c ./cfgs/ + -j ROM_RESULT, --rom_result ROM_RESULT + json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json.eg: -j ./demo/rom_analysis_result.json + -n DEVICE_NUM, --device_num DEVICE_NUM + device number to be collect hidumper info. 
class BasicTool:
    """Generic filesystem helpers shared by the rom/ram analysis scripts."""

    @classmethod
    def find_all_files(cls, folder: str, real_path: bool = True, apply_abs: bool = True, de_duplicate: bool = True,
                       p_filter: typing.Callable = lambda x: True) -> list:
        """Recursively collect the files under ``folder``.

        :param folder: directory to walk.
        :param real_path: resolve symlinks via ``os.path.realpath`` when True.
            (BUG FIX: the flag previously had no effect — both branches of the
            original conditional were identical and realpath was always applied.)
        :param apply_abs: return absolute paths when True, otherwise paths
            relative to the current working directory.
        :param de_duplicate: kept for interface compatibility; collection is
            set-based, so results are always unique regardless of this flag.
        :param p_filter: predicate over the joined path; only files for which
            it returns True are kept.
        :return: case-insensitively sorted list of file paths.
        """
        collected = set()
        for root, _, file_names in os.walk(folder):
            for name in file_names:
                joined = os.path.join(root, name)
                if not p_filter(joined):
                    continue
                path = os.path.realpath(joined) if real_path else joined
                path = os.path.abspath(path) if apply_abs else os.path.relpath(path)
                collected.add(path)
        return sorted(collected, key=str.lower)

    @classmethod
    def get_abs_path(cls, path: str) -> str:
        """Expand '~' and return the absolute form of ``path``."""
        return os.path.abspath(os.path.expanduser(path))
class GnCommonTool:
    """
    Helpers for extracting part/subsystem information from BUILD.gn files.
    """

    # memo cache used by find_variables_in_gn to reduce repeated grep IO
    __var_val_mem_dict = dict()

    @classmethod
    def is_gn_variable(cls, target: str, has_quote: bool = True):
        """Return True when ``target`` denotes a gn variable reference.

        Rules: with has_quote=True an unquoted token is a variable, and a
        quoted token is a variable when it looks like "$xxx" or "${xxx}";
        with has_quote=False anything starting with '$' is a variable.
            b = "xxx"   -> literal
            c = b       -> variable
            c = "${b}"  -> variable
            "$p"        -> variable
        """
        target = target.strip()
        if not has_quote:
            return target.startswith("$")
        if target.startswith('"') and target.endswith('"'):
            target = target.strip('"')
            if target.startswith("${") and target.endswith("}"):
                return True
            elif target.startswith("$"):
                return True
            return False
        else:
            return True

    @classmethod
    def find_variables_in_gn(cls, var_name_tuple: tuple, path: str, stop_tail: str = "home") -> tuple:
        """Look up the values of several gn variables at once.

        Variable names may come in raw form: xxx, "${xxx}" or "$xxx".
        Walks upwards from ``path`` grepping *.gn* files until all variables
        are resolved, a directory ending with ``stop_tail`` is reached, or
        the filesystem root is hit.
        """
        if os.path.isfile(path):
            path = os.path.split(path)[0]
        var_val_dict = dict()
        not_found_count = len(var_name_tuple)
        for var in var_name_tuple:
            val = GnCommonTool.__var_val_mem_dict.get(var)
            if val is not None:
                not_found_count -= 1
                var_val_dict[var] = val
        while not path.endswith(stop_tail) and not_found_count != 0:
            for v in var_name_tuple:
                # BUG FIX: skip variables already resolved; re-finding them at a
                # higher directory double-decremented not_found_count and could
                # terminate the walk before the remaining variables were found.
                if v in var_val_dict:
                    continue
                cmd = r"grep -Ern '^( *){} *= *\".*?\"' --include=*.gn* {}| grep -Ev '\$' | head -n 1 | grep -E '\".*\"' -wo".format(
                    v.strip('"').lstrip("${").rstrip('}'), path)
                output = os.popen(cmd).read().strip().strip('"')
                if len(output) != 0:
                    not_found_count -= 1
                    var_val_dict[v] = output
                    GnCommonTool.__var_val_mem_dict[v] = output
            parent = os.path.split(path)[0]
            if parent == path:
                # BUG FIX: at the filesystem root os.path.split no longer
                # shrinks the path; break to avoid an infinite loop when
                # stop_tail never matches.
                break
            path = parent
        return tuple(var_val_dict.values())

    @classmethod
    def __find_part_subsystem_from_bundle(cls, gnpath: str, stop_tail: str = "home") -> tuple:
        """Walk upwards from a BUILD.gn path looking for bundle.json and read
        part name and subsystem name from it.

        :return: (part_name, subsystem_name); either may be None.
        """
        filename = "bundle.json"
        part_name = None
        subsystem_name = None
        if stop_tail not in gnpath:
            return part_name, subsystem_name
        if os.path.isfile(gnpath):
            gnpath = os.path.split(gnpath)[0]
        while not gnpath.endswith(stop_tail):
            bundle_path = os.path.join(gnpath, filename)
            if not os.path.isfile(bundle_path):  # bundle.json not in this directory
                parent = os.path.split(gnpath)[0]
                if parent == gnpath:
                    # BUG FIX: stop at the filesystem root instead of looping forever
                    break
                gnpath = parent
                continue
            with open(bundle_path, 'r', encoding='utf-8') as f:
                content = json.load(f)
            try:
                part_name = content["component"]["name"]
                subsystem_name = content["component"]["subsystem"]
            except KeyError:
                ...
            finally:
                break
        # normalize empty strings to None
        part_name = None if (part_name is not None and len(
            part_name) == 0) else part_name
        subsystem_name = None if (subsystem_name is not None and len(
            subsystem_name) == 0) else subsystem_name
        return part_name, subsystem_name

    @classmethod
    def find_part_subsystem(cls, gn_file: str, project_path: str) -> tuple:
        """Find the part_name and subsystem_name for ``gn_file``.

        Values are grepped from the gn file itself; gn variables are resolved
        via find_variables_in_gn, and anything still missing is looked up in
        the nearest bundle.json.
        """
        part_name = None
        subsystem_name = None
        part_var_flag = False  # True when the raw value taken from gn is a variable
        subsystem_var_flag = False
        var_list = list()
        part_name_pattern = r"part_name *=\s*\S*"
        subsystem_pattern = r"subsystem_name *=\s*\S*"
        meta_grep_pattern = "grep -E '{}' {} | head -n 1"
        part_cmd = meta_grep_pattern.format(part_name_pattern, gn_file)
        subsystem_cmd = meta_grep_pattern.format(subsystem_pattern, gn_file)
        part = os.popen(part_cmd).read().strip()
        if len(part) != 0:
            part = part.split('=')[-1].strip()
            if GnCommonTool.is_gn_variable(part):
                part_var_flag = True
                var_list.append(part)
            else:
                part_name = part.strip('"')
                if len(part_name) == 0:
                    part_name = None
        subsystem = os.popen(subsystem_cmd).read().strip()
        if len(subsystem) != 0:  # only checks whether grep matched anything
            subsystem = subsystem.split('=')[-1].strip()
            if GnCommonTool.is_gn_variable(subsystem):
                subsystem_var_flag = True
                var_list.append(subsystem)
            else:
                subsystem_name = subsystem.strip('"')
                if len(subsystem_name) == 0:
                    subsystem_name = None
        if part_var_flag and subsystem_var_flag:
            # NOTE(review): find_variables_in_gn may return fewer values than
            # requested when a variable cannot be resolved -- TODO confirm.
            part_name, subsystem_name = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)
        elif part_var_flag:
            t = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)[0]
            part_name = t if t is not None and len(t) != 0 else part_name
        elif subsystem_var_flag:
            t = GnCommonTool.find_variables_in_gn(
                tuple(var_list), gn_file, project_path)[0]
            subsystem_name = t if t is not None and len(
                t) != 0 else subsystem_name
        if part_name is not None and subsystem_name is not None:
            return part_name, subsystem_name
        # something is still missing: climb up looking for bundle.json
        t_part_name, t_subsystem_name = cls.__find_part_subsystem_from_bundle(
            gn_file, stop_tail=project_path)
        if t_part_name is not None:
            part_name = t_part_name
        if t_subsystem_name is not None:
            subsystem_name = t_subsystem_name
        return part_name, subsystem_name
class SimpleExcelWriter:
    """Small convenience wrapper over xlwt.

    Keeps a per-sheet write cursor so callers can simply append a header row
    and data rows; also supports merged cells for grouped columns.
    """

    def __init__(self, default_sheet_name: str = "sheet1"):
        self.__book = xlwt.Workbook(encoding='utf-8', style_compression=0)
        self.__sheet_dict = {
            default_sheet_name: self.__book.add_sheet(
                sheetname=default_sheet_name, cell_overwrite_ok=True)
        }
        # next unwritten (row, col) position per sheet
        self.__sheet_pos = {
            default_sheet_name: (0, 0)
        }
        self.__default_sheet_name = default_sheet_name
        # header style: bold, centered, colored background
        self.__head_style = xlwt.XFStyle()
        # content style: centered only
        self.__content_style = xlwt.XFStyle()
        bold_font = xlwt.Font()
        bold_font.bold = True
        centered = xlwt.Alignment()
        centered.horz = xlwt.Alignment.HORZ_CENTER  # horizontal centering
        centered.vert = xlwt.Alignment.VERT_CENTER  # vertical centering
        background = xlwt.Pattern()
        background.pattern = xlwt.Pattern.SOLID_PATTERN
        background.pattern_fore_colour = 22  # header background color
        self.__head_style.font = bold_font
        self.__head_style.alignment = centered
        self.__head_style.pattern = background
        self.__content_style.alignment = centered

    def __increment_y(self, sheet_name: str, value: int = 1) -> int:
        # advance the column cursor of the sheet; returns the new column
        if sheet_name in self.__sheet_pos.keys():
            row, col = self.__sheet_pos.get(sheet_name)
            col = col + value
            self.__sheet_pos[sheet_name] = (row, col)
            return row and col or col

    def __increment_x(self, sheet_name: str, value: int = 1) -> int:
        # advance the row cursor and reset the column cursor to 0
        if sheet_name in self.__sheet_pos.keys():
            row, col = self.__sheet_pos.get(sheet_name)
            row = row + value
            self.__sheet_pos[sheet_name] = (row, 0)
            return row

    def append_line(self, content: list, sheet_name: str = None):
        """Write ``content`` as the next row of the sheet (default sheet if None)."""
        if sheet_name is None:
            sheet_name = self.__default_sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            print("error: sheet name '{}' not exist".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        row, col = self.__sheet_pos.get(sheet_name)
        for cell_value in content:
            sheet.write(row, col, cell_value, style=self.__content_style)
            col = self.__increment_y(sheet_name)
        self.__increment_x(sheet_name)

    def write_merge(self, x0: int, y0: int, x1: int, y1: int, content: typing.Any,
                    sheet_name: str = None):
        """Write ``content`` into the merged cell spanning rows x0..x1 and columns y0..y1."""
        if sheet_name is None:
            sheet_name = self.__default_sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            print("error: sheet name '{}' not exist".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        # xlwt's write_merge expects (row0, row1, col0, col1)
        sheet.write_merge(x0, x1, y0, y1, content, style=self.__content_style)

    def set_sheet_header(self, headers: Iterable, sheet_name: str = None):
        """Write the header row of a sheet; must be called before any data row."""
        if sheet_name is None:
            sheet_name = self.__default_sheet_name
        if sheet_name not in self.__sheet_dict.keys():
            print("error: sheet name '{}' not exist".format(sheet_name))
            return
        row, col = self.__sheet_pos.get(sheet_name)
        if row != 0 or col != 0:
            print(
                "error: pos of sheet '{}' is not (0,0). set_sheet_header must before write".format(sheet_name))
            return
        sheet: Worksheet = self.__sheet_dict.get(sheet_name)
        for header_cell in headers:
            sheet.write(row, col, header_cell, self.__head_style)
            col = self.__increment_y(sheet_name)
        self.__increment_x(sheet_name)

    def add_sheet(self, sheet_name: str, cell_overwrite_ok=True) -> Optional[xlwt.Worksheet]:
        """Create a new sheet; returns None (after a message) when the name already exists."""
        if sheet_name in self.__sheet_dict.keys():
            print("error: sheet name '{}' has exist".format(sheet_name))
            return
        self.__sheet_dict[sheet_name] = self.__book.add_sheet(
            sheetname=sheet_name, cell_overwrite_ok=cell_overwrite_ok)
        self.__sheet_pos[sheet_name] = (0, 0)
        return self.__sheet_dict.get(sheet_name)

    def save(self, file_name: str):
        """Persist the workbook to ``file_name``."""
        self.__book.save(file_name)
class HDCTool:
    """Thin wrappers around the ``hdc`` device-connector command line tool."""

    @classmethod
    def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
        """
        Check whether the hdc command is usable.
        True: usable; False: not usable.
        """
        try:
            cp = subprocess.run(["hdc"], capture_output=True)
        except FileNotFoundError:
            # BUG FIX: a missing hdc binary used to raise instead of returning False
            return False
        stdout = str(cp.stdout)
        stderr = str(cp.stderr)
        return verify_str in stdout or verify_str in stderr

    @classmethod
    def verify_device(cls, device_num: str) -> bool:
        """
        Check whether device ``device_num`` is connected.
        True: connected; False: not connected.
        """
        try:
            cp = subprocess.run(["hdc", "list", "targets"], capture_output=True)
        except FileNotFoundError:
            # BUG FIX: consistent with verify_hdc — no hdc means no device
            return False
        stdout = str(cp.stdout)
        stderr = str(cp.stderr)
        return device_num in stderr or device_num in stdout

    # restricts output_from to the two capturable streams (typing hint only)
    __MODE = typing.Literal["stdout", "stderr"]

    @classmethod
    def exec(cls, args: list, output_from: __MODE = "stdout"):
        """Run ``args`` and return the decoded stdout or stderr of the process.

        Returns None (after printing an error) for an invalid ``output_from``.
        """
        cp = subprocess.run(args, capture_output=True)
        if output_from == "stdout":
            return cp.stdout.decode()
        elif output_from == "stderr":
            return cp.stderr.decode()
        else:
            # BUG FIX: the message previously said "stdout or stdin"
            print("error: 'output_from' must be stdout or stderr")
process_name_list.append(process_name) + return process_name_list + + if ss not in cls.__ss_dict.keys(): + print("error: {} is not a valid parameter".format(ss)) + return dict() + output = content.split('\n') + process_pss_dict = dict() + __process_name_list: typing.List[str] = process_ps_ef( + HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"])) + for line in output: + if "Total Memory Usage by Size" in line: + break + if line.isspace(): + continue + processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(line) + if not processed[0].isnumeric(): # 如果第一列不是数字(pid),就过 + continue + name = processed[1] # 否则的话就取名字,和对应的size + size = int(processed[cls.__ss_dict.get(ss)]) + process_pss_dict[find_full_process_name(name)] = size + return process_pss_dict + + @classmethod + def process_hidumper_info(cls, device_num: str, ss: __SS_Mode) -> typing.Dict[str, int]: + """ + 处理进程名与对应进程大小 + """ + + def exec_once() -> typing.Dict[str, int]: + stdout = HDCTool.exec(["hdc", "-t", device_num, "shell", "hidumper", "--mem"]) + name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss) + return name_size_dict + + if not HDCTool.verify_hdc(): + print("error: Command 'hdc' not found") + return dict() + if not HDCTool.verify_device(device_num): + print("error: {} is inaccessible or not found".format(device_num)) + return dict() + + return exec_once() + + @classmethod + def __parse_process_xml(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]): + """ + 解析xml文件,结存存入 result_dict中,格式:{process_name: os_list} + 其中,so_list中是so的base_name + """ + if not (os.path.isfile(file_path) and file_path.endswith(".xml")): + print("warning: {} not exist or not a xml file".format(file_path)) + return + doc = dom.parse(file_path) + info = doc.getElementsByTagName("info")[0] + process = info.getElementsByTagName("process")[0] + process_name = process.childNodes[0].data + result_dict[process_name] = list() + libs = 
info.getElementsByTagName("loadlibs")[0].getElementsByTagName("libpath") + for lib in libs: + so = lib.childNodes[0].data + result_dict.get(process_name).append(os.path.split(so)[-1]) + if debug: + print(process_name, " ", so) + + @classmethod + def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]: + """ + 利用rom_analyzer.py的分析结果,重组成 + {file_base_name: {"subsystem_name":subsystem_name, "component_name":component_name}} + 的形式 + """ + with open(rom_result_json, 'r', encoding='utf-8') as f: + rom_info_dict = json.load(f) + elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict() + for subsystem_name in rom_info_dict.keys(): + sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(subsystem_name) + delete_values_from_dict(sub_val_dict, ["size", "file_count"]) + for component_name in sub_val_dict.keys(): + component_val_dict: typing.Dict[str, str] = sub_val_dict.get(component_name) + delete_values_from_dict(component_val_dict, ["size", "file_count"]) + for file_name, size in component_val_dict.items(): + file_basename: str = os.path.split(file_name)[-1] + elf_info_dict[file_basename] = { + "subsystem_name": subsystem_name, + "component_name": component_name, + "size": size + } + return elf_info_dict + + @classmethod + def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict): + """ + 解析cfg,因为有的cfg会拉起xml中的进程,所以也可能会去解析xml + """ + with open(cfg_path, 'r', encoding='utf-8') as f: + cfg_dict = json.loads(f.read()) + services = cfg_dict.get("services") + if services is None: + print("warning: 'services' not in {}".format(cfg_path)) + return + for service in services: + process_name = service.get("name") + first, *path_list = service.get("path") + if first.endswith("sa_main"): + # 由sa_main去来起进程 + xml_base_name = os.path.split(path_list[0])[-1] + cls.__parse_process_xml(os.path.join(profile_path, xml_base_name), result_dict) + else: + # 直接执行 + # process_name = os.path.split(first)[-1] + if 
result_dict.get(process_name) is None: + result_dict[process_name] = list() + result_dict.get(process_name).append(os.path.split(first)[-1]) + + @classmethod + def get_process_so_relationship(cls, xml_path: str, cfg_path: str, profile_path: str) -> typing.Dict[ + str, typing.List[str]]: + """ + 从out/{product_name}/packages/phone/sa_profile/merged_sa查找xml文件并处理得到进程与so的对应关系 + """ + # xml_path = os.path.join(product_path, "packages", "phone", "sa_profile", "merged_sa") + # 从merged_sa里面收集 + xml_list = glob.glob(xml_path + os.sep + "*[.]xml", recursive=True) + process_elf_dict: typing.Dict[str, typing.List[str]] = dict() + for xml in xml_list: + if debug: + print("parsing: ", xml) + try: + cls.__parse_process_xml(xml, process_elf_dict) + except: + print("parse '{}' failed".format(xml)) + finally: + ... + # 从system/etc/init/*.cfg中收集,如果是sa_main拉起的,则从system/profile/*.xml中进行解析 + cfg_list = glob.glob(cfg_path + os.sep + "*[.]cfg", recursive=True) + for cfg in cfg_list: + if debug: + print("parsing: ", cfg) + try: + cls.__parse_process_cfg(cfg, profile_path, process_elf_dict) + except: + print("parse '{}' failed".format(cfg)) + finally: + ... 
    @classmethod
    def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: __SS_Mode):
        """
        Save the ram analysis result to an excel file.

        One row per (process, component, elf); cells in the process and
        component columns are merged over the rows they cover.
        """
        tmp_dict = copy.deepcopy(data_dict)
        writer = SimpleExcelWriter("ram_info")
        writer.set_sheet_header(
            ["process_name", "process_size({}, KB)".format(ss), "component_name", "elf_name", "elf_size(KB)"])
        # merged-cell cursors; row 0 is the header so data starts at row 1
        process_start_r = 1
        process_end_r = 0
        process_c = 0
        process_size_c = 1
        component_start_r = 1
        component_end_r = 0
        component_c = 2
        for process_name in tmp_dict.keys():
            process_val_dict: typing.Dict[str, typing.Dict[str, int]] = tmp_dict.get(process_name)
            process_size = process_val_dict.get("size")
            # drop the per-process total so only component entries remain
            delete_values_from_dict(process_val_dict, ["size"])
            for component_name, component_val_dict in process_val_dict.items():
                elf_count_of_component = len(component_val_dict)
                for elf_name, size in component_val_dict.items():
                    # size is in bytes; report KB with two decimals
                    writer.append_line([process_name, process_size, component_name, elf_name, "%.2f" % (size / 1024)])
                component_end_r += elf_count_of_component
                # merge the component cell over this component's rows
                writer.write_merge(component_start_r, component_c, component_end_r,
                                   component_c, component_name)
                component_start_r = component_end_r + 1
            # NOTE(review): only the LAST component's elf count is added here, so
            # for processes with more than one component the merged process rows
            # look short by the earlier components' counts -- TODO confirm intended.
            process_end_r += elf_count_of_component
            writer.write_merge(process_start_r, process_c, process_end_r, process_c, process_name)
            writer.write_merge(process_start_r, process_size_c, process_end_r, process_size_c, process_size)
            process_start_r = process_end_r + 1
        writer.save(filename)
rom_result_dict.keys() + for sn in subsystem_name_list: + sub_val_dict = rom_result_dict.get(sn) + component_name_list = [component_name] if component_name != '*' else sub_val_dict.keys() + for cn in component_name_list: + if cn == "size" or cn == "file_count": + continue + component_val_dict: typing.Dict[str, int] = sub_val_dict.get(cn) + for k, v in component_val_dict.items(): + if k == "size" or k == "file_count": + continue + if not evaluator(service_name, k): + continue + return True, os.path.split(k)[-1], cn, v + return False, str(), str(), int() + + @classmethod + def analysis(cls, cfg_path: str, xml_path: str, rom_result_json: str, device_num: str, + output_file: str, ss: __SS_Mode, output_excel: bool): + """ + process size subsystem/component so so_size + """ + if not HDCTool.verify_hdc(): + print("error: Command 'hdc' not found") + return + if not HDCTool.verify_device(device_num): + print("error: {} is inaccessible or not found".format(device_num)) + return + with open(rom_result_json, 'r', encoding='utf-8') as f: + rom_result_dict: typing.Dict = json.loads(f.read()) + # 从rom的分析结果中将需要的elf信息重组 + so_info_dict: typing.Dict[ + str, typing.Dict[str["component_name|subsystem_name|size"], str]] = cls.get_elf_info_from_rom_result( + rom_result_json) + process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(xml_path, cfg_path, + profile_path) + process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(device_num, ss) + result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict() + + def get(key: typing.Any, dt: typing.Dict[str, typing.Any]): + for k, v in dt.items(): + if k.startswith(key) or key == v[0]: + # 要么uinput_inject的对应key为mmi_uinput_inject。对于此类特殊处理,即:如果service_name找不到,但是直接执行的bin等于这个名字,也认为找到 + return v + + for process_name, process_size in process_size_dict.items(): # 从进程出发 + if process_name == "init": + _, bin, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init", + lambda x, y: 
os.path.split(y)[ + -1].lower() == x.lower(), + rom_result_dict) + result_dict[process_name] = dict() + result_dict[process_name]["size"] = process_size + result_dict[process_name]["init"] = dict() + result_dict[process_name]["init"][bin if len(bin) != 0 else "UNKOWN"] = size + continue + # 如果是hap,特殊处理 + if (process_name.startswith("com.") or process_name.startswith("ohos.")): + _, hap_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*", "*", + lambda x, y: len( + y.split( + '/')) >= 3 and x.lower().startswith( + y.split('/')[2].lower()), + rom_result_dict) + result_dict[process_name] = dict() + result_dict[process_name]["size"] = process_size + result_dict[process_name][component_name] = dict() + result_dict[process_name][component_name][hap_name if len(hap_name) != 0 else "UNKOWN"] = size + continue + so_list: list = get(process_name, process_elf_dict) # 得到进程相关的elf文件list + if so_list is None: + print("warning: process '{}' not found in .xml or .cfg".format(process_name)) + result_dict[process_name] = dict() + result_dict[process_name]["size"] = process_size + result_dict[process_name]["UNKOWN"] = dict() + result_dict[process_name]["UNKOWN"]["UNKOWN"] = int() + continue + result_dict[process_name] = dict() + result_dict[process_name]["size"] = process_size + for so in so_list: + unit = so_info_dict.get(so) + if unit is None: + print("warning: '{}' in {} not found in json from rom_analysis".format(so, process_name)) + continue + component_name = unit.get("component_name") + so_size = unit.get("size") + if result_dict.get(process_name).get(component_name) is None: + result_dict[process_name][component_name] = dict() + result_dict[process_name][component_name][so] = so_size + base_dir, _ = os.path.split(output_file) + if len(base_dir) != 0 and not os.path.isdir(base_dir): + os.makedirs(base_dir, exist_ok=True) + with open(output_file + ".json", 'w', encoding='utf-8') as f: + f.write(json.dumps(result_dict, indent=4)) + if output_excel: + 
def get_args():
    """Parse the ram_analyzer command line; returns the argparse namespace."""
    VERSION = 1.0

    def to_bool(value: str) -> bool:
        # BUG FIX: ``type=bool`` treats ANY non-empty string (including
        # "False") as True; parse the text explicitly so "-e False" works.
        return value.strip().lower() in ("true", "1", "yes", "y")

    parser = argparse.ArgumentParser(
        description="analyze ram size of component"
    )
    parser.add_argument("-v", "-version", action="version",
                        version=f"version {VERSION}")
    parser.add_argument("-x", "--xml_path", type=str, required=True,
                        help="path of xml file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
    parser.add_argument("-c", "--cfg_path", type=str, required=True,
                        help="path of cfg files. eg: -c ./cfgs/")
    parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
                        help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json."
                             "eg: -j ./demo/rom_analysis_result.json")
    parser.add_argument("-n", "--device_num", type=str, required=True,
                        help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
    parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
                        help="base name of output file, default: rom_analysis_result. eg: -o ram_analysis_result")
    parser.add_argument("-e", "--excel", type=to_bool, default=False,
                        help="if output result as excel, default: False. eg: -e True")
    args = parser.parse_args()
    return args
# ======================================================================
# rom_analyzer.py — analyze per-subsystem / per-component ROM size
# ======================================================================
import argparse
import json
import os
import sys
import typing
from copy import deepcopy
from typing import *

from packages.basic_tool import BasicTool
from packages.gn_common_tool import GnCommonTool
from packages.simple_excel_writer import SimpleExcelWriter

# True when running under a tracer/debugger (pdb, IDE debug run).
debug = bool(sys.gettrace())


class RomAnalyzer:
    @classmethod
    def __collect_product_info(cls, system_module_info_json: Text,
                               project_path: Text) -> Dict[Text, Dict[Text, Text]]:
        """Build a target dict from system_module_info.json.

        Maps each build artifact path (every entry of a unit's "dest" list)
        to its component_name / subsystem_name / BUILD.gn path, resolved via
        GnCommonTool from the unit's "label".

        :param system_module_info_json: path of system_module_info.json
        :param project_path: root path of the openharmony project
        :return: {dest_path: {"component_name": ..., "subsystem_name": ..., "gn_path": ...}}
        """
        with open(system_module_info_json, 'r', encoding='utf-8') as f:
            # json.load(f) instead of json.loads(f.read()): same result,
            # no intermediate string copy.
            product_list = json.load(f)
        project_path = BasicTool.get_abs_path(project_path)
        product_info_dict: Dict[Text, Dict[Text, Text]] = dict()
        for unit in product_list:
            dest: List = unit.get("dest")
            if dest is None:
                # BUG FIX: the message used to print the json file path,
                # which never identifies the offending entry; print the
                # unit itself (as the 'label' warning below already does).
                print("warning: keyword 'dest' not found in {} of {}".format(
                    unit, system_module_info_json))
                continue
            label: Text = unit.get("label")
            gn_path = component_name = subsystem_name = None
            if label:
                # label looks like "//path/to/dir:target"; strip the target
                # part and the leading "//" to locate the BUILD.gn file.
                gn_path = os.path.join(project_path, label.split(':')[0].lstrip('/'), "BUILD.gn")
                component_name, subsystem_name = GnCommonTool.find_part_subsystem(gn_path, project_path)
            else:
                print("warning: keyword 'label' not found in {}".format(unit))
            for target in dest:
                product_info_dict[target] = {
                    "component_name": component_name,
                    "subsystem_name": subsystem_name,
                    "gn_path": gn_path,
                }
        return product_info_dict

    @classmethod
    def __save_result_as_excel(cls, result_dict: dict, output_name: str):
        """Write result_dict to <output_name>.xls.

        One row per output file; the subsystem (col 0) and component (col 1)
        cells are vertically merged over each group's row span, which is why
        the running *_start_row / *_end_row counters below are global over
        the whole sheet rather than reset per subsystem.
        """
        header = ["subsystem_name", "component_name", "output_file", "size(Byte)"]
        # deepcopy because pop() below strips the aggregate keys; the
        # caller's dict must stay intact (it is also dumped to json).
        tmp_dict = deepcopy(result_dict)
        excel_writer = SimpleExcelWriter("rom")
        excel_writer.set_sheet_header(headers=header)
        subsystem_start_row = 1
        subsystem_end_row = 0
        subsystem_col = 0
        component_start_row = 1
        component_end_row = 0
        component_col = 1

        for subsystem_name in tmp_dict.keys():
            subsystem_dict = tmp_dict.get(subsystem_name)
            # pop() replaces the old get-then-del pair; the "size" value was
            # fetched into an unused local before — drop it outright.
            subsystem_file_count = subsystem_dict.pop("file_count")
            subsystem_dict.pop("size")
            subsystem_end_row += subsystem_file_count

            for component_name in subsystem_dict.keys():
                component_dict: Dict[str, int] = subsystem_dict.get(component_name)
                component_file_count = component_dict.pop("file_count")
                component_dict.pop("size")
                component_end_row += component_file_count

                # Only per-file entries remain in component_dict now.
                for file_name, size in component_dict.items():
                    excel_writer.append_line(
                        [subsystem_name, component_name, file_name, size])
                excel_writer.write_merge(component_start_row, component_col, component_end_row, component_col,
                                         component_name)
                component_start_row = component_end_row + 1
            excel_writer.write_merge(subsystem_start_row, subsystem_col, subsystem_end_row, subsystem_col,
                                     subsystem_name)
            subsystem_start_row = subsystem_end_row + 1
        excel_writer.save(output_name + ".xls")
unit.get("component_name") + subsystem_name = "others" if unit.get("subsystem_name") is None else unit.get("subsystem_name") + size = unit.get("size") + relative_filepath = unit.get("relative_filepath") + if result_dict.get(subsystem_name) is None: + result_dict[subsystem_name] = dict() + result_dict[subsystem_name]["size"] = 0 + result_dict[subsystem_name]["file_count"] = 0 + + if result_dict.get(subsystem_name).get(component_name) is None: + result_dict[subsystem_name][component_name] = dict() + result_dict[subsystem_name][component_name]["size"] = 0 + result_dict[subsystem_name][component_name]["file_count"] = 0 + result_dict[subsystem_name]["size"] += size + result_dict[subsystem_name]["file_count"] += 1 + result_dict[subsystem_name][component_name]["size"] += size + result_dict[subsystem_name][component_name]["file_count"] += 1 + result_dict[subsystem_name][component_name][relative_filepath] = size + + @classmethod + def analysis(cls, system_module_info_json: Text, product_dirs: List[str], + project_path: Text, product_name: Text, output_file: Text, output_execel: bool): + """ + system_module_info_json: json文件 + product_dirs:要处理的产物的路径列表如["vendor", "system/"] + project_path: 项目根路径 + product_name: eg,rk3568 + output_file: basename of output file + """ + project_path = BasicTool.get_abs_path(project_path) + phone_dir = os.path.join(project_path, "out", product_name, "packages", "phone") + product_dirs = [os.path.join(phone_dir, d) for d in product_dirs] + product_info_dict = cls.__collect_product_info(system_module_info_json, project_path) # 所有产物信息 + result_dict: Dict[Text:Dict] = dict() + for d in product_dirs: + file_list: List[Text] = BasicTool.find_all_files(d) + for f in file_list: + size = os.path.getsize(f) + relative_filepath = f.replace(phone_dir, "").lstrip(os.sep) + unit: Dict[Text, Any] = product_info_dict.get(relative_filepath) + if unit is None: + unit = { + "relative_filepath": relative_filepath, + } + unit["size"] = size + 
def _str_to_bool(value: str) -> bool:
    """Convert a command-line string to a real bool.

    argparse's ``type=bool`` is a known trap: ``bool("False")`` is True
    because any non-empty string is truthy.  This converter accepts the
    usual spellings and rejects anything else with a proper CLI error.

    :param value: raw option string, e.g. "True" / "false" / "1"
    :return: the parsed boolean
    :raises argparse.ArgumentTypeError: for unrecognized spellings
    """
    if value.lower() in ("true", "1", "yes", "y"):
        return True
    if value.lower() in ("false", "0", "no", "n"):
        return False
    raise argparse.ArgumentTypeError(f"invalid boolean value: {value!r}")


def get_args():
    """Parse the rom_analyzer command line and return the argparse namespace.

    Required: -p/--project_path, -j/--module_info_json, -n/--product_name,
    and at least one -d/--product_dir (repeatable; collected into a list).
    """
    VERSION = 2.0
    parser = argparse.ArgumentParser(
        # plain string: the old f-string had no placeholders
        description="analyze rom size of component.\n")
    parser.add_argument("-v", "-version", action="version",
                        version=f"version {VERSION}")
    parser.add_argument("-p", "--project_path", type=str, required=True,
                        help="root path of openharmony. eg: -p ~/openharmony")
    parser.add_argument("-j", "--module_info_json", required=True, type=str,
                        help="path of out/{product_name}/packages/phone/system_module_info.json")
    parser.add_argument("-n", "--product_name", required=True, type=str, help="product name. eg: -n rk3568")
    parser.add_argument("-d", "--product_dir", required=True, action="append",
                        help="subdirectories of out/{product_name}/packages/phone to be counted."
                             "eg: -d system -d vendor")
    parser.add_argument("-o", "--output_file", type=str, default="rom_analysis_result",
                        help="basename of output file, default: rom_analysis_result. eg: demo/rom_analysis_result")
    # BUG FIX: was type=bool, which made ANY non-empty string — including
    # "-e False" — parse as True.  _str_to_bool parses the text correctly.
    parser.add_argument("-e", "--excel", type=_str_to_bool, default=False,
                        help="if output result as excel, default: False. eg: -e True")
    args = parser.parse_args()
    return args


if __name__ == '__main__':
    args = get_args()
    module_info_json = args.module_info_json
    project_path = args.project_path
    product_name = args.product_name
    product_dirs = args.product_dir
    output_file = args.output_file
    output_excel = args.excel
    RomAnalyzer.analysis(module_info_json, product_dirs, project_path, product_name, output_file, output_excel)