Schedulable Pipeline Engine

Multi-thread, multi-stream execution with data dependency & NCCL ordering guarantees

1. Thread/Stream Decoupling
2. Data Dependency
3. NCCL Ordering
4. Multi-Batch Pipeline
5. Search Space

Thread / Stream Decoupling

Core design: A task's stream decides which CUDA stream context it runs in. The thread_map decides which CPU worker thread submits it. They are independent.

Example: prefetch pipeline with 2 streams, 2 threads

[Figure: pipeline timeline. Legend: default stream (compute), memcpy stream (H2D), CUDA stream context, CPU thread boundary]

thread_map strategies

ThreadedExecutor(thread_map="by_stream")    # default: group by task.stream
ThreadedExecutor(thread_map="per_task")     # every task gets its own thread
ThreadedExecutor(thread_map={"h2d": "io", "fwd": "compute", ...})   # explicit dict
ThreadedExecutor(thread_map=lambda t: "io" if t.stream=="memcpy" else "compute")  # callable

The same CUDA stream can be used from different CPU threads, and different CUDA streams can share a CPU thread. The GPU serializes work per-stream regardless of which CPU thread submitted it.
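
A minimal PyTorch sketch of that claim (ThreadedExecutor internals are not shown; this only demonstrates that the stream context is per-thread and that the GPU orders work per-stream, not per-thread):

import threading

import torch

stream = torch.cuda.Stream()  # one CUDA stream, shared by two CPU threads

def submit():
    # torch.cuda.stream() is a thread-local context: each CPU thread can
    # independently enter the SAME stream and enqueue work onto it.
    with torch.cuda.stream(stream):
        x = torch.randn(1024, 1024, device="cuda")
        y = x @ x  # the GPU runs these kernels in per-stream FIFO order

threads = [threading.Thread(target=submit) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
torch.cuda.synchronize()  # wait for everything enqueued on the device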

Data Dependency Correctness

Two independent mechanisms ensure data visibility:
1. GPU-side: wait_stream — from cross_stream_waits (stream-based)
2. CPU-side: threading.Event — from DAG analysis (thread-based)

How dependencies are resolved

1. GPU wait_stream (cross-stream)
2. CPU Event.wait (cross-thread)
3. Implicit ordering (same thread, sequential)
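
A sketch of the first two mechanisms in raw PyTorch plus the stdlib (the engine wires these up automatically; the names here are illustrative):

import threading

import torch

memcpy_stream = torch.cuda.Stream()
compute_stream = torch.cuda.Stream()
h2d_done = threading.Event()  # CPU-side dependency, derived from the DAG

batch_cpu = torch.randn(1024, pin_memory=True)
batch_gpu = torch.empty(1024, device="cuda")

def h2d_worker():
    with torch.cuda.stream(memcpy_stream):
        batch_gpu.copy_(batch_cpu, non_blocking=True)
    h2d_done.set()  # signals that the copy has been ENQUEUED on memcpy_stream

def forward_worker():
    h2d_done.wait()                            # CPU: Event.wait (cross-thread)
    compute_stream.wait_stream(memcpy_stream)  # GPU: wait_stream (cross-stream)
    with torch.cuda.stream(compute_stream):
        out = batch_gpu * 2  # safe: compute_stream waits for the H2D copy

t1 = threading.Thread(target=h2d_worker)
t2 = threading.Thread(target=forward_worker)
t1.start(); t2.start(); t1.join(); t2.join()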

Dependency computation flow

                    Schedule (tasks with reads/writes/depends_on)
                                    │
                 ┌──────────────────┴──────────────────┐
                 ▼                                     ▼
        infer_cross_stream_waits()           _compute_cpu_deps()
         (deps.py — stream-based)          (executor.py — thread-based)
                 │                                     │
                 ▼                                     ▼
    GPU: consumer_stream.wait_stream(      CPU: threading.Event.wait() before
         producer_stream)                  task.run() when the task is on a
                                           different thread than its dependency

Key invariant: CPU deps are computed from the task DAG (reads/writes + depends_on), NOT from stream assignment. This is what makes thread/stream decoupling safe. Two tasks on the same stream but different threads still get correct CPU ordering.

Example: h2d writes "batch_gpu", forward reads it
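
A self-contained sketch of that example (the Task container here is a hypothetical stand-in, not the engine's actual class):

from dataclasses import dataclass, field

@dataclass
class Task:  # hypothetical stand-in for the engine's task type
    name: str
    stream: str
    reads: set = field(default_factory=set)
    writes: set = field(default_factory=set)
    batch_offset: int = 0

h2d = Task("h2d", stream="memcpy", writes={"batch_gpu"}, batch_offset=1)
fwd = Task("forward", stream="default", reads={"batch_gpu"})

# h2d writes "batch_gpu" and fwd reads it -> DAG edge h2d -> fwd.
# infer_cross_stream_waits(): producer on "memcpy", consumer on "default"
#   -> GPU-side default.wait_stream(memcpy) before fwd launches.
# _compute_cpu_deps(): if thread_map puts them on different CPU threads
#   -> fwd's worker waits on h2d's threading.Event before fwd.run().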

NCCL Submission Ordering

Problem: NCCL collectives (AllReduce, AllGather) are matched across ranks by call order. If rank 0 calls AllReduce_A then AllReduce_B, rank 1 must do the same. With multi-threading, this order could differ → deadlock.

Solution: _NcclOrderedLock (ticket-based)

[Figure legend: NCCL collective (nccl=True), regular task, ticket acquisition order]

Mechanism

Declaration order:  [task_A (nccl), task_B (nccl), task_C (nccl)]
Ticket assignment:   ticket=0       ticket=1       ticket=2

_NcclOrderedLock state machine:
  ┌─────────────────────────────────────────────────────────────┐
  │  next_ticket = 0                                            │
  │                                                             │
  │  Thread X: acquire(ticket=0) → runs immediately             │
  │            task_A.run()                                     │
  │            release() → next_ticket = 1, notify_all()        │
  │                                                             │
  │  Thread Y: acquire(ticket=1) → was waiting, now runs        │
  │            task_B.run()                                     │
  │            release() → next_ticket = 2, notify_all()        │
  │                                                             │
  │  Thread X: acquire(ticket=2) → runs                         │
  │            task_C.run()                                     │
  │            release() → next_ticket = 3                      │
  └─────────────────────────────────────────────────────────────┘

  If task_B fails:
    release(failed=True) → self._failed = True
    Thread X: acquire(ticket=2) → sees _failed → raises RuntimeError
    → task_C never runs (no desync across ranks)

Guarantee: All ranks execute NCCL collectives in identical declaration order, regardless of thread scheduling. A failure in any NCCL task aborts all subsequent ones.
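
A from-scratch sketch of such a ticket lock, matching the state machine above (the engine's _NcclOrderedLock may differ in detail):

import threading

class TicketLock:  # illustrative reimplementation of the _NcclOrderedLock idea
    def __init__(self):
        self._cond = threading.Condition()
        self._next_ticket = 0   # ticket currently allowed to run
        self._failed = False    # poisoned once any NCCL task fails

    def acquire(self, ticket: int) -> None:
        with self._cond:
            # Block until declaration order reaches this ticket (or a failure).
            self._cond.wait_for(lambda: self._failed or self._next_ticket == ticket)
            if self._failed:
                raise RuntimeError("earlier NCCL task failed; aborting to avoid rank desync")

    def release(self, failed: bool = False) -> None:
        with self._cond:
            if failed:
                self._failed = True  # all later tickets will raise in acquire()
            self._next_ticket += 1
            self._cond.notify_all()

Each NCCL task acquires with the ticket assigned at declaration time, runs the collective, then releases. Because every rank assigns tickets in the same declaration order, the collectives match across ranks no matter how threads are scheduled.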

Multi-Batch Pipeline Overlap

The pipeline overlaps different batches via batch_offset. Within one internal iteration, tasks with different offsets operate on different batches in the BatchRing simultaneously.
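
As a hypothetical indexing sketch (the real BatchRing may differ), the batch a task touches follows directly from its offset:

class BatchRing:  # hypothetical sketch: in_flight_batches slots, reused round-robin
    def __init__(self, in_flight_batches: int):
        self.slots = [None] * in_flight_batches

    def batch_index(self, i: int, k: int, max_offset: int) -> int:
        # Task with batch_offset=k at internal iteration i touches this batch.
        return i - (max_offset - k)

    def slot_for(self, i: int, k: int, max_offset: int):
        return self.slots[self.batch_index(i, k, max_offset) % len(self.slots)]

ring = BatchRing(in_flight_batches=2)
assert ring.batch_index(i=2, k=1, max_offset=1) == 2  # h2d on batch_2
assert ring.batch_index(i=2, k=0, max_offset=1) == 1  # compute on batch_1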

[Figure: prefetch pipeline, in_flight_batches=2 (max_offset=1); BatchRing state per iteration]

Mask formula (SPEC §4.8)

Task with batch_offset=k runs at iteration i iff:
    (max_offset - k) ≤ i < M + (max_offset - k)

where M is the number of batches pulled from the dataloader.

For in_flight_batches=2 (max_offset=1), M=5:
  k=1 (h2d):    runs at i=0,1,2,3,4    (prefetch: one iteration ahead)
  k=0 (compute): runs at i=1,2,3,4,5    (steady + drain)

  i=0: prefill  — only h2d(batch_0) runs
  i=1: steady   — h2d(batch_1) + compute(batch_0) overlap!
  i=2: steady   — h2d(batch_2) + compute(batch_1) overlap!
  ...
  i=5: drain    — only compute(batch_4) runs

Yes, stages of different batches DO overlap. At iteration i=2, h2d is working on batch_2 (via offset=1) while forward/backward/optimizer work on batch_1 (via offset=0). With ThreadedExecutor, these can also run on different CPU threads.
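
The prefill/steady/drain pattern falls out of the mask formula directly; a sketch (the function name is illustrative):

def task_active(i: int, k: int, max_offset: int, num_batches: int) -> bool:
    """SPEC §4.8 mask: task with batch_offset=k runs at internal iteration i."""
    return (max_offset - k) <= i < num_batches + (max_offset - k)

M, max_offset = 5, 1  # in_flight_batches=2
for i in range(M + max_offset):  # internal iterations 0..5
    active = [k for k in (1, 0) if task_active(i, k, max_offset, M)]
    # i=0 -> [1] (prefill), i=1..4 -> [1, 0] (steady), i=5 -> [0] (drain)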

[Figure: full 3-batch pipeline, in_flight_batches=3]

Auto-Scheduler v2 Search Space

Joint search chooses thread_map, stream_map, and fire_ordering. Each candidate then runs the existing auto_assign_lookaheads oracle before being scored.

[Figure: beam-search funnel. Legend: default-stream anchor, movable prep task, NCCL-serialized resource, scored candidate]

Hard constraints

max_threads   <= 4
max_in_flight <= 5
forward / backward / finalize_model_grads / optimizer_step:
    stream="default", lookahead=0

NCCL tasks with the same comm group serialize even on different streams.
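
A hedged sketch of the hard-constraint filter (the Candidate container and its field names are assumptions mirroring the list above; the scorer and the beam search itself are not shown):

from dataclasses import dataclass

ANCHORS = ("forward", "backward", "finalize_model_grads", "optimizer_step")

@dataclass
class Candidate:  # illustrative container, not the engine's actual type
    thread_map: dict       # task name -> worker thread name
    stream_map: dict       # task name -> CUDA stream name
    lookaheads: dict       # task name -> lookahead (from auto_assign_lookaheads)
    in_flight_batches: int

def satisfies_hard_constraints(c: Candidate) -> bool:
    if len(set(c.thread_map.values())) > 4:   # max_threads <= 4
        return False
    if c.in_flight_batches > 5:               # max_in_flight <= 5
        return False
    for name in ANCHORS:                      # anchored to the default stream
        if c.stream_map.get(name) != "default" or c.lookaheads.get(name, 0) != 0:
            return False
    return True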