环境搭建

1. 下载pi-agent

1
git clone https://github.com/earendil-works/pi.git

2. understanding-anything方便代码理解

安装参考: https://github.com/Lum1104/Understand-Anything

1
2
3
PS D:\workspace\pi> cd C:\Users\zhiminding\.understand-anything-plugin\packages\dashboard
PS C:\Users\zhiminding\.understand-anything-plugin\packages\dashboard> $env:GRAPH_DIR="d:\workspace\pi"
PS C:\Users\zhiminding\.understand-anything-plugin\packages\dashboard> npx vite --host 127.0.0.1

3. codegraph阅读代码节省token

安装参考: https://github.com/colbymchenry/codegraph

1
2
codegraph init
codegraph sync

4. PI-agent服务启动

  1. 配置自定义模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// C:\Users\zhiminding\.pi\agent\models.json
{
"providers": {
"tencent-tokenhub": {
"baseUrl": "https://tokenhub.tencentmaas.com/v1",
"api": "openai-completions",
"apiKey": "sk-xxxx",
"compat": {
"supportsDeveloperRole": false,
"supportsReasoningEffort": false
},
"models": [
{
"id": "ep-889olgr0",
"name": "Kimi 2.6",
"reasoning": false,
"input": ["text"],
"contextWindow": 128000,
"maxTokens": 16384
},
{
"id": "ep-9kvyri9u",
"name": "Hunyuan 3 Preview",
"reasoning": false,
"input": ["text"],
"contextWindow": 128000,
"maxTokens": 16384
}
]
}
}
}
  1. 安装依赖(不执行 lifecycle scripts)
1
npm install --ignore-scripts
  1. 构建所有包
1
npm run build
  1. 从源码运行 pi(Windows 用 .bat 或 .ps1)
1
2
3
./pi-test.sh        # Linux/macOS
.\pi-test.bat # Windows CMD
.\pi-test.ps1 # Windows PowerShell
  1. 修改代码后,重新运行pi-test.sh等启动脚本

启动分析

main启动流程

首先从main(packages\coding-agent\src\main.ts)函数开始分析。main是整个代理CLI的入口

1. 环境准备

1.1 处理windwos环境的清理
1.2 包管理/配置命令 处理 pi install、pi config 等子命令,命中则直接返回
1.3 处理命令行参数parseArgs(args) 解析 CLI 参数
1.4 运行模式决定, 确定是 interactive(TUI)、print、json 还是 rpc 模式

2. SessionManager初始化

2.1 创建SettingsManager实例

加载全局+项目级设置,入口代码:

1
2
3
const cwd = process.cwd();
const agentDir = getAgentDir(); // 默认 ~/.pi/agent/
const startupSettingsManager = SettingsManager.create(cwd, agentDir);

相关目录:

目录/文件 路径(Windows) 说明
agentDir C:\Users\<user>\.pi\agent\ 全局代理配置根目录
settings.json ~/.pi/agent/settings.json 全局设置
auth.json ~/.pi/agent/auth.json 认证凭据
models.json ~/.pi/agent/models.json 自定义模型配置
themes/ ~/.pi/agent/themes/ 用户自定义主题
tools/ ~/.pi/agent/tools/ 自定义工具
prompts/ ~/.pi/agent/prompts/ 自定义 prompt 模板
bin/ ~/.pi/agent/bin/ 托管二进制(fd, rg)
sessions/ ~/.pi/agent/sessions/ 会话文件存储根目录
debug log ~/.pi/agent/pi-debug.log 调试日志

可通过环境变量 PI_CODING_AGENT_DIR 覆盖 agentDir 路径。

2.2 SessionDir 解析

sessionDir 决定会话文件存储位置,解析优先级(从高到低):

1
2
3
4
5
const sessionDir =
parsed.sessionDir // 1. CLI参数 --session-dir <path>
?? process.env[ENV_SESSION_DIR] // 2. 环境变量 PI_CODING_AGENT_SESSION_DIR
?? startupSettingsManager.getSessionDir(); // 3. settings.json 中配置
// 4. 若以上都无,默认为: ~/.pi/agent/sessions/--<encoded-cwd>--/

默认 sessionDir 计算规则getDefaultSessionDir()):

  • 取当前工作目录 cwd,将路径分隔符替换为 -
  • 拼接为: ~/.pi/agent/sessions/--<encoded-cwd>--/
  • 例如 cwd = D:\workspace\pi → sessionDir = ~/.pi/agent/sessions/--D-workspace-pi--/

2.3 createSessionManager 详细逻辑

根据不同 CLI 参数走不同分支:

1
async function createSessionManager(parsed, cwd, sessionDir, settingsManager)
CLI 参数 行为 静态方法
--no-session / --help / --list-models 内存模式,不持久化 SessionManager.inMemory(cwd)
--fork <session> 从源会话复制历史到当前项目创建新会话 SessionManager.forkFrom(sourcePath, cwd, sessionDir)
--session <id或path> 打开指定会话(本地直接打开;跨项目则询问是否fork) SessionManager.open(path, sessionDir)
--resume 弹出会话选择器让用户选择 SessionManager.open(selectedPath, sessionDir)
--continue 自动续接最近一次会话(按mtime排序) SessionManager.continueRecent(cwd, sessionDir)
--session-id <id> 打开指定ID的已有会话,或以该ID创建新会话 SessionManager.open()SessionManager.create()
(无参数) 创建全新会话 SessionManager.create(cwd, sessionDir)

–fork 额外校验:

  • 不能与 --session--continue--resume--no-session 同时使用
  • 若指定了 --session-id,检查目标ID不能已存在

–session 的解析流程 (resolveSessionPath()):

  1. 若参数含 /\ 或以 .jsonl 结尾 → 视为文件路径直接使用
  2. 否则先在当前项目 sessionDir 中匹配(精确匹配或前缀匹配)
  3. 若本地未找到,全局搜索所有项目的会话
  4. 全局找到时标记为 global,需要用户确认是否 fork 到当前项目

2.4 会话文件格式

会话以 .jsonl 文件存储(每行一个 JSON 对象),文件名格式:

1
2
<timestamp>_<session-id>.jsonl
例如: 2026-06-05T10-20-30-000Z_019704ab-1234-7890-abcd-ef1234567890.jsonl

文件结构(首行为 Header,后续为 Entry):

1
2
3
4
5
6
{"type":"session","version":3,"id":"...","timestamp":"...","cwd":"D:/workspace/pi","parentSession":"..."}
{"type":"message","id":"a1b2c3d4","parentId":null,"timestamp":"...","message":{...}}
{"type":"message","id":"e5f6g7h8","parentId":"a1b2c3d4","timestamp":"...","message":{...}}
{"type":"thinking_level_change","id":"...","parentId":"...","timestamp":"...","thinkingLevel":"high"}
{"type":"model_change","id":"...","parentId":"...","timestamp":"...","provider":"anthropic","modelId":"claude-opus-4-7"}
{"type":"compaction","id":"...","parentId":"...","timestamp":"...","summary":"...","firstKeptEntryId":"...","tokensBefore":50000}

Entry 类型:

type 说明
message 用户/助手/系统消息
thinking_level_change thinking 级别变更记录
model_change 模型切换记录
compaction 上下文压缩摘要(超出 context window 时触发)
branch_summary 分支摘要(从旧分支切到新分支时记录上下文)
custom 扩展自定义数据(不参与 LLM 上下文)
custom_message 扩展自定义消息(参与 LLM 上下文)
label 用户书签/标记
session_info 会话元数据(如用户定义的显示名称)

树形结构: 每个 Entry 有 idparentId,形成 append-only 树。支持分支对话(branch),不会删除/修改历史。

2.5 MissingSessionCwd 处理

当恢复一个旧会话,但会话记录的 cwd 目录已不存在时:

1
const missingSessionCwdIssue = getMissingSessionCwdIssue(sessionManager, cwd);
  • interactive 模式:弹出选择器,让用户选择是否在当前 cwd 继续
  • 其他模式:直接报错退出
