diff --git a/apps/llm/generator.py b/apps/llm/generator.py
index f2f681faa7cb41cb8e4c07e70f7903d7503b92f8..656ef2800580ab795ffd49cc9b7ec656f27e1411 100644
--- a/apps/llm/generator.py
+++ b/apps/llm/generator.py
@@ -10,11 +10,12 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from jsonschema import Draft7Validator
-from apps.models import LLMType
+from apps.models import LanguageType, LLMType
from apps.schemas.llm import LLMFunctions
from .llm import LLM
-from .prompt import JSON_GEN_BASIC, JSON_NO_FUNCTION_CALL
+from .prompt import JSON_GEN
+from .token import token_calculator
_logger = logging.getLogger(__name__)
@@ -53,11 +54,71 @@ class JsonGenerator:
_logger.info("[JSONGenerator] LLM不支持FunctionCall,将使用prompt方式")
self._support_function_call = False
+ def _build_messages(
+ self,
+ function: dict[str, Any],
+ conversation: list[dict[str, str]],
+ language: LanguageType = LanguageType.CHINESE,
+ ) -> list[dict[str, str]]:
+ """构建messages,提取query并使用JSON_GEN模板格式化"""
+        if conversation and conversation[-1]["role"] == "user":
+ query = conversation[-1]["content"]
+ else:
+ err = "[JSONGenerator] 对话历史中最后一项必须是用户消息"
+ raise RuntimeError(err)
+
+ template = self._env.from_string(JSON_GEN[language])
+ prompt = template.render(
+ query=query,
+            tool_descriptions=f"{function['name']}: {function['description']}",
+            tool_schemas=function["parameters"],
+ use_xml_format=False,
+ )
+
+ messages = [*conversation[:-1], {"role": "user", "content": prompt}]
+
+ # 计算Token数量
+ if self._llm is not None:
+ token_count = token_calculator.calculate_token_length(messages)
+ ctx_length = self._llm.config.ctxLength
+
+ # 进行消息裁剪
+ if token_count > ctx_length:
+ _logger.warning(
+ "[JSONGenerator] 当前对话 Token 数量 (%d) 超过模型上下文长度 (%d),进行消息裁剪",
+ token_count,
+ ctx_length,
+ )
+
+ trimmed_conversation = list(conversation[:-1])
+
+ while trimmed_conversation and token_count > ctx_length:
+                    if (len(trimmed_conversation) >= 2  # noqa: PLR2004
+                            and trimmed_conversation[0]["role"] == "user"
+                            and trimmed_conversation[1]["role"] == "assistant"):
+ trimmed_conversation = trimmed_conversation[2:]
+                    else:
+                        trimmed_conversation = trimmed_conversation[1:]
+
+ # 重新构建 messages 并计算 token
+ messages = [*trimmed_conversation, {"role": "user", "content": prompt}]
+ token_count = token_calculator.calculate_token_length(messages)
+
+ _logger.info(
+ "[JSONGenerator] 裁剪后对话 Token 数量: %d,移除了 %d 条消息",
+ token_count,
+ len(conversation) - len(trimmed_conversation) - 1,
+ )
+
+ return messages
+
async def _single_trial(
self,
function: dict[str, Any],
- query: str,
context: list[dict[str, str]],
+ language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""单次尝试,包含校验逻辑;function使用OpenAI标准Function格式"""
if self._llm is None:
@@ -70,10 +131,10 @@ class JsonGenerator:
# 执行生成
if self._support_function_call:
# 如果支持FunctionCall
- result = await self._call_with_function(function, query, context)
+ result = await self._call_with_function(function, context, language)
else:
# 如果不支持FunctionCall
- result = await self._call_without_function(function, query, context)
+ result = await self._call_without_function(function, context, language)
# 校验结果
try:
@@ -94,23 +155,22 @@ class JsonGenerator:
async def _call_with_function(
self,
function: dict[str, Any],
- query: str,
- context: list[dict[str, str]],
+ conversation: list[dict[str, str]],
+ language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""使用FunctionCall方式调用"""
if self._llm is None:
err = "[JSONGenerator] 未初始化,请先调用init()方法"
raise RuntimeError(err)
+ messages = self._build_messages(function, conversation, language)
+
tool = LLMFunctions(
name=function["name"],
description=function["description"],
param_schema=function["parameters"],
)
- messages = context.copy()
- messages.append({"role": "user", "content": query})
-
tool_call_result = {}
async for chunk in self._llm.call(messages, include_thinking=False, streaming=True, tools=[tool]):
if chunk.tool_call:
@@ -125,25 +185,15 @@ class JsonGenerator:
async def _call_without_function(
self,
function: dict[str, Any],
- query: str,
- context: list[dict[str, str]],
+ conversation: list[dict[str, str]],
+ language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
"""不使用FunctionCall方式调用"""
if self._llm is None:
err = "[JSONGenerator] 未初始化,请先调用init()方法"
raise RuntimeError(err)
- template = self._env.from_string(JSON_GEN_BASIC + "\n\n" + JSON_NO_FUNCTION_CALL)
- prompt = template.render(
- query=query,
- conversation=context[1:] if context else [],
- schema=function["parameters"],
- )
-
- messages = [
- context[0],
- {"role": "user", "content": prompt},
- ]
+ messages = self._build_messages(function, conversation, language)
# 使用LLM的call方法获取响应
full_response = ""
@@ -162,22 +212,11 @@ class JsonGenerator:
async def generate(
self,
- query: str,
function: dict[str, Any],
conversation: list[dict[str, str]] | None = None,
+ language: LanguageType = LanguageType.CHINESE,
) -> dict[str, Any]:
- """
- 生成JSON;function使用OpenAI标准Function格式
-
- Args:
- query: 用户查询
- function: OpenAI标准Function格式的函数定义
- conversation: 对话历史,默认为空列表
-
- Returns:
- 生成的JSON对象
-
- """
+ """生成JSON;function使用OpenAI标准Function格式"""
if self._llm is None:
err = "[JSONGenerator] 未初始化,请先调用init()方法"
raise RuntimeError(err)
@@ -202,7 +241,7 @@ class JsonGenerator:
count += 1
try:
# 如果_single_trial没有抛出异常,直接返回结果,不进行重试
- return await self._single_trial(function, query, context)
+ return await self._single_trial(function, context, language)
except Exception:
_logger.exception(
"[JSONGenerator] 第 %d/%d 次尝试失败",
diff --git a/apps/llm/prompt.py b/apps/llm/prompt.py
index 9851b282292f16a4fdbc556ff9428e61e1a4bf26..6ece04539e6d50896980a8d6dc2bf956e420c900 100644
--- a/apps/llm/prompt.py
+++ b/apps/llm/prompt.py
@@ -3,89 +3,206 @@
from textwrap import dedent
-JSON_GEN_BASIC = dedent(r"""
-    <instructions>
-        You are an intelligent assistant who can use tools to help answer user queries.
-        Your task is to respond to the query according to the background information and available tools.
-
-        Note:
-        - You have access to a set of tools that can help you gather information.
-        - You can use one tool at a time and will receive the result in the user's response.
-        - Use tools step-by-step to respond to the user's query, with each tool use informed by the \
-result of the previous tool use.
-        - The user's query is provided in the <query> tags.
-        {% if previous_trial %}- Review the previous trial information in <previous_trial> \
-tags to avoid repeating mistakes.{% endif %}
-    </instructions>
-
-    <query>
-        {{ query }}
-    </query>
-
-    {% if previous_trial %}
-    <previous_trial>
-        <description>
-            You previously attempted to answer the query by calling a tool, but the arguments were incorrect.
-        </description>
-        <trial>
-            {{ previous_trial }}
-        </trial>
-        <error>
-            {{ err_info }}
-        </error>
-    </previous_trial>
-    {% endif %}
-
-    <tools>
-        You have access to a set of tools. You can use one tool and will receive the result of that tool \
-use in the user's response.
-    </tools>
-""")
-
-JSON_NO_FUNCTION_CALL = dedent(r"""
-    **Tool Use Formatting:**
-    Tool uses are formatted using XML-style tags. The tool name itself becomes the root XML tag name. \
-Each parameter is enclosed within its own set of tags according to the parameter schema provided below.
-
-    **Basic Structure:**
-    <tool_name>
-        <parameter_name>value</parameter_name>
-    </tool_name>
-
-    **Parameter Schema:**
-    The available tools and their parameter schemas are provided in the following format:
-    - Tool name: The name to use as the root XML tag
-    - Parameters: Each parameter has a name, type, and description
-    - Required parameters must be included
-    - Optional parameters can be omitted
-
-    **XML Generation Rules:**
-    1. Use the exact tool name as the root XML tag
-    2. For each parameter, create a nested tag with the parameter name
-    3. Place the parameter value inside the corresponding tag
-    4. For string values: <parameter_name>text value</parameter_name>
-    5. For numeric values: <parameter_name>123</parameter_name>
-    6. For boolean values: <parameter_name>true</parameter_name> or <parameter_name>false</parameter_name>
-    7. For array values: wrap each item in the parameter tag
-       <parameter_name>item1</parameter_name>
-       <parameter_name>item2</parameter_name>
-    8. For object values: nest the object properties as sub-tags
-       <parameter_name>
-           <key1>value1</key1>
-           <key2>value2</key2>
-       </parameter_name>
-
-    **Example:**
-    If you need to use a tool named "search" with parameters query (string) and limit (number):
-
-    <search>
-        <query>your search text</query>
-        <limit>10</limit>
-    </search>
-
-    Always use the actual tool name as the root XML tag and match parameter names exactly as specified \
-in the schema for proper parsing and execution.
-""")
+from apps.models import LanguageType
+
+JSON_GEN: dict[LanguageType, str] = {
+ LanguageType.CHINESE: dedent(
+ r"""
+ 你是一个智能助手,可以访问帮助回答用户查询的工具。
+ 你的任务是使用可用的工具和背景信息来响应查询。
+
+            <instructions>
+                - 你可以访问能够帮助收集信息的工具
+                - 逐步使用工具,每次使用都基于之前的结果
+                - 用户的查询在 <query> 标签中提供
+                {% if previous_trial %}- 查看 <previous_trial> 信息以避免重复错误{% endif %}
+                {% if use_xml_format %}- 使用 XML 样式的标签格式化工具调用,其中工具名称是根标签,每个参数是嵌套标签
+                - 使用架构中指定的确切工具名称和参数名称
+                - 基本格式结构:
+                  <工具名称>
+                      <参数名称>值</参数名称>
+                  </工具名称>
+                - 参数类型:
+                  * 字符串:<参数名称>搜索文本</参数名称>
+                  * 数字:<参数名称>10</参数名称>
+                  * 布尔值:<参数名称>true</参数名称>
+                  * 数组(重复标签):<参数名称>项目1</参数名称><参数名称>项目2</参数名称>
+                  * 对象(嵌套标签):<参数名称><子标签>值</子标签></参数名称>{% endif %}
+            </instructions>
+
+            {% if use_xml_format %}
+            <example>
+                <query>
+                    杭州的天气怎么样?
+                </query>
+
+                <tools>
+                    <tool_descriptions>
+                        get_weather: 获取指定城市的当前天气信息
+                    </tool_descriptions>
+                    <tool_schemas>
+                        {
+                            "name": "get_weather",
+                            "description": "获取指定城市的当前天气信息",
+                            "parameters": {
+                                "type": "object",
+                                "properties": {
+                                    "city": {
+                                        "type": "string",
+                                        "description": "要查询天气的城市名称"
+                                    },
+                                    "unit": {
+                                        "type": "string",
+                                        "enum": ["celsius", "fahrenheit"],
+                                        "description": "温度单位"
+                                    },
+                                    "include_forecast": {
+                                        "type": "boolean",
+                                        "description": "是否包含预报数据"
+                                    }
+                                },
+                                "required": ["city"]
+                            }
+                        }
+                    </tool_schemas>
+                </tools>
+
+                助手响应:
+                <get_weather>
+                    <city>杭州</city>
+                    <unit>celsius</unit>
+                    <include_forecast>false</include_forecast>
+                </get_weather>
+            </example>
+            {% endif %}
+
+            <query>
+                {{ query }}
+            </query>
+
+            {% if previous_trial %}
+            <previous_trial>
+                <description>
+                    你之前的工具调用有不正确的参数。
+                </description>
+                <trial>
+                    {{ previous_trial }}
+                </trial>
+                <error>
+                    {{ err_info }}
+                </error>
+            </previous_trial>
+            {% endif %}
+
+            <tools>
+                <tool_descriptions>
+                    {{ tool_descriptions }}
+                </tool_descriptions>
+                <tool_schemas>
+                    {{ tool_schemas }}
+                </tool_schemas>
+            </tools>
+ """,
+ ),
+ LanguageType.ENGLISH: dedent(
+ r"""
+ You are an intelligent assistant with access to tools that help answer user queries.
+ Your task is to respond to queries using the available tools and background information.
+
+            <instructions>
+                - You have access to tools that can help gather information
+                - Use tools step-by-step, with each use informed by previous results
+                - The user's query is provided in the <query> tags
+                {% if previous_trial %}- Review the <previous_trial> information to avoid \
+repeating mistakes{% endif %}
+                {% if use_xml_format %}- Format tool calls using XML-style tags where the tool name is the root tag \
+and each parameter is a nested tag
+                - Use the exact tool name and parameter names as specified in the schema
+                - Basic format structure:
+                  <tool_name>
+                      <parameter_name>value</parameter_name>
+                  </tool_name>
+                - Parameter types:
+                  * String: <parameter_name>search text</parameter_name>
+                  * Number: <parameter_name>10</parameter_name>
+                  * Boolean: <parameter_name>true</parameter_name>
+                  * Array (repeat tags): <parameter_name>item1</parameter_name><parameter_name>item2</parameter_name>
+                  * Object (nest tags): <parameter_name><key>value</key></parameter_name>{% endif %}
+            </instructions>
+
+            {% if use_xml_format %}
+            <example>
+                <query>
+                    What is the weather like in Hangzhou?
+                </query>
+
+                <tools>
+                    <tool_descriptions>
+                        get_weather: Get current weather information for a specified city
+                    </tool_descriptions>
+                    <tool_schemas>
+                        {
+                            "name": "get_weather",
+                            "description": "Get current weather information for a specified city",
+                            "parameters": {
+                                "type": "object",
+                                "properties": {
+                                    "city": {
+                                        "type": "string",
+                                        "description": "The city name to query weather for"
+                                    },
+                                    "unit": {
+                                        "type": "string",
+                                        "enum": ["celsius", "fahrenheit"],
+                                        "description": "Temperature unit"
+                                    },
+                                    "include_forecast": {
+                                        "type": "boolean",
+                                        "description": "Whether to include forecast data"
+                                    }
+                                },
+                                "required": ["city"]
+                            }
+                        }
+                    </tool_schemas>
+                </tools>
+
+                Assistant response:
+                <get_weather>
+                    <city>Hangzhou</city>
+                    <unit>celsius</unit>
+                    <include_forecast>false</include_forecast>
+                </get_weather>
+            </example>
+            {% endif %}
+
+            <query>
+                {{ query }}
+            </query>
+
+            {% if previous_trial %}
+            <previous_trial>
+                <description>
+                    Your previous tool call had incorrect arguments.
+                </description>
+                <trial>
+                    {{ previous_trial }}
+                </trial>
+                <error>
+                    {{ err_info }}
+                </error>
+            </previous_trial>
+            {% endif %}
+
+            <tools>
+                <tool_descriptions>
+                    {{ tool_descriptions }}
+                </tool_descriptions>
+                <tool_schemas>
+                    {{ tool_schemas }}
+                </tool_schemas>
+            </tools>
+ """,
+ ),
+}
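For reference, this is roughly how `_build_messages` consumes the new template (a sketch; the `tool_descriptions` and `tool_schemas` values are illustrative, and `use_xml_format=False` skips the worked XML example block entirely):

```python
# Sketch: render the English JSON_GEN template the way _build_messages does.
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment

from apps.llm.prompt import JSON_GEN
from apps.models import LanguageType

env = SandboxedEnvironment(loader=BaseLoader(), autoescape=False,
                           trim_blocks=True, lstrip_blocks=True)
prompt = env.from_string(JSON_GEN[LanguageType.ENGLISH]).render(
    query="What is the weather like in Hangzhou?",
    use_xml_format=True,  # False drops the XML example from the prompt
    tool_descriptions="get_weather: Get current weather information for a specified city",
    tool_schemas='{"type": "object", "properties": {"city": {"type": "string"}}}',
)
print(prompt)
```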
diff --git a/apps/llm/providers/ollama.py b/apps/llm/providers/ollama.py
index 35e4235ebccac4d796d0c1fa079470d3ff7952b8..e721510694c2f7d264e6dea72b06e88af1c1047d 100644
--- a/apps/llm/providers/ollama.py
+++ b/apps/llm/providers/ollama.py
@@ -47,6 +47,7 @@ class OllamaProvider(BaseProvider):
self._client = AsyncClient(
host=self.config.baseUrl,
timeout=self._timeout,
+ verify=False,
)
else:
self._client = AsyncClient(
@@ -55,6 +56,7 @@ class OllamaProvider(BaseProvider):
"Authorization": f"Bearer {self.config.apiKey}",
},
timeout=self._timeout,
+ verify=False,
)
def _process_usage_data(self, last_chunk: ChatResponse | None, messages: list[dict[str, str]]) -> None:
diff --git a/apps/llm/providers/openai.py b/apps/llm/providers/openai.py
index 0c2d3d4584b5cdfefde6a1c2f59fb3ce08d1902c..b14e9d4b9bea6f35227f819252959811dd6fb0e5 100644
--- a/apps/llm/providers/openai.py
+++ b/apps/llm/providers/openai.py
@@ -5,6 +5,7 @@ import logging
from collections.abc import AsyncGenerator
from typing import cast
+import httpx
from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import (
ChatCompletionChunk,
@@ -24,6 +25,7 @@ class OpenAIProvider(BaseProvider):
"""OpenAI大模型客户端"""
_client: AsyncOpenAI
+ _http_client: httpx.AsyncClient
input_tokens: int
output_tokens: int
_allow_chat: bool
@@ -54,16 +56,19 @@ class OpenAIProvider(BaseProvider):
@override
def _init_client(self) -> None:
"""初始化模型API客户端"""
+ self._http_client = httpx.AsyncClient(verify=False) # noqa: S501
if not self.config.apiKey:
self._client = AsyncOpenAI(
base_url=self.config.baseUrl,
timeout=self._timeout,
+ http_client=self._http_client,
)
else:
self._client = AsyncOpenAI(
base_url=self.config.baseUrl,
api_key=self.config.apiKey,
timeout=self._timeout,
+ http_client=self._http_client,
)
def _handle_usage_chunk(self, chunk: ChatCompletionChunk | None, messages: list[dict[str, str]]) -> None:
diff --git a/apps/llm/providers/tei.py b/apps/llm/providers/tei.py
index 0e5e9476cdd3b64428cda19e4c7c12da561d2b8d..088008622e7b5f38f0c08f91eddf7a85365d0da7 100644
--- a/apps/llm/providers/tei.py
+++ b/apps/llm/providers/tei.py
@@ -52,7 +52,7 @@ class TEIProvider(BaseProvider):
async def embedding(self, text: list[str]) -> list[list[float]]:
"""访问TEI兼容的Embedding API,获得向量化数据"""
text = self._validate_input(text)
- async with httpx.AsyncClient() as client:
+ async with httpx.AsyncClient(verify=False) as client: # noqa: S501
result = []
for single_text in text:
data = {
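All three providers now pass `verify=False`, disabling TLS certificate verification, presumably to tolerate self-signed certificates in private deployments; the `# noqa: S501` comments acknowledge the corresponding lint finding. If verification should ever be opt-out rather than unconditional, one option is a config-driven switch (a sketch; `tls_verify` is a hypothetical setting, not part of this diff):

```python
# Sketch: gate certificate verification behind configuration instead of
# hard-coding verify=False at every client construction site.
import httpx


def build_http_client(base_url: str, *, tls_verify: bool = True, timeout: float = 30.0) -> httpx.AsyncClient:
    # verify=False stays possible, but only as an explicit deployment choice
    return httpx.AsyncClient(base_url=base_url, verify=tls_verify, timeout=timeout)
```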
diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py
index 06939f9b794be1199efd208177afc2c85c355724..6d1dc396395b01b1c85db24ab59585d2776df8d1 100644
--- a/apps/scheduler/call/core.py
+++ b/apps/scheduler/call/core.py
@@ -12,7 +12,6 @@ from typing import TYPE_CHECKING, Any, ClassVar, Self
from pydantic import BaseModel, ConfigDict, Field
from pydantic.json_schema import SkipJsonSchema
-from apps.llm import json_generator
from apps.models import ExecutorHistory, LanguageType, NodeInfo
from apps.schemas.enum_var import CallOutputType
from apps.schemas.scheduler import (
@@ -189,7 +188,7 @@ class CoreCall(BaseModel):
async def _llm(self, messages: list[dict[str, Any]], *, streaming: bool = False) -> AsyncGenerator[str, None]:
"""Call可直接使用的LLM非流式调用"""
think_tag_opened = False
- async for chunk in self._llm_obj.reasoning.call(messages, streaming=streaming):
+ async for chunk in self._llm_obj.call(messages, streaming=streaming):
if chunk.reasoning_content:
if not think_tag_opened:
yield ""
@@ -201,27 +200,3 @@ class CoreCall(BaseModel):
yield ""
think_tag_opened = False
yield chunk.content
-
-
- async def _json(self, messages: list[dict[str, Any]], function: dict[str, Any]) -> dict[str, Any]:
- """Call可直接使用的JSON生成"""
- # 从messages中提取最后一条用户消息作为query,其他作为conversation
- query = ""
- conversation = []
-
- for i, msg in enumerate(messages):
- role = msg.get("role")
- # 跳过system消息
- if role == "system":
- continue
- # 找到最后一条user消息作为query
- if role == "user" and i == len(messages) - 1:
- query = msg.get("content", "")
- else:
- conversation.append(msg)
-
- return await json_generator.generate(
- query=query,
- function=function,
- conversation=conversation if conversation else None,
- )
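With `CoreCall._json` removed, call sites now pass the full conversation plus an OpenAI-style function definition directly to `json_generator.generate`, which extracts the trailing user message itself. A minimal sketch of the new call shape (the function definition is illustrative):

```python
# Sketch of the call shape that replaces CoreCall._json. The last
# conversation entry must be the user message (_build_messages enforces this).
from apps.llm import json_generator
from apps.models import LanguageType


async def extract_city(conversation: list[dict[str, str]]) -> dict:
    return await json_generator.generate(
        function={
            "name": "extract_city",
            "description": "Extract the city mentioned by the user",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
        conversation=conversation,
        language=LanguageType.ENGLISH,
    )
```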
diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py
index 378ba763411b2a5b96994d19abf09f9f16d99fa0..5f2bffbec3d01d723b86a98b5a1128278e9941de 100644
--- a/apps/scheduler/call/facts/facts.py
+++ b/apps/scheduler/call/facts/facts.py
@@ -4,10 +4,9 @@
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any, Self
-from jinja2 import BaseLoader
-from jinja2.sandbox import SandboxedEnvironment
from pydantic import Field
+from apps.llm import json_generator
from apps.models import LanguageType, NodeInfo
from apps.scheduler.call.core import CoreCall
from apps.schemas.enum_var import CallOutputType
@@ -77,32 +76,44 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput):
async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]:
"""执行工具"""
data = FactsInput(**input_data)
- # jinja2 环境
- env = SandboxedEnvironment(
- loader=BaseLoader(),
- autoescape=False,
- trim_blocks=True,
- lstrip_blocks=True,
- extensions=["jinja2.ext.loopcontrols"],
- )
- # 提取事实信息
- facts_tpl = env.from_string(FACTS_PROMPT[self._sys_vars.language])
- facts_prompt = facts_tpl.render(conversation=data.message)
- facts_obj = await self._json([
- {"role": "system", "content": "You are a helpful assistant."},
+ # 组装conversation消息
+ facts_prompt = FACTS_PROMPT[self._sys_vars.language]
+ facts_conversation = [
+ *data.message,
{"role": "user", "content": facts_prompt},
- ], FactsGen.model_json_schema())
- facts_obj = FactsGen.model_validate(facts_obj)
+ ]
- # 更新用户画像
- domain_tpl = env.from_string(DOMAIN_PROMPT[self._sys_vars.language])
- domain_prompt = domain_tpl.render(conversation=data.message)
- domain_list = await self._json([
- {"role": "system", "content": "You are a helpful assistant."},
+ # 提取事实信息
+ facts_result = await json_generator.generate(
+ function={
+ "name": "extract_facts",
+ "description": "Extract facts from the conversation",
+ "parameters": FactsGen.model_json_schema(),
+ },
+ conversation=facts_conversation,
+ language=self._sys_vars.language,
+ )
+ facts_obj = FactsGen.model_validate(facts_result)
+
+ # 组装conversation消息
+ domain_prompt = DOMAIN_PROMPT[self._sys_vars.language]
+ domain_conversation = [
+ *data.message,
{"role": "user", "content": domain_prompt},
- ], DomainGen.model_json_schema())
- domain_list = DomainGen.model_validate(domain_list)
+ ]
+
+ # 更新用户画像
+ domain_result = await json_generator.generate(
+ function={
+ "name": "extract_domain",
+ "description": "Extract domain keywords from the conversation",
+ "parameters": DomainGen.model_json_schema(),
+ },
+ conversation=domain_conversation,
+ language=self._sys_vars.language,
+ )
+ domain_list = DomainGen.model_validate(domain_result)
for domain in domain_list.keywords:
await UserTagManager.update_user_domain_by_user_and_domain_name(data.user_id, domain)
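The pattern above, passing `model_json_schema()` as the function's `parameters` and validating the result with `model_validate`, gives a typed round trip from schema to object. A standalone sketch with a hypothetical model (no LLM involved):

```python
# Sketch of the schema-in / validated-object-out round trip used above.
from pydantic import BaseModel, Field


class FactsDemo(BaseModel):
    facts: list[str] = Field(description="Facts extracted from the conversation")


schema = FactsDemo.model_json_schema()          # becomes the "parameters" payload
result = {"facts": ["User lives in Hangzhou"]}  # stands in for generate()'s output
obj = FactsDemo.model_validate(result)          # raises if the LLM output drifts from the schema
print(schema["required"], obj.facts)            # ['facts'] ['User lives in Hangzhou']
```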
diff --git a/apps/scheduler/call/facts/prompt.py b/apps/scheduler/call/facts/prompt.py
index 06f8f17bfc2ee77a65e0f5325a1453f8719d9344..e4cfb9b7ff3bb466d23a8e1fbbe38734230c317d 100644
--- a/apps/scheduler/call/facts/prompt.py
+++ b/apps/scheduler/call/facts/prompt.py
@@ -2,78 +2,69 @@
"""记忆提取工具的提示词"""
from textwrap import dedent
+from typing import Any
from apps.models import LanguageType
DOMAIN_PROMPT: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
-            <instructions>
-                根据对话上文,提取推荐系统所需的关键词标签,要求:
-                1. 实体名词、技术术语、时间范围、地点、产品等关键信息均可作为关键词标签
-                2. 至少一个关键词与对话的话题有关
-                3. 标签需精简,不得重复,不得超过10个字
-                4. 使用JSON格式输出,不要包含XML标签,不要包含任何解释说明
-            </instructions>
-
-            <example>
-                <conversation>
-                    <user>北京天气如何?</user>
-                    <assistant>北京今天晴。</assistant>
-                </conversation>
-            </example>
-
-            <conversation>
-                {% for item in conversation %}
-                <{{item['role']}}>
-                    {{item['content']}}
-                </{{item['role']}}>
-                {% endfor %}
-            </conversation>
-