vLLM v1 调度器深度剖析

目录

  1. 调度器概述
  2. Continuous Batching 深度解析
  3. 核心数据结构
  4. 调度策略与请求队列
  5. 调度流程详解
  6. 调度器与 Model Runner 的协作
  7. KV Cache 内存管理
  8. 抢占机制
  9. 特殊场景处理
  10. 关键配置参数
  11. 源码导读

1. 调度器概述

1.1 什么是调度器?

vLLM 的调度器(Scheduler)是整个推理引擎的"大脑",负责决定在每一次前向计算(forward pass)中:

  • 哪些请求应该被处理
  • 每个请求处理多少 token
  • 如何分配 GPU 内存(KV Cache)

1.2 Continuous Batching 核心思想

vLLM 采用 Continuous Batching(连续批处理) 策略,其核心思想是:

1
2
3
4
5
传统批处理:  [Req1 完成] -> [Req2 完成] -> [Req3 完成]
↓ 等待 ↓ ↓ 等待 ↓

Continuous Batching: [Req1, Req2, Req3] 同时处理
Req1 完成后立即插入 Req4

这种方式的优势:

  • GPU 利用率更高:不需要等待整个 batch 完成
  • 延迟更低:新请求可以立即加入
  • 吞吐量更大:并发处理多个请求

1.3 调度器架构

graph TB
    subgraph Engine["Engine Core"]
        A[API Server] --> B[EngineCoreClient]
        B --> C[Scheduler]
        C --> D[Model Runner]
    end
    
    subgraph Scheduler["调度器内部"]
        C --> E[waiting 队列]
        C --> F[running 队列]
        C --> G[KVCacheManager]
        C --> H[EncoderCacheManager]
    end
    
    E -->|调度| F
    F -->|完成/抢占| E

2. Continuous Batching 深度解析

2.1 传统 Static Batching 的问题

在传统的 LLM 推理中,使用 Static Batching(静态批处理)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
┌─────────────────────────────────────────────────────────────────┐
│ Static Batching 示例 │
├─────────────────────────────────────────────────────────────────┤
│ Batch 1: │
│ Req1: [████████████████████████████] 生成 30 tokens │
│ Req2: [████████░░░░░░░░░░░░░░░░░░░░] 生成 10 tokens, 等待 20 │
│ Req3: [██████████████░░░░░░░░░░░░░░] 生成 15 tokens, 等待 15 │
│ ↑ │
│ 等 Req1 完成后,整个 batch 才能释放 │
│ │
│ Batch 2: (必须等 Batch 1 全部完成) │
│ Req4: [████████████████████] │
│ Req5: [████████████] │
└─────────────────────────────────────────────────────────────────┘

问题:
1. Req2, Req3 提前完成,但 GPU 资源被浪费在"等待"上
2. Req4, Req5 必须等待整个 Batch 1 完成才能开始
3. GPU 利用率低,吞吐量受限

2.2 Continuous Batching 的解决方案

vLLM 采用 Continuous Batching(连续批处理),也称为 Iteration-level Batching

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────────────┐
│ Continuous Batching 示例 │
├─────────────────────────────────────────────────────────────────┤
│ Step 1: [Req1, Req2, Req3] 一起处理 │
│ Step 2: [Req1, Req2, Req3] 继续处理 │
│ Step 3: [Req1, Req2, Req3] Req2 完成! → 移出 │
│ Step 4: [Req1, Req3, Req4] Req4 立即加入! ← 新请求 │
│ Step 5: [Req1, Req3, Req4] Req3 完成! → 移出 │
│ Step 6: [Req1, Req4, Req5] Req5 立即加入! ← 新请求 │
│ ... │
└─────────────────────────────────────────────────────────────────┘

优势:
1. 请求完成后立即释放资源,新请求立即加入
2. GPU 始终保持高利用率
3. 整体吞吐量大幅提升

2.3 调度器如何实现 Continuous Batching

调度器是 Continuous Batching 的核心实现者。它在每一个推理步骤(iteration)中:

2.3.1 动态管理请求集合

