diff --git a/build/third_party_gn/icu/icu4c/BUILD.gn b/build/third_party_gn/icu/icu4c/BUILD.gn index 67d5a36aff5ae0f08a6358f4c6a7e1d678635678..ad2d5094edb773f42fc5cf09cd77aaa988fcba9c 100644 --- a/build/third_party_gn/icu/icu4c/BUILD.gn +++ b/build/third_party_gn/icu/icu4c/BUILD.gn @@ -509,7 +509,7 @@ ohos_shared_library("shared_icuuc") { ":icu_config", "$build_root/config/compiler:rtti", ] - all_dependent_configs = [ ":static_icustubdata_all_deps_config" ] + public_configs = [ ":static_icustubdata_all_deps_config" ] defines = [ "U_ATTRIBUTE_DEPRECATED=", "U_COMMON_IMPLEMENTATION", diff --git a/test/autotest/aw/all_utils.py b/test/autotest/aw/all_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..287108a94aa421850d119625a83901efdffc603f --- /dev/null +++ b/test/autotest/aw/all_utils.py @@ -0,0 +1,647 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Copyright (c) 2024 Huawei Device Co., Ltd. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Description: Utils for action words. +""" + +import asyncio +import os +import re +import subprocess +import threading +import time +from typing import Union + +from hypium import SystemUI, BY, MatchPattern + +from fport import Fport +from taskpool import TaskPool +from websocket import WebSocket + + +class TimeRecord: + def __init__(self, expected_time, actual_time): + self.expected_time = expected_time + self.actual_times = [actual_time] + + def avg_actual_time(self): + return sum(self.actual_times) / len(self.actual_times) + + def max_actual_time(self): + return max(self.actual_times) + + def min_actual_time(self): + return min(self.actual_times) + + def add_actual_time(self, actual_time): + self.actual_times.append(actual_time) + + def avg_proportion(self): + return self.avg_actual_time * 100 / self.expected_time + + def rounds(self): + return len(self.actual_times) + + def mid_actual_time(self): + self.actual_times.sort() + return self.actual_times[len(self.actual_times) // 2] + + +class CommonUtils(object): + def __init__(self, driver): + self.driver = driver + + @staticmethod + async def communicate_with_debugger_server(websocket, connection, command, message_id): + ''' + Assembles and send the commands, then return the response. + Send message to the debugger server corresponding to the to_send_queue. + Return the response from the received_queue. + ''' + command['id'] = message_id + await websocket.send_msg_to_debugger_server(connection.instance_id, connection.send_msg_queue, command) + response = await websocket.recv_msg_of_debugger_server(connection.instance_id, + connection.received_msg_message) + return response + + @staticmethod + def get_custom_protocols(): + protocols = ["removeBreakpointsByUrl", + "setMixedDebugEnabled", + "replyNativeCalling", + "getPossibleAndSetBreakpointByUrl", + "dropFrame", + "setNativeRange", + "resetSingleStepper", + "callFunctionOn", + "smartStepInto", + "saveAllPossibleBreakpoints"] + return protocols + + @staticmethod + def message_id_generator(): + message_id = 1 + while True: + yield message_id + message_id += 1 + + @staticmethod + def assert_equal(actual, expected): + ''' + 判断actual和expected是否相同,若不相同,则抛出异常 + ''' + assert actual == expected, f"\nExpected: {expected}\nActual: {actual}" + + @staticmethod + def get_variables_from_properties(properties, prefix_name): + ''' + properties是Runtime.getProperties协议返回的变量信息 + 该方法会根据prefix_name前缀匹配properties中的变量名,符合的变量会以字典形式返回变量名和相关描述 + 描述首先采用description中的内容,但会删掉其中的哈希值,没有description则会采用subtype或type中的内容 + ''' + variables = {} + for var in properties: + if not var['name'].startswith(prefix_name): + continue + name = var['name'] + value = var['value'] + description = value.get('description') + if description is not None: + index_of_at = description.find('@') + if index_of_at == -1: + variables[name] = description + continue + variables[name] = description[:index_of_at] + index_of_bracket = description.find('[', index_of_at + 1) + if index_of_bracket != -1: + variables[name] += description[index_of_bracket:] + else: + subtype = value.get('subtype') + variables[name] = subtype if subtype is not None else value.get('type') + return variables + + def get_pid(self, bundle_name): + ''' + 获取bundle_name对应的pid + ''' + pid = self.driver.shell("pidof " + bundle_name).strip() + if not pid.isdigit(): + return 0 + self.driver.log_info(f'pid of {bundle_name}: ' + pid) + return int(pid) + + def attach(self, bundle_name): + ''' + 通过bundle_name使指定应用进入调试模式 + ''' + attach_result = self.driver.shell(f"aa attach b {bundle_name}").strip() + self.driver.log_info(attach_result) + self.assert_equal(attach_result, 'attach app debug successfully.') + + def connect_server(self, config): + ''' + 根据config连接ConnectServer + ''' + fport = Fport(self.driver) + fport.clear_fport() + connect_server_port = fport.fport_connect_server(config['connect_server_port'], config['pid'], + config['bundle_name']) + assert connect_server_port > 0, 'Failed to fport connect server for 3 times, the port is very likely occupied' + config['connect_server_port'] = connect_server_port + config['websocket'] = WebSocket(self.driver, config['connect_server_port'], config['debugger_server_port'], + config.get('print_protocol', True)) + config['taskpool'] = TaskPool() + + def hot_reload(self, hqf_path: Union[str, list]): + ''' + 根据hqf_path路径对应用进行热重载 + ''' + assert isinstance(hqf_path, (str, list)), 'Tyep of hqf_path is NOT string or list!' + if isinstance(hqf_path, str): + cmd = f'bm quickfix -a -f {hqf_path} -d' + elif isinstance(hqf_path, list): + cmd = f'bm quickfix -a -f {" ".join(hqf_path)} -d' + self.driver.log_info('hot reload: ' + cmd) + result = self.driver.shell(cmd).strip() + self.driver.log_info(result) + self.assert_equal(result, 'apply quickfic succeed.') + + async def async_wait_timeout(self, task, timeout=3): + ''' + 在timeout内执行task异步任务,若执行超时则抛出异常 + ''' + try: + result = await asyncio.wait_for(task, timeout) + return result + except asyncio.TimeoutError: + self.driver.log_error('await timeout!') + + def hdc_target_mount(self): + ''' + 挂载设备文件系统 + ''' + cmd = 'target mount' + self.driver.log_info('Mount finish: ' + cmd) + result = self.driver.hdc(cmd).strip() + self.driver.log_info(result) + self.assert_equal(result, 'Mount finish') + + def hdc_file_send(self, source, sink): + ''' + 将source中的文件发送到设备的sink路径下 + ''' + cmd = f'file send {source} {sink}' + self.driver.log_info(cmd) + result = self.driver.hdc(cmd) + self.driver.log_info(result) + assert 'FileTransfer finish' in result, result + + def clear_fault_log(self): + ''' + 清楚故障日志 + ''' + cmd = 'rm /data/log/faultlog/faultlogger/*' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + self.driver.log_info(result) + assert 'successfully' in result, result + + def save_fault_log(self, log_path): + ''' + 保存故障日志到log_path + ''' + if not os.path.exists(log_path): + os.makedirs(log_path) + + cmd = f'file recv /data/log/faultlog/faultlogger/ {log_path}' + self.driver.log_info(cmd) + result = self.driver.hdc(cmd) + self.driver.log_info(result) + assert 'successfully' in result, result + + +class UiUtils(object): + def __init__(self, driver): + self.driver = driver + + def get_screen_size(self): + ''' + 获取屏幕大小 + ''' + screen_info = self.driver.shell(f"hidumper -s RenderService -a screen") + match = re.search(r'physical screen resolution: (\d+)x(\d+)', screen_info) + # 新版本镜像中screen_info信息格式有变,需要用下方正则表达式获取 + if match is None: + match = re.search(r'physical resolution=(\d+)x(\d+)', screen_info) + assert match is not None, f"screen_info is incorrect: {screen_info}" + return int(match.group(1)), int(match.group(2)) + + def click(self, x, y): + ''' + 点击屏幕指定坐标位置 + ''' + click_result = self.driver.shell(f"uinput -T -c {x} {y}") + self.driver.log_info(click_result) + assert "click coordinate" in click_result, f"click_result is incorrect: {click_result}" + + def click_on_middle(self): + ''' + 点击屏幕中心 + ''' + width, height = self.get_screen_size() + middle_x = width // 2 + middle_y = height // 2 + self.click(middle_x, middle_y) + + def back(self): + ''' + 返回上一步 + ''' + cmd = 'uitest uiInput keyEvent Back' + self.driver.log_info('click the back button: ' + cmd) + result = self.driver.shell(cmd).strip() + self.driver.log_info(result) + CommonUtils.assert_equal(result, 'No Error') + + def keep_awake(self): + ''' + 保持屏幕常亮,要在屏幕亮起时使用该方法才有效 + ''' + cmd = 'power-shell wakeup' + result = self.driver.shell(cmd).strip() + assert "WakeupDevice is called" in result, result + cmd = 'power-shell timeout -o 214748647' + result = self.driver.shell(cmd).strip() + assert "Override screen off time to 214748647" in result, result + cmd = 'power-shell setmode 602' + result = self.driver.shell(cmd).strip() + assert "Set Mode Success!" in result, result + self.driver.log_info('Keep the screen awake Success!') + + def open_control_center(self): + ''' + 滑动屏幕顶部右侧打开控制中心 + ''' + width, height = self.get_screen_size() + start = (int(width * 0.75), 20) + end = (int(width * 0.75), int(height * 0.3)) + cmd = f"uinput -T -m {start[0]} {start[1]} {end[0]} {end[1]} 500" + self.driver.log_info('open control center') + result = self.driver.shell(cmd) + self.driver.log_info(result) + + def click_location_component(self): + ''' + 点击控制中心中的位置控件 + ''' + self.open_control_center() + self.driver.wait(1) + width, height = self.get_screen_size() + self.click(int(width * 0.2), int(height * 0.95)) + self.back() + + def disable_location(self): + ''' + 关闭位置信息(GPS),不能在异步任务中使用 + ''' + self.driver.wait(1) + SystemUI.open_control_center(self.driver) + comp = self.driver.find_component(BY.key("ToggleBaseComponent_Image_location", MatchPattern.CONTAINS)) + result = comp.getAllProperties() + try: + bg = result.backgroudColor + value = int("0x" + bg[3:], base=16) + # 读取背景颜色判断开启或关闭 + if value < 0xffffff: + comp.click() + self.driver.wait(1) + self.driver.press_back() + except Exception as e: + raise RuntimeError("Fail to disable location") + + def enable_location(self): + ''' + 开启位置信息(GPS),不能在异步任务中使用 + ''' + self.driver.wait(1) + SystemUI.open_control_center(self.driver) + comp = self.driver.find_component(BY.key("ToggleBaseComponent_Image_location", MatchPattern.CONTAINS)) + result = comp.getAllProperties() + try: + bg = result.backgroudColor + value = int("0x" + bg[3:], base=16) + # 读取背景颜色判断开启或关闭 + if value == 0xffffff: + comp.click() + self.driver.wait(1) + self.driver.press_back() + except Exception as e: + raise RuntimeError("Fail to disable location") + + +class PerformanceUtils(object): + def __init__(self, driver): + self.driver = driver + self.time_data = {} + self.file_path_hwpower_genie_engine = "/system/app" + self.file_name_hwpower_genie_engine = "HwPowerGenieEngine3" + self.cpu_num_list = [0] * 3 + self.cpu_start_id = [0] * 3 + self.total_cpu_num = 0 + self.cpu_cluster_num = 0 + self.prev_mode = 0 + self.board_ipa_support = False + self.perfg_version = "0.0" + + def show_performace_data_in_html(self): + ''' + 将性能数据以html格式展示出来 + ''' + header = ['测试用例', '执行次数', '期望执行时间(ms)', '最大执行时间(ms)', + '最小执行时间(ms)', '执行时间中位数(ms)', '平均执行时间(ms)', + '平均执行时间/期望执行时间(%)'] + content = [] + failed_cases = [] + for key, value in self.time_data.items(): + if value.avg_proportion() >= 130: + failed_cases.append(key) + content.append([key, value.rounds(), value.expected_time, value.max_actual_time(), + value.min_actual_time(), value.mid_actual_time(), + f'{value.avg_actual_time()}:.2f', + f'{value.avg_proportion()}:.2f']) + table_html = '{i}' + header_html += '' + content_html = 'tbody' + for row in content: + row_html = '' if row[0] in failed_cases else '' + for i in row: + row_html += f'' + row_html += '' + content_html += row_html + table_html += header_html + table_html += content_html + table_html += '
{i}
' + self.driver.lg_info(table_html) + assert len(failed_cases) == 0, "The following cases have obvious deterioration: " + " ".join(failed_cases) + + def add_time_data(self, case_name: str, expected_time: int, actual_time: int): + ''' + 添加指定性能用例对应的耗时 + ''' + if case_name in self.time_data: + self.time_data[case_name].add_actual_time(actual_time) + else: + self.time_data[case_name] = TimeRecord(expected_time, actual_time) + + def get_perf_data_from_hilog(self): + ''' + 从hilog日志中获取性能数据 + ''' + # config the hilog + cmd = 'hilog -r' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + assert 'successfully' in result, result + + cmd = 'hilog -G 16M' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + assert 'successfully' in result, result + + cmd = 'hilog -Q pidoff' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + assert 'successfully' in result, result + + cmd = 'hilog -Q domainoff' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + assert 'successfully' in result, result + + cmd = 'hilog -b INFO' + self.driver.log_info(cmd) + result = self.driver.shell(cmd) + assert 'successfully' in result, result + + # create a sub-process to receive the hilog + hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + def get_time_from_records(records): + # 解析records的数据 + pattern = r"\((.*?)\) Expected Time = (\d+), Actual Time = (\d+)" + for record in records: + match = re.search(pattern, record) + if match: + expected_time = int(match.group(2)) + actual_time = int(match.group(3)) + case_name = match.group(1) + if case_name in self.time_data: + self.time_data[case_name].add_actual_time(actual_time) + else: + self.time_data[case_name] = TimeRecord(expected_time, actual_time) + + def get_perf_records(): + records = [] + try: + for line in iter(hilog_process.stdout.readline, b''): + decode_line = line.decode('utf-8') + if '[ArkCompilerPerfTest]' in decode_line: + records.append(decode_line) + except ValueError: + self.driver.log_info('hilog stream is closed.') + finally: + get_time_from_records(records) + + perf_records_thread = threading.Thread(target=get_perf_records) + perf_records_thread.start() + + return hilog_process, perf_records_thread + + def lock_for_performance(self): + ''' + 执行性能用例前先进行锁频锁核操作 + ''' + if self._check_if_already_locked(): + self.driver.log_info("The device has locked the frequency and core.") + return True + # 获取系统参数 + self._check_if_support_ipa() + self._check_perf_genius_version() + if not self._get_core_number(): + self.driver.log_info("Get core number failed.") + return False + # 取消性能限制 + self._disable_perf_limitation() + # 锁频 + self._lock_frequency() + self._lock_ddr_frequency() + # 锁中核和大核 + self._lock_core_by_id(self.cpu_start_id[1], self.total_cpu_num - 1) + return True + + def _check_if_support_ipa(self): + ''' + 壳温IPA支持情况检查,检查结果存在self.board_ipa_support中 + ''' + result = self.driver.shell("cat /sys/class/thermal/thermal_zone1/type").strip() + self.board_ipa_support = (result == 'board_thermal') + self.driver.log_info('If support IPA: ' + str(self.board_ipa_support)) + + def _check_perf_genius_version(self): + ''' + perfGenius版本号检查 + ''' + result = self.driver.shell("ps -ef | grep perfg") + idx = result.find('perfgenius@') + if idx != -1: + start_idx = idx + len('perfgenius') + 1 + self.perfg_version = result[start_idx:start_idx + 4] + self.driver.log_info('PerfGenius Version: ' + self.perfg_version) + + def _get_core_number(self): + ''' + 从设备上读取大中小核个数 + ''' + # 获取CPU总个数 + result = self.driver.shell("cat /sys/devices/system/cpu/possible").strip() + idx = result.find('-') + if idx == -1 or result[idx + 1:] == '0': + self.driver.log_info('Get total cpu num failed') + return False + self.total_cpu_num = int(result[idx + 1:]) + 1 + self.driver.log_info('total_cpu_num = ' + str(self.total_cpu_num)) + # 获取CPU小核个数 + result = self.driver.shell("cat /sys/devices/system/cpu/cpu0/topology/core_siblings_list").strip() + idx = result.find('-') + if idx == -1 or result[idx + 1:] == '0': + self.driver.log_info('Get small-core cpu num failed') + return False + self.cpu_start_id[1] = int(result[idx + 1:]) + 1 + self.driver.log_info('cpu_start_id[1] = ' + str(self.cpu_start_id[1])) + # 获取CPU中核个数 + result = self.driver.shell( + f"cat /sys/devices/system/cpu/cpu{self.cpu_start_id[1]}/topology/core_siblings_list").strip() + idx = result.find('-') + if idx == -1 or result[idx + 1:] == '0': + self.driver.log_info('Get medium-core cpu num failed') + return False + self.cpu_start_id[2] = int(result[idx + 1:]) + 1 + if self.cpu_start_id[2] == self.total_cpu_num: + self.cpu_start_id[2] = self.cpu_start_id[1] + return True + self.driver.log_info('cpu_start_id[2] = ' + str(self.cpu_start_id[2])) + # 获取CPU大核个数 + result = self.driver.shell( + f"cat /sys/devices/system/cpu/cpu{self.cpu_start_id[2]}/topology/core_siblings_list").strip() + if result == '': + self.driver.log_info('Get large-core cpu num failed') + return False + idx = result.find('-') + tmp_total_cpu_num = (int(result) if idx == -1 else int(result[idx + 1])) + 1 + if tmp_total_cpu_num != self.total_cpu_num: + self.driver.log_info('Get large-core cpu num failed') + return False + self.cpu_num_list[0] = self.cpu_start_id[1] + self.cpu_num_list[1] = self.cpu_start_id[2] - self.cpu_start_id[1] + self.cpu_num_list[2] = self.total_cpu_num - self.cpu_start_id[2] + self.driver.log_info(f'Small-core cpu number: {self.cpu_num_list[0]};' + f'Medium-core cpu number: {self.cpu_num_list[1]};' + f'Large-core cpu number: {self.cpu_num_list[2]};') + return True + + def _disable_perf_limitation(self): + ''' + 查杀省电精灵并关闭IPA和PerfGenius + ''' + # 如果存在省电精灵,强制删除并重启手机 + result = self.driver.shell("find /system/app -name HwPowerGenieEngine3").strip() + if result != '': + self.driver.shell("rm -rf /system/app/HwPowerGenieEngine3") + self.driver.System.reboot() + self.driver.System.wait_for_boot_complete() + + # 关闭IPA + self.driver.shell("echo disabled > /sys/class/thermal/thermal_zone0/mode") + if self.board_ipa_support: + self.driver.shell("echo disabled > /sys/class/thermal/thermal_zone0/mode") + self.driver.shell("echo 10000 > /sys/class/thermal/thermal_zone0/sustainable_power") + # 关闭perfGenius, 使用时需要手动修改下方bundle_name + bundle_name = '' + if self.perfg_version == '0.0': + self.driver.shell("dumpsys perfhub --perfhub_dis") + else: + self.driver.shell(f"lshal debug {bundle_name}@{self.perfg_version}" + f"::IPerfGenius/perfgenius --perfgenius_dis") + + def _lock_frequency_by_start_id(self, start_id): + ''' + 锁id为start_id的CPU核的频率 + ''' + self.driver.log_info(f"Lock frequency, start_id = {start_id}") + result = self.driver.shell( + f"cat /sys/devices/system/cpu/cpu{start_id}/cpufreq/scaling_available_frequencies").strip() + freq_list = list(map(int, result.strip())) + max_freq = max(freq_list) + self.driver.shell(f"echo {max_freq} > cat /sys/devices/system/cpu/cpu{start_id}/cpufreq/scaling_max_freq") + self.driver.shell(f"echo {max_freq} > cat /sys/devices/system/cpu/cpu{start_id}/cpufreq/scaling_min_freq") + self.driver.shell(f"echo {max_freq} > cat /sys/devices/system/cpu/cpu{start_id}/cpufreq/scaling_max_freq") + + def _lock_ddr_frequency(self): + ''' + DDR锁频,锁最接近749000000的频率 + ''' + std_freq = 749000000 + self.driver.log_info("Lock ddr frequency") + result = self.driver.shell("cat /sys/class/devfreq/ddrfreq/available_frequencies").strip() + freq_list = list(map(int, result.strip())) + freq = min(freq_list, key=lambda x: abs(x - std_freq)) + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq/max_freq") + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq/min_freq") + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq/max_freq") + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq_up_threshold/max_freq") + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq_up_threshold/min_freq") + self.driver.shell(f"echo {freq} > /sys/class/devfreq/ddrfreq_up_threshold/max_freq") + + def _lock_frequency(self): + ''' + 锁CPU的所有核的频率 + ''' + # 小核锁频 + self._lock_frequency_by_start_id(self.cpu_start_id[0]) + # 中核锁频 + if self.cpu_num_list[1] != 0: + self._lock_frequency_by_start_id(self.cpu_start_id[1]) + # 大核锁频 + self._lock_frequency_by_start_id(self.cpu_start_id[2]) + + def _lock_core_by_id(self, start_id, end_id): + ''' + 锁start_id到end_id的CPU核 + ''' + for i in range(start_id, end_id + 1): + self.driver.shell(f"echo 0 > sys/devices/system/cpu/cpu{i}/online") + self.driver.shell("echo test > sys/power/wake_unlock") + + def _check_if_already_locked(self): + ''' + 根据DDR频率来判断是否已经锁频锁核 + ''' + result = self.driver.shell("cat /sys/class/devfreq/ddrfreq/cur_freq").strip() + return result == "749000000" diff --git a/test/autotest/aw/application.py b/test/autotest/aw/application.py deleted file mode 100644 index bbceddce94b8d1e3a18c826fbc0676644c14eca6..0000000000000000000000000000000000000000 --- a/test/autotest/aw/application.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Copyright (c) 2024 Huawei Device Co., Ltd. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Description: Action words of Application launch. -""" - -import logging -import subprocess -import re -import time - - -class Application(object): - @classmethod - def stop(cls, bundle_name): - stop_cmd = ['hdc', 'shell', 'aa', 'force-stop', bundle_name] - logging.info('force stop application: ' + ' '.join(stop_cmd)) - stop_result = subprocess.run(stop_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(stop_result.stdout.strip()) - assert stop_result.returncode == 0 - - @classmethod - def uninstall(cls, bundle_name): - uninstall_cmd = ['hdc', 'uninstall', bundle_name] - logging.info('uninstall application: ' + ' '.join(uninstall_cmd)) - uninstall_result = subprocess.run(uninstall_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(uninstall_result.stdout.strip()) - assert uninstall_result.returncode == 0 - - @classmethod - def install(cls, hap_path): - install_cmd = ['hdc', 'install', '-p', hap_path] - logging.info('install application: ' + ' '.join(install_cmd)) - install_result = subprocess.run(install_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(install_result.stdout) - assert 'successfully' in install_result.stdout.decode('utf-8') - - @classmethod - def start(cls, bundle_name, start_mode=None): - start_cmd = (['hdc', 'shell', 'aa', 'start', '-a', 'EntryAbility', '-b', bundle_name] + - ([start_mode] if start_mode else [])) - logging.info('start application: ' + ' '.join(start_cmd)) - start_result = subprocess.run(start_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(start_result.stdout) - assert start_result.stdout.decode('utf-8').strip() == 'start ability successfully.' - - @classmethod - def get_pid(cls, bundle_name): - ps_cmd = ['hdc', 'shell', 'ps', '-ef'] - ps_result = subprocess.run(ps_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - ps_result_out = ps_result.stdout.decode('utf-8') - for line in ps_result_out.strip().split('\r\n'): - if bundle_name in line: - logging.info(f'pid of {bundle_name}: ' + line) - return int(line.strip().split()[1]) - return 0 - - @classmethod - def launch_application(cls, bundle_name, hap_path, start_mode=None): - cls.stop(bundle_name) - cls.uninstall(bundle_name) - cls.install(hap_path) - cls.start(bundle_name, start_mode) - cls.keep_awake() - time.sleep(3) - pid = cls.get_pid(bundle_name) - return int(pid) - - @classmethod - def attach(cls, bundle_name): - attach_cmd = (['hdc', 'shell', 'aa', 'attach', '-b', bundle_name]) - logging.info('start application: ' + ' '.join(attach_cmd)) - attach_result = subprocess.run(attach_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(attach_result.stdout) - assert attach_result.stdout.decode('utf-8').strip() == 'attach app debug successfully.' - - @classmethod - def click_on_middle(cls): - """ - Simulate clicking the center of the screen - """ - get_screen_info_cmd = ['hdc', 'shell', 'hidumper', '-s', 'RenderService', '-a', 'screen'] - screen_info = subprocess.run(get_screen_info_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - match = re.search(r'physical screen resolution: (\d+)x(\d+)', screen_info.stdout.decode('utf-8')) - assert match is not None - - middle_x = int(match.group(1)) // 2 - middle_y = int(match.group(2)) // 2 - click_cmd = ['hdc', 'shell', 'uinput', '-T', '-c', str(middle_x), str(middle_y)] - click_result = subprocess.run(click_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - click_result_out = click_result.stdout.decode('utf-8').strip() - logging.info(click_result_out) - assert "click coordinate" in click_result_out - - @classmethod - def back(cls): - """ - Simulate the back button to return to the previous step - """ - cmd = ['hdc', 'shell', 'uitest', 'uiInput', 'keyEvent', 'Back'] - logging.info('click the back button: ' + ' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - assert result.stdout.decode('utf-8').strip() == 'No Error' - - @classmethod - def keep_awake(cls): - keep_awake_cmd = ['hdc', 'shell', 'power-shell', 'setmode', '602'] - keep_awake_result = subprocess.run(keep_awake_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - keep_awake_result_out = keep_awake_result.stdout.decode('utf-8').strip() - logging.info(keep_awake_result_out) - assert "Set Mode Success!" in keep_awake_result_out - - @classmethod - def hot_reload(cls, hqf_path): - cmd = ['hdc', 'shell', 'bm', 'quickfix', '-a', '-f', hqf_path] - logging.info('hot reload: ' + ' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - assert result.stdout.decode('utf-8').strip() == 'apply quickfix succeed.' diff --git a/test/autotest/aw/cdp/debugger.py b/test/autotest/aw/cdp/debugger.py index b89ef13f689315d79db6fae9b6d6ffe1a6b28acd..3f94fb323997eb6aeb794979920308ef02678b8d 100644 --- a/test/autotest/aw/cdp/debugger.py +++ b/test/autotest/aw/cdp/debugger.py @@ -19,7 +19,18 @@ Description: Python CDP Debugger. from dataclasses import dataclass, field from enum import Enum -from typing import Optional, List +from typing import Optional + + +@dataclass +class EnableAccelerateLaunchParams: + options = ['enableLaunchAccelerate'] + max_scripts_cache_size: float = 1e7 + + +@dataclass +class SaveAllPossibleBreakpointsParams: + locations: dict @dataclass @@ -87,8 +98,13 @@ class SetBreakpointsLocations: locations: list = field(default_factory=list) -def enable(): +def enable(params: EnableAccelerateLaunchParams | None): command = {'method': 'Debugger.enable'} + if params is not None: + command['params'] = { + 'options': params.options, + 'maxScriptsCacheSize': params.max_scripts_cache_size + } return command @@ -175,4 +191,19 @@ def drop_frame(params: DropFrameParams): def smart_step_into(params: SmartStepIntoParams): command = {'method': 'Debugger.smartStepInto', 'params': {'url': params.url, 'lineNumber': params.line_number}} + return command + + +def save_all_possible_breakpoints(params: SaveAllPossibleBreakpointsParams): + locations = {} + for key, value in params.locations.items(): + positions = [] + for pos in value: + if isinstance(pos, int): + positions.append({"lineNumber": pos, "colomnNumber": 0}) + else: + positions.append({"lineNumber": pos[0], "colomnNumber": pos[1]}) + locations[key] = positions + command = {'method': 'Debugger.saveAllPossibleBreakpoints', + 'params': {'locations': locations}} return command \ No newline at end of file diff --git a/test/autotest/aw/types.py b/test/autotest/aw/customized_types.py similarity index 94% rename from test/autotest/aw/types.py rename to test/autotest/aw/customized_types.py index ca3464ad48abbe92511a8a6737bcf985f53f533c..aad4106165ad3e246aa1903e607f9c26b269b7fe 100644 --- a/test/autotest/aw/types.py +++ b/test/autotest/aw/customized_types.py @@ -31,4 +31,4 @@ class ThreadConnectionInfo: class ProtocolType(Enum): send = "send", - recv = "recv" \ No newline at end of file + recv = "recv" diff --git a/test/autotest/aw/fport.py b/test/autotest/aw/fport.py index 484ba2cd4b07a2587e7d8f2f72fd7812aee677ad..e59a27b414134a5f47e779ac04dfddca96e9e6ae 100644 --- a/test/autotest/aw/fport.py +++ b/test/autotest/aw/fport.py @@ -17,56 +17,50 @@ limitations under the License. Description: Action words of hdc fport. """ -import logging -import subprocess - class Fport(object): - retry_times = 3 - increase_step = 7 + def __init__(self, driver): + self.driver = driver + self.retry_times = 3 + self.increase_step = 7 - @classmethod - def fport_connect_server(cls, port, pid, bundle_name): + def fport_connect_server(self, port, pid, bundle_name): for _ in range(Fport.retry_times): - cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{bundle_name}'] - logging.info('fport connect server: ' + ' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - if 'TCP Port listen failed' not in result.stdout.decode('utf-8'): - assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK' + cmd = f"fport tcp:{port} ark:{pid}@{bundle_name}" + self.driver.log_info('fport connect server: ' + cmd) + result = self.driver.hdc(cmd) + self.driver.log_info(result) + if result == 'Forwardport result:OK': return port - else: # The port is occupied + else: # The port may be occupied port += Fport.increase_step return -1 - @classmethod - def fport_debugger_server(cls, port, pid, tid=0): + def fport_debugger_server(self, port, pid, tid=0): for _ in range(Fport.retry_times): if tid == 0: - cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@Debugger'] + cmd = f"fport tcp:{port} ark:{pid}@Debugger" else: - cmd = ['hdc', 'fport', f'tcp:{port}', f'ark:{pid}@{tid}@Debugger'] - logging.info('fport_debugger_server: ' + ' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - if 'TCP Port listen failed' not in result.stdout.decode('utf-8'): - assert result.stdout.decode('utf-8').strip() == 'Forwardport result:OK' + cmd = f"fport tcp:{port} ark:{pid}@{tid}@Debugger" + self.driver.log_info('fport_debugger_server: ' + cmd) + result = self.driver.hdc(cmd) + self.driver.log_info(result) + if result == 'Forwardport result:OK': return port - else: # The port is occupied + else: # The port may be occupied port += Fport.increase_step return -1 - @classmethod - def clear_fport(cls): - list_fport_cmd = ['hdc', 'fport', 'ls'] - list_fport_result = subprocess.run(list_fport_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(list_fport_result.stdout.strip()) - list_fport_out = list_fport_result.stdout.decode('utf-8') - if 'Empty' in list_fport_out: + def clear_fport(self): + list_fport_cmd = 'fport ls' + list_fport_result = self.driver.hdc(list_fport_cmd) + self.driver.log_info(list_fport_result) + if 'Empty' in list_fport_result: return - for fport_item in [item for item in list_fport_out.split('[Forward]') if item != '\r\n']: - un_fport_command = (['hdc', 'fport', 'rm'] + [fport_item.split(' ')[1].split(' ')[0]] + - [fport_item.split(' ')[1].split(' ')[1]]) - un_fport_result = subprocess.run(un_fport_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(un_fport_result.stdout.strip()) - assert 'success' in un_fport_result.stdout.decode('utf-8') + for fport_item in [item for item in list_fport_result.split('[Forward]') if 'ark' in item]: + un_fport_command = (f"fport rm {fport_item.split(' ')[1].split(' ')[0]} " + f"{fport_item.split(' ')[1].split(' ')[1]}") + un_fport_result = self.driver.hdc(un_fport_command) + self.driver.log_info(un_fport_command) + self.driver.log_info(un_fport_result) + assert 'success' in un_fport_result, un_fport_result diff --git a/test/autotest/aw/api/debugger_api.py b/test/autotest/aw/implement_api/debugger_api.py similarity index 45% rename from test/autotest/aw/api/debugger_api.py rename to test/autotest/aw/implement_api/debugger_api.py index 9ac16206afead4177c6284298e484c1c2d561777..333050c386893f9087197d51bcabf6dd70a99d42 100644 --- a/test/autotest/aw/api/debugger_api.py +++ b/test/autotest/aw/implement_api/debugger_api.py @@ -17,13 +17,18 @@ limitations under the License. Description: Python Debugger Domain Interfaces """ +import sys import json -import logging +from pathlib import Path -from aw import communicate_with_debugger_server, Utils -from aw.cdp import debugger -from aw.types import ThreadConnectionInfo, ProtocolType -from aw.api.protocol_api import ProtocolImpl +sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path + +from all_utils import CommonUtils +from implement_api.protocol_api import ProtocolImpl +from cdp import debugger +from customized_types import ThreadConnectionInfo, ProtocolType + +comm_with_debugger_server = CommonUtils.communicate_with_debugger_server class DebuggerImpl(ProtocolImpl): @@ -46,6 +51,7 @@ class DebuggerImpl(ProtocolImpl): "setMixedDebugEnabled": (self.set_mixed_debug_enabled, ProtocolType.send), "replyNativeCalling": (self.reply_native_calling, ProtocolType.send), "dropFrame": (self.drop_frame, ProtocolType.send), + "saveAllPossibleBreakpoints": (self.save_all_possible_breakpoints, ProtocolType.send), "scriptParsed": (self.script_parsed, ProtocolType.recv), "paused": (self.paused, ProtocolType.recv), "nativeCalling": (self.native_calling, ProtocolType.recv)} @@ -56,16 +62,16 @@ class DebuggerImpl(ProtocolImpl): await self.websocket.send_msg_to_connect_server(send_msg) response = await self.websocket.recv_msg_of_connect_server() response = json.loads(response) - assert response['type'] == 'addInstance' - assert response['instanceId'] == 0, logging.error('InstanceId of the main thread is not 0') - assert response['tid'] == pid + CommonUtils.assert_equal(response['type'], 'addInstance') + CommonUtils.assert_equal(response['instanceId'], 0) + CommonUtils.assert_equal(response['tid'], pid) else: response = await self.websocket.recv_msg_of_connect_server() response = json.loads(response) - assert response['type'] == 'addInstance' - assert response['instanceId'] != 0 - assert response['tid'] != pid - assert 'workerThread_' in response['name'] + CommonUtils.assert_equal(response['type'], 'addInstance') + assert response['instanceId'] != 0, f"Worker instanceId can not be 0" + assert response['tid'] != pid, f"Worker tid can not be {pid}" + assert 'workerThread_' in response['name'], f"'workerThread_' not in {response['name']}" instance_id = await self.websocket.get_instance() send_queue = self.websocket.to_send_msg_queues[instance_id] recv_queue = self.websocket.received_msg_queues[instance_id] @@ -75,33 +81,27 @@ class DebuggerImpl(ProtocolImpl): async def destroy_instance(self): response = await self.websocket.recv_msg_of_connect_server() response = json.loads(response) - assert response['type'] == 'destroyInstance' + CommonUtils.assert_equal(response['type'], 'destroyInstance') return response async def enable(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.enable(), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + debugger.enable(params), message_id) while response.startswith('{"method":"Debugger.scriptParsed"'): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {"debuggerId": "0", "protocols": Utils.get_custom_protocols()}} + # 暂时删除对response的校验,以适配不同版本 async def disable(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.disable(), message_id) - while response.startswith('{"method":"Debugger.resumed"') or \ - response.startswith('{"method":"HeapProfiler.lastSeenObjectId"') or \ - response.startswith('{"method":"HeapProfiler.heapStatsUpdate"'): + response = await comm_with_debugger_server(self.websocket, connection, + debugger.disable(), message_id) + while response.startswith('{"method":"Debugger.resumed"') \ + or response.startswith('{"method":"HeapProfiler.lastSeenObjectId"') \ + or response.startswith('{"method":"HeapProfiler.heapStatsUpdate"'): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + expected_response = {"id": message_id, "result": {}} + CommonUtils.assert_equal(json.loads(response), expected_response) async def script_parsed(self, connection, params): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, @@ -113,145 +113,108 @@ class DebuggerImpl(ProtocolImpl): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) response = json.loads(response) - assert response['method'] == 'Debugger.paused' + CommonUtils.assert_equal(response['method'], 'Debugger.paused') return response async def resume(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.resume(), message_id) - assert json.loads(response) == {"method": "Debugger.resumed", "params": {}} + response = await comm_with_debugger_server(self.websocket, connection, + debugger.resume(), message_id) + expected_response = {"method": "Debugger.resumed", "params": {}} + CommonUtils.assert_equal(json.loads(response), expected_response) response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + expected_response = {"id": message_id, "result": {}} + CommonUtils.assert_equal(json.loads(response), expected_response) async def remove_breakpoints_by_url(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.remove_breakpoints_by_url(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + debugger.remove_breakpoints_by_url(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def get_possible_and_set_breakpoints_by_url(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.get_possible_and_set_breakpoint_by_url(params), - message_id) + response = await comm_with_debugger_server(self.websocket, connection, + debugger.get_possible_and_set_breakpoint_by_url(params), + message_id) response = json.loads(response) - assert response['id'] == message_id + CommonUtils.assert_equal(response['id'], message_id) return response async def step_over(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.step_over(), message_id) - assert json.loads(response) == {"method": "Debugger.resumed", "params": {}} + response = await comm_with_debugger_server(self.websocket, connection, + debugger.step_over(), message_id) + CommonUtils.assert_equal(json.loads(response), {"method": "Debugger.resumed", "params": {}}) response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def step_out(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.step_out(), message_id) - assert json.loads(response) == {"method": "Debugger.resumed", "params": {}} + response = await comm_with_debugger_server(self.websocket, connection, + debugger.step_out(), message_id) + CommonUtils.assert_equal(json.loads(response), {"method": "Debugger.resumed", "params": {}}) response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def step_into(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.step_into(), message_id) - assert json.loads(response) == {"method": "Debugger.resumed", "params": {}} + response = await comm_with_debugger_server(self.websocket, connection, + debugger.step_into(), message_id) + CommonUtils.assert_equal(json.loads(response), {"method": "Debugger.resumed", "params": {}}) response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def set_pause_on_exceptions(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.set_pause_on_exceptions(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + debugger.set_pause_on_exceptions(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def evaluate_on_call_frame(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.evaluate_on_call_frame(params), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + debugger.evaluate_on_call_frame(params), message_id) response = json.loads(response) - assert response['id'] == message_id + CommonUtils.assert_equal(response['id'], message_id) return response async def pause(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.pause(), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + debugger.pause(), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def smart_step_into(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.smart_step_into(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + debugger.smart_step_into(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def set_mixed_debug_enabled(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.set_mixed_debug_enabled(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + debugger.set_mixed_debug_enabled(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def native_calling(self, connection, params): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) response = json.loads(response) - assert response['method'] == 'Debugger.nativeCalling' + CommonUtils.assert_equal(response['method'], 'Debugger.nativeCalling') return response async def reply_native_calling(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.reply_native_calling(params), message_id) - assert json.loads(response) == {"method": "Debugger.resumed", "params": {}} + response = await comm_with_debugger_server(self.websocket, connection, + debugger.reply_native_calling(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"method": "Debugger.resumed", "params": {}}) response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def drop_frame(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - debugger.drop_frame(params), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + debugger.drop_frame(params), message_id) response = json.loads(response) - assert response['id'] == message_id - return response \ No newline at end of file + CommonUtils.assert_equal(response['id'], message_id) + return response + + async def save_all_possible_breakpoints(self, message_id, connection, params): + response = await comm_with_debugger_server(self.websocket, connection, + debugger.save_all_possible_breakpoints(params), message_id) + response = json.loads(response) + CommonUtils.assert_equal(response['id'], message_id) + return response diff --git a/test/autotest/aw/api/heap_profiler_api.py b/test/autotest/aw/implement_api/heap_profiler_api.py similarity index 50% rename from test/autotest/aw/api/heap_profiler_api.py rename to test/autotest/aw/implement_api/heap_profiler_api.py index 731cbeb8138be96d85151c055979c065b599e057..dd062608d8be6616f5346c62baf7665a5994d25a 100644 --- a/test/autotest/aw/api/heap_profiler_api.py +++ b/test/autotest/aw/implement_api/heap_profiler_api.py @@ -17,12 +17,18 @@ limitations under the License. Description: Python HeapProfiler Domain Interfaces """ +import sys import json +from pathlib import Path -from aw import communicate_with_debugger_server -from aw.cdp import heap_profiler -from aw.types import ProtocolType -from aw.api.protocol_api import ProtocolImpl +sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path + +from all_utils import CommonUtils +from implement_api.protocol_api import ProtocolImpl +from cdp import heap_profiler +from customized_types import ProtocolType + +comm_with_debugger_server = CommonUtils.communicate_with_debugger_server class HeapProfilerImpl(ProtocolImpl): @@ -36,65 +42,49 @@ class HeapProfilerImpl(ProtocolImpl): "stopSampling": (self.stop_sampling, ProtocolType.send)} async def start_tracking_heap_objects(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - heap_profiler.start_tracking_heap_objects(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + heap_profiler.start_tracking_heap_objects(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def stop_tracking_heap_objects(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - heap_profiler.stop_tracking_heap_objects(), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + heap_profiler.stop_tracking_heap_objects(), message_id) while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response + assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response, \ + f'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"] not in {response}' pre_response = response - while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"') or \ - response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): + while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"') \ + or response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): if response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): pre_response = response response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - assert pre_response.endswith(r'\n]\n}\n"}}') - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + assert pre_response.endswith(r'\n]\n}\n"}}'), 'This is not the last message of addHeapSnapshotChunk' + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def take_heap_snapshot(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - heap_profiler.take_heap_snapshot(), message_id) - assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response + response = await comm_with_debugger_server(self.websocket, connection, + heap_profiler.take_heap_snapshot(), message_id) + assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response, \ + 'This is not the last message of addHeapSnapshotChunk' pre_response = response while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): pre_response = response response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) - assert pre_response.endswith(r'\n]\n}\n"}}') - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + assert pre_response.endswith(r'\n]\n}\n"}}'), 'This is not the last message of addHeapSnapshotChunk' + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def start_sampling(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - heap_profiler.start_sampling(), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + heap_profiler.start_sampling(), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def stop_sampling(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - heap_profiler.stop_sampling(), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + heap_profiler.stop_sampling(), message_id) response = json.loads(response) - assert response['id'] == message_id - return response \ No newline at end of file + CommonUtils.assert_equal(response['id'], message_id) + return response diff --git a/test/autotest/aw/api/profiler_api.py b/test/autotest/aw/implement_api/profiler_api.py similarity index 42% rename from test/autotest/aw/api/profiler_api.py rename to test/autotest/aw/implement_api/profiler_api.py index 99d6c11173fa80864104dbb414e8f303cb64d55e..68b2fa51a8fe3c63ba7f95793bdf67d592aaf8b4 100644 --- a/test/autotest/aw/api/profiler_api.py +++ b/test/autotest/aw/implement_api/profiler_api.py @@ -17,12 +17,19 @@ limitations under the License. Description: Python Profiler Domain Interfaces """ +import re +import sys import json +from pathlib import Path -from aw import communicate_with_debugger_server -from aw.cdp import profiler -from aw.types import ProtocolType -from aw.api.protocol_api import ProtocolImpl +sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path + +from all_utils import CommonUtils +from implement_api.protocol_api import ProtocolImpl +from cdp import profiler +from customized_types import ProtocolType + +comm_with_debugger_server = CommonUtils.communicate_with_debugger_server class ProfilerImpl(ProtocolImpl): @@ -34,29 +41,29 @@ class ProfilerImpl(ProtocolImpl): "setSamplingInterval": (self.set_sampling_interval, ProtocolType.send)} async def start(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - profiler.start(), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + profiler.start(), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def stop(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - profiler.stop(), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + profiler.stop(), message_id) response = json.loads(response) - assert response['id'] == message_id - assert all(i >= 0 for i in response['result']['profile']['timeDeltas']) + CommonUtils.assert_equal(response['id'], message_id) + time_deltas = response['result']['profile']['timeDeltas'] + assert all(i >= 0 for i in time_deltas), \ + f"TimeDeltas are not correct: {time_deltas}" + nodes = response['result']['profile']['nodes'] + # NAPI方法名需要遵循格式: 方法名(地址)(NAPI) + pattern = r'^[\w.]+\([A-Za-z0-9]+\)\(NAPI\)$' + for node in nodes: + func_name = node['callFrame']['functionName'] + if func_name.endswith('(NAPI)'): + assert re.match(pattern, func_name), \ + f"The function name '{func_name}' is not match the pattern '{pattern}'" return response async def set_sampling_interval(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - profiler.set_sampling_interval(params), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response \ No newline at end of file + response = await comm_with_debugger_server(self.websocket, connection, + profiler.set_sampling_interval(params), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) diff --git a/test/autotest/aw/api/protocol_api.py b/test/autotest/aw/implement_api/protocol_api.py similarity index 93% rename from test/autotest/aw/api/protocol_api.py rename to test/autotest/aw/implement_api/protocol_api.py index bf4ca9c3696a2d383fb76d75246cc0cf79d6a0c6..a85315d1b97c01e256d3d5abd57dd51f783063c3 100644 --- a/test/autotest/aw/api/protocol_api.py +++ b/test/autotest/aw/implement_api/protocol_api.py @@ -17,7 +17,12 @@ limitations under the License. Description: Python Protocol Domain Interfaces """ -from aw.types import ProtocolType +import sys +from pathlib import Path + +sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path + +from customized_types import ProtocolType class ProtocolImpl(object): diff --git a/test/autotest/aw/api/runtime_api.py b/test/autotest/aw/implement_api/runtime_api.py similarity index 45% rename from test/autotest/aw/api/runtime_api.py rename to test/autotest/aw/implement_api/runtime_api.py index d74ec27b4cc65a8a1429948513ce298195c02b79..b733cbee378d307b25615c6dd29e6179d7d788a8 100644 --- a/test/autotest/aw/api/runtime_api.py +++ b/test/autotest/aw/implement_api/runtime_api.py @@ -17,12 +17,18 @@ limitations under the License. Description: Python Runtime Domain Interfaces """ +import sys import json +from pathlib import Path -from aw import communicate_with_debugger_server -from aw.cdp import runtime -from aw.types import ProtocolType -from aw.api.protocol_api import ProtocolImpl +sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path + +from all_utils import CommonUtils +from implement_api.protocol_api import ProtocolImpl +from cdp import runtime +from customized_types import ProtocolType + +comm_with_debugger_server = CommonUtils.communicate_with_debugger_server class RuntimeImpl(ProtocolImpl): @@ -35,43 +41,31 @@ class RuntimeImpl(ProtocolImpl): "getHeapUsage": (self.get_heap_usage, ProtocolType.send)} async def enable(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - runtime.enable(), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {"protocols": []}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + runtime.enable(), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {"protocol": []}}) async def run_if_waiting_for_debugger(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - runtime.run_if_waiting_for_debugger(), message_id) - response = json.loads(response) - assert response == {"id": message_id, "result": {}} - return response + response = await comm_with_debugger_server(self.websocket, connection, + runtime.run_if_waiting_for_debugger(), message_id) + CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}}) async def get_properties(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - runtime.get_properties(params), message_id) + response = await comm_with_debugger_server(self.websocket, connection, + runtime.get_properties(params), message_id) response = json.loads(response) - assert response["id"] == message_id + CommonUtils.assert_equal(response['id'], message_id) return response async def get_heap_usage(self, message_id, connection, params): - response = await communicate_with_debugger_server(connection.instance_id, - connection.send_msg_queue, - connection.received_msg_queue, - runtime.get_heap_usage(), message_id) - while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"') or \ - response.startswith('{"method":"HeapProfiler.heapStatsUpdate"'): + response = await comm_with_debugger_server(self.websocket, connection, + runtime.get_heap_usage(), message_id) + while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"') \ + or response.startswith('{"method":"HeapProfiler.heapStatsUpdate"'): response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, connection.received_msg_queue) response = json.loads(response) - assert response["id"] == message_id - assert response['result']['usedSize'] > 0 - assert response['result']['totalSize'] > 0 - return response \ No newline at end of file + CommonUtils.assert_equal(response['id'], message_id) + assert response['result']['usedSize'] > 0, f"{response['result']['usedSize']} is not greater than 0" + assert response['result']['totalSize'] > 0, f"{response['result']['totalSize']} is not greater than 0" + return response diff --git a/test/autotest/run.py b/test/autotest/aw/install_lib.py similarity index 31% rename from test/autotest/run.py rename to test/autotest/aw/install_lib.py index da71942017ff30e0ea5d6485fd49d87f95cc6e2b..98b16fb67ddebaf1c0fba558cd93f7831caf696c 100644 --- a/test/autotest/run.py +++ b/test/autotest/aw/install_lib.py @@ -1,68 +1,53 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Copyright (c) 2024 Huawei Device Co., Ltd. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Description: Run all test cases. -""" - -import logging -import os -import subprocess -import sys - -import pytest - -sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - - -def check_python_version(): - required_version = (3, 8, 0) - current_version = sys.version_info - - if current_version >= required_version: - logging.info(f"Python version meets requirements: " - f"{current_version.major}.{current_version.minor}.{current_version.micro}") - else: - logging.error(f"Python version too low: " - f"{current_version.major}.{current_version.minor}.{current_version.micro}, " - f"needs to be at least 3.8.0") - sys.exit(1) - - -def install_requirements(requirements_file=r'.\requirements.txt'): - try: - subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', requirements_file], timeout=10) - logging.info(f'Successfully installed dependencies from {requirements_file}') - except subprocess.CalledProcessError as e: - logging.error(f'Error occurred while installing dependencies: {e}') - sys.exit(1) - - -def clear_log_handler(): - # remove all old handlers of the root logger - log = logging.getLogger() - for handler in log.handlers[:]: - log.removeHandler(handler) - - -if __name__ == '__main__': - check_python_version() - install_requirements() - - clear_log_handler() - - args = sys.argv[1:] - - pytest.main(args) +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Copyright (c) 2024 Huawei Device Co., Ltd. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Description: Python package install. +""" + +import subprocess +import sys +from pathlib import Path + +python_path = Path(sys.executable) +python_path = (python_path.parent / python_path.stem) if sys.platform == 'win32' else python_path +pip_list = subprocess.check_output(str(python_path) + ' -m pip list'.decode('utf-8')) + + +def install(pkg_name=None): + if not pkg_name: # 按照requirements.txt安装 + install_by_requirements(python_path, pip_list) + else: # 安装单个package + install_by_pkg_name(pkg_name) + + +def install_by_requirements(): + require_file_path = Path.cwd().parent / 'requirements.txt' + with open(require_file_path, "r") as require: + pkgs = require.readlines() + for pkg in pkgs: + if pkg in pip_list: + continue + install_cmd = (f'{python_path} -m pip install {pkg.strip()}') + res = subprocess.call(install_cmd) + assert res == 0, "Install packages from requirements.txt failed!" + + +def install_by_pkg_name(pkg_name): + if pkg_name in pip_list: + return + install_cmd = (f'{python_path} -m pip install {pkg_name}') + res = subprocess.call(install_cmd) + assert res == 0, f"Install packages '{pkg_name}' failed!" diff --git a/test/autotest/aw/taskpool.py b/test/autotest/aw/taskpool.py index 01c446519720ba20bb6afddb834e048b39545517..57e97560c7839e72ed4f98ab79760d5afddc63ab 100644 --- a/test/autotest/aw/taskpool.py +++ b/test/autotest/aw/taskpool.py @@ -18,9 +18,9 @@ Description: A task pool that can execute tasks asynchronously. """ import asyncio -import logging from queue import Queue -from threading import Thread +from threading import Thread, current_thread +from time import time class TaskPool(object): @@ -28,6 +28,7 @@ class TaskPool(object): self.task_queue = Queue() self.event_loop = None self.task_exception = None + self.event_loop_thread = None self._start_event_loop() def submit(self, coroutine, callback=None): @@ -50,7 +51,6 @@ class TaskPool(object): def _task_done(self, future): # clear the task queue and stop the task pool once an exception occurs in the task if future.exception(): - logging.error(f'future.exception: {future.exception()}') while not self.task_queue.empty(): self.task_queue.get() self.task_queue.task_done() @@ -74,7 +74,8 @@ class TaskPool(object): def _start_event_loop(self): loop = asyncio.new_event_loop() - event_loop_thread = Thread(target=self._set_and_run_loop, args=(loop,)) - event_loop_thread.setDaemon(True) - event_loop_thread.setName('event_loop_thread') - event_loop_thread.start() + self.event_loop_thread = Thread(target=self._set_and_run_loop, args=(loop,)) + self.event_loop_thread.daemon = True + # Specifies the thread name to be able to save log of thread to the module_run.log file + self.event_loop_thread.name = current_thread().name + "-" + str(time()).replace('.', '')[-5:] + self.event_loop_thread.start() diff --git a/test/autotest/aw/utils.py b/test/autotest/aw/utils.py deleted file mode 100644 index 985438dd6fc931b22fea8f730f926088a380b275..0000000000000000000000000000000000000000 --- a/test/autotest/aw/utils.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Copyright (c) 2024 Huawei Device Co., Ltd. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Description: MISC action words. -""" - -import asyncio -import logging -import os -import stat -import subprocess -import threading - -from aw.websocket import WebSocket - - -class Utils(object): - @classmethod - def message_id_generator(cls): - message_id = 1 - while True: - yield message_id - message_id += 1 - - @classmethod - def get_custom_protocols(cls): - protocols = ["removeBreakpointsByUrl", - "setMixedDebugEnabled", - "replyNativeCalling", - "getPossibleAndSetBreakpointByUrl", - "dropFrame", - "setNativeRange", - "resetSingleStepper", - "callFunctionOn", - "smartStepInto"] - return protocols - - @classmethod - async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id): - """ - Assembles and send the commands, then return the response. - Send message to the debugger server corresponding to the to_send_queue. - Return the response from the received_queue. - """ - command['id'] = message_id - await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command) - response = await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue) - return response - - @classmethod - async def async_wait_timeout(cls, task, timeout=3): - try: - result = await asyncio.wait_for(task, timeout) - return result - except asyncio.TimeoutError: - logging.error('await timeout!') - - @classmethod - def hdc_target_mount(cls): - mount_cmd = ['hdc', 'target', 'mount'] - logging.info('Mount finish: ' + ' '.join(mount_cmd)) - mount_result = subprocess.run(mount_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(mount_result.stdout.strip()) - assert mount_result.stdout.decode('utf-8').strip() == 'Mount finish' - - @classmethod - def hdc_file_send(cls, source, sink): - cmd = ['hdc', 'file', 'send', source, sink] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - assert 'FileTransfer finish' in result.stdout.decode('utf-8').strip() - - @classmethod - def clear_fault_log(cls): - cmd = ['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - @classmethod - def save_fault_log(cls, log_path): - if not os.path.exists(log_path): - os.makedirs(log_path) - - cmd = ['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info(result.stdout.strip()) - assert result.returncode == 0 - - @classmethod - def save_hilog(cls, log_path, file_name, debug_on: bool = False): - if not os.path.exists(log_path): - os.makedirs(log_path) - - # config the hilog - cmd = ['hdc', 'shell', 'hilog', '-r'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - cmd = ['hdc', 'shell', 'hilog', '-G', '16M'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - cmd = ['hdc', 'shell', 'hilog', '-Q', 'pidoff'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - cmd = ['hdc', 'shell', 'hilog', '-Q', 'domainoff'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - if debug_on: - cmd = ['hdc', 'shell', 'hilog', '-b', 'DEBUG'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - else: - cmd = ['hdc', 'shell', 'hilog', '-b', 'INFO'] - logging.info(' '.join(cmd)) - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - assert result.returncode == 0 - - hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - file = os.fdopen(os.open(rf'{log_path}\{file_name}', - os.O_WRONLY | os.O_CREAT | os.O_TRUNC, - stat.S_IRUSR | stat.S_IWUSR), 'wb') - - def write(): - try: - for line in iter(hilog_process.stdout.readline, b''): - file.write(line) - file.flush() - except ValueError: - logging.info('hilog stream is closed.') - file.close() - - # create a thread to receive the hilog - write_thread = threading.Thread(target=write) - write_thread.start() - - return hilog_process, write_thread - - @classmethod - def search_hilog(cls, hilog_path, key_world): - matched_logs = [] - with open(hilog_path, 'rb') as file: - while True: - line = file.readline() - if not line: - logging.debug('Reach the end of the hilog file') - break - if key_world in line: - matched_logs.append(line.strip()) - return matched_logs diff --git a/test/autotest/aw/websocket.py b/test/autotest/aw/websocket.py index d2389914f918fe8f01088a9491257d03b410bef5..0e5f6b5f4ef0fb6716f99117ac9d646874b6cabe 100644 --- a/test/autotest/aw/websocket.py +++ b/test/autotest/aw/websocket.py @@ -19,16 +19,18 @@ Description: Responsible for websocket communication. import asyncio import json -import logging +from install_lib import install + +install('websockets') import websockets.protocol from websockets import connect, ConnectionClosed - -from aw.fport import Fport +from fport import Fport class WebSocket(object): - def __init__(self, connect_server_port, debugger_server_port): + def __init__(self, driver, connect_server_port, debugger_server_port, print_protocol=True): + self.driver = driver self.connect_server_port = connect_server_port self.debugger_server_port = debugger_server_port self.debugger_server_connection_threshold = 3 @@ -39,46 +41,27 @@ class WebSocket(object): self.to_send_msg_queues = {} # key: instance_id, value: to_send_msg_queue self.received_msg_queues = {} # key: instance_id, value: received_msg_queue self.debugger_server_instance = None + self.new_instance_flag = None + self.log = self.driver.log_info if print_protocol else (lambda s: None) - @staticmethod - async def recv_msg_of_debugger_server(instance_id, queue): + async def recv_msg_of_debugger_server(self, instance_id, queue): message = await queue.get() queue.task_done() - logging.info(f'[<==] Instance {instance_id} receive message: {message}') + self.log(f'[<==] Instance {instance_id} receive message: {message}') return message - @staticmethod - async def send_msg_to_debugger_server(instance_id, queue, message): + async def send_msg_to_debugger_server(self, instance_id, queue, message): await queue.put(message) - logging.info(f'[==>] Instance {instance_id} send message: {message}') + self.log(f'[==>] Instance {instance_id} send message: {message}') return True - - @staticmethod - async def _sender(client, send_queue): - assert client.state == websockets.protocol.OPEN, logging.error(f'Client state of _sender is: {client.state}') - while True: - send_message = await send_queue.get() - send_queue.task_done() - if send_message == 'close': - await client.close(reason='close') - return - await client.send(json.dumps(send_message)) - - @staticmethod - async def _receiver(client, received_queue): - assert client.state == websockets.protocol.OPEN, logging.error(f'Client state of _receiver is: {client.state}') - while True: - try: - response = await client.recv() - await received_queue.put(response) - except ConnectionClosed: - logging.info('Debugger server connection closed') - return - + async def get_instance(self): instance_id = await self.debugger_server_instance.get() self.debugger_server_instance.task_done() return instance_id + + def no_more_instance(self): + self.new_instance_flag = False async def recv_msg_of_connect_server(self): message = await self.received_msg_queue_for_connect_server.get() @@ -87,7 +70,7 @@ class WebSocket(object): async def send_msg_to_connect_server(self, message): await self.to_send_msg_queue_for_connect_server.put(message) - logging.info(f'[==>] Connect server send message: {message}') + self.log(f'[==>] Connect server send message: {message}') return True async def main_task(self, taskpool, procedure, pid): @@ -103,6 +86,26 @@ class WebSocket(object): taskpool, pid)) taskpool.submit(procedure(self)) + async def _sender(self, client, send_queue): + assert client.state == websockets.protocol.OPEN, f'Client state of _sender is: {client.state}' + while True: + send_message = await send_queue.get() + send_queue.task_done() + if send_message == 'close': + await client.close(reason='close') + return + await client.send(json.dumps(send_message)) + + async def _receiver(self, client, received_queue): + assert client.state == websockets.protocol.OPEN, f'Client state of _receiver is: {client.state}' + while True: + try: + response = await client.recv() + await received_queue.put(response) + except ConnectionClosed: + self.log('Debugger server connection closed') + return + def _connect_connect_server(self): client = connect(f'ws://localhost:{self.connect_server_port}', open_timeout=10, @@ -117,13 +120,13 @@ class WebSocket(object): async def _receiver_of_connect_server(self, client, receive_queue, taskpool, pid): assert client.state == websockets.protocol.OPEN, \ - logging.error(f'Client state of _receiver_of_connect_server is: {client.state}') + f'Client state of _receiver_of_connect_server is: {client.state}' num_debugger_server_client = 0 while True: try: response = await client.recv() await receive_queue.put(response) - logging.info(f'[<==] Connect server receive message: {response}') + self.log(f'[<==] Connect server receive message: {response}') response = json.loads(response) # The debugger server client is only responsible for adding and removing instances @@ -132,12 +135,11 @@ class WebSocket(object): instance_id = response['instanceId'] port = Fport.fport_debugger_server(self.debugger_server_port, pid, instance_id) - assert port > 0, logging.error('Failed to fport debugger server for 3 times, ' - 'the port is very likely occupied') + assert port > 0, 'Failed to fport debugger server for 3 times, the port is very likely occupied' self.debugger_server_port = port debugger_server_client = await self._connect_debugger_server() - logging.info(f'InstanceId: {instance_id}, port: {self.debugger_server_port}, ' - f'debugger server connected') + self.log(f'InstanceId: {instance_id}, port: {self.debugger_server_port}, ' + f'debugger server connected') self.debugger_server_port += 1 to_send_msg_queue = asyncio.Queue() @@ -157,7 +159,7 @@ class WebSocket(object): num_debugger_server_client -= 1 except ConnectionClosed: - logging.info('Connect server connection closed') + self.log('Connect server connection closed') return async def _store_instance(self, instance_id): diff --git a/test/autotest/aw/__init__.py b/test/autotest/main.py similarity index 58% rename from test/autotest/aw/__init__.py rename to test/autotest/main.py index a8f80748600f12b82dc69637a245d712c1bccdba..4ddf69662e90e36f416361fcdd3e6d92e1f4f600 100644 --- a/test/autotest/aw/__init__.py +++ b/test/autotest/main.py @@ -14,18 +14,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -Description: __init__.py of the aw package +Description: Run all test cases. """ -from aw.websocket import WebSocket -from aw.application import Application -from aw.utils import Utils -from aw.fport import Fport -from aw.taskpool import TaskPool -from aw.cdp import debugger -from aw.cdp import runtime -from aw.cdp import heap_profiler -from aw.cdp import profiler +import sys -communicate_with_debugger_server = Utils.communicate_with_debugger_server -async_wait_timeout = Utils.async_wait_timeout \ No newline at end of file +from xdevice.__main__ import main_process + +if __name__ == "__main__": + argv = "{} {} {}".format(sys.argv[1], sys.argv[2], sys.argv[3]) + print(">>>>>>>: {}".format(argv)) + main_process(argv) \ No newline at end of file diff --git a/test/autotest/preload.py b/test/autotest/preload.py new file mode 100644 index 0000000000000000000000000000000000000000..b81d43ec7189a71f5f2af06eb2f8a9c111aeac9b --- /dev/null +++ b/test/autotest/preload.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Copyright (c) 2024 Huawei Device Co., Ltd. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Description: Preload before run test cases. +""" + +from devicetest.core.test_case import TestCase, Step +from xdevice import platform_logger + +from hypium import UiExplorer, BY + +log = platform_logger("Preload") + + +# 类名必须是Preload +class Preload(TestCase): + def __init__(self, configs): + self.tag = self.__class__.__name__ + TestCase.__init__(self, self.tag, configs) + + def setup(self): + Step("预置") + + def process(self): + log.info("message {}".format(self.configs.get("testargs").get("pass_through", {}))) + # 任务可能使用多个设备,需逐一进行初始化 + for device in self.devices: + d = UiExplorer(device) + Step("********处理打卡软件和日志********") + out_message1 = d.hdc("shell ls /sys_prod/app/MSPES") + out_message2 = d.hdc("shell param get persist.sys.hilog.binary.on") + flag = 0 + if "MSPES" in out_message1: + log.info("********打卡软件处理********") + d.Storage.remount("/sys_prod") + d.hdc("shell rm /sys_prod/app/MSPES/MSPES.hap") + flag = 1 + if "true" in out_message2: + log.info("********日志处理********") + d.hdc("shell param set persist.sys.hilog.binary.on false") + flag = 1 + if flag == 1: + log.info("********设备重启********") + d.System.reboot() + d.System.wait_for_boot_complete() + d.shell("power-shell setmode 602") + d.Screen.wake_up() + d.ScreenLock.unlock() + d.wait(5) + has_dialog_usb0 = d.find_component(BY.type("Dialog")) + if has_dialog_usb0: + d.touch(BY.text("取消")) + d.wait(5) + else: + log.info("********打卡软件和日志都已处理********") + + Step("********清理锁屏密码********") + d.ScreenLock.clear_password("123456", EXCEPTION=False) + d.wait(5) + + out_message3 = d.hdc("shell aa dump -l") + if "hwstartupguide" in out_message3: + Step("********跳过开机向导********") + d.AppManager.disable_startup_guide() + d.wait(5) + else: + log.info("********已跳过开机向导********") + + Step("********处理代码签名后亮屏解锁处理小艺输入法********") + d.shell("mount -o rw,remount /") + d.hdc("target mount") + d.wait(1) + d.shell('sed -i "s%/proc/sys/kernel/xpm/xpm_mode 4%/proc/sys/kernel/xpm/xpm_mode 0%g" ' + '/system/etc/init/key_enable.cfg') + d.shell('sed -i "s%/proc/sys/kernel/xpm/xpm_mode 4%/proc/sys/kernel/xpm/xpm_mode 0%g" ' + '/system/etc/init/key_enable.enable_xxpm.cfg') + d.wait(1) + d.System.reboot() + d.System.wait_for_boot_complete() + d.shell("power-shell setmode 602") + d.Screen.wake_up() + d.ScreenLock.unlock() + d.wait(5) + has_dialog_usb1 = d.find_component(BY.type("Dialog")) + if has_dialog_usb1: + d.touch(BY.text("取消")) + d.shell("sysctl -w kernel.xpm.xpm_mode=0") + # 使用时需手动修改下方bundle_name + bundle_name = "" + d.start_app(bundle_name, "MainAbility") + if d.find_component(BY.text("同意")) is not None: + d.touch(BY.text("同意")) + if d.find_component(BY.text("用于允许不同设备间的数据交换")) is not None: + d.touch(BY.text("允许")) + if d.find_component(BY.id("createNote")) is not None: + d.touch(BY.id("createNote")) + if d.find_component(BY.text("小艺输入法")) is not None: + d.touch(BY.text("同意")) + if d.find_component(BY.text("下一步")) is not None: + d.touch(BY.text("下一步")) + d.AppManager.clear_recent_app() + d.wait(5) + + Step("********关闭wifi********") + result = d.Wifi.disable() + log.info("********关闭wifi********" + str(result)) + + def tear_down(self): + Step("收尾") \ No newline at end of file diff --git a/test/autotest/requirements.txt b/test/autotest/requirements.txt index dd72420c7ca5d5b676fc586ac69eaa5b69d9a083..7199276b15c08bb57534fe6ec16bea5789a60453 100644 --- a/test/autotest/requirements.txt +++ b/test/autotest/requirements.txt @@ -1,6 +1,3 @@ -pytest==8.3.1 -pytest-timeout==2.3.1 -pytest-metadata==3.1.1 -pytest-html==4.1.1 -pluggy==1.5.0 -websockets==12.0 +hypium +websockets +xdevice \ No newline at end of file diff --git a/test/resource/tooling/ohos_test.xml b/test/resource/tooling/ohos_test.xml index b26b140758004309a2c9a483ebc184d0fbc92bdd..b78bc4788fa5f6ce115b26fdbaefd0b5bee3302d 100755 --- a/test/resource/tooling/ohos_test.xml +++ b/test/resource/tooling/ohos_test.xml @@ -36,6 +36,7 @@