用户看到的是一行 `torch.nn.functional.softmax(x)`，背后 runtime 要做：分配 Stream、入队命令、调度到 AI Core、等待完成、同步结果。如果这一行的延迟是 10μs，runtime 的调度开销必须 < 0.5μs——否则就是 5% 的性能损失。

runtime 的 Stream 调度引擎管理 32 个命令队列（Command Queue），每个对应一个硬件 Stream。命令入队后，Dispatcher 根据资源（Cube/Vector/L1/HBM 带宽）做调度——不是先到先得，是 resource-aware 调度。

## Stream 架构

```
runtime Stream 架构：
  每个 Stream 独立维护命令队列，32 个 Stream 共享 64 个 AI Core

  Stream 0: cmd_1, cmd_2, ... → AI Core 0,1 (Cube/Vector)
  Stream 1: cmd_1, cmd_2, ... → AI Core 2,3 (Cube/Vector)
  ...

  Dispatcher 调度器：根据资源可用性，决定哪个 Stream 的命令可以执行
```

```cpp
// runtime/stream/stream_engine.h
class StreamEngine {
private:
    static constexpr int MAX_STREAMS = 32;

    struct Command {
        uint64_t id;
        KernelPtr kernel;
        void* args;
        int stream_id;
        uint64_t enqueue_time;
        ResourceRequirement resources;
    };

    struct CommandQueue {
        std::queue<Command> pending;
        std::queue<Command> executing;
        int stream_id;
        bool is_active;
    };

    std::array<CommandQueue, MAX_STREAMS> streams_;

    struct ResourceDispatcher {
        int available_cube_units = 62;
        int available_vector_units = 62;
        int available_l1_kb = 32 * 1024;
        int available_hbm_bw = 900;
        int max_concurrent_kernels = 8;
        int current_concurrent = 0;
    };
    ResourceDispatcher dispatcher_;

public:
    Status EnqueueCommand(int stream_id, KernelPtr kernel, void* args) {
        Command cmd;
        cmd.id = next_command_id_++;
        cmd.kernel = kernel;
        cmd.args = args;
        cmd.stream_id = stream_id;
        cmd.enqueue_time = GetTimestamp();
        cmd.resources = kernel->GetResourceRequirement(args);

        streams_[stream_id].pending.push(cmd);
        Schedule();
        return Status::OK;
    }

    void Schedule() {
        if (dispatcher_.current_concurrent >= dispatcher_.max_concurrent_kernels) {
            return;
        }

        // 优先级老化调度，不是简单 Round-Robin
        std::vector<Command*> candidates;
        for (int s = 0; s < MAX_STREAMS; s++) {
            auto& stream = streams_[s];
            if (stream.is_active && !stream.pending.empty()) {
                candidates.push_back(&stream.pending.front());
            }
        }

        // 按等待时间排序，等待越久优先级越高
        std::sort(candidates.begin(), candidates.end(),
                  [](Command* a, Command* b) {
                      return (GetTimestamp() - a->enqueue_time) >
                             (GetTimestamp() - b->enqueue_time);
                  });

        for (auto* cmd : candidates) {
            if (!IsResourceAvailable(cmd->resources)) continue;

            // 原子分配资源，避免部分分配导致死锁
            ReserveResources(cmd->resources);

            auto& stream = streams_[cmd->stream_id];
            stream.pending.pop();
            stream.executing.push(*cmd);
            dispatcher_.current_concurrent++;

            LaunchKernelAsync(cmd->kernel, cmd->args, cmd->stream_id);
        }
    }

    bool IsResourceAvailable(const ResourceRequirement& req) {
        return req.cube_units <= dispatcher_.available_cube_units &&
               req.vector_units <= dispatcher_.available_vector_units &&
               req.l1_kb <= dispatcher_.available_l1_kb &&
               req.hbm_bw_gbps <= dispatcher_.available_hbm_bw;
    }

    void ReserveResources(const ResourceRequirement& req) {
        dispatcher_.available_cube_units -= req.cube_units;
        dispatcher_.available_vector_units -= req.vector_units;
        dispatcher_.available_l1_kb -= req.l1_kb;
        dispatcher_.available_hbm_bw -= req.hbm_bw_gbps;
    }

    void OnKernelComplete(uint64_t cmd_id, int stream_id) {
        auto& stream = streams_[stream_id];
        Command completed = stream.executing.front();
        stream.executing.pop();

        ReleaseResources(completed.resources);
        dispatcher_.current_concurrent--;

        Schedule();  // 触发下一轮
    }
};
```

## 同步原语：Event

Stream 之间需要同步——等 Stream 0 的 AllReduce 完成后，Stream 1 才能用梯度更新参数。

