LLM ReAct agents, built on the LangChain framework

The ReAct agent comes from the paper “ReAct: Synergizing Reasoning and Acting in Language Models”.

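ReAct is a general paradigm that combines reasoning and acting with LLMs. ReAct prompts the LLM to generate verbal reasoning traces and actions for a task. This lets the system reason dynamically to create, maintain, and adjust a plan of action, while also interacting with an external environment (for example, Wikipedia) to bring additional information into the reasoning. The core logic is reasoning plus acting, organized as a loop over three steps: Thought, Action, and Observation.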

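  • Thought: generated by the LLM. The model is required to think before it acts, planning for itself which actions are needed to complete the task.
  • Action: the concrete action the LLM decides to take at this step. An action usually has two parts, the action itself and its target; in programming terms, an API name and its input arguments. The key strength of the LLM here is that, based on its Thought, it can choose which API to use and generate the arguments to pass in, which is what makes the ReAct framework workable at the execution level.
  • Observation: how the framework takes in input from the outside world. It acts like the LLM's senses, relaying external feedback to the model to support further analysis or decisions.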
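
The three steps above can be sketched as a small loop. This is only an illustration under stated assumptions, not LangChain code: scripted_llm is a hypothetical stand-in for a real model, lookup_population is a made-up tool, and the Action: name[input] text format is just one possible convention.

```python
import re
from typing import Callable, Dict

# Hypothetical tool registry: tool name -> function taking one string argument.
TOOLS: Dict[str, Callable[[str], str]] = {
    "lookup_population": lambda city: {"Beijing": "about 21 million"}.get(city, "unknown"),
}

def scripted_llm(transcript: str) -> str:
    """Stand-in for a real LLM: picks the next step from the transcript so far."""
    if "Observation:" not in transcript:
        return "Thought: I need the population figure.\nAction: lookup_population[Beijing]"
    return "Thought: I now know the answer.\nAction: Finish[Beijing has about 21 million people]"

def react_loop(question: str, llm: Callable[[str], str], max_steps: int = 5) -> str:
    transcript = f"Question: {question}\n"
    for _ in range(max_steps):
        step = llm(transcript)                         # Thought + Action from the model
        transcript += step + "\n"
        match = re.search(r"Action: (\w+)\[(.*)\]", step)
        if match is None:
            raise ValueError(f"Could not parse an action from: {step!r}")
        name, arg = match.group(1), match.group(2)
        if name == "Finish":                           # the model signals it is done
            return arg
        observation = TOOLS[name](arg)                 # Action: run the chosen tool
        transcript += f"Observation: {observation}\n"  # Observation: feed the result back
    raise RuntimeError("No final answer within max_steps")

answer = react_loop("How many people live in Beijing?", scripted_llm)
print(answer)
```

The transcript string plays the role of the agent's scratchpad: every Thought, Action, and Observation is appended to it, so the model sees the full history on each call.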

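LangChain offers two ways to build this loop: a basic ReAct agent driven by prompt text, and a ReAct-style agent implemented through function calling. In LangGraph, LangChain's follow-up library for agent workflows, the ReAct flow uses function calling by default.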

Plain chat with LangChain

Below is a chat page built with LangChain and Streamlit:

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import ChatMessage
from langchain_openai import ChatOpenAI
import streamlit as st
import os

base_url = "http://xxx.com/openapi/v1"
api_key = os.environ['API_KEY']

class StreamHandler(BaseCallbackHandler):
    def __init__(self, container, initial_text=""):
        self.container = container
        self.text = initial_text

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.text += token
        self.container.markdown(self.text)

with st.sidebar:
    openai_api_key = st.text_input("OpenAI API Key", type="password")

if "messages" not in st.session_state:
    st.session_state["messages"] = []  # [ChatMessage(role="system", content="How can I help you?")]

for msg in st.session_state.messages:
    st.chat_message(msg.role).write(msg.content)

if prompt := st.chat_input():
    st.session_state.messages.append(ChatMessage(role="user", content=prompt))
    st.chat_message("user").write(prompt)

    with st.chat_message("assistant"):
        stream_handler = StreamHandler(st.empty())
        llm = ChatOpenAI(base_url=base_url, api_key=api_key, model="gpt-4o", streaming=True, callbacks=[stream_handler])
        st.write("message=", st.session_state.messages)
        response = llm.invoke(st.session_state.messages)
        st.session_state.messages.append(ChatMessage(role="assistant", content=response.content))

Building a ReAct agent with LangChain

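First, the LangChain agent that implements the ReAct flow with the Thought, Action, Observation loop proposed in the paper. The relevant imports are: from langchain.agents import AgentExecutor, Tool, create_react_agent
The system prompt template used by LangChain's ReAct agent is: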

Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}

It is invoked as follows:

from langchain import hub
from langchain_community.llms import OpenAI
from langchain.agents import AgentExecutor, create_react_agent

prompt = hub.pull("hwchase17/react")
model = OpenAI()
tools = ...

agent = create_react_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)

agent_executor.invoke({"input": "hi"})

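In this ReAct flow the model must always emit a Thought first. The tool to call and its argument go in the Action and Action Input fields. The flow also requires the model to return a Final Answer field once the task is done.
After every model output, an output parser (a different one depending on the agent type) is called to interpret it: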

class ReActOutputParser(AgentOutputParser):
    """Output parser for the ReAct agent."""

    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        action_prefix = "Action: "
        if not text.strip().split("\n")[-1].startswith(action_prefix):
            raise OutputParserException(f"Could not parse LLM Output: {text}")
        action_block = text.strip().split("\n")[-1]
        action_str = action_block[len(action_prefix) :]
        # Parse out the action and the directive.
        re_matches = re.search(r"(.*?)\[(.*?)\]", action_str)
        if re_matches is None:
            raise OutputParserException(
                f"Could not parse action directive: {action_str}"
            )
        action, action_input = re_matches.group(1), re_matches.group(2)
        if action == "Finish":
            return AgentFinish({"output": action_input}, text)
        else:
            return AgentAction(action, action_input, text)

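When parsing fails, the parsing error message is fed back to the model as part of the next prompt (this is what happens when handle_parsing_errors is enabled on the executor). If the model is weak at following instructions, the agent can easily get stuck in a loop: as long as it never sees a termination marker (Action: Finish[...] for the parser above, or Final Answer: for the prompt template shown earlier), it keeps sending requests to the model until the iteration limit is reached.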
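
The retry-until-limit behaviour can be sketched in plain Python. This is a simplified illustration rather than LangChain's AgentExecutor: parse_step, run, the two fake models, and the stop message are all hypothetical names chosen for the example.

```python
from typing import Callable, List, Tuple

class ParseError(Exception):
    """Raised when the model output does not follow the expected format."""

def parse_step(text: str) -> Tuple[str, str]:
    """Return (action, action_input) from a final line shaped like 'Action: name[input]'."""
    last_line = text.strip().split("\n")[-1]
    if not (last_line.startswith("Action: ") and "[" in last_line and last_line.endswith("]")):
        raise ParseError(f"Could not parse LLM output: {text}")
    name, _, rest = last_line[len("Action: "):].partition("[")
    return name, rest[:-1]

def run(llm: Callable[[List[str]], str], max_iterations: int = 3) -> Tuple[str, int]:
    """Drive the loop; returns (answer, number of model calls made)."""
    scratchpad: List[str] = []
    for i in range(max_iterations):
        output = llm(scratchpad)
        try:
            action, action_input = parse_step(output)
        except ParseError as err:
            # Feed the error back so the model can correct itself on the next call.
            scratchpad.append(f"Observation: {err}")
            continue
        if action == "Finish":
            return action_input, i + 1
        scratchpad.append(f"Observation: (tool {action} called with {action_input})")
    return "Agent stopped: iteration limit reached.", max_iterations

# A model that never follows the format burns through every iteration.
stubborn_llm = lambda scratchpad: "I think the answer is 42."
# A model that recovers once it has seen the error message.
recovering_llm = lambda scratchpad: (
    "Thought: fix the format\nAction: Finish[42]" if scratchpad else "The answer is 42."
)

print(run(stubborn_llm))
print(run(recovering_llm))
```

The iteration cap is what keeps a model with weak instruction following from looping forever; each failed parse still costs one full model call.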

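As the prompt above shows, that agent is built around calling a tool at every step. In a conversation, though, some turns need no tool at all, so LangChain also defines a ReAct agent for chat, ConversationalChatAgent, with the following prompts: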

# flake8: noqa
PREFIX = """Assistant is a large language model trained by OpenAI.

Assistant is designed to be able to assist with a wide range of tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics. As a language model, Assistant is able to generate human-like text based on the input it receives, allowing it to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand.

Assistant is constantly learning and improving, and its capabilities are constantly evolving. It is able to process and understand large amounts of text, and can use this knowledge to provide accurate and informative responses to a wide range of questions. Additionally, Assistant is able to generate its own text based on the input it receives, allowing it to engage in discussions and provide explanations and descriptions on a wide range of topics.

Overall, Assistant is a powerful system that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. Whether you need help with a specific question or just want to have a conversation about a particular topic, Assistant is here to assist."""

FORMAT_INSTRUCTIONS = """RESPONSE FORMAT INSTRUCTIONS
----------------------------

When responding to me, please output a response in one of two formats:

**Option 1:**
Use this if you want the human to use a tool.
Markdown code snippet formatted in the following schema:

```json
{{{{
"action": string, \\\\ The action to take. Must be one of {tool_names}
"action_input": string \\\\ The input to the action
}}}}
```

**Option #2:**
Use this if you want to respond directly to the human. Markdown code snippet formatted in the following schema:

```json
{{{{
"action": "Final Answer",
"action_input": string \\\\ You should put what you want to return to use here
}}}}
```"""

SUFFIX = """TOOLS
------
Assistant can ask the user to use tools to look up information that may be helpful in answering the users original question. The tools the human can use are:

{{tools}}

{format_instructions}

USER'S INPUT
--------------------
Here is the user's input (remember to respond with a markdown code snippet of a json blob with a single action, and NOTHING else):

{{{{input}}}}"""

TEMPLATE_TOOL_RESPONSE = """TOOL RESPONSE:
---------------------
{observation}

USER'S INPUT
--------------------

Okay, so what is the response to my last comment? If using information obtained from the tools you must mention it explicitly without mentioning the tool names - I have forgotten all TOOL RESPONSES! Remember to respond with a markdown code snippet of a json blob with a single action, and NOTHING else."""

This flow uses a different parser, which treats an action named Final Answer as the termination condition:

class ConvoOutputParser(AgentOutputParser):
    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """Attempts to parse the given text into an AgentAction or AgentFinish.

        Raises:
            OutputParserException if parsing fails.
        """
        try:
            # Attempt to parse the text into a structured format (assumed to be JSON
            # stored as markdown)
            response = parse_json_markdown(text)

            # If the response contains an 'action' and 'action_input'
            if "action" in response and "action_input" in response:
                action, action_input = response["action"], response["action_input"]

                # If the action indicates a final answer, return an AgentFinish
                if action == "Final Answer":
                    return AgentFinish({"output": action_input}, text)
                else:
                    # Otherwise, return an AgentAction with the specified action and
                    # input
                    return AgentAction(action, action_input, text)
            else:
                # If the necessary keys aren't present in the response, raise an
                # exception
                raise OutputParserException(
                    f"Missing 'action' or 'action_input' in LLM output: {text}"
                )
        except Exception as e:
            # If any other exception is raised during parsing, also raise an
            # OutputParserException
            raise OutputParserException(f"Could not parse LLM output: {text}") from e
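
To see what this parser has to do without installing LangChain, here is a rough standard-library sketch. It is an assumption-laden stand-in: extract_json only approximates what parse_json_markdown does, and parse_reply and the dict shapes it returns are made up for illustration.

```python
import json
import re
from typing import Any, Dict

# The fence marker is built programmatically so it does not collide with this listing.
FENCE = "`" * 3
_BLOB_RE = re.compile(re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE), re.DOTALL)

def extract_json(text: str) -> Dict[str, Any]:
    """Pull the JSON payload out of a markdown code fence, or parse the raw text."""
    match = _BLOB_RE.search(text)
    payload = match.group(1) if match else text
    return json.loads(payload.strip())

def parse_reply(text: str) -> Dict[str, Any]:
    """Classify a model reply as a tool call or a final answer."""
    data = extract_json(text)
    if "action" not in data or "action_input" not in data:
        raise ValueError(f"Missing 'action' or 'action_input' in LLM output: {text}")
    if data["action"] == "Final Answer":
        return {"kind": "finish", "output": data["action_input"]}
    return {"kind": "tool", "tool": data["action"], "tool_input": data["action_input"]}

tool_reply = FENCE + 'json\n{"action": "Search", "action_input": "Beijing weather"}\n' + FENCE
final_reply = FENCE + 'json\n{"action": "Final Answer", "action_input": "It is sunny."}\n' + FENCE

print(parse_reply(tool_reply))
print(parse_reply(final_reply))
```

Because "Final Answer" is just another action name, a reply that needs no tool goes through the same code path as a tool call, which is what lets this agent answer conversational turns directly.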


Below is a tool-calling web UI built with ConversationalChatAgent and Streamlit:

from langchain.agents import ConversationalChatAgent, AgentExecutor, Tool
from langchain.memory import ConversationBufferMemory
from langchain_community.callbacks import StreamlitCallbackHandler
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from typing import Optional, List, Dict
import os
from langchain_core.callbacks import BaseCallbackHandler
import logging
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def ddgs_text(query: str) -> str:
    """useful for when you need to answer questions about current events. You should ask targeted questions"""
    from duckduckgo_search import DDGS

    with DDGS() as ddgs:
        ddgs_gen = ddgs.text(query)
        if ddgs_gen:
            results = [r for r in ddgs_gen]
            # Concatenate the result snippets into one string for the model
            return " ".join(r["body"] for r in results)
    return ""

class InteractionLogger(BaseCallbackHandler):
    """Logs every prompt sent to the LLM and every response received."""

    def __init__(self):
        self.require_confirmation = True
        self.logger = logging.getLogger(__name__)

    def on_llm_start(self, serialized, prompts, **kwargs):
        msg = f"\n=== LLM Request ===\nPrompts: {prompts}\n"
        self.logger.info(msg)
        # print(f"Serialized: {serialized}")  # Model configuration

    def on_llm_end(self, response, **kwargs):
        msg = f"\n=== LLM Response ===\nResponse: {response}\n"
        self.logger.info(msg)

logger = InteractionLogger()

base_url = "http://xxx.com/openapi/v1"
api_key = os.environ['API_KEY']
model = "hunyuan"

import streamlit as st

st.set_page_config(page_title="LangChain: Chat with search", page_icon="🦜")
st.title("🦜 LangChain: Chat with search")

openai_api_key = st.sidebar.text_input("OpenAI API Key", type="password")


msgs = StreamlitChatMessageHistory()
memory = ConversationBufferMemory(
    chat_memory=msgs, return_messages=True, memory_key="chat_history", output_key="output"
)
if len(msgs.messages) == 0 or st.sidebar.button("Reset chat history"):
    msgs.clear()
    # msgs.add_ai_message("How can I help you?")
    st.session_state.steps = {}

avatars = {"human": "user", "ai": "assistant"}
for idx, msg in enumerate(msgs.messages):
    with st.chat_message(avatars[msg.type]):
        # Render intermediate steps if any were saved
        for step in st.session_state.steps.get(str(idx), []):
            if step[0].tool == "_Exception":
                continue
            with st.status(f"**{step[0].tool}**: {step[0].tool_input}", state="complete"):
                st.write(step[0].log)
                st.write(step[1])
        st.write(msg.content)

if prompt := st.chat_input(placeholder="Who won the Women's U.S. Open in 2018?"):
    st.chat_message("user").write(prompt)

    llm = ChatOpenAI(base_url=base_url, model=model, api_key=api_key, streaming=True, callbacks=[logger])
    tools = [
        Tool(
            name="Search",
            func=ddgs_text,
            description="useful for when you need to answer questions about current events. You should ask targeted questions",
        )
    ]
    chat_agent = ConversationalChatAgent.from_llm_and_tools(llm=llm, tools=tools)
    executor = AgentExecutor.from_agent_and_tools(
        agent=chat_agent,
        tools=tools,
        memory=memory,
        return_intermediate_steps=True,
        handle_parsing_errors=True,
    )
    with st.chat_message("assistant"):
        st_cb = StreamlitCallbackHandler(st.container(), expand_new_thoughts=True, collapse_completed_thoughts=False)
        cfg = RunnableConfig()
        cfg["callbacks"] = [st_cb]
        response = executor.invoke(prompt, cfg)
        st.write(response["output"])
        st.session_state.steps[str(len(msgs.messages) - 1)] = response["intermediate_steps"]

Building an agent with LangChain function (tool) calling

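The ReAct agents in the previous section follow the paper: the tool-calling flow and the tool arguments are all expressed in, and parsed out of, prompt text. LangChain can also build an agent on the function-calling interface that most current models support. Building a ReAct loop this way is simple: the tool_calls list in the model's response tells you whether the task is finished, and when the model calls a tool it can also emit content, which serves as the thought. When a tool call is needed, the model returns the list of tool calls and sets 'finish_reason': 'tool_calls'; when the whole task is done, it returns 'finish_reason': 'stop'. That is enough to drive the full ReAct loop.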
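
A minimal sketch of that loop, with the model replaced by a scripted function. Everything here is illustrative: fake_model, get_weather, and run_agent are hypothetical, and the response dict is a simplified shape rather than the exact wire format of any provider.

```python
import json
from typing import Any, Callable, Dict, List

def get_weather(city: str) -> str:
    """Hypothetical tool."""
    return "sunny" if city == "Beijing" else "unknown"

TOOLS: Dict[str, Callable[..., str]] = {"get_weather": get_weather}

def fake_model(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Scripted stand-in for a chat model that supports function calling."""
    if not any(m["role"] == "tool" for m in messages):
        return {
            "finish_reason": "tool_calls",
            "message": {
                "role": "assistant",
                "content": "I should look up the weather first.",  # acts as the thought
                "tool_calls": [{"id": "call_1", "name": "get_weather",
                                "arguments": json.dumps({"city": "Beijing"})}],
            },
        }
    tool_result = next(m["content"] for m in messages if m["role"] == "tool")
    return {
        "finish_reason": "stop",
        "message": {"role": "assistant", "content": f"The weather in Beijing is {tool_result}."},
    }

def run_agent(question: str, model: Callable, max_turns: int = 5) -> str:
    messages: List[Dict[str, Any]] = [{"role": "user", "content": question}]
    for _ in range(max_turns):
        choice = model(messages)
        messages.append(choice["message"])
        if choice["finish_reason"] == "stop":       # task finished: return the answer
            return choice["message"]["content"]
        for call in choice["message"].get("tool_calls", []):
            args = json.loads(call["arguments"])    # arguments arrive as a JSON string
            result = TOOLS[call["name"]](**args)
            messages.append({"role": "tool", "tool_call_id": call["id"], "content": result})
    raise RuntimeError("Model did not finish within max_turns")

final = run_agent("What's the weather in Beijing?", fake_model)
print(final)
```

No text parsing is involved: the structured tool_calls field replaces the Action and Action Input lines, and the tool message replaces the Observation line.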

In LangChain, this function-calling flow is built with: from langchain.agents import AgentExecutor, Tool, create_tool_calling_agent

from langchain.agents import AgentExecutor, create_tool_calling_agent, tool
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant"),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)
model = ChatAnthropic(model="claude-3-opus-20240229")

@tool
def magic_function(input: int) -> int:
    """Applies a magic function to an input."""
    return input + 2

tools = [magic_function]

agent = create_tool_calling_agent(model, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

agent_executor.invoke({"input": "what is the value of magic_function(3)?"})

# Using with chat history
from langchain_core.messages import AIMessage, HumanMessage
agent_executor.invoke(
    {
        "input": "what's my name?",
        "chat_history": [
            HumanMessage(content="hi! my name is bob"),
            AIMessage(content="Hello Bob! How can I assist you today?"),
        ],
    }
)

Using the LangGraph framework

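LangGraph (https://langchain-ai.github.io/langgraph/) is a Python library for building stateful, multi-actor LLM applications, used to create agent and multi-agent workflows. Compared with other LLM application frameworks, it offers three core benefits: cycles, controllability, and persistence. LangGraph lets you define flows that involve cycles, which most agent architectures need, and this sets it apart from DAG-based solutions. As a low-level framework, it gives you fine-grained control over both the flow and the state of your application, which is crucial for building reliable agents. LangGraph also includes built-in persistence, which enables advanced human-in-the-loop (intervening while the agent is running) and memory features.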

Installation

pip install langgraph

Building a graph with LangGraph

First, here is how to build a graph with LangGraph. The core logic is to create nodes, edges, and conditional edges:

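# Define a graph
workflow = StateGraph(MessagesState)
# Define the two nodes that the flow will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
# Set the entry point to agent:
# this is the first node to be called
workflow.add_edge(START, "agent")
# Add a conditional edge, which chooses the next node at run time
workflow.add_conditional_edges(
    # The edge starts from the agent node
    "agent",
    should_continue,
)
# A normal edge: after tools runs, agent is always called next
workflow.add_edge("tools", 'agent')
# Initialize memory to persist state between graph runs; here it is kept in memory,
# but it could also be stored in Redis or MongoDB
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)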
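
To make the roles of nodes, edges, and conditional edges concrete, here is a toy graph runner in plain Python. It is not LangGraph and skips most of what LangGraph does (state reducers, checkpointing, streaming); TinyGraph and the node functions are hypothetical names used only for this sketch.

```python
from typing import Callable, Dict, List, Union

State = Dict[str, List[str]]
END = "__end__"

class TinyGraph:
    """Nodes transform state; edges (fixed or conditional) pick the next node."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Callable[[State], State]] = {}
        self.edges: Dict[str, Union[str, Callable[[State], str]]] = {}

    def add_node(self, name: str, fn: Callable[[State], State]) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: str) -> None:
        self.edges[src] = dst                  # normal edge: always taken

    def add_conditional_edges(self, src: str, router: Callable[[State], str]) -> None:
        self.edges[src] = router               # conditional edge: decided at run time

    def run(self, entry: str, state: State, max_steps: int = 10) -> State:
        current = entry
        for _ in range(max_steps):
            if current == END:
                return state
            state = self.nodes[current](state)
            nxt = self.edges[current]
            current = nxt(state) if callable(nxt) else nxt
        raise RuntimeError("Graph did not reach END within max_steps")

def agent_node(state: State) -> State:
    has_result = any(m.startswith("tool:") for m in state["messages"])
    reply = "agent: final answer" if has_result else "agent: call tool"
    return {"messages": state["messages"] + [reply]}

def tools_node(state: State) -> State:
    return {"messages": state["messages"] + ["tool: result"]}

def should_continue(state: State) -> str:
    return "tools" if state["messages"][-1] == "agent: call tool" else END

graph = TinyGraph()
graph.add_node("agent", agent_node)
graph.add_node("tools", tools_node)
graph.add_conditional_edges("agent", should_continue)
graph.add_edge("tools", "agent")

result = graph.run("agent", {"messages": []})
print(result["messages"])
```

The cycle agent → tools → agent is exactly what a DAG cannot express; the conditional edge out of agent is the only place where the loop can exit.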

The full code is as follows:

from langchain_openai import ChatOpenAI
from typing import Annotated, Literal, TypedDict

from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
# Import the state graph and the message state
from langgraph.graph import END, START, StateGraph, MessagesState
# Import the tool node, which executes tool calls
from langgraph.prebuilt import ToolNode
from langchain_core.callbacks import BaseCallbackHandler

import os

base_url = "http://xxx.com/openapi/v1"
api_key = os.environ['API_KEY']

class InteractionLogger(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"\n=== LLM Request ===\nPrompts: {prompts}\n")
        print(f"Serialized: {serialized}")  # Model configuration
        # print(f"Additional kwargs: {kwargs}")  # Other request parameters

    def on_llm_end(self, response, **kwargs):
        print(f"\n=== LLM Response ===\nResponse: {response}\n")

logger = InteractionLogger()

@tool
def search_weather(city: str) -> str:
    """Search weather for a city."""
    if city == 'Beijing':
        return 'It is sunny in Beijing today'
    else:
        return 'The weather is nice today'


tools = [search_weather]

tool_node = ToolNode(tools)

# 1. Initialize the model and bind the tools to it
model = ChatOpenAI(
    model_name="gpt-4o",
    temperature=0.7,
    max_tokens=100,
    base_url=base_url,
    api_key=api_key,
    callbacks=[logger]
).bind_tools(tools)

# Node function that calls the LLM
def call_model(state: MessagesState):
    messages = state['messages']
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}

# Router function that decides whether to continue
def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]
    # If the LLM asked for a tool call, route to the tools node
    if last_message.tool_calls:
        return "tools"
    # Otherwise stop (reply to the user)
    return END

# Define a graph
workflow = StateGraph(MessagesState)
# Define the two nodes that the flow will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
# Set the entry point to agent:
# this is the first node to be called
workflow.add_edge(START, "agent")
# Add a conditional edge, which chooses the next node at run time
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is always called next.
workflow.add_edge("tools", 'agent')
# Initialize memory to persist state between graph runs; here it is kept in memory,
# but it could also be stored in Redis or MongoDB
checkpointer = MemorySaver()
# Finally, compile the graph into a LangChain Runnable, so it can be used like any
# other runnable. Note that the checkpointer (memory) is passed in at compile time.
app = workflow.compile(checkpointer=checkpointer)
# Use the Runnable
final_state = app.invoke(
    {"messages": [HumanMessage(content="What's the weather like in Beijing?")]},
    config={"configurable": {"thread_id": 42}}
)
print(final_state["messages"][-1].content)

# Second turn on the same thread_id: the checkpointer supplies the earlier messages
final_state = app.invoke(
    {"messages": [HumanMessage(content="Which city did I ask about?")]},
    config={"configurable": {"thread_id": 42}}
)
print(final_state["messages"][-1].content)


Using LangGraph's prebuilt ReAct agent

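LangGraph also ships a prebuilt ReAct agent based on function calling for tool use. The thought, the choice of tool, and the tool arguments all rely on the model's native capabilities rather than on parsing prompt-formatted text.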

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import os
import httpx
from langchain_core.callbacks import BaseCallbackHandler
from langgraph.checkpoint.memory import MemorySaver

# Gateway settings, as in the earlier examples
base_url = "http://xxx.com/openapi/v1"
api_key = os.environ['API_KEY']

# 1. Initialize the model
model = ChatOpenAI(
    model_name="hunyuan",
    temperature=0.7,
    max_tokens=100,
    base_url=base_url,
    api_key=api_key,
    # streaming=True,
    # callbacks=[logger],
    # http_client=httpx.Client(transport=transport),
)

def search_weather(city: str) -> str:
    """Actual weather search implementation"""
    if city == 'Beijing':
        return 'It is sunny in Beijing today'
    else:
        return 'Sorry, the query rate is too high, please use another tool'

def search_bing(query: str) -> str:
    """Actual bing search implementation"""
    if query == 'Beijing':
        return 'It is sunny in Beijing today'
    else:
        return 'Sunny, 20-25 degrees, strong UV, remember sun protection'

checkpointer = MemorySaver()
config = {"configurable": {"thread_id": 42}}

agent = create_react_agent(
    model=model,
    checkpointer=checkpointer,
    tools=[search_weather, search_bing],
    # Pause before every tool execution so that a human can approve it
    interrupt_before=["tools"],
    prompt="You are a helpful assistant"
    # prompt="""Answer by following these steps:
    # 1. Thought: Analyze what needs to be done
    # 2. Action: Use tool if needed
    # 3. Observation: Check tool result
    # 4. Final Answer""",
    # debug=True
)

# Run the agent
inputs = {"messages": [{"role": "user", "content": "What's the weather like in Shanghai?"}]}
# Manual equivalent of the loop below: call agent.invoke(inputs, config=config) once,
# then agent.invoke(None, config=config) to resume after each interrupt.

final_state = None
while True:
    final_state = agent.invoke(inputs, config=config)
    print(final_state)
    messages = final_state.get("messages", [])
    if not messages:
        break
    # Check if we've reached the final state
    last_message = messages[-1]
    # Final answer: the message has content and no pending tool calls
    if getattr(last_message, 'content', None) and not getattr(last_message, 'tool_calls', None):
        break
    # Also stop when the model reports that it has finished
    if (hasattr(last_message, 'response_metadata') and
            last_message.response_metadata.get('finish_reason') == 'stop'):
        break
    try:
        user_approval = input("Call the external tool? (yes/no): ")
    except EOFError:
        # No interactive stdin available: approve by default
        user_approval = "yes"

    if user_approval.lower() not in ("yes", "y"):
        print('User declined the external tool call, stopping')
        break

    # For next iteration, pass None as inputs to continue from checkpoint
    inputs = None
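
The pause-and-resume pattern behind interrupt_before can be illustrated with a Python generator. This is a conceptual sketch only: LangGraph resumes from a stored checkpoint rather than from a live generator, and agent_steps, drive, the fixed plan, and the tool stubs are hypothetical.

```python
from typing import Callable, Dict, Generator, List, Tuple

Call = Tuple[str, str]  # (tool name, argument)

def agent_steps(plan: List[Call],
                tools: Dict[str, Callable[[str], str]]) -> Generator[Call, bool, List[str]]:
    """Yield each pending tool call and wait for approval before running it."""
    observations: List[str] = []
    for name, arg in plan:
        approved = yield (name, arg)          # pause here, like an interrupt before tools
        if not approved:
            return observations + [f"stopped: user rejected {name}"]
        observations.append(tools[name](arg))
    return observations

def drive(plan: List[Call], tools: Dict[str, Callable[[str], str]],
          approve: Callable[[str, str], bool]) -> List[str]:
    """Run the agent, asking approve(name, arg) before every tool call."""
    gen = agent_steps(plan, tools)
    try:
        pending = next(gen)                   # run until the first pause
        while True:
            pending = gen.send(approve(*pending))  # resume with the decision
    except StopIteration as done:
        return done.value

TOOLS = {
    "search_weather": lambda city: "rate limited, try another tool",
    "search_bing": lambda query: "sunny, 20-25C",
}
plan = [("search_weather", "Shanghai"), ("search_bing", "Shanghai")]

print(drive(plan, TOOLS, lambda name, arg: True))
print(drive(plan, TOOLS, lambda name, arg: name != "search_bing"))
```

Resuming with None in the LangGraph loop corresponds to gen.send here: no new user input is supplied, execution simply continues from where it paused.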

Building a chatbot with the LangGraph ReAct agent

Here we use LangGraph and Streamlit to build a chat application that can call tools:

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from typing import Optional, List, Dict
import os
from langchain_core.callbacks import BaseCallbackHandler
import logging
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver, InMemorySaver
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
import streamlit as st
import uuid

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def ddgs_text(query: str) -> str:
    """useful for when you need to answer questions about current events. You should ask targeted questions"""
    from duckduckgo_search import DDGS

    with DDGS() as ddgs:
        ddgs_gen = ddgs.text(query)
        if ddgs_gen:
            results = [r for r in ddgs_gen]
            return " ".join(r["body"] for r in results)
    return ""

def search_weather(city: str) -> str:
    """Actual weather search implementation"""
    if city == 'Beijing':
        return 'It is sunny in Beijing today'
    else:
        return 'Sorry, the query rate is too high, please use another tool'

def search_bing(query: str) -> str:
    """Actual bing search implementation"""
    if query == 'Beijing':
        return 'It is sunny in Beijing today'
    else:
        return 'Sunny, 20-25 degrees, strong UV, remember sun protection'


class InteractionLogger(BaseCallbackHandler):
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def on_llm_start(self, serialized, prompts, **kwargs):
        msg = f"\n=== LLM Request ===\nPrompts: {prompts}\n"
        self.logger.info(msg)
        # print(f"Serialized: {serialized}")  # Model configuration

    def on_llm_end(self, response, **kwargs):
        msg = f"\n=== LLM Response ===\nResponse: {response}\n"
        self.logger.info(msg)

logger = InteractionLogger()


base_url = "xxx"
api_key = os.environ['API_TOKEN']
# model = "gpt-4o"


st.set_page_config(page_title="Butler ReAct agent", page_icon="🦜")
st.title("🦜 Butler ReAct agent")

llm_model = st.sidebar.selectbox(
    "Choose the LLM to use",
    options=["gpt-4o", "hunyuan", "hunyuan-standrad", "qwen3-30b-a3b", "deepseek-r1-local", "deepseek-v3-local"],
    index=0,
    # on_change=reset_connection_state
)
st.sidebar.write("Currently selected model:", llm_model)


if "full_conversation" not in st.session_state:
    st.session_state.full_conversation = []

if "session_id" not in st.session_state:
    st.session_state.session_id = str(uuid.uuid4())

# Initialize the checkpointer (only once per session)
if "checkpointer" not in st.session_state:
    st.session_state.checkpointer = InMemorySaver()

def show_single_msg(message):
    print(f"Partial response [type: {type(message).__name__}]: {message}")
    if isinstance(message, HumanMessage):
        with st.chat_message("user"):
            st.write(message.content)
    elif isinstance(message, AIMessage) and message.content:
        # A finish_reason of 'stop' marks the final answer; anything else is a thought
        if hasattr(message, 'response_metadata') and 'finish_reason' in message.response_metadata and message.response_metadata['finish_reason'] == 'stop':
            with st.chat_message("assistant"):
                st.write(message.content)
        else:
            # with st.chat_message("assistant"):
            st.markdown(f"🤔 **Thinking:** {message.content}")

    if hasattr(message, 'tool_calls') and message.tool_calls:  # Tool call message
        print("tool_calls message:", message)
        for tool_call in message.tool_calls:
            # Check if tool response already exists in conversation
            tool_response = None
            if isinstance(message, AIMessage):
                for resp in st.session_state.full_conversation:
                    if isinstance(resp, ToolMessage) and resp.tool_call_id == tool_call['id']:
                        tool_response = resp.content
                        break

            # For historical tool calls (response exists), show as completed
            if tool_response:
                with st.status(f"✅ Tool '{tool_call['name']}' finished", state="complete", expanded=True):
                    st.markdown("**Tool arguments:**")
                    st.json(tool_call['args'])
                    st.markdown("**Result:**")
                    if isinstance(tool_response, dict) or isinstance(tool_response, list):
                        st.json(tool_response)
                    else:
                        st.text(tool_response)
            else:
                # For new tool calls (no response yet), show as running
                with st.status(f"🛠️ Calling tool '{tool_call['name']}'...", state="running", expanded=True) as status:
                    st.markdown("**Tool arguments:**")
                    st.json(tool_call['args'])

                    result_placeholder = st.empty()
                    result_placeholder.info("⏳ Waiting for the tool result...")

                    # Store the status and placeholder for later updates
                    if 'pending_tool_calls' not in st.session_state:
                        st.session_state.pending_tool_calls = {}
                    st.session_state.pending_tool_calls[tool_call['id']] = {
                        'status': status,
                        'result_placeholder': result_placeholder,
                        'tool_name': tool_call['name']
                    }
    # Check if this is a ToolMessage and update any pending tool calls
    if isinstance(message, ToolMessage) and hasattr(st.session_state, 'pending_tool_calls'):
        tool_call_id = getattr(message, 'tool_call_id', None)
        if tool_call_id and tool_call_id in st.session_state.pending_tool_calls:
            pending = st.session_state.pending_tool_calls[tool_call_id]
            pending['status'].update(label=f"✅ Tool '{pending['tool_name']}' finished", state="complete")
            pending['result_placeholder'].empty()
            pending['result_placeholder'].markdown("**Result:**")
            if isinstance(message.content, dict) or isinstance(message.content, list):
                pending['result_placeholder'].json(message.content)
            else:
                pending['result_placeholder'].text(message.content)
            del st.session_state.pending_tool_calls[tool_call_id]

def display_existing_conversation():
    for msg in st.session_state.full_conversation:
        show_single_msg(msg)


def add_message_to_conversation(msg):
    st.session_state.full_conversation.append(msg)

    show_single_msg(msg)


if __name__ == "__main__":
    display_existing_conversation()

    if prompt := st.chat_input(placeholder="GDP of China and the US?"):
        # Build the chat model for the selected name so that base_url and api_key are used
        llm = ChatOpenAI(model=llm_model, base_url=base_url, api_key=api_key, callbacks=[logger])
        agent = create_react_agent(
            model=llm,
            checkpointer=st.session_state.checkpointer,
            tools=[search_weather, search_bing],
            # interrupt_before=["tools"],
            prompt="You are a helpful assistant. Before calling a tool, you must first output your reasoning."
            # debug=True
        )

        cfg = RunnableConfig()
        cfg["configurable"] = {"thread_id": st.session_state.session_id}
        inputs = {"messages": [HumanMessage(content=prompt)]}

        # stream_mode="values" yields the full state after each step; show the newest message
        for s in agent.stream(inputs, config=cfg, stream_mode="values"):
            message = s["messages"][-1]
            # print(f"Partial response [type: {type(message).__name__}]: {message}")
            add_message_to_conversation(message)


References

  1. Chinese guide to LangGraph (basic graph usage) https://github.com/jurnea/LangGraph-Chinese
  2. LangSmith https://docs.smith.langchain.com/