From bb38e234f1935878fb237b12e19882c6d3a94e44 Mon Sep 17 00:00:00 2001
From: c00570162
Date: Fri, 16 May 2025 14:27:11 +0800
Subject: [PATCH] Adapt microarchitecture collection and enable end-to-end
 EulerCopilot tuning
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../micro_dep_analyzer.py                     |  95 +++++
 .../performance_analyzer.py                   |   4 +
 src/performance_collector/base_collector2.py  | 113 ++++++
 .../micro_dep_collector.py                    | 342 ++++++++++++++++++
 src/performance_optimizer/param_optimizer.py  |   7 +-
 src/testmain.py                               |  96 +++--
 6 files changed, 631 insertions(+), 26 deletions(-)
 create mode 100644 src/performance_analyzer/micro_dep_analyzer.py
 create mode 100644 src/performance_collector/base_collector2.py
 create mode 100644 src/performance_collector/micro_dep_collector.py
"dtlb_walk_mpki": "指令TLB每千条指令中到页表查找次数", + "div_stall": "除法指令在关键路径导致的执行瓶颈", + } + def analyze(self) -> str: + report = "基于采集的系统指标, 微架构初步的性能分析报告如下: \n" + processed_data_dict = {} + for k, v in self.data.items(): + if k in self.prompt_dict.keys(): + processed_data_dict[self.prompt_dict[k]] = v + else: + logging.warning("Cannot find prompt for item {k}") + report += f"系统微架构状态是{processed_data_dict}\n" + return report + + def generate_report( + self, + micro_report: str + ) -> str: + # TO DO + # 要有一个报告模板,指明包含哪些信息,以及报告格式 + report_prompt = f""" + 以下内容是linux系统中应用微架构相关的性能信息: + {micro_report} + 信息中所涉及到的数据准确无误,真实可信。 + + # OBJECTIVE # + 请根据上述信息,分析系统应用微架构的性能状况。 + 要求: + 1.答案中不要包含任何优化建议。 + 2.答案中尽可能保留信息中真实有效的数据。 + 3.不要遗漏任何值得分析的信息。 + + # STYLE # + 你是一个专业的系统运维专家,你的回答应该逻辑严谨、表述客观、简洁易懂、条理清晰,让你的回答真实可信 + + # Tone # + 你应该尽可能秉承严肃、认真、严谨的态度 + + # AUDIENCE # + 你的答案将会是其他系统运维专家的重要参考意见,请尽可能提供真实有用的信息,不要胡编乱造。 + + # RESPONSE FORMAT # + 回答以"应用微架构分析如下:"开头,然后另起一行逐条分析。 + 如果有多条分析结论,请用数字编号分点作答。 + + """ + return self.ask_llm(report_prompt) + "\n" diff --git a/src/performance_analyzer/performance_analyzer.py b/src/performance_analyzer/performance_analyzer.py index 7d7f8f56..738dbcb7 100644 --- a/src/performance_analyzer/performance_analyzer.py +++ b/src/performance_analyzer/performance_analyzer.py @@ -3,6 +3,7 @@ from .disk_analyzer import DiskAnalyzer from .memory_analyzer import MemoryAnalyzer from .network_analyzer import NetworkAnalyzer from .mysql_analyzer import MysqlAnalyzer +from .micro_dep_analyzer import MicroDepAnalyzer from .base_analyzer import BaseAnalyzer from typing import Tuple from src.utils.thread_pool import ThreadPoolManager @@ -14,6 +15,7 @@ class PerformanceAnalyzer(BaseAnalyzer): self.disk_analyzer = DiskAnalyzer(data=self.data.get("Disk", {})) self.memory_analyzer = MemoryAnalyzer(data=self.data.get("Memory", {})) self.network_analyzer = NetworkAnalyzer(data=self.data.get("Network", {})) + self.micro_analyer = MicroDepAnalyzer(data=self.data.get("micro_dep", {})) self.mysql_analyzer = MysqlAnalyzer(data=self.data.get("Mysql", {})) self.thread_pool = ThreadPoolManager(max_workers=5) @@ -69,6 +71,7 @@ class PerformanceAnalyzer(BaseAnalyzer): disk_analyzer_task = self.thread_pool.add_task(self.disk_analyzer.run) memory_analyzer_task = self.thread_pool.add_task(self.memory_analyzer.run) network_analyzer_task = self.thread_pool.add_task(self.network_analyzer.run) + micro_analyzer_task = self.thread_pool.add_task(self.micro_analyer.run) mysql_analyzer_task = self.thread_pool.add_task(self.mysql_analyzer.run) self.thread_pool.run_all_task() @@ -83,6 +86,7 @@ class PerformanceAnalyzer(BaseAnalyzer): os_performance_report += report_results[disk_analyzer_task] os_performance_report += report_results[memory_analyzer_task] os_performance_report += report_results[network_analyzer_task] + os_performance_report += report_results[micro_analyzer_task] app_performance_report = "" app_performance_report += report_results[mysql_analyzer_task] return os_performance_report, app_performance_report diff --git a/src/performance_collector/base_collector2.py b/src/performance_collector/base_collector2.py new file mode 100644 index 00000000..229984e0 --- /dev/null +++ b/src/performance_collector/base_collector2.py @@ -0,0 +1,113 @@ +from abc import abstractmethod + +class COLLECTMODE: + DIRECT_MODE = 0 + ATTACH_MODE = 1 + +class HostInfo: + def __init__(self, host_ip="", host_port=22, host_user="root", host_password=""): + self.host_ip = host_ip + self.host_port = host_port + self.host_user = host_user + self.host_password = 
diff --git a/src/performance_collector/base_collector2.py b/src/performance_collector/base_collector2.py
new file mode 100644
index 00000000..229984e0
--- /dev/null
+++ b/src/performance_collector/base_collector2.py
@@ -0,0 +1,113 @@
+from abc import ABC, abstractmethod
+
+class COLLECTMODE:
+    DIRECT_MODE = 0
+    ATTACH_MODE = 1
+
+class HostInfo:
+    def __init__(self, host_ip="", host_port=22, host_user="root", host_password=""):
+        self.host_ip = host_ip
+        self.host_port = host_port
+        self.host_user = host_user
+        self.host_password = host_password
+
+# [NEW] shell_execute.py
+import paramiko
+from typing import Dict, Any, Tuple
+import logging
+
+def remote_execute(
+    cmd: str,
+    host_info: HostInfo
+) -> Dict[str, Any]:
+    # Create the SSH client
+    client = paramiko.SSHClient()
+    # Allow connecting to hosts that are not in known_hosts
+    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    try:
+        # Connect to the remote host
+        client.connect(host_info.host_ip, host_info.host_port, host_info.host_user,
+                       host_info.host_password, timeout=5)
+        # Run the command
+        stdin, stdout, stderr = client.exec_command(cmd)
+        # Collect stdout/stderr and the exit status
+        result = stdout.read().decode()
+        error = stderr.read().decode()
+        status_code = stdout.channel.recv_exit_status()
+        if status_code:
+            raise RuntimeError(f"Error executing command '{cmd}': {error}")
+        logging.info("Command '%s' executed successfully.", cmd)
+        return {cmd: result}
+    except RuntimeError:
+        raise
+    except Exception as e:
+        raise RuntimeError(f"Exception occurred while executing command '{cmd}': {e}") from e
+    finally:
+        # Close the connection
+        client.close()
+
+def remote_execute_with_exit_code(
+    cmd: str,
+    host_info: HostInfo
+) -> Tuple[str, str, int]:
+    # Create the SSH client
+    client = paramiko.SSHClient()
+    # Allow connecting to hosts that are not in known_hosts
+    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    try:
+        # Connect to the remote host
+        client.connect(host_info.host_ip, host_info.host_port, host_info.host_user,
+                       host_info.host_password, timeout=5)
+        # Run the command
+        logging.debug("remote_execute_with_exit_code: running '%s'", cmd)
+        stdin, stdout, stderr = client.exec_command(cmd)
+        # Collect stdout/stderr and the exit status
+        result = stdout.read().decode()
+        error = stderr.read().decode()
+        status_code = stdout.channel.recv_exit_status()
+        logging.debug("remote_execute_with_exit_code: stdout=[%s] stderr=[%s] status_code=%d",
+                      result, error, status_code)
+        return result, error, status_code
+    except Exception as e:
+        raise RuntimeError(f"Exception occurred while executing command '{cmd}': {e}") from e
+    finally:
+        # Close the connection
+        client.close()
+
+def run_nohup_cmd(
+    cmd: str,
+    host_info: HostInfo
+) -> str:
+    # Detach the command and echo the PID of the background job.
+    cmd = f"nohup {cmd} > /dev/null 2>&1 & echo $!"
+    pid, _, _ = remote_execute_with_exit_code(cmd, host_info)
+    pid = pid.strip()
+    logging.debug("run_nohup_cmd: background pid is %s", pid)
+    if not pid.isdigit():
+        raise RuntimeError("Failed to get PID")
+    return pid
+
+def get_process_pid(
+    process_name: str,
+    host_info: HostInfo
+):
+    cmd = f"pgrep -f {process_name}"
+    out, _, _ = remote_execute_with_exit_code(cmd, host_info)
+    # pgrep -f may report several PIDs, one per line; take the first match.
+    pids = out.split()
+    if not pids or not pids[0].isdigit():
+        raise RuntimeError(f"Failed to get PID for '{process_name}'")
+    return pids[0]
+
+
+class BaseCollector(ABC):
+    def __init__(self):
+        self.raw_data = {}
+        self.processed_data = {}
+        self.collect_cmd = ""
+
+    @abstractmethod
+    def collect(self):
+        pass
+
+    @abstractmethod
+    def process(self):
+        pass
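
These helpers are synchronous paramiko wrappers for one-shot remote commands. A short sketch of how they compose; the host address, password, and the stress-ng workload are placeholders, not values from the patch:

# Compose the base_collector2 helpers (host details are placeholders).
host = HostInfo(host_ip="192.0.2.10", host_user="root", host_password="***")

# Launch a background workload; run_nohup_cmd captures its PID via `echo $!`.
pid = run_nohup_cmd("stress-ng --cpu 4 --timeout 60", host)

# Poll it with an ordinary command; exit status 0 means the PID is still alive.
out, err, code = remote_execute_with_exit_code(f"ps -p {pid} -o comm=", host)
if code == 0:
    print(f"pid {pid} runs: {out.strip()}")
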
diff --git a/src/performance_collector/micro_dep_collector.py b/src/performance_collector/micro_dep_collector.py
new file mode 100644
index 00000000..ece8fb10
--- /dev/null
+++ b/src/performance_collector/micro_dep_collector.py
@@ -0,0 +1,342 @@
+from .base_collector2 import BaseCollector, HostInfo, remote_execute_with_exit_code, run_nohup_cmd
+import logging
+from typing import List
+
+cfg_fw = {
+    'dispatch_size': 4,
+}
+cfg_pmu_topdown = {
+    'r0011': "cycle",
+    'r2014': "fetch_bubble",
+    'r201d': "fetch_bubble_max",
+    'r001b': "inst_spec",
+    'r0008': "inst_retired",
+    'r7001': "execstall_cycle",
+    'r7003': "fsustall",
+    'r7004': "memstall_anyload",
+    'r7005': "memstall_anystore",
+    'r7006': "memstall_l1miss",
+    'r7007': "memstall_l2miss",
+    'r0010': "brmisspred",
+    'r2013': "o3flush",
+    'context-switches': "context_switches",
+    'cpu-migrations': "cpu_migrations",
+    'page-faults': "page_faults",
+}
+
+cfg_pmu_cache = {
+    'r0001': 'l1i_refill',
+    'r0014': 'l1i_access',
+    'r0003': 'l1d_refill',
+    'r0004': 'l1d_access',
+    'r0028': 'l2i_refill',
+    'r0027': 'l2i_access',
+    'r0017': 'l2d_refill',
+    'r0016': 'l2d_access',
+    'r0008': 'inst_retired',
+}
+
+cfg_pmu_branch = {
+    'r0011': 'cycle',
+    'r200b': 'alu_isq_stall',
+    'r200c': 'lsu_isq_stall',
+    'r200d': 'fsu_isq_stall',
+    'r0010': 'brmisspred',
+    'r0012': 'brpred',
+}
+
+cfg_pmu_tlb = {
+    'r0002': 'l1i_tlb_refill',
+    'r0026': 'l1i_tlb',
+    'r0005': 'l1d_tlb_refill',
+    'r0025': 'l1d_tlb',
+    'r002e': 'l2i_tlb_refill',
+    'r0030': 'l2i_tlb',
+    'r002d': 'l2d_tlb_refill',
+    'r002f': 'l2d_tlb',
+    'r0035': 'itlb_walk',
+    'r0034': 'dtlb_walk',
+    'r0008': 'inst_retired',
+    'r0011': 'cycle',
+    'r7002': 'divstall',
+}
+
+def is_number(s: str):
+    try:
+        float(s)
+        return True
+    except ValueError:
+        return False
+
+def is_pid_valid(
+    pid: int,
+    host_info: HostInfo
+):
+    check_cmd = f"ps -p {pid} > /dev/null 2>&1"
+    _, _, exit_code = remote_execute_with_exit_code(check_cmd, host_info)
+    return exit_code == 0
+
+
+class COLLECTMODE:
+    DIRECT_MODE = 0
+    ATTACH_MODE = 1
+
+
+prompt_dict = {
+    "frontend_bound": "TopDown中的前端瓶颈(frontend bound)",
+    "bad_spec": "TopDown中的预测失败瓶颈(bad speculation)",
+    "retiring": "TopDown中的指令完成(retiring)",
+    "backend_bound": "TopDown中的后端瓶颈(backend bound)",
+    "frontend_latency_bound": "TopDown中的前端瓶颈下的前端延时瓶颈(frontend latency bound)",
+    "frontend_bandwidth_bound": "TopDown中的前端瓶颈下的前端带宽瓶颈(frontend bandwidth bound)",
+    "bs_mispred": "TopDown中的预测失败瓶颈中的分支预测失败瓶颈(bad speculation branch misprediction)",
+    "bs_mclear": "TopDown中的预测失败瓶颈中的流水线清空瓶颈(bad speculation machine clears)",
+    "core_bound": "TopDown中的后端瓶颈中的后端执行瓶颈(core bound)",
+    "mem_bound": "TopDown中的后端瓶颈中的后端内存子系统瓶颈(memory bound)",
+    "core_fsu_bound": "TopDown中的后端执行瓶颈中的浮点/向量计算瓶颈(core fsu bound)",
+    "core_other_bound": "TopDown中的后端执行瓶颈中的后端其他执行瓶颈(core other bound)",
+    "mem_l1_bound": "TopDown中的后端内存子系统瓶颈中的读取L1 cache造成的指令执行瓶颈(不包含L2/L3)",
+    "mem_l2_bound": "TopDown中的后端内存子系统瓶颈中的读取L2 cache造成的指令执行瓶颈(不包含L1/L3)",
+    "mem_l3_dram_bound": "TopDown中的后端内存子系统瓶颈中的读取L3以及内存造成的指令执行瓶颈(不包含L1/L2)",
+    "mem_store_bound": "TopDown中的后端内存子系统瓶颈中的内存写瓶颈(memory store bound)",
+    "context_switches": "上下文切换次数(context-switches)",
+    "cpu_migrations": "进程在不同CPU核之间的迁移次数(cpu-migrations)",
+    "page_faults": "缺页异常次数(page-faults)",
+    "l1i_missrate": "L1指令miss rate",
+    "l1d_missrate": "L1数据miss rate",
+    "l2i_missrate": "L2指令miss rate",
+    "l2d_missrate": "L2数据miss rate",
+    "l1i_mpki": "L1指令每千条指令中miss次数",
+    "l1d_mpki": "L1数据每千条指令中miss次数",
+    "l2i_mpki": "L2指令每千条指令中miss次数",
+    "l2d_mpki": "L2数据每千条指令中miss次数",
+    "branch_missrate": "分支预测失败率(branch missrate)",
+    "alu_isq_stall": "算术逻辑单元全部被占用导致的执行瓶颈",
+    "lsu_isq_stall": "访存逻辑单元全部被占用导致的执行瓶颈",
+    "fsu_isq_stall": "浮点单元全部被占用导致的执行瓶颈",
+    "l1i_tlb_missrate": "L1指令快表miss rate(l1i_tlb_missrate)",
+    "l1d_tlb_missrate": "L1数据快表miss rate(l1d_tlb_missrate)",
+    "l2i_tlb_missrate": "L2指令快表miss rate(l2i_tlb_missrate)",
+    "l2d_tlb_missrate": "L2数据快表miss rate(l2d_tlb_missrate)",
+    "itlb_walk_rate": "指令页表缓存未命中时触发页表遍历的频率(itlb_walk_rate)",
+    "dtlb_walk_rate": "数据页表缓存未命中时触发页表遍历的频率(dtlb_walk_rate)",
+    "l1i_tlb_mpki": "L1指令TLB每千条指令中miss次数",
+    "l1d_tlb_mpki": "L1数据TLB每千条指令中miss次数",
+    "l2i_tlb_mpki": "L2指令TLB每千条指令中miss次数",
+    "l2d_tlb_mpki": "L2数据TLB每千条指令中miss次数",
+    "itlb_walk_mpki": "指令TLB每千条指令中的页表查找次数",
+    "dtlb_walk_mpki": "数据TLB每千条指令中的页表查找次数",
+    "div_stall": "除法指令在关键路径导致的执行瓶颈",
+}
+
+
+class PerfCollector(BaseCollector):
+    def __init__(self, cfg_pmu=None):
+        # Avoid a mutable default argument; fall back to an empty config.
+        self.cfg_pmu: dict = cfg_pmu or {}
+        super().__init__()
+
+    def set_collector_param(self, host_info: HostInfo, duration=0.1, target_pid=0):
+        self.host_info = host_info
+        self.duration = duration
+        self.target_pid = target_pid
+        self._gen_collect_cmd()
+
+    def _gen_collect_cmd(self):
+        cmd = f"perf stat -e {','.join(self.cfg_pmu.keys())} "
+        if self.target_pid:
+            cmd += f"-p {self.target_pid} sleep {self.duration}"
+        else:
+            cmd += f"-a sleep {self.duration}"
+        logging.debug(f"cmd: {cmd}")
+        self.collect_cmd = cmd
+
+    def collect(self):
+        raw_data = {}
+        # perf stat writes its counter table to stderr.
+        _, stderr, _ = remote_execute_with_exit_code(
+            cmd=self.collect_cmd,
+            host_info=self.host_info,
+        )
+        for line in stderr.splitlines():
+            line = line.rstrip()
+            logging.debug(f"raw output: {line}")
+            if line == "":
+                continue
+            elems = line.split()
+            if len(elems) < 2:
+                continue
+            value = elems[0].replace(',', '')
+            if value == "
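
The hunk is truncated inside collect(), so the rest of the collector (its counter parsing, the process() stage, and the MicroDepCollector class) and the whole param_optimizer.py hunk are not visible in this excerpt. As a reference point only: with this event set, a level-1 TopDown breakdown is conventionally derived as in the sketch below. The formulas follow the generic top-down method (slots = dispatch_size x cycles) and are an assumption about what process() computes, not code from the patch; the function name topdown_level1 is illustrative.

# Assumed level-1 TopDown derivation for the cfg_pmu_topdown counters; the
# patch's actual process() implementation is truncated above.
def topdown_level1(raw: dict, dispatch_size: int = 4) -> dict:
    slots = dispatch_size * raw["cycle"]                          # total issue slots
    frontend_bound = raw["fetch_bubble"] / slots                  # slots with nothing fetched
    bad_spec = (raw["inst_spec"] - raw["inst_retired"]) / slots   # speculative work thrown away
    retiring = raw["inst_retired"] / slots                        # useful, completed work
    backend_bound = 1.0 - frontend_bound - bad_spec - retiring    # whatever remains
    return {
        "frontend_bound": frontend_bound,
        "bad_spec": bad_spec,
        "retiring": retiring,
        "backend_bound": backend_bound,
    }
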
bound)", + "core_other_bound": "TopDown中的后端执行瓶颈中的后端其他执行瓶颈(core other bound)", + "mem_l1_bound": "TopDown中的后端内存子系统瓶颈中的读取L1 cache造成的指令执行瓶颈(不包含L2/L3)", + "mem_l2_bound": "TopDown中的后端内存子系统瓶颈中的读取L2 cache造成的指令执行瓶颈(不包含L1/L3)", + "mem_l3_dram_bound": "TopDown中的后端内存子系统瓶颈中的读取L3以及内存造成的指令执行瓶颈(不包含L1/L2)", + "mem_store_bound": "TopDown中的后端内存子系统瓶颈中的内存写瓶颈(memory store bound)", + "context_switches": "上下文切换次数(context-switches)", + "cpu_migrations": "进程在不同CPU核之间的迁移次数(cpu-migrations)", + "page_faults": "缺页异常次数(page-faults)", + "l1i_missrate": "L1指令miss rate", + "l1d_missrate": "L1数据miss rate", + "l2i_missrate": "L2指令miss rate", + "l2d_missrate": "L2数据miss rate", + "l1i_mpki": "L1指令每千条指令中miss次数", + "l1d_mpki": "L1数据每千条指令中miss次数", + "l2i_mpki": "L2指令每千条指令中miss次数", + "l2d_mpki": "L2数据每千条指令中miss次数", + "branch_missrate": "分支预测失败率(branch missrate)", + "alu_isq_stall": "算术逻辑单元全部被占用导致的执行瓶颈", + "lsu_isq_stall": "访存逻辑单元全部被占用导致的执行瓶颈", + "fsu_isq_stall": "浮点单元全部被占用导致的执行瓶颈", + "l1i_tlb_missrate": "L1指令快表miss rate(l1i_tlb_missrate)", + "l1d_tlb_missrate": "L1数据快表miss rate(l1d_tlb_missrate)", + "l2i_tlb_missrate": "L2指令快表miss rate(l2i_tlb_missrate)", + "l2d_tlb_missrate": "L2数据快表miss rate(l2d_tlb_missrate)", + "itlb_walk_rate": "指令页表缓存未命中时触发页表遍历的频率(itlb_walk_rate)", + "dtlb_walk_rate": "数据页表缓存未命中时触发页表遍历的频率(dtlb_walk_rate)", + "l1i_tlb_mpki": "L1指令TLB每千条指令中miss次数", + "l1d_tlb_mpki": "L1数据TLB每千条指令中miss次数", + "l2i_tlb_mpki": "L2指令TLB每千条指令中miss次数", + "l2d_tlb_mpki": "L2数据TLB每千条指令中miss次数", + "itlb_walk_mpki": "指令TLB每千条指令中到页表查找次数", + "dtlb_walk_mpki": "指令TLB每千条指令中到页表查找次数", + "div_stall": "除法指令在关键路径导致的执行瓶颈", +} + + +class PerfCollector(BaseCollector): + def __init__(self, cfg_pmu={}): + self.cfg_pmu: dict = cfg_pmu + super().__init__() + + def set_collector_param(self, host_info: HostInfo, duration=0.1, target_pid=0): + self.host_info = host_info + self.duration = duration + self.target_pid = target_pid + self._gen_collect_cmd() + + def _gen_collect_cmd(self): + cmd = f"perf stat -e {','.join(self.cfg_pmu.keys())} " + if self.target_pid: + cmd += f"-p {self.target_pid} sleep {self.duration}" + else: + cmd += f"-a sleep {self.duration}" + + logging.debug(f"cmd: {cmd}") + self.collect_cmd = cmd + + + def collect(self): + raw_data = {} + _, stderr, _ = remote_execute_with_exit_code( + cmd=self.collect_cmd, + host_info=self.host_info, + ) + for line in stderr.splitlines(): + line = line.rstrip() + logging.debug(f"raw output: {line}") + if line == "": + continue + elems = line.split() + if len(elems) < 2: + continue + value = elems[0].replace(',', '') + if value == ">> 运行MetricProfileCollector:") -metric_collector = StaticMetricProfileCollector( +#logging.info(">>> 运行MetricProfileCollector:") +print(">>> 运行MetricProfileCollector:") +static_metric_collector = StaticMetricProfileCollector( ssh_client=ssh_client, max_workers=5 ) -static_profile_info = metric_collector.run() -logging.info(">>> MetricProfileCollector运行结果:") -logging.info(static_profile_info) +static_profile_info = static_metric_collector.run() +print("static_profile:", static_profile_info) + + +host_info = HostInfo(host_ip="9.82.230.156",host_port=22, host_password="Huawei12#$") +target_pid = 2144391 +benchmark_cmd = "tail -f /dev/null" +collect_mode = COLLECTMODE.ATTACH_MODE +microDepCollector = MicroDepCollector( + host_info=host_info, + iteration=10, + target_pid=target_pid, + benchmark_cmd=benchmark_cmd, + mode=collect_mode, +) +micro_dep_dollector_data = microDepCollector.run() +print("microDepCollector data", micro_dep_dollector_data) -logging.info(">>> 
运行MetricCollector:") -testCollector = MetricCollector( +print(">>> 运行MetricCollector:") +metric_collector = MetricCollector( host_ip=host_ip, host_port=22, host_user="root", host_password=host_password, app=app ) -data = testCollector.run() -logging.info(">>> MetricCollector运行结果:") -logging.info(data) +data = metric_collector.run() +data["micro_dep"] = micro_dep_dollector_data +print("metric_collector data:", data) -logging.info(">>> 运行PerformanceAnalyzer:") +print(">>> 运行PerformanceAnalyzer:") testAnalyzer = PerformanceAnalyzer( data=data ) report, bottleneck = testAnalyzer.run() -logging.info(">>> PerformanceAnalyzer运行结果:") -logging.info(report) -logging.info(bottleneck) +print(">>> PerformanceAnalyzer运行结果:",report, bottleneck) + + +def slo_calc_callback(baseline, benchmark_result): + if baseline is None or abs(baseline) < 1e-9: + return 0.0 + return (benchmark_result - baseline) / baseline + +def benchmark_callback(ssh_client): + print("🔄 正在验证mysql benchmark性能...") + result = parse_mysql_sysbench(ssh_client) + try: + return float(result.output["qps"]) + except ValueError: + return 0.0 + +param_optimizer = ParamOptimizer( + service_name="mysql", + performance_metric=PerformanceMetric.QPS, + slo_goal=0.1, + analysis_report=report, + static_profile=static_profile_info, + ssh_client=ssh_client, + slo_calc_callback=slo_calc_callback, + benchmark_callback=benchmark_callback, + apply_params_callback=apply_mysql_config, + max_iterations = 1, +) +param_optimizer.run() -logging.info(">>> 运行StrategyOptimizer:") strategy_optimizer = StrategyOptimizer( application="mysql", bottle_neck=bottleneck, @@ -70,7 +119,6 @@ strategy_optimizer = StrategyOptimizer( target_config_path="" ) plan, isfinish, feedback = strategy_optimizer.run() -logging.info(">>> StrategyOptimizer运行结果:") -logging.info(plan) -logging.info(isfinish) -logging.info(feedback) \ No newline at end of file +print("plan:", plan) +print("isfinish", isfinish) +print("feedback", feedback) \ No newline at end of file -- Gitee