Skip to content

Commit

Permalink
Add shims for parallel functions
Browse files Browse the repository at this point in the history
  • Loading branch information
ZedThree committed Sep 27, 2023
1 parent 816935a commit f64ee26
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 135 deletions.
29 changes: 15 additions & 14 deletions include/netCDF4.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -368,20 +368,7 @@ cdef extern from "netcdf.h":

IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cdef extern from "mpi-compat.h": pass
cdef extern from "netcdf_par.h":
ctypedef int MPI_Comm
ctypedef int MPI_Info
int nc_create_par(char *path, int cmode, MPI_Comm comm, MPI_Info info, int *ncidp) nogil
int nc_open_par(char *path, int mode, MPI_Comm comm, MPI_Info info, int *ncidp) nogil
int nc_var_par_access(int ncid, int varid, int par_access) nogil
cdef enum:
NC_COLLECTIVE
NC_INDEPENDENT
cdef extern from "netcdf.h":
cdef enum:
NC_MPIIO
NC_MPIPOSIX
NC_PNETCDF


# taken from numpy.pxi in numpy 1.0rc2.
cdef extern from "numpy/arrayobject.h":
Expand Down Expand Up @@ -436,6 +423,13 @@ cdef extern from "netcdf-compat.h":
unsigned* levelp, unsigned* blocksizep,
unsigned* addshufflep) nogil

# Parallel shims
ctypedef int MPI_Comm
ctypedef int MPI_Info
int nc_create_par(char *path, int cmode, MPI_Comm comm, MPI_Info info, int *ncidp) nogil
int nc_open_par(char *path, int mode, MPI_Comm comm, MPI_Info info, int *ncidp) nogil
int nc_var_par_access(int ncid, int varid, int par_access) nogil

cdef enum:
HAS_RENAME_GRP
HAS_NC_INQ_PATH
Expand Down Expand Up @@ -464,3 +458,10 @@ cdef extern from "netcdf-compat.h":
H5Z_FILTER_ZSTD
H5Z_FILTER_BZIP2
H5Z_FILTER_BLOSC

NC_COLLECTIVE
NC_INDEPENDENT

NC_MPIIO
NC_MPIPOSIX
NC_PNETCDF
14 changes: 14 additions & 0 deletions include/netcdf-compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ static inline int nc_close_memio(int ncid, NC_memio* info) { return NC_EINVAL; }
#define HAS_PARALLEL_SUPPORT 1
#else
#define HAS_PARALLEL_SUPPORT 0
typedef int MPI_Comm;
typedef int MPI_Info;
static inline int nc_create_par(const char *path, int cmode, MPI_Comm comm, MPI_Info info, int *ncidp) { return NC_EINVAL; }
static inline int nc_open_par(const char *path, int mode, MPI_Comm comm, MPI_Info info, int *ncidp) { return NC_EINVAL; }
static inline int nc_var_par_access(int ncid, int varid, int par_access) { return NC_EINVAL; }
# ifndef NC_INDEPENDENT
# define NC_INDEPENDENT 0
# define NC_COLLECTIVE 1
# endif
# ifndef NC_MPIIO
# define NC_MPIIO 0x2000
# define NC_MPIPOSIX NC_MPIIO
# define NC_PNETCDF (NC_MPIIO)
# endif
#endif

#if defined(NC_HAS_PARALLEL4) && NC_HAS_PARALLEL4
Expand Down
193 changes: 72 additions & 121 deletions src/netCDF4/_netCDF4.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,7 @@ __has_nc_open_mem__ = HAS_NC_OPEN_MEM
__has_nc_create_mem__ = HAS_NC_CREATE_MEM
__has_parallel4_support__ = HAS_PARALLEL4_SUPPORT
__has_pnetcdf_support__ = HAS_PNETCDF_SUPPORT
__has_parallel_support__ = HAS_PARALLEL_SUPPORT
__has_quantization_support__ = HAS_QUANTIZATION_SUPPORT
__has_zstandard_support__ = HAS_ZSTANDARD_SUPPORT
__has_bzip2_support__ = HAS_BZIP2_SUPPORT
Expand All @@ -1273,8 +1274,12 @@ IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
ctypedef MPI.Comm Comm
ctypedef MPI.Info Info
ELSE:
ctypedef object Comm
ctypedef object Info
ctypedef int Comm
ctypedef int Info
cdef Comm MPI_COMM_WORLD
cdef Info MPI_INFO_NULL
MPI_COMM_WORLD = 0
MPI_INFO_NULL = 0

