手搓一个Agent

文章目录1.Agent基本概念1.1 Agent是什么1.2 Agent的构建1.3 AI Agent的实践1.3.1 两种主流 Agent 模式1.3.2 单⼀Agent VS Multi-Agent代码示例V1:大

文章目录

  • 1.Agent基本概念
    • 1.1 Agent是什么
    • 1.2 Agent的构建
  • 1.3 AI Agent的实践
    • 1.3.1 两种主流 Agent 模式
      • 1.3.2 单⼀Agent VS Multi-Agent
  • 代码示例
    • V1:大模型感知工具
    • v2: 大模型规划工具链
    • v2.5 Function Call
    • V3: MCP
    • 官方示例
  • 参考文献

1.Agent基本概念

1.1 Agent是什么

AI Agent(以下简称Agent)是⼀种模拟⼈类智能⾏为的⼈⼯智能系统,以⼤型语⾔模型(LLM)作为其核⼼引擎。它们能够感知其环境,做出决策,并执⾏任务以实现特定的⽬标。AI Agent的设计理念是赋予机器⾃主性、适应性和交互性,使其能够在复杂多变的环境中独⽴运作。
OpenAI给的定义:

Agents are systems that independently accomplish tasks on your behalf.

谷歌给的定义:

This combination of reasoning, logic, and access to external information that are all connected to a Generative AI model invokes the concept of an agent, or a
program that extends beyond the standalone capabilities of a Generative AI model.

1.2 Agent的构建

Agent = LLM(⼤型语⾔模型)+ 记忆 + 规划技能 + ⼯具使⽤。

其中 LLM 扮演了 Agent 的“⼤脑”,在这个系统中提供推理、规划等能⼒。

  1. Planning(规划)

⼦⽬标和分解:将⼤型任务分解为较⼩、可管理的⼦⽬标,从⽽有效地处理复杂的任务。

反思和改进:对过去的⾏动进⾏⾃我反思,从错误中学习并优化未来的步骤。

  1. Memory(记忆):Agent的知识库

感觉记忆(Sensory Memory):对应于Agent接收到原始感官输⼊的初步处理,通常时间短暂。

短期记忆(Short-Term Memory):⽤于存储当前会话或任务中的信息,这些信息对于完成⼿头任务⾄关重要,但任务完成后通常不再保留。

⻓期记忆(Long-Term Memory):⽤于存储需要⻓期保留的信息,如⽤户偏好、历史交互等。⻓期记忆通常存储在外部数据库中,并通过快速检索机制供Agent使⽤。

  1. Tool use & Action(⼯具使⽤&执⾏)

Agent利⽤外部资源或⼯具来增强其感知、决策和⾏动能⼒的过程。这些⼯具可以是API、软件库、硬件设备或其他服务。

Agent基于规划和记忆执⾏执⾏最后的具体动作。

1.3 AI Agent的实践

1.3.1 两种主流 Agent 模式

对比维度ReActPlan-and-Execute
核心思想将任务分解为“推理”和“行动”两个交替进行的阶段将任务分为“规划”和“执行”两个明确的阶段
使用场景1.动态、交互式任务,需要根据环境反馈动态调整。2.任务结构不明确。1.结构化、长期规划任务,需要全局规划。2.任务步骤明确。
工作流程1. 推理:生成思考链,确定下一步行动。2. 行动:调用工具执行。3. 迭代:根据结果优化。1. 规划:生成完整的任务执行计划(DAG)。2. 执行:按计划批量执行子任务。
优点1.动态调整能力强。2.灵活性高,适合不确定性大的任务。 3.上下文和记忆管理方便。1.执行效率高。2.全局规划能力强,适合复杂任务。
缺点1.执行效率较低,Token消耗量大。2.规划能力有限,缺乏全局视角。1.灵活性较低,难以动态调整。2.依赖任务结构,不适合不确定性大的任务。

ReAct

https://github/ysymyth/ReAct

ReAct(Reasoning + Acting)框架将任务分解为“思考”和“⾏动”两个交替进⾏的阶段。

⾸先,Agent 通过少样本提示⽣成思考链(Thought Chain),确定下⼀步⾏动;

然后执⾏⼯具调⽤(Action),观察结果并迭代优化。这种框架适⽤于需要逐步推理和动态调整的任务。

和ReAct相对应的是Reasoning-Only和Action-Only。

在Reasoning-Only的模式下,⼤模型会基于任务进⾏逐步思考,并且不管有没有获得结果,都会把思考的每⼀步都执⾏⼀遍。

在Action-Only的模式下,⼤模型就会处于完全没有规划的状态下,先进⾏⾏动再进⾏观察,基于观察再调整⾏动,导致最终结果不可控。

Plan-and-Execute

https://arxiv/abs/2305.04091

该框架将任务分为“规划”和“执⾏”两个明确的阶段。

⾸先⽣成完整的任务执⾏计划(通常是 DAG 图);

然后按计划批量执⾏⼦任务。这种框架适⽤于任务结构清晰、需要预先规划的场景。

Planner接收来⾃⽤户的输⼊,输出具体的任务清单;

将任务清单给到Single-Task Agent,即执⾏器,执⾏器会在循环中逐个处理任务;

执⾏器每处理⼀个任务,就将处理结果和状态同步给Replanner,Replanner⼀⽅⾯会输出反馈给⽤户,另⼀⽅⾯会更新任务清单;

任务清单再次给到执⾏器进⾏执⾏。

优点:具备明确的⻓期规划,同时可以只使⽤较⼤的模型做规划,⽽使⽤较⼩的模型执⾏步骤,降低执⾏成本。

缺点:每个任务是按顺序执⾏的,下⼀个任务都必须等上⼀个任务完成之后才能执⾏,这可能会导致总执⾏时间的增加。

1.3.2 单⼀Agent VS Multi-Agent

单一Agent

定义

由⼀个Agent独⽴独⽴完成任务。这种系统通常被设计为解决特定问题或执⾏特定任务。

单Agent系统的优势在于其结构简单、易于实现和维护。

  1. 典型开源项⽬

Langchain:https://www.langchain/

⼀个⽤于开发基于语⾔模型的应⽤程序开发框架,依赖于LLM,但是只是调⽤,并不会集成LLMs,它的核⼼理念是为各种LLMs实现通⽤的接⼝,通过灵活的“链结构”把LLMs相关的组件组装在⼀起,简化LLMs应⽤的开发难度,⽅便开发者快速地开发复杂的LLMs应⽤。

Langchain侧重于语言模型的集成与工具使用,适用于构建各种自然语言处理应用,如智能客服、文本生成等。

AutoGPT:https://github/Significant-Gravitas/AutoGPT?ref=jina-ai-gmbh.ghost.io

