Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 87 additions & 78 deletions src/components/tl/cuda/allreduce/allreduce_nvls.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,105 +21,114 @@ enum {

ucc_status_t ucc_tl_cuda_allreduce_nvls_start(ucc_coll_task_t *coll_task)
{
ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_ee_h ee = task->super.ee;
ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_ee_h ee = task->super.ee;
cudaStream_t stream = (ee) ? (cudaStream_t)ee->ee_context : team->stream;
// stream is used in tl_trace below, but not used in this function
(void)stream;
ucc_rank_t trank = UCC_TL_TEAM_RANK(team);
ucc_ec_cuda_event_t *ec_event = (ucc_ec_cuda_event_t *)task->allreduce_nvls.evtCompletion;
cudaEvent_t evt = ec_event->event;
CUdeviceptr mc_va = task->allreduce_nvls.mc_va;
CUdeviceptr uc_va = task->allreduce_nvls.uc_va;
ucc_datatype_t dt = task->allreduce_nvls.dt;
uint32_t sm_count = UCC_TL_CUDA_TEAM_LIB(team)->cfg.nvls_sm_count;
uint32_t threads = UCC_TL_CUDA_TEAM_LIB(team)->cfg.nvls_threads;
ucc_status_t status;
cudaError_t cuda_status;

task->allreduce_nvls.buf_size_bytes = args->dst.info.count * ucc_dt_size(task->allreduce_nvls.dt);
task->allreduce_nvls.rbuf = args->dst.info.buffer;
task->allreduce_nvls.sbuf = UCC_IS_INPLACE(*args) ? args->dst.info.buffer : args->src.info.buffer;
task->allreduce_nvls.buf_size_bytes =
args->dst.info.count * ucc_dt_size(task->allreduce_nvls.dt);
task->allreduce_nvls.rbuf = args->dst.info.buffer;
task->allreduce_nvls.sbuf =
UCC_IS_INPLACE(*args) ? args->dst.info.buffer : args->src.info.buffer;

tl_trace(UCC_TASK_LIB(task),
"task: %p stream: %p allreduce_nvls_start symmetric uc addr: %p "
"mc addr: %p "
"buf_size_bytes: %zu, is inplace: %d",
task, stream, (void *)task->allreduce_nvls.uc_va, (void *)task->allreduce_nvls.mc_va,
task, stream, (void *)task->allreduce_nvls.uc_va,
(void *)task->allreduce_nvls.mc_va,
task->allreduce_nvls.buf_size_bytes, UCC_IS_INPLACE(*args));

task->allreduce_nvls.stage = STAGE_KERNEL;

// copy src buffer to symmetric memory first
cuda_status = cudaMemcpyAsync((void *)uc_va, task->allreduce_nvls.sbuf,
task->allreduce_nvls.buf_size_bytes,
cudaMemcpyDeviceToDevice, stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaMemcpyAsync failed: %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_MEMORY; // TODO: better error code?
return UCC_ERR_NO_MEMORY;
}

// Choose between dedicated barriers (optimal) or inline barriers (compatibility)
if (UCC_TL_CUDA_TEAM_LIB(team)->cfg.nvls_dedicated_barriers) {
// DEDICATED BARRIERS: Separates sync from compute, eliminates contention
status = post_allreduce_kernel_dedicated_barriers(
stream, sm_count, threads, mc_va,
task->allreduce_nvls.buf_size_bytes, TASK_NVLS_CONTROL_MC(task),
TASK_NVLS_CONTROL_UC(task), task->allreduce_nvls.coll_id, trank,
UCC_TL_TEAM_SIZE(team), dt);
if (status != UCC_OK) {
ucc_error(
"failed to post allreduce kernel with dedicated barriers");
task->super.status = status;
return status;
}
} else {
// INLINE BARRIERS: Original approach for compatibility/debugging
status = post_allreduce_kernel(
stream, sm_count, threads, mc_va,
task->allreduce_nvls.buf_size_bytes, TASK_NVLS_CONTROL_MC(task),
TASK_NVLS_CONTROL_UC(task), task->allreduce_nvls.coll_id, trank,
UCC_TL_TEAM_SIZE(team), dt);
if (status != UCC_OK) {
ucc_error("failed to post allreduce kernel with inline barriers");
task->super.status = status;
return status;
}
}
cuda_status = cudaMemcpyAsync(
(void *)task->allreduce_nvls.rbuf, (void *)uc_va,
task->allreduce_nvls.buf_size_bytes, cudaMemcpyDeviceToDevice, stream);
if (cuda_status != cudaSuccess) {
ucc_error("task: %p, cudaMemcpyAsync failed: %s, stream: %p, sbuf: "
"%p, rbuf: %p, uc_va: %p, buf_size_bytes: %zu",
task, cudaGetErrorString(cuda_status), stream,
task->allreduce_nvls.sbuf, task->allreduce_nvls.rbuf,
(void *)uc_va, task->allreduce_nvls.buf_size_bytes);
task->super.status = UCC_ERR_NO_RESOURCE;
return UCC_ERR_NO_RESOURCE;
}
cuda_status = cudaEventRecord(evt, stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventRecord failed: %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return UCC_ERR_NO_RESOURCE;
}
task->allreduce_nvls.stage = STAGE_WAIT;

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

void ucc_tl_cuda_allreduce_nvls_progress(ucc_coll_task_t *coll_task)
{
ucc_tl_cuda_task_t *task = ucc_derived_of(coll_task, ucc_tl_cuda_task_t);
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
ucc_rank_t trank = UCC_TL_TEAM_RANK(team);
ucc_ec_cuda_event_t *ec_event = (ucc_ec_cuda_event_t *)task->allreduce_nvls.evt_completion;
ucc_ec_cuda_event_t *ec_event = (ucc_ec_cuda_event_t *)task->allreduce_nvls.evtCompletion;
cudaEvent_t evt = ec_event->event;
CUdeviceptr mc_va = task->allreduce_nvls.mc_va;
CUdeviceptr uc_va = task->allreduce_nvls.uc_va;
ucc_ee_h ee = task->super.ee;
cudaStream_t stream = (ee) ? (cudaStream_t)ee->ee_context : team->stream;
ucc_datatype_t dt = task->allreduce_nvls.dt;
uint32_t sm_count = UCC_TL_CUDA_TEAM_LIB(team)->cfg.nvls_sm_count;
uint32_t threads = UCC_TL_CUDA_TEAM_LIB(team)->cfg.nvls_threads;

ucc_status_t status;
cudaError_t cuda_status;

switch (task->allreduce_nvls.stage) {
case STAGE_KERNEL:
// copy src buffer to symmetric memory first
cuda_status =
cudaMemcpyAsync((void *)uc_va, task->allreduce_nvls.sbuf,
task->allreduce_nvls.buf_size_bytes,
cudaMemcpyDeviceToDevice, stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaMemcpyAsync failed: %s",
cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_MEMORY; // TODO: better error code?
return;
}

status = post_allreduce_kernel(stream, sm_count, threads, mc_va,
task->allreduce_nvls.buf_size_bytes,
TASK_NVLS_CONTROL_MC(task),
TASK_NVLS_CONTROL_UC(task),
task->allreduce_nvls.coll_id,
trank,
UCC_TL_TEAM_SIZE(team), dt);
if (status != UCC_OK) {
ucc_error("failed to post allreduce kernel");
task->super.status = status;
return;
}
cuda_status = cudaMemcpyAsync((void *)task->allreduce_nvls.rbuf,
(void *)uc_va,
task->allreduce_nvls.buf_size_bytes,
cudaMemcpyDeviceToDevice,
stream);
if (cuda_status != cudaSuccess) {
ucc_error("task: %p, cudaMemcpyAsync failed: %s, stream: %p, sbuf: "
"%p, rbuf: %p, uc_va: %p, buf_size_bytes: %zu",
task, cudaGetErrorString(cuda_status), stream,
task->allreduce_nvls.sbuf, task->allreduce_nvls.rbuf,
(void *)uc_va, task->allreduce_nvls.buf_size_bytes);
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
cuda_status = cudaEventRecord(evt, stream);
if (cuda_status != cudaSuccess) {
ucc_error("cudaEventRecord failed: %s", cudaGetErrorString(cuda_status));
task->super.status = UCC_ERR_NO_RESOURCE;
return;
}
task->allreduce_nvls.stage = STAGE_WAIT;
// fallthrough
case STAGE_WAIT:
cuda_status = cudaEventQuery(evt);
if (cuda_status == cudaErrorNotReady) {
task->super.status = UCC_INPROGRESS;
return;
}
task->super.status = UCC_OK;
break;
cuda_status = cudaEventQuery(evt);
if (cuda_status == cudaErrorNotReady) {
task->super.status = UCC_INPROGRESS;
return;
}
task->super.status = UCC_OK;
return;
}

ucc_status_t ucc_tl_cuda_allreduce_nvls_finalize(ucc_coll_task_t *task)
Expand Down
99 changes: 99 additions & 0 deletions src/components/tl/cuda/kernels/allreduce_kernel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,55 @@ __global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), total_blocks * (launch_counter * 2 + 2));
}

// DEDICATED BARRIER KERNELS: Single-thread kernels for synchronization only

// Pre-barrier: Wait for all GPUs to finish input copy
__global__ void nvls_pre_barrier_kernel(ucc_tl_cuda_nvls_control_t *mc_bar,
ucc_tl_cuda_nvls_control_t *uc_bar,
uint64_t expected_count)
{
// Only thread 0 in block 0 participates - minimal contention (8 threads total across all GPUs)
if (threadIdx.x == 0 && blockIdx.x == 0) {
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), expected_count);
}
}

