diff --git a/config/algorithms/MPI/Cray-MPICH/alltoallv.json b/config/algorithms/MPI/Cray-MPICH/alltoallv.json new file mode 100644 index 00000000..949513dd --- /dev/null +++ b/config/algorithms/MPI/Cray-MPICH/alltoallv.json @@ -0,0 +1,21 @@ +{ + "default_mpich": { + "desc": "Cray MPICH default Alltoallv algorithm selection", + "version": "8.0.0", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + } + ], + "selection": "auto", + "tags": [ + "default" + ] + } +} \ No newline at end of file diff --git a/config/algorithms/MPI/LibPico/alltoall.json b/config/algorithms/MPI/LibPico/alltoall.json index 3a2ed4be..12366a70 100644 --- a/config/algorithms/MPI/LibPico/alltoall.json +++ b/config/algorithms/MPI/LibPico/alltoall.json @@ -28,6 +28,36 @@ "external" ] }, + "bine_dh_over": { + "desc": "LibPico external Alltoall algorithm. This algorithm follows the distance-halving Bine butterfly communication pattern described in the Bine paper.", + "version": "1.0.0", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + }, + { + "key": "comm_sz", + "conditions": [ + { + "operator": "is_power_of_two", + "value": true + } + ] + } + ], + "selection": "pico", + "tags": [ + "bine", + "external", + "distance-halving" + ] + }, "pairwise_ompi_over": { "desc": "libpico pairwise algorithm, copied from Open MPI.", "version": "1.0.0", @@ -57,4 +87,4 @@ "external" ] } -} +} \ No newline at end of file diff --git a/config/algorithms/MPI/LibPico/alltoallv.json b/config/algorithms/MPI/LibPico/alltoallv.json new file mode 100644 index 00000000..fac609ad --- /dev/null +++ b/config/algorithms/MPI/LibPico/alltoallv.json @@ -0,0 +1,32 @@ +{ + "bine_dh_over": { + "desc": "LibPico external Alltoallv algorithm. 
This algorithm follows the distance-halving Bine butterfly communication pattern described in the Bine paper.", + "version": "1.0.0", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + }, + { + "key": "comm_sz", + "conditions": [ + { + "operator": "is_power_of_two", + "value": true + } + ] + } + ], + "selection": "pico", + "tags": [ + "bine", + "external", + "distance-halving" + ] + } +} \ No newline at end of file diff --git a/config/algorithms/MPI/MPICH/alltoallv.json b/config/algorithms/MPI/MPICH/alltoallv.json new file mode 100644 index 00000000..418c0305 --- /dev/null +++ b/config/algorithms/MPI/MPICH/alltoallv.json @@ -0,0 +1,21 @@ +{ + "default_mpich": { + "desc": "MPICH default Alltoallv algorithm selection", + "version": "4.3.0", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + } + ], + "selection": "auto", + "tags": [ + "default" + ] + } +} \ No newline at end of file diff --git a/config/algorithms/MPI/Open-MPI/alltoallv.json b/config/algorithms/MPI/Open-MPI/alltoallv.json new file mode 100644 index 00000000..cec83bf3 --- /dev/null +++ b/config/algorithms/MPI/Open-MPI/alltoallv.json @@ -0,0 +1,66 @@ +{ + "default_ompi": { + "desc": "Open MPI default Alltoallv algorithm selection.", + "version": "4.1.5", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + } + ], + "selection": 0, + "cuda_support": "yes", + "rocm_support": "ucx", + "tags": [ + "ignore" + ] + }, + "basic_linear_ompi": { + "desc": "Open MPI basic linear Alltoallv algorithm.", + "version": "4.1.5", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + } + ], + "selection": 1, + "cuda_support": "yes", + "rocm_support": "ucx", + "tags": [ + "basic_linear", + "linear" + ] + }, + "pairwise_ompi": { + "desc": "Open MPI pairwise Alltoallv algorithm.", + "version": "4.1.5", + "constraints": [ + { + "key": "count", + "conditions": [ + { + "operator": ">=", + "value": "comm_sz" + } + ] + } + ], + "selection": 2, + "cuda_support": "yes", + "rocm_support": "ucx", + "tags": [ + "pairwise" + ] + } +} \ No newline at end of file diff --git a/config/parse_test.py b/config/parse_test.py index b6f1aeb6..265a5886 100644 --- a/config/parse_test.py +++ b/config/parse_test.py @@ -15,7 +15,7 @@ "type": "object", "properties": { "libpico_version": {"type": "string", "pattern": "^\\d+\\.\\d+\\.\\d+$"}, - "collective": {"type": "string", "enum": ["ALLREDUCE", "ALLTOALL", "ALLGATHER", "BCAST", "GATHER", "REDUCE", "REDUCE_SCATTER", "SCATTER"]}, + "collective": {"type": "string", "enum": ["ALLREDUCE", "ALLTOALL","ALLTOALLV", "ALLGATHER", "BCAST", "GATHER", "REDUCE", "REDUCE_SCATTER", "SCATTER"]}, "MPI_Op": {"type": "string"}, "tags": { "type": "object", diff --git a/config/test/alltoallv.json b/config/test/alltoallv.json new file mode 100644 index 00000000..9ff0fa87 --- /dev/null +++ b/config/test/alltoallv.json @@ -0,0 +1,16 @@ +{ + "libpico_version": "1.0.0", + "collective": "ALLTOALLV", + "MPI_Op": "MPI_SUM", + "tags": { + "include": [ + "internal", + "external" + ], + "exclude": [] + }, + "specific": { + "include": [], + "exclude": [] + } +} \ No newline at end of file diff --git a/include/libpico.h b/include/libpico.h index 3659a148..4bc0024f 100644 --- a/include/libpico.h +++ b/include/libpico.h @@ -19,6 +19,8 @@ void* rbuf, size_t rcount, MPI_Datatype rdtype, MPI_Comm comm #define 
ALLTOALL_MPI_ARGS const void *sbuf, size_t scount, MPI_Datatype sdtype, \ void *rbuf, size_t rcount, MPI_Datatype rdtype, MPI_Comm comm +#define ALLTOALLV_MPI_ARGS const void *sbuf, const int scounts[], const int sdispls[], MPI_Datatype sdtype, \ + void *rbuf, const int rcounts[], const int rdispls[], MPI_Datatype rdtype, MPI_Comm comm #define BCAST_MPI_ARGS void *buf, size_t count, MPI_Datatype dtype, int root, MPI_Comm comm #define GATHER_MPI_ARGS const void *sbuf, size_t scount, MPI_Datatype sdtype, \ void *rbuf, size_t rcount, MPI_Datatype rdtype, int root, MPI_Comm comm @@ -78,6 +80,9 @@ int allgather_bine_send_remap_hierarcic_global_local(ALLGATHER_MPI_ARGS); int alltoall_pairwise_ompi(ALLTOALL_MPI_ARGS); int alltoall_bine(ALLTOALL_MPI_ARGS); +int alltoall_bine_DH(ALLTOALL_MPI_ARGS); + +int alltoallv_bine_DH(ALLTOALLV_MPI_ARGS); int bcast_linear(BCAST_MPI_ARGS); int bcast_binomial_halving(BCAST_MPI_ARGS); diff --git a/libpico/libpico_alltoall.c b/libpico/libpico_alltoall.c index 275f2f7a..2a04c737 100644 --- a/libpico/libpico_alltoall.c +++ b/libpico/libpico_alltoall.c @@ -7,65 +7,77 @@ #include #include #include - +#include #include "libpico.h" #include "libpico_utils.h" - /* Alltoall pairwise implementation from Open MPI 5.0.1 base module. * Original file: ompi/mca/coll/base/coll_base_alltoall.c * Original function: ompi_coll_base_alltoall_intra_pairwise */ -int alltoall_pairwise_ompi(const void *sbuf, size_t scount, MPI_Datatype sdtype, - void* rbuf, size_t rcount, MPI_Datatype rdtype, MPI_Comm comm) +int alltoall_pairwise_ompi(const void *sbuf, size_t scount, MPI_Datatype sdtype, + void *rbuf, size_t rcount, MPI_Datatype rdtype, MPI_Comm comm) { int line = -1, err = 0, rank, size, step, sendto, recvfrom; - void * tmpsend, *tmprecv; + void *tmpsend, *tmprecv; ptrdiff_t lb, sext, rext; - if (MPI_IN_PLACE == sbuf) { + if (MPI_IN_PLACE == sbuf) + { err = MPI_ERR_ARG; line = __LINE__; goto err_hndl; } - MPI_Comm_rank (comm, &rank); - MPI_Comm_size (comm, &size); + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); err = MPI_Type_get_extent(sdtype, &lb, &sext); - if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } + if (err != MPI_SUCCESS) + { + line = __LINE__; + goto err_hndl; + } err = MPI_Type_get_extent(rdtype, &lb, &rext); - if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } - + if (err != MPI_SUCCESS) + { + line = __LINE__; + goto err_hndl; + } /* Perform pairwise exchange - starting from 1 so the local copy is last */ - for (step = 1; step < size + 1; step++) { + for (step = 1; step < size + 1; step++) + { /* Determine sender and receiver for this step. 
*/ - sendto = (rank + step) % size; + sendto = (rank + step) % size; recvfrom = (rank + size - step) % size; /* Determine sending and receiving locations */ - tmpsend = (char*)sbuf + (ptrdiff_t)sendto * sext * (ptrdiff_t)scount; - tmprecv = (char*)rbuf + (ptrdiff_t)recvfrom * rext * (ptrdiff_t)rcount; + tmpsend = (char *)sbuf + (ptrdiff_t)sendto * sext * (ptrdiff_t)scount; + tmprecv = (char *)rbuf + (ptrdiff_t)recvfrom * rext * (ptrdiff_t)rcount; /* send and receive */ err = MPI_Sendrecv(tmpsend, scount, sdtype, sendto, 0, tmprecv, rcount, rdtype, recvfrom, 0, comm, MPI_STATUS_IGNORE); - if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; } + if (err != MPI_SUCCESS) + { + line = __LINE__; + goto err_hndl; + } } return MPI_SUCCESS; - err_hndl: +err_hndl: fprintf(stderr, "\n%s:%4d\tRank %d Error occurred %d\n\n", __FILE__, line, rank, err); - (void)line; // silence compiler warning + (void)line; // silence compiler warning return err; } int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, - void *recvbuf, size_t r_count, MPI_Datatype r_dtype, MPI_Comm comm) + void *recvbuf, size_t r_count, MPI_Datatype r_dtype, MPI_Comm comm) { assert(s_count == r_count); assert(s_dtype == r_dtype); @@ -87,16 +99,18 @@ int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, tmpbuf_size = sbuf_size * size; tmpbuf_size_real = tmpbuf_size + sizeof(uint) * size + sizeof(uint) * size; - tmpbuf = (char *) malloc(tmpbuf_size_real); - if(tmpbuf == NULL){ + tmpbuf = (char *)malloc(tmpbuf_size_real); + if (tmpbuf == NULL) + { err = MPI_ERR_NO_MEM; goto err_hndl; } - resident_block = (uint *) (tmpbuf + tmpbuf_size); - resident_block_next = (uint *) (tmpbuf + tmpbuf_size + sizeof(uint) * size); + resident_block = (uint *)(tmpbuf + tmpbuf_size); + resident_block_next = (uint *)(tmpbuf + tmpbuf_size + sizeof(uint) * size); // At the beginning I only have my blocks - for(size_t i = 0; i < size; i++){ + for (size_t i = 0; i < size; i++) + { resident_block[i] = i; } @@ -104,16 +118,20 @@ int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, // We use recvbuf to receive/send the data, and tmpbuf to organize the data to send at the next step // By doing so, we avoid a copy form tmpbuf to recvbuf at the end - inverse_mask = 0x1 << (int) (log_2(size) - 1); + inverse_mask = 0x1 << (int)(log_2(size) - 1); block_first_mask = ~(inverse_mask - 1); - while(mask < size){ + while (mask < size) + { int partner; - int ntbn = negabinary_to_binary((mask << 1) -1); - if(rank % 2 == 0){ - partner = mod(rank + ntbn, size); - } else { - partner = mod(rank - ntbn, size); + int ntbn = negabinary_to_binary((mask << 1) - 1); + if (rank % 2 == 0) + { + partner = mod(rank + ntbn, size); + } + else + { + partner = mod(rank - ntbn, size); } min_block_s = remap_rank(size, partner) & block_first_mask; max_block_s = min_block_s + inverse_mask - 1; @@ -121,21 +139,26 @@ int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, size_t block_recvd_cnt = 0, block_send_cnt = 0; size_t offset_send = 0, offset_keep = 0; num_resident_blocks_next = 0; - for(size_t i = 0; i < size; i++){ + for (size_t i = 0; i < size; i++) + { uint block = resident_block[i % num_resident_blocks]; - // Shall I send this block? Check the negabinary thing + // Shall I send this block? 
Check the negabinary thing uint remap_block = remap_rank(size, block); size_t offset = i * sbuf_size; // I move to the beginning of tmpbuf the blocks I want to keep, // and I move to recvbuf the blocks I want to send. - if(remap_block >= min_block_s && remap_block <= max_block_s){ - memcpy((char*) recvbuf + offset_send, tmpbuf + offset, sbuf_size); + if (remap_block >= min_block_s && remap_block <= max_block_s) + { + memcpy((char *)recvbuf + offset_send, tmpbuf + offset, sbuf_size); offset_send += sbuf_size; block_send_cnt++; - }else{ + } + else + { // Copy the blocks we are not sending to the second half of recvbuf - if(offset != offset_keep){ + if (offset != offset_keep) + { memcpy(tmpbuf + offset_keep, tmpbuf + offset, sbuf_size); } offset_keep += sbuf_size; @@ -145,15 +168,18 @@ int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, num_resident_blocks_next++; } } - assert(block_recvd_cnt == size/2); - assert(block_send_cnt == size/2); + assert(block_recvd_cnt == size / 2); + assert(block_send_cnt == size / 2); num_resident_blocks /= 2; // I receive data in the second half of tmpbuf (the first half contains the blocks I am keeping from previous iteration) - err = MPI_Sendrecv((char*) recvbuf, s_count * block_send_cnt, s_dtype, partner, 0, - tmpbuf + (size / 2) * sbuf_size, s_count * block_send_cnt, s_dtype, partner, 0, + err = MPI_Sendrecv((char *)recvbuf, s_count * block_send_cnt, s_dtype, partner, 0, + tmpbuf + (size / 2) * sbuf_size, s_count * block_send_cnt, s_dtype, partner, 0, comm, MPI_STATUS_IGNORE); - if(err != MPI_SUCCESS) { goto err_hndl; } + if (err != MPI_SUCCESS) + { + goto err_hndl; + } // Update resident blocks memcpy(resident_block, resident_block_next, sizeof(uint) * num_resident_blocks); @@ -166,35 +192,169 @@ int alltoall_bine(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, // Now I need to permute tmpbuf into recvbuf // Since I always received the new block on the right, and moved the blocks // I wanted to keep to the left, they are now sorted in the same order they reached this - // rank from their corresponding source ranks. + // rank from their corresponding source ranks. // I.e., I should consider the "reverse tree" (with this rank at the bottom and all the other ranks on top), // which represent how the data arrived here. // This tree is basically the opposite that I used to send the data // I should consider the decreasing tree, and viceversa. 
- for(size_t i = 0; i < size; i++){ + for (size_t i = 0; i < size; i++) + { int rotated_i = 0; - if((rank % 2) == 0){ + if ((rank % 2) == 0) + { rotated_i = mod(i - rank, size); - } else { + } + else + { rotated_i = mod(rank - i, size); } int repr = 0; - if(in_range(rotated_i, log_2(size))){ + if (in_range(rotated_i, log_2(size))) + { repr = binary_to_negabinary(rotated_i); - }else{ + } + else + { repr = binary_to_negabinary(rotated_i - size); } int index = remap_distance_doubling(repr); size_t offset_src = index * sbuf_size; size_t offset_dst = i * sbuf_size; - memcpy((char*) recvbuf + offset_dst, tmpbuf + offset_src, sbuf_size); + memcpy((char *)recvbuf + offset_dst, tmpbuf + offset_src, sbuf_size); } free(tmpbuf); return MPI_SUCCESS; err_hndl: - if(tmpbuf != NULL) free(tmpbuf); + if (tmpbuf != NULL) + free(tmpbuf); return err; } + +// This implementation follows the distance-halving Bine butterfly pattern described in the paper +int alltoall_bine_DH(const void *sendbuf, size_t s_count, MPI_Datatype s_dtype, + void *recvbuf, size_t r_count, MPI_Datatype r_dtype, MPI_Comm comm) +{ + assert(s_count == r_count); + assert(s_dtype == r_dtype); + + int r, size, dtype, s, err = MPI_SUCCESS; + char *work_buffer = NULL; + char *send_buffer = NULL; + char *keep_buffer = NULL; + size_t block_size; + size_t header_size; + size_t packet_size; + + err = MPI_Comm_rank(comm, &r); + if (err != MPI_SUCCESS) + goto err_hndl; + + err = MPI_Comm_size(comm, &size); + if (err != MPI_SUCCESS) + goto err_hndl; + + err = MPI_Type_size(s_dtype, &dtype); + if (err != MPI_SUCCESS) + goto err_hndl; + + s = (int)log2((double)size); + assert((1 << s) == size); + + block_size = s_count * (size_t)dtype; + header_size = 2 * sizeof(int); + packet_size = header_size + block_size; + + work_buffer = malloc((size_t)size * packet_size); + send_buffer = malloc((size_t)size * packet_size); + keep_buffer = malloc((size_t)size * packet_size); + + if (work_buffer == NULL || send_buffer == NULL || keep_buffer == NULL) + { + err = MPI_ERR_NO_MEM; + goto err_hndl; + } + + for (int i = 0; i < size; i++) + { + char *rec = work_buffer + (size_t)i * packet_size; + int src = r; + int dst = i; + + memcpy(rec, &src, sizeof(int)); + memcpy(rec + sizeof(int), &dst, sizeof(int)); + memcpy(rec + header_size,(const char *)sendbuf + (size_t)i * block_size,block_size); + } + + for (int step = 0; step < s; step++) + { + int partner; + int send_count = 0; + int keep_count = 0; + + if ((r % 2) == 0) + partner = mod(r + (1 - (int)pow(-2, s - step)) / 3, size); + else + partner = mod(r - (1 - (int)pow(-2, s - step)) / 3, size); + + for (int j = 0; j < size; j++) + { + char *rec = work_buffer + (size_t)j * packet_size; + int src, dst; + int logical_dst; + int logical_partner; + + memcpy(&src, rec, sizeof(int)); + memcpy(&dst, rec + sizeof(int), sizeof(int)); + + logical_dst = logical_rank_for_bine_dh_root(dst, src, size); + logical_partner = logical_rank_for_bine_dh_root(partner, src, size); + + if (same_prefix_negabinary(logical_dst, logical_partner,s,step + 1)) + { + memcpy(send_buffer + (size_t)send_count * packet_size,rec,packet_size); + send_count++; + } + else + { + memcpy(keep_buffer + (size_t)keep_count * packet_size,rec,packet_size); + keep_count++; + } + } + + err = MPI_Sendrecv(send_buffer, + (int)((size_t)send_count * packet_size), MPI_BYTE, partner, 0, + work_buffer + (size_t)keep_count * packet_size, + (int)((size_t)send_count * packet_size), MPI_BYTE, partner, 0, + comm, MPI_STATUS_IGNORE); + if (err != MPI_SUCCESS) + goto err_hndl; + + 
memcpy(work_buffer, + keep_buffer, + (size_t)keep_count * packet_size); + } + + // After the last step every record held here is destined to this rank: unpack each payload into recvbuf at its source rank's slot. + for (int i = 0; i < size; i++) + { + char *rec = work_buffer + (size_t)i * packet_size; + int src, dst; + + memcpy(&src, rec, sizeof(int)); + memcpy(&dst, rec + sizeof(int), sizeof(int)); + + assert(dst == r); + + memcpy((char *)recvbuf + (size_t)src * block_size, + rec + header_size, + block_size); + } + +err_hndl: + free(keep_buffer); + free(send_buffer); + free(work_buffer); + return err; +} \ No newline at end of file diff --git a/libpico/libpico_alltoallv.c b/libpico/libpico_alltoallv.c new file mode 100644 index 00000000..a8c4072b --- /dev/null +++ b/libpico/libpico_alltoallv.c @@ -0,0 +1,162 @@ +#include +#include +#include +#include +#include +#include "libpico.h" +#include "libpico_utils.h" + +// This implementation follows the distance-halving Bine butterfly pattern described in the Bine paper. +int alltoallv_bine_DH(const void *sendbuf, const int sendcounts[], const int sdispls[], MPI_Datatype sendtype, + void *recvbuf, const int recvcounts[], const int rdispls[], MPI_Datatype recvtype, + MPI_Comm comm) +{ + assert(sendtype == recvtype); + + int r, size, dtype, s, err = MPI_SUCCESS; + char *work_buffer = NULL, *send_buffer = NULL, *keep_buffer = NULL; + size_t header_size = 3 * sizeof(int), max_dim_buffer, dim_work = 0; + + err = MPI_Comm_rank(comm, &r); + if (err != MPI_SUCCESS) + goto err_hndl; + + err = MPI_Comm_size(comm, &size); + if (err != MPI_SUCCESS) + goto err_hndl; + + err = MPI_Type_size(sendtype, &dtype); + if (err != MPI_SUCCESS) + goto err_hndl; + + s = (int)log2((double)size); + assert((1 << s) == size); + + // Size the staging buffers from the global payload total: with variable counts a rank may transiently hold any subset of the communicator's blocks. + unsigned long local_total_bytes = 0; + unsigned long global_total_bytes = 0; + + for (int i = 0; i < size; i++) + { + local_total_bytes += (unsigned long)((size_t)sendcounts[i] * (size_t)dtype); + } + + err = allreduce_bine_lat(&local_total_bytes, &global_total_bytes, 1, MPI_UNSIGNED_LONG, MPI_SUM, comm); + if (err != MPI_SUCCESS) + goto err_hndl; + max_dim_buffer = (size_t)size * header_size + (size_t)global_total_bytes; + + work_buffer = malloc(max_dim_buffer); + send_buffer = malloc(max_dim_buffer); + keep_buffer = malloc(max_dim_buffer); + + if (work_buffer == NULL || send_buffer == NULL || keep_buffer == NULL) + { + err = MPI_ERR_NO_MEM; + goto err_hndl; + } + + // Pack each non-empty outgoing block as a [src, dst, block_size] header followed by its payload. + for (int i = 0; i < size; i++) + { + int block_size = (int)((size_t)sendcounts[i] * (size_t)dtype); + size_t offset = (size_t)sdispls[i] * (size_t)dtype; + size_t packet_size = header_size + (size_t)block_size; + if (block_size > 0) + { + char *rec = work_buffer + dim_work; + int src = r; + int dst = i; + + memcpy(rec, &src, sizeof(int)); + memcpy(rec + sizeof(int), &dst, sizeof(int)); + memcpy(rec + 2 * sizeof(int), &block_size, sizeof(int)); + + memcpy(rec + header_size, (const char *)sendbuf + offset, (size_t)block_size); + + dim_work += packet_size; + } + } + + for (int step = 0; step < s; step++) + { + int partner, dim_send = 0, dim_keep = 0, dim_recv = 0; + + // Distance-halving partner for this step (mirrored rotation for odd ranks). + if ((r % 2) == 0) + partner = mod(r + (1 - (int)pow(-2, s - step)) / 3, size); + else + partner = mod(r - (1 - (int)pow(-2, s - step)) / 3, size); + + // Split the packed records: blocks whose destination shares the partner's negabinary prefix are forwarded, the rest stay resident. + size_t skip = 0; + while (skip < dim_work) + { + char *rec = work_buffer + skip; + + int dst, block_size, src; + memcpy(&src, rec, sizeof(int)); + memcpy(&dst, rec + sizeof(int), sizeof(int)); + memcpy(&block_size, rec + 2 * sizeof(int), sizeof(int)); + + size_t packet_size = header_size + (size_t)block_size; + + int logical_dst = logical_rank_for_bine_dh_root(dst, src, size); + int logical_partner = 
logical_rank_for_bine_dh_root(partner, src, size); + if (same_prefix_negabinary(logical_dst, logical_partner,s,step + 1)) + { + memcpy(send_buffer + dim_send, rec, packet_size); + dim_send += (int)packet_size; + } + else + { + memcpy(keep_buffer + dim_keep, rec, packet_size); + dim_keep += (int)packet_size; + } + + skip += packet_size; + } + + err = MPI_Sendrecv(&dim_send, 1, MPI_INT, partner, 0, &dim_recv, 1, MPI_INT, partner, 0, comm, MPI_STATUS_IGNORE); + if (err != MPI_SUCCESS) + goto err_hndl; + + err = MPI_Sendrecv(send_buffer, dim_send, MPI_BYTE, partner, 1, work_buffer + dim_keep, dim_recv, MPI_BYTE, partner, 1, + comm, MPI_STATUS_IGNORE); + if (err != MPI_SUCCESS) + goto err_hndl; + + memcpy(work_buffer, keep_buffer, (size_t)dim_keep); + + dim_work = (size_t)dim_keep + (size_t)dim_recv; + } + + size_t skip = 0; + while (skip < dim_work) + { + char *rec = work_buffer + skip; + + int src, dst, block_size; + + memcpy(&src, rec, sizeof(int)); + memcpy(&dst, rec + sizeof(int), sizeof(int)); + memcpy(&block_size, rec + 2 * sizeof(int), sizeof(int)); + + size_t packet_size = header_size + (size_t)block_size; + + assert(dst == r); + + size_t offset = (size_t)rdispls[src] * (size_t)dtype; + + if (block_size > 0) + { + memcpy((char *)recvbuf + offset, + rec + header_size, + (size_t)block_size); + } + + skip += packet_size; + } + +err_hndl: + free(keep_buffer); + free(send_buffer); + free(work_buffer); + return err; +} \ No newline at end of file diff --git a/libpico/libpico_utils.h b/libpico/libpico_utils.h index 0c1f35de..9b499dd5 100644 --- a/libpico/libpico_utils.h +++ b/libpico/libpico_utils.h @@ -811,4 +811,24 @@ static inline unsigned int floor_power_of_two(unsigned int n) { return n - (n >> 1); } +static inline int same_prefix_negabinary(int a, int b, int total_bits, int prefix_len) +{ + a = binary_to_negabinary(a); + b = binary_to_negabinary(b); + + int shift = total_bits - prefix_len; + return (a >> shift) == (b >> shift); +} + +static inline int logical_rank_for_bine_dh_root(int x, int root, int size) +{ + if ((root % 2) == 0) + // use the normal rotation + return mod(x - root, size); + else + // use the mirrored rotation + return mod(root - x, size); +} + + #endif // LIBPICO_UTILS_H diff --git a/pico_core/pico_core_alltoallv.c b/pico_core/pico_core_alltoallv.c new file mode 100644 index 00000000..1b4e3c9b --- /dev/null +++ b/pico_core/pico_core_alltoallv.c @@ -0,0 +1,18 @@ +#include +#include "pico_core_utils.h" + +int alltoallv_allocator(ALLOCATOR_ARGS) { + int comm_sz; + MPI_Comm_size(comm, &comm_sz); + + *sbuf = malloc(count * type_size); + *rbuf = malloc(count * type_size); + *rbuf_gt = malloc(count * type_size); + + if (*sbuf == NULL || *rbuf == NULL || *rbuf_gt == NULL) { + fprintf(stderr, "Error: failed to allocate buffers for alltoallv\n"); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/pico_core/pico_core_utils.c b/pico_core/pico_core_utils.c index 24049ff3..1526c4e4 100644 --- a/pico_core/pico_core_utils.c +++ b/pico_core/pico_core_utils.c @@ -27,6 +27,7 @@ static inline coll_t get_collective_from_string(const char *coll_str) { CHECK_STR(coll_str, "ALLREDUCE", ALLREDUCE); CHECK_STR(coll_str, "ALLGATHER", ALLGATHER); CHECK_STR(coll_str, "ALLTOALL", ALLTOALL); + CHECK_STR(coll_str, "ALLTOALLV", ALLTOALLV); CHECK_STR(coll_str, "BCAST", BCAST); CHECK_STR(coll_str, "GATHER", GATHER); CHECK_STR(coll_str, "REDUCE", REDUCE); @@ -53,6 +54,8 @@ static inline allocator_func_ptr get_allocator(coll_t collective) { return allgather_allocator; case 
ALLTOALL: return alltoall_allocator; + case ALLTOALLV: + return alltoallv_allocator; case BCAST: return bcast_allocator; case GATHER: @@ -163,7 +166,7 @@ static inline alltoall_func_ptr get_alltoall_function(const char *algorithm) { #ifndef PICO_NCCL CHECK_STR(algorithm, "bine_over", alltoall_bine); CHECK_STR(algorithm, "pairwise_ompi_over", alltoall_pairwise_ompi); - + CHECK_STR(algorithm, "bine_dh_over", alltoall_bine_DH); PICO_CORE_DEBUG_PRINT_STR("MPI_Alltoall"); return alltoall_wrapper; #else @@ -173,6 +176,12 @@ static inline alltoall_func_ptr get_alltoall_function(const char *algorithm) { #endif } +static inline alltoallv_func_ptr get_alltoallv_function(const char *algorithm){ + CHECK_STR(algorithm, "bine_dh_over", alltoallv_bine_DH); + PICO_CORE_DEBUG_PRINT_STR("MPI_Alltoallv"); + return alltoallv_wrapper; +} + /** * @brief Select and returns the appropriate bcast function based * on the algorithm. It returns the default bcast function if the @@ -336,6 +345,9 @@ int get_routine(test_routine_t *test_routine, const char *algorithm) { case ALLTOALL: test_routine->function.alltoall = get_alltoall_function(algorithm); break; + case ALLTOALLV: + test_routine->function.alltoallv = get_alltoallv_function(algorithm); + break; case BCAST: test_routine->function.bcast = get_bcast_function(algorithm); break; @@ -580,6 +592,28 @@ int run_coll_once(test_routine_t test_routine, void *sbuf, void *rbuf, rbuf, local_count, dtype, comm); break; + case ALLTOALLV: { + int *scounts = malloc(comm_sz * sizeof(int)); + int *sdispls = malloc(comm_sz * sizeof(int)); + int *rcounts_v = malloc(comm_sz * sizeof(int)); + int *rdispls = malloc(comm_sz * sizeof(int)); + + for (int i = 0; i < comm_sz; i++) { + scounts[i] = (int)local_count; + rcounts_v[i] = (int)local_count; + sdispls[i] = i * (int)local_count; + rdispls[i] = i * (int)local_count; + } + + ret = test_routine.function.alltoallv(sbuf, scounts, sdispls, dtype, + rbuf, rcounts_v, rdispls, dtype, comm); + + free(scounts); + free(sdispls); + free(rcounts_v); + free(rdispls); + break; + } case BCAST: ret = test_routine.function.bcast(sbuf, count, dtype, 0, comm); break; @@ -636,6 +670,29 @@ int test_loop(test_routine_t test_routine, void *sbuf, void *rbuf, size_t count, rbuf, local_count, dtype, comm, iter, times, test_routine); break; + case ALLTOALLV: { + int *scounts = malloc(comm_sz * sizeof(int)); + int *sdispls = malloc(comm_sz * sizeof(int)); + int *rcounts_v = malloc(comm_sz * sizeof(int)); + int *rdispls = malloc(comm_sz * sizeof(int)); + + for (int i = 0; i < comm_sz; i++) { + scounts[i] = (int)local_count; + rcounts_v[i] = (int)local_count; + sdispls[i] = i * (int)local_count; + rdispls[i] = i * (int)local_count; + } + + ret = alltoallv_test_loop(sbuf, scounts, sdispls, dtype, + rbuf, rcounts_v, rdispls, dtype, + comm, iter, times, test_routine); + + free(scounts); + free(sdispls); + free(rcounts_v); + free(rdispls); + break; + } case BCAST: ret = bcast_test_loop(sbuf, count, dtype, 0, comm, iter, times, test_routine); @@ -740,6 +797,27 @@ int ground_truth_check(test_routine_t test_routine, void *sbuf, void *rbuf, rbuf_gt, count / (size_t) comm_sz, dtype, comm); GT_CHECK_BUFFER(rbuf, rbuf_gt, count / (size_t) comm_sz, dtype, comm); break; + case ALLTOALLV: { + int *scounts = malloc(comm_sz * sizeof(int)); + int *sdispls = malloc(comm_sz * sizeof(int)); + int *rcounts_v = malloc(comm_sz * sizeof(int)); + int *rdispls = malloc(comm_sz * sizeof(int)); + size_t local_count = count / (size_t)comm_sz; + for (int i = 0; i < comm_sz; i++) { + scounts[i] 
= (int)local_count; + rcounts_v[i] = (int)local_count; + sdispls[i] = i * (int)local_count; + rdispls[i] = i * (int)local_count; + } + PMPI_Alltoallv(sbuf, scounts, sdispls, dtype, + rbuf_gt, rcounts_v, rdispls, dtype, comm); + GT_CHECK_BUFFER(rbuf, rbuf_gt, count, dtype, comm); + free(scounts); + free(sdispls); + free(rcounts_v); + free(rdispls); + break; + } case BCAST: if(rank == 0) { memcpy(rbuf_gt, sbuf, count * type_size); @@ -1055,7 +1133,8 @@ int rand_sbuf_generator(void *sbuf, MPI_Datatype dtype, size_t count, size_t real_sbuf_count = (test_routine.collective == ALLGATHER || test_routine.collective == GATHER || - test_routine.collective == ALLTOALL) ? + test_routine.collective == ALLTOALL || + test_routine.collective == ALLTOALLV) ? count / (size_t) comm_sz : count; for(size_t i = 0; i < real_sbuf_count; i++) { @@ -1262,10 +1341,11 @@ int debug_sbuf_generator(void *sbuf, MPI_Datatype dtype, size_t count, } size_t real_sbuf_count = - (test_routine.collective == ALLGATHER || - test_routine.collective == GATHER || - test_routine.collective == ALLTOALL) ? - count / (size_t) comm_sz : count; + (test_routine.collective == ALLGATHER || + test_routine.collective == GATHER || + test_routine.collective == ALLTOALL || + test_routine.collective == ALLTOALLV) ? + count / (size_t) comm_sz : count; for(int i=0; i< real_sbuf_count; i++){ if(dtype == MPI_INT64_T) { diff --git a/pico_core/pico_core_utils.h b/pico_core/pico_core_utils.h index 3011ce13..57e07801 100644 --- a/pico_core/pico_core_utils.h +++ b/pico_core/pico_core_utils.h @@ -72,6 +72,7 @@ typedef enum{ ALLREDUCE = 0, ALLGATHER, ALLTOALL, + ALLTOALLV, BCAST, GATHER, REDUCE, @@ -108,6 +109,7 @@ typedef int (*allocator_func_ptr)(ALLOCATOR_ARGS); int allreduce_allocator(ALLOCATOR_ARGS); int allgather_allocator(ALLOCATOR_ARGS); int alltoall_allocator(ALLOCATOR_ARGS); +int alltoallv_allocator(ALLOCATOR_ARGS); int bcast_allocator(ALLOCATOR_ARGS); int gather_allocator(ALLOCATOR_ARGS); int reduce_allocator(ALLOCATOR_ARGS); @@ -122,6 +124,7 @@ int scatter_allocator(ALLOCATOR_ARGS); typedef int (*allreduce_func_ptr)(ALLREDUCE_MPI_ARGS); typedef int (*allgather_func_ptr)(ALLGATHER_MPI_ARGS); typedef int (*alltoall_func_ptr)(ALLTOALL_MPI_ARGS); +typedef int (*alltoallv_func_ptr)(ALLTOALLV_MPI_ARGS); typedef int (*bcast_func_ptr)(BCAST_MPI_ARGS); typedef int (*gather_func_ptr)(GATHER_MPI_ARGS); typedef int (*reduce_func_ptr)(REDUCE_MPI_ARGS); @@ -137,6 +140,9 @@ static inline int allgather_wrapper(ALLGATHER_MPI_ARGS){ static inline int alltoall_wrapper(ALLTOALL_MPI_ARGS){ return MPI_Alltoall(sbuf, (int)scount, sdtype, rbuf, (int)rcount, rdtype, comm); } +static inline int alltoallv_wrapper(ALLTOALLV_MPI_ARGS){ + return MPI_Alltoallv(sbuf, scounts, sdispls, sdtype,rbuf, rcounts, rdispls, rdtype, comm); +} static inline int bcast_wrapper(BCAST_MPI_ARGS){ return MPI_Bcast(buf, (int)count, dtype, root, comm); } @@ -189,6 +195,7 @@ typedef struct { allreduce_func_ptr allreduce; allgather_func_ptr allgather; alltoall_func_ptr alltoall; + alltoallv_func_ptr alltoallv; bcast_func_ptr bcast; gather_func_ptr gather; reduce_func_ptr reduce; @@ -298,6 +305,7 @@ static inline int OP_NAME##_test_loop(ARGS, int iter, double *times, \ DEFINE_TEST_LOOP(allreduce, ALLREDUCE_MPI_ARGS, allreduce(sbuf, rbuf, count, dtype, MPI_SUM, comm)) DEFINE_TEST_LOOP(allgather, ALLGATHER_MPI_ARGS, allgather(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm)) DEFINE_TEST_LOOP(alltoall, ALLTOALL_MPI_ARGS, alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm)) 
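+// Alltoallv variant below: counts and displacements are per-rank arrays rather than a single uniform count.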
+DEFINE_TEST_LOOP(alltoallv, ALLTOALLV_MPI_ARGS, alltoallv(sbuf, scounts, sdispls, sdtype, rbuf, rcounts, rdispls, rdtype, comm)) DEFINE_TEST_LOOP(bcast, BCAST_MPI_ARGS, bcast(buf, count, dtype, 0, comm)) DEFINE_TEST_LOOP(gather, GATHER_MPI_ARGS, gather(sbuf, scount, sdtype, rbuf, rcount, rdtype, 0, comm)) DEFINE_TEST_LOOP(reduce, REDUCE_MPI_ARGS, reduce(sbuf, rbuf, count, dtype, MPI_SUM, 0, comm)) diff --git a/selector/change_dynamic_rules.py b/selector/change_dynamic_rules.py index 706e3fa3..3eac36f4 100644 --- a/selector/change_dynamic_rules.py +++ b/selector/change_dynamic_rules.py @@ -79,7 +79,7 @@ def modify_dynamic_rule(rule_file: str | os.PathLike, collective_type: str, new_ for i, line in enumerate(lines): if re.search(pattern, line): if i + 4 < len(lines): # Ensure we don't go out of bounds - lines[i+4] = f"0 {new_rule} 0 0 # Algorithm\n" + lines[i+4] = f"0 {int(new_rule)} 0 0 # Algorithm\n" with open(rule_file, 'w') as txt_file: txt_file.writelines(lines) return @@ -108,9 +108,15 @@ def main(): print(f"{__file__}: Environment variables not set.", file=sys.stderr) print(f"DYNAMIC_RULE_FILE={dynamic_rule_file}\nCOLLECTIVE_TYPE={collective_type}\nALGORITHM_DECLARATIONS_DIR={os.getenv('ALGORITHM_DECLARATIONS_DIR')}", file=sys.stderr) sys.exit(1) - new_rule = find_dynamic_rule(algorithm_decls_dirs, collective_type, algorithm) - modify_dynamic_rule(dynamic_rule_file, collective_type, new_rule) - + if not str(new_rule).isdigit(): + print( + f"{__file__}: algorithm {algorithm} for {collective_type} maps to non-Open-MPI rule '{new_rule}'. " + "Resetting Open MPI dynamic rule to default.", + file=sys.stderr) + modify_dynamic_rule(dynamic_rule_file, collective_type, 0) + sys.exit(0) + + modify_dynamic_rule(dynamic_rule_file, collective_type, int(new_rule)) if __name__ == "__main__": main() diff --git a/selector/ompi_dynamic_rules.txt b/selector/ompi_dynamic_rules.txt index 4042af06..fcc915bb 100644 --- a/selector/ompi_dynamic_rules.txt +++ b/selector/ompi_dynamic_rules.txt @@ -1,4 +1,4 @@ -9 # Number of collectives +10 # Number of collectives 0 # Collective ID = ALLGATHER 1 # Number of comm sizes 2 # Comm size @@ -17,6 +17,12 @@ 1 # Number of message sizes 0 0 0 0 # Algorithm +4 # Collective ID = ALLTOALLV +1 # Number of comm sizes +2 # Comm size +1 # Number of message sizes +0 0 0 0 # Algorithm + 6 # Collective ID = BARRIER 1 # Number of comm sizes 2 # Comm size diff --git a/tui/models.py b/tui/models.py index 7ac7f3f2..203c2159 100644 --- a/tui/models.py +++ b/tui/models.py @@ -569,6 +569,7 @@ def from_dict(cls, gpu_json: Dict[str, Any]) -> "GPUSupport": class CollectiveType(Enum): UNKNOWN = 'unknown' ALLTOALL = 'alltoall' + ALLTOALLV = 'alltoallv' ALLREDUCE = 'allreduce' ALLGATHER = 'allgather' BARRIER = 'barrier' @@ -586,6 +587,8 @@ def from_str(cls, value: str): value = value.lower() if value == 'alltoall': return cls.ALLTOALL + elif value == 'alltoallv': + return cls.ALLTOALLV elif value == 'allreduce': return cls.ALLREDUCE elif value == 'allgather': diff --git a/tui/tui/steps/libraries.py b/tui/tui/steps/libraries.py index b3b41237..b0e23a9b 100644 --- a/tui/tui/steps/libraries.py +++ b/tui/tui/steps/libraries.py @@ -77,6 +77,7 @@ def compose(self) -> ComposeResult: "Allgather", "Allreduce", "Alltoall", + "Alltoallv", "Broadcast", "Gather", "Reduce",