1
2
3
4
5
6
7
8
# scheduler.py 中的核心数据结构
self.waiting: RequestQueue = ... # 等待队列(可动态添加)
self.running: list[Request] = [] # 运行列表(动态变化)

# 每次调度时:
# 1. 已完成的请求从 running 移出
# 2. 新请求从 waiting 加入 running
# 3. 资源不足时可抢占 running 中的请求

2.3.2 Token 级别的细粒度调度

这是 Continuous Batching 的关键!调度器不是按"请求"粒度调度,而是按"token"粒度:

1
2
3
4
5
6
7
# 每个请求本轮处理的 token 数可以不同
num_scheduled_tokens = {
"req_A": 1000, # Prefill 阶段,处理 1000 个 prompt tokens
"req_B": 1, # Decode 阶段,只生成 1 个 token
"req_C": 1, # Decode 阶段,只生成 1 个 token
"req_D": 500, # Prefill 阶段,处理 500 个 prompt tokens(新加入)
}

这意味着不同阶段的请求可以混合处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌──────────────────────────────────────────────────────────────────┐
│ Prefill 和 Decode 混合批处理 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ 单次前向传播中: │
│ │
│ [Req_A: 1000 tokens (Prefill)] + [Req_B: 1 token (Decode)] │
│ + [Req_C: 1 token (Decode)] + [Req_D: 500 tokens (Prefill)] │
│ ──────────────────────────────────────────────────────────── │
│ = 1502 tokens 一次性送入模型 │
│ │
│ 优势: Prefill 的计算密集型操作和 Decode 的内存密集型操作 │
│ 可以互补,提高 GPU 利用率 │
└──────────────────────────────────────────────────────────────────┘

2.3.3 调度器的每步循环

flowchart LR
    subgraph "Continuous Batching 循环"
        A[调度器 schedule] --> B[选择本轮请求]
        B --> C[分配 token 预算]
        C --> D[Model Runner 执行]
        D --> E[更新请求状态]
        E --> F{有完成的?}
        F -->|是| G[移出 running]
        F -->|否| H{有新请求?}
        G --> H
        H -->|是| I[加入 waiting]
        H -->|否| A
        I --> A
    end

2.4 Continuous Batching 的三个关键机制

机制 1:Iteration-level 调度

每次模型前向传播前都重新调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
# engine_core.py 主循环(简化)
while True:
# 1. 每次迭代前调度
scheduler_output = self.scheduler.schedule()

# 2. 执行模型前向传播
model_output = self.model_runner.execute_model(scheduler_output)

# 3. 更新状态,处理完成的请求
engine_outputs = self.scheduler.update_from_output(model_output)

# 4. 返回完成的请求结果,接收新请求
# ... 新请求随时可以加入 ...

机制 2:PagedAttention(分页注意力)

KV Cache 使用类似操作系统虚拟内存的分页机制:

1
2
3
4
5
6
传统方式:每个请求预分配固定大小的连续内存
→ 内存碎片化,浪费严重

PagedAttention:将 KV Cache 分成固定大小的 blocks
→ 按需分配,动态管理
→ 支持请求的动态加入/退出
1
2
3
4
5
6
7
8
9
10
11
12
13
┌─────────────────────────────────────────────────────────────────┐
│ PagedAttention 内存布局 │
├─────────────────────────────────────────────────────────────────┤
│ Block Pool (共享): │
│ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐│
│ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │10 │11 │12 │13 │...││
│ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘│
│ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ │
│ └───┴───┘ └───┘ └───┴───┘ │
│ Req_A Req_B Req_C │
│ │
│ 请求完成时:释放其 blocks,新请求可立即使用 │
└─────────────────────────────────────────────────────────────────┘

机制 3:抢占与重调度

当资源不足时,可以抢占低优先级请求:

1
2
3
4
5
6
7
8
9
# 资源不足时的处理
if not enough_kv_cache_blocks:
# 抢占低优先级请求
preempted_req = select_lowest_priority_request()
free_kv_cache(preempted_req)
move_to_waiting(preempted_req)

# 新请求可以使用释放的资源
allocate_for_new_request()

