Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 199 additions & 9 deletions src/dispatchers/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define BUFREAD64(buf,var) memcpy(&var, buf, 8); if (diff_endian) swap_64(&var);
#endif

/* Note accessing the following 3 global variables must be protected by a
/* Note accessing the following 5 global variables must be protected by a
* mutex, otherwise it will not be thread safe.
*/

Expand All @@ -52,6 +52,12 @@ static int pnc_numfiles;
*/
static int ncmpi_default_create_format = NC_FORMAT_CLASSIC;

/* attribute to be cached in all communicators */
static int pncio_node_ids_keyval = MPI_KEYVAL_INVALID;

/* attribute to be cached in MPI_COMM_SELF */
static int pncio_init_keyval = MPI_KEYVAL_INVALID;

#define NCMPII_HANDLE_ERROR(func) \
if (mpireturn != MPI_SUCCESS) { \
int errorStringLen; \
Expand All @@ -69,6 +75,146 @@ static int ncmpi_default_create_format = NC_FORMAT_CLASSIC;
} \
}

/* struct PNCIO_node_ids is defined in dispatch.h */

/*----< PNCIO_node_ids_copy() >----------------------------------------------*/
/* A function to be invoked when a communicator is duplicated, which adds a
* reference to the already allocated memory space storing node ID array.
*/
static
int PNCIO_node_ids_copy(MPI_Comm comm,
int keyval,
void *extra,
void *attr_inP,
void *attr_outP,
int *flag)
{
PNCIO_node_ids *attr_in = (PNCIO_node_ids*) attr_inP;
PNCIO_node_ids **attr_out = (PNCIO_node_ids**)attr_outP;

if (attr_in == NULL)
return MPI_ERR_KEYVAL;
else
attr_in->ref_count++;

*attr_out = attr_in;

*flag = 1; /* make a copy in the new communicator */

return MPI_SUCCESS;
}

/*----< PNCIO_node_ids_delete() >--------------------------------------------*/
/* Callback function to be called when a communicator is freed, which frees the
* allocated memory space of node ID array.
*/
static
int PNCIO_node_ids_delete(MPI_Comm comm,
int keyval,
void *attr_val,
void *extra)
{
PNCIO_node_ids *node_ids = (PNCIO_node_ids*) attr_val;

if (node_ids == NULL)
return MPI_ERR_KEYVAL;
else
node_ids->ref_count--;

if (node_ids->ref_count <= 0) {
/* free the allocated array */
if (node_ids->ids != NULL)
free(node_ids->ids);
free(node_ids);
}
return MPI_SUCCESS;
}

/*----< PNCIO_end_call() >---------------------------------------------------*/
/* Callback function to be called at MPI_Finalize(), which frees all cached
* attributes.
*/
static
int PNCIO_end_call(MPI_Comm comm,
int keyval,
void *attribute_val,
void *extra_state)
{
/* Free all keyvals used by PnetCDF */

MPI_Comm_free_keyval(&keyval); /* free pncio_init_keyval */

if (pncio_node_ids_keyval != MPI_KEYVAL_INVALID)
MPI_Comm_free_keyval(&pncio_node_ids_keyval);

return MPI_SUCCESS;
}

/*----< set_get_comm_attr() >------------------------------------------------*/
/* Create/set/get attributes into/from the MPI communicators passed in from the
* user application.
*/
static
void set_get_comm_attr(MPI_Comm comm,
PNCIO_node_ids *node_idsP)
{
PNCIO_node_ids *node_ids;

if (pncio_init_keyval == MPI_KEYVAL_INVALID) {
/* This is the first call ever to PnetCDF API. Creating key
* pncio_init_keyval is necessary for MPI_Finalize() to free key
* pncio_node_ids_keyval.
*/
MPI_Comm_create_keyval(MPI_NULL_COPY_FN, PNCIO_end_call,
&pncio_init_keyval, (void*)0);
MPI_Comm_set_attr(MPI_COMM_SELF, pncio_init_keyval, (void*)0);
}

if (pncio_node_ids_keyval == MPI_KEYVAL_INVALID) {
MPI_Comm_create_keyval(PNCIO_node_ids_copy, PNCIO_node_ids_delete,
&pncio_node_ids_keyval, NULL);
/* ignore error, as it is not a critical error */
}

if (pncio_node_ids_keyval != MPI_KEYVAL_INVALID) {
int found, nprocs;

MPI_Comm_get_attr(comm, pncio_node_ids_keyval, &node_ids, &found);
if (!found) {
/* Construct an array storing node IDs of all processes. Note the
* memory allocated for node_ids will be freed by
* PNCIO_node_ids_delete(), a callback function invoked when the
* MPI communicator is freed.
*/
node_ids = (PNCIO_node_ids*) malloc(sizeof(PNCIO_node_ids));
node_ids->ref_count = 1;

MPI_Comm_size(comm, &nprocs);
if (nprocs == 1) {
node_ids->num_nodes = 1;
node_ids->ids = (int*) malloc(sizeof(int));
node_ids->ids[0] = 0;
}
else {
/* Constructing node IDs requires communication calls to
* MPI_Get_processor_name(), MPI_Gather(), and MPI_Bcast().
*/
ncmpii_construct_node_list(comm, &node_ids->num_nodes,
&node_ids->ids);
}

/* FYI. The same key pncio_node_ids_keyval can be added to
* different MPI communicators with same or different values.
*/
MPI_Comm_set_attr(comm, pncio_node_ids_keyval, node_ids);
}
/* else case: returned node_ids contains the cached value */

/* copy contents */
*node_idsP = *node_ids;
}
}

