代码拉取完成,页面将自动刷新
# 客户端
import asyncio
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from mcp.shared.context import RequestContext
from mcp.types import (
TextContent,
CreateMessageRequestParams,
CreateMessageResult,
)
from mcp.shared.memory import (
create_connected_server_and_client_session as create_session
)
# 这里需要引入服务端的 app 对象
from file_server import app
server_params = StdioServerParameters(
command='uv',
args=['run', 'file_server.py'],
)
async def sampling_callback(
context: RequestContext[ClientSession, None],
params: CreateMessageRequestParams,
):
# 获取工具发送的消息并显示给用户
input_message = input(params.messages[0].content.text)
print("输入的回答是: ", input_message)
# 将用户输入发送回工具
return CreateMessageResult(
role='user',
content=TextContent(
type='text',
text=input_message.strip().upper() or 'Y'
),
model='user-input',
stopReason='endTurn'
)
async def main():
async with create_session(
app._mcp_server,
sampling_callback=sampling_callback
) as client_session:
res = await client_session.call_tool(
'delete_file',
{'file_path': 'C:/xxx.txt'}
)
print(res)
if __name__ == '__main__':
asyncio.run(main())
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。