2.5 调度器与 Continuous Batching 的关系总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────────────┐
│ 调度器在 Continuous Batching 中的角色 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 新请求 │ │ 调度器 │ │ Model Runner│ │
│ │ 不断到达 │ ───> │ (大脑) │ ───> │ (执行者) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ │ 核心职责: │
│ │ │
│ ├─ 1. 决定哪些请求本轮执行 │
│ ├─ 2. 分配每个请求的 token 数 │
│ ├─ 3. 管理 KV Cache 资源 │
│ ├─ 4. 处理请求的加入/完成/抢占 │
│ └─ 5. 平衡吞吐量和延迟 │
│ │
│ 调度器使 Continuous Batching 成为可能: │
│ - 没有调度器 → 无法动态管理请求集合 │
│ - 没有调度器 → 无法实现 token 级别的细粒度控制 │
│ - 没有调度器 → 无法协调 KV Cache 的分配和释放 │
│ │
└─────────────────────────────────────────────────────────────────┘

2.6 性能对比

指标 Static Batching Continuous Batching
GPU 利用率 低(等待短请求) 高(始终满载)
请求延迟 高(等待整 batch) 低(即时处理)
吞吐量 受限于最长请求 接近理论最大值
内存效率 低(预分配) 高(按需分配)
实现复杂度 简单 复杂(需要调度器)

3. 调度器核心数据结构

3.1 Request(请求)

request.py 定义了请求的完整生命周期状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
class RequestStatus(enum.IntEnum):
"""请求状态枚举"""
WAITING = enum.auto() # 等待调度
WAITING_FOR_FSM = enum.auto() # 等待结构化输出 FSM 编译
WAITING_FOR_REMOTE_KVS = enum.auto() # 等待远程 KV 传输
RUNNING = enum.auto() # 正在运行
PREEMPTED = enum.auto() # 被抢占
# 以下为完成状态
FINISHED_STOPPED = enum.auto() # 正常停止
FINISHED_LENGTH_CAPPED = enum.auto() # 达到长度上限
FINISHED_ABORTED = enum.auto() # 被中止
FINISHED_IGNORED = enum.auto() # 被忽略
FINISHED_ERROR = enum.auto() # 错误结束

请求的关键属性:

属性 说明
num_prompt_tokens 提示词 token 数量
num_computed_tokens 已计算的 token 数量
num_tokens 当前总 token 数(prompt + output)
num_tokens_with_spec 包含推测解码 token 的总数
spec_token_ids 推测解码的 token IDs
priority 优先级(数值越小越优先)
arrival_time 到达时间

3.2 请求状态流转

stateDiagram-v2
    [*] --> WAITING: 新请求到达
    
    WAITING --> RUNNING: 被调度
    WAITING --> WAITING_FOR_FSM: 需要结构化输出
    WAITING --> WAITING_FOR_REMOTE_KVS: P/D 场景
    
    WAITING_FOR_FSM --> WAITING: FSM编译完成
    WAITING_FOR_REMOTE_KVS --> WAITING: KV传输完成
    
    RUNNING --> PREEMPTED: 被抢占
    PREEMPTED --> WAITING: 重新排队
    
    RUNNING --> FINISHED_STOPPED: 遇到停止条件
    RUNNING --> FINISHED_LENGTH_CAPPED: 达到最大长度
    RUNNING --> FINISHED_ABORTED: 被用户中止

3.3 SchedulerOutput(调度输出)

output.py 定义了调度器的输出结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@dataclass
class SchedulerOutput:
# 首次调度的新请求(需要完整数据)
scheduled_new_reqs: list[NewRequestData]
# 已缓存的请求(只需增量数据)
scheduled_cached_reqs: CachedRequestData

# 每个请求调度的 token 数量
num_scheduled_tokens: dict[str, int] # {req_id: num_tokens}
total_num_scheduled_tokens: int # 总调度 token 数

# 推测解码相关
scheduled_spec_decode_tokens: dict[str, list[int]]

# 编码器输入(多模态)
scheduled_encoder_inputs: dict[str, list[int]]

# 完成的请求 IDs
finished_req_ids: set[str]