/*----< new_id_PNCList() >---------------------------------------------------*/
/* Return a new ID (array index) from the PNC list, pnc_filelist[] that is
* not used. Note the used elements in pnc_filelist[] may not be contiguous.
Expand Down Expand Up @@ -348,6 +494,7 @@ ncmpi_create(MPI_Comm comm,
void *ncp;
PNC *pncp;
PNC_driver *driver;
PNCIO_node_ids node_ids;
#ifdef BUILD_DRIVER_FOO
int enable_foo_driver=0;
#endif
Expand All @@ -358,6 +505,24 @@ ncmpi_create(MPI_Comm comm,
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nprocs);

#ifdef ENABLE_THREAD_SAFE
int perr;
perr = pthread_mutex_lock(&lock);
if (perr != 0)
printf("Warning in file %s line %d: pthread_mutex_lock() failed (%s)\n",
__FILE__, __LINE__, strerror(perr));
#endif

/* creating communicator attributes must be protected by a mutex */
set_get_comm_attr(comm, &node_ids);

#ifdef ENABLE_THREAD_SAFE
perr = pthread_mutex_unlock(&lock);
if (perr != 0)
printf("Warning in file %s line %d: pthread_mutex_unlock() failed (%s)\n",
__FILE__, __LINE__, strerror(perr));
#endif

if (rank == 0)
set_env_mode(&env_mode);

Expand Down Expand Up @@ -523,9 +688,12 @@ ncmpi_create(MPI_Comm comm,
return err;
}

