diff --git a/src/app/css/styles.tcss b/src/app/css/styles.tcss index 2e4f45e8df8544efc73886efce3222e85cdbfeba..3855d541bd83ca4bb6978952802a2de8cc469cdd 100644 --- a/src/app/css/styles.tcss +++ b/src/app/css/styles.tcss @@ -32,11 +32,14 @@ Static { /* 底部输入区域样式 */ #input-container { - height: 3; + height: auto; + min-height: 3; + max-height: 12; dock: bottom; padding: 0 1; border-top: solid #688efd; width: 100%; + overflow: auto; } /* 输入框样式 */ @@ -45,6 +48,8 @@ Static { color: #688efd; padding: 0 1; border: none; + height: 1; + max-height: 1; } /* 设置界面样式 */ @@ -235,23 +240,58 @@ Static { /* MCP 组件样式 */ #mcp-confirm, #mcp-parameter { height: auto; + min-height: 10; + width: 100%; padding: 1; border: solid #ff9800; background: #1a1a1a; } +/* MCP 模式下的输入容器 - 确保按钮可见 */ +#input-container.mcp-mode { + height: auto; + min-height: 10; + max-height: 15; +} + +/* 正常模式下的输入容器 */ +#input-container.normal-mode { + height: 3; + min-height: 3; + max-height: 3; +} + /* MCP 确认组件样式 */ -.confirm-title { - text-align: center; - text-style: bold; - color: #ff9800; +.confirm-info { + text-align: left; + color: #ffffff; + height: 1; + padding: 0 1; margin-bottom: 1; } +.confirm-reason { + text-align: left; + color: #cccccc; + height: 1; + max-height: 2; + padding: 0 1; + margin-bottom: 1; + text-style: italic; +} + .confirm-buttons { - height: auto; + height: 5; + min-height: 5; align: center middle; margin-top: 1; + dock: bottom; +} + +.help-text { + color: #888888; + text-style: italic; + width: auto; } .risk-low { @@ -267,20 +307,38 @@ Static { } /* MCP 参数组件样式 */ -.param-title { +.param-header { text-align: center; text-style: bold; color: #2196f3; margin-bottom: 1; + height: 1; + padding: 0 1; +} + +.param-tool { + text-align: left; + color: #4caf50; + height: 1; + padding: 0 1; + margin-bottom: 0; } .param-message { - color: #ffeb3b; + text-align: left; + color: #ffffff; + height: auto; + padding: 0 1; margin-bottom: 1; } +.param-input-compact { + width: 100%; + margin-bottom: 0; +} + .param-buttons { - height: auto; + height: 4; align: center middle; margin-top: 1; } @@ -289,4 +347,19 @@ Static { #mcp-confirm-yes, #mcp-confirm-no, #mcp-param-submit, #mcp-param-cancel { margin: 0 1; width: auto; + min-height: 3; + height: 3; +} + +/* 进度消息样式 */ +.progress-line { + background: rgba(76, 175, 80, 0.1); + border-left: solid #4caf50; + padding: 1; + margin: 1 0; +} + +/* 进度消息过渡动画效果 */ +.progress-line:focus { + background: rgba(76, 175, 80, 0.2); } diff --git a/src/app/mcp_widgets.py b/src/app/mcp_widgets.py index aa5b7561d7c083e732df57b538b17e5ffa958a14..c45bd1bbb4d18b5e9239bb046c7ec60a1da6dbbc 100644 --- a/src/app/mcp_widgets.py +++ b/src/app/mcp_widgets.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING from textual import on from textual.containers import Container, Horizontal, Vertical from textual.message import Message -from textual.widgets import Button, Input, Label, Static +from textual.widgets import Button, Input, Static if TYPE_CHECKING: from textual.app import ComposeResult @@ -29,6 +29,8 @@ class MCPConfirmWidget(Container): """初始化确认组件""" super().__init__(name=name, id=widget_id, classes=classes) self.event = event + # 设置为可聚焦,以便键盘导航 + self.can_focus = True def compose(self) -> ComposeResult: """构建确认界面""" @@ -37,23 +39,40 @@ class MCPConfirmWidget(Container): risk = content.get("risk", "unknown") reason = content.get("reason", "需要用户确认是否执行此工具") - # 风险级别文本 - risk_text = { - "low": "低风险", - "medium": "中等风险", - "high": "高风险", - }.get(risk, "未知风险") + # 风险级别文本和图标 + risk_info = { + "low": ("🟢", "低风险"), + "medium": ("🟡", "中等风险"), + "high": ("🔴", "高风险"), + }.get(risk, ("⚪", "未知风险")) + + risk_icon, risk_text = risk_info with Vertical(): - yield Static("⚠️ 工具执行确认", classes="confirm-title") - yield Static(f"工具名称: {step_name}") - yield Static(f"风险级别: {risk_text}", classes=f"risk-{risk}") - yield Static(f"原因: {reason}") - yield Static("") + # 紧凑的工具确认信息显示 + yield Static( + f"🔧 {step_name} {risk_icon} {risk_text}", + classes=f"confirm-info risk-{risk}", + markup=False, + ) + # 显示简化的说明文字,确保按钮可见 + if len(reason) > 30: + # 如果说明太长,显示更短的简化版本 + yield Static( + "💭 请确认执行", + classes="confirm-reason", + markup=False, + ) + else: + yield Static( + f"💭 {reason}", + classes="confirm-reason", + markup=False, + ) + # 确保按钮始终显示 with Horizontal(classes="confirm-buttons"): - yield Button("确认执行 (Y)", variant="success", id="mcp-confirm-yes") - yield Button("取消 (N)", variant="error", id="mcp-confirm-no") - yield Static("请选择: Y(确认) / N(取消)") + yield Button("✓ 确认", variant="success", id="mcp-confirm-yes") + yield Button("✗ 取消", variant="error", id="mcp-confirm-no") @on(Button.Pressed, "#mcp-confirm-yes") def confirm_execution(self) -> None: @@ -65,6 +84,56 @@ class MCPConfirmWidget(Container): """取消执行""" self.post_message(MCPConfirmResult(confirmed=False, task_id=self.event.get_task_id())) + def on_key(self, event) -> None: # noqa: ANN001 + """处理键盘事件""" + if event.key == "enter" or event.key == "y": + # Enter 或 Y 键确认 + self.confirm_execution() + event.prevent_default() + event.stop() + elif event.key == "escape" or event.key == "n": + # Escape 或 N 键取消 + self.cancel_execution() + event.prevent_default() + event.stop() + elif event.key == "tab": + # Tab 键在按钮间切换焦点 + try: + buttons = self.query("Button") + current_focus = self.app.focused + if current_focus is not None and current_focus in buttons: + current_index = list(buttons).index(current_focus) + next_index = (current_index + 1) % len(buttons) + buttons[next_index].focus() + else: + # 如果没有按钮聚焦,聚焦到第一个按钮 + if buttons: + buttons[0].focus() + event.prevent_default() + event.stop() + except (AttributeError, ValueError, IndexError): + pass + + def on_mount(self) -> None: + """组件挂载时自动聚焦""" + # 延迟聚焦,确保组件完全渲染 + self.set_timer(0.1, self._focus_first_button) + + def _focus_first_button(self) -> None: + """聚焦到第一个按钮""" + try: + buttons = self.query("Button") + if buttons: + buttons[0].focus() + # 确保组件本身也有焦点,以便键盘事件能正确处理 + self.focus() + except Exception: + # 如果聚焦失败,至少确保组件本身有焦点 + try: + self.focus() + except Exception: + pass + class MCPParameterWidget(Container): """MCP 工具参数输入组件""" @@ -81,6 +150,8 @@ class MCPParameterWidget(Container): super().__init__(name=name, id=widget_id, classes=classes) self.event = event self.param_inputs: dict[str, Input] = {} + # 设置为可聚焦,以便键盘导航 + self.can_focus = True def compose(self) -> ComposeResult: """构建参数输入界面""" @@ -90,34 +161,38 @@ class MCPParameterWidget(Container): params = content.get("params", {}) with Vertical(): - yield Static("📝 参数补充", classes="param-title") - yield Static(f"工具名称: {step_name}") - yield Static(message, classes="param-message") - yield Static("") - - # 为每个需要填写的参数创建输入框 + # 紧凑的参数输入标题 + yield Static("📝 参数输入", classes="param-header", markup=False) + yield Static(f"🔧 {step_name}", classes="param-tool", markup=False) + # 只在说明较短时显示 + if len(message) <= 30: + yield Static(f"💭 {message}", classes="param-message", markup=False) + + # 垂直布局的参数输入,更节省空间 for param_name, param_value in params.items(): if param_value is None or param_value == "": - yield Label(f"{param_name}:") param_input = Input( placeholder=f"请输入 {param_name}", id=f"param_{param_name}", + classes="param-input-compact", ) self.param_inputs[param_name] = param_input yield param_input - # 额外信息输入框 - yield Label("补充说明(可选):") - description_input = Input( - placeholder="请输入补充说明信息", - id="param_description", - ) - self.param_inputs["description"] = description_input - yield description_input - + # 简化的补充说明输入 + if params: # 只有在有其他参数时才显示补充说明 + description_input = Input( + placeholder="补充说明(可选)", + id="param_description", + classes="param-input-compact", + ) + self.param_inputs["description"] = description_input + yield description_input + + # 紧凑的按钮行 with Horizontal(classes="param-buttons"): - yield Button("提交", variant="success", id="mcp-param-submit") - yield Button("取消", variant="error", id="mcp-param-cancel") + yield Button("✓ 提交", variant="success", id="mcp-param-submit") + yield Button("✗ 取消", variant="error", id="mcp-param-cancel") @on(Button.Pressed, "#mcp-param-submit") def submit_parameters(self) -> None: diff --git a/src/app/tui.py b/src/app/tui.py index 29bb079e25d97164a67d62daf0bf2ca4c10f8f76..07956c5c5b4f5bf99c6b67cdde489033da031a4f 100644 --- a/src/app/tui.py +++ b/src/app/tui.py @@ -3,7 +3,8 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, ClassVar, NamedTuple +import re +from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple from rich.markdown import Markdown as RichMarkdown from textual import on @@ -16,6 +17,7 @@ from textual.widgets import Footer, Header, Input, Static from app.dialogs import AgentSelectionDialog, BackendRequiredDialog, ExitDialog from app.mcp_widgets import MCPConfirmResult, MCPConfirmWidget, MCPParameterResult, MCPParameterWidget from app.settings import SettingsScreen +from app.tui_mcp_handler import TUIMCPEventHandler from backend.factory import BackendFactory from backend.hermes import HermesChatClient from config import ConfigManager @@ -96,13 +98,13 @@ class OutputLine(Static): self.add_class("command-line") self.text_content = text - def update(self, content: VisualType = "") -> None: + def update(self, content: VisualType = "", *, layout: bool = False) -> None: """更新组件内容,确保禁用富文本标记解析""" # 如果是字符串,更新内部存储的文本内容 if isinstance(content, str): self.text_content = content # 调用父类方法进行实际更新 - super().update(content) + super().update(content, layout=layout) def get_content(self) -> str: """获取组件内容的纯文本表示""" @@ -151,6 +153,51 @@ class MarkdownOutputLine(Static): self.update_markdown(self.current_content) +class ProgressOutputLine(MarkdownOutputLine): + """可替换的进度输出行组件,用于 MCP 工具进度显示""" + + def __init__(self, markdown_content: str = "", *, step_id: str = "") -> None: + """初始化进度输出组件""" + super().__init__(markdown_content) + self.step_id = step_id + self.add_class("progress-line") + + def get_step_id(self) -> str: + """获取步骤ID""" + return self.step_id + + def update_markdown(self, markdown_content: str) -> None: + """更新Markdown内容""" + self.current_content = markdown_content + + # 使用rich的Markdown渲染器 + md = RichMarkdown( + markdown_content, + code_theme=self._get_code_theme(), + hyperlinks=True, + ) + + # 使用rich渲染后的内容更新组件 + super().update(md) + + def get_content(self) -> str: + """获取当前Markdown原始内容""" + return self.current_content + + def _get_code_theme(self) -> str: + """根据当前Textual主题获取适合的代码主题""" + return "material" if self.app.current_theme.dark else "xcode" + + def _on_mount(self, event: Mount) -> None: + """组件挂载时设置主题监听""" + super()._on_mount(event) + self.watch(self.app, "theme", self._retheme) + + def _retheme(self) -> None: + """主题变化时重新应用主题""" + self.update_markdown(self.current_content) + + class CommandInput(Input): """命令输入组件""" @@ -192,7 +239,7 @@ class IntelligentTerminal(App): """初始化应用""" super().__init__() # 设置应用标题 - self.title = "openEuler 智能 Shell" + self.title = "openEuler Intelligence" self.config_manager = ConfigManager() self.processing: bool = False # 添加保存任务的集合到类属性 @@ -206,12 +253,14 @@ class IntelligentTerminal(App): self._current_mcp_task_id: str = "" # 创建日志实例 self.logger = get_logger(__name__) + # 进度消息跟踪 + self._current_progress_lines: dict[str, ProgressOutputLine] = {} # step_id -> ProgressOutputLine def compose(self) -> ComposeResult: """构建界面""" yield Header(show_clock=True) yield FocusableContainer(id="output-container") - with Container(id="input-container"): + with Container(id="input-container", classes="normal-mode"): yield CommandInput() yield Footer() @@ -230,6 +279,8 @@ class IntelligentTerminal(App): # 清除屏幕上的所有内容 output_container = self.query_one("#output-container") output_container.remove_children() + # 清理进度消息跟踪 + self._current_progress_lines.clear() def action_choose_agent(self) -> None: """选择智能体的动作""" @@ -251,17 +302,39 @@ class IntelligentTerminal(App): """在命令输入框和文本区域之间切换焦点""" # 获取当前聚焦的组件 focused = self.focused - if isinstance(focused, CommandInput): - # 如果当前聚焦在命令输入框,则聚焦到输出容器 + + # 检查是否聚焦在输入组件(包括 MCP 组件) + is_input_focused = isinstance(focused, CommandInput) or ( + focused is not None and hasattr(focused, "id") and focused.id in ["mcp-confirm", "mcp-parameter"] + ) + + if is_input_focused: + # 如果当前聚焦在输入组件,则聚焦到输出容器 output_container = self.query_one("#output-container", FocusableContainer) output_container.focus() else: - # 否则聚焦到命令输入框 - self.query_one(CommandInput).focus() + # 否则聚焦到当前的输入组件 + self._focus_current_input_widget() def on_mount(self) -> None: """初始化完成时设置焦点和绑定""" - self.query_one(CommandInput).focus() + # 确保初始状态是正常模式 + self._mcp_mode = "normal" + self._current_mcp_task_id = "" + + # 清理任何可能的重复组件 + try: + # 移除任何可能的重复ID组件 + existing_widgets = self.query("#command-input") + if len(existing_widgets) > 1: + # 如果有多个相同ID的组件,移除多余的 + for widget in existing_widgets[1:]: + widget.remove() + except Exception: + # 忽略清理过程中的异常 + self.logger.exception("清理重复组件失败") + + self._focus_current_input_widget() def get_llm_client(self) -> LLMClientBase: """获取大模型客户端,使用单例模式维持对话历史""" @@ -270,8 +343,6 @@ class IntelligentTerminal(App): # 为 Hermes 客户端设置 MCP 事件处理器以支持 MCP 交互 if isinstance(self._llm_client, HermesChatClient): - from app.tui_mcp_handler import TUIMCPEventHandler - mcp_handler = TUIMCPEventHandler(self, self._llm_client) self._llm_client.set_mcp_handler(mcp_handler) @@ -338,19 +409,27 @@ class IntelligentTerminal(App): @on(MCPConfirmResult) def handle_mcp_confirm_result(self, message: MCPConfirmResult) -> None: """处理 MCP 确认结果""" - if message.task_id == self._current_mcp_task_id: + # 检查是否是当前任务且未在处理中 + if message.task_id == self._current_mcp_task_id and not self.processing: + self.processing = True # 设置处理标志,防止重复处理 + # 立即恢复正常输入界面 + self._restore_normal_input() # 发送 MCP 响应并处理结果 - task = asyncio.create_task(self._send_mcp_response(message.task_id, message.confirmed)) + task = asyncio.create_task(self._send_mcp_response(message.task_id, params=message.confirmed)) self.background_tasks.add(task) task.add_done_callback(self._task_done_callback) @on(MCPParameterResult) def handle_mcp_parameter_result(self, message: MCPParameterResult) -> None: """处理 MCP 参数结果""" - if message.task_id == self._current_mcp_task_id: + # 检查是否是当前任务且未在处理中 + if message.task_id == self._current_mcp_task_id and not self.processing: + self.processing = True # 设置处理标志,防止重复处理 + # 立即恢复正常输入界面 + self._restore_normal_input() # 发送 MCP 响应并处理结果 params = message.params if message.params is not None else False - task = asyncio.create_task(self._send_mcp_response(message.task_id, params)) + task = asyncio.create_task(self._send_mcp_response(message.task_id, params=params)) self.background_tasks.add(task) task.add_done_callback(self._task_done_callback) @@ -400,7 +479,7 @@ class IntelligentTerminal(App): # 重新聚焦到输入框(如果应用仍在运行) try: if hasattr(self, "is_running") and self.is_running: - self.query_one(CommandInput).focus() + self._focus_current_input_widget() except (AttributeError, ValueError, RuntimeError): # 应用可能正在退出,忽略聚焦错误 self.logger.debug("Failed to focus input widget, app may be exiting") @@ -408,56 +487,139 @@ class IntelligentTerminal(App): async def _handle_command_stream(self, user_input: str, output_container: Container) -> bool: """处理命令流式响应""" - current_line: OutputLine | MarkdownOutputLine | None = None - current_content = "" # 用于累积内容 - is_first_content = True # 标记是否是第一段内容 - received_any_content = False # 标记是否收到任何内容 - start_time = asyncio.get_event_loop().time() - timeout_seconds = 60.0 # 60秒超时 + # 在新的命令会话开始时重置MCP状态跟踪 + if self._llm_client and isinstance(self._llm_client, HermesChatClient): + self._llm_client.stream_processor.reset_status_tracking() + + stream_state = self._init_stream_state() try: - # 通过 process_command 获取命令处理结果和输出类型 - async for output_tuple in process_command(user_input, self.get_llm_client()): - content, is_llm_output = output_tuple # 解包输出内容和类型标志 - received_any_content = True - - # 检查超时 - if asyncio.get_event_loop().time() - start_time > timeout_seconds: - output_container.mount(OutputLine("请求超时,已停止处理", command=False)) - break - - # 处理内容 - params = ContentChunkParams( - content=content, - is_llm_output=is_llm_output, - current_content=current_content, - is_first_content=is_first_content, - ) - current_line = await self._process_content_chunk( - params, - current_line, - output_container, - ) + received_any_content = await self._process_stream( + user_input, + output_container, + stream_state, + ) + except TimeoutError: + received_any_content = self._handle_timeout_error(output_container, stream_state) + except asyncio.CancelledError: + received_any_content = self._handle_cancelled_error(output_container, stream_state) - # 更新状态 - if is_first_content: - is_first_content = False - current_content = content - elif isinstance(current_line, MarkdownOutputLine) and is_llm_output: - current_content += content + return received_any_content - # 滚动到底部 - await self._scroll_to_end() + def _init_stream_state(self) -> dict: + """初始化流处理状态""" + start_time = asyncio.get_event_loop().time() + return { + "current_line": None, + "current_content": "", + "is_first_content": True, + "received_any_content": False, + "start_time": start_time, + "timeout_seconds": 1800.0, # 30分钟超时,与HTTP层面保持一致 + "last_content_time": start_time, + "no_content_timeout": 300.0, # 5分钟无内容超时 + } + + async def _process_stream( + self, + user_input: str, + output_container: Container, + stream_state: dict, + ) -> bool: + """处理命令输出流""" + async for output_tuple in process_command(user_input, self.get_llm_client()): + content, is_llm_output = output_tuple + stream_state["received_any_content"] = True + current_time = asyncio.get_event_loop().time() + + # 更新最后收到内容的时间 + if content.strip(): + stream_state["last_content_time"] = current_time + + # 检查超时 + if self._check_timeouts(current_time, stream_state, output_container): + break + + # 处理内容 + await self._process_stream_content( + content, + stream_state, + output_container, + is_llm_output=is_llm_output, + ) - except asyncio.TimeoutError: - self.logger.warning("Command stream timed out") - if hasattr(self, "is_running") and self.is_running: - output_container.mount(OutputLine("请求超时,请稍后重试", command=False)) - except asyncio.CancelledError: - self.logger.info("Command stream was cancelled") - if received_any_content and hasattr(self, "is_running") and self.is_running: - output_container.mount(OutputLine("[处理被中断]", command=False)) + # 滚动到底部 + await self._scroll_to_end() + return stream_state["received_any_content"] + + def _check_timeouts( + self, + current_time: float, + stream_state: dict, + output_container: Container, + ) -> bool: + """检查各种超时条件,返回是否应该中断处理""" + # 检查总体超时 + if current_time - stream_state["start_time"] > stream_state["timeout_seconds"]: + output_container.mount(OutputLine("请求超时,已停止处理", command=False)) + return True + + # 检查无内容超时 + received_any_content = stream_state["received_any_content"] + time_since_last_content = current_time - stream_state["last_content_time"] + if received_any_content and time_since_last_content > stream_state["no_content_timeout"]: + output_container.mount(OutputLine("长时间无响应,已停止处理", command=False)) + return True + + return False + + async def _process_stream_content( + self, + content: str, + stream_state: dict, + output_container: Container, + *, + is_llm_output: bool, + ) -> None: + """处理流式内容""" + params = ContentChunkParams( + content=content, + is_llm_output=is_llm_output, + current_content=stream_state["current_content"], + is_first_content=stream_state["is_first_content"], + ) + + processed_line = await self._process_content_chunk( + params, + stream_state["current_line"], + output_container, + ) + + # 只有当返回值不为None时才更新current_line + if processed_line is not None: + stream_state["current_line"] = processed_line + + # 更新状态 + if stream_state["is_first_content"]: + stream_state["is_first_content"] = False + stream_state["current_content"] = content + elif isinstance(stream_state["current_line"], MarkdownOutputLine) and is_llm_output: + stream_state["current_content"] += content + + def _handle_timeout_error(self, output_container: Container, stream_state: dict) -> bool: + """处理超时错误""" + self.logger.warning("Command stream timed out") + if hasattr(self, "is_running") and self.is_running: + output_container.mount(OutputLine("请求超时,请稍后重试", command=False)) + return stream_state["received_any_content"] + + def _handle_cancelled_error(self, output_container: Container, stream_state: dict) -> bool: + """处理取消错误""" + self.logger.info("Command stream was cancelled") + received_any_content = stream_state["received_any_content"] + if received_any_content and hasattr(self, "is_running") and self.is_running: + output_container.mount(OutputLine("[处理被中断]", command=False)) return received_any_content async def _process_content_chunk( @@ -465,13 +627,90 @@ class IntelligentTerminal(App): params: ContentChunkParams, current_line: OutputLine | MarkdownOutputLine | None, output_container: Container, - ) -> OutputLine | MarkdownOutputLine: + ) -> OutputLine | MarkdownOutputLine | None: """处理单个内容块""" content = params.content is_llm_output = params.is_llm_output current_content = params.current_content is_first_content = params.is_first_content + # 检查是否包含MCP标记(替换标记或MCP标记) + replace_tool_name = None + mcp_tool_name = None + cleaned_content = content + + # 寻找替换标记,可能不在开头 + replace_match = re.search(r"\[REPLACE:([^\]]+)\]", content) + if replace_match: + replace_tool_name = replace_match.group(1) + # 移除替换标记,保留其他内容 + cleaned_content = re.sub(r"\[REPLACE:[^\]]+\]", "", content).strip() + self.logger.debug( + "检测到替换标记,工具: %s, 原内容长度: %d, 清理后长度: %d", + replace_tool_name, + len(content), + len(cleaned_content), + ) + self.logger.debug("原内容片段: %s", content[:100]) + self.logger.debug("清理后片段: %s", cleaned_content[:100]) + + # 寻找MCP标记,表示这是一个MCP状态消息但不需要替换 + mcp_match = re.search(r"\[MCP:([^\]]+)\]", cleaned_content) + if mcp_match: + mcp_tool_name = mcp_match.group(1) + # 移除MCP标记,保留其他内容 + cleaned_content = re.sub(r"\[MCP:[^\]]+\]", "", cleaned_content).strip() + self.logger.debug( + "检测到MCP标记,工具: %s, 清理后长度: %d", + mcp_tool_name, + len(cleaned_content), + ) + + # 使用清理后的内容进行后续处理 + content = cleaned_content + + self.logger.debug("[TUI] 处理内容: %s", content.strip()[:50]) + + # 检查是否为 MCP 进度消息 + # 修复:带有替换标记或MCP标记的内容都被认为是MCP进度消息 + tool_name = replace_tool_name or mcp_tool_name + is_progress_message = tool_name is not None and self._is_progress_message(content) + + # 如果是进度消息,根据标记类型进行处理 + if is_progress_message and tool_name: + # 检查是否为最终状态消息 + is_final_message = self._is_final_progress_message(content) + + # 检查是否有现有的进度消息 + existing_progress = self._current_progress_lines.get(tool_name) + + # 如果有替换标记,则尝试替换现有消息 + if replace_tool_name and existing_progress is not None: + # 替换现有的进度消息(包括最终状态) + existing_progress.update_markdown(content) + self.logger.debug("替换工具 %s 的进度消息: %s", tool_name, content.strip()[:50]) + + # 如果是最终状态,清理进度跟踪(但保留替换后的消息) + if is_final_message: + self._current_progress_lines.pop(tool_name, None) + self.logger.debug("工具 %s 到达最终状态,清理进度跟踪", tool_name) + + # 重要:对于MCP消息,直接返回None,避免影响后续的LLM输出处理 + # 因为MCP消息是独立的状态更新,不应该成为content accumulation的一部分 + return None + + # 创建新的进度消息(适用于首次MCP标记或没有现有进度的替换标记) + new_progress_line = ProgressOutputLine(content, step_id=tool_name) + + # 如果不是最终状态,加入进度跟踪 + if not is_final_message: + self._current_progress_lines[tool_name] = new_progress_line + + output_container.mount(new_progress_line) + self.logger.debug("创建工具 %s 的新进度消息: %s", tool_name, content.strip()[:50]) + # 同样返回None,避免影响后续内容处理 + return None + # 处理第一段内容,创建适当的输出组件 if is_first_content: new_line: OutputLine | MarkdownOutputLine = ( @@ -498,15 +737,127 @@ class IntelligentTerminal(App): output_container.mount(new_line) return new_line + def _is_progress_message(self, content: str) -> bool: + """判断是否为进度消息""" + # 必须包含工具相关的关键词,避免误识别其他消息 + tool_related_patterns = [ + r"工具.*`[^`]+`", # 包含工具和反引号的内容 + r"正在初始化工具:", + r"等待用户确认执行工具", + r"等待用户输入参数", + ] + + # 检查是否包含工具相关内容 + has_tool_content = any(re.search(pattern, content) for pattern in tool_related_patterns) + + if not has_tool_content: + return False + + # 具体的进度指示符 + progress_indicators = [ + "🔧 正在初始化工具", + "📥 工具", + "正在执行...", + "⏸️ **等待用户确认执行工具**", + "📝 **等待用户输入参数**", + "✅ 工具", + "执行完成", + "❌ 工具", + "已取消", + "⚠️ 工具", + "执行失败", + ] + + return any(indicator in content for indicator in progress_indicators) + + def _is_final_progress_message(self, content: str) -> bool: + """判断是否为最终进度消息(执行完成、失败、取消等)""" + final_indicators = [ + "✅ 工具", + "执行完成", + "❌ 工具", + "已取消", + "⚠️ 工具", + "执行失败", + ] + return any(indicator in content for indicator in final_indicators) + + def _extract_tool_name_from_content(self, content: str) -> str: + """从内容中提取工具名称""" + # 尝试从内容中提取工具名称 + patterns = [ + r"工具:\s*`([^`]+)`", + r"工具名称:\s*`([^`]+)`", + r"正在初始化工具:\s*`([^`]+)`", + r"工具\s*`([^`]+)`\s*正在执行", + r"✅ 工具\s*`([^`]+)`", + r"❌ 工具\s*`([^`]+)`", + r"⚠️ 工具\s*`([^`]+)`", + ] + + for pattern in patterns: + match = re.search(pattern, content) + if match: + return match.group(1).strip() # 使用工具名称作为步骤标识 + + return "" + + def _cleanup_progress_message(self, step_id: str) -> None: + """清理指定步骤的进度消息""" + if step_id in self._current_progress_lines: + self._current_progress_lines.pop(step_id) + # 对于完成状态,我们保留消息但从跟踪中移除 + self.logger.debug("清理步骤 %s 的进度消息跟踪", step_id) + def _format_error_message(self, error: BaseException) -> str: """格式化错误消息""" error_str = str(error).lower() + + # 处理网络连接异常 + if "remoteprotocolerror" in error_str or "peer closed connection" in error_str: + return "网络连接异常中断,请稍后重试" if "timeout" in error_str: return "请求超时,请稍后重试" - if any(keyword in error_str for keyword in ["network", "connection"]): + if any(keyword in error_str for keyword in ["network", "connection", "unreachable"]): return "网络连接错误,请检查网络后重试" + if "httperror" in error_str or "http" in error_str: + return "服务端响应异常,请稍后重试" + return f"处理命令时出错: {error!s}" + def _focus_current_input_widget(self) -> None: + """聚焦到当前的输入组件,考虑 MCP 模式状态""" + try: + if self._mcp_mode == "normal": + # 正常模式,聚焦到 CommandInput + self.query_one(CommandInput).focus() + elif self._mcp_mode == "confirm": + # MCP 确认模式,聚焦到 MCP 确认组件 + try: + mcp_widget = self.query_one("#mcp-confirm") + mcp_widget.focus() + except (AttributeError, ValueError, RuntimeError): + # 如果MCP组件不存在,回退到正常模式 + self._mcp_mode = "normal" + self.query_one(CommandInput).focus() + elif self._mcp_mode == "parameter": + # MCP 参数模式,聚焦到 MCP 参数组件 + try: + mcp_widget = self.query_one("#mcp-parameter") + mcp_widget.focus() + except (AttributeError, ValueError, RuntimeError): + # 如果MCP组件不存在,回退到正常模式 + self._mcp_mode = "normal" + self.query_one(CommandInput).focus() + else: + # 未知模式,重置为正常模式并聚焦到 CommandInput + self.logger.warning("未知的 MCP 模式: %s,重置为正常模式", self._mcp_mode) + self._mcp_mode = "normal" + self.query_one(CommandInput).focus() + except (AttributeError, ValueError, RuntimeError) as e: + # 聚焦失败时记录调试信息,但不抛出异常 + self.logger.debug("Failed to focus input widget: %s", str(e)) + async def _scroll_to_end(self) -> None: """滚动到容器底部的辅助方法""" # 获取输出容器 @@ -593,48 +944,73 @@ class IntelligentTerminal(App): """替换输入容器中的组件为 MCP 交互组件""" try: input_container = self.query_one("#input-container") + + # 切换到 MCP 模式样式 + input_container.remove_class("normal-mode") + input_container.add_class("mcp-mode") + # 移除所有子组件 input_container.remove_children() + # 添加新的 MCP 组件 input_container.mount(widget) - # 聚焦到新组件 - widget.focus() + + # 延迟聚焦,确保组件完全挂载 + self.set_timer(0.05, lambda: widget.focus()) + except Exception: self.logger.exception("替换输入组件失败") + # 如果替换失败,尝试恢复正常输入 + try: + self._restore_normal_input() + except Exception: + self.logger.exception("恢复正常输入失败") def _restore_normal_input(self) -> None: """恢复正常的命令输入组件""" try: input_container = self.query_one("#input-container") + + # 重置 MCP 状态 + self._mcp_mode = "normal" + self._current_mcp_task_id = "" + + # 切换回正常模式样式 + input_container.remove_class("mcp-mode") + input_container.add_class("normal-mode") + # 移除所有子组件 input_container.remove_children() + # 添加正常的命令输入组件 - input_container.mount(CommandInput()) + command_input = CommandInput() + input_container.mount(command_input) + # 聚焦到输入框 - self.query_one(CommandInput).focus() - # 重置 MCP 状态 - self._mcp_mode = "normal" - self._current_mcp_task_id = "" + self._focus_current_input_widget() + except Exception: self.logger.exception("恢复正常输入组件失败") + # 如果恢复失败,至少要重置状态 + self._mcp_mode = "normal" + self._current_mcp_task_id = "" - async def _send_mcp_response(self, task_id: str, params: bool | dict) -> None: + async def _send_mcp_response(self, task_id: str, *, params: bool | dict[str, Any]) -> None: """发送 MCP 响应并处理结果""" - try: - # 恢复正常输入界面 - self._restore_normal_input() + output_container: Container | None = None - # 获取输出容器 - output_container = self.query_one("#output-container") + try: + # 先获取输出容器,确保可以显示错误信息 + output_container = self.query_one("#output-container", Container) # 发送 MCP 响应并处理流式回复 llm_client = self.get_llm_client() if hasattr(llm_client, "send_mcp_response"): success = await self._handle_mcp_response_stream( task_id, - params, - output_container, - llm_client, # type: ignore[arg-type] + params=params, + output_container=output_container, + llm_client=llm_client, ) if not success: # 如果没有收到任何响应内容,显示默认消息 @@ -645,35 +1021,44 @@ class IntelligentTerminal(App): except Exception as e: self.logger.exception("发送 MCP 响应失败") - # 确保恢复正常界面 - self._restore_normal_input() # 显示错误信息 - output_container = self.query_one("#output-container") - error_message = self._format_error_message(e) - output_container.mount(OutputLine(f"❌ 发送 MCP 响应失败: {error_message}")) + if output_container is not None: + try: + error_message = self._format_error_message(e) + output_container.mount(OutputLine(f"❌ 发送 MCP 响应失败: {error_message}")) + except Exception: + # 如果连显示错误信息都失败了,至少记录日志 + self.logger.exception("无法显示错误信息") finally: + # 重置处理标志,不再在这里恢复输入界面 self.processing = False async def _handle_mcp_response_stream( self, task_id: str, - params: bool | dict, - output_container, # noqa: ANN001 - llm_client, # noqa: ANN001 + *, + params: bool | dict[str, Any], + output_container: Container, + llm_client: LLMClientBase, ) -> bool: """处理 MCP 响应的流式回复""" current_line: OutputLine | MarkdownOutputLine | None = None current_content = "" is_first_content = True received_any_content = False - timeout_seconds = 60.0 + timeout_seconds = 1800.0 # 30分钟超时,与HTTP层面保持一致 + + if not isinstance(llm_client, HermesChatClient): + self.logger.error("当前客户端不支持 MCP 响应功能") + output_container.mount(OutputLine("❌ 当前客户端不支持 MCP 响应功能")) + return False try: # 使用 asyncio.wait_for 包装整个流处理过程 async def _process_stream() -> bool: nonlocal current_line, current_content, is_first_content, received_any_content - async for content in llm_client.send_mcp_response(task_id, params): + async for content in llm_client.send_mcp_response(task_id, params=params): if not content.strip(): continue @@ -692,11 +1077,14 @@ class IntelligentTerminal(App): current_content=current_content, is_first_content=is_first_content, ) - current_line = await self._process_content_chunk( + processed_line = await self._process_content_chunk( params_obj, current_line, output_container, ) + # 只有当返回值不为None时才更新current_line + if processed_line is not None: + current_line = processed_line # 第一段内容后设置标记 if is_first_content: @@ -710,7 +1098,7 @@ class IntelligentTerminal(App): # 执行流处理,添加超时 return await asyncio.wait_for(_process_stream(), timeout=timeout_seconds) - except asyncio.TimeoutError: + except TimeoutError: output_container.mount(OutputLine(f"⏱️ MCP 响应超时 ({timeout_seconds}秒)")) return received_any_content except asyncio.CancelledError: diff --git a/src/backend/hermes/client.py b/src/backend/hermes/client.py index 911e2325a629f200379322dd776070a7eafeff32..36f2408a97c03820046eed4cec24f7c3ad713686 100644 --- a/src/backend/hermes/client.py +++ b/src/backend/hermes/client.py @@ -5,18 +5,22 @@ from __future__ import annotations import json import re import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Self from urllib.parse import urljoin import httpx -from typing_extensions import Self from backend.base import LLMClientBase from log.manager import get_logger, log_exception from .constants import HTTP_OK from .exceptions import HermesAPIError +from .models import HermesApp, HermesChatRequest, HermesFeatures +from .services.agent import HermesAgentManager +from .services.conversation import HermesConversationManager from .services.http import HermesHttpManager +from .services.model import HermesModelManager +from .stream import HermesStreamEvent, HermesStreamProcessor if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -24,11 +28,7 @@ if TYPE_CHECKING: from backend.mcp_handler import MCPEventHandler - from .models import HermesAgent, HermesChatRequest - from .services.agent import HermesAgentManager - from .services.conversation import HermesConversationManager - from .services.model import HermesModelManager - from .stream import HermesStreamEvent, HermesStreamProcessor + from .models import HermesAgent def validate_url(url: str) -> bool: @@ -73,8 +73,6 @@ class HermesChatClient(LLMClientBase): def model_manager(self) -> HermesModelManager: """获取模型管理器(延迟初始化)""" if self._model_manager is None: - from .services.model import HermesModelManager - self._model_manager = HermesModelManager(self.http_manager) return self._model_manager @@ -82,8 +80,6 @@ class HermesChatClient(LLMClientBase): def agent_manager(self) -> HermesAgentManager: """获取智能体管理器(延迟初始化)""" if self._agent_manager is None: - from .services.agent import HermesAgentManager - self._agent_manager = HermesAgentManager(self.http_manager) return self._agent_manager @@ -91,8 +87,6 @@ class HermesChatClient(LLMClientBase): def conversation_manager(self) -> HermesConversationManager: """获取会话管理器(延迟初始化)""" if self._conversation_manager is None: - from .services.conversation import HermesConversationManager - self._conversation_manager = HermesConversationManager(self.http_manager) return self._conversation_manager @@ -100,8 +94,6 @@ class HermesChatClient(LLMClientBase): def stream_processor(self) -> HermesStreamProcessor: """获取流处理器(延迟初始化)""" if self._stream_processor is None: - from .stream import HermesStreamProcessor - self._stream_processor = HermesStreamProcessor() return self._stream_processor @@ -154,6 +146,9 @@ class HermesChatClient(LLMClientBase): # 如果有未完成的会话,先停止它 await self._stop() + # 不在这里重置状态跟踪,让进度状态能够跨流保持 + # 只有在真正的新对话开始时才重置(由上层调用方决定) + self.logger.info("开始 Hermes 流式聊天请求") self.logger.debug("提示内容长度: %d", len(prompt)) start_time = time.time() @@ -164,8 +159,6 @@ class HermesChatClient(LLMClientBase): self.logger.info("使用会话ID: %s", conversation_id) # 创建聊天请求 - from .models import HermesApp, HermesChatRequest, HermesFeatures - app = HermesApp(self._current_agent_id) request = HermesChatRequest( app=app, @@ -211,7 +204,7 @@ class HermesChatClient(LLMClientBase): """ return await self.agent_manager.get_available_agents() - async def send_mcp_response(self, task_id: str, params: bool | dict) -> AsyncGenerator[str, None]: + async def send_mcp_response(self, task_id: str, *, params: bool | dict) -> AsyncGenerator[str, None]: """ 发送 MCP 响应并获取流式回复 @@ -226,6 +219,7 @@ class HermesChatClient(LLMClientBase): HermesAPIError: 当 API 调用失败时 """ + # 不在 MCP 响应时重置状态跟踪,保持去重机制有效 self.logger.info("发送 MCP 响应 - 任务ID: %s", task_id) start_time = time.time() @@ -327,8 +321,6 @@ class HermesChatClient(LLMClientBase): async def _process_stream_events(self, response: httpx.Response) -> AsyncGenerator[str, None]: """处理流式响应事件""" - from .stream import HermesStreamEvent - has_content = False event_count = 0 has_error_message = False # 标记是否已经产生错误消息 @@ -386,28 +378,21 @@ class HermesChatClient(LLMClientBase): yield mcp_status # 处理 MCP 交互事件 - if event.event_type == "step.waiting_for_start": - # 通知 TUI 切换到确认界面 - if self._mcp_handler is not None: + if self._mcp_handler is not None: + if event.event_type == "step.waiting_for_start": + # 通知 TUI 切换到确认界面 await self._mcp_handler.handle_waiting_for_start(event) - content = event.get_content() - step_name = event.get_step_name() - reason = content.get("reason", "需要用户确认") - yield f"⏸️ 等待用户确认执行工具 '{step_name}': {reason}" - elif event.event_type == "step.waiting_for_param": - # 通知 TUI 切换到参数输入界面 - if self._mcp_handler is not None: + elif event.event_type == "step.waiting_for_param": + # 通知 TUI 切换到参数输入界面 await self._mcp_handler.handle_waiting_for_param(event) - content = event.get_content() - step_name = event.get_step_name() - message = content.get("message", "需要补充参数") - yield f"📝 等待用户输入参数 - 工具 '{step_name}': {message}" - - # 处理文本内容 - text_content = event.get_text_content() - if text_content: - self.stream_processor.log_text_content(text_content) - yield text_content + + # 处理文本内容:只有当不是 MCP 步骤事件时才输出文本内容 + # 这避免了 MCP 状态消息和文本内容的重复输出 + if not event.is_mcp_step_event(): + text_content = event.get_text_content() + if text_content: + self.stream_processor.log_text_content(text_content) + yield text_content async def _stop(self) -> None: """停止当前会话""" diff --git a/src/backend/hermes/stream.py b/src/backend/hermes/stream.py index 60e81c42485a87dd8f86b771d889812d58aa4f09..a6aa696453e3b312126e8dc5f43542dd891d3802 100644 --- a/src/backend/hermes/stream.py +++ b/src/backend/hermes/stream.py @@ -1,9 +1,14 @@ -"""Hermes 流事件处理器""" +""" +Hermes 流处理模块 + +用于处理 SSE (Server-Sent Events) 流式数据和 MCP 事件 +""" from __future__ import annotations import json -from typing import TYPE_CHECKING +import time +from typing import TYPE_CHECKING, Any from log.manager import get_logger @@ -51,10 +56,6 @@ class HermesStreamEvent: """获取文本内容""" if self.event_type == "text.add": return self.data.get("content", {}).get("text", "") - if self.event_type == "step.output": - content = self.data.get("content", {}) - if "text" in content: - return content["text"] return None def get_flow_info(self) -> dict[str, Any]: @@ -114,6 +115,38 @@ class HermesStreamProcessor: def __init__(self) -> None: """初始化流处理器""" self.logger = get_logger(__name__) + # 跟踪状态消息去重 + self._last_status_message: str = "" + self._should_ignore_flow_stop: bool = False + + # 增强的去重机制:跟踪事件类型和时间 + self._event_history: list[tuple[str, str, float]] = [] # (event_type, step_name, timestamp) + self._event_dedup_window: float = 5.0 # 5秒去重窗口 + + # 进度消息替换机制:跟踪当前工具的进度状态 + self._current_tool_progress: dict[str, dict[str, Any]] = {} # step_id -> progress_info + self._progress_message_types = { + "step.init", + "step.input", + "step.output", + "step.cancel", + "step.error", + "step.waiting_for_start", + "step.waiting_for_param", + } + self._final_message_types = { + "flow.success", + "flow.failed", + "flow.cancel", + } + + def reset_status_tracking(self) -> None: + """重置状态跟踪,用于新对话开始时""" + self._last_status_message = "" + self._should_ignore_flow_stop = False + self._event_history.clear() + self._current_tool_progress.clear() + self.logger.debug("状态跟踪已重置") def handle_special_events(self, event: HermesStreamEvent) -> tuple[bool, str | None]: """处理特殊事件类型,返回(是否中断, 中断消息)""" @@ -151,20 +184,245 @@ class HermesStreamProcessor: return None step_name = event.get_step_name() + step_id = event.get_step_id() event_type = event.event_type + content = event.get_content() + + # 基于步骤ID和事件类型的去重检查 + if not self._should_process_event(event_type, step_id): + return None + # 检查是否应该替换之前的进度消息 + should_replace = self._should_replace_progress(event, step_id) + + # 处理特殊的等待状态事件 + if event_type == "step.waiting_for_start": + return self._format_waiting_for_start(content, step_name, step_id, should_replace=should_replace) + + if event_type == "step.waiting_for_param": + return self._format_waiting_for_param(content, step_name, step_id, should_replace=should_replace) + + # 特殊处理 flow.stop:如果刚刚收到过等待事件,则忽略 + if event_type == "flow.stop": + return self._handle_flow_stop() + + # 处理其他事件类型 + return self._format_standard_status(event_type, step_name, step_id, should_replace=should_replace) + + def _should_process_event(self, event_type: str, step_id: str) -> bool: + """检查是否应该处理此事件(去重逻辑)""" + current_time = time.time() + event_key = f"{event_type}:{step_id or 'flow'}" + + # 详细调试日志 + self.logger.debug("处理事件: %s, 时间: %.3f", event_key, current_time) + + # 清理过期的事件历史(保留去重窗口内的事件) + old_count = len(self._event_history) + self._event_history = [ + (etype, sname, timestamp) + for etype, sname, timestamp in self._event_history + if current_time - timestamp < self._event_dedup_window + ] + new_count = len(self._event_history) + if old_count != new_count: + self.logger.debug("清理过期事件历史: %d -> %d", old_count, new_count) + + # 检查是否为重复事件(基于步骤ID而不是步骤名称) + for hist_event_type, hist_step_id, timestamp in self._event_history: + hist_key = f"{hist_event_type}:{hist_step_id or 'flow'}" + if hist_key == event_key and current_time - timestamp < self._event_dedup_window: + self.logger.debug("跳过重复事件: %s, 距离上次 %.2f 秒", event_key, current_time - timestamp) + return False + + # 记录当前事件(使用步骤ID) + self._event_history.append((event_type, step_id or "", current_time)) + self.logger.debug("记录新事件: %s, 历史记录数量: %d", event_key, len(self._event_history)) + return True + + def _format_waiting_for_start( + self, + content: dict[str, Any], + step_name: str, + step_id: str, + *, + should_replace: bool, + ) -> str: + """格式化等待开始执行的消息""" + self._should_ignore_flow_stop = True # 标记下一个 flow.stop 应该被忽略 + risk = content.get("risk", "unknown") + reason = content.get("reason", "需要用户确认是否执行此工具") + + # 风险级别映射 + risk_info = { + "low": "🟢 低风险", + "medium": "🟡 中等风险", + "high": "🔴 高风险", + }.get(risk, "⚪ 风险等级未知") + + message = f"\n⏸️ **等待用户确认执行工具**\n\n🔧 工具名称: `{step_name}` {risk_info}\n\n💭 说明: {reason}\n" + + # 记录进度信息 + if step_id: + self._current_tool_progress[step_id] = { + "message": message, + "should_replace": should_replace, + "is_progress": True, + } + + return message + + def _format_waiting_for_param( + self, + content: dict[str, Any], + step_name: str, + step_id: str, + *, + should_replace: bool, + ) -> str: + """格式化等待参数输入的消息""" + self._should_ignore_flow_stop = True # 标记下一个 flow.stop 应该被忽略 + message_content = content.get("message", "需要补充参数") + message = f"\n📝 **等待用户输入参数**\n\n🔧 工具名称: `{step_name}`\n\n💭 说明: {message_content}\n" + + # 记录进度信息 + if step_id: + self._current_tool_progress[step_id] = { + "message": message, + "should_replace": should_replace, + "is_progress": True, + } + + return message + + def _handle_flow_stop(self) -> str | None: + """处理流停止事件""" + if self._should_ignore_flow_stop: + self.logger.debug("忽略 flow.stop 事件,因为刚刚收到等待用户操作事件") + self._should_ignore_flow_stop = False # 重置标记 + return None + # 如果不是因为等待用户操作而暂停,则显示通用暂停消息 + message = "\n⏸️ **工作流已暂停**\n" + if message == self._last_status_message: + return None + self._last_status_message = message + return message + + def _format_standard_status( + self, + event_type: str, + step_name: str, + step_id: str, + *, + should_replace: bool, + ) -> str | None: + """格式化标准状态消息""" # 定义事件类型到状态消息的映射 status_messages = { - "step.init": f"🔧 正在初始化工具: {step_name}", - "step.input": f"📥 工具 {step_name} 正在执行...", - "step.output": f"✅ 工具 {step_name} 执行完成", - "step.cancel": f"❌ 工具 {step_name} 已取消", - "step.error": f"⚠️ 工具 {step_name} 执行失败", - "flow.start": "🚀 开始执行工作流", - "flow.stop": "⏸️ 工作流已暂停,等待用户操作", - "flow.success": "🎉 工作流执行成功", - "flow.failed": "💥 工作流执行失败", - "flow.cancel": "🛑 工作流已取消", + "step.init": f"\n🔧 正在初始化工具: `{step_name}`\n", + "step.input": f"\n📥 工具 `{step_name}` 正在执行...\n", + "step.output": f"\n✅ 工具 `{step_name}` 执行完成\n", + "step.cancel": f"\n❌ 工具 `{step_name}` 已取消\n", + "step.error": f"\n⚠️ 工具 `{step_name}` 执行失败\n", + # 隐藏 flow.start 和 flow.success,只保留失败相关的状态 + # "flow.start": "\n🚀 **开始执行工作流**\n", # 隐藏 + # "flow.success": "\n🎉 **工作流执行成功**\n", # 隐藏 + "flow.failed": "\n💥 **工作流执行失败**\n", + "flow.cancel": "\n🛑 **工作流已取消**\n", } - return status_messages.get(event_type) + base_message = status_messages.get(event_type) + if not base_message: + return None + + # 对于所有步骤相关的消息,都检查是否需要替换之前的进度 + if event_type in self._progress_message_types and step_id: + base_message = self._handle_progress_message( + event_type, + step_name, + step_id, + base_message, + should_replace=should_replace, + ) + elif event_type in self._final_message_types and step_id: + # 这是一个流级别的最终状态消息,清理对应的进度信息 + self._current_tool_progress.pop(step_id, None) + + # 检查是否与上一次的状态消息相同,避免重复输出 + if base_message == self._last_status_message: + self.logger.debug("跳过重复的状态消息: %s", base_message.strip()) + return None + + self._last_status_message = base_message + return base_message + + def _handle_progress_message( + self, + event_type: str, + step_name: str, + step_id: str, + base_message: str, + *, + should_replace: bool, + ) -> str: + """处理进度消息的替换逻辑""" + # 检查是否为最终状态消息 + is_final_state = event_type in {"step.output", "step.cancel", "step.error"} + + # 关键修复:使用工具名称而不是step_id来跟踪,确保同一工具的后续状态更新能够替换之前的进度 + # 策略:如果是同一个工具名称的后续消息,就应该替换之前的消息 + has_previous_progress = step_name in self._current_tool_progress + + # 这是一个进度消息,记录到跟踪字典中(使用工具名称作为key) + if not is_final_state: + self._current_tool_progress[step_name] = { + "message": base_message, + "should_replace": should_replace, + "is_progress": True, + "step_id": step_id, # 保留step_id用于调试 + } + + # 核心修复:所有的MCP进度消息都应该标记为MCP状态,而不仅仅是需要替换的消息 + # 使用工具名称作为标识,确保TUI层面能正确识别为MCP消息 + if has_previous_progress: + # 如果有之前的进度,说明这是一个状态更新,需要替换 + base_message = f"[REPLACE:{step_name}]{base_message}" + if is_final_state: + self.logger.debug("添加替换标记给最终状态消息,工具 %s: %s", step_name, event_type) + # 清理对应的进度信息 + self._current_tool_progress.pop(step_name, None) + else: + self.logger.debug("添加替换标记给工具 %s: %s", step_name, event_type) + else: + # 如果是第一个进度消息,添加MCP标记但不替换 + base_message = f"[MCP:{step_name}]{base_message}" + self.logger.debug("添加MCP标记给首次进度消息,工具 %s: %s", step_name, event_type) + + return base_message + + def _should_replace_progress(self, event: HermesStreamEvent, step_id: str | None) -> bool: + """判断是否应该替换之前的进度消息""" + step_name = event.get_step_name() + if not step_name: + return False + + event_type = event.event_type + + # 对于进度消息类型,只要存在同一个工具名称的之前记录,就应该替换 + # 这确保了同一个工具的状态更新会依次替换,而不是累积 + if event_type in self._progress_message_types and step_name in self._current_tool_progress: + prev_info = self._current_tool_progress[step_name] + if prev_info.get("is_progress", False): + self.logger.debug( + "工具 %s 的进度消息将被替换: %s -> %s", + step_name, + prev_info.get("message", "").strip()[:50], + event_type, + ) + return True + + return False + + def get_replacement_info(self, step_id: str) -> dict[str, Any] | None: + """获取指定步骤的替换信息""" + return self._current_tool_progress.get(step_id) diff --git a/src/tool/command_processor.py b/src/tool/command_processor.py index ed9ab577362f0268c209690cbd2d3bf29bd09117..928a1977e079ca23c537372c5b0afc299b73ada0 100644 --- a/src/tool/command_processor.py +++ b/src/tool/command_processor.py @@ -33,7 +33,7 @@ def execute_command(command: str) -> tuple[bool, str]: shell=True, capture_output=True, text=True, - timeout=600, + timeout=1800, check=False, ) success = result.returncode == 0