GPU 资源争夺战大模型推理调度实战指南一、显存焦虑当模型精度不再是瓶颈大模型推理服务上线后最先暴露的问题往往不是模型精度而是 GPU 资源不够用。一块 A100 显卡 80GB 显存跑一个 70B 参数的模型KV Cache 占掉一半剩下的空间只能同时服务十几个并发请求。当请求量超过 GPU 的处理能力排队就成了必然。排队的代价是延迟飙升。一个本该 200ms 返回的推理请求在队列里等了 3 秒才开始执行端到端延迟直接膨胀到 3.2 秒。用户体感上3 秒的等待和 200ms 的响应是质变——前者让人怀疑系统是不是挂了。更棘手的是请求的异构性。短文本生成的推理只需 0.5 秒长文本生成可能需要 10 秒。如果用简单的 FIFO 队列一个长文本请求占住 GPU 10 秒后面的短文本请求全部排队。这种大卡车堵住整条车道的现象在推理服务中极为常见。调度策略需要解决三个核心问题优先级区分哪些请求先执行、批处理优化如何把多个请求合并成一个 batch 提升吞吐、以及资源隔离如何防止低优先级任务饿死高优先级任务。二、优先级队列与动态批处理推理调度的核心机制推理调度的核心架构分为三层请求接入层、调度层和执行层。接入层负责接收请求并分配优先级调度层负责排队和批处理决策执行层负责调用推理引擎。graph TB subgraph 接入层 API[API Gateway] P1[高优先级请求br/实时对话] P2[中优先级请求br/文档处理] P3[低优先级请求br/批量推理] end subgraph 调度层 PQ[优先级队列] SCHED[调度器] BP[动态批处理器] end subgraph 执行层 G1[GPU 1br/高优先级专用] G2[GPU 2br/共享池] G3[GPU 3br/共享池] end API -- P1 API -- P2 API -- P3 P1 --|优先级 0| PQ P2 --|优先级 1| PQ P3 --|优先级 2| PQ PQ -- SCHED SCHED -- BP BP --|batch_size4| G1 BP --|batch_size8| G2 BP --|batch_size8| G3 G1 -.-|GPU利用率反馈| SCHED G2 -.-|GPU利用率反馈| SCHED G3 -.-|GPU利用率反馈| SCHED style 接入层 fill:#e3f2fd,stroke:#1976D2,stroke-width:2px style 调度层 fill:#fff3e0,stroke:#F57C00,stroke-width:2px style 执行层 fill:#e8f5e9,stroke:#388E3C,stroke-width:2px优先级队列的设计需要考虑抢占问题。如果高优先级请求持续到来低优先级请求可能永远得不到执行——这就是经典的饥饿问题。解决方案是引入老化机制Aging请求在队列中等待的时间越长其优先级逐步提升。等待超过 30 秒的低优先级请求优先级会自动提升到中等级别保证最终能被执行。动态批处理是提升 GPU 吞吐的关键技术。GPU 的推理吞吐量随 batch size 增大而提升在显存允许的范围内但 batch size 越大单个 batch 的推理时间越长。调度器需要在吞吐量和延迟之间找到平衡点当队列中有大量请求时增大 batch size 提升吞吐当队列中请求较少时减小 batch size 降低延迟。三、生产级推理调度器实现下面实现一个支持优先级调度和动态批处理的推理调度器。核心设计优先级队列 老化机制 自适应批处理。import asyncio import time import uuid import logging from dataclasses import dataclass, field from enum import IntEnum from typing import Any, Callable, Coroutine logger logging.getLogger(__name__) class Priority(IntEnum): 请求优先级数值越小优先级越高 HIGH 0 # 实时对话延迟敏感 MEDIUM 1 # 文档处理延迟中等 LOW 2 # 批量推理延迟不敏感 dataclass class InferenceRequest: 推理请求结构 request_id: str field(default_factorylambda: uuid.uuid4().hex[:12]) priority: Priority Priority.MEDIUM prompt: str max_tokens: int 512 # 请求创建时间用于老化机制计算等待时长 created_at: float field(default_factorytime.monotonic) # 实际执行优先级可能因老化而提升 effective_priority: float 0.0 # 推理结果 result: Any None # 完成事件调用方通过 await event.wait() 等待结果 done_event: asyncio.Event field(default_factoryasyncio.Event) dataclass class SchedulerConfig: 调度器配置 max_batch_size: int 8 # 最大批处理大小 max_queue_size: int 1000 # 最大排队请求数 aging_threshold: float 30.0 # 老化触发阈值秒 aging_boost: float 0.5 # 每超过阈值1秒优先级提升量 batch_timeout: float 0.05 # 批处理等待超时秒50ms # batch_timeout 的权衡太短则 batch 小、吞吐低 # 太长则每个请求多等 50ms延迟增大 class InferenceScheduler: 推理调度器优先级队列 动态批处理 老化机制。 核心调度循环 1. 从优先级队列取出请求 2. 根据当前队列深度决定 batch_size 3. 将 batch 提交给推理引擎 4. 将结果分发给各请求的调用方 def __init__( self, engine: Callable[[list[InferenceRequest]], Coroutine], config: SchedulerConfig | None None, ): self._engine engine self._config config or SchedulerConfig() self._queue: list[InferenceRequest] [] self._running False self._lock asyncio.Lock() self._not_empty asyncio.Event() # 统计指标 self._total_scheduled 0 self._total_completed 0 async def submit(self, request: InferenceRequest) - Any: 提交推理请求阻塞等待结果返回。 为什么用 Event 而不是 Future 因为 Event 更轻量不需要额外的回调机制。 调用方只需 await event.wait()调度器完成后 set() 即可。 if len(self._queue) self._config.max_queue_size: raise RuntimeError( f调度队列已满({self._config.max_queue_size}) f请稍后重试 ) async with self._lock: request.effective_priority float(request.priority) self._queue.append(request) self._total_scheduled 1 self._not_empty.set() # 等待推理完成 await request.done_event.wait() return request.result async def start(self) - None: 启动调度循环 self._running True logger.info(推理调度器启动) while self._running: # 等待队列非空 if not self._queue: self._not_empty.clear() await self._not_empty.wait() if not self._running: break # 应用老化机制并排序 await self._apply_aging() # 取出一个 batch 的请求 batch await self._dequeue_batch() if not batch: continue # 提交给推理引擎 try: results await self._engine(batch) # 将结果分发给各请求 for req, result in zip(batch, results): req.result result req.done_event.set() self._total_completed 1 except Exception as e: logger.error(推理引擎执行失败: %s, e) # 引擎失败所有请求标记失败 for req in batch: req.result {error: str(e)} req.done_event.set() async def _apply_aging(self) - None: 老化机制等待时间超过阈值的请求优先级逐步提升。 为什么用线性提升而不是指数提升 指数提升会导致等待稍长时间的请求瞬间跳到最高优先级 打乱正常的优先级秩序。线性提升更温和 既保证低优先级请求最终被执行又不至于严重干扰高优先级请求。 now time.monotonic() async with self._lock: for req in self._queue: wait_time now - req.created_at if wait_time self._config.aging_threshold: # 超过阈值后每秒提升 0.5 的优先级 boost self._config.aging_boost * ( wait_time - self._config.aging_threshold ) req.effective_priority max( 0.0, float(req.priority) - boost ) # 按 effective_priority 升序排序数值越小越优先 self._queue.sort(keylambda r: r.effective_priority) async def _dequeue_batch( self, ) - list[InferenceRequest]: 从队列中取出一批请求batch_size 根据队列深度自适应调整。 自适应策略 - 队列深度 max_batch_size取全部请求 - 队列深度 max_batch_size取 max_batch_size 个 为什么不固定 batch_size 因为固定 batch_size 在低负载时会导致不必要的等待。 比如队列里只有 2 个请求batch_size8 意味着要等 6 个请求 才能凑满一个 batch这 2 个请求的延迟白白增加了。 async with self._lock: # 自适应 batch_size queue_depth len(self._queue) batch_size min( queue_depth, self._config.max_batch_size ) batch self._queue[:batch_size] self._queue self._queue[batch_size:] if not self._queue: self._not_empty.clear() return batch async def stop(self) - None: 优雅停止调度器 self._running False self._not_empty.set() # 唤醒等待中的调度循环 # 等待队列中的请求处理完成最多等 10 秒 deadline time.monotonic() 10.0 while self._queue and time.monotonic() deadline: await asyncio.sleep(0.1) if self._queue: # 超时仍有未处理请求标记失败 for req in self._queue: req.result {error: scheduler_shutdown} req.done_event.set() logger.warning( 调度器关闭时仍有 %d 个请求未处理, len(self._queue), ) logger.info( 调度器已停止: 调度 %d, 完成 %d, self._total_scheduled, self._total_completed, ) # 使用示例 async def mock_inference_engine( batch: list[InferenceRequest], ) - list[dict]: 模拟推理引擎实际生产中替换为 vLLM / TGI 等推理框架的调用。 为什么用异步接口 因为推理引擎通常是独立的服务进程如 vLLM 的 HTTP Server 调度器通过网络调用推理引擎异步 I/O 避免阻塞事件循环。 # 模拟推理耗时与 batch_size 和 max_tokens 正相关 avg_tokens sum(r.max_tokens for r in batch) / len(batch) latency 0.1 avg_tokens * 0.002 # 粗略估算 await asyncio.sleep(latency) results [] for req in batch: results.append({ request_id: req.request_id, generated_text: f[模拟输出] prompt长度{len(req.prompt)}, usage: { prompt_tokens: len(req.prompt), completion_tokens: req.max_tokens, }, }) return results async def demo(): 调度器使用演示 scheduler InferenceScheduler( enginemock_inference_engine, configSchedulerConfig(max_batch_size4), ) # 启动调度循环后台运行 asyncio.create_task(scheduler.start()) # 提交不同优先级的请求 tasks [] for i in range(10): priority Priority(i % 3) # 交替高/中/低优先级 req InferenceRequest( prioritypriority, promptf测试请求 {i}, max_tokens128, ) tasks.append(scheduler.submit(req)) results await asyncio.gather(*tasks) for r in results: print(f 请求 {r[request_id]}: {r[generated_text]}) await scheduler.stop()四、调度的代价延迟与吞吐的博弈推理调度的核心矛盾是延迟与吞吐的博弈。增大 batch size 可以提升吞吐GPU 并行度更高但单个请求的延迟也随之增加——因为必须等待整个 batch 推理完成才能返回结果。一个 batch 中最长的生成任务决定了整个 batch 的延迟短文本请求被长文本请求拖慢。这个问题的经典解决方案是连续批处理Continuous Batching。不同于静态批处理整个 batch 一起开始、一起结束连续批处理允许新请求在推理过程中动态加入 batch已完成的请求立即返回结果释放位置。vLLM 的 iteration-level scheduling 就是这种机制。但连续批处理的实现复杂度远高于静态批处理调度器需要维护每个请求的独立状态在每次推理迭代后检查哪些请求已完成。资源隔离是另一个容易被忽略的代价。如果所有优先级共享同一组 GPU低优先级的批量任务可能占满显存导致高优先级的实时请求无法调度。生产环境推荐将 GPU 分为专用池和共享池——高优先级请求独占专用池中低优先级请求共享共享池。专用池的 GPU 利用率可能不高因为高优先级请求量通常不大但这是保证实时请求延迟的必要代价。调度器本身也有性能开销。优先级排序的时间复杂度是 O(N log N)当队列中有数万条请求时每次排序可能消耗数十毫秒。优化方案是用堆Heap替代列表将排序开销降到 O(log N)。但在实际场景中队列深度超过 1000 通常意味着系统已经过载应该触发限流而非继续排队。五、总结大模型推理调度需要在延迟、吞吐和公平性之间找到平衡点。优先级队列保证高价值请求优先执行动态批处理提升 GPU 利用率老化机制防止低优先级请求饥饿。三者协同才能在有限的 GPU 资源下实现最优的推理服务体验。落地路线建议从 FIFO 开始先用简单的先进先出队列上线收集请求延迟分布和 GPU 利用率数据引入优先级根据业务需求定义 2-3 个优先级级别逐步替换 FIFO开启动态批处理在 GPU 利用率低于 60% 时开启batch_size 从 4 开始逐步调大部署老化机制监控低优先级请求的最大等待时间超过 30 秒时触发老化资源隔离将 20% 的 GPU 划为高优先级专用池80% 作为共享池根据实际负载动态调整比例