1. notify_dispatch(internode.cu)
定义
该 kernel 按 SM 分工:sm0 负责通信与同步,其余 SM 负责准备 dispatch 所需的 metadata。具体分为下面 1、2 两部分:
- sm0:
- 阶段1:
对每个远端 RDMA rank 的每个 QP 调用 nvshmemi_ibgda_quiet(dst_rdma_rank, qp_id),等待本 GPU 之前发往其他 RDMA 节点的操作全部完成。
- 阶段2:机内、机间同步。thread_id == 32(warp 1 的 lane 0)执行机间同步,确保所有 RDMA 节点之前的任务都已结束,再去清理(clean up)rdma_buffer_ptr;随后整个 block 调用 barrier_block 完成机内同步。
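- 阶段3:把每个 rank、expert、rdma_rank 对应的 token 数量写到 send_buffer 内对应的位置上。以 NUM_MAX_NVL_PEERS = 8、num_rdma_experts = 16 为例,发往 RDMA rank 0 的 send_buffer(0) 布局如下:
send_buffer(0)[0..7] = num_tokens_per_rank[0..7] (8个NVL rank的token数)
send_buffer(0)[8..23] = num_tokens_per_expert[0..15] (16个expert的token数)
send_buffer(0)[24] = num_tokens_per_rdma_rank[0] (RDMA rank 0总token数)
- 阶段4:RDMA 发送。写完之后,每个 warp 负责一个 RDMA rank,对每个远端 RDMA rank 调用一次 nvshmem 的 put 操作(本 RDMA rank 则走机内拷贝)。例如 RDMA rank 0 上 sm0 的 warp 1 发送给 RDMA rank 1: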
本地: send_buffer(1) [25个int]
↓ (RDMA PUT)
远程: recv_buffer(0) [25个int] (在 RDMA rank 1 的 GPU 上)
- 阶段5:等待 RDMA put 完成(nvshmemi_ibgda_quiet),然后进行机间同步。
- 阶段6:从所有 RDMA 节点的 recv_buffer 读取数据,按 expert 求和;计算各 RDMA rank token 数的前缀和,并通过 NVL buffer 发送给节点内其他 GPU;之后从节点内各 GPU 接收数据,聚合得到全局 rank 的前缀和以及每个 expert 的 token 数(按 expert_alignment 向上对齐)。
- 阶段7:机内、机间同步。
- sm1-x:
- 阶段1:用 for 循环遍历所有 token,根据 is_token_in_rank 这个路由表(router map)计算出 total_count(发送到该 RDMA rank 的总 token 数)和 per_nvl_rank_count[j](发送到该 RDMA rank 上各个 NVL GPU 的 token 数)。
- 阶段2:写入 gbl_channel_prefix_matrix 和 rdma_channel_prefix_matrix
- 阶段3:为每个 channel 计算前缀和,用于后续数据分发的索引计算
// notify_dispatch: simplified excerpt of the internode kernel. The argument list
// and several declarations/guards are elided, so it is not compilable as shown.
// SM 0 exchanges per-rank / per-expert / per-RDMA-rank token counts with the
// other RDMA ranks (IBGDA puts) and with the node's NVL peers (NVL buffers),
// producing receive-side prefix sums and counters. The other SMs count tokens
// per destination from the routing map is_token_in_rank.
template <bool kLowLatencyMode, int kNumRDMARanks>
__global__ void notify_dispatch(...) {
// SM 0 on every GPU is responsible for synchronization and communication.
if (sm_id == 0) {
// Stage 1: for every (remote RDMA rank, QP) pair, call nvshmemi_ibgda_quiet so
// that operations previously issued from this GPU on that QP have completed.
// The destination starts at rdma_rank + 1 and wraps around; because
// i / qps_per_rdma_rank < kNumRDMARanks - 1, the local RDMA rank is never chosen.
auto qps_per_rdma_rank = ibgda_get_state()->num_rc_per_pe * ibgda_get_state()->num_devices_initialized;
for (int i = thread_id; i < qps_per_rdma_rank * (kNumRDMARanks - 1); i += num_threads) {
auto dst_rdma_rank = (i / qps_per_rdma_rank + rdma_rank + 1) % kNumRDMARanks;
auto qp_id = i % qps_per_rdma_rank;
nvshmemi_ibgda_quiet(translate_dst_rdma_rank<kLowLatencyMode>(dst_rdma_rank, nvl_rank), qp_id);
}
__syncthreads();
// Stage 2: a single thread (thread 32 = warp 1, lane 0) performs the inter-node
// sync, then the whole block runs an intra-node barrier over NUM_MAX_NVL_PEERS
// GPUs. kSyncOnly = true, so barrier_block issues no memory fence first.
if (thread_id == 32)
nvshmem_sync_with_same_gpu_idx<kLowLatencyMode>(rdma_team); // inter-node sync
barrier_block<NUM_MAX_NVL_PEERS, true>(barrier_signal_ptrs, nvl_rank); // intra-node sync
// Stage 3: fill send_buffer(r) for each RDMA rank r with the layout
//   [0, NUM_MAX_NVL_PEERS)                      token count for each NVL rank of RDMA rank r
//   [NUM_MAX_NVL_PEERS, +num_rdma_experts)      token count for each expert assigned
//                                               (by index, i / num_rdma_experts) to RDMA rank r
//   [NUM_MAX_NVL_PEERS + num_rdma_experts]      total token count for RDMA rank r
for (int i = thread_id; i < num_ranks; i += num_threads)
rdma_recv_num_tokens_mixed.send_buffer(i / NUM_MAX_NVL_PEERS)[i % NUM_MAX_NVL_PEERS] = num_tokens_per_rank[i];
for (int i = thread_id; i < num_experts; i += num_threads)
rdma_recv_num_tokens_mixed.send_buffer(i / num_rdma_experts)[NUM_MAX_NVL_PEERS + i % num_rdma_experts] =
num_tokens_per_expert[i];
if (thread_id < kNumRDMARanks)
rdma_recv_num_tokens_mixed.send_buffer(thread_id)[NUM_MAX_NVL_PEERS + num_rdma_experts] = num_tokens_per_rdma_rank[thread_id];
// Stage 4: warp i handles RDMA rank i. For a remote rank, it issues an IBGDA warp
// put of send_buffer(i) into recv_buffer(rdma_rank) on RDMA rank i. For the local
// rank, the data is copied within the node (elided in this excerpt).
for (int i = warp_id; i < kNumRDMARanks; i += num_warps) {
if (i != rdma_rank) {
nvshmemi_ibgda_put_nbi_warp<true>((rdma_recv_num_tokens_mixed.recv_buffer(rdma_rank)),
(rdma_recv_num_tokens_mixed.send_buffer(i)), i, 0, lane_id,0);
} else {机内拷贝}
}
// Stage 5: wait for the puts to complete, then sync across RDMA ranks before
// recv_buffer is read below.
// NOTE(review): guards and rank translation around the quiet are elided here;
// presumably only threads < kNumRDMARanks, excluding rdma_rank, call it.
nvshmemi_ibgda_quiet(thread_id, 0); // wait for RDMA completion
nvshmem_sync_with_same_gpu_idx<kLowLatencyMode>(rdma_team); // inter-node sync
// Stage 6: reduce the received counts.
// NOTE(review): declarations/resets of `sum` and the per-loop thread guards are
// elided; each accumulation below presumably starts from 0 and runs on a subset
// of threads (e.g. one thread per expert / per NVL peer).
// 6a: per-expert totals for this RDMA rank's experts, summed over all source RDMA ranks.
for (int i = 0; i < kNumRDMARanks; ++i)
sum += rdma_recv_num_tokens_mixed.recv_buffer(i)[NUM_MAX_NVL_PEERS + thread_id];
nvl_reduced_num_tokens_per_expert[thread_id] = sum;
// 6b: inclusive prefix sum of the token counts received from each RDMA rank;
// the grand total is published through moe_recv_rdma_counter_mapped.
for (int i = 0; i < kNumRDMARanks; ++i) {
sum += rdma_recv_num_tokens_mixed.recv_buffer(i)[NUM_MAX_NVL_PEERS + num_rdma_experts];
recv_rdma_rank_prefix_sum[i] = sum;
}
*moe_recv_rdma_counter_mapped = sum;
// 6c: for every source RDMA rank i, forward the count destined for NVL rank
// thread_id (recv_buffer(i)[thread_id]) into the NVL send buffer.
for (int i = 0; i < kNumRDMARanks; ++i)
nvl_send_num_tokens_per_rank.buffer(nvl_rank)[i] = rdma_recv_num_tokens_mixed.recv_buffer(i)[thread_id];
// 6d: forward the reduced per-expert counts of NVL rank thread_id's
// num_nvl_experts experts into the NVL send buffer.
for (int i = 0; i < num_nvl_experts; ++i)
nvl_send_num_tokens_per_expert.buffer(nvl_rank)[i] = nvl_reduced_num_tokens_per_expert[thread_id * num_nvl_experts + i];
// 6e: inclusive prefix sum over all global ranks (rank i = RDMA rank
// i / NUM_MAX_NVL_PEERS, NVL rank i % NUM_MAX_NVL_PEERS), read from the NVL
// receive buffers; the total goes to moe_recv_counter_mapped.
for (int i = 0; i < num_ranks; ++i) {
int src_rdma_rank = i / NUM_MAX_NVL_PEERS, src_nvl_rank = i % NUM_MAX_NVL_PEERS;
sum += nvl_recv_num_tokens_per_rank.buffer(src_nvl_rank)[src_rdma_rank];
recv_gbl_rank_prefix_sum[i] = sum;
}
*moe_recv_counter_mapped = sum;
// 6f: per local expert, sum the counts from all NVL peers and round up to a
// multiple of expert_alignment.
for (int i = 0; i < NUM_MAX_NVL_PEERS; ++i)
sum += nvl_recv_num_tokens_per_expert.buffer(i)[thread_id];
sum = (sum + expert_alignment - 1) / expert_alignment * expert_alignment;
moe_recv_expert_counter_mapped[thread_id] = sum;
// Stage 7: inter-node sync, then an intra-node barrier. This uses the default
// kSyncOnly = false, so barrier_block fences memory before signaling.
nvshmem_sync_with_same_gpu_idx<kLowLatencyMode>(rdma_team);
barrier_block<NUM_MAX_NVL_PEERS>(barrier_signal_ptrs, nvl_rank);
// Calculate meta data (done by the other SMs in the else-branch below)
} else {
// SMs other than 0: for each token, accumulate per-NVL-rank counts and count
// tokens that go to this RDMA rank at all.
// NOTE(review): is_token_in_rank_uint64 presumably packs the NUM_MAX_NVL_PEERS
// per-GPU flags, so != 0 means "sent to at least one GPU of this RDMA rank".
for () {
for (int j = 0; j < NUM_MAX_NVL_PEERS; ++j)
per_nvl_rank_count[j] += is_token_in_rank_values[j];
total_count += (is_token_in_rank_uint64 != 0);
}
//...
}
}同步
这里主要用到三种同步:nvshmemi_ibgda_quiet(等待本 GPU 发出的 RDMA 操作完成)、nvshmem_sync_with_same_gpu_idx(机间同步)和 barrier_block(机内同步)。
a. barrier_block
在最开始创建 buffer 时,每个机内 rank 的 buffer_ptrs[nvl_rank] 这块内存就已经导出为 IPC/fabric handle,并提前分享给了节点内其他 GPU,所以节点内所有 GPU 都可以访问这块内存。buffer_ptrs[nvl_rank] 又分为数据区和信号区。
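buffer_ptrs[nvl_rank] (通过IPC共享,所有GPU都能访问)
├─ num_nvl_bytes (NVLink数据缓冲区)
└─ barrier_signal_bytes (barrier信号区,8个int = 32字节)
   └─ barrier_signal_ptrs[0..7]:barrier_signal_ptrs[i] 指向 GPU i 的信号区;信号区中第 j 个 int 是与 GPU j 对应的计数
barrier_block 中,前 kNumRanks 个线程各负责一个 GPU。线程 t 先用 atomicAdd_system 在自己信号区的第 t 个计数上加 1024(FINISHED_SUM_TAG),再用 atomicSub_system 在 GPU t 信号区的第 rank 个计数上减 1024。t 取遍所有 GPU(包括自己),共 8 次,每个线程一次。之后在 while(true) 中用 __all_sync 一直轮询自己信号区的所有计数,直到全部 <= 0,即所有 GPU 都已到达 barrier;超时则打印信息并 trap。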
// Block-wide barrier among the kNumRanks GPUs of one node, built on
// system-scope atomics on each GPU's barrier signal array.
//
// Template parameters:
//   kNumRanks - number of participating GPUs. Thread t (t < kNumRanks) handles
//               peer t, so the block must have at least kNumRanks threads.
//   kSyncOnly - when false, a memory fence plus __syncthreads() first make the
//               memory operations of all threads in this block visible at `sys`
//               scope before signaling; when true, only execution is synchronized.
// Parameters:
//   barrier_signal_ptrs - barrier_signal_ptrs[i] points to GPU i's signal array,
//                         indexed by peer rank (at least kNumRanks ints).
//   rank                - index of this GPU among the kNumRanks participants.
//
// Protocol: thread t adds FINISHED_SUM_TAG to our own slot [t] and subtracts it
// from GPU t's slot [rank] (t == rank included). Our slot [t] can only drop back
// to <= 0 after GPU t has performed its subtraction, i.e. after GPU t has reached
// the barrier. Our add and the peer's subtract are unordered, so a slot may
// briefly be negative; hence the `<= 0` test.
//
// Must be reached by every thread of the block (it calls __syncthreads()).
// Traps if the wait exceeds NUM_TIMEOUT_CYCLES.
// NOTE(review): the full-mask __all_sync assumes blockDim.x is a multiple of 32
// — TODO confirm at call sites.
template <int kNumRanks, bool kSyncOnly = false>
__forceinline__ __device__ void barrier_block(int** barrier_signal_ptrs, int rank) {
    auto thread_id = static_cast<int>(threadIdx.x);

    // Validate the precondition before relying on it: with fewer than kNumRanks
    // threads, peers >= blockDim.x would be neither signaled nor waited on,
    // silently breaking the barrier.
    EP_DEVICE_ASSERT(kNumRanks <= blockDim.x);

    // For non-sync-only cases, the memory operations by other threads in the block must be visible to the `sys` scope
    if constexpr (not kSyncOnly) {
        memory_fence();
        __syncthreads();
    }

    // Arrive: add to our own slot for peer t, subtract from peer t's slot for us.
    if (thread_id < kNumRanks) {
        atomicAdd_system(barrier_signal_ptrs[rank] + thread_id, FINISHED_SUM_TAG);
        atomicSub_system(barrier_signal_ptrs[thread_id] + rank, FINISHED_SUM_TAG);
    }

    // Wait until every peer has arrived. Threads >= kNumRanks vote "done", so only
    // warps containing threads < kNumRanks keep polling.
    // NOTE(review): no acquire fence follows the poll; consumers of peer data rely
    // on ld_volatile_global + __syncthreads() for ordering — confirm this suffices.
    auto start_time = clock64();
    while (true) {
        auto value = thread_id < kNumRanks ? ld_volatile_global(barrier_signal_ptrs[rank] + thread_id) : 0;
        if (__all_sync(0xffffffff, value <= 0))
            break;

        // Check timeout
        if (clock64() - start_time > NUM_TIMEOUT_CYCLES and thread_id < kNumRanks) {
            printf("DeepEP timeout check failed: rank = %d, thread = %d, value = %d\n", rank, thread_id, value);
            trap();
        }
    }
    __syncthreads();
}