# set path to SSL certificates (issue #1246)
# available starting in version 4.9.1
Expand Down Expand Up @@ -2137,7 +2142,7 @@ strings.
def __init__(self, filename, mode='r', clobber=True, format='NETCDF4',
diskless=False, persist=False, keepweakref=False,
memory=None, encoding=None, parallel=False,
Comm comm=None, Info info=None, **kwargs):
comm=None, info=None, **kwargs):
"""
**`__init__(self, filename, mode="r", clobber=True, diskless=False,
persist=False, keepweakref=False, memory=None, encoding=None,
Expand Down Expand Up @@ -2238,9 +2243,8 @@ strings.
cdef char *path
cdef char namstring[NC_MAX_NAME+1]
cdef int cmode, parmode
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cdef MPI_Comm mpicomm
cdef MPI_Info mpiinfo
cdef MPI_Comm mpicomm
cdef MPI_Info mpiinfo

memset(&self._buffer, 0, sizeof(self._buffer))

Expand All @@ -2261,30 +2265,29 @@ strings.
raise ValueError(msg)

if parallel:
IF HAS_PARALLEL4_SUPPORT != 1 and HAS_PNETCDF_SUPPORT != 1:
msg='parallel mode requires MPI enabled netcdf-c'
if not __has_parallel_support__:
raise ValueError("parallel mode requires MPI enabled netcdf-c")

parallel_formats = []
if __has_parallel4_support__:
parallel_formats += ['NETCDF4','NETCDF4_CLASSIC']
if __has_pnetcdf_support__:
parallel_formats += ['NETCDF3_CLASSIC',
'NETCDF3_64BIT_OFFSET',
'NETCDF3_64BIT_DATA',
'NETCDF3_64BIT']
if format not in parallel_formats:
msg='parallel mode only works with the following formats: ' + ' '.join(parallel_formats)
raise ValueError(msg)
ELSE:
parallel_formats = []
if __has_parallel4_support__:
parallel_formats += ['NETCDF4','NETCDF4_CLASSIC']
if __has_pnetcdf_support__:
parallel_formats += ['NETCDF3_CLASSIC',
'NETCDF3_64BIT_OFFSET',
'NETCDF3_64BIT_DATA',
'NETCDF3_64BIT']
if format not in parallel_formats:
msg='parallel mode only works with the following formats: ' + ' '.join(parallel_formats)
raise ValueError(msg)
if comm is not None:
mpicomm = comm.ob_mpi
else:
mpicomm = MPI_COMM_WORLD
if info is not None:
mpiinfo = info.ob_mpi
else:
mpiinfo = MPI_INFO_NULL
parmode = NC_MPIIO | _cmode_dict[format]
if comm is not None:
mpicomm = comm.ob_mpi
else:
mpicomm = MPI_COMM_WORLD
if info is not None:
mpiinfo = info.ob_mpi
else:
mpiinfo = MPI_INFO_NULL
parmode = NC_MPIIO | _cmode_dict[format]

self._inmemory = False

Expand All @@ -2306,48 +2309,21 @@ strings.
self._inmemory = True # checked in close method

else:
if clobber:
if parallel:
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_CLOBBER | parmode
with nogil:
ierr = nc_create_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
elif diskless:
if persist:
cmode = NC_WRITE | NC_CLOBBER | NC_DISKLESS | NC_PERSIST
with nogil:
ierr = nc_create(path, cmode, &grpid)
else:
cmode = NC_CLOBBER | NC_DISKLESS
with nogil:
ierr = nc_create(path, cmode , &grpid)
else:
with nogil:
ierr = nc_create(path, NC_CLOBBER, &grpid)
cmode = NC_CLOBBER if clobber else NC_NOCLOBBER

if parallel:
cmode |= parmode
with nogil:
ierr = nc_create_par(path, cmode, mpicomm, mpiinfo, &grpid)
else:
if parallel:
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_NOCLOBBER | parmode
with nogil:
ierr = nc_create_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
elif diskless:
if diskless:
cmode |= NC_DISKLESS
if persist:
cmode = NC_WRITE | NC_NOCLOBBER | NC_DISKLESS | NC_PERSIST
with nogil:
ierr = nc_create(path, cmode, &grpid)
else:
cmode = NC_NOCLOBBER | NC_DISKLESS
with nogil:
ierr = nc_create(path, cmode , &grpid)
else:
with nogil:
ierr = nc_create(path, NC_NOCLOBBER, &grpid)
cmode |= NC_WRITE | NC_PERSIST

with nogil:
ierr = nc_create(path, cmode, &grpid)

# reset default format to netcdf3 - this is a workaround
# for issue 170 (nc_open'ing a DAP dataset after switching
# format to NETCDF4). This bug should be fixed in version
Expand All @@ -2369,13 +2345,9 @@ strings.
ierr = nc_open_mem(<char *>path, 0, self._buffer.len, <void *>self._buffer.buf, &grpid)

elif parallel:
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_NOWRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
cmode = NC_NOWRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, mpicomm, mpiinfo, &grpid)
elif diskless:
cmode = NC_NOWRITE | NC_DISKLESS
with nogil:
Expand All @@ -2393,13 +2365,9 @@ strings.
ierr = nc_open(path, NC_NOWRITE, &grpid)
elif mode in ['a','r+'] and os.path.exists(filename):
if parallel:
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_WRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
cmode = NC_WRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, mpicomm, mpiinfo, &grpid)
elif diskless:
cmode = NC_WRITE | NC_DISKLESS
with nogil:
Expand All @@ -2410,13 +2378,9 @@ strings.
elif mode in ['as','r+s'] and os.path.exists(filename):
if parallel:
# NC_SHARE ignored
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_WRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
cmode = NC_WRITE | NC_MPIIO
with nogil:
ierr = nc_open_par(path, cmode, mpicomm, mpiinfo, &grpid)
elif diskless:
cmode = NC_SHARE | NC_DISKLESS
with nogil:
Expand All @@ -2429,13 +2393,9 @@ strings.
if clobber:
if parallel:
# NC_SHARE ignored
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_CLOBBER | parmode
with nogil:
ierr = nc_create_par(path, NC_CLOBBER | cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
cmode = NC_CLOBBER | parmode
with nogil:
ierr = nc_create_par(path, NC_CLOBBER | cmode, mpicomm, mpiinfo, &grpid)
elif diskless:
if persist:
cmode = NC_WRITE | NC_SHARE | NC_CLOBBER | NC_DISKLESS
Expand All @@ -2452,13 +2412,9 @@ strings.
else:
if parallel:
# NC_SHARE ignored
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
cmode = NC_NOCLOBBER | parmode
with nogil:
ierr = nc_create_par(path, cmode, \
mpicomm, mpiinfo, &grpid)
ELSE:
pass
cmode = NC_NOCLOBBER | parmode
with nogil:
ierr = nc_create_par(path, cmode, mpicomm, mpiinfo, &grpid)
elif diskless:
if persist:
cmode = NC_WRITE | NC_SHARE | NC_NOCLOBBER | NC_DISKLESS
Expand Down Expand Up @@ -6018,25 +5974,20 @@ NC_CHAR).
return data

def set_collective(self, value):
"""
**`set_collective(self,True_or_False)`**
"""**`set_collective(self,True_or_False)`**
turn on or off collective parallel IO access. Ignored if file is not
open for parallel access.
turn on or off collective parallel IO access. Ignored if file is not
open for parallel access.
"""
IF HAS_PARALLEL4_SUPPORT or HAS_PNETCDF_SUPPORT:
# set collective MPI IO mode on or off
if value:
with nogil:
ierr = nc_var_par_access(self._grpid, self._varid,
NC_COLLECTIVE)
else:
with nogil:
ierr = nc_var_par_access(self._grpid, self._varid,
NC_INDEPENDENT)
_ensure_nc_success(ierr)
ELSE:
pass # does nothing
if not __has_parallel_support__:
return

mode = NC_COLLECTIVE if value else NC_INDEPENDENT
with nogil:
ierr = nc_var_par_access(self._grpid, self._varid,
NC_COLLECTIVE)
_ensure_nc_success(ierr)


def get_dims(self):
"""
Expand Down

0 comments on commit f64ee26

Please sign in to comment.