Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/mpid/ch4/shm/posix/eager/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ noinst_HEADERS += src/mpid/ch4/shm/posix/eager/include/posix_eager.h \

include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/src/Makefile.mk
include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk
include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk
include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/stub/Makefile.mk

endif
3 changes: 3 additions & 0 deletions src/mpid/ch4/shm/posix/eager/include/posix_eager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_buf_limit(void)

#define __posix_eager_inline_stub__ 0
#define __posix_eager_inline_iqueue__ 1
#define __posix_eager_inline_quicq__ 2

#if POSIX_EAGER_INLINE==__posix_eager_inline_stub__
#include "../stub/posix_eager_inline.h"
#elif POSIX_EAGER_INLINE==__posix_eager_inline_iqueue__
#include "../iqueue/posix_eager_inline.h"
#elif POSIX_EAGER_INLINE==__posix_eager_inline_quicq__
#include "../quicq/posix_eager_inline.h"
#else
#error "No direct posix eager included"
#endif
Expand Down
16 changes: 16 additions & 0 deletions src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
##
## Copyright (C) by Argonne National Laboratory
## See COPYRIGHT in top-level directory
##

if BUILD_CH4_SHM_POSIX_EAGER_QUICQ

## Every quicq header has to be listed here, otherwise it is not packaged by
## "make dist" (quicq_noinline.h and quicq_impl.h are included by the sources
## below and by posix_eager_inline.h).
noinst_HEADERS += src/mpid/ch4/shm/posix/eager/quicq/quicq_send.h     \
                  src/mpid/ch4/shm/posix/eager/quicq/quicq_recv.h     \
                  src/mpid/ch4/shm/posix/eager/quicq/quicq_progress.h \
                  src/mpid/ch4/shm/posix/eager/quicq/quicq_noinline.h \
                  src/mpid/ch4/shm/posix/eager/quicq/quicq_impl.h     \
                  src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h

mpi_core_sources += src/mpid/ch4/shm/posix/eager/quicq/func_table.c \
                    src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c

endif
35 changes: 35 additions & 0 deletions src/mpid/ch4/shm/posix/eager/quicq/func_table.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

/* When POSIX_EAGER_INLINE is defined this translation unit deliberately
 * contributes nothing; the function table below is only built otherwise. */
#ifdef POSIX_EAGER_INLINE
/* this file is empty */
#else

/* Must be defined before the headers below are included.
 * NOTE(review): presumably this makes posix_eager_inline.h emit the quicq
 * functions as ordinary (non-inlined) symbols so their addresses can be
 * stored in the table -- confirm against the header. */
#define POSIX_EAGER_DISABLE_INLINES

#include <mpidimpl.h>
#include "posix_eager_inline.h"

/* Function table for the quicq eager module.
 * The initializer is positional: the entries must stay in the same order as
 * the members of MPIDI_POSIX_eager_funcs_t (declared elsewhere). */
MPIDI_POSIX_eager_funcs_t MPIDI_POSIX_eager_quicq_funcs = {
/* module life cycle: init, post-init, finalize */
MPIDI_POSIX_quicq_init,
MPIDI_POSIX_quicq_post_init,
MPIDI_POSIX_quicq_finalize,

/* send path */
MPIDI_POSIX_eager_send,

/* receive path */
MPIDI_POSIX_eager_recv_begin,
MPIDI_POSIX_eager_recv_memcpy,
MPIDI_POSIX_eager_recv_commit,

/* receive hooks */
MPIDI_POSIX_eager_recv_posted_hook,
MPIDI_POSIX_eager_recv_completed_hook,

/* size limits */
MPIDI_POSIX_eager_payload_limit,
MPIDI_POSIX_eager_buf_limit,

/* progress */
MPIDI_POSIX_eager_progress
};

#endif /* POSIX_EAGER_INLINE */
14 changes: 14 additions & 0 deletions src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#ifndef POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED
#define POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED

#include "quicq_noinline.h"
#include "quicq_send.h"
#include "quicq_recv.h"
#include "quicq_progress.h"

#endif /* POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED */
12 changes: 12 additions & 0 deletions src/mpid/ch4/shm/posix/eager/quicq/quicq_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#ifndef POSIX_EAGER_QUICQ_IMPL_H_INCLUDED
#define POSIX_EAGER_QUICQ_IMPL_H_INCLUDED