// Post-barrier: Wait for all GPUs to finish computation
__global__ void nvls_post_barrier_kernel(ucc_tl_cuda_nvls_control_t *mc_bar,
ucc_tl_cuda_nvls_control_t *uc_bar,
uint64_t expected_count)
{
// Only thread 0 in block 0 participates - minimal contention (8 threads total across all GPUs)
if (threadIdx.x == 0 && blockIdx.x == 0) {
nvls_bar(&(mc_bar->arrival_counter), &(uc_bar->arrival_counter), expected_count);
}
}

// PURE COMPUTE KERNEL: No barriers, maximum performance
template <typename NvlsOps>
__global__ void __launch_bounds__(UCC_TL_CUDA_MAX_NVLS_THREADS)
allreduce_kernel_vec32_no_barriers(uint32_t *base_u32, size_t count_u32,
uint32_t rank, uint32_t tsize)
{
// NO BARRIERS - Pure computation only

// Work distribution per rank (same as original)
size_t chunk_start = ((int64_t)count_u32 * (int64_t)rank) / (int64_t)tsize;
size_t chunk_end = ((int64_t)count_u32 * (int64_t)(rank + 1)) / (int64_t)tsize;

size_t thread_offset = (threadIdx.x + blockIdx.x * blockDim.x) * 4;
size_t stride = blockDim.x * gridDim.x * 4;

// Pure NVLS computation - hardware handles atomicity
for (size_t idx = chunk_start + thread_offset; idx < chunk_end; idx += stride) {
uint4 val;
NvlsOps::ld(val, base_u32 + idx); // Hardware-atomic load-reduce
NvlsOps::st(val, base_u32 + idx); // Hardware-atomic store
}

// NO BARRIERS - Finish immediately when computation done
}

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -88,6 +137,56 @@ ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
return UCC_OK;
}

