# Schedulable Pipeline Engine
- Three-field dependency API: `depends_on` is now same-batch logical dependency only ("this task on batch K waits for X to have processed batch K"). Two new sibling fields cover the other two intents:
  - `cross_iter_depends_on=("X", ("Y", -2), ...)` — different-batch logical dependency. A bare name is shorthand for `(name, -1)`; an explicit tuple is required for N ≠ 1. Positive and zero offsets are rejected.
  - `same_progress_sync=("X", ...)` — same-progress GPU/stream coherency wait, regardless of which batches X and self are processing. Mirrors the legacy `default_stream.wait_stream(prefetch_stream)` pattern (e.g. `backward.same_progress_sync=("prefetch_embeddings",)`).

  Validated in `Task.__init__`.
- Topological-sort execution order replaces declaration order:
`SchedulablePipeline.__init__` calls `topological_sort(schedule)` (Kahn's algorithm) and rebuilds the single-stage `Schedule` with the sorted task tuple. Edges come from exact-slot `reads`/`writes` matches, `depends_on` with matching lookahead, `same_progress_sync`, and `cross_iter_depends_on` entries with Δ = 0 (auto-promoted, see next bullet). Δ ≥ 1 `cross_iter_depends_on` and cross-lookahead `depends_on` are excluded (cross-progress deps are satisfied by ring rotation, not intra-progress ordering). Declaration order is now only a tie-break for incomparable tasks.
- Cross-iter slot-offset formula corrected:
`cross_iter_depends_on=(("X", -N),)` now resolves the producer's event at `slot_offset = consumer.batch_offset + neg_offset`, not `producer.batch_offset + neg_offset`. The old formula was only correct when `producer.lookahead == consumer.lookahead`; the new one correctly indexes the consumer's view of the ring regardless of producer lookahead.
- Δ = 0
`cross_iter_depends_on` auto-promoted to `same_progress_sync` semantics: when `consumer.la == producer.la + |neg_offset|` (the producer and consumer run in the same progress on different batches), the engine now treats the declaration as if the user had written `same_progress_sync=(producer,)` — adds a topo-DAG edge producer → consumer in `_build_same_progress_dag_edges`, adds the cross-thread CPU `threading.Event` in `_compute_cpu_deps`, and emits the cross-stream `wait_event` at `slot[producer.la]` (no ring rotation). Earlier this case raised at construction; the rejection was UX-only — see `tasks/SPEC_cross_iter_delta0_autoconvert.md` and `tests/commons/test_engine_same_progress_sync_correctness.py` (parametrized: explicit `same_progress_sync` ≡ cross-iter Δ = 0 auto-promoted, GPU-verified).
- Cross-iter future-read validator:
`infer_cross_stream_event_deps` raises at schedule construction when Δ < 0 (i.e., `consumer.la > producer.la + |neg_offset|`): the producer hasn't yet processed batch K−N by the consumer's progress, so the dependency is impossible to honor.
- PostProc dead-code removed:
`PipelinedPostproc`, `PipelineState.pipelined_postprocs`, the `set_module_context` propagation loop, and the now-orphaned `_validate_set_context_colocation` guard are deleted (never wired in production). The torchrec `set_context` mutation that motivated those guards no longer exists in this codebase.
- User-facing Task API:
`batch_offset=N` kwarg removed. Use `lookahead=N` instead — same semantics, better name. `reads`/`writes` accept bare strings; offsets are inferred from the producer's `lookahead`.
- Bare-name
`depends_on`: write `depends_on=("prefetch_embeddings",)` and the engine auto-classifies it as within-iter or cross-iter from the producer's vs. consumer's `lookahead`. No more leaking pseudo-slot names like `module_input_post_prefetch` in user code. (Superseded in v3 — see above.)
- Declarative
`Pipeline` wrapper: `commons.pipeline.engine.Pipeline(tasks=[...])` — a thin façade over `SchedulablePipeline` + `Schedule` + `StreamPool` for one-shot user code.
- Event-based cross-stream sync: executor prefers
`wait_event(producer_event)` from the ring slot over coarse `wait_stream(producer_stream)`. Stream-level wait remains as a first-iter fallback.
- NVTX integration: every
`task.run(ctx)` is wrapped in `nvtx.annotate(task.nvtx_tag or task.name)` at all execution callsites (sequential, threaded, single-task fast path, NCCL-locked path). No-op on CPU-only hosts.
- `HSTUPipeline.detach()` resets bookkeeping so a follow-up `attach()` + `progress()` rebuilds the engine cleanly (Codex B-MEDIUM-1).
- `SchedulablePipeline.progress()` resets §4.8 state on a fresh iterator (mirrors legacy `train_pipeline.py:418`). Switching from the train loader to the eval loader inside one session no longer raises `StopIteration` immediately.
- HSTU pretrain ranking GR integration:
`RECSYS_PIPELINE_BACKEND=new` (default `legacy`) routes `examples/hstu/training/pretrain_gr_ranking.py` through `HSTUPipelineFactory`. 8-GPU benchmark on synthetic 8-layer 1024-hidden + cutlass: 568.24 / 565.76 TFLOPS (legacy / new, within 0.4%); 81.8% NCCL/compute overlap; 64 host threads × 5 streams. See `tasks/nsys_runs/RESULTS.md`.
## 1. Background & motivation
The legacy training pipeline at examples/commons/pipeline/train_pipeline.py follows torchrec's design: a hand-rolled progress() method that hardcodes task sequencing — H2D → input_dist → wait → forward → backward → optimizer — with HSTU-specific extensions wedged in (KK shuffler, loss-token AllReduce, Megatron grad finalize, NCCL stream discipline).
Three structural problems motivated the rework:
- Coupling: every task ordering decision lived inside one Python method. Adding a stage required editing the loop.
- Single-threaded: CPU-side kernel submission was strictly serial; CPU-bound work (e.g. the KK partitioner) blocked the whole pipeline.
- No customization point: variants like `JaggedMegatronPrefetch` were full subclass forks.
The rework is structured as three coupled but independently deliverable slices:
| Slice | Layer | Goal | Constraint |
|---|---|---|---|
| Problem #1 | Engine | Decouple tasks from schedule with HugeCTR-style primitives | Framework-agnostic — no torchrec, megatron, or fbgemm_gpu imports |
| Problem #3 | Threaded executor | Multi-threaded task dispatch with NCCL ordering safety | The same engine still works single-threaded |
| Problem #2 | HSTU adapter | Drive torchrec's `_rewrite_model` machinery via the engine | Legacy `train_pipeline.py` stays byte-identical |
The design is inspired by HugeCTR's `Pipeline` + `Scheduleable` abstraction. We borrow the "task as schedulable unit" model but replace HugeCTR's C++ runtime with a Python engine + PyTorch streams.
## 2. Architecture overview

### 2.1 Three layers

### 2.2 Module dependency
```
# Engine core — pure stdlib + torch only
commons.pipeline.engine
├── task.py       # Task, DataSlot
├── context.py    # SlotStore, BatchRing, TaskContext (thread-local)
├── streams.py    # StreamPool with anchor_device + use(stream) ctx
├── schedule.py   # Schedule, Stage
├── deps.py       # infer_cross_stream_waits
├── pipeline.py   # SchedulablePipeline driver + _seed_first_batch
├── executor.py   # Sequential + Threaded + _NcclOrderedLock
├── _presets.py   # T1/T2 forward/backward/optimizer task factories
└── autosched/
    ├── validator.py      # 8-rule SPEC §4.2 schedule validity
    ├── cost_model.py     # CostProfiler, CostModel (JSON roundtrip)
    └── list_scheduler.py # Critical-path list scheduler

# HSTU adapter — depends on torchrec, megatron, dynamicemb
commons.pipeline.hstu_pipeline
├── pipeline.py   # HSTUPipeline + HSTU_DEFAULT_THREAD_MAP
├── tasks.py      # 14 task factories + PipelineState
└── factory.py    # HSTUPipelineFactory
```
### 2.3 Hard invariants
- Engine framework-free: `commons.pipeline.engine.*` never imports `torchrec`, `megatron`, `fbgemm_gpu`, or `commons.distributed.*`. Enforced by `test_engine_import_hygiene.py`.
- Legacy untouched: `train_pipeline.py`, `train_pipeline_factory.py`, and `utils.py` stay byte-identical. Enforced by `test_engine_legacy_untouched.py`.
- v1-only contexts in HSTU: `create_torchrec_ctx` asserts `ctx.version == 1`. Enforced by `test_no_v0_context_references_in_hstu_pipeline` (regex scan).
- Out-of-slot shared state needs an engine-visible happens-before:
  Rule. The engine only inserts sync for edges it can see: same-slot `reads`/`writes`, matching-la `depends_on`, or `same_progress_sync`. Anything outside slots — module attributes, `PipelineState` fields, globals — is invisible. If two tasks mutate the same out-of-slot object without one of those declared edges, the author must colocate them on one thread via `thread_map` (same thread = serial, no race). Convention only; no runtime check.

  Example. Two tasks both write `module.forward._context`: `start_input_dist` (la=1) and `forward` (la=0). Within one progress, ordering is fine — `forward` reads the `torchrec_ctx` slot that `start_input_dist` wrote (slot edge). But across progresses, `forward` and `start_input_dist` actually run concurrently within the same progress at different lookaheads, touching different batches' contexts but the same Python attribute → race. Fix used: drop the shared mutation entirely (PostProc cleanup); the colocation guard that previously enforced "put both on one thread" was deleted along with it.
## 3. Engine layer (Problem #1)

### 3.1 Task & DataSlot
A Task is a unit of work with declarative metadata for the engine to schedule:
```python
class Task:
    name: str
    stream: str                   # named CUDA stream context
    priority: int                 # NVTX priority hint
    lookahead: int                # SPEC_p4 v2: 0 = current batch, k = k iters ahead
    reads: Tuple[DataSlot, ...]   # slots consumed (str shorthand auto-wraps)
    writes: Tuple[DataSlot, ...]  # slots produced
    depends_on: Tuple[str, ...]   # SAME-batch logical edge: this task on
                                  # batch K waits for X to have processed K
    cross_iter_depends_on: Tuple[Tuple[str, int], ...]
                                  # DIFFERENT-batch logical edge: ("X",) is
                                  # shorthand for (X, -1); ("Y", -N) for N≠1.
                                  # Positive/zero offsets rejected.
    same_progress_sync: Tuple[str, ...]
                                  # SAME-progress GPU/stream coherency wait,
                                  # regardless of which batches X/self process
                                  # (mirrors legacy default_stream.wait_stream)
    nccl: bool                    # participate in NCCL ordered lock
    nvtx_tag: Optional[str]       # auto-emitted via nvtx.annotate

    def run(self, ctx: TaskContext) -> None: ...
```
SPEC_p4 v3 user API note: the three dependency fields encode three distinct intents and are not interchangeable. `depends_on` is for same-batch logical waits (e.g. `backward.depends_on=("zero_grad",)`); `cross_iter_depends_on` is for "wait for X from N batches ago" (rare in HSTU); `same_progress_sync` is for "wait for X's GPU work in this progress to finish" (e.g. `backward.same_progress_sync=("prefetch_embeddings",)` in the prefetch variant — a stream coherency wait, not a logical batch dependency). A name mentioned in two fields raises at `Task.__init__`. Subclasses can set the three as class attributes; normalization runs unconditionally, so bare-name shorthand works in both the kwargs and class-attr forms.
SPEC_p4 v2 user API note (commits 2060ee62, 9a2c7a12, faa7a30f): the legacy `batch_offset=N` kwarg is removed (Phase C hard break). Pass `lookahead=N`; `reads`/`writes` accept bare strings. Pseudo-slot names (e.g. `module_input_post_prefetch`) are no longer required to model HSTU's prefetch→forward mutation chain.
`DataSlot` is an opaque named handle: `(name: str, batch_offset: int)`. Two slots are equal iff both fields match. Cross-iter data flow happens via `BatchRing.advance()` shifting the slot store rather than copying values.
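The normalization and validation rules described above (bare-name shorthand, negative-offset requirement, disjoint fields) can be sketched in isolation. This is a hypothetical stand-in for logic the text ascribes to `Task.__init__`; helper names and error messages are assumptions, not the real implementation.

```python
from typing import Tuple, Union

def normalize_cross_iter(
    entries: Tuple[Union[str, Tuple[str, int]], ...]
) -> Tuple[Tuple[str, int], ...]:
    """Bare name -> (name, -1); explicit tuples must carry a negative offset."""
    out = []
    for e in entries:
        name, off = (e, -1) if isinstance(e, str) else e
        if off >= 0:
            # positive / zero offsets are rejected per the v3 API contract
            raise ValueError(f"cross_iter_depends_on offset must be negative: {e!r}")
        out.append((name, off))
    return tuple(out)

def check_disjoint(depends_on, cross_iter, same_progress_sync):
    """A producer name mentioned in two dependency fields is rejected."""
    fields = [set(depends_on), {n for n, _ in cross_iter}, set(same_progress_sync)]
    for i in range(len(fields)):
        for j in range(i + 1, len(fields)):
            dup = fields[i] & fields[j]
            if dup:
                raise ValueError(f"name(s) in two dependency fields: {sorted(dup)}")
```

Running the shorthand through `normalize_cross_iter(("X", ("Y", -2)))` yields `(("X", -1), ("Y", -2))`, matching the documented expansion.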
### 3.2 Schedule & Stage
A Schedule is a frozen DAG declaration:
```python
schedule = Schedule(
    stages=(
        Stage(tasks=(h2d_task, fwd_task, bwd_task, opt_task)),
    ),
    stream_slots=("default", "memcpy"),
)
```
Stages run in the order they appear in `Schedule.stages`. The tasks within a single-stage schedule are reordered at `SchedulablePipeline.__init__` by `topological_sort()` (`autosched/deps.py`): edges come from exact-slot `reads`/`writes` matches, `depends_on` with matching lookahead, and `same_progress_sync`; `cross_iter_depends_on` and cross-lookahead `depends_on` contribute no in-progress edge (cross-iter is satisfied by ring rotation). Author-declaration order is the tie-break for incomparable tasks. The §4.8 mask + executor parallelism choice still gate runtime triggering. `in_flight_batches` is derived from `max(task.lookahead) + 1`.
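The reordering described above can be sketched as Kahn's algorithm with declaration order as the tie-break. This is a minimal illustration, not the engine's `topological_sort`; the function name and edge representation are assumptions.

```python
def topo_sort(tasks, edges):
    """tasks: unique names in declaration order; edges: (producer, consumer) pairs."""
    indeg = {t: 0 for t in tasks}
    succ = {t: [] for t in tasks}
    for p, c in edges:
        succ[p].append(c)
        indeg[c] += 1
    order, ready = [], [t for t in tasks if indeg[t] == 0]
    while ready:
        t = min(ready, key=tasks.index)  # declaration-order tie-break
        ready.remove(t)
        order.append(t)
        for c in succ[t]:
            indeg[c] -= 1
            if indeg[c] == 0:
                ready.append(c)
    if len(order) != len(tasks):
        raise RuntimeError("cyclic dependency")
    return order
```

With `tasks=["opt", "h2d", "fwd", "bwd"]` and edges `h2d→fwd→bwd→opt`, the sorted order is `["h2d", "fwd", "bwd", "opt"]` regardless of how the tuple was declared; incomparable tasks keep their declared relative order.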
The validator (`autosched/validator.py`) enforces 8 rules at `SchedulablePipeline.__init__`:
| Rule | Constraint |
|---|---|
| 1 | Task names are unique within the schedule. |
| 2 | `task.lookahead >= 0` and `slot.batch_offset >= 0`. |
| 3 | Every `task.stream` is in `schedule.stream_slots` AND in `StreamPool`. |
| 4 | Each `DataSlot(name, offset)` has at most one writer. |
| 5 | Every read resolves to a writer (cross-iter via ring carry allowed; reserved slot `batch_cpu` exempt). |
| 6 | Every name in `depends_on`, `cross_iter_depends_on`, or `same_progress_sync` resolves to a task in the same schedule. Declaration order is no longer required; `topological_sort` picks a valid execution order. |
| 7 | The intra-progress graph (slot edges + matching-lookahead `depends_on` + `same_progress_sync`) is acyclic — checked by running `topological_sort`; failure raises `SchedulingError("cyclic dependency")`. |
| 8 | Cross-stream waits are emitted exactly once per (consumer, producer-stream) pair. |
### 3.3 BatchRing
The ring holds N in-flight batches' slot stores. `at(k)` is the slot store at offset k from current. `advance()` drops offset 0, shifts each store down by one, and appends a fresh empty store at the highest offset.
Each row is the ring state after a successful `_run_one_internal_iter`. Slot stores carry batch-specific tensors and torchrec contexts; `advance()` is what enables data flow across iterations without explicit copies.
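The ring semantics above reduce to a few lines. This is a toy sketch of the described `at(k)` / `advance()` contract (plain dicts stand in for slot stores); the real `BatchRing` in `context.py` may differ in detail.

```python
class BatchRing:
    def __init__(self, n_in_flight: int):
        # one slot store per in-flight batch offset
        self._stores = [dict() for _ in range(n_in_flight)]

    def at(self, k: int) -> dict:
        """Slot store at offset k from the current batch."""
        return self._stores[k]

    def advance(self) -> None:
        """Drop offset 0, shift every store down one, append a fresh store."""
        self._stores.pop(0)
        self._stores.append({})
```

A value written at `ring.at(2)` surfaces at `ring.at(1)` after one `advance()` and at `ring.at(0)` after two — the shift is what carries data across iterations without copies.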
### 3.4 §4.8 mask formula
Determines whether a task at lookahead = k fires at iteration i (max_lookahead = max(task.lookahead) across the schedule):
fire iff (max_lookahead - k) ≤ i < M_known + (max_lookahead - k)
where `M_known` = `pulled` while pulling is live, otherwise the final `M`.
- The prefill phase (i < max_lookahead) runs only deep-lookahead tasks; compute @ lookahead=0 doesn't fire yet.
- The steady phase (max_lookahead ≤ i < M) runs all qualifying tasks each iter.
- The drain phase (i ≥ M) runs only shallow-lookahead tasks until the ring is empty.
Engine raises StopIteration when no future iteration can satisfy any task's mask.
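The mask formula transcribes directly into code. The sketch below uses the text's own symbols (`k`, `i`, `max_lookahead`, `M_known`); the function name is an assumption.

```python
def fires(k: int, i: int, max_lookahead: int, m_known: int) -> bool:
    """§4.8 mask: does a task at lookahead=k fire at iteration i?"""
    return (max_lookahead - k) <= i < m_known + (max_lookahead - k)
```

With `max_lookahead=2` and `M=5`: iteration 0 (prefill) fires only the k=2 task; iteration 3 (steady) fires all of k=0,1,2; iteration 5 (drain) fires k=0 but not k=2; from iteration 7 (= M + max_lookahead) onward nothing fires, which is when the engine raises `StopIteration`.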
### 3.5 SchedulablePipeline driver
Public API surface (8 symbols total at commons.pipeline.engine):
| Symbol | Visibility | Purpose |
|---|---|---|
| `SchedulablePipeline` | public | The driver. Holds the engine's (schedule, stream_pool, executor) triple and runs `progress(batch_iter)`. |
| `Task` | public | Subclass or `Task.from_fn(name, fn, ...)`. |
| `DataSlot` | public | Slot identifier. |
| `Schedule`, `Stage` | public | Declarative DAG. |
| `StreamPool` | public | Named CUDA stream registry. |
| `TaskContext` | public | Per-iter handle (`ctx.slots`, `ctx.stream_pool`, `ctx.iter_count`). |
| `SequentialExecutor`, `ThreadedExecutor` | public | Two execution strategies. |
| `ScheduleValidationError` | public | Raised by the validator. |
| `_seed_first_batch` | private | Pre-populates `ring.at(max_offset)` for adapter bootstrap. |
### 3.6 Event-sync graph & slot_offset
`deps.infer_cross_stream_event_deps(schedule)` compiles the four user-facing dependency declarations (`reads`/`writes`, `depends_on`, `cross_iter_depends_on`, `same_progress_sync`) into a flat per-consumer list of `(producer, producer_stream, slot_offset)` triples. At runtime the executor performs `ring.at(slot_offset).get_event(producer)` + `consumer.wait_event(event)`. Two views below clarify (a) which declarations become topological-sort edges vs. only event-emit edges, and (b) how each edge's slot_offset is derived from ring rotation.
The example uses five tasks from the HSTU prefetch variant plus one hypothetical `aux_stats` task (not in real HSTU; only here to illustrate `cross_iter_depends_on`). Why la=2 for the cross_iter consumer? Two constraints, derived from the slot_offset math:

1. `cross_iter_depends_on=(("X", -N),)` code-level: `slot_offset = consumer.la − N ≥ 0` → `consumer.la ≥ N` (deps.py:446, otherwise rejected as out-of-ring).
2. `depends_on` future-read: any same-batch `depends_on=("Y",)` requires `consumer.la ≤ Y.la` (deps.py:368). So a task that depends_on a low-la training task (backward/optimizer/forward, la=0) is itself constrained to la=0 — and would therefore fail constraint (1).

So a cross_iter consumer cannot `depends_on` low-la training tasks. It must be a high-lookahead, prefetch-side task whose only same-batch deps are on other high-la tasks (h2d, prefetch). Streaming / auxiliary statistics is the canonical example. `topological_sort` only consumes the three solid edges (start_input_dist → wait_input_dist → prefetch_embeddings, plus prefetch_embeddings → backward); the dashed edges contribute event triples but no in-progress ordering — their producer/consumer ordering across `progress()` calls is mediated by ring rotation, not by topo sort.
| Edge type | slot_offset formula | Topo edge? | deps.py site |
|---|---|---|---|
| `reads`/`writes` | `read_slot.batch_offset` | Yes if `writer.batch_offset == read_slot.batch_offset`; else ring-rotated | deps.py:341-348 |
| `depends_on` (producer.la == consumer.la) | `consumer.batch_offset` | Yes | deps.py:350-390 |
| `depends_on` (producer.la > consumer.la) | `consumer.batch_offset` (after diff ring advances) | No (ring rotation) | deps.py:350-390 |
| `cross_iter_depends_on=(("X", -N),)` | `consumer.batch_offset + neg_offset` | No (ring rotation across N progresses) | deps.py:399-463 |
| `same_progress_sync` | `producer.batch_offset` | Yes (no rotation between producer and consumer) | deps.py:476-487 |
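The slot_offset column above can be condensed into one function. This is a simplified sketch in which a task's `la` doubles as the `batch_offset` of its own slots; the function name and string edge tags are illustrative, not engine API.

```python
def slot_offset(edge_type: str, consumer_la: int, producer_la: int,
                neg_offset: int = 0) -> int:
    """Which ring slot holds the producer event for this edge type."""
    if edge_type == "cross_iter":
        # corrected formula: indexed from the consumer's view of the ring
        off = consumer_la + neg_offset
        if off < 0:
            raise ValueError("out of ring: consumer.la < N")
        return off
    if edge_type == "same_progress_sync":
        return producer_la    # current-progress event, no rotation
    return consumer_la        # reads/writes and depends_on cases
```

For `(("X", -2),)` with the consumer at la=2, the event is fetched from slot 0 — the consumer's view of the batch it is two iterations behind on — independent of the producer's lookahead, which is exactly the correction described in the changelog.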
### 3.7 same_progress_sync — when to use it
Reads as: "this task must wait for X's work in this same progress() call to finish, regardless of which batches X and self are processing." Engine guarantees:
- Adds a topo-DAG edge X → self (X is guaranteed to fire before self within the progress).
- For threaded execution, adds a CPU-side `threading.Event` wait (cross-thread case).
- For cross-stream, emits `wait_event` at `slot[producer.la]`; same-stream relies on CUDA stream FIFO.
Known use cases:
| # | Use case | Example |
|---|---|---|
| (a) | Stream coherency on out-of-slot mutable state. Two tasks touch a shared object NOT declared in any reads/writes slot (e.g. dynamicemb cache, torchrec module attribute, global counter). The engine cannot infer a dataflow edge → use `same_progress_sync` to make the wait explicit. This is the original / canonical use; mirrors the legacy `default_stream.wait_stream(prefetch_stream)` pattern. | `backward.same_progress_sync=("prefetch_embeddings",)` in the HSTU prefetch variant — drains prefetch's GPU work before backward / before the next progress. |
| (b) | Explicit form of the Δ = 0 `cross_iter_depends_on`. When the user's mental model is "this la=c task waits for that la=p task's current-progress output" with c = p + N (N > 0), they may equivalently write `same_progress_sync=(X,)` for clarity, OR write `cross_iter_depends_on=((X, -N),)` with the batch-flow phrasing — the engine auto-promotes the latter to the same mechanical contract (same topo edge, same slot lookup, same CPU edge). Use `same_progress_sync` when the ordering intent is naturally "wait for X in this progress, regardless of batch." | `forward.same_progress_sync=("update",)` when fwd.la=1, update.la=0 — fwd uses post-update weights from the same progress. See `tests/commons/test_engine_same_progress_sync_correctness.py` for verified GPU equivalence to a manual la=1-pipelined SGD baseline. |
| (c) | Same-progress logging / metric aggregation. A consumer (e.g. a metric writer) wants the producer's current-progress event for coherent measurement, even if the two don't share any slot or out-of-slot state. Less common; typically aux statistics tasks. | Hypothetical `metric_writer.same_progress_sync=("forward",)` for capturing forward stats per-progress. |
| (d) | Implicit, via `cross_iter_depends_on` Δ=0 auto-promotion. If the user's mental model is batch-flow ("I want X's work for batch K−N"), they may write `cross_iter_depends_on=((X, -N),)` directly. When the lookahead math gives Δ=0 (`consumer.la == producer.la + N`), the engine auto-promotes the declaration to the `same_progress_sync` mechanical contract — same topo edge, same CPU edge, same `slot[producer.la]` event lookup. The user does not need to know `same_progress_sync` as a separate field; it works either way. | `forward.cross_iter_depends_on=(("update", -1),)` with fwd.la=1, update.la=0 → Δ=0 auto-promoted. Equivalent to use case (b) but using the cross_iter syntax. See `SPEC_cross_iter_delta0_autoconvert.md`. |
Direction convention. The field lives on the consumer side; `X.same_progress_sync=(...)` always lists producers that X waits on. producer.la and consumer.la may differ — the field is la-agnostic (in contrast to `depends_on`, which requires `producer.la ≥ consumer.la` to avoid future-read).
Distinguishing from `depends_on` and `cross_iter_depends_on`:
- `depends_on=("X",)` — same-batch logical wait. Both must process the same batch K. Constrained by `producer.la ≥ consumer.la`.
- `cross_iter_depends_on=(("X", -N),)` — different-batch wait: "consumer on batch K waits for X on batch K−N." Δ ≥ 1 is the cross-progress, ring-rotated case; Δ = 0 (i.e. `producer.la + N == consumer.la`) is auto-promoted to the `same_progress_sync` mechanical contract — same topo edge, CPU edge, and current-progress event lookup at `slot[producer.la]` (per `SPEC_cross_iter_delta0_autoconvert.md`).
- `same_progress_sync=("X",)` — same-progress wait, batch-agnostic. The two ARE allowed to process different batches; what matters is that X completes its current-progress GPU work before the consumer reads.
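The Δ classification above, together with the out-of-ring check from §3.6 (deps.py:446) and the future-read check (§Cross-iter validator), can be sketched as one decision function. The function name and return tags are illustrative assumptions, not engine API.

```python
def classify(consumer_la: int, producer_la: int, n: int) -> str:
    """Classify cross_iter_depends_on=((X, -n),) from the lookahead math."""
    if consumer_la - n < 0:
        # slot_offset = consumer.la - N must be >= 0 (deps.py:446 in the text)
        raise ValueError("out of ring: slot_offset = consumer.la - N < 0")
    delta = producer_la + n - consumer_la
    if delta < 0:
        # producer hasn't processed batch K-N by the consumer's progress
        raise ValueError("future-read: consumer.la > producer.la + N")
    if delta == 0:
        return "promote_to_same_progress_sync"  # same progress, different batches
    return "ring_rotated"                       # satisfied by ring rotation
```

The use-case (d) example checks out: `fwd.la=1`, `update.la=0`, N=1 gives Δ=0 and the declaration is promoted to `same_progress_sync` semantics.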
4. Threaded executor (Problem #3)
ThreadedExecutor dispatches tasks to a concurrent.futures.ThreadPoolExecutor with one worker per thread_id in the active thread_map. The pool resizes on demand if a later stage needs more thread groups than the current pool has.
4.1 thread_map strategies
| Spec | Resolution | Use case |
|---|---|---|
| None or "by_stream" (default) | thread_id = task.stream or "default" | Common case: one CPU thread per CUDA stream |
| "per_task" | thread_id = task.name | Maximum parallelism; useful for stress tests |
| dict[str, str] | {task_name: thread_id} with "default" fallback | Custom pinning (e.g. HSTU's {io, compute}) |
| Callable[[Task], str] | User function | Dynamic policies based on task fields |
Threads and CUDA streams are decoupled: a task's stream selects the CUDA stream context inside StreamPool.use(); thread_map selects the CPU worker that submits the task. The same stream may be submitted from different threads (different iterations or — with caution — the same iteration).
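The resolution table above can be sketched as a small dispatcher in the spirit of the `_resolve_thread_id` helper listed in §7.2; the `TaskStub` type is an illustrative stand-in for the engine's Task:

```python
from dataclasses import dataclass

@dataclass
class TaskStub:                 # illustrative stand-in for the engine's Task
    name: str
    stream: str = ""

def resolve_thread_id(task, thread_map):
    """Resolve any of the four thread_map spec forms to a thread id."""
    if thread_map is None or thread_map == "by_stream":
        return task.stream or "default"      # one CPU thread per CUDA stream
    if thread_map == "per_task":
        return task.name                     # every task gets its own thread
    if callable(thread_map):
        return thread_map(task)              # user-supplied policy
    return thread_map.get(task.name, "default")  # dict with "default" fallback
```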
4.2 CPU dependency computation
_compute_cpu_deps builds a dict {task_name → [predecessor_completion_events]} by walking the topo-sorted active task list and adding four edge types:
- Slot-based: any writer of a slot the task reads (excluding self).
- Same-batch logical (depends_on): every task.depends_on name with matching lookahead that is in the active set. Cross-lookahead names go through ring rotation as a cross-stream wait, not a CPU edge.
- Same-progress sync (same_progress_sync): every task.same_progress_sync name that is in the active set — a same-progress GPU coherency wait, regardless of lookahead.
- Same-stream FIFO: the most recent task on the same stream in topological order — preserves CUDA stream FIFO semantics when thread_map splits same-stream tasks across threads.
The cross-thread filter then keeps only edges where thread_id_of(producer) ≠ thread_id_of(consumer); same-thread predecessors are already serialized by their thread loop.
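The four edge types plus the cross-thread filter can be condensed into a toy version of this computation. Tasks here are plain dicts and the return value is a set of names rather than Events, so this is a sketch of the logic, not the real `_compute_cpu_deps` signature:

```python
def compute_cpu_deps(active, thread_id_of):
    """active: topo-sorted list of dicts with keys
    name, stream, lookahead, reads, writes, depends_on, same_progress_sync."""
    deps = {t["name"]: set() for t in active}
    last_on_stream = {}
    for t in active:
        # 1) slot-based: any writer of a slot this task reads (excluding self)
        for u in active:
            if u["name"] != t["name"] and set(u["writes"]) & set(t["reads"]):
                deps[t["name"]].add(u["name"])
        # 2) same-batch depends_on: only when lookaheads match
        for name in t["depends_on"]:
            prod = next((u for u in active if u["name"] == name), None)
            if prod and prod["lookahead"] == t["lookahead"]:
                deps[t["name"]].add(name)
        # 3) same-progress sync: lookahead-agnostic
        for name in t["same_progress_sync"]:
            if any(u["name"] == name for u in active):
                deps[t["name"]].add(name)
        # 4) same-stream FIFO: most recent task on this stream in topo order
        if t["stream"] in last_on_stream:
            deps[t["name"]].add(last_on_stream[t["stream"]])
        last_on_stream[t["stream"]] = t["name"]
    # cross-thread filter: same-thread predecessors are already serialized
    return {
        name: {p for p in preds if thread_id_of[p] != thread_id_of[name]}
        for name, preds in deps.items()
    }
```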
4.3 NCCL ordered lock
Tasks tagged nccl=True acquire a ticket-based lock that guarantees topological-order execution across all ranks. Tickets are assigned by the topo-sorted task tuple (deterministic across ranks because the schedule and tie-break are deterministic). Without this, a multi-threaded executor on different ranks may submit collectives in different orders → mismatched NCCL calls → deadlock.
The ticket FIFO has two failure exits: release(failed=True) when the running task raises, and abort() when an unrelated worker dies before the expected ticket release would have happened.
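A stripped-down, runnable sketch of the ticket discipline (not the real `_NcclOrderedLock`, which also carries `reset()` and per-stage wiring):

```python
import threading

class TicketLock:
    """Ticket lock: acquire(t) blocks until ticket t is next; a failed
    release or abort() wakes every waiter with an error."""

    def __init__(self):
        self._cond = threading.Condition()
        self._next = 0
        self._failed = False

    def acquire(self, ticket):
        with self._cond:
            self._cond.wait_for(lambda: self._failed or self._next == ticket)
            if self._failed:
                raise RuntimeError("ticket lock aborted")

    def release(self, failed=False):
        with self._cond:
            self._next += 1
            if failed:
                self._failed = True
            self._cond.notify_all()

    def abort(self):
        with self._cond:
            self._failed = True
            self._cond.notify_all()
```

Threads that acquire with tickets 2, 1, 0 always run in ticket order 0, 1, 2 regardless of start order — the property the engine relies on for deterministic collective submission.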
4.4 Cancellation & abort
Each execute_stage call sets up:

- A threading.Event per active task — signaled when the task finishes (or its thread errors out).
- A shared cancelled Event — set by the first failing thread.
- An errors_lock + errors list to collect exceptions across threads.
The except handler in _run_thread_chain does three things in order:

1. cancelled.set() — stops further task starts on all chains.
2. self._nccl_lock.abort() — wakes any worker blocked inside acquire(ticket > current).
3. Append the exception to errors; the finally block then sets every remaining task's completion event so peer threads waiting on cpu_deps Events don't deadlock.
After the stage barrier (future.result() on every chain), execute_stage raises errors[0].
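The error path and the deadlock-avoiding finally block can be exercised with a toy chain runner. `run_chain` is an illustrative name; the real `_run_thread_chain` additionally waits on cpu_deps and enters stream contexts:

```python
import threading

def run_chain(tasks, completion, cancelled, errors, errors_lock):
    """tasks: list of (name, fn). First failure sets `cancelled`, records the
    exception, and the finally block releases every completion event."""
    try:
        for name, fn in tasks:
            if cancelled.is_set():
                break                        # stop further task starts
            fn()
            completion[name].set()
    except Exception as exc:
        cancelled.set()                      # step 1: cancel all chains
        with errors_lock:
            errors.append(exc)               # step 3: collect the exception
    finally:
        for name, _ in tasks:                # unblock peers waiting on our events
            completion[name].set()
```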
5. HSTU adapter (Problem #2)
5.1 The 14 tasks
HSTUPipeline._build_schedule assembles a single-stage schedule from these task factories. Author declaration order is just a tie-break for incomparable tasks — the actual execution order is decided by topological_sort over reads/writes + depends_on + same_progress_sync.
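A self-contained Kahn's-algorithm sketch with declaration order as the tie-break, matching the ordering rule described above (the engine's `topological_sort(schedule)` operates on Task objects rather than bare names):

```python
def topological_sort(names, edges):
    """names: tasks in declaration order; edges: set of (producer, consumer)."""
    order_idx = {n: i for i, n in enumerate(names)}   # declaration order
    indeg = {n: 0 for n in names}
    succ = {n: [] for n in names}
    for p, c in edges:
        indeg[c] += 1
        succ[p].append(c)
    ready = [n for n in names if indeg[n] == 0]
    out = []
    while ready:
        n = min(ready, key=order_idx.__getitem__)     # tie-break: declared first
        ready.remove(n)
        out.append(n)
        for c in succ[n]:
            indeg[c] -= 1
            if indeg[c] == 0:
                ready.append(c)
    if len(out) != len(names):
        raise ValueError("cycle detected in schedule")
    return tuple(out)
```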
| # | Task | lookahead | stream | nccl | Purpose |
|---|---|---|---|---|---|
| 1 | h2d | 2 (np) / 2 (pf) | memcpy | — | CPU→GPU + create torchrec_ctx |
| 2 | start_shuffle | 2 / 2 | memcpy | conditional | KK shuffler phase 1 (background thread) |
| 3 | finish_shuffle | 2 / 2 | memcpy | conditional | KK shuffler phase 2 (AllGather + index_select) |
| 4 | start_input_dist | 1 / 1 | data_dist | ✓ | torchrec splits all-to-all |
| 5 | wait_input_dist | 1 / 1 | data_dist | ✓ | wait on the tensors-awaitable |
| 6 | zero_grad | 0 | default | — | optimizer.zero_grad + Megatron buffer reset |
| 7 | global_tokens_allreduce | 0 | default | ✓ | per-rank loss-token AllReduce |
| 8 | nccl_safety_barrier | 0 | default | — | wait_stream(memcpy) for shuffle ordering |
| 9 | forward | 0 | default | — | set_module_context + model(batch) |
| 10 | prefetch_embeddings (prefetch only) | 1 | prefetch | — | cache prefetch + populate module_input_post_prefetch |
| 11 | backward | 0 | default | ✓ | (loss × dp_size / global_tokens).backward() |
| 12 | finalize_model_grads | 0 | default | ✓ | Megatron TP grad finalize |
| 13 | optimizer_step | 0 | default | — | optimizer.step + write step_result |
| 14 | watchdog_step | 0 | default | — | optional CUDA mem watchdog |
5.2 HSTU_DEFAULT_THREAD_MAP

Two thread groups:
HSTU_DEFAULT_THREAD_MAP = {
    # io thread — pure data movement
    "h2d": "io",
    "start_shuffle": "io",
    "finish_shuffle": "io",
    # compute thread — everything that touches shared module state
    "start_input_dist": "compute",
    "wait_input_dist": "compute",
    "prefetch_embeddings": "compute",
    "zero_grad": "compute",
    "global_tokens_allreduce": "compute",
    "nccl_safety_barrier": "compute",
    "forward": "compute",
    "backward": "compute",
    "finalize_model_grads": "compute",
    "optimizer_step": "compute",
    "watchdog_step": "compute",
}
Two thread groups (rather than more) let H2D / shuffle work on "io" overlap NCCL + GPU compute on "compute", without paying the per-task pool dispatch overhead of full thread-per-task. forward is the only writer of the shared module.forward._context attribute and runs as a single task on "compute", so there is no cross-thread race to defend against.
5.3 Bootstrap flow
The first call to HSTUPipeline.progress() runs a one-time bootstrap because torchrec's _rewrite_model needs a real batch for FX tracing. Sequence diagram in §6.1.
Key invariants:

- StreamPool built first — H2D + bootstrap collectives run on the engine's actual streams, not throwaway streams.
- Real per-batch context — the bootstrap context is created by state.create_torchrec_ctx() with version=1 (the v0 single-shared-context branch is forbidden).
- Peek batch is seeded into the ring — _pipe._seed_first_batch(...) populates ring.at(max_offset) with {batch_cpu, batch_gpu, torchrec_ctx, shuffled_batch (identity only)}; idempotent guards in h2d/start_input_dist/finish_shuffle skip the work that's already been done.
- No 1-batch loss — the peek batch flows through forward/backward/optimizer like any other batch.
5.4 native vs prefetch variant
Same task list, plus or minus prefetch_embeddings. The non-prefetch variant uses PipelinedForward, which reads input_dist_tensors_requests directly. The prefetch variant uses PrefetchPipelinedForward, which reads from module_input_post_prefetch, populated in the previous iter by prefetch_embeddings.
Critical ordering (within one progress):
prefetch_embeddings must run after forward in every progress, not before. Reason: dynamicemb's _prefetch_outstanding_keys counter is incremented by prefetch() and decremented at the end of forward. The ordering is enforced by backward.same_progress_sync=("prefetch_embeddings",): backward (and therefore this progress's whole compute chain after forward) waits for prefetch's GPU work in the same progress to finish — a stream coherency wait, not a batch dependency. Within one progress:
- forward → prefetch (correct): forward consumes the previous iter's prefetched data first (counter decremented), then this iter's prefetch adds new keys. Steady-state outstanding ≈ 1 batch.
- prefetch → forward (wrong): prefetch adds new keys before forward consumes the previous iter's. Steady-state outstanding ≈ 2 batches → cache overflow.
Why same_progress_sync and not depends_on? prefetch_embeddings has lookahead=1 (it pre-loads the next batch K+1) while backward has lookahead=0 (it consumes batch K). They process different batches, so a same-batch depends_on would be incorrect; what we actually need is "drain prefetch's GPU work in this progress before the next progress starts" — exactly the same-progress GPU coherency intent.
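The counter argument can be checked with a toy model of the `_prefetch_outstanding_keys` bookkeeping (a hypothetical simplification: real dynamicemb counts keys, not whole batches):

```python
def steady_state_outstanding(order, iters=4):
    """order: per-progress task order, e.g. ("forward", "prefetch").
    prefetch() increments the counter; end of forward decrements it."""
    outstanding = 1          # bootstrap prefetch before the first progress
    peak = outstanding
    for _ in range(iters):
        for step in order:
            if step == "prefetch":
                outstanding += 1
            else:  # forward consumes the previous iter's prefetched data
                outstanding -= 1
            peak = max(peak, outstanding)
    return peak
```

With forward → prefetch the peak stays at 1 batch in flight; with prefetch → forward it rises to 2, the cache-overflow case described above.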
5.5 Auto-scheduler v2: joint search
The v2 design searches the coupled space thread_map × stream_map × fire_ordering for the HSTU schedule. It does not replace the current lookahead-only scheduler in autosched/fire_order.py; instead, every candidate schedule is first materialized with its proposed thread/stream/fire-order choices, then passed through auto_assign_lookaheads(..., max_in_flight=5). The returned lookahead map becomes part of the scored candidate.
| Axis | Candidate source | Bound / safety rule |
|---|---|---|
| thread_map | Named HSTU presets: default, by_stream, io_prefetch_compute, io_data_dist_compute, io_data_dist_prefetch_compute. | per_task is rejected because the 14-task HSTU graph would exceed max_threads ≤ 4. |
| stream_map | Prep groups start from the current streams: memcpy for H2D/shuffle, data_dist for input distribution, prefetch for embedding prefetch. | forward, backward, finalize_model_grads, and optimizer_step are frozen on the default stream at lookahead 0. |
| fire_ordering | Resource-aware topological tie-break policies: critical-chain first, NCCL queue early, PCIe prep early, longest off-default first, or min-slack first. | Ordering never removes DAG edges and preserves same-communicator NCCL ticket order. |
The objective is modeled steady-state latency: simulate several warmup/steady/drain progress calls with the task costs from hstu_prefetch_caching_1node.json, then score the interval between successive optimizer_step completions. The overlap matrix is the resource model: same stream serializes, same NCCL communicator serializes even on different streams, PCIe-bound tasks contend, and all other pairs may overlap if the DAG permits.
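The overlap-matrix rules read naturally as a pairwise predicate. Field names here are illustrative, and "PCIe contention" is modeled as full serialization for simplicity (the real matrix may encode finer-grained contention):

```python
def may_overlap(a, b, dag_allows=True):
    """a, b: dicts with keys stream, nccl_comm (or None), pcie_bound."""
    if a["stream"] == b["stream"]:
        return False                    # same stream serializes
    if a["nccl_comm"] and a["nccl_comm"] == b["nccl_comm"]:
        return False                    # same communicator serializes,
                                        # even across different streams
    if a["pcie_bound"] and b["pcie_bound"]:
        return False                    # PCIe-bound tasks contend
    return dag_allows                   # otherwise overlap, DAG permitting
```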
# Pseudocode sketch only; full design is in tasks/SPEC_autosched_joint_search.md.
beam = [current_hstu_candidate]
for round in search_rounds:
    expanded = mutate_thread_map(beam) + mutate_stream_map(beam) + mutate_fire_ordering(beam)
    for candidate in expanded:
        schedule = rebuild_schedule(candidate.thread_map, candidate.stream_map, candidate.fire_ordering)
        candidate.lookahead_map = auto_assign_lookaheads(schedule, cost_model, max_in_flight=5)
        candidate.score = modeled_optimizer_step_interval(schedule, candidate.lookahead_map)
    beam = top_k(valid_candidates, by="score")
Hard constraints on every candidate: max_threads ≤ 4, max_in_flight ≤ 5, bit-exact default-stream anchors at la=0, and same-communicator NCCL serialization from the existing overlap matrix.
Full pseudocode (6 procedures)
Python-like, but not runnable. The pseudocode below mirrors tasks/SPEC_autosched_joint_search.md; use <details> to expand each procedure individually.
1. JOINT_HSTU_SEARCH — main beam loop
procedure JOINT_HSTU_SEARCH(
    base_schedule,
    cost_model,
    hstu_thread_presets,
    beam_width,
    max_threads = 4,
    max_in_flight = 5,
):
    require max_threads <= 4
    require max_in_flight <= 5
    frozen = {
        "forward",
        "backward",
        "finalize_model_grads",
        "optimizer_step",
    }
    legal_thread_presets = []
    for preset_name, preset in hstu_thread_presets:
        if preset_name == "per_task":
            continue  # exceeds max_threads <= 4
        legal_thread_presets.append((preset_name, preset))
    canonical = Candidate(
        thread_map = HSTU_DEFAULT_THREAD_MAP,
        stream_map = CURRENT_HSTU_STREAMS(base_schedule),
        fire_ordering = DECLARATION_ORDER(base_schedule),
        lookahead_map = CURRENT_LOOKAHEADS(base_schedule),
    )
    canonical = NORMALIZE_AND_SCORE(
        canonical, base_schedule, cost_model,
        frozen, max_threads, max_in_flight,
    )
    beam = TOP_BY_SCORE([canonical], beam_width)
    best = canonical
    for round in 1..SEARCH_ROUNDS:
        expanded = []
        for candidate in beam:
            expanded += MUTATE_THREAD_MAP(candidate, legal_thread_presets)
            expanded += MUTATE_STREAM_MAP(candidate)
            expanded += MUTATE_FIRE_ORDERING(candidate)
        scored = []
        for candidate in UNIQUE(expanded):
            normalized = NORMALIZE_AND_SCORE(
                candidate, base_schedule, cost_model,
                frozen, max_threads, max_in_flight,
            )
            if normalized is valid:
                scored.append(normalized)
        beam = TOP_BY_SCORE(DOMINANCE_PRUNE(scored), beam_width)
        if beam is empty: break
        if SCORE(beam[0]) < SCORE(best): best = beam[0]
        if NO_SCORE_IMPROVEMENT_FOR_PATIENCE_ROUNDS(): break
    return best
2. NORMALIZE_AND_SCORE — composes with the existing auto_assign_lookaheads as the lookahead oracle
procedure NORMALIZE_AND_SCORE(
    candidate, base_schedule, cost_model,
    frozen, max_threads, max_in_flight,
):
    if VIOLATES_FROZEN_STREAM_OR_LOOKAHEAD(candidate, frozen):
        return invalid
    resolved_thread_map = RESOLVE_THREAD_MAP(candidate.thread_map,
                                             candidate.stream_map)
    if COUNT_THREADS(resolved_thread_map) > max_threads:
        return invalid
    provisional_schedule = REBUILD_SCHEDULE(
        base_schedule,
        stream_map = candidate.stream_map,
        fire_ordering = candidate.fire_ordering,
        lookahead_map = CURRENT_LOOKAHEADS(base_schedule),
    )
    try:
        derived_lookahead = auto_assign_lookaheads(
            provisional_schedule, cost_model,
            max_in_flight = max_in_flight,
        )
    except SchedulerRejectsCandidate:
        return invalid
    if ANY(derived_lookahead[t] != 0 for t in frozen):
        return invalid
    scored_schedule = REBUILD_SCHEDULE(
        base_schedule,
        stream_map = candidate.stream_map,
        fire_ordering = candidate.fire_ordering,
        lookahead_map = derived_lookahead,
    )
    overlap_matrix = compute_overlap_matrix(scored_schedule.tasks)
    latency = RESOURCE_LIST_SCHEDULE_LATENCY(
        scored_schedule, cost_model,
        resolved_thread_map, overlap_matrix, max_in_flight,
    )
    candidate.lookahead_map = derived_lookahead
    candidate.score = latency
    return candidate
3. MUTATE_THREAD_MAP — enumerate the 5 surviving presets
procedure MUTATE_THREAD_MAP(candidate, legal_thread_presets):
    for preset_name, preset in legal_thread_presets:
        yield candidate with thread_map = preset
4. MUTATE_STREAM_MAP — group-level moves, not the full Cartesian product
procedure MUTATE_STREAM_MAP(candidate):
    movable_groups = [
        {"h2d", "start_shuffle", "finish_shuffle"},
        {"start_input_dist", "wait_input_dist"},
        {"prefetch_embeddings"},
    ]
    for group in movable_groups:
        if group is absent from schedule:
            continue
        for stream_choice in LEGAL_STREAM_CHOICES(group):
            next_map = COPY(candidate.stream_map)
            for task in group:
                if task in BIT_EXACT_FROZEN_TASKS:
                    continue  # never move forward/bwd/opt/finalize
                next_map[task] = stream_choice
            yield candidate with stream_map = next_map
5. MUTATE_FIRE_ORDERING — topological tie-break policies + adjacent-pair swaps
procedure MUTATE_FIRE_ORDERING(candidate):
    policies = [
        CRITICAL_DEFAULT_CHAIN_FIRST,
        LONGEST_OFF_DEFAULT_FIRST,
        NCCL_QUEUE_EARLY_WITH_TICKET_ORDER,
        PCIE_PREP_EARLY,
        MIN_SLACK_FIRST,
    ]
    for policy in policies:
        priority = BUILD_PRIORITY_VECTOR(candidate, policy)
        yield candidate with fire_ordering = priority
    # local search refinement: any topo-incomparable adjacent pair
    # in the current order is allowed to swap
    for adjacent_pair in INCOMPARABLE_ADJACENT_PAIRS(candidate.fire_ordering):
        yield candidate with that pair swapped
6. DOMINANCE_PRUNE — conservative dedup before scoring
procedure DOMINANCE_PRUNE(candidates):
    grouped = GROUP_BY(
        candidates,
        keys = (thread_map, stream_map, lookahead_map),
    )
    for group in grouped:
        keep the lowest-score fire_ordering
        discard candidates with same or worse score and no lower thread count
Full SPEC (objective definition, constraint encoding, complexity bound, worked example): tasks/SPEC_autosched_joint_search.md (685 lines).
→ Interactive visualization (Diagram 5: search-space landscape).
→ Thread-map preset comparison (6 hand-coded presets; stream × thread × lookahead per HSTU task).
6. Workflow sequence diagrams
6.1 First progress() call (bootstrap)
Bootstrap happens exactly once, on the first progress(). The peek batch is fully integrated into the engine via _seed_first_batch; subsequent calls skip steps 2–12 and go straight to SchedulablePipeline.progress().
6.2 Steady-state iteration
The par fragment marks parallel execution: the io chain and the compute chain run concurrently. Within each chain, tasks fire in topological order (declaration order is the tie-break). NCCL-tagged tasks acquire tickets from _NcclOrderedLock in that same topological order (0, 1, 2, 3 here), guaranteeing deterministic submission order across ranks.
6.3 NCCL ticket lock
6.4 Multi-batch overlap (timing diagram)
Each column is one progress()-internal iteration. Rows show what runs at each lookahead. After 2 prefill iters, 3 batches are in flight simultaneously (different stages on different streams via the io/compute split). The progress() return tracks lookahead=0's step_result.
7. Class & method reference
7.1 Engine classes
Public members plus private (uml-text-mute, italic) members. SchedulablePipeline aggregates a Schedule, StreamPool, and Executor; the BatchRing + TaskContext are constructed internally.
7.2 Executor classes
| Class / function | Member | Description |
|---|---|---|
| SequentialExecutor | execute_stage(stage, ctx, iter_count, mask, cross_stream_waits, pool) | Iterate over stage.tasks in order, run each under pool.use(stream). |
| | shutdown() | No-op. |
| ThreadedExecutor | __init__(thread_map=None, max_workers=None) | Lazy ThreadPoolExecutor; resizes on stage demand. |
| | execute_stage(...) | Partition active tasks by thread_id, build cpu_deps, submit chains, barrier. |
| | shutdown() | Joins worker threads. |
| _NcclOrderedLock | acquire(ticket) | Block until next_ticket == ticket, or raise on _failed. |
| | release(failed=False) | Increment next_ticket; if failed, set _failed. |
| | abort() | Force-set _failed + notify_all; wakes blocked waiters. |
| | reset() | Reset both fields. Called per stage. |
| _resolve_thread_id(task, thread_map) | helper | Maps a task to its thread id under any thread_map spec. |
| _compute_cpu_deps(active, thread_id_of, completion) | helper | Build the cross-thread {name → [Event]} map. |
| _apply_cross_stream_waits(task, waits, pool) | helper | GPU-side wait_stream emission. |
7.3 HSTU classes7.3 HSTU 类
| Class类 | Member成员 | Visibility可见性 | Purpose用途 |
|---|---|---|---|
HSTUPipeline | __init__(model, optimizer, device, *, prefetch=False, prefetch_depth=1, batch_shuffler=None, threaded=True, thread_map=None, ...) | public | Construct the adapter; pipeline lazily built on first progress().构造适配器;流水线在首次 progress() 时懒构建。 |
progress(dataloader_iter) → (loss, tokens, output) | public | Drive one full iter; matches legacy return signature.驱动一次完整迭代;返回签名与遗留实现一致。 | |
attach(model=None) / detach() → Module | public | Pause/resume hooks (matches legacy).暂停/恢复钩子(与遗留一致)。 | |
shutdown() + context-manager | public | Tear down executor + threads.拆解执行器与线程。 | |
_build_schedule() | internal | Assemble Schedule + StreamPool from variant flags.根据变体参数装配 Schedule + StreamPool。 | |
_rewrite_model(peek, dist, default, memcpy=None) | internal | Bootstrap torchrec FX trace + monkey-patches.引导 torchrec FX 追踪与 monkey-patch。 | |
HSTUPipelineFactory | register(name, factory) | public | Register a custom variant.注册自定义变体。 |
create(name, **kwargs) → HSTUPipeline | public | Look up + instantiate.查找并实例化。 | |
list() → List[str] | public | Enumerate registered names.列出已注册名称。 | |
Pre-registered variants预注册变体: "hstu_sparse_dist" (native), "hstu_prefetch_sparse_dist" (prefetch). | |||
PipelineState | model, optimizer, device, batch_shuffler | internal | Captured at construction.构造时捕获。 |
pipelined_modules | internal | Populated by _rewrite_model.由 _rewrite_model 填充。 | |
create_torchrec_ctx() → TrainPipelineContext | internal | v1 only — asserts ctx.version == 1.仅 v1 —— 断言 ctx.version == 1。 | |
8. User-facing API examples
8.1 T1: vanilla preset (≤ 8-line diff)
from commons.pipeline.engine import SchedulablePipeline

# Before — manual loop:
# for batch in loader:
#     opt.zero_grad()
#     loss = model(batch).sum()
#     loss.backward()
#     opt.step()

# After — preset:
pipe = SchedulablePipeline.basic(model, optimizer, loss_fn=lambda out: out.sum())
for batch in loader:
    pipe.step(batch)
8.2 T2: with prefetch + memcpy stream
pipe = SchedulablePipeline.basic(
    model,
    optimizer,
    loss_fn=lambda out: out.sum(),
    prefetch=True,
    memcpy_stream=True,
)

# Drive with cpu-resident batches; the engine moves them to GPU on the memcpy stream
for cpu_batch in cpu_loader:
    pipe.step(cpu_batch)
8.3 HSTU usage
from commons.pipeline.hstu_pipeline import HSTUPipelineFactory

pipe = HSTUPipelineFactory.create(
    "hstu_prefetch_sparse_dist",
    model=model_train,
    optimizer=dense_optimizer,
    device=torch.device("cuda"),
    batch_shuffler=batch_shuffler,
)

# Create the iterator ONCE outside the loop. SPEC_p4 v2's
# iterator-identity reset means a fresh `iter(dataloader)` per
# step would restart the engine each time and discard in-flight
# batches.
dataloader_iter = iter(dataloader)
for _ in range(max_train_iters):
    try:
        loss, tokens, output = pipe.progress(dataloader_iter)
    except StopIteration:
        break
pipe.shutdown()
Matches the legacy return signature; the use case is drop-in replacement of JaggedMegatronTrainPipelineSparseDist / JaggedMegatronPrefetchTrainPipelineSparseDist.
8.4 Custom thread_map
# by_stream (default for non-HSTU SchedulablePipeline.basic)
pipe = SchedulablePipeline.basic(model, opt, threaded=True, thread_map="by_stream")

# per_task — every task its own thread
pipe = SchedulablePipeline.basic(model, opt, threaded=True, thread_map="per_task")

# Explicit dict
pipe = SchedulablePipeline.basic(
    model, opt, threaded=True,
    thread_map={
        "h2d": "io",
        "forward": "compute",
        "backward": "compute",
        "optimizer_step": "compute",
    },
)

# Callable
pipe = SchedulablePipeline.basic(
    model, opt, threaded=True,
    thread_map=lambda task: "io" if task.stream == "memcpy" else "compute",
)
9. Internal API examples
9.1 Custom Task subclass
from commons.pipeline.engine import Task, DataSlot

class CountTokensTask(Task):
    name = "count_tokens"
    stream = "default"
    lookahead = 0
    reads = (DataSlot("batch_gpu", 0),)
    writes = (DataSlot("token_count", 0),)

    def run(self, ctx):
        batch = ctx.slots["batch_gpu"]
        ctx.slots.set("token_count", batch.numel())
9.2 Hand-built Schedule
import torch

from commons.pipeline.engine import (
    Schedule, Stage, StreamPool, Task, DataSlot, SchedulablePipeline,
)

# Two tasks: producer on memcpy stream, consumer on default
producer = Task.from_fn(
    "producer",
    lambda ctx: ctx.slots.set("x", torch.randn(8, device="cuda")),
    stream="memcpy",
    writes=(DataSlot("x", 0),),
)
consumer = Task.from_fn(
    "consumer",
    lambda ctx: ctx.slots.set("step_result", ctx.slots["x"].sum()),
    stream="default",
    reads=(DataSlot("x", 0),),
    writes=(DataSlot("step_result", 0),),
)

schedule = Schedule(
    stages=(Stage(tasks=(producer, consumer)),),
    stream_slots=("default", "memcpy"),
)
pool = StreamPool({
    "default": torch.cuda.default_stream(),
    "memcpy": torch.cuda.Stream(priority=-1),
})

pipe = SchedulablePipeline(schedule, pool, executor="threaded")
result = pipe.step(None)  # engine fills batch_cpu but our tasks don't use it
9.3 Plug in a custom Executor
# An executor must implement: execute_stage(...) and shutdown().
class VerboseExecutor:
    def __init__(self, inner):
        self._inner = inner

    def execute_stage(self, stage, ctx, iter_count, should_run, waits, pool):
        for task in stage.tasks:
            if should_run(task):
                print(f"[iter {iter_count}] running {task.name}")
        return self._inner.execute_stage(
            stage, ctx, iter_count, should_run, waits, pool
        )

    def shutdown(self):
        self._inner.shutdown()

# Use:
from commons.pipeline.engine import SequentialExecutor
pipe = SchedulablePipeline(schedule, pool, executor=VerboseExecutor(SequentialExecutor()))
10. Limitations & followups
Tracked in tasks/followups.md. The unresolved items relevant to current users:
| Item | Status | Trigger to resume |
|---|---|---|
| Engine offset-aware cross-stream sync (wait_event instead of wait_stream) | SPEC drafted (SPEC_p3.md) | Perf profile shows io ↔ compute overlap collapse under a non-identity shuffler |
| Adam optimizer parity | xfail — torchrec checkpoint Adam-state limitation | Upstream torchrec fix lands |
| Real multi-rank NCCL ordering test | Only Python-call-order tested in the unit suite | Add a multi-process NCCL fixture; partly covered by 4-GPU parity tests on real workloads |
| Non-Megatron DDP support in the HSTU adapter | finalize_model_grads + zero_grad_buffer hardcoded | A generic DDP user wants to adopt |
| Semantic B (autonomous data pipeline) | Engine has Semantic A only (prefetch_depth) | A workload with high input_dist latency variance |
Test results (commit 338f0c46, 4-GPU on ipp1-2028): 25 passed, 16 xfailed (all adam), 0 failed. Anti-flake mode: 2 consecutive runs, a 50-step long-run drift check, a ≥2-thread utilization probe, and an NCCL deadlock regression guard.
Appendix A. Δ in cross_iter_depends_on
Δ is the engine's internal name for the "progress lag" between when a producer records its event and when a consumer reads it via a cross_iter_depends_on=((X, -N),) edge. It is defined and bounded entirely by lookaheads and N; it never appears in the user-facing API but governs which configurations are accepted at SchedulablePipeline.__init__.
A.1 Definition
Δ is primitively defined as the progress lag between two task submissions — equivalently, the number of ring.advance() calls (one per progress()) elapsed between when a producer records an event and when a consumer reads it back:
$$\Delta \;\stackrel{\text{def}}{=}\; q_c - q_p$$
where $q_c$ is the progress() index in which the consumer reads the event and $q_p$ is the progress() index in which the producer recorded it. This definition is universal — it applies to any cross-progress wait, independent of which sync field the user declared. cross-iter requires $q_p < q_c$, i.e. $\Delta \ge 1$.
Closed-form value in the cross_iter case. When the wait is declared via consumer.cross_iter_depends_on=(("X", -N),) with $N > 0$, the lookahead mechanics determine $q_c$ and $q_p$ uniquely; working through them (see §A.2) gives:
$$\Delta \;=\; X.\mathrm{la} + N - \mathrm{consumer.la} \quad\text{(cross\_iter only)}$$
This is a derived equality, not a definition — it holds in the cross_iter context because of how lookahead pins down which batch each task processes. Other sync types would yield different closed forms (e.g. same_progress_sync always has $\Delta = 0$ since both tasks are in the same progress).
Useful decomposition: $\Delta = N + (X.\mathrm{la} - \mathrm{consumer.la})$. When producer and consumer have equal lookahead, $\Delta = N$ exactly (the user's literal "wait for N batches ago"); when the producer is deeper in the ring ($X.\mathrm{la} > \mathrm{consumer.la}$), $\Delta > N$ (extra rotations to bring the slot down to the consumer's view).
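The decomposition can be sanity-checked in a few lines. The helper below is an illustrative sketch of the closed form $\Delta = X.\mathrm{la} + N - \mathrm{consumer.la}$ — `cross_iter_delta` is a hypothetical name, not part of the engine's API:

```python
# Illustrative sketch of the closed-form Δ; `cross_iter_delta` is a
# hypothetical helper, not part of the engine's real API.

def cross_iter_delta(producer_la: int, consumer_la: int, n: int) -> int:
    """Δ = X.la + N - consumer.la (cross_iter case only; requires N > 0)."""
    if n <= 0:
        # Mirrors the Task.__init__ rule: zero/positive offsets are rejected.
        raise ValueError("cross_iter_depends_on requires N > 0")
    return producer_la + n - consumer_la

# Equal lookaheads: Δ == N, the literal "wait for N batches ago".
assert cross_iter_delta(1, 1, 1) == 1
# Producer deeper in the ring: Δ = N + (X.la - consumer.la) > N.
assert cross_iter_delta(3, 2, 2) == 3
```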
A.2 Derivation of Δ in the cross_iter case
Goal: take the primitive Δ definition $q_c - q_p$ from §A.1 and reduce it to a closed-form expression in $X.\mathrm{la}$, $N$, and $\mathrm{consumer.la}$ when the wait is declared as cross_iter_depends_on=(("X", -N),).
Convention: in P_n (the n-th call to progress(), 0-indexed), a task with lookahead k processes batch B_(n+k) and records its event at slot[k]. Equivalently, slot[k] in P_n currently holds B_(n+k).
Consumer at P_n = q_c processes B_(n + consumer.la). The cross_iter declaration says it waits for X's work on B_(n + consumer.la − N). The progress in which X processes B_(n + consumer.la − N) is P_(n + consumer.la − N − X.la):
$$ \begin{aligned} q_p &= n + \mathrm{consumer.la} - N - X.\mathrm{la} \\ \Delta &= q_c - q_p \\ &= n - (n + \mathrm{consumer.la} - N - X.\mathrm{la}) \\ &= X.\mathrm{la} + N - \mathrm{consumer.la} \end{aligned} $$
Where the consumer actually looks up the event. The producer recorded its event at $\mathrm{slot}[X.\mathrm{la}]$ in its own progress $P_{q_p}$. Each ring.advance() uniformly shifts every slot by $-1$. After $\Delta$ advances, the slot that was at offset $X.\mathrm{la}$ has rotated to offset $X.\mathrm{la} - \Delta$ in the consumer's progress $P_{q_c}$. The engine's consumer-side ring index is exactly this:
$$\mathrm{slot\_offset} \;\stackrel{\text{def}}{=}\; X.\mathrm{la} - \Delta$$
$X.\mathrm{la}$ does appear in $\Delta$, but it cancels when we substitute:
$$\mathrm{slot\_offset} \;=\; X.\mathrm{la} - (X.\mathrm{la} + N - \mathrm{consumer.la}) \;=\; \mathrm{consumer.la} - N$$
That is why the consumer-side lookup index depends only on $\mathrm{consumer.la}$ and $N$, not on the producer's lookahead — ring rotation is uniform, so producer.la is fully absorbed into $\Delta$. The slot_offset formula in deps.py writes this directly as consumer.batch_offset + neg_offset (where neg_offset = -N).
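The cancellation is mechanical enough to verify numerically. A minimal sketch, using hypothetical names that mirror the §A.2 algebra (not the real deps.py code):

```python
# Verify that producer.la cancels out of the consumer-side slot offset.
# `slot_offset` is a hypothetical helper mirroring the §A.2 derivation.

def slot_offset(producer_la: int, consumer_la: int, n: int) -> int:
    delta = producer_la + n - consumer_la   # Δ = X.la + N - consumer.la
    direct = producer_la - delta            # slot_offset := X.la - Δ
    assert direct == consumer_la - n        # producer.la has cancelled
    return direct

# Any producer lookahead yields the same consumer-side ring index:
assert slot_offset(1, 2, 1) == slot_offset(5, 2, 1) == 1
```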
A.3 Validity constraints
| Condition | Requirement | Why / engine response |
|---|---|---|
| $\Delta < 0$ | $\mathrm{consumer.la} > X.\mathrm{la} + N$ | Future-read: producer hasn't yet processed $B_{K-N}$ by the consumer's progress. deps.py raises at construction. |
| $\Delta = 0$ | $\mathrm{consumer.la} = X.\mathrm{la} + N$ | Same-progress, no ring rotation. Auto-promoted to same_progress_sync semantics: engine adds the topo-DAG edge X → consumer and the cross-thread CPU edge, and (cross-stream) emits wait_event at slot[X.la]. The user-facing declaration stays as written; the engine handles the dispatch. (Earlier versions raised here, instructing the user to rewrite as same_progress_sync; that rewrite is now optional. See SPEC_cross_iter_delta0_autoconvert.md.) |
| $\Delta \ge 1$ | $\mathrm{consumer.la} \le X.\mathrm{la} + N - 1$ | Strictly cross-progress — accepted. |
| Additional (cross-stream only): $\mathrm{slot\_offset} \ge 0$ | $\mathrm{consumer.la} \ge N$ | Slot still in the ring; otherwise the event has been recycled. Same-stream skips this check (CUDA stream FIFO orders submissions without needing the event lookup). |
Combined cross-stream interval:
$$N \;\le\; \mathrm{consumer.la} \;\le\; X.\mathrm{la} + N - 1 \quad\Longleftrightarrow\quad 1 \;\le\; \Delta \;\le\; X.\mathrm{la}$$
Same-stream relaxes to just $\Delta \ge 1$.
Validator code: deps.py lines 437-501 (Δ=0 / Δ<0), and deps.py line 515 (slot_offset<0).
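The constraint table reduces to a small decision procedure. The following is an illustrative sketch of that logic under the cross-stream/same-stream split described here — the function name and return strings are hypothetical, not the real deps.py interface:

```python
# Minimal sketch of the A.3 acceptance rules; names and messages are
# illustrative, not the real deps.py validator.

def validate_cross_iter(producer_la: int, consumer_la: int, n: int,
                        cross_stream: bool) -> str:
    delta = producer_la + n - consumer_la      # Δ = X.la + N - consumer.la
    if delta < 0:
        raise ValueError("future-read: Δ < 0")
    if delta == 0:
        return "auto-promoted to same_progress_sync"
    if cross_stream and consumer_la - n < 0:   # slot_offset = consumer.la - N
        raise ValueError("slot recycled: slot_offset < 0")
    return "accepted"

assert validate_cross_iter(1, 1, 1, cross_stream=True) == "accepted"
assert validate_cross_iter(0, 1, 1, cross_stream=True) == "auto-promoted to same_progress_sync"
# Same-stream relaxes the slot_offset check (stream FIFO ordering suffices):
assert validate_cross_iter(0, 0, 1, cross_stream=False) == "accepted"
```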
A.4 Plain-language summary
Read cross_iter_depends_on=(("X", -N),) as: "the consumer task, when running on batch $K$, needs X's GPU work for batch $K - N$ to be already finished and recorded into the ring." $\Delta$ tells the engine how stale X's record will be by the time the consumer reads it — measured in progress() calls.
- $\Delta = 1$: producer ran in the immediately preceding progress. Most common case.
- $\Delta = 2, 3, \ldots$: producer ran further back. Engine still finds the event in the ring as long as $\Delta \le X.\mathrm{la}$ (the slot hasn't rotated out yet).
- $\Delta = 0$: producer and consumer are both in the same progress. Engine auto-promotes the declaration to same_progress_sync semantics — same topo edge, same CPU edge, same slot[producer.la] event lookup. Equivalent to writing same_progress_sync=(X,) directly; the cross_iter syntax just lets users keep a batch-flow mental model.
- $\Delta < 0$: consumer would be asking for something the producer hasn't computed yet. Logically impossible.
Quick mnemonic: if you wrote cross_iter_depends_on=(("X", -1),), you usually mean "wait for last progress's X". That works exactly when $\Delta = 1$, which forces $X.\mathrm{la} = \mathrm{consumer.la}$. Any la mismatch causes $\Delta$ to drift and may collide with one of the boundary checks above.
A.5 Worked examples
| $X.\mathrm{la}$ | $\mathrm{consumer.la}$ | $N$ | $\Delta$ | $\mathrm{slot\_offset}$ | Verdict |
|---|---|---|---|---|---|
| 0 | 0 | 1 | 1 | −1 | cross-stream: REJECTED (slot rotated out). same-stream: OK (FIFO). |
| 1 | 1 | 1 | 1 | 0 | OK. "Wait for last progress's X on same lookahead." |
| 2 | 2 | 2 | 2 | 0 | OK. "Wait for X two progresses back, same lookahead." |
| 3 | 2 | 2 | 3 | 0 | OK. X is deeper, so its slot needs 3 advances to reach the consumer's view; ring depth 4 holds it. |
| 0 | 1 | 1 | 0 | 0 | OK — Δ=0 auto-promoted. Engine adds topo edge X → consumer; emits wait_event at slot[0] (cross-stream) or relies on FIFO + topo order (same-stream). Equivalent to same_progress_sync=(X,). |
| 0 | 3 | 1 | −2 | 2 | REJECTED — future-read (Δ<0). Producer hasn't run yet at the consumer's progress. |
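For cross-checking, the table's verdicts (cross-stream case) can be reproduced directly from the two closed forms. The helper below is a hypothetical sketch, not engine code:

```python
# Reproduce the §A.5 verdicts from Δ = X.la + N - consumer.la and
# slot_offset = consumer.la - N (cross-stream rules; hypothetical helper).

def verdict(x_la: int, consumer_la: int, n: int) -> str:
    delta = x_la + n - consumer_la
    if delta < 0:
        return "REJECTED (future-read)"
    if delta == 0:
        return "OK (auto-promoted)"
    if consumer_la - n < 0:                    # cross-stream only
        return "REJECTED (slot rotated out)"
    return "OK"

rows = [(0, 0, 1), (1, 1, 1), (2, 2, 2), (3, 2, 2), (0, 1, 1), (0, 3, 1)]
assert [verdict(*r) for r in rows] == [
    "REJECTED (slot rotated out)", "OK", "OK", "OK",
    "OK (auto-promoted)", "REJECTED (future-read)",
]
```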