From b25b2eddd7306b40fcd9f716de224c5c3ce316fb Mon Sep 17 00:00:00 2001 From: caojiale1 Date: Thu, 13 Apr 2023 16:32:09 +0800 Subject: [PATCH 1/2] Signed-off-by:caojiale1@huawei.com --- localCoverage/coverage_tools.py | 4 + .../keyword_registration/keyword.json | 83 ++ .../keyword_registration/keyword_filter.py | 925 ++++++++++++++++++ .../restore_comment/build_before_generate.py | 5 +- 4 files changed, 1016 insertions(+), 1 deletion(-) create mode 100644 localCoverage/keyword_registration/keyword.json create mode 100644 localCoverage/keyword_registration/keyword_filter.py diff --git a/localCoverage/coverage_tools.py b/localCoverage/coverage_tools.py index ad036c7..0fb2335 100644 --- a/localCoverage/coverage_tools.py +++ b/localCoverage/coverage_tools.py @@ -176,5 +176,9 @@ if __name__ == '__main__': developer_path, "localCoverage/restore_comment/restore_source_code.py") subprocess.run("python3 %s" % restore_source_code_path, shell=True) + keyword_path = os.path.join( + developer_path, "localCoverage/keyword_registration/keyword_filter.py") + subprocess.run("python3 %s" % keyword_path, shell=True) + print(r"See the code coverage report in: " r"/test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/reports/cxx/html") diff --git a/localCoverage/keyword_registration/keyword.json b/localCoverage/keyword_registration/keyword.json new file mode 100644 index 0000000..3c742e9 --- /dev/null +++ b/localCoverage/keyword_registration/keyword.json @@ -0,0 +1,83 @@ +{ + "KEYWORD": [ + "unmap(", "ReadInterfaceToken(", "DeepCopyGroupEntry(", "readUint32(", + "ifstream fin(", "InitFunc", "AddInt64StringToJson(", "WriteFloatVector(", + "GetUint64Value(", "realpath(", "fchmod(", "SendEvent(", "WriteParcelable(", + "EVP_DecryptInit_ex(", "CreateJson(", "InitThread(", "ParcelWrite(", + "SetUint8Value(", "OnRemoveSystemAbility(", "EVP_CIPHER_CTX_ctrl(", "umount(", + "SetAttributesValue(", "GetBundleManager(", "fstat(", "ChangeModeFile(", + "HcFileClose", "FD_ISSET(", "CheckSystemAbility(", "SendTransactCmd(", + "SetUint8ArrayValue(", "GetUint8ArrayValue(", "SerializeToArray(", + "pthread_getname_np(", "LoadSymbol(", "fchown(", "InitClient(", + "ReadString16(", "TryConnect(", "EVP_PKEY_new_mac_key(", "WIFEXITED(", + "SendRequest(", "bind(", "GetInt32Value(", "read(", "ReadUint8(", "AsObject(", + "lseek(", "ChangePropertyContent(", "WriteToParcel(", "dlsym(", "pushBackT(", + "close(", "pwrite(", "SetCallRequestParamInfo(", "CheckGetSAPermission(", + "RSRenderServiceConnectHub::GetRenderService(", "readdir(", "memset_s(", + "is_object(", "curl_global_init(", "WriteTargetAndCallback", + "GetSystemAbility(", "pipe(", "fwrite(", "sendfile(", "Remote(", "HcMalloc(", + "CreateHdiScreen(", "GetDmsProxy(", "opendir(", "WriteInt32(", + "SetStringValue(", "WriteUint64(", "open(", "mkdir(", "CreateGroupEntry(", + "Peek(", "DirIterator(", "select(", "AddByteToJson(", "AddIntToJson(", + "SetDataCapacity", "RSTransactionProxy::GetInstance(", "ReadUint32(", "link(", + "Malloc(", "DeepCopyDeviceEntry(", "ParseFromString(", "CreateConnection(", + "readstring(", "AddDeathRecipient(", "WriteBuffer(", "EVP_DecryptFinal_ex(", + "GetStringValue(", "curl_multi_init(", "mount(", "InitDmsImplFunc", + "GetUint32Value(", "WriteUint16(", "umount2(", "AddObjToArray(", + "GetContextObject(", "rfind(", "SetUint16Value(", "GetDeviceProfileService(", + "statvfs(", "fork(", "sprintf_s(", "free(", "ReadInt16(", "remove(", "ioctl(", + "GetParcelableInfo(", "GetAttributesValue(", "WriteParcelableVector(", + "ReadUint64(", "HcFileRead(", "futimens(", "fputc(", + "HdiFramebufferSurface::CreateFramebufferSurface(", "popen(", "HcFileOpen(", + "ReadStringVector(", "ReadString(", "strdup(", "GetUint16ArrayValue(", + "is_discarded(", "ReadUnpadBuffer(", "WriteInt32Vector(", + "SetUint16ArrayValue(", "fgets(", "kill(", "ChangeModeDirectory(", "getline(", + "GetRealPath(", "fclose(", "GetUdid(", "newLine.empty(", "ReportSystemFault(", + "is_open(", "GetUint16Value(", "MakeShared(", "ReadParcelable&", + "AddObjToJson(", "SetUint32Value(", "EVP_EncryptInit_ex(", "make_shared&", + "CreateDeviceEntry(", "setmntent(", "OnAddSystemAbility(", "fdopen(", + "RSMainThread::Instance(", "access(", "NLMSG_SPACE(", "mmap(", + "CheckAddOrRemovePermission(", "dlopen(", "dlsy(", "malloc(", "chmod(", + "curl_multi_setopt(", "PackJsonToString(", "OnStart(", "ReadRemoteObject(", + "stat(","GetUint64ArrayValue(", "new","ifstream(", "poll(", "CmMalloc(", + "HcfMalloc(", "SetUint64ArrayValue(", "PathExists(", "StringSetPointer(", + "unlink(", "major(", "CreateSurfaceAsConsumer(", "ParcelWriteUint64(", + "EVP_DigestSignUpdate(", "ReadInt32(","ReadUInt8Vector(", "snprintf_s(", + "WriteInt16(", "readlink(", "LoadLibrary(", "WriteString16(", "memcpy_s(", + "rename(", "statfs(", "GetUint8Value(","closedir(", "mknod(","HcFileRemove(", + "getmntent_r(","fopen(", "SetBoolValue(","Seek(", "GetUint32ArrayValue(", + "strcmp(", "CheckListSAPermission(", "Mmap(","iface_cast(", + "EVP_DigestSignInit(", "UnpackInt32(", "GetSystemAbilityManager(", "Write(", + "ReadBool(","vsnprintf_s(", "IsProxyObject(","dlerror(", "WriteInt64(", + "EVP_EncryptFinal_ex(","lstat(", "IsObjectDead","Unmarshing(", "strcpy_s(", + "EVP_DecryptUpdate(","New(", "rmdir(","WriteInterfaceToken(", + "GetBoolValue(", "WriteUInt32Vector(", "ReadUInt32Vector(", "EVP_Digest(", + "ParcelWriteUint32(", "StringSet(","AddStringToJson(", "Marshalling(", + "ParcelReadUint64(", "ReadFloat(","Create(", "SaveStringToFd(","fprintf(", + "GetProxy(", "EVP_EncryptUpdate(", "ParcelRead(","Publish(","Free(", + "make_unique&","execvp(", "HdiBackend::GetInstance(", "ReadInt32Vector(", + "pushBack(", "CreateVSyncReceiver(", "WriteStringVector(", "HcGetUdid(", + "UnMarshalling(", "WriteUint32(", "ReadUInt64Vector(", "GetRemoteObject(", + "none_of(", "WriteUInt64Vector(", "CreateJsonArray(","sendmsg(", "socket(", + "SetUint32ArrayValue(", "SetInt32Value(","dprintf(", "syncfs(", + "SetUint64Value(", "RAND_bytes(","CheckGetRemoteSAPermission(", + "MarshalSAExtraProp(", "ReadFloatVector(","WriteFloat(", "Serialize(", + "syscall(", "HcFileWrite(", "ReadFromParcel(", "InitHcMutex(", "MALLOC(", + "new(", "CreateSurfaceAsProducer(", "ReadString16Vector(", "chown(", + "HcFileSize(", "HdiDevice::GetInstance(", + "RSProcessorFactory::CreateProcessor(", "ReadInt64(", "WriteRawData(", + "strncat_s(", "newLine.find(", "ReadParcelableVector(", "ChkRWError(", + "CreateOrGetScreenManager(", "fread(", "readUint64(", "HksMalloc(", + "RemoveDeathRecipient(", "sscanf_s(", "EVP_DigestSignFinal(", "ReadUint16(", + "fseek(", "promote(", "WriteUInt8Vector(", "WriteRemoteObject(", + "EVP_CIPHER_CTX_new(", "ParcelReadUint32(", "PostTask(", "ReadRawData(", + "ReadVectorFromparcel(", "WriteUint8(", "UnpackUint64(", "WriteBool(", + "ForceCreateDirectory(", "chdir(", "DelayedSingleton&", "dup2(", "write(", + "AddBoolToJson(", "EVP_MD_CTX_create(", "Alloc(", "recvmsg(", "WriteString(", + "SoftBusMutexLock(", "pthread_mutex_lock(", "RequireAuthLock(", "Lock(", + "LaneQosLock(", "SoftBusCalloc(", "SoftBusMalloc(", "cJSON_AddItemToObject(", + "RewindWrite(", "WritePointer(", "DBinderService::GetInstance(", + "IPCProcessSkeleton::GetCurrent(", "IPCThreadSkeleton::GetCurrent(", + "ISessionService::GetInstance(" + ] +} \ No newline at end of file diff --git a/localCoverage/keyword_registration/keyword_filter.py b/localCoverage/keyword_registration/keyword_filter.py new file mode 100644 index 0000000..c23d3c8 --- /dev/null +++ b/localCoverage/keyword_registration/keyword_filter.py @@ -0,0 +1,925 @@ +#!/usr/bin/env python3 +# coding=utf-8 + +# +# Copyright (c) 2023 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import re +import time +import json +import fcntl +import platform +import linecache +import traceback +from multiprocessing import Pool +from lxml import html +from selectolax.parser import HTMLParser + +etree = html.etree + + +class CoverageReportPath: + def __init__(self, report_path): + self.report_path = report_path + + def gcov_file_generator(self): + """ + gcov文件生成器 + """ + for root_path, dir, files in os.walk(self.report_path): + for file in files: + if not file.endswith(".gcov.html"): + continue + file_path = os.path.join(root_path, file) + yield file_path + + def modify_report_style(self): + """ + 修改覆盖率报告css样式 + """ + css_file_path = "" + for root_path, dir, files in os.walk(self.report_path): + for file in files: + if file == "gcov.css": + css_file_path = os.path.join(root_path, file) + text = """ +/* update */ +span.linenumupdate, span.branchexecupdate, span.branchnoexecupdate, +span.linecovupdate, span.linenocovupdate, span.branchcovupdate, +span.branchnocovupdate +{ + background-color:#BBBBBB; +}""" + with open(css_file_path, "a+", encoding="utf-8") as file: + file.write(text) + + def get_statistic_path(self, gcov_file_path: str): + """ + 根据gcov.html路径和统计等级获取统计报告路径 + """ + file_name_list = ["index.html", "index-sort-b.html", + "index-sort-f.html", "index-sort-l.html"] + index_path_list = [ + os.path.normcase(os.path.join(os.path.dirname(gcov_file_path), file_name)) + for file_name in file_name_list + ] + out_path_list = [ + os.path.normcase(os.path.join(self.report_path, file_name)) + for file_name in file_name_list + ] + index_path_list.extend(out_path_list) + return index_path_list + + +class KeywordRegistration: + def __init__(self, report_path): + self.report_path = report_path + self.keyword_file_path = os.path.normcase( + os.path.join(os.path.dirname(__file__), "keyword.json")) + + def get_keyword_info(self): + """ + 获取报备关键字信息 + """ + try: + with open(self.keyword_file_path, "r") as file: + keyword_dict = json.load(file) + keyword_list = keyword_dict.get("KEYWORD") + return keyword_list + except (FileNotFoundError, AttributeError, FileExistsError) as error: + print(f"获取报备过滤关键字报错,error:{error}") + + @staticmethod + def get_coverage_content(file_path): + """ + 获取覆盖率文件内容 + """ + if not os.path.exists(file_path): + return + with open(file_path, "r", encoding="utf-8") as file: + content = file.read() + return content + + @staticmethod + def get_tag(content, line): + start_index = content.find(f'")[0] + + if source_code: + source_code = "".join(source_code) + return source_code + + @staticmethod + def update_tag(tag: str): + replace_item_list = ["lineNum", "branchNoCov", "branchCov", "lineCov", + "branchNoExec", "lineNoCov", "branchExec"] + + for item in replace_item_list: + if item in tag: + replace_item = (item + "Update").lower() + tag = tag.replace(item, replace_item) + return tag + + def get_coverage_lines_by_branch(self, file_path, content=None): + """ + 获取覆盖率报告中的所有的if分支行号 + """ + if not content: + content = self.get_coverage_content(file_path) + if not content: + return + _, file_name = os.path.split(file_path) + branch_line_list = re.findall( + r''' 200: + return None, None + next_tag = get_tag(content, str(next_line)) + func_body_tag_list_append(next_tag) + if "{" in next_tag: + if branch_braces_sub: + branch_braces_sub += 1 + braces_sub += 1 + if "}" in next_tag: + if branch_braces_sub: + branch_braces_sub -= 1 + braces_sub -= 1 + if 'title="Branch' in next_tag: + branch_braces_sub += 1 + sub_brace_start_status = True + if not branch_braces_sub and \ + (sub_brace_start_status == sub_brace_end_status): + next_tag = update_tag(next_tag) + if not branch_braces_sub and sub_brace_start_status: + sub_brace_end_status = True + update_func_body_tag_list_append(next_tag) + next_line += 1 + return func_body_tag_list, update_func_body_tag_list + + def get_line_funcname(self, branch_line, content): + """ + 获取行号源码所在函数的函数名 + """ + function_name = "" + previous_line = int(branch_line) + loop_count = 0 + get_tag = self.get_tag + try: + while not function_name: + loop_count += 1 + if loop_count > 200: + return + previous_line -= 1 + tag = get_tag(content, previous_line) + tag_text = HTMLParser(tag).text() + if tag_text[39:].strip().startswith("{"): + left_parentheses_count = right_parentheses_count = 0 + child_count = 0 + while previous_line: + child_count += 1 + if child_count > 200: + return + previous_line -= 1 + html_text = get_tag(content, previous_line) + source_code = HTMLParser(html_text).text()[39:].strip() + + left_parentheses_count += source_code.count("(") + right_parentheses_count += source_code.count(")") + if not left_parentheses_count == right_parentheses_count: + continue + if " operator" in source_code: + class_name_list = re.findall(r'\((.*?)\)', source_code) + if class_name_list: + function_name_str = class_name_list[0] + if not function_name_str: + return + function_name = function_name_str.split()[-1] + return function_name + function_name_list = re.findall(r' (.*?)\(', source_code) + if function_name_list: + function_name_str = function_name_list[0] + if not function_name_str: + return + function_name = function_name_str.split()[-1] + return function_name + except (OSError, IndexError, TypeError) as error: + print(f"覆盖率报告{branch_line}行获取函数名报错, error:{error}", + traceback.format_exc()) + + @staticmethod + def get_branch_line_list(keyword_line: int, branch_line_list: list): + """ + 获取大于关键字行号的所有分支行号 + """ + if keyword_line in branch_line_list: + index = branch_line_list.index(keyword_line) + branch_line_list = branch_line_list[index:] + else: + for line in branch_line_list: + if line > keyword_line: + index = branch_line_list.index(line) + branch_line_list = branch_line_list[index:] + break + return branch_line_list + + @staticmethod + def get_keyword_judge_char(keyword, keyword_source_code): + """ + 获取关键字替代字符 + """ + if "&" in keyword: + keyword = keyword.replace("&", "<") + + keyword_index = keyword_source_code.find(keyword) + if keyword_index == -1: + return + + try: + keyword_code = keyword_source_code[:keyword_index + len(keyword)] + if " = " in keyword_code: + judge_key = keyword_code.split(" = ")[0].split()[-1] + else: + bracket_index = keyword_code.find("(") + bracket_code = keyword_code[:bracket_index] + judge_key = bracket_code.split()[-1] + return judge_key + except (IndexError, ValueError) as error: + print("获取关键字替代字符", error, traceback.format_exc()) + + @staticmethod + def get_branch_data_by_tag(tag_html: str, symbol_status=None): + """ + 根据前端标签获取分支数据 + """ + if symbol_status: + key = r"#+\-*" + else: + key = r"#+\-" + branch_line_list = re.findall(rf"> ([{key}]) ", tag_html) + + return branch_line_list + + @staticmethod + def get_judge_condition_index(judge_key: str, source_code: str): + """ + 获取判断条件索引 + """ + keyword_index_list = [] + keyword_index_list_append = keyword_index_list.append + + condition_str_list = re.split(rf"\|\||&&", source_code) + for index, code_str in enumerate(condition_str_list): + if judge_key in code_str: + keyword_index_list_append(index) + return keyword_index_list, condition_str_list + + @staticmethod + def update_source_code_tag(html_tag: str): + replace_item_list = ["lineNum", "lineCov", "lineNoCov"] + + for item in replace_item_list: + if item in html_tag: + replace_item = (item + "Update").lower() + html_tag = html_tag.replace(item, replace_item) + + return html_tag + + def get_break_line_tag(self, content, origin_branch_html, branch_line): + """ + 获取分支换行的判断条件源码tag + """ + get_tag = self.get_tag + left_brace_exist = False + line_break = loop_count = 0 + while not left_brace_exist: + if loop_count > 10: + break + line_break += 1 + origin_branch_html += "\n" + next_line_tag = get_tag(content, branch_line + line_break) + if "{" in next_line_tag: + left_brace_exist = True + origin_branch_html += next_line_tag + loop_count += 1 + return origin_branch_html + + @staticmethod + def modify_tag_style(tag, rate): + """ + 修改标签样式 + """ + if 75 <= rate < 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed") + tag = tag.replace("coverPerLo", "coverPerMed") + tag = tag.replace("coverNumLo", "coverNumMed") + elif rate >= 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi") + tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi") + tag = tag.replace("coverPerLo", "coverPerHi") + tag = tag.replace("coverNumLo", "coverNumHi") + tag = tag.replace("coverPerMed", "coverPerHi") + tag = tag.replace("coverNumMed", "coverNumHi") + return tag + + def update_coverage_ratio_tag(self, file_path): + """ + 修改覆盖率比率数据 + """ + with open(file_path, "r", encoding="utf-8") as coverage_file: + content = coverage_file.read() + + covered_nums = len(re.findall(r"> ([+]) ", content)) + branch_total = len(re.findall(r"> ([+\-#]) ", content)) + + line = 0 + start_line = 1 + content_list = content.splitlines() + for line_text in content_list: + if 'Branches:' in line_text: + line = start_line + break + start_line += 1 + linecache.clearcache() + coverage_tag = linecache.getline(file_path, line) + covered_tag = linecache.getline(file_path, line + 1) + coverage_total_tag = linecache.getline(file_path, line + 2) + try: + origin_hit = int(re.findall(r"(\d+)", covered_tag)[0]) + origin_total = int(re.findall(r"(\d+)", coverage_total_tag)[0]) + coverage_rate_tag = linecache.getline(file_path, line + 3) + replace_tag = coverage_tag + covered_tag + coverage_total_tag + \ + coverage_rate_tag + if origin_total - branch_total == 0 and origin_hit - covered_nums == 0: + return + if int(branch_total) == 0: + coverage = 0.0 + else: + coverage = round(int(covered_nums) / int(branch_total) * 100, 1) + + origin_rate = re.findall(r"(\d+.\d+)", coverage_rate_tag)[0] + update_tag = coverage_tag + covered_tag.replace( + str(origin_hit), str(covered_nums)) + \ + coverage_total_tag.replace(str(origin_total), + str(branch_total)) + \ + coverage_rate_tag.replace(origin_rate, str(coverage)) + + update_tag = self.modify_tag_style(update_tag, coverage) + if replace_tag in content: + content = content.replace(replace_tag, update_tag) + + with open(file_path, "w", encoding="utf-8") as file: + file.write(content) + + # 修改数据统计页面 + hit_shield_num = origin_hit - covered_nums + total_shield_num = origin_total - branch_total + self.update_statistic(file_path, hit_shield_num, total_shield_num) + except (IndexError, TypeError, FileNotFoundError) as error: + print("修改分支数据", error, traceback.format_exc()) + + def update_statistic(self, file_path, hit_shield_num, total_shield_num): + """ + 修改覆盖率分支数据 + """ + index_path_list = CoverageReportPath(self.report_path).get_statistic_path(file_path) + for index_path in index_path_list: + if not os.path.exists(index_path): + continue + + with open(index_path, "r", encoding="utf-8") as coverage_file: + fcntl.flock(coverage_file.fileno(), fcntl.LOCK_EX) + content = coverage_file.read() + + file_dir, file_name = os.path.split(file_path) + if os.path.normcase(file_dir) not in os.path.normcase(index_path): + if platform.system() == "Windows": + judge_item = file_dir.replace(self.report_path + "\\", "") + judge_item = ">" + judge_item.replace("\\", "/") + "" + else: + judge_item = file_dir.replace(self.report_path + "/", "") + judge_item = ">" + judge_item + "" + else: + judge_item = ">" + file_name.replace(".gcov.html", "") + + start_line = 1 + line = tag_line = 0 + content_list = content.splitlines() + tmp = 'Branches:' + for line_text in content_list: + if not line and tmp in line_text: + line = start_line + if judge_item in line_text: + tag_line = start_line + break + start_line += 1 + try: + # total statistic modify + linecache.clearcache() + coverage_head_tag = linecache.getline(index_path, line) + origin_hit_tag = linecache.getline(index_path, line + 1) + origin_hit_num = int(re.findall(r"(\d+)", origin_hit_tag)[0]) + origin_total_tag = linecache.getline(index_path, line + 2) + origin_total_num = int(re.findall(r"(\d+)", origin_total_tag)[0]) + origin_rate_tag = linecache.getline(index_path, line + 3) + origin_rate = re.findall(r"(\d+.\d+)", origin_rate_tag)[0] + replace_tag = coverage_head_tag + origin_hit_tag + \ + origin_total_tag + origin_rate_tag + + # file branch modify + no_change_file_tag = "" + for i in range(8): + no_change_file_tag += linecache.getline(index_path, tag_line + i) + file_rate_tag = linecache.getline(index_path, tag_line + 8) + file_covered_tag = linecache.getline(index_path, tag_line + 9) + file_tag = no_change_file_tag + file_rate_tag + \ + file_covered_tag + origin_file_covered_num, origin_file_total_num = map( + int, re.findall(r"(\d+)", file_covered_tag)) + if origin_file_total_num: + origin_file_rate = re.findall(r"(\d+.\d+)", file_rate_tag)[0] + else: + # 不存在覆盖率 + origin_file_rate = "-" + head_hit_num = origin_hit_num - hit_shield_num + head_total_num = origin_total_num - total_shield_num + if head_total_num == 0: + rate = 0.0 + else: + rate = round(head_hit_num / head_total_num * 100, 1) + file_hit_num = origin_file_covered_num - hit_shield_num + file_total_num = origin_file_total_num - total_shield_num + if file_total_num == 0: + file_rate = 0.0 + else: + file_rate = round(file_hit_num / file_total_num * 100, 1) + update_tag = coverage_head_tag + origin_hit_tag.replace( + str(origin_hit_num), str(head_hit_num)) + \ + origin_total_tag.replace(str(origin_total_num), + str(head_total_num)) + \ + origin_rate_tag.replace(origin_rate, str(rate)) + + update_tag = self.modify_tag_style(update_tag, rate) + + update_branch_tag = file_rate_tag.replace( + origin_file_rate, str(file_rate)) + \ + file_covered_tag.replace( + str(origin_file_covered_num) + " / " + + str(origin_file_total_num), str(file_hit_num) + " / " + + str(file_total_num)) + + update_branch_tag = self.modify_tag_style(update_branch_tag, file_rate) + update_branch_tag = no_change_file_tag + update_branch_tag + if replace_tag in content: + content = content.replace(replace_tag, update_tag) + if file_tag in content: + content = content.replace(file_tag, update_branch_tag) + with open(index_path, "w", encoding="utf-8") as file: + file.write(content) + fcntl.flock(file.fileno(), fcntl.LOCK_UN) + time.sleep(1) + except (IndexError, TypeError, FileNotFoundError) as error: + print("修改分支统计数据报错", error, traceback.format_exc()) + + def _check_if_branch_line(self, judge_key, sub_branch_line_list, + key_line, content, function_name): + """ + 确定if分支行号 + """ + if_branch_line = None + for branch_line in sub_branch_line_list: + if branch_line == key_line: + if_branch_line = key_line + break + # 获取分支行所在函数名 + branch_function_name = self.get_line_funcname( + branch_line, content) + # 关键字范围只在关键字所在函数之内 + branch_line_tag = self.get_tag(content, branch_line) + try: + if "{" not in branch_line_tag: + branch_line_tag = self.get_break_line_tag( + content, branch_line_tag, branch_line) + branch_line_source_code = self.get_source_code( + branch_line_tag) + if function_name == branch_function_name: + if judge_key in branch_line_source_code: + if_branch_line = branch_line + break + else: + break + except: + print("获取关键字if分支行报错", traceback.format_exc()) + + return if_branch_line + + @staticmethod + def _branch_replace(branch): + """ + 分支符号替换 + """ + if branch == "#": + branch = "> # <" + elif branch == "+": + branch = "> + <" + elif branch == "*": + branch = "> * <" + else: + branch = "> - <" + return branch + + @staticmethod + def _single_condition_modify_html(branch_html, branch_list): + """ + 单条件修改代码块html + """ + line_item = ' ' + line_feed_index = branch_html.find(line_item) + if line_feed_index == -1: + if "+" in branch_list: + update_branch_tag = branch_html.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + else: + try: + first_branch = branch_list[0] + first_branch = "> " + first_branch + " <" + first_branch_index = branch_html.find(first_branch) + branch_tag = branch_html[:first_branch_index + 5] + update_branch_tag = branch_html[first_branch_index + 5:] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag = branch_tag + update_branch_tag + except: + return + else: + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1] + if "-" not in branch_list and "+" not in branch_list: + del_count = update_branch_tag.count("> # <") + update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1) + else: + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_html[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + if line_feed_index == -1: + branch_tag = branch_tag.replace("> - <", "> <") + branch_tag = branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + else: + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + try: + update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_tag[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + except: + return + + branch_tag = branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + return update_branch_tag + + def _multi_condition_modify_html(self, branch_html, branch_length, + condition_str_list, judge_index_list): + """ + 多条件修改代码块html + """ + line_item = ' ' + if branch_length % len(condition_str_list): + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + branch_html = branch_html[line_feed_index:] + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + line_feed_index = branch_html.count(line_item) + if line_feed_index > 1: + try: + line_feed_length = len(line_item) + branch_tag_before = branch_html[:line_feed_length] + branch_html = branch_html[line_feed_length:] + line_feed_index = branch_html.find(line_item) + branch_tag_after = branch_html[:line_feed_index] + branch_tag_after = branch_tag_after.replace("> - <", "> <") + branch_tag = branch_tag_before + branch_tag_after + except: + return + else: + branch_tag = branch_html + branch_tag = branch_tag.replace("> - <", "> <") + # 不再换行,索引为-1 + line_feed_index = -1 + + update_branch_tag += branch_tag + if line_feed_index == -1: + branch_html = "" + else: + branch_html = branch_html[line_feed_index:] + if branch_html != "": + branch_html = branch_html.replace("> - <", "> <") + update_branch_tag += branch_html + else: + branch_list = self.get_branch_data_by_tag(branch_html, True) + update_branch_tag = "" + end_count = -1 + try: + for index in judge_index_list: + branch_data = branch_list[:(index + 1) * 2] + # 要修改的分支数据列表长度 + branch_data_length = len(branch_data) + change_status = False + for count, branch in enumerate(branch_data, 1): + if count <= end_count: + continue + + end_count = count + branch = self._branch_replace(branch) + end_index = branch_html.find(branch) + branch_tag = branch_html[:end_index + 5] + if branch_data_length - count in [0, 1]: + if change_status: + continue + if branch == "> # <": + change_status = True + branch_tag = branch_tag.replace("> # <", "> * <") + elif branch == "> - <": + change_status = True + branch_tag = branch_tag.replace("> - <", "> * <") + update_branch_tag += branch_tag + branch_html = branch_html[end_index + 5:] + except: + return + update_branch_tag += branch_html + return update_branch_tag + + def judge_branch_exists(self, file_path): + """ + 判断报告是否存在分支 + """ + content = self.get_coverage_content(file_path) + branch_total = len(re.findall(r"> ([+\-#]) ", content)) + if branch_total: + return True + else: + return False + + def modify_branch(self, file_path, keyword_list): + """ + html报告分支屏蔽修改 + """ + branch_line_list = self.get_coverage_lines_by_branch(file_path) + if not branch_line_list: + return True + + branch_line_list.sort() + no_change_content = self.get_coverage_content(file_path) + for keyword in keyword_list: + content = self.get_coverage_content(file_path) + keyword_line_list = self.get_coverage_lines_by_keyword(keyword, content) + if not keyword_line_list: + continue + for key_line in keyword_line_list: + key_html = self.get_tag(content, key_line) + if not key_html or "LOG" in key_html: + continue + + function_name = self.get_line_funcname(key_line, content) + sub_branch_line_list = self.get_branch_line_list( + key_line, branch_line_list) + keyword_source_code = self.get_source_code(key_html) + if not all([function_name, keyword_source_code, + sub_branch_line_list]): + continue + + judge_key = keyword + if 'title="Branch' not in key_html: + judge_key = self.get_keyword_judge_char( + keyword, keyword_source_code) + if "*" in judge_key: + judge_key = judge_key.replace("*", "") + if "&" in judge_key: + judge_key = judge_key.replace("&", "") + if_branch_line = self._check_if_branch_line( + judge_key, sub_branch_line_list, key_line, + content, function_name) + + if not if_branch_line: + continue + + origin_branch_html = self.get_tag(content, if_branch_line) + branch_html = origin_branch_html + if "{" not in origin_branch_html: + origin_branch_html = self.get_break_line_tag( + content, origin_branch_html, if_branch_line) + branch_html = origin_branch_html + top_source_code = self.get_source_code(branch_html) + no_change_branch_html = self.get_tag( + no_change_content, if_branch_line) + branch_list = self.get_branch_data_by_tag(no_change_branch_html) + if not branch_list: + continue + if "#" not in branch_list and "-" not in branch_list: + continue + branch_tag_data = self.get_branch_data_by_tag(branch_html, True) + if len(branch_tag_data) <= 1: + continue + if len(branch_list) == 2: + if "-" in branch_list: + update_branch_tag = branch_html.replace("> - <", "> <") + else: + update_branch_tag = branch_html.replace("> # <", "> <", 1) + else: + branch_length = len(branch_list) + judge_index_list, condition_str_list = self.\ + get_judge_condition_index(judge_key, top_source_code) + if not len(judge_index_list): + continue + if len(condition_str_list) == 1: + update_branch_tag = self._single_condition_modify_html( + branch_html, branch_list) + else: + update_branch_tag = self._multi_condition_modify_html( + branch_html, branch_length, condition_str_list, + judge_index_list) + if not update_branch_tag: + return False + + update_branch_tag = self.update_source_code_tag( + update_branch_tag) + content = content.replace(origin_branch_html, update_branch_tag) + source_code_tag_list, update_source_code_tag_list = self.\ + code_body_judge(int(if_branch_line), content) + if source_code_tag_list: + source_code_tag_html = "\n".join(source_code_tag_list) + update_source_code_tag_html = "\n".join( + update_source_code_tag_list) + content = content.replace( + source_code_tag_html, update_source_code_tag_html) + with open(file_path, "w", encoding="utf-8") as new_html: + new_html.write(content) + + content = self.get_coverage_content(file_path) + content = content.replace('> * ', '> ') + with open(file_path, "w", encoding="utf-8") as new_html: + new_html.write(content) + + def keyword_registration(self, file_path, keyword_list): + """ + 报备 + """ + self.modify_branch(file_path, keyword_list) + + # 修改覆盖率数据 + self.update_coverage_ratio_tag(file_path) + return True + + def multiprocessing_registration(self): + """ + 多进程关键字报备 + """ + keyword_list = self.get_keyword_info() + if not keyword_list: + return + + # 创建报告路径生成器 + report_instance = CoverageReportPath(self.report_path) + # 修改css文件 + report_instance.modify_report_style() + # 创建报告路径生成器 + report_generator = report_instance.gcov_file_generator() + # 获取关键字 + keyword_registration = self.keyword_registration + + pool = Pool(10) + apply_async = pool.apply_async + while True: + file_path = next(report_generator, None) + if not file_path: + break + + # 关键字报备 + apply_async(keyword_registration, args=(file_path, keyword_list)) + + pool.close() + pool.join() + + +def main(report_path): + print("*" * 50, "报备开始", "*" * 50) + manager = KeywordRegistration(report_path) + start_time = time.time() + manager.multiprocessing_registration() + end_time = time.time() + times = round(end_time - start_time, 3) + print("*" * 50, "报备结束", "*" * 50, "耗时:", times) + + +if __name__ == '__main__': + current_path = os.getcwd() + root_path = current_path.split("/test/testfwk/developer_test")[0] + developer_path = os.path.join(root_path, "test/testfwk/developer_test") + report_path = os.path.join(developer_path, "localCoverage/codeCoverage/results/coverage/reports/cxx/html") + main(report_path) diff --git a/localCoverage/restore_comment/build_before_generate.py b/localCoverage/restore_comment/build_before_generate.py index 086c79c..c264cba 100644 --- a/localCoverage/restore_comment/build_before_generate.py +++ b/localCoverage/restore_comment/build_before_generate.py @@ -84,11 +84,14 @@ def rewrite_source_file(source_path_list: list): for key in keys: if key in line and line.strip().startswith(key): write_fp.write(line) + break elif " //LCOV_EXCL_BR_LINE" not in line and not line.strip().endswith("\\"): write_fp.write(line.strip("\n").strip("\n\r") + " //LCOV_EXCL_BR_LINE") write_fp.write("\n") - else: + break + elif key == keys[-1]: write_fp.write(line) + break break os.remove(path) -- Gitee From 0ea65dadc70ab94a6500ce5172bf05b2e3533da4 Mon Sep 17 00:00:00 2001 From: caojiale1 Date: Thu, 13 Apr 2023 16:32:09 +0800 Subject: [PATCH 2/2] Signed-off-by:caojiale1@huawei.com --- localCoverage/coverage_tools.py | 4 + .../keyword_registration/keyword.json | 83 ++ .../keyword_registration/keyword_filter.py | 925 ++++++++++++++++++ .../restore_comment/build_before_generate.py | 5 +- 4 files changed, 1016 insertions(+), 1 deletion(-) create mode 100644 localCoverage/keyword_registration/keyword.json create mode 100644 localCoverage/keyword_registration/keyword_filter.py diff --git a/localCoverage/coverage_tools.py b/localCoverage/coverage_tools.py index ad036c7..0fb2335 100644 --- a/localCoverage/coverage_tools.py +++ b/localCoverage/coverage_tools.py @@ -176,5 +176,9 @@ if __name__ == '__main__': developer_path, "localCoverage/restore_comment/restore_source_code.py") subprocess.run("python3 %s" % restore_source_code_path, shell=True) + keyword_path = os.path.join( + developer_path, "localCoverage/keyword_registration/keyword_filter.py") + subprocess.run("python3 %s" % keyword_path, shell=True) + print(r"See the code coverage report in: " r"/test/testfwk/developer_test/localCoverage/codeCoverage/results/coverage/reports/cxx/html") diff --git a/localCoverage/keyword_registration/keyword.json b/localCoverage/keyword_registration/keyword.json new file mode 100644 index 0000000..3c742e9 --- /dev/null +++ b/localCoverage/keyword_registration/keyword.json @@ -0,0 +1,83 @@ +{ + "KEYWORD": [ + "unmap(", "ReadInterfaceToken(", "DeepCopyGroupEntry(", "readUint32(", + "ifstream fin(", "InitFunc", "AddInt64StringToJson(", "WriteFloatVector(", + "GetUint64Value(", "realpath(", "fchmod(", "SendEvent(", "WriteParcelable(", + "EVP_DecryptInit_ex(", "CreateJson(", "InitThread(", "ParcelWrite(", + "SetUint8Value(", "OnRemoveSystemAbility(", "EVP_CIPHER_CTX_ctrl(", "umount(", + "SetAttributesValue(", "GetBundleManager(", "fstat(", "ChangeModeFile(", + "HcFileClose", "FD_ISSET(", "CheckSystemAbility(", "SendTransactCmd(", + "SetUint8ArrayValue(", "GetUint8ArrayValue(", "SerializeToArray(", + "pthread_getname_np(", "LoadSymbol(", "fchown(", "InitClient(", + "ReadString16(", "TryConnect(", "EVP_PKEY_new_mac_key(", "WIFEXITED(", + "SendRequest(", "bind(", "GetInt32Value(", "read(", "ReadUint8(", "AsObject(", + "lseek(", "ChangePropertyContent(", "WriteToParcel(", "dlsym(", "pushBackT(", + "close(", "pwrite(", "SetCallRequestParamInfo(", "CheckGetSAPermission(", + "RSRenderServiceConnectHub::GetRenderService(", "readdir(", "memset_s(", + "is_object(", "curl_global_init(", "WriteTargetAndCallback", + "GetSystemAbility(", "pipe(", "fwrite(", "sendfile(", "Remote(", "HcMalloc(", + "CreateHdiScreen(", "GetDmsProxy(", "opendir(", "WriteInt32(", + "SetStringValue(", "WriteUint64(", "open(", "mkdir(", "CreateGroupEntry(", + "Peek(", "DirIterator(", "select(", "AddByteToJson(", "AddIntToJson(", + "SetDataCapacity", "RSTransactionProxy::GetInstance(", "ReadUint32(", "link(", + "Malloc(", "DeepCopyDeviceEntry(", "ParseFromString(", "CreateConnection(", + "readstring(", "AddDeathRecipient(", "WriteBuffer(", "EVP_DecryptFinal_ex(", + "GetStringValue(", "curl_multi_init(", "mount(", "InitDmsImplFunc", + "GetUint32Value(", "WriteUint16(", "umount2(", "AddObjToArray(", + "GetContextObject(", "rfind(", "SetUint16Value(", "GetDeviceProfileService(", + "statvfs(", "fork(", "sprintf_s(", "free(", "ReadInt16(", "remove(", "ioctl(", + "GetParcelableInfo(", "GetAttributesValue(", "WriteParcelableVector(", + "ReadUint64(", "HcFileRead(", "futimens(", "fputc(", + "HdiFramebufferSurface::CreateFramebufferSurface(", "popen(", "HcFileOpen(", + "ReadStringVector(", "ReadString(", "strdup(", "GetUint16ArrayValue(", + "is_discarded(", "ReadUnpadBuffer(", "WriteInt32Vector(", + "SetUint16ArrayValue(", "fgets(", "kill(", "ChangeModeDirectory(", "getline(", + "GetRealPath(", "fclose(", "GetUdid(", "newLine.empty(", "ReportSystemFault(", + "is_open(", "GetUint16Value(", "MakeShared(", "ReadParcelable&", + "AddObjToJson(", "SetUint32Value(", "EVP_EncryptInit_ex(", "make_shared&", + "CreateDeviceEntry(", "setmntent(", "OnAddSystemAbility(", "fdopen(", + "RSMainThread::Instance(", "access(", "NLMSG_SPACE(", "mmap(", + "CheckAddOrRemovePermission(", "dlopen(", "dlsy(", "malloc(", "chmod(", + "curl_multi_setopt(", "PackJsonToString(", "OnStart(", "ReadRemoteObject(", + "stat(","GetUint64ArrayValue(", "new","ifstream(", "poll(", "CmMalloc(", + "HcfMalloc(", "SetUint64ArrayValue(", "PathExists(", "StringSetPointer(", + "unlink(", "major(", "CreateSurfaceAsConsumer(", "ParcelWriteUint64(", + "EVP_DigestSignUpdate(", "ReadInt32(","ReadUInt8Vector(", "snprintf_s(", + "WriteInt16(", "readlink(", "LoadLibrary(", "WriteString16(", "memcpy_s(", + "rename(", "statfs(", "GetUint8Value(","closedir(", "mknod(","HcFileRemove(", + "getmntent_r(","fopen(", "SetBoolValue(","Seek(", "GetUint32ArrayValue(", + "strcmp(", "CheckListSAPermission(", "Mmap(","iface_cast(", + "EVP_DigestSignInit(", "UnpackInt32(", "GetSystemAbilityManager(", "Write(", + "ReadBool(","vsnprintf_s(", "IsProxyObject(","dlerror(", "WriteInt64(", + "EVP_EncryptFinal_ex(","lstat(", "IsObjectDead","Unmarshing(", "strcpy_s(", + "EVP_DecryptUpdate(","New(", "rmdir(","WriteInterfaceToken(", + "GetBoolValue(", "WriteUInt32Vector(", "ReadUInt32Vector(", "EVP_Digest(", + "ParcelWriteUint32(", "StringSet(","AddStringToJson(", "Marshalling(", + "ParcelReadUint64(", "ReadFloat(","Create(", "SaveStringToFd(","fprintf(", + "GetProxy(", "EVP_EncryptUpdate(", "ParcelRead(","Publish(","Free(", + "make_unique&","execvp(", "HdiBackend::GetInstance(", "ReadInt32Vector(", + "pushBack(", "CreateVSyncReceiver(", "WriteStringVector(", "HcGetUdid(", + "UnMarshalling(", "WriteUint32(", "ReadUInt64Vector(", "GetRemoteObject(", + "none_of(", "WriteUInt64Vector(", "CreateJsonArray(","sendmsg(", "socket(", + "SetUint32ArrayValue(", "SetInt32Value(","dprintf(", "syncfs(", + "SetUint64Value(", "RAND_bytes(","CheckGetRemoteSAPermission(", + "MarshalSAExtraProp(", "ReadFloatVector(","WriteFloat(", "Serialize(", + "syscall(", "HcFileWrite(", "ReadFromParcel(", "InitHcMutex(", "MALLOC(", + "new(", "CreateSurfaceAsProducer(", "ReadString16Vector(", "chown(", + "HcFileSize(", "HdiDevice::GetInstance(", + "RSProcessorFactory::CreateProcessor(", "ReadInt64(", "WriteRawData(", + "strncat_s(", "newLine.find(", "ReadParcelableVector(", "ChkRWError(", + "CreateOrGetScreenManager(", "fread(", "readUint64(", "HksMalloc(", + "RemoveDeathRecipient(", "sscanf_s(", "EVP_DigestSignFinal(", "ReadUint16(", + "fseek(", "promote(", "WriteUInt8Vector(", "WriteRemoteObject(", + "EVP_CIPHER_CTX_new(", "ParcelReadUint32(", "PostTask(", "ReadRawData(", + "ReadVectorFromparcel(", "WriteUint8(", "UnpackUint64(", "WriteBool(", + "ForceCreateDirectory(", "chdir(", "DelayedSingleton&", "dup2(", "write(", + "AddBoolToJson(", "EVP_MD_CTX_create(", "Alloc(", "recvmsg(", "WriteString(", + "SoftBusMutexLock(", "pthread_mutex_lock(", "RequireAuthLock(", "Lock(", + "LaneQosLock(", "SoftBusCalloc(", "SoftBusMalloc(", "cJSON_AddItemToObject(", + "RewindWrite(", "WritePointer(", "DBinderService::GetInstance(", + "IPCProcessSkeleton::GetCurrent(", "IPCThreadSkeleton::GetCurrent(", + "ISessionService::GetInstance(" + ] +} \ No newline at end of file diff --git a/localCoverage/keyword_registration/keyword_filter.py b/localCoverage/keyword_registration/keyword_filter.py new file mode 100644 index 0000000..c23d3c8 --- /dev/null +++ b/localCoverage/keyword_registration/keyword_filter.py @@ -0,0 +1,925 @@ +#!/usr/bin/env python3 +# coding=utf-8 + +# +# Copyright (c) 2023 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import re +import time +import json +import fcntl +import platform +import linecache +import traceback +from multiprocessing import Pool +from lxml import html +from selectolax.parser import HTMLParser + +etree = html.etree + + +class CoverageReportPath: + def __init__(self, report_path): + self.report_path = report_path + + def gcov_file_generator(self): + """ + gcov文件生成器 + """ + for root_path, dir, files in os.walk(self.report_path): + for file in files: + if not file.endswith(".gcov.html"): + continue + file_path = os.path.join(root_path, file) + yield file_path + + def modify_report_style(self): + """ + 修改覆盖率报告css样式 + """ + css_file_path = "" + for root_path, dir, files in os.walk(self.report_path): + for file in files: + if file == "gcov.css": + css_file_path = os.path.join(root_path, file) + text = """ +/* update */ +span.linenumupdate, span.branchexecupdate, span.branchnoexecupdate, +span.linecovupdate, span.linenocovupdate, span.branchcovupdate, +span.branchnocovupdate +{ + background-color:#BBBBBB; +}""" + with open(css_file_path, "a+", encoding="utf-8") as file: + file.write(text) + + def get_statistic_path(self, gcov_file_path: str): + """ + 根据gcov.html路径和统计等级获取统计报告路径 + """ + file_name_list = ["index.html", "index-sort-b.html", + "index-sort-f.html", "index-sort-l.html"] + index_path_list = [ + os.path.normcase(os.path.join(os.path.dirname(gcov_file_path), file_name)) + for file_name in file_name_list + ] + out_path_list = [ + os.path.normcase(os.path.join(self.report_path, file_name)) + for file_name in file_name_list + ] + index_path_list.extend(out_path_list) + return index_path_list + + +class KeywordRegistration: + def __init__(self, report_path): + self.report_path = report_path + self.keyword_file_path = os.path.normcase( + os.path.join(os.path.dirname(__file__), "keyword.json")) + + def get_keyword_info(self): + """ + 获取报备关键字信息 + """ + try: + with open(self.keyword_file_path, "r") as file: + keyword_dict = json.load(file) + keyword_list = keyword_dict.get("KEYWORD") + return keyword_list + except (FileNotFoundError, AttributeError, FileExistsError) as error: + print(f"获取报备过滤关键字报错,error:{error}") + + @staticmethod + def get_coverage_content(file_path): + """ + 获取覆盖率文件内容 + """ + if not os.path.exists(file_path): + return + with open(file_path, "r", encoding="utf-8") as file: + content = file.read() + return content + + @staticmethod + def get_tag(content, line): + start_index = content.find(f'")[0] + + if source_code: + source_code = "".join(source_code) + return source_code + + @staticmethod + def update_tag(tag: str): + replace_item_list = ["lineNum", "branchNoCov", "branchCov", "lineCov", + "branchNoExec", "lineNoCov", "branchExec"] + + for item in replace_item_list: + if item in tag: + replace_item = (item + "Update").lower() + tag = tag.replace(item, replace_item) + return tag + + def get_coverage_lines_by_branch(self, file_path, content=None): + """ + 获取覆盖率报告中的所有的if分支行号 + """ + if not content: + content = self.get_coverage_content(file_path) + if not content: + return + _, file_name = os.path.split(file_path) + branch_line_list = re.findall( + r''' 200: + return None, None + next_tag = get_tag(content, str(next_line)) + func_body_tag_list_append(next_tag) + if "{" in next_tag: + if branch_braces_sub: + branch_braces_sub += 1 + braces_sub += 1 + if "}" in next_tag: + if branch_braces_sub: + branch_braces_sub -= 1 + braces_sub -= 1 + if 'title="Branch' in next_tag: + branch_braces_sub += 1 + sub_brace_start_status = True + if not branch_braces_sub and \ + (sub_brace_start_status == sub_brace_end_status): + next_tag = update_tag(next_tag) + if not branch_braces_sub and sub_brace_start_status: + sub_brace_end_status = True + update_func_body_tag_list_append(next_tag) + next_line += 1 + return func_body_tag_list, update_func_body_tag_list + + def get_line_funcname(self, branch_line, content): + """ + 获取行号源码所在函数的函数名 + """ + function_name = "" + previous_line = int(branch_line) + loop_count = 0 + get_tag = self.get_tag + try: + while not function_name: + loop_count += 1 + if loop_count > 200: + return + previous_line -= 1 + tag = get_tag(content, previous_line) + tag_text = HTMLParser(tag).text() + if tag_text[39:].strip().startswith("{"): + left_parentheses_count = right_parentheses_count = 0 + child_count = 0 + while previous_line: + child_count += 1 + if child_count > 200: + return + previous_line -= 1 + html_text = get_tag(content, previous_line) + source_code = HTMLParser(html_text).text()[39:].strip() + + left_parentheses_count += source_code.count("(") + right_parentheses_count += source_code.count(")") + if not left_parentheses_count == right_parentheses_count: + continue + if " operator" in source_code: + class_name_list = re.findall(r'\((.*?)\)', source_code) + if class_name_list: + function_name_str = class_name_list[0] + if not function_name_str: + return + function_name = function_name_str.split()[-1] + return function_name + function_name_list = re.findall(r' (.*?)\(', source_code) + if function_name_list: + function_name_str = function_name_list[0] + if not function_name_str: + return + function_name = function_name_str.split()[-1] + return function_name + except (OSError, IndexError, TypeError) as error: + print(f"覆盖率报告{branch_line}行获取函数名报错, error:{error}", + traceback.format_exc()) + + @staticmethod + def get_branch_line_list(keyword_line: int, branch_line_list: list): + """ + 获取大于关键字行号的所有分支行号 + """ + if keyword_line in branch_line_list: + index = branch_line_list.index(keyword_line) + branch_line_list = branch_line_list[index:] + else: + for line in branch_line_list: + if line > keyword_line: + index = branch_line_list.index(line) + branch_line_list = branch_line_list[index:] + break + return branch_line_list + + @staticmethod + def get_keyword_judge_char(keyword, keyword_source_code): + """ + 获取关键字替代字符 + """ + if "&" in keyword: + keyword = keyword.replace("&", "<") + + keyword_index = keyword_source_code.find(keyword) + if keyword_index == -1: + return + + try: + keyword_code = keyword_source_code[:keyword_index + len(keyword)] + if " = " in keyword_code: + judge_key = keyword_code.split(" = ")[0].split()[-1] + else: + bracket_index = keyword_code.find("(") + bracket_code = keyword_code[:bracket_index] + judge_key = bracket_code.split()[-1] + return judge_key + except (IndexError, ValueError) as error: + print("获取关键字替代字符", error, traceback.format_exc()) + + @staticmethod + def get_branch_data_by_tag(tag_html: str, symbol_status=None): + """ + 根据前端标签获取分支数据 + """ + if symbol_status: + key = r"#+\-*" + else: + key = r"#+\-" + branch_line_list = re.findall(rf"> ([{key}]) ", tag_html) + + return branch_line_list + + @staticmethod + def get_judge_condition_index(judge_key: str, source_code: str): + """ + 获取判断条件索引 + """ + keyword_index_list = [] + keyword_index_list_append = keyword_index_list.append + + condition_str_list = re.split(rf"\|\||&&", source_code) + for index, code_str in enumerate(condition_str_list): + if judge_key in code_str: + keyword_index_list_append(index) + return keyword_index_list, condition_str_list + + @staticmethod + def update_source_code_tag(html_tag: str): + replace_item_list = ["lineNum", "lineCov", "lineNoCov"] + + for item in replace_item_list: + if item in html_tag: + replace_item = (item + "Update").lower() + html_tag = html_tag.replace(item, replace_item) + + return html_tag + + def get_break_line_tag(self, content, origin_branch_html, branch_line): + """ + 获取分支换行的判断条件源码tag + """ + get_tag = self.get_tag + left_brace_exist = False + line_break = loop_count = 0 + while not left_brace_exist: + if loop_count > 10: + break + line_break += 1 + origin_branch_html += "\n" + next_line_tag = get_tag(content, branch_line + line_break) + if "{" in next_line_tag: + left_brace_exist = True + origin_branch_html += next_line_tag + loop_count += 1 + return origin_branch_html + + @staticmethod + def modify_tag_style(tag, rate): + """ + 修改标签样式 + """ + if 75 <= rate < 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryMed") + tag = tag.replace("coverPerLo", "coverPerMed") + tag = tag.replace("coverNumLo", "coverNumMed") + elif rate >= 90: + tag = tag.replace("headerCovTableEntryLo", "headerCovTableEntryHi") + tag = tag.replace("headerCovTableEntryMed", "headerCovTableEntryHi") + tag = tag.replace("coverPerLo", "coverPerHi") + tag = tag.replace("coverNumLo", "coverNumHi") + tag = tag.replace("coverPerMed", "coverPerHi") + tag = tag.replace("coverNumMed", "coverNumHi") + return tag + + def update_coverage_ratio_tag(self, file_path): + """ + 修改覆盖率比率数据 + """ + with open(file_path, "r", encoding="utf-8") as coverage_file: + content = coverage_file.read() + + covered_nums = len(re.findall(r"> ([+]) ", content)) + branch_total = len(re.findall(r"> ([+\-#]) ", content)) + + line = 0 + start_line = 1 + content_list = content.splitlines() + for line_text in content_list: + if 'Branches:' in line_text: + line = start_line + break + start_line += 1 + linecache.clearcache() + coverage_tag = linecache.getline(file_path, line) + covered_tag = linecache.getline(file_path, line + 1) + coverage_total_tag = linecache.getline(file_path, line + 2) + try: + origin_hit = int(re.findall(r"(\d+)", covered_tag)[0]) + origin_total = int(re.findall(r"(\d+)", coverage_total_tag)[0]) + coverage_rate_tag = linecache.getline(file_path, line + 3) + replace_tag = coverage_tag + covered_tag + coverage_total_tag + \ + coverage_rate_tag + if origin_total - branch_total == 0 and origin_hit - covered_nums == 0: + return + if int(branch_total) == 0: + coverage = 0.0 + else: + coverage = round(int(covered_nums) / int(branch_total) * 100, 1) + + origin_rate = re.findall(r"(\d+.\d+)", coverage_rate_tag)[0] + update_tag = coverage_tag + covered_tag.replace( + str(origin_hit), str(covered_nums)) + \ + coverage_total_tag.replace(str(origin_total), + str(branch_total)) + \ + coverage_rate_tag.replace(origin_rate, str(coverage)) + + update_tag = self.modify_tag_style(update_tag, coverage) + if replace_tag in content: + content = content.replace(replace_tag, update_tag) + + with open(file_path, "w", encoding="utf-8") as file: + file.write(content) + + # 修改数据统计页面 + hit_shield_num = origin_hit - covered_nums + total_shield_num = origin_total - branch_total + self.update_statistic(file_path, hit_shield_num, total_shield_num) + except (IndexError, TypeError, FileNotFoundError) as error: + print("修改分支数据", error, traceback.format_exc()) + + def update_statistic(self, file_path, hit_shield_num, total_shield_num): + """ + 修改覆盖率分支数据 + """ + index_path_list = CoverageReportPath(self.report_path).get_statistic_path(file_path) + for index_path in index_path_list: + if not os.path.exists(index_path): + continue + + with open(index_path, "r", encoding="utf-8") as coverage_file: + fcntl.flock(coverage_file.fileno(), fcntl.LOCK_EX) + content = coverage_file.read() + + file_dir, file_name = os.path.split(file_path) + if os.path.normcase(file_dir) not in os.path.normcase(index_path): + if platform.system() == "Windows": + judge_item = file_dir.replace(self.report_path + "\\", "") + judge_item = ">" + judge_item.replace("\\", "/") + "" + else: + judge_item = file_dir.replace(self.report_path + "/", "") + judge_item = ">" + judge_item + "" + else: + judge_item = ">" + file_name.replace(".gcov.html", "") + + start_line = 1 + line = tag_line = 0 + content_list = content.splitlines() + tmp = 'Branches:' + for line_text in content_list: + if not line and tmp in line_text: + line = start_line + if judge_item in line_text: + tag_line = start_line + break + start_line += 1 + try: + # total statistic modify + linecache.clearcache() + coverage_head_tag = linecache.getline(index_path, line) + origin_hit_tag = linecache.getline(index_path, line + 1) + origin_hit_num = int(re.findall(r"(\d+)", origin_hit_tag)[0]) + origin_total_tag = linecache.getline(index_path, line + 2) + origin_total_num = int(re.findall(r"(\d+)", origin_total_tag)[0]) + origin_rate_tag = linecache.getline(index_path, line + 3) + origin_rate = re.findall(r"(\d+.\d+)", origin_rate_tag)[0] + replace_tag = coverage_head_tag + origin_hit_tag + \ + origin_total_tag + origin_rate_tag + + # file branch modify + no_change_file_tag = "" + for i in range(8): + no_change_file_tag += linecache.getline(index_path, tag_line + i) + file_rate_tag = linecache.getline(index_path, tag_line + 8) + file_covered_tag = linecache.getline(index_path, tag_line + 9) + file_tag = no_change_file_tag + file_rate_tag + \ + file_covered_tag + origin_file_covered_num, origin_file_total_num = map( + int, re.findall(r"(\d+)", file_covered_tag)) + if origin_file_total_num: + origin_file_rate = re.findall(r"(\d+.\d+)", file_rate_tag)[0] + else: + # 不存在覆盖率 + origin_file_rate = "-" + head_hit_num = origin_hit_num - hit_shield_num + head_total_num = origin_total_num - total_shield_num + if head_total_num == 0: + rate = 0.0 + else: + rate = round(head_hit_num / head_total_num * 100, 1) + file_hit_num = origin_file_covered_num - hit_shield_num + file_total_num = origin_file_total_num - total_shield_num + if file_total_num == 0: + file_rate = 0.0 + else: + file_rate = round(file_hit_num / file_total_num * 100, 1) + update_tag = coverage_head_tag + origin_hit_tag.replace( + str(origin_hit_num), str(head_hit_num)) + \ + origin_total_tag.replace(str(origin_total_num), + str(head_total_num)) + \ + origin_rate_tag.replace(origin_rate, str(rate)) + + update_tag = self.modify_tag_style(update_tag, rate) + + update_branch_tag = file_rate_tag.replace( + origin_file_rate, str(file_rate)) + \ + file_covered_tag.replace( + str(origin_file_covered_num) + " / " + + str(origin_file_total_num), str(file_hit_num) + " / " + + str(file_total_num)) + + update_branch_tag = self.modify_tag_style(update_branch_tag, file_rate) + update_branch_tag = no_change_file_tag + update_branch_tag + if replace_tag in content: + content = content.replace(replace_tag, update_tag) + if file_tag in content: + content = content.replace(file_tag, update_branch_tag) + with open(index_path, "w", encoding="utf-8") as file: + file.write(content) + fcntl.flock(file.fileno(), fcntl.LOCK_UN) + time.sleep(1) + except (IndexError, TypeError, FileNotFoundError) as error: + print("修改分支统计数据报错", error, traceback.format_exc()) + + def _check_if_branch_line(self, judge_key, sub_branch_line_list, + key_line, content, function_name): + """ + 确定if分支行号 + """ + if_branch_line = None + for branch_line in sub_branch_line_list: + if branch_line == key_line: + if_branch_line = key_line + break + # 获取分支行所在函数名 + branch_function_name = self.get_line_funcname( + branch_line, content) + # 关键字范围只在关键字所在函数之内 + branch_line_tag = self.get_tag(content, branch_line) + try: + if "{" not in branch_line_tag: + branch_line_tag = self.get_break_line_tag( + content, branch_line_tag, branch_line) + branch_line_source_code = self.get_source_code( + branch_line_tag) + if function_name == branch_function_name: + if judge_key in branch_line_source_code: + if_branch_line = branch_line + break + else: + break + except: + print("获取关键字if分支行报错", traceback.format_exc()) + + return if_branch_line + + @staticmethod + def _branch_replace(branch): + """ + 分支符号替换 + """ + if branch == "#": + branch = "> # <" + elif branch == "+": + branch = "> + <" + elif branch == "*": + branch = "> * <" + else: + branch = "> - <" + return branch + + @staticmethod + def _single_condition_modify_html(branch_html, branch_list): + """ + 单条件修改代码块html + """ + line_item = ' ' + line_feed_index = branch_html.find(line_item) + if line_feed_index == -1: + if "+" in branch_list: + update_branch_tag = branch_html.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + else: + try: + first_branch = branch_list[0] + first_branch = "> " + first_branch + " <" + first_branch_index = branch_html.find(first_branch) + branch_tag = branch_html[:first_branch_index + 5] + update_branch_tag = branch_html[first_branch_index + 5:] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag = branch_tag + update_branch_tag + except: + return + else: + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index + len(line_item) + 1] + if "-" not in branch_list and "+" not in branch_list: + del_count = update_branch_tag.count("> # <") + update_branch_tag = update_branch_tag.replace("> # <", "> <", del_count - 1) + else: + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_html[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + if line_feed_index == -1: + branch_tag = branch_tag.replace("> - <", "> <") + branch_tag = branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + else: + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + try: + update_branch_tag += branch_tag[:line_feed_index + len(line_item) + 1] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + branch_tag = branch_tag[line_feed_index + len(line_item) + 1:] + line_feed_index = branch_tag.find(line_item) + except: + return + + branch_tag = branch_tag.replace("> - <", "> <") + update_branch_tag = update_branch_tag.replace("> # <", "> <") + update_branch_tag += branch_tag + return update_branch_tag + + def _multi_condition_modify_html(self, branch_html, branch_length, + condition_str_list, judge_index_list): + """ + 多条件修改代码块html + """ + line_item = ' ' + if branch_length % len(condition_str_list): + line_feed_index = branch_html.find(line_item) + update_branch_tag = branch_html[:line_feed_index] + update_branch_tag = update_branch_tag.replace("> - <", "> <") + branch_html = branch_html[line_feed_index:] + loop_count = 0 + while line_feed_index + 1: + loop_count += 1 + if loop_count > 200: + continue + line_feed_index = branch_html.count(line_item) + if line_feed_index > 1: + try: + line_feed_length = len(line_item) + branch_tag_before = branch_html[:line_feed_length] + branch_html = branch_html[line_feed_length:] + line_feed_index = branch_html.find(line_item) + branch_tag_after = branch_html[:line_feed_index] + branch_tag_after = branch_tag_after.replace("> - <", "> <") + branch_tag = branch_tag_before + branch_tag_after + except: + return + else: + branch_tag = branch_html + branch_tag = branch_tag.replace("> - <", "> <") + # 不再换行,索引为-1 + line_feed_index = -1 + + update_branch_tag += branch_tag + if line_feed_index == -1: + branch_html = "" + else: + branch_html = branch_html[line_feed_index:] + if branch_html != "": + branch_html = branch_html.replace("> - <", "> <") + update_branch_tag += branch_html + else: + branch_list = self.get_branch_data_by_tag(branch_html, True) + update_branch_tag = "" + end_count = -1 + try: + for index in judge_index_list: + branch_data = branch_list[:(index + 1) * 2] + # 要修改的分支数据列表长度 + branch_data_length = len(branch_data) + change_status = False + for count, branch in enumerate(branch_data, 1): + if count <= end_count: + continue + + end_count = count + branch = self._branch_replace(branch) + end_index = branch_html.find(branch) + branch_tag = branch_html[:end_index + 5] + if branch_data_length - count in [0, 1]: + if change_status: + continue + if branch == "> # <": + change_status = True + branch_tag = branch_tag.replace("> # <", "> * <") + elif branch == "> - <": + change_status = True + branch_tag = branch_tag.replace("> - <", "> * <") + update_branch_tag += branch_tag + branch_html = branch_html[end_index + 5:] + except: + return + update_branch_tag += branch_html + return update_branch_tag + + def judge_branch_exists(self, file_path): + """ + 判断报告是否存在分支 + """ + content = self.get_coverage_content(file_path) + branch_total = len(re.findall(r"> ([+\-#]) ", content)) + if branch_total: + return True + else: + return False + + def modify_branch(self, file_path, keyword_list): + """ + html报告分支屏蔽修改 + """ + branch_line_list = self.get_coverage_lines_by_branch(file_path) + if not branch_line_list: + return True + + branch_line_list.sort() + no_change_content = self.get_coverage_content(file_path) + for keyword in keyword_list: + content = self.get_coverage_content(file_path) + keyword_line_list = self.get_coverage_lines_by_keyword(keyword, content) + if not keyword_line_list: + continue + for key_line in keyword_line_list: + key_html = self.get_tag(content, key_line) + if not key_html or "LOG" in key_html: + continue + + function_name = self.get_line_funcname(key_line, content) + sub_branch_line_list = self.get_branch_line_list( + key_line, branch_line_list) + keyword_source_code = self.get_source_code(key_html) + if not all([function_name, keyword_source_code, + sub_branch_line_list]): + continue + + judge_key = keyword + if 'title="Branch' not in key_html: + judge_key = self.get_keyword_judge_char( + keyword, keyword_source_code) + if "*" in judge_key: + judge_key = judge_key.replace("*", "") + if "&" in judge_key: + judge_key = judge_key.replace("&", "") + if_branch_line = self._check_if_branch_line( + judge_key, sub_branch_line_list, key_line, + content, function_name) + + if not if_branch_line: + continue + + origin_branch_html = self.get_tag(content, if_branch_line) + branch_html = origin_branch_html + if "{" not in origin_branch_html: + origin_branch_html = self.get_break_line_tag( + content, origin_branch_html, if_branch_line) + branch_html = origin_branch_html + top_source_code = self.get_source_code(branch_html) + no_change_branch_html = self.get_tag( + no_change_content, if_branch_line) + branch_list = self.get_branch_data_by_tag(no_change_branch_html) + if not branch_list: + continue + if "#" not in branch_list and "-" not in branch_list: + continue + branch_tag_data = self.get_branch_data_by_tag(branch_html, True) + if len(branch_tag_data) <= 1: + continue + if len(branch_list) == 2: + if "-" in branch_list: + update_branch_tag = branch_html.replace("> - <", "> <") + else: + update_branch_tag = branch_html.replace("> # <", "> <", 1) + else: + branch_length = len(branch_list) + judge_index_list, condition_str_list = self.\ + get_judge_condition_index(judge_key, top_source_code) + if not len(judge_index_list): + continue + if len(condition_str_list) == 1: + update_branch_tag = self._single_condition_modify_html( + branch_html, branch_list) + else: + update_branch_tag = self._multi_condition_modify_html( + branch_html, branch_length, condition_str_list, + judge_index_list) + if not update_branch_tag: + return False + + update_branch_tag = self.update_source_code_tag( + update_branch_tag) + content = content.replace(origin_branch_html, update_branch_tag) + source_code_tag_list, update_source_code_tag_list = self.\ + code_body_judge(int(if_branch_line), content) + if source_code_tag_list: + source_code_tag_html = "\n".join(source_code_tag_list) + update_source_code_tag_html = "\n".join( + update_source_code_tag_list) + content = content.replace( + source_code_tag_html, update_source_code_tag_html) + with open(file_path, "w", encoding="utf-8") as new_html: + new_html.write(content) + + content = self.get_coverage_content(file_path) + content = content.replace('> * ', '> ') + with open(file_path, "w", encoding="utf-8") as new_html: + new_html.write(content) + + def keyword_registration(self, file_path, keyword_list): + """ + 报备 + """ + self.modify_branch(file_path, keyword_list) + + # 修改覆盖率数据 + self.update_coverage_ratio_tag(file_path) + return True + + def multiprocessing_registration(self): + """ + 多进程关键字报备 + """ + keyword_list = self.get_keyword_info() + if not keyword_list: + return + + # 创建报告路径生成器 + report_instance = CoverageReportPath(self.report_path) + # 修改css文件 + report_instance.modify_report_style() + # 创建报告路径生成器 + report_generator = report_instance.gcov_file_generator() + # 获取关键字 + keyword_registration = self.keyword_registration + + pool = Pool(10) + apply_async = pool.apply_async + while True: + file_path = next(report_generator, None) + if not file_path: + break + + # 关键字报备 + apply_async(keyword_registration, args=(file_path, keyword_list)) + + pool.close() + pool.join() + + +def main(report_path): + print("*" * 50, "报备开始", "*" * 50) + manager = KeywordRegistration(report_path) + start_time = time.time() + manager.multiprocessing_registration() + end_time = time.time() + times = round(end_time - start_time, 3) + print("*" * 50, "报备结束", "*" * 50, "耗时:", times) + + +if __name__ == '__main__': + current_path = os.getcwd() + root_path = current_path.split("/test/testfwk/developer_test")[0] + developer_path = os.path.join(root_path, "test/testfwk/developer_test") + report_path = os.path.join(developer_path, "localCoverage/codeCoverage/results/coverage/reports/cxx/html") + main(report_path) diff --git a/localCoverage/restore_comment/build_before_generate.py b/localCoverage/restore_comment/build_before_generate.py index 086c79c..c264cba 100644 --- a/localCoverage/restore_comment/build_before_generate.py +++ b/localCoverage/restore_comment/build_before_generate.py @@ -84,11 +84,14 @@ def rewrite_source_file(source_path_list: list): for key in keys: if key in line and line.strip().startswith(key): write_fp.write(line) + break elif " //LCOV_EXCL_BR_LINE" not in line and not line.strip().endswith("\\"): write_fp.write(line.strip("\n").strip("\n\r") + " //LCOV_EXCL_BR_LINE") write_fp.write("\n") - else: + break + elif key == keys[-1]: write_fp.write(line) + break break os.remove(path) -- Gitee