#include <mpidimpl.h>
#include "mpidu_init_shm.h"

#endif /* POSIX_EAGER_QUICQ_IMPL_H_INCLUDED */
293 changes: 293 additions & 0 deletions src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#include "mpidimpl.h"
#include "quicq_noinline.h"
#include "mpidu_genq.h"

/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===

cvars:
- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS
category : CH4
type : int
default : 8
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
The number of cells used for the depth of the quicq.

- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE
category : CH4
type : int
default : 256
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Size of each cell.

- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS
category : CH4
type : int
default : 64
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
The number of extended buffers (extbufs) in the shared-memory pool used for larger payloads.

- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE
category : CH4
type : int
default : 16384
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Size of each extended buffer.

- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE
category : CH4
type : int
default : 3
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Size of the array to store expected receives to speed polling.

- name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_BATCH_SIZE
category : CH4
type : int
default : 4
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Number of terminals to poll during one iteration of the progress loop.

=== END_MPI_T_CVAR_INFO_BLOCK ===
*/

/* Address of the first cell of a queue object. A queue object starts with an
 * MPIDI_POSIX_eager_quicq_cntr_t header at q_base; the cells follow
 * immediately after it. The result is a char pointer. */
#define QUEUE_CELL_BASE(q_base) \
((char *) (q_base) + sizeof(MPIDI_POSIX_eager_quicq_cntr_t))

/* Definition of the global state object of the quicq eager module. */
MPIDI_POSIX_eager_quicq_global_t MPIDI_POSIX_eager_quicq_global;

static int init_transport(int vci_src, int vci_dst)
{
int mpi_errno = MPI_SUCCESS;

MPIDI_POSIX_eager_quicq_transport_t *transport;
transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci_dst);

transport->num_cells_per_queue = MPL_pof2(MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS);
transport->size_of_cell = MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE;

transport->cell_alloc_size = transport->size_of_cell
+ MPL_ROUND_UP_ALIGN(sizeof(MPIDI_POSIX_eager_quicq_cell_t), MPL_CACHELINE_SIZE);
transport->cell_alloc_size = MPL_ROUND_UP_ALIGN(transport->cell_alloc_size, MPL_CACHELINE_SIZE);

/* to and from each other processes */
transport->num_queues = MPIR_Process.local_size - 1;
int total_num_queues = MPIR_Process.local_size * transport->num_queues;

int queue_obj_size = transport->cell_alloc_size * transport->num_cells_per_queue
+ sizeof(MPIDI_POSIX_eager_quicq_cntr_t);
int shm_alloc_size = total_num_queues * queue_obj_size;

mpi_errno = MPIDU_Init_shm_alloc(shm_alloc_size, &transport->shm_base);
MPIR_ERR_CHECK(mpi_errno);

transport->send_terminals = (MPIDI_POSIX_eager_quicq_terminal_t *)
MPL_malloc(MPIR_Process.local_size * sizeof(MPIDI_POSIX_eager_quicq_terminal_t),
MPL_MEM_SHM);
transport->recv_terminals = (MPIDI_POSIX_eager_quicq_terminal_t *)
MPL_malloc(MPIR_Process.local_size * sizeof(MPIDI_POSIX_eager_quicq_terminal_t),
MPL_MEM_SHM);

