vLLM v1 调度器深度剖析
目录
调度器概述
Continuous Batching 深度解析
核心数据结构
调度策略与请求队列
调度流程详解
调度器与 Model Runner 的协作
KV Cache 内存管理
抢占机制
特殊场景处理
关键配置参数
源码导读
1. 调度器概述
1.1 什么是调度器?
vLLM 的调度器(Scheduler)是整个推理引擎的"大脑",负责决定在每一次前向计算(forward pass)中:
哪些请求 应该被处理
每个请求处理多少 token
如何分配 GPU 内存(KV Cache)
1.2 Continuous Batching 核心思想
vLLM 采用 Continuous Batching(连续批处理) 策略,其核心思想是:
1 2 3 4 5 传统批处理: [Req1 完成] -> [Req2 完成] -> [Req3 完成] ↓ 等待 ↓ ↓ 等待 ↓ Continuous Batching: [Req1, Req2, Req3] 同时处理 Req1 完成后立即插入 Req4
这种方式的优势:
GPU 利用率更高 :不需要等待整个 batch 完成
延迟更低 :新请求可以立即加入
吞吐量更大 :并发处理多个请求
1.3 调度器架构
graph TB
subgraph Engine["Engine Core"]
A[API Server] --> B[EngineCoreClient]
B --> C[Scheduler]
C --> D[Model Runner]
end
subgraph Scheduler["调度器内部"]
C --> E[waiting 队列]
C --> F[running 队列]
C --> G[KVCacheManager]
C --> H[EncoderCacheManager]
end
E -->|调度| F
F -->|完成/抢占| E
2. Continuous Batching 深度解析
2.1 传统 Static Batching 的问题
在传统的 LLM 推理中,使用 Static Batching(静态批处理) :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ┌─────────────────────────────────────────────────────────────────┐ │ Static Batching 示例 │ ├─────────────────────────────────────────────────────────────────┤ │ Batch 1: │ │ Req1: [████████████████████████████] 生成 30 tokens │ │ Req2: [████████░░░░░░░░░░░░░░░░░░░░] 生成 10 tokens, 等待 20 │ │ Req3: [██████████████░░░░░░░░░░░░░░] 生成 15 tokens, 等待 15 │ │ ↑ │ │ 等 Req1 完成后,整个 batch 才能释放 │ │ │ │ Batch 2: (必须等 Batch 1 全部完成) │ │ Req4: [████████████████████] │ │ Req5: [████████████] │ └─────────────────────────────────────────────────────────────────┘ 问题: 1. Req2, Req3 提前完成,但 GPU 资源被浪费在"等待"上 2. Req4, Req5 必须等待整个 Batch 1 完成才能开始 3. GPU 利用率低,吞吐量受限
2.2 Continuous Batching 的解决方案
vLLM 采用 Continuous Batching(连续批处理) ,也称为 Iteration-level Batching :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ┌─────────────────────────────────────────────────────────────────┐ │ Continuous Batching 示例 │ ├─────────────────────────────────────────────────────────────────┤ │ Step 1: [Req1, Req2, Req3] 一起处理 │ │ Step 2: [Req1, Req2, Req3] 继续处理 │ │ Step 3: [Req1, Req2, Req3] Req2 完成! → 移出 │ │ Step 4: [Req1, Req3, Req4] Req4 立即加入! ← 新请求 │ │ Step 5: [Req1, Req3, Req4] Req3 完成! → 移出 │ │ Step 6: [Req1, Req4, Req5] Req5 立即加入! ← 新请求 │ │ ... │ └─────────────────────────────────────────────────────────────────┘ 优势: 1. 请求完成后立即释放资源,新请求立即加入 2. GPU 始终保持高利用率 3. 整体吞吐量大幅提升
2.3 调度器如何实现 Continuous Batching
调度器是 Continuous Batching 的核心实现者 。它在每一个推理步骤(iteration)中:
2.3.1 动态管理请求集合
1 2 3 4 5 6 7 8 self.waiting: RequestQueue = ... self.running: list [Request] = []
2.3.2 Token 级别的细粒度调度
这是 Continuous Batching 的关键!调度器不是按"请求"粒度调度,而是按"token"粒度:
1 2 3 4 5 6 7 num_scheduled_tokens = { "req_A" : 1000 , "req_B" : 1 , "req_C" : 1 , "req_D" : 500 , }
这意味着不同阶段的请求可以混合处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ┌──────────────────────────────────────────────────────────────────┐ │ Prefill 和 Decode 混合批处理 │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ 单次前向传播中: │ │ │ │ [Req_A: 1000 tokens (Prefill)] + [Req_B: 1 token (Decode)] │ │ + [Req_C: 1 token (Decode)] + [Req_D: 500 tokens (Prefill)] │ │ ──────────────────────────────────────────────────────────── │ │ = 1502 tokens 一次性送入模型 │ │ │ │ 优势: Prefill 的计算密集型操作和 Decode 的内存密集型操作 │ │ 可以互补,提高 GPU 利用率 │ └──────────────────────────────────────────────────────────────────┘
2.3.3 调度器的每步循环
flowchart LR
subgraph "Continuous Batching 循环"
A[调度器 schedule] --> B[选择本轮请求]
B --> C[分配 token 预算]
C --> D[Model Runner 执行]
D --> E[更新请求状态]
E --> F{有完成的?}
F -->|是| G[移出 running]
F -->|否| H{有新请求?}
G --> H
H -->|是| I[加入 waiting]
H -->|否| A
I --> A
end
2.4 Continuous Batching 的三个关键机制
机制 1:Iteration-level 调度
每次模型前向传播前都重新调度:
1 2 3 4 5 6 7 8 9 10 11 12 13 while True : scheduler_output = self.scheduler.schedule() model_output = self.model_runner.execute_model(scheduler_output) engine_outputs = self.scheduler.update_from_output(model_output)
机制 2:PagedAttention(分页注意力)
KV Cache 使用类似操作系统虚拟内存的分页机制:
1 2 3 4 5 6 传统方式:每个请求预分配固定大小的连续内存 → 内存碎片化,浪费严重 PagedAttention:将 KV Cache 分成固定大小的 blocks → 按需分配,动态管理 → 支持请求的动态加入/退出
1 2 3 4 5 6 7 8 9 10 11 12 13 ┌─────────────────────────────────────────────────────────────────┐ │ PagedAttention 内存布局 │ ├─────────────────────────────────────────────────────────────────┤ │ Block Pool (共享): │ │ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐│ │ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │10 │11 │12 │13 │...││ │ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘│ │ ↑ ↑ ↑ ↑ ↑ ↑ ↑ ↑ │ │ └───┴───┘ └───┘ └───┴───┘ │ │ Req_A Req_B Req_C │ │ │ │ 请求完成时:释放其 blocks,新请求可立即使用 │ └─────────────────────────────────────────────────────────────────┘
机制 3:抢占与重调度
当资源不足时,可以抢占低优先级请求:
1 2 3 4 5 6 7 8 9 if not enough_kv_cache_blocks: preempted_req = select_lowest_priority_request() free_kv_cache(preempted_req) move_to_waiting(preempted_req) allocate_for_new_request()
2.5 调度器与 Continuous Batching 的关系总结
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ┌─────────────────────────────────────────────────────────────────┐ │ 调度器在 Continuous Batching 中的角色 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 新请求 │ │ 调度器 │ │ Model Runner│ │ │ │ 不断到达 │ ───> │ (大脑) │ ───> │ (执行者) │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ 核心职责: │ │ │ │ │ ├─ 1. 决定哪些请求本轮执行 │ │ ├─ 2. 分配每个请求的 token 数 │ │ ├─ 3. 管理 KV Cache 资源 │ │ ├─ 4. 处理请求的加入/完成/抢占 │ │ └─ 5. 平衡吞吐量和延迟 │ │ │ │ 调度器使 Continuous Batching 成为可能: │ │ - 没有调度器 → 无法动态管理请求集合 │ │ - 没有调度器 → 无法实现 token 级别的细粒度控制 │ │ - 没有调度器 → 无法协调 KV Cache 的分配和释放 │ │ │ └─────────────────────────────────────────────────────────────────┘
2.6 性能对比
指标
Static Batching
Continuous Batching
GPU 利用率
低(等待短请求)
高(始终满载)
请求延迟
高(等待整 batch)
低(即时处理)
吞吐量
受限于最长请求
接近理论最大值
内存效率
低(预分配)
高(按需分配)
实现复杂度
简单
复杂(需要调度器)
3. 调度器核心数据结构
3.1 Request(请求)
request.py 定义了请求的完整生命周期状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 class RequestStatus (enum.IntEnum): """请求状态枚举""" WAITING = enum.auto() WAITING_FOR_FSM = enum.auto() WAITING_FOR_REMOTE_KVS = enum.auto() RUNNING = enum.auto() PREEMPTED = enum.auto() FINISHED_STOPPED = enum.auto() FINISHED_LENGTH_CAPPED = enum.auto() FINISHED_ABORTED = enum.auto() FINISHED_IGNORED = enum.auto() FINISHED_ERROR = enum.auto()
请求的关键属性:
属性
说明
num_prompt_tokens
提示词 token 数量
num_computed_tokens
已计算的 token 数量
num_tokens
当前总 token 数(prompt + output)
num_tokens_with_spec
包含推测解码 token 的总数
spec_token_ids
推测解码的 token IDs
priority
优先级(数值越小越优先)
arrival_time
到达时间
3.2 请求状态流转
stateDiagram-v2
[*] --> WAITING: 新请求到达
WAITING --> RUNNING: 被调度
WAITING --> WAITING_FOR_FSM: 需要结构化输出
WAITING --> WAITING_FOR_REMOTE_KVS: P/D 场景
WAITING_FOR_FSM --> WAITING: FSM编译完成
WAITING_FOR_REMOTE_KVS --> WAITING: KV传输完成
RUNNING --> PREEMPTED: 被抢占
PREEMPTED --> WAITING: 重新排队
RUNNING --> FINISHED_STOPPED: 遇到停止条件
RUNNING --> FINISHED_LENGTH_CAPPED: 达到最大长度
RUNNING --> FINISHED_ABORTED: 被用户中止
3.3 SchedulerOutput(调度输出)
output.py 定义了调度器的输出结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @dataclass class SchedulerOutput : scheduled_new_reqs: list [NewRequestData] scheduled_cached_reqs: CachedRequestData num_scheduled_tokens: dict [str , int ] total_num_scheduled_tokens: int scheduled_spec_decode_tokens: dict [str , list [int ]] scheduled_encoder_inputs: dict [str , list [int ]] finished_req_ids: set [str ] preempted_req_ids: set [str ] | None
4. 调度策略与请求队列
4.1 两种调度策略
request_queue.py 实现了两种调度策略:
FCFS(先来先服务)
1 2 3 4 5 6 7 8 class FCFSRequestQueue (deque[Request], RequestQueue): """基于 deque 实现的 FCFS 队列""" def add_request (self, request: Request ) -> None : self.append(request) def pop_request (self ) -> Request: return self.popleft()
Priority(优先级调度)
1 2 3 4 5 6 7 8 9 10 11 class PriorityRequestQueue (RequestQueue ): """基于堆实现的优先级队列""" def __init__ (self ) -> None : self._heap: list [Request] = [] def add_request (self, request: Request ) -> None : heapq.heappush(self._heap, request) def pop_request (self ) -> Request: return heapq.heappop(self._heap)
优先级比较逻辑 (Request 类中):
1 2 3 4 5 6 7 8 def __lt__ (self, other: "Request" ) -> bool : if self.priority != other.priority: return self.priority < other.priority if self.arrival_time != other.arrival_time: return self.arrival_time < other.arrival_time return self.request_id < other.request_id
4.2 队列管理
调度器维护两个核心队列:
1 2 3 self.waiting = create_request_queue(self.policy) self.running: list [Request] = []
队列操作示意图:
1 2 3 4 5 新请求 ──┬──> [waiting 队列] ──调度──> [running 列表] ──完成──> 释放 │ │ │ │ 抢占 │ ↓ └──────────────────────────────┘
5. 调度流程详解
5.1 schedule() 主流程
schedule() 方法是调度器的核心,在每次前向计算前被调用。整体流程如下:
flowchart TD
A[开始调度] --> B[初始化 token_budget]
B --> C[调度 RUNNING 请求]
C --> D{有抢占?}
D -->|是| G[跳过 WAITING]
D -->|否| E[调度 WAITING 请求]
E --> F[构建 SchedulerOutput]
G --> F
F --> H[更新状态]
H --> I[返回输出]
5.2 调度 RUNNING 请求
目的 :为已经在运行的请求分配本轮需要计算的 token
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 while req_index < len (self.running) and token_budget > 0 : request = self.running[req_index] num_new_tokens = ( request.num_tokens_with_spec + request.num_output_placeholders - request.num_computed_tokens ) if 0 < threshold < num_new_tokens: num_new_tokens = threshold num_new_tokens = min (num_new_tokens, token_budget) new_blocks = self.kv_cache_manager.allocate_slots(...) if new_blocks is None : preempted_req = self.running.pop() self._preempt_request(preempted_req, ...) else : scheduled_running_reqs.append(request) token_budget -= num_new_tokens
关键点:
Token 计算逻辑 :num_new_tokens = num_tokens_with_spec - num_computed_tokens
Chunked Prefill :长提示词可以分块处理,避免阻塞
资源不足时抢占 :优先抢占低优先级请求
5.3 调度 WAITING 请求
目的 :将等待队列中的新请求加入运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 while self.waiting and token_budget > 0 : if len (self.running) == self.max_num_running_reqs: break request = self.waiting.peek_request() if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: continue if request.status == RequestStatus.WAITING_FOR_FSM: continue new_computed_blocks, num_new_local_computed_tokens = ( self.kv_cache_manager.get_computed_blocks(request) ) if self.connector is not None : ext_tokens, load_kv_async = ( self.connector.get_num_new_matched_tokens(...) ) num_new_tokens = request.num_tokens - num_computed_tokens num_new_tokens = min (num_new_tokens, token_budget) new_blocks = self.kv_cache_manager.allocate_slots(...) if new_blocks is None : break self.waiting.pop_request() self.running.append(request) request.status = RequestStatus.RUNNING
核心流程图:
1 2 3 4 5 6 7 8 9 10 11 WAITING 请求 │ ├─> 检查最大并发数 ─> 超过则停止 │ ├─> 检查特殊状态 ─> 跳过 │ ├─> 查找缓存命中 ─> 减少计算量 │ ├─> 分配 KV Cache ─> 失败则停止 │ └─> 移入 RUNNING ─> 开始计算
5.4 调度约束检查
调度完成后,验证约束条件:
1 2 3 4 5 6 total_num_scheduled_tokens = sum (num_scheduled_tokens.values()) assert total_num_scheduled_tokens <= self.max_num_scheduled_tokensassert len (self.running) <= self.max_num_running_reqs
6. 调度器与 Model Runner 的协作
6.1 核心问题:谁真正处理多个请求?
调度器代码中只是逐个处理请求,并没有看到"同时处理多个请求"的逻辑。这是因为 调度器和 Model Runner 有明确的分工 :
组件
职责
是否处理多请求
Scheduler
规划每个请求需要计算多少 token
❌ 只是规划
Model Runner
将多个请求的 token 拼接成批次
✅ 真正拼接
GPU Model
一次前向传播处理整个批次
✅ 真正并发
6.2 调度器的输出:一份"执行计划"
调度器的核心输出是 num_scheduled_tokens 字典,告诉 Model Runner 每个请求本轮需要计算多少 token:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 num_scheduled_tokens: dict [str , int ] = {} num_scheduled_tokens[request.request_id] = num_new_tokens num_scheduled_tokens[request.request_id] = num_new_tokens total_num_scheduled_tokens = sum (num_scheduled_tokens.values())
调度器只是"规划者",不执行实际计算。
6.3 Model Runner:真正的批量执行者
真正将多个请求的 token 拼接成一个大批次的逻辑在 gpu_model_runner.py 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens) cu_num_tokens, arange = self._get_cumsum_and_arange(num_scheduled_tokens) positions_np = self.positions.np[:total_num_scheduled_tokens] np.add( self.input_batch.num_computed_tokens_cpu[req_indices], arange, out=positions_np, )
6.3.2 execute_model() 方法:批量前向传播
1 2 3 4 5 6 7 8 9 10 model_output = self._model_forward( input_ids=input_ids, positions=positions, intermediate_tensors=intermediate_tensors, inputs_embeds=inputs_embeds, **model_kwargs, )
关键点 :input_ids 是一个一维张量,包含了所有请求的所有待计算 token!
6.4 完整协作流程
sequenceDiagram
participant E as Engine Core
participant S as Scheduler
participant M as Model Runner
participant G as GPU Model
E->>S: schedule()
Note over S: 规划每个请求的 num_new_tokens
S-->>E: SchedulerOutput {req_A: 1000, req_B: 500, req_C: 1}
E->>M: execute_model(scheduler_output)
Note over M: _prepare_inputs() 将 3 个请求的 token 拼接成 一个 1501 长度的张量
rect rgb(200, 230, 200)
Note over M,G: 一次前向传播处理所有 1501 个 token
M->>G: model(input_ids=[1501])
G-->>M: logits=[1501]
end
Note over M: 采样每个请求的下一个 token
M-->>E: ModelRunnerOutput
E->>S: update_from_output()
Note over S: 更新每个请求的 num_computed_tokens
6.5 张量拼接示例
假设有 3 个请求:
请求
总 token
已计算
本次计算
A (prefill)
1000
0
1000
B (prefill)
500
0
500
C (decode)
51
50
1
调度器输出 :
1 2 num_scheduled_tokens = {"A" : 1000 , "B" : 500 , "C" : 1 } total_num_scheduled_tokens = 1501
Model Runner 构造的张量 :
1 2 3 4 5 input_ids = [A的1000个token, B的500个token, C的1个token] = 一个长度为 1501 的一维张量 positions = [0,1,2,...,999, 0,1,2,...,499, 50] = 每个 token 在其请求中的位置
模型前向传播 :
1 2 logits = model(input_ids, positions, ...)
6.6 为什么这样设计?
这种 调度器规划 + Model Runner 执行 的分离设计有以下优势:
关注点分离 :
调度器专注于资源管理和请求优先级
Model Runner 专注于高效的批量计算
灵活性 :
可以独立优化调度策略和执行策略
支持不同的硬件后端(GPU/CPU/TPU)
性能优化 :
调度器可以在 CPU 上运行,不阻塞 GPU
Model Runner 可以最大化 GPU 利用率
6.7 与 Continuous Batching 的关系
这种设计是 Continuous Batching 的具体实现 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ┌─────────────────────────────────────────────────────────────────┐ │ Continuous Batching 的实现层次 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 概念层:Continuous Batching │ │ ↓ │ │ 策略层:Scheduler(决定 what 和 how much) │ │ ↓ │ │ 执行层:Model Runner(实现 batch 的拼接和执行) │ │ ↓ │ │ 计算层:GPU Model(并行计算所有 token) │ │ │ │ 每一层都是 Continuous Batching 不可或缺的部分: │ │ - Scheduler 实现"动态":请求的加入/退出/抢占 │ │ - Model Runner 实现"批处理":多请求 token 的拼接执行 │ │ - 两者协作实现"连续":每个 iteration 都重新调度 │ │ │ └─────────────────────────────────────────────────────────────────┘
6.8 小结
1 2 3 4 5 6 7 8 9 10 11 ┌─────────────────────────────────────────────────────────────────┐ │ vLLM 执行流程 │ ├─────────────────────────────────────────────────────────────────┤ │ Scheduler (CPU) Model Runner (GPU) │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ 决定: │ │ 执行: │ │ │ │ - 哪些请求 │ ────────> │ - 拼接所有 token │ │ │ │ - 多少 token│ 调度输出 │ - 一次前向传播 │ │ │ │ - 分配内存 │ │ - 返回所有 logits │ │ │ └─────────────┘ └─────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘
7. KV Cache 内存管理
7.1 架构概览
graph TB
subgraph KVCacheManager
A[KVCacheManager] --> B[KVCacheCoordinator]
B --> C[SingleTypeKVCacheManager 1]
B --> D[SingleTypeKVCacheManager 2]
C --> E[BlockPool]
D --> E
end
E --> F[FreeKVCacheBlockQueue]
E --> G[cached_block_hash_to_block]
7.2 Block 分配流程
kv_cache_manager.py 中的 allocate_slots() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def allocate_slots ( self, request: Request, num_new_tokens: int , num_new_computed_tokens: int = 0 , new_computed_blocks: KVCacheBlocks | None = None , num_lookahead_tokens: int = 0 , num_external_computed_tokens: int = 0 , ... ) -> KVCacheBlocks | None : """ Block 布局示意: |<--已计算-->|<--新缓存命中-->|<--外部缓存-->|<--新计算-->|<--预测-->| | comp | new_comp | ext_comp | new |lookahead| |<--- 待计算 --->| |<---------- 待分配 ---------->| """ num_tokens_need_slot = ( total_computed_tokens + num_new_tokens + num_lookahead_tokens ) num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(...) if not self.has_enough_free_blocks(num_blocks_to_allocate): return None new_blocks = self.coordinator.allocate_new_blocks(...) self.coordinator.cache_blocks(request, num_tokens_to_cache) return self.create_kv_cache_blocks(new_blocks)
7.3 Prefix Caching(前缀缓存)
Prefix Caching 是 vLLM 的重要优化:
1 2 3 4 5 请求 A: "What is the capital of France?" 请求 B: "What is the capital of Germany?" 共享前缀: "What is the capital of " └── 缓存这部分的 KV Cache
实现方式:
使用 Block Hash 标识相同的前缀
通过 cached_block_hash_to_block 字典快速查找
引用计数管理 block 生命周期
1 2 3 4 5 6 7 def get_computed_blocks (self, request: Request ) -> tuple [KVCacheBlocks, int ]: """查找本地缓存的 blocks""" return self.coordinator.find_longest_cache_hit( request.block_hashes, max_cache_hit_length=request.num_tokens - 1 , )
8. 抢占机制
8.1 触发条件
当 KV Cache 资源不足时,调度器会抢占低优先级请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 while True : new_blocks = self.kv_cache_manager.allocate_slots(request, num_new_tokens) if new_blocks is not None : break if self.policy == SchedulingPolicy.PRIORITY: preempted_req = max ( self.running, key=lambda r: (r.priority, r.arrival_time), ) else : preempted_req = self.running.pop() self._preempt_request(preempted_req, ...)
8.2 抢占处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def _preempt_request (self, request: Request, timestamp: float ) -> None : """抢占请求并放回等待队列""" self.kv_cache_manager.free(request) self.encoder_cache_manager.free(request) request.status = RequestStatus.PREEMPTED request.num_computed_tokens = 0 request.spec_token_ids.clear() request.num_preemptions += 1 self.waiting.prepend_request(request)
抢占流程图:
1 2 3 4 5 6 7 8 9 资源不足 │ ├─> 选择抢占目标(优先级最低/最晚到达) │ ├─> 释放 KV Cache │ ├─> 重置请求状态 │ └─> 放回 waiting 队列头部
9. 特殊场景处理
9.1 Chunked Prefill(分块预填充)
长提示词可以分多次处理,避免阻塞其他请求:
1 2 3 4 5 6 7 self.scheduler_config.long_prefill_token_threshold self.scheduler_config.enable_chunked_prefill if 0 < threshold < num_new_tokens: num_new_tokens = threshold
9.2 推测解码(Speculative Decoding)
支持 EAGLE 等推测解码方法:
1 2 3 4 5 6 7 8 9 10 if request.spec_token_ids: num_scheduled_spec_tokens = ( num_new_tokens + request.num_computed_tokens - request.num_tokens - request.num_output_placeholders ) if num_scheduled_spec_tokens > 0 : del request.spec_token_ids[num_scheduled_spec_tokens:] scheduled_spec_decode_tokens[request.request_id] = request.spec_token_ids
9.3 多模态编码器调度
处理图像、音频等多模态输入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def _try_schedule_encoder_inputs ( self, request: Request, num_computed_tokens: int , num_new_tokens: int , encoder_compute_budget: int , ) -> tuple [list [int ], int , int , list [int ]]: """ 调度需要在当前步骤处理的编码器输入 条件: - 输出 token 与当前计算范围重叠 - 未被缓存 - 有足够的编码器预算 - 编码器缓存有空间 """
9.4 P/D 分离(Prefill/Decode Disaggregation)
支持异步 KV 传输:
1 2 3 4 5 6 7 if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: is_ready = self._update_waiting_for_remote_kv(request) if not is_ready: skipped_waiting_requests.prepend_request(request) continue
10. 关键配置参数
10.1 调度相关配置
参数
默认值
说明
max_num_seqs
128
最大并发请求数
max_num_batched_tokens
2048
单步最大处理 token 数
policy
“fcfs”
调度策略(fcfs/priority)
enable_chunked_prefill
False
是否启用分块预填充
long_prefill_token_threshold
0
长预填充阈值
10.2 配置对性能的影响
1 2 3 4 5 max_num_seqs ↑ → 并发度 ↑ → 吞吐量 ↑ → 单请求延迟可能 ↑ → GPU 内存使用 ↑ max_num_batched_tokens ↑ → 批处理效率 ↑ → 吞吐量 ↑ → 首 token 延迟可能 ↑
11. 源码导读
11.1 核心文件结构
1 2 3 4 5 6 7 8 9 10 11 12 13 vllm/v1/core/ ├── sched/ │ ├── scheduler.py # 主调度器实现 (★核心) │ ├── interface.py # 调度器接口定义 │ ├── output.py # 输出数据结构 │ ├── request_queue.py # 请求队列实现 │ └── utils.py # 工具函数 ├── kv_cache_manager.py # KV Cache 管理 (★核心) ├── kv_cache_coordinator.py # 多类型 KV Cache 协调 ├── single_type_kv_cache_manager.py # 单类型 KV Cache 管理 ├── block_pool.py # Block 池管理 ├── kv_cache_utils.py # KV Cache 工具类 └── encoder_cache_manager.py # 编码器缓存管理
11.2 关键方法速查
方法
文件
作用
schedule()
scheduler.py
主调度入口
update_from_output()
scheduler.py
处理模型输出
add_request()
scheduler.py
添加新请求
_preempt_request()
scheduler.py
抢占请求
allocate_slots()
kv_cache_manager.py
分配 KV Cache
get_computed_blocks()
kv_cache_manager.py
获取缓存命中
free()
kv_cache_manager.py
释放 KV Cache
11.3 调试建议
理解调度决策 :在 schedule() 方法中添加日志,观察 num_scheduled_tokens 字典
监控内存使用 :查看 kv_cache_manager.usage 属性
跟踪请求状态 :监控 request.status 的变化
11.4 阅读顺序建议
入门 :先看 interface.py ,理解调度器的抽象接口
数据结构 :阅读 request.py 和 output.py
队列管理 :学习 request_queue.py
核心逻辑 :深入 scheduler.py 的 schedule() 方法
内存管理 :研究 kv_cache_manager.py
总结
vLLM v1 调度器的设计体现了以下核心思想:
Continuous Batching :动态添加/移除请求,最大化 GPU 利用率
Token 级别调度 :细粒度控制每个请求的计算量
智能内存管理 :Prefix Caching + 抢占机制,平衡吞吐和延迟
可扩展架构 :支持多种调度策略、推测解码、P/D 分离等高级特性
调度器与 Model Runner 分离 :调度器负责规划,Model Runner 负责执行,实现关注点分离