From b8b448fbe7783bbfb3dc237e48885c3d3cb6bfda Mon Sep 17 00:00:00 2001 From: z30057876 Date: Wed, 29 Oct 2025 17:04:44 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=85=B3=E9=97=ADSSL=E9=AA=8C=E8=AF=81?= =?UTF-8?q?=EF=BC=9B=E4=BC=98=E5=8C=96JSON=E7=94=9F=E6=88=90Prompt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/llm/generator.py | 113 +++++++++----- apps/llm/prompt.py | 287 ++++++++++++++++++++++++----------- apps/llm/providers/ollama.py | 2 + apps/llm/providers/openai.py | 5 + apps/llm/providers/tei.py | 2 +- 5 files changed, 286 insertions(+), 123 deletions(-) diff --git a/apps/llm/generator.py b/apps/llm/generator.py index f2f681faa..656ef2800 100644 --- a/apps/llm/generator.py +++ b/apps/llm/generator.py @@ -10,11 +10,12 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from jsonschema import Draft7Validator -from apps.models import LLMType +from apps.models import LanguageType, LLMType from apps.schemas.llm import LLMFunctions from .llm import LLM -from .prompt import JSON_GEN_BASIC, JSON_NO_FUNCTION_CALL +from .prompt import JSON_GEN +from .token import token_calculator _logger = logging.getLogger(__name__) @@ -53,11 +54,71 @@ class JsonGenerator: _logger.info("[JSONGenerator] LLM不支持FunctionCall,将使用prompt方式") self._support_function_call = False + def _build_messages( + self, + function: dict[str, Any], + conversation: list[dict[str, str]], + language: LanguageType = LanguageType.CHINESE, + ) -> list[dict[str, str]]: + """构建messages,提取query并使用JSON_GEN模板格式化""" + if conversation[-1]["role"] == "user": + query = conversation[-1]["content"] + else: + err = "[JSONGenerator] 对话历史中最后一项必须是用户消息" + raise RuntimeError(err) + + template = self._env.from_string(JSON_GEN[language]) + prompt = template.render( + query=query, + conversation=conversation[:-1], + schema=function["parameters"], + use_xml_format=False, + ) + + messages = [*conversation[:-1], {"role": "user", "content": prompt}] + + # 计算Token数量 + if self._llm is not None: + token_count = token_calculator.calculate_token_length(messages) + ctx_length = self._llm.config.ctxLength + + # 进行消息裁剪 + if token_count > ctx_length: + _logger.warning( + "[JSONGenerator] 当前对话 Token 数量 (%d) 超过模型上下文长度 (%d),进行消息裁剪", + token_count, + ctx_length, + ) + + trimmed_conversation = list(conversation[:-1]) + + while trimmed_conversation and token_count > ctx_length: + if len(trimmed_conversation) >= 2 and \ + trimmed_conversation[0]["role"] == "user" and \ + trimmed_conversation[1]["role"] == "assistant": # noqa: PLR2004 + trimmed_conversation = trimmed_conversation[2:] + elif trimmed_conversation: + trimmed_conversation = trimmed_conversation[1:] + else: + break + + # 重新构建 messages 并计算 token + messages = [*trimmed_conversation, {"role": "user", "content": prompt}] + token_count = token_calculator.calculate_token_length(messages) + + _logger.info( + "[JSONGenerator] 裁剪后对话 Token 数量: %d,移除了 %d 条消息", + token_count, + len(conversation) - len(trimmed_conversation) - 1, + ) + + return messages + async def _single_trial( self, function: dict[str, Any], - query: str, context: list[dict[str, str]], + language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """单次尝试,包含校验逻辑;function使用OpenAI标准Function格式""" if self._llm is None: @@ -70,10 +131,10 @@ class JsonGenerator: # 执行生成 if self._support_function_call: # 如果支持FunctionCall - result = await self._call_with_function(function, query, context) + result = await self._call_with_function(function, context, language) else: # 如果不支持FunctionCall - result = await self._call_without_function(function, query, context) + result = await self._call_without_function(function, context, language) # 校验结果 try: @@ -94,23 +155,22 @@ class JsonGenerator: async def _call_with_function( self, function: dict[str, Any], - query: str, - context: list[dict[str, str]], + conversation: list[dict[str, str]], + language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """使用FunctionCall方式调用""" if self._llm is None: err = "[JSONGenerator] 未初始化,请先调用init()方法" raise RuntimeError(err) + messages = self._build_messages(function, conversation, language) + tool = LLMFunctions( name=function["name"], description=function["description"], param_schema=function["parameters"], ) - messages = context.copy() - messages.append({"role": "user", "content": query}) - tool_call_result = {} async for chunk in self._llm.call(messages, include_thinking=False, streaming=True, tools=[tool]): if chunk.tool_call: @@ -125,25 +185,15 @@ class JsonGenerator: async def _call_without_function( self, function: dict[str, Any], - query: str, - context: list[dict[str, str]], + conversation: list[dict[str, str]], + language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: """不使用FunctionCall方式调用""" if self._llm is None: err = "[JSONGenerator] 未初始化,请先调用init()方法" raise RuntimeError(err) - template = self._env.from_string(JSON_GEN_BASIC + "\n\n" + JSON_NO_FUNCTION_CALL) - prompt = template.render( - query=query, - conversation=context[1:] if context else [], - schema=function["parameters"], - ) - - messages = [ - context[0], - {"role": "user", "content": prompt}, - ] + messages = self._build_messages(function, conversation, language) # 使用LLM的call方法获取响应 full_response = "" @@ -162,22 +212,11 @@ class JsonGenerator: async def generate( self, - query: str, function: dict[str, Any], conversation: list[dict[str, str]] | None = None, + language: LanguageType = LanguageType.CHINESE, ) -> dict[str, Any]: - """ - 生成JSON;function使用OpenAI标准Function格式 - - Args: - query: 用户查询 - function: OpenAI标准Function格式的函数定义 - conversation: 对话历史,默认为空列表 - - Returns: - 生成的JSON对象 - - """ + """生成JSON;function使用OpenAI标准Function格式""" if self._llm is None: err = "[JSONGenerator] 未初始化,请先调用init()方法" raise RuntimeError(err) @@ -202,7 +241,7 @@ class JsonGenerator: count += 1 try: # 如果_single_trial没有抛出异常,直接返回结果,不进行重试 - return await self._single_trial(function, query, context) + return await self._single_trial(function, context, language) except Exception: _logger.exception( "[JSONGenerator] 第 %d/%d 次尝试失败", diff --git a/apps/llm/prompt.py b/apps/llm/prompt.py index 9851b2822..6ece04539 100644 --- a/apps/llm/prompt.py +++ b/apps/llm/prompt.py @@ -3,89 +3,206 @@ from textwrap import dedent -JSON_GEN_BASIC = dedent(r""" - - - You are an intelligent assistant who can use tools to help answer user queries. - Your task is to respond to the query according to the background information and available tools. - - Note: - - You have access to a set of tools that can help you gather information. - - You can use one tool at a time and will receive the result in the user's response. - - Use tools step-by-step to respond to the user's query, with each tool use informed by the \ -result of the previous tool use. - - The user's query is provided in the tags. - {% if previous_trial %}- Review the previous trial information in \ -tags to avoid repeating mistakes.{% endif %} - - - - - {{ query }} - - {% if previous_trial %} - - - - You previously attempted to answer the query by calling a tool, but the arguments were incorrect. - - - {{ previous_trial }} - - - {{ err_info }} - - - {% endif %} - - - You have access to a set of tools. You can use one tool and will receive the result of that tool \ -use in the user's response. - -""") - -JSON_NO_FUNCTION_CALL = dedent(r""" - **Tool Use Formatting:** - Tool uses are formatted using XML-style tags. The tool name itself becomes the root XML tag name. \ -Each parameter is enclosed within its own set of tags according to the parameter schema provided below. - - **Basic Structure:** - - value - - - **Parameter Schema:** - The available tools and their parameter schemas are provided in the following format: - - Tool name: The name to use as the root XML tag - - Parameters: Each parameter has a name, type, and description - - Required parameters must be included - - Optional parameters can be omitted - - **XML Generation Rules:** - 1. Use the exact tool name as the root XML tag - 2. For each parameter, create a nested tag with the parameter name - 3. Place the parameter value inside the corresponding tag - 4. For string values: text value - 5. For numeric values: 123 - 6. For boolean values: true or false - 7. For array values: wrap each item in the parameter tag - item1 - item2 - 8. For object values: nest the object properties as sub-tags - - value1 - value2 - - - **Example:** - If you need to use a tool named "search" with parameters query (string) and limit (number): - - - your search text - 10 - - - Always use the actual tool name as the root XML tag and match parameter names exactly as specified \ -in the schema for proper parsing and execution. -""") +from apps.models import LanguageType + +JSON_GEN: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent( + r""" + 你是一个智能助手,可以访问帮助回答用户查询的工具。 + 你的任务是使用可用的工具和背景信息来响应查询。 + + + - 你可以访问能够帮助收集信息的工具 + - 逐步使用工具,每次使用都基于之前的结果 + - 用户的查询在 标签中提供 + {% if previous_trial %}- 查看 信息以避免重复错误{% endif %} + {% if use_xml_format %}- 使用 XML 样式的标签格式化工具调用,其中工具名称是根标签,每个参数是嵌套标签 + - 使用架构中指定的确切工具名称和参数名称 + - 基本格式结构: + <工具名称> + <参数名称>值 + + - 参数类型: + * 字符串:搜索文本 + * 数字:10 + * 布尔值:true + * 数组(重复标签):项目1项目2 + * 对象(嵌套标签):{% endif %} + + {% if use_xml_format %} + + + + 杭州的天气怎么样? + + + + + get_weather: 获取指定城市的当前天气信息 + + + { + "name": "get_weather", + "description": "获取指定城市的当前天气信息", + "parameters": { + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "要查询天气的城市名称" + }, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "温度单位" + }, + "include_forecast": { + "type": "boolean", + "description": "是否包含预报数据" + } + }, + "required": ["city"] + } + } + + + + 助手响应: + + 杭州 + celsius + false + + + {% endif %} + + + {{ query }} + + {% if previous_trial %} + + + + 你之前的工具调用有不正确的参数。 + + + {{ previous_trial }} + + + {{ err_info }} + + + {% endif %} + + + + {{ tool_descriptions }} + + + {{ tool_schemas }} + + + """, + ), + LanguageType.ENGLISH: dedent( + r""" + You are an intelligent assistant with access to tools that help answer user queries. + Your task is to respond to queries using the available tools and background information. + + + - You have access to tools that can help gather information + - Use tools step-by-step, with each use informed by previous results + - The user's query is provided in the tags + {% if previous_trial %}- Review the information to avoid \ +repeating mistakes{% endif %} + {% if use_xml_format %}- Format tool calls using XML-style tags where the tool name is the root tag \ +and each parameter is a nested tag + - Use the exact tool name and parameter names as specified in the schema + - Basic format structure: + + value + + - Parameter types: + * String: search text + * Number: 10 + * Boolean: true + * Array (repeat tags): item1item2 + * Object (nest tags): value{% endif %} + + {% if use_xml_format %} + + + + What is the weather like in Hangzhou? + + + + + get_weather: Get current weather information for a specified city + + + { + "name": "get_weather", + "description": "Get current weather information for a specified city", + "parameters": { + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "The city name to query weather for" + }, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "Temperature unit" + }, + "include_forecast": { + "type": "boolean", + "description": "Whether to include forecast data" + } + }, + "required": ["city"] + } + } + + + + Assistant response: + + Hangzhou + celsius + false + + + {% endif %} + + + {{ query }} + + {% if previous_trial %} + + + + Your previous tool call had incorrect arguments. + + + {{ previous_trial }} + + + {{ err_info }} + + + {% endif %} + + + + {{ tool_descriptions }} + + + {{ tool_schemas }} + + + """, + ), +} diff --git a/apps/llm/providers/ollama.py b/apps/llm/providers/ollama.py index 35e4235eb..e72151069 100644 --- a/apps/llm/providers/ollama.py +++ b/apps/llm/providers/ollama.py @@ -47,6 +47,7 @@ class OllamaProvider(BaseProvider): self._client = AsyncClient( host=self.config.baseUrl, timeout=self._timeout, + verify=False, ) else: self._client = AsyncClient( @@ -55,6 +56,7 @@ class OllamaProvider(BaseProvider): "Authorization": f"Bearer {self.config.apiKey}", }, timeout=self._timeout, + verify=False, ) def _process_usage_data(self, last_chunk: ChatResponse | None, messages: list[dict[str, str]]) -> None: diff --git a/apps/llm/providers/openai.py b/apps/llm/providers/openai.py index 0c2d3d458..b14e9d4b9 100644 --- a/apps/llm/providers/openai.py +++ b/apps/llm/providers/openai.py @@ -5,6 +5,7 @@ import logging from collections.abc import AsyncGenerator from typing import cast +import httpx from openai import AsyncOpenAI, AsyncStream from openai.types.chat import ( ChatCompletionChunk, @@ -24,6 +25,7 @@ class OpenAIProvider(BaseProvider): """OpenAI大模型客户端""" _client: AsyncOpenAI + _http_client: httpx.AsyncClient input_tokens: int output_tokens: int _allow_chat: bool @@ -54,16 +56,19 @@ class OpenAIProvider(BaseProvider): @override def _init_client(self) -> None: """初始化模型API客户端""" + self._http_client = httpx.AsyncClient(verify=False) # noqa: S501 if not self.config.apiKey: self._client = AsyncOpenAI( base_url=self.config.baseUrl, timeout=self._timeout, + http_client=self._http_client, ) else: self._client = AsyncOpenAI( base_url=self.config.baseUrl, api_key=self.config.apiKey, timeout=self._timeout, + http_client=self._http_client, ) def _handle_usage_chunk(self, chunk: ChatCompletionChunk | None, messages: list[dict[str, str]]) -> None: diff --git a/apps/llm/providers/tei.py b/apps/llm/providers/tei.py index 0e5e9476c..088008622 100644 --- a/apps/llm/providers/tei.py +++ b/apps/llm/providers/tei.py @@ -52,7 +52,7 @@ class TEIProvider(BaseProvider): async def embedding(self, text: list[str]) -> list[list[float]]: """访问TEI兼容的Embedding API,获得向量化数据""" text = self._validate_input(text) - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(verify=False) as client: # noqa: S501 result = [] for single_text in text: data = { -- Gitee From 78eb5644534fe80ca80f918657bf4ba15798fbd2 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Wed, 29 Oct 2025 17:05:29 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=AD=A3Prompt=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E5=85=B6=E9=80=82=E5=90=88=E7=BB=93=E5=90=88JSON=20Generator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/core.py | 27 +-- apps/scheduler/call/facts/facts.py | 59 +++-- apps/scheduler/call/facts/prompt.py | 304 ++++++++++++------------- apps/scheduler/call/rag/prompt.py | 152 +++++++------ apps/scheduler/call/rag/rag.py | 19 +- apps/scheduler/call/slot/slot.py | 17 +- apps/scheduler/call/suggest/prompt.py | 276 +++++++++------------- apps/scheduler/call/suggest/suggest.py | 51 +---- apps/scheduler/mcp/host.py | 3 +- apps/scheduler/mcp/plan.py | 2 +- apps/scheduler/mcp/select.py | 8 +- apps/scheduler/mcp_agent/base.py | 6 +- apps/scheduler/mcp_agent/host.py | 12 +- apps/scheduler/scheduler/flow.py | 3 +- 14 files changed, 438 insertions(+), 501 deletions(-) diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 06939f9b7..6d1dc3963 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -12,7 +12,6 @@ from typing import TYPE_CHECKING, Any, ClassVar, Self from pydantic import BaseModel, ConfigDict, Field from pydantic.json_schema import SkipJsonSchema -from apps.llm import json_generator from apps.models import ExecutorHistory, LanguageType, NodeInfo from apps.schemas.enum_var import CallOutputType from apps.schemas.scheduler import ( @@ -189,7 +188,7 @@ class CoreCall(BaseModel): async def _llm(self, messages: list[dict[str, Any]], *, streaming: bool = False) -> AsyncGenerator[str, None]: """Call可直接使用的LLM非流式调用""" think_tag_opened = False - async for chunk in self._llm_obj.reasoning.call(messages, streaming=streaming): + async for chunk in self._llm_obj.call(messages, streaming=streaming): if chunk.reasoning_content: if not think_tag_opened: yield "" @@ -201,27 +200,3 @@ class CoreCall(BaseModel): yield "" think_tag_opened = False yield chunk.content - - - async def _json(self, messages: list[dict[str, Any]], function: dict[str, Any]) -> dict[str, Any]: - """Call可直接使用的JSON生成""" - # 从messages中提取最后一条用户消息作为query,其他作为conversation - query = "" - conversation = [] - - for i, msg in enumerate(messages): - role = msg.get("role") - # 跳过system消息 - if role == "system": - continue - # 找到最后一条user消息作为query - if role == "user" and i == len(messages) - 1: - query = msg.get("content", "") - else: - conversation.append(msg) - - return await json_generator.generate( - query=query, - function=function, - conversation=conversation if conversation else None, - ) diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index 378ba7634..5f2bffbec 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -4,10 +4,9 @@ from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any, Self -from jinja2 import BaseLoader -from jinja2.sandbox import SandboxedEnvironment from pydantic import Field +from apps.llm import json_generator from apps.models import LanguageType, NodeInfo from apps.scheduler.call.core import CoreCall from apps.schemas.enum_var import CallOutputType @@ -77,32 +76,44 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """执行工具""" data = FactsInput(**input_data) - # jinja2 环境 - env = SandboxedEnvironment( - loader=BaseLoader(), - autoescape=False, - trim_blocks=True, - lstrip_blocks=True, - extensions=["jinja2.ext.loopcontrols"], - ) - # 提取事实信息 - facts_tpl = env.from_string(FACTS_PROMPT[self._sys_vars.language]) - facts_prompt = facts_tpl.render(conversation=data.message) - facts_obj = await self._json([ - {"role": "system", "content": "You are a helpful assistant."}, + # 组装conversation消息 + facts_prompt = FACTS_PROMPT[self._sys_vars.language] + facts_conversation = [ + *data.message, {"role": "user", "content": facts_prompt}, - ], FactsGen.model_json_schema()) - facts_obj = FactsGen.model_validate(facts_obj) + ] - # 更新用户画像 - domain_tpl = env.from_string(DOMAIN_PROMPT[self._sys_vars.language]) - domain_prompt = domain_tpl.render(conversation=data.message) - domain_list = await self._json([ - {"role": "system", "content": "You are a helpful assistant."}, + # 提取事实信息 + facts_result = await json_generator.generate( + function={ + "name": "extract_facts", + "description": "Extract facts from the conversation", + "parameters": FactsGen.model_json_schema(), + }, + conversation=facts_conversation, + language=self._sys_vars.language, + ) + facts_obj = FactsGen.model_validate(facts_result) + + # 组装conversation消息 + domain_prompt = DOMAIN_PROMPT[self._sys_vars.language] + domain_conversation = [ + *data.message, {"role": "user", "content": domain_prompt}, - ], DomainGen.model_json_schema()) - domain_list = DomainGen.model_validate(domain_list) + ] + + # 更新用户画像 + domain_result = await json_generator.generate( + function={ + "name": "extract_domain", + "description": "Extract domain keywords from the conversation", + "parameters": DomainGen.model_json_schema(), + }, + conversation=domain_conversation, + language=self._sys_vars.language, + ) + domain_list = DomainGen.model_validate(domain_result) for domain in domain_list.keywords: await UserTagManager.update_user_domain_by_user_and_domain_name(data.user_id, domain) diff --git a/apps/scheduler/call/facts/prompt.py b/apps/scheduler/call/facts/prompt.py index 06f8f17bf..e4cfb9b7f 100644 --- a/apps/scheduler/call/facts/prompt.py +++ b/apps/scheduler/call/facts/prompt.py @@ -2,78 +2,69 @@ """记忆提取工具的提示词""" from textwrap import dedent +from typing import Any from apps.models import LanguageType DOMAIN_PROMPT: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" - - - 根据对话上文,提取推荐系统所需的关键词标签,要求: - 1. 实体名词、技术术语、时间范围、地点、产品等关键信息均可作为关键词标签 - 2. 至少一个关键词与对话的话题有关 - 3. 标签需精简,不得重复,不得超过10个字 - 4. 使用JSON格式输出,不要包含XML标签,不要包含任何解释说明 - - - - - 北京天气如何? - 北京今天晴。 - - - - { - "keywords": ["北京", "天气"] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - + # 任务说明 + 根据对话历史,提取推荐系统所需的关键词标签。这些标签将用于内容推荐、用户画像构建和个性化服务。 + + ## 提取要求 + + 1. **关键词类型**:可以是实体名词(人名、地名、组织名)、技术术语、产品名称、时间范围、领域概念等 + 2. **话题相关性**:至少提取一个与对话主题直接相关的关键词 + 3. **质量标准**: + - 标签应精准且简洁,每个标签不超过10个字 + - 避免重复或高度相似的标签 + - 优先提取具有区分度的关键词 + - 提取3-8个关键词为宜 + 4. **输出格式**:返回JSON对象,包含keywords字段,值为字符串数组 + + ## 示例 + + **示例1:天气查询** + - 用户:"北京天气如何?" + - 助手:"北京今天晴。" + - 提取结果:["北京", "天气"] + + **示例2:技术讨论** + - 用户:"介绍一下Python的装饰器" + - 助手:"Python装饰器是一种设计模式。" + - 提取结果:["Python", "装饰器", "设计模式"] """, ), LanguageType.ENGLISH: dedent( r""" - - - Extract keywords for recommendation system based on the previous conversation, requirements: - 1. Entity nouns, technical terms, time range, location, product, etc. can be keyword tags - 2. At least one keyword is related to the topic of the conversation - 3. Tags should be concise and not repeated, not exceeding 10 characters - 4. Output in JSON format, do not include XML tags, do not include any explanatory notes - - - - - What's the weather like in Beijing? - Beijing is sunny today. - - - - { - "keywords": ["Beijing", "weather"] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - + # Task Description + Extract keyword tags for the recommendation system based on conversation history. These tags will be used \ +for content recommendation, user profiling, and personalized services. + + ## Extraction Requirements + + 1. **Keyword Types**: Can be entity nouns (names, locations, organizations), technical terms, \ +product names, time ranges, domain concepts, etc. + 2. **Topic Relevance**: Extract at least one keyword directly related to the conversation topic + 3. **Quality Standards**: + - Tags should be precise and concise, each tag not exceeding 10 characters + - Avoid duplicate or highly similar tags + - Prioritize extracting distinctive keywords + - Extract 3-8 keywords as appropriate + 4. **Output Format**: Return JSON object containing keywords field with string array value + + ## Examples + + **Example 1: Weather Query** + - User: "What's the weather like in Beijing?" + - Assistant: "Beijing is sunny today." + - Extraction result: ["Beijing", "weather"] + + **Example 2: Technical Discussion** + - User: "Tell me about Python decorators" + - Assistant: "Python decorators are a design pattern." + - Extraction result: ["Python", "decorator", "design pattern"] """, ), } @@ -81,106 +72,107 @@ DOMAIN_PROMPT: dict[LanguageType, str] = { FACTS_PROMPT: dict[str, str] = { LanguageType.CHINESE: dedent( r""" - - - 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 - 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 - - **你需要关注的信息类型** - 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 - 2. 偏好:对待实体的态度。例如喜欢、讨厌等。 - 3. 关系:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等。 - 4. 动作:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等。 - - **要求** - 1. 事实必须准确,只能从对话中提取。不要将样例中的信息体现在输出中。 - 2. 事实必须清晰、简洁、易于理解。必须少于30个字。 - 3. 必须按照以下JSON格式输出: - - { - "facts": ["事实1", "事实2", "事实3"] - } - - - - - 杭州西湖有哪些景点? - 杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,\ -包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 - - - - { - "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - + # 任务说明 + 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 + + ## 关注的信息类型 + + 1. **实体**:对话中涉及到的实体。例如:姓名、地点、组织、事件等 + 2. **偏好**:对待实体的态度。例如喜欢、讨厌等 + 3. **关系**:用户与实体之间,或两个实体之间的关系。例如包含、并列、互斥等 + 4. **动作**:对实体产生影响的具体动作。例如查询、搜索、浏览、点击等 + + ## 提取要求 + + 1. 事实必须准确,只能从对话中提取 + 2. 事实必须清晰、简洁、易于理解,每条事实少于30个字 + 3. 输出格式:返回JSON对象,包含facts字段,值为字符串数组 + + ## 示例 + + **示例1:景点查询** + - 用户:"杭州西湖有哪些景点?" + - 助手:"西湖周围有许多著名的景点,包括苏堤、白堤、断桥、三潭印月等。" + - 提取结果:["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] + + **示例2:用户偏好** + - 用户:"我喜欢看科幻电影" + - 助手:"科幻电影确实很吸引人,比如《星际穿越》等。" + - 提取结果:["用户喜欢看科幻电影", "用户可能对《星际穿越》感兴趣"] """, ), LanguageType.ENGLISH: dedent( r""" - - - Extract key information from the conversation and organize it into unique, easily understandable \ -facts, including user preferences, relationships, entities, etc. - The following are the types of information you need to pay attention to and detailed instructions \ -on how to handle input data. - - **Types of information you need to pay attention to** - 1. Entities: Entities involved in the conversation. For example: names, locations, organizations, \ + # Task Description + Extract key information from the conversation and organize it into unique, easily understandable facts, \ +including user preferences, relationships, entities, etc. + + ## Information Types to Focus On + + 1. **Entities**: Entities involved in the conversation. For example: names, locations, organizations, \ events, etc. - 2. Preferences: Attitudes towards entities. For example: like, dislike, etc. - 3. Relationships: Relationships between users and entities, or between two entities. For example: \ + 2. **Preferences**: Attitudes towards entities. For example: like, dislike, etc. + 3. **Relationships**: Relationships between users and entities, or between two entities. For example: \ include, parallel, mutually exclusive, etc. - 4. Actions: Specific actions that affect entities. For example: query, search, browse, click, etc. - - **Requirements** - 1. Facts must be accurate and can only be extracted from the conversation. Do not include the \ -information in the example in the output. - 2. Facts must be clear, concise, and easy to understand. Must be less than 30 words. - 3. Output in the following JSON format: - - { - "facts": ["Fact 1", "Fact 2", "Fact 3"] - } - - - - - What are the attractions in Hangzhou West Lake ? - West Lake in Hangzhou, Zhejiang Province, China, is a famous scenic spot known for \ -its beautiful natural scenery and rich cultural heritage. Many notable attractions surround West Lake, including the \ -renowned Su Causeway, Bai Causeway, Broken Bridge, and the Three Pools Mirroring the Moon. Famous for its \ -crystal-clear waters and the surrounding mountains, West Lake is one of China's most famous lakes. - - - - { - "facts": ["Hangzhou West Lake has famous attractions such as Suzhou Embankment, Bai Budi, \ -Qiantang Bridge, San Tang Yue, etc."] - } - - - - - - {% for item in conversation %} - <{{item['role']}}> - {{item['content']}} - - {% endfor %} - - + 4. **Actions**: Specific actions that affect entities. For example: query, search, browse, click, etc. + + ## Extraction Requirements + + 1. Facts must be accurate and can only be extracted from the conversation + 2. Facts must be clear, concise, and easy to understand, each fact less than 30 words + 3. Output format: Return JSON object containing facts field with string array value + + ## Examples + + **Example 1: Attraction Query** + - User: "What are the attractions in Hangzhou West Lake?" + - Assistant: "Notable attractions include Su Causeway, Bai Causeway, Broken Bridge, etc." + - Extraction result: ["Hangzhou West Lake has Su Causeway, Bai Causeway, Broken Bridge, etc."] + + **Example 2: User Preference** + - User: "I like watching sci-fi movies" + - Assistant: "Sci-fi movies are indeed attractive, such as Interstellar." + - Extraction result: ["User likes watching sci-fi movies", "User may be interested in Interstellar"] """, ), } + +DOMAIN_FUNCTION: dict[str, Any] = { + "name": "extract_domain", + "description": "从对话中提取领域关键词标签 / Extract domain keyword tags from conversation", + "parameters": { + "type": "object", + "properties": { + "keywords": { + "type": "array", + "items": {"type": "string"}, + "description": "关键词或标签列表 / List of keywords or tags", + }, + }, + "required": ["keywords"], + }, + "examples": [ + {"keywords": ["北京", "天气"]}, + {"keywords": ["Python", "装饰器", "设计模式"]}, + ], +} + +FACTS_FUNCTION: dict[str, Any] = { + "name": "extract_facts", + "description": "从对话中提取关键事实信息 / Extract key fact information from conversation", + "parameters": { + "type": "object", + "properties": { + "facts": { + "type": "array", + "items": {"type": "string"}, + "description": "从对话中提取的事实条目 / Fact entries extracted from conversation", + }, + }, + "required": ["facts"], + }, + "examples": [ + {"facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"]}, + {"facts": ["用户喜欢看科幻电影", "用户可能对《星际穿越》感兴趣"]}, + ], +} diff --git a/apps/scheduler/call/rag/prompt.py b/apps/scheduler/call/rag/prompt.py index cc8691462..b4d849d2f 100644 --- a/apps/scheduler/call/rag/prompt.py +++ b/apps/scheduler/call/rag/prompt.py @@ -6,79 +6,89 @@ from textwrap import dedent from apps.models import LanguageType QUESTION_REWRITE: dict[LanguageType, str] = { - LanguageType.CHINESE: dedent(r""" - - - 根据用户当前的提问,推断用户的实际意图并补全用户的提问内容。要求: - 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; - 2. 若用户当前提问内容已足够完整,请直接输出用户的提问内容。 - 3. 补全内容必须精准、恰当,不要编造任何内容。 - 4. 请参考上下文理解用户的真实意图,确保补全后的问题与上下文保持一致。 - 5. 请输出补全后的问题,不要输出其他内容。 - 输出格式样例: - ```json - { - "question": "补全后的问题" - } - ``` - + LanguageType.CHINESE: dedent( + r""" + 你需要分析用户的当前提问,结合对话历史上下文,理解用户的真实意图并优化问题表述,使其更适合知识库检索。 - - - openEuler的优势有哪些? - - - ```json - { - "question": "openEuler操作系统的优势和应用场景是什么?" - } - ``` - - - + ## 要求 + - 参考对话历史理解用户的真实意图,补全省略的信息(如代词、缩略语等) + - 如果问题已经足够完整和明确,直接使用原问题,不要过度修改 + - 优化后的问题应该更加精准、具体,便于知识库检索匹配 + - 保持问题的核心语义不变,不要编造原问题中没有的信息 + - 适当扩展相关的关键术语和概念,提高检索召回率 - - {{question}} - + ## 示例 - 现在,请输出补全后的问题: - - """).strip("\n"), - LanguageType.ENGLISH: dedent(r""" - - - Based on the user's current question, infer the user's actual intent and complete the user's question. \ -Requirements: - 1. Please output in JSON format, referring to the example provided below; do not include any XML \ -tags or any explanatory notes; - 2. If the user's current question is already complete enough, directly output the user's question. - 3. The completed content must be precise and appropriate; do not fabricate any content. - 4. Please refer to the context to understand the user's true intent, ensuring that the \ -completed question is consistent with the context. - 5. Output only the completed question; do not include any other content. - Example output format: - ```json - { - "question": "The completed question" - } - ``` - + **示例1:补全上下文中的指代关系** + - 对话历史: + - 用户: openEuler是什么? + - 助手: openEuler是一个开源操作系统。 + - 当前问题:它的优势有哪些? + - 优化结果:openEuler操作系统的优势和特点是什么? - - - What are the features of openEuler? - - - ```json - { - "question": "What are the features and application scenarios of openEuler?" - } - ``` - - - - - {{question}} - - """).strip("\n"), + **示例2:扩展关键术语** + - 对话历史:无 + - 当前问题:如何安装Docker? + - 优化结果:如何在Linux系统上安装和配置Docker容器引擎? + + ## 用户当前问题 + {{question}} + """, + ).strip(), + LanguageType.ENGLISH: dedent( + r""" + Analyze the user's current question in the context of the conversation history to understand their true \ +intent and optimize the phrasing for knowledge base retrieval. + + ## Requirements + - Reference conversation history to understand true intent and complete omitted information (pronouns, \ +abbreviations, etc.) + - If the question is already complete and clear, use it as-is without over-modification + - The optimized question should be more precise and specific for better knowledge base matching + - Maintain the core semantics without fabricating information not present in the original question + - Appropriately expand related key terms and concepts to improve retrieval recall + + ## Examples + + **Example 1: Complete contextual references** + - Conversation history: + - User: What is openEuler? + - Assistant: openEuler is an open source operating system. + - Current question: What are its features? + - Optimized result: What are the features and advantages of the openEuler operating system? + + **Example 2: Expand key terms** + - Conversation history: None + - Current question: How to install Docker? + - Optimized result: How to install and configure Docker container engine on Linux system? + + ## User's Current Question + {{question}} + """, + ).strip(), +} + +QUESTION_REWRITE_FUNCTION: dict[str, object] = { + "name": "rewrite_question", + "description": ( + "基于上下文优化用户问题,使其更适合知识库检索 / " + "Optimize user question based on context for better knowledge base retrieval" + ), + "parameters": { + "type": "object", + "properties": { + "question": { + "type": "string", + "description": ( + "优化后的问题。应该完整、明确、包含关键信息,便于知识库检索 / " + "The optimized question that is complete, clear, and retrieval-friendly" + ), + }, + }, + "required": ["question"], + }, + "examples": [ + {"question": "openEuler操作系统的优势和特点是什么?"}, + {"question": "How to install and configure Docker container engine on Linux system?"}, + ], } diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index 104d898fa..59c0eadbe 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -14,6 +14,7 @@ from jinja2.sandbox import SandboxedEnvironment from pydantic import Field from apps.common.config import config +from apps.llm import json_generator from apps.models import LanguageType from apps.scheduler.call.core import CoreCall from apps.schemas.enum_var import CallOutputType @@ -25,7 +26,7 @@ from apps.schemas.scheduler import ( ) from apps.services.document import DocumentManager -from .prompt import QUESTION_REWRITE +from .prompt import QUESTION_REWRITE, QUESTION_REWRITE_FUNCTION from .schema import ( DocItem, QuestionRewriteOutput, @@ -155,7 +156,7 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """调用RAG工具""" data = RAGInput(**input_data) - # 使用Jinja2渲染问题重写模板,并用JsonGenerator解析结果 + # 使用Jinja2渲染问题重写模板,并用json_generator解析结果 try: env = SandboxedEnvironment( loader=BaseLoader(), @@ -166,11 +167,15 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): tmpl = env.from_string(QUESTION_REWRITE[self._sys_vars.language]) prompt = tmpl.render(question=data.query) - # 使用_json方法直接获取JSON结果 - json_result = await self._json([ - *self._sys_vars.background.conversation[-self.history_len:], - {"role": "user", "content": prompt}, - ], schema=QuestionRewriteOutput.model_json_schema()) + # 使用json_generator直接获取JSON结果 + json_result = await json_generator.generate( + function=QUESTION_REWRITE_FUNCTION, + conversation=[ + *self._sys_vars.background.conversation[-self.history_len:], + {"role": "user", "content": prompt}, + ], + language=self._sys_vars.language, + ) # 直接使用解析后的JSON结果 data.query = QuestionRewriteOutput.model_validate(json_result).question except Exception: diff --git a/apps/scheduler/call/slot/slot.py b/apps/scheduler/call/slot/slot.py index 150bba286..a3930924b 100644 --- a/apps/scheduler/call/slot/slot.py +++ b/apps/scheduler/call/slot/slot.py @@ -47,7 +47,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): async def _llm_slot_fill(self, remaining_schema: dict[str, Any]) -> tuple[str, dict[str, Any]]: - """使用JsonGenerator填充参数;若大模型解析度足够,则直接返回结果""" + """使用json_generator填充参数;若大模型解析度足够,则直接返回结果""" env = SandboxedEnvironment( loader=BaseLoader(), autoescape=False, @@ -104,10 +104,12 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): "description": f"Fill the missing parameters for {self.name}. {self.description}", "parameters": remaining_schema, } + # Append query as the last user message + conversation.append({"role": "user", "content": query}) data = await json_generator.generate( - query=query, function=function, conversation=conversation, + language=self._sys_vars.language, ) answer = json.dumps(data, ensure_ascii=False) return answer, data @@ -118,7 +120,16 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): {"role": "user", "content": self._question}, {"role": "assistant", "content": answer}, ] - return await self._json(messages=conversation, schema=remaining_schema) + function = { + "name": "fill_parameters", + "description": f"Fill the missing parameters for {self.name}. {self.description}", + "parameters": remaining_schema, + } + return await json_generator.generate( + function=function, + conversation=conversation, + language=self._sys_vars.language, + ) @classmethod async def instance(cls, executor: "StepExecutor", node: NodeInfo | None, **kwargs: Any) -> Self: diff --git a/apps/scheduler/call/suggest/prompt.py b/apps/scheduler/call/suggest/prompt.py index f2411fd3e..e90985e0d 100644 --- a/apps/scheduler/call/suggest/prompt.py +++ b/apps/scheduler/call/suggest/prompt.py @@ -1,191 +1,137 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""问题推荐工具的提示词""" +"""问题推荐工具的提示词和Function Schema""" from textwrap import dedent from apps.models import LanguageType +# Function Schema for question suggestion +SUGGEST_FUNCTION_SCHEMA = { + "name": "generate_suggestions", + "description": "Generate recommended follow-up questions based on conversation context and user interests / " + "基于对话上下文和用户兴趣生成推荐的后续问题", + "parameters": { + "type": "object", + "properties": { + "predicted_questions": { + "type": "array", + "description": "List of predicted questions, each should be a complete interrogative or imperative " + "sentence / 预测的问题列表,每个问题应该是完整的疑问句或祈使句", + "items": { + "type": "string", + "description": "Single recommended question, not exceeding 30 words / 单个推荐问题,长度不超过30字", + }, + }, + }, + "required": ["predicted_questions"], + }, + "examples": [ + { + "predicted_questions": [ + "What is the best season to visit Hangzhou? / 杭州的最佳旅游季节是什么时候?", + "What are the opening hours and ticket information for Lingyin Temple? / " + "灵隐寺的开放时间和门票信息?", + "Which attractions in Hangzhou are suitable for family trips? / 杭州有哪些适合亲子游的景点?", + ], + }, + { + "predicted_questions": [ + "What are the characteristics of dictionaries and sets? / 字典和集合有什么特点?", + "How to handle exceptions in Python? / 如何在Python中处理异常?", + "How to use list comprehensions? / 列表推导式怎么使用?", + ], + }, + ], +} + SUGGEST_PROMPT: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" - - - 根据先前的历史对话和提供的附加信息(用户倾向、问题列表、工具信息等)生成指定数量的预测问题。 - 中包含了用户已提出过的所有问题,请避免重复生成这些问题。 - 用户倾向将在标签中给出,工具信息将在标签中给出。 - - 生成预测问题时的要求: - 1. 以用户口吻生成预测问题,数量必须为指定的数量,必须为疑问句或祈使句,必须少于30字。 - 2. 预测问题必须精简,不得发生重复,不得在问题中掺杂非必要信息,不得输出除问题以外的文字。 - 3. 输出必须按照如下格式: - - ```json - { - "predicted_questions": [ - "预测问题1", - "预测问题2", - ... - ] - } - ``` - - - - - 简单介绍一下杭州 - 杭州有哪些著名景点? - 杭州西湖景区的门票价格是多少? - - 3 - - 景点查询 - 查询景点信息 - - ["杭州", "旅游"] - - 现在,进行问题生成: - - { - "predicted_questions": [ - "杭州的天气怎么样?", - "杭州有什么特色美食?" - ] - } - - - - 下面是实际的数据: - - 以下是问题列表,请参考其内容并避免重复生成: + 请根据对话历史和用户兴趣,生成{% if target_num %}{{ target_num }}{% else %}3-5{% endif %}个\ +用户可能感兴趣的后续问题。 + {% if history or generated %} - - {% for question in history %} - {{ question }} - {% endfor %} - {% for question in generated %} - {{ question }} - {% endfor %} - - {% else %} - (无已知问题) + **已讨论的问题:** + {% for question in history %} + - {{ question }} + {% endfor %} + {% for question in generated %} + - {{ question }} + {% endfor %} {% endif %} - {% if target_num %} - 请生成{{ target_num }}个问题。 + {% if tool %} + **可用工具:**{{ tool.name }}({{ tool.description }}) {% endif %} - - {% if tool %} - {{ tool.name }} - {{ tool.description }} - {% else %} - (无工具信息) - {% endif %} - - - - {% if preference %} - {{ preference }} - {% else %} - (无用户倾向) - {% endif %} - - - 现在,进行问题生成: + {% if preference %} + **用户兴趣:**{{ preference | join('、') }} + {% endif %} + + **要求:** + - 以用户口吻提问,使用疑问句或祈使句 + - 每个问题不超过30字,具体明确、富有探索性 + - 避免与已讨论问题重复 + - 问题应与可用工具和用户兴趣相关,能推进对话深度或拓展话题 + + **参考示例:** + + 示例1 - 旅游场景: + 当用户已讨论"杭州简介、杭州著名景点、西湖门票价格",可用工具为"景点查询", + 用户兴趣为"杭州、旅游"时,可生成: + 杭州的最佳旅游季节是什么时候?灵隐寺的开放时间和门票信息? + 杭州有哪些适合亲子游的景点? + + 示例2 - 编程场景: + 当用户已讨论"Python基础语法、列表和元组的区别",可用工具为"代码搜索", + 用户兴趣为"Python编程、数据结构"时,可生成: + 字典和集合有什么特点?如何在Python中处理异常?列表推导式怎么使用? """, ), LanguageType.ENGLISH: dedent( r""" - - - Generate the specified number of predicted questions based on the previous historical - dialogue and provided additional information (user preferences, question list, - tool information, etc.). - The contains all the questions that the user has asked before, - please avoid duplicating these questions when generating predictions. - User preferences will be given in the tag, and tool information will be - given in the tag. - - Requirements for generating predicted questions: - 1. Generate predicted questions in the user's voice, the quantity must be the specified - number, must be interrogative or imperative sentences, and must be less than 30 words. - 2. Predicted questions must be concise, without duplication, without unnecessary - information, and without text other than the questions. - 3. The output must be in the following format: - - ```json - { - "predicted_questions": [ - "Predicted question 1", - "Predicted question 2", - ... - ] - } - ``` - - - - - Briefly introduce Hangzhou - What are the famous attractions in Hangzhou? - What is the ticket price for the West Lake Scenic Area in Hangzhou? - - 3 - - Scenic Spot Search - Search for scenic spot information - - ["Hangzhou", "Tourism"] - - Now, generate questions: - - { - "predicted_questions": [ - "What's the weather like in Hangzhou?", - "What are the local specialties in Hangzhou?" - ] - } - - - - Here is the actual data: - - The following is a list of questions, please refer to its content and avoid duplicate generation: + Please generate {% if target_num %}{{ target_num }}{% else %}3-5{% endif %} follow-up questions \ +that the user might be interested in, based on conversation history and user interests. + {% if history or generated %} - - {% for question in history %} - {{ question }} - {% endfor %} - {% for question in generated %} - {{ question }} - {% endfor %} - - {% else %} - (No known questions) + **Questions already discussed:** + {% for question in history %} + - {{ question }} + {% endfor %} + {% for question in generated %} + - {{ question }} + {% endfor %} + {% endif %} + + {% if tool %} + **Available tool:** {{ tool.name }} ({{ tool.description }}) {% endif %} - {% if target_num %} - Please generate {{ target_num }} questions. + {% if preference %} + **User interests:** {{ preference | join(', ') }} {% endif %} - - {% if tool %} - {{ tool.name }} - {{ tool.description }} - {% else %} - (No tool information) - {% endif %} - - - - {% if preference %} - {{ preference }} - {% else %} - (No user preference) - {% endif %} - - - Now, generate questions: + **Requirements:** + - Use the user's voice with interrogative or imperative sentences + - Each question under 30 words, specific and exploratory + - Avoid repeating discussed questions + - Questions should relate to available tools and user interests, deepening or expanding the conversation + + **Reference examples:** + + Example 1 - Tourism scenario: + When the user has discussed "Hangzhou introduction, famous attractions in Hangzhou, + West Lake ticket prices", available tool is "Scenic Spot Search", and user interests are + "Hangzhou, Tourism", you can generate: + What is the best season to visit Hangzhou? + What are the opening hours and ticket information for Lingyin Temple? + Which attractions in Hangzhou are suitable for family trips? + + Example 2 - Programming scenario: + When the user has discussed "Python basics, difference between lists and tuples", available tool is + "Code Search", and user interests are "Python programming, Data structures", you can generate: + What are the characteristics of dictionaries and sets? How to handle exceptions in Python? + How to use list comprehensions? """, ), } diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 7921c7785..16505d500 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -6,11 +6,12 @@ import uuid from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any, Self -from jinja2 import BaseLoader +from jinja2 import BaseLoader, Template from jinja2.sandbox import SandboxedEnvironment from pydantic import Field from pydantic.json_schema import SkipJsonSchema +from apps.llm import json_generator from apps.models import LanguageType, NodeInfo from apps.scheduler.call.core import CoreCall from apps.schemas.enum_var import CallOutputType @@ -22,7 +23,7 @@ from apps.schemas.scheduler import ( ) from apps.services.user_tag import UserTagManager -from .prompt import SUGGEST_PROMPT +from .prompt import SUGGEST_FUNCTION_SCHEMA, SUGGEST_PROMPT from .schema import ( SingleFlowSuggestionConfig, SuggestGenResult, @@ -74,7 +75,6 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO async def _init(self, call_vars: CallVars) -> SuggestionInput: """初始化""" - # 从 ExecutorBackground 中获取历史问题 self._history_questions = call_vars.background.history_questions self._app_id = call_vars.ids.app_id self._flow_id = call_vars.ids.executor_id @@ -86,7 +86,6 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO ) self._avaliable_flows = {} - # 只有当_app_id不为None时才获取Flow信息 from apps.services.flow import FlowManager # noqa: PLC0415 if self._app_id is not None: @@ -107,22 +106,15 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO """运行问题推荐""" data = SuggestionInput(**input_data) - # 获取当前用户的画像 user_domain_info = await UserTagManager.get_user_domain_by_user_and_topk(data.user_id, 5) user_domain = [tag.name for tag in user_domain_info] - # 初始化Prompt prompt_tpl = self._env.from_string(SUGGEST_PROMPT[self._sys_vars.language]) - # 如果设置了configs,则按照configs生成问题 if self.configs: - async for output_chunk in self._process_configs( - prompt_tpl, - user_domain, - ): + async for output_chunk in self._process_configs(): yield output_chunk return - # 如果_app_id为None,直接生成N个推荐问题 if self._app_id is None: async for output_chunk in self._generate_general_questions( prompt_tpl, @@ -132,7 +124,6 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO yield output_chunk return - # 如果_app_id不为None,获取App中所有Flow并为每个Flow生成问题 async for output_chunk in self._generate_questions_for_all_flows( prompt_tpl, user_domain, @@ -141,18 +132,13 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO async def _generate_questions_from_llm( self, - prompt_tpl: Any, + prompt_tpl: Template, tool_info: dict[str, Any] | None, user_domain: list[str], generated_questions: set[str] | None = None, target_num: int | None = None, ) -> SuggestGenResult: """通过LLM生成问题""" - # 合并历史问题和已生成问题为question_list - question_list = list(self._history_questions) - if generated_questions: - question_list.extend(list(generated_questions)) - prompt = prompt_tpl.render( history=self._history_questions, generated=list(generated_questions) if generated_questions else None, @@ -165,23 +151,22 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO *self._sys_vars.background.conversation, {"role": "user", "content": prompt}, ] - result = await self._json( - messages=messages, - schema=SuggestGenResult.model_json_schema(), + result = await json_generator.generate( + function=SUGGEST_FUNCTION_SCHEMA, + conversation=messages, + language=self._sys_vars.language, ) return SuggestGenResult.model_validate(result) async def _generate_general_questions( self, - prompt_tpl: Any, + prompt_tpl: Template, user_domain: list[str], target_num: int, ) -> AsyncGenerator[CallOutputChunk, None]: """生成通用问题(无app_id时)""" pushed_questions = 0 attempts = 0 - - # 用于跟踪已经生成过的问题,避免重复 generated_questions = set() while pushed_questions < target_num and attempts < self.num: @@ -194,18 +179,15 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO target_num, ) - # 过滤掉已经生成过的问题 unique_questions = [ q for q in questions.predicted_questions if q not in generated_questions ] - # 输出生成的问题,直到达到目标数量 for question in unique_questions: if pushed_questions >= target_num: break - # 将问题添加到已生成集合中 generated_questions.add(question) yield CallOutputChunk( @@ -221,12 +203,11 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO async def _generate_questions_for_all_flows( self, - prompt_tpl: Any, + prompt_tpl: Template, user_domain: list[str], ) -> AsyncGenerator[CallOutputChunk, None]: """为App中所有Flow生成问题""" for flow_id, flow_info in self._avaliable_flows.items(): - # 为每个Flow生成一个问题 questions = await self._generate_questions_from_llm( prompt_tpl, { @@ -235,10 +216,7 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO }, user_domain, ) - # 随机选择一个生成的问题 question = questions.predicted_questions[random.randint(0, len(questions.predicted_questions) - 1)] # noqa: S311 - - # 判断是否为当前Flow,设置isHighlight is_highlight = (flow_id == self._flow_id) yield CallOutputChunk( @@ -254,12 +232,9 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO async def _process_configs( self, - prompt_tpl: Any, - user_domain: list[str], ) -> AsyncGenerator[CallOutputChunk, None]: """处理配置中的问题""" for config in self.configs: - # 如果flow_id为None,生成通用问题 if config.flow_id is None: yield CallOutputChunk( type=CallOutputType.DATA, @@ -268,18 +243,16 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO flowName=None, flowId=None, flowDescription=None, - isHighlight=False, # 通用问题不设置高亮 + isHighlight=False, ).model_dump(by_alias=True, exclude_none=True), ) else: - # 检查flow_id是否存在于可用Flow中 if config.flow_id not in self._avaliable_flows: raise CallError( message="配置的Flow ID不存在", data={}, ) - # 判断是否为当前Flow,设置isHighlight is_highlight = (config.flow_id == self._flow_id) yield CallOutputChunk( diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py index 1e7e2f174..33285ebc4 100644 --- a/apps/scheduler/mcp/host.py +++ b/apps/scheduler/mcp/host.py @@ -115,11 +115,12 @@ class MCPHost: # 使用全局json_generator实例 return await json_generator.generate( - query=llm_query, function=function_definition, conversation=[ {"role": "user", "content": await self.assemble_memory()}, + {"role": "user", "content": llm_query}, ], + language=self._language, ) diff --git a/apps/scheduler/mcp/plan.py b/apps/scheduler/mcp/plan.py index ba7491055..69b8c2c66 100644 --- a/apps/scheduler/mcp/plan.py +++ b/apps/scheduler/mcp/plan.py @@ -80,11 +80,11 @@ class MCPPlanner: # 使用全局json_generator实例解析结果 plan = await json_generator.generate( - query=result, function=function_def, conversation=[ {"role": "user", "content": result}, ], + language=self._language, ) return MCPPlan.model_validate(plan) diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py index 1028f9b1d..65c5f0d12 100644 --- a/apps/scheduler/mcp/select.py +++ b/apps/scheduler/mcp/select.py @@ -56,11 +56,13 @@ class MCPSelector: mcp_ids=", ".join(mcp_ids), ) - # 使用JsonGenerator生成JSON + # 使用json_generator生成JSON result = await json_generator.generate( - query=user_prompt, function=function, - conversation=[], + conversation=[ + {"role": "user", "content": user_prompt}, + ], + language=self._language, ) try: diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index 853c27c38..12d58f09c 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -45,9 +45,13 @@ class MCPBase: async def get_json_result(self, result: str, function: dict[str, Any]) -> dict[str, Any]: """解析推理结果;function使用OpenAI标准Function格式""" return await json_generator.generate( - query="Please provide a JSON response based on the above information and schema.\n\n", function=function, conversation=[ {"role": "user", "content": result}, + { + "role": "user", + "content": "Please provide a JSON response based on the above information and schema.", + }, ], + language=self._language, ) diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index a4b30e5f6..ef2f446cc 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -59,10 +59,15 @@ class MCPHost(MCPBase): background_info=await self.assemble_memory(runtime, context), ) _logger.info("[MCPHost] 填充工具参数: %s", prompt) - # 使用JsonGenerator解析结果 + # 使用json_generator解析结果 + function = { + "name": mcp_tool.toolName, + "description": mcp_tool.description, + "parameters": mcp_tool.inputSchema, + } return await self.get_json_result( prompt, - mcp_tool.inputSchema, + function, ) async def fill_params( # noqa: D102, PLR0913 @@ -96,9 +101,10 @@ class MCPHost(MCPBase): } return await json_generator.generate( - query=llm_query, function=function, conversation=[ {"role": "user", "content": prompt}, + {"role": "user", "content": llm_query}, ], + language=language, ) diff --git a/apps/scheduler/scheduler/flow.py b/apps/scheduler/scheduler/flow.py index c42b0ee96..ab1d5ca94 100644 --- a/apps/scheduler/scheduler/flow.py +++ b/apps/scheduler/scheduler/flow.py @@ -56,12 +56,13 @@ class FlowMixin: "parameters": schema, } result_str = await json_generator.generate( - query=self.post_body.question, function=function, conversation=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, + {"role": "user", "content": self.post_body.question}, ], + language=self.task.runtime.language, ) result = TopFlow.model_validate(result_str) return result.choice -- Gitee