int my_rank = MPIR_Process.local_rank;
transport->send_terminals[my_rank].cell_base = NULL;
transport->send_terminals[my_rank].cntr = NULL;
transport->send_terminals[my_rank].last_seq = 0;
transport->send_terminals[my_rank].last_ack = 0;
transport->recv_terminals[my_rank].cell_base = NULL;
transport->recv_terminals[my_rank].cntr = NULL;
transport->recv_terminals[my_rank].last_seq = 0;
transport->recv_terminals[my_rank].last_ack = 0;
for (int remote_rank = 0; remote_rank < MPIR_Process.local_size; remote_rank++) {
int q_idx = 0;
if (my_rank < remote_rank) {
q_idx = my_rank * transport->num_queues + remote_rank - 1;
} else if (my_rank == remote_rank) {
continue;
} else {
q_idx = my_rank * transport->num_queues + remote_rank;
}
void *q_base = transport->shm_base + queue_obj_size * q_idx;
memset(q_base, 0, queue_obj_size);
transport->send_terminals[remote_rank].cell_base = QUEUE_CELL_BASE(q_base);
transport->send_terminals[remote_rank].cntr = q_base;
MPL_atomic_store_uint64(&transport->send_terminals[remote_rank].cntr->seq.a, 0);
MPL_atomic_store_uint64(&transport->send_terminals[remote_rank].cntr->ack.a, 0);
transport->send_terminals[remote_rank].last_seq = 0;
transport->send_terminals[remote_rank].last_ack = 0;
}
for (int remote_rank = 0; remote_rank < MPIR_Process.local_size; remote_rank++) {
int q_idx = 0;
if (my_rank < remote_rank) {
q_idx = remote_rank * transport->num_queues + my_rank;
} else if (my_rank == remote_rank) {
continue;
} else {
q_idx = remote_rank * transport->num_queues + my_rank - 1;
}
void *q_base = transport->shm_base + queue_obj_size * q_idx;
/* recv terminal is initialized by the sender as send terminal */
transport->recv_terminals[remote_rank].cell_base = QUEUE_CELL_BASE(q_base);
transport->recv_terminals[remote_rank].cntr = q_base;
transport->recv_terminals[remote_rank].last_seq = 0;
transport->recv_terminals[remote_rank].last_ack = 0;
}

if (MPIR_CVAR_DEBUG_SUMMARY && MPIR_Process.rank == 0) {
fprintf(stdout, "==== QUICQ sizes and limits ====\n");
fprintf(stdout, "MPIR_CVAR_CH4_POSIX_QUICQ_NUM_CELLS %d\n",
MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS);
fprintf(stdout, "MPIR_CVAR_CH4_POSIX_QUICQ_CELL_SIZE %d\n",
MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE);
fprintf(stdout, "MPIR_CVAR_CH4_POSIX_QUICQ_NUM_EXTBUFS %d\n",
MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS);
fprintf(stdout, "MPIR_CVAR_CH4_POSIX_QUICQ_EXTBUF_SIZE %d\n",
MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE);
fprintf(stdout, "sizeof(MPIDI_POSIX_eager_quicq_cell_t): %lu\n",
sizeof(MPIDI_POSIX_eager_quicq_cell_t));
fprintf(stdout, "cell_alloc_size: %d\n", transport->cell_alloc_size);
fprintf(stdout, "num_cells_per_queue: %d\n", transport->num_cells_per_queue);
fprintf(stdout, "queue_obj_size: %d\n", queue_obj_size);
}

/* create additional SHM region for bigger buffer */
int queue_type = MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL;
mpi_errno = MPIDU_genq_shmem_pool_create(MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE,
MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS,
MPIR_Process.local_size,
MPIR_Process.local_rank,
1, &queue_type, &transport->extbuf_pool);
MPIR_ERR_CHECK(mpi_errno);

/* create polling cache, allocate one extra element for storing last rank in batch mode */
int cache_alloc_size = (MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE + 1) * sizeof(int16_t);
MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks = MPL_malloc(cache_alloc_size,
MPL_MEM_SHM);
MPIR_Assert(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks);
memset(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks, -1, cache_alloc_size);

fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}

/*
 * Initialization entry point of the quicq eager module.
 *
 * rank, size: not used by this function.
 *
 * Checks the alignment of the configured cell size and of the cell header,
 * sets up the transport for VCI pair (0, 0) only, and then synchronizes via
 * MPIDU_Init_shm_barrier(). Returns MPI_SUCCESS or the first error reported
 * by init_transport() or the barrier.
 */
int MPIDI_POSIX_quicq_init(int rank, int size)
{
int mpi_errno = MPI_SUCCESS;

MPIR_FUNC_ENTER;

/* ensure max alignment for payload */
/* (the mask test assumes MAX_ALIGNMENT is a power of two -- TODO confirm) */
MPIR_Assert((MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE & (MAX_ALIGNMENT - 1)) == 0);
MPIR_Assert((sizeof(MPIDI_POSIX_eager_quicq_cell_t) & (MAX_ALIGNMENT - 1)) == 0);

/* Init vci 0. Communication on vci 0 is enabled afterwards. */
/* max_vcis is 1 until MPIDI_POSIX_quicq_post_init() replaces it. */
MPIDI_POSIX_eager_quicq_global.max_vcis = 1;

mpi_errno = init_transport(0, 0);
MPIR_ERR_CHECK(mpi_errno);

/* NOTE(review): presumably this barrier guarantees that every local process
 * has zeroed its send queues before any peer starts using them -- confirm. */
mpi_errno = MPIDU_Init_shm_barrier();
MPIR_ERR_CHECK(mpi_errno);

fn_exit:
MPIR_FUNC_EXIT;
return mpi_errno;
fn_fail:
goto fn_exit;
}

