Pi Agent Harness 源码剖析与理论学习
本文档结合项目官方设计文档(packages/agent/docs/)和源码实现,系统性地讲解 Agent Harness 工程的核心理论与实现方案。
目录
一、Harness 是什么
1.1 定义
在 Agent 工程中,Harness (直译"线束")是连接 LLM 与外部世界的编排层 。它不是 LLM 本身,也不是单个工具,而是协调以下所有组件的中枢:
1 用户输入 → [Harness] → LLM → 工具执行 → 结果反馈 → 持久化 → 下一轮
1.2 Pi 中 Harness 的定位
引用项目官方文档:
AgentHarness is the orchestration layer above the low-level agent loop. It owns session persistence, runtime configuration, resource resolution, operation locking, and extension-facing mutation semantics.
翻译:AgentHarness 是位于底层 agent loop 之上的编排层 。它负责会话持久化、运行时配置、资源解析、操作锁定、以及面向扩展的变更语义。
1.3 Harness 的核心职责
职责
说明
任务循环
驱动 LLM 调用 → 工具执行 → 下一轮的核心循环
工具调用
校验参数、执行工具、处理结果
上下文管理
消息转换、压缩、窗口管理
状态记录
运行时状态、阶段管理、快照
会话持久化
树形 session、JSONL 存储
可观测性
结构化事件流、Trace/Span
失败重试
指数退避、上下文溢出处理
扩展性
Hook 系统、工具注册、资源加载
二、分层架构总览
2.1 包层次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 ┌────────────────────────────────────────────────────────────────┐ │ packages/coding-agent (应用层 CLI) │ │ - 交互模式 (Interactive/Print/RPC) │ │ - 应用级重试、UI渲染、扩展加载 │ │ - 具体工具实现 (bash/read/write/edit/grep/find/ls) │ └───────────────────────────────┬────────────────────────────────┘ │ 使用 ┌───────────────────────────────┼────────────────────────────────┐ │ packages/agent (核心运行时) │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ AgentHarness (编排层) │ │ │ │ - session 管理、compaction、skill、hook emit │ │ │ └─────────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Agent (有状态包装器) │ │ │ │ - 状态管理、队列、生命周期 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ agentLoop / runLoop (底层循环) │ │ │ │ - LLM 调用、工具执行、事件发射 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Session (树形会话) + Compaction (压缩) │ │ │ └─────────────────────────────────────────────────────────┘ │ └───────────────────────────────┬────────────────────────────────┘ │ 使用 ┌───────────────────────────────┼────────────────────────────────┐ │ packages/ai (LLM Provider 层) │ │ - streamSimple() / completeSimple() │ │ - 多 Provider: Anthropic, OpenAI, Google, Bedrock, Mistral │ └────────────────────────────────────────────────────────────────┘
2.2 调用入口链
1 2 3 4 5 6 7 用户输入 → CLI 入口 (coding-agent/src/cli.ts) → main.ts (参数解析、runtime 创建) → AgentSession (应用级协调: retry + compaction trigger) → AgentHarness.prompt() (编排层) → runAgentLoop() (底层循环) → streamSimple() (LLM 调用)
三、任务循环 (Agent Loop)
源码位置: packages/agent/src/agent-loop.ts
3.1 理论:为什么需要双层循环
Agent 的任务循环需要解决几个核心问题:
工具执行后继续 :LLM 调用工具后,需要把工具结果反馈给 LLM 继续推理
中途转向 (Steering) :用户在 Agent 工作时想改变方向
任务续接 (Follow-up) :Agent 完成当前任务后,检查是否有后续任务
优雅终止 :在合适的时机停止循环
Pi 的解决方案是双层 while 循环 :
1 2 3 外层循环 ← follow-up 消息驱动(Agent 要停时检查是否有后续) └─ 内层循环 ← tool calls + steering 消息驱动 └─ 每轮: LLM → 工具 → 检查终止条件
3.2 源码:runLoop 核心实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 async function runLoop ( initialContext: AgentContext, newMessages: AgentMessage[], initialConfig: AgentLoopConfig, signal: AbortSignal | undefined , emit: AgentEventSink, ): Promise <void > { let currentContext = initialContext; let config = initialConfig; let pendingMessages = await config.getSteeringMessages ?.() || []; while (true ) { let hasMoreToolCalls = true ; while (hasMoreToolCalls || pendingMessages.length > 0 ) { if (pendingMessages.length > 0 ) { for (const message of pendingMessages) { currentContext.messages .push (message); newMessages.push (message); } pendingMessages = []; } const message = await streamAssistantResponse ( currentContext, config, signal, emit ); if (message.stopReason === "error" || message.stopReason === "aborted" ) { await emit ({ type : "agent_end" , messages : newMessages }); return ; } const toolCalls = message.content .filter (c => c.type === "toolCall" ); hasMoreToolCalls = false ; if (toolCalls.length > 0 ) { const batch = await executeToolCalls (...); hasMoreToolCalls = !batch.terminate ; } const snapshot = await config.prepareNextTurn ?.(turnContext); if (snapshot) { currentContext = snapshot.context ?? currentContext; config = { ...config, model : snapshot.model ?? config.model }; } if (await config.shouldStopAfterTurn ?.(turnContext)) return ; pendingMessages = await config.getSteeringMessages ?.() || []; } const followUp = await config.getFollowUpMessages ?.() || []; if (followUp.length > 0 ) { pendingMessages = followUp; continue ; } break ; } await emit ({ type : "agent_end" , messages : newMessages }); }
3.3 入口函数对比
函数
用途
源码位置
agentLoop(prompts, context, config)
新对话:添加 prompt 后启动循环
第 31-54 行
agentLoopContinue(context, config)
续接:用于 retry,从当前上下文继续
第 64-93 行
两者都返回 EventStream<AgentEvent, AgentMessage[]>,支持流式消费。
3.4 设计要点
设计
作用
Steering 队列
用户在 Agent 执行工具时"转向",消息在下轮 LLM 调用前注入
Follow-up 队列
Agent 本来要停时检查是否有后续任务
prepareNextTurn
每轮结束后可动态更换 model/context/thinkingLevel
shouldStopAfterTurn
优雅终止(如上下文快满时提前停止)
取消信号 (AbortSignal)
贯穿所有异步操作,支持随时中止
源码位置: packages/agent/src/agent-loop.ts (执行),packages/agent/src/types.ts (类型)
4.1 理论:工具调用的生命周期
一个完整的工具调用经历 5 个阶段:
1 解析 → 校验 → 前置 Hook → 执行 → 后置 Hook → 结果写入
每个阶段都可能产生错误,且需要不同的处理策略。
4.2 工具定义接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 interface AgentTool <TParameters , TDetails > { name : string ; label : string ; description : string ; parameters : TParameters ; prepareArguments?: (args: unknown ) => Static <TParameters >; execute : ( toolCallId: string , params: Static<TParameters>, signal?: AbortSignal, onUpdate?: AgentToolUpdateCallback<TDetails>, ) => Promise <AgentToolResult <TDetails >>; executionMode?: ToolExecutionMode ; }
4.3 源码:三阶段执行 Pipeline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 async function prepareToolCall (context, assistantMsg, toolCall, config, signal ) { const tool = context.tools ?.find (t => t.name === toolCall.name ); if (!tool) return { kind : "immediate" , result : errorResult ("not found" ) }; const preparedToolCall = prepareToolCallArguments (tool, toolCall); const validatedArgs = validateToolArguments (tool, preparedToolCall); if (config.beforeToolCall ) { const beforeResult = await config.beforeToolCall ({ toolCall, args, context }); if (beforeResult?.block ) { return { kind : "immediate" , result : errorResult (beforeResult.reason ) }; } } return { kind : "prepared" , toolCall, tool, args : validatedArgs }; } async function executePreparedToolCall (prepared, signal, emit ) { try { const result = await prepared.tool .execute ( prepared.toolCall .id , prepared.args , signal, (partialResult ) => emit ({ type : "tool_execution_update" , ... }), ); return { result, isError : false }; } catch (error) { return { result : errorResult (error.message ), isError : true }; } } async function finalizeExecutedToolCall (... ) { let { result, isError } = executed; if (config.afterToolCall ) { const patch = await config.afterToolCall ({ toolCall, result, isError }); if (patch) { result = { ...result, ...patch }; isError = patch.isError ?? isError; } } return { toolCall, result, isError }; }
4.4 并行 vs 顺序执行
1 2 3 4 5 6 7 8 9 10 11 12 async function executeToolCalls (... ) { const hasSequentialTool = toolCalls.some ( tc => context.tools ?.find (t => t.name === tc.name )?.executionMode === "sequential" ); if (config.toolExecution === "sequential" || hasSequentialTool) { return executeToolCallsSequential (...); } return executeToolCallsParallel (...); }
并行执行的特殊处理 (第 451-516 行):
prepareToolCall 阶段顺序执行(因为 beforeToolCall hook 可能有副作用)
实际 execute 阶段并发执行
结果按原始 toolCall 顺序发射消息
4.5 Terminate 机制
1 2 3 4 5 6 function shouldTerminateToolBatch (finalizedCalls ) { return finalizedCalls.length > 0 && finalizedCalls.every (f => f.result .terminate === true ); }
设计意图:单个工具的 terminate: true 表示"我完成了",但只有所有工具都完成时才安全终止循环。
五、上下文管理 (Context Management)
5.1 理论:三层消息转换
Agent 内部使用扩展的 AgentMessage 类型,LLM 只理解标准的 Message 类型。需要分层转换:
1 2 3 4 5 6 7 AgentMessage[] ← 内部表示,含自定义消息(notification、custom 等) ↓ transformContext(可选的预处理) AgentMessage[] ← 裁剪/注入后 ↓ convertToLlm(类型映射) Message[] ← LLM 标准格式 (user/assistant/toolResult) ↓ 构建 Context 发送到 LLM Provider
5.2 源码:streamAssistantResponse 中的转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 async function streamAssistantResponse (context, config, signal, emit ) { let messages = context.messages ; if (config.transformContext ) { messages = await config.transformContext (messages, signal); } const llmMessages = await config.convertToLlm (messages); const llmContext : Context = { systemPrompt : context.systemPrompt , messages : llmMessages, tools : context.tools , }; const response = await streamFunction (config.model , llmContext, { ... }); }
5.3 上下文压缩 (Compaction)
源码位置: packages/agent/src/harness/compaction/compaction.ts
为什么需要压缩? LLM 有上下文窗口限制。当对话变得太长时,需要将旧消息"压缩"为摘要。
压缩流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1. prepareCompaction(branchEntries, settings) ├─ 计算当前总 token 数 ├─ 从后往前保留 keepRecentTokens 的消息 ├─ 确定分割点 firstKeptEntryId ├─ 提取文件操作记录(read/write/edit) └─ 返回 CompactionPreparation 2. compact(preparation, model, apiKey) ├─ 序列化要压缩的消息为纯文本 ├─ 附加文件操作摘要 ├─ 调用 LLM 生成总结 └─ 返回 { summary, firstKeptEntryId, tokensBefore } 3. session.appendCompaction(...) └─ 写入 compaction entry 到 session tree
压缩后的上下文重建 (session.ts 第 21-76 行):
1 2 3 4 5 6 7 8 9 10 11 12 13 function buildSessionContext (pathEntries: SessionTreeEntry[] ): SessionContext { if (compaction) { messages.push (createCompactionSummaryMessage (compaction.summary )); for (entry after firstKeptEntry) appendMessage (entry); for (entry after compaction) appendMessage (entry); } else { for (entry of pathEntries) appendMessage (entry); } }
5.4 压缩配置
1 2 3 4 5 interface CompactionSettings { enabled : boolean ; reserveTokens : number ; keepRecentTokens : number ; }
六、状态模型 (State Model)
引用官方文档(agent-harness.md):
The harness separates state into four categories.
6.1 四类状态
(1) Harness Config — 运行时配置
最新的配置,由应用或扩展设置:
配置项
说明
model
当前使用的 LLM 模型
thinkingLevel
思考级别 (off/minimal/low/medium/high/xhigh)
tools
注册的工具列表
activeToolNames
当前激活的工具名
resources
Skills + Prompt Templates
streamOptions
请求选项 (transport/timeout/headers)
systemPrompt
系统提示词(或生成函数)
关键语义 :Getter 返回最新配置,不是正在执行的 turn 快照。Setter 立即更新,影响下一轮 turn,不影响正在进行的 LLM 请求。
(2) Turn Snapshot — 轮次快照
每轮 LLM 调用前创建的不可变快照:
1 2 3 4 5 6 7 8 private async createTurnState ( ) { const context = await this .session .buildContext (); const tools = [...this .tools .values ()]; const activeTools = this .activeToolNames .map (...); const systemPrompt = await this .systemPrompt ?.({...}); return { messages, resources, streamOptions, sessionId, systemPrompt, model, tools, activeTools }; }
(3) Session — 持久化状态
只包含已持久化的 entries。Session 读取不包含排队中的写入。
(4) Pending Session Writes — 待写入队列
在操作进行中(turn/compaction 等)接受的写入请求会排队,在 save point 和 settlement 时刷新:
1 2 3 4 5 6 7 8 9 10 private async flushPendingSessionWrites ( ) { while (this .pendingSessionWrites .length > 0 ) { const write = this .pendingSessionWrites [0 ]!; if (write.type === "message" ) await this .session .appendMessage (write.message ); else if (write.type === "model_change" ) await this .session .appendModelChange (...); this .pendingSessionWrites .shift (); } }
6.2 操作阶段 (Phase)
1 type AgentHarnessPhase = "idle" | "turn" | "compaction" | "branch_summary" | "retry" ;
状态机规则 :
1 2 3 4 5 结构性操作要求 phase === "idle": - prompt / skill / promptFromTemplate / compact / navigateTree 运行中允许的操作: - steer / followUp / nextTurn / abort / 配置 setters
非 idle 时发起结构性操作会抛出 AgentHarnessError code "busy"。
6.3 Save Point(保存点)
引用官方文档:
A save point occurs after an assistant turn and its tool-result messages have completed.
保存点时 Harness 做三件事:
刷新 pending session writes
创建新的 turn snapshot
将新的 context/model/thinkingLevel 应用到下一轮
设计意图 :在 turn 执行期间的配置变更(如切换模型)不会干扰当前请求,只在保存点生效。
七、会话持久化 (Session & Memory)
源码位置: packages/agent/src/harness/session/session.ts
7.1 理论:为什么用树形结构
线性对话 = 只能前进。树形对话 = 可以分支、回溯、尝试不同路径。
1 2 3 4 5 6 7 root / \ msg1 msg1' ← 用户尝试了不同的 prompt | | msg2 msg2' | msg3 ← leafId 指向当前活跃分支末端
这让 Agent 支持:
分支尝试 :从某个节点开始走不同路径
回溯 :回到之前的某个点继续
会话摘要 :切换分支时自动总结旧分支内容
7.2 Session Entry 类型
1 2 3 4 5 6 7 8 9 10 11 type SessionTreeEntry = | MessageEntry | ModelChangeEntry | ThinkingLevelChangeEntry | CompactionEntry | BranchSummaryEntry | CustomEntry | CustomMessageEntry | LabelEntry | SessionInfoEntry | LeafEntry ;
每个 entry 的通用字段:
1 2 3 4 5 6 interface SessionTreeEntryBase { type : string ; id : string ; parentId : string | null ; timestamp : string ; }
7.3 SessionStorage 接口
1 2 3 4 5 6 7 8 9 10 11 interface SessionStorage <TMetadata > { getMetadata (): Promise <TMetadata >; getLeafId (): Promise <string | null >; setLeafId (leafId): Promise <void >; createEntryId (): Promise <string >; appendEntry (entry): Promise <void >; getEntry (id): Promise <SessionTreeEntry >; getPathToRoot (leafId): Promise <SessionTreeEntry []>; getEntries (): Promise <SessionTreeEntry []>; }
实现 :
JsonlStorage — JSONL 文件持久化(生产用)
MemoryStorage — 内存存储(测试用)
7.4 关键操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 async appendMessage (message : AgentMessage ): Promise <string > { return this .appendTypedEntry ({ type : "message" , id : await this .storage .createEntryId (), parentId : await this .storage .getLeafId (), timestamp : new Date ().toISOString (), message, }); } async appendCompaction (summary, firstKeptEntryId, tokensBefore, details ) { ... }async moveTo (entryId, summary? ) { await this .storage .setLeafId (entryId); if (summary) { return this .appendTypedEntry ({ type : "branch_summary" , ... }); } }
7.5 上下文重建
从树中重建线性上下文的算法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 function buildSessionContext (pathEntries: SessionTreeEntry[] ): SessionContext { for (entry of pathEntries) { if (entry.type === "thinking_level_change" ) thinkingLevel = entry.thinkingLevel ; if (entry.type === "model_change" ) model = { provider, modelId }; if (entry.type === "compaction" ) compaction = entry; } if (compaction) { messages.push (summaryMessage); } else { } return { messages, thinkingLevel, model }; }
7.6 LeafId 的持久化
引用官方文档:
Session storage implementations must persist leaf changes as leaf entries. setLeafId() is not an in-memory-only cursor update; it appends a durable entry.
这保证了重新打开 session 时能恢复到正确的分支位置。
八、执行日志与可观测性 (Observability)
设计文档: packages/agent/docs/observability.md
8.1 设计理念
引用官方文档:
Pi should emit stable, structured lifecycle events. External listeners can convert those events into OTel spans, Sentry spans, logs, metrics, or custom telemetry.
Pi 的核心原则:只发事件,不绑定具体 APM 框架 。
8.2 事件体系
底层 Agent 事件
1 2 3 4 5 6 7 8 9 10 11 12 type AgentEvent = | { type : "agent_start" } | { type : "agent_end" ; messages : AgentMessage [] } | { type : "turn_start" } | { type : "turn_end" ; message : AgentMessage ; toolResults : ToolResultMessage [] } | { type : "message_start" ; message : AgentMessage } | { type : "message_update" ; message : AgentMessage ; assistantMessageEvent } | { type : "message_end" ; message : AgentMessage } | { type : "tool_execution_start" ; toolCallId; toolName; args } | { type : "tool_execution_update" ; toolCallId; toolName; partialResult } | { type : "tool_execution_end" ; toolCallId; toolName; result; isError };
Harness 层扩展事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type AgentHarnessOwnEvent = | QueueUpdateEvent | SavePointEvent | AbortEvent | SettledEvent | BeforeAgentStartEvent | ContextEvent | BeforeProviderRequestEvent | BeforeProviderPayloadEvent | AfterProviderResponseEvent | ToolCallEvent | ToolResultEvent | SessionBeforeCompactEvent | SessionCompactEvent | SessionBeforeTreeEvent | SessionTreeEvent | ModelSelectEvent | ThinkingLevelSelectEvent | ResourcesUpdateEvent ;
8.3 Trace/Span 模型
1 2 3 4 5 6 7 8 9 10 11 12 interface PiObservabilityEvent { type : "start" | "end" | "error" | "event" ; name : string ; traceId : string ; spanId?: string ; parentSpanId?: string ; timestamp : number ; durationMs?: number ; context?: Record <string , unknown >; payload?: Record <string , unknown >; }
示例 Trace 树:
1 2 3 4 5 traceId=t1 spanId=s1 parent=- name=pi.agent.prompt traceId=t1 spanId=s2 parent=s1 name=pi.agent.turn traceId=t1 spanId=s3 parent=s2 name=pi.ai.provider.request traceId=t1 spanId=s4 parent=s2 name=pi.agent.tool_call traceId=t1 spanId=s5 parent=s4 name=pi.session.append_entry
8.4 安全与脱敏
引用官方文档的分类:
安全(默认发送)
不安全(默认不发送)
provider/model/api 名
prompts/completions
session id
tool args/results
entry type
shell output
tool name
file contents
status code
request payloads
stop reason
API keys/headers
token counts/costs/durations
8.5 订阅模式
1 2 3 4 5 6 7 8 agent.subscribe ((event, signal ) => { }); harness.subscribe ((event, signal ) => { ... }); harness.on ("tool_call" , (event ) => { return { block : true }; });
九、失败重试 (Retry & Recovery)
9.1 两层重试机制
层级
位置
处理对象
Provider 层
packages/ai
网络超时、5xx、限流(内置指数退避)
应用层
packages/coding-agent/agent-session.ts
LLM 过载、服务不可用
9.2 应用层重试配置
1 2 3 4 5 6 interface RetrySettings { enabled?: boolean ; maxRetries?: number ; baseDelayMs?: number ; provider?: ProviderRetrySettings ; }
9.3 重试流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Agent Run 结束 ↓ 检查最后一条 assistant 消息的 stopReason ↓ stopReason === "error" ? ├─ 是:检查 errorMessage 是否可重试 │ ├─ 服务器过载/限流/网络错误 → 可重试 │ └─ 上下文溢出 → 不可重试(需要 compaction) │ ↓ 可重试 && retryAttempt < maxRetries 执行重试: 1. retryAttempt++ 2. delay = baseDelayMs × 2^(attempt-1) // 2s → 4s → 8s 3. 从 state 中移除错误消息 4. 发射 "auto_retry_start" 事件 5. await sleep(delay) // 可被 abort 中断 6. agent.continue() // 从当前上下文续接(不是重新 prompt)
9.4 上下文溢出 ≠ 重试
上下文溢出(context overflow)需要压缩 而非重试。重试只会反复失败。正确的处理路径:
1 2 3 4 5 6 7 检测到 context overflow ↓ 不走重试 触发 compaction ↓ 压缩旧消息为摘要 ↓ 用压缩后的上下文重新调用 LLM
9.5 中止 (Abort)
引用官方文档:
Abort does not clear nextTurn messages. Messages queued with nextTurn() survive abort.
Abort does not discard pending session writes.
1 2 3 4 5 6 7 8 9 10 11 12 13 async abort (): Promise <AbortResult > { this .steerQueue = []; this .followUpQueue = []; this .runAbortController ?.abort (); await this .waitForIdle (); await this .emitOwn ({ type : "abort" , clearedSteer, clearedFollowUp }); return { clearedSteer, clearedFollowUp }; }
十、Hook 系统 (Hooks & Extensions)
设计文档: packages/agent/docs/hooks.md
10.1 理论:Hook 的核心模型
Pi 的 Hook 系统基于一个简洁的设计:事件自带结果类型 。
1 2 3 4 5 6 7 8 9 10 declare const HookResult : unique symbol ;interface HookEvent <TType extends string , TResult = void > { type : TType ; readonly [HookResult ]?: TResult ; } type ResultOf <E> = E extends { readonly [HookResult ]?: infer R } ? R : void ;
两种角色 :
observe(handler) — 观察者:看所有事件,只读,返回值被忽略
on(type, handler) — 参与者:参与特定事件的语义,可返回结果
10.2 Hook 接口
1 2 3 4 5 6 7 8 9 10 11 12 interface AgentHarnessHooks <E, Ctx > { context : Ctx ; setContext (ctx : Ctx ): void ; observe (handler): () => void ; on (type , handler): () => void ; emit (event, signal?): Promise <ResultOf <E>>; addCleanup (cleanup): () => void ; clear (): Promise <void >; dispose (): Promise <void >; }
10.3 不同事件的归约语义
事件类型
归约方式
说明
context
链式变换
每个 handler 看到前一个的修改结果
before_provider_request
顺序 patch
每个 handler 可 patch streamOptions
before_provider_payload
顺序变换
每个 handler 可修改 payload
before_agent_start
收集合并
收集注入消息 + 链式修改 systemPrompt
tool_call
早退阻塞
任一 handler 返回 { block: true } 则停止
tool_result
顺序 patch
每个 handler 看到前一个 patch 后的结果
session_before_compact/tree
早退取消
任一 handler 返回 { cancel: true } 则取消
其他(message_end 等)
纯观察
返回值忽略
10.4 Harness 与 Hook 的交互
Harness 只做一件事:
1 2 3 const result = await this .hooks .emit ({ type : "context" , messages }, signal);return result?.messages ?? messages;
Harness 不 存储 handler、不做链式处理、不知道扩展策略。这些都由 Hooks 实现负责。
10.5 当前实现(简化版)
目前 AgentHarness 使用 Map<string, Set<Handler>> 存储 handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private handlers = new Map <string , Set <AgentHarnessHandler >>();on (type , handler ) { let handlers = this .handlers .get (type ); if (!handlers) { handlers = new Set (); this .handlers .set (type , handlers); } handlers.add (handler); return () => handlers.delete (handler); } private async emitHook (event ) { const handlers = this .getHandlers (event.type ); for (const handler of handlers) { const result = await handler (event); } }
十一、持久化恢复设计 (Durable Harness)
设计文档: packages/agent/docs/durable-harness.md
11.1 半持久化的务实选择
引用官方文档:
A fully durable AgentHarness is not realistic by itself because important dependencies are runtime JS supplied by the host app.
完全持久化不可行的原因:工具实现、模型认证、扩展、Hook handler 都是运行时 JS 代码,无法序列化。
实际目标 :半持久化(semi-durable)
Session 是持久的 append-only 状态树
Harness 将自己拥有的状态写入 session entries
宿主应用负责在恢复时重建不可持久化的依赖
恢复从持久边界重启,不从 in-flight 的 LLM 流中恢复
11.2 恢复流程
1 2 3 4 5 6 7 8 9 10 11 12 启动恢复: 1. 宿主注册 tools/models/extensions/resources/auth/hooks 2. Harness 打开 session 3. 从 session entries 归约出: - 当前 leaf - 对话分支 - harness 配置 - 队列 - pending writes - 活跃操作/turn/tool 状态 4. 验证必需的运行时依赖 5. 协调未完成的操作状态
11.3 恢复策略
场景
保守策略
未完成的 agent turn
标记中断,保留队列/pending writes,回到 idle
未完成的 provider 请求
标记中断,不自动重试
未完成的 tool call
追加错误结果,除非工具声明幂等可重试
未完成的 compaction
如果无 compaction entry 存在则重新运行
未完成的分支导航
补充缺失的 summary/leaf entries
11.4 关键不变式
引用官方文档的关键场景:
Crash after queue_enqueued: message is restored.
Consumed queue IDs must be recorded in turn_started before they are considered consumed.
Deterministic target entry IDs let recovery detect the entry already exists.
十二、测试与评测
12.1 测试结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 packages/agent/test/ ├── agent-loop.test.ts // 核心循环 ├── harness/ │ ├── agent-harness.test.ts // Harness 生命周期 │ ├── agent-harness-stream.test.ts // Provider/stream 行为 │ ├── compaction.test.ts // 压缩逻辑 │ └── session.test.ts // Session 树操作 packages/coding-agent/test/ ├── agent-session-retry.test.ts // 重试逻辑 ├── tools.test.ts // 工具执行 ├── compaction*.test.ts // 应用层压缩 ├── extensions-*.test.ts // 扩展系统 ├── print-mode.test.ts // 打印模式 ├── rpc*.test.ts // RPC 模式 └── suite/ // 集成测试套件 ├── harness.ts // 测试 harness 工具 └── regressions/ // 回归测试
12.2 Faux Provider(模拟 LLM)
测试使用 registerFauxProvider + fauxAssistantMessage 构建确定性的 LLM 响应:
1 2 3 4 5 6 7 registerFauxProvider ("test" , (context ) => { return fauxAssistantMessage ({ content : [{ type : "text" , text : "Hello" }], }); });
12.3 运行测试
1 2 3 4 5 6 7 8 9 10 ./test.sh npm run test :harness npm run coverage:harness cd packages/agentnode ../../node_modules/vitest/dist/cli.js --run test /agent-loop.test.ts
附录:源码文件索引
packages/agent/src/(核心运行时)
文件
行数
职责
agent-loop.ts
743
核心循环 :runLoop、streamAssistantResponse、executeToolCalls
agent.ts
557
Agent 类 :有状态包装器、队列、生命周期管理
types.ts
419
核心类型 :AgentMessage、AgentTool、AgentEvent、AgentLoopConfig
harness/agent-harness.ts
996
AgentHarness 类 :编排层,集成 session/compaction/hook
harness/types.ts
816
Harness 类型 :事件、hook 结果、error、session entry 定义
harness/session/session.ts
253
Session 类 :树形会话、上下文重建、分支导航
harness/compaction/compaction.ts
~200
压缩引擎 :prepareCompaction、compact
harness/compaction/utils.ts
-
压缩工具:序列化、文件操作提取
harness/messages.ts
-
消息转换:convertToLlm、自定义消息创建
harness/skills.ts
-
Skill 系统:格式化 skill 调用
harness/system-prompt.ts
-
系统提示词构建
packages/agent/docs/(设计文档)
文件
内容
agent-harness.md
AgentHarness 生命周期、状态模型、操作阶段、实现 TODO
durable-harness.md
半持久化设计:恢复模型、关键场景、最小可行实现
hooks.md
Hook 系统设计:核心模型、归约语义、扩展加载
observability.md
可观测性设计:事件模型、Trace/Span、安全脱敏
packages/coding-agent/src/core/(应用层)
文件/目录
职责
agent-session.ts
应用级协调(2500+ 行):retry、compaction trigger、model 管理
tools/
具体工具实现:bash、read、write、edit、grep、find、ls
extensions/
扩展系统:加载、运行、API
session-manager.ts
会话管理:创建、恢复、fork、列表
学习路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 第 1 步:理解类型 └─ 读 packages/agent/src/types.ts 重点:AgentMessage、AgentTool、AgentEvent、AgentLoopConfig 第 2 步:掌握核心循环 └─ 读 packages/agent/src/agent-loop.ts 的 runLoop 函数 重点:双层循环结构、steering/follow-up、工具执行 第 3 步:理解状态管理 └─ 读 packages/agent/src/agent.ts 的 Agent 类 重点:state 管理、队列、runWithLifecycle 第 4 步:学习持久化 └─ 读 packages/agent/src/harness/session/session.ts 重点:树形结构、buildSessionContext、moveTo 第 5 步:理解压缩 └─ 读 packages/agent/src/harness/compaction/compaction.ts 重点:prepareCompaction、compact 函数 第 6 步:掌握编排层 └─ 读 packages/agent/src/harness/agent-harness.ts 重点:createTurnState、executeTurn、save point、hooks 第 7 步:对照设计文档 └─ 读 packages/agent/docs/ 下的四个文件 理解设计意图、未来方向、权衡取舍 第 8 步:看应用层整合 └─ 读 packages/coding-agent/src/core/agent-session.ts(选读) 理解 retry 逻辑、UI 集成、具体工具实现