⼀个开源的⾃主 AI 代理,基于 gpt-4 和 gpt-3.5 模型,集成了各种实⽤的外部⼯具和⻓短期内存管理。可以根据⽤户给定的⽬标,⾃动⽣成所需的提示,并执⾏多种任务,如创建⽹站、⽣产社交媒体内容、读写⽂件、浏览⽹⻚等。

AutoGPT具有自主决策和互联网交互能力,适合处理需要自主探索和目标达成的任务,如项目管理、市场调研等。

BabyAGI:https://github/yoheinakajima/babyagi

Babyagi 是⼀个智能任务管理和解决⼯具,它结合了OpenAI GPT-4和Pinecone向量搜索引擎的⼒量,以⾃动完成和管理⼀系列任务,从⼀个初始任务开始,babyagi使⽤GPT4⽣成解决⽅案和新任务,并将解决⽅案存储在Pinecone中以便进⼀步检索。

Babyagi 专注于任务管理和执行,适用于构建数据处理流水线、自动化脚本执行等小型项目任务协调场景。通过任务队列管理,有序执行一系列相关任务。

Multi-Agent

定义

Multi-Agent系统可以理解为单⼀Agent的升级版。这种架构适⽤于需要多领域知识或多任务并⾏的场景。

单⼀Agent是由⼀个⼤模型进⾏思考和规划,然后调⽤外部⼯具完成任务;⽽Multi-Agent则是让多个⼤模型分⼯合作,每个模型负责不同的⼦任务,并通过通信和协调机制与其他Agent合作,最终共同完成任务。

  1. 与单一Agent的区别

减少了记忆容量的需求:Multi-Agent的每个Agent只需要关注与自己相关的信息即可,不需要cover所有的历史信息。而单一Agent,比如AutoGPT,需要记住所有的历史信息。这意味着单智能体在面临长历史的复杂任务时,对记忆容量(大模型支持的序列长度)要求比较高。

多方案并行探索:Multi-Agent可以并行探索多方案后选取最优的解决方案。

可拓展性更好:对于单一Agent,随着每次输入给大模型的context会变长,会产生性能下降的风险(大模型处理长序列会存在丢失关键信息的情况等等)。Multi-Agent分工协作则没有这个问题,因为每个Agent完成特定的子任务,子任务一般不会有很长的context。

表现更稳定:通过角色扮演的机制可以排除一些其他视角的观点,让大模型的表现更稳定,单一Agent由于混杂了很多任务,无法做到这一点。

  1. 典型开源项⽬

MetaGPT:https://github/geekan/MetaGPT

MetaGPT 是⼀个专注于多 Agent 协作的框架,旨在通过多个 Agent 的协同⼯作来解决复杂问题。

MetaGPT 适⽤于需要多 Agent 协作的场景,如智能客服、多任务⾃动化等。它通过任务分解和 Agent 间的通信机制,能够⾼效处理多步骤、多领域的任务。

HuggingGPT:https://github/microsoft/JARVIS

由微软开发的⼀种 AI 协作系统,可以使⽤ Huggingface 上的多个模型来完成给定的任务。

HuggingGPT 通过 ChatGPT 来协调各 AI 模型之间的合作,可以很好地处理多模态任务,如⽂本分类、物体检测、⽂本转语⾳、⽂本转视频等。

以下示例属于单一Agent、Plan-Execute 模式。

MCP协议示例在 Claude sonet 模型上运行通过,其余示例在 DeepSeekV1 上运行通过。

代码示例

V1:大模型感知工具

大家可能都玩过成语接龙,日行一善,善有善报,报喜不报忧。我们可以简单的把大模型理解为文字接龙机器。所以大模型只是根据概率推断出下一个文本是啥,而没有意识。也无法感知到工具的存在。
那么如何把大模型和工具关联起来呢?如果工具是一个Python函数,我们把函数的名称、功能和出入参格式以Prompt的形式投喂给大模型,大模型就能返回调用该函数的时机,然后我们手动解析这个时机并执行对应函数。这样就完成了大模型调用工具。
所以,本质上调用工具是我们的胶水代码完成的。我们的胶水代码充当了大模型输出和工具之间的中间人角色。
下面示例是个简单的Agent,作用是获取美剧数据,这些美剧中包含给定的英语短语。
代码包括 Agent、⼤型语⾔模型 Model 和⼯具 Toolbox 等几个部分。
工具箱里面只有一个工具方法 fetch_tv_series_list(),其功能和出入参见注释。
我们在 generate_prompt() 方法中生成 Prompt 然后投喂给大模型。Prompt里面的信息很关键,大模型会根据这个输入生成任务清单,然后我们解析这个任务清单并调用对应的工具方法,就完成了 Agent 的功能。
在代码中,我注释掉了对大模型的调用,转而返回mock数据,mock数据是根据真实的大模型返回数据生成的。
从执行结果看,大模型已经把工具的输入参数按照要求拼装好了,我们解析之后可以直接使用。
注意,以下示例中虽然调用的是 Deepseek 的模型,但是却是使用的 OpenAI 的API,这不是手误或者搬运,而是Deepseek官方文档中示例就是用的 OpenAI 的Python API。
只不过,BASE_URL 里面填的是 Deepseek 的网址,网络请求会打到Deepseek的平台上去。

import json
import re
import time

from openai import OpenAI


def fetch_tv_series_list(phrase: str):
    """
    搜索并获取包含该英语短语的英语电视剧数据,这些剧集台词中包含该短语
    Args:
    phrase (str): 英语短语

    Returns:
    json: 包含该英语短语的美剧信息
    """
    print(f"正在执行 fetch_tv_series_list:{phrase}")
    time.sleep(1)

    resp = {
        "phrase": phrase,
        "tv_series_list": [
            {
                "series": "海绵宝宝",
                "season": "第十一季",
                "episode": "第39集",
                "duration": "00:14:32,448 - 00:14:35,620",
                "subtitle": "Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred."
            },
            {
                "series": "生活大爆炸",
                "season": "第一季",
                "episode": "第10集",
                "duration": "00:12:25,960 - 00:12:27,990",
                "subtitle": "Oh-- break a leg. Break a leg.So, road trip to long beach."
            }
        ]
    }

    print(f"fetch_tv_series_list response:{resp}")

    return resp