// NEW: Dedicated barrier kernel launcher - separates sync from compute
ucc_status_t post_allreduce_kernel_dedicated_barriers(cudaStream_t stream, uint32_t sm_count,
uint32_t threads, CUdeviceptr mc_base_addr,
size_t src_size_bytes,
CUdeviceptr mc_control_addr,
CUdeviceptr uc_control_addr,
uint64_t launch_counter,
uint32_t rank, uint32_t tsize,
ucc_datatype_t datatype)
{
assert(sm_count > 0 && sm_count <= UCC_TL_CUDA_MAX_NVLS_SM_COUNT);
assert(threads > 0 && threads <= UCC_TL_CUDA_MAX_NVLS_THREADS);

uint32_t *base_u32 = reinterpret_cast<uint32_t *>(mc_base_addr);
size_t count_u32 = src_size_bytes / sizeof(uint32_t);
ucc_tl_cuda_nvls_control_t *mc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(mc_control_addr);
ucc_tl_cuda_nvls_control_t *uc_bar = reinterpret_cast<ucc_tl_cuda_nvls_control_t *>(uc_control_addr);

// Only 8 threads total (1 per GPU) participate in barriers - minimal contention
uint64_t pre_barrier_count = tsize * (launch_counter * 2 + 1);
uint64_t post_barrier_count = tsize * (launch_counter * 2 + 2);

// PHASE 1: Pre-barrier - wait for all GPUs to finish input copy
nvls_pre_barrier_kernel<<<1, 1, 0, stream>>>(mc_bar, uc_bar, pre_barrier_count);
CUDA_CHECK(cudaGetLastError());

// PHASE 2: Pure compute kernel - no barriers, maximum performance
switch (datatype) {
case UCC_DT_FLOAT32:
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
allreduce_kernel_vec32_no_barriers<NvlsFp32Ops><<<sm_count, threads, 0, stream>>>(
base_u32, count_u32, rank, tsize);
break;
case UCC_DT_BFLOAT16:
assert(((uintptr_t)(mc_base_addr) % 8) == 0);
allreduce_kernel_vec32_no_barriers<NvlsBf16Ops><<<sm_count, threads, 0, stream>>>(
base_u32, count_u32, rank, tsize);
break;
default:
return UCC_ERR_NOT_SUPPORTED;
}
CUDA_CHECK(cudaGetLastError());

// PHASE 3: Post-barrier - wait for all GPUs to finish computation
nvls_post_barrier_kernel<<<1, 1, 0, stream>>>(mc_bar, uc_bar, post_barrier_count);
CUDA_CHECK(cudaGetLastError());

return UCC_OK;
}