# 抢占的请求 IDs
preempted_req_ids: set[str] | None

4. 调度策略与请求队列

4.1 两种调度策略

request_queue.py 实现了两种调度策略:

FCFS(先来先服务)

1
2
3
4
5
6
7
8
class FCFSRequestQueue(deque[Request], RequestQueue):
"""基于 deque 实现的 FCFS 队列"""

def add_request(self, request: Request) -> None:
self.append(request) # 添加到队尾

def pop_request(self) -> Request:
return self.popleft() # 从队首取出

Priority(优先级调度)

1
2
3
4
5
6
7
8
9
10
11
class PriorityRequestQueue(RequestQueue):
"""基于堆实现的优先级队列"""

def __init__(self) -> None:
self._heap: list[Request] = []

def add_request(self, request: Request) -> None:
heapq.heappush(self._heap, request)

def pop_request(self) -> Request:
return heapq.heappop(self._heap)

优先级比较逻辑(Request 类中):

1
2
3
4
5
6
7
8
def __lt__(self, other: "Request") -> bool:
# 优先级数值小的优先
if self.priority != other.priority:
return self.priority < other.priority
# 同优先级,先到达的优先
if self.arrival_time != other.arrival_time:
return self.arrival_time < other.arrival_time
return self.request_id < other.request_id

4.2 队列管理

调度器维护两个核心队列:

1
2
3
# scheduler.py 中
self.waiting = create_request_queue(self.policy) # 等待队列
self.running: list[Request] = [] # 运行队列

队列操作示意图:

1
2
3
4
5
新请求 ──┬──> [waiting 队列] ──调度──> [running 列表] ──完成──> 释放
│ │
│ │ 抢占
│ ↓
└──────────────────────────────┘

5. 调度流程详解

5.1 schedule() 主流程

schedule() 方法是调度器的核心,在每次前向计算前被调用。整体流程如下:

flowchart TD
    A[开始调度] --> B[初始化 token_budget]
    B --> C[调度 RUNNING 请求]
    C --> D{有抢占?}
    D -->|是| G[跳过 WAITING]
    D -->|否| E[调度 WAITING 请求]
    E --> F[构建 SchedulerOutput]
    G --> F
    F --> H[更新状态]
    H --> I[返回输出]

5.2 调度 RUNNING 请求

目的:为已经在运行的请求分配本轮需要计算的 token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 简化后的核心逻辑
while req_index < len(self.running) and token_budget > 0:
request = self.running[req_index]

# 计算本轮需要处理的 token 数
num_new_tokens = (
request.num_tokens_with_spec # 总 token 数(含推测)
+ request.num_output_placeholders # 异步调度占位符
- request.num_computed_tokens # 已计算的
)

# 应用 chunked prefill 限制
if 0 < threshold < num_new_tokens:
num_new_tokens = threshold

# 不超过 token 预算
num_new_tokens = min(num_new_tokens, token_budget)

# 分配 KV Cache blocks
new_blocks = self.kv_cache_manager.allocate_slots(...)

if new_blocks is None:
# 资源不足,执行抢占
preempted_req = self.running.pop() # 抢占最后一个(FCFS)
self._preempt_request(preempted_req, ...)
else:
# 调度成功
scheduled_running_reqs.append(request)
token_budget -= num_new_tokens

关键点:

  1. Token 计算逻辑num_new_tokens = num_tokens_with_spec - num_computed_tokens
  2. Chunked Prefill:长提示词可以分块处理,避免阻塞
  3. 资源不足时抢占:优先抢占低优先级请求

5.3 调度 WAITING 请求

目的:将等待队列中的新请求加入运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
while self.waiting and token_budget > 0:
# 检查是否达到最大并发数
if len(self.running) == self.max_num_running_reqs:
break

request = self.waiting.peek_request()

# 跳过特殊状态的请求
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
# 异步 KV 传输中
continue
if request.status == RequestStatus.WAITING_FOR_FSM:
# 等待 FSM 编译
continue

# 查找本地缓存命中
new_computed_blocks, num_new_local_computed_tokens = (
self.kv_cache_manager.get_computed_blocks(request)
)

