From 62089e6b135c9e70ad345229911a651568bd5bd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Thu, 16 Jan 2025 10:54:55 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=80=82=E9=85=8D=20EulerCopilot=20Framewo?= =?UTF-8?q?rk=20v0.9.1/v0.9.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- README.en.md | 2 +- README.md | 4 +- distribution/build_rpm.sh | 47 ++++++++++++++++++++-- distribution/eulercopilot-cli.spec | 2 +- src/copilot/app/copilot_app.py | 10 ++++- src/copilot/app/copilot_cli.py | 19 ++++++--- src/copilot/app/copilot_init.py | 1 + src/copilot/backends/framework_api.py | 35 ++++++++++------ src/copilot/backends/llm_service.py | 6 ++- src/copilot/backends/openai_api.py | 7 +++- src/copilot/backends/spark_api.py | 7 +++- src/copilot/utilities/config_manager.py | 1 + src/copilot/utilities/i18n.py | 30 ++++++++++---- src/copilot/utilities/markdown_renderer.py | 9 ++++- src/eulercopilot.sh | 2 + 15 files changed, 141 insertions(+), 41 deletions(-) diff --git a/README.en.md b/README.en.md index 2c437ee..3dfd423 100644 --- a/README.en.md +++ b/README.en.md @@ -1,7 +1,7 @@ # euler-copilot-shell #### Description -A client application that enables developers to interact with the operating system using natural language. +{**When you're done, you can delete the content in this README and update the file with details for others getting started with your repository**} #### Software Architecture Software architecture description diff --git a/README.md b/README.md index 4bdd403..a11534c 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # euler-copilot-shell #### 介绍 -A client application that enables developers to interact with the operating system using natural language. +{**以下是 Gitee 平台说明,您可以替换此简介** +Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 +无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 [https://gitee.com/enterprises](https://gitee.com/enterprises)} #### 软件架构 软件架构说明 diff --git a/distribution/build_rpm.sh b/distribution/build_rpm.sh index 4fe2ddc..1e05630 100644 --- a/distribution/build_rpm.sh +++ b/distribution/build_rpm.sh @@ -2,6 +2,10 @@ # Check if ~/rpmbuild directory exists; if not, run rpmdev-setuptree if [ ! -d ~/rpmbuild ]; then + if ! command -v rpmdev-setuptree &> /dev/null; then + echo "Command \"rpmdevtools\" not found: dnf install rpmdevtools" + exit 1 + fi rpmdev-setuptree fi @@ -23,8 +27,43 @@ fi # Remove old builds rm -f ~/rpmbuild/RPMS/"$(uname -m)"/eulercopilot-cli-* +# Read command-line arguments +use_release=false +while [[ $# -gt 0 ]]; do + case $1 in + -t|--tag) + custom_tag="$2" + shift 2 + ;; + -r|--release) + use_release=true + shift + ;; + *) + shift + ;; + esac +done + +# Get dist tag +dist=$(python3 -c ' +import re +with open("/etc/openEuler-release", "r") as f: + release = f.readline().strip() +version = re.search(r"(\d+\.\d+)", release).group(1) +major, minor = version.split(".") +sp = re.search(r"SP(\d+)", release) +sp_str = f"sp{sp.group(1)}" if sp else "" +print(f"oe{major}{minor}{sp_str}") +') + +# Prepare `rpmbuild` command +if [ "$use_release" = true ]; then + rpmbuild_cmd="rpmbuild --define \"dist .${dist}\" -bb \"$spec_file\" --nodebuginfo" +else + tag=${custom_tag:-"a$(date +%s)"} + rpmbuild_cmd="rpmbuild --define \"_tag .${tag}\" --define \"dist .${dist}\" -bb \"$spec_file\" --nodebuginfo" +fi + # Build the RPM package using rpmbuild -rpmbuild --define "dist .oe2403" -bb "$spec_file" --nodebuginfo -# rpmbuild --define "_tag .a$(date +%s)" --define "dist .oe2203sp3" -bb "$spec_file" --nodebuginfo -# rpmbuild --define "_tag .beta3" --define "dist .oe2203sp3" -bb "$spec_file" --nodebuginfo -# rpmbuild --define "dist .oe2203sp3" -bb "$spec_file" --nodebuginfo +eval "$rpmbuild_cmd" diff --git a/distribution/eulercopilot-cli.spec b/distribution/eulercopilot-cli.spec index b8ee515..3748680 100644 --- a/distribution/eulercopilot-cli.spec +++ b/distribution/eulercopilot-cli.spec @@ -2,7 +2,7 @@ Name: eulercopilot-cli Version: 1.2.1 -Release: 4%{?_tag}%{?dist} +Release: 6%{?_tag}%{?dist} Group: Applications/Utilities Summary: openEuler Copilot System Command Line Assistant Source: %{name}-%{version}.tar.gz diff --git a/src/copilot/app/copilot_app.py b/src/copilot/app/copilot_app.py index 3d4592e..17b77c6 100644 --- a/src/copilot/app/copilot_app.py +++ b/src/copilot/app/copilot_app.py @@ -150,6 +150,8 @@ def handle_user_input(service: llm_service.LLMService, cmds: list = [] if mode == 'chat': cmds = service.get_shell_commands(user_input) + if mode == 'shell': + cmds = service.get_shell_commands(user_input, single_line_cmd=True) if isinstance(service, framework_api.Framework): if mode == 'flow': cmds = service.flow(user_input, selected_plugins) @@ -210,8 +212,12 @@ def main(user_input: Optional[str], config: dict) -> int: api_key=config.get('framework_api_key'), debug_mode=config.get('debug_mode', False) ) - service.update_session_id() # get "ECSESSION" cookie - service.create_new_conversation() # get conversation_id from backend + update_session_success = service.update_session_id() # get "ECSESSION" cookie + if not update_session_success: + return 1 + create_conversation_success = service.create_new_conversation() # get conversation_id from backend + if not create_conversation_success: + return 1 if mode == 'flow': # get plugin list from current backend plugins: list[framework_api.PluginData] = service.get_plugins() if not plugins: diff --git a/src/copilot/app/copilot_cli.py b/src/copilot/app/copilot_cli.py index caac80d..790254f 100644 --- a/src/copilot/app/copilot_cli.py +++ b/src/copilot/app/copilot_cli.py @@ -24,6 +24,7 @@ from copilot.utilities.i18n import ( cli_help_panel_switch_mode, cli_help_prompt_edit_settings, cli_help_prompt_init_settings, + cli_help_prompt_intro, cli_help_prompt_question, cli_help_prompt_select_backend, cli_help_prompt_switch_mode, @@ -48,7 +49,7 @@ app = typer.Typer( ) -@app.command() +@app.command(help=f'{BRAND_NAME} CLI\n\n{cli_help_prompt_intro}') def cli( question: Optional[str] = typer.Argument( None, show_default=False, @@ -58,6 +59,11 @@ def cli( help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["chat"]), rich_help_panel=cli_help_panel_switch_mode ), + shell: bool = typer.Option( + False, '--shell', '-s', + help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["shell"]), + rich_help_panel=cli_help_panel_switch_mode + ), flow: bool = typer.Option( False, '--plugin', '-p', help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["flow"]), @@ -94,7 +100,6 @@ def cli( hidden=(not ADVANCED_MODE) ) ) -> int: - '''openEuler Copilot System CLI\n\nPress Ctrl+O to ask a question''' if init: setup_copilot() return 0 @@ -118,9 +123,13 @@ def cli( select_query_mode(0) if not question: return 0 + elif shell: + select_query_mode(1) + if not question: + return 0 elif flow: if BACKEND == 'framework': - select_query_mode(1) + select_query_mode(2) if not question: return 0 else: @@ -128,7 +137,7 @@ def cli( return 1 elif diagnose: if BACKEND == 'framework': - select_query_mode(2) + select_query_mode(3) if not question: return 0 else: @@ -136,7 +145,7 @@ def cli( return 1 elif tuning: if BACKEND == 'framework': - select_query_mode(3) + select_query_mode(4) if not question: return 0 else: diff --git a/src/copilot/app/copilot_init.py b/src/copilot/app/copilot_init.py index 655e8bf..e843bd6 100644 --- a/src/copilot/app/copilot_init.py +++ b/src/copilot/app/copilot_init.py @@ -29,6 +29,7 @@ def setup_copilot(): rprint(i18n.settings_init_welcome_usage_guide + '\n') rprint(i18n.settings_init_welcome_help_hint) rprint(i18n.settings_init_welcome_docs_link.format(url=i18n.DOCS_URL) + '\n') + rprint(i18n.settings_init_welcome_alpha_warning.format(brand_name=i18n.BRAND_NAME) + '\n') config = config_manager.load_config() if config.get('backend') == 'spark': diff --git a/src/copilot/backends/framework_api.py b/src/copilot/backends/framework_api.py index c3210e6..fd9cf3e 100644 --- a/src/copilot/backends/framework_api.py +++ b/src/copilot/backends/framework_api.py @@ -60,8 +60,11 @@ class Framework(LLMService): # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str) -> list: - query = self._add_framework_extra_prompt(question) + def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: + if single_line_cmd: + query = self._gen_shell_prompt(question) + else: + query = self._add_framework_extra_prompt(question) if prompt_framework_keyword_install in question.lower(): query = self._add_framework_software_install_prompt(query) self._query_llm_service(query) @@ -73,7 +76,7 @@ class Framework(LLMService): query = self._gen_explain_cmd_prompt(cmd) self._query_llm_service(query, show_suggestion=False) - def update_session_id(self): + def update_session_id(self) -> bool: headers = self._get_headers() try: response = requests.post( @@ -84,16 +87,17 @@ class Framework(LLMService): ) except requests.exceptions.RequestException: self.console.print(backend_framework_request_exceptions.format(brand_name=BRAND_NAME)) - return + return False if response.status_code == 401: self.console.print(backend_framework_auth_invalid_api_key.format(brand_name=BRAND_NAME)) - return + return False if response.status_code != 200: self.console.print(backend_general_request_failed.format(code=response.status_code)) - return + return False self.session_id = response.json().get('result', {}).get('session_id', '') + return True - def create_new_conversation(self): + def create_new_conversation(self) -> bool: headers = self._get_headers() try: response = requests.post( @@ -103,14 +107,15 @@ class Framework(LLMService): ) except requests.exceptions.RequestException: self.console.print(backend_framework_request_exceptions.format(brand_name=BRAND_NAME)) - return + return False if response.status_code == 401: self.console.print(backend_framework_auth_invalid_api_key.format(brand_name=BRAND_NAME)) - return + return False if response.status_code != 200: self.console.print(backend_general_request_failed.format(code=response.status_code)) - return + return False self.conversation_id = response.json().get('result', {}).get('conversation_id', '') + return True def get_plugins(self) -> list: headers = self._get_headers() @@ -323,13 +328,17 @@ class Framework(LLMService): ) def _get_headers(self) -> dict: - return { + host = self.endpoint.strip('http://').strip('https://').strip('/') + headers = { + 'Host': host, 'Accept': '*/*', 'Content-Type': 'application/json; charset=UTF-8', 'Connection': 'keep-alive', - 'Authorization': f'Bearer {self.api_key}', - 'Cookie': f'ECSESSION={self.session_id};' if self.session_id else '', + 'Authorization': f'Bearer {self.api_key}' } + if self.session_id: + headers['Cookie'] = f'ECSESSION={self.session_id};' + return headers def _reset_session_from_cookie(self, cookie: str) -> str: if not cookie: diff --git a/src/copilot/backends/llm_service.py b/src/copilot/backends/llm_service.py index acd4459..275de8e 100644 --- a/src/copilot/backends/llm_service.py +++ b/src/copilot/backends/llm_service.py @@ -10,12 +10,13 @@ from copilot.utilities.i18n import ( prompt_general_root_false, prompt_general_root_true, prompt_general_system, + prompt_single_line_cmd, ) class LLMService(ABC): @abstractmethod - def get_shell_commands(self, question: str) -> list: + def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: pass def explain_shell_command(self, cmd: str): @@ -49,6 +50,9 @@ class LLMService(ABC): return prompt_general_system.format( os=get_os_info(), prompt_general_root=self._gen_sudo_prompt()) + def _gen_shell_prompt(self, question: str) -> str: + return f'{question}\n\n{prompt_single_line_cmd}' + def _gen_chat_prompt(self, question: str) -> str: return prompt_general_chat.format(question=question, os=get_os_info()) diff --git a/src/copilot/backends/openai_api.py b/src/copilot/backends/openai_api.py index d44e9f6..ff5417f 100644 --- a/src/copilot/backends/openai_api.py +++ b/src/copilot/backends/openai_api.py @@ -29,8 +29,11 @@ class ChatOpenAI(LLMService): # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str) -> list: - query = self._gen_chat_prompt(question) + def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: + if single_line_cmd: + query = self._gen_shell_prompt(question) + else: + query = self._gen_chat_prompt(question) self._query_llm_service(query) return self._extract_shell_code_blocks(self.answer) diff --git a/src/copilot/backends/spark_api.py b/src/copilot/backends/spark_api.py index 7f5282c..312e882 100644 --- a/src/copilot/backends/spark_api.py +++ b/src/copilot/backends/spark_api.py @@ -45,8 +45,11 @@ class Spark(LLMService): # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str) -> list: - query = self._gen_chat_prompt(question) + def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: + if single_line_cmd: + query = self._gen_shell_prompt(question) + else: + query = self._gen_chat_prompt(question) self._query_llm_service(query) return self._extract_shell_code_blocks(self.answer) diff --git a/src/copilot/utilities/config_manager.py b/src/copilot/utilities/config_manager.py index 0dac12a..d6acaeb 100644 --- a/src/copilot/utilities/config_manager.py +++ b/src/copilot/utilities/config_manager.py @@ -33,6 +33,7 @@ BACKEND_NAME = { QUERY_MODE_NAME = { 'chat': i18n.query_mode_chat, + 'shell': i18n.query_mode_shell, 'flow': i18n.query_mode_flow, 'diagnose': i18n.query_mode_diagnose, 'tuning': i18n.query_mode_tuning, diff --git a/src/copilot/utilities/i18n.py b/src/copilot/utilities/i18n.py index 4907840..3caed8b 100644 --- a/src/copilot/utilities/i18n.py +++ b/src/copilot/utilities/i18n.py @@ -12,7 +12,9 @@ main_exec_builtin_cmd = _('不支持执行 Shell 内置命令 "{cmd_prefix}", main_exec_value_error = _('执行命令时出错:{error}') main_exec_not_found_error = _('命令不存在:{error}') main_exec_cmd_failed_with_exit_code = _('命令 "{cmd}" 执行中止,退出码:{exit_code}') +main_content_panel_alpha_warning = _('当前为内测版本,请仔细甄别 AI 回答的内容') +cli_help_prompt_intro = _('输入问题后,按下 Ctrl+O 提问 (字母 O)') cli_help_prompt_question = _('通过自然语言提问') cli_help_prompt_switch_mode = _('切换到{mode}模式') cli_help_prompt_init_settings = _('初始化 copilot 设置') @@ -21,10 +23,8 @@ cli_help_prompt_select_backend = _('选择大语言模型后端') cli_help_panel_switch_mode = _('选择问答模式') cli_help_panel_advanced_options = _('高级选项') cli_notif_select_one_mode = _('当前版本只能选择一种问答模式') -cli_notif_compatibility = _('当前大模型后端不支持{mode}功能\n\ -推荐使用 {brand_name} 智能体框架') -cli_notif_no_config = _('请先初始化 copilot 设置\n\ -请使用 "copilot --init" 命令初始化') +cli_notif_compatibility = _('当前大模型后端不支持{mode}功能\n推荐使用 {brand_name} 智能体框架') +cli_notif_no_config = _('请先初始化 copilot 设置\n请使用 "copilot --init" 命令初始化') interact_action_explain = _('解释命令') interact_action_edit = _('编辑命令') @@ -49,10 +49,11 @@ interact_question_select_plugin = _('请选择插件:') interact_select_plugins_valiidate = _('请选择至少一个插件') backend_general_request_failed = _('请求失败: {code}') -backend_framework_auth_invalid_api_key = _('{brand_name} 智能体 API 密钥无效,请检查配置文件') +backend_check_config_msg = _('输入 "vi ~/.config/eulercopilot/config.json" 查看和编辑配置') +backend_framework_auth_invalid_api_key = _('{brand_name} 智能体 API 密钥无效,请检查配置文件\n\n'+ backend_check_config_msg + '\n') backend_framework_request_connection_error = _('{brand_name} 智能体连接失败,请检查网络连接') backend_framework_request_timeout = _('{brand_name} 智能体请求超时,请检查网络连接') -backend_framework_request_exceptions = _('{brand_name} 智能体请求异常,请检查网络连接') +backend_framework_request_exceptions = _('{brand_name} 智能体请求异常,请检查网络连接\n\n' + backend_check_config_msg + '\n') backend_framework_request_unauthorized = _('当前会话已过期,请退出后重试') backend_framework_request_too_many_requests = _('请求过于频繁,请稍后再试') backend_framework_response_ended_prematurely = _('响应异常中止,请检查网络连接') @@ -64,7 +65,7 @@ backend_framework_sugggestion = _('**你可以继续问** {sugggestion}') backend_spark_stream_error = _('请求错误: {code}\n{message}') backend_spark_websockets_exceptions_msg_title = _('请求错误') backend_spark_websockets_exceptions_msg_a = _('请检查 appid 和 api_key 是否正确,或检查网络连接是否正常。\n') -backend_spark_websockets_exceptions_msg_b = _('输入 "vi ~/.config/eulercopilot/config.json" 查看和编辑配置;\n') +backend_spark_websockets_exceptions_msg_b = _(backend_check_config_msg + ';\n') backend_spark_websockets_exceptions_msg_c = _('或尝试 ping {spark_url}') backend_spark_network_error = _('访问大模型失败,请检查网络连接') backend_openai_request_connection_error = _('连接大模型失败') @@ -90,13 +91,22 @@ settings_config_entry_model_api_key = _('OpenAI 模型 API Key') settings_config_entry_model_name = _('OpenAI 模型名称') settings_config_interact_query_mode_disabled_explain = _('当前后端无法使用{mode}模式') settings_init_welcome_msg = _('欢迎使用 {brand_name} 智能体') -settings_init_welcome_usage_guide = _('使用方法:输入问题,按下 Ctrl+O 提问') +settings_init_welcome_usage_guide = _('使用方法:输入问题,按下 Ctrl+O (字母 O) 提问') settings_init_welcome_help_hint = _('更多用法详见命令行帮助:"copilot --help"') settings_init_welcome_docs_link = _('使用指南:{url}') +settings_init_welcome_alpha_warning = _('{brand_name}(内测版)旨在让内测用户提前体验 \ +openEuler 的智能化能力,帮助发现和修复版本质量、可用性及易用性问题,共同将版本做得更加完善。\ +如果您发现任何问题(包括软件设计、软件功能、不合适的问答对等),欢迎您反馈您的宝贵意见!\n\n\ +[bold]本服务仅限于内测用户学习研究、内部测试目的使用[/bold]。您不得将本服务用于生产环境或任何其他商业目的,\ +否则您自行承担由此造成的所有后果和责任。\n\n\ +内测期间,除正常反馈问题外,应遵守内测用户保密规则:禁止在任何地方传播包括但不限于系统界面、\ +功能点等参与内测得知的有关本服务的各种非公开信息。\n\n\ +[bold]以上规则需严格遵守,如有违反,我们有权撤销您的内测资格,情节严重造成恶劣影响或损失者,我们将保留追究其责任的权利。[/bold]') settings_init_framework_api_key_notice_title = _('获取 {brand_name} 智能体 API Key') settings_init_framework_api_key_notice_content = _('请前往 {url},点击右上角头像图标获取 API Key') query_mode_chat = _('智能问答') +query_mode_shell = _('智能 Shell') query_mode_flow = _('智能插件') query_mode_diagnose = _('智能诊断') query_mode_tuning = _('智能调优') @@ -153,6 +163,10 @@ prompt_general_explain_cmd = _('''```bash 要求: 先在代码块中打印一次上述命令,再有条理地解释命令中的主要步骤 ''') +prompt_single_line_cmd = _('''要求: ++ 请用单行 Shell 命令回答; ++ 命令请放在代码块中,并标明代码的语言。 +''') prompt_framework_markdown_format = _('''格式要求: + 你的回答中的代码块和表格都必须用 Markdown 呈现; + 你需要用中文回答问题,除了代码,其他内容都要符合汉语的规范。 diff --git a/src/copilot/utilities/markdown_renderer.py b/src/copilot/utilities/markdown_renderer.py index 261c3b6..22e2f69 100644 --- a/src/copilot/utilities/markdown_renderer.py +++ b/src/copilot/utilities/markdown_renderer.py @@ -5,12 +5,19 @@ from rich.live import Live from rich.markdown import Markdown from rich.panel import Panel +from copilot.utilities.i18n import main_content_panel_alpha_warning + class MarkdownRenderer: @staticmethod def update(live: Live, content: str, sugggestion: str = '', refresh: bool = True): - content_panel = Panel(Markdown(content, code_theme='github-dark'), border_style='gray50') + content_panel = Panel( + Markdown(content, code_theme='github-dark'), + border_style='gray50', + subtitle=main_content_panel_alpha_warning, + subtitle_align='right' + ) if not sugggestion: live.update(content_panel, refresh=refresh) return diff --git a/src/eulercopilot.sh b/src/eulercopilot.sh index 4f10f29..178ae03 100644 --- a/src/eulercopilot.sh +++ b/src/eulercopilot.sh @@ -10,6 +10,8 @@ read_query_mode() { if [ "$query_mode" = "\"chat\"" ]; then echo "智能问答" + elif [ "$query_mode" = "\"shell\"" ]; then + echo "智能 SHELL" elif [ "$query_mode" = "\"flow\"" ]; then echo "智能插件" elif [ "$query_mode" = "\"diagnose\"" ]; then -- Gitee From e9d1de996800516bfb5d028f9d4ab0452c6ede95 Mon Sep 17 00:00:00 2001 From: Hongyu Shi Date: Fri, 28 Feb 2025 07:14:46 +0000 Subject: [PATCH 2/2] =?UTF-8?q?feat:=200.9.2=20*=20feat:=20=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20EulerCopilot=20=E5=91=BD=E4=BB=A4=E8=A1=8C=E5=8A=A9?= =?UTF-8?q?=E6=89=8B=E7=9A=84=E6=9E=84=E5=BB=BA=E5=92=8C=E5=AE=89=E8=A3=85?= =?UTF-8?q?=E8=A7=84=E8=8C=83=20*=20Prepare=20release=20for=20EulerCopilot?= =?UTF-8?q?=200.9.2=20*=20fix:=20=E6=9B=B4=E6=96=B0=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=EF=BC=8C=E4=BF=AE=E5=A4=8D=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=E5=92=8C=E5=AD=97=E7=AC=A6=E4=B8=B2=E5=BC=95=E5=8F=B7=E6=A0=B7?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E6=8F=90=E5=8D=87=E5=8F=AF=E8=AF=BB=E6=80=A7?= =?UTF-8?q?=20*=20feat:=20=E9=85=8D=E7=BD=AE=E7=AE=A1=E7=90=86=E5=99=A8=20?= =?UTF-8?q?*=20=E6=9B=B4=E6=96=B0=20LLMService=20=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E6=96=B9=E6=B3=95=E5=90=8D=E7=A7=B0?= =?UTF-8?q?=E5=B9=B6=E8=BF=94=E5=9B=9E=E7=BB=93=E6=9E=84=E5=8C=96=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=EF=BC=8C=E6=96=B0=E5=A2=9E=E5=B0=86=20Python=20?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=9D=97=E4=BF=9D=E5=AD=98=E5=88=B0=E6=96=87?= =?UTF-8?q?=E4=BB=B6=20*=20=E6=9B=B4=E6=96=B0=E7=89=88=E6=9D=83=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E8=87=B32024-2025=EF=BC=8C=E5=B9=B6=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=83=A8=E5=88=86=E5=87=BD=E6=95=B0=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 17 - distribution/build_rpm.sh | 24 +- distribution/create_tarball.py | 19 +- ...ilot-cli.spec => euler-copilot-shell.spec} | 8 +- ruff.toml | 2 +- src/copilot.py | 2 +- src/copilot/__init__.py | 1 + src/copilot/__main__.py | 2 +- src/copilot/app/__init__.py | 1 + src/copilot/app/copilot_app.py | 283 +++++++------ src/copilot/app/copilot_cli.py | 126 +++--- src/copilot/app/copilot_init.py | 103 ++--- src/copilot/backends/__init__.py | 1 + src/copilot/backends/framework_api.py | 380 ++++++++++-------- src/copilot/backends/llm_service.py | 49 ++- src/copilot/backends/openai_api.py | 153 +++---- src/copilot/backends/spark_api.py | 144 +++---- src/copilot/utilities/__init__.py | 1 + src/copilot/utilities/config_manager.py | 221 +++++----- src/copilot/utilities/env_info.py | 30 +- src/copilot/utilities/i18n.py | 227 ++++++----- src/copilot/utilities/interact.py | 131 +++--- src/copilot/utilities/markdown_renderer.py | 16 +- src/copilot/utilities/shell_script.py | 12 +- src/eulercopilot.sh | 10 +- src/setup.py | 74 ++-- 26 files changed, 1084 insertions(+), 953 deletions(-) delete mode 100644 .vscode/settings.json rename distribution/{eulercopilot-cli.spec => euler-copilot-shell.spec} (92%) create mode 100644 src/copilot/app/__init__.py create mode 100644 src/copilot/backends/__init__.py create mode 100644 src/copilot/utilities/__init__.py diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 1e37660..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "python.languageServer": "Pylance", - "python.terminal.focusAfterLaunch": true, - "python.analysis.extraPaths": ["${workspaceFolder}/src"], - "python.analysis.typeCheckingMode": "standard", - "[python]": { - "editor.formatOnSave": false, - "editor.defaultFormatter": "charliermarsh.ruff", - "editor.codeActionsOnSave": { - "source.fixAll": "never", - "source.organizeImports": "always" - } - }, - "pylint.cwd": "${workspaceFolder}/src", - "ruff.lint.enable": true, - "ruff.organizeImports": true -} \ No newline at end of file diff --git a/distribution/build_rpm.sh b/distribution/build_rpm.sh index 1e05630..43d7160 100644 --- a/distribution/build_rpm.sh +++ b/distribution/build_rpm.sh @@ -2,7 +2,7 @@ # Check if ~/rpmbuild directory exists; if not, run rpmdev-setuptree if [ ! -d ~/rpmbuild ]; then - if ! command -v rpmdev-setuptree &> /dev/null; then + if ! command -v rpmdev-setuptree &>/dev/null; then echo "Command \"rpmdevtools\" not found: dnf install rpmdevtools" exit 1 fi @@ -31,17 +31,17 @@ rm -f ~/rpmbuild/RPMS/"$(uname -m)"/eulercopilot-cli-* use_release=false while [[ $# -gt 0 ]]; do case $1 in - -t|--tag) - custom_tag="$2" - shift 2 - ;; - -r|--release) - use_release=true - shift - ;; - *) - shift - ;; + -t | --tag) + custom_tag="$2" + shift 2 + ;; + -r | --release) + use_release=true + shift + ;; + *) + shift + ;; esac done diff --git a/distribution/create_tarball.py b/distribution/create_tarball.py index 14923da..67b05eb 100644 --- a/distribution/create_tarball.py +++ b/distribution/create_tarball.py @@ -5,27 +5,24 @@ import tarfile def extract_spec_fields(spec_file): - with open(spec_file, 'r', encoding='utf-8') as f: + with open(spec_file, "r", encoding="utf-8") as f: content = f.read() - name_pattern = re.compile(r'^Name:\s*(.+)$', re.MULTILINE) - version_pattern = re.compile(r'^Version:\s*(.+)$', re.MULTILINE) + name_pattern = re.compile(r"^Name:\s*(.+)$", re.MULTILINE) + version_pattern = re.compile(r"^Version:\s*(.+)$", re.MULTILINE) name_match = name_pattern.search(content) version_match = version_pattern.search(content) if name_match and version_match: - return { - 'name': name_match.group(1).strip(), - 'version': version_match.group(1).strip() - } + return {"name": name_match.group(1).strip(), "version": version_match.group(1).strip()} else: raise ValueError("Could not find Name or Version fields in the spec file") def create_cache_folder(spec_info, src_dir): - name = spec_info['name'] - version = spec_info['version'] + name = spec_info["name"] + version = spec_info["version"] cache_folder_name = f"{name}-{version}" cache_folder_path = os.path.join(os.path.dirname(src_dir), cache_folder_name) @@ -41,13 +38,13 @@ def create_cache_folder(spec_info, src_dir): def copy_files(src_dir, dst_dir): for dirpath, _, files in os.walk(src_dir): relative_path = os.path.relpath(dirpath, src_dir) - target_path = os.path.join(dst_dir, relative_path.strip(f'{os.curdir}{os.sep}')) + target_path = os.path.join(dst_dir, relative_path.strip(f"{os.curdir}{os.sep}")) if not os.path.exists(target_path): os.makedirs(target_path) for file in files: - if file.endswith('.py') or file.endswith('.sh'): + if file.endswith(".py") or file.endswith(".sh"): src_file = os.path.join(dirpath, file) dst_file = os.path.join(target_path, file) os.link(src_file, dst_file) # 使用硬链接以节省空间和时间 diff --git a/distribution/eulercopilot-cli.spec b/distribution/euler-copilot-shell.spec similarity index 92% rename from distribution/eulercopilot-cli.spec rename to distribution/euler-copilot-shell.spec index 3748680..70b4a76 100644 --- a/distribution/eulercopilot-cli.spec +++ b/distribution/euler-copilot-shell.spec @@ -1,10 +1,10 @@ %global debug_package %{nil} -Name: eulercopilot-cli -Version: 1.2.1 +Name: euler-copilot-shell +Version: 0.9.2 Release: 6%{?_tag}%{?dist} Group: Applications/Utilities -Summary: openEuler Copilot System Command Line Assistant +Summary: EulerCopilot Command Line Assistant Source: %{name}-%{version}.tar.gz License: MulanPSL-2.0 URL: https://www.openeuler.org/zh/ @@ -16,7 +16,7 @@ BuildRequires: python3-Cython gcc Requires: python3 jq hostname %description -openEuler Copilot System Command Line Assistant +EulerCopilot Command Line Assistant %prep %setup -q diff --git a/ruff.toml b/ruff.toml index 9e4f59f..7966ca0 100644 --- a/ruff.toml +++ b/ruff.toml @@ -12,4 +12,4 @@ target-version = "py39" [format] # Prefer single quotes over double quotes. -quote-style = "single" +# quote-style = "single" diff --git a/src/copilot.py b/src/copilot.py index 88e8640..b1bab51 100755 --- a/src/copilot.py +++ b/src/copilot.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import sys diff --git a/src/copilot/__init__.py b/src/copilot/__init__.py index e69de29..4f5adbb 100644 --- a/src/copilot/__init__.py +++ b/src/copilot/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. diff --git a/src/copilot/__main__.py b/src/copilot/__main__.py index d266ac4..0ab163c 100644 --- a/src/copilot/__main__.py +++ b/src/copilot/__main__.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import sys diff --git a/src/copilot/app/__init__.py b/src/copilot/app/__init__.py new file mode 100644 index 0000000..4f5adbb --- /dev/null +++ b/src/copilot/app/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. diff --git a/src/copilot/app/copilot_app.py b/src/copilot/app/copilot_app.py index 17b77c6..d66f425 100644 --- a/src/copilot/app/copilot_app.py +++ b/src/copilot/app/copilot_app.py @@ -1,11 +1,10 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. - -# pylint: disable=W0611 +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import re -import readline # noqa: F401 +import readline import shlex import subprocess +from pathlib import Path from typing import Optional from rich.console import Console @@ -16,85 +15,85 @@ from rich.text import Text from copilot.backends import framework_api, llm_service, openai_api, spark_api from copilot.utilities import i18n, interact -from copilot.utilities.config_manager import CONFIG_ENTRY_NAME, config_to_markdown, load_config, update_config +from copilot.utilities.config_manager import CONFIG_ENTRY_NAME, Config + +CONFIG: Config = Config() selected_plugins: list = [] def check_shell_features(cmd: str) -> bool: + """Check if the shell command contains special features.""" patterns = [ # 重定向 - r'\>|\<|\>\>|\<<', + r"\>|\<|\>\>|\<<", # 管道 - r'\|', + r"\|", # 通配符 - r'\*|\?', + r"\*|\?", # 美元符号开头的环境变量 - r'\$[\w_]+', + r"\$[\w_]+", # 历史展开 - r'!', + r"!", # 后台运行符号 - r'&', + r"&", # 分号 - r';', + r";", # 括号命令分组 - r'\(|\)|\{|\}', + r"\(|\)|\{|\}", # 逻辑操作符 - r'&&|\|\|', + r"&&|\|\|", # Shell函数或变量赋值 - r'\b\w+\s*=\s*[^=\s]+' + r"\b\w+\s*=\s*[^=\s]+", ] - - for pattern in patterns: - if re.search(pattern, cmd): - return True - return False + return any(re.search(pattern, cmd) for pattern in patterns) def execute_shell_command(cmd: str) -> int: - '''Execute a shell command and exit.''' + """Execute a shell command and exit.""" if check_shell_features(cmd): try: - process = subprocess.Popen(cmd, shell=True) + process = subprocess.Popen(cmd, shell=True) # noqa: S602 except ValueError as e: - print(i18n.main_exec_value_error.format(error=e)) + print(i18n.main_exec_value_error.format(error=e)) # noqa: T201 return 1 else: try: - process = subprocess.Popen(shlex.split(cmd)) + process = subprocess.Popen(shlex.split(cmd)) # noqa: S603 except FileNotFoundError as e: - builtin_cmds = ['.', 'source', 'history', 'cd', 'export', 'alias', 'test'] + builtin_cmds = [".", "source", "history", "cd", "export", "alias", "test"] cmd_prefix = cmd.split()[0] if cmd_prefix in builtin_cmds: - print(i18n.main_exec_builtin_cmd.format(cmd_prefix=cmd_prefix)) + print(i18n.main_exec_builtin_cmd.format(cmd_prefix=cmd_prefix)) # noqa: T201 else: - print(i18n.main_exec_not_found_error.format(error=e)) + print(i18n.main_exec_not_found_error.format(error=e)) # noqa: T201 return 1 - exit_code = process.wait() - return exit_code + return process.wait() -def print_shell_commands(cmds: list): +def print_shell_commands(cmds: list) -> None: + """Display shell commands in a formatted panel.""" console = Console() - with Live(console=console, vertical_overflow='visible') as live: + with Live(console=console, vertical_overflow="visible") as live: live.update( Panel( Markdown( - '```bash\n' + '\n\n'.join(cmds) + '\n```', - code_theme='github-dark' + "```bash\n" + "\n\n".join(cmds) + "\n```", + code_theme="github-dark", ), - border_style='gray50' - ) + border_style="gray50", + ), ) def command_interaction_loop(cmds: list, service: llm_service.LLMService) -> int: + """Interact with the user to select and execute shell commands.""" if not cmds: return -1 print_shell_commands(cmds) while True: action = interact.select_action(len(cmds) > 1) - if action in ('execute_all', 'execute_selected', 'execute'): + if action in ("execute_all", "execute_selected", "execute"): exit_code: int = 0 selected_cmds = get_selected_cmds(cmds, action) if not selected_cmds: @@ -102,154 +101,154 @@ def command_interaction_loop(cmds: list, service: llm_service.LLMService) -> int for cmd in selected_cmds: exit_code = execute_shell_command(cmd) if exit_code != 0: - print( + print( # noqa: T201 i18n.main_exec_cmd_failed_with_exit_code.format( cmd=cmd, - exit_code=exit_code - ) + exit_code=exit_code, + ), ) break return -1 - if action == 'explain': + if action == "explain": service.explain_shell_command(select_one_cmd(cmds)) - elif action == 'edit': - i = select_one_cmd_with_index(cmds) - readline.set_startup_hook(lambda: readline.insert_text(cmds[i])) + elif action == "edit": + selected_cmd_idx = select_one_cmd_with_index(cmds) + readline.set_startup_hook(lambda idx=selected_cmd_idx: readline.insert_text(cmds[idx])) try: - cmds[i] = input() + cmds[selected_cmd_idx] = input() finally: readline.set_startup_hook() print_shell_commands(cmds) - elif action == 'cancel': + elif action == "cancel": return -1 +def handle_python_code_block(code: str) -> None: + """Handle Python code blocks in Markdown.""" + console = Console() + input_prompt = i18n.interact_question_input_file_name + file_name = console.input(__stylized_input_prompt(input_prompt)).removesuffix(".py") + ".py" + with Path(file_name).open("w", encoding="utf-8") as f: + f.write(code) + + def get_selected_cmds(cmds: list, action: str) -> list: - if action in ('execute', 'execute_all'): + """Retrieve commands based on the specified action.""" + if action in ("execute", "execute_all"): return cmds - if action == 'execute_selected': + if action == "execute_selected": return interact.select_multiple_commands(cmds) return [] def select_one_cmd(cmds: list) -> str: + """Select one command from the list based on user input.""" if len(cmds) == 1: return cmds[0] return interact.select_command(cmds) def select_one_cmd_with_index(cmds: list) -> int: + """Select one command with its index from the list based on user input.""" if len(cmds) == 1: return 0 return interact.select_command_with_index(cmds) -def handle_user_input(service: llm_service.LLMService, - user_input: str, mode: str) -> int: - '''Process user input based on the given flag and backend configuration.''' - cmds: list = [] - if mode == 'chat': - cmds = service.get_shell_commands(user_input) - if mode == 'shell': - cmds = service.get_shell_commands(user_input, single_line_cmd=True) +def handle_user_input(service: llm_service.LLMService, user_input: str, mode: str) -> int: + """Process user input based on the given flag and backend configuration.""" + result: llm_service.LLMService.LLMResult = llm_service.LLMService.LLMResult(None, None) + if mode == "chat": + result = service.get_llm_result(user_input) + if mode == "shell": + result = service.get_llm_result(user_input, single_line_cmd=True) if isinstance(service, framework_api.Framework): - if mode == 'flow': - cmds = service.flow(user_input, selected_plugins) - if mode == 'diagnose': - cmds = service.diagnose(user_input) - if mode == 'tuning': - cmds = service.tuning(user_input) - if cmds: - return command_interaction_loop(cmds, service) + if mode == "plugin": + result = service.plugin(user_input, selected_plugins) + if mode == "diagnose": + result = service.diagnose(user_input) + if mode == "tuning": + result = service.tuning(user_input) + if result.code and interact.ask_boolean(i18n.interact_question_save_python_code): + handle_python_code_block(result.code) + if result.cmds: + return command_interaction_loop(result.cmds, service) return -1 -def edit_config(): +def edit_config() -> None: + """Edit the configuration settings.""" console = Console() with Live(console=console) as live: - live.update( - Panel(Markdown(config_to_markdown(), code_theme='github-dark'), - border_style='gray50')) + live.update(Panel(Markdown(CONFIG.to_markdown(), code_theme="github-dark"), border_style="gray50")) while True: selected_entry = interact.select_settings_entry() - if selected_entry == 'cancel': + if selected_entry == "cancel": return - if selected_entry == 'backend': + if selected_entry == "backend": backend = interact.select_backend() - if selected_entry != 'cancel': - update_config(selected_entry, backend) - elif selected_entry == 'query_mode': - backend = load_config().get('backend', '') - update_config(selected_entry, interact.select_query_mode(backend)) - elif selected_entry in ('advanced_mode', 'debug_mode'): - input_prompt = i18n.interact_question_yes_or_no.format( - question_body=CONFIG_ENTRY_NAME.get(selected_entry)) - update_config(selected_entry, interact.ask_boolean(input_prompt)) + if selected_entry != "cancel": + CONFIG.update(selected_entry, backend) + elif selected_entry == "query_mode": + CONFIG.update(selected_entry, interact.select_query_mode(CONFIG.data.backend)) + elif selected_entry in ("advanced_mode", "debug_mode"): + input_prompt = i18n.interact_question_yes_or_no.format(question_body=CONFIG_ENTRY_NAME.get(selected_entry)) + CONFIG.update(selected_entry, interact.ask_boolean(input_prompt)) else: - original_text: str = load_config().get(selected_entry, '') - new_text = '' - input_prompt = i18n.interact_question_input_text.format( - question_body=CONFIG_ENTRY_NAME.get(selected_entry)) - stylized_input_prompt = Text('❯ ', style='#005f87 bold')\ - .append(input_prompt, style='bold') - readline.set_startup_hook(lambda: readline.insert_text(original_text)) + original_text: str = CONFIG.data.to_dict().get(selected_entry, "") + new_text = "" + input_prompt = i18n.interact_question_input_text.format(question_body=CONFIG_ENTRY_NAME.get(selected_entry)) + readline.set_startup_hook(lambda text=original_text: readline.insert_text(text)) try: - new_text = console.input(stylized_input_prompt) + new_text = console.input(__stylized_input_prompt(input_prompt)) finally: readline.set_startup_hook() - update_config(selected_entry, new_text) + CONFIG.update(selected_entry, new_text) + + +def main(user_input: Optional[str]) -> int: + """Handle user input and interact with the backend service. + + :param user_input: The user input string. + :return: The exit code. + """ + backend = CONFIG.data.backend + mode = CONFIG.data.query_mode + service: Optional[llm_service.LLMService] = __create_llm_service(backend) + if service is None: + print(f"\033[1;31m{i18n.main_service_is_none}\033[0m") # noqa: T201 + return 1 + + if not __initialize_service(service, mode): + return 1 + + print(f"\033[33m{i18n.main_exit_prompt}\033[0m") # noqa: T201 + + return __process_user_input(service, user_input, mode) -# pylint: disable=W0603 -def main(user_input: Optional[str], config: dict) -> int: +def __initialize_service(service: llm_service.LLMService, mode: str) -> bool: + """Initialize the service and handle plugin selection if needed.""" global selected_plugins - backend = config.get('backend') - mode = str(config.get('query_mode')) - service: Optional[llm_service.LLMService] = None - if backend == 'framework': - service = framework_api.Framework( - url=config.get('framework_url'), - api_key=config.get('framework_api_key'), - debug_mode=config.get('debug_mode', False) - ) - update_session_success = service.update_session_id() # get "ECSESSION" cookie - if not update_session_success: - return 1 - create_conversation_success = service.create_new_conversation() # get conversation_id from backend - if not create_conversation_success: - return 1 - if mode == 'flow': # get plugin list from current backend + if isinstance(service, framework_api.Framework): + if not service.update_session_id() or not service.create_new_conversation(): + return False + if mode == "plugin": # get plugin list from current backend plugins: list[framework_api.PluginData] = service.get_plugins() if not plugins: - print(f'\033[1;31m{i18n.main_service_framework_plugin_is_none}\033[0m') - return 1 + print(f"\033[1;31m{i18n.main_service_framework_plugin_is_none}\033[0m") # noqa: T201 + return False selected_plugins = [interact.select_one_plugin(plugins)] - elif backend == 'spark': - service = spark_api.Spark( - app_id=config.get('spark_app_id'), - api_key=config.get('spark_api_key'), - api_secret=config.get('spark_api_secret'), - spark_url=config.get('spark_url'), - domain=config.get('spark_domain') - ) - elif backend == 'openai': - service = openai_api.ChatOpenAI( - url=str(config.get('model_url')), - api_key=config.get('model_api_key'), - model=config.get('model_name') - ) - - if service is None: - print(f'\033[1;31m{i18n.main_service_is_none}\033[0m') - return 1 + return True - print(f'\033[33m{i18n.main_exit_prompt}\033[0m') +def __process_user_input(service: llm_service.LLMService, user_input: Optional[str], mode: str) -> int: + """Process the user input and interact with the service.""" try: while True: if user_input is None: - user_input = input('\033[35m❯\033[0m ') - if user_input.lower().startswith('exit'): + user_input = input("\033[35m❯\033[0m ") + if user_input.lower().startswith("exit"): return 0 exit_code = handle_user_input(service, user_input, mode) if exit_code != -1: @@ -260,3 +259,31 @@ def main(user_input: Optional[str], config: dict) -> int: service.stop() print() return 0 + + +def __create_llm_service(backend: str) -> Optional[llm_service.LLMService]: + if backend == "framework": + return framework_api.Framework( + url=CONFIG.data.framework_url, + api_key=CONFIG.data.framework_api_key, + debug_mode=CONFIG.data.debug_mode, + ) + if backend == "spark": + return spark_api.Spark( + app_id=CONFIG.data.spark_app_id, + api_key=CONFIG.data.spark_api_key, + api_secret=CONFIG.data.spark_api_secret, + spark_url=CONFIG.data.spark_url, + domain=CONFIG.data.spark_domain, + ) + if backend == "openai": + return openai_api.ChatOpenAI( + url=str(CONFIG.data.model_url), + api_key=CONFIG.data.model_api_key, + model=CONFIG.data.model_name, + ) + return None + + +def __stylized_input_prompt(prompt_text: str) -> Text: + return Text("❯ ", style="#005f87 bold").append(prompt_text, style="bold") diff --git a/src/copilot/app/copilot_cli.py b/src/copilot/app/copilot_cli.py index 790254f..45be117 100644 --- a/src/copilot/app/copilot_cli.py +++ b/src/copilot/app/copilot_cli.py @@ -1,9 +1,7 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. -# pylint: disable=R0911,R0912,R0913 - -import os import sys +from pathlib import Path from typing import Optional import typer @@ -11,12 +9,8 @@ import typer from copilot.app.copilot_app import edit_config, main from copilot.app.copilot_init import setup_copilot from copilot.utilities.config_manager import ( - CONFIG_PATH, - DEFAULT_CONFIG, QUERY_MODE_NAME, - load_config, - select_backend, - select_query_mode, + Config, ) from copilot.utilities.i18n import ( BRAND_NAME, @@ -33,140 +27,150 @@ from copilot.utilities.i18n import ( cli_notif_select_one_mode, ) -CONFIG: dict = load_config() -BACKEND: str = CONFIG.get('backend', DEFAULT_CONFIG['backend']) -ADVANCED_MODE: bool = CONFIG.get('advanced_mode', DEFAULT_CONFIG['advanced_mode']) -DEBUG_MODE: bool = CONFIG.get('debug_mode', DEFAULT_CONFIG['debug_mode']) -CONFIG_INITIALIZED: bool = os.path.exists(CONFIG_PATH) +CONFIG: Config = Config() +BACKEND: str = CONFIG.data.backend +ADVANCED_MODE: bool = CONFIG.data.advanced_mode +DEBUG_MODE: bool = CONFIG.data.debug_mode +CONFIG_INITIALIZED: bool = Path(CONFIG.config_path).exists() app = typer.Typer( context_settings={ - 'help_option_names': ['-h', '--help'], - 'allow_interspersed_args': True + "help_option_names": ["-h", "--help"], + "allow_interspersed_args": True, }, pretty_exceptions_show_locals=DEBUG_MODE, - add_completion=False + add_completion=False, ) -@app.command(help=f'{BRAND_NAME} CLI\n\n{cli_help_prompt_intro}') +@app.command(help=f"{BRAND_NAME} CLI\n\n{cli_help_prompt_intro}") def cli( - question: Optional[str] = typer.Argument( - None, show_default=False, - help=cli_help_prompt_question), + question: Optional[str] = typer.Argument(None, show_default=False, help=cli_help_prompt_question), chat: bool = typer.Option( - False, '--chat', '-c', + False, + "--chat", + "-c", help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["chat"]), - rich_help_panel=cli_help_panel_switch_mode + rich_help_panel=cli_help_panel_switch_mode, ), shell: bool = typer.Option( - False, '--shell', '-s', + False, + "--shell", + "-s", help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["shell"]), - rich_help_panel=cli_help_panel_switch_mode + rich_help_panel=cli_help_panel_switch_mode, ), - flow: bool = typer.Option( - False, '--plugin', '-p', - help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["flow"]), + plugin: bool = typer.Option( + False, + "--plugin", + "-p", + help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["plugin"]), rich_help_panel=cli_help_panel_switch_mode, - hidden=(BACKEND != 'framework'), + hidden=(BACKEND != "framework"), ), diagnose: bool = typer.Option( - False, '--diagnose', '-d', + False, + "--diagnose", + "-d", help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["diagnose"]), rich_help_panel=cli_help_panel_switch_mode, - hidden=(BACKEND != 'framework') + hidden=(BACKEND != "framework"), ), tuning: bool = typer.Option( - False, '--tuning', '-t', + False, + "--tuning", + "-t", help=cli_help_prompt_switch_mode.format(mode=QUERY_MODE_NAME["tuning"]), rich_help_panel=cli_help_panel_switch_mode, - hidden=(BACKEND != 'framework') + hidden=(BACKEND != "framework"), ), init: bool = typer.Option( - False, '--init', + False, + "--init", help=cli_help_prompt_init_settings, - hidden=(CONFIG_INITIALIZED) + hidden=(CONFIG_INITIALIZED), ), backend: bool = typer.Option( - False, '--backend', + False, + "--backend", help=cli_help_prompt_select_backend, rich_help_panel=cli_help_panel_advanced_options, - hidden=(not ADVANCED_MODE) + hidden=(not ADVANCED_MODE), ), settings: bool = typer.Option( - False, '--settings', + False, + "--settings", help=cli_help_prompt_edit_settings, rich_help_panel=cli_help_panel_advanced_options, - hidden=(not ADVANCED_MODE) - ) + hidden=(not ADVANCED_MODE), + ), ) -> int: if init: setup_copilot() return 0 if not CONFIG_INITIALIZED: - print(f'\033[1;31m{cli_notif_no_config}\033[0m') + print(f"\033[1;31m{cli_notif_no_config}\033[0m") return 1 if backend: if ADVANCED_MODE: - select_backend() + CONFIG.select_backend() return 0 if settings: if ADVANCED_MODE: edit_config() return 0 - if sum(map(bool, [chat, flow, diagnose, tuning])) > 1: - print(f'\033[1;31m{cli_notif_select_one_mode}\033[0m') + if sum(map(bool, [chat, plugin, diagnose, tuning])) > 1: + print(f"\033[1;31m{cli_notif_select_one_mode}\033[0m") return 1 if chat: - select_query_mode(0) + CONFIG.select_query_mode(0) if not question: return 0 elif shell: - select_query_mode(1) + CONFIG.select_query_mode(1) if not question: return 0 - elif flow: - if BACKEND == 'framework': - select_query_mode(2) + elif plugin: + if BACKEND == "framework": + CONFIG.select_query_mode(2) if not question: return 0 else: - compatibility_notification(QUERY_MODE_NAME['flow']) + compatibility_notification(QUERY_MODE_NAME["plugin"]) return 1 elif diagnose: - if BACKEND == 'framework': - select_query_mode(3) + if BACKEND == "framework": + CONFIG.select_query_mode(3) if not question: return 0 else: - compatibility_notification(QUERY_MODE_NAME['diagnose']) + compatibility_notification(QUERY_MODE_NAME["diagnose"]) return 1 elif tuning: - if BACKEND == 'framework': - select_query_mode(4) + if BACKEND == "framework": + CONFIG.select_query_mode(4) if not question: return 0 else: - compatibility_notification(QUERY_MODE_NAME['tuning']) + compatibility_notification(QUERY_MODE_NAME["tuning"]) return 1 if question: question = question.strip() - return main(question, load_config()) + return main(question) -def compatibility_notification(mode: str): - print('\033[33m', cli_notif_compatibility.format(mode=mode, brand_name=BRAND_NAME), - '\033[0m', sep='') +def compatibility_notification(mode: str) -> None: + print("\033[33m", cli_notif_compatibility.format(mode=mode, brand_name=BRAND_NAME), "\033[0m", sep="") def entry_point() -> int: return app() -if __name__ == '__main__': +if __name__ == "__main__": code = entry_point() sys.exit(code) diff --git a/src/copilot/app/copilot_init.py b/src/copilot/app/copilot_init.py index e843bd6..2818415 100644 --- a/src/copilot/app/copilot_init.py +++ b/src/copilot/app/copilot_init.py @@ -1,8 +1,5 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. -# pylint: disable=W0611 - -import os import readline # noqa: F401 from rich import print as rprint @@ -10,56 +7,60 @@ from rich import print as rprint from copilot.utilities import config_manager, i18n -def setup_copilot(): - def _init_config(): - if not os.path.exists(config_manager.CONFIG_DIR): - os.makedirs(config_manager.CONFIG_DIR) - if not os.path.exists(config_manager.CONFIG_PATH): - config_manager.init_config() +def setup_copilot() -> None: + config = config_manager.Config() def _prompt_for_config(config_key: str, prompt_text: str) -> str: config_value = input(prompt_text) - config_manager.update_config(config_key, config_value) + config.update(config_key, config_value) return config_value - if not os.path.exists(config_manager.CONFIG_PATH): - _init_config() - - rprint(f'\n[bold]{i18n.settings_init_welcome_msg.format(brand_name=i18n.BRAND_NAME)}[/bold]\n') - rprint(i18n.settings_init_welcome_usage_guide + '\n') - rprint(i18n.settings_init_welcome_help_hint) - rprint(i18n.settings_init_welcome_docs_link.format(url=i18n.DOCS_URL) + '\n') - rprint(i18n.settings_init_welcome_alpha_warning.format(brand_name=i18n.BRAND_NAME) + '\n') + rprint(f"\n[bold]{i18n.init_welcome_msg.format(brand_name=i18n.BRAND_NAME)}[/bold]\n") + rprint(i18n.init_welcome_usage_guide + "\n") + rprint(i18n.init_welcome_help_hint) + rprint(i18n.init_welcome_docs_link.format(url=i18n.DOCS_URL) + "\n") + rprint(i18n.init_welcome_alpha_warning.format(brand_name=i18n.BRAND_NAME) + "\n") - config = config_manager.load_config() - if config.get('backend') == 'spark': - if config.get('spark_app_id') == '': - _prompt_for_config('spark_app_id', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_spark_app_id)) - if config.get('spark_api_key') == '': - _prompt_for_config('spark_api_key', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_spark_api_key)) - if config.get('spark_api_secret') == '': - _prompt_for_config('spark_api_secret', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_spark_api_secret)) - if config.get('backend') == 'framework': - framework_url = config.get('framework_url') - if framework_url == '': - framework_url = _prompt_for_config('framework_url', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_framework_url)) - if config.get('framework_api_key') == '': - title = i18n.settings_init_framework_api_key_notice_title.format(brand_name=i18n.BRAND_NAME) - rprint(f'[bold]{title}[/bold]') - rprint(i18n.settings_init_framework_api_key_notice_content.format(url=framework_url)) - _prompt_for_config('framework_api_key', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_framework_api_key.format(brand_name=i18n.BRAND_NAME))) - if config.get('backend') == 'openai': - if config.get('model_url') == '': - _prompt_for_config('model_url', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_model_url)) - if config.get('model_api_key') == '': - _prompt_for_config('model_api_key', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_model_api_key)) - if config.get('model_name') == '': - _prompt_for_config('model_name', i18n.interact_question_input_text.format( - question_body=i18n.settings_config_entry_model_name)) + if config.data.backend == "spark": + if config.data.spark_app_id == "": + _prompt_for_config( + "spark_app_id", i18n.interact_question_input_text.format(question_body=i18n.config_entry_spark_app_id) + ) + if config.data.spark_api_key == "": + _prompt_for_config( + "spark_api_key", i18n.interact_question_input_text.format(question_body=i18n.config_entry_spark_api_key) + ) + if config.data.spark_api_secret == "": + _prompt_for_config( + "spark_api_secret", + i18n.interact_question_input_text.format(question_body=i18n.config_entry_spark_api_secret), + ) + if config.data.backend == "framework": + framework_url = config.data.framework_url + if framework_url == "": + framework_url = _prompt_for_config( + "framework_url", i18n.interact_question_input_text.format(question_body=i18n.config_entry_framework_url) + ) + if config.data.framework_api_key == "": + title = i18n.init_framework_api_key_notice_title.format(brand_name=i18n.BRAND_NAME) + rprint(f"[bold]{title}[/bold]") + rprint(i18n.init_framework_api_key_notice_content.format(url=framework_url)) + _prompt_for_config( + "framework_api_key", + i18n.interact_question_input_text.format( + question_body=i18n.config_entry_framework_api_key.format(brand_name=i18n.BRAND_NAME) + ), + ) + if config.data.backend == "openai": + if config.data.model_url == "": + _prompt_for_config( + "model_url", i18n.interact_question_input_text.format(question_body=i18n.config_entry_model_url) + ) + if config.data.model_api_key == "": + _prompt_for_config( + "model_api_key", i18n.interact_question_input_text.format(question_body=i18n.config_entry_model_api_key) + ) + if config.data.model_name == "": + _prompt_for_config( + "model_name", i18n.interact_question_input_text.format(question_body=i18n.config_entry_model_name) + ) diff --git a/src/copilot/backends/__init__.py b/src/copilot/backends/__init__.py new file mode 100644 index 0000000..4f5adbb --- /dev/null +++ b/src/copilot/backends/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. diff --git a/src/copilot/backends/framework_api.py b/src/copilot/backends/framework_api.py index fd9cf3e..bd3d774 100644 --- a/src/copilot/backends/framework_api.py +++ b/src/copilot/backends/framework_api.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import json import re @@ -39,266 +39,311 @@ from copilot.utilities.shell_script import write_shell_script FRAMEWORK_LLM_STREAM_BAD_REQUEST_MSG = { 401: backend_framework_request_unauthorized, - 429: backend_framework_request_too_many_requests + 429: backend_framework_request_too_many_requests, } +HTTP_OK = 200 +HTTP_AUTH_ERROR = 401 + -# pylint: disable=R0902 class Framework(LLMService): - def __init__(self, url, api_key, debug_mode=False): + """EulerCopilot Framework Service""" + + def __init__(self, url: str, api_key: str, *, debug_mode: bool = False) -> None: + """Initialize EulerCopilot Framework Service""" self.endpoint: str = url self.api_key: str = api_key self.debug_mode: bool = debug_mode - # 临时数据 (本轮对话) - self.session_id: str = '' + # 临时数据 (本轮对话) # noqa: ERA001 + self.session_id: str = "" self.plugins: list = [] - self.conversation_id: str = '' - # 临时数据 (本次问答) - self.content: str = '' + self.conversation_id: str = "" + # 临时数据 (本次问答) # noqa: ERA001 + self.content: str = "" self.commands: list = [] - self.sugggestion: str = '' + self.sugggestion: str = "" # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: - if single_line_cmd: - query = self._gen_shell_prompt(question) - else: - query = self._add_framework_extra_prompt(question) + def get_llm_result(self, question: str, *, single_line_cmd: bool = False) -> LLMService.LLMResult: + """获取 Shell 命令""" + query = self._gen_shell_prompt(question) if single_line_cmd else self.__add_extra_prompt(question) if prompt_framework_keyword_install in question.lower(): - query = self._add_framework_software_install_prompt(query) + query = self.__add_software_install_prompt(query) self._query_llm_service(query) if self.commands: - return self.commands - return self._extract_shell_code_blocks(self.content) + return LLMService.LLMResult(cmds=self.commands, code=None) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.content), + code=self._extract_python_code_blocks(self.content), + ) - def explain_shell_command(self, cmd: str): + def explain_shell_command(self, cmd: str) -> None: + """解释 Shell 命令""" query = self._gen_explain_cmd_prompt(cmd) self._query_llm_service(query, show_suggestion=False) def update_session_id(self) -> bool: - headers = self._get_headers() + """更新会话 ID""" + headers = self.__get_headers() try: response = requests.post( - urljoin(self.endpoint, 'api/client/session'), - json={'session_id': self.session_id} if self.session_id else {}, + urljoin(self.endpoint, "api/client/session"), + json={"session_id": self.session_id} if self.session_id else {}, headers=headers, - timeout=30 + timeout=30, ) except requests.exceptions.RequestException: self.console.print(backend_framework_request_exceptions.format(brand_name=BRAND_NAME)) return False - if response.status_code == 401: + if response.status_code == HTTP_AUTH_ERROR: self.console.print(backend_framework_auth_invalid_api_key.format(brand_name=BRAND_NAME)) return False - if response.status_code != 200: + if response.status_code != HTTP_OK: self.console.print(backend_general_request_failed.format(code=response.status_code)) return False - self.session_id = response.json().get('result', {}).get('session_id', '') + self.session_id = response.json().get("result", {}).get("session_id", "") return True def create_new_conversation(self) -> bool: - headers = self._get_headers() + """新建对话""" + headers = self.__get_headers() try: response = requests.post( - urljoin(self.endpoint, 'api/client/conversation'), + urljoin(self.endpoint, "api/client/conversation"), headers=headers, - timeout=30 + timeout=30, ) except requests.exceptions.RequestException: self.console.print(backend_framework_request_exceptions.format(brand_name=BRAND_NAME)) return False - if response.status_code == 401: + if response.status_code == HTTP_AUTH_ERROR: self.console.print(backend_framework_auth_invalid_api_key.format(brand_name=BRAND_NAME)) return False - if response.status_code != 200: + if response.status_code != HTTP_OK: self.console.print(backend_general_request_failed.format(code=response.status_code)) return False - self.conversation_id = response.json().get('result', {}).get('conversation_id', '') + self.conversation_id = response.json().get("result", {}).get("conversation_id", "") return True def get_plugins(self) -> list: - headers = self._get_headers() + """获取插件列表""" + headers = self.__get_headers() try: response = requests.get( - urljoin(self.endpoint, 'api/client/plugin'), + urljoin(self.endpoint, "api/client/plugin"), headers=headers, - timeout=30 + timeout=30, ) except requests.exceptions.RequestException: self.console.print(backend_framework_request_exceptions.format(brand_name=BRAND_NAME)) return [] - if response.status_code == 401: + if response.status_code == HTTP_AUTH_ERROR: self.console.print(backend_framework_auth_invalid_api_key.format(brand_name=BRAND_NAME)) return [] - if response.status_code != 200: + if response.status_code != HTTP_OK: self.console.print(backend_general_request_failed.format(code=response.status_code)) return [] - self.session_id = self._reset_session_from_cookie(response.headers.get('set-cookie', '')) - plugins = response.json().get('result', []) + self.session_id = self.__reset_session_from_cookie(response.headers.get("set-cookie", "")) + plugins = response.json().get("result", []) if plugins: self.plugins = [PluginData(**plugin) for plugin in plugins] return self.plugins - def flow(self, question: str, plugins: list) -> list: + def plugin(self, question: str, plugins: list) -> LLMService.LLMResult: + """智能插件""" self._query_llm_service(question, user_selected_plugins=plugins) if self.commands: - return self.commands - return self._extract_shell_code_blocks(self.content) + return LLMService.LLMResult(cmds=self.commands, code=None) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.content), + code=self._extract_python_code_blocks(self.content), + ) + + def diagnose(self, question: str) -> LLMService.LLMResult: + """智能诊断 - def diagnose(self, question: str) -> list: - # 确保用户输入的问题中包含有效的IP地址,若没有,则诊断本机 - if not self._contains_valid_ip(question): - local_ip = self._get_local_ip() + 确保用户输入的问题中包含有效的IP地址,若没有,则诊断本机 + """ + if not self.__contains_valid_ip(question): + local_ip = self.__get_local_ip() if local_ip: - question = f'{prompt_framework_plugin_ip} {local_ip},' + question - self._query_llm_service(question, user_selected_plugins=['euler-copilot-rca']) + question = f"{prompt_framework_plugin_ip} {local_ip}," + question + self._query_llm_service(question, user_selected_plugins=["euler-copilot-rca"]) if self.commands: - return self.commands - return self._extract_shell_code_blocks(self.content) + return LLMService.LLMResult(cmds=self.commands, code=None) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.content), + code=self._extract_python_code_blocks(self.content), + ) + + def tuning(self, question: str) -> LLMService.LLMResult: + """智能调优 - def tuning(self, question: str) -> list: - # 确保用户输入的问题中包含有效的IP地址,若没有,则调优本机 - if not self._contains_valid_ip(question): - local_ip = self._get_local_ip() + 确保用户输入的问题中包含有效的IP地址,若没有,则调优本机 + """ + if not self.__contains_valid_ip(question): + local_ip = self.__get_local_ip() if local_ip: - question = f'{prompt_framework_plugin_ip} {local_ip},' + question - self._query_llm_service(question, user_selected_plugins=['euler-copilot-tune']) + question = f"{prompt_framework_plugin_ip} {local_ip}," + question + self._query_llm_service(question, user_selected_plugins=["euler-copilot-tune"]) if self.commands: - return self.commands - return self._extract_shell_code_blocks(self.content) + return LLMService.LLMResult(cmds=self.commands, code=None) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.content), + code=self._extract_python_code_blocks(self.content), + ) - def stop(self): - headers = self._get_headers() + def stop(self) -> None: + """停止回答""" + headers = self.__get_headers() try: response = requests.post( - urljoin(self.endpoint, 'api/client/stop'), + urljoin(self.endpoint, "api/client/stop"), headers=headers, - timeout=30 + timeout=30, ) except requests.exceptions.RequestException: return - if response.status_code == 200: + if response.status_code == HTTP_OK: self.console.print(backend_framework_stream_stop.format(brand_name=BRAND_NAME)) - # pylint: disable=W0221 def _query_llm_service( self, question: str, user_selected_plugins: Optional[list] = None, - show_suggestion: bool = True - ): + *, + show_suggestion: bool = True, + ) -> None: + """Query LLM Service""" if not user_selected_plugins: user_selected_plugins = [] - headers = self._get_headers() + headers = self.__get_headers() self.update_session_id() data = { - 'session_id': self.session_id, - 'question': question, - 'language': 'zh', - 'conversation_id': self.conversation_id, - 'user_selected_plugins': user_selected_plugins + "session_id": self.session_id, + "question": question, + "language": "zh", + "conversation_id": self.conversation_id, + "user_selected_plugins": user_selected_plugins, } - self._stream_response(headers, data, show_suggestion) + self.__stream_response(headers, data, show_suggestion=show_suggestion) - def _stream_response(self, headers, data, show_suggestion: bool = True): - self._clear_previous_data() - spinner = Spinner('material') + def __stream_response(self, headers: dict, data: dict, *, show_suggestion: bool = True) -> None: + """流式响应输出""" + self.__clear_previous_data() + spinner = Spinner("material") with Live(console=self.console) as live: live.update(spinner, refresh=True) try: response = requests.post( - urljoin(self.endpoint, 'api/client/chat'), + urljoin(self.endpoint, "api/client/chat"), headers=headers, json=data, stream=True, - timeout=300 + timeout=300, ) except requests.exceptions.ConnectionError: - live.update( - backend_framework_request_connection_error.format(brand_name=BRAND_NAME), refresh=True) + live.update(backend_framework_request_connection_error.format(brand_name=BRAND_NAME), refresh=True) return except requests.exceptions.Timeout: - live.update( - backend_framework_request_timeout.format(brand_name=BRAND_NAME), refresh=True) + live.update(backend_framework_request_timeout.format(brand_name=BRAND_NAME), refresh=True) return except requests.exceptions.RequestException: - live.update( - backend_framework_request_exceptions.format(brand_name=BRAND_NAME), refresh=True) + live.update(backend_framework_request_exceptions.format(brand_name=BRAND_NAME), refresh=True) return - if response.status_code != 200: + if response.status_code != HTTP_OK: msg = FRAMEWORK_LLM_STREAM_BAD_REQUEST_MSG.get( response.status_code, - backend_general_request_failed.format(code=response.status_code) + backend_general_request_failed.format(code=response.status_code), ) live.update(msg, refresh=True) return - self.session_id = self._reset_session_from_cookie(response.headers.get('set-cookie', '')) + self.session_id = self.__reset_session_from_cookie(response.headers.get("set-cookie", "")) try: - self._handle_response_stream(live, response, show_suggestion) + self.__handle_response_stream(live, response, show_suggestion=show_suggestion) except requests.exceptions.ChunkedEncodingError: live.update(backend_framework_response_ended_prematurely, refresh=True) - def _clear_previous_data(self): - self.content = '' + def __clear_previous_data(self) -> None: + """清空上一次的数据""" + self.content = "" self.commands = [] - self.sugggestion = '' + self.sugggestion = "" - def _handle_response_stream( + def __handle_response_stream( self, live: Live, response: requests.Response, - show_suggestion: bool - ): + *, + show_suggestion: bool, + ) -> None: + """处理流式响应""" for line in response.iter_lines(): if line is None: continue - content = line.decode('utf-8').strip('data: ') + content = line.decode("utf-8").removeprefix("data: ") try: jcontent = json.loads(content) except json.JSONDecodeError: - if content == '': - continue - if content == '[ERROR]': - if not self.content: - MarkdownRenderer.update( - live, - backend_framework_stream_error.format(brand_name=BRAND_NAME) - ) - elif content == '[SENSITIVE]': - MarkdownRenderer.update(live, backend_framework_stream_sensitive) - self.content = '' - elif content != '[DONE]': - if not self.debug_mode: - continue - MarkdownRenderer.update( - live, - backend_framework_stream_unknown.format( - brand_name=BRAND_NAME, - content=content - ) - ) - break + if self.__break_on_json_error(content, live): + break else: - self._handle_json_chunk(jcontent, live, show_suggestion) + self.__handle_json_chunk(jcontent, live, show_suggestion=show_suggestion) + + def __break_on_json_error(self, content: str, live: Live) -> bool: + """处理 JSON 错误""" + if content == "": + return False + if content == "[ERROR]": + if not self.content: + MarkdownRenderer.update( + live, + backend_framework_stream_error.format(brand_name=BRAND_NAME), + ) + return True + if content == "[SENSITIVE]": + MarkdownRenderer.update(live, backend_framework_stream_sensitive) + self.content = "" + return True + if content != "[DONE]": + if not self.debug_mode: + return False + MarkdownRenderer.update( + live, + backend_framework_stream_unknown.format( + brand_name=BRAND_NAME, + content=content, + ), + ) + return True + return True - def _handle_json_chunk(self, jcontent, live: Live, show_suggestion: bool): - chunk = jcontent.get('content', '') + def __handle_json_chunk(self, jcontent: dict, live: Live, *, show_suggestion: bool) -> None: + """处理 JSON 数据块""" + chunk = jcontent.get("content", "") self.content += chunk - # 获取推荐问题 + self.__process_suggestions(jcontent, show_suggestion=show_suggestion) + self.__process_plugin_data(jcontent) + self.__update_render(live) + + def __process_suggestions(self, jcontent: dict, *, show_suggestion: bool) -> None: + """处理推荐问题""" if show_suggestion: - suggestions = jcontent.get('search_suggestions', []) + suggestions = jcontent.get("search_suggestions", []) if suggestions: - suggested_plugin = suggestions[0].get('name', '') - suggested_question = suggestions[0].get('question', '') + suggested_plugin = suggestions[0].get("name", "") + suggested_question = suggestions[0].get("question", "") if suggested_plugin and suggested_question: - self.sugggestion = f'**{suggested_plugin}** {suggested_question}' + self.sugggestion = f"**{suggested_plugin}** {suggested_question}" elif suggested_question: self.sugggestion = suggested_question - # 获取插件返回数据 - plugin_tool_type = jcontent.get('type', '') - if plugin_tool_type == 'extract': - data = jcontent.get('data', '') + + def __process_plugin_data(self, jcontent: dict) -> None: + """处理插件返回数据""" + plugin_tool_type = jcontent.get("type", "") + if plugin_tool_type == "extract": + data = jcontent.get("data", "") if data: if isinstance(data, str): try: @@ -306,18 +351,20 @@ class Framework(LLMService): except json.JSONDecodeError: return # 返回 Markdown 报告 - output = data.get('output', '') + output = data.get("output", "") if output: self.content = output # 返回单行 Shell 命令 - cmd = data.get('shell', '') + cmd = data.get("shell", "") if cmd: self.commands.append(cmd) # 返回 Shell 脚本 - script = data.get('script', '') + script = data.get("script", "") if script: self.commands.append(write_shell_script(script)) - # 刷新终端 + + def __update_render(self, live: Live) -> None: + """更新终端显示""" if not self.sugggestion: MarkdownRenderer.update(live, self.content) else: @@ -327,63 +374,64 @@ class Framework(LLMService): sugggestion=backend_framework_sugggestion.format(sugggestion=self.sugggestion), ) - def _get_headers(self) -> dict: - host = self.endpoint.strip('http://').strip('https://').strip('/') + def __get_headers(self) -> dict: + """生成请求头""" + host = self.endpoint.removeprefix("http://").removeprefix("https://").strip("/") headers = { - 'Host': host, - 'Accept': '*/*', - 'Content-Type': 'application/json; charset=UTF-8', - 'Connection': 'keep-alive', - 'Authorization': f'Bearer {self.api_key}' + "Host": host, + "Accept": "*/*", + "Content-Type": "application/json; charset=UTF-8", + "Connection": "keep-alive", + "Authorization": f"Bearer {self.api_key}", } if self.session_id: - headers['Cookie'] = f'ECSESSION={self.session_id};' + headers["Cookie"] = f"ECSESSION={self.session_id};" return headers - def _reset_session_from_cookie(self, cookie: str) -> str: + def __reset_session_from_cookie(self, cookie: str) -> str: if not cookie: - return '' - for item in cookie.split(';'): - item = item.strip() - if item.startswith('ECSESSION'): - return item.split('=')[1] - return '' - - def _contains_valid_ip(self, text: str) -> bool: + return "" + for cookie_item in cookie.split(";"): + item = cookie_item.strip() + if item.startswith("ECSESSION"): + return item.split("=")[1] + return "" + + def __contains_valid_ip(self, text: str) -> bool: ip_pattern = re.compile( - r'(? str: + def __get_local_ip(self) -> str: try: - process = subprocess.run( - ['hostname', '-I'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True) + process = subprocess.run( # noqa: S603 + ["hostname", "-I"], # noqa: S607 + capture_output=True, + check=True, + ) except (FileNotFoundError, subprocess.CalledProcessError): try: ip_list = socket.gethostbyname_ex(socket.gethostname())[2] except socket.gaierror: - return '' + return "" return ip_list[-1] if process.stdout: - ip_address = process.stdout.decode('utf-8').strip().split(' ', maxsplit=1)[0] - return ip_address - return '' + return process.stdout.decode("utf-8").strip().split(" ", maxsplit=1)[0] + return "" - def _add_framework_extra_prompt(self, query: str) -> str: - return query + '\n\n' + prompt_framework_markdown_format + def __add_extra_prompt(self, query: str) -> str: + return query + "\n\n" + prompt_framework_markdown_format - def _add_framework_software_install_prompt(self, query: str) -> str: - return query + '\n\n' + prompt_framework_extra_install.format( - prompt_general_root=self._gen_sudo_prompt()) + def __add_software_install_prompt(self, query: str) -> str: + return query + "\n\n" + prompt_framework_extra_install.format(prompt_general_root=self._gen_sudo_prompt()) @dataclass class PluginData: + """插件数据结构""" + id: str plugin_name: str plugin_description: str diff --git a/src/copilot/backends/llm_service.py b/src/copilot/backends/llm_service.py index 275de8e..cc78452 100644 --- a/src/copilot/backends/llm_service.py +++ b/src/copilot/backends/llm_service.py @@ -1,7 +1,12 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +"""Module for LLM service abstract base class and related utilities. + +Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. +""" import re from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional from copilot.utilities.env_info import get_os_info, is_root from copilot.utilities.i18n import ( @@ -15,28 +20,45 @@ from copilot.utilities.i18n import ( class LLMService(ABC): + """LLM Service Abstract Base Class""" + + @dataclass + class LLMResult: + """LLM Result""" + + cmds: Optional[list] + code: Optional[str] + @abstractmethod - def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: - pass + def get_llm_result(self, question: str, *, single_line_cmd: bool = False) -> LLMResult: + """ "Get shell commands""" - def explain_shell_command(self, cmd: str): + def explain_shell_command(self, cmd: str) -> None: + """Explain shell command""" query = self._gen_explain_cmd_prompt(cmd) self._query_llm_service(query) @abstractmethod - def _query_llm_service(self, question: str, *args, **kwargs): - pass + def _query_llm_service(self, question: str, *args, **kwargs) -> None: # noqa: ANN002, ANN003 + """Query LLM service""" - def _extract_shell_code_blocks(self, markdown_text) -> list: - pattern = r'```(bash|sh|shell)\n(.*?)(?=\n\s*```)' + def _extract_shell_code_blocks(self, markdown_text: str) -> list: + """Extract shell code blocks from markdown text""" + pattern = r"```(bash|sh|shell)\n(.*?)(?=\n\s*```)" bash_blocks = re.findall(pattern, markdown_text, re.DOTALL | re.MULTILINE) - cmds = list(dict.fromkeys('\n'.join([block[1].strip() for block in bash_blocks]).splitlines())) - return [cmd for cmd in cmds if cmd and not cmd.startswith('#')] # remove comments and empty lines + cmds = list(dict.fromkeys("\n".join([block[1].strip() for block in bash_blocks]).splitlines())) + return [cmd for cmd in cmds if cmd and not cmd.startswith("#")] # remove comments and empty lines + + def _extract_python_code_blocks(self, markdown_text: str) -> str: + """Extract Python code blocks from markdown text""" + pattern = r"```python\n(.*?)(?=\n\s*```)" + python_blocks = re.findall(pattern, markdown_text, re.DOTALL | re.MULTILINE) + return "\n\n".join(python_blocks) def _get_context_length(self, context: list) -> int: length = 0 for content in context: - temp = content['content'] + temp = content["content"] leng = len(temp) length += leng return length @@ -47,11 +69,10 @@ class LLMService(ABC): return prompt_general_root_false def _gen_system_prompt(self) -> str: - return prompt_general_system.format( - os=get_os_info(), prompt_general_root=self._gen_sudo_prompt()) + return prompt_general_system.format(os=get_os_info(), prompt_general_root=self._gen_sudo_prompt()) def _gen_shell_prompt(self, question: str) -> str: - return f'{question}\n\n{prompt_single_line_cmd}' + return f"{question}\n\n{prompt_single_line_cmd}" def _gen_chat_prompt(self, question: str) -> str: return prompt_general_chat.format(question=question, os=get_os_info()) diff --git a/src/copilot/backends/openai_api.py b/src/copilot/backends/openai_api.py index ff5417f..774dce2 100644 --- a/src/copilot/backends/openai_api.py +++ b/src/copilot/backends/openai_api.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import json from typing import Optional @@ -17,97 +17,114 @@ from copilot.utilities.i18n import ( ) from copilot.utilities.markdown_renderer import MarkdownRenderer +MAX_HISTORY_LENGTH = 5 +HTTP_STATUS_OK = 200 + class ChatOpenAI(LLMService): - def __init__(self, url: str, api_key: Optional[str], model: Optional[str], max_tokens = 2048): + """OpenAI Chat Service""" + + def __init__(self, url: str, api_key: Optional[str], model: Optional[str], max_tokens: int = 2048) -> None: + """Initialize OpenAI Chat Service""" self.url: str = url self.api_key: Optional[str] = api_key self.model: Optional[str] = model self.max_tokens: int = max_tokens - self.answer: str = '' + self.answer: str = "" self.history: list = [] # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: - if single_line_cmd: - query = self._gen_shell_prompt(question) - else: - query = self._gen_chat_prompt(question) + def get_llm_result(self, question: str, *, single_line_cmd: bool = False) -> LLMService.LLMResult: + """Get shell commands""" + query = self._gen_shell_prompt(question) if single_line_cmd else self._gen_chat_prompt(question) self._query_llm_service(query) - return self._extract_shell_code_blocks(self.answer) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.answer), code=self._extract_python_code_blocks(self.answer) + ) - # pylint: disable=W0221 - def _query_llm_service(self, question: str): - self._stream_response(question) + def _query_llm_service(self, question: str) -> None: + """Query LLM Service""" + self.__stream_response(question) - def _check_len(self, context: list) -> list: + def __check_len(self, context: list) -> list: while self._get_context_length(context) > self.max_tokens / 2: del context[0] return context - def _gen_params(self, query: str, stream: bool = True): - self.history.append({'content': query, 'role': 'user'}) - history = self._check_len( - self.history if len(self.history) < 5 else self.history[-5:] + def __gen_params(self, query: str, *, stream: bool = True) -> dict: + """Generate parameters""" + self.history.append({"content": query, "role": "user"}) + history = self.__check_len( + self.history if len(self.history) < MAX_HISTORY_LENGTH else self.history[-MAX_HISTORY_LENGTH:], ) - history.insert(0, {'content': self._gen_system_prompt(), 'role': 'system'}) + history.insert(0, {"content": self._gen_system_prompt(), "role": "system"}) return { - 'messages': history, - 'model': self.model, - 'stream': stream, - 'max_tokens': self.max_tokens, - 'temperature': 0.1, - 'top_p': 0.95 + "messages": history, + "model": self.model, + "stream": stream, + "max_tokens": self.max_tokens, + "temperature": 0.1, + "top_p": 0.95, } - def _gen_headers(self): + def __gen_headers(self) -> dict: + """Generate headers""" return { - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {self.api_key}' + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}", } - def _stream_response(self, query: str): - spinner = Spinner('material') - self.answer = '' + def __stream_response(self, query: str) -> None: + spinner = Spinner("material") + self.answer = "" with Live(console=self.console) as live: live.update(spinner, refresh=True) - try: - response = requests.post( - self.url, - headers=self._gen_headers(), - data=json.dumps(self._gen_params(query)), - stream=True, - timeout=60 - ) - except requests.exceptions.ConnectionError: - live.update(backend_openai_request_connection_error, refresh=True) - return - except requests.exceptions.Timeout: - live.update(backend_openai_request_timeout, refresh=True) + response = self.__make_request(query, live) + if not response: return - except requests.exceptions.RequestException: - live.update(backend_openai_request_exceptions, refresh=True) - return - if response.status_code != 200: - live.update(backend_general_request_failed.format(code=response.status_code), refresh=True) - return - for line in response.iter_lines(): - if line is None: - continue - content = line.decode('utf-8').strip('data: ') - try: - jcontent = json.loads(content) - except json.JSONDecodeError: - continue - else: - choices = jcontent.get('choices', []) - if choices: - delta = choices[0].get('delta', {}) - chunk = delta.get('content', '') - finish_reason = choices[0].get('finish_reason') - self.answer += chunk - MarkdownRenderer.update(live, self.answer) - if finish_reason == 'stop': - self.history.append({'content': self.answer, 'role': 'assistant'}) - break + self.__process_response(response, live) + + def __make_request(self, query: str, live: Live) -> Optional[requests.Response]: + try: + response = requests.post( + self.url, + headers=self.__gen_headers(), + data=json.dumps(self.__gen_params(query)), + stream=True, + timeout=60, + ) + except requests.exceptions.ConnectionError: + live.update(backend_openai_request_connection_error, refresh=True) + return None + except requests.exceptions.Timeout: + live.update(backend_openai_request_timeout, refresh=True) + return None + except requests.exceptions.RequestException: + live.update(backend_openai_request_exceptions, refresh=True) + return None + if response.status_code != HTTP_STATUS_OK: + live.update(backend_general_request_failed.format(code=response.status_code), refresh=True) + return None + return response + + def __process_response(self, response: requests.Response, live: Live) -> None: + for line in response.iter_lines(): + if line is None: + continue + content = line.decode("utf-8").removeprefix("data: ") + try: + jcontent = json.loads(content) + except json.JSONDecodeError: + continue + else: + choices = jcontent.get("choices", []) + if choices: + delta = choices[0].get("delta", {}) + chunk = delta.get("content", "") + finish_reason = choices[0].get("finish_reason") + self.answer += chunk + MarkdownRenderer.update(live, self.answer) + if finish_reason == "stop": + self.history.append({"content": self.answer, "role": "assistant"}) + break diff --git a/src/copilot/backends/spark_api.py b/src/copilot/backends/spark_api.py index 312e882..3f058a8 100644 --- a/src/copilot/backends/spark_api.py +++ b/src/copilot/backends/spark_api.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import asyncio import base64 @@ -28,10 +28,16 @@ from copilot.utilities.i18n import ( from copilot.utilities.markdown_renderer import MarkdownRenderer -# pylint: disable=R0902 class Spark(LLMService): - # pylint: disable=R0913 - def __init__(self, app_id, api_key, api_secret, spark_url, domain, max_tokens=4096): + def __init__( # noqa: PLR0913 + self, + app_id: str, + api_key: str, + api_secret: str, + spark_url: str, + domain: str, + max_tokens: int = 4096, + ) -> None: self.app_id: str = app_id self.api_key: str = api_key self.api_secret: str = api_secret @@ -40,29 +46,28 @@ class Spark(LLMService): self.path = urlparse(spark_url).path self.domain: str = domain self.max_tokens: int = max_tokens - self.answer: str = '' + self.answer: str = "" self.history: list = [] # 富文本显示 self.console = Console() - def get_shell_commands(self, question: str, single_line_cmd: bool = False) -> list: - if single_line_cmd: - query = self._gen_shell_prompt(question) - else: - query = self._gen_chat_prompt(question) + def get_llm_result(self, question: str, *, single_line_cmd: bool = False) -> LLMService.LLMResult: + query = self._gen_shell_prompt(question) if single_line_cmd else self._gen_chat_prompt(question) self._query_llm_service(query) - return self._extract_shell_code_blocks(self.answer) + return LLMService.LLMResult( + cmds=self._extract_shell_code_blocks(self.answer), code=self._extract_python_code_blocks(self.answer) + ) # pylint: disable=W0221 def _query_llm_service(self, question: str): asyncio.get_event_loop().run_until_complete( - self._query_spark_ai(question) + self._query_spark_ai(question), ) async def _query_spark_ai(self, query: str): url = self._create_url() - self.answer = '' - spinner = Spinner('material') + self.answer = "" + spinner = Spinner("material") with Live(console=self.console) as live: live.update(spinner, refresh=True) try: @@ -74,65 +79,69 @@ class Spark(LLMService): try: message = await websocket.recv() data = json.loads(message) - code = data['header']['code'] + code = data["header"]["code"] if code != 0: - message = data['header']['message'] - live.update(backend_spark_stream_error.format( - code=code, - message=message - ), refresh=True) + message = data["header"]["message"] + live.update( + backend_spark_stream_error.format( + code=code, + message=message, + ), + refresh=True, + ) await websocket.close() else: - choices = data['payload']['choices'] - status = choices['status'] - content = choices['text'][0]['content'] + choices = data["payload"]["choices"] + status = choices["status"] + content = choices["text"][0]["content"] self.answer += content MarkdownRenderer.update(live, self.answer) if status == 2: - self.history.append({'role': 'assistant', 'content': self.answer}) + self.history.append({"role": "assistant", "content": self.answer}) break except websockets.exceptions.ConnectionClosed: break - except websockets.exceptions.InvalidStatusCode: + except websockets.exceptions.InvalidStatus: live.update( - Text.from_ansi(f'\033[1;31m{backend_spark_websockets_exceptions_msg_title}\033[0m\n\n')\ - .append(backend_spark_websockets_exceptions_msg_a)\ - .append(backend_spark_websockets_exceptions_msg_b)\ - .append(backend_spark_websockets_exceptions_msg_c.format(spark_url=self.spark_url)), - refresh=True + Text.from_ansi(f"\033[1;31m{backend_spark_websockets_exceptions_msg_title}\033[0m\n\n") + .append(backend_spark_websockets_exceptions_msg_a) + .append(backend_spark_websockets_exceptions_msg_b) + .append(backend_spark_websockets_exceptions_msg_c.format(spark_url=self.spark_url)), + refresh=True, ) - except Exception: # pylint: disable=W0718 + except Exception: live.update(backend_spark_network_error) - def _create_url(self): + def _create_url(self) -> str: now = datetime.now() # 生成RFC1123格式的时间戳 date = format_date_time(mktime(now.timetuple())) - signature_origin = f'host: {self.host}\ndate: {date}\nGET {self.path} HTTP/1.1' + signature_origin = f"host: {self.host}\ndate: {date}\nGET {self.path} HTTP/1.1" # 进行hmac-sha256进行加密 - signature_sha = hmac.new(self.api_secret.encode('utf-8'), - signature_origin.encode('utf-8'), - digestmod=hashlib.sha256).digest() + signature_sha = hmac.new( + self.api_secret.encode("utf-8"), signature_origin.encode("utf-8"), digestmod=hashlib.sha256 + ).digest() - signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8') + signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding="utf-8") - authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", ' + \ - f'headers="host date request-line", signature="{signature_sha_base64}"' + authorization_origin = ( + f'api_key="{self.api_key}", algorithm="hmac-sha256", ' + + f'headers="host date request-line", signature="{signature_sha_base64}"' + ) - authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') + authorization = base64.b64encode(authorization_origin.encode("utf-8")).decode(encoding="utf-8") # 将请求的鉴权参数组合为字典 v = { - 'authorization': authorization, - 'date': date, - 'host': self.host + "authorization": authorization, + "date": date, + "host": self.host, } # 拼接鉴权参数,生成url - url = self.spark_url + '?' + urlencode(v) + return self.spark_url + "?" + urlencode(v) # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 - return url def _check_len(self, context: list) -> list: while self._get_context_length(context) > self.max_tokens / 2: @@ -140,32 +149,29 @@ class Spark(LLMService): return context def _gen_params(self, query: str): - ''' - 通过appid和用户的提问来生成请参数 - ''' - self.history.append({'role': 'user', 'content': query}) + """通过appid和用户的提问来生成请参数""" + self.history.append({"role": "user", "content": query}) history = self._check_len( - self.history if len(self.history) < 5 else self.history[-5:] + self.history if len(self.history) < 5 else self.history[-5:], ) - if self.domain == 'generalv3.5': - history.insert(0, {'role': 'system', 'content': self._gen_system_prompt()}) - data = { - 'header': { - 'app_id': self.app_id, - 'uid': '1234', + if self.domain == "generalv3.5": + history.insert(0, {"role": "system", "content": self._gen_system_prompt()}) + return { + "header": { + "app_id": self.app_id, + "uid": "1234", + }, + "parameter": { + "chat": { + "domain": self.domain, + "temperature": 0.5, + "max_tokens": self.max_tokens, + "auditing": "default", + }, }, - 'parameter': { - 'chat': { - 'domain': self.domain, - 'temperature': 0.5, - 'max_tokens': self.max_tokens, - 'auditing': 'default', - } + "payload": { + "message": { + "text": history, + }, }, - 'payload': { - 'message': { - 'text': history - } - } } - return data diff --git a/src/copilot/utilities/__init__.py b/src/copilot/utilities/__init__.py new file mode 100644 index 0000000..4f5adbb --- /dev/null +++ b/src/copilot/utilities/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. diff --git a/src/copilot/utilities/config_manager.py b/src/copilot/utilities/config_manager.py index d6acaeb..876c744 100644 --- a/src/copilot/utilities/config_manager.py +++ b/src/copilot/utilities/config_manager.py @@ -1,119 +1,132 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import json -import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any from copilot.utilities import i18n, interact -CONFIG_DIR = os.path.join(os.path.expanduser('~'), '.config/eulercopilot') -CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.json') - CONFIG_ENTRY_NAME = { - 'backend': i18n.settings_config_entry_backend, - 'query_mode': i18n.settings_config_entry_query_mode, - 'advanced_mode': i18n.settings_config_entry_advanced_mode, - 'debug_mode': i18n.settings_config_entry_debug_mode, - 'spark_app_id': i18n.settings_config_entry_spark_app_id, - 'spark_api_key': i18n.settings_config_entry_spark_api_key, - 'spark_api_secret': i18n.settings_config_entry_spark_api_secret, - 'spark_url': i18n.settings_config_entry_spark_url, - 'spark_domain': i18n.settings_config_entry_spark_domain, - 'framework_url': i18n.settings_config_entry_framework_url.format(brand_name=i18n.BRAND_NAME), - 'framework_api_key': i18n.settings_config_entry_framework_api_key.format(brand_name=i18n.BRAND_NAME), - 'model_url': i18n.settings_config_entry_model_url, - 'model_api_key': i18n.settings_config_entry_model_api_key, - 'model_name': i18n.settings_config_entry_model_name + "backend": i18n.config_entry_backend, + "query_mode": i18n.config_entry_query_mode, + "advanced_mode": i18n.config_entry_advanced_mode, + "debug_mode": i18n.config_entry_debug_mode, + "spark_app_id": i18n.config_entry_spark_app_id, + "spark_api_key": i18n.config_entry_spark_api_key, + "spark_api_secret": i18n.config_entry_spark_api_secret, + "spark_url": i18n.config_entry_spark_url, + "spark_domain": i18n.config_entry_spark_domain, + "framework_url": i18n.config_entry_framework_url.format(brand_name=i18n.BRAND_NAME), + "framework_api_key": i18n.config_entry_framework_api_key.format(brand_name=i18n.BRAND_NAME), + "model_url": i18n.config_entry_model_url, + "model_api_key": i18n.config_entry_model_api_key, + "model_name": i18n.config_entry_model_name, } BACKEND_NAME = { - 'framework': i18n.interact_backend_framework.format(brand_name=i18n.BRAND_NAME), - 'spark': i18n.interact_backend_spark, - 'openai': i18n.interact_backend_openai + "framework": i18n.interact_backend_framework.format(brand_name=i18n.BRAND_NAME), + "spark": i18n.interact_backend_spark, + "openai": i18n.interact_backend_openai, } QUERY_MODE_NAME = { - 'chat': i18n.query_mode_chat, - 'shell': i18n.query_mode_shell, - 'flow': i18n.query_mode_flow, - 'diagnose': i18n.query_mode_diagnose, - 'tuning': i18n.query_mode_tuning, -} - -DEFAULT_CONFIG = { - 'backend': 'framework', - 'query_mode': 'chat', - 'advanced_mode': False, - 'debug_mode': False, - 'spark_app_id': '', - 'spark_api_key': '', - 'spark_api_secret': '', - 'spark_url': 'wss://spark-api.xf-yun.com/v3.5/chat', - 'spark_domain': 'generalv3.5', - 'framework_url': 'https://eulercopilot.gitee.com', - 'framework_api_key': '', - 'model_url': '', - 'model_api_key': '', - 'model_name': '' + "chat": i18n.query_mode_chat, + "shell": i18n.query_mode_shell, + "plugin": i18n.query_mode_plugin, + "diagnose": i18n.query_mode_diagnose, + "tuning": i18n.query_mode_tuning, } -def load_config() -> dict: - try: - with open(CONFIG_PATH, 'r', encoding='utf-8') as file: - config = json.load(file) - except FileNotFoundError: - init_config() - config = load_config() - return config - - -def write_config(config: dict): - with open(CONFIG_PATH, 'w', encoding='utf-8') as json_file: - json.dump(config, json_file, indent=4) - json_file.write('\n') # 追加一行空行 - - -def init_config(): - if not os.path.exists(CONFIG_DIR): - os.makedirs(CONFIG_DIR) - write_config(DEFAULT_CONFIG) - - -def update_config(key: str, value): - if key not in DEFAULT_CONFIG: - return - config = load_config() - config.update({key: value}) - write_config(config) - - -def select_query_mode(mode: int): - modes = list(QUERY_MODE_NAME.keys()) - if mode < len(modes): - update_config('query_mode', modes[mode]) - - -def select_backend(): - backend = interact.select_backend() - if backend in ['framework', 'spark', 'openai']: - update_config('backend', backend) - - -def config_to_markdown() -> str: - config = load_config() - config_table = '\n'.join([ - f'| {CONFIG_ENTRY_NAME.get(key)} | {__get_config_item_display_name(key, value)} |' - for key, value in config.items() - ]) - return f'# {i18n.settings_markdown_title}\n\ -| {i18n.settings_markdown_header_key} \ -| {i18n.settings_markdown_header_value} |\n\ -| ----------- | ----------- |\n{config_table}' - - -def __get_config_item_display_name(key, value): - if key == 'backend': - return BACKEND_NAME.get(value, value) - if key == 'query_mode': - return QUERY_MODE_NAME.get(value, value) - return value +@dataclass +class ConfigModel: + """配置模型""" + + backend: str = field(default="framework") + query_mode: str = field(default="chat") + advanced_mode: bool = field(default=True) + debug_mode: bool = field(default=False) + spark_app_id: str = field(default="") + spark_api_key: str = field(default="") + spark_api_secret: str = field(default="") + spark_url: str = field(default="wss://spark-api.xf-yun.com/v3.5/chat") + spark_domain: str = field(default="generalv3.5") + framework_url: str = field(default="https://www.eulercopilot.com (CHANGE_ME!!!)") + framework_api_key: str = field(default="") + model_url: str = field(default="") + model_api_key: str = field(default="") + model_name: str = field(default="") + + def to_dict(self) -> dict: + """将ConfigModel对象转换为字典""" + return self.__dict__ + + @staticmethod + def from_dict(data: dict) -> "ConfigModel": + """从字典创建ConfigModel对象""" + return ConfigModel(**data) + + @staticmethod + def metadata_dict() -> dict: + """获取配置项的 field 元数据""" + return {field.name: field.metadata for field in ConfigModel.__dataclass_fields__.values()} + + +class Config: + """配置管理器""" + + config_dir = Path.home() / ".config" / "eulercopilot" + config_path = config_dir / "config.json" + + data: ConfigModel + + def __init__(self) -> None: + """初始化配置""" + try: + with self.config_path.open(encoding="utf-8") as file: + config_data = json.load(file) + self.data = ConfigModel.from_dict(config_data) + except FileNotFoundError: + self.data = ConfigModel() + if not self.config_dir.exists(): + self.config_dir.mkdir(parents=True) + self.__save() + + def update(self, key: str, value: Any) -> None: + """更新配置""" + setattr(self.data, key, value) + self.__save() + + def select_query_mode(self, mode: int) -> None: + """选择问答模式""" + modes = list(QUERY_MODE_NAME.keys()) + if mode < len(modes): + self.update("query_mode", modes[mode]) + + def select_backend(self) -> None: + """选择后端""" + backend = interact.select_backend() + if backend in ["framework", "spark", "openai"]: + self.update("backend", backend) + + def to_markdown(self) -> str: + """以 Markdown 格式输出当前配置""" + config_items = self.data.to_dict() + config_table = "\n".join( + f"| {CONFIG_ENTRY_NAME.get(key)} | {self.__get_config_item_display_name(key, value)} |" + for key, value in config_items.items() + ) + return f"# {i18n.config_md_title}\n\n| {i18n.config_md_header_key} | {i18n.config_md_header_value} |\n| ---- | ---- |\n{config_table}" + + def __get_config_item_display_name(self, key: str, value: str) -> str: + if key == "backend": + return BACKEND_NAME.get(value, value) + if key == "query_mode": + return QUERY_MODE_NAME.get(value, value) + return value + + def __save(self) -> None: + """写入配置""" + with self.config_path.open("w", encoding="utf-8") as json_file: + json.dump(self.data.to_dict(), json_file, indent=4) diff --git a/src/copilot/utilities/env_info.py b/src/copilot/utilities/env_info.py index c86a025..72fb2e6 100644 --- a/src/copilot/utilities/env_info.py +++ b/src/copilot/utilities/env_info.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import os import platform @@ -10,13 +10,7 @@ from typing import Optional def _exec_shell_cmd(cmd: list) -> Optional[subprocess.CompletedProcess]: try: - process = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - check=True - ) + process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) except subprocess.CalledProcessError as e: sys.stderr.write(e.stderr) return None @@ -32,7 +26,7 @@ def _porc_linux_info(shell_result: Optional[subprocess.CompletedProcess]): match = re.search(pattern, shell_result.stdout) if match: return match.group(1) # 返回括号内匹配的内容,即PRETTY_NAME的值 - return 'Unknown Linux distribution' + return "Unknown Linux distribution" def _porc_macos_info(shell_result: Optional[subprocess.CompletedProcess]): @@ -41,21 +35,21 @@ def _porc_macos_info(shell_result: Optional[subprocess.CompletedProcess]): if shell_result.returncode == 0: lines = shell_result.stdout.splitlines() for line in lines: - key, value = line.split(':\t\t', maxsplit=1) + key, value = line.split(":\t\t", maxsplit=1) macos_info[key.strip()] = value.strip() - product_name = macos_info.get('ProductName') - product_version = macos_info.get('ProductVersion') + product_name = macos_info.get("ProductName") + product_version = macos_info.get("ProductVersion") if product_name is not None and product_version is not None: - return f'{product_name} {product_version}' - return 'Unknown macOS version' + return f"{product_name} {product_version}" + return "Unknown macOS version" def get_os_info() -> str: system = platform.system() - if system == 'Linux': - return _porc_linux_info(_exec_shell_cmd(['cat', '/etc/os-release'])) - elif system == 'Darwin': - return _porc_macos_info(_exec_shell_cmd(['sw_vers'])) + if system == "Linux": + return _porc_linux_info(_exec_shell_cmd(["cat", "/etc/os-release"])) + elif system == "Darwin": + return _porc_macos_info(_exec_shell_cmd(["sw_vers"])) else: return system diff --git a/src/copilot/utilities/i18n.py b/src/copilot/utilities/i18n.py index 3caed8b..a4f0bea 100644 --- a/src/copilot/utilities/i18n.py +++ b/src/copilot/utilities/i18n.py @@ -1,119 +1,128 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. from gettext import gettext as _ -BRAND_NAME = 'openEuler Copilot System' -DOCS_URL = _('https://gitee.com/openeuler/euler-copilot-framework/blob/master/docs/user-guide/README.md') +BRAND_NAME = "EulerCopilot" +DOCS_URL = _("https://gitee.com/openeuler/euler-copilot-framework/blob/master/docs/user-guide/README.md") main_exit_prompt = _('输入 "exit" 或按下 Ctrl+C 结束对话') -main_service_is_none = _('未正确配置 LLM 后端,请检查配置文件') -main_service_framework_plugin_is_none = _('获取插件失败或插件列表为空\n请联系管理员检查后端配置') +main_service_is_none = _("未正确配置 LLM 后端,请检查配置文件") +main_service_framework_plugin_is_none = _("获取插件失败或插件列表为空\n请联系管理员检查后端配置") main_exec_builtin_cmd = _('不支持执行 Shell 内置命令 "{cmd_prefix}",请复制后手动执行') -main_exec_value_error = _('执行命令时出错:{error}') -main_exec_not_found_error = _('命令不存在:{error}') +main_exec_value_error = _("执行命令时出错:{error}") +main_exec_not_found_error = _("命令不存在:{error}") main_exec_cmd_failed_with_exit_code = _('命令 "{cmd}" 执行中止,退出码:{exit_code}') -main_content_panel_alpha_warning = _('当前为内测版本,请仔细甄别 AI 回答的内容') - -cli_help_prompt_intro = _('输入问题后,按下 Ctrl+O 提问 (字母 O)') -cli_help_prompt_question = _('通过自然语言提问') -cli_help_prompt_switch_mode = _('切换到{mode}模式') -cli_help_prompt_init_settings = _('初始化 copilot 设置') -cli_help_prompt_edit_settings = _('编辑 copilot 设置') -cli_help_prompt_select_backend = _('选择大语言模型后端') -cli_help_panel_switch_mode = _('选择问答模式') -cli_help_panel_advanced_options = _('高级选项') -cli_notif_select_one_mode = _('当前版本只能选择一种问答模式') -cli_notif_compatibility = _('当前大模型后端不支持{mode}功能\n推荐使用 {brand_name} 智能体框架') +main_content_panel_alpha_warning = _("当前为内测版本,请仔细甄别 AI 回答的内容") + +cli_help_prompt_intro = _("输入问题后,按下 Ctrl+O 提问 (字母 O)") +cli_help_prompt_question = _("通过自然语言提问") +cli_help_prompt_switch_mode = _("切换到{mode}模式") +cli_help_prompt_init_settings = _("初始化 copilot 设置") +cli_help_prompt_edit_settings = _("编辑 copilot 设置") +cli_help_prompt_select_backend = _("选择大语言模型后端") +cli_help_panel_switch_mode = _("选择问答模式") +cli_help_panel_advanced_options = _("高级选项") +cli_notif_select_one_mode = _("当前版本只能选择一种问答模式") +cli_notif_compatibility = _("当前大模型后端不支持{mode}功能\n推荐使用 {brand_name} 智能体框架") cli_notif_no_config = _('请先初始化 copilot 设置\n请使用 "copilot --init" 命令初始化') -interact_action_explain = _('解释命令') -interact_action_edit = _('编辑命令') -interact_action_execute = _('执行命令') -interact_action_explain_selected = _('解释指定命令') -interact_action_edit_selected = _('编辑指定命令') -interact_action_execute_selected = _('执行指定命令') -interact_action_execute_all = _('执行所有命令') -interact_backend_framework = _('{brand_name} 智能体') -interact_backend_spark = _('讯飞星火大模型') -interact_backend_openai = _('OpenAI 兼容模式') -interact_cancel = _('取消') - -interact_question_yes_or_no = _('是否{question_body}:') -interact_question_input_text = _('请输入{question_body}:') -interact_question_select_action = _('选择要执行的操作:') -interact_question_select_cmd = _('选择命令:') -interact_question_select_settings_entry = _('选择设置项:') -interact_question_select_backend = _('请选择大模型后端:') -interact_question_select_query_mode = _('请选择问答模式:') -interact_question_select_plugin = _('请选择插件:') -interact_select_plugins_valiidate = _('请选择至少一个插件') - -backend_general_request_failed = _('请求失败: {code}') +interact_action_explain = _("解释命令") +interact_action_edit = _("编辑命令") +interact_action_execute = _("执行命令") +interact_action_explain_selected = _("解释指定命令") +interact_action_edit_selected = _("编辑指定命令") +interact_action_execute_selected = _("执行指定命令") +interact_action_execute_all = _("执行所有命令") +interact_backend_framework = _("{brand_name} 智能体") +interact_backend_spark = _("讯飞星火大模型") +interact_backend_openai = _("OpenAI 兼容模式") +interact_cancel = _("取消") + +interact_question_yes_or_no = _("是否{question_body}:") +interact_question_input_text = _("请输入{question_body}:") +interact_question_save_python_code = _("是否保存 Python 代码?") +interact_question_input_file_name = _("请输入文件名:") +interact_question_select_action = _("选择要执行的操作:") +interact_question_select_cmd = _("选择命令:") +interact_question_select_settings_entry = _("选择设置项:") +interact_question_select_backend = _("请选择大模型后端:") +interact_question_select_query_mode = _("请选择问答模式:") +interact_question_select_plugin = _("请选择插件:") +interact_select_plugins_valiidate = _("请选择至少一个插件") + +backend_general_request_failed = _("请求失败: {code}") backend_check_config_msg = _('输入 "vi ~/.config/eulercopilot/config.json" 查看和编辑配置') -backend_framework_auth_invalid_api_key = _('{brand_name} 智能体 API 密钥无效,请检查配置文件\n\n'+ backend_check_config_msg + '\n') -backend_framework_request_connection_error = _('{brand_name} 智能体连接失败,请检查网络连接') -backend_framework_request_timeout = _('{brand_name} 智能体请求超时,请检查网络连接') -backend_framework_request_exceptions = _('{brand_name} 智能体请求异常,请检查网络连接\n\n' + backend_check_config_msg + '\n') -backend_framework_request_unauthorized = _('当前会话已过期,请退出后重试') -backend_framework_request_too_many_requests = _('请求过于频繁,请稍后再试') -backend_framework_response_ended_prematurely = _('响应异常中止,请检查网络连接') -backend_framework_stream_error = _('{brand_name} 智能体遇到错误,请联系管理员定位问题') -backend_framework_stream_unknown = _('{brand_name} 智能体返回了未知内容:\n```json\n{content}\n```') -backend_framework_stream_sensitive = _('检测到违规信息,请重新提问') -backend_framework_stream_stop = _('{brand_name} 智能体已停止生成内容') -backend_framework_sugggestion = _('**你可以继续问** {sugggestion}') -backend_spark_stream_error = _('请求错误: {code}\n{message}') -backend_spark_websockets_exceptions_msg_title = _('请求错误') -backend_spark_websockets_exceptions_msg_a = _('请检查 appid 和 api_key 是否正确,或检查网络连接是否正常。\n') -backend_spark_websockets_exceptions_msg_b = _(backend_check_config_msg + ';\n') -backend_spark_websockets_exceptions_msg_c = _('或尝试 ping {spark_url}') -backend_spark_network_error = _('访问大模型失败,请检查网络连接') -backend_openai_request_connection_error = _('连接大模型失败') -backend_openai_request_timeout = _('请求大模型超时') -backend_openai_request_exceptions = _('请求大模型异常') - -settings_markdown_title = _('当前配置') -settings_markdown_header_key = _('设置项') -settings_markdown_header_value = _('值') -settings_config_entry_backend = _('大模型后端') -settings_config_entry_query_mode = _('问答模式') -settings_config_entry_advanced_mode = _('启用高级模式') -settings_config_entry_debug_mode = _('启用调试模式') -settings_config_entry_spark_app_id = _('星火大模型 App ID') -settings_config_entry_spark_api_key = _('星火大模型 API Key') -settings_config_entry_spark_api_secret = _('星火大模型 API Secret') -settings_config_entry_spark_url = _('星火大模型 URL') -settings_config_entry_spark_domain = _('星火大模型领域') -settings_config_entry_framework_url = _('{brand_name} 智能体 URL') -settings_config_entry_framework_api_key = _('{brand_name} 智能体 API Key') -settings_config_entry_model_url = _('OpenAI 模型 URL') -settings_config_entry_model_api_key = _('OpenAI 模型 API Key') -settings_config_entry_model_name = _('OpenAI 模型名称') -settings_config_interact_query_mode_disabled_explain = _('当前后端无法使用{mode}模式') -settings_init_welcome_msg = _('欢迎使用 {brand_name} 智能体') -settings_init_welcome_usage_guide = _('使用方法:输入问题,按下 Ctrl+O (字母 O) 提问') -settings_init_welcome_help_hint = _('更多用法详见命令行帮助:"copilot --help"') -settings_init_welcome_docs_link = _('使用指南:{url}') -settings_init_welcome_alpha_warning = _('{brand_name}(内测版)旨在让内测用户提前体验 \ +backend_framework_auth_invalid_api_key = _( + "{brand_name} 智能体 API 密钥无效,请检查配置文件\n\n" + backend_check_config_msg + "\n" +) +backend_framework_request_connection_error = _("{brand_name} 智能体连接失败,请检查网络连接") +backend_framework_request_timeout = _("{brand_name} 智能体请求超时,请检查网络连接") +backend_framework_request_exceptions = _( + "{brand_name} 智能体请求异常,请检查网络连接\n\n" + backend_check_config_msg + "\n" +) +backend_framework_request_unauthorized = _("当前会话已过期,请退出后重试") +backend_framework_request_too_many_requests = _("请求过于频繁,请稍后再试") +backend_framework_response_ended_prematurely = _("响应异常中止,请检查网络连接") +backend_framework_stream_error = _("{brand_name} 智能体遇到错误,请联系管理员定位问题") +backend_framework_stream_unknown = _("{brand_name} 智能体返回了未知内容:\n```json\n{content}\n```") +backend_framework_stream_sensitive = _("检测到违规信息,请重新提问") +backend_framework_stream_stop = _("{brand_name} 智能体已停止生成内容") +backend_framework_sugggestion = _("**你可以继续问** {sugggestion}") +backend_spark_stream_error = _("请求错误: {code}\n{message}") +backend_spark_websockets_exceptions_msg_title = _("请求错误") +backend_spark_websockets_exceptions_msg_a = _("请检查 appid 和 api_key 是否正确,或检查网络连接是否正常。\n") +backend_spark_websockets_exceptions_msg_b = _(backend_check_config_msg + ";\n") +backend_spark_websockets_exceptions_msg_c = _("或尝试 ping {spark_url}") +backend_spark_network_error = _("访问大模型失败,请检查网络连接") +backend_openai_request_connection_error = _("连接大模型失败") +backend_openai_request_timeout = _("请求大模型超时") +backend_openai_request_exceptions = _("请求大模型异常") + +config_md_title = _("当前配置") +config_md_header_key = _("设置项") +config_md_header_value = _("值") +config_entry_backend = _("大模型后端") +config_entry_query_mode = _("问答模式") +config_entry_advanced_mode = _("启用高级模式") +config_entry_debug_mode = _("启用调试模式") +config_entry_spark_app_id = _("星火大模型 App ID") +config_entry_spark_api_key = _("星火大模型 API Key") +config_entry_spark_api_secret = _("星火大模型 API Secret") +config_entry_spark_url = _("星火大模型 URL") +config_entry_spark_domain = _("星火大模型领域") +config_entry_framework_url = _("{brand_name} 智能体 URL") +config_entry_framework_api_key = _("{brand_name} 智能体 API Key") +config_entry_model_url = _("OpenAI 模型 URL") +config_entry_model_api_key = _("OpenAI 模型 API Key") +config_entry_model_name = _("OpenAI 模型名称") +config_interact_query_mode_forbidden = _("当前后端无法使用{mode}模式") + +init_welcome_msg = _("欢迎使用 {brand_name} 智能体") +init_welcome_usage_guide = _("使用方法:输入问题,按下 Ctrl+O (字母 O) 提问") +init_welcome_help_hint = _('更多用法详见命令行帮助:"copilot --help"') +init_welcome_docs_link = _("使用指南:{url}") +init_welcome_alpha_warning = _( + "{brand_name}(内测版)旨在让内测用户提前体验 \ openEuler 的智能化能力,帮助发现和修复版本质量、可用性及易用性问题,共同将版本做得更加完善。\ 如果您发现任何问题(包括软件设计、软件功能、不合适的问答对等),欢迎您反馈您的宝贵意见!\n\n\ [bold]本服务仅限于内测用户学习研究、内部测试目的使用[/bold]。您不得将本服务用于生产环境或任何其他商业目的,\ 否则您自行承担由此造成的所有后果和责任。\n\n\ 内测期间,除正常反馈问题外,应遵守内测用户保密规则:禁止在任何地方传播包括但不限于系统界面、\ 功能点等参与内测得知的有关本服务的各种非公开信息。\n\n\ -[bold]以上规则需严格遵守,如有违反,我们有权撤销您的内测资格,情节严重造成恶劣影响或损失者,我们将保留追究其责任的权利。[/bold]') -settings_init_framework_api_key_notice_title = _('获取 {brand_name} 智能体 API Key') -settings_init_framework_api_key_notice_content = _('请前往 {url},点击右上角头像图标获取 API Key') +[bold]以上规则需严格遵守,如有违反,我们有权撤销您的内测资格,情节严重造成恶劣影响或损失者,我们将保留追究其责任的权利。[/bold]" +) +init_framework_api_key_notice_title = _("获取 {brand_name} 智能体 API Key") +init_framework_api_key_notice_content = _("请前往 {url},点击右上角头像图标获取 API Key") -query_mode_chat = _('智能问答') -query_mode_shell = _('智能 Shell') -query_mode_flow = _('智能插件') -query_mode_diagnose = _('智能诊断') -query_mode_tuning = _('智能调优') +query_mode_chat = _("智能问答") +query_mode_shell = _("智能 Shell") +query_mode_plugin = _("智能插件") +query_mode_diagnose = _("智能诊断") +query_mode_tuning = _("智能调优") prompt_general_root_true = _('当前用户为 root 用户,你生成的 shell 命令不能包含 "sudo"') prompt_general_root_false = _('当前用户为普通用户,若你生成的 shell 命令需要 root 权限,需要包含 "sudo"') -prompt_general_system = _('''你是操作系统 {os} 的运维助理,你精通当前操作系统的管理和运维,熟悉运维脚本的编写。 +prompt_general_system = _("""你是操作系统 {os} 的运维助理,你精通当前操作系统的管理和运维,熟悉运维脚本的编写。 你给出的答案必须符合当前操作系统要求,你不能使用当前操作系统没有的功能。 格式要求: @@ -143,8 +152,8 @@ Debian 和 Ubuntu 使用 apt 管理软件包,你也不能在回答中使用 dn 你不能输出类似于上述例子的命令! 由于用户使用命令行与你交互,你需要避免长篇大论,请使用简洁的语言,一般情况下你的回答不应超过1000字。 -''') -prompt_general_chat = _('''根据用户输入的问题,使用 Markdown 格式输出。 +""") +prompt_general_chat = _("""根据用户输入的问题,使用 Markdown 格式输出。 用户的问题: {question} @@ -154,26 +163,26 @@ prompt_general_chat = _('''根据用户输入的问题,使用 Markdown 格式 2. 如果涉及 shell 命令或代码,请用 Markdown 代码块输出,必须标明代码的语言 3. 如果用户要求你生成的命令涉及到数据输入,你需要正确处理数据输入的方式,包括用户交互 4. 当前操作系统是 {os},你的回答必须符合当前系统要求,不能使用当前系统没有的功能 -''') -prompt_general_explain_cmd = _('''```bash +""") +prompt_general_explain_cmd = _("""```bash {cmd} ``` 请解释上面的 Shell 命令 要求: 先在代码块中打印一次上述命令,再有条理地解释命令中的主要步骤 -''') -prompt_single_line_cmd = _('''要求: +""") +prompt_single_line_cmd = _("""要求: + 请用单行 Shell 命令回答; + 命令请放在代码块中,并标明代码的语言。 -''') -prompt_framework_markdown_format = _('''格式要求: +""") +prompt_framework_markdown_format = _("""格式要求: + 你的回答中的代码块和表格都必须用 Markdown 呈现; + 你需要用中文回答问题,除了代码,其他内容都要符合汉语的规范。 -''') -prompt_framework_extra_install = _('''其他要求: +""") +prompt_framework_extra_install = _("""其他要求: + openEuler 使用 dnf 管理软件包,你不能在回答中使用 apt 或其他软件包管理器 + {prompt_general_root} -''') -prompt_framework_keyword_install = _('安装') -prompt_framework_plugin_ip = _('当前机器的IP为') +""") +prompt_framework_keyword_install = _("安装") +prompt_framework_plugin_ip = _("当前机器的IP为") diff --git a/src/copilot/utilities/interact.py b/src/copilot/utilities/interact.py index 705af4e..cbb6564 100644 --- a/src/copilot/utilities/interact.py +++ b/src/copilot/utilities/interact.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. from typing import Optional @@ -10,168 +10,177 @@ from copilot.utilities import config_manager, i18n ACTIONS_SINGLE_CMD = [ questionary.Choice( i18n.interact_action_explain, - value='explain', - shortcut_key='a' + value="explain", + shortcut_key="a", ), questionary.Choice( i18n.interact_action_edit, - value='edit', - shortcut_key='z' + value="edit", + shortcut_key="z", ), questionary.Choice( i18n.interact_action_execute, - value='execute', - shortcut_key='x' + value="execute", + shortcut_key="x", ), questionary.Choice( i18n.interact_cancel, - value='cancel', - shortcut_key='c' - ) + value="cancel", + shortcut_key="c", + ), ] ACTIONS_MULTI_CMDS = [ questionary.Choice( i18n.interact_action_explain_selected, - value='explain', - shortcut_key='a' + value="explain", + shortcut_key="a", ), questionary.Choice( i18n.interact_action_edit_selected, - value='edit', - shortcut_key='z' + value="edit", + shortcut_key="z", ), questionary.Choice( i18n.interact_action_execute_all, - value='execute_all', - shortcut_key='x' + value="execute_all", + shortcut_key="x", ), questionary.Choice( i18n.interact_action_execute_selected, - value='execute_selected', - shortcut_key='s' + value="execute_selected", + shortcut_key="s", ), questionary.Choice( i18n.interact_cancel, - value='cancel', - shortcut_key='c' - ) + value="cancel", + shortcut_key="c", + ), ] BACKEND_CHOICES = [ questionary.Choice( i18n.interact_backend_framework.format(brand_name=i18n.BRAND_NAME), - value='framework', - shortcut_key='e' + value="framework", + shortcut_key="e", ), questionary.Choice( i18n.interact_backend_spark, - value='spark', - shortcut_key='s' + value="spark", + shortcut_key="s", ), questionary.Choice( i18n.interact_backend_openai, - value='openai', - shortcut_key='o' + value="openai", + shortcut_key="o", ), questionary.Choice( i18n.interact_cancel, - value='cancel', - shortcut_key='c' - ) + value="cancel", + shortcut_key="c", + ), ] CUSTOM_STYLE_FANCY = questionary.Style( [ - ('separator', 'fg:#00afff'), - ('qmark', 'fg:#005f87 bold'), - ('question', 'bold'), - ('selected', 'fg:#00afff bold'), - ('pointer', 'fg:#005f87 bold'), - ('highlighted', 'bold'), - ('answer', 'fg:#00afff bold'), - ('text', 'fg:#808080'), - ('disabled', 'fg:#808080 italic'), - ] + ("separator", "fg:#00afff"), + ("qmark", "fg:#005f87 bold"), + ("question", "bold"), + ("selected", "fg:#00afff bold"), + ("pointer", "fg:#005f87 bold"), + ("highlighted", "bold"), + ("answer", "fg:#00afff bold"), + ("text", "fg:#808080"), + ("disabled", "fg:#808080 italic"), + ], ) def select_backend() -> str: + """命令行交互:选择后端""" return questionary.select( i18n.interact_question_select_backend, choices=BACKEND_CHOICES, - qmark='❯', + qmark="❯", use_shortcuts=True, style=CUSTOM_STYLE_FANCY, ).ask() def select_action(has_multi_cmds: bool) -> str: + """命令行交互:选择操作""" return questionary.select( i18n.interact_question_select_action, choices=ACTIONS_MULTI_CMDS if has_multi_cmds else ACTIONS_SINGLE_CMD, - qmark='❯', + qmark="❯", use_shortcuts=True, - style=CUSTOM_STYLE_FANCY + style=CUSTOM_STYLE_FANCY, ).ask() def select_command(commands: list) -> str: + """命令行交互:选择命令""" return questionary.select( i18n.interact_question_select_cmd, choices=commands, - qmark='❯', - style=CUSTOM_STYLE_FANCY + qmark="❯", + style=CUSTOM_STYLE_FANCY, ).ask() def select_command_with_index(commands: list) -> int: + """命令行交互:选择命令并返回索引""" command = questionary.select( i18n.interact_question_select_cmd, choices=commands, - qmark='❯', - style=CUSTOM_STYLE_FANCY + qmark="❯", + style=CUSTOM_STYLE_FANCY, ).ask() return commands.index(command) def select_multiple_commands(commands: list) -> list: + """命令行交互:选择多个命令""" return questionary.checkbox( i18n.interact_question_select_cmd, choices=commands, - qmark='❯', - style=CUSTOM_STYLE_FANCY + qmark="❯", + style=CUSTOM_STYLE_FANCY, ).ask() def select_one_plugin(plugins: list[PluginData]) -> str: + """命令行交互:选择一个插件""" return questionary.select( i18n.interact_question_select_plugin, choices=__get_plugin_choices(plugins), - qmark='❯', - style=CUSTOM_STYLE_FANCY + qmark="❯", + style=CUSTOM_STYLE_FANCY, ).ask() def select_settings_entry() -> str: + """命令行交互:选择设置条目""" return questionary.select( i18n.interact_question_select_settings_entry, choices=__get_settings_entry_choices(), - qmark='❯', + qmark="❯", style=CUSTOM_STYLE_FANCY, ).ask() def select_query_mode(backend: str) -> str: + """命令行交互:选择查询模式""" return questionary.select( i18n.interact_question_select_query_mode, choices=__get_query_mode_choices(backend), - qmark='❯', + qmark="❯", style=CUSTOM_STYLE_FANCY, ).ask() def ask_boolean(question: str) -> bool: + """命令行交互:提问布尔型问题""" return questionary.confirm(question, default=False, style=CUSTOM_STYLE_FANCY).ask() @@ -179,22 +188,23 @@ def __get_plugin_choices(plugins: list[PluginData]) -> list: return [ questionary.Choice( plugin.plugin_name, - value=plugin.id - ) for plugin in plugins + value=plugin.id, + ) + for plugin in plugins ] def __get_settings_entry_choices() -> list: choices = [questionary.Choice(name, item) for item, name in config_manager.CONFIG_ENTRY_NAME.items()] - choices.append(questionary.Choice(i18n.interact_cancel, value='cancel')) + choices.append(questionary.Choice(i18n.interact_cancel, value="cancel")) return choices def __get_query_mode_choices(backend: str) -> list: def __disabled(name: str, item: str) -> Optional[str]: return ( - i18n.settings_config_interact_query_mode_disabled_explain.format(mode=name) - if backend != 'framework' and item != 'chat' + i18n.config_interact_query_mode_forbidden.format(mode=name) + if backend != "framework" and item != "chat" else None ) @@ -202,6 +212,7 @@ def __get_query_mode_choices(backend: str) -> list: questionary.Choice( name, item, - disabled=__disabled(name, item) - ) for item, name in config_manager.QUERY_MODE_NAME.items() + disabled=__disabled(name, item), + ) + for item, name in config_manager.QUERY_MODE_NAME.items() ] diff --git a/src/copilot/utilities/markdown_renderer.py b/src/copilot/utilities/markdown_renderer.py index 22e2f69..c0921af 100644 --- a/src/copilot/utilities/markdown_renderer.py +++ b/src/copilot/utilities/markdown_renderer.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. from rich.console import Group from rich.live import Live @@ -9,20 +9,22 @@ from copilot.utilities.i18n import main_content_panel_alpha_warning class MarkdownRenderer: + """Markdown 渲染器""" @staticmethod - def update(live: Live, content: str, sugggestion: str = '', refresh: bool = True): + def update(live: Live, content: str, sugggestion: str = "", *, refresh: bool = True) -> None: + """更新 Markdown 内容""" content_panel = Panel( - Markdown(content, code_theme='github-dark'), - border_style='gray50', + Markdown(content, code_theme="github-dark"), + border_style="gray50", subtitle=main_content_panel_alpha_warning, - subtitle_align='right' + subtitle_align="right", ) if not sugggestion: live.update(content_panel, refresh=refresh) return - sugggestion_panel = Panel(Markdown(sugggestion, code_theme='github-dark'), border_style='gray50') + sugggestion_panel = Panel(Markdown(sugggestion, code_theme="github-dark"), border_style="gray50") live.update( Group(content_panel, sugggestion_panel), - refresh=refresh + refresh=refresh, ) diff --git a/src/copilot/utilities/shell_script.py b/src/copilot/utilities/shell_script.py index 61f6deb..4feae73 100644 --- a/src/copilot/utilities/shell_script.py +++ b/src/copilot/utilities/shell_script.py @@ -1,15 +1,15 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import os import uuid def write_shell_script(content: str) -> str: - '''将脚本内容写进 sh 文件中,并返回执行命令''' - script_name = f'plugin_gen_script_{str(uuid.uuid4())[:8]}.sh' - script_path = os.path.join(os.path.expanduser('~'), '.eulercopilot', 'scripts', script_name) + """将脚本内容写进 sh 文件中,并返回执行命令""" + script_name = f"plugin_gen_script_{str(uuid.uuid4())[:8]}.sh" + script_path = os.path.join(os.path.expanduser("~"), ".eulercopilot", "scripts", script_name) os.makedirs(os.path.dirname(script_path), exist_ok=True) - with open(script_path, 'w', encoding='utf-8') as script_file: + with open(script_path, "w", encoding="utf-8") as script_file: script_file.write(content) os.chmod(script_path, 0o700) - return f'bash {script_path}' + return f"bash {script_path}" diff --git a/src/eulercopilot.sh b/src/eulercopilot.sh index 178ae03..6996e47 100644 --- a/src/eulercopilot.sh +++ b/src/eulercopilot.sh @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. read_query_mode() { if [ ! -f ~/.config/eulercopilot/config.json ]; then @@ -7,12 +7,12 @@ read_query_mode() { local query_mode query_mode=$(jq '.query_mode' ~/.config/eulercopilot/config.json) - + if [ "$query_mode" = "\"chat\"" ]; then echo "智能问答" elif [ "$query_mode" = "\"shell\"" ]; then echo "智能 SHELL" - elif [ "$query_mode" = "\"flow\"" ]; then + elif [ "$query_mode" = "\"plugin\"" ]; then echo "智能插件" elif [ "$query_mode" = "\"diagnose\"" ]; then echo "智能诊断" @@ -92,10 +92,10 @@ run_copilot() { READLINE_LINE="" if [[ "$PS1" == *"\[\033[1;33m"* ]]; then revert_prompt - printf "\033[1;31m已关闭 openEuler Copilot System 提示符\033[0m\n" + printf "\033[1;31m已关闭 EulerCopilot 提示符\033[0m\n" else set_prompt - printf "\033[1;32m已开启 openEuler Copilot System 提示符\033[0m\n" + printf "\033[1;32m已开启 EulerCopilot 提示符\033[0m\n" fi fi } diff --git a/src/setup.py b/src/setup.py index ab26b8a..afb91ef 100644 --- a/src/setup.py +++ b/src/setup.py @@ -1,4 +1,4 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. import os @@ -9,65 +9,59 @@ from setuptools.extension import Extension def add_py_files(module_name): - return [ - os.path.join(module_name, f) - for f in os.listdir(module_name) - if f.endswith('.py') - ] + return [os.path.join(module_name, f) for f in os.listdir(module_name) if f.endswith(".py")] # 定义编译选项 cython_compile_options = { - 'language_level': '3', - 'annotate': False, # 生成 HTML 注解文件 - 'compiler_directives': {}, + "language_level": "3", + "annotate": False, # 生成 HTML 注解文件 + "compiler_directives": {}, } # 定义 Cython 编译规则 cython_files = [] -cython_files += add_py_files('copilot/app') -cython_files += add_py_files('copilot/backends') -cython_files += add_py_files('copilot/utilities') +cython_files += add_py_files("copilot/app") +cython_files += add_py_files("copilot/backends") +cython_files += add_py_files("copilot/utilities") extensions = [Extension(f.replace("/", ".")[:-3], [f]) for f in cython_files] # 定义 setup() 参数 setup( - name='copilot', - version='1.2.1', - description='openEuler Copilot System Command Line Assistant', - author='Hongyu Shi', - author_email='shihongyu15@huawei.com', - url='https://gitee.com/openeuler/euler-copilot-shell', - py_modules=['copilot.__init__', 'copilot.__main__'], + name="copilot", + version="0.9.2", + description="EulerCopilot Command Line Assistant", + author="Hongyu Shi", + author_email="shihongyu15@huawei.com", + url="https://gitee.com/openeuler/euler-copilot-shell", + py_modules=["copilot.__init__", "copilot.__main__"], ext_modules=cythonize( extensions, - compiler_directives=cython_compile_options['compiler_directives'], - annotate=cython_compile_options['annotate'], - language_level=cython_compile_options['language_level'] + compiler_directives=cython_compile_options["compiler_directives"], + annotate=cython_compile_options["annotate"], + language_level=cython_compile_options["language_level"], ), - packages=['copilot'], - cmdclass={'build_ext': build_ext}, + packages=["copilot"], + cmdclass={"build_ext": build_ext}, include_package_data=True, zip_safe=False, classifiers=[ - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Programming Language :: Python :: 3.11', - 'Programming Language :: Python :: 3.12', - 'License :: OSI Approved :: Mulan Permissive Software License, Version 2', # 木兰许可证 v2 - 'Operating System :: POSIX :: Linux', - 'Operating System :: MacOS :: MacOS X' + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "License :: OSI Approved :: Mulan Permissive Software License, Version 2", # 木兰许可证 v2 + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS :: MacOS X", ], - python_requires='>=3.9', # Python 版本要求为 3.9 及以上 + python_requires=">=3.9", # Python 版本要求为 3.9 及以上 install_requires=[ # 添加项目依赖的库 - 'websockets', - 'requests', - 'rich', - 'typer', - 'questionary' + "websockets", + "requests", + "rich", + "typer", + "questionary", ], - entry_points={ - 'console_scripts': ['copilot=copilot.__main__:entry_point'] - } + entry_points={"console_scripts": ["copilot=copilot.__main__:entry_point"]}, ) -- Gitee