/* Duplicate comm, because users may free it (though unlikely). Note
* MPI_Comm_dup() is collective. We pass pncp->comm to drivers, so there
* is no need for a driver to duplicate it again.
/* Duplicate comm, because users may use it doing other point-to-point
* communication. When this happened, that communication can mess up with
* the PnetCDF/MPI-IO internal communication, particularly when in
* independent data mode. Note MPI_Comm_dup() is collective. We pass
* pncp->comm to drivers, so there is no need for a driver to duplicate it
* again.
*/
if (comm != MPI_COMM_WORLD && comm != MPI_COMM_SELF) {
mpireturn = MPI_Comm_dup(comm, &pncp->comm);
Expand All @@ -542,7 +710,7 @@ ncmpi_create(MPI_Comm comm,

/* calling the driver's create subroutine */
err = driver->create(pncp->comm, pncp->path, cmode, *ncidp, env_mode,
combined_info, &ncp);
combined_info, node_ids, &ncp);
if (status == NC_NOERR) status = err;
if (combined_info != MPI_INFO_NULL) MPI_Info_free(&combined_info);
if (status != NC_NOERR && status != NC_EMULTIDEFINE_CMODE) {
Expand Down Expand Up @@ -591,6 +759,7 @@ ncmpi_open(MPI_Comm comm,
void *ncp;
PNC *pncp;
PNC_driver *driver;
PNCIO_node_ids node_ids;
#ifdef BUILD_DRIVER_FOO
int enable_foo_driver=0;
#endif
Expand All @@ -601,6 +770,24 @@ ncmpi_open(MPI_Comm comm,
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nprocs);

#ifdef ENABLE_THREAD_SAFE
int perr;
perr = pthread_mutex_lock(&lock);
if (perr != 0)
printf("Warning in file %s line %d: pthread_mutex_lock() failed (%s)\n",
__FILE__, __LINE__, strerror(perr));
#endif

/* creating communicator attributes must be protected by a mutex */
set_get_comm_attr(comm, &node_ids);

#ifdef ENABLE_THREAD_SAFE
perr = pthread_mutex_unlock(&lock);
if (perr != 0)
printf("Warning in file %s line %d: pthread_mutex_unlock() failed (%s)\n",
__FILE__, __LINE__, strerror(perr));
#endif

if (rank == 0)
set_env_mode(&env_mode);

Expand Down Expand Up @@ -754,9 +941,12 @@ ncmpi_open(MPI_Comm comm,
err = new_id_PNCList(ncidp, pncp);
if (err != NC_NOERR) return err;

/* Duplicate comm, because users may free it (though unlikely). Note
* MPI_Comm_dup() is collective. We pass pncp->comm to drivers, so there
* is no need for a driver to duplicate it again.
/* Duplicate comm, because users may use it doing other point-to-point
* communication. When this happened, that communication can mess up with
* the PnetCDF/MPI-IO internal communication, particularly when in
* independent data mode. Note MPI_Comm_dup() is collective. We pass
* pncp->comm to drivers, so there is no need for a driver to duplicate it
* again.
*/
if (comm != MPI_COMM_WORLD && comm != MPI_COMM_SELF) {
mpireturn = MPI_Comm_dup(comm, &pncp->comm);
Expand All @@ -772,7 +962,7 @@ ncmpi_open(MPI_Comm comm,

/* calling the driver's open subroutine */
err = driver->open(pncp->comm, pncp->path, omode, *ncidp, env_mode,
combined_info, &ncp);
combined_info, node_ids, &ncp);
if (status == NC_NOERR) status = err;
if (combined_info != MPI_INFO_NULL) MPI_Info_free(&combined_info);
if (status != NC_NOERR && status != NC_EMULTIDEFINE_OMODE &&
Expand Down
10 changes: 8 additions & 2 deletions src/drivers/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,14 @@ ncmpii_construct_node_list(MPI_Comm comm,
MPI_Gatherv(my_procname, my_procname_len, MPI_CHAR,
NULL, NULL, NULL, MPI_CHAR, root, comm);

/* compute node IDs of each MPI process */
node_ids = (int *) NCI_Malloc(sizeof(int) * (nprocs + 1));
/* node_ids is an array storing the compute node IDs of all MPI processes
* in the MPI communicator supplied by the application program. Here, we
* use malloc() instead of NCI_Malloc, because node_ids will be freed when
* the communicator is freed. When communicator is MPI_COMM_WORLD or
* MPI_COMM_SELF, it is freed at MPI_Finalize() whose calls to free()
* cannot be tracked by PnetCDF.
*/
node_ids = (int *) malloc(sizeof(int) * (nprocs + 1));

if (rank == root) {
/* all_procnames[] can tell us the number of nodes and number of
Expand Down
4 changes: 2 additions & 2 deletions src/drivers/nc4io/nc4io_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ struct NC_nc4 {

extern int
nc4io_create(MPI_Comm comm, const char *path, int cmode, int ncid,
int env_mode, MPI_Info info, void **ncdp);
int env_mode, MPI_Info info, PNCIO_node_ids node_ids, void **ncdp);

extern int
nc4io_open(MPI_Comm comm, const char *path, int omode, int ncid,
int env_mode, MPI_Info info, void **ncdp);
int env_mode, MPI_Info info, PNCIO_node_ids node_ids, void **ncdp);

extern int
nc4io_close(void *ncdp);
Expand Down
30 changes: 16 additions & 14 deletions src/drivers/nc4io/nc4io_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@
#include <nc4io_driver.h>

int
nc4io_create(MPI_Comm comm,
const char *path,
int cmode,
int ncid,
int env_mode,
MPI_Info info,
void **ncpp) /* OUT */
nc4io_create(MPI_Comm comm,
const char *path,
int cmode,
int ncid,
int env_mode,
MPI_Info info,
PNCIO_node_ids node_ids, /* node IDs of all processes */
void **ncpp) /* OUT */
{
char *filename;
int err, ncidtmp;
Expand Down Expand Up @@ -109,13 +110,14 @@ nc4io_create(MPI_Comm comm,
}

int
nc4io_open(MPI_Comm comm,
const char *path,
int omode,
int ncid,
int env_mode,
MPI_Info info,
void **ncpp)
nc4io_open(MPI_Comm comm,
const char *path,
int omode,
int ncid,
int env_mode,
MPI_Info info,
PNCIO_node_ids node_ids, /* node IDs of all processes */
void **ncpp) /* OUT */
{
char *filename;
int err, ncidtmp;
Expand Down
6 changes: 4 additions & 2 deletions src/drivers/ncadios/ncadios_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,13 @@ struct NC_ad {

extern int
ncadios_create(MPI_Comm comm, const char *path, int cmode, int ncid,
int env_mode, MPI_Info info, void **ncdp);
int env_mode, MPI_Info info, PNCIO_node_ids node_ids,
void **ncdp);

extern int
ncadios_open(MPI_Comm comm, const char *path, int omode, int ncid,
int env_mode, MPI_Info info, void **ncdp);
int env_mode, MPI_Info info, PNCIO_node_ids node_ids,
void **ncdp);

extern int
ncadios_close(void *ncdp);
Expand Down
Loading