class Model:
    API_KEY = "API_KEY"
    BASE_URL = "https://api.deepseek"
    MODEL_NAME = "deepseek-chat"

    def generate_response(self, prompt, model_name):
        return [
            {
                'tool_name': 'fetch_tv_series_list',
                'parameters': {
                    'phrase': 'break a leg'
                },
                'description': "使用工具fetch_tv_series_list搜索包含英语短语'break a leg'的电视剧数据,这些剧集的台词中包含该短语。"
            }
        ]

        # client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)
        # 
        # response = client.chatpletions.create(
        #     model=self.MODEL_NAME,
        #     messages=[
        #         {"role": "user", "content": f"{prompt}"},
        #     ],
        #     response_format={
        #         'type': 'json_object'
        #     },
        #     stream=False
        # )
        # 
        # try:
        #     plan = json.loads(response.choices[0].message.content)
        #     return plan['steps']
        # except Exception as e:
        #     print(f"解析失败:{e}")
        #     print(f"使用备用计划,原数据{response}")
        # 
        # return [{
        #     "tool_name": "fetch_tv_series_list",
        #     "parameters": {
        #         "phrase": "break a leg"
        #     },
        #     "description": "搜索并获取包含短语'break a leg'的英语电视剧数据"
        # }]


class Toolbox:
    def __init__(self):
        self.tools = {}

    def store(self, tools):
        for tool in tools:
            name = tool.__name__
            doc = tool.__doc__
            self.tools[name] = doc

    def get_tool_description(self):
        return self.tools


class AgentV1:
    def __init__(self, tools, model_service, model_name, stop_token=None):
        self.tools = tools
        self.model_service = model_service
        self.model_name = model_name
        self.stop_token = stop_token

    def prepare_tools(self):
        toolbox = Toolbox()
        toolbox.store(self.tools)
        tool_descriptions = toolbox.get_tool_description()
        return tool_descriptions

    def generate_prompt(self, query):
        tool_descriptions = self.prepare_tools()
        prompt = f"""
        你是一个高级任务规划AI,请将用户任务分解为可执行的步骤。
        可用工具:{tool_descriptions}
        输出要求:
        1. 使用JSON格式,包含steps数组
        2. 每个步骤包含:
           - "tool_name"(工具名称,如果是工具调用)
           - "parameters"(参数对象)
           - "description"(步骤描述)
        3. 如果使用了tool,请确保给出的参数符合tool的要求
        Query: {query}
        """

        return prompt

    def plan(self, query):
        prompt = self.generate_prompt(query)
        print(f"prompt:{prompt}")
        response = self.model_service.generate_response(prompt, self.model_name)
        return response

    def execute(self, query):
        print("agent start planing")
        response = self.plan(query)
        print(f"agent plan result:{response}")

        try:
            tool_choice = response[0]['tool_name']
            print(f"agent choose tool:{tool_choice}")
            tool_input = response[0]['parameters']
            print(f"agent tool_input:{tool_input}")

            if tool_choice == 'fetch_tv_series_list':
                result = fetch_tv_series_list(tool_input['phrase'])
            else:
                result = tool_input

            print(f"agent result:{result}")
            return result

        except json.JSONDecodeError as e:
            print(f"Error parsing JSON response: {e}")
            return "Error: Invalid JSON response from the model."


if __name__ == "__main__":
    agent = AgentV1(
        tools=[fetch_tv_series_list],
        model_service=Model(),  # Placeholder for actual service class
        model_name="deepseek-chat",  # Or another LLM model
        stop_token=None
    )

    while True:
        query = input("Ask me anything: ")
        if query.lower().strip() == "exit":
            break
        agent.execute(query)
        break


执行代码:

Ask me anything: 包含 break a leg的短语

agent start planing

prompt:
        你是一个高级任务规划AI,请将用户任务分解为可执行的步骤。
        可用工具:{'fetch_tv_series_list': '\n    搜索并获取包含该英语短语的英语电视剧数据,这些剧集台词中包含该短语\n    Args:\n    phrase (str): 英语短语\n\n    Returns:\n    json: 包含该英语短语的美剧信息\n    '}
        输出要求:
        1. 使用JSON格式,包含steps数组
        2. 每个步骤包含:
           - "tool_name"(工具名称,如果是工具调用)
           - "parameters"(参数对象)
           - "description"(步骤描述)
        3. 如果使用了tool,请确保给出的参数符合tool的要求
        Query: 包含 break a leg的短语
        
agent plan result:
[
  {
    "tool_name": "fetch_tv_series_list",
    "parameters": {
      "phrase": "break a leg"
    },
    "description": "使用工具fetch_tv_series_list搜索包含英语短语'break a leg'的电视剧数据,这些剧集的台词中包含该短语。"
  }
]

agent choose tool:fetch_tv_series_list

agent tool_input:{'phrase': 'break a leg'}

正在执行 fetch_tv_series_list:break a leg

fetch_tv_series_list response:{'phrase': 'break a leg', 'tv_series_list': [{'series': '海绵宝宝', 'season': '第十一季', 'episode': '第39集', 'duration': '00:14:32,448 - 00:14:35,620', 'subtitle': 'Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred.'}, {'series': '生活大爆炸', 'season': '第一季', 'episode': '第10集', 'duration': '00:12:25,960 - 00:12:27,990', 'subtitle': 'Oh-- break a leg. Break a leg.So, road trip to long beach.'}]}
agent result:
{
  "phrase": "break a leg",
  "tv_series_list": [
    {
      "series": "海绵宝宝",
      "season": "第十一季",
      "episode": "第39集",
      "duration": "00:14:32,448 - 00:14:35,620",
      "subtitle": "Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred."
    },
    {
      "series": "生活大爆炸",
      "season": "第一季",
      "episode": "第10集",
      "duration": "00:12:25,960 - 00:12:27,990",
      "subtitle": "Oh-- break a leg. Break a leg.So, road trip to long beach."
    }
  ]
}

v2: 大模型规划工具链

V1 版本的 Agent 比较简单,只有1个工具方法。在此章节中我们将其升级到调用多个工具。
我们来看一个实际的应用场景。在短视频平台,有一类学习英语常用短语的视频,是多个美剧视频片段的合集,比如:

我们开发一个生成此类视频的Agent。
我们先来看下人工制作这种视频需要哪些步骤,然后用Agent调用工具来实现。
给定一个常用短语比如 break a leg,我们首先要找哪些美剧的台词中使用了该短语。最简单的方法是去搜台词的网站直接搜索,比如 https://33.agilestudio/。

然后我们去下载相关美剧视频片段。每个视频片段(时长为10秒左右)里面有很多台词,所以我们需要先搜索包含短语的视频子片段(时长为1-2秒),并将这些视频子片段切分出来并合并成一个视频。
然后抽取视频帧作为封面, 最后发布到短视频平台。
这里面每一个步骤都有对应相关的工具方法,具体见如下代码:

import time

import openai
import json
import re

from openai import OpenAI


def get_frequently_used_phrase():
    """
    获取英语口语常用短语

    Returns:
    str: 英语口语常用短语,如:"break a leg"
    """

    print("正在执行 get_frequently_used_phrase")
    time.sleep(1)

    phrase = "break a leg"

    return phrase