# 查找远程缓存命中(P/D 场景)
if self.connector is not None:
ext_tokens, load_kv_async = (
self.connector.get_num_new_matched_tokens(...)
)

# 计算需要处理的 token 数
num_new_tokens = request.num_tokens - num_computed_tokens
num_new_tokens = min(num_new_tokens, token_budget)

# 分配 KV Cache
new_blocks = self.kv_cache_manager.allocate_slots(...)

if new_blocks is None:
break # 资源不足,停止调度

# 成功调度
self.waiting.pop_request()
self.running.append(request)
request.status = RequestStatus.RUNNING

核心流程图:

1
2
3
4
5
6
7
8
9
10
11
WAITING 请求

├─> 检查最大并发数 ─> 超过则停止

├─> 检查特殊状态 ─> 跳过

├─> 查找缓存命中 ─> 减少计算量

├─> 分配 KV Cache ─> 失败则停止

└─> 移入 RUNNING ─> 开始计算

5.4 调度约束检查

调度完成后,验证约束条件:

1
2
3
4
5
6
# 总调度 token 数不超过限制
total_num_scheduled_tokens = sum(num_scheduled_tokens.values())
assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens

# 运行请求数不超过限制
assert len(self.running) <= self.max_num_running_reqs

6. 调度器与 Model Runner 的协作

6.1 核心问题:谁真正处理多个请求?

调度器代码中只是逐个处理请求,并没有看到"同时处理多个请求"的逻辑。这是因为 调度器和 Model Runner 有明确的分工

组件 职责 是否处理多请求
Scheduler 规划每个请求需要计算多少 token ❌ 只是规划
Model Runner 将多个请求的 token 拼接成批次 ✅ 真正拼接
GPU Model 一次前向传播处理整个批次 ✅ 真正并发

6.2 调度器的输出:一份"执行计划"

调度器的核心输出是 num_scheduled_tokens 字典,告诉 Model Runner 每个请求本轮需要计算多少 token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# scheduler.py 中的关键输出
num_scheduled_tokens: dict[str, int] = {}

# 调度 RUNNING 请求时
num_scheduled_tokens[request.request_id] = num_new_tokens

# 调度 WAITING 请求时
num_scheduled_tokens[request.request_id] = num_new_tokens

# 例如,调度完成后:
# {
# "req_A": 1000, # 请求 A 需要计算 1000 个 token(prefill)
# "req_B": 500, # 请求 B 需要计算 500 个 token(prefill)
# "req_C": 1, # 请求 C 需要计算 1 个 token(decode)
# }

total_num_scheduled_tokens = sum(num_scheduled_tokens.values()) # 1501

调度器只是"规划者",不执行实际计算。

6.3 Model Runner:真正的批量执行者

真正将多个请求的 token 拼接成一个大批次的逻辑在 gpu_model_runner.py 中:

6.3.1 _prepare_inputs() 方法:拼接 token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# gpu_model_runner.py

# 生成请求索引
# 例如 num_scheduled_tokens = [1000, 500, 1]
# req_indices = [0, 0, ...(1000个0), 1, 1, ...(500个1), 2]
req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens)

# 计算累积 token 数
# cu_num_tokens = [1000, 1500, 1501]
cu_num_tokens, arange = self._get_cumsum_and_arange(num_scheduled_tokens)

# 计算每个 token 的位置
positions_np = self.positions.np[:total_num_scheduled_tokens]
np.add(
self.input_batch.num_computed_tokens_cpu[req_indices],
arange,
out=positions_np,
)

6.3.2 execute_model() 方法:批量前向传播

1
2
3
4
5
6
7
8
9
10
# gpu_model_runner.py

# 调用模型,一次处理所有请求的所有 token
model_output = self._model_forward(
input_ids=input_ids, # 形状: [total_num_scheduled_tokens]
positions=positions, # 形状: [total_num_scheduled_tokens]
intermediate_tensors=intermediate_tensors,
inputs_embeds=inputs_embeds,
**model_kwargs,
)