1
2
// 用户选择继续后,以当前 cwd 覆盖会话的 cwd
sessionManager = SessionManager.open(missingSessionCwdIssue.sessionFile!, sessionDir, selectedCwd);

3.6 SessionManager作用

最终会使用newSession()创建一个jsonl文件,并通过sessionManager对这个session文件进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class SessionManager {
// ===== 会话标识 =====
private sessionId: string = "";
// 会话的唯一ID(uuidv7),用于文件命名和会话查找
// 格式如: "019704ab-1234-7890-abcd-ef1234567890"

private sessionFile: string | undefined;
// 会话文件的绝对路径,如: ~/.pi/agent/sessions/--D-workspace-pi--/2026-06-05T10-20-30_<id>.jsonl
// undefined 表示内存模式(不持久化)

private sessionDir: string;
// 会话文件存储目录,如: ~/.pi/agent/sessions/--D-workspace-pi--/
// /new 和 /fork 时在此目录下创建新文件

// ===== 工作上下文 =====
private cwd: string;
// 会话绑定的工作目录,记录在 session header 中
// 决定了 Agent 操作文件的根路径,也用于 /resume 时恢复项目上下文

// ===== 持久化控制 =====
private persist: boolean;
// true = 正常模式,追加写入 .jsonl 文件
// false = 内存模式(--no-session / --help / --list-models 时使用)

private flushed: boolean = false;
// 延迟写入标记。新会话的写入策略:
// - 用户发消息时不立即创建文件
// - 直到收到第一条 assistant 回复后,才把所有 entries 一次性写入文件
// - 目的:避免用户输入后没等到回复就退出时,产生空会话文件
// true = 文件已创建/已写入,后续用 appendFileSync 追加

// ===== 会话数据(内存中的完整状态) =====
private fileEntries: FileEntry[] = [];
// 包含 session header + 所有 entries 的完整列表(与 .jsonl 文件内容一一对应)
// 首元素是 SessionHeader,后续是各种 SessionEntry

private byId: Map<string, SessionEntry> = new Map();
// Entry ID → Entry 的索引,用于 O(1) 快速查找
// 支撑 getEntry()、getBranch()、getChildren() 等树遍历操作

private labelsById: Map<string, string> = new Map();
// Entry ID → 用户定义的标签名,用于书签/导航功能
// 通过 /label 命令设置,支持跳转到标记的对话位置

private labelTimestampsById: Map<string, string> = new Map();
// Entry ID → 标签设置时间,用于排序和显示

// ===== 树形结构指针 =====
private leafId: string | null = null;
// 当前对话的"游标"——指向树的叶节点
// 所有 appendXxx() 操作都以 leafId 为 parentId 创建新节点,然后更新 leafId
// /fork 时将 leafId 回退到某个历史节点,下次 append 就形成新分支
// null 表示空会话(还没有任何 entry)
}

核心工作机制

  • fileEntries 是完整数据,byId 是索引,leafId 是游标
  • 写入:appendMessage() → 创建 entry(parentId=leafId) → 更新 leafId → 写文件
  • 分支:branch(entryId) → 仅修改 leafId 指向历史节点 → 下次 append 自然形成分支
  • 读取:buildSessionContext() → 从 leafId 沿 parentId 链回溯到根 → 收集路径上的消息

3. 创建 AgentSession Runtime

整体调用链路:

1
2
3
4
5
6
7
8
9
10
11
main()
└── createAgentSessionRuntime(createRuntime, options) // 顶层入口
└── createRuntime(options) // 工厂函数(闭包)
├── createAgentSessionServices(options) // 步骤1: 创建 cwd (当前nodejs运行目录)绑定的服务
├── resolveModelScope(patterns, modelRegistry) // 步骤2: 解析模型范围
├── buildSessionOptions(parsed, scopedModels, …) // 步骤3: 构建会话选项
└── createAgentSessionFromServices(options) // 步骤4: 创建 AgentSession
└── createAgentSession(options) // SDK 核心函数
├── findInitialModel(…) // 选择模型
├── new Agent(…) // 创建 Agent 实例
└── new AgentSession(…) // 创建最终会话

3.1 createAgentSessionServices — 创建 cwd 绑定的运行时服务

创建"基础设施"(不涉及具体会话)
源文件packages/coding-agent/src/core/agent-session-services.ts

1
2
3
export async function createAgentSessionServices(
options: CreateAgentSessionServicesOptions
): Promise<AgentSessionServices>

入参 CreateAgentSessionServicesOptions