def fetch_tv_series_list(phrase: str):
    """
    搜索并获取包含该英语短语的英语电视剧数据,这些剧集台词中包含该短语
    Args:
    phrase (str): 英语短语

    Returns:
    json: 包含该英语短语的美剧信息
    """
    print(f"正在执行 fetch_tv_series_list:{phrase}")
    time.sleep(1)

    resp = {
        "phrase": phrase,
        "tv_series_list": [
            {
                "series": "海绵宝宝",
                "season": "第十一季",
                "episode": "第39集",
                "duration": "00:14:32,448 - 00:14:35,620",
                "subtitle": "Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred."
            },
            {
                "series": "生活大爆炸",
                "season": "第一季",
                "episode": "第10集",
                "duration": "00:12:25,960 - 00:12:27,990",
                "subtitle": "Oh-- break a leg. Break a leg.So, road trip to long beach."
            }
        ]
    }

    print(f"fetch_tv_series_list response:{resp}")

    return resp


def download_tv_series_segment_list(tv_series_info):
    """
    根据美剧详细信息下载对应的在线美剧视频片段,一集视频对应.m3u8格式的playlist,
    每个片段是.ts格式的视频切片,其中部分片段包含目标短语,下载完成后返回视频片段的本地路径
    因为无法准确找到包含该短语的片段,每个美剧可能会下载多个视频片段,
    :param tv_series_info
    :return: 台词中可能包含短语的视频片段
    """

    print("正在执行 download_tv_series_segment_list")
    time.sleep(1)

    print(f"{tv_series_info}")

    info = json.loads(tv_series_info)
    phrase = info["phrase"]
    tv_series_list = info["tv_series_list"]

    video_segment_uri_list = []

    for idx, tv in enumerate(tv_series_list):
        name = tv.get("series")
        season = tv.get("season")
        episode = tv.get("episode")
        duration = tv.get("duration")
        print(f"downloading {name}{season}{episode}-{duration}")
        time.sleep(0.5)
        video_segment_uri_list.append(f"./inputs/{name}{season}{episode}{idx}.ts")

    video_segment_uri_list.sort()

    return {
        "phrase": phrase,
        "video_segment_uri_list": video_segment_uri_list
    }


def find_segment_contain_phrase(search_task_info):
    """
    在已经下载的视频片段中搜索包含phrase的视频片段,并返回这些视频片段本地路径
    :param search_task_info:搜索任务数据
    :return: 返回一个json对象,包括短语和台词中包含英语短语的视频本地路径数组
    """

    print("正在执行 find_segment_contain_phrase")
    time.sleep(1)

    info = json.loads(search_task_info)
    phrase = info["phrase"]
    video_segment_uri_list = info["video_segment_uri_list"]
    contain_list = []
    for idx, segment_uri in enumerate(video_segment_uri_list):
        print(f"searching {segment_uri}")
        time.sleep(0.5)
        contain_list.append(segment_uri)

    print(f"search result:{contain_list}")

    return {
        "phrase": phrase,
        "video_segment_uri_list": contain_list
    }


def merge_segment_to_one_single_video(meta_info):
    """
    将多个视频片段合成一个单一视频,合成后的视频是.mp4格式
    :param meta_info:用于合成视频的元数据
    :return:返回合成后视频的本地路径和短语
    """

    print("正在执行 merge_segment_to_one_single_video")
    time.sleep(1)

    info = json.loads(meta_info)
    phrase = info["phrase"]
    video_segment_uri_list = info["video_segment_uri_list"]
    video_uri = ""

    for idx, segment_uri in enumerate(video_segment_uri_list):
        video_uri = video_uri + segment_uri

    print(f"merged video uri:{video_uri}")

    return {
        "phrase": phrase,
        "video_uri": video_uri
    }


def extract_video_cover_image(merged_video_info):
    """
    从视频中抽取第1s对应的帧作为视频封面
    :param merged_video_info,合成后的视频信息
    :return: 视频封面图片本地路径,cover_uri_3_4是比例为3:4的图片,cover_uri_4_3是4:3的图片,以方便发布到社交媒体时选择合适的比例
    """

    print("正在执行 extract_video_cover_image")
    time.sleep(1)

    info = json.loads(merged_video_info)
    phrase = info["phrase"]
    video_uri = info["video_uri"]

    cover_uri_3_4 = "./outputs/cover_uri_3_4.png"
    cover_uri_4_3 = "./outputs/cover_uri_4_3.png"

    return {
        "phrase": phrase,
        "video_uri": video_uri,
        "cover_uri34": cover_uri_3_4,
        "cover_uri43": cover_uri_4_3
    }


def post_to_social_media(merged_video_info_with_cover):
    """
    将抽取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体
    :param merged_video_info_with_cover: 等待发布的视频数据
    :return:是个数组,单个元素表示发布到社交媒体的结果,True表示发布成功,False 表示发布失败
    """

    print(f"正在执行 post_to_social_media:{merged_video_info_with_cover}")
    time.sleep(1)

    info = json.loads(merged_video_info_with_cover)
    phrase = info["phrase"]
    video_uri = info["video_uri"]
    cover_uri_3_4 = info["cover_uri34"]
    cover_uri_4_3 = info["cover_uri43"]

    social_media_list = ["douyin", "xhs", "bili", "kuai"]
    for idx, social in enumerate(social_media_list):
        print(f"posting to {social}")
        print(f"phrase:{phrase}")
        print(f"video_uri:{video_uri}")
        print(f"cover_uri_3_4:{cover_uri_3_4}")
        print(f"cover_uri_4_3:{cover_uri_4_3}")
        time.sleep(1)

    douyin = True
    xiaohongshu = True
    bilibili = True
    kuaishou = True

    return [douyin, xiaohongshu, bilibili, kuaishou]


class Model:
    API_KEY = "API_KEY"
    BASE_URL = "https://api.deepseek"
    MODEL_NAME = "deepseek-chat"

    def __init__(self) -> None:
        super().__init__()
        self.client = OpenAI(api_key=self.API_KEY, base_url=self.BASE_URL)

    def generate_response(self, user_task, tools):
        """任务规划器(使用GPT进行任务分解)"""

        system_msg = f'''
                        你是一个高级任务规划AI,请将用户任务分解为可执行的步骤。
                        可用工具:{json.dumps(tools, indent=2)}
                        输出要求:
                        1. 使用JSON格式,包含steps数组
                        2. 每个步骤包含:
                           - "type":"tool_call"
                           - "tool_name"(工具名称,如果是工具调用)
                           - "parameters",参数对象,里面只有1个key-value对,key是对应工具方法的入参名称,比如 "tool_name": "download_tv_series_segment_list" 对应的 parameters 为 {{"tv_series_info":{{step2}}}}
                           - "description"(步骤描述)
                        3. 需要步骤存在先后顺序,需要处理参数依赖(用{{前序步骤编号}}标记),比如{{step1}}
                        4. 如果使用了tool,请确保给出的参数符合tool的要求
                    '''

        response = self.client.chat.completions.create(
            model=self.MODEL_NAME,
            messages=[
                {"role": "system", "content": system_msg},
                {"role": "user", "content": user_task}
            ],
            response_format={
                'type': 'json_object'
            },
            stream=False
        )

        print(f"task_planner response:{response}")

        try:
            plan = json.loads(response.choices[0].message.content)
            return plan['steps']
        except Exception as e:
            print(f"解析失败:{e}")
            print(f"使用备用计划,原数据{response}")
            return []


