Pi Agent Harness 源码剖析与理论学习

本文档结合项目官方设计文档(packages/agent/docs/)和源码实现,系统性地讲解 Agent Harness 工程的核心理论与实现方案。


目录


一、Harness 是什么

1.1 定义

在 Agent 工程中,Harness(直译"线束")是连接 LLM 与外部世界的编排层。它不是 LLM 本身,也不是单个工具,而是协调以下所有组件的中枢:

1
用户输入 → [Harness] → LLM → 工具执行 → 结果反馈 → 持久化 → 下一轮

1.2 Pi 中 Harness 的定位

引用项目官方文档:

AgentHarness is the orchestration layer above the low-level agent loop. It owns session persistence, runtime configuration, resource resolution, operation locking, and extension-facing mutation semantics.

翻译:AgentHarness 是位于底层 agent loop 之上的编排层。它负责会话持久化、运行时配置、资源解析、操作锁定、以及面向扩展的变更语义。

1.3 Harness 的核心职责

职责 说明
任务循环 驱动 LLM 调用 → 工具执行 → 下一轮的核心循环
工具调用 校验参数、执行工具、处理结果
上下文管理 消息转换、压缩、窗口管理
状态记录 运行时状态、阶段管理、快照
会话持久化 树形 session、JSONL 存储
可观测性 结构化事件流、Trace/Span
失败重试 指数退避、上下文溢出处理
扩展性 Hook 系统、工具注册、资源加载

二、分层架构总览

2.1 包层次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌────────────────────────────────────────────────────────────────┐
│ packages/coding-agent (应用层 CLI) │
│ - 交互模式 (Interactive/Print/RPC) │
│ - 应用级重试、UI渲染、扩展加载 │
│ - 具体工具实现 (bash/read/write/edit/grep/find/ls) │
└───────────────────────────────┬────────────────────────────────┘
│ 使用
┌───────────────────────────────┼────────────────────────────────┐
│ packages/agent (核心运行时) │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ AgentHarness (编排层) │ │
│ │ - session 管理、compaction、skill、hook emit │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Agent (有状态包装器) │ │
│ │ - 状态管理、队列、生命周期 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ agentLoop / runLoop (底层循环) │ │
│ │ - LLM 调用、工具执行、事件发射 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Session (树形会话) + Compaction (压缩) │ │
│ └─────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬────────────────────────────────┘
│ 使用
┌───────────────────────────────┼────────────────────────────────┐
│ packages/ai (LLM Provider 层) │
│ - streamSimple() / completeSimple() │
│ - 多 Provider: Anthropic, OpenAI, Google, Bedrock, Mistral │
└────────────────────────────────────────────────────────────────┘

2.2 调用入口链

1
2
3
4
5
6
7
用户输入
→ CLI 入口 (coding-agent/src/cli.ts)
→ main.ts (参数解析、runtime 创建)
→ AgentSession (应用级协调: retry + compaction trigger)
→ AgentHarness.prompt() (编排层)
→ runAgentLoop() (底层循环)
→ streamSimple() (LLM 调用)

三、任务循环 (Agent Loop)

源码位置: packages/agent/src/agent-loop.ts

3.1 理论:为什么需要双层循环

Agent 的任务循环需要解决几个核心问题:

  1. 工具执行后继续:LLM 调用工具后,需要把工具结果反馈给 LLM 继续推理
  2. 中途转向 (Steering):用户在 Agent 工作时想改变方向
  3. 任务续接 (Follow-up):Agent 完成当前任务后,检查是否有后续任务
  4. 优雅终止:在合适的时机停止循环

Pi 的解决方案是双层 while 循环

1
2
3
外层循环 ← follow-up 消息驱动(Agent 要停时检查是否有后续)
└─ 内层循环 ← tool calls + steering 消息驱动
└─ 每轮: LLM → 工具 → 检查终止条件

3.2 源码:runLoop 核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// packages/agent/src/agent-loop.ts 第 155-269 行

