gin/efa_gda: implement PutValue via shared sc_endpoint slot pool #5

Open
anshumang wants to merge 7 commits into amazon-contributing:master from anshumang:efa_gdaki_putvalue

@anshumang (Collaborator) commented May 22, 2026

Description

Replaces the no-op PutValue stub in the EFA GDA backend with a real
implementation.

EFA RDMA_WRITE doesn't support inline data — efa-dp-direct's
wr_set_inline_data is wired only for the SEND opcode — so the 4/8-byte
srcVal must be staged in a registered source buffer before being written
to the peer.

Routing (mirrors Put)

The PutValue WQE rides on the same endpoint that Put would use given the
caller's signal request:

- `signal != NONE`  → `sc_endpoints[signalId]` (via `signal_handles[]`)
- `signal == NONE`  → data endpoint

The arrival of the value RDMA_WRITE at the receiver bumps the
FI_REMOTE_WRITE counter on the chosen sc_endpoint — i.e. value and
signal in a single WQE. No second WQE, no ATOMIC_FA, no inter-WR
ordering dependency from SRD.
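
The routing rule above can be sketched on the host as a small selector. This is a minimal illustration rather than the PR's kernel code: `Endpoint`, `DevHandle`, `SignalType`, and `select_putvalue_endpoint` are hypothetical stand-ins for the device handle types, and `qp_id` only exists to make the chosen endpoint observable.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the device-side types; names are illustrative.
enum class SignalType { None, Indexed };

struct Endpoint {
    int qp_id;  // identifies the queue pair this endpoint posts on
};

struct DevHandle {
    Endpoint data;                         // data endpoint
    std::vector<Endpoint> signal_handles;  // signal view of each sc_endpoint
};

// Pick the endpoint a PutValue WQE rides on: the signal's sc_endpoint when a
// signal is requested, otherwise the data endpoint.
inline const Endpoint& select_putvalue_endpoint(const DevHandle& dev,
                                                SignalType type,
                                                uint32_t signal_id) {
    if (type == SignalType::None) {
        return dev.data;
    }
    assert(signal_id < dev.signal_handles.size());
    return dev.signal_handles[signal_id];
}
```

Because the value write lands on the signal's own endpoint, the receiver-side counter tick and the data arrival come from the same WQE, which is why no second post is needed.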

Plumbing

  • Shared source slot pool, sliced per endpoint. The plugin allocates
    one contiguous GPU-VMM region (FI_HMEM_CUDA / FI_MR_DMABUF) sized to
    data.sq_size + Σ sc_endpoints[i].sq_size. Each endpoint owns its
    own slice; the slice base lives on the endpoint's GPU-resident
    nccl_ofi_gin_gdaki_dev_endpoint_handle (next to qp / cq /
    sq_lock / sq_size). Single MR over the whole pool, single lkey.

  • Slot claim and post. PutValue selects the endpoint by signal
    type, acquires that endpoint's sq_lock, runs the SQ-overflow
    backpressure check on that endpoint
    (submitted_count - *local_cntr_value + 1 <= sq_size), claims slot
    (submitted_count % sq_size) inside the endpoint's slice, stores
    srcVal (with an optional __threadfence_system when the
    caller's release scope is narrower than thread_scope_system), and
    posts a single-WR RDMA_WRITE from the slot to
    dstWin->peers[peer].remote_addr + dstOff using the chosen
    endpoint's address materials.

  • Slot reuse safety. A slot can only be reclaimed once that
    endpoint's FI_WRITE counter advances past its WR — guaranteed by
    the SQ-overflow check that's already used for Put on the same
    endpoint.
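
The slot arithmetic and the backpressure predicate described above can be written out as two small helpers. This is a host-side sketch under stated assumptions: the function names are hypothetical, and `completed_count` stands for the value read through `local_cntr_value`.

```cpp
#include <cassert>
#include <cstdint>

// Backpressure predicate quoted in the description: one more WR fits only if
// the in-flight count plus one does not exceed the SQ depth.
inline bool sq_has_room(uint64_t submitted_count, uint64_t completed_count,
                        uint32_t sq_size) {
    return submitted_count - completed_count + 1 <= sq_size;
}

// Address of the staging slot for the next WR inside one endpoint's slice.
inline uint64_t putvalue_slot_addr(uint64_t slice_base,
                                   uint64_t submitted_count,
                                   uint32_t sq_size, uint32_t slot_size) {
    uint64_t slot_idx = submitted_count % sq_size;
    return slice_base + slot_idx * slot_size;
}
```

With `sq_size = 4`, the fifth post (`submitted_count = 4`) maps back to slot 0, and the predicate only admits it once at least one completion has been counted, so the slot is free again before it is overwritten.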

Signal contract

Mirrors Put's contract on this backend:

- **INDEXED signals only.** `NCCL_GIN_SIGNAL_TYPE_VA` is asserted
  out (would require a dedicated VA-signal posting path that
  doesn't exist on EFA today).
- **Inc-by-1 only.** The `FI_REMOTE_WRITE` counter ticks once per
  inbound write; `signalOpArg > 1` is asserted out (can be relaxed
  to silently accept later if symmetry with `Put` is preferred).
- `hasDescriptor` is not supported.
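
The contract above amounts to a simple predicate over the request. The sketch below returns a `bool` instead of asserting, and every name in it (`SignalRequest`, `signal_request_supported`, the enum values) is a hypothetical stand-in rather than the backend's actual API.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirror of the signal kinds discussed in the contract.
enum class SignalType { None, Indexed, Va };

struct SignalRequest {
    SignalType type;
    uint64_t op_arg;  // increment amount requested by the caller
};

// True when the request fits the contract: no descriptor, no VA signals,
// and an increment of at most 1 for indexed signals.
inline bool signal_request_supported(const SignalRequest& s,
                                     bool has_descriptor) {
    if (has_descriptor) return false;
    if (s.type == SignalType::None) return true;
    if (s.type == SignalType::Va) return false;
    return s.op_arg <= 1;
}
```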

Related changes

Changes & Impact

  • PutValue is now functional (was a no-op stub). Existing
    ncclGin_C callers that pass
    ncclGin_None{} for signal continue to work; the new
    ncclGin_SignalInc{signalId} form also works.
  • No additional HW counters consumed beyond what Put already
    uses (the dedicated PutValue endpoint that earlier iterations
    carried has been removed).
  • Layout-pinned header. nccl_ofi_gin_gdaki_dev.h adds
    putvalue_slice_base (uint64_t) to
    nccl_ofi_gin_gdaki_dev_endpoint_handle, plus putvalue_lkey
    (uint32_t) and putvalue_slot_size (uint32_t) on
    nccl_ofi_gin_gdaki_dev_handle. The aws-ofi-nccl plugin's matching
    header must update in lockstep — partial deployments will misread
    the device handle.
  • No ABI change to ncclGinApi_*.
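
One way to catch a lockstep violation at compile time is to pin the new fields' offsets with `static_assert` in both copies of the header. The struct below is a trimmed, hypothetical fragment: only the `putvalue_lkey` and `putvalue_slot_size` names and their `uint32_t` types come from the description, and the real handle carries other fields before them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Hypothetical fragment holding only the two new dev-handle fields.
struct PutValueHandleTail {
    uint32_t putvalue_lkey;
    uint32_t putvalue_slot_size;
};

// Each side of the shared layout would carry the same checks, so a one-sided
// edit fails to compile instead of misreading the handle at run time.
static_assert(std::is_standard_layout<PutValueHandleTail>::value,
              "handle must be standard layout for offsetof to be meaningful");
static_assert(offsetof(PutValueHandleTail, putvalue_lkey) == 0,
              "putvalue_lkey moved");
static_assert(offsetof(PutValueHandleTail, putvalue_slot_size) == 4,
              "putvalue_slot_size moved");
static_assert(sizeof(PutValueHandleTail) == 8, "handle tail size changed");
```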

Commits

  • f5f70f3 nccl_device/efa_gda: implement PutValue via shared sc_endpoint slot pool
    — the kernel-side implementation.
  • 06c99ff tests/efa_gda: add gin_putvalue_gpu standalone GPU test
    — two-phase functional test covering both no-signal and signal
    routing paths.

Validation

tests/efa_gda/gin_putvalue_gpu covers both routing paths:

  • Phase 1 (no-signal): rank 0 calls
    gin.putValue<int>(team, peer=1, dstWin, dstOff, value, ncclGin_None{}, ncclCoopThread()).
    WQE rides on the data endpoint. Rank 1 polls the destination memory
    directly. PASS.
  • Phase 2 (signal): rank 0 calls the same with
    ncclGin_SignalInc{signalId=0}. WQE rides on signal_handles[0]'s
    sc_endpoint. Rank 1 calls gin.waitSignal(coop, signalId=0, least=1)
    and verifies the destination memory carries the new value.
    PASS.

The vendored efa-dp-direct CUDA device-function header
(efa_cuda_dp_impl.cuh) declared its functions as `__device__` (sometimes
with `inline`, sometimes not). When this header is included from
multiple translation units — which the GIN EFA-GDA backend does — the
linker either emits multiple-definition errors or silently picks one
copy depending on optimization flags.

Promote every __device__ function in the header to `__device__ static
inline`:

  - `static` forces internal linkage so each TU gets its own private
    copy and the linker never sees colliding global symbols.
  - `inline` keeps the standard "header-only function may be defined
    in multiple TUs" semantics.

Pure annotation change. No body, signature, or call-site changes.
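
As an illustration of the annotation pattern, here is a header-style helper written so it also compiles with a plain host compiler. The `SKETCH_DEVICE` macro and `wrap_index` function are hypothetical and are not taken from efa_cuda_dp_impl.cuh.

```cpp
#include <cassert>
#include <cstdint>

// Expands to __device__ under nvcc and to nothing on a host-only compiler.
#ifdef __CUDACC__
#define SKETCH_DEVICE __device__
#else
#define SKETCH_DEVICE
#endif

// `static` gives the function internal linkage: every translation unit that
// includes this header gets its own private copy, so no global symbol can
// collide at link time.
SKETCH_DEVICE static inline uint32_t wrap_index(uint32_t idx,
                                                uint32_t ring_size) {
    return idx % ring_size;
}
```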

Implement Put against the EFA-GDA data endpoint, using the GPU-resident
FI_WRITE counter the plugin populates on the device handle to back the
SQ-overflow check. Concurrent CTAs serialize their post sequence under
a per-QP spinlock; lanes within a warp targeting the same QP coalesce
into one batched post.

Components introduced:

  - gin_efa_gda_dev.h: mirror of the plugin-side device handle
    layout. Defines:
      * struct nccl_ofi_gin_dev_endpoint_handle { qp, cq, addressing,
        sq_lock, local_cntr_value, submitted_count, sq_size } -
        common per-endpoint state.
      * struct nccl_ofi_gin_gdaki_dev_handle { data, nranks, rank } -
        top-level handle returned via ncclGinCtx::handle.
      * struct nccl_ofi_gin_gdaki_mr_peer { remote_addr, rkey, pad }
        and nccl_ofi_gin_gdaki_mr_handle { lkey, nranks, local_addr,
        peers[] } - absolute-address MR layout. EFA's domain
        advertises FI_MR_VIRT_ADDR, so WQEs take absolute virtual
        addresses for both local and remote buffers; the kernel
        computes them from base + offset using these fields.

    Layout is shared with the plugin's mirror struct in
    include/rdma/gin/nccl_ofi_gin_gdaki_dev.h - keep them in sync.

  - Put template: posts an RDMA write WQE on the data endpoint when
    hasWins && bytes > 0.

      * Per-QP spinlock around start_sq_batch / sq_batch_place_wr /
        flush_sq_wrs - efa-dp-direct's batch sequence is single-
        threaded per QP, so concurrent CTAs targeting the same QP
        must serialize via atomicCAS on sq_lock + atomicExch on
        release.
      * Warp-cooperative aggregated post - threads in the same warp
        targeting the same QP (selected via __match_any_sync) form
        a group. The lowest lane is the leader: it acquires the
        lock, calls start_sq_batch with the group's lane count,
        every lane writes its WR in parallel via sq_batch_place_wr,
        then the leader rings the doorbell once via flush_sq_wrs.
        Threads on different QPs form independent groups.
      * SQ-overflow backpressure - efa_cuda_start_sq_batch is not a
        flow-control gate against the NIC consumer (its internal
        check gates against the per-batch staging buffer max_batch,
        not against the SQ ring). Without an outer spin a fast
        producer would lap the NIC and corrupt the ring. The leader
        spins on (submitted_count - *local_cntr_value + batch_size)
        <= sq_size before reserving SQ slots. The two checks
        compose: outer spin gates the SQ ring; start_sq_batch gates
        the staging buffer.
      * submitted_count bumped under sq_lock after flush_sq_wrs - the
        same lock that guards the post sequence keeps the counter's
        producer side single-writer.
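
The warp grouping step can be modelled on the host to make the leader and batch arithmetic concrete. This sketch emulates what `__match_any_sync` would return for one lane. Ranking lanes within the group by counting lower set bits is an assumption about how each lane picks its batch index, and `LaneRole` and `lane_role` are hypothetical names.

```cpp
#include <bitset>
#include <cassert>
#include <cstdint>

struct LaneRole {
    uint32_t same_qp_mask;  // lanes in this warp targeting the same QP
    int leader_lane;        // lowest lane in the group
    int batch_size;         // number of WRs the group posts together
    int my_idx;             // this lane's position inside the batch
};

// Host model of the grouping for one active lane of a 32-lane warp.
inline LaneRole lane_role(const uint64_t (&qp_key)[32], uint32_t active_mask,
                          int lane) {
    assert((active_mask >> lane) & 1u);  // the calling lane must be active
    uint32_t mask = 0;
    for (int l = 0; l < 32; ++l) {
        bool active = ((active_mask >> l) & 1u) != 0u;
        if (active && qp_key[l] == qp_key[lane]) mask |= (1u << l);
    }
    LaneRole r;
    r.same_qp_mask = mask;
    r.leader_lane = 0;
    while (((mask >> r.leader_lane) & 1u) == 0u) ++r.leader_lane;
    r.batch_size = static_cast<int>(std::bitset<32>(mask).count());
    uint32_t lower = mask & ((1u << lane) - 1u);  // group lanes below this one
    r.my_idx = static_cast<int>(std::bitset<32>(lower).count());
    return r;
}
```

With even lanes targeting one QP and odd lanes another, lane 5 lands in a 16-lane group led by lane 1 and takes batch index 2, so only lane 1 would take the lock and ring the doorbell for that group.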

The plugin gates the entire GDAKI plugin on HAVE_DECL_FI_EFA_GDA_OPS
&& HAVE_FI_EFA_COMP_CNTR, so when the kernel runs there is always a
valid local_cntr_value and a non-zero sq_size.

Implement ncclGinApi_Flush by spinning on the data endpoint's
GPU-resident FI_WRITE counter until the NIC has completed every
WR submitted on that QP.

  - Snapshot the target submitted_count under sq_lock. Put bumps
    submitted_count under the same lock after every flush_sq_wrs,
    so reading under the lock ensures the snapshot doesn't tear
    or race with a concurrent Put leader bumping the count.

  - Drop the lock before the spin. The wait depends only on the
    NIC bumping *local_cntr_value, which doesn't need sq_lock;
    holding the lock during the spin would needlessly block
    Put on the same QP.

  - Spin while *local_cntr_value < target, honoring abortFlag.

This relies on the data endpoint's local_cntr_value pointer always
being valid, which the plugin guarantees by gating the entire GDAKI
plugin on HAVE_DECL_FI_EFA_GDA_OPS && HAVE_FI_EFA_COMP_CNTR.
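
The three Flush steps can be modelled with `std::atomic` on the host. This is a sketch of the ordering only: `EndpointModel` and `flush_endpoint` are hypothetical names, and in the real backend the completion counter is advanced by the NIC rather than by host code.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct EndpointModel {
    std::atomic<uint32_t> sq_lock{0};
    uint64_t submitted_count = 0;               // guarded by sq_lock
    std::atomic<uint64_t> local_cntr_value{0};  // completion counter
};

// Returns true once every submitted WR has completed, false if aborted first.
inline bool flush_endpoint(EndpointModel& ep,
                           const std::atomic<bool>& abort_flag) {
    // 1. Snapshot the target under the lock so it cannot race with a poster.
    uint32_t expected = 0;
    while (!ep.sq_lock.compare_exchange_weak(expected, 1u,
                                             std::memory_order_acquire)) {
        expected = 0;
    }
    uint64_t target = ep.submitted_count;
    // 2. Drop the lock before waiting so posters are not blocked.
    ep.sq_lock.store(0u, std::memory_order_release);
    // 3. Spin until the counter catches up, honoring the abort flag.
    while (ep.local_cntr_value.load(std::memory_order_acquire) < target) {
        if (abort_flag.load(std::memory_order_relaxed)) return false;
    }
    return true;
}
```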

Extend the EFA-GDA backend with the per-request signal/counter
endpoint dispatch and the scratch-buffer signal-only Put path,
plus the Get/Reset Signal/Counter device APIs.

gin_efa_gda_dev.h:

  - struct nccl_ofi_gin_dev_counter_handle composes
    nccl_ofi_gin_dev_endpoint_handle (qp / cq / addressing /
    sq_lock / counter completion fields) and adds cntr_value
    pointing at the FI_WRITE counter (counter view) or
    FI_REMOTE_WRITE counter (signal view) of the same underlying
    sc_endpoint on the plugin side.

  - nccl_ofi_gin_gdaki_dev_handle gains:
      * counter_handles[nCounters], signal_handles[nSignals] -
        arrays of device counter handles. Both arrays index into
        the same underlying sc_endpoint; the array pointer is NULL
        when the corresponding count is zero.
      * scratch_lkey, scratch_local_addr, scratch_remote_addrs[],
        scratch_remote_rkeys[] - per-context signal-only scratch
        buffer set up by createContext (allocated on the proxy
        domain, allgathered per rank).

gin_efa_gda.h:

  - Put: split the post path into two patterns based on
      hasPayload    = hasWins && bytes > 0
      needsSignalEp = (signal.type != NONE) || hasCounter

      (a) Data put (hasPayload): RDMA write of the user payload.
          Routed through the signal/counter endpoint when
          needsSignalEp so the receiver's FI_REMOTE_WRITE on
          that endpoint fires on completion; otherwise routed
          through the data endpoint.
      (b) Signal-only (!hasPayload && needsSignalEp): 0-byte
          RDMA write into the peer's scratch buffer. The write
          event itself bumps the receiver's FI_REMOTE_WRITE
          counter on the signal endpoint.

    Endpoint dispatch picks signal_handles[signalId] when
    signal.type == INDEXED, otherwise counter_handles[counterId].
    Both arrays index into the same sc_endpoint, so the post
    lands on the same QP either way; we just dereference the
    array that matches what the caller asked for, which lets us
    tolerate the unused array being NULL.

    Per-QP spinlock, warp-cooperative aggregated post, and
    SQ-overflow backpressure apply uniformly to data and
    sc_endpoint paths.

  - Flush: drain three classes of endpoints —
      * dev->data
      * dev->signal_handles[0..nSignals)
      * dev->counter_handles[0..nCounters)
    For each, snapshot submitted_count under the endpoint's
    sq_lock (matching the lock Put bumps the counter under),
    then spin on *local_cntr_value until the firmware has caught
    up. The plugin allocates separate submitted_count fields for
    the signal and counter views of an sc_endpoint, so we wait
    on both to drain everything regardless of which view Put
    routed through. Honors abortFlag.

  - GetSignalPtr / GetCounterPtr return the cntr_value pointer
    from the corresponding handle array.

  - ResetSignal (for INDEXED signals only) / ResetCounter zero
    the cntr_value at the corresponding handle.
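
The Put post-path split described above reduces to two booleans. The sketch below classifies a request. The `PostPattern` enum and `classify_put` are hypothetical names, and the `Nothing` case (no payload and no signal or counter) is an assumption, since the commit message does not say what happens then.

```cpp
#include <cassert>
#include <cstddef>

enum class SignalType { None, Indexed };

enum class PostPattern {
    Nothing,         // assumed: nothing to post
    DataOnDataEp,    // (a) payload, no signal/counter requested
    DataOnSignalEp,  // (a) payload routed via the signal/counter endpoint
    SignalOnly       // (b) 0-byte write into the peer's scratch buffer
};

inline PostPattern classify_put(bool has_wins, std::size_t bytes,
                                SignalType signal_type, bool has_counter) {
    bool has_payload = has_wins && bytes > 0;
    bool needs_signal_ep = (signal_type != SignalType::None) || has_counter;
    if (has_payload) {
        return needs_signal_ep ? PostPattern::DataOnSignalEp
                               : PostPattern::DataOnDataEp;
    }
    return needs_signal_ep ? PostPattern::SignalOnly : PostPattern::Nothing;
}
```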

EFA's FI_REMOTE_WRITE counter increments by exactly 1 per remote
write, matching NCCL_GIN_SIGNAL_OP_INC. ADD with arg > 1 and
VA-typed signals are not supported by this backend; the unused
signalOp / signalOpArg parameters are documented in Put.

anshumang force-pushed the efa_gdaki_putvalue branch 2 times, most recently from fa66717 to 3f09fc6 May 28, 2026 01:56
anshumang added 2 commits May 29, 2026 23:42

Replace the no-op PutValue stub for the EFA_GDA backend with a real
implementation that stages the user value through a registered local
slot, then posts an RDMA_WRITE from the slot to the user's
destination MR. EFA RDMA_WRITE does not support inline data
(efa-dp-direct's wr_set_inline_data only supports SEND opcode), so
direct in-WQE delivery is not possible.

Endpoint routing (mirrors Put):
  - signal != NONE  -> sc_endpoints[signalId] (signal_handles[signalId])
  - signal == NONE  -> data endpoint
The PutValue WQE rides on the same QP that would carry a Put with the
same signal request, so the arrival of the value RDMA_WRITE at the
receiver bumps the FI_REMOTE_WRITE counter on the chosen sc_endpoint.
Value-and-signal land in a single WQE; no second WQE is needed and
there is no inter-WR ordering dependency from SRD.

Source slot pool plumbing (paired plugin commit
"gin/gdaki: PutValue source slot pool, shared across endpoints"):
  - dev->putvalue_lkey, putvalue_slot_size identify the registered
    GPU pool (FI_HMEM_CUDA / FI_MR_DMABUF, single MR over the whole
    pool).
  - Each endpoint's slice base lives on its own
    nccl_ofi_gin_gdaki_dev_endpoint_handle, next to qp / cq /
    sq_lock / sq_size. The kernel reads slice_base from the same
    endpoint struct it already selects for QP / sq_lock / etc. —
    no parallel arrays, no separate index. Slice size is implied
    by ep.sq_size; per-call slot byte offset is
        (ep.submitted_count % ep.sq_size) * dev->putvalue_slot_size,
    added to ep.putvalue_slice_base.
  - Per-endpoint slot reuse uses each endpoint's existing
    (submitted_count - *local_cntr_value) backpressure — no shared
    allocator, no global atomic in the hot path.

Kernel sequence (thread rank 0 of the coop does all the work,
bracketed by coop.sync()):
  - Pick endpoint based on signal.type. Read qp / sq_lock /
    submitted_count_ptr / local_cntr_ptr / sq_size_val / slice_base
    from the chosen endpoint handle.
  - Acquire the chosen endpoint's sq_lock.
  - SQ-overflow backpressure spin on (submitted_count - *local_cntr_value).
  - Claim slot (submitted_count % sq_size_val), store srcVal into
    slice_base + slot_idx * dev->putvalue_slot_size.
  - If the caller's given scope is narrower than the required scope and
    the required scope is thread_scope_system, issue
    __threadfence_system().
  - Build a single-WR RDMA_WRITE from the slot to
    dstWin->peers[peer].remote_addr + dstOff with the chosen
    endpoint's address materials, post via the standard
    start/place/flush sequence, bump submitted_count, release the
    sq_lock.
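
The fence decision in the sequence above is a two-part condition. The sketch below uses a hypothetical `Scope` enum ordered from narrowest to widest. It does not reuse the numeric values of libcu++'s `cuda::thread_scope`, and `needs_system_fence` is an illustrative name.

```cpp
#include <cassert>

// Hypothetical scope ordering, narrowest to widest.
enum class Scope : int { Thread = 0, Block = 1, Device = 2, System = 3 };

// A system-wide fence is issued only when the caller provided less than what
// is required and what is required is system scope.
inline bool needs_system_fence(Scope given, Scope required) {
    return static_cast<int>(given) < static_cast<int>(required) &&
           required == Scope::System;
}
```

The staged value has to be visible to the NIC before the WQE that reads it is posted, which is why the fence sits between the slot store and the post.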

EFA backend signal contract (matches Put):
  - INDEXED signals only (asserted).
  - Inc-by-1 only (FI_REMOTE_WRITE ticks once per inbound write);
    signalOpArg > 1 is asserted out (can be relaxed to silently
    accept later if symmetry with Put is preferred).

Mirrors the dev_handle / dev_endpoint_handle field additions in
aws-ofi-nccl/include/rdma/gin/nccl_ofi_gin_gdaki_dev.h. The two
headers are layout-pinned; if either is updated the other must be
updated in lockstep.

Two-rank functional test for ncclGinApi_PutValue<EFA_GDA>, exercising
both the no-signal and signal paths.

Phase 1 (no-signal):
  rank 0 calls gin.putValue<int>(team, peer=1, dstWin, dstOff=0,
  value, ncclGin_None{}, ncclCoopThread()). The WQE rides on the data
  endpoint. rank 1 polls the destination memory in a kernel and
  reports PASS / TIMEOUT.

Phase 2 (signal):
  rank 0 calls gin.putValue<int>(..., ncclGin_SignalInc{signalId=0},
  ncclCoopThread()). The WQE rides on signal_handles[0]'s sc_endpoint
  so the arrival of the value RDMA_WRITE at the receiver bumps the
  FI_REMOTE_WRITE counter on signalId=0. rank 1 calls
  gin.waitSignal(coop, signalId=0, least=1) to confirm and then reads
  the destination to verify the value also landed.

Validates the device-side PutValue specialization end-to-end:
  - endpoint selection (signal != NONE -> sc_endpoints[signalId]
    via signal_handles[]; signal == NONE -> data endpoint)
  - shared GPU source slot pool sliced per endpoint via each
    endpoint handle's putvalue_slice_base (slice size implied by
    sq_size)
  - the kernel's lock / backpressure / claim-slot / stage-srcVal /
    build-WQE / post / submitted_count++ / unlock sequence
  - per-peer remote MR rkey/address lookup via dstWin->peers[peer]
  - value-and-signal-in-one-WQE: the receiver's FI_REMOTE_WRITE
    counter ticks on signalId=0 from the value WQE alone, with no
    second WQE.

Build: cd tests/efa_gda && NCCL_INSTALL=<path> ./build.sh
Run:   mpirun -np 2 ./gin_putvalue_gpu  (one rank per node)
       (with the usual EFA + GDAKI env: FI_PROVIDER=efa,
        OFI_NCCL_GIN_GDAKI=1, FI_EFA_USE_HW_CNTR=1, NCCL_GIN_TYPE=4,
        NCCL_SYM_GIN_KERNELS_ENABLE=0, etc.)

Pairs with the NCCL kernel-side commit
"nccl_device/efa_gda: implement PutValue via shared sc_endpoint slot pool"
and the aws-ofi-nccl plugin commit
"gin/gdaki: PutValue source slot pool, shared across endpoints".

anshumang force-pushed the efa_gdaki_putvalue branch from 3f09fc6 to 06c99ff May 29, 2026 23:42
anshumang changed the title from "gin/efa_gda: Implement PutValue via dedicated endpoint" to "gin/efa_gda: implement PutValue via shared sc_endpoint slot pool" May 30, 2026

/* ── Put ───────────────────────────────────────────────────────────── */

template <>
struct ncclGinApi_Put<NCCL_NET_DEVICE_GIN_EFA_GDA> {
Collaborator

There are two ctx sharing modes:
DOCA_GPUNETIO_VERBS_RESOURCE_SHARING_MODE_CTA and DOCA_GPUNETIO_VERBS_RESOURCE_SHARING_MODE_GPU.

I don't see any code here that handles them; each mode needs a different protection level.


efa_cuda_start_sq_batch(qp, batch_size);
}
__syncwarp(same_qp_mask);
Collaborator

This is a block-level operation; you might need a system-level operation.

* (lowest lane in the group) acquires the lock and calls the
* single-threaded start/flush helpers. */
uint32_t warp_mask = __activemask();
int lane_id = threadIdx.x & 31;
Collaborator

I think coop has a function to fetch the lane ID.


/* Warp-cooperative aggregated post.
*
* If multiple threads in this warp are concurrently entering Put
Collaborator

The mental model is coop, not warp, and a coop can be anything: a warp, a block, ...

/* One doorbell ring per group. */
efa_cuda_flush_sq_wrs(qp);
*submitted_count_ptr += (uint64_t)batch_size;
__threadfence();
Collaborator

Same here: this is not always __threadfence; sometimes it needs to be __threadfence_system.

bool is_leader = (lane_id == leader_lane);

if (is_leader) {
while (atomicCAS(sq_lock, 0u, 1u) != 0u) { /* spin */ }
Collaborator

This is a performance killer. In IB they do atomicAdd(&qp->sq_rsvd_index, 1); on the slot index.

…PutValue

Suppress the SQ doorbell ring when the caller passes
ncclGinOptFlagsAggregateRequests, signalling that more posts to the same
QP are coming and the application wants one MMIO doorbell write per
burst rather than per call. Match the DOCA-GDAKI behavior, which has
honored the flag since its initial Put implementation.

Mechanism:
  - Put::call: gate the tail efa_cuda_flush_sq_wrs(qp) on
    !(optFlags & ncclGinOptFlagsAggregateRequests). submitted_count
    is bumped under the lock regardless so Flush can wait on the
    correct target.
  - PutValue::call: same gating.
  - Flush::call: at the top of wait_for_endpoint, under the endpoint's
    sq_lock, ring any deferred doorbell via efa_cuda_flush_sq_wrs(qp).
    No-op when wqes_pending == 0; required when prior Puts staged WQEs
    with the aggregate flag — without this the spin on local_cntr_value
    would never converge because the NIC was never doorbelled.

Slot indexing:
  efa_cuda_sq_batch_place_wr writes at qp->sq.wq.pc + index_in_batch,
  and qp->sq.wq.pc only advances inside flush_sq_wrs. So when a
  doorbell ring is deferred across calls, multiple batches accumulate
  in the staging buffer — the slot index for each call's WR must be
  the *cumulative* offset (qp->sq.wq.wqes_pending - batch_size), not
  zero. Without this the second deferred call overwrites the slot
  that the first call wrote, and only the last (non-deferred) post
  reaches the destination.

  - Put::call: leader snapshots qp->sq.wq.wqes_pending after
    start_sq_batch (under the same sq_lock that protects the staging
    buffer), broadcasts the value to the warp via __shfl_sync, and
    each lane places its WR at batch_base_idx + my_idx.
  - PutValue::call: single-lane post; uses qp->sq.wq.wqes_pending - 1
    directly.
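
The cumulative-offset rule can be checked with a small host model of the SQ. The model follows the behavior described in this commit message (placement at `pc + index_in_batch`, `pc` advancing only on flush, `wqes_pending` growing in `start_sq_batch`) and ignores the auto-flush at `max_batch`. `SqModel` and its helpers are hypothetical, not efa-dp-direct's API.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct SqModel {
    uint32_t pc = 0;            // producer counter; advances only in flush
    uint32_t wqes_pending = 0;  // WQEs staged since the last flush
    std::vector<int> ring;      // one payload tag per SQ slot
    explicit SqModel(uint32_t size) : ring(size, -1) {}
};

inline void start_batch(SqModel& sq, uint32_t batch_size) {
    sq.wqes_pending += batch_size;
}

inline void place_wr(SqModel& sq, uint32_t index_in_batch, int tag) {
    sq.ring[(sq.pc + index_in_batch) % sq.ring.size()] = tag;
}

inline void flush(SqModel& sq) {
    sq.pc += sq.wqes_pending;  // doorbell: hand all staged WQEs to the NIC
    sq.wqes_pending = 0;
}

// Single-WR post, optionally deferring the doorbell to a later call.
inline void post_one(SqModel& sq, int tag, bool defer_doorbell) {
    start_batch(sq, 1);
    uint32_t base = sq.wqes_pending - 1;  // cumulative offset, not zero
    place_wr(sq, base, tag);
    if (!defer_doorbell) flush(sq);
}
```

Two deferred posts followed by one flushing post leave three distinct slots filled. Placing every WR at index 0 would instead leave only the last tag in slot 0.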

Bounds:
  efa-dp-direct's start_sq_batch auto-flushes when wqes_pending +
  batch_size would exceed sq.max_batch (16 WQEs on p6-b200), so
  deferral is always bounded by max_batch. The optimization saves
  doorbell rings only between auto-flushes — i.e., when a coop-mode
  batch posts fewer than max_batch WQEs cooperatively in one call and
  the caller wants to defer because more posts are coming.

Test:
  tests/efa_gda/gin_putvalue_gpu Phase 3:
    rank 0 issues BURST=8 PutValues at offsets 0..7 with the
    aggregate flag set on the first 7 and clear on the 8th. Each
    carries a unique sentinel (0x44440000+i). Receiver polls for the
    8th sentinel (which forces all 7 deferred WQEs to land first via
    the doorbell on the 8th post) then verifies all 8 sentinels are
    in place at their respective offsets.

Validated end-to-end on AWS p6-b200 (B200, sm_100), 2 nodes, EFA-direct
GDAKI mode (NCCL_GIN_TYPE=4): all three test phases (no-signal,
signal, aggregate-requests) PASS.