```cpp
// runtime/stream/sync_primitives.cpp
class StreamSynchronizer {
private:
    struct Event {
        uint64_t id;
        int stream_id;
        uint64_t cmd_id;
        bool recorded;
        bool completed;
    };

    std::unordered_map<uint64_t, Event> events_;
    std::vector<std::vector<int>> stream_wait_for_;  // Stream 依赖图

public:
    void RecordEvent(uint64_t event_id, int stream_id, uint64_t cmd_id) {
        Event ev;
        ev.id = event_id;
        ev.stream_id = stream_id;
        ev.cmd_id = cmd_id;
        ev.recorded = true;
        ev.completed = false;
        events_[event_id] = ev;
    }

    Status StreamWaitEvent(int waiting_stream_id, uint64_t event_id) {
        Event& ev = events_[event_id];
        if (!ev.recorded) return Status::INVALID_EVENT;
        if (ev.completed) return Status::OK;

        stream_wait_for_[waiting_stream_id].push_back(ev.stream_id);
        streams_[waiting_stream_id].is_active = false;
        return Status::OK;
    }

    void CompleteEvent(uint64_t event_id) {
        Event& ev = events_[event_id];
        ev.completed = true;

        // 唤醒等待此事件的所有 Stream
        for (int s = 0; s < MAX_STREAMS; s++) {
            auto& waiters = stream_wait_for_[s];
            bool all_waited_complete = true;
            for (int waited_stream : waiters) {
                if (!IsStreamComplete(waited_stream)) {
                    all_waited_complete = false;
                    break;
                }
            }
            if (all_waited_complete) {
                streams_[s].is_active = true;
                Schedule();
            }
        }
    }

    Status SynchronizeAll() {
        for (int s = 0; s < MAX_STREAMS; s++) {
            while (!streams_[s].executing.empty()) {
                SpinWait(10);  // 最后手段，平时不用全局同步
            }
        }
        return Status::OK;
    }
};
```

## 事件同步的实际用法

```python
# Python 侧——compute-communication overlap
import torch_npu

compute_stream = torch_npu.Stream()
comm_stream = torch_npu.Stream()

# 计算流上跑前向
with torch_npu.stream(compute_stream):
    loss = model(input)

# 通信流上跑 AllReduce（和前向并行）
with torch_npu.stream(comm_stream):
    event = torch_npu.Event()
    hccl.all_reduce(loss, event=event)

# 只等通信流——不影响其他 Stream
compute_stream.wait_event(event)

# 等到了梯度就可以更新参数
with torch_npu.stream(compute_stream):
    optimizer.step()
```

## 踩坑一：Stream 饥饿（Starvation）

某些大 kernel 持续提交 → 总是分配不到资源 → 小 kernel 永远得不到执行。

修复：优先级老化——每 100μs 等待 +1 优先级，等待时间越久的排越前。

```cpp
int GetPriority() const {
    uint64_t wait_time_us = (GetTimestamp() - enqueue_time) / 1000;
    return wait_time_us / 100;  // 每 100μs +1
}
```

## 踩坑二：资源死锁

Kernel A 占 16 个 Cube 等 2 个 Vector，Kernel B 占所有 Vector 等 32 个 Cube → 永远等不到 → 死锁。

修复：原子资源分配——要么全部分配，要么一个都不给，拒绝部分分配。

```cpp
Status AtomicReserveResources(const ResourceRequirement& req) {
    if (!IsResourceAvailable(req)) {
        return Status::INSUFFICIENT_RESOURCES;  // 拒绝，不部分分配
    }
    dispatcher_.available_cube_units -= req.cube_units;
    dispatcher_.available_vector_units -= req.vector_units;
    dispatcher_.available_l1_kb -= req.l1_kb;
    dispatcher_.available_hbm_bw -= req.hbm_bw_gbps;
    return Status::OK;
}
```

## 踩坑三：全局同步破坏 Overlap

`torch.npu.synchronize()` 等待所有 Stream 完成——compute-communication overlap 全部失效。

正确做法：用 Event 做精确同步

```python
# ❌ 全局同步——所有 Stream 都停
torch.npu.synchronize()

# ✅ 精确同步——只等需要的 Stream
event = torch_npu.Event()
compute_stream.record_event(event)
comm_stream.wait_event(event)
```

runtime 的 Stream 引擎是 NPU 硬件的操作系统。32 个 Stream 并发执行、资源感知调度防饥饿、原子资源分配防死锁、Event 同步只等需要的。训练时的 compute-communication overlap 就靠这个引擎——缺少它，AllReduce 和 MatrixMul 就只能串行。