async function runLoop(
initialContext: AgentContext,
newMessages: AgentMessage[],
initialConfig: AgentLoopConfig,
signal: AbortSignal | undefined,
emit: AgentEventSink,
): Promise<void> {
let currentContext = initialContext;
let config = initialConfig;
let pendingMessages = await config.getSteeringMessages?.() || [];

// ═══ 外层循环:follow-up 驱动 ═══
while (true) {
let hasMoreToolCalls = true;

// ═══ 内层循环:tool calls + steering 驱动 ═══
while (hasMoreToolCalls || pendingMessages.length > 0) {
// 1️⃣ 注入 steering 消息
if (pendingMessages.length > 0) {
for (const message of pendingMessages) {
currentContext.messages.push(message);
newMessages.push(message);
}
pendingMessages = [];
}

// 2️⃣ 调用 LLM 获取 assistant 响应
const message = await streamAssistantResponse(
currentContext, config, signal, emit
);

// 3️⃣ 错误/中止检查
if (message.stopReason === "error" || message.stopReason === "aborted") {
await emit({ type: "agent_end", messages: newMessages });
return;
}

// 4️⃣ 执行工具调用
const toolCalls = message.content.filter(c => c.type === "toolCall");
hasMoreToolCalls = false;
if (toolCalls.length > 0) {
const batch = await executeToolCalls(...);
hasMoreToolCalls = !batch.terminate; // 所有工具都 terminate 才停
}

// 5️⃣ prepareNextTurn hook(可动态更新 context/model)
const snapshot = await config.prepareNextTurn?.(turnContext);
if (snapshot) {
currentContext = snapshot.context ?? currentContext;
config = { ...config, model: snapshot.model ?? config.model };
}

// 6️⃣ shouldStopAfterTurn 检查(优雅终止)
if (await config.shouldStopAfterTurn?.(turnContext)) return;

// 7️⃣ 拉取新的 steering 消息
pendingMessages = await config.getSteeringMessages?.() || [];
}

// ═══ 外层检查:follow-up 消息 ═══
const followUp = await config.getFollowUpMessages?.() || [];
if (followUp.length > 0) {
pendingMessages = followUp;
continue; // 回到内层循环继续处理
}
break; // 无后续消息,退出
}

await emit({ type: "agent_end", messages: newMessages });
}

3.3 入口函数对比

函数 用途 源码位置
agentLoop(prompts, context, config) 新对话:添加 prompt 后启动循环 第 31-54 行
agentLoopContinue(context, config) 续接:用于 retry,从当前上下文继续 第 64-93 行

两者都返回 EventStream<AgentEvent, AgentMessage[]>,支持流式消费。

3.4 设计要点

设计 作用
Steering 队列 用户在 Agent 执行工具时"转向",消息在下轮 LLM 调用前注入
Follow-up 队列 Agent 本来要停时检查是否有后续任务
prepareNextTurn 每轮结束后可动态更换 model/context/thinkingLevel
shouldStopAfterTurn 优雅终止(如上下文快满时提前停止)
取消信号 (AbortSignal) 贯穿所有异步操作,支持随时中止

四、工具调用 (Tool Calling)

源码位置: packages/agent/src/agent-loop.ts (执行),packages/agent/src/types.ts (类型)

4.1 理论:工具调用的生命周期

一个完整的工具调用经历 5 个阶段:

1
解析 → 校验 → 前置 Hook → 执行 → 后置 Hook → 结果写入

每个阶段都可能产生错误,且需要不同的处理策略。

4.2 工具定义接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// packages/agent/src/types.ts 第 361-384 行

interface AgentTool<TParameters, TDetails> {
name: string; // 工具唯一名
label: string; // UI 显示名
description: string; // LLM 可见的工具描述
parameters: TParameters; // JSON Schema 参数定义(TypeBox)

// 参数预处理(兼容性修补)
prepareArguments?: (args: unknown) => Static<TParameters>;

// 执行入口
execute: (
toolCallId: string,
params: Static<TParameters>,
signal?: AbortSignal,
onUpdate?: AgentToolUpdateCallback<TDetails>,
) => Promise<AgentToolResult<TDetails>>;

// 执行模式:"sequential" 强制顺序 | "parallel" 可并发
executionMode?: ToolExecutionMode;
}

4.3 源码:三阶段执行 Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 第一阶段:准备(第 562-626 行)
async function prepareToolCall(context, assistantMsg, toolCall, config, signal) {
// 1. 查找 tool 定义
const tool = context.tools?.find(t => t.name === toolCall.name);
if (!tool) return { kind: "immediate", result: errorResult("not found") };

// 2. 参数预处理
const preparedToolCall = prepareToolCallArguments(tool, toolCall);

// 3. JSON Schema 校验
const validatedArgs = validateToolArguments(tool, preparedToolCall);

// 4. beforeToolCall Hook(可 block 执行)
if (config.beforeToolCall) {
const beforeResult = await config.beforeToolCall({ toolCall, args, context });
if (beforeResult?.block) {
return { kind: "immediate", result: errorResult(beforeResult.reason) };
}
}

return { kind: "prepared", toolCall, tool, args: validatedArgs };
}