参数 类型 默认值 说明
cwd string (必填) 当前工作目录
agentDir string ~/.pi/agent/ 全局配置目录
authStorage AuthStorage AuthStorage.create(agentDir/auth.json) 认证凭据管理
settingsManager SettingsManager SettingsManager.create(cwd, agentDir) 设置管理器
modelRegistry ModelRegistry ModelRegistry.create(authStorage, agentDir/models.json) 模型注册表
extensionFlagValues Map<string, boolean|string> undefined CLI 传入的扩展 flag(--flagName
resourceLoaderOptions object undefined 资源加载器选项(见下表)

resourceLoaderOptions 子选项(来自 main.ts 闭包):

选项 来源(CLI参数) 说明
additionalExtensionPaths --extension <path> 额外扩展路径
additionalSkillPaths --skill <path> 额外技能路径
additionalPromptTemplatePaths --prompt-template <path> 额外 prompt 模板路径
additionalThemePaths --theme <path> 额外主题路径
noExtensions --no-extensions 禁用所有扩展
noSkills --no-skills 禁用所有技能
noPromptTemplates --no-prompt-templates 禁用 prompt 模板
noThemes --no-themes 禁用主题
noContextFiles --no-context-files 禁用 AGENTS.md 等上下文文件
systemPrompt --system-prompt <text/path> 覆盖系统提示词
appendSystemPrompt --append-system-prompt <text/path> 追加系统提示词
extensionFactories 代码注入 编程式注入的扩展工厂

内部执行流程:

  1. 解析 cwdagentDir 为绝对路径
  2. 创建 AuthStorage(管理 API Key 和 OAuth token)
  3. 创建 SettingsManager(加载 settings.json)
  4. 创建 ModelRegistry(加载内置模型 + models.json 自定义模型)
  5. 创建 DefaultResourceLoader 并调用 reload()
    • 加载扩展(extensions)
    • 加载技能(skills)
    • 加载 prompt 模板(用于命令使用,如/review/pr
    • 加载主题
    • 加载 AGENTS.md 上下文文件
  6. 处理扩展注册的 Provider(pendingProviderRegistrations
  7. 应用扩展 flag values

返回 AgentSessionServices

1
2
3
4
5
6
7
8
9
interface AgentSessionServices {
cwd: string; // 解析后的绝对路径
agentDir: string; // 全局配置目录
authStorage: AuthStorage; // 认证管理
settingsManager: SettingsManager; // 设置管理
modelRegistry: ModelRegistry; // 模型注册表
resourceLoader: ResourceLoader; // 资源加载器
diagnostics: AgentSessionRuntimeDiagnostic[]; // 诊断信息
}

当用户 /resume 会话替换,切换到另一个项目的会话时,cwd 变了,所有服务要重建(不同项目有不同的扩展、设置、AGENTS.md),所以暴露一个创建service的方法。这个service只服务环境,而不是session级别(createAgentSession)的


3.2 ResourceLoader — 资源加载器

源文件packages/coding-agent/src/core/resource-loader.ts

DefaultResourceLoader 负责从多个来源加载所有可扩展资源:

加载搜索路径优先级(由高到低):

资源类型 搜索路径
Extensions CLI --extension > ~/.pi/agent/extensions/ > .pi/extensions/(项目级) > 包管理器安装的
Skills CLI --skill > 扩展提供的 > ~/.pi/agent/skills/ > .pi/skills/(项目级)
Prompt Templates CLI --prompt-template > 扩展提供的 > ~/.pi/agent/prompts/ > .pi/prompts/(项目级)
Themes CLI --theme > 扩展提供的 > ~/.pi/agent/themes/ > 内置主题
Context Files ~/.pi/agent/AGENTS.md > 项目祖先目录链中的 AGENTS.md/CLAUDE.md

系统提示词(System Prompt)管理:

  • --system-prompt:完全覆盖默认系统提示词(可以是文本或文件路径)
  • --append-system-prompt:追加到默认系统提示词后面
  • AGENTS.md 文件内容会被作为上下文注入到系统提示词中
  • 扩展也可以通过 hook 修改系统提示词

构造系统提示词流程:packages\coding-agent\src\core\system-prompt.ts

1
2
3
4
5
6
7
buildSystemPrompt()
├── 有 .pi/SYSTEM.md? → 用它替换默认模板
│ 没有? → 使用的默认模板(工具列表 + guidelines + 文档路径)
├── 追加 .pi/APPEND_SYSTEM.md 内容
├── 追加 <project_context>(AGENTS.md 内容)
├── 追加 Skills 信息
└── 追加日期 + cwd

3.3 ModelRegistry — 模型注册表

源文件packages/coding-agent/src/core/model-registry.ts

1
ModelRegistry.create(authStorage, modelsPath)

职责:

  1. 加载内置模型(models.generated.ts 中 4000+ 条模型定义)
  2. 加载 models.json 自定义模型(合并/覆盖内置模型)
  3. 处理扩展注册的 Provider
  4. 提供 API Key 解析(getApiKeyAndHeaders(model)
  5. 判断模型是否有有效认证(hasConfiguredAuth(model)
  6. 列出可用模型(getAvailable() — 仅返回有认证的模型)

API Key 解析优先级:

  1. AuthStorage.getRuntimeApiKey() — CLI --api-key 设置的临时 key
  2. auth.json 中对应 provider 的配置
  3. 环境变量(如 ANTHROPIC_API_KEYOPENAI_API_KEY
  4. models.json 中 provider 的 apiKey 字段
  5. OAuth token(通过 /login 获取)

3.4 resolveModelScope — 解析模型作用域

源文件packages/coding-agent/src/core/model-resolver.ts

1
2
3
4
export async function resolveModelScope(
patterns: string[],
modelRegistry: ModelRegistry
): Promise<ScopedModel[]>

调用时机:在 main.ts 的 createRuntime 闭包中:

1
2
3
4
const modelPatterns = parsed.models ?? settingsManager.getEnabledModels();
const scopedModels = modelPatterns?.length > 0
? await resolveModelScope(modelPatterns, modelRegistry)
: [];

patterns 来源

  • CLI --models pattern1,pattern2,...
  • 或 settings.json 中 enabledModels 配置

匹配规则:

  1. 支持 glob 模式(*sonnet*anthropic/*
  2. 支持精确匹配(claude-opus-4-7
  3. 支持 provider/modelId 格式
  4. 支持附加 thinking level(claude-opus-4-7:high
  5. 模糊匹配时优先 alias(无日期后缀)> 最新日期版本

返回 ScopedModel[]

1
2
3
4
interface ScopedModel {
model: Model<Api>;
thinkingLevel?: ThinkingLevel; // 模式中指定的 thinking level
}

用于 TUI 中 Ctrl+P 快速切换模型。


3.5 buildSessionOptions — 构建会话选项

源文件packages/coding-agent/src/main.ts 第 338-430 行

1
2
3
4
5
6
7
function buildSessionOptions(
parsed: Args,
scopedModels: ScopedModel[],
hasExistingSession: boolean,
modelRegistry: ModelRegistry,
settingsManager: SettingsManager,
): { options: CreateAgentSessionOptions; cliThinkingFromModel: boolean; diagnostics: ... }

模型选择优先级:

  1. CLI --model <pattern> — 调用 resolveCliModel() 精确/模糊匹配
  2. scopedModels 中与 settings 默认模型匹配的那个
  3. scopedModels 的第一个
  4. (留空,后续由 createAgentSession 内的 findInitialModel 兜底)

Thinking Level 选择优先级:

  1. CLI --thinking <level>(最高优先级)
  2. --model pattern:level 中解析出的 level
  3. scopedModel 上配置的 level
  4. (留空,后续由 settings 默认值兜底)

工具配置:

CLI 参数 效果
--no-tools noTools = "all",禁用所有工具
--no-builtin-tools noTools = "builtin",仅禁用内置工具
--tools read,bash 仅启用列出的工具

3.6 AuthStorage — 认证管理

源文件packages/coding-agent/src/core/auth-storage.ts

1
const authStorage = AuthStorage.create();  // 读取 ~/.pi/agent/auth.json

核心方法:

  • setRuntimeApiKey(provider, key) — 设置运行时临时 API Key(来自 --api-key
  • getApiKey(provider) — 获取 provider 的 API Key
  • setOAuthToken(provider, token) — 保存 OAuth token(/login 写入)
  • getOAuthToken(provider) — 获取 OAuth token

main.ts 中的使用:

1
2
3
4
5
6
7
if (parsed.apiKey) {
if (!sessionOptions.model) {
diagnostics.push({ type: "error", message: "--api-key requires --model" });
} else {
authStorage.setRuntimeApiKey(sessionOptions.model.provider, parsed.apiKey);
}
}

3.7 createAgentSessionFromServices — 从服务创建会话

桥接接口,避免开发者直接调用 createAgentSession。这个接口有6个字段,包装一下传参为services的调用。

源文件packages/coding-agent/src/core/agent-session-services.ts

1
2
3
export async function createAgentSessionFromServices(
options: CreateAgentSessionFromServicesOptions
): Promise<CreateAgentSessionResult>

入参 CreateAgentSessionFromServicesOptions

参数 说明
services 步骤 3.1 创建的服务集合
sessionManager 会话管理器
sessionStartEvent 会话启动事件(通知扩展)
model 选定的模型(可选,会内部兜底)
thinkingLevel thinking 级别(可选)
scopedModels Ctrl+P 可切换的模型列表
tools 工具白名单
noTools 工具禁用模式
customTools 自定义工具定义

本质上是将 services 展开后调用 createAgentSession()


3.8 createAgentSession — SDK 核心函数

源文件packages/coding-agent/src/core/sdk.ts

这是真正创建 AgentSession 的函数,内部执行:

1. 模型选择流程(若 options.model 未指定):

1
2
3
4
5
6
7
8
如果恢复已有会话 → 尝试从会话历史的 model_change entry 恢复模型
↓ 恢复失败
调用 findInitialModel():
1. settings.json 中的 defaultProvider + defaultModel
2. 遍历所有 provider 的默认模型(defaultModelPerProvider 表)
3. 找到第一个有有效 API Key 的模型
↓ 都没找到
model = undefined, 返回 "No models available" 警告

2. Thinking Level 确定:

1
2
CLI --thinking > 会话历史恢复 > settings 默认值 > DEFAULT_THINKING_LEVEL("medium")
最终 clampThinkingLevel(model, level) 确保不超出模型能力

3. 创建 Agent 实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
agent = new Agent({
initialState: { systemPrompt: "", model, thinkingLevel, tools: [] },
convertToLlm: convertToLlmWithBlockImages, // 消息格式转换
streamFn: async (model, context, options) => {
// 获取 API Key → 调用 streamSimple() 流式请求 LLM
const auth = await modelRegistry.getApiKeyAndHeaders(model);
return streamSimple(model, context, { apiKey: auth.apiKey, ... });
},
onPayload: ..., // 扩展 hook: before_provider_request
onResponse: ..., // 扩展 hook: after_provider_response
transformContext: ..., // 扩展 hook: 上下文变换
sessionId: sessionManager.getSessionId(),
steeringMode: settingsManager.getSteeringMode(),
followUpMode: settingsManager.getFollowUpMode(),
transport: settingsManager.getTransport(),
thinkingBudgets: settingsManager.getThinkingBudgets(),
});

Agent 实例的作用(来自 @earendil-works/pi-agent-core 包):

Agent 是底层执行引擎,不关心 UI、文件持久化、扩展——只负责"对话循环":

  • state:持有当前 systemPrompt、model、thinkingLevel、tools、messages(对话历史)
  • prompt(text):接收用户输入 → 调用 LLM → 执行工具调用 → 循环直到 LLM 停止
  • streamFn:实际调 LLM 的函数(通过 modelRegistry 获取 API Key 后调用 streamSimple)
  • steer(msg) / followUp(msg):在 agent 运行中注入消息(steering = 当前轮结束后立即注入;followUp = agent 完全停止后才注入)
  • abort():中止当前运行
  • subscribe(listener):监听生命周期事件(agent_start、message_start、tool_call、agent_end 等)
  • convertToLlm:将内部消息格式转为 LLM API 消息格式
  • transformContext:发送前让扩展对上下文做最后变换

简单说:Agent = 一个无状态循环(用户消息 → LLM → 工具 → LLM → … → 停止),所有"记住什么、存在哪"都不是它管的。

4. 恢复/初始化会话消息:

1
2
3
4
5
6
if (hasExistingSession) {
agent.state.messages = existingSession.messages; // 恢复历史
} else {
sessionManager.appendModelChange(model.provider, model.id); // 新会话记录初始模型
sessionManager.appendThinkingLevelChange(thinkingLevel); // 记录初始 thinking
}

5. 创建 AgentSession:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const session = new AgentSession({
agent,
sessionManager,
settingsManager,
cwd,
scopedModels: options.scopedModels,
resourceLoader,
customTools: options.customTools,
modelRegistry,
initialActiveToolNames, // 默认 ["read", "bash", "edit", "write"]
allowedToolNames,
extensionRunnerRef,
sessionStartEvent,
});

6. 返回结果:

1
2
3
4
5
interface CreateAgentSessionResult {
session: AgentSession; // 核心会话对象
extensionsResult: LoadExtensionsResult; // 扩展加载结果
modelFallbackMessage?: string; // 模型回退警告信息
}

AgentSession 的作用packages/coding-agent/src/core/agent-session.ts):

AgentSession 是高层业务协调者,在 Agent 之上叠加了所有"编码代理"需要的能力:

职责 说明
会话持久化 监听 Agent 事件,自动将消息写入 SessionManager(.jsonl 文件)
System Prompt 构建 组装工具描述 + guidelines + AGENTS.md + skills → 设置 agent.state.systemPrompt
工具管理 注册内置工具 + 扩展工具 + 自定义工具,管理启用/禁用状态
模型切换 /model 命令切换模型、记录 model_change entry、更新 agent.state.model
Thinking Level 切换 thinking 级别、clamp 到模型能力范围、记录 entry
上下文压缩 监控 token 用量,超阈值时触发 compaction(摘要压缩历史消息)
Prompt 展开 用户输入 → 展开 prompt template → 展开 skill block → 交给 Agent
扩展生命周期 创建 ExtensionRunner、分发事件(session_start、turn_start、message_end 等)
Bash 执行 提供 executeBash() 方法,包装 bash 工具的超时、权限等逻辑
分支/导航 暴露 branch、label 等操作给 UI 层
导出 exportToHtml() 将会话导出为 HTML

Agent vs AgentSession 的关系

1
2
3
AgentSession(高层:业务逻辑 + 持久化 + 扩展)
└── Agent(底层:纯对话循环引擎)
└── streamFn(最底层:HTTP 请求 LLM)
  • Agent 不知道文件系统、不知道扩展、不知道 session 文件
  • AgentSession 监听 Agent 事件,把每条消息同步写入 .jsonl,管理工具注册,驱动扩展 hook
  • 所有运行模式(interactive / print / rpc)共享同一个 AgentSession,只是上面的 UI 层不同

3.9 createAgentSessionRuntime — 顶层包装

源文件packages/coding-agent/src/core/agent-session-runtime.ts

1
2
3
4
export async function createAgentSessionRuntime(
createRuntime: CreateAgentSessionRuntimeFactory,
options: { cwd, agentDir, sessionManager, sessionStartEvent? }
): Promise<AgentSessionRuntime>

执行流程:

  1. assertSessionCwdExists() — 确保会话的 cwd 存在
  2. 调用 createRuntime(options) — 执行工厂函数(3.1~3.8 全部流程)
  3. 将结果包装为 AgentSessionRuntime 实例

AgentSessionRuntime 类:管理当前会话的生命周期,提供会话替换能力:

方法 触发场景 说明
switchSession(path) /resume 选择另一个会话 teardown 当前 → 创建新 runtime
newSession() /new 创建新会话 teardown 当前 → 创建新 runtime
fork(entryId) /fork 从某个 entry 分支 创建分支 → teardown → 新 runtime
importFromJsonl(path) /import 导入外部会话 复制文件 → open → 新 runtime
dispose() 退出程序 发送 session_shutdown 事件 → 释放资源

会话替换的统一模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async switchSession(path) {
// 1. 通知扩展 session_before_switch(可取消)
const beforeResult = await this.emitBeforeSwitch("resume", path);
if (beforeResult.cancelled) return;

// 2. Teardown 当前会话
await this.teardownCurrent("resume", newSessionFile);
// → 发送 session_shutdown 事件
// → 调用 beforeSessionInvalidate 回调(UI 解绑)
// → session.dispose()

// 3. 创建新 runtime(调用 createRuntime 工厂函数)
this.apply(await this.createRuntime({ cwd, agentDir, sessionManager, sessionStartEvent }));

// 4. 通知外部绑定新 session
await this.finishSessionReplacement();
// → rebindSession 回调(UI 重绑)
// → withSession 回调(调用者自定义逻辑)
}

3.10 默认内置工具

1
const defaultActiveToolNames: ToolName[] = ["read", "bash", "edit", "write"];
工具名 工厂函数 说明
read createReadTool(cwd) 读取文件内容
bash createBashTool(cwd) 执行 shell 命令
edit createEditTool(cwd) 编辑文件(search/replace)
write createWriteTool(cwd) 写入文件
grep createGrepTool(cwd) 搜索文件内容(需要格式化输出而非通用bash时,需手动启用)
find createFindTool(cwd) 查找文件(需要格式化输出而非通用bash时,需手动启用)
ls createLsTool(cwd) 列出目录(需要格式化输出而非通用bash时,需手动启用)

工具访问通过 withFileMutationQueue() 包装,确保文件操作的顺序一致性。


3.11 整体数据流总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────────────────────────┐
│ AgentSessionRuntime │
│ ├── AgentSession │
│ │ ├── Agent (来自 pi-agent-core) │
│ │ │ ├── state: { systemPrompt, model, thinkingLevel, tools, messages }
│ │ │ ├── streamFn → modelRegistry.getApiKeyAndHeaders → streamSimple (LLM调用)
│ │ │ └── convertToLlm → 消息格式转换 │
│ │ ├── SessionManager (会话持久化) │
│ │ ├── ExtensionRunner (扩展事件分发) │
│ │ ├── ModelRegistry (模型查找/认证) │
│ │ └── SettingsManager (读取配置) │
│ ├── AgentSessionServices (cwd绑定的服务集合) │
│ └── createRuntime (工厂函数,供会话替换时重建) │
└─────────────────────────────────────────────────────────────────┘

4. 读取输入 & 准备初始消息

对应 main.ts 第 687-704 行:

4.1 readPipedStdin — 读取管道输入

1
async function readPipedStdin(): Promise<string | undefined>
  • 如果 stdin 是 TTY(交互终端)→ 返回 undefined,不读取
  • 如果 stdin 是管道(如 echo "fix bug" | pi)→ 读取全部内容返回
  • 副作用:如果读到了管道内容,appMode 从 interactive 降级为 print(执行一次就退出)

4.2 prepareInitialMessage — 准备首条消息

1
2
async function prepareInitialMessage(parsed, autoResizeImages, stdinContent?)
→ { initialMessage?: string, initialImages?: ImageContent[] }

将各种输入源合并为一条初始消息:

输入源 示例 说明
-p "message" pi -p "fix the bug" CLI 直接传入的 prompt 文本
@file 参数 pi @image.png @code.ts 文件内容/图片,通过 processFileArguments 处理
stdin 管道 cat error.log | pi readPipedStdin 读到的内容

这些内容通过 buildInitialMessage() 拼接为最终的 initialMessage 字符串 + initialImages 图片数组。

4.3 initTheme — 初始化主题

加载主题配色方案(interactive 模式下还会启动主题文件 watcher,支持热更新)。


5. 启动运行模式

对应 main.ts 第 729-773 行,根据 appMode 进入不同的运行循环:

5.1 RPC 模式(appMode === "rpc"

1
await runRpcMode(runtime);  // 永不返回(process.exit 退出)
  • 接管 stdout 为 JSON-RPC 输出通道
  • 从 stdin 逐行读取 JSON-RPC 请求,分发到 session 执行
  • 用于外部程序(IDE 插件、自动化脚本)集成调用 pi
  • 支持的命令:prompt、abort、getState、switchModel、newSession 等

5.2 Interactive 模式(appMode === "interactive")— 最常用

1
2
3
4
5
6
7
8
9
const interactiveMode = new InteractiveMode(runtime, {
migratedProviders, // 迁移过的 provider(显示提示)
modelFallbackMessage, // 模型回退警告
initialMessage, // 首条消息(如果有)
initialImages, // 首条消息的图片
initialMessages: parsed.messages, // -p 传入的多条消息
verbose: parsed.verbose,
});
await interactiveMode.run(); // 永不返回(Ctrl+C 退出)核心流程,循环等待用户输入

InteractiveMode.run() 内部流程:

  1. init() — 初始化 TUI:

    • 注册信号处理器(Ctrl+C 等)
    • 下载 fdrg 工具(如果缺失)
    • 显示启动信息(版本、模型范围、快捷键提示)
    • 创建 TUI 组件树(header、editor、message 列表等)
    • 绑定 AgentSession 事件到 UI 组件
  2. 异步检查(不阻塞启动):

    • 版本更新检查(checkForNewPiVersion
    • 已安装包更新检查
    • tmux 键盘兼容性检查
  3. 处理初始消息(如果有):

    1
    2
    3
    if (initialMessage) {
    await this.session.prompt(initialMessage, { images: initialImages });
    }
  4. 主循环(核心交互):

    1
    2
    3
    4
    5
    // interactiveMode.run() {
    while (true) {
    const userInput = await this.getUserInput(); // 等待用户在编辑器中输入
    await this.session.prompt(userInput); // 发送给 AgentSession 处理
    }
    • getUserInput() 阻塞等待用户输入(TUI 编辑器组件)
    • 用户按 Enter 提交 → session.prompt() 展开模板 → 调 Agent → LLM → 工具循环 → 输出
    • 循环永远不会自然结束(通过 Ctrl+C / /exit 触发信号退出)

5.3 Print 模式(appMode === "print""json"

1
2
3
4
5
6
const exitCode = await runPrintMode(runtime, {
mode: "text" | "json", // text = 纯文本输出, json = JSON 格式输出
messages: parsed.messages,
initialMessage,
initialImages,
});
  • 执行完所有消息后立即退出(非交互式)
  • 适用于脚本自动化:pi -p "explain this code" < file.ts
  • text 模式直接输出 assistant 回复文本
  • json 模式输出结构化 JSON(包含 tool calls、metadata 等)
  • 返回 exitCode(0=成功,非0=错误)

总结:main() 完整生命周期

1
2
3
4
5
6
7
8
9
main(args)
├── 1. 环境准备(参数解析、模式决定)
├── 2. SessionManager 初始化(创建/恢复会话)
├── 3. AgentSession Runtime 创建(services → 模型 → session)
├── 4. 读取输入 & 准备初始消息
└── 5. 启动运行模式
├── rpc: stdin JSON-RPC → session.prompt → stdout JSON-RPC(永驻)
├── interactive: TUI 循环 getUserInput → session.prompt(永驻)
└── print: 执行 initialMessage → 输出 → 退出

完整对话流程

本节从 interactiveMode.run() 开始,详细分析一次完整的用户输入 → LLM 响应 → 工具执行 → 输出的全流程。

总览:调用链路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
InteractiveMode.run()                         [UI 层 — TUI 交互]
└── while(true) getUserInput() 等待用户在编辑器中按 Enter 提交
└── AgentSession.prompt(text) [业务层 — 命令解析/模板/验证]
├── 1. 扩展命令检测 (/command)
├── 2. 扩展 input 事件拦截
├── 3. Skill/模板展开
├── 4. 流式中排队 (steer/followUp)
├── 5. 模型 & API Key 验证
├── 6. 压缩检查
├── 7. 构建 messages 数组
├── 8. 扩展 before_agent_start
└── 9. _runAgentPrompt(messages)
└── Agent.prompt(messages) [引擎层 — 纯对话循环]
└── runPromptMessages()
└── runWithLifecycle()
└── runAgentLoop()
└── runLoop() [循环核心]
├── streamAssistantResponse() → LLM 流式请求
├── executeToolCalls() → 工具执行
└── 检查 steering/followUp → 决定是否继续

1. 用户输入收集(InteractiveMode)

源文件packages/coding-agent/src/modes/interactive/interactive-mode.ts

1.1 主循环

1
2
3
4
5
6
7
8
9
10
// InteractiveMode.run() 的核心循环
while (true) {
const userInput = await this.getUserInput(); // 阻塞等待用户输入
try {
await this.session.prompt(userInput); // 发送给 AgentSession 处理
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : "Unknown error occurred";
this.showError(errorMessage);
}
}

1.2 getUserInput() — Promise 回调模式

1
2
3
4
5
6
7
8
async getUserInput(): Promise<string> {
return new Promise((resolve) => {
this.onInputCallback = (text: string) => {
this.onInputCallback = undefined;
resolve(text);
};
});
}

这是一个经典的"等待外部触发"模式:

  • getUserInput() 创建一个 Promise,将 resolve 函数暂存为 onInputCallback
  • 当用户在 TUI 编辑器中按 Enter 提交文本时,setupEditorSubmitHandler 中的 onSubmit 回调触发 this.onInputCallback(text)
  • Promise resolve,主循环继续执行 session.prompt(userInput)

1.3 编辑器提交处理

setupEditorSubmitHandler() 的逻辑分三种情况:

场景 处理方式
内置 UI 命令(/settings/model/export 等) 直接在 InteractiveMode 内处理,不走 session
Agent 正在流式输出中 调用 session.prompt(text, { streamingBehavior: "steer" }) 排队
正常输入 调用 this.onInputCallback(text) 唤醒主循环

2. AgentSession.prompt() — 业务层处理

源文件packages/coding-agent/src/core/agent-session.ts(第 962 行)

这是从用户文本到 Agent 执行的桥梁,负责一系列预处理。

2.1 完整处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
prompt(text, options?)

├─ 步骤1: 扩展命令检测
│ text.startsWith("/") → _tryExecuteExtensionCommand(text)
│ 若命令被处理(如 /review、/pr),直接 return,不进 Agent

├─ 步骤2: 扩展 input 事件
│ _extensionRunner.emitInput(text, images, source)
│ 扩展可以:
│ - "handled":完全拦截,不继续
│ - "transform":修改 text/images 后继续
│ - 不处理:原样继续

├─ 步骤3: Skill/模板展开
│ _expandSkillCommand(text) → 展开 /skill:name 格式
│ expandPromptTemplate(text) → 展开 prompt 模板(如 /review → 完整 review prompt)

├─ 步骤4: 流式中排队
│ if (this.isStreaming) → 根据 streamingBehavior 选择:
│ - "steer":_queueSteer() → 当前轮结束后立即注入
│ - "followUp":_queueFollowUp() → Agent 完全停止后注入
│ 排队后直接 return,不阻塞

├─ 步骤5: 模型 & API Key 验证
│ - 检查 this.model 是否已选定
│ - 检查 modelRegistry.hasConfiguredAuth(model) 是否有有效认证
│ - 无效则 throw Error(显示给用户)

├─ 步骤6: 压缩检查
│ _checkCompaction(lastAssistant, false)
│ 如果上下文过长需要先压缩,则执行 agent.continue() 完成压缩后再继续

├─ 步骤7: 构建 messages 数组
│ messages = [
│ { role: "user", content: [{ type: "text", text: expandedText }, ...images] },
│ ...pendingNextTurnMessages // 扩展注入的上下文消息
│ ]

├─ 步骤8: 扩展 before_agent_start 事件
│ _extensionRunner.emitBeforeAgentStart(text, images, systemPrompt, options)
│ 扩展可以:
│ - 追加 custom messages 到 messages 数组
│ - 修改 systemPrompt(仅本轮有效)

└─ 步骤9: 执行
await _runAgentPrompt(messages)

2.2 _runAgentPrompt — 调用 Agent 并处理后续

1
2
3
4
5
6
7
8
9
10
private async _runAgentPrompt(messages: AgentMessage | AgentMessage[]): Promise<void> {
try {
await this.agent.prompt(messages); // 调用底层 Agent
while (await this._handlePostAgentRun()) { // 循环:重试 or 压缩
await this.agent.continue();
}
} finally {
this._flushPendingBashMessages(); // 刷出未处理的 bash 消息
}
}

2.3 _handlePostAgentRun — 后处理循环

Agent 一轮执行完毕后,检查是否需要继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private async _handlePostAgentRun(): Promise<boolean> {
const msg = this._lastAssistantMessage;
this._lastAssistantMessage = undefined;
if (!msg) return false;

// 1. 可重试的错误?→ 准备重试(自动重试,不需要用户介入)
if (this._isRetryableError(msg) && await this._prepareRetry(msg)) {
return true; // → agent.continue() 重试
}

// 2. 重试失败最终通知
if (msg.stopReason === "error" && this._retryAttempt > 0) {
this._emit({ type: "auto_retry_end", success: false, ... });
this._retryAttempt = 0;
}

// 3. 检查是否需要压缩上下文
return await this._checkCompaction(msg); // true → agent.continue() 执行压缩
}

可重试错误类型:网络超时、Rate Limit(429)、服务端错误(5xx)、上下文过长(overflow) 等。


3. Agent.prompt() — 引擎层入口

源文件packages/agent/src/agent.ts(第 327 行)

Agent 是无状态的对话循环引擎,不关心 UI、持久化、扩展。

3.1 prompt() 方法

1
2
3
4
5
6
7
async prompt(input: string | AgentMessage | AgentMessage[], images?: ImageContent[]): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing a prompt. Use steer() or followUp()...");
}
const messages = this.normalizePromptInput(input, images);
await this.runPromptMessages(messages);
}

关键约束:同一时刻只能有一个 activeRun。如果 Agent 正在执行,外部只能通过 steer()followUp() 排队。

3.2 runPromptMessages — 准备上下文并进入循环

1
2
3
4
5
6
7
8
9
10
11
12
private async runPromptMessages(messages: AgentMessage[], options = {}): Promise<void> {
await this.runWithLifecycle(async (signal) => {
await runAgentLoop(
messages, // 本次要发送的消息
this.createContextSnapshot(), // 上下文快照(systemPrompt + 历史消息 + 工具)
this.createLoopConfig(options), // 循环配置(模型、回调、队列)
(event) => this.processEvents(event), // 事件处理器
signal, // AbortSignal
this.streamFn, // LLM 流式调用函数
);
});
}

3.3 createContextSnapshot — 上下文快照

1
2
3
4
5
6
7
private createContextSnapshot(): AgentContext {
return {
systemPrompt: this._state.systemPrompt, // 当前系统提示词
messages: this._state.messages.slice(), // 历史消息副本
tools: this._state.tools.slice(), // 可用工具列表副本
};
}

3.4 createLoopConfig — 循环配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private createLoopConfig(options = {}): AgentLoopConfig {
return {
model: this._state.model, // 当前模型
reasoning: this._state.thinkingLevel, // thinking 级别
sessionId: this.sessionId, // 会话ID(缓存相关)
onPayload: this.onPayload, // 请求钩子
onResponse: this.onResponse, // 响应钩子
transport: this.transport, // 传输方式
thinkingBudgets: this.thinkingBudgets, // thinking token 预算
toolExecution: this.toolExecution, // 工具执行模式(parallel/sequential)
beforeToolCall: this.beforeToolCall, // 工具调用前钩子
afterToolCall: this.afterToolCall, // 工具调用后钩子
prepareNextTurn: ..., // 下轮准备钩子
convertToLlm: this.convertToLlm, // 消息格式转换
transformContext: this.transformContext, // 上下文变换
getApiKey: this.getApiKey, // API Key 解析
getSteeringMessages: async () => this.steeringQueue.drain(), // steering 队列
getFollowUpMessages: async () => this.followUpQueue.drain(), // followUp 队列
};
}

3.5 runWithLifecycle — 生命周期管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private async runWithLifecycle(executor: (signal: AbortSignal) => Promise<void>): Promise<void> {
// 1. 创建 AbortController + Promise(用于 waitForIdle)
const abortController = new AbortController();
this.activeRun = { promise, resolve, abortController };

// 2. 标记为流式状态
this._state.isStreaming = true;

try {
await executor(abortController.signal); // 执行循环
} catch (error) {
await this.handleRunFailure(error, abortController.signal.aborted);
} finally {
this.finishRun(); // 清理:isStreaming=false, activeRun=undefined
}
}

3.6 processEvents — 状态同步 + 事件分发

Agent 收到循环事件后做两件事:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private async processEvents(event: AgentEvent): Promise<void> {
// 1. 更新内部状态
switch (event.type) {
case "message_start":
this._state.streamingMessage = event.message;
break;
case "message_end":
this._state.streamingMessage = undefined;
this._state.messages.push(event.message); // 归档到历史
break;
case "tool_execution_start":
this._state.pendingToolCalls.add(event.toolCallId);
break;
case "tool_execution_end":
this._state.pendingToolCalls.delete(event.toolCallId);
break;
// ...
}

// 2. 通知所有订阅者(AgentSession._handleAgentEvent 在此被调用)
for (const listener of this.listeners) {
await listener(event, signal);
}
}

4. runAgentLoop — 循环核心

源文件packages/agent/src/agent-loop.ts(第 95 行)

4.1 入口:runAgentLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
export async function runAgentLoop(
prompts: AgentMessage[], // 本次发送的消息(user message + 扩展注入的 custom messages)
context: AgentContext, // 上下文快照
config: AgentLoopConfig, // 循环配置
emit: AgentEventSink, // 事件回调
signal?: AbortSignal,
streamFn?: StreamFn,
): Promise<AgentMessage[]> {
const newMessages: AgentMessage[] = [...prompts];
const currentContext: AgentContext = {
...context,
messages: [...context.messages, ...prompts], // 将新消息追加到上下文
};

await emit({ type: "agent_start" }); // ← Agent 开始事件
await emit({ type: "turn_start" }); // ← 第一轮开始
for (const prompt of prompts) {
await emit({ type: "message_start", message: prompt }); // ← 用户消息事件
await emit({ type: "message_end", message: prompt });
}

await runLoop(currentContext, newMessages, config, signal, emit, streamFn);
return newMessages;
}

4.2 runLoop — 双层循环

这是整个 Agent 的核心调度逻辑,采用双层循环设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─ 外层循环(Outer Loop)────────────────────────────────────────┐
│ 负责处理 followUp 消息。当内层循环结束(Agent 准备停止)时, │
│ 检查是否有 followUp 消息需要处理。有则继续,无则退出。 │
│ │
│ ┌─ 内层循环(Inner Loop)──────────────────────────────────┐ │
│ │ 负责处理 tool calls + steering 消息。 │ │
│ │ 只要还有工具调用或 steering 消息,就继续循环。 │ │
│ │ │ │
│ │ 每轮执行: │ │
│ │ 1. 注入 pending 消息(steering) │ │
│ │ 2. streamAssistantResponse() → LLM 流式响应 │ │
│ │ 3. 检查 tool calls → executeToolCalls() │ │
│ │ 4. prepareNextTurn() → 可能切换模型/上下文 │ │
│ │ 5. shouldStopAfterTurn() → 是否强制停止 │ │
│ │ 6. getSteeringMessages() → 检查新的 steering 消息 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 内层结束后:getFollowUpMessages() → 有则回到内层继续 │
└────────────────────────────────────────────────────────────────┘

详细伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async function runLoop(initialContext, newMessages, initialConfig, signal, emit, streamFn) {
let currentContext = initialContext;
let config = initialConfig;
let pendingMessages = await config.getSteeringMessages(); // 启动时检查积压的 steering

// ===== 外层循环 =====
while (true) {
let hasMoreToolCalls = true;

// ===== 内层循环 =====
while (hasMoreToolCalls || pendingMessages.length > 0) {
// 1. 注入 steering 消息到上下文
if (pendingMessages.length > 0) {
for (const msg of pendingMessages) {
emit({ type: "message_start/end", message: msg });
currentContext.messages.push(msg);
}
pendingMessages = [];
}

// 2. 流式获取 LLM 响应
const message = await streamAssistantResponse(currentContext, config, signal, emit, streamFn);

// 3. 错误/中止 → 直接退出
if (message.stopReason === "error" || message.stopReason === "aborted") {
emit({ type: "agent_end" });
return;
}

// 4. 提取并执行工具调用
const toolCalls = message.content.filter(c => c.type === "toolCall");
hasMoreToolCalls = false;
if (toolCalls.length > 0) {
const batch = await executeToolCalls(currentContext, message, config, signal, emit);
hasMoreToolCalls = !batch.terminate; // 工具未要求终止 → 继续循环
// 工具结果追加到上下文
for (const result of batch.messages) {
currentContext.messages.push(result);
}
}

// 5. 轮次结束,允许动态调整配置
const snapshot = await config.prepareNextTurn?.({ message, toolResults, context, newMessages });
// 可能更新 model、thinkingLevel、context

// 6. 检查是否强制停止
if (await config.shouldStopAfterTurn?.({ message, toolResults, ... })) {
emit({ type: "agent_end" });
return;
}

// 7. 获取新的 steering 消息
pendingMessages = await config.getSteeringMessages();
}
// ===== 内层循环结束 =====

// 8. 检查 followUp 消息
const followUps = await config.getFollowUpMessages();
if (followUps.length > 0) {
pendingMessages = followUps;
continue; // 回到外层循环 → 进入内层循环处理
}

break; // 无更多消息,退出
}
// ===== 外层循环结束 =====

emit({ type: "agent_end", messages: newMessages });
}

5. streamAssistantResponse — LLM 流式请求

源文件packages/agent/src/agent-loop.ts(第 275 行)

这是 AgentMessage[] → Message[] 转换 + LLM 调用的地方。

5.1 执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
streamAssistantResponse(context, config, signal, emit, streamFn)

├─ 1. transformContext(可选)
│ config.transformContext(messages, signal)
│ 扩展可以对上下文做最后变换(如截断、注入额外信息)

├─ 2. convertToLlm
│ config.convertToLlm(messages)
│ AgentMessage[] → Message[](过滤掉 custom 等非 LLM 角色)

├─ 3. 构建 LLM Context
│ { systemPrompt, messages: llmMessages, tools }

├─ 4. 解析 API Key
│ config.getApiKey(model.provider) || config.apiKey

├─ 5. 调用 streamFunction(即 streamSimple)
│ streamFn(model, llmContext, { apiKey, signal, reasoning, ... })
│ 返回 AsyncIterable<StreamEvent>

└─ 6. 消费流式事件
for await (const event of response) {
switch (event.type) {
case "start": → emit message_start(创建 partial message)
case "text_delta": → emit message_update(增量文本)
case "thinking_delta": → emit message_update(增量思考)
case "toolcall_start": → emit message_update(工具调用开始)
case "toolcall_delta": → emit message_update(工具参数增量)
case "done"/"error": → emit message_end(最终消息)
}
}
return finalMessage; // AssistantMessage(含 content、usage、stopReason)

5.2 流式事件传递链

1
2
3
4
5
6
7
LLM API (HTTP SSE)
→ streamSimple() 解析为 StreamEvent
→ streamAssistantResponse() 转为 AgentEvent
→ emit() 回调
→ Agent.processEvents() 更新状态 + 通知 listeners
→ AgentSession._handleAgentEvent() 持久化 + 扩展通知 + 广播
→ InteractiveMode.handleEvent() 更新 TUI 组件

5.3 关键事件类型

StreamEvent AgentEvent UI 效果
start message_start 创建 assistant 消息组件,开始流式显示
text_delta message_update 增量追加文本到消息组件
thinking_delta message_update 增量追加思考内容
toolcall_start message_update 创建工具执行组件
toolcall_delta message_update 更新工具参数显示
done message_end 完成消息渲染
error message_end 显示错误信息

6. executeToolCalls — 工具执行

源文件packages/agent/src/agent-loop.ts(第 373 行)

6.1 执行策略

根据 config.toolExecution 和工具自身的 executionMode 决定执行方式:

条件 执行方式
config.toolExecution === "sequential" 串行执行所有工具
任一工具标记 executionMode: "sequential" 串行执行所有工具
其他情况(默认) 并行执行所有工具

6.2 单个工具调用的完整生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
executeToolCalls(context, assistantMessage, config, signal, emit)

├─ 对每个 toolCall:
│ │
│ ├─ emit tool_execution_start → UI 显示"正在执行..."
│ │
│ ├─ prepareToolCall() 准备阶段:
│ │ ├─ 查找工具定义(context.tools 中匹配 name)
│ │ ├─ 工具未找到 → immediate error result
│ │ ├─ prepareArguments() → 预处理参数
│ │ ├─ validateToolArguments() → JSON Schema 校验
│ │ └─ config.beforeToolCall() → 扩展钩子(可拦截/修改/终止)
│ │
│ ├─ executePreparedToolCall() 执行阶段:
│ │ └─ tool.call(args, context, signal) → 实际执行工具
│ │ 期间通过 emit tool_execution_update 通知进度
│ │
│ ├─ finalizeExecutedToolCall() 收尾阶段:
│ │ └─ config.afterToolCall() → 扩展钩子(可修改结果)
│ │
│ ├─ emit tool_execution_end → UI 显示执行结果
│ │
│ └─ createToolResultMessage() → 构建 ToolResultMessage

└─ 返回 { messages: ToolResultMessage[], terminate: boolean }

6.3 并行执行细节

并行模式下的执行策略更精细:

1
2
3
4
5
6
// 1. 遍历所有 toolCalls,发出 tool_execution_start
// 2. 对每个 toolCall 调用 prepareToolCall()
// - immediate 结果(如工具未找到)→ 直接 emit end
// - prepared 结果 → 存为 async 函数待执行
// 3. Promise.all() 并行执行所有 prepared 的工具
// 4. 按原始顺序收集结果,逐个 emit tool result message

6.4 terminate 语义

1
2
3
4
function shouldTerminateToolBatch(finalizedCalls): boolean {
// 只有当所有工具都返回 terminate=true 时,才终止循环
return finalizedCalls.length > 0 && finalizedCalls.every(f => f.result.terminate === true);
}
  • terminate = true:该批次工具都认为不需要再调 LLM → 内层循环的 hasMoreToolCalls = false
  • terminate = false(默认):工具结果需要反馈给 LLM → hasMoreToolCalls = true → 继续循环

7. 事件传递与持久化

7.1 AgentSession._handleAgentEvent — 事件中转站

源文件packages/coding-agent/src/core/agent-session.ts(第 469 行)

AgentSession 通过 agent.subscribe(this._handleAgentEvent) 监听所有 Agent 事件,负责:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
_handleAgentEvent(event)

├─ 1. 队列追踪
│ 当 user message 开始时,从 _steeringMessages/_followUpMessages 中移除
│ → 通知 UI 更新队列显示

├─ 2. 扩展通知
│ _emitExtensionEvent(event) → 通知所有扩展的 hook

├─ 3. UI 广播
│ _emit(event) → 通知所有 listener(InteractiveMode 在此更新 TUI)

└─ 4. 持久化
if (event.type === "message_end"):
- role === "custom" → sessionManager.appendCustomMessageEntry()
- role === "user/assistant/toolResult" → sessionManager.appendMessage()
if (event.message.role === "assistant"):
- 记录 _lastAssistantMessage(供 _handlePostAgentRun 使用)
- 非错误时重置重试计数器

7.2 InteractiveMode.handleEvent — UI 更新

事件类型 UI 行为
agent_start 清空工具追踪、启动加载动画
message_start (assistant) 创建 AssistantMessageComponent,开始流式渲染
message_update 增量更新文本/思考/工具内容到组件
message_end 完成渲染、处理错误状态
tool_execution_start 创建工具执行进度组件
tool_execution_update 更新进度(如 bash 输出)
tool_execution_end 显示工具执行结果
agent_end 停止加载动画、清理状态、准备接收下一次输入
auto_retry_start 显示"正在重试…"提示
queue_update 更新消息队列显示

8. Steering 和 FollowUp 机制

8.1 概念对比

机制 注入时机 用途
Steering 当前轮(assistant turn)结束后、下一轮 LLM 调用前 用户在 Agent 执行中追加指令
FollowUp Agent 完全停止(无更多 tool calls)后 延迟执行的后续任务

8.2 数据流

1
2
3
4
5
6
7
8
9
10
11
12
用户在流式中输入 "stop writing tests"
→ InteractiveMode 检测到 isStreaming
→ session.prompt(text, { streamingBehavior: "steer" })
→ AgentSession._queueSteer(text)
→ agent.steer(message)
→ steeringQueue.enqueue(message)

Agent 内层循环当前轮结束后:
→ pendingMessages = config.getSteeringMessages()
→ steeringQueue.drain()
→ 返回 ["stop writing tests"]
→ 注入到上下文 → 下一次 LLM 调用看到这条消息

8.3 队列模式(QueueMode)

1
type QueueMode = "one-at-a-time" | "all";
  • "one-at-a-time"(默认):每次 drain() 只取出一条消息
  • "all":每次 drain() 取出所有积压消息

9. 错误处理与重试

9.1 重试流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Agent 运行结束
→ _handlePostAgentRun()
→ _isRetryableError(msg)? 检查是否可重试:
- 429 Rate Limit
- 5xx 服务端错误
- 网络超时
- Context overflow
→ _prepareRetry(msg)?
- _retryAttempt++
- 检查是否超过 maxRetries
- emit auto_retry_start 事件
- 等待退避延时
- return true
→ agent.continue() 重新进入循环

9.2 上下文压缩(Compaction)

当 token 用量接近模型的 contextWindow 时触发:

1
2
3
4
5
6
7
_checkCompaction(lastAssistantMessage)
→ 计算当前 token 用量
→ 超过阈值?
→ 生成 compaction summary(摘要历史消息)
→ 替换历史消息为摘要
→ sessionManager.appendCompaction(summary, ...)
→ return true → agent.continue() 用压缩后的上下文继续

10. 完整时序图

一次典型的用户输入完整流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
用户按 Enter                                     时间线
│ │
▼ │
InteractiveMode.getUserInput() resolve │
│ │
▼ │
AgentSession.prompt(text) │
├─ 模板展开、验证、构建 messages │
└─ _runAgentPrompt(messages) │
│ │
▼ │
Agent.prompt(messages) │
│ │
▼ │
runWithLifecycle() → isStreaming=true │
│ │
▼ │
runAgentLoop(messages, context, config, ...) │
│ │
▼ │
emit agent_start ─────────────────────────────── UI: 开始加载动画
emit turn_start │
emit message_start (user) ───────────────────── UI: 显示用户消息
emit message_end (user) ─────────────────────── SessionManager: 持久化
│ │
▼ │
streamAssistantResponse() │
→ transformContext() │
→ convertToLlm() │
→ streamFn(model, context, opts) │
│ │
├─ event: start ────────────────────────── emit message_start (assistant)
│ UI: 创建 assistant 消息组件
├─ event: text_delta ───────────────────── emit message_update
│ UI: 增量渲染文本
├─ event: text_delta ───────────────────── emit message_update
│ ...(持续流式输出)
├─ event: toolcall_start ───────────────── emit message_update
│ UI: 创建工具调用组件
├─ event: toolcall_delta ───────────────── emit message_update
│ UI: 更新工具参数
└─ event: done ────────────────────────── emit message_end (assistant)
SessionManager: 持久化
│ │
▼ │
hasToolCalls? YES │
│ │
▼ │
executeToolCalls() │
├─ emit tool_execution_start ──────────────── UI: "正在执行 bash..."
├─ tool.call(args) ────────────────────────── 实际执行(如运行命令)
│ ├─ emit tool_execution_update ─────────── UI: 显示执行输出
│ └─ 完成
├─ emit tool_execution_end ────────────────── UI: 显示结果
└─ emit message_start/end (toolResult) ────── SessionManager: 持久化
│ │
▼ │
emit turn_end │
│ │
▼ │
hasMoreToolCalls=true → 回到内层循环顶部 │
│ │
▼ │
(LLM 根据工具结果继续生成...) │
streamAssistantResponse() → 纯文本回复 │
│ │
▼ │
hasToolCalls? NO, hasMoreToolCalls=false │
pendingMessages (steering)? EMPTY │
│ │
▼ │
内层循环退出 │
followUpMessages? EMPTY │
│ │
▼ │
外层循环退出 │
emit agent_end ──────────────────────────────── UI: 停止加载动画
│ │
▼ │
finishRun() → isStreaming=false │
│ │
▼ │
_handlePostAgentRun() │
→ 无错误、无需压缩 → return false │
│ │
▼ │
主循环回到 getUserInput() ─────────────────────── UI: 编辑器获取焦点,等待下一次输入

11. 关键设计总结

11.1 三层架构分离

层次 组件 职责 不关心
UI 层 InteractiveMode 渲染、键盘事件、用户交互 模型细节、持久化
业务层 AgentSession 命令解析、扩展、验证、持久化、压缩、重试 LLM 协议、流式解析
引擎层 Agent + runLoop 纯对话循环、工具执行、消息管理 文件系统、UI、扩展系统

11.2 事件驱动架构

整个系统通过事件流串联:

  • 发布者runLoop 中的 emit() 调用
  • 中间件Agent.processEvents() 做状态同步
  • 消费者链:Agent listeners → AgentSession._handleAgentEvent → InteractiveMode.handleEvent

11.3 消息队列设计

双队列(steering + followUp)实现了"用户可以在 Agent 执行中追加指令"的能力,而不需要 abort 当前执行:

1
2
steering:  用户急需插入的指令 → 当前轮结束后立即处理
followUp: 延迟任务 → Agent 完全空闲后才处理

11.4 不可变快照 + 可变推进

  • createContextSnapshot() 在循环开始前创建上下文快照(独立副本)
  • 循环内直接 push 到 currentContext.messages(可变推进)
  • Agent._state.messages 通过 processEventsmessage_end 逐步同步
  • 保证循环内的上下文变化不会被外部意外修改