TL/UCP: add a2a disp exchange in onesided a2av #1248
wfaderhold21 wants to merge 3 commits into openucx:master
Conversation
| Filename | Overview |
|---|---|
| src/components/tl/ucp/alltoallv/alltoallv_onesided.c | Core change: wraps the onesided alltoallv in a two-task schedule that first exchanges destination displacements via alltoall, then performs the data put. Contains a critical loop-termination bug: when any peer's send count is zero, put_posted never reaches gsize because put_nb is skipped for that peer, causing an infinite spin in the start function. |
| test/mpi/test_alltoallv.cc | Correctly removes the user-side MPI_Alltoall pre-exchange of destination displacements since UCC now performs that exchange internally; no issues found. |
Comments Outside Diff (1)
src/components/tl/ucp/alltoallv/alltoallv_onesided.c, lines 47-69: Infinite loop when any peer has zero send count

The loop termination condition `task->onesided.put_posted < gsize` relies on `put_posted` being incremented once per peer iteration. `put_posted` is incremented exclusively inside `ucc_tl_ucp_put_nb` (confirmed at tl_ucp_sendrecv.h:616). Because the new code skips `put_nb` when `data_size == 0`, `put_posted` never advances for zero-count peers, and the loop runs forever.

Before this PR, `put_nb` was called unconditionally for every peer (even for a zero-byte transfer), so `put_posted` reliably reached `gsize` and the loop always terminated. The new conditional skip breaks that invariant.

To fix, track the loop iteration count separately so the termination condition is independent of how many actual puts were posted:

```c
ucc_rank_t n_done;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
recv_displs = (ucc_aint_t *)sched->scratch_mc_header->addr;
for (n_done = 0, peer = (grank + 1) % gsize; n_done < gsize;
     n_done++, peer = (peer + 1) % gsize) {
    sd_disp   = ucc_coll_args_get_displacement(&TASK_ARGS(task),
                                               s_disp, peer) * sdt_size;
    data_size = ucc_coll_args_get_count(&TASK_ARGS(task),
                                        s_counts, peer) * sdt_size;
    if (data_size > 0) {
        recv_displ = ucc_coll_args_get_displacement(&TASK_ARGS(task),
                                                    recv_displs, peer);
        dd_disp    = recv_displ * rdt_size;
        UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
                                        PTR_OFFSET(dest, dd_disp),
                                        data_size, mtype, peer, src_memh,
                                        dst_memh, team, task),
                      task, out);
        UCPCHECK_GOTO(ucc_tl_ucp_ep_flush(peer, team, task), task, out);
    }
    UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, dst_memh, team),
                  task, out);
}
```

With the current `put_posted`-based termination, the loop is guaranteed to spin indefinitely any time the alltoallv has even one zero-length send slot.
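The counter-based termination pattern suggested above can be checked in isolation. The sketch below is illustrative only: `walk_peers` and its signature are hypothetical stand-ins, not UCC API, and the real put/flush/atomic calls are reduced to a counter.

```c
#include <stddef.h>

typedef unsigned ucc_rank_t;

/* Visit every peer exactly once, starting after grank, counting
 * iterations in n_done so termination never depends on how many puts
 * were actually posted. Returns the number of puts that would have
 * been posted (zero-count peers are skipped, as in the fixed loop). */
static ucc_rank_t walk_peers(ucc_rank_t grank, ucc_rank_t gsize,
                             const size_t *send_counts)
{
    ucc_rank_t n_done, peer, put_posted = 0;

    for (n_done = 0, peer = (grank + 1) % gsize; n_done < gsize;
         n_done++, peer = (peer + 1) % gsize) {
        if (send_counts[peer] > 0) {
            put_posted++; /* real code: ucc_tl_ucp_put_nb + ep_flush */
        }
        /* real code: atomic_inc of the peer's pSync runs unconditionally */
    }
    return put_posted;
}
```

Even with all send counts zero, `n_done` reaches `gsize` and the loop exits, which is exactly the invariant the `put_posted`-based condition lost.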
Last reviewed commit: 2b55466
/build
```c
if (UCC_COLL_ARGS_DISPL64(&coll_args->args)) {
    displ_size = sizeof(uint64_t);
    dt         = UCC_DT_INT64;
} else {
    displ_size = sizeof(uint32_t);
    dt         = UCC_DT_INT32;
}
```
This assumes the displacement type and receive datatype are the same on all ranks, which might not be the case.
Just to be clear: could multiple ranks launch the collective with different-sized displacements (e.g., rank 0 uses 64-bit displacements while rank 1 uses 32-bit displacements)?
```c
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type         = UCC_COLL_TYPE_ALLTOALL;
bargs.args.mask              = 0;
bargs.args.flags             = 0;
bargs.args.src.info.buffer   = (void *)coll_args->args.dst.info_v.displacements;
bargs.args.src.info.count    = gsize;
bargs.args.src.info.datatype = dt;
bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
bargs.args.dst.info.buffer   = (void *)sched->scratch_mc_header->addr;
bargs.args.dst.info.count    = gsize;
bargs.args.dst.info.datatype = dt;
bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
```
Minor: for persistent collectives we could do this operation once and reuse the result.
```c
/* exchange displacements with alltoall */
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type = UCC_COLL_TYPE_ALLTOALL;
```
Is it OK to use a two-sided alltoall inside a onesided algorithm?
```c
status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
if (ucc_unlikely(status != UCC_OK)) {
    goto out;
}
schedule = &sched->super.super;
```
sched->scratch_mc_header is never initialized to NULL after ucc_tl_ucp_get_schedule, which allocates via ucc_mpool_get (an unzeroed allocator). If ucc_mc_alloc fails at line 175 and goto free_schedule is taken, the check at line 238 (if (sched->scratch_mc_header)) reads an uninitialized garbage pointer, leading to undefined behavior when ucc_mc_free is called on invalid memory.
Fix: Explicitly initialize the field immediately after obtaining the schedule:
```diff
 status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
 if (ucc_unlikely(status != UCC_OK)) {
     goto out;
 }
+sched->scratch_mc_header = NULL;
 schedule = &sched->super.super;
```
```c
status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
if (ucc_unlikely(status != UCC_OK)) {
    goto out;
}
sched->scratch_mc_header = NULL;
schedule = &sched->super.super;
ucc_schedule_init(schedule, coll_args, team);
schedule->super.post     = ucc_tl_ucp_alltoallv_onesided_sched_start;
schedule->super.progress = NULL;
schedule->super.finalize = ucc_tl_ucp_alltoallv_onesided_sched_finalize;
```
Redundant double call to ucc_schedule_init
ucc_tl_ucp_get_schedule already calls ucc_schedule_init internally as its last step (see tl_ucp_task.h). The second call to ucc_schedule_init(schedule, coll_args, team) at line 172 re-runs ucc_coll_task_init, resets n_tasks = 0, and re-initialises all callback pointers — all of which are immediately overwritten on the next lines anyway. Since the schedule was just freshly allocated from the mpool this is harmless, but it is redundant and may confuse future readers.
```diff
 status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
 if (ucc_unlikely(status != UCC_OK)) {
     goto out;
 }
 sched->scratch_mc_header = NULL;
 schedule = &sched->super.super;
-ucc_schedule_init(schedule, coll_args, team);
 schedule->super.post     = ucc_tl_ucp_alltoallv_onesided_sched_start;
 schedule->super.progress = NULL;
 schedule->super.finalize = ucc_tl_ucp_alltoallv_onesided_sched_finalize;
```
Force-pushed 3d953bb to 7ac23d8
```c
/* exchange displacements with pairwise alltoall */
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type         = UCC_COLL_TYPE_ALLTOALL;
bargs.args.mask              = 0;
bargs.args.flags             = 0;
bargs.args.src.info.buffer   = (void *)coll_args->args.dst.info_v.displacements;
bargs.args.src.info.count    = gsize;
bargs.args.src.info.datatype = dt;
bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
bargs.args.dst.info.buffer   = (void *)sched->scratch_mc_header->addr;
bargs.args.dst.info.count    = gsize;
bargs.args.dst.info.datatype = dt;
bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
bargs.team = team->params.team;
status = ucc_tl_ucp_alltoall_pairwise_init(&bargs, team, &a2a_task);
```
Alltoall count semantics: total buffer count vs per-peer count
The alltoall exchange is intended to let each rank learn what destination displacement each peer has assigned to it. In MPI terms this is MPI_Alltoall(dst_displs, 1, dt, recv_displs, 1, dt, comm) — each rank sends one element to every other rank. The original test code confirmed this:
`MPI_Alltoall(args.dst.info_v.displacements, 1, datatype, ldisp, 1, datatype, team.comm);`

In UCC alltoall, `src.info.count` is the total number of elements in the send buffer, and each peer receives `count / gsize` elements (as seen in alltoall_onesided.c:146-147: `nelems = TASK_ARGS(task).src.info.count; nelems = (nelems / gsize) * ...`).

With `count = gsize`, each peer correctly receives `gsize / gsize = 1` element, so the exchange logic itself is fine. However, this deserves a comment clarifying that `count` is the total buffer count (not per-peer), since the UCC alltoall API is easy to misread here:
```diff
 /* exchange displacements with pairwise alltoall */
 memset(&bargs, 0, sizeof(bargs));
 bargs.args.coll_type         = UCC_COLL_TYPE_ALLTOALL;
 bargs.args.mask              = 0;
 bargs.args.flags             = 0;
 bargs.args.src.info.buffer   = (void *)coll_args->args.dst.info_v.displacements;
+/* count is total buffer size; each peer receives count/gsize = 1 displacement */
 bargs.args.src.info.count    = gsize;
 bargs.args.src.info.datatype = dt;
 bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
 bargs.args.dst.info.buffer   = (void *)sched->scratch_mc_header->addr;
+/* count is total buffer size; each peer receives count/gsize = 1 displacement */
 bargs.args.dst.info.count    = gsize;
 bargs.args.dst.info.datatype = dt;
 bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
 bargs.team = team->params.team;
 status = ucc_tl_ucp_alltoall_pairwise_init(&bargs, team, &a2a_task);
```
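The count convention discussed above can be summarized in two tiny helpers. This is a sketch, not UCC API: `per_peer_elems` and `peer_offset` are hypothetical names that just restate the `count / gsize` arithmetic from alltoall_onesided.c.

```c
#include <stddef.h>

/* In UCC alltoall, src.info.count is the TOTAL element count of the
 * send buffer, so each peer receives count / gsize elements. */
static size_t per_peer_elems(size_t total_count, size_t gsize)
{
    return total_count / gsize;
}

/* Byte offset of the slice destined for `peer` within the send buffer. */
static size_t peer_offset(size_t total_count, size_t gsize,
                          size_t dt_size, size_t peer)
{
    return per_peer_elems(total_count, gsize) * dt_size * peer;
}
```

With `total_count = gsize` (the displacement exchange above), each peer's slice is exactly one element, matching the MPI `sendcount = 1` semantics of the removed user-side `MPI_Alltoall`.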
```c
free_tasks:
    if (data_task) {
        ucc_tl_ucp_put_task(data_task);
    }
    if (a2a_task && a2a_task->finalize) {
        a2a_task->finalize(a2a_task);
    }
```
Potential double-free of a2a_task after ucc_schedule_add_task succeeds
If ucc_schedule_add_task(schedule, a2a_task) at line 218 succeeds and the task is enqueued in the schedule, but then a subsequent UCC_CHECK_GOTO fails, execution falls into free_tasks, which calls a2a_task->finalize(a2a_task). Later, free_schedule calls ucc_tl_ucp_put_schedule, which may also clean up the tasks already registered with the schedule.
For comparison, alltoall_onesided.c does not separately finalize tasks in its error path — it relies entirely on ucc_tl_ucp_put_schedule to reclaim all task memory. The same pattern should be used here to avoid a double-free.
If ucc_schedule_add_task is designed such that ownership of the task passes to the schedule, the free_tasks cleanup should be conditioned on whether the task was added to the schedule or not:
```c
free_tasks:
    /* Only free tasks that were NOT already added to the schedule */
    if (data_task && /* not yet added */) {
        ucc_tl_ucp_put_task(data_task);
    }
    if (a2a_task && a2a_task->finalize && /* not yet added */) {
        a2a_task->finalize(a2a_task);
    }
free_schedule:
    ...
```
What
Adds an alltoall exchange of destination displacements prior to starting onesided alltoallv
Why?
Currently, using the onesided alltoallv requires the user to perform an alltoall operation to exchange the destination displacements before issuing the onesided alltoallv. This is inconsistent with the two-sided algorithms, which need no such pre-exchange, and makes switching between two-sided and onesided alltoallv algorithms awkward.
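The pre-exchange this PR internalizes has simple semantics: each rank sends element `p` of its destination-displacement array to peer `p`, so after the exchange rank `r` knows the displacement peer `p` assigned to it. A standalone sketch of those semantics, assuming row-major `dst_displs[rank][peer]` arrays (names and layout are illustrative, not UCC or MPI code):

```c
#include <stddef.h>

/* Simulate the alltoall of destination displacements across all ranks:
 * recv_displs[r][p] ends up equal to dst_displs[p][r], i.e. rank r
 * learns where peer p wants rank r's data placed. Both arrays are
 * gsize x gsize, flattened row-major. */
static void exchange_displs(size_t gsize, const long *dst_displs,
                            long *recv_displs)
{
    for (size_t r = 0; r < gsize; r++) {
        for (size_t p = 0; p < gsize; p++) {
            /* alltoall: rank r receives element r of peer p's send buffer */
            recv_displs[r * gsize + p] = dst_displs[p * gsize + r];
        }
    }
}
```

Previously the user ran the equivalent `MPI_Alltoall(dst_displs, 1, dt, recv_displs, 1, dt, comm)` on every rank; with this PR the a2a task in the schedule performs the same exchange before the put task starts.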