#ifdef __cplusplus
}
#endif
18 changes: 17 additions & 1 deletion src/components/tl/cuda/kernels/allreduce_kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
extern "C" {
#endif

// Kernel function declaration
// Original kernel with internal barriers
ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
uint32_t threads, CUdeviceptr mc_base_addr,
size_t src_size_bytes,
Expand All @@ -24,6 +24,22 @@ ucc_status_t post_allreduce_kernel(cudaStream_t stream, uint32_t sm_count,
uint32_t rank,
uint32_t tsize, ucc_datatype_t datatype);

// NEW: Dedicated barrier kernels approach - separates synchronization from computation
// Architecture: pre_barrier<<<1,1>>> → compute<<<sm_count,threads>>> → post_barrier<<<1,1>>>
// Benefits:
// - Eliminates 32-block barrier contention (8 threads total vs 100s of threads)
// - Enables higher SM counts without synchronization overhead
// - Consistent ~17μs compute latency (pure computation time)
// - Maintains correctness (proper copy→compute→copy synchronization)
ucc_status_t post_allreduce_kernel_dedicated_barriers(cudaStream_t stream, uint32_t sm_count,
uint32_t threads, CUdeviceptr mc_base_addr,
size_t src_size_bytes,
CUdeviceptr mc_control_addr,
CUdeviceptr uc_control_addr,
uint64_t launch_counter,
uint32_t rank, uint32_t tsize,
ucc_datatype_t datatype);

#ifdef __cplusplus
}
#endif
Expand Down
5 changes: 5 additions & 0 deletions src/components/tl/cuda/tl_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ static ucc_config_field_t ucc_tl_cuda_lib_config_table[] = {
"Number of threads per block to use for NVLS",
ucc_offsetof(ucc_tl_cuda_lib_config_t, nvls_threads),
UCC_CONFIG_TYPE_UINT},

{"NVLS_DEDICATED_BARRIERS", "1",
"Use dedicated barrier kernels (1) to eliminate contention or inline barriers (0) for compatibility",
ucc_offsetof(ucc_tl_cuda_lib_config_t, nvls_dedicated_barriers),
UCC_CONFIG_TYPE_BOOL},
#endif

{"ALLTOALL_USE_COPY_ENGINE", "y",
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/cuda/tl_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ typedef struct ucc_tl_cuda_lib_config {
size_t nvls_symmetric_size; // Size of the symmetric memory for NVLS, for each task
uint32_t nvls_sm_count; // Number of blocks (SMs) to use for NVLS algorithms
uint32_t nvls_threads; // Number of threads per block to use for NVLS algorithms
int nvls_dedicated_barriers; // Use dedicated barrier kernels (1) or inline barriers (0)
#endif
int alltoall_use_copy_engine;
} ucc_tl_cuda_lib_config_t;
Expand Down