关键点input_ids 是一个一维张量,包含了所有请求的所有待计算 token!

6.4 完整协作流程

sequenceDiagram
    participant E as Engine Core
    participant S as Scheduler
    participant M as Model Runner
    participant G as GPU Model

    E->>S: schedule()
    Note over S: 规划每个请求的 num_new_tokens
    S-->>E: SchedulerOutput
{req_A: 1000, req_B: 500, req_C: 1} E->>M: execute_model(scheduler_output) Note over M: _prepare_inputs()
将 3 个请求的 token 拼接成
一个 1501 长度的张量 rect rgb(200, 230, 200) Note over M,G: 一次前向传播处理所有 1501 个 token M->>G: model(input_ids=[1501]) G-->>M: logits=[1501] end Note over M: 采样每个请求的下一个 token M-->>E: ModelRunnerOutput E->>S: update_from_output() Note over S: 更新每个请求的 num_computed_tokens

6.5 张量拼接示例

假设有 3 个请求:

请求 总 token 已计算 本次计算
A (prefill) 1000 0 1000
B (prefill) 500 0 500
C (decode) 51 50 1

调度器输出

1
2
num_scheduled_tokens = {"A": 1000, "B": 500, "C": 1}
total_num_scheduled_tokens = 1501

Model Runner 构造的张量

1
2
3
4
5
input_ids = [A的1000个token, B的500个token, C的1个token]
= 一个长度为 1501 的一维张量

positions = [0,1,2,...,999, 0,1,2,...,499, 50]
= 每个 token 在其请求中的位置

模型前向传播

1
2
# 一次调用,处理所有 1501 个 token
logits = model(input_ids, positions, ...) # logits.shape = [1501, vocab_size]

6.6 为什么这样设计?

这种 调度器规划 + Model Runner 执行 的分离设计有以下优势:

  1. 关注点分离

    • 调度器专注于资源管理和请求优先级
    • Model Runner 专注于高效的批量计算
  2. 灵活性

    • 可以独立优化调度策略和执行策略
    • 支持不同的硬件后端(GPU/CPU/TPU)
  3. 性能优化

    • 调度器可以在 CPU 上运行,不阻塞 GPU
    • Model Runner 可以最大化 GPU 利用率

6.7 与 Continuous Batching 的关系

这种设计是 Continuous Batching 的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────────────────────────────┐
│ Continuous Batching 的实现层次 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 概念层:Continuous Batching │
│ ↓ │
│ 策略层:Scheduler(决定 what 和 how much) │
│ ↓ │
│ 执行层:Model Runner(实现 batch 的拼接和执行) │
│ ↓ │
│ 计算层:GPU Model(并行计算所有 token) │
│ │
│ 每一层都是 Continuous Batching 不可或缺的部分: │
│ - Scheduler 实现"动态":请求的加入/退出/抢占 │
│ - Model Runner 实现"批处理":多请求 token 的拼接执行 │
│ - 两者协作实现"连续":每个 iteration 都重新调度 │
│ │
└─────────────────────────────────────────────────────────────────┘

6.8 小结

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────────────┐
│ vLLM 执行流程 │
├─────────────────────────────────────────────────────────────────┤
│ Scheduler (CPU) Model Runner (GPU) │
│ ┌─────────────┐ ┌─────────────────────┐ │
│ │ 决定: │ │ 执行: │ │
│ │ - 哪些请求 │ ────────> │ - 拼接所有 token │ │
│ │ - 多少 token│ 调度输出 │ - 一次前向传播 │ │
│ │ - 分配内存 │ │ - 返回所有 logits │ │
│ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

7. KV Cache 内存管理

7.1 架构概览

graph TB
    subgraph KVCacheManager
        A[KVCacheManager] --> B[KVCacheCoordinator]
        B --> C[SingleTypeKVCacheManager 1]
        B --> D[SingleTypeKVCacheManager 2]
        C --> E[BlockPool]
        D --> E
    end
    
    E --> F[FreeKVCacheBlockQueue]
    E --> G[cached_block_hash_to_block]

7.2 Block 分配流程