// 第二阶段:执行(第 628-663 行)
async function executePreparedToolCall(prepared, signal, emit) {
try {
const result = await prepared.tool.execute(
prepared.toolCall.id,
prepared.args,
signal,
(partialResult) => emit({ type: "tool_execution_update", ... }),
);
return { result, isError: false };
} catch (error) {
return { result: errorResult(error.message), isError: true };
}
}

// 第三阶段:定稿(第 665-708 行)
async function finalizeExecutedToolCall(...) {
let { result, isError } = executed;

// afterToolCall Hook(可修改结果)
if (config.afterToolCall) {
const patch = await config.afterToolCall({ toolCall, result, isError });
if (patch) {
result = { ...result, ...patch };
isError = patch.isError ?? isError;
}
}

return { toolCall, result, isError };
}

4.4 并行 vs 顺序执行

1
2
3
4
5
6
7
8
9
10
11
12
// 第 373-388 行
async function executeToolCalls(...) {
// 如果 batch 中有任何 tool 标记 "sequential",整个 batch 顺序执行
const hasSequentialTool = toolCalls.some(
tc => context.tools?.find(t => t.name === tc.name)?.executionMode === "sequential"
);

if (config.toolExecution === "sequential" || hasSequentialTool) {
return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);
}

并行执行的特殊处理(第 451-516 行):

  • prepareToolCall 阶段顺序执行(因为 beforeToolCall hook 可能有副作用)
  • 实际 execute 阶段并发执行
  • 结果按原始 toolCall 顺序发射消息

4.5 Terminate 机制

1
2
3
4
5
6
// 第 544-546 行
function shouldTerminateToolBatch(finalizedCalls) {
// 只有当 batch 中所有工具都设了 terminate 时才停止
return finalizedCalls.length > 0 &&
finalizedCalls.every(f => f.result.terminate === true);
}

设计意图:单个工具的 terminate: true 表示"我完成了",但只有所有工具都完成时才安全终止循环。


五、上下文管理 (Context Management)

5.1 理论:三层消息转换

Agent 内部使用扩展的 AgentMessage 类型,LLM 只理解标准的 Message 类型。需要分层转换:

1
2
3
4
5
6
7
AgentMessage[]          ← 内部表示,含自定义消息(notification、custom 等)
↓ transformContext(可选的预处理)
AgentMessage[] ← 裁剪/注入后
↓ convertToLlm(类型映射)
Message[] ← LLM 标准格式 (user/assistant/toolResult)
↓ 构建 Context
发送到 LLM Provider

5.2 源码:streamAssistantResponse 中的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// packages/agent/src/agent-loop.ts 第 275-368 行

async function streamAssistantResponse(context, config, signal, emit) {
// 第一层:AgentMessage → AgentMessage(裁剪/注入)
let messages = context.messages;
if (config.transformContext) {
messages = await config.transformContext(messages, signal);
}

// 第二层:AgentMessage → Message(类型映射)
const llmMessages = await config.convertToLlm(messages);

// 第三层:构建 LLM 上下文
const llmContext: Context = {
systemPrompt: context.systemPrompt,
messages: llmMessages,
tools: context.tools,
};

// 调用 LLM
const response = await streamFunction(config.model, llmContext, { ... });
// ... 处理流式响应
}

5.3 上下文压缩 (Compaction)

源码位置: packages/agent/src/harness/compaction/compaction.ts

为什么需要压缩? LLM 有上下文窗口限制。当对话变得太长时,需要将旧消息"压缩"为摘要。

压缩流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1. prepareCompaction(branchEntries, settings)
├─ 计算当前总 token 数
├─ 从后往前保留 keepRecentTokens 的消息
├─ 确定分割点 firstKeptEntryId
├─ 提取文件操作记录(read/write/edit)
└─ 返回 CompactionPreparation

2. compact(preparation, model, apiKey)
├─ 序列化要压缩的消息为纯文本
├─ 附加文件操作摘要
├─ 调用 LLM 生成总结
└─ 返回 { summary, firstKeptEntryId, tokensBefore }

3. session.appendCompaction(...)
└─ 写入 compaction entry 到 session tree

