
TL/UCP: add a2a disp exchange in onesided a2av#1248

Open
wfaderhold21 wants to merge 3 commits into openucx:master from wfaderhold21:topic/a2av-fix

Conversation

@wfaderhold21
Collaborator

What

Adds an alltoall exchange of destination displacements prior to starting the onesided alltoallv.

Why?

Currently, using the onesided alltoallv requires the user to perform an alltoall operation to exchange destination displacements before issuing the onesided alltoallv. This is inconsistent with the two-sided alltoall algorithms, which need no such pre-exchange, and makes switching between the two awkward.
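For context, the pre-exchange the user previously had to perform is semantically an alltoall of one displacement per peer: after the exchange, rank i's entry j holds rank j's destination displacement for data arriving from rank i. A plain-C sketch simulating that semantics without MPI (GSIZE and all names here are illustrative, not UCC or MPI API):

```c
#include <assert.h>
#include <stdint.h>

#define GSIZE 4 /* illustrative group size */

/* Simulate an alltoall of one displacement per peer:
 * after the exchange, recv[i][j] holds rank j's displacement
 * assigned to data coming from rank i (recv[i][j] = send[j][i]). */
static void exchange_displs(const uint64_t send[GSIZE][GSIZE],
                            uint64_t recv[GSIZE][GSIZE])
{
    for (int i = 0; i < GSIZE; i++) {
        for (int j = 0; j < GSIZE; j++) {
            recv[i][j] = send[j][i];
        }
    }
}
```

This PR moves exactly this exchange inside UCC, so the user no longer has to run it by hand before every onesided alltoallv.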

@greptile-apps
Contributor

greptile-apps bot commented Dec 20, 2025

Greptile Summary

This PR removes the burden of a manual pre-exchange from users of the onesided alltoallv algorithm by incorporating a UCC-internal alltoall phase (via ucc_tl_ucp_alltoall_pairwise_init) that exchanges destination displacements before the data-transfer phase. The two phases are chained in a ucc_tl_ucp_schedule_t, with scratch memory allocated for the received displacements and freed in the schedule's custom finalize callback.

Key issues:

  • Critical – infinite loop on zero-count sends (alltoallv_onesided.c:47–69): The for-loop in ucc_tl_ucp_alltoallv_onesided_data_start uses task->onesided.put_posted < gsize as its exit condition. put_posted is incremented only inside ucc_tl_ucp_put_nb. When data_size == 0 the put is skipped, so put_posted never reaches gsize and the function spins forever. The fix is to drive the loop with an independent per-peer counter rather than put_posted.
  • The error-path bookkeeping (a2a_in_sched / data_in_sched flags, fall-through from free_tasks to free_schedule) correctly avoids double-frees.
  • The test change (test_alltoallv.cc) is clean: the now-redundant user-side MPI_Alltoall pre-exchange is removed.

Confidence Score: 1/5

  • Not safe to merge — the data-start function will loop forever whenever any peer has a zero send count.
  • The loop termination condition in ucc_tl_ucp_alltoallv_onesided_data_start is tied exclusively to put_posted, which is only incremented by ucc_tl_ucp_put_nb. Because put_nb is now skipped for zero-count sends, the start function never exits if any rank has even one zero-count entry, making this a reliable hang in any heterogeneous-count workload.
  • src/components/tl/ucp/alltoallv/alltoallv_onesided.c — specifically the loop in ucc_tl_ucp_alltoallv_onesided_data_start

Important Files Changed

  • src/components/tl/ucp/alltoallv/alltoallv_onesided.c — Core change: wraps the onesided alltoallv in a two-task schedule that first exchanges destination displacements via alltoall, then performs the data put. Contains a critical loop-termination bug: when any peer's send count is zero, put_posted never reaches gsize because put_nb is skipped for that peer, causing an infinite spin in the start function.
  • test/mpi/test_alltoallv.cc — Correctly removes the user-side MPI_Alltoall pre-exchange of destination displacements since UCC now performs that exchange internally; no issues found.

Comments Outside Diff (1)

  1. src/components/tl/ucp/alltoallv/alltoallv_onesided.c, line 47-69 (link)

    Infinite loop when any peer has zero send count

    The loop termination condition task->onesided.put_posted < gsize relies on put_posted being incremented once per peer iteration. put_posted is incremented exclusively inside ucc_tl_ucp_put_nb (confirmed at tl_ucp_sendrecv.h:616). Because the new code skips put_nb when data_size == 0, put_posted never advances for zero-count peers, and the loop runs forever.

    Before this PR, put_nb was called unconditionally for every peer (even for a zero-byte transfer), so put_posted reliably reached gsize and the loop always terminated. The new conditional skip breaks that invariant.

    To fix, track the loop iteration count separately so the termination condition is independent of how many actual puts were posted:

        ucc_rank_t         n_done;
    
        ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
    
        recv_displs = (ucc_aint_t *)sched->scratch_mc_header->addr;
    
        for (n_done = 0, peer = (grank + 1) % gsize;
             n_done < gsize;
             n_done++, peer = (peer + 1) % gsize) {
            sd_disp   = ucc_coll_args_get_displacement(&TASK_ARGS(task), s_disp, peer) * sdt_size;
            data_size = ucc_coll_args_get_count(&TASK_ARGS(task), s_counts, peer) * sdt_size;
    
            if (data_size > 0) {
                recv_displ = ucc_coll_args_get_displacement(&TASK_ARGS(task), recv_displs, peer);
                dd_disp    = recv_displ * rdt_size;
                UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
                                                PTR_OFFSET(dest, dd_disp),
                                                data_size, mtype, peer, src_memh,
                                                dst_memh, team, task),
                              task, out);
                UCPCHECK_GOTO(ucc_tl_ucp_ep_flush(peer, team, task), task, out);
            }
            UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, dst_memh, team),
                          task, out);
        }

    With the put_posted-based termination left in place, the loop is guaranteed to run indefinitely any time the alltoallv has even one zero-length send slot.
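    The pattern of the suggested fix, distilled: drive the loop with an independent iteration counter so termination never depends on how many puts were actually posted. A standalone plain-C sketch (names are illustrative, not the actual UCC code):

```c
#include <assert.h>
#include <stddef.h>

/* Visit every peer exactly once, skipping zero-count sends, and
 * terminate on the iteration counter n_done rather than on the
 * number of posted puts. Returns how many puts would be posted. */
static size_t count_posted_puts(const size_t *counts, size_t gsize)
{
    size_t posted = 0;
    size_t n_done;
    size_t peer;

    for (n_done = 0, peer = 1 % gsize; n_done < gsize;
         n_done++, peer = (peer + 1) % gsize) {
        if (counts[peer] > 0) {
            posted++; /* zero-count peers are skipped but still counted done */
        }
    }
    return posted;
}
```

The loop always executes exactly gsize iterations regardless of how many counts are zero, which is the invariant the put_posted-based condition loses.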

Last reviewed commit: 2b55466

@wfaderhold21
Collaborator Author

/build

Comment on lines +157 to +163
if (UCC_COLL_ARGS_DISPL64(&coll_args->args)) {
    displ_size = sizeof(uint64_t);
    dt         = UCC_DT_INT64;
} else {
    displ_size = sizeof(uint32_t);
    dt         = UCC_DT_INT32;
}
Contributor


This assumes the displacement type and receive datatype are the same on all ranks, which might not be the case.

Collaborator Author


Just to be clear: is it possible for multiple ranks to launch the collective with different-sized displacements (e.g., rank 0 uses 64-bit displacements while rank 1 uses 32-bit displacements)?

Comment on lines +187 to +198
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type = UCC_COLL_TYPE_ALLTOALL;
bargs.args.mask = 0;
bargs.args.flags = 0;
bargs.args.src.info.buffer = (void *)coll_args->args.dst.info_v.displacements;
bargs.args.src.info.count = gsize;
bargs.args.src.info.datatype = dt;
bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
bargs.args.dst.info.buffer = (void *)sched->scratch_mc_header->addr;
bargs.args.dst.info.count = gsize;
bargs.args.dst.info.datatype = dt;
bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
Contributor


Minor: for persistent collectives we could do this operation once and reuse the result.


/* exchange displacements with alltoall */
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type = UCC_COLL_TYPE_ALLTOALL;
Contributor


Is it OK to use a two-sided alltoall inside a onesided algorithm?

Contributor

@greptile-apps bot left a comment


2 files reviewed, 1 comment


Comment on lines +163 to +167
status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
if (ucc_unlikely(status != UCC_OK)) {
goto out;
}
schedule = &sched->super.super;
Contributor


sched->scratch_mc_header is never initialized to NULL after ucc_tl_ucp_get_schedule, which allocates via ucc_mpool_get (an unzeroed allocator). If ucc_mc_alloc fails at line 175 and goto free_schedule is taken, the check at line 238 (if (sched->scratch_mc_header)) reads an uninitialized garbage pointer, leading to undefined behavior when ucc_mc_free is called on invalid memory.

Fix: Explicitly initialize the field immediately after obtaining the schedule:

Suggested change

    /* before */
    status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
    if (ucc_unlikely(status != UCC_OK)) {
        goto out;
    }
    schedule = &sched->super.super;

    /* after */
    status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
    if (ucc_unlikely(status != UCC_OK)) {
        goto out;
    }
    sched->scratch_mc_header = NULL;
    schedule = &sched->super.super;
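The hazard here is generic to unzeroed pool allocators: a freshly obtained object carries whatever bytes its previous user left behind. A plain-C sketch of why the explicit NULL assignment matters (all names are illustrative stand-ins, not the UCC mpool API):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical pooled object, analogous to ucc_tl_ucp_schedule_t. */
typedef struct {
    void *scratch_mc_header;
} pooled_sched_t;

/* A single-slot "pool" whose get() does not zero returned objects,
 * mimicking an mpool-style allocator that reuses backing storage. */
static pooled_sched_t pool_storage;

static pooled_sched_t *pool_get(void)
{
    return &pool_storage; /* previous contents left intact */
}

/* Safe pattern: initialize the field immediately after get, so an
 * error path that tests it never reads stale garbage. */
static pooled_sched_t *get_sched_safe(void)
{
    pooled_sched_t *s = pool_get();
    s->scratch_mc_header = NULL;
    return s;
}
```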

Comment on lines +166 to +175
status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
if (ucc_unlikely(status != UCC_OK)) {
goto out;
}
sched->scratch_mc_header = NULL;
schedule = &sched->super.super;
ucc_schedule_init(schedule, coll_args, team);
schedule->super.post = ucc_tl_ucp_alltoallv_onesided_sched_start;
schedule->super.progress = NULL;
schedule->super.finalize = ucc_tl_ucp_alltoallv_onesided_sched_finalize;
Contributor


Redundant double call to ucc_schedule_init

ucc_tl_ucp_get_schedule already calls ucc_schedule_init internally as its last step (see tl_ucp_task.h). The second call to ucc_schedule_init(schedule, coll_args, team) at line 172 re-runs ucc_coll_task_init, resets n_tasks = 0, and re-initialises all callback pointers — all of which are immediately overwritten on the next lines anyway. Since the schedule was just freshly allocated from the mpool this is harmless, but it is redundant and may confuse future readers.

Suggested change

    /* before */
    status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
    if (ucc_unlikely(status != UCC_OK)) {
        goto out;
    }
    sched->scratch_mc_header = NULL;
    schedule = &sched->super.super;
    ucc_schedule_init(schedule, coll_args, team);
    schedule->super.post = ucc_tl_ucp_alltoallv_onesided_sched_start;
    schedule->super.progress = NULL;
    schedule->super.finalize = ucc_tl_ucp_alltoallv_onesided_sched_finalize;

    /* after */
    status = ucc_tl_ucp_get_schedule(tl_team, coll_args, &sched);
    if (ucc_unlikely(status != UCC_OK)) {
        goto out;
    }
    sched->scratch_mc_header = NULL;
    schedule = &sched->super.super;
    schedule->super.post = ucc_tl_ucp_alltoallv_onesided_sched_start;
    schedule->super.progress = NULL;
    schedule->super.finalize = ucc_tl_ucp_alltoallv_onesided_sched_finalize;

Comment on lines +186 to +201
/* exchange displacements with pairwise alltoall */
memset(&bargs, 0, sizeof(bargs));
bargs.args.coll_type = UCC_COLL_TYPE_ALLTOALL;
bargs.args.mask = 0;
bargs.args.flags = 0;
bargs.args.src.info.buffer = (void *)coll_args->args.dst.info_v.displacements;
bargs.args.src.info.count = gsize;
bargs.args.src.info.datatype = dt;
bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
bargs.args.dst.info.buffer = (void *)sched->scratch_mc_header->addr;
bargs.args.dst.info.count = gsize;
bargs.args.dst.info.datatype = dt;
bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;

bargs.team = team->params.team;
status = ucc_tl_ucp_alltoall_pairwise_init(&bargs, team, &a2a_task);
Contributor


Alltoall count semantics: total buffer count vs per-peer count

The alltoall exchange is intended to let each rank learn what destination displacement each peer has assigned to it. In MPI terms this is MPI_Alltoall(dst_displs, 1, dt, recv_displs, 1, dt, comm) — each rank sends one element to every other rank. The original test code confirmed this:

MPI_Alltoall(args.dst.info_v.displacements, 1, datatype, ldisp, 1, datatype, team.comm);

In UCC alltoall, src.info.count is the total number of elements in the send buffer, and each peer receives count / gsize elements (as seen in alltoall_onesided.c:146-147: nelems = TASK_ARGS(task).src.info.count; nelems = (nelems / gsize) * ...).

With count = gsize, each peer correctly receives gsize / gsize = 1 element, so the exchange logic itself is fine. However, this deserves a comment clarifying that count is the total buffer count (not per-peer), since the UCC alltoall API is easy to misread here:

Suggested change

    /* before */
    /* exchange displacements with pairwise alltoall */
    memset(&bargs, 0, sizeof(bargs));
    bargs.args.coll_type         = UCC_COLL_TYPE_ALLTOALL;
    bargs.args.mask              = 0;
    bargs.args.flags             = 0;
    bargs.args.src.info.buffer   = (void *)coll_args->args.dst.info_v.displacements;
    bargs.args.src.info.count    = gsize;
    bargs.args.src.info.datatype = dt;
    bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
    bargs.args.dst.info.buffer   = (void *)sched->scratch_mc_header->addr;
    bargs.args.dst.info.count    = gsize;
    bargs.args.dst.info.datatype = dt;
    bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
    bargs.team = team->params.team;
    status = ucc_tl_ucp_alltoall_pairwise_init(&bargs, team, &a2a_task);

    /* after (add clarifying comments) */
    bargs.args.src.info.buffer   = (void *)coll_args->args.dst.info_v.displacements;
    /* count is total buffer count; each peer receives count/gsize = 1 displacement */
    bargs.args.src.info.count    = gsize;
    bargs.args.src.info.datatype = dt;
    bargs.args.src.info.mem_type = UCC_MEMORY_TYPE_HOST;
    bargs.args.dst.info.buffer   = (void *)sched->scratch_mc_header->addr;
    /* count is total buffer count; each peer receives count/gsize = 1 displacement */
    bargs.args.dst.info.count    = gsize;
    bargs.args.dst.info.datatype = dt;
    bargs.args.dst.info.mem_type = UCC_MEMORY_TYPE_HOST;
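The count arithmetic the comment relies on can be stated as a tiny sketch (illustrative helpers mirroring the nelems / gsize logic cited from alltoall_onesided.c, not actual UCC API):

```c
#include <assert.h>
#include <stddef.h>

/* In UCC alltoall, src.info.count is the TOTAL number of elements in
 * the send buffer; each peer's slice is count / gsize elements. */
static size_t per_peer_count(size_t total_count, size_t gsize)
{
    return total_count / gsize;
}

/* Element offset of a given peer's slice within the buffer. */
static size_t peer_offset(size_t total_count, size_t gsize, size_t peer)
{
    return per_peer_count(total_count, gsize) * peer;
}
```

With total_count = gsize, per_peer_count is 1, which is exactly the one-displacement-per-peer exchange the PR intends.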

Comment on lines +233 to +239
free_tasks:
if (data_task) {
ucc_tl_ucp_put_task(data_task);
}
if (a2a_task && a2a_task->finalize) {
a2a_task->finalize(a2a_task);
}
Contributor


Potential double-free of a2a_task after ucc_schedule_add_task succeeds

If ucc_schedule_add_task(schedule, a2a_task) at line 218 succeeds and the task is enqueued in the schedule, but then a subsequent UCC_CHECK_GOTO fails, execution falls into free_tasks, which calls a2a_task->finalize(a2a_task). Later, free_schedule calls ucc_tl_ucp_put_schedule, which may also clean up the tasks already registered with the schedule.

For comparison, alltoall_onesided.c does not separately finalize tasks in its error path — it relies entirely on ucc_tl_ucp_put_schedule to reclaim all task memory. The same pattern should be used here to avoid a double-free.

If ucc_schedule_add_task is designed such that ownership of the task passes to the schedule, the free_tasks cleanup should be conditioned on whether the task was added to the schedule or not:

free_tasks:
    /* Only free tasks that were NOT already added to the schedule */
    if (data_task && /* not yet added */ ) {
        ucc_tl_ucp_put_task(data_task);
    }
    if (a2a_task && a2a_task->finalize && /* not yet added */ ) {
        a2a_task->finalize(a2a_task);
    }
free_schedule:
    ...
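One way to realize the /* not yet added */ conditions above is the flag bookkeeping the review summary mentions (a2a_in_sched / data_in_sched). A standalone sketch, with all names illustrative rather than the actual UCC code:

```c
#include <assert.h>
#include <stdbool.h>

/* Track whether each task's ownership has passed to the schedule; on
 * the error path, free only tasks the schedule does not yet own, so
 * the schedule's own teardown cannot double-free them. */
typedef struct {
    bool a2a_in_sched;
    bool data_in_sched;
} task_ownership_t;

/* Returns how many tasks the error path must free directly
 * (i.e., created but not yet handed to the schedule). */
static int tasks_to_free_directly(const task_ownership_t *own,
                                  bool a2a_created, bool data_created)
{
    int n = 0;
    if (data_created && !own->data_in_sched) {
        n++; /* would call ucc_tl_ucp_put_task(data_task) */
    }
    if (a2a_created && !own->a2a_in_sched) {
        n++; /* would call a2a_task->finalize(a2a_task) */
    }
    return n;
}
```

Each flag is set immediately after the corresponding ucc_schedule_add_task succeeds, so at every failure point the error path knows exactly which tasks it still owns.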

