diff --git a/src/mpid/ch4/shm/posix/eager/Makefile.mk b/src/mpid/ch4/shm/posix/eager/Makefile.mk index 16f89f138c7..2f50fb20917 100644 --- a/src/mpid/ch4/shm/posix/eager/Makefile.mk +++ b/src/mpid/ch4/shm/posix/eager/Makefile.mk @@ -15,6 +15,7 @@ noinst_HEADERS += src/mpid/ch4/shm/posix/eager/include/posix_eager.h \ include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/src/Makefile.mk include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk +include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk include $(top_srcdir)/src/mpid/ch4/shm/posix/eager/stub/Makefile.mk endif diff --git a/src/mpid/ch4/shm/posix/eager/include/posix_eager_impl.h b/src/mpid/ch4/shm/posix/eager/include/posix_eager_impl.h index 49656573a86..5fcf8d8086a 100644 --- a/src/mpid/ch4/shm/posix/eager/include/posix_eager_impl.h +++ b/src/mpid/ch4/shm/posix/eager/include/posix_eager_impl.h @@ -65,11 +65,14 @@ MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_buf_limit(void) #define __posix_eager_inline_stub__ 0 #define __posix_eager_inline_iqueue__ 1 +#define __posix_eager_inline_quicq__ 2 #if POSIX_EAGER_INLINE==__posix_eager_inline_stub__ #include "../stub/posix_eager_inline.h" #elif POSIX_EAGER_INLINE==__posix_eager_inline_iqueue__ #include "../iqueue/posix_eager_inline.h" +#elif POSIX_EAGER_INLINE==__posix_eager_inline_quicq__ +#include "../quicq/posix_eager_inline.h" #else #error "No direct posix eager included" #endif diff --git a/src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk b/src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk new file mode 100644 index 00000000000..f4955b1956b --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/Makefile.mk @@ -0,0 +1,16 @@ +## +## Copyright (C) by Argonne National Laboratory +## See COPYRIGHT in top-level directory +## + +if BUILD_CH4_SHM_POSIX_EAGER_QUICQ + +noinst_HEADERS += src/mpid/ch4/shm/posix/eager/quicq/quicq_send.h \ + src/mpid/ch4/shm/posix/eager/quicq/quicq_recv.h \ + src/mpid/ch4/shm/posix/eager/quicq/quicq_progress.h \ + 
src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h + +mpi_core_sources += src/mpid/ch4/shm/posix/eager/quicq/func_table.c \ + src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c + +endif diff --git a/src/mpid/ch4/shm/posix/eager/quicq/func_table.c b/src/mpid/ch4/shm/posix/eager/quicq/func_table.c new file mode 100644 index 00000000000..0093fceaf1a --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/func_table.c @@ -0,0 +1,35 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifdef POSIX_EAGER_INLINE +/* this file is empty */ +#else + +#define POSIX_EAGER_DISABLE_INLINES + +#include <posix_eager.h> +#include "posix_eager_inline.h" + +MPIDI_POSIX_eager_funcs_t MPIDI_POSIX_eager_quicq_funcs = { + MPIDI_POSIX_quicq_init, + MPIDI_POSIX_quicq_post_init, + MPIDI_POSIX_quicq_finalize, + + MPIDI_POSIX_eager_send, + + MPIDI_POSIX_eager_recv_begin, + MPIDI_POSIX_eager_recv_memcpy, + MPIDI_POSIX_eager_recv_commit, + + MPIDI_POSIX_eager_recv_posted_hook, + MPIDI_POSIX_eager_recv_completed_hook, + + MPIDI_POSIX_eager_payload_limit, + MPIDI_POSIX_eager_buf_limit, + + MPIDI_POSIX_eager_progress +}; + +#endif /* POSIX_EAGER_INLINE */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h b/src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h new file mode 100644 index 00000000000..55f6fda98ab --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/posix_eager_inline.h @@ -0,0 +1,14 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED +#define POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED + +#include "quicq_noinline.h" +#include "quicq_send.h" +#include "quicq_recv.h" +#include "quicq_progress.h" + +#endif /* POSIX_EAGER_QUICQ_DIRECT_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_impl.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_impl.h new file mode 100644 index 00000000000..d973988f4a0 --- /dev/null +++ 
b/src/mpid/ch4/shm/posix/eager/quicq/quicq_impl.h @@ -0,0 +1,12 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_IMPL_H_INCLUDED +#define POSIX_EAGER_QUICQ_IMPL_H_INCLUDED + +#include <mpidimpl.h> +#include "mpidu_init_shm.h" + +#endif /* POSIX_EAGER_QUICQ_IMPL_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c b/src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c new file mode 100644 index 00000000000..cb0a1d47498 --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_init.c @@ -0,0 +1,293 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include "mpidimpl.h" +#include "quicq_noinline.h" +#include "mpidu_genq.h" + +/* +=== BEGIN_MPI_T_CVAR_INFO_BLOCK === + +cvars: + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS + category : CH4 + type : int + default : 8 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + The number of cells used for the depth of the quicq. + + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE + category : CH4 + type : int + default : 256 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Size of each cell. + + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS + category : CH4 + type : int + default : 64 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + The number of ext buffers in the pool used for messages larger than a cell. + + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE + category : CH4 + type : int + default : 16384 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Size of each ext buffer. 
+ + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE + category : CH4 + type : int + default : 3 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Size of the array to store expected receives to speed polling. + + - name : MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_BATCH_SIZE + category : CH4 + type : int + default : 4 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Number of terminals to poll during one iteration of the progress loop. + +=== END_MPI_T_CVAR_INFO_BLOCK === +*/ + +#define QUEUE_CELL_BASE(q_base) \ + ((char *) (q_base) + sizeof(MPIDI_POSIX_eager_quicq_cntr_t)) + +MPIDI_POSIX_eager_quicq_global_t MPIDI_POSIX_eager_quicq_global; + +static int init_transport(int vci_src, int vci_dst) +{ + int mpi_errno = MPI_SUCCESS; + + MPIDI_POSIX_eager_quicq_transport_t *transport; + transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci_dst); + + transport->num_cells_per_queue = MPL_pof2(MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS); + transport->size_of_cell = MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE; + + transport->cell_alloc_size = transport->size_of_cell + + MPL_ROUND_UP_ALIGN(sizeof(MPIDI_POSIX_eager_quicq_cell_t), MPL_CACHELINE_SIZE); + transport->cell_alloc_size = MPL_ROUND_UP_ALIGN(transport->cell_alloc_size, MPL_CACHELINE_SIZE); + + /* to and from each other processes */ + transport->num_queues = MPIR_Process.local_size - 1; + int total_num_queues = MPIR_Process.local_size * transport->num_queues; + + int queue_obj_size = transport->cell_alloc_size * transport->num_cells_per_queue + + sizeof(MPIDI_POSIX_eager_quicq_cntr_t); + int shm_alloc_size = total_num_queues * queue_obj_size; + + mpi_errno = MPIDU_Init_shm_alloc(shm_alloc_size, &transport->shm_base); + MPIR_ERR_CHECK(mpi_errno); + + transport->send_terminals = (MPIDI_POSIX_eager_quicq_terminal_t *) + MPL_malloc(MPIR_Process.local_size * sizeof(MPIDI_POSIX_eager_quicq_terminal_t), + 
MPL_MEM_SHM); + transport->recv_terminals = (MPIDI_POSIX_eager_quicq_terminal_t *) + MPL_malloc(MPIR_Process.local_size * sizeof(MPIDI_POSIX_eager_quicq_terminal_t), + MPL_MEM_SHM); + + int my_rank = MPIR_Process.local_rank; + transport->send_terminals[my_rank].cell_base = NULL; + transport->send_terminals[my_rank].cntr = NULL; + transport->send_terminals[my_rank].last_seq = 0; + transport->send_terminals[my_rank].last_ack = 0; + transport->recv_terminals[my_rank].cell_base = NULL; + transport->recv_terminals[my_rank].cntr = NULL; + transport->recv_terminals[my_rank].last_seq = 0; + transport->recv_terminals[my_rank].last_ack = 0; + for (int remote_rank = 0; remote_rank < MPIR_Process.local_size; remote_rank++) { + int q_idx = 0; + if (my_rank < remote_rank) { + q_idx = my_rank * transport->num_queues + remote_rank - 1; + } else if (my_rank == remote_rank) { + continue; + } else { + q_idx = my_rank * transport->num_queues + remote_rank; + } + void *q_base = transport->shm_base + queue_obj_size * q_idx; + memset(q_base, 0, queue_obj_size); + transport->send_terminals[remote_rank].cell_base = QUEUE_CELL_BASE(q_base); + transport->send_terminals[remote_rank].cntr = q_base; + MPL_atomic_store_uint64(&transport->send_terminals[remote_rank].cntr->seq.a, 0); + MPL_atomic_store_uint64(&transport->send_terminals[remote_rank].cntr->ack.a, 0); + transport->send_terminals[remote_rank].last_seq = 0; + transport->send_terminals[remote_rank].last_ack = 0; + } + for (int remote_rank = 0; remote_rank < MPIR_Process.local_size; remote_rank++) { + int q_idx = 0; + if (my_rank < remote_rank) { + q_idx = remote_rank * transport->num_queues + my_rank; + } else if (my_rank == remote_rank) { + continue; + } else { + q_idx = remote_rank * transport->num_queues + my_rank - 1; + } + void *q_base = transport->shm_base + queue_obj_size * q_idx; + /* recv terminal is initialized by the sender as send terminal */ + transport->recv_terminals[remote_rank].cell_base = QUEUE_CELL_BASE(q_base); + 
transport->recv_terminals[remote_rank].cntr = q_base; + transport->recv_terminals[remote_rank].last_seq = 0; + transport->recv_terminals[remote_rank].last_ack = 0; + } + + if (MPIR_CVAR_DEBUG_SUMMARY && MPIR_Process.rank == 0) { + fprintf(stdout, "==== QUICQ sizes and limits ====\n"); + fprintf(stdout, "MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS %d\n", + MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS); + fprintf(stdout, "MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE %d\n", + MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE); + fprintf(stdout, "MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS %d\n", + MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS); + fprintf(stdout, "MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE %d\n", + MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE); + fprintf(stdout, "sizeof(MPIDI_POSIX_eager_quicq_cell_t): %zu\n", + sizeof(MPIDI_POSIX_eager_quicq_cell_t)); + fprintf(stdout, "cell_alloc_size: %d\n", transport->cell_alloc_size); + fprintf(stdout, "num_cells_per_queue: %d\n", transport->num_cells_per_queue); + fprintf(stdout, "queue_obj_size: %d\n", queue_obj_size); + } + + /* create additional SHM region for bigger buffer */ + int queue_type = MPIDU_GENQ_SHMEM_QUEUE_TYPE__SERIAL; + mpi_errno = MPIDU_genq_shmem_pool_create(MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE, + MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_EXTBUFS, + MPIR_Process.local_size, + MPIR_Process.local_rank, + 1, &queue_type, &transport->extbuf_pool); + MPIR_ERR_CHECK(mpi_errno); + + /* create polling cache, allocate one extra element for storing last rank in batch mode */ + int cache_alloc_size = (MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE + 1) * sizeof(int16_t); + MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks = MPL_malloc(cache_alloc_size, + MPL_MEM_SHM); + MPIR_Assert(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks); + memset(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks, -1, cache_alloc_size); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +int MPIDI_POSIX_quicq_init(int rank, int size) +{ + int mpi_errno = 
MPI_SUCCESS; + + MPIR_FUNC_ENTER; + + /* ensure max alignment for payload */ + MPIR_Assert((MPIR_CVAR_CH4_SHM_POSIX_QUICQ_CELL_SIZE & (MAX_ALIGNMENT - 1)) == 0); + MPIR_Assert((sizeof(MPIDI_POSIX_eager_quicq_cell_t) & (MAX_ALIGNMENT - 1)) == 0); + + /* Init vci 0. Communication on vci 0 is enabled afterwards. */ + MPIDI_POSIX_eager_quicq_global.max_vcis = 1; + + mpi_errno = init_transport(0, 0); + MPIR_ERR_CHECK(mpi_errno); + + mpi_errno = MPIDU_Init_shm_barrier(); + MPIR_ERR_CHECK(mpi_errno); + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} + +int MPIDI_POSIX_quicq_post_init(void) +{ + int mpi_errno = MPI_SUCCESS; + + /* gather max_vcis */ + int max_vcis = 0; + max_vcis = 0; + MPIDU_Init_shm_put(&MPIDI_POSIX_global.num_vcis, sizeof(int)); + MPIDU_Init_shm_barrier(); + for (int i = 0; i < MPIR_Process.local_size; i++) { + int num; + MPIDU_Init_shm_get(i, sizeof(int), &num); + if (max_vcis < num) { + max_vcis = num; + } + } + MPIDU_Init_shm_barrier(); + + MPIDI_POSIX_eager_quicq_global.max_vcis = max_vcis; + + for (int vci_src = 0; vci_src < max_vcis; vci_src++) { + for (int vci_dst = 0; vci_dst < max_vcis; vci_dst++) { + if (vci_src == 0 && vci_dst == 0) { + continue; + } + mpi_errno = init_transport(vci_src, vci_dst); + MPIR_ERR_CHECK(mpi_errno); + + } + } + + mpi_errno = MPIDU_Init_shm_barrier(); + MPIR_ERR_CHECK(mpi_errno); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} + +int MPIDI_POSIX_quicq_finalize(void) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_FUNC_ENTER; + + int max_vcis = MPIDI_POSIX_eager_quicq_global.max_vcis; + for (int vci_src = 0; vci_src < max_vcis; vci_src++) { + for (int vci_dst = 0; vci_dst < max_vcis; vci_dst++) { + MPIDI_POSIX_eager_quicq_transport_t *transport; + transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci_dst); + + mpi_errno = MPIDU_Init_shm_free(transport->shm_base); + MPIR_ERR_CHECK(mpi_errno); + + mpi_errno = MPIDU_genq_shmem_pool_destroy(transport->extbuf_pool); + 
MPIR_ERR_CHECK(mpi_errno); + + MPL_free(transport->send_terminals); + MPL_free(transport->recv_terminals); + } + } + + MPL_free(MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks); + + fn_exit: + MPIR_FUNC_EXIT; + return mpi_errno; + fn_fail: + goto fn_exit; +} diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_noinline.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_noinline.h new file mode 100644 index 00000000000..89c9c2b1cbe --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_noinline.h @@ -0,0 +1,22 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_NOINLINE_H_INCLUDED +#define POSIX_EAGER_QUICQ_NOINLINE_H_INCLUDED + +#include "quicq_types.h" +#include "quicq_impl.h" + +int MPIDI_POSIX_quicq_init(int rank, int size); +int MPIDI_POSIX_quicq_post_init(void); +int MPIDI_POSIX_quicq_finalize(void); + +#ifdef POSIX_EAGER_INLINE +#define MPIDI_POSIX_eager_init MPIDI_POSIX_quicq_init +#define MPIDI_POSIX_eager_post_init MPIDI_POSIX_quicq_post_init +#define MPIDI_POSIX_eager_finalize MPIDI_POSIX_quicq_finalize +#endif + +#endif /* POSIX_EAGER_QUICQ_NOINLINE_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_pre.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_pre.h new file mode 100644 index 00000000000..b961f907913 --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_pre.h @@ -0,0 +1,13 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_PRE_H_INCLUDED +#define POSIX_EAGER_QUICQ_PRE_H_INCLUDED + +typedef struct MPIDI_POSIX_eager_quicq_recv_transaction { + void *pointer_to_cell; +} MPIDI_POSIX_eager_quicq_recv_transaction_t; + +#endif /* POSIX_EAGER_QUICQ_PRE_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_progress.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_progress.h new file mode 100644 index 00000000000..6b08942d1df --- /dev/null +++ 
b/src/mpid/ch4/shm/posix/eager/quicq/quicq_progress.h @@ -0,0 +1,100 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_PROGRESS_H_INCLUDED +#define POSIX_EAGER_QUICQ_PROGRESS_H_INCLUDED + +#include "quicq_impl.h" + +MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_eager_progress(int vci, int *made_progress) +{ + int ret = MPI_SUCCESS; + MPIDI_POSIX_eager_quicq_transport_t *transport; + MPIDI_POSIX_eager_quicq_terminal_t *terminal; + + *made_progress = 0; + int max_vcis = MPIDI_POSIX_eager_quicq_global.max_vcis; + for (int vci_src = 0; vci_src < max_vcis; vci_src++) { + transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci); + + // for (int src_local_rank = 0; src_local_rank < MPIR_Process.local_size; + // src_local_rank++) { + // if (src_local_rank == MPIR_Process.local_rank) { + // continue; + // } + // terminal = &transport->recv_terminals[src_local_rank]; + // if (terminal->last_seq == terminal->last_ack) { + // uint64_t new_seq = MPL_atomic_acquire_load_uint64(&terminal->cntr->seq.a); + // if (new_seq != terminal->last_ack) { + // terminal->last_seq = new_seq; + // *made_progress = 1; + // } + // } + // } + + int poll_cache_size = MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE; + int poll_batch_size = MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_BATCH_SIZE; + int last_pos = poll_cache_size + 1; + int src_local_rank = 0; + int16_t *poll_cache = MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks; + for (int count = 0; count < poll_cache_size + poll_batch_size; count++) { + if (count < poll_cache_size) { + /* ranks with posted recv */ + src_local_rank = poll_cache[count]; + } else { + /* batch polling for uncached ranks, using the extra element at the end of + * the polling cache to store last checked rank in batch mode */ + if (count == poll_cache_size) { + poll_cache[last_pos] = poll_cache[poll_cache_size]; + } + int16_t last_cache = poll_cache[last_pos]; + last_cache = (last_cache + 1) % 
(int16_t) MPIR_Process.local_size; + src_local_rank = last_cache; + poll_cache[last_pos] = last_cache; + } + + if (src_local_rank == -1 || src_local_rank == MPIR_Process.local_rank) { + continue; + } + + terminal = &transport->recv_terminals[src_local_rank]; + uint64_t new_seq = MPL_atomic_acquire_load_uint64(&terminal->cntr->seq.a); + if (new_seq != terminal->last_seq) { + terminal->last_seq = new_seq; + *made_progress = 1; + } + } + + for (int dst_local_rank = 0; dst_local_rank < MPIR_Process.local_size; + dst_local_rank++) { + if (dst_local_rank == MPIR_Process.local_rank) { + continue; + } + terminal = &transport->send_terminals[dst_local_rank]; + if (terminal->last_ack < terminal->last_seq) { + /* TODO: what if recv completes out of order */ + uint64_t new_ack = MPL_atomic_acquire_load_uint64(&terminal->cntr->ack.a); + if (new_ack != terminal->last_ack) { + MPIDI_POSIX_eager_quicq_cell_t *cell = NULL; + for (uint64_t i = terminal->last_ack; i < new_ack; i++) { + int cell_idx = MPIDI_POSIX_EAGER_QUICQ_CNTR_TO_IDX(i); + cell = terminal->cell_base + cell_idx * transport->cell_alloc_size; + if (cell->type & MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_EXTBUF) { + char *payload = MPIDI_POSIX_EAGER_QUICQ_CELL_PAYLOAD(cell); + void *extbuf = ((MPIDI_POSIX_eager_quicq_extbuf_hdr *) payload)->buf; + MPIDU_genq_shmem_pool_cell_free(transport->extbuf_pool, extbuf); + } + } + terminal->last_ack = new_ack; + *made_progress = 1; + } + } + } + } + + return ret; +} + +#endif /* POSIX_EAGER_QUICQ_PROGRESS_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_recv.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_recv.h new file mode 100644 index 00000000000..26b41d18c6b --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_recv.h @@ -0,0 +1,149 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_RECV_H_INCLUDED +#define POSIX_EAGER_QUICQ_RECV_H_INCLUDED + +#include "quicq_impl.h" +#include 
"mpidu_genq.h" + +MPL_STATIC_INLINE_PREFIX int +MPIDI_POSIX_eager_recv_begin(int vci, MPIDI_POSIX_eager_recv_transaction_t * transaction) +{ + MPIDI_POSIX_eager_quicq_transport_t *transport; + MPIDI_POSIX_eager_quicq_terminal_t *terminal; + MPIDI_POSIX_eager_quicq_cell_t *cell = NULL; + int ret = MPIDI_POSIX_NOK; + + MPIR_FUNC_ENTER; + + /* TODO: measure the latency overhead due to multiple vci */ + int max_vcis = MPIDI_POSIX_eager_quicq_global.max_vcis; + for (int vci_src = 0; vci_src < max_vcis; vci_src++) { + transport = MPIDI_POSIX_eager_quicq_get_transport(vci_src, vci); + + for (int src_local_rank = 0; src_local_rank < MPIR_Process.local_size; + src_local_rank++) { + if (src_local_rank == MPIR_Process.local_rank) { + continue; + } + terminal = &transport->recv_terminals[src_local_rank]; + if (terminal->last_ack == terminal->last_seq) { + continue; + } + + int cell_idx = MPIDI_POSIX_EAGER_QUICQ_CNTR_TO_IDX(terminal->last_ack); + cell = terminal->cell_base + cell_idx * transport->cell_alloc_size; + + transaction->src_local_rank = cell->from; + transaction->src_vci = vci_src; + transaction->dst_vci = vci; + transaction->payload_sz = cell->payload_size; + + if (likely(cell->type & MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_HDR)) { + transaction->msg_hdr = &cell->am_header; + } else { + MPIR_Assert(cell->type & MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_DATA); + transaction->msg_hdr = NULL; + } + + if (cell->type & MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_EXTBUF) { + /* switch the cell to the actual buffer */ + char *extbuf_hdr = MPIDI_POSIX_EAGER_QUICQ_CELL_PAYLOAD(cell); + uint64_t handle = ((MPIDI_POSIX_eager_quicq_extbuf_hdr *) extbuf_hdr)->handle; + char *extbuf = MPIDU_genq_shmem_pool_handle_to_cell(transport->extbuf_pool, handle); + MPIR_Assert(extbuf != NULL); + transaction->payload = extbuf; + } else { + transaction->payload = MPIDI_POSIX_EAGER_QUICQ_CELL_PAYLOAD(cell); + } + + transaction->transport.quicq.pointer_to_cell = cell; + + ret = MPIDI_POSIX_OK; + break; + } + } + + 
MPIR_FUNC_EXIT; + return ret; +} + +MPL_STATIC_INLINE_PREFIX void +MPIDI_POSIX_eager_recv_memcpy(MPIDI_POSIX_eager_recv_transaction_t * transaction, + void *dst, const void *src, size_t size) +{ + MPIR_Typerep_copy(dst, src, size, MPIR_TYPEREP_FLAG_NONE); +} + +MPL_STATIC_INLINE_PREFIX void +MPIDI_POSIX_eager_recv_commit(MPIDI_POSIX_eager_recv_transaction_t * transaction) +{ + MPIDI_POSIX_eager_quicq_transport_t *transport; + MPIDI_POSIX_eager_quicq_terminal_t *terminal; + + MPIR_FUNC_ENTER; + + transport = MPIDI_POSIX_eager_quicq_get_transport(transaction->src_vci, transaction->dst_vci); + terminal = &transport->recv_terminals[transaction->src_local_rank]; + terminal->last_ack++; + MPL_atomic_release_store_uint64(&terminal->cntr->ack.a, terminal->last_ack); + + MPIR_FUNC_EXIT; +} + +MPL_STATIC_INLINE_PREFIX void MPIDI_POSIX_eager_recv_posted_hook(int grank) +{ + int local_rank, i; + + MPIR_FUNC_ENTER; + + if (grank >= 0) { + local_rank = MPIDI_POSIX_global.local_ranks[grank]; + + /* Put the posted receive in the list of fastboxes to be polled first. If the list is full, + * it will get polled after the boxes in the list are polled, which will be slower, but will + * still match the message. */ + for (i = 0; i < MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE; i++) { + if (MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks[i] == -1) { + MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks[i] = local_rank; + break; + } else if (MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks[i] == + local_rank) { + break; + } else { + continue; + } + } + } + + MPIR_FUNC_EXIT; +} + +MPL_STATIC_INLINE_PREFIX void MPIDI_POSIX_eager_recv_completed_hook(int grank) +{ + int i, local_rank; + + MPIR_FUNC_ENTER; + + if (grank >= 0) { + local_rank = MPIDI_POSIX_global.local_ranks[grank]; + + /* Remove the posted receive from the list of fastboxes to be polled first now that the + * request is done. 
*/ + for (i = 0; i < MPIR_CVAR_CH4_SHM_POSIX_QUICQ_POLL_CACHE_SIZE; i++) { + if (MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks[i] == local_rank) { + MPIDI_POSIX_eager_quicq_global.first_poll_local_ranks[i] = -1; + break; + } else { + continue; + } + } + } + + MPIR_FUNC_EXIT; +} + +#endif /* POSIX_EAGER_QUICQ_RECV_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_send.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_send.h new file mode 100644 index 00000000000..d207b4465a4 --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_send.h @@ -0,0 +1,154 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_SEND_H_INCLUDED +#define POSIX_EAGER_QUICQ_SEND_H_INCLUDED + +#include "quicq_impl.h" +#include "mpidu_genq.h" + +MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_payload_limit(void) +{ + return MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE; +} + +MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_buf_limit(void) +{ + return MPIR_CVAR_CH4_SHM_POSIX_QUICQ_EXTBUF_SIZE; +} + +/* This function attempts to send the next chunk of a message via the queue. If no cells are + * available, this function will return and the caller is expected to queue the message for later + * and retry. + * + * grank - The global rank (the rank in MPI_COMM_WORLD) of the receiving process. + * msg_hdr - The header of the message to be sent. This can be NULL if there is no header to be sent + * (such as if the header was sent in a previous chunk, am_hdr will be NULL too in this + * case. + * am_hdr, am_hdr_sz - am header this could be NULL if not sending the first chunk + * buf, count, datatype - Data buffer and signature for the send buffer. They could be NULL in the + * case of a header-only message + * offset - current offset. 
+ * bytes_sent - output variable for how much data actually been sent, pass NULL if no data + * need to be send + */ +MPL_STATIC_INLINE_PREFIX int +MPIDI_POSIX_eager_send(int grank, MPIDI_POSIX_am_header_t * msg_hdr, const void *am_hdr, + MPI_Aint am_hdr_sz, const void *buf, MPI_Aint count, MPI_Datatype datatype, + MPI_Aint offset, int src_vci, int dst_vci, MPI_Aint * bytes_sent) +{ + MPIDI_POSIX_eager_quicq_transport_t *transport; + MPIDI_POSIX_eager_quicq_cell_t *cell; + MPIDI_POSIX_eager_quicq_terminal_t *terminal; + size_t capacity = 0, available = 0; + char *payload = NULL; + int ret = MPIDI_POSIX_OK; + MPI_Aint packed_size = 0; + + MPIR_FUNC_ENTER; + + /* Get the transport object that holds all of the global variables. */ + transport = MPIDI_POSIX_eager_quicq_get_transport(src_vci, dst_vci); + + /* Try to get a new cell to hold the message */ + /* Select the appropriate free queue depending on whether we are using intra-NUMA or inter-NUMAfree + * free queue. */ + int dst_local_rank = MPIDI_POSIX_global.local_ranks[grank]; + bool is_topo_local = + (MPIDI_POSIX_global.local_rank_dist[dst_local_rank] == MPIDI_POSIX_DIST__LOCAL); + + /* Find the correct queue for this rank pair. */ + terminal = &transport->send_terminals[dst_local_rank]; + + /* If a cell wasn't available, let the caller know that we weren't able to send the message + * immediately. */ + if (unlikely(terminal->last_seq - terminal->last_ack >= transport->num_cells_per_queue)) { + ret = MPIDI_POSIX_NOK; + goto fn_exit; + } + + int cell_idx = MPIDI_POSIX_EAGER_QUICQ_CNTR_TO_IDX(terminal->last_seq); + cell = terminal->cell_base + cell_idx * transport->cell_alloc_size; + + /* Get the memory allocated to be used for the message transportation. 
*/ + payload = MPIDI_POSIX_EAGER_QUICQ_CELL_PAYLOAD(cell); + + /* Figure out the capacity of each cell */ + capacity = MPIDI_POSIX_EAGER_QUICQ_CELL_CAPACITY(transport); + + available = capacity; + + cell->from = MPIR_Process.local_rank; + cell->type = 0; + cell->payload_size = 0; + + /* If this is the beginning of the message, mark it as the head. Otherwise it will be the + * tail. */ + char *data_buf = NULL; + MPI_Aint data_sz = 0; + MPIDI_Datatype_check_size(datatype, count, data_sz); + data_sz -= offset; + if (am_hdr_sz > capacity || data_sz > (capacity - am_hdr_sz)) { + MPIDU_genq_shmem_pool_cell_alloc(transport->extbuf_pool, (void **) &data_buf, + MPIR_Process.local_rank, 0 /* intra NUMA */ , buf); + MPIR_Assert(data_buf != NULL); /* debug */ + // if (unlikely(data_buf == NULL)) { + // ret = MPIDI_POSIX_NOK; + // goto fn_exit; + // } + + available = MPIDI_POSIX_eager_payload_limit(); + cell->type |= MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_EXTBUF; + + uint64_t handle = MPIDU_genq_shmem_pool_cell_to_handle(transport->extbuf_pool, data_buf); + ((MPIDI_POSIX_eager_quicq_extbuf_hdr *) payload)->handle = handle; + ((MPIDI_POSIX_eager_quicq_extbuf_hdr *) payload)->buf = data_buf; + } else { + data_buf = payload; + MPIR_Assert(!(cell->type & MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_EXTBUF)); + } + + if (am_hdr) { + cell->am_header = *msg_hdr; + cell->type |= MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_HDR; + if (is_topo_local) { + MPIR_Typerep_copy(data_buf, am_hdr, am_hdr_sz, MPIR_TYPEREP_FLAG_NONE); + } else { + MPIR_Typerep_copy(data_buf, am_hdr, am_hdr_sz, MPIR_TYPEREP_FLAG_STREAM); + } + cell->payload_size += am_hdr_sz; + cell->am_header.am_hdr_sz = am_hdr_sz; + available -= am_hdr_sz; + data_buf += am_hdr_sz; + } else { + cell->type |= MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_DATA; + } + + /* We want to skip packing of send buffer if there is no data to be sent . 
buf == NULL is + * not a correct check here because derived datatype can use absolute address for displacement + * which requires buffer address passed as MPI_BOTTOM which is usually NULL. count == 0 is also + * not reliable because the derived datatype could have zero block size which contains no + * data. */ + if (bytes_sent) { + if (is_topo_local) { + MPIR_Typerep_pack(buf, count, datatype, offset, data_buf, available, &packed_size, + MPIR_TYPEREP_FLAG_NONE); + } else { + MPIR_Typerep_pack(buf, count, datatype, offset, data_buf, available, &packed_size, + MPIR_TYPEREP_FLAG_STREAM); + } + cell->payload_size += packed_size; + *bytes_sent = packed_size; + } + + terminal->last_seq++; + MPL_atomic_release_store_uint64(&terminal->cntr->seq.a, terminal->last_seq); + + fn_exit: + MPIR_FUNC_EXIT; + return ret; +} + +#endif /* POSIX_EAGER_QUICQ_SEND_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/quicq_types.h b/src/mpid/ch4/shm/posix/eager/quicq/quicq_types.h new file mode 100644 index 00000000000..c44b043b800 --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/quicq_types.h @@ -0,0 +1,93 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_QUICQ_TYPES_H_INCLUDED +#define POSIX_EAGER_QUICQ_TYPES_H_INCLUDED + +#include <mpidimpl.h> +#include "mpidu_init_shm.h" +#include "mpidu_genq.h" + +#define MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_EXTBUF (0x1) +#define MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_HDR (0x2) +#define MPIDI_POSIX_EAGER_QUICQ_CELL_TYPE_DATA (0x4) + +/* The number of cells per queue is always rounded up to the next power of two number during + * init. In this way we can convert counter value to index by just masking the higher bits. + * The counter has 64 bits. We do not check counter overflow as it would be extremely + * unlikely to happen. Assuming each rank can send and recv at 100M messages per second, it + * would take more than 5000 years to overflow. 
+ */ +#define MPIDI_POSIX_EAGER_QUICQ_CNTR_MASK ((uint64_t) MPIR_CVAR_CH4_SHM_POSIX_QUICQ_NUM_CELLS - 1) +#define MPIDI_POSIX_EAGER_QUICQ_CNTR_TO_IDX(cntr) ((cntr) & MPIDI_POSIX_EAGER_QUICQ_CNTR_MASK) + +typedef struct MPIDI_POSIX_eager_quicq_cell MPIDI_POSIX_eager_quicq_cell_t; + +/* Each cell contains some data being communicated from one process to another. */ +struct MPIDI_POSIX_eager_quicq_cell { + uint16_t type; /* Type of cell (head/tail/etc.) */ + uint16_t from; /* Who is the message in the cell from */ + uint32_t payload_size; /* Size of the message in the cell */ + MPIDI_POSIX_am_header_t am_header; /* If this cell is the beginning of a message, it will have + * an active message header and this will point to it. */ +}; + +typedef struct MPIDI_POSIX_eager_quicq_extbuf_hdr { + uint64_t handle; + void *buf; +} MPIDI_POSIX_eager_quicq_extbuf_hdr; + +typedef struct MPIDI_POSIX_eager_quicq_cntr { + union { + MPL_atomic_uint64_t a; + char pad[MPL_CACHELINE_SIZE]; + } seq; + union { + MPL_atomic_uint64_t a; + char pad[MPL_CACHELINE_SIZE]; + } ack; +} MPIDI_POSIX_eager_quicq_cntr_t; + +typedef struct MPIDI_POSIX_eager_quicq_terminal { + void *cell_base; + MPIDI_POSIX_eager_quicq_cntr_t *cntr; + uint64_t last_seq; + uint64_t last_ack; +} MPIDI_POSIX_eager_quicq_terminal_t; + +typedef struct MPIDI_POSIX_eager_quicq_transport { + int size_of_cell; + int cell_alloc_size; + int num_cells_per_queue; + int num_queues; + void *shm_base; + void **cell_bases; + MPIDI_POSIX_eager_quicq_terminal_t *send_terminals; + MPIDI_POSIX_eager_quicq_terminal_t *recv_terminals; + MPIDU_genq_shmem_pool_t extbuf_pool; +} MPIDI_POSIX_eager_quicq_transport_t; + +typedef struct MPIDI_POSIX_eager_quicq_global { + int max_vcis; + /* 2d array indexed with [src_vci][dst_vci] */ + MPIDI_POSIX_eager_quicq_transport_t transports[MPIDI_CH4_MAX_VCIS][MPIDI_CH4_MAX_VCIS]; + int16_t *first_poll_local_ranks; +} MPIDI_POSIX_eager_quicq_global_t; + +extern MPIDI_POSIX_eager_quicq_global_t 
MPIDI_POSIX_eager_quicq_global; + +MPL_STATIC_INLINE_PREFIX MPIDI_POSIX_eager_quicq_transport_t + * MPIDI_POSIX_eager_quicq_get_transport(int vci_src, int vci_dst) +{ + return &MPIDI_POSIX_eager_quicq_global.transports[vci_src][vci_dst]; +} + +#define MPIDI_POSIX_EAGER_QUICQ_CELL_PAYLOAD(cell) \ + ((char*)(cell) + sizeof(MPIDI_POSIX_eager_quicq_cell_t)) + +#define MPIDI_POSIX_EAGER_QUICQ_CELL_CAPACITY(transport) \ + ((transport)->cell_alloc_size - sizeof(MPIDI_POSIX_eager_quicq_cell_t)) + +#endif /* POSIX_EAGER_QUICQ_TYPES_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/quicq/subconfigure.m4 b/src/mpid/ch4/shm/posix/eager/quicq/subconfigure.m4 new file mode 100644 index 00000000000..9a88380cd17 --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/quicq/subconfigure.m4 @@ -0,0 +1,19 @@ +[#] start of __file__ +dnl MPICH_SUBCFG_AFTER=src/mpid/ch4 + +AC_DEFUN([PAC_SUBCFG_PREREQ_]PAC_SUBCFG_AUTO_SUFFIX,[ + AM_COND_IF([BUILD_CH4],[ + for eager in $ch4_posix_eager_modules ; do + AS_CASE([$eager],[quicq],[build_ch4_shm_posix_eager_quicq=yes]) + if test $eager = "quicq" ; then + AC_DEFINE(HAVE_CH4_SHM_EAGER_QUICQ,1,[QUICQ submodule is built]) + fi + done + ]) + AM_CONDITIONAL([BUILD_CH4_SHM_POSIX_EAGER_QUICQ],[test "X$build_ch4_shm_posix_eager_quicq" = "Xyes"]) +])dnl + +AC_DEFUN([PAC_SUBCFG_BODY_]PAC_SUBCFG_AUTO_SUFFIX,[ +])dnl end _BODY + +[#] end of __file__ diff --git a/src/mpid/common/genq/mpidu_genq_shmem_pool.h b/src/mpid/common/genq/mpidu_genq_shmem_pool.h index e3de456ba1c..f986057240c 100644 --- a/src/mpid/common/genq/mpidu_genq_shmem_pool.h +++ b/src/mpid/common/genq/mpidu_genq_shmem_pool.h @@ -65,4 +65,17 @@ static inline int MPIDU_genq_shmem_pool_cell_free(MPIDU_genq_shmem_pool_t pool, return rc; } +static inline uint64_t MPIDU_genq_shmem_pool_cell_to_handle(MPIDU_genq_shmem_pool_t pool, + void *cell) +{ + return CELL_TO_HEADER(cell)->handle; +} + +static inline void *MPIDU_genq_shmem_pool_handle_to_cell(MPIDU_genq_shmem_pool_t pool, + uint64_t handle) +{ + 
MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool; + return HEADER_TO_CELL(HANDLE_TO_HEADER(pool_obj, handle)); +} + #endif /* ifndef MPIDU_GENQ_SHMEM_POOL_H_INCLUDED */