class Toolbox:
    def __init__(self):
        self.tools = {}

    def store(self, tools):
        for tool in tools:
            name = tool.__name__
            doc = tool.__doc__
            self.tools[name] = doc

    def get_tool_description(self):
        return self.tools


class AgentV2:
    def __init__(self, tools, model_service, stop_token=None):
        self.tools = tools
        self.model_service = model_service
        self.stop_token = stop_token

    def prepare_tools(self):
        toolbox = Toolbox()
        toolbox.store(self.tools)
        tool_descriptions = toolbox.get_tool_description()
        return tool_descriptions

    def do_task(self, steps):
        """执行任务计划"""
        context = {}

        for idx, step in enumerate(steps, 1):
            print(f"\n{'=' * 30}")
            print(f"步骤 {idx}: {step['description']}")

            # 处理参数中的占位符
            params = step.get('parameters', {}).copy()
            for k, v in params.items():
                if isinstance(v, str) and "{step" in v:
                    params[k] = re.sub(r'\{step(\d+)\}', lambda m: json.dumps(context.get(int(m.group(1)), "")), v)

            if step['type'] == 'tool_call':
                tool = step['tool_name']
                print(f"调用工具 {tool}: {params}")

                if tool == 'get_frequently_used_phrase':
                    result = get_frequently_used_phrase()
                elif tool == 'fetch_tv_series_list':
                    result = fetch_tv_series_list(params['phrase'])
                elif tool == 'download_tv_series_segment_list':
                    result = download_tv_series_segment_list(params["tv_series_info"])
                elif tool == 'find_segment_contain_phrase':
                    result = find_segment_contain_phrase(params['search_task_info'])
                elif tool == 'merge_segment_to_one_single_video':
                    result = merge_segment_to_one_single_video(params['meta_info'])
                elif tool == 'extract_video_cover_image':
                    result = extract_video_cover_image(params['merged_video_info'])
                elif tool == 'post_to_social_media':
                    result = post_to_social_media(params['merged_video_info_with_cover'])
                else:
                    result = "未知工具"

                print(f"工具返回结果:{result}")
                context[idx] = result

            else:
                print("未知type")

        return context

    def plan(self, task):
        print(f"原始任务: {task}")

        print("\n开始任务规划...")
        # plan = self.model_service.generate_response(task, self.prepare_tools())
        plan = [
            {
                "type": "tool_call",
                "tool_name": "get_frequently_used_phrase",
                "parameters": {},
                "description": "获取英语口语常用短语,例如 'break a leg'"
            },
            {
                "type": "tool_call",
                "tool_name": "fetch_tv_series_list",
                "parameters": {
                    "phrase": "{step1}"
                },
                "description": "搜索并获取包含该英语短语的美剧数据"
            },
            {
                "type": "tool_call",
                "tool_name": "download_tv_series_segment_list",
                "parameters": {
                    "tv_series_info": "{step2}"
                },
                "description": "根据美剧详细信息下载对应的在线美剧视频片段"
            },
            {
                "type": "tool_call",
                "tool_name": "find_segment_contain_phrase",
                "parameters": {
                    "search_task_info": "{step3}"
                },
                "description": "在已经下载的视频片段中搜索包含短语的视频片段"
            },
            {
                "type": "tool_call",
                "tool_name": "merge_segment_to_one_single_video",
                "parameters": {
                    "meta_info": "{step4}"
                },
                "description": "将多个视频片段合成一个单一视频"
            },
            {
                "type": "tool_call",
                "tool_name": "extract_video_cover_image",
                "parameters": {
                    "merged_video_info": "{step5}"
                },
                "description": "从视频中提取第1秒对应的帧作为视频封面"
            },
            {
                "type": "tool_call",
                "tool_name": "post_to_social_media",
                "parameters": {
                    "merged_video_info_with_cover": "{step6}"
                },
                "description": "将提取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体"
            }
        ]

        print("\n生成的执行计划:")
        for i, step in enumerate(plan, 1):
            print(f"{i}. [{step['type']}] {step['description']}")

        return plan

    def execute(self, task):
        print("agent start thinking")
        steps = self.plan(task)
        print(f"agent think result:{steps}")

        print(f"\n{'=' * 30}")
        print("\n开始执行计划...")
        print(json.dumps(steps, indent=2, ensure_ascii=False))
        final_context = self.do_task(steps)

        print(f"\n{'=' * 30}")
        print("\n最终结果:")
        print(final_context.get(len(steps), "任务未完成"))


if __name__ == "__main__":
    agent = AgentV2(
        tools=[
            get_frequently_used_phrase,
            fetch_tv_series_list,
            download_tv_series_segment_list,
            find_segment_contain_phrase,
            merge_segment_to_one_single_video,
            extract_video_cover_image,
            post_to_social_media
        ],
        model_service=Model(),
        stop_token=None
    )

    query = "制作一个视频,由多个片段组成,每个片段中台词均含有 break a leg,各个视频片段都出自美剧"
    agent.execute(query)

执行代码结果:

agent start thinking

原始任务: 制作一个视频,由多个片段组成,每个片段中台词均含有 break a leg,各个视频片段都出自美剧

开始任务规划...

生成的执行计划:
1. [tool_call] 获取英语口语常用短语,例如 'break a leg'
2. [tool_call] 搜索并获取包含该英语短语的美剧数据
3. [tool_call] 根据美剧详细信息下载对应的在线美剧视频片段
4. [tool_call] 在已经下载的视频片段中搜索包含短语的视频片段
5. [tool_call] 将多个视频片段合成一个单一视频
6. [tool_call] 从视频中提取第1秒对应的帧作为视频封面
7. [tool_call] 将提取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体