/*
 * Second initialization stage of the quicq eager module.
 *
 * Exchanges MPIDI_POSIX_global.num_vcis among the local processes through the
 * init-shm put/barrier/get helpers, stores the maximum in
 * MPIDI_POSIX_eager_quicq_global.max_vcis, and sets up the transport for every
 * (vci_src, vci_dst) pair except (0, 0), which MPIDI_POSIX_quicq_init()
 * already created. Ends with a barrier.
 *
 * Returns MPI_SUCCESS or the first error reported by any of the helper calls.
 */
int MPIDI_POSIX_quicq_post_init(void)
{
    int mpi_errno = MPI_SUCCESS;

    /* gather max_vcis */
    int max_vcis = 0;

    mpi_errno = MPIDU_Init_shm_put(&MPIDI_POSIX_global.num_vcis, sizeof(int));
    MPIR_ERR_CHECK(mpi_errno);
    /* every process must have published its value before anyone reads */
    mpi_errno = MPIDU_Init_shm_barrier();
    MPIR_ERR_CHECK(mpi_errno);

    for (int i = 0; i < MPIR_Process.local_size; i++) {
        int num = 0;
        mpi_errno = MPIDU_Init_shm_get(i, sizeof(int), &num);
        MPIR_ERR_CHECK(mpi_errno);
        if (max_vcis < num) {
            max_vcis = num;
        }
    }

    /* every process must have finished reading before the exchange area is
     * touched again */
    mpi_errno = MPIDU_Init_shm_barrier();
    MPIR_ERR_CHECK(mpi_errno);

    MPIDI_POSIX_eager_quicq_global.max_vcis = max_vcis;

    for (int vci_src = 0; vci_src < max_vcis; vci_src++) {
        for (int vci_dst = 0; vci_dst < max_vcis; vci_dst++) {
            /* (0, 0) was already set up by MPIDI_POSIX_quicq_init() */
            if (vci_src == 0 && vci_dst == 0) {
                continue;
            }
            mpi_errno = init_transport(vci_src, vci_dst);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    mpi_errno = MPIDU_Init_shm_barrier();
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

/*
 * Release everything set up by init_transport() for all
 * max_vcis x max_vcis transports, plus the process-wide polling cache.
 *
 * Process-local heap memory is released before the calls that can fail, so an
 * error return from the shared-memory or pool teardown does not additionally
 * leak it. Freed pointers are reset to NULL so that no dangling pointers stay
 * behind in the global state.
 *
 * Returns MPI_SUCCESS or the first error reported by MPIDU_Init_shm_free() or
 * MPIDU_genq_shmem_pool_destroy().
 */
int MPIDI_POSIX_quicq_finalize(void)
{
    int mpi_errno = MPI_SUCCESS;

    MPIR_FUNC_ENTER;

    MPL_free(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks);
    MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks = NULL;

    int max_vcis = MPIDI_POSIX_eager_quicq_global.max_vcis;
    for (int vci_src = 0; vci_src < max_vcis; vci_src++) {
        for (int vci_dst = 0; vci_dst < max_vcis; vci_dst++) {
            MPIDI_POSIX_eager_quicq_transport_t *transport;
            transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci_dst);

            /* the terminal arrays only hold pointers into the shared region;
             * freeing them does not touch the region itself */
            MPL_free(transport->send_terminals);
            transport->send_terminals = NULL;
            MPL_free(transport->recv_terminals);
            transport->recv_terminals = NULL;

            mpi_errno = MPIDU_Init_shm_free(transport->shm_base);
            MPIR_ERR_CHECK(mpi_errno);
            transport->shm_base = NULL;

            mpi_errno = MPIDU_genq_shmem_pool_destroy(transport->extbuf_pool);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

  fn_exit:
    MPIR_FUNC_EXIT;
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
Loading