SWPipeline API Reference
General-Purpose Software Pipeline for PyTorch / CUDA · sw_pipeline.py
Data Classes
class IterContext
Per-iteration context passed to every task function. batch is the raw data from next(data_iter). Tasks pass intermediate results via ctx.xxx = ... and free GPU memory early via del ctx.xxx. Each in-flight iteration gets its own IterContext, avoiding cross-iteration data hazards.
| Attr | Type | Description |
|---|---|---|
| batch | Any | Raw batch from the data iterator |
| iter_idx | int | 0-based iteration index |
| (arbitrary) | Any | Dynamically attached by tasks: ctx.X = ... |
def copy_batch(ctx: IterContext) -> None:
ctx.x_gpu = ctx.batch["x"].to("cuda", non_blocking=True)
ctx.y_gpu = ctx.batch["y"].to("cuda", non_blocking=True)
def forward(ctx: IterContext) -> None:
ctx.logits = model(ctx.x_gpu)
del ctx.x_gpu # free GPU memory early
class DeclaredIO
Declares a task's read/write contract with shared state outside IterContext (external side effects). The framework automatically calls capture when caching and restore during shortcut replay. Return values go through _cache_val, which recursively detaches and clones all tensors.
| Field | Type | Description |
|---|---|---|
| capture | Callable[[], Any] | Called after the task runs. Snapshots external state and returns a cacheable value |
| restore | Callable[[Any], None] | Called during shortcut replay. Receives the _restore_val output and writes it back to external state |
shared_buffer = {}
PipelineTask("encode", encode_fn, io=[
DeclaredIO(
capture=lambda: dict(shared_buffer),
restore=lambda s: shared_buffer.update(s),
),
])
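The recursive detach+clone that _cache_val is described as performing can be sketched as follows. This is illustrative only — cache_val below is a stand-in, not the real sw_pipeline.py helper, and it treats anything exposing a .detach() method as tensor-like so the sketch runs without torch:

```python
# Illustrative stand-in for _cache_val: walk containers recursively and
# detach+clone anything tensor-like (here: anything with a .detach() method).
def cache_val(v):
    if hasattr(v, "detach"):
        return v.detach().clone()
    if isinstance(v, dict):
        return {k: cache_val(x) for k, x in v.items()}
    if isinstance(v, (list, tuple)):
        return type(v)(cache_val(x) for x in v)
    return v
```

With real torch.Tensor values, the same walk would return graph-detached copies safe to hold in the shortcut cache.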
class PipelineTask
One schedulable computation unit. Declares only computation identity (name + fn + io), not scheduling; scheduling lives in PipelinePlan.schedule. __hash__ and __eq__ are based on name, so tasks can serve directly as schedule keys.
| Field | Type | Description |
|---|---|---|
| name | str | Globally unique identifier (also used as the NVTX tag) |
| fn | Callable[[IterContext], None] | Compute function; reads/writes ctx attributes |
| io | List[DeclaredIO] | Side-effect declarations (default []) |
h2d = PipelineTask("CopyBatch", copy_batch_fn)
fwd = PipelineTask("Forward", forward_fn)
bwd = PipelineTask("Backward", backward_fn)
# with DeclaredIO for tasks with external side effects
emb = PipelineTask("EmbLookup", emb_fn, io=[
DeclaredIO(capture=capture_emb, restore=restore_emb),
])
class TaskSchedule
Per-task scheduling configuration in a PipelinePlan. All fields have defaults — the simplest usage only sets stage.
| Field | Type | Default | Description |
|---|---|---|---|
| stage | int | 0 | Pipeline stage (stage k processes iter i−k data) |
| stream | Optional[Stream] | None | None = default stream |
| thread_group | str | "default" | Worker thread group; same-group tasks execute serially |
| globally_ordered | bool | False | True → _SubmissionSequencer ensures cross-rank submission order (prevents NCCL deadlocks) |
TaskSchedule(stage=0, stream=memcpy_stream, thread_group="io")
TaskSchedule(stage=1, globally_ordered=True) # NCCL task
TaskSchedule(stage=2) # default stream, default thread
class PipelinePlan
Complete scheduling plan. Analogous to TorchRec's ShardingPlan — it separates "what to compute" from "how to schedule". The schedule keys define the task set, so no separate task list is needed. intra_iter_deps declares intra-iteration dependencies; inter_iter_deps declares cross-iteration dependencies.
| Field | Type | Description |
|---|---|---|
| schedule | Dict[TaskRef, TaskSchedule] | Task → scheduling config. Keys define the task set |
| intra_iter_deps | List[Tuple[TaskRef, TaskRef]] | Intra-iteration deps as (task, depends_on) |
| inter_iter_deps | List[Tuple[…]] | Cross-iteration deps as (task_i, task_i-1) (default []) |
| pipeline_depth | int | Pipeline depth, equal to max(stage) + 1. 1 = serial, 2+ = pipelined (default 2) |
copy = PipelineTask("H2D", h2d_fn)
fwd = PipelineTask("Forward", fwd_fn)
bwd = PipelineTask("Backward", bwd_fn)
optim = PipelineTask("Optim", optim_fn)
plan = PipelinePlan(
schedule={
copy: TaskSchedule(stage=0, stream=copy_stream, thread_group="io"),
fwd: TaskSchedule(stage=1, thread_group="compute"),
bwd: TaskSchedule(stage=1, thread_group="compute"),
optim: TaskSchedule(stage=1, thread_group="compute"),
},
intra_iter_deps=[(fwd, copy), (bwd, fwd), (optim, bwd)],
inter_iter_deps=[(fwd, optim)], # Fwd(i) waits for Optim(i-1)
pipeline_depth=2,
)
class ProfileResult
Return type of TaskProfiler.profile(), containing the serial baseline time and each task's exposed time.
| Field | Type | Description |
|---|---|---|
| baseline_s | float | Median wall-clock time of one serial iteration (seconds) |
| exposed_s | Dict[str, float] | {task_name: exposed_seconds} |
In PipelinePlan's schedule, intra_iter_deps, and inter_iter_deps, tasks can be referenced either by PipelineTask objects or by name strings. Objects are recommended for better type checking.
Synchronization & Scheduling
Two-Phase Synchronization Protocol
SWPipeline uses two layers of dependency declarations to describe data relationships between tasks:
- intra_iter_deps: data dependencies within the same batch (same iter). If task B consumes task A's output, write intra_iter_deps=[(B, A)].
- inter_iter_deps: cross-iteration data dependencies. If task A (iter i) depends on B's output from iter i−1, write inter_iter_deps=[(A, B)]. Only i−1 is supported.
Both purely describe data-dependency semantics, independent of stream and stage. The constraints introduced by stream and stage are discussed in the Corollaries section. At runtime, each dependency is enforced via a two-phase protocol:
# Phase 1: CPU synchronization — threading.Event
sig = self._cpu_signal(iter_idx, dep_name)
sig.wait(timeout=30)  # CPU blocks until dep's CPU work is done

# Phase 2: GPU synchronization — torch.cuda.Event
stream.wait_event(self._cuda_event(iter_idx, dep_name))  # GPU waits for dep's kernels

# Signal sender (after task completes):
self._cuda_event(iter_idx, task_def.name).record(stream)
self._cpu_signal(iter_idx, task_def.name).set()
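Phase 1 can be exercised in isolation without a GPU. The sketch below is a minimal, self-contained handshake using only threading.Event — cpu_signal here is a local stand-in mirroring the snippet above, and the torch.cuda.Event half (Phase 2) is deliberately omitted:

```python
import threading

# CPU-only sketch of Phase 1: producer/consumer handshaking through
# threading.Event, keyed by (iter_idx, task_name) like the real signals.
signals = {}

def cpu_signal(iter_idx, name):
    return signals.setdefault((iter_idx, name), threading.Event())

results = []

def producer():                           # plays the role of the H2D task
    results.append("H2D done")
    cpu_signal(0, "H2D").set()            # sender: CPU work complete

def consumer():                           # plays the role of WaitBatch
    cpu_signal(0, "H2D").wait(timeout=5)  # Phase 1: block until producer signals
    results.append("WaitBatch after H2D")

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start(); t2.start()
t1.join(); t2.join()
print(results)  # ['H2D done', 'WaitBatch after H2D']
```

The consumer cannot append before the producer sets the event, so the ordering is deterministic regardless of thread start order.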
Multi-Thread Dispatch & CPU Synchronization
SWPipeline supports multi-threaded kernel submission. Each task specifies its worker thread via the thread_group attribute. At runtime, _enqueue_period dispatches tasks to each thread's work_queue following _enqueue_order:
_enqueue_period dispatch flow:

for task_def in self._enqueue_order:  # stage-descending
    self._work_queues[task_def.thread_group].put((iter_idx, task_def))
          ↓                        ↓
┌─────────────────┐      ┌──────────────────┐
│ Thread "memcpy" │      │ Thread "default" │
│ work_queue:     │      │ work_queue:      │
│   H2D           │      │   WaitBatch      │
│                 │      │   Forward        │
│                 │      │   Backward       │
└────────┬────────┘      └────────┬─────────┘
         │                        │
   _submit_task()           _submit_task()
A CPU dependency is not a separate dependency layer — it is handled automatically by Phase 1 of the two-phase protocol. When WaitBatch on Thread B declares intra_iter_deps=[H2D]:
- Thread B calls cpu_signal("H2D").wait() at the start of _submit_task → the CPU blocks until Thread A finishes H2D and calls .set()
- Once unblocked, Thread B calls stream.wait_event(cuda_event("H2D")) → the GPU is synchronized as well
Even when two tasks run on the same thread, Phase 1's wait() still executes — if the dep is already done, wait() returns immediately at zero cost. Users therefore only declare data dependencies (intra_iter_deps / inter_iter_deps); CPU thread synchronization is guaranteed automatically by Phase 1.
Example: Cross-Thread Dependency
Thread "memcpy" (H2D) Thread "default" (WaitBatch) ═══════════════════════════════ ═══════════════════════════════════ ├─ run H2D on memcpy_stream ├─ cpu_signal("H2D").wait() ← Phase 1: CPU blocks ├─ cuda_event("H2D").record(memcpy) │ ... Thread B blocked ... ├─ cpu_signal("H2D").set() ─────────────┤ unblocked! ▼ (done, process next in queue) ├─ stream.wait_event(cuda_event("H2D")) ← Phase 2: GPU waits ├─ run WaitBatch on default_stream ▼ (done)
period / iter / stage
| Concept | Meaning |
|---|---|
| period | Global clock, incremented by 1 on each progress() |
| stage | Pipeline depth offset (0 = earliest stage, max = final stage) |
| iter_idx | Actual data iteration index processed by a task |
iter_idx = period − stage
pipeline_depth = max(stage) + 1
Example: pipeline_depth=3 (SparseDist)

            stage=0 (H2D)   stage=1 (InputDist)   stage=2 (Fwd/Bwd/Opt)
period=2:      iter 2            iter 1                iter 0
period=3:      iter 3            iter 2                iter 1
period=4:      iter 4            iter 3                iter 2
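The arithmetic above is simple enough to check directly. A tiny sketch reproducing the pipeline_depth=3 table via iter_idx = period − stage (stage values follow the SparseDist example; names are illustrative):

```python
# Reproduce the pipeline_depth=3 table with iter_idx = period - stage.
stages = {"H2D": 0, "InputDist": 1, "FwdBwdOpt": 2}

for period in (2, 3, 4):
    row = {task: period - s for task, s in stages.items()}
    print(period, row)
# 2 {'H2D': 2, 'InputDist': 1, 'FwdBwdOpt': 0}
# 3 {'H2D': 3, 'InputDist': 2, 'FwdBwdOpt': 1}
# 4 {'H2D': 4, 'InputDist': 3, 'FwdBwdOpt': 2}
```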
Three Corollaries & Deadlock Analysis
Corollary 1: stage constraint for intra_iter_deps. The depended-on task's stage must not exceed the dependent's: stage_dep ≤ stage_task. Otherwise the dep executes in a later period and the task waits forever — deadlock.
Corollary 2: stage-descending submission. Within a period, a higher stage means an older batch. _build_enqueue_order uses a stage-descending strategy:
- Correctness: a k=1 inter_iter_deps edge places the supplier (higher stage) and the consumer (lower stage) in the same period, so the supplier must be submitted first
- Throughput: prioritizing older batches reduces head-of-line blocking
Corollary 3: stage-gap constraint for inter_iter_deps. The depended-on task's stage may exceed the dependent's by at most 1: stage_dep − stage_task ≤ 1. Otherwise the dep falls in a future period — structural deadlock.
| k | Δperiod | Result | Example |
|---|---|---|---|
| ≤ 0 | ≤ −1 | ✅ dep in an earlier period, zero blocking | Same-stage cross-iter: dep completed in the previous period |
| 1 | 0 | ⚠️ Same period, needs stage-descending order | A(s=0) depends on B(s=1): safe if B is submitted first |
| ≥ 2 | ≥ 1 | ❌ Structural deadlock | fwd(s=0) depends on opt(s=2): opt is in a future period |
Deadlock Scenarios
Scenario 1: Single-thread k=1 ascending deadlock

A(s=0, T1), B(s=1, T1), A.inter_iter_deps=[B] → same period

Ascending ❌:  T1: A(s=0) → wait B → BLOCKED → B never submitted 💀
Descending ✅: T1: B(s=1) → set ✓ → A(s=0) → already set ✓
Scenario 2: Multi-thread cross deadlock

T1: A(s=0), D(s=1)   A.cross_iter_dep → B(T2)
T2: C(s=0), B(s=1)   C.cross_iter_dep → D(T1)

Ascending ❌:  T1: A → wait B → BLOCKED   T2: C → wait D → BLOCKED → mutual deadlock 💀
Descending ✅: T1: D(s=1) → ✓   T2: B(s=1) → ✓   T1: A(s=0) → ✓   T2: C(s=0) → ✓
Scenario 3: k ≥ 2 structural deadlock

fwd(s=0), bwd(s=1), opt(s=2), fwd.inter_iter_deps=[opt] → k=2

          stage=2 (opt)   stage=1 (bwd)   stage=0 (fwd)
period 3:   opt(1)          bwd(2)          fwd(3)
period 4:   opt(2)          bwd(3)          fwd(4)

fwd(3) needs opt(2): fwd(3) @ period 3, opt(2) @ period 4 ← future period; deadlock regardless of order 💀
Example: fwd / bwd / opt Stage Arrangements
Base setup: intra_iter_deps = [(bwd, fwd), (opt, bwd)]. We analyze both with and without fwd.inter_iter_deps = [opt].
Without inter_iter_deps (accepting weight staleness)
| Mode | fwd | bwd | opt | depth | Speedup | Notes |
|---|---|---|---|---|---|---|
| A | 0 | 0 | 0 | 1 | ❌ | Serial |
| B | 0 | 0 | 1 | 2 | ✅ | opt(i-1) ‖ fwd(i)+bwd(i) |
| C | 0 | 1 | 1 | 2 | ✅ | bwd(i-1)+opt(i-1) ‖ fwd(i) |
| D | 0 | 1 | 2 | 3 | ✅ | Three-way, staleness=2 |
With inter_iter_deps → no GPU speedup
Corollaries 1 and 3 leave only three legal arrangements (A/B/C), all forced serial by wait_event:
| Mode | fwd | bwd | opt | k | Speedup |
|---|---|---|---|---|---|
| A | 0 | 0 | 0 | 0 | ❌ |
| B | 0 | 0 | 1 | 1 | ❌ fwd wait_event opt |
| C | 0 | 1 | 1 | 1 | ❌ fwd wait_event opt |
Key insight: pipelining works precisely by not declaring inter_iter_deps. Without it, different stages run truly in parallel; with it, execution is globally serialized. Practical advice: do not declare inter_iter_deps for fwd/bwd/opt; rely on stream FIFO ordering plus stage-descending submission to guarantee weight visibility.
Common Pitfalls
| Pitfall | Consequence | Fix |
|---|---|---|
| Missing cross-stream dep | GPU data race | Add the dep |
| Missing same-stage intra-dep | _topo_sort falls back to alphabetical order — wrong execution order | Add the dep |
| Circular deps | Deadlock | Ensure the dep graph is a DAG |
| dep.stage > task.stage in intra_iter_deps | Deadlock (Cor. 1) | Use inter_iter_deps instead |
| inter_iter_deps with k ≥ 2 | Structural deadlock (Cor. 3) | Reduce the stage gap to ≤ 1 |
Ready-First Scheduling Algorithm
_build_enqueue_order uses a ready-first strategy: it builds a period-local dependency graph and topologically sorts it with (stall_cost, name) priority, minimizing GPU stream stalls while preserving correctness.
Problem: Why Stage-Descending Is Not Optimal
GPU streams are FIFO queues. Consider this setup:

Task  stage  stream  intra_iter_deps
─────────────────────────────────────
P     0      X       — (none)
Q     1      Y       [P]  cross-stage (0→1), not in the period-local graph
R     0      X       [Q]  cross-stage (1→0), not in the period-local graph, but the GPU still gets a wait_event
P and R are both at stage 0 on stream X. Stage-descending submits Q (stage 1) first, then P and R (stage 0). The problem: R depends on Q (cross-stream, Y→X), so submitting R inserts stream_X.wait_event(Q). If R is queued before P (e.g., alphabetically), P idles behind R's wait_event:

Suboptimal order (R before P):
  stream_X: [R.wait_event(Q)] → [R kernels] → [P kernels]
                   ↑ P idles

Optimized order (P before R):
  stream_X: [P kernels] → [R.wait_event(Q)] → [R kernels]
                   ↑ P runs immediately
GPU stall, defined: stream.wait_event(event) pauses the GPU stream until the event is recorded. If the event comes from a task on another stream that is still executing within the same period, the stream idles. That idle time is the stall.
Stage-descending's limitation: it sorts only by stage; within a stage, task order ignores stall_cost. The ready-first algorithm fixes this by prioritizing tasks with no cross-stream dependencies (stall_cost = 0).
Core Idea: Period-Local Dependency Graph + Stall-Cost Priority
The algorithm has two steps:
Step 1: Build the period-local dependency graph
Include only the dependency edges that actually interact within the same period (see the classification under Corollary 3):
| Edge type | Condition | Edge direction |
|---|---|---|
| intra_iter_deps | stage_dep = stage_task | dep → task |
| inter_iter_deps | stage_dep = stage_task + 1 | dep → task |
Excluded edges: intra_iter_deps edges with stage_dep < stage_task, and inter_iter_deps edges with stage_dep ≤ stage_task — in both cases the dep completed in an earlier period. Its threading.Event is already set and its cuda_event already recorded, so it causes no blocking at all.
Step 2: Stall-cost-aware topological sort
Topologically sort the period-local graph. When multiple tasks are ready at once (in-degree = 0), pick by ascending stall_cost:

stall_cost(task) = number of the task's cross-stream in-edges in the period-local graph
                   (dep and task on different streams)

Sort key: (stall_cost, task_name)
→ tasks with stall_cost = 0 go first: no wait_event is inserted, so they never block subsequent tasks on the same stream
→ task_name is the tie-breaker, guaranteeing determinism (consistent across ranks)
Pseudocode
def _build_enqueue_order_ready_first(self) -> List[PipelineTask]:
# --- Step 1: build period-local graph ---
adj: Dict[str, List[str]] = defaultdict(list) # dep → [tasks]
in_degree: Dict[str, int] = {t.name: 0 for t in self._defs.values()}
for task_name, dep_list in self._intra_iter_deps.items():
task_stage = self._stage_map[task_name]
for dep_name in dep_list:
if self._stage_map[dep_name] == task_stage: # same stage → same period
adj[dep_name].append(task_name)
in_degree[task_name] += 1
for task_name, dep_list in self._inter_iter_deps.items():
task_stage = self._stage_map[task_name]
for dep_name in dep_list:
if self._stage_map[dep_name] == task_stage + 1: # k=1 → same period
adj[dep_name].append(task_name)
in_degree[task_name] += 1
# --- Step 2: compute stall_cost ---
stall_cost: Dict[str, int] = {t.name: 0 for t in self._defs.values()}
for task_name, dep_list in self._intra_iter_deps.items():
task_stream = self._stream_map[task_name]
task_stage = self._stage_map[task_name]
for dep_name in dep_list:
if self._stage_map[dep_name] == task_stage \
and self._stream_map[dep_name] != task_stream:
stall_cost[task_name] += 1 # in-period cross-stream dep
for task_name, dep_list in self._inter_iter_deps.items():
task_stream = self._stream_map[task_name]
task_stage = self._stage_map[task_name]
for dep_name in dep_list:
if self._stage_map[dep_name] == task_stage + 1 \
and self._stream_map[dep_name] != task_stream:
stall_cost[task_name] += 1 # in-period cross-stream dep
# --- Step 3: topo sort with stall_cost priority ---
ready = sorted(
[n for n, d in in_degree.items() if d == 0],
key=lambda n: (stall_cost[n], n)
)
order: List[str] = []
while ready:
name = ready.pop(0)
order.append(name)
for succ in adj[name]:
in_degree[succ] -= 1
if in_degree[succ] == 0:
ready.append(succ)
ready.sort(key=lambda n: (stall_cost[n], n))
return [self._defs[n] for n in order]
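The pseudocode can be condensed into a runnable, self-contained function over plain dicts — a sketch, not the real implementation; here deps are (task, depends_on) pairs and the P/Q/R/A configuration matches the advanced example later in this section:

```python
from collections import defaultdict

def ready_first_order(stage, stream, intra_deps, inter_deps):
    """Return the submission order; deps are (task, depends_on) pairs."""
    adj = defaultdict(list)
    in_degree = {t: 0 for t in stage}
    stall_cost = {t: 0 for t in stage}
    # Period-local filter: intra edges at the same stage (offset 0) and
    # inter edges one stage higher (offset 1) land in the same period.
    for deps, offset in ((intra_deps, 0), (inter_deps, 1)):
        for task, dep in deps:
            if stage[dep] == stage[task] + offset:
                adj[dep].append(task)
                in_degree[task] += 1
                if stream[dep] != stream[task]:  # cross-stream in-edge
                    stall_cost[task] += 1
    key = lambda n: (stall_cost[n], n)
    ready = sorted((n for n, d in in_degree.items() if d == 0), key=key)
    order = []
    while ready:
        name = ready.pop(0)
        order.append(name)
        for succ in adj[name]:
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                ready.append(succ)
        ready.sort(key=key)
    return order

stage  = {"P": 0, "Q": 1, "R": 0, "A": 0}
stream = {"P": "X", "Q": "Y", "R": "X", "A": "Z"}
order = ready_first_order(stage, stream,
                          intra_deps=[("Q", "P"), ("R", "Q")],
                          inter_deps=[("A", "Q")])
print(order)  # ['P', 'Q', 'R', 'A']
```

Both cross-stage intra edges are filtered out; only the k=1 edge Q→A survives, giving A a stall_cost of 1 and pushing it last.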
Worked Example: TrainPipelineSparseDist (3-stage)
Configuration recap (8 tasks, 3 streams, 3 stages):

Task             stage  stream     intra_iter_deps                inter_iter_deps
──────────────────────────────────────────────────────────────────────────────────
H2D              0      memcpy     —                              —
InputDistStart   1      data_dist  H2D (stage 0→1, cross-stage)   —
InputDistWait    1      data_dist  InputDistStart                 —
ZeroGrad         2      default    —                              —
WaitBatch        2      default    InputDistWait (stage 1→2),     —
                                   ZeroGrad
Forward          2      default    InputDistWait (stage 1→2),     —
                                   WaitBatch
Backward         2      default    Forward                        —
OptimizerStep    2      default    Backward                       —
Step 1: Period-Local Graph
Keep only the intra_iter_deps edges with stage_dep = stage_task (there are no inter_iter_deps):

Retained edges (same stage = same period):
  InputDistStart → InputDistWait   (stage 1 → 1) ✓
  ZeroGrad       → WaitBatch       (stage 2 → 2) ✓
  WaitBatch      → Forward         (stage 2 → 2) ✓
  Forward        → Backward        (stage 2 → 2) ✓
  Backward       → OptimizerStep   (stage 2 → 2) ✓

Excluded edges (cross-stage = cross-period; dep completed in an earlier period):
  H2D            → InputDistStart  (stage 0 → 1) ✗
  InputDistWait  → WaitBatch       (stage 1 → 2) ✗
  InputDistWait  → Forward         (stage 1 → 2) ✗
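The retained/excluded split can be computed mechanically. A sketch applying the period-local filter to the SparseDist config above (deps listed as (task, depends_on) pairs, per the configuration recap):

```python
# Apply the period-local filter to the SparseDist intra_iter_deps.
stage = {"H2D": 0, "InputDistStart": 1, "InputDistWait": 1, "ZeroGrad": 2,
         "WaitBatch": 2, "Forward": 2, "Backward": 2, "OptimizerStep": 2}
intra_iter_deps = [
    ("InputDistStart", "H2D"), ("InputDistWait", "InputDistStart"),
    ("WaitBatch", "InputDistWait"), ("WaitBatch", "ZeroGrad"),
    ("Forward", "InputDistWait"), ("Forward", "WaitBatch"),
    ("Backward", "Forward"), ("OptimizerStep", "Backward"),
]
retained = [(d, t) for t, d in intra_iter_deps if stage[d] == stage[t]]
excluded = [(d, t) for t, d in intra_iter_deps if stage[d] < stage[t]]
print(len(retained), len(excluded))  # 5 3
```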
Step 2: Compute stall_cost
stall_cost = the number of cross-stream in-edges in the period-local graph. All retained edges stay within a single stream:

Task            period-local in-edges       cross-stream?  stall_cost
──────────────────────────────────────────────────────────────────────
H2D             none                        —              0
InputDistStart  none                        —              0
InputDistWait   InputDistStart (data_dist)  same stream    0
ZeroGrad        none                        —              0
WaitBatch       ZeroGrad (default)          same stream    0
Forward         WaitBatch (default)         same stream    0
Backward        Forward (default)           same stream    0
OptimizerStep   Backward (default)          same stream    0
All stall_cost values are 0 because the cross-stage (cross-stream) deps were excluded — they come from earlier periods, and their GPU events are already recorded.
Step 3: Topological Sort

Initial ready (in-degree = 0, sorted by (stall_cost, name)):
  [(0, "H2D"), (0, "InputDistStart"), (0, "ZeroGrad")]

Dequeue process:
  1. H2D            → no successors in the period-local graph
  2. InputDistStart → releases InputDistWait
  3. ZeroGrad       → releases WaitBatch
  4. InputDistWait  → no period-local successors
  5. WaitBatch      → releases Forward
  6. Forward        → releases Backward
  7. Backward       → releases OptimizerStep
  8. OptimizerStep
Final submission order:
#   Task            stream     stage  stall_cost
1.  H2D             memcpy     0      0
2.  InputDistStart  data_dist  1      0
3.  ZeroGrad        default    2      0
4.  InputDistWait   data_dist  1      0
5.  WaitBatch       default    2      0
6.  Forward         default    2      0
7.  Backward        default    2      0
8.  OptimizerStep   default    2      0
Comparison with stage-descending: stage-descending produces [ZeroGrad, WaitBatch, Forward, Backward, OptimizerStep, InputDistStart, InputDistWait, H2D]. The two orders are equivalent in this example — all cross-stage deps come from earlier periods, so neither produces stalls. Ready-first's advantage shows up in scenarios with inter_iter_deps (k=1).
Advanced Example: Same Stream, Different Stages + inter_iter_deps
Consider a hypothetical pipeline where one stream carries tasks at different stages and an inter_iter_deps (k=1) edge exists:

Setup:
  P: stage=0, stream=X                        # stall_cost = 0
  Q: stage=1, stream=Y, intra_iter_deps=[P]   # cross-stage: excluded from the period-local graph
  R: stage=0, stream=X, intra_iter_deps=[Q]   # cross-stage: excluded from the period-local graph
  A: stage=0, stream=Z, inter_iter_deps=[Q]   # k=1: Q (stage 1) → enters the period-local graph
Period-local graph (only the k=1 inter_iter_deps edge):

Q(stage 1) ──→ A(stage 0)   # cross_iter_dep, k=1, same period
P, R: no period-local edges
stall_cost:

P: 0  (no period-local in-edges)
Q: 0  (no period-local in-edges)
R: 0  (no period-local in-edges)
A: 1  (Q→A is cross-stream: Y→Z)
Topological sort:

Initial ready: [(0, "P"), (0, "Q"), (0, "R")]   (A has in-degree 1, not ready)
1. P (stall=0, stream=X) → runs immediately, waits for nothing
2. Q (stall=0, stream=Y) → runs immediately → releases A
3. R (stall=0, stream=X) → runs immediately (P is already on X; FIFO guarantees order)
4. A (stall=1, stream=Z) → wait_event(Q), but Q is already submitted, so the delay is minimal
Final order: P(X,s0) → Q(Y,s1) → R(X,s0) → A(Z,s0)

GPU timeline:
  stream_X: ▓▓P▓▓▓▓R▓▓▓▓▓▓   ← P and R run back-to-back, zero stall
  stream_Y: ▓▓Q▓▓▓▓▓▓▓▓▓▓▓
  stream_Z: ░░░░░░▓▓A▓▓▓▓▓   ← A waits for Q to finish, then runs
Comparison with stage-descending: stage-descending submits Q (stage 1) first, then P, R, A (stage 0), giving the order Q → P → R → A. Here P is needlessly delayed — Q runs on a different stream and P does not depend on Q, yet stage-descending forces Q to be submitted first. Ready-first places P and Q in the ready list simultaneously, and P dequeues first by name.
Correctness Guarantee: Why Ready-First Prevents Deadlock
Ready-first is not merely a performance optimization — it structurally guarantees freedom from deadlock. Deadlock arises from circular waits: task X waits for task Y while Y, directly or indirectly, waits for X. Ready-first eliminates circular waits through two layers of guarantees:
| Dependency source | Why no deadlock |
|---|---|
| Cross-period | The depended-on task ran in an earlier period: its threading.Event is already set and its cuda_event already recorded. Waits return immediately — zero blocking. |
| Same period (period-local) | The topological sort guarantees that if X depends on Y (period-local edge Y→X), Y is enqueued before X. In the FIFO work queue, Y is picked up first by the worker, so by the time X waits for Y, Y is already running or has completed. |
Multi-threaded scenario: each thread group has its own FIFO queue. If task X sits at the head of a queue but blocks waiting for a task Y further back in the same queue, the result is a head-of-line deadlock. Ready-first's topological sort ensures Y is always ahead of X, eliminating this deadlock pattern.
❌ Potentially deadlocking enqueue order (not topologically sorted):
  thread_queue: [X, Y]
  X is dequeued first and waits for Y's CPU signal → Y never dequeues → deadlock

✓ Ready-first enqueue order (topologically sorted):
  thread_queue: [Y, X]
  Y is dequeued and runs first → sets its CPU signal → X dequeues and immediately gets the signal
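The two enqueue orders can be contrasted in a toy single-worker queue. A CPU-only sketch (names X/Y are illustrative; a short wait timeout stands in for the real, indefinite deadlock):

```python
import queue
import threading

def run(order):
    """Process tasks from a FIFO queue on one worker; X depends on Y."""
    done = {name: threading.Event() for name in ("X", "Y")}
    log = []
    q = queue.Queue()
    for name in order:
        q.put(name)

    def worker():
        while not q.empty():
            name = q.get()
            if name == "X":                       # X depends on Y
                ok = done["Y"].wait(timeout=0.1)  # Phase 1 wait (timeout = demo only)
                log.append(("X", ok))
            else:
                log.append(("Y", True))
            done[name].set()

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return log

print(run(["Y", "X"]))  # [('Y', True), ('X', True)]   -> topo order, no blocking
print(run(["X", "Y"]))  # [('X', False), ('Y', True)]  -> head-of-line: X times out
```

With a real blocking wait() instead of the timeout, the [X, Y] order would hang forever — exactly the head-of-line deadlock described above.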
Formal argument: the period-local dependency graph is a DAG — _validate ensures intra_iter_deps satisfy dep.stage ≤ task.stage and inter_iter_deps satisfy dep.stage − task.stage ≤ 1 (Corollaries 1 and 3). A DAG always admits a topological sort and contains no cycles, so the enqueue order ready-first produces is guaranteed deadlock-free.
Completeness Proof: Excluded Cross-Stage Edges Do Not Affect Correctness
The period-local graph excludes intra_iter_deps edges with stage_dep < stage_task. Does this mean those same-batch dependencies go unenforced? The following theorem shows they do not.
Theorem: for any intra_iter_deps edge (T, D), where T depends on D within iteration i, the ready-first enqueue order guarantees that D's CPU signal and CUDA event are available by the time T executes.
Proof: let s_D and s_T be the stages of D and T. Corollary 1 (enforced by _validate) requires s_D ≤ s_T. Three cases:
| Case | Stage relation | Proof |
|---|---|---|
| A | s_D = s_T | D and T share period P = i + s_D. The edge D→T is in the period-local graph, so the topological sort enqueues D before T. · Single-threaded: D is CPU-submitted before T. ✓ · Multi-threaded, same thread group: FIFO ensures D is picked up first. ✓ · Multi-threaded, different groups: T's worker blocks in cpu_signal(i, D).wait(); D's deps come from earlier periods or from upstream in the same period (topo-sorted), and D does not depend on T → no circular wait → D eventually completes. ✓ |
| B | s_D < s_T | D runs in period P_D = i + s_D, T in P_T = i + s_T, with P_D < P_T; the edge is not in the period-local graph. Key: periods are processed sequentially, so _enqueue_period(P_T) is called only after _enqueue_period(P_D) has completed. Sub-proof (D is not blocked by any task in P_T): all of D's deps come from earlier periods (< P_D, already done) or from same-period tasks in P_D (Case A). By Corollary 1's stage_dep ≤ stage_task constraint, D's dependency chain only reaches tasks with stage ≤ s_D < s_T, never any task in period P_T. D therefore completes: cpu_signal(i, D) is set → T's sig.wait() returns immediately; cuda_event(i, D) is recorded → T's stream.wait_event() guarantees GPU ordering. ✓ |
| C | s_D > s_T | Forbidden by _validate (Corollary 1). D would run in period P_D = i + s_D > P_T; when T executes, D is not yet enqueued → cpu_signal(i, D).wait() blocks forever → structural deadlock. This is precisely why Corollary 1 exists. |
Concrete example (FusedSparseDist): InputDistStart (stage=1) depends on H2D (stage=0) — this is Case B.

Iteration i = 3:
  H2D(i=3):            stage=0, period = 3 + 0 = 3
  InputDistStart(i=3): stage=1, period = 3 + 1 = 4

Timeline:
  period 3: enqueue H2D(i=3)
            → cpu_signal(3, "H2D").set()
            → cuda_event(3, "H2D").record()
  period 4: enqueue InputDistStart(i=3)
            → cpu_signal(3, "H2D").wait()     → already set, returns immediately
            → stream.wait_event(cuda_event)   → already recorded, GPU guarantees order
QED:推论 1 排除情况 C;情况 A 由 period-local 拓扑排序保证;情况 B 由 period 顺序执行保证。三者合一,ready-first 对所有 intra_iter_deps 必然正确。对 inter_iter_deps 的证明类似(用推论 3 替换推论 1,证明结构相同)。
QED: Corollary 1 rules out Case C; Case A is guaranteed by period-local topological sort; Case B is guaranteed by sequential period processing. Combined, ready-first is correct for all intra_iter_deps. The proof for inter_iter_deps is analogous (substitute Corollary 3 for Corollary 1; proof structure is identical).
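The case analysis above can be checked mechanically: under the stage-monotonicity constraint of Corollary 1 (stage(dep) ≤ stage(task)), every dependency edge lands in Case A or B, never C. A minimal sketch (the function name `classify_edge` is illustrative, not part of the real API):

```python
# Sketch: classify an intra-iteration dependency edge dep -> task by the
# case analysis above, using period = iter + stage.
def classify_edge(stage_dep: int, stage_task: int, i: int) -> str:
    """Return the proof case for edge dep -> task at iteration i."""
    p_dep, p_task = i + stage_dep, i + stage_task
    if p_dep == p_task:
        return "A"  # same period: handled by the period-local topological sort
    if p_dep < p_task:
        return "B"  # earlier period: dep was enqueued and completed already
    return "C"      # later period: structural deadlock (forbidden by _validate)

# FusedSparseDist example from the text: InputDistStart(stage=1) depends on H2D(stage=0)
assert classify_edge(0, 1, i=3) == "B"   # H2D in period 3, InputDistStart in period 4
assert classify_edge(2, 2, i=3) == "A"   # same-stage edge, e.g. Forward -> Backward
# A validated plan (stage_dep <= stage_task) can never produce Case C:
assert all(classify_edge(sd, st, i) != "C"
           for i in range(4) for sd in range(3) for st in range(sd, 3))
```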
类层次结构Class Hierarchy
流水线框架采用 Template Method 模式,将通用基础设施(同步资源、worker 线程、shortcut 缓存)抽取到抽象基类 _PipelineBase,调度策略由子类实现:
The pipeline framework uses the Template Method pattern. Universal infrastructure (sync resources, worker threads, shortcut caching) is extracted into the abstract base class _PipelineBase, while scheduling strategies are implemented by subclasses:
_PipelineBase (abstract)
┌─────────────────────────────────────┐
│ fill_pipeline / progress / drain │ ← 公共 API
│ run / run_serial / run_one_serial │
│ enable_shortcut / disable_shortcut │
│ _submit_task + _after_task_complete │ ← hook
│ _start_workers / _stop_workers │
│ CUDA events / CPU signals / slots │
└───────────┬─────────────┬───────────┘
│ │
┌────────────────┘ └────────────────┐
│ │
SWPipeline (clock-driven) DataFlowPipeline (data-flow)
┌────────────────────────┐ ┌────────────────────────┐
│ _do_fill: depth 个 │ │ _do_fill: _try_advance │
│ period 逐步填充 │ │ burst 连续发射 │
│ _do_retire: enqueue │ │ _do_retire: sweep_fire │
│ next period │ │ + try_advance │
│ _enqueue_period() │ │ _after_task_complete: │
│ _build_enqueue_order() │ │ 触发下游 task │
│ stage / period 概念 │ │ _try_fire_task() │
│ format_schedule() │ │ 无 stage/period 概念 │
└────────────────────────┘ └────────────────────────┘
文件布局: sw_pipeline.py 包含 _PipelineBase + SWPipeline + TaskProfiler + 数据类;dataflow_pipeline.py 包含 DataFlowPipeline。
File layout: sw_pipeline.py contains _PipelineBase + SWPipeline + TaskProfiler + data classes; dataflow_pipeline.py contains DataFlowPipeline.
_PipelineBase
流水线引擎的抽象基类。解析 PipelinePlan 中的任务定义和依赖关系,按 num_slots 预分配 CUDA events 和 CPU signals,管理 worker 线程、shortcut 缓存、NVTX 标注。不包含任何调度策略 — 子类通过实现 _do_fill()、_do_retire() 和可选的 _after_task_complete() hook 来定义调度行为。
Abstract base class for pipeline engines. Parses task definitions and dependencies from PipelinePlan, pre-allocates CUDA events and CPU signals for num_slots, manages worker threads, shortcut caching, and NVTX annotations. Contains no scheduling strategy — subclasses define scheduling by implementing _do_fill(), _do_retire(), and optionally overriding the _after_task_complete() hook.
| 参数Param | 类型Type | 说明Description |
|---|---|---|
| plan | PipelinePlan | 完整调度计划(任务、依赖、stream 分配)Complete scheduling plan (tasks, deps, stream assignments) |
| num_slots | int | 同步槽位数。SWPipeline 传 depth,DataFlowPipeline 传 max_depthSync slots. SWPipeline passes depth, DataFlowPipeline passes max_depth |
| device | int | CUDA 设备号(默认 0)CUDA device (default 0) |
class _PipelineBase:
    def fill_pipeline(self, data_iter):
        self._reset_state()
        self._start_workers()
        self._do_fill(data_iter)   # abstract: subclass implements fill strategy
        self._pipeline_started = True
        return data_iter

    def progress(self, data_iter=None):
        oldest = self._next_retire
        # ... wait for _iter_complete, check error, wait_event ...
        self._do_retire(oldest, data_iter)   # abstract: subclass implements retire+advance strategy
        # ... check drain ...
        return oldest

    def _submit_task(self, iter_idx, task_def):
        # ... wait deps, exec task, record events ...
        self._after_task_complete(task_def.name, iter_idx)   # hook

    def _after_task_complete(self, task_name, iter_idx):
        pass   # SWPipeline: no-op; DataFlowPipeline: fires downstream tasks
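The Template Method split can be illustrated with a toy, torch-free sketch (the `_Toy*` classes below are hypothetical stand-ins, not the real implementation): the base fixes the fill/progress skeleton, subclasses only plug in `_do_fill` / `_do_retire`.

```python
# Toy illustration of the Template Method split described above.
class _ToyPipelineBase:
    def __init__(self):
        self.log = []

    def fill_pipeline(self, data):
        self._do_fill(data)           # abstract hook: fill strategy
        return iter(data)

    def progress(self, idx):
        self._do_retire(idx)          # abstract hook: retire + advance strategy
        return idx

    def _do_fill(self, data): raise NotImplementedError
    def _do_retire(self, idx): raise NotImplementedError

class _ToyClockDriven(_ToyPipelineBase):
    """SWPipeline-style subclass: fill depth periods, enqueue one per retire."""
    def _do_fill(self, data): self.log.append("fill periods 0..depth-1")
    def _do_retire(self, idx): self.log.append(f"enqueue period after iter {idx}")

pipe = _ToyClockDriven()
pipe.fill_pipeline([1, 2, 3])
assert pipe.progress(0) == 0
assert pipe.log == ["fill periods 0..depth-1", "enqueue period after iter 0"]
```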
SWPipeline
时钟驱动(同步)流水线引擎,继承自 _PipelineBase。以 period 为节拍,每个 period 提交所有 task 各一次(iter_idx = period − stage)。构造时从 TaskSchedule.stage 推断 depth = max(stage) + 1,传入基类作为 num_slots。支持 pipelined / serial / single-iter 三种执行模式,以及运行时 task shortcut。构造时验证 DAG(无环、stage 单调、名称唯一)。
Clock-driven (synchronous) pipeline engine, inheriting from _PipelineBase. Advances in periods, submitting every task once per period (iter_idx = period − stage). At construction, infers depth = max(stage) + 1 from TaskSchedule.stage and passes it to the base as num_slots. Supports pipelined / serial / single-iter execution and runtime task shortcutting. Validates DAG at construction (acyclic, monotone stages, unique names).
| 参数Param | 类型Type | 说明Description |
|---|---|---|
| plan | PipelinePlan | 完整调度计划Complete scheduling plan |
| device | int | CUDA 设备号(默认 0)CUDA device (default 0) |
pipe = SWPipeline(plan, device=0)
# Mode 1: run() — one-shot pipelined execution
elapsed = pipe.run(dataloader, verbose=True)
# Mode 2: run_serial() — serial baseline
baseline = pipe.run_serial(dataloader, verbose=True)
# Mode 3: fill_pipeline + progress — fine-grained control
data_iter = pipe.fill_pipeline(dataloader)
while True:
    try:
        idx = pipe.progress(data_iter)
    except StopIteration:
        break
执行时序图Execution Sequence Diagram
fill_pipeline() → progress() 循环。depth=2, 3 batches, Stage 0: CopyBatch (IO Worker), Stage 1: Forward + Backward (Comp Worker)。
fill_pipeline() → progress() loop. depth=2, 3 batches. Stage 0: CopyBatch (IO Worker), Stage 1: Forward + Backward (Comp Worker).
SWPipeline 方法Methods
fill_pipeline
预取 depth 个 batch,启动 worker 线程,提交前 depth 个 period。必须在首次 progress() 前调用。接受任何可迭代对象。
Prefetch depth batches, start worker threads, submit first depth periods. Must be called before first progress(). Accepts any iterable.
| 参数Param | 类型Type | 说明Description |
|---|---|---|
| data_iter | Iterable | 数据源Data source |
返回Returns → Iterator
异常Raises → RuntimeError (重复调用)(called twice without drain)
data_iter = pipe.fill_pipeline(train_loader)
# Now ready for pipe.progress(data_iter)
progress
推进一步:(1) 等待最老的 in-flight 迭代完成 (2) 入队下一个 period (3) 返回完成的 iter_idx。传 data_iter=None 只 retire 不入队新数据。超时 60 秒抛 RuntimeError。
Advance one step: (1) wait for oldest in-flight iteration (2) enqueue next period (3) return completed iter_idx. Pass None to retire without enqueuing. 60s timeout raises RuntimeError.
返回Returns → int · 异常Raises → StopIteration, RuntimeError
data_iter = pipe.fill_pipeline(dataloader)
while True:
    try:
        idx = pipe.progress(data_iter)
    except StopIteration:
        break
run
一次性流水线执行,消费整个 data_iter,返回 wall-clock 秒数。内部 = fill_pipeline + progress 循环。NVTX 前缀 SWP/{TaskName}/iter{N}。emit_nvtx=True 启用 torch.autograd.profiler.emit_nvtx() 让 backward kernel 在 nsys 中可见。
One-shot pipelined execution consuming entire data_iter. Returns wall-clock seconds. Internally = fill_pipeline + progress loop. NVTX prefix SWP/{TaskName}/iter{N}. emit_nvtx=True enables torch.autograd.profiler.emit_nvtx() for nsys backward-kernel visibility.
返回Returns → float (秒,含 cuda.synchronize)(seconds, includes cuda.synchronize)
elapsed = pipe.run(batches, verbose=True, emit_nvtx=True)
print(f"Pipeline: {elapsed*1000:.1f} ms")
run_serial
在默认 stream 上串行执行所有迭代(无流水线、无线程),作为性能基线。NVTX 前缀 SWP_serial/{TaskName}/iter{N}。Shortcut task 自动追加 [skip] 后缀。
Run all iterations serially on default stream (no pipeline, no threads) as a performance baseline. NVTX prefix SWP_serial/{TaskName}/iter{N}. Shortcut tasks get [skip] suffix.
返回Returns → float
serial = pipe.run_serial(make_data(100), verbose=True)
pipeline = pipe.run(make_data(100), verbose=True)
print(f"Speedup: {serial / pipeline:.2f}x")
run_one_serial_iter
在默认 stream 上串行执行单次迭代,按拓扑序执行 task。尊重 shortcut 状态 — shortcut 的 task 跳过 fn(ctx) 并回放缓存。不需要 fill_pipeline(),不涉及 worker 线程。原子操作,无 in-flight 状态。
Single serial iteration on default stream. Executes tasks in topo order. Respects shortcut state — shortcutted tasks skip fn(ctx) and replay cache. No fill_pipeline() needed, no workers. Atomic, leaves no in-flight state.
batch = next(iter(train_loader))
pipe.run_one_serial_iter(batch, iter_idx=0)
# With shortcut
pipe.enable_shortcut("Forward")
pipe.run_one_serial_iter(batch, iter_idx=1) # caching
pipe.run_one_serial_iter(batch, iter_idx=2) # shortcut (Forward skipped)
enable_shortcut
启用 shortcut 模式。首次执行 = 缓存(运行 fn + 快照 + 调用 DeclaredIO.capture()),后续执行 = shortcut(跳过 fn,回放 cache + 调用 DeclaredIO.restore())。globally_ordered Task 仍执行 _SubmissionSequencer 保序。所有 rank 必须同步 enable/disable 相同 Task。
Enable shortcut mode. First execution = caching (run fn + snapshot + call DeclaredIO.capture()), subsequent = shortcut (skip fn, replay cache + call DeclaredIO.restore()). globally_ordered tasks still run _SubmissionSequencer. All ranks must enable/disable same tasks.
异常Raises → ValueError (未知 task 名称)(unknown task name)
pipe.enable_shortcut("EmbLookup")
pipe.enable_shortcut("EmbLookup", "Prefetch", "MPEmbForward")
disable_shortcut
关闭指定 Task 的 shortcut 并清除其缓存。
Disable shortcut mode for the given tasks and clear their caches.
pipe.disable_shortcut("EmbLookup")
drain
Pipelined 模式:退休所有 in-flight 迭代 → 停止 worker → 同步 CUDA → _reset_state()。之后可重新 fill_pipeline()。Serial 模式(run_one_serial_iter):仅 cuda.synchronize —— 串行迭代是原子操作,无 in-flight 状态。Shortcut 配置和缓存保留,需显式 disable_shortcut() 清除。
Pipelined mode: retires all in-flight iterations → stops workers → syncs CUDA → _reset_state(). Then fill_pipeline() can be called again. Serial mode (run_one_serial_iter): just cuda.synchronize — serial iterations are atomic, leaving no in-flight state. Shortcut config and cache are preserved; call disable_shortcut() to clear them.
# Switch data iterator in pipelined mode
pipe.drain()
data_iter = pipe.fill_pipeline(new_loader)
# Change shortcut config mid-pipeline
pipe.drain()
pipe.enable_shortcut("Forward")
data_iter = pipe.fill_pipeline(loader)
print_schedule
打印 period × task 调度表。每行一个 task。列:Thread、Stream、每个 period 的 iteration。Shortcut 的 task 带 [skip] 后缀。
Print period × task schedule table. One task/row. Columns: Thread, Stream, iteration per period. Shortcutted tasks have [skip] suffix.
pipe.print_schedule(3)
# # Task Thread Stream | P0 P1 P2
# -- ------------------------ ------- ------- + ---- ---- ----
# 0 H2DAndShuffle default default | i0 i1 i2
# 1 EmbInputDistStart [skip] default default | . . .
# 2 EmbInputDistWait default default | i0 i1 i2
format_schedule
同 print_schedule,但返回字符串不打印。可用于日志或进一步处理。
Same as print_schedule but returns string instead of printing. Useful for logging.
logger.info(pipe.format_schedule(5))
print_stage_analysis
打印拓扑序的 task 调度分析:位置、stage、类型,以及 LDS(最长依赖链)与 pipeline depth 的对比(Dilworth 最优性检查)。
Print a topological-order task analysis: position, stage, and type per task, plus LDS (longest dependency chain) vs. pipeline depth (Dilworth-optimality check).
pipe.print_stage_analysis()
# Task schedule (topological order):
# Pos Task Idx Stage Type
# --------------------------------------------------
# 0 CopyBatch 0 0 stream
# 1 Forward 1 1 stream
# ...
# LDS = 2 (example: (1, 0))
# Pipeline depth = 2
# ✓ LDS == depth (Dilworth-optimal)
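The longest-dependency-chain length can be computed with a standard DP over a topological order. A sketch under assumed semantics (the framework's exact LDS definition may differ, e.g. in how same-stage tasks are counted; `lds` and the edge list are illustrative):

```python
# Sketch: LDS (longest dependency-chain length) over an intra-iteration DAG,
# computed by DP in topological order. Edge (task, dep) means task depends on dep.
from graphlib import TopologicalSorter

def lds(tasks, deps):
    preds = {t: [] for t in tasks}
    for task, dep in deps:
        preds[task].append(dep)
    order = TopologicalSorter(preds).static_order()  # deps come before dependents
    chain = {}
    for t in order:
        chain[t] = 1 + max((chain[d] for d in preds[t]), default=0)
    return max(chain.values())

# CopyBatch -> Forward -> Backward gives a chain of length 3.
assert lds(["CopyBatch", "Forward", "Backward"],
           [("Forward", "CopyBatch"), ("Backward", "Forward")]) == 3
```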
__repr__
print(pipe)
# SWPipeline(device=0, depth=1, tasks=[...], shortcuts=['MPEmbForward'])
#
# # Task Thread Stream | P0
# -- ------------------------ ------- ------- + ---
# 0 H2DAndShuffle default default | i0
TaskProfiler
逐 task 的 exposed 时间测量。利用 SWPipeline shortcut 机制:逐一 shortcut 每个 task,测量 exposed(T) = baseline − serial_with_T_shortcut。结果驱动 auto-scheduling 决策。
Per-task exposed time measurement. Uses SWPipeline shortcut: shortcut each task one by one, measure exposed(T) = baseline − serial_with_T_shortcut. Results drive auto-scheduling decisions.
profiler = TaskProfiler(pipe)
result = profiler.profile(batch, num_warmup=5, num_measure=20)
result.print_report()
profile
四阶段 profiling:(1) 预热 (2) baseline(num_rounds 次取中位数)(3) 逐 task shortcut 测量 (4) exposed = max(0, baseline - median_shortcut)。每 round 只有两次 cuda.synchronize。skip_tasks 排除不可独立 shortcut 的 Task(如 LossBackward)。
Four-phase profiling: (1) Warmup (2) Baseline (median of num_rounds) (3) Per-task shortcut measurement (4) exposed = max(0, baseline - median_shortcut). Only two cuda.synchronize per round. skip_tasks excludes non-independently-shortcuttable tasks (e.g. LossBackward).
| 参数Param | 类型Type | 默认Default | 说明Description |
|---|---|---|---|
| batch | Any | — | 代表性 batchRepresentative batch |
| num_warmup | int | 3 | 预热次数Warmup iterations |
| num_measure | int | 10 | 每 round 迭代数Iterations per round |
| num_rounds | int | 3 | round 数Number of rounds |
| skip_tasks | Optional[set] | None | 排除的 Task 名称Task names to exclude |
返回Returns → ProfileResult
result = profiler.profile(batch, num_warmup=5, num_measure=20, num_rounds=5)
result.print_report()
# Access raw data
for name, t in result.exposed_s.items():
    print(f"{name}: {t*1e3:.3f} ms")
profile_many
对多个 batch 依次执行 profile(),返回 ProfileResult 列表。
Profile several batches in sequence, returning a list of ProfileResult objects.
results = profiler.profile_many([batch_a, batch_b, batch_c])
for i, r in enumerate(results):
    print(f"\n--- Batch {i} ---")
    r.print_report()
ProfileResult.print_report
result.print_report()
# Baseline serial iteration: 8.503 ms
#
# Task Exposed % baseline
# ------------------------ ---------- ------------
# CopyBatch 0.521ms 6.1%
# EmbLookup 1.234ms 14.5%
# ...
# ------------------------ ---------- ------------
# SUM 7.571ms 89.0%
DataFlowPipeline
数据流(异步)流水线引擎,继承自 _PipelineBase。没有 period 或 stage 概念 — 每个 task 在所有依赖完成 + buffer 有空位时立即发射。max_depth 控制同时在飞的最大迭代数,既是 buffer 容量上限,也是反压机制。TaskSchedule.stage 字段被忽略。
Data-flow (asynchronous) pipeline engine, inheriting from _PipelineBase. No period or stage concept — each task fires as soon as all dependencies are met + buffer available. max_depth controls the maximum number of in-flight iterations, acting as both buffer capacity and back-pressure. The TaskSchedule.stage field is ignored.
| 参数Param | 类型Type | 说明Description |
|---|---|---|
| plan | PipelinePlan | 完整调度计划(stage 被忽略)Complete scheduling plan (stage ignored) |
| max_depth | int | 最大在飞迭代数(≥ 1)Max in-flight iterations (≥ 1) |
| device | int | CUDA 设备号(默认 0)CUDA device (default 0) |
from dataflow_pipeline import DataFlowPipeline

plan = PipelinePlan(
    schedule={
        t_h2d: TaskSchedule(stream=memcpy_stream),   # stage is ignored
        t_dist: TaskSchedule(stream=dist_stream, globally_ordered=True),
        t_fwd: TaskSchedule(),
        t_bwd: TaskSchedule(),
    },
    intra_iter_deps=[(t_dist, t_h2d), (t_fwd, t_dist), (t_bwd, t_fwd)],
    inter_iter_deps=[(t_fwd, t_bwd)],   # Forward(i) waits for Backward(i-1)
)
pipe = DataFlowPipeline(plan, max_depth=5, device=0)

# Public API is identical to SWPipeline
data_iter = pipe.fill_pipeline(dataloader)
while True:
    try:
        idx = pipe.progress(data_iter)
    except StopIteration:
        break
发射条件Fire Conditions
一个 task 只有在全部 5 个条件同时满足时才会被发射(入队到 worker):
A task is fired (enqueued to worker) only when all 5 conditions are met simultaneously:
| # | 条件Condition | 含义Meaning |
|---|---|---|
| 1 | (task, iter) ∉ fired | 该 task 在该迭代尚未发射Not already fired for this iteration |
| 2 | pending[task] < max_depth | 该 task 在飞计数未达上限Per-task in-flight count below limit |
| 3 | iter ∈ contexts | IterContext 已创建(batch 已获取)IterContext exists (batch fetched) |
| 4 | intra_deps 全部完成All intra_deps done | (dep, iter) ∈ task_done 对所有 intra dep(dep, iter) ∈ task_done for all intra deps |
| 5 | inter_deps 全部完成All inter_deps done | (dep, iter−1) ∈ task_done 对所有 inter dep(dep, iter−1) ∈ task_done for all inter deps |
"完成" 的判定:_submit_task() 末尾调用 _after_task_complete() hook,将 (task_name, iter_idx) 加入 _task_done 集合。这意味着 CUDA kernel 已 launch(event 已 record),CPU 侧 signal 已 set。
"Done" means: _submit_task() calls the _after_task_complete() hook at the end, adding (task_name, iter_idx) to the _task_done set. This means the CUDA kernel has been launched (event recorded) and CPU signal set.
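The five conditions reduce to a pure predicate over bookkeeping sets. A torch-free sketch (the name `can_fire` and the plain-data state are illustrative; the real engine guards this check with `_fire_lock`):

```python
# Sketch of the five fire conditions from the table above as a pure predicate.
def can_fire(task, it, *, fired, pending, max_depth, contexts,
             intra_deps, inter_deps, task_done):
    return ((task, it) not in fired                                       # 1
            and pending.get(task, 0) < max_depth                          # 2
            and it in contexts                                            # 3
            and all((d, it) in task_done for d in intra_deps[task])       # 4
            and all((d, it - 1) in task_done for d in inter_deps[task]))  # 5

state = dict(fired=set(), pending={}, max_depth=2, contexts={0, 1},
             intra_deps={"Fwd": ["H2D"]}, inter_deps={"Fwd": ["Bwd"]},
             task_done={("H2D", 0), ("Bwd", -1)})
assert can_fire("Fwd", 0, **state)       # all deps of iteration 0 are done
assert not can_fire("Fwd", 1, **state)   # H2D(1) and Bwd(0) not done yet
```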
_try_fire_task
检查上述全部 5 个条件。若全部满足,标记为 fired、增加 pending 计数并入队到对应 thread group 的 worker queue。必须在持有 _fire_lock 时调用。
Checks all 5 conditions above. If all met, marks as fired, increments pending count, and enqueues to the corresponding thread group's worker queue. Must be called while holding _fire_lock.
_try_advance
创建新的 IterContext(获取 batch),上限为 max_depth 个在飞迭代。每创建一个新 context,立即对所有 task 调用 _try_fire_task()。无依赖的 task(如 H2D)会立即连续发射(burst)。
Creates new IterContext objects (fetches batches), bounded by max_depth in-flight iterations. After each new context, calls _try_fire_task() for every task. Dependency-free tasks (e.g. H2D) fire immediately in burst.
_sweep_fire
在 retire 释放 buffer 槽位后调用。对所有存活迭代重试未发射的 task。先前因 pending >= max_depth 被阻塞的 task 在 pending 计数下降后可以成功发射。
Called after retirement frees buffer slots. Retries all unfired tasks across live iterations. Tasks previously blocked by pending >= max_depth may now succeed after the pending count drops.
与 SWPipeline 的关键差异Key Differences from SWPipeline
| 维度Dimension | SWPipeline | DataFlowPipeline |
|---|---|---|
| 调度模型Scheduling | Clock-driven (period 节拍)Clock-driven (period tick) | Data-flow (依赖触发)Data-flow (dependency trigger) |
| stage | 必需 — 决定 iter_idxRequired — determines iter_idx | 忽略Ignored |
| num_slots | depth = max(stage)+1 | max_depth (构造参数)(constructor arg) |
| _do_fill | depth 个 period 逐步填充depth periods sequentially | burst — _try_advance 连续创建 contextburst — _try_advance creates contexts |
| _do_retire | _enqueue_period(next) | _try_advance + _sweep_fire |
| _after_task_complete | no-op | 触发 downstream intra/inter tasksfires downstream intra/inter tasks |
| 死锁分析Deadlock | 静态可证(stage 图 + ready-first)Statically provable | 需证 bounded buffer 无死锁Must prove bounded buffer safety |
内部机制Internals
_exec_task
# Simplified logic:
def _exec_task(self, task_def, ctx):
    name = task_def.name
    if name in self._shortcut_tasks:
        if name in self._shortcut_cache:
            self._apply_shortcut(ctx, task_def)   # Mode 1: replay
            return
        before_ids = {k: id(v) for k, v in vars(ctx).items()}
        task_def.fn(ctx)
        self._capture_and_cache(ctx, task_def, before_ids)   # Mode 2: cache
        return
    task_def.fn(ctx)   # Mode 3: normal
Mode 1 (Shortcut Active): 跳过 fn(ctx),从缓存恢复 ctx 属性 + _GraftGrad 梯度桥 + DeclaredIO.restore()。Mode 2 (Caching): 正常执行 fn(ctx),diff ctx 属性,_cache_val 递归 detach+clone,调用 DeclaredIO.capture(),存入缓存。Mode 3 (Normal): 直接执行 fn(ctx),零额外开销。
Mode 1 (Shortcut Active): skip fn(ctx), restore ctx attrs + _GraftGrad gradient bridge + DeclaredIO.restore(). Mode 2 (Caching): run fn(ctx) normally, diff ctx attrs, _cache_val recursive detach+clone, call DeclaredIO.capture(), store in cache. Mode 3 (Normal): run fn(ctx) directly, zero overhead.
_cache_val
递归缓存值。每个 torch.Tensor 执行 detach().clone(),保存 requires_grad 状态。支持 dict、list、tuple 和含 Tensor 的任意对象(如 TorchRec JaggedTensor)。返回值用 _CACHE_TAG sentinel 标记,供 _restore_val 识别重建。
Recursively cache a value. Each torch.Tensor gets detach().clone(), preserving requires_grad. Supports dict, list, tuple, and arbitrary objects with tensor attrs (e.g. JaggedTensor). Tagged with _CACHE_TAG sentinel for _restore_val.
# Tags: "t"=Tensor, "d"=dict, "l"=list, "u"=tuple, "o"=object
_cache_val(tensor) → (_CACHE_TAG, "t", tensor.detach().clone(), requires_grad)
_cache_val({"a": t}) → (_CACHE_TAG, "d", {"a": _cache_val(t)})
_cache_val(jt) → (_CACHE_TAG, "o", JaggedTensor, {attr: _cache_val(v) ...})
_restore_val
从 _cache_val 输出递归重建值。Tensor 通过 detach()(零拷贝)生成新叶节点,忠实恢复 requires_grad。每次调用返回全新 Tensor,防止 "backward through graph a second time" 错误。
Recursively restore from _cache_val output. Tensors get detach() (zero-copy) as fresh leaves, faithfully restoring requires_grad. Each call returns brand-new tensors, preventing "backward through graph a second time" errors.
_GraftGrad
自定义 autograd Function,作为 shortcut 的"梯度桥"。Forward:恒等(原样返回 restored tensor)。Backward:将 grad_output 传给 restored tensor;将预分配零张量传给 upstream inputs(触发它们的 backward 但不影响参数梯度)。确保 shortcut 后下游 loss.backward() 能正确传播梯度。
Custom autograd Function serving as shortcut's "gradient bridge". Forward: identity (returns restored tensor unchanged). Backward: passes grad_output to restored; passes pre-allocated zeros to upstream inputs (triggers backward without affecting param grads). Ensures loss.backward() works after shortcutting.
_SubmissionSequencer
确定性的跨线程提交排序器。每个 globally_ordered Task 获得预分配的序列号。execute_ordered(seq, fn) 阻塞直到轮到该 Task,确保所有 rank 以相同顺序调用 NCCL 集合操作,防止死锁。超时 30 秒抛 RuntimeError。
Deterministic cross-thread submission sequencer. Each globally_ordered task gets a pre-assigned sequence number. execute_ordered(seq, fn) blocks until it's that task's turn, ensuring all ranks call NCCL collectives in identical order (prevents deadlocks). 30s timeout raises RuntimeError.
| 方法Method | 说明Description |
|---|---|
reset() | 重置序列号为 0Reset sequence counter to 0 |
execute_ordered(seq, fn) | 等待 _next_seq == seq,然后执行 fn()Wait for _next_seq == seq, then execute fn() |
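The wait-for-my-turn protocol can be sketched with a condition variable. The `ToySequencer` below is an illustrative stand-in for `_SubmissionSequencer`, not its real implementation:

```python
# Minimal cross-thread submission sequencer sketch:
# execute_ordered(seq, fn) blocks until the internal counter reaches seq.
import threading

class ToySequencer:
    def __init__(self):
        self._next_seq = 0
        self._cv = threading.Condition()

    def reset(self):
        with self._cv:
            self._next_seq = 0
            self._cv.notify_all()

    def execute_ordered(self, seq, fn, timeout=30.0):
        with self._cv:
            if not self._cv.wait_for(lambda: self._next_seq == seq, timeout):
                raise RuntimeError(f"sequencer timeout waiting for seq {seq}")
            fn()
            self._next_seq += 1
            self._cv.notify_all()

order = []
seqr = ToySequencer()
threads = [threading.Thread(target=seqr.execute_ordered,
                            args=(s, lambda s=s: order.append(s)))
           for s in (2, 0, 1)]   # deliberately started out of order
for t in threads: t.start()
for t in threads: t.join()
assert order == [0, 1, 2]        # execution follows sequence numbers, not start order
```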
执行模型:同步 vs 异步流水线Execution Model: Synchronous vs Asynchronous Pipeline
执行模型决定 task 何时发射。框架提供两种调度策略:SWPipeline(同步/clock-driven)和 DataFlowPipeline(异步/data-flow),共享 _PipelineBase 基础设施,公共 API 完全相同。本节对比两种设计的原理、优劣和适用场景。
The execution model determines when a task fires. The framework provides two scheduling strategies: SWPipeline (synchronous / clock-driven) and DataFlowPipeline (asynchronous / data-flow), sharing _PipelineBase infrastructure with an identical public API. This section compares the two designs.
同步流水线(Clock-Driven)— 当前实现Synchronous Pipeline (Clock-Driven) — Current Implementation
同步模型以 period 为节拍,每个 period 恰好提交所有 task 各一次。progress() 是时钟:退休一个 iteration → 入队下一个 period → 窗口固定为 depth 个 period。
即使 H2D 没有任何依赖,也必须等 progress() 入队它所在的 period 才能开始。Task 的发射由 period 节拍控制,而非依赖就绪状态。
The synchronous model advances in periods. Each period submits every task exactly once. progress() is the clock: retire one iteration → enqueue next period → window fixed at depth periods.
Even if H2D has zero dependencies, it must wait for progress() to enqueue its period. Task firing is driven by the period clock, not dependency readiness.
# _enqueue_period — core of the sync model: one period submits every task
def _enqueue_period(self, period, data_iter):
    for task in self._enqueue_order:
        iter_idx = period - stage[task]
        queue.put((iter_idx, task))   # submitted on the period clock

def progress(self, data_iter):
    retire(oldest)                   # (1) retire the oldest iteration
    _enqueue_period(next_period)     # (2) enqueue the next period
    return oldest                    # window = depth periods
时序图:SparseDist (depth=3)Timeline: SparseDist (depth=3)
Period H2D(memcpy) InputDist(data_dist) FWD+BWD+OPT(default)
───────── ────────────────── ────────────────────
P0 █ iter0
P1 █ iter1 █ iter0
P2 █ iter2 █ iter1 ████ iter0
P3 █ iter3 █ iter2 ████ iter1
P4 █ iter4 █ iter3 ████ iter2
↑ ↑
H2D 很快完成后空闲, 瓶颈,100% 利用率
等下一个 progress() 才能发射下一个 H2D
观察: H2D(1ms) 在每个 period 中只工作 1ms,剩余 9ms 空闲等待 period 节拍。这不影响稳态吞吐(瓶颈是 FWD+BWD),但意味着 H2D 无法提前缓冲数据来吸收偶发的 PCIe 延迟抖动。 Observation: H2D (1ms) works only 1ms per period, idle for 9ms waiting for the period clock. This doesn't affect steady-state throughput (bottleneck is FWD+BWD), but means H2D cannot pre-buffer data to absorb occasional PCIe latency jitter.
异步流水线(Data-Flow)— DataFlowPipelineAsynchronous Pipeline (Data-Flow) — DataFlowPipeline
异步模型中,每个 task 在所有依赖完成 + buffer 有空位时立即发射,不受 period 节拍约束。这是硬件设计中的 elastic pipeline(带 FIFO 缓冲的异步流水线)。
H2D 没有依赖 → 只要 buffer 有空位就持续发射。InputDist 依赖 H2D → H2D(i) 完成后立即发射 InputDist(i),无需等下一个 period。
In the async model, each task fires as soon as all dependencies are met + buffer slot available, independent of any period clock. This is the elastic pipeline from hardware design (async pipeline with FIFO buffers).
H2D has no dependencies → fires continuously as long as buffer has space. InputDist depends on H2D → fires immediately when H2D(i) completes, no waiting for next period.
# Actual implementation sketch of DataFlowPipeline (dataflow_pipeline.py)
class DataFlowPipeline(_PipelineBase):
    def _after_task_complete(self, task_name, iter_idx):
        """Hook at the end of _submit_task() — fires downstream tasks."""
        with self._fire_lock:
            self._task_done.add((task_name, iter_idx))
            for ds in self._downstream_intra[task_name]:
                self._try_fire_task(ds, iter_idx)       # same-iteration downstream
            for ds in self._downstream_inter[task_name]:
                self._try_fire_task(ds, iter_idx + 1)   # next-iteration downstream

    def _try_fire_task(self, task_name, iter_idx):
        """Enqueue to a worker only when all 5 conditions hold."""
        if (task_name, iter_idx) in self._fired: return
        if self._task_pending[task_name] >= self.max_depth: return
        if iter_idx not in self._contexts: return
        for dep in intra_deps[task_name]:
            if (dep, iter_idx) not in self._task_done: return
        # ... same check for inter_iter_deps ...
        self._fired.add((task_name, iter_idx))
        self._task_pending[task_name] += 1
        queue.put((iter_idx, task_def))   # fire!
时序图:SparseDist + 异步 H2D (buffer=5)Timeline: SparseDist + Async H2D (buffer=5)
Time H2D(memcpy) InputDist(data_dist) FWD+BWD+OPT(default)
───────── ────────────────── ────────────────────
t0 █0 █1 █2 █3 █4 ← H2D 连续发射 5 个
t1 █ iter0 ← H2D(0) 完成即触发
t2 █5 █ iter1 ████ iter0
t3 █6 █ iter2 ████ iter1
↑ ↑
H2D 跑在前面, 瓶颈不变
缓冲了多个 batch 但 jitter 被缓冲吸收
关键区别: H2D 在 t0 就完成了 5 个 batch 的传输(连续发射),而非每个 period 才传输一个。当 H2D 偶尔遇到 PCIe 延迟抖动(如 5ms),InputDist 可以直接从 buffer 获取已就绪的 batch,GPU 计算不受影响。 Key difference: H2D transfers 5 batches at t0 (burst), not one per period. When H2D occasionally hits PCIe jitter (e.g. 5ms), InputDist grabs an already-ready batch from the buffer, keeping GPU compute unaffected.
对比与取舍Comparison & Tradeoffs
稳态定理:无论同步还是异步,稳态吞吐 = 1 / Tbottleneck。异步模型 不能 提高稳态吞吐 —— 瓶颈 task(通常是 Forward+Backward)决定上限。异步模型的价值在于减少非稳态损失。
Steady-state theorem: Regardless of sync or async, steady-state throughput = 1 / Tbottleneck. The async model cannot improve steady-state throughput — the bottleneck task (typically Forward+Backward) sets the ceiling. The value of async lies in reducing non-steady-state losses.
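The steady-state theorem can be checked numerically with a toy recurrence (the three-task chain and its millisecond durations below are hypothetical, and buffers are assumed unbounded):

```python
# Sketch: steady-state throughput equals 1 / T_bottleneck for a linear chain
# H2D(1) -> Dist(2) -> FwdBwd(10). Task t of iteration i starts only after its
# intra-iteration dependency finishes and after its own previous iteration.
def finish_times(durations, n_iters):
    """finish[t][i] = completion time of task t, iteration i."""
    finish = [[0.0] * n_iters for _ in durations]
    for i in range(n_iters):
        for t, d in enumerate(durations):
            dep_ready = finish[t - 1][i] if t > 0 else 0.0   # intra-iter dep
            prev_self = finish[t][i - 1] if i > 0 else 0.0   # each task is serial
            finish[t][i] = max(dep_ready, prev_self) + d
    return finish

durations = [1.0, 2.0, 10.0]      # H2D, Dist, FwdBwd (hypothetical ms)
f = finish_times(durations, 50)
steady_gap = f[-1][-1] - f[-1][-2]   # inter-iteration gap once the pipeline is full
assert abs(steady_gap - max(durations)) < 1e-9   # = T_bottleneck = 10.0
```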
| 维度Dimension | 同步 (Clock-Driven)Synchronous | 异步 (Data-Flow)Asynchronous |
|---|---|---|
| 发射条件Fire condition | progress() 入队 period 后After progress() enqueues the period | 依赖就绪 + buffer 有空位deps ready + buffer available |
| 稳态吞吐Steady throughput | 1 / Tbottleneck | 1 / Tbottleneck (相同)(same) |
| Jitter 容忍Jitter tolerance | 无 — 非瓶颈 task 的抖动直接传导None — non-bottleneck jitter propagates | 有界 buffer 吸收抖动Bounded buffer absorbs jitter |
| Pipeline 填充Pipeline fill | depth 个 period 逐步填充depth periods to fill | 快速 burst — 无依赖 task 立即连续发射Fast burst — dependency-free tasks fire immediately |
| Buffer 数量Buffer count | depth (确定性)(deterministic) | 最多 max_depth(可配置上限)Up to max_depth (configurable cap) |
| 内存可预测性Memory predictability | ✓ 完全确定✓ Fully deterministic | ✗ 取决于运行时抖动✗ Depends on runtime jitter |
| 死锁分析Deadlock analysis | ✓ 静态可证(stage 图 + ready-first)✓ Statically provable (stage graph + ready-first) | ✗ 需证 bounded buffer 不死锁✗ Must prove bounded buffer deadlock-freedom |
| 实现复杂度Complexity | _enqueue_period — 简单循环_enqueue_period — simple loop | per-task 触发器 + 反压 + buffer 管理per-task triggers + back-pressure + buffer mgmt |
| 退化条件Degeneracy | — | max_depth = 1 → 退化为同步模型→ degenerates to synchronous |
异步模型的三个价值场景Three Value Scenarios for Async Model
- 吸收 Jitter(最大价值): H2D 的 PCIe 延迟和 InputDist 的 all-to-all 网络延迟均有不可预测的波动。同步模型下这些波动直接传导为 GPU 空闲;异步模型用 buffer 吸收,将随机延迟与 GPU 计算解耦。
- 更快的 Pipeline 填充: 同步模型需要 depth 个 progress() 调用才能填满流水线。异步模型中 H2D 等无依赖 task 可以 burst 式连续发射,显著缩短启动延迟。
- 异构资源解耦: H2D(DMA 引擎)、InputDist(NIC)、Forward(GPU SM)是不同的物理资源。同步模型强制它们以相同节拍工作;异步模型让它们各自以最大速率运行,通过 FIFO 自然解耦。
- Absorbing jitter (greatest value): H2D's PCIe latency and InputDist's all-to-all network latency both have unpredictable variance. In the sync model this variance directly becomes GPU idle time; async buffers absorb it, decoupling random delays from GPU compute.
- Faster pipeline fill: The sync model needs depth progress() calls to fill the pipeline. In the async model, dependency-free tasks like H2D can burst-fire, significantly reducing startup latency.
- Heterogeneous resource decoupling: H2D (DMA engine), InputDist (NIC), and Forward (GPU SM) are different physical resources. The sync model forces them into lock-step; the async model lets each run at maximum rate, naturally decoupled through FIFOs.
核心矛盾 — GPU 显存: 推荐系统中 GPU 显存极为紧张(大型 embedding table 占据大部分显存)。异步模型需要更多 buffer 槽位 → 更多显存。收益(吸收抖动)必须与代价(挤占 embedding table 显存)权衡,以 profiling 数据为依据。Core tension — GPU memory: In recommendation systems GPU memory is extremely tight (large embedding tables consume most of it). The async model needs more buffer slots → more memory. The benefit (absorbing jitter) must be weighed against the cost (stealing memory from embedding tables), guided by profiling data.
判断标准: 如果 nsys profile 显示非瓶颈 task 的延迟抖动导致了显著的 GPU stall(例如 H2D 偶尔 5ms 导致 Forward 空等),则值得引入有界 buffer。如果 GPU 利用率已接近 100%,异步模型不会带来额外收益。
Decision criterion: If nsys profiles show non-bottleneck task jitter causing significant GPU stalls (e.g. H2D occasionally 5ms causing Forward to idle), bounded buffers are worthwhile. If GPU utilization is already near 100%, async brings no additional benefit.
如何选择How to Choose
两种模型已通过 _PipelineBase 共享基础设施,公共 API(fill_pipeline / progress / drain / run)完全相同,切换只需更换构造函数:
# 同步: pipe = SWPipeline(plan, device=0)
# 异步: pipe = DataFlowPipeline(plan, max_depth=5, device=0)
推荐 SWPipeline(同步)当:
- 显存紧张: buffer 数量 = depth,完全确定,无运行时惊喜
- 需要可证明的死锁自由: ready-first 算法在静态 stage 图上证明无死锁
- 需要可预测的调试: print_schedule() 精确输出每个 period 的 task-iteration 映射

推荐 DataFlowPipeline(异步)当:
- nsys 显示 jitter 导致 GPU stall: 非瓶颈 task(H2D、InputDist)的延迟抖动传导为 GPU 空闲
- 需要更快的 pipeline 填充: 无依赖 task burst 式发射,缩短启动延迟
- 异构资源需要解耦: DMA/NIC/GPU SM 各自以最大速率运行
Both models share infrastructure via _PipelineBase with identical public API (fill_pipeline / progress / drain / run). Switching requires only a constructor change:
# Sync: pipe = SWPipeline(plan, device=0)
# Async: pipe = DataFlowPipeline(plan, max_depth=5, device=0)
Prefer SWPipeline (sync) when:
- Memory is tight: buffer count = depth, fully deterministic, no runtime surprises
- You need provable deadlock freedom: the ready-first algorithm proves deadlock-freedom on the static stage graph
- You need predictable debugging: print_schedule() precisely shows the task-iteration mapping per period

Prefer DataFlowPipeline (async) when:
- nsys shows jitter-induced GPU stalls: non-bottleneck task jitter (H2D, InputDist) propagates as GPU idle time
- You need faster pipeline fill: dependency-free tasks burst-fire, reducing startup latency
- Heterogeneous resources need decoupling: DMA/NIC/GPU SM each run at maximum rate
端到端示例 — FusedSparseDistEnd-to-End Example — FusedSparseDist
# SWPipeline equivalent of TrainPipelineFusedSparseDist
# 9 tasks · 3 stages · 4 streams: memcpy, data_dist, emb_lookup, default
import torch
from sw_pipeline import (
PipelineTask, TaskSchedule, PipelinePlan, SWPipeline,
)
# 1. Streams
memcpy_stream = torch.cuda.Stream()
data_dist_stream = torch.cuda.Stream()
emb_lookup_stream = torch.cuda.Stream()
# 2. Tasks (function bodies omitted for brevity)
t_h2d = PipelineTask("H2D", h2d)
t_dist_start = PipelineTask("InputDistStart", input_dist_start)
t_dist_wait = PipelineTask("InputDistWait", input_dist_wait)
t_emb = PipelineTask("EmbLookup", emb_lookup)
t_zero = PipelineTask("ZeroGrad", zero_grad)
t_wait = PipelineTask("WaitBatch", wait_batch)
t_fwd = PipelineTask("Forward", dense_forward)
t_bwd = PipelineTask("Backward", backward)
t_opt = PipelineTask("OptimizerStep", optimizer_step)
# 3. PipelinePlan — 3 stages, 4 streams
plan = PipelinePlan(
    schedule={
        t_h2d: TaskSchedule(stage=0, stream=memcpy_stream),
        t_dist_start: TaskSchedule(stage=1, stream=data_dist_stream, globally_ordered=True),
        t_dist_wait: TaskSchedule(stage=1, stream=data_dist_stream),
        t_emb: TaskSchedule(stage=2, stream=emb_lookup_stream),
        t_zero: TaskSchedule(stage=2),   # default stream
        t_wait: TaskSchedule(stage=2),
        t_fwd: TaskSchedule(stage=2),
        t_bwd: TaskSchedule(stage=2),
        t_opt: TaskSchedule(stage=2),
    },
    intra_iter_deps=[
        (t_dist_start, t_h2d),        # cross-stage: memcpy → data_dist
        (t_dist_wait, t_dist_start),  # same-stream ordering
        (t_emb, t_dist_wait),         # cross-stage: data_dist → emb_lookup
        (t_fwd, t_emb),               # cross-stream: emb_lookup → default
        (t_wait, t_zero),             # same-stream ordering
        (t_fwd, t_wait),              # same-stream ordering
        (t_bwd, t_fwd),
        (t_opt, t_bwd),
    ],
    inter_iter_deps=[
        (t_emb, t_bwd),  # EmbLookup(i) depends on Backward(i-1): fused TBE updates weights during backward
    ],
)
# pipeline_depth inferred automatically: max(stage)+1 = 3
# 4. Construct & inspect
pipe = SWPipeline(plan, device=0)
pipe.print_schedule(5)
# print_schedule(5) output:
# # Task Thread Stream | P0 P1 P2 P3 P4
# -- ----------------- ------- ------------ + ----- ----- ----- ----- -----
# 0 EmbLookup default emb_lookup | -- -- i0 i1 i2
# 1 ZeroGrad default default | -- -- i0 i1 i2
# 2 WaitBatch default default | -- -- i0 i1 i2
# 3 Forward default default | -- -- i0 i1 i2
# 4 Backward default default | -- -- i0 i1 i2
# 5 OptimizerStep default default | -- -- i0 i1 i2
# 6 InputDistStart default data_dist | -- i0 i1 i2 i3
# 7 InputDistWait default data_dist | -- i0 i1 i2 i3
# 8 H2D default memcpy | i0 i1 i2 i3 i4
# Reuse the exact same plan — DataFlowPipeline ignores the stage field
from dataflow_pipeline import DataFlowPipeline
pipe_df = DataFlowPipeline(plan, max_depth=5, device=0)

# Public API is identical
data_iter = pipe_df.fill_pipeline(dataloader)
while True:
    try:
        idx = pipe_df.progress(data_iter)
    except StopIteration:
        break

# Differences:
# - During fill_pipeline, H2D burst-fires 5 batches (no waiting for periods)
# - Each completed task immediately triggers its downstream tasks — no period clock
# - max_depth=5 is the back-pressure bound: at most 5 iterations in flight
基于Based on sw_pipeline.py + dataflow_pipeline.py · ← 回到 SWPipeline 总览← Back to SWPipeline Overview