agent think result:
[
  {
    "type": "tool_call",
    "tool_name": "get_frequently_used_phrase",
    "parameters": {},
    "description": "获取英语口语常用短语,例如 'break a leg'"
  },
  {
    "type": "tool_call",
    "tool_name": "fetch_tv_series_list",
    "parameters": {
      "phrase": "{step1}"
    },
    "description": "搜索并获取包含该英语短语的美剧数据"
  },
  {
    "type": "tool_call",
    "tool_name": "download_tv_series_segment_list",
    "parameters": {
      "tv_series_info": "{step2}"
    },
    "description": "根据美剧详细信息下载对应的在线美剧视频片段"
  },
  {
    "type": "tool_call",
    "tool_name": "find_segment_contain_phrase",
    "parameters": {
      "search_task_info": "{step3}"
    },
    "description": "在已经下载的视频片段中搜索包含短语的视频片段"
  },
  {
    "type": "tool_call",
    "tool_name": "merge_segment_to_one_single_video",
    "parameters": {
      "meta_info": "{step4}"
    },
    "description": "将多个视频片段合成一个单一视频"
  },
  {
    "type": "tool_call",
    "tool_name": "extract_video_cover_image",
    "parameters": {
      "merged_video_info": "{step5}"
    },
    "description": "从视频中提取第1秒对应的帧作为视频封面"
  },
  {
    "type": "tool_call",
    "tool_name": "post_to_social_media",
    "parameters": {
      "merged_video_info_with_cover": "{step6}"
    },
    "description": "将提取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体"
  }
]

==============================

开始执行计划...
[
  {
    "type": "tool_call",
    "tool_name": "get_frequently_used_phrase",
    "parameters": {},
    "description": "获取英语口语常用短语,例如 'break a leg'"
  },
  {
    "type": "tool_call",
    "tool_name": "fetch_tv_series_list",
    "parameters": {
      "phrase": "{step1}"
    },
    "description": "搜索并获取包含该英语短语的美剧数据"
  },
  {
    "type": "tool_call",
    "tool_name": "download_tv_series_segment_list",
    "parameters": {
      "tv_series_info": "{step2}"
    },
    "description": "根据美剧详细信息下载对应的在线美剧视频片段"
  },
  {
    "type": "tool_call",
    "tool_name": "find_segment_contain_phrase",
    "parameters": {
      "search_task_info": "{step3}"
    },
    "description": "在已经下载的视频片段中搜索包含短语的视频片段"
  },
  {
    "type": "tool_call",
    "tool_name": "merge_segment_to_one_single_video",
    "parameters": {
      "meta_info": "{step4}"
    },
    "description": "将多个视频片段合成一个单一视频"
  },
  {
    "type": "tool_call",
    "tool_name": "extract_video_cover_image",
    "parameters": {
      "merged_video_info": "{step5}"
    },
    "description": "从视频中提取第1秒对应的帧作为视频封面"
  },
  {
    "type": "tool_call",
    "tool_name": "post_to_social_media",
    "parameters": {
      "merged_video_info_with_cover": "{step6}"
    },
    "description": "将提取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体"
  }
]

==============================
步骤 1: 获取英语口语常用短语,例如 'break a leg'
调用工具 get_frequently_used_phrase
正在执行 get_frequently_used_phrase
工具返回结果:
"break a leg"

==============================
步骤 2: 搜索并获取包含该英语短语的美剧数据
调用工具 fetch_tv_series_list
正在执行 fetch_tv_series_list:"break a leg"
工具返回结果:
{
  "phrase": "\"break a leg\"",
  "tv_series_list": [
    {
      "series": "海绵宝宝",
      "season": "第十一季",
      "episode": "第39集",
      "duration": "00:14:32,448 - 00:14:35,620",
      "subtitle": "Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred."
    },
    {
      "series": "生活大爆炸",
      "season": "第一季",
      "episode": "第10集",
      "duration": "00:12:25,960 - 00:12:27,990",
      "subtitle": "Oh-- break a leg. Break a leg.So, road trip to long beach."
    }
  ]
}

==============================
步骤 3: 根据美剧详细信息下载对应的在线美剧视频片段
调用工具 download_tv_series_segment_list
正在执行 download_tv_series_segment_list
{"phrase": "\"break a leg\"", "tv_series_list": [{"series": "\u6d77\u7ef5\u5b9d\u5b9d", "season": "\u7b2c\u5341\u4e00\u5b63", "episode": "\u7b2c39\u96c6", "duration": "00:14:32,448 - 00:14:35,620", "subtitle": "Me little star! Break a leg. Break two legs.- My leg! - Shut up, Fred."}, {"series": "\u751f\u6d3b\u5927\u7206\u70b8", "season": "\u7b2c\u4e00\u5b63", "episode": "\u7b2c10\u96c6", "duration": "00:12:25,960 - 00:12:27,990", "subtitle": "Oh-- break a leg. Break a leg.So, road trip to long beach."}]}
downloading 海绵宝宝第十一季第39集-00:14:32,448 - 00:14:35,620
downloading 生活大爆炸第一季第10集-00:12:25,960 - 00:12:27,990
工具返回结果:
{
  "phrase": "\"break a leg\"",
  "video_segment_uri_list": [
    "./inputs/海绵宝宝第十一季第39集0.ts",
    "./inputs/生活大爆炸第一季第10集1.ts"
  ]
}

==============================
步骤 4: 在已经下载的视频片段中搜索包含短语的视频片段
调用工具 find_segment_contain_phrase
正在执行 find_segment_contain_phrase
searching ./inputs/海绵宝宝第十一季第39集0.ts
searching ./inputs/生活大爆炸第一季第10集1.ts
工具返回结果:
{
  "phrase": "\"break a leg\"",
  "video_segment_uri_list": [
    "./inputs/海绵宝宝第十一季第39集0.ts",
    "./inputs/生活大爆炸第一季第10集1.ts"
  ]
}

==============================
步骤 5: 将多个视频片段合成一个单一视频
调用工具 merge_segment_to_one_single_video
正在执行 merge_segment_to_one_single_video
工具返回结果:
{
  "phrase": "\"break a leg\"",
  "video_uri": "./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts"
}

==============================
步骤 6: 从视频中提取第1秒对应的帧作为视频封面
调用工具 extract_video_cover_image
正在执行 extract_video_cover_image
工具返回结果:
{
  "phrase": "\"break a leg\"",
  "video_uri": "./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts",
  "cover_uri34": "./outputs/cover_uri_3_4.png",
  "cover_uri43": "./outputs/cover_uri_4_3.png"
}

==============================
步骤 7: 将提取出来的视频封面数据发布到小红书、抖音、Bilibili、快手等社交媒体
调用工具 post_to_social_media
正在执行 post_to_social_media:{"phrase": "\"break a leg\"", "video_uri": "./inputs/\u6d77\u7ef5\u5b9d\u5b9d\u7b2c\u5341\u4e00\u5b63\u7b2c39\u96c60.ts./inputs/\u751f\u6d3b\u5927\u7206\u70b8\u7b2c\u4e00\u5b63\u7b2c10\u96c61.ts", "cover_uri34": "./outputs/cover_uri_3_4.png", "cover_uri43": "./outputs/cover_uri_4_3.png"}