压缩后的上下文重建(session.ts 第 21-76 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
function buildSessionContext(pathEntries: SessionTreeEntry[]): SessionContext {
if (compaction) {
// 1. 放入压缩摘要作为第一条消息
messages.push(createCompactionSummaryMessage(compaction.summary));
// 2. 从 firstKeptEntryId 开始追加保留的消息
for (entry after firstKeptEntry) appendMessage(entry);
// 3. 追加压缩点之后的所有新消息
for (entry after compaction) appendMessage(entry);
} else {
// 无压缩:直接追加所有消息
for (entry of pathEntries) appendMessage(entry);
}
}

5.4 压缩配置

1
2
3
4
5
interface CompactionSettings {
enabled: boolean; // 是否启用
reserveTokens: number; // 预留给新回复的 token 数
keepRecentTokens: number; // 压缩时保留的最近消息 token 数
}

六、状态模型 (State Model)

引用官方文档(agent-harness.md):

The harness separates state into four categories.

6.1 四类状态

(1) Harness Config — 运行时配置

最新的配置,由应用或扩展设置:

配置项 说明
model 当前使用的 LLM 模型
thinkingLevel 思考级别 (off/minimal/low/medium/high/xhigh)
tools 注册的工具列表
activeToolNames 当前激活的工具名
resources Skills + Prompt Templates
streamOptions 请求选项 (transport/timeout/headers)
systemPrompt 系统提示词(或生成函数)

关键语义:Getter 返回最新配置,不是正在执行的 turn 快照。Setter 立即更新,影响下一轮 turn,不影响正在进行的 LLM 请求。

(2) Turn Snapshot — 轮次快照

每轮 LLM 调用前创建的不可变快照:

1
2
3
4
5
6
7
8
// agent-harness.ts 第 313-345 行
private async createTurnState() {
const context = await this.session.buildContext(); // 从 session 重建消息
const tools = [...this.tools.values()];
const activeTools = this.activeToolNames.map(...);
const systemPrompt = await this.systemPrompt?.({...}); // 解析系统提示词
return { messages, resources, streamOptions, sessionId, systemPrompt, model, tools, activeTools };
}

(3) Session — 持久化状态

只包含已持久化的 entries。Session 读取不包含排队中的写入。

(4) Pending Session Writes — 待写入队列

在操作进行中(turn/compaction 等)接受的写入请求会排队,在 save point 和 settlement 时刷新:

1
2
3
4
5
6
7
8
9
10
// 第 459-481 行
private async flushPendingSessionWrites() {
while (this.pendingSessionWrites.length > 0) {
const write = this.pendingSessionWrites[0]!;
if (write.type === "message") await this.session.appendMessage(write.message);
else if (write.type === "model_change") await this.session.appendModelChange(...);
// ... 其他类型
this.pendingSessionWrites.shift(); // 逐个处理,失败不丢失
}
}

6.2 操作阶段 (Phase)

1
type AgentHarnessPhase = "idle" | "turn" | "compaction" | "branch_summary" | "retry";

状态机规则

1
2
3
4
5
结构性操作要求 phase === "idle":
- prompt / skill / promptFromTemplate / compact / navigateTree

运行中允许的操作:
- steer / followUp / nextTurn / abort / 配置 setters

非 idle 时发起结构性操作会抛出 AgentHarnessError code "busy"

6.3 Save Point(保存点)

引用官方文档:

A save point occurs after an assistant turn and its tool-result messages have completed.

保存点时 Harness 做三件事:

  1. 刷新 pending session writes
  2. 创建新的 turn snapshot
  3. 将新的 context/model/thinkingLevel 应用到下一轮

设计意图:在 turn 执行期间的配置变更(如切换模型)不会干扰当前请求,只在保存点生效。


七、会话持久化 (Session & Memory)

源码位置: packages/agent/src/harness/session/session.ts

7.1 理论:为什么用树形结构

线性对话 = 只能前进。树形对话 = 可以分支、回溯、尝试不同路径。

1
2
3
4
5
6
7
    root
/ \
msg1 msg1' ← 用户尝试了不同的 prompt
| |
msg2 msg2'
|
msg3 ← leafId 指向当前活跃分支末端

这让 Agent 支持:

  • 分支尝试:从某个节点开始走不同路径
  • 回溯:回到之前的某个点继续
  • 会话摘要:切换分支时自动总结旧分支内容

7.2 Session Entry 类型

1
2
3
4
5
6
7
8
9
10
11
type SessionTreeEntry =
| MessageEntry // 对话消息 (user/assistant/toolResult)
| ModelChangeEntry // 模型切换记录
| ThinkingLevelChangeEntry // 思考级别变更
| CompactionEntry // 压缩摘要
| BranchSummaryEntry // 分支切换摘要
| CustomEntry // 自定义数据
| CustomMessageEntry // 自定义可显示消息
| LabelEntry // 节点标签
| SessionInfoEntry // 会话元信息
| LeafEntry; // 叶节点标记

每个 entry 的通用字段:

1
2
3
4
5
6
interface SessionTreeEntryBase {
type: string;
id: string; // 唯一 ID
parentId: string | null; // 父节点(null = 根)
timestamp: string; // ISO 时间戳
}

7.3 SessionStorage 接口

1
2
3
4
5
6
7
8
9
10
11
// 存储抽象(第 433-447 行 of types.ts)
interface SessionStorage<TMetadata> {
getMetadata(): Promise<TMetadata>;
getLeafId(): Promise<string | null>; // 当前活跃叶节点
setLeafId(leafId): Promise<void>; // 移动指针
createEntryId(): Promise<string>; // 生成唯一 ID
appendEntry(entry): Promise<void>; // 追加条目
getEntry(id): Promise<SessionTreeEntry>; // 按 ID 查询
getPathToRoot(leafId): Promise<SessionTreeEntry[]>; // 从叶到根的路径
getEntries(): Promise<SessionTreeEntry[]>; // 所有条目
}

实现

  • JsonlStorage — JSONL 文件持久化(生产用)
  • MemoryStorage — 内存存储(测试用)

7.4 关键操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Session 类(session.ts)

// 追加消息
async appendMessage(message: AgentMessage): Promise<string> {
return this.appendTypedEntry({
type: "message",
id: await this.storage.createEntryId(),
parentId: await this.storage.getLeafId(), // 链接到当前叶节点
timestamp: new Date().toISOString(),
message,
});
}

// 追加压缩
async appendCompaction(summary, firstKeptEntryId, tokensBefore, details) { ... }

// 分支导航
async moveTo(entryId, summary?) {
await this.storage.setLeafId(entryId); // 移动叶指针
if (summary) {
// 写入分支摘要条目
return this.appendTypedEntry({ type: "branch_summary", ... });
}
}

7.5 上下文重建

从树中重建线性上下文的算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function buildSessionContext(pathEntries: SessionTreeEntry[]): SessionContext {
// pathEntries 是从根到叶的路径(已排序)

// 1. 遍历跟踪 model/thinkingLevel 变更
for (entry of pathEntries) {
if (entry.type === "thinking_level_change") thinkingLevel = entry.thinkingLevel;
if (entry.type === "model_change") model = { provider, modelId };
if (entry.type === "compaction") compaction = entry; // 找最后一个压缩点
}

// 2. 根据有无压缩决定消息起点
if (compaction) {
messages.push(summaryMessage); // 先放摘要
// 从 firstKeptEntryId 开始追加
} else {
// 从头追加所有消息
}

return { messages, thinkingLevel, model };
}

7.6 LeafId 的持久化

引用官方文档:

Session storage implementations must persist leaf changes as leaf entries. setLeafId() is not an in-memory-only cursor update; it appends a durable entry.

这保证了重新打开 session 时能恢复到正确的分支位置。


八、执行日志与可观测性 (Observability)

设计文档: packages/agent/docs/observability.md

8.1 设计理念

引用官方文档:

Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry.

Pi 的核心原则:只发事件,不绑定具体 APM 框架

8.2 事件体系

底层 Agent 事件

1
2
3
4
5
6
7
8
9
10
11
12
// packages/agent/src/types.ts 第 403-418 行
type AgentEvent =
| { type: "agent_start" }
| { type: "agent_end"; messages: AgentMessage[] }
| { type: "turn_start" }
| { type: "turn_end"; message: AgentMessage; toolResults: ToolResultMessage[] }
| { type: "message_start"; message: AgentMessage }
| { type: "message_update"; message: AgentMessage; assistantMessageEvent }
| { type: "message_end"; message: AgentMessage }
| { type: "tool_execution_start"; toolCallId; toolName; args }
| { type: "tool_execution_update"; toolCallId; toolName; partialResult }
| { type: "tool_execution_end"; toolCallId; toolName; result; isError };

Harness 层扩展事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// types.ts 第 618-643 行
type AgentHarnessOwnEvent =
| QueueUpdateEvent // 队列变化
| SavePointEvent // 保存点
| AbortEvent // 中止
| SettledEvent // 运行结束
| BeforeAgentStartEvent // Agent 启动前
| ContextEvent // 上下文变换
| BeforeProviderRequestEvent // Provider 请求前
| BeforeProviderPayloadEvent // Payload 发送前
| AfterProviderResponseEvent // Provider 响应后
| ToolCallEvent // 工具调用 hook
| ToolResultEvent // 工具结果 hook
| SessionBeforeCompactEvent // 压缩前
| SessionCompactEvent // 压缩完成
| SessionBeforeTreeEvent // 树导航前
| SessionTreeEvent // 树导航完成
| ModelSelectEvent // 模型切换
| ThinkingLevelSelectEvent // 思考级别切换
| ResourcesUpdateEvent; // 资源更新

8.3 Trace/Span 模型

1
2
3
4
5
6
7
8
9
10
11
12
// observability.md 的设计
interface PiObservabilityEvent {
type: "start" | "end" | "error" | "event";
name: string; // e.g. "pi.agent.prompt", "pi.ai.provider.request"
traceId: string; // 一次用户操作的唯一 ID
spanId?: string; // 当前操作段 ID
parentSpanId?: string; // 父操作段
timestamp: number;
durationMs?: number;
context?: Record<string, unknown>; // 用户上下文
payload?: Record<string, unknown>; // 操作属性
}

示例 Trace 树:

1
2
3
4
5
traceId=t1 spanId=s1 parent=-  name=pi.agent.prompt
traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn
traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request
traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call
traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry

8.4 安全与脱敏

引用官方文档的分类:

安全(默认发送) 不安全(默认不发送)
provider/model/api 名 prompts/completions
session id tool args/results
entry type shell output
tool name file contents
status code request payloads
stop reason API keys/headers
token counts/costs/durations

8.5 订阅模式

1
2
3
4
5
6
7
8
// Agent 底层
agent.subscribe((event, signal) => { /* 所有 AgentEvent */ });

// AgentHarness(所有事件:Agent 事件 + Harness 自有事件)
harness.subscribe((event, signal) => { ... });

// AgentHarness(特定事件 hook,可返回结果影响执行)
harness.on("tool_call", (event) => { return { block: true }; });

九、失败重试 (Retry & Recovery)

9.1 两层重试机制

层级 位置 处理对象
Provider 层 packages/ai 网络超时、5xx、限流(内置指数退避)
应用层 packages/coding-agent/agent-session.ts LLM 过载、服务不可用

9.2 应用层重试配置

1
2
3
4
5
6
interface RetrySettings {
enabled?: boolean; // 默认 true
maxRetries?: number; // 默认 3
baseDelayMs?: number; // 默认 2000ms
provider?: ProviderRetrySettings;
}

9.3 重试流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Agent Run 结束

检查最后一条 assistant 消息的 stopReason

stopReason === "error" ?
├─ 是:检查 errorMessage 是否可重试
│ ├─ 服务器过载/限流/网络错误 → 可重试
│ └─ 上下文溢出 → 不可重试(需要 compaction)

↓ 可重试 && retryAttempt < maxRetries
执行重试:
1. retryAttempt++
2. delay = baseDelayMs × 2^(attempt-1) // 2s → 4s → 8s
3. 从 state 中移除错误消息
4. 发射 "auto_retry_start" 事件
5. await sleep(delay) // 可被 abort 中断
6. agent.continue() // 从当前上下文续接(不是重新 prompt)

9.4 上下文溢出 ≠ 重试

上下文溢出(context overflow)需要压缩而非重试。重试只会反复失败。正确的处理路径:

1
2
3
4
5
6
7
检测到 context overflow
↓ 不走重试
触发 compaction

压缩旧消息为摘要

用压缩后的上下文重新调用 LLM

9.5 中止 (Abort)

引用官方文档:

Abort does not clear nextTurn messages. Messages queued with nextTurn() survive abort.
Abort does not discard pending session writes.

1
2
3
4
5
6
7
8
9
10
11
12
13
// agent-harness.ts 第 936-963 行
async abort(): Promise<AbortResult> {
// 1. 清空 steer 和 followUp 队列
this.steerQueue = [];
this.followUpQueue = [];
// 2. 触发 AbortController
this.runAbortController?.abort();
// 3. 等待 idle
await this.waitForIdle();
// 4. 发射 abort 事件
await this.emitOwn({ type: "abort", clearedSteer, clearedFollowUp });
return { clearedSteer, clearedFollowUp };
}

十、Hook 系统 (Hooks & Extensions)

设计文档: packages/agent/docs/hooks.md

10.1 理论:Hook 的核心模型

Pi 的 Hook 系统基于一个简洁的设计:事件自带结果类型

1
2
3
4
5
6
7
8
9
10
// 幽灵类型标记事件的结果
declare const HookResult: unique symbol;

interface HookEvent<TType extends string, TResult = void> {
type: TType;
readonly [HookResult]?: TResult; // 类型级标记,运行时不存在
}

// 从事件类型推导结果类型
type ResultOf<E> = E extends { readonly [HookResult]?: infer R } ? R : void;

两种角色

  • observe(handler) — 观察者:看所有事件,只读,返回值被忽略
  • on(type, handler) — 参与者:参与特定事件的语义,可返回结果

10.2 Hook 接口

1
2
3
4
5
6
7
8
9
10
11
12
interface AgentHarnessHooks<E, Ctx> {
context: Ctx; // 共享上下文
setContext(ctx: Ctx): void; // 更新上下文

observe(handler): () => void; // 注册观察者
on(type, handler): () => void; // 注册特定事件处理器
emit(event, signal?): Promise<ResultOf<E>>; // Harness 调用

addCleanup(cleanup): () => void; // 注册清理函数
clear(): Promise<void>; // 移除所有 handler + 运行清理
dispose(): Promise<void>; // 最终清理
}

10.3 不同事件的归约语义

事件类型 归约方式 说明
context 链式变换 每个 handler 看到前一个的修改结果
before_provider_request 顺序 patch 每个 handler 可 patch streamOptions
before_provider_payload 顺序变换 每个 handler 可修改 payload
before_agent_start 收集合并 收集注入消息 + 链式修改 systemPrompt
tool_call 早退阻塞 任一 handler 返回 { block: true } 则停止
tool_result 顺序 patch 每个 handler 看到前一个 patch 后的结果
session_before_compact/tree 早退取消 任一 handler 返回 { cancel: true } 则取消
其他(message_end 等) 纯观察 返回值忽略

10.4 Harness 与 Hook 的交互

Harness 只做一件事:

1
2
3
// Harness 内部
const result = await this.hooks.emit({ type: "context", messages }, signal);
return result?.messages ?? messages; // 使用 hook 结果或保持原样

Harness 存储 handler、不做链式处理、不知道扩展策略。这些都由 Hooks 实现负责。

10.5 当前实现(简化版)

目前 AgentHarness 使用 Map<string, Set<Handler>> 存储 handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// agent-harness.ts
private handlers = new Map<string, Set<AgentHarnessHandler>>();

// 注册
on(type, handler) {
let handlers = this.handlers.get(type);
if (!handlers) { handlers = new Set(); this.handlers.set(type, handlers); }
handlers.add(handler);
return () => handlers.delete(handler);
}

// 发射
private async emitHook(event) {
const handlers = this.getHandlers(event.type);
for (const handler of handlers) {
const result = await handler(event);
// ... 根据事件类型做不同归约
}
}

十一、持久化恢复设计 (Durable Harness)

设计文档: packages/agent/docs/durable-harness.md

11.1 半持久化的务实选择

引用官方文档:

A fully durable AgentHarness is not realistic by itself because important dependencies are runtime JS supplied by the host app.

完全持久化不可行的原因:工具实现、模型认证、扩展、Hook handler 都是运行时 JS 代码,无法序列化。

实际目标:半持久化(semi-durable)

  • Session 是持久的 append-only 状态树
  • Harness 将自己拥有的状态写入 session entries
  • 宿主应用负责在恢复时重建不可持久化的依赖
  • 恢复从持久边界重启,不从 in-flight 的 LLM 流中恢复

11.2 恢复流程

1
2
3
4
5
6
7
8
9
10
11
12
启动恢复:
1. 宿主注册 tools/models/extensions/resources/auth/hooks
2. Harness 打开 session
3. 从 session entries 归约出:
- 当前 leaf
- 对话分支
- harness 配置
- 队列
- pending writes
- 活跃操作/turn/tool 状态
4. 验证必需的运行时依赖
5. 协调未完成的操作状态

11.3 恢复策略

场景 保守策略
未完成的 agent turn 标记中断,保留队列/pending writes,回到 idle
未完成的 provider 请求 标记中断,不自动重试
未完成的 tool call 追加错误结果,除非工具声明幂等可重试
未完成的 compaction 如果无 compaction entry 存在则重新运行
未完成的分支导航 补充缺失的 summary/leaf entries

11.4 关键不变式

引用官方文档的关键场景:

Crash after queue_enqueued: message is restored.
Consumed queue IDs must be recorded in turn_started before they are considered consumed.
Deterministic target entry IDs let recovery detect the entry already exists.


十二、测试与评测

12.1 测试结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
packages/agent/test/
├── agent-loop.test.ts // 核心循环
├── harness/
│ ├── agent-harness.test.ts // Harness 生命周期
│ ├── agent-harness-stream.test.ts // Provider/stream 行为
│ ├── compaction.test.ts // 压缩逻辑
│ └── session.test.ts // Session 树操作

packages/coding-agent/test/
├── agent-session-retry.test.ts // 重试逻辑
├── tools.test.ts // 工具执行
├── compaction*.test.ts // 应用层压缩
├── extensions-*.test.ts // 扩展系统
├── print-mode.test.ts // 打印模式
├── rpc*.test.ts // RPC 模式
└── suite/ // 集成测试套件
├── harness.ts // 测试 harness 工具
└── regressions/ // 回归测试

12.2 Faux Provider(模拟 LLM)

测试使用 registerFauxProvider + fauxAssistantMessage 构建确定性的 LLM 响应:

1
2
3
4
5
6
7
// 无需真实 API key,完全可控
registerFauxProvider("test", (context) => {
return fauxAssistantMessage({
content: [{ type: "text", text: "Hello" }],
// 可以预设 toolCall 响应来测试工具调用链
});
});

12.3 运行测试

1
2
3
4
5
6
7
8
9
10
# 所有非 e2e 测试
./test.sh

# Harness 专项
npm run test:harness
npm run coverage:harness

# 特定文件
cd packages/agent
node ../../node_modules/vitest/dist/cli.js --run test/agent-loop.test.ts

附录:源码文件索引

packages/agent/src/(核心运行时)

文件 行数 职责
agent-loop.ts 743 核心循环:runLoop、streamAssistantResponse、executeToolCalls
agent.ts 557 Agent 类:有状态包装器、队列、生命周期管理
types.ts 419 核心类型:AgentMessage、AgentTool、AgentEvent、AgentLoopConfig
harness/agent-harness.ts 996 AgentHarness 类:编排层,集成 session/compaction/hook
harness/types.ts 816 Harness 类型:事件、hook 结果、error、session entry 定义
harness/session/session.ts 253 Session 类:树形会话、上下文重建、分支导航
harness/compaction/compaction.ts ~200 压缩引擎:prepareCompaction、compact
harness/compaction/utils.ts - 压缩工具:序列化、文件操作提取
harness/messages.ts - 消息转换:convertToLlm、自定义消息创建
harness/skills.ts - Skill 系统:格式化 skill 调用
harness/system-prompt.ts - 系统提示词构建

packages/agent/docs/(设计文档)

文件 内容
agent-harness.md AgentHarness 生命周期、状态模型、操作阶段、实现 TODO
durable-harness.md 半持久化设计:恢复模型、关键场景、最小可行实现
hooks.md Hook 系统设计:核心模型、归约语义、扩展加载
observability.md 可观测性设计:事件模型、Trace/Span、安全脱敏

packages/coding-agent/src/core/(应用层)

文件/目录 职责
agent-session.ts 应用级协调(2500+ 行):retry、compaction trigger、model 管理
tools/ 具体工具实现:bash、read、write、edit、grep、find、ls
extensions/ 扩展系统:加载、运行、API
session-manager.ts 会话管理:创建、恢复、fork、列表

学习路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
第 1 步:理解类型
└─ 读 packages/agent/src/types.ts
重点:AgentMessage、AgentTool、AgentEvent、AgentLoopConfig

第 2 步:掌握核心循环
└─ 读 packages/agent/src/agent-loop.ts 的 runLoop 函数
重点:双层循环结构、steering/follow-up、工具执行

第 3 步:理解状态管理
└─ 读 packages/agent/src/agent.ts 的 Agent 类
重点:state 管理、队列、runWithLifecycle

第 4 步:学习持久化
└─ 读 packages/agent/src/harness/session/session.ts
重点:树形结构、buildSessionContext、moveTo

第 5 步:理解压缩
└─ 读 packages/agent/src/harness/compaction/compaction.ts
重点:prepareCompaction、compact 函数

第 6 步:掌握编排层
└─ 读 packages/agent/src/harness/agent-harness.ts
重点:createTurnState、executeTurn、save point、hooks

第 7 步:对照设计文档
└─ 读 packages/agent/docs/ 下的四个文件
理解设计意图、未来方向、权衡取舍

第 8 步:看应用层整合
└─ 读 packages/coding-agent/src/core/agent-session.ts(选读)
理解 retry 逻辑、UI 集成、具体工具实现