Schedulable Pipeline Engine

Branch: junzhang/postproc-cleanup · Last updated: 2026-04-28 (UTC+8) · Coupled SPECs: SPEC.md (Problem #1), SPEC_p2.md (HSTU adapter), SPEC_p3.md (event-based sync, RESOLVED), SPEC_p4_declarative_pipeline.md (v2 declarative API).

Changelog: SPEC_p4 v3 (2026-04-28)
  • Three-field dependency API: depends_on is now same-batch logical dependency only ("this task on batch K waits for X to have processed batch K"). Two new sibling fields cover the other two intents:
    • cross_iter_depends_on=("X", ("Y", -2), ...) — different-batch logical dependency. Bare name is shorthand for (name, -1); explicit tuple required for N≠1. Positive / zero offsets rejected.
    • same_progress_sync=("X", ...) — same-progress GPU/stream coherency wait, regardless of which batches X and self are processing. Mirrors the legacy default_stream.wait_stream(prefetch_stream) pattern (e.g. backward.same_progress_sync=("prefetch_embeddings",)).
    The three fields are non-overlapping (a producer name in two fields raises). Each is validated separately at Task.__init__.
  • Topological-sort execution order replaces declaration order: SchedulablePipeline.__init__ calls topological_sort(schedule) (Kahn's algorithm) and rebuilds the single-stage Schedule with the sorted task tuple. Edges come from exact reads/writes slot matches, depends_on with matching lookahead, same_progress_sync, and cross_iter_depends_on entries with Δ=0 (auto-promoted, see next bullet). Δ ≥ 1 cross_iter_depends_on and cross-lookahead depends_on are excluded (cross-progress deps are satisfied by ring rotation, not intra-progress ordering). Declaration order is now only a tie-break for incomparable tasks.
  • Cross-iter slot-offset formula corrected: cross_iter_depends_on=((X, -N)) now resolves the producer's event at slot_offset = consumer.batch_offset + neg_offset, not producer.batch_offset + neg_offset. The old formula was only correct when producer.lookahead == consumer.lookahead; the new one correctly indexes the consumer's view of the ring regardless of producer lookahead.
  • Δ = 0 cross_iter_depends_on auto-promoted to same_progress_sync semantics: when consumer.la == producer.la + |neg_offset| (the producer and consumer run in the same progress on different batches), the engine now treats the declaration as if the user had written same_progress_sync=(producer,) — it adds a topo-DAG edge producer → consumer in _build_same_progress_dag_edges, adds the cross-thread CPU threading.Event in _compute_cpu_deps, and emits the cross-stream wait_event at slot[producer.la] (no ring rotation). Earlier this case raised at construction; the rejection was UX-only — see tasks/SPEC_cross_iter_delta0_autoconvert.md and tests/commons/test_engine_same_progress_sync_correctness.py (parametrized: explicit same_progress_sync ≡ cross_iter Δ=0 auto-promoted, GPU-verified).
  • Cross-iter future-read validator: infer_cross_stream_event_deps raises at schedule construction when Δ < 0 (i.e., consumer.la > producer.la + |neg_offset|): the producer hasn't yet processed batch K-N by the consumer's progress, so the dependency is impossible to honor.
  • PostProc dead code removed: PipelinedPostproc, PipelineState.pipelined_postprocs, the set_module_context propagation loop, and the now-orphaned _validate_set_context_colocation guard are deleted (never wired in production). The torchrec set_context mutation that motivated those guards no longer exists in this codebase.
Changelog: SPEC_p4 v2 (2026-04-26)
  • User-facing Task API: the batch_offset=N kwarg is removed. Use lookahead=N instead — same semantics, better name. reads / writes accept bare strings; offsets are inferred from the producer's lookahead.
  • Bare-name depends_on: write depends_on=("prefetch_embeddings",) and the engine auto-classifies it as within-iter or cross-iter from the producer's vs consumer's lookahead. No more leaking pseudo-slot names like module_input_post_prefetch in user code. (Superseded in v3 — see above.)
  • Declarative Pipeline wrapper: commons.pipeline.engine.Pipeline(tasks=[...]) — a thin façade over SchedulablePipeline + Schedule + StreamPool for one-shot user code.
  • Event-based cross-stream sync: the executor prefers wait_event(producer_event) from the ring slot over coarse wait_stream(producer_stream). Stream-level wait remains as a first-iter fallback.
  • NVTX integration: every task.run(ctx) is wrapped in nvtx.annotate(task.nvtx_tag or task.name) at all execution callsites (sequential, threaded, single-task fast path, NCCL-locked path). No-op on CPU-only hosts.
  • HSTUPipeline.detach() resets bookkeeping so a follow-up attach() + progress() rebuilds the engine cleanly (Codex B-MEDIUM-1).
  • SchedulablePipeline.progress() resets §4.8 state on a fresh iterator (mirrors legacy train_pipeline.py:418). Switching from the train loader to the eval loader inside one session no longer raises StopIteration immediately.
  • HSTU pretrain ranking GR integration: RECSYS_PIPELINE_BACKEND=new (default legacy) routes examples/hstu/training/pretrain_gr_ranking.py through HSTUPipelineFactory. 8-GPU benchmark on synthetic 8-layer 1024-hidden + cutlass: 568.24 / 565.76 TFLOPS (legacy / new, within 0.4%); 81.8% NCCL/compute overlap; 64 host threads × 5 streams. See tasks/nsys_runs/RESULTS.md.

1. Background & motivation

The legacy training pipeline at examples/commons/pipeline/train_pipeline.py follows torchrec's design: a hand-rolled progress() method that hardcodes task sequencing — H2D → input_dist → wait → forward → backward → optimizer — with HSTU-specific extensions wedged in (KK shuffler, loss-token AllReduce, Megatron grad finalize, NCCL stream discipline).

Three structural problems motivated the rework:

  1. Coupling: every task ordering decision lived inside one Python method. Adding a stage required editing the loop.
  2. Single-threaded: CPU-side kernel submission was strictly serial; CPU-bound work (e.g. the KK partitioner) blocked the whole pipeline.
  3. No customization point: variants like JaggedMegatronPrefetch were full subclass forks.

The rework is structured as three coupled but independently delivered slices:

| Slice | Layer | Goal | Constraint |
|---|---|---|---|
| Problem #1 | Engine | Decouple tasks from schedule with HugeCTR-style primitives | Framework-agnostic: no torchrec, megatron, fbgemm_gpu imports |
| Problem #3 | Threaded executor | Multi-threaded task dispatch with NCCL ordering safety | Same engine still works single-threaded |
| Problem #2 | HSTU adapter | Drive torchrec's _rewrite_model machinery via the engine | Legacy train_pipeline.py stays byte-identical |
Reference: design inspired by HugeCTR's Pipeline + Scheduleable abstraction. We borrow the "task as schedulable unit" model but replace HugeCTR's C++ runtime with a Python engine + PyTorch streams.

2. Architecture overview

2.1 Three layers

[Component diagram — three-layer architecture]
  • Layer 3 — HSTU adapter (Problem #2): examples/commons/pipeline/hstu_pipeline/ — HSTUPipeline, HSTUPipelineFactory, 14 task factories, PipelineState
  • Layer 2 — Threaded executor (Problem #3): examples/commons/pipeline/engine/executor.py — ThreadedExecutor, SequentialExecutor, _NcclOrderedLock, _compute_cpu_deps
  • Layer 1 — Engine core (Problem #1): examples/commons/pipeline/engine/ (framework-free) — SchedulablePipeline, Task, DataSlot, Schedule/Stage, StreamPool, BatchRing, TaskContext, plus the autosched/ submodule: validator, cost_model, list_scheduler
Each layer depends only on the layer below it.
Component diagram: dependency direction is strictly downward. Layer 1 has no torchrec/megatron imports. Layers 2 and 3 add capability without modifying lower layers.

2.2 Module dependency

# Engine core — pure stdlib + torch only
commons.pipeline.engine
├── task.py       # Task, DataSlot
├── context.py    # SlotStore, BatchRing, TaskContext (thread-local)
├── streams.py    # StreamPool with anchor_device + use(stream) ctx
├── schedule.py   # Schedule, Stage
├── deps.py       # infer_cross_stream_waits
├── pipeline.py   # SchedulablePipeline driver + _seed_first_batch
├── executor.py   # Sequential + Threaded + _NcclOrderedLock
├── _presets.py   # T1/T2 forward/backward/optimizer task factories
└── autosched/
    ├── validator.py        # 8-rule SPEC §4.2 schedule validity
    ├── cost_model.py       # CostProfiler, CostModel (JSON roundtrip)
    └── list_scheduler.py   # Critical-path list scheduler

# HSTU adapter — depends on torchrec, megatron, dynamicemb
commons.pipeline.hstu_pipeline
├── pipeline.py   # HSTUPipeline + HSTU_DEFAULT_THREAD_MAP
├── tasks.py      # 14 task factories + PipelineState
└── factory.py    # HSTUPipelineFactory

2.3 Hard invariants

  • Engine framework-free: commons.pipeline.engine.* never imports torchrec, megatron, fbgemm_gpu, or commons.distributed.*. Enforced by test_engine_import_hygiene.py.
  • Legacy untouched: train_pipeline.py, train_pipeline_factory.py, utils.py stay byte-identical. Enforced by test_engine_legacy_untouched.py.
  • v1-only contexts in HSTU: create_torchrec_ctx asserts ctx.version == 1. Enforced by test_no_v0_context_references_in_hstu_pipeline (regex scan).
  • Out-of-slot shared state needs an engine-visible happens-before: Rule. The engine only inserts sync for edges it can see: same-slot reads/writes, matching-la depends_on, or same_progress_sync. Anything outside slots (module attributes, PipelineState fields, globals) is invisible. If two tasks mutate the same out-of-slot object without one of those declared edges, the author must colocate them on one thread via thread_map (same thread = serial, no race). Convention only; no runtime check.

    Example. Two tasks both write module.forward._context: start_input_dist (la=1) and forward (la=0). Within one progress, ordering is fine — forward reads the torchrec_ctx that start_input_dist wrote (slot edge). But forward(P_n) and start_input_dist(P_n) run concurrently in the same progress at different lookaheads, touching different batches' contexts through the same Python attribute → race. Fix used: drop the shared mutation entirely (PostProc cleanup); the colocation guard that previously enforced "put both on one thread" was deleted along with it.

3. Engine layer (Problem #1)

3.1 Task & DataSlot

A Task is a unit of work with declarative metadata for the engine to schedule:

class Task:
    name: str
    stream: str                                 # named CUDA stream context
    priority: int                              # NVTX priority hint
    lookahead: int                             # SPEC_p4 v2: 0 = current batch, k = k iters ahead
    reads: Tuple[DataSlot, ...]                # slots consumed (str shorthand auto-wraps)
    writes: Tuple[DataSlot, ...]               # slots produced
    depends_on: Tuple[str, ...]                # SAME-batch logical edge: this task on
                                                 #   batch K waits for X to have processed K
    cross_iter_depends_on: Tuple[Tuple[str, int], ...]
                                                 # DIFFERENT-batch logical edge: ("X",) is
                                                 #   shorthand for (X, -1); ("Y", -N) for N≠1.
                                                 #   Positive/zero offsets rejected.
    same_progress_sync: Tuple[str, ...]        # SAME-progress GPU/stream coherency wait,
                                                 #   regardless of which batches X/self process
                                                 #   (mirrors legacy default_stream.wait_stream)
    nccl: bool                                 # participate in NCCL ordered lock
    nvtx_tag: Optional[str]                      # auto-emitted via nvtx.annotate

    def run(self, ctx: TaskContext) -> None: ...

SPEC_p4 v3 user API note: the three dependency fields encode three distinct intents and are not interchangeable. depends_on is for same-batch logical waits (e.g. backward.depends_on=("zero_grad",)); cross_iter_depends_on is for "wait for X from N batches ago" (rare in HSTU); same_progress_sync is for "wait for X's GPU work in this progress to finish" (e.g. backward.same_progress_sync=("prefetch_embeddings",) in the prefetch variant — a stream coherency wait, not a logical batch dependency). A name mentioned in two fields raises at Task.__init__. Subclasses can set the three as class attributes; normalization runs unconditionally, so bare-name shorthand works in both kwargs and class-attr forms.
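A minimal sketch of the three fields in use. Task names mirror §5.1 and aux_stats is the hypothetical example from §3.6; the exact Task.from_fn kwargs are assumptions, not the verified signature:

# Sketch only — not the authoritative HSTU task definitions.
backward = Task.from_fn(
    "backward", lambda ctx: None,                 # placeholder body
    stream="default", lookahead=0,
    depends_on=("zero_grad",),                    # same batch K: wait for zero_grad on K
    same_progress_sync=("prefetch_embeddings",),  # same progress, batch-agnostic
)

aux_stats = Task.from_fn(
    "aux_stats", lambda ctx: None,                # hypothetical task, per §3.6
    stream="stats", lookahead=2,
    cross_iter_depends_on=(("h2d", -1),),         # different batch: h2d's work on K-1
)

Listing "zero_grad" in both depends_on and same_progress_sync of the same task would raise at Task.__init__ (the three name spaces are non-overlapping).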

SPEC_p4 v2 user API note (commits 2060ee62, 9a2c7a12, faa7a30f): the legacy batch_offset=N kwarg is removed (Phase C hard break). Pass lookahead=N; reads / writes accept bare strings. Pseudo-slot names (e.g. module_input_post_prefetch) are no longer required to model HSTU's prefetch→forward mutation chain.

DataSlot is an opaque named handle: (name: str, batch_offset: int). Two slots are equal iff both fields match. Cross-iter data flow happens via BatchRing.advance() shifting the slot store rather than copying values.
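A self-contained sketch of the slot identity rule, assuming a frozen-dataclass representation (the engine's actual class may differ):

from dataclasses import dataclass

@dataclass(frozen=True)
class DataSlot:
    # Opaque named handle; equal iff BOTH name and batch_offset match.
    name: str
    batch_offset: int = 0

assert DataSlot("batch_gpu", 1) == DataSlot("batch_gpu", 1)
assert DataSlot("batch_gpu", 1) != DataSlot("batch_gpu", 0)  # same name, different offset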

3.2 Schedule & Stage

A Schedule is a frozen DAG declaration:

schedule = Schedule(
    stages=(
        Stage(tasks=(h2d_task, fwd_task, bwd_task, opt_task)),
    ),
    stream_slots=("default", "memcpy"),
)

Stages run in the order they appear in Schedule.stages. The tasks within a single-stage schedule are reordered at SchedulablePipeline.__init__ by topological_sort() (autosched/deps.py): edges come from exact-slot reads/writes matches, depends_on with matching lookahead, and same_progress_sync; cross_iter_depends_on and cross-lookahead depends_on contribute no in-progress edge (cross-iter is satisfied by ring rotation). Author declaration order is the tie-break for incomparable tasks (see the sketch below). The §4.8 mask + executor parallelism choice still gate runtime triggering. in_flight_batches is derived from max(task.lookahead) + 1.
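A self-contained sketch of Kahn's algorithm with the declaration-order tie-break; edge derivation from slots and dependency fields is elided, and the names here are illustrative, not the autosched/deps.py API:

from collections import defaultdict
from typing import Dict, List, Sequence, Set, Tuple

def topo_sort_decl_tiebreak(tasks: Sequence[str],
                            edges: Set[Tuple[str, str]]) -> List[str]:
    # Kahn's algorithm; among ready tasks, the earliest-declared one fires first.
    index = {t: i for i, t in enumerate(tasks)}
    indegree: Dict[str, int] = {t: 0 for t in tasks}
    succ: Dict[str, List[str]] = defaultdict(list)
    for producer, consumer in edges:
        succ[producer].append(consumer)
        indegree[consumer] += 1
    order: List[str] = []
    ready = [t for t in tasks if indegree[t] == 0]
    while ready:
        ready.sort(key=index.__getitem__)   # declaration order breaks ties
        task = ready.pop(0)
        order.append(task)
        for nxt in succ[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(tasks):
        raise ValueError("cyclic dependency")  # the engine raises SchedulingError
    return order

# forward was declared first but is downstream via slot/depends_on edges:
print(topo_sort_decl_tiebreak(
    ("forward", "start_input_dist", "wait_input_dist"),
    {("start_input_dist", "wait_input_dist"), ("wait_input_dist", "forward")},
))  # -> ['start_input_dist', 'wait_input_dist', 'forward']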

The validator (autosched/validator.py) enforces 8 rules at SchedulablePipeline.__init__:

| Rule | Constraint |
|---|---|
| 1 | Task names are unique within the schedule. |
| 2 | task.lookahead >= 0 and slot.batch_offset >= 0. |
| 3 | Every task.stream is in schedule.stream_slots AND in StreamPool. |
| 4 | Each DataSlot(name, offset) has at most one writer. |
| 5 | Every read resolves to a writer (cross-iter via ring carry allowed; reserved slot batch_cpu exempt). |
| 6 | Every name in depends_on, cross_iter_depends_on, or same_progress_sync resolves to a task in the same schedule. Declaration order is no longer required; topological_sort picks a valid execution order. |
| 7 | The intra-progress graph (slot edges + matching-lookahead depends_on + same_progress_sync) is acyclic — checked by running topological_sort; failure raises SchedulingError("cyclic dependency"). |
| 8 | Cross-stream waits are emitted exactly once per (consumer, producer-stream) pair. |

3.3 BatchRing

The ring holds the slot stores of N in-flight batches. at(k) is the slot store at offset k from current. advance() drops offset 0, shifts each store down by one, and appends a fresh empty store at the highest offset.

[Diagram] BatchRing — advance() over 4 iterations (N=3). Each advance() shifts stores down one slot (slot[k] → slot[k-1]), pushes the new batch at slot[max_offset], and evicts the oldest batch's slots (refcounts drop). From iter 2 onward the ring is in steady state with N batches in flight.
Each row is the ring after a successful _run_one_internal_iter. Slot stores carry batch-specific tensors and torchrec contexts; advance() is what enables data flow across iterations without explicit copies.
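A minimal self-contained sketch of the ring semantics above; plain dicts stand in for slot stores (the real BatchRing also tracks events and refcounts):

from typing import Dict, List

class BatchRingSketch:
    # N in-flight slot stores; advance() shifts every store down one offset.
    def __init__(self, n_in_flight: int) -> None:
        self._stores: List[Dict[str, object]] = [{} for _ in range(n_in_flight)]

    def at(self, offset: int) -> Dict[str, object]:
        return self._stores[offset]           # offset 0 = current batch

    def advance(self) -> None:
        self._stores.pop(0)                   # evict offset 0 (refcounts drop)
        self._stores.append({})               # fresh empty store at max offset

ring = BatchRingSketch(3)
ring.at(2)["batch_gpu"] = "batch 0"           # deep-lookahead task writes at slot[2]
ring.advance()
assert ring.at(1)["batch_gpu"] == "batch 0"   # one advance later: visible at slot[1]
ring.advance()
assert ring.at(0)["batch_gpu"] == "batch 0"   # two advances later: current batch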

3.4 §4.8 mask formula

Determines whether a task at lookahead = k fires at iteration i (max_lookahead = max(task.lookahead) across the schedule):

fire iff   (max_lookahead - k) ≤ i < M_known + (max_lookahead - k)

where M_known is the number of batches pulled so far while the loader is still live, and the final M once it is exhausted.

  • The prefill phase (i < max_lookahead) runs only deep-lookahead tasks; compute at lookahead=0 doesn't fire yet.
  • The steady phase (max_lookahead ≤ i < M) runs all qualifying tasks each iter.
  • The drain phase (i ≥ M) runs only shallow-lookahead tasks until the ring is empty.

The engine raises StopIteration when no future iteration can satisfy any task's mask.
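The mask as a self-contained predicate, with the prefill/steady/drain phases falling out of the two inequalities (a sketch; the engine tracks M_known incrementally):

def fires(k: int, i: int, max_lookahead: int, m_known: int) -> bool:
    # §4.8 mask: a task at lookahead=k fires at iteration i iff
    #   (max_lookahead - k) <= i < m_known + (max_lookahead - k)
    # m_known = batches pulled so far while the loader is live, else the final M.
    shift = max_lookahead - k
    return shift <= i < m_known + shift

# M=4 batches, max_lookahead=2:
assert [i for i in range(8) if fires(2, i, 2, 4)] == [0, 1, 2, 3]  # h2d: prefills first
assert [i for i in range(8) if fires(0, i, 2, 4)] == [2, 3, 4, 5]  # compute: drains last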

3.5 SchedulablePipeline driver

Public API surface (8 symbols total at commons.pipeline.engine):

| Symbol | Visibility | Purpose |
|---|---|---|
| SchedulablePipeline | public | The driver. Holds the engine's (schedule, stream_pool, executor) triple and runs progress(batch_iter). |
| Task | public | Subclass or Task.from_fn(name, fn, ...). |
| DataSlot | public | Slot identifier. |
| Schedule, Stage | public | Declarative DAG. |
| StreamPool | public | Named CUDA stream registry. |
| TaskContext | public | Per-iter handle (ctx.slots, ctx.stream_pool, ctx.iter_count). |
| SequentialExecutor, ThreadedExecutor | public | Two execution strategies. |
| ScheduleValidationError | public | Raised by the validator. |
| _seed_first_batch | private | Pre-populates ring.at(max_offset) for adapter bootstrap. |
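A usage sketch of the surface above. Task bodies are placeholder closures, and the exact Task.from_fn / StreamPool constructor kwargs are assumptions, not verified signatures:

import torch
from commons.pipeline.engine import (
    Schedule, SchedulablePipeline, Stage, StreamPool, Task, ThreadedExecutor,
)

# Two-task toy schedule: h2d at lookahead=1 feeds forward at lookahead=0.
h2d = Task.from_fn("h2d", lambda ctx: None, stream="memcpy",
                   lookahead=1, writes=("batch_gpu",))
fwd = Task.from_fn("forward", lambda ctx: None, stream="default",
                   lookahead=0, reads=("batch_gpu",))

schedule = Schedule(stages=(Stage(tasks=(h2d, fwd)),),
                    stream_slots=("default", "memcpy"))
pipe = SchedulablePipeline(
    schedule=schedule,
    stream_pool=StreamPool(anchor_device=torch.device("cuda")),  # kwarg assumed
    executor=ThreadedExecutor(),
)

batch_iter = iter(range(4))
while True:
    try:
        pipe.progress(batch_iter)   # raises StopIteration once drained (§3.4)
    except StopIteration:
        break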

3.6 Event-sync graph & slot_offset

deps.infer_cross_stream_event_deps(schedule) compiles the four user-facing dependency declarations (reads/writes, depends_on, cross_iter_depends_on, same_progress_sync) into a flat per-consumer list of (producer, producer_stream, slot_offset) triples. At runtime the executor performs ring.at(slot_offset).get_event(producer) + consumer.wait_event(event). The two views below clarify (a) which declarations become topological-sort edges vs. only event-emit edges, and (b) how each edge's slot_offset is derived from ring rotation.

[Diagram] Same-progress dependency DAG (the topological-sort source). Solid edges constrain in-progress execution order; dashed edges only emit ring-rotated wait_events. Nodes: start_input_dist (la=1, data_dist), wait_input_dist (la=1, data_dist), prefetch_embeddings (la=1, prefetch), forward (la=0, default), backward (la=0, default), aux_stats (la=2, stats, hypothetical). Edges: start_input_dist → wait_input_dist (depends_on), wait_input_dist → prefetch_embeddings (depends_on), prefetch_embeddings → forward (depends_on, cross-la, dashed), prefetch_embeddings → backward (same_progress_sync), plus a cross_iter (-1) edge into aux_stats (dashed). Edge legend (line style = topo membership):
  • reads/writes — TOPO if same offset, else ring-rotated; slot_offset = read_slot.batch_offset
  • depends_on (producer.la == consumer.la) — TOPO; slot_offset = consumer.la
  • depends_on (producer.la > consumer.la) — NOT topo, ring-rotated; slot_offset = consumer.la
  • same_progress_sync — TOPO; slot_offset = producer.la
  • cross_iter_depends_on=(X,-N) — NOT topo; slot_offset = consumer.la + neg_offset
Five tasks from HSTU prefetch variant + one hypothetical aux_stats task (not in real HSTU; only here to illustrate cross_iter_depends_on).
Why la=2 for the cross_iter consumer? Two constraints, derived from the slot_offset math:
  1. cross_iter_depends_on=((X,-N)) code-level: slot_offset = consumer.la − N ≥ 0consumer.la ≥ N (deps.py:446, otherwise rejected as out-of-ring).
  2. depends_on future-read: any same-batch depends_on=("Y",) requires consumer.la ≤ Y.la (deps.py:368). So a task that depends_on a low-la training task (backward/optimizer/forward, la=0) is itself constrained to la=0 — and would therefore fail constraint (1).
Combined: a cross_iter consumer cannot have any same-batch depends_on on low-la training tasks. It must be a high-lookahead, prefetch-side task whose only same-batch deps are on other high-la tasks (h2d, prefetch). Streaming / auxiliary statistics is the canonical example.
topological_sort only consumes the three solid edges (start_input_distwait_input_distprefetch_embeddings, plus prefetch_embeddingsbackward); the dashed edges contribute event triples but no in-progress ordering — their producer/consumer ordering across progress() calls is mediated by ring rotation, not by topo sort.
[Diagram] Ring rotation — three sync types resolving to slot_offset. Events are recorded by the producer at slot[producer.la]; the consumer reads from slot[slot_offset]. Two consecutive progress calls P_(n-1) and P_n; ring.advance() maps slot[k] → slot[k-1].
  ① forward.depends_on=("prefetch",) — slot_offset = consumer.la = 0 (after 1 ring advance)
  ② backward.same_progress_sync=("prefetch",) — slot_offset = producer.la = 1 (no rotation)
  ③ aux_stats(la=2).cross_iter_depends_on=(("h2d", -1),) — slot_offset = consumer.la + neg_offset = 2 + (-1) = 1 (after 1 ring advance)
Two consecutive progress() calls. ① depends_on across progresses: prefetch records its event for B_n at slot[1] in P_(n-1); after one ring advance that slot rotates to slot[0]; forward in P_n looks up slot[consumer.la=0] and finds prefetch_evt(B_n). ② same_progress_sync within one progress: prefetch records at slot[1] in P_n; backward (also in P_n, no rotation between them) looks up slot[producer.la=1] directly. ③ cross_iter_depends_on: an aux task with la=2 depending on h2d (la=2) from one progress earlier — h2d's event was at slot[2] in P_(n-1); after rotation it's at slot[1] in P_n; aux looks up slot[consumer.la + neg_offset = 1]. The shared structure: the producer always records at slot[producer.la] in its own progress; the consumer's slot_offset depends on which sync type is involved and how many ring advances have happened in between.
| Edge type | slot_offset formula | Topo edge? | deps.py site |
|---|---|---|---|
| reads/writes | read_slot.batch_offset | Yes if writer.batch_offset == read_slot.batch_offset; else ring-rotated | deps.py:341-348 |
| depends_on (producer.la == consumer.la) | consumer.batch_offset | Yes | deps.py:350-390 |
| depends_on (producer.la > consumer.la) | consumer.batch_offset (after diff ring advances) | No (ring rotation) | deps.py:350-390 |
| cross_iter_depends_on=((X,-N)) | consumer.batch_offset + neg_offset | No (ring rotation across N progresses) | deps.py:399-463 |
| same_progress_sync | producer.batch_offset | Yes (no rotation between producer and consumer) | deps.py:476-487 |

Common structure: the producer always records its event at ring.at(producer.batch_offset) (executor.py:165) in its own progress call. The consumer's slot_offset answers "where has that slot rotated to by the time the consumer runs" — and the four formulas above are just the four answers when the ring rotation count is 0 (same_progress_sync), X.la − consumer.la (cross-la depends_on), N + X.la − consumer.la (cross_iter), or directly given by read_slot.batch_offset (reads/writes).
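The four rows collapse into one helper once the rotation count is explicit (a self-contained sketch; the function name and signature are illustrative, not the deps.py API):

def slot_offset(edge_type: str, consumer_la: int, producer_la: int,
                neg_offset: int = 0, read_slot_offset: int = 0) -> int:
    # The producer records at slot[producer_la]; the consumer reads at
    # slot[producer_la - rotations], where rotations = ring advances in between.
    if edge_type == "reads_writes":
        return read_slot_offset               # given directly by the read slot
    if edge_type == "depends_on":             # rotations = producer_la - consumer_la
        return consumer_la
    if edge_type == "cross_iter":             # rotations = N + producer_la - consumer_la
        return consumer_la + neg_offset       # neg_offset = -N
    if edge_type == "same_progress_sync":     # rotations = 0
        return producer_la
    raise ValueError(edge_type)

# The three §3.6 diagram cases:
assert slot_offset("depends_on", consumer_la=0, producer_la=1) == 0          # ①
assert slot_offset("same_progress_sync", consumer_la=0, producer_la=1) == 1  # ②
assert slot_offset("cross_iter", consumer_la=2, producer_la=2,
                   neg_offset=-1) == 1                                       # ③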

3.7 same_progress_sync — when to use it

Reads as: "this task must wait for X's work in this same progress() call to finish, regardless of which batches X and self are processing." Engine guarantees: 语义:"本 task 等 X 这同一次 progress() 内的工作做完,不管 X 和自己处理的是哪个 batch。"引擎承诺:

  • A topo-DAG edge X → self (X is guaranteed to fire before self within the progress).
  • For threaded execution, a CPU-side threading.Event wait (cross-thread case).
  • For cross-stream, a wait_event emitted at slot[producer.la]; same-stream relies on CUDA stream FIFO.

Four known use cases:

| # | Use case | Example |
|---|---|---|
| (a) | Stream coherency on out-of-slot mutable state. Two tasks touch a shared object NOT declared in any reads/writes slot (e.g. dynamicemb cache, torchrec module attribute, global counter). The engine cannot infer a dataflow edge → use same_progress_sync to make the wait explicit. This is the original, canonical use; it mirrors the legacy default_stream.wait_stream(prefetch_stream) pattern. | backward.same_progress_sync=("prefetch_embeddings",) in the HSTU prefetch variant — drains prefetch's GPU work before backward / before the next progress. |
| (b) | Explicit form of the Δ = 0 cross_iter_depends_on. When the user's mental model is "this la=c task waits for that la=p task's current-progress output" with c = p + N (N > 0), they may equivalently write same_progress_sync=(X,) for clarity, OR write cross_iter_depends_on=((X, -N),) with the batch-flow phrasing — the engine auto-promotes the latter to the same mechanical contract (same topo edge, same slot lookup, same CPU edge). Use same_progress_sync when the ordering intent is naturally "wait for X in this progress, regardless of batch." | forward.same_progress_sync=("update",) when fwd.la=1, update.la=0 — fwd uses post-update weights from the same progress. See tests/commons/test_engine_same_progress_sync_correctness.py for verified GPU equivalence to a manual la=1-pipelined SGD baseline. |
| (c) | Same-progress logging / metric aggregation. A consumer (e.g. a metric writer) wants the producer's current-progress event for coherent measurement, even if the two share no slot or out-of-slot state. Less common; typically auxiliary statistics tasks. | Hypothetical metric_writer.same_progress_sync=("forward",) for capturing forward stats per progress. |
| (d) | Implicit, via cross_iter_depends_on Δ=0 auto-promotion. If the user's mental model is batch-flow ("I want X's work for batch K−N"), they may write cross_iter_depends_on=((X, -N),) directly. When the lookahead math gives Δ=0 (consumer.la == producer.la + N), the engine auto-promotes the declaration to the same_progress_sync mechanical contract — same topo edge, same CPU edge, same slot[producer.la] event lookup. The user does not need to know same_progress_sync as a separate field; either spelling works. | forward.cross_iter_depends_on=(("update", -1),) with fwd.la=1, update.la=0 → Δ=0, auto-promoted. Equivalent to use case (b) but in cross_iter syntax. See SPEC_cross_iter_delta0_autoconvert.md. |

Direction convention. The field lives on the consumer side; X.same_progress_sync=(...) always lists the producers X waits on. producer.la and consumer.la may differ — the field is la-agnostic (in contrast to depends_on, which requires producer.la ≥ consumer.la to avoid future-reads).

Distinguishing it from depends_on and cross_iter_depends_on:

  • depends_on=("X",) — same-batch logical wait. Both must process the same batch K. Constrained by producer.la ≥ consumer.la.depends_on=("X",) —— 同 batch 逻辑等待。两者必须处理同一个 batch K。约束 producer.la ≥ consumer.la
  • cross_iter_depends_on=((X, -N),) — different-batch wait. "Consumer on batch K waits for X on batch K−N." Δ ≥ 1 is the cross-progress, ring-rotated case; Δ=0 (i.e. producer.la + N == consumer.la) is auto-promoted to the same_progress_sync mechanical contract — same topo edge, CPU edge, and current-progress event lookup at slot[producer.la] (per SPEC_cross_iter_delta0_autoconvert.md).cross_iter_depends_on=((X, -N),) —— 跨 batch 等待。"consumer 在 batch K 时等 X 在 batch K−N 的工作"。Δ ≥ 1 走 ring-rotated 跨 progress 路径;Δ=0(即 producer.la + N == consumer.la)会被自动 promote 为 same_progress_sync 的机制契约——相同的 topo 边、CPU 边、以及在 slot[producer.la] 上 lookup 当前 progress 的 event(详见 SPEC_cross_iter_delta0_autoconvert.md)。
  • same_progress_sync=("X",) — same-progress wait, batch-agnostic. The two ARE allowed to process different batches; what matters is X completes its current-progress GPU work before consumer reads.same_progress_sync=("X",) —— 同 progress 等待,不关心 batch。两者允许处理不同 batch;重点是 X 在本次 progress 的 GPU 工作完成后 consumer 才读。
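The choice between the three cross_iter paths reduces to one integer; a self-contained sketch of the Δ rule (function name illustrative):

def classify_cross_iter(consumer_la: int, producer_la: int, n: int) -> str:
    # Δ = producer.la + N - consumer.la  for cross_iter_depends_on=((X, -N),)
    delta = producer_la + n - consumer_la
    if delta < 0:
        # Future-read: the producer hasn't processed batch K-N yet by the
        # consumer's progress; infer_cross_stream_event_deps raises here.
        raise ValueError("impossible cross-iter dependency")
    if delta == 0:
        return "auto-promoted to same_progress_sync"    # topo edge, no rotation
    return f"ring-rotated across {delta} progress(es)"  # no intra-progress edge

# fwd.la=1 waiting on update.la=0 from one batch ago: same progress.
assert classify_cross_iter(1, 0, 1) == "auto-promoted to same_progress_sync"
# aux_stats.la=2 waiting on h2d.la=2 from one batch ago: one rotation.
assert classify_cross_iter(2, 2, 1) == "ring-rotated across 1 progress(es)"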

4. Threaded executor (Problem #3)

ThreadedExecutor dispatches tasks to a concurrent.futures.ThreadPoolExecutor with one worker per thread_id in the active thread_map. The pool resizes on demand if a later stage needs more thread groups than the current pool has.

4.1 thread_map strategies

| Spec | Resolution | Use case |
|---|---|---|
| None or "by_stream" (default) | thread_id = task.stream or "default" | Common case: one CPU thread per CUDA stream |
| "per_task" | thread_id = task.name | Maximum parallelism; useful for stress tests |
| dict[str, str] | {task_name: thread_id} with "default" fallback | Custom pinning (e.g. HSTU's {io, compute}) |
| Callable[[Task], str] | User function | Dynamic policies based on task fields |

Threads and CUDA streams are decoupled: a task's stream field selects the CUDA stream context inside StreamPool.use(); thread_map selects the CPU worker that submits the task. The same stream may be submitted to from different threads (different iterations, or — with caution — the same iteration).
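The resolution table as a sketch (ThreadMapSpec and the stand-in task type are illustrative, not the executor's real types):

from typing import Callable, Dict, Optional, Union

class _TaskStub:
    # Minimal stand-in with the two fields resolution needs.
    def __init__(self, name: str, stream: Optional[str]) -> None:
        self.name, self.stream = name, stream

ThreadMapSpec = Union[None, str, Dict[str, str], Callable[[_TaskStub], str]]

def resolve_thread_id(task: _TaskStub, spec: ThreadMapSpec) -> str:
    if spec is None or spec == "by_stream":
        return task.stream or "default"       # one CPU thread per CUDA stream
    if spec == "per_task":
        return task.name                      # maximum parallelism
    if callable(spec):
        return spec(task)                     # dynamic policy on task fields
    return spec.get(task.name, "default")     # dict pinning with fallback

fwd = _TaskStub("forward", "default")
assert resolve_thread_id(fwd, None) == "default"
assert resolve_thread_id(fwd, {"h2d": "io"}) == "default"   # dict fallback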

4.2 CPU dependency computation

_compute_cpu_deps builds a dict {task_name → [predecessor_completion_events]} by walking the topo-sorted active task list and adding four edge types:

  1. Slot-based: any writer of a slot the task reads (excluding self).
  2. Same-batch logical (depends_on): every task.depends_on name with matching lookahead that is in the active set. Cross-lookahead names go through ring rotation as a cross-stream wait, not a CPU edge.
  3. Same-progress sync (same_progress_sync): every task.same_progress_sync name in the active set — a same-progress GPU coherency wait, regardless of lookahead.
  4. Same-stream FIFO: the most recent task on the same stream in topological order — preserves CUDA stream FIFO semantics when thread_map splits same-stream tasks across threads.

The cross-thread filter then keeps only edges where thread_id_of(producer) ≠ thread_id_of(consumer); same-thread predecessors are already serialized by their thread's loop (see the sketch below).
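A sketch of the filter step, assuming the four edge types above have already been collected as (producer, consumer) name pairs:

from typing import Callable, Dict, Iterable, List, Tuple

def cross_thread_deps(
    edges: Iterable[Tuple[str, str]],
    thread_id_of: Callable[[str], str],
) -> Dict[str, List[str]]:
    # Keep only edges that cross threads; same-thread predecessors are
    # already serialized by that worker's sequential loop.
    deps: Dict[str, List[str]] = {}
    for producer, consumer in edges:
        if thread_id_of(producer) != thread_id_of(consumer):
            deps.setdefault(consumer, []).append(producer)
    return deps

# h2d on "io" feeding forward on "compute" survives the filter;
# the same-thread forward -> backward edge is dropped.
deps = cross_thread_deps(
    [("h2d", "forward"), ("forward", "backward")],
    {"h2d": "io", "forward": "compute", "backward": "compute"}.__getitem__,
)
assert deps == {"forward": ["h2d"]}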

4.3 NCCL ordered lock

Tasks tagged nccl=True acquire a ticket-based lock that guarantees topological-order execution across all ranks. Tickets are assigned by the topo-sorted task tuple (deterministic across ranks because the schedule and tie-break are deterministic). Without this, a multi-threaded executor on different ranks may submit collectives in different orders → mismatched NCCL calls → deadlock.

[Diagram] _NcclOrderedLock — states and transitions. acquire(N) with N == next_ticket runs the task; acquire(K > N) blocks in cond.wait() until release() advances next_ticket and notify_all() wakes the matching waiter. release(failed=True) (the running task raised) and abort() (an unrelated worker died) both set _failed and wake all waiters, which then raise RuntimeError.
A ticket-based FIFO with two failure-aware exits: release(failed=True) when the running task raises; abort() when an unrelated worker dies before the expected ticket release would have happened.
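A self-contained sketch of the ticket discipline (simplified relative to the real _NcclOrderedLock):

import threading

class TicketLockSketch:
    # FIFO by pre-assigned ticket; both failure exits wake every waiter.
    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._next_ticket = 0
        self._failed = False

    def acquire(self, ticket: int) -> None:
        with self._cond:
            self._cond.wait_for(
                lambda: self._failed or self._next_ticket == ticket)
            if self._failed:
                raise RuntimeError("NCCL ordered lock aborted")

    def release(self, failed: bool = False) -> None:
        with self._cond:
            self._failed = self._failed or failed   # running task raised
            self._next_ticket += 1
            self._cond.notify_all()                 # wake ticket N+1 (or everyone)

    def abort(self) -> None:
        with self._cond:
            self._failed = True                     # unrelated worker died
            self._cond.notify_all()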

4.4 Cancellation & abort

Each execute_stage call sets up:

  • A threading.Event per active task — signaled when the task finishes (or its thread errors out).
  • A shared cancelled Event — set by the first failing thread.
  • An errors_lock + errors list to collect exceptions across threads.

The except handler in _run_thread_chain does three things, in order:

  1. cancelled.set() — stops further task starts on all chains.
  2. self._nccl_lock.abort() — wakes any worker blocked inside acquire(ticket > current).
  3. Appends the exception to errors; the finally block sets every remaining task's completion event so peer threads waiting on cpu_deps Events don't deadlock.

After the stage barrier (future.result() on every chain), execute_stage raises errors[0].

5. HSTU adapter (Problem #2)

5.1 The 14 tasks

HSTUPipeline._build_schedule assembles a single-stage schedule from these task factories. Author declaration order is just a tie-break for incomparable tasks — the actual execution order is decided by topological_sort over reads/writes + depends_on + same_progress_sync.

| # | Task | lookahead | stream | nccl | Purpose |
|---|---|---|---|---|---|
| 1 | h2d | 2 (np) / 2 (pf) | memcpy |  | CPU→GPU + create torchrec_ctx |
| 2 | start_shuffle | 2 / 2 | memcpy | conditional | KK shuffler phase 1 (background thread) |
| 3 | finish_shuffle | 2 / 2 | memcpy | conditional | KK shuffler phase 2 (AllGather + index_select) |
| 4 | start_input_dist | 1 / 1 | data_dist |  | torchrec splits all-to-all |
| 5 | wait_input_dist | 1 / 1 | data_dist |  | tensors-awaitable wait |
| 6 | zero_grad | 0 | default |  | optimizer.zero_grad + Megatron buffer reset |
| 7 | global_tokens_allreduce | 0 | default |  | per-rank loss-token AllReduce |
| 8 | nccl_safety_barrier | 0 | default |  | wait_stream(memcpy) for shuffle ordering |
| 9 | forward | 0 | default |  | set_module_context + model(batch) |
| 10 | prefetch_embeddings (prefetch only) | 1 | prefetch |  | cache prefetch + populate module_input_post_prefetch |
| 11 | backward | 0 | default |  | (loss × dp_size / global_tokens).backward() |
| 12 | finalize_model_grads | 0 | default |  | Megatron TP grad finalize |
| 13 | optimizer_step | 0 | default |  | optimizer.step + write step_result |
| 14 | watchdog_step | 0 | default |  | optional CUDA mem watchdog |

np = non-prefetch variant, pf = prefetch variant. Both keep max_lookahead = 2 at prefetch_depth=1 (matching the legacy 3-batch in-flight). For prefetch_depth=K, max_lookahead = K+1; in_flight_batches = max_lookahead + 1.

5.2 HSTU_DEFAULT_THREAD_MAP

Two thread groups:

HSTU_DEFAULT_THREAD_MAP = {
    # io thread — pure data movement
    "h2d": "io",
    "start_shuffle": "io",
    "finish_shuffle": "io",
    # compute thread — everything that touches shared module state
    "start_input_dist": "compute",
    "wait_input_dist": "compute",
    "prefetch_embeddings": "compute",
    "zero_grad": "compute",
    "global_tokens_allreduce": "compute",
    "nccl_safety_barrier": "compute",
    "forward": "compute",
    "backward": "compute",
    "finalize_model_grads": "compute",
    "optimizer_step": "compute",
    "watchdog_step": "compute",
}
Why two threads, not many: two threads let H2D / shuffle on "io" overlap NCCL + GPU compute on "compute", without paying the per-task pool dispatch overhead of full thread-per-task. forward is the only writer of the shared module.forward._context attribute and runs as a single task on "compute", so there is no cross-thread race to defend against.

5.3 Bootstrap flow

The first call to HSTUPipeline.progress() runs a one-time bootstrap because torchrec's _rewrite_model needs a real batch for FX tracing. Sequence diagram in §6.1.

Key invariants:

  • StreamPool built first: H2D + bootstrap collectives run on the engine's actual streams, not throwaway streams.
  • Real per-batch context: the bootstrap context is created by state.create_torchrec_ctx() with version=1 (the v0 single-shared-context branch is forbidden).
  • Peek batch is seeded into the ring: _pipe._seed_first_batch(...) populates ring.at(max_offset) with {batch_cpu, batch_gpu, torchrec_ctx, shuffled_batch (identity only)}; idempotent guards in h2d/start_input_dist/finish_shuffle skip work that's already been done.
  • No 1-batch loss: the peek batch flows through forward/backward/optimizer like any other batch.

5.4 native vs prefetch variant

Same task list, plus or minus prefetch_embeddings. The non-prefetch variant uses PipelinedForward, which reads input_dist_tensors_requests directly. The prefetch variant uses PrefetchPipelinedForward, which reads from module_input_post_prefetch, populated in the previous iter by prefetch_embeddings.

Critical ordering (within one progress): prefetch_embeddings must run after forward in every progress, not before. Reason: dynamicemb's _prefetch_outstanding_keys counter is incremented by prefetch() and decremented at the end of forward. The ordering is enforced by backward.same_progress_sync=("prefetch_embeddings",): backward (and with it this progress's whole compute chain after forward) waits for prefetch's GPU work in the same progress to finish — a stream coherency wait, not a batch dependency.

  • forward → prefetch (correct): forward consumes the previous iter's prefetched data first (counter decremented), then this iter's prefetch adds new keys. Steady-state outstanding ≈ 1 batch.
  • prefetch → forward (wrong): prefetch adds new keys before forward consumes the previous iter's. Steady-state outstanding ≈ 2 batches → cache overflow.

Why same_progress_sync and not depends_on? prefetch_embeddings has lookahead=1 (it pre-loads the next batch K+1) while backward has lookahead=0 (it consumes batch K). They process different batches, so a same-batch depends_on would be incorrect; what we actually need is "drain prefetch's GPU work in this progress before the next progress starts" — exactly the same-progress GPU coherency intent.

5.5 Auto-scheduler v2: joint search

The v2 design searches the coupled space thread_map × stream_map × fire_ordering for the HSTU schedule. It does not replace the current lookahead-only scheduler in autosched/fire_order.py; instead, every candidate schedule is first materialized with its proposed thread/stream/fire-order choices, then passed through auto_assign_lookaheads(..., max_in_flight=5). The returned lookahead map becomes part of the scored candidate.

| Axis | Candidate source | Bound / safety rule |
|---|---|---|
| thread_map | Named HSTU presets: default, by_stream, io_prefetch_compute, io_data_dist_compute, io_data_dist_prefetch_compute. | per_task is rejected because the 14-task HSTU graph would exceed max_threads ≤ 4. |
| stream_map | Prep groups start from current streams: memcpy for H2D/shuffle, data_dist for input distribution, prefetch for embedding prefetch. | forward, backward, finalize_model_grads, and optimizer_step are frozen on the default stream at lookahead 0. |
| fire_ordering | Resource-aware topological tie-break policies: critical-chain first, NCCL queue early, PCIe prep early, longest off-default first, or min-slack first. | Ordering never removes DAG edges and preserves same-communicator NCCL ticket order. |

The objective is modeled steady-state latency: simulate several warmup/steady/drain progress calls with the task costs from hstu_prefetch_caching_1node.json, then score the interval between successive optimizer_step completions. The overlap matrix is the resource model: same stream serializes, same NCCL communicator serializes even on different streams, PCIe-bound tasks contend for bandwidth, and all other pairs may overlap if the DAG permits.

# Pseudocode sketch only; full design is in tasks/SPEC_autosched_joint_search.md.
beam = [current_hstu_candidate]
for round in search_rounds:
    expanded = mutate_thread_map(beam) + mutate_stream_map(beam) + mutate_fire_ordering(beam)
    for candidate in expanded:
        schedule = rebuild_schedule(candidate.thread_map, candidate.stream_map, candidate.fire_ordering)
        candidate.lookahead_map = auto_assign_lookaheads(schedule, cost_model, max_in_flight=5)
        candidate.score = modeled_optimizer_step_interval(schedule, candidate.lookahead_map)
    beam = top_k(valid_candidates, by="score")
Constraint summary: max_threads ≤ 4, max_in_flight ≤ 5, bit-exact default-stream anchors at la=0, and same-communicator NCCL serialization from the existing overlap matrix.

Full pseudocode (6 procedures)

Python-like, but not runnable. The pseudocode below mirrors tasks/SPEC_autosched_joint_search.md; the six procedures follow, one per heading.

1. JOINT_HSTU_SEARCH — main beam loop
procedure JOINT_HSTU_SEARCH(
    base_schedule,
    cost_model,
    hstu_thread_presets,
    beam_width,
    max_threads = 4,
    max_in_flight = 5,
):
    require max_threads <= 4
    require max_in_flight <= 5

    frozen = {
        "forward",
        "backward",
        "finalize_model_grads",
        "optimizer_step",
    }

    legal_thread_presets = []
    for preset_name, preset in hstu_thread_presets:
        if preset_name == "per_task":
            continue                       # exceeds max_threads <= 4
        legal_thread_presets.append((preset_name, preset))

    canonical = Candidate(
        thread_map    = HSTU_DEFAULT_THREAD_MAP,
        stream_map    = CURRENT_HSTU_STREAMS(base_schedule),
        fire_ordering = DECLARATION_ORDER(base_schedule),
        lookahead_map = CURRENT_LOOKAHEADS(base_schedule),
    )

    canonical = NORMALIZE_AND_SCORE(
        canonical, base_schedule, cost_model,
        frozen, max_threads, max_in_flight,
    )

    beam = TOP_BY_SCORE([canonical], beam_width)
    best = canonical

    for round in 1..SEARCH_ROUNDS:
        expanded = []
        for candidate in beam:
            expanded += MUTATE_THREAD_MAP(candidate, legal_thread_presets)
            expanded += MUTATE_STREAM_MAP(candidate)
            expanded += MUTATE_FIRE_ORDERING(candidate)

        scored = []
        for candidate in UNIQUE(expanded):
            normalized = NORMALIZE_AND_SCORE(
                candidate, base_schedule, cost_model,
                frozen, max_threads, max_in_flight,
            )
            if normalized is valid:
                scored.append(normalized)

        beam = TOP_BY_SCORE(DOMINANCE_PRUNE(scored), beam_width)

        if beam is empty:                    break
        if SCORE(beam[0]) < SCORE(best):    best = beam[0]
        if NO_SCORE_IMPROVEMENT_FOR_PATIENCE_ROUNDS():  break

    return best
2. NORMALIZE_AND_SCORE — composes with the existing auto_assign_lookaheads as the lookahead oracle
procedure NORMALIZE_AND_SCORE(
    candidate, base_schedule, cost_model,
    frozen, max_threads, max_in_flight,
):
    if VIOLATES_FROZEN_STREAM_OR_LOOKAHEAD(candidate, frozen):
        return invalid

    resolved_thread_map = RESOLVE_THREAD_MAP(candidate.thread_map,
                                            candidate.stream_map)
    if COUNT_THREADS(resolved_thread_map) > max_threads:
        return invalid

    provisional_schedule = REBUILD_SCHEDULE(
        base_schedule,
        stream_map    = candidate.stream_map,
        fire_ordering = candidate.fire_ordering,
        lookahead_map = CURRENT_LOOKAHEADS(base_schedule),
    )

    try:
        derived_lookahead = auto_assign_lookaheads(
            provisional_schedule, cost_model,
            max_in_flight = max_in_flight,
        )
    except SchedulerRejectsCandidate:
        return invalid

    if ANY(derived_lookahead[t] != 0 for t in frozen):
        return invalid

    scored_schedule = REBUILD_SCHEDULE(
        base_schedule,
        stream_map    = candidate.stream_map,
        fire_ordering = candidate.fire_ordering,
        lookahead_map = derived_lookahead,
    )

    overlap_matrix = compute_overlap_matrix(scored_schedule.tasks)
    latency = RESOURCE_LIST_SCHEDULE_LATENCY(
        scored_schedule, cost_model,
        resolved_thread_map, overlap_matrix, max_in_flight,
    )

    candidate.lookahead_map = derived_lookahead
    candidate.score         = latency
    return candidate
3. MUTATE_THREAD_MAP: enumerate the 5 surviving presets
procedure MUTATE_THREAD_MAP(candidate, legal_thread_presets):
    for preset_name, preset in legal_thread_presets:
        yield candidate with thread_map = preset
4. MUTATE_STREAM_MAP: group-level moves (whole prep groups migrate together), not the full Cartesian product
procedure MUTATE_STREAM_MAP(candidate):
    movable_groups = [
        {"h2d", "start_shuffle", "finish_shuffle"},
        {"start_input_dist", "wait_input_dist"},
        {"prefetch_embeddings"},
    ]

    for group in movable_groups:
        if group is absent from schedule:
            continue

        for stream_choice in LEGAL_STREAM_CHOICES(group):
            next_map = COPY(candidate.stream_map)
            for task in group:
                if task in BIT_EXACT_FROZEN_TASKS:
                    continue                  # never move forward/bwd/opt/finalize
                next_map[task] = stream_choice

            yield candidate with stream_map = next_map
5. MUTATE_FIRE_ORDERING: 5 topological tie-break policies + adjacent-pair swaps
procedure MUTATE_FIRE_ORDERING(candidate):
    policies = [
        CRITICAL_DEFAULT_CHAIN_FIRST,
        LONGEST_OFF_DEFAULT_FIRST,
        NCCL_QUEUE_EARLY_WITH_TICKET_ORDER,
        PCIE_PREP_EARLY,
        MIN_SLACK_FIRST,
    ]

    for policy in policies:
        priority = BUILD_PRIORITY_VECTOR(candidate, policy)
        yield candidate with fire_ordering = priority

    # local search refinement: any topo-incomparable adjacent pair
    # in the current order is allowed to swap
    for adjacent_pair in INCOMPARABLE_ADJACENT_PAIRS(candidate.fire_ordering):
        yield candidate with that pair swapped
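A minimal sketch of the INCOMPARABLE_ADJACENT_PAIRS helper from procedure 5, under the assumption that fire_ordering is already a topological order of the precedence DAG (the function name and edges format are illustrative):

from collections import defaultdict

def incomparable_adjacent_pairs(order, edges):
    # order: task names, already topologically sorted.
    # edges: {task: set of direct successors} from the precedence DAG.
    reach = defaultdict(set)
    for u in reversed(order):            # successors appear later in `order`
        for v in edges.get(u, ()):
            reach[u] |= {v} | reach[v]
    # Neighbours may swap only if neither reaches the other.
    return [
        (a, b) for a, b in zip(order, order[1:])
        if b not in reach[a] and a not in reach[b]
    ]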
6. DOMINANCE_PRUNE: conservative dedup before scoring
procedure DOMINANCE_PRUNE(candidates):
    grouped = GROUP_BY(
        candidates,
        keys = (thread_map, stream_map, lookahead_map),
    )
    for group in grouped:
        keep the lowest-score fire_ordering
        discard candidates with same or worse score and no lower thread count
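Equivalently, as a dict-based sketch (this assumes the three maps have been frozen into hashable keys, and the score/thread_count attributes are illustrative names):

def dominance_prune(candidates):
    # Group by everything except fire_ordering; keep only the best-scoring
    # representative per group (ties resolved toward fewer threads).
    best = {}
    for c in candidates:
        key = (c.thread_map, c.stream_map, c.lookahead_map)
        kept = best.get(key)
        if kept is None or (c.score, c.thread_count) < (kept.score, kept.thread_count):
            best[key] = c
    return list(best.values())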

Full SPEC (objective definition, constraint encoding, complexity bound, worked example): tasks/SPEC_autosched_joint_search.md (685 lines).

Interactive visualization (Diagram 5: search-space landscape).

Thread-map preset comparison (6 hand-coded presets; stream × thread × lookahead layout per HSTU task).

6. Workflow sequence diagrams

6.1 First progress() call (bootstrap)

Sequence: HSTUPipeline.progress(), first call. Participants: User, HSTUPipeline, torchrec _rewrite_model, StreamPool, SchedulablePipeline, engine ring.

1. User → HSTUPipeline: progress(dataloader_iter)
2. next(iter) → peek_cpu
3. _build_schedule() → (schedule, pool)
4. pool.use("memcpy") + _to_device(peek_cpu) → peek_gpu
5. peek_gpu.record_stream(default, data_dist, prefetch?)
6. _rewrite_model(peek_gpu, dist=data_dist, default=default): FX trace + install PipelinedForward monkey-patches → (pipelined_modules, model, ...)
7. data_dist.wait_stream(memcpy)
8. _start_data_dist(modules, peek, peek_ctx) [in the data_dist stream ctx]
9. _override_input_dist_forwards(modules)
10. SchedulablePipeline(schedule, pool, executor=ThreadedExecutor(...))
11. _seed_first_batch({batch_cpu, batch_gpu, torchrec_ctx, ...})
12. ring.at(max_offset).set(...)
13. SchedulablePipeline.progress(dataloader_iter) → (loss, tokens, output)
Bootstrap is a one-time cost on the first progress(). The peek batch is fully integrated into the engine via _seed_first_batch; subsequent calls skip steps 2–12 and go straight to SchedulablePipeline.progress().

6.2 Steady-state iteration

Sequence: steady-state iteration under HSTU_DEFAULT_THREAD_MAP. Participants: SchedulablePipeline, ThreadedExecutor, io thread, compute thread, _NcclOrderedLock, CUDA streams.

1. execute_stage(stage, ctx, iter, mask): partition tasks by thread, compute cpu_deps.
2. par [io ‖ compute]:
  • io chain: h2d (memcpy stream, enqueues the H2D copy) → start_shuffle (nccl: acquire(0), AllGather workloads, release(0)) → finish_shuffle (nccl: acquire(1), AllGather batch + index_select, release(1)); each signals completion[h2d / start_shuffle / finish_shuffle].set().
  • compute chain: start_input_dist → wait_input_dist → ... → forward (serial within the thread; nccl tasks acquire tickets 2, 3 from _NcclOrderedLock) → backward (nccl; same_progress_sync=("prefetch_embeddings",) [prefetch variant only]: wait_stream(prefetch), then DDP AllReduce) → finalize_model_grads → optimizer.step → watchdog.
3. Barrier: future.result() on every chain.
The par fragment marks parallel execution: the io chain and the compute chain run concurrently. Within each chain, tasks fire in topological order (declaration order is the tie-break). NCCL-tagged tasks acquire tickets from _NcclOrderedLock in that same topological order (0, 1, 2, 3 here), guaranteeing a deterministic submission order across ranks.

6.3 NCCL ticket lock

Sequence: two threads acquiring NCCL tickets. Participants: io thread, _NcclOrderedLock, compute thread.

1. io: acquire(0) [next_ticket=0, returns immediately]
2. compute: acquire(2) → cond.wait() [blocks]
3. io: release() [next_ticket=1, notify_all]
4. io: acquire(1) [matches, returns immediately]; compute wakes, sees next=1 ≠ its ticket 2, waits again
5. io: release() [next_ticket=2, notify_all]
6. compute wakes, sees next=2 == its ticket, returns from acquire(2)
7. compute: release() [next_ticket=3, notify_all]
Tickets 0 and 1 are issued to NCCL tasks earlier in topological order (on the io thread); ticket 2 went to an NCCL task later in topological order (on the compute thread). The lock guarantees the 0 → 1 → 2 submission order across ranks even though the threads run concurrently.
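The pattern is small enough to sketch in full. This is a minimal reimplementation of the ticket idea only; the real _NcclOrderedLock also carries the _failed flag, abort(), and reset() listed in §7.2:

import threading

class TicketLock:
    # Minimal sketch: tasks call acquire(k) with pre-assigned topological
    # tickets; acquire(k) blocks until all tickets < k have been released.
    def __init__(self):
        self._cond = threading.Condition()
        self._next = 0

    def acquire(self, ticket):
        with self._cond:
            self._cond.wait_for(lambda: self._next == ticket)

    def release(self):
        with self._cond:
            self._next += 1
            self._cond.notify_all()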

6.4 Multi-batch overlap (timing diagram)

Pipeline timing: non-prefetch HSTU, in_flight=3, 4 batches shown. "·" marks a masked slot during prefill/drain; for a long run, the steady region extends from i=2 through i=M.

| | i=0 (prefill) | i=1 (prefill) | i=2 (steady) | i=3 (steady) | i=4 (drain) | i=5 (drain) |
| --- | --- | --- | --- | --- | --- | --- |
| offset 2: h2d/shuffle | batch 0 | batch 1 | batch 2 | batch 3 | · | · |
| offset 1: input_dist | · | batch 0 | batch 1 | batch 2 | batch 3 | · |
| offset 0: forward+bwd+opt | · | · | batch 0 | batch 1 | batch 2 | batch 3 |
| progress() returns | · | · | batch 0 result | batch 1 result | batch 2 result | batch 3 result |

Prefill absorbs 2 iters; 3 batches are in flight from iter ≥ 2 while upstream batches remain.
Each column is one progress()-internal iteration; rows show what runs at each lookahead. After 2 prefill iters, 3 batches are in flight simultaneously (different stages land on different streams via the io/compute split). The progress() return tracks lookahead=0's step_result.
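The staircase is fully determined by batch = i − (max_offset − offset); a throwaway loop (illustrative only, not engine code) reprints the table above:

MAX_OFFSET, NUM_BATCHES = 2, 4
stages = {2: "h2d/shuffle", 1: "input_dist", 0: "forward+bwd+opt"}
for i in range(NUM_BATCHES + MAX_OFFSET):
    for off, name in sorted(stages.items(), reverse=True):
        b = i - (MAX_OFFSET - off)
        if 0 <= b < NUM_BATCHES:        # out of range => masked (prefill/drain)
            print(f"i={i}: {name} -> batch {b}")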

7. Class & method reference

7.1 Engine classes

Class diagram: engine core (+ public member, − private member).

• SchedulablePipeline: − _schedule: Schedule, − _stream_pool: StreamPool, − _executor, _ring, _ctx; + progress(iter) → object, + step(batch) → object, + shutdown(), + basic(model, opt, ...); − _seed_first_batch(slot_contents), − _run_one_internal_iter(iter)
• Schedule: + stages: Tuple[Stage, ...], + stream_slots: Tuple[str, ...], + in_flight_batches @property
• Stage: + tasks: Tuple[Task, ...]
• Task: + name: str, + stream: str, + lookahead: int, + reads, writes: Tuple[DataSlot], + depends_on: Tuple[str], + cross_iter_depends_on: Tuple[(str, int)], + same_progress_sync: Tuple[str], + nccl: bool; + run(ctx) → None, + init(ctx) → None, + from_fn(name, fn, ...)
• DataSlot: + name: str, + batch_offset: int
• BatchRing: − _slots: List[SlotStore]; + at(offset) → SlotStore, + advance() → None
• TaskContext: − _ring, _stream_pool; + slots @property → SlotStore, + stream_pool @property, + iter_count: int [thread-local]; − _active_offset [thread-local]
• StreamPool: − _streams: Dict[str, Stream]; + get(name) → Stream, + use(name) ctx-mgr
• Associations: SchedulablePipeline 1:1 Schedule; Stage 1:* Task (tasks); SchedulablePipeline holds _ring and stream_pool; TaskContext holds _ring.
Members are marked + (public) and − (private). SchedulablePipeline aggregates a Schedule, a StreamPool, and an Executor; the BatchRing + TaskContext are constructed internally.

7.2 Executor classes

| Class / function | Member | Description |
| --- | --- | --- |
| SequentialExecutor | execute_stage(stage, ctx, iter_count, mask, cross_stream_waits, pool) | Iterate over stage.tasks in order, running each under pool.use(stream). |
| | shutdown() | No-op. |
| ThreadedExecutor | __init__(thread_map=None, max_workers=None) | Lazy ThreadPoolExecutor; resizes on stage demand. |
| | execute_stage(...) | Partition active tasks by thread_id, build cpu_deps, submit chains, barrier. |
| | shutdown() | Joins worker threads. |
| _NcclOrderedLock | acquire(ticket) | Block until next_ticket == ticket, or raise if _failed is set. |
| | release(failed=False) | Increment next_ticket; if failed, set _failed. |
| | abort() | Force-set _failed + notify_all; wakes all blocked waiters. |
| | reset() | Reset both fields; called once per stage. |
| _resolve_thread_id(task, thread_map) | helper | Maps a task to its thread id under any thread_map spec. |
| _compute_cpu_deps(active, thread_id_of, completion) | helper | Builds the cross-thread {name → [Event]} map. |
| _apply_cross_stream_waits(task, waits, pool) | helper | Emits the GPU-side wait_stream calls. |
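A shape-only sketch of the cross-thread dependency wiring; the real _compute_cpu_deps also folds in same_progress_sync and promoted Δ=0 edges, and the argument names here are illustrative:

import threading

def compute_cpu_deps(active, thread_id_of, deps_of):
    # One completion Event per active task. A consumer only needs CPU-side
    # Events for producers running on a *different* thread; same-thread
    # ordering is already guaranteed by serial execution within the chain.
    completion = {t: threading.Event() for t in active}
    cpu_deps = {}
    for t in active:
        cross = [
            completion[p] for p in deps_of(t)
            if p in completion and thread_id_of[p] != thread_id_of[t]
        ]
        if cross:
            cpu_deps[t] = cross
    return completion, cpu_deps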

7.3 HSTU classes

| Class | Member | Visibility | Purpose |
| --- | --- | --- | --- |
| HSTUPipeline | __init__(model, optimizer, device, *, prefetch=False, prefetch_depth=1, batch_shuffler=None, threaded=True, thread_map=None, ...) | public | Construct the adapter; the pipeline is lazily built on the first progress(). |
| | progress(dataloader_iter) → (loss, tokens, output) | public | Drive one full iteration; matches the legacy return signature. |
| | attach(model=None) / detach() → Module | public | Pause/resume hooks (matches legacy). |
| | shutdown() + context manager | public | Tear down the executor + threads. |
| | _build_schedule() | internal | Assemble the Schedule + StreamPool from variant flags. |
| | _rewrite_model(peek, dist, default, memcpy=None) | internal | Bootstrap the torchrec FX trace + monkey-patches. |
| HSTUPipelineFactory | register(name, factory) | public | Register a custom variant. |
| | create(name, **kwargs) → HSTUPipeline | public | Look up + instantiate. |
| | list() → List[str] | public | Enumerate registered names. |
| PipelineState | model, optimizer, device, batch_shuffler | internal | Captured at construction. |
| | pipelined_modules | internal | Populated by _rewrite_model. |
| | create_torchrec_ctx() → TrainPipelineContext | internal | v1 only; asserts ctx.version == 1. |

Pre-registered variants: "hstu_sparse_dist" (native), "hstu_prefetch_sparse_dist" (prefetch).
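For example, registering a custom variant through the factory. The factory callable's exact shape is an assumption here; create() is taken to forward its kwargs:

from commons.pipeline.hstu_pipeline import HSTUPipeline, HSTUPipelineFactory

def deep_prefetch_variant(**kwargs):
    # Assumed shape: bake variant-specific flags in, forward the rest.
    return HSTUPipeline(prefetch=True, prefetch_depth=2, **kwargs)

HSTUPipelineFactory.register("hstu_prefetch_depth2", deep_prefetch_variant)
assert "hstu_prefetch_depth2" in HSTUPipelineFactory.list()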

8. User-facing API examples

8.1 T1: vanilla preset (≤ 8-line diff)

from commons.pipeline.engine import SchedulablePipeline

# Before — manual loop:
# for batch in loader:
#     opt.zero_grad()
#     loss = model(batch).sum()
#     loss.backward()
#     opt.step()

# After — preset:
pipe = SchedulablePipeline.basic(model, optimizer, loss_fn=lambda out: out.sum())
for batch in loader:
    pipe.step(batch)

8.2 T2: with prefetch + memcpy stream

pipe = SchedulablePipeline.basic(
    model, optimizer,
    loss_fn=lambda out: out.sum(),
    prefetch=True,
    memcpy_stream=True,
)
# Drive with cpu-resident batches; engine moves them to GPU on memcpy stream
for cpu_batch in cpu_loader:
    pipe.step(cpu_batch)

8.3 HSTU usage

from commons.pipeline.hstu_pipeline import HSTUPipelineFactory

pipe = HSTUPipelineFactory.create(
    "hstu_prefetch_sparse_dist",
    model=model_train,
    optimizer=dense_optimizer,
    device=torch.device("cuda"),
    batch_shuffler=batch_shuffler,
)

# Create the iterator ONCE outside the loop. SPEC_p4 v2's
# iterator-identity reset means a fresh `iter(dataloader)` per
# step would restart the engine each time and discard in-flight
# batches.
dataloader_iter = iter(dataloader)
for _ in range(max_train_iters):
    try:
        loss, tokens, output = pipe.progress(dataloader_iter)
    except StopIteration:
        break
pipe.shutdown()

Matches the legacy return signature; the intended use is drop-in replacement of JaggedMegatronTrainPipelineSparseDist / JaggedMegatronPrefetchTrainPipelineSparseDist.

8.4 Custom thread_map

# by_stream (default for non-HSTU SchedulablePipeline.basic)
pipe = SchedulablePipeline.basic(model, opt, threaded=True, thread_map="by_stream")

# per_task — every task its own thread
pipe = SchedulablePipeline.basic(model, opt, threaded=True, thread_map="per_task")

# Explicit dict
pipe = SchedulablePipeline.basic(
    model, opt,
    threaded=True,
    thread_map={
        "h2d": "io",
        "forward": "compute",
        "backward": "compute",
        "optimizer_step": "compute",
    },
)

# Callable
pipe = SchedulablePipeline.basic(
    model, opt,
    threaded=True,
    thread_map=lambda task: "io" if task.stream == "memcpy" else "compute",
)

9. Internal API examples

9.1 Custom Task subclass

from commons.pipeline.engine import Task, DataSlot

class CountTokensTask(Task):
    name = "count_tokens"
    stream = "default"
    lookahead = 0
    reads = (DataSlot("batch_gpu", 0),)
    writes = (DataSlot("token_count", 0),)

    def run(self, ctx):
        batch = ctx.slots["batch_gpu"]
        ctx.slots.set("token_count", batch.numel())

9.2 Hand-built Schedule

import torch

from commons.pipeline.engine import (
    Schedule, Stage, StreamPool, Task, DataSlot, SchedulablePipeline,
)

# Two tasks: producer on memcpy stream, consumer on default
producer = Task.from_fn(
    "producer",
    lambda ctx: ctx.slots.set("x", torch.randn(8, device="cuda")),
    stream="memcpy",
    writes=(DataSlot("x", 0),),
)
consumer = Task.from_fn(
    "consumer",
    lambda ctx: ctx.slots.set("step_result", ctx.slots["x"].sum()),
    stream="default",
    reads=(DataSlot("x", 0),),
    writes=(DataSlot("step_result", 0),),
)

schedule = Schedule(
    stages=(Stage(tasks=(producer, consumer)),),
    stream_slots=("default", "memcpy"),
)
pool = StreamPool({
    "default": torch.cuda.default_stream(),
    "memcpy": torch.cuda.Stream(priority=-1),
})
pipe = SchedulablePipeline(schedule, pool, executor="threaded")
result = pipe.step(None)  # engine fills batch_cpu but our tasks don't use it

9.3 Plug in a custom Executor

# An executor must implement: execute_stage(...) and shutdown().
class VerboseExecutor:
    def __init__(self, inner):
        self._inner = inner

    def execute_stage(self, stage, ctx, iter_count, should_run, waits, pool):
        for task in stage.tasks:
            if should_run(task):
                print(f"[iter {iter_count}] running {task.name}")
        return self._inner.execute_stage(stage, ctx, iter_count, should_run, waits, pool)

    def shutdown(self):
        self._inner.shutdown()

# Use:
from commons.pipeline.engine import SequentialExecutor
pipe = SchedulablePipeline(schedule, pool, executor=VerboseExecutor(SequentialExecutor()))

10. Limitations & followups

Tracked in tasks/followups.md. The unresolved items relevant to current users:

| Item | Status | Trigger to resume |
| --- | --- | --- |
| Engine offset-aware cross-stream sync (wait_event instead of wait_stream) | SPEC drafted (SPEC_p3.md) | Perf profile shows io ↔ compute overlap collapsing under a non-identity shuffler |
| Adam optimizer parity | xfail: torchrec checkpoint Adam-state limitation | Upstream torchrec fix lands |
| Real multi-rank NCCL ordering test | Only Python call order is covered in the unit suite | Add a multi-process NCCL fixture; partly covered by 4-GPU parity tests on real workloads |
| Non-Megatron DDP support in the HSTU adapter | finalize_model_grads + zero_grad_buffer hardcoded | A generic-DDP user wants to adopt |
| Semantic B (autonomous data pipeline) | Engine has Semantic A only (prefetch_depth) | A workload with high input_dist latency variance |
Test baseline (commit 338f0c46, 4-GPU on ipp1-2028): 25 passed, 16 xfailed (all adam), 0 failed. Anti-flake mode: 2 consecutive runs, a 50-step long-run drift check, a ≥2-thread utilization probe, and an NCCL-deadlock regression guard.

Appendix A. Δ in cross_iter_depends_on

Δ is the engine's internal name for the "progress lag" between the moment a producer records its event and the moment a consumer reads it via a cross_iter_depends_on=(("X", -N),) edge. It is defined and bounded entirely by lookaheads and N; it never appears in the user-facing API, but it governs which configurations are accepted at SchedulablePipeline.__init__.

A.1 Definition

Δ is primitively defined as the progress lag between two task submissions; equivalently, the number of ring.advance() calls (one per progress()) that elapse between the producer recording an event and the consumer reading it back:

$$\Delta \;\stackrel{\text{def}}{=}\; q_c - q_p$$

where $q_c$ is the progress() index in which the consumer reads the event and $q_p$ is the progress() index in which the producer recorded it. The definition is universal: it applies to any cross-progress wait, independent of which sync field the user declared. cross_iter requires $q_p < q_c$, i.e. $\Delta \ge 1$.

Closed-form value in the cross_iter case. When the wait is declared via consumer.cross_iter_depends_on=(("X", -N),) with $N > 0$, the lookahead mechanics determine $q_c$ and $q_p$ uniquely; working through them (see §A.2) gives:

$$\Delta \;=\; X.\mathrm{la} + N - \mathrm{consumer.la} \quad\text{(cross\_iter only)}$$

This is a derived equality, not a definition: it holds in the cross_iter context because lookahead pins down which batch each task processes. Other sync types yield different closed forms (e.g. same_progress_sync always has $\Delta = 0$, since both tasks are in the same progress).

A useful decomposition: $\Delta = N + (X.\mathrm{la} - \mathrm{consumer.la})$. When producer and consumer have equal lookahead, $\Delta = N$ exactly (the user's literal "wait for N batches ago"); when the producer sits deeper in the ring ($X.\mathrm{la} > \mathrm{consumer.la}$), $\Delta > N$ (extra rotations are needed to bring the slot down to the consumer's view).

A.2 Derivation of Δ in the cross_iter case

Goal: take the primitive Δ definition $q_c - q_p$ from §A.1 and reduce it to a closed-form expression in $X.\mathrm{la}$, $N$, and $\mathrm{consumer.la}$ when the wait is declared as cross_iter_depends_on=(("X", -N),).

Convention: in P_n (the n-th call to progress(), 0-indexed), a task with lookahead k processes batch B_(n+k) and records its event at slot[k]. Equivalently, slot[k] in P_n currently holds B_(n+k).

The consumer at P_n = q_c processes B_(n + consumer.la). The cross_iter declaration says it waits for X's work on B_(n + consumer.la − N). X processes B_(n + consumer.la − N) in progress P_(n + consumer.la − N − X.la), so:

$$ \begin{aligned} q_p &= n + \mathrm{consumer.la} - N - X.\mathrm{la} \\ \Delta &= q_c - q_p \\ &= n - (n + \mathrm{consumer.la} - N - X.\mathrm{la}) \\ &= X.\mathrm{la} + N - \mathrm{consumer.la} \end{aligned} $$

Where the consumer actually looks up the event. The producer recorded its event at $\mathrm{slot}[X.\mathrm{la}]$ in its own progress $P_{q_p}$. Each ring.advance() uniformly shifts every slot by $-1$. After $\Delta$ advances, the slot that was at offset $X.\mathrm{la}$ has rotated to offset $X.\mathrm{la} - \Delta$ in the consumer's progress $P_{q_c}$. The engine's consumer-side ring index is exactly this:

$$\mathrm{slot\_offset} \;\stackrel{\text{def}}{=}\; X.\mathrm{la} - \Delta$$

$X.\mathrm{la}$ does appear in $\Delta$, but the two occurrences cancel on substitution:

$$\mathrm{slot\_offset} \;=\; X.\mathrm{la} - (X.\mathrm{la} + N - \mathrm{consumer.la}) \;=\; \mathrm{consumer.la} - N$$

That is why the consumer-side lookup index depends only on $\mathrm{consumer.la}$ and $N$, not on the producer's lookahead: ring rotation is a uniform shift, so producer.la is fully absorbed into $\Delta$. The slot_offset formula in deps.py writes this directly as consumer.batch_offset + neg_offset (where neg_offset = -N).
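The algebra is easy to sanity-check numerically; the standalone snippet below mirrors the rows of §A.5 and is not engine code:

def resolve(producer_la, consumer_la, n):
    # Derived closed form for delta, and the consumer-side ring index.
    delta = producer_la + n - consumer_la
    slot_offset = producer_la - delta           # cancels to consumer_la - n
    assert slot_offset == consumer_la - n
    return delta, slot_offset

assert resolve(1, 1, 1) == (1, 0)     # same-la, N=1: "last progress's X"
assert resolve(3, 2, 2) == (3, 0)     # deeper producer: delta > N
assert resolve(0, 1, 1) == (0, 0)     # delta=0: auto-promoted
assert resolve(0, 3, 1) == (-2, 2)    # delta<0: future-read, rejected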

A.3 Validity constraints

| Condition | Requirement | Why / engine response |
| --- | --- | --- |
| $\Delta < 0$ | $\mathrm{consumer.la} > X.\mathrm{la} + N$ | Future-read: the producer hasn't yet processed $B_{K-N}$ by the consumer's progress. deps.py raises at construction. |
| $\Delta = 0$ | $\mathrm{consumer.la} = X.\mathrm{la} + N$ | Same progress, no ring rotation. Auto-promoted to same_progress_sync semantics: the engine adds the topo-DAG edge X → consumer plus the cross-thread CPU edge, and emits wait_event at slot[X.la]. The user-facing declaration stays as written. (Earlier versions raised here and told the user to rewrite as same_progress_sync; that rewrite is now optional. See SPEC_cross_iter_delta0_autoconvert.md.) |
| $\Delta \ge 1$ | $\mathrm{consumer.la} \le X.\mathrm{la} + N - 1$ | Strictly cross-progress: accepted. |
| Cross-stream only: $\mathrm{slot\_offset} \ge 0$ | $\mathrm{consumer.la} \ge N$ | The slot is still in the ring; otherwise the event has been recycled. Same-stream skips this check (CUDA stream FIFO orders submissions without the event lookup). |

Combined cross-stream interval:

$$N \;\le\; \mathrm{consumer.la} \;\le\; X.\mathrm{la} + N - 1 \quad\Longleftrightarrow\quad 1 \;\le\; \Delta \;\le\; X.\mathrm{la}$$

Same-stream relaxes to just $\Delta \ge 1$.
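Put together, the acceptance rule fits in one function. This is a sketch of the logic only; the authoritative checks live in deps.py, cited below:

def classify(producer_la, consumer_la, n, cross_stream):
    # Mirrors the A.3 table: delta sign first, then the ring-bounds check.
    delta = producer_la + n - consumer_la
    if delta < 0:
        return "reject: future-read (delta < 0)"
    if delta == 0:
        return "accept: auto-promote to same_progress_sync semantics"
    if cross_stream and consumer_la - n < 0:
        return "reject: slot_offset < 0, event recycled out of the ring"
    return "accept: cross-progress (ring rotation satisfies the wait)"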

Validator code: deps.py lines 437-501 (Δ=0 / Δ<0) and deps.py line 515 (slot_offset<0).

A.4 Plain-language summary

Read cross_iter_depends_on=(("X", -N),) as: "the consumer task, when running on batch $K$, needs X's GPU work for batch $K - N$ to be already finished and recorded into the ring." $\Delta$ tells the engine how stale X's record will be by the time the consumer reads it, measured in progress() calls.

  • $\Delta = 1$: producer ran in the immediately preceding progress. The most common case.
  • $\Delta = 2, 3, \ldots$: producer ran further back. The engine still finds the event in the ring as long as $\Delta \le X.\mathrm{la}$ (the slot hasn't rotated out yet).
  • $\Delta = 0$: producer and consumer are in the same progress. The engine auto-promotes the declaration to same_progress_sync semantics: same topo edge, same CPU edge, same slot[producer.la] event lookup. Equivalent to writing same_progress_sync=(X,) directly; the cross_iter syntax just lets users keep a batch-flow mental model.
  • $\Delta < 0$: the consumer would be asking for something the producer hasn't computed yet. Logically impossible; rejected.

Quick mnemonic: writing cross_iter_depends_on=(("X", -1),) usually means "wait for last progress's X". That reading is exact precisely when $\Delta = 1$, which forces $X.\mathrm{la} = \mathrm{consumer.la}$. Any lookahead mismatch makes $\Delta$ drift and may collide with one of the boundary checks above.

A.5 Worked examples

| $X.\mathrm{la}$ | $\mathrm{consumer.la}$ | $N$ | $\Delta$ | $\mathrm{slot\_offset}$ | Verdict |
| --- | --- | --- | --- | --- | --- |
| 0 | 0 | 1 | 1 | −1 | Cross-stream: REJECTED (slot rotated out). Same-stream: OK (FIFO). |
| 1 | 1 | 1 | 1 | 0 | OK. "Wait for last progress's X at the same lookahead." |
| 2 | 2 | 2 | 2 | 0 | OK. "Wait for X two progresses back, same lookahead." |
| 3 | 2 | 2 | 3 | 0 | OK. X sits deeper, so its slot needs 3 advances to reach the consumer's view; ring depth 4 holds it. |
| 0 | 1 | 1 | 0 | 0 | OK, Δ=0 auto-promoted. The engine adds the topo edge X → consumer; emits wait_event at slot[0] (cross-stream) or relies on FIFO + topo order (same-stream). Equivalent to same_progress_sync=(X,). |
| 0 | 3 | 1 | −2 | 2 | REJECTED, future-read (Δ<0). The producer hasn't yet run this batch at the consumer's progress. |

Document generated to summarize the schedulable-pipeline rework on branch junzhang/rework-mtms. Source: SPEC.md, SPEC_p2.md, SPEC_p3.md, tasks/followups.md. SVG diagrams hand-authored to UML 2.5 conventions (lifelines + activations + sync/async/return arrows for sequence diagrams; class boxes with stereotype-free attribute/operation compartments for class diagrams).