posting to douyin
phrase:"break a leg"
video_uri:./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts
cover_uri_3_4:./outputs/cover_uri_3_4.png
cover_uri_4_3:./outputs/cover_uri_4_3.png

posting to xhs
phrase:"break a leg"
video_uri:./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts
cover_uri_3_4:./outputs/cover_uri_3_4.png
cover_uri_4_3:./outputs/cover_uri_4_3.png

posting to bili
phrase:"break a leg"
video_uri:./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts
cover_uri_3_4:./outputs/cover_uri_3_4.png
cover_uri_4_3:./outputs/cover_uri_4_3.png

posting to kuai
phrase:"break a leg"
video_uri:./inputs/海绵宝宝第十一季第39集0.ts./inputs/生活大爆炸第一季第10集1.ts
cover_uri_3_4:./outputs/cover_uri_3_4.png
cover_uri_4_3:./outputs/cover_uri_4_3.png
工具返回结果:
[
  true,
  true,
  true,
  true
]

==============================

最终结果:
[True, True, True, True]

相比较V1版本,V2调用的工具更多,需要严格按照顺序调用工具,并且把前一个工具的输出当做入参投喂给下一个工具。所以我们在Prompt里面告诉大模型,生成的工具调用清单中,使用 StepN 作为特殊标志,以方便我们在粘合剂代码中完成前后工具之间的数据流动。
这里需要特别注意,一定要保证大模型生成的 StepN 正确,否则执行结果出错。
比如 post_to_social_media() 这个方法的注释,我写的是「将抽取出来的视频封面数据发布到小红书、抖音、B站、快手等短视频平台」。这个注释看起来怪怪的。
因为正常情况下,我们应该写成「将视频和封面数据发布到小红书、抖音、B站、快手等短视频平台」,但是这样写的话,大模型生成的任务清单中,post_to_social_media 对应的 StepN 会是 step5 而非 step6。
因为在大模型看来,抽取封面和发布视频并列的,都是基于 step5 的。

v2.5 Function Call

在上述示例中,我们的Agent调用的工具都是方法或函数。OpenAI的SDK中其实已经封装了tools参数,我们可以按照格式要求传入方法或函数信息,以 Function Call 的形式来调用工具。
以下是 Deepseek 官方文档给的 Function Call 示例。注意,不是所有的大模型都支持function call,所以我们在使用前要确认下。
在下面的示例中,第一次调用 send_message 大模型会返回要使用的工具即 get_weather 方法以及其入参。然后把假装调用工具生成的 24℃ 等数据再次连同之前的输入投喂给大模型,大模型会整理后给出最终气温呈现给用户。
注意,代码中有多次 messages.append(message) 这是为了保留之前的会话数据,让大模型了解上下文。之前的Prompt数据会命中Deepseek的缓存,计费上会有折扣。
deepseek 官方 function call 示例:

from openai import OpenAI

def send_messages(messages):
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=messages,
        tools=tools
    )
    return response.choices[0].message

client = OpenAI(
    api_key="<your api key>",
    base_url="https://api.deepseek",
)

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather of an location, the user shoud supply a location first",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    }
                },
                "required": ["location"]
            },
        }
    },
]

messages = [{"role": "user", "content": "How's the weather in Hangzhou?"}]
message = send_messages(messages)
print(f"User>\t {messages[0]['content']}")

tool = message.tool_calls[0]
messages.append(message)

messages.append({"role": "tool", "tool_call_id": tool.id, "content": "24℃"})
message = send_messages(messages)
print(f"Model>\t {message.content}")

运行结果:

User>
 [{'role': 'user', 'content': "How's the weather in Hangzhou?"}]
Model>
[
    Choice(
        finish_reason='tool_calls',
        index=0,
        logprobs=None,
        message=ChatCompletionMessage(
            content='',
            refusal=None,
            role='assistant',
            annotations=None,
            audio=None, 
            function_call=None, 
            tool_calls=[
                ChatCompletionMessageToolCall(
                    id='call_0_f03a852b-0600-495a-a29f-9616c4c46abb', 
                    function=Function(
                        arguments='{"location":"Hangzhou"}',
                        name='get_weather'
                    ),
                    type='function', 
                    index=0
                )
            ]
        )
    )
]


User>
[
    {
        'role': 'user',
        'content': "How's the weather in Hangzhou?"
    },
    ChatCompletionMessage(
        content='',
        refusal=None,
        role='assistant',
        annotations=None,
        audio=None,
        function_call=None,
        tool_calls=[
            ChatCompletionMessageToolCall(
                id='call_0_f03a852b-0600-495a-a29f-9616c4c46abb',
                function=Function(
                    arguments='{"location":"Hangzhou"}',
                    name='get_weather'
                ),
                type='function',
                index=0
            )
        ]
    ),
    {
        'role': 'tool',
        'tool_call_id': 'call_0_f03a852b-0600-495a-a29f-9616c4c46abb',
        'content': '24℃'
    }
]

Model>
[
    Choice(
        finish_reason='stop',
        index=0,
        logprobs=None, 
        message=ChatCompletionMessage(
            content='The current weather in Hangzhou is 24°C.',
            refusal=None,
            role='assistant',
            annotations=None,
            audio=None,
            function_call=None,
            tool_calls=None
        )
    )
]

V3: MCP

从V2代码看,如果我们有很多工具,需要写很多胶水代码,容易出错,不易维护。
MCP 可以解决这个问题。
MCP 是 Model Context Protocol 的缩写,是 Anthropic 提出的一种协议,用于拓展大模型的能力边界,使其不仅可以生成内容,还可以执行动作。

MCP is an open protocol that standardizes how applications provide context to LLMs. Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect your devices to various peripherals and accessories, MCP provides a standardized way to connect AI models to different data sources and tools.

官方示例

我们来看下 Anthropic 给出的官方示例,一个天气预报Agent。

工具方法单独放在 server.py 文件中,以 @mcp.tool() 注解标注。

server.py

from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"

async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""


@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)


if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')

client.py充当Agent的作用,与大模型交互并根据任务清单调用server.py里面的工具。
从 connect_to_server() 方法可以看出,工具方法同时支持Python脚本方法和JavaScript脚本方法。

client.py

import asyncio
import json
import sys
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env


