#!/usr/bin/python3.11
# coding: utf-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved
"""Chat-model abstraction layer.

``BaseChatModel`` defines the public invoke/stream API (sync and async) and
bridges to whichever of the four hooks (``_invoke``/``_ainvoke``/``_stream``/
``_astream``) a concrete subclass implements.  ``BaseModelInfo`` is the shared
connection/sampling configuration for HTTP-backed models.
"""

import asyncio
import json
from abc import abstractmethod
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union

from pydantic import BaseModel, Field, field_validator

from jiuwen.core.utils.llm.messages import BaseMessage, ToolInfo
from jiuwen.core.utils.llm.messages_chunk import BaseMessageChunk


class BaseChatModel:
    """Abstract chat model.

    Subclasses implement at least one of ``_invoke``/``_ainvoke`` (and
    optionally ``_stream``/``_astream``).  Each public method first tries the
    matching hook and, when it raises ``NotImplementedError``, falls back to
    the other flavour (sync <-> async).
    """

    # Output parsers attached via bind_out_parser(); None until bound.
    output_parser_list = None

    def invoke(self, messages: Union[List[BaseMessage], List[Dict], str],
               tools: Union[List[ToolInfo], List[Dict]] = None, **kwargs: Any):
        """Synchronous completion; returns whatever the subclass hook returns."""
        norm_messages = self._cover_messages_format(messages)
        norm_tools = self._cover_tool_format(tools)
        try:
            return self._invoke(messages=norm_messages, tools=norm_tools, **kwargs)
        except NotImplementedError:
            # Sync hook missing: drive the async hook to completion.  The
            # arguments are already normalised, so call _ainvoke directly
            # instead of re-normalising through ainvoke().
            # NOTE(review): asyncio.run() raises if an event loop is already
            # running in this thread; callers in async code should use ainvoke().
            return asyncio.run(self._ainvoke(messages=norm_messages,
                                             tools=norm_tools, **kwargs))

    async def ainvoke(self, messages: Union[List[BaseMessage], List[Dict], str],
                      tools: Union[List[ToolInfo], List[Dict]] = None, **kwargs: Any):
        """Asynchronous completion; falls back to the sync hook if needed."""
        norm_messages = self._cover_messages_format(messages)
        norm_tools = self._cover_tool_format(tools)
        try:
            return await self._ainvoke(messages=norm_messages, tools=norm_tools, **kwargs)
        except NotImplementedError:
            # Async hook missing: run the sync hook inline (blocks the loop).
            return self._invoke(messages=norm_messages, tools=norm_tools, **kwargs)

    def stream(self, messages: Union[List[BaseMessage], List[Dict], str],
               tools: Union[List[ToolInfo], List[Dict]] = None, **kwargs: Any):
        """Synchronous streaming; yields message chunks."""
        norm_messages = self._cover_messages_format(messages)
        norm_tools = self._cover_tool_format(tools)
        try:
            yield from self._stream(messages=norm_messages, tools=norm_tools, **kwargs)
        except NotImplementedError:
            # Sync streaming missing: pump the async generator on a private
            # event loop, yielding each chunk as it resolves.
            async def async_gen_wrapper():
                async for chunk in self._astream(messages=norm_messages,
                                                 tools=norm_tools, **kwargs):
                    yield chunk

            loop = asyncio.new_event_loop()
            gen = async_gen_wrapper()
            try:
                while True:
                    try:
                        yield loop.run_until_complete(gen.__anext__())
                    except StopAsyncIteration:
                        break
            finally:
                # Close the async generator before the loop so any async
                # cleanup (e.g. HTTP session teardown) can still run.
                loop.run_until_complete(gen.aclose())
                loop.close()

    async def astream(self, messages: Union[List[BaseMessage], List[Dict], str],
                      tools: Union[List[ToolInfo], List[Dict]] = None,
                      **kwargs: Any) -> AsyncIterator[BaseMessageChunk]:
        """Asynchronous streaming; falls back to the sync generator if needed."""
        norm_messages = self._cover_messages_format(messages)
        norm_tools = self._cover_tool_format(tools)
        try:
            async for chunk in self._astream(messages=norm_messages,
                                             tools=norm_tools, **kwargs):
                yield chunk
        except NotImplementedError:
            for chunk in self._stream(messages=norm_messages,
                                      tools=norm_tools, **kwargs):
                yield chunk

    # ---- subclass hooks -------------------------------------------------

    def _invoke(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> BaseMessage:
        raise NotImplementedError("BaseChatModel _invoke not implemented")

    async def _ainvoke(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> BaseMessage:
        raise NotImplementedError("BaseChatModel _ainvoke not implemented")

    def _stream(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> Iterator[BaseMessageChunk]:
        raise NotImplementedError("BaseChatModel _stream not implemented")

    async def _astream(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> AsyncIterator[BaseMessageChunk]:
        raise NotImplementedError("BaseChatModel _astream not implemented")

    @abstractmethod
    def model_provider(self):
        """Return the provider name this model registers under."""

    # ---- normalisation helpers ------------------------------------------

    def _cover_tool_format(self, tools: Union[List[ToolInfo], List[Dict]]):
        """Normalise *tools* to a list of plain, JSON-compatible dicts."""
        if not tools:
            return []
        if all(isinstance(item, dict) for item in tools):
            return tools
        # Round-trip through JSON so nested pydantic models become plain types.
        return [json.loads(tool.model_dump_json()) for tool in tools]

    def _cover_messages_format(self, messages: Union[List[BaseMessage], List[Dict], str]):
        """Normalise *messages* to a non-empty list of role/content dicts."""
        if not messages:
            # An empty request still needs one (empty) user turn.
            return [{"role": "user", "content": ""}]
        if isinstance(messages, str):
            return [{"role": "user", "content": messages}]
        if all(isinstance(item, dict) for item in messages):
            return messages
        return [item.model_dump(exclude_none=True) for item in messages]

    def bind_out_parser(self, output_parser_list: List):
        """Attach output parsers to be applied to model responses."""
        self.output_parser_list = output_parser_list

    def post_process(self, model_output):
        """Hook for subclasses to transform raw model output; no-op here."""

    def pre_process(self, model_output):
        """Hook for subclasses to transform input before the request; no-op here."""


class BaseModelInfo(BaseModel):
    """Connection and sampling configuration shared by all HTTP model backends."""

    api_key: Optional[str] = Field(default="", alias="api_key")
    api_base: Optional[str] = Field(default="", alias="api_base")
    model_name: str = Field(default="", alias="model")
    temperature: float = Field(default=0.95)
    top_p: float = Field(default=0.1)
    streaming: bool = Field(default=False, alias="stream")
    timeout: float = Field(default=60.0)  # seconds

    @field_validator('model_name', mode='before')
    @classmethod
    def handle_model_name(cls, v, values):
        # Accept "model" as an alternative key even when the alias path was
        # not taken (second parameter is pydantic-v2 ValidationInfo).
        if not v and 'model' in values.data:
            return values.data['model']
        return v

    class Config:
        populate_by_name = True
        extra = "forbid"
# All rights reserved (continuation of the copyright header above)
"""Message and tool-schema data models shared by all chat-model backends."""

from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel


class Parameters(BaseModel):
    """JSON-Schema style description of a tool's arguments."""

    type: str = "object"
    properties: Dict[str, Any] = {}  # pydantic deep-copies mutable defaults per instance
    required: List[str]


class Function(BaseModel):
    """A callable exposed to the model: name, description and argument schema."""

    name: str
    description: str
    parameters: Parameters


class ToolInfo(BaseModel):
    """OpenAI-compatible tool wrapper (currently only type="function")."""

    type: str = "function"
    function: Function


class ToolCall(BaseModel):
    """A tool invocation requested by the model."""

    name: str
    args: Dict[str, Any] = {}
    # Explicit default: in pydantic v2, `Optional[str]` WITHOUT a default is
    # still a required field.
    id: Optional[str] = None


class BaseMessage(BaseModel):
    """Common shape of every chat message."""

    role: str
    content: Union[str, List[Union[str, Dict]]]
    name: Optional[str] = None


class UsageMetadata(BaseModel):
    """Diagnostic / accounting information attached to an AIMessage."""

    code: int = -1
    errmsg: str = "Model request exception, please try again"
    prompt: str = ""
    task_id: str = ""
    model_name: str = ""
    finish_reason: str = ""
    total_latency: float = 0.
    model_stats: dict = {}
    first_token_time: str = ""
    request_start_time: str = ""


class AIMessage(BaseMessage):
    """Assistant reply, optionally carrying tool calls and usage metadata."""

    role: str = "assistant"
    # Optional[...] instead of the original bare `List[ToolCall] = None`,
    # which contradicted its own annotation.
    tool_calls: Optional[List[ToolCall]] = None
    usage_metadata: Optional[UsageMetadata] = None
    raw_content: Optional[str] = None
    reason_content: Optional[str] = None


class HumanMessage(BaseMessage):
    """Message authored by the end user."""

    role: str = "user"


class SystemMessage(BaseMessage):
    """System prompt / instruction message."""

    role: str = "system"


class ToolMessage(BaseMessage):
    """Result of executing a tool, linked back via tool_call_id."""

    role: str = "tool"
    tool_call_id: str
# All rights reserved (continuation of the copyright header above)
"""Streaming message chunks that accumulate via the ``+`` operator."""

from typing import Any

from jiuwen.core.utils.llm.messages import BaseMessage, AIMessage, ToolMessage


class BaseMessageChunk(BaseMessage):
    """One streamed fragment of a message; chunks are merged with ``+``."""

    class Config:
        arbitrary_types_allowed = True
        json_encoders = {type(None): lambda _: None}

    @staticmethod
    def _merge_content(left, right):
        """Concatenate two ``content`` payloads.

        str+str and list+list concatenate; for mismatched shapes the newer
        right-hand value wins (same policy as the original implementation).
        """
        if isinstance(left, str) and isinstance(right, str):
            return left + right
        if isinstance(left, list) and isinstance(right, list):
            return left + right
        return right

    def __add__(self, other: "BaseMessageChunk") -> "BaseMessageChunk":
        if not isinstance(other, BaseMessageChunk):
            raise TypeError(f"Cannot add {self.__class__.__name__} to {type(other)}")
        return self.__class__(
            role=self.role,
            content=self._merge_content(self.content, other.content),
            name=self.name or other.name,
        )


class AIMessageChunk(AIMessage, BaseMessageChunk):
    """Streamed assistant fragment; accumulates content, tool calls and metadata."""

    def __add__(self, other: Any) -> "AIMessageChunk":
        if not isinstance(other, AIMessageChunk):
            raise TypeError(f"Cannot add AIMessageChunk to {type(other)}")
        # BUG FIX: `(self.content or "") + (other.content or "")` raised a
        # TypeError for list-shaped content (allowed by BaseMessage.content);
        # _merge_content handles both shapes.
        return AIMessageChunk(
            role=self.role,
            content=self._merge_content(self.content or "", other.content or ""),
            name=other.name or self.name,
            tool_calls=(self.tool_calls or []) + (other.tool_calls or []),
            usage_metadata=other.usage_metadata or self.usage_metadata,
            raw_content=other.raw_content or self.raw_content,
            reason_content=other.reason_content or self.reason_content,
        )


class ToolMessageChunk(ToolMessage, BaseMessageChunk):
    """Streamed tool-result fragment."""

    def __add__(self, other: Any) -> "ToolMessageChunk":
        if not isinstance(other, ToolMessageChunk):
            raise TypeError(f"Cannot add ToolMessageChunk to {type(other)}")
        return ToolMessageChunk(
            role="tool",
            content=self._merge_content(self.content or "", other.content or ""),
            name=other.name or self.name,
            tool_call_id=other.tool_call_id or self.tool_call_id,
        )
#!/usr/bin/python3.11
# coding: utf-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved
# --- jiuwen/core/utils/llm/model_library/siliconflow.py ---
"""Siliconflow provider: a thin wrapper over the generic HTTP chat backend."""

from typing import List, Dict, Any, Iterator, AsyncIterator
from pydantic import Field, BaseModel

from jiuwen.core.utils.llm.base import BaseChatModel, BaseModelInfo
from jiuwen.core.utils.llm.messages import AIMessage
from jiuwen.core.utils.llm.messages_chunk import AIMessageChunk
from jiuwen.core.utils.llm.model_utils.defult_model import RequestChatModel


class Siliconflow(BaseModel, BaseChatModel):
    """Siliconflow chat model; delegates all transport to RequestChatModel."""

    model_info: BaseModelInfo = Field(default_factory=BaseModelInfo)
    # Underscore-prefixed -> pydantic private attribute, not a model field.
    _request_model: RequestChatModel = None

    def __init__(self, model_info: BaseModelInfo):
        """Copy *model_info* defensively and build the HTTP request model."""
        # model_dump() replaces the deprecated pydantic-v1 .dict().
        super().__init__(model_info=BaseModelInfo(**model_info.model_dump()))
        self._request_model = RequestChatModel(model_info=self.model_info)
        # (removed unused `_should_close_session`: an undeclared attribute,
        # which pydantic v2 rejects at assignment time)

    async def close(self):
        """Release the underlying HTTP session, if any."""
        if self._request_model is not None:
            if hasattr(self._request_model, 'close'):
                await self._request_model.close()
            self._request_model = None

    def model_provider(self) -> str:
        return "siliconflow"

    def _invoke(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> AIMessage:
        return self._request_model._invoke(messages, tools, **kwargs)

    async def _ainvoke(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> AIMessage:
        return await self._request_model._ainvoke(messages, tools, **kwargs)

    def _stream(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> Iterator[AIMessageChunk]:
        return self._request_model._stream(messages, tools, **kwargs)

    async def _astream(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> AsyncIterator[AIMessageChunk]:
        async for chunk in self._request_model._astream(messages, tools, **kwargs):
            yield chunk


#!/usr/bin/python3.11
# coding: utf-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved
# --- jiuwen/core/utils/llm/model_utils/defult_model.py ---
"""Generic OpenAI-compatible HTTP chat backend.

Synchronous calls go through ``requests``; asynchronous calls share a lazily
created ``aiohttp`` session.
"""

import aiohttp
import json
from typing import List, Dict, Any, Iterator, AsyncIterator, Optional

from aiohttp import ClientSession
from pydantic import ConfigDict, Field
from requests import Session

from jiuwen.core.utils.llm.base import BaseChatModel, BaseModelInfo
from jiuwen.core.utils.llm.messages import AIMessage, UsageMetadata
from jiuwen.core.utils.llm.messages_chunk import AIMessageChunk


class RequestChatModel(BaseChatModel, BaseModelInfo):
    """HTTP implementation of the four BaseChatModel hooks."""

    model_info: BaseModelInfo
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # default_factory gives each instance its own Session instead of one
    # class-level Session shared by every model.
    sync_client: Session = Field(default_factory=Session)
    aiohttp_session: Optional[ClientSession] = None

    async def ensure_session(self):
        """Lazily create (or re-create after close) the aiohttp session."""
        if self.aiohttp_session is None or self.aiohttp_session.closed:
            self.aiohttp_session = aiohttp.ClientSession()

    def model_provider(self) -> str:
        return "generic_http_api"

    def _headers(self) -> Dict[str, str]:
        """Common JSON content-type and bearer-auth headers."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.model_info.api_key}"
        }

    def _invoke(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> AIMessage:
        # BUG FIX: was `*kwargs`, which unpacked keyword NAMES as positional
        # arguments instead of forwarding the keyword arguments.
        params = self._request_params(messages, tools, **kwargs)
        # NOTE(review): verify=False disables TLS certificate checking —
        # confirm this is intentional for the target deployment.
        response = self.sync_client.post(
            verify=False,
            url=self.model_info.api_base,
            headers=self._headers(),
            json=params,
            timeout=self.model_info.timeout
        )
        response.raise_for_status()
        return self._parse_response(response.json())

    async def _ainvoke(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> AIMessage:
        await self.ensure_session()
        params = self._request_params(messages, tools, **kwargs)  # BUG FIX: was *kwargs
        async with self.aiohttp_session.post(
                url=self.model_info.api_base,
                headers=self._headers(),
                json=params,
                # Wrap in ClientTimeout for consistency with _astream;
                # aiohttp expects a ClientTimeout here, not a bare float.
                timeout=aiohttp.ClientTimeout(total=self.model_info.timeout)
        ) as response:
            response.raise_for_status()
            data = await response.json()
            return self._parse_response(data)

    def _stream(self, messages: List[Dict], tools: List[Dict] = None,
                **kwargs: Any) -> Iterator[AIMessageChunk]:
        params = self._request_params(messages, tools, **kwargs)  # BUG FIX: was *kwargs
        params["stream"] = True
        # NOTE(review): verify=False disables TLS certificate checking.
        with self.sync_client.post(
                verify=False,
                url=self.model_info.api_base,
                headers=self._headers(),
                json=params,
                stream=True,
                timeout=self.model_info.timeout
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                if line:
                    chunk = self._parse_stream_line(line)
                    if chunk:
                        yield chunk

    async def _astream(self, messages: List[Dict], tools: List[Dict] = None,
                       **kwargs: Any) -> AsyncIterator[AIMessageChunk]:
        await self.ensure_session()
        params = self._request_params(messages, tools, **kwargs)  # BUG FIX: was *kwargs
        params["stream"] = True
        async with self.aiohttp_session.post(
                url=self.model_info.api_base,
                headers=self._headers(),
                json=params,
                timeout=aiohttp.ClientTimeout(total=self.model_info.timeout)
        ) as response:
            response.raise_for_status()
            async for line in response.content:
                if line:
                    chunk = self._parse_stream_line(line)
                    if chunk:
                        yield chunk

    def _request_params(self, messages: List[Dict], tools: List[Dict] = None,
                        **kwargs: Any) -> Dict:
        """Build the OpenAI-style request body; extra kwargs pass through."""
        params = {
            "model": self.model_info.model_name,
            "messages": messages,
            "temperature": self.model_info.temperature,
            "top_p": self.model_info.top_p,
            **kwargs
        }
        if tools:
            params["tools"] = tools
        return params

    def _parse_response(self, response_data: Dict) -> AIMessage:
        """Map an OpenAI-style completion payload onto an AIMessage."""
        choice = response_data.get("choices", [{}])[0]
        message = choice.get("message", {})
        return AIMessage(
            # `content` can be explicitly null when only tool_calls are
            # returned; coerce to "" so BaseMessage.content validates.
            content=message.get("content") or "",
            # NOTE(review): provider tool_call dicts may not match the
            # ToolCall schema (name/args) — verify against a real response.
            tool_calls=message.get("tool_calls", []),
            usage_metadata=UsageMetadata(
                # BUG FIX: was `self.model_name` — the inherited, usually
                # empty BaseModelInfo field — not the configured model name.
                model_name=self.model_info.model_name,
                finish_reason=choice.get("finish_reason", ""),
                # TODO(review): stores usage.total_tokens in a field named
                # total_latency — looks mislabelled; confirm intent.
                total_latency=response_data.get('usage', {}).get('total_tokens', 0)
            )
        )

    def _parse_stream_line(self, line: bytes) -> Optional[AIMessageChunk]:
        """Decode one SSE line; returns None for [DONE] or unparseable lines."""
        if line.startswith(b"data: "):
            line = line[6:]
        if line.strip() == b"[DONE]":
            return None
        try:
            data = json.loads(line.decode("utf-8"))
            choice = data.get("choices", [{}])[0]
            delta = choice.get("delta", {})
            content = delta.get("content", "")
            if content is None:
                content = ""
            return AIMessageChunk(
                content=content,
                tool_calls=delta.get("tool_calls", [])
            )
        except json.JSONDecodeError:
            # Keep-alive / malformed lines are silently skipped by design.
            return None

    async def close(self):
        """Close the aiohttp session so the event loop can shut down cleanly."""
        if self.aiohttp_session:
            await self.aiohttp_session.close()
            self.aiohttp_session = None
# All rights reserved (continuation of the copyright header above)
"""Discovers provider classes on disk and hands out model instances by name."""

# BUG FIX: `import importlib` alone does not guarantee that the
# `importlib.util` submodule is importable as an attribute.
import importlib.util
import logging
import os
from typing import Dict, Type

from jiuwen.core.utils.llm.base import BaseModelInfo, BaseChatModel
from jiuwen.core.utils.llm.model_utils.singleton import Singleton


class ModelFactory(metaclass=Singleton):
    """Singleton registry mapping provider names to BaseChatModel subclasses."""

    def __init__(self):
        self.model_map: Dict[str, Type[BaseChatModel]] = {}
        self._initialize_models()

    def _initialize_models(self):
        """Load the built-in model_library plus an optional MODEL_DIR override."""
        core_model_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            'model_library'
        )
        self._load_model_dir(core_model_dir)
        custom_model_dir = os.getenv('MODEL_DIR')
        if custom_model_dir and os.path.exists(custom_model_dir):
            self._load_model_dir(custom_model_dir)

    @staticmethod
    def _load_models(model_dir: str) -> Dict[str, Type[BaseChatModel]]:
        """Import every module in *model_dir* and collect BaseChatModel subclasses.

        Keys are module file names (e.g. "siliconflow"); the file name is
        expected to match the provider name looked up by get_model().
        A module that fails to import is logged and skipped.
        """
        model_dict: Dict[str, Type[BaseChatModel]] = {}
        if not os.path.exists(model_dir):
            logging.warning(f"Model directory not found: {model_dir}")
            return model_dict

        try:
            py_files = [
                f for f in os.listdir(model_dir)
                if f.endswith(('.py', '.pyc')) and f not in ("__init__.py", "__init__.pyc")
            ]
            for py_file in py_files:
                module_name = os.path.splitext(py_file)[0]
                module_path = os.path.join(model_dir, py_file)
                try:
                    spec = importlib.util.spec_from_file_location(module_name, module_path)
                    if spec is None:
                        continue
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                    for name, obj in module.__dict__.items():
                        if isinstance(obj, type) and issubclass(obj, BaseChatModel) \
                                and obj is not BaseChatModel:
                            # Keyed by module (file) name; if a module defines
                            # several subclasses the last one found wins.
                            model_dict[module_name] = obj
                            logging.info(f"Loaded model: {module_name} -> {obj.__name__}")
                except Exception as e:
                    logging.error(f"Error loading module {py_file}: {str(e)}")
                    continue
        except Exception as e:
            # Chain the original error instead of discarding it.
            raise Exception("module load error") from e
        return model_dict

    def _load_model_dir(self, model_dir: str):
        """Merge the models found in *model_dir* into the registry."""
        self.model_map.update(self._load_models(model_dir))

    def get_model(self, model_provider: str, model_info: BaseModelInfo) -> BaseChatModel:
        """Instantiate the registered class for *model_provider* (case-insensitive).

        Raises ValueError listing the available providers when unknown.
        """
        model_cls = self.model_map.get(model_provider.lower())
        # (debug print(model_cls) removed)
        if not model_cls:
            available_models = ", ".join(self.model_map.keys())
            raise ValueError(f"Unavailable model provider: {model_provider}. Available models: {available_models}")
        return model_cls(model_info)


#!/usr/bin/python3.11
# coding: utf-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved
# --- jiuwen/core/utils/llm/model_utils/singleton.py ---
"""Thread-safe singleton metaclass."""

import abc
import threading

# One module-level lock guards first-time construction of every singleton.
singleton_lock = threading.Lock()


class Singleton(abc.ABCMeta, type):
    """Metaclass: the first call constructs and caches the instance;
    subsequent calls return the cached one."""

    _instances = {}

    def __call__(cls, *args, **kwargs):
        with singleton_lock:
            if cls not in cls._instances:
                cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]