kv_cache_manager.py 中的 allocate_slots() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def allocate_slots(
self,
request: Request,
num_new_tokens: int,
num_new_computed_tokens: int = 0,
new_computed_blocks: KVCacheBlocks | None = None,
num_lookahead_tokens: int = 0,
num_external_computed_tokens: int = 0,
...
) -> KVCacheBlocks | None:
"""
Block 布局示意:

|<--已计算-->|<--新缓存命中-->|<--外部缓存-->|<--新计算-->|<--预测-->|
| comp | new_comp | ext_comp | new |lookahead|
|<--- 待计算 --->|
|<---------- 待分配 ---------->|
"""

# 1. 计算总共需要的 token 槽位
num_tokens_need_slot = (
total_computed_tokens + num_new_tokens + num_lookahead_tokens
)

# 2. 检查是否有足够的 free blocks
num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(...)
if not self.has_enough_free_blocks(num_blocks_to_allocate):
return None # 资源不足

# 3. 分配新 blocks
new_blocks = self.coordinator.allocate_new_blocks(...)

# 4. 缓存 blocks(如果启用 prefix caching)
self.coordinator.cache_blocks(request, num_tokens_to_cache)

return self.create_kv_cache_blocks(new_blocks)

7.3 Prefix Caching(前缀缓存)

Prefix Caching 是 vLLM 的重要优化:

1
2
3
4
5
请求 A: "What is the capital of France?"
请求 B: "What is the capital of Germany?"

共享前缀: "What is the capital of "
└── 缓存这部分的 KV Cache

实现方式:

  • 使用 Block Hash 标识相同的前缀
  • 通过 cached_block_hash_to_block 字典快速查找
  • 引用计数管理 block 生命周期
1
2
3
4
5
6
7
# 查找缓存命中
def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
"""查找本地缓存的 blocks"""
return self.coordinator.find_longest_cache_hit(
request.block_hashes,
max_cache_hit_length=request.num_tokens - 1,
)

8. 抢占机制

8.1 触发条件

当 KV Cache 资源不足时,调度器会抢占低优先级请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
while True:
new_blocks = self.kv_cache_manager.allocate_slots(request, num_new_tokens)

if new_blocks is not None:
break # 分配成功

# 分配失败,执行抢占
if self.policy == SchedulingPolicy.PRIORITY:
# 优先级策略:抢占优先级最低的
preempted_req = max(
self.running,
key=lambda r: (r.priority, r.arrival_time),
)
else:
# FCFS 策略:抢占最后一个(最晚到达)
preempted_req = self.running.pop()

self._preempt_request(preempted_req, ...)

8.2 抢占处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _preempt_request(self, request: Request, timestamp: float) -> None:
"""抢占请求并放回等待队列"""

# 1. 释放 KV Cache
self.kv_cache_manager.free(request)

# 2. 释放编码器缓存
self.encoder_cache_manager.free(request)

# 3. 重置状态
request.status = RequestStatus.PREEMPTED
request.num_computed_tokens = 0
request.spec_token_ids.clear()
request.num_preemptions += 1

# 4. 放回等待队列头部(优先重新调度)
self.waiting.prepend_request(request)

抢占流程图:

1
2
3
4
5
6
7
8
9
资源不足

├─> 选择抢占目标(优先级最低/最晚到达)

├─> 释放 KV Cache

├─> 重置请求状态

└─> 放回 waiting 队列头部

9. 特殊场景处理

9.1 Chunked Prefill(分块预填充)

长提示词可以分多次处理,避免阻塞其他请求:

1
2
3
4
5
6
7
# 配置参数
self.scheduler_config.long_prefill_token_threshold # 长预填充阈值
self.scheduler_config.enable_chunked_prefill # 是否启用分块

# 调度逻辑
if 0 < threshold < num_new_tokens:
num_new_tokens = threshold # 截断到阈值

9.2 推测解码(Speculative Decoding)

支持 EAGLE 等推测解码方法:

1
2
3
4
5
6
7
8
9
10
# 推测 token 处理
if request.spec_token_ids:
num_scheduled_spec_tokens = (
num_new_tokens + request.num_computed_tokens
- request.num_tokens
- request.num_output_placeholders
)
if num_scheduled_spec_tokens > 0:
del request.spec_token_ids[num_scheduled_spec_tokens:]
scheduled_spec_decode_tokens[request.request_id] = request.spec_token_ids

9.3 多模态编码器调度

处理图像、音频等多模态输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def _try_schedule_encoder_inputs(
self,
request: Request,
num_computed_tokens: int,
num_new_tokens: int,
encoder_compute_budget: int,
) -> tuple[list[int], int, int, list[int]]:
"""
调度需要在当前步骤处理的编码器输入

条件:
- 输出 token 与当前计算范围重叠
- 未被缓存
- 有足够的编码器预算
- 编码器缓存有空间
"""

9.4 P/D 分离(Prefill/Decode Disaggregation)

支持异步 KV 传输:

1
2
3
4
5
6
7
# 等待远程 KV 的请求
if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
is_ready = self._update_waiting_for_remote_kv(request)
if not is_ready:
# 仍在传输中,跳过
skipped_waiting_requests.prepend_request(request)
continue

10. 关键配置参数

10.1 调度相关配置

参数 默认值 说明
max_num_seqs 128 最大并发请求数
max_num_batched_tokens 2048 单步最大处理 token 数
policy “fcfs” 调度策略(fcfs/priority)
enable_chunked_prefill False 是否启用分块预填充
long_prefill_token_threshold 0 长预填充阈值

10.2 配置对性能的影响

1
2
3
4
5
max_num_seqs ↑  →  并发度 ↑  →  吞吐量 ↑  →  单请求延迟可能 ↑
→ GPU 内存使用 ↑

max_num_batched_tokens ↑ → 批处理效率 ↑ → 吞吐量 ↑
→ 首 token 延迟可能 ↑

11. 源码导读

11.1 核心文件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
vllm/v1/core/
├── sched/
│ ├── scheduler.py # 主调度器实现 (★核心)
│ ├── interface.py # 调度器接口定义
│ ├── output.py # 输出数据结构
│ ├── request_queue.py # 请求队列实现
│ └── utils.py # 工具函数
├── kv_cache_manager.py # KV Cache 管理 (★核心)
├── kv_cache_coordinator.py # 多类型 KV Cache 协调
├── single_type_kv_cache_manager.py # 单类型 KV Cache 管理
├── block_pool.py # Block 池管理
├── kv_cache_utils.py # KV Cache 工具类
└── encoder_cache_manager.py # 编码器缓存管理

11.2 关键方法速查

方法 文件 作用
schedule() scheduler.py 主调度入口
update_from_output() scheduler.py 处理模型输出
add_request() scheduler.py 添加新请求
_preempt_request() scheduler.py 抢占请求
allocate_slots() kv_cache_manager.py 分配 KV Cache
get_computed_blocks() kv_cache_manager.py 获取缓存命中
free() kv_cache_manager.py 释放 KV Cache

11.3 调试建议

  1. 理解调度决策:在 schedule() 方法中添加日志,观察 num_scheduled_tokens 字典
  2. 监控内存使用:查看 kv_cache_manager.usage 属性
  3. 跟踪请求状态:监控 request.status 的变化

11.4 阅读顺序建议

  1. 入门:先看 interface.py,理解调度器的抽象接口
  2. 数据结构:阅读 request.pyoutput.py
  3. 队列管理:学习 request_queue.py
  4. 核心逻辑:深入 scheduler.pyschedule() 方法
  5. 内存管理:研究 kv_cache_manager.py

总结

vLLM v1 调度器的设计体现了以下核心思想:

  1. Continuous Batching:动态添加/移除请求,最大化 GPU 利用率
  2. Token 级别调度:细粒度控制每个请求的计算量
  3. 智能内存管理:Prefix Caching + 抢占机制,平衡吞吐和延迟
  4. 可扩展架构:支持多种调度策略、推测解码、P/D 分离等高级特性
  5. 调度器与 Model Runner 分离:调度器负责规划,Model Runner 负责执行,实现关注点分离