class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.anthropic = Anthropic()

    # methods will go here

    async def connect_to_server(self, server_script_path: str):

        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:

        """Process a query using Claude and available tools"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]

        response = await self.session.list_tools()
        available_tools = [{
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.inputSchema
        } for tool in response.tools]

        print(f"available_tools:{available_tools}")
        print(f"messages:{messages}")

        # Initial Claude API call
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=messages,
            tools=available_tools
        )

        print(f"first resp:{response.content}")

        # Process response and handle tool calls
        final_text = []

        assistant_message_content = []
        for content in response.content:
            if content.type == 'text':
                final_text.append(content.text)
                assistant_message_content.append(content)
            elif content.type == 'tool_use':
                tool_name = content.name
                tool_args = content.input

                # Execute tool call
                result = await self.session.call_tool(tool_name, tool_args)
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

                assistant_message_content.append(content)
                messages.append({
                    "role": "assistant",
                    "content": assistant_message_content
                })
                messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": content.id,
                            "content": result.content
                        }
                    ]
                })

                print(f"messages:{messages}")

                # Get next response from Claude
                response = self.anthropic.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=messages,
                    tools=available_tools
                )

                print(f"second resp:{response.content}")

                final_text.append(response.content[0].text)

        return "\n".join(final_text)

    async def chat_loop(self):

        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                if query.lower().strip() == '':
                    continue

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

和 Function Call 类似,该示例也是把工具的信息作为 SDK API 的一个参数(tools)传入,其格式与Function call有所不同。
OpenAI SDK 中Function call的格式:

{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather of an location, the user shoud supply a location first",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and state, e.g. San Francisco, CA",
                }
            },
            "required": ["location"]
        },
    }
}

Anthropic SDK API中工具信息格式:

{
  'name': 'get_alerts',
  'description': 'Get weather alerts for a US state.\n\n    Args:\n        state: Two-letter US state code (e.g. CA, NY)\n    ',
  'input_schema': {
    'properties': {
      'state': {
        'title': 'State',
        'type': 'string'
      }
    },
    'required': [
      'state'
    ],
    'title': 'get_alertsArguments',
    'type': 'object'
  }
}

对于每次请求,大模型的返回值都是一个数组:

[
    TextBlock(
        citations=None,
        text="I'll help you check the weather alerts for New York state and can provide a forecast if you give me specific coordinates.\n\nLet me check the weather alerts for New York first:",
        type='text'
    ), 
    ToolUseBlock(
        id='toolu_01VA9YevPQBxQ7QgfFr4ZPNS',
        input={'state': 'NY'},
        name='get_alerts',
        type='tool_use'
    )
]

TextBlock 代表直接展示给用户的数据,而 ToolUseBlock 表示下一步的工具调用信息。

先执行服务端代码:

uv run server.py

在执行客户端代码:

uv run client.py YOUR_PATH/weather/server.py

执行结果如下:

Processing request of type ListToolsRequest

available_tools:
[
    {
        'name': 'get_alerts',
        'description': 'Get weather alerts for a US state.\n\n    Args:\n        state: Two-letter US state code (e.g. CA, NY)\n    ',
        'input_schema': {
            'properties': {
                'state': {
                    'title': 'State',
                    'type': 'string'
                }
            },
            'required': ['state'],
            'title': 'get_alertsArguments',
            'type': 'object'
        }
    },
    {
        'name': 'get_forecast',
        'description': 'Get weather forecast for a location.\n\n    Args:\n        latitude: Latitude of the location\n        longitude: Longitude of the location\n    ',
        'input_schema': {
            'properties': {
                'latitude': {
                    'title': 'Latitude',
                    'type': 'number'
                },
                'longitude': {
                    'title': 'Longitude',
                    'type': 'number'
                }
            },
            'required': ['latitude', 'longitude'],
            'title': 'get_forecastArguments',
            'type': 'object'
        }
    }
]

first resp:
[
    TextBlock(
        citations=None,
        text="I'll help you check the weather alerts for New York state and can provide a forecast if you give me specific coordinates.\n\nLet me check the weather alerts for New York first:",
        type='text'
    ), 
    ToolUseBlock(
        id='toolu_01VA9YevPQBxQ7QgfFr4ZPNS',
        input={'state': 'NY'},
        name='get_alerts',
        type='tool_use'
    )
]

Processing request of type CallToolRequest

HTTP Request: GET https://api.weather.gov/alerts/active/area/NY "HTTP/1.1 200 OK"

second resp:
[
    TextBlock(
        citations=None,
        text="I can see there are several active weather alerts in New York state, including:\n1. A Flood Warning in Monroe County\n2. Wind Advisories in several counties including Jefferson, Lewis, Northern Herkimer, Hamilton, and others\n3. Winter Weather Advisories in various areas including Northern Herkimer, Hamilton, and other locations\n\nIf you'd like to get a specific forecast for a particular location in New York, I'll need the latitude and longitude coordinates for that location. Would you like to know the forecast for a specific city or place in New York?",
        type='text'
    )
]

I'll help you check the weather alerts for New York state and can provide a forecast if you give me specific coordinates.

Let me check the weather alerts for New York first:
[Calling tool get_alerts with args {'state': 'NY'}]

I can see there are several active weather alerts in New York state, including:
1. A Flood Warning in Monroe County
2. Wind Advisories in several counties including Jefferson, Lewis, Northern Herkimer, Hamilton, and others
3. Winter Weather Advisories in various areas including Northern Herkimer, Hamilton, and other locations

If you'd like to get a specific forecast for a particular location in New York, I'll need the latitude and longitude coordinates for that location. Would you like to know the forecast for a specific city or place in New York?

如何使用其他大模型,比如 Deepseek,对接 MCP 呢?要使用MCP,特别是使用工具时,我们需要把工具信息传递给模型。
目前有两种方式可以实现:

  • 大模型支持Function call,通过SDK中的tools参数将MCP中的各种工具信息传递给大模型
  • 将tools信息以Prompt的形式输入给大模型
    优先使用Function call的方式。

MCP 优势:

  • 丰富的Tool库
  • 方便集成不同技术栈的Tool

劣势:

  • 科学上网限制
  • 安全漏洞

参考文献

  • A practical guide to building agents
  • Agent2Agent Protocol
  • AI安全警报-MCP协议曝重大漏洞:你的智能代理正在被劫持!
  • MCP Server Use Cases
  • model context protocol
  • Agent Whitepaper
  • whitepaper-prompt-engineering
  • How to use Anthropic MCP Server with open LLMs, OpenAI or Google Gemini
  • MCP 实战:调用DeepSeek实现MCP客户端和服务端快速搭建
  • Awesome MCP Servers
  • 图解「模型上下文协议(MCP)」
  • MCP协议深度解读:技术创新正以前所未有的速度突破

发布者:admin,转转请注明出处:http://www.yc00.com/web/1754770741a5200068.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信