Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bindings/python/dspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ def Get(self, name, version, lb, ub, timeout, dtype = None):
passed_type = None if dtype == None else np.dtype(dtype)
return wrapper_dspaces_get(self.client, (self.nspace + name).encode('ascii'), version, lb, ub, passed_type, timeout)

def GetModule(self, module, params = {}):
if not isinstance(module, str):
raise TypeError("module should be a module name")
if not isinstance(params, dict):
raise TypeError("params should be a dictionary.")
serialized_params = {key: json.dumps(val) for key,val in params.items()}
result, err = wrapper_dspaces_get_module(self.client, module.encode('ascii'), serialized_params)
if err < 0:
if err == -3:
raise DSModuleError(err)
elif err == -2 or err == -1 or err == -4 or err == -5:
raise DSRemoteFaultError(err)
elif err == -6:
raise DSConnectionError(err)
else:
raise Exception("unknown failure")
return(result)

def Exec(self, name, version, lb=None, ub=None, fn=None):
arg = DSObject(name=name, version=version, lb=lb, ub=ub)
return(self.VecExec([arg], fn))
Expand Down Expand Up @@ -162,6 +180,9 @@ def GetVarObjs(self, var_name):
def GetModules(self):
return wrapper_dspaces_get_modules(self.client)

def AddModule(self, name, namespace, url):
wrapper_dspaces_add_module(self.client, name.encode('ascii'), namespace.encode('ascii'), url.encode('ascii'))

def _get_expr(obj, client):
if isinstance(obj, DSExpr):
return(obj)
Expand Down
104 changes: 101 additions & 3 deletions bindings/python/dspaces_wrapper.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void wrapper_dspaces_put(PyObject *clientppy, PyObject *obj, const char *name,
Py_END_ALLOW_THREADS
// clang-format on

return;
return;
}

PyObject *wrapper_dspaces_get(PyObject *clientppy, const char *name,
Expand Down Expand Up @@ -261,6 +261,7 @@ PyObject *wrapper_dspaces_get(PyObject *clientppy, const char *name,
dspaces_get_req(*clientp, &in_req, &out_req, timeout);
Py_END_ALLOW_THREADS
// clang-format on

data = out_req.buf;

free(in_req.var_name);
Expand All @@ -287,6 +288,82 @@ PyObject *wrapper_dspaces_get(PyObject *clientppy, const char *name,
return (arr);
}

PyObject *wrapper_dspaces_get_module(PyObject *clientppy, const char *module,
PyObject *params)
{
dspaces_client_t *clientp = PyLong_AsVoidPtr(clientppy);
// char ***param_str;
struct dspaces_mod_param *dsp_params;
int num_params;
PyObject *key, *val, *key_bstr, *val_bstr, *keys, *ret, *arr;
struct dspaces_req out;
PyArray_Descr *descr;
void *data;
int ndim;
npy_intp *dims;
int i, err;

keys = PyDict_Keys(params);
num_params = PyList_Size(keys);

if(num_params > 0) {
dsp_params = malloc(sizeof(*dsp_params) * num_params);
}
for(i = 0; i < num_params; i++) {
dsp_params[i].type = DSP_JSON;
key = PyList_GetItem(keys, i);
key_bstr = PyUnicode_AsASCIIString(key);
dsp_params[i].key = strdup(PyBytes_AsString(key_bstr));
Py_DECREF(key_bstr);
val = PyDict_GetItem(params, key);
val_bstr = PyUnicode_AsASCIIString(val);
dsp_params[i].val.s = strdup(PyBytes_AsString(val_bstr));
Py_DECREF(val_bstr);
}
Py_DECREF(keys);

// clang-format off
Py_BEGIN_ALLOW_THREADS
err = dspaces_get_module(*clientp, module, dsp_params, num_params, &out);
Py_END_ALLOW_THREADS
// clang-format on

data = out.buf;
for(i = 0; i < num_params; i++) {
free(dsp_params[i].key);
free(dsp_params[i].val.s);
}
if(num_params > 0) {
free(dsp_params);
}

ret = PyTuple_New(2);

PyTuple_SET_ITEM(ret, 1, PyLong_FromLong(err));

if(!data) {
Py_INCREF(Py_None);
PyTuple_SET_ITEM(ret, 0, Py_None);
return (ret);
}
descr = PyArray_DescrNewFromType(out.tag);
ndim = out.ndim;
dims = malloc(sizeof(*dims) * ndim);
for(i = 0; i < ndim; i++) {
dims[(ndim - i) - 1] = (out.ub[i] - out.lb[i]) + 1;
}
arr = PyArray_NewFromDescr(&PyArray_Type, descr, ndim, dims, NULL, data, 0,
NULL);
if(!arr) {
PyErr_SetString(PyExc_RuntimeError, "failed to create numpy array");
return (NULL);
}
PyTuple_SET_ITEM(ret, 0, arr);
free(dims);

return (ret);
}

PyObject *wrapper_dspaces_pexec(PyObject *clientppy, PyObject *req_list,
PyObject *fn, const char *fn_name)
{
Expand Down Expand Up @@ -353,9 +430,9 @@ PyObject *wrapper_dspaces_pexec(PyObject *clientppy, PyObject *req_list,
dspaces_mpexec(*clientp, num_reqs, reqs, PyBytes_AsString(fn),
PyBytes_Size(fn) + 1, fn_name, &data, &data_size);
Py_END_ALLOW_THREADS
// clang-format on
// clang-format on

if(data_size > 0)
if(data_size > 0)
{
result = PyBytes_FromStringAndSize(data, data_size);
}
Expand Down Expand Up @@ -714,10 +791,31 @@ PyObject *wrapper_dspaces_register(PyObject *clientppy, const char *type,
PyTuple_SET_ITEM(ret, 0,
PyUnicode_DecodeASCII(nspace, strlen(nspace), NULL));
} else {
Py_INCREF(Py_None);
PyTuple_SET_ITEM(ret, 0, Py_None);
}

PyTuple_SET_ITEM(ret, 1, PyLong_FromLong(id));

return (ret);
}

PyObject *wrapper_dspaces_add_module(PyObject *clientppy, const char *name, const char *namespace, const char *url)
{
dspaces_client_t *clientp = PyLong_AsVoidPtr(clientppy);
int8_t result;

// clang-format off
Py_BEGIN_ALLOW_THREADS
result = dspaces_add_module(*clientp, name, namespace, url);
Py_END_ALLOW_THREADS
// clang-format on

if(result != 0) {
PyErr_SetString(PyExc_RuntimeError, "dspaces_add_module() failed");
return(NULL);
}

Py_INCREF(Py_None);
return (Py_None);
}
5 changes: 5 additions & 0 deletions bindings/python/dspaces_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ PyObject *wrapper_dspaces_get(PyObject *clientppy, const char *name,
int version, PyObject *lbt, PyObject *ubt,
PyObject *dtype, int timeout);

PyObject *wrapper_dspaces_get_module(PyObject *clientppy, const char *module,
PyObject *params);

PyObject *wrapper_dspaces_pexec(PyObject *clientppy, PyObject *req_list,
PyObject *fn, const char *fn_name);

Expand Down Expand Up @@ -62,3 +65,5 @@ PyObject *wrapper_dspaces_ops_calc(PyObject *clientppy, PyObject *exprppy);

PyObject *wrapper_dspaces_register(PyObject *clientppy, const char *type,
const char *name, const char *data);

PyObject *wrapper_dspaces_add_module(PyObject *clientppy, const char *name, const char *namespace, const char *url);
2 changes: 1 addition & 1 deletion docs/running.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Bootstrapping communication
---------------------------
The server produces a bootstrap file during its init phase, ``conf.ds``. This file must be read by the clients (or rank zero of the clients
if ``dspaces_init_mpi()`` is being used. This file provides the clients with enough information to make initial contact with the server and
perform wire-up. In order to find this file, the server and client application must be run in the same working directory, or at last a symlink of ``ds.conf`` should be present.
perform wire-up. In order to find this file, the server and client application must be run in the same working directory, or at last a symlink of ``conf.ds`` should be present.

Environment variables
---------------------
Expand Down
16 changes: 16 additions & 0 deletions include/dspaces-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <stdio.h>
#include <stdlib.h>

#include <margo.h>

#include "dspaces-logging.h"

#if defined(__cplusplus)
extern "C" {
#endif
Expand Down Expand Up @@ -61,6 +65,8 @@ static inline void ignore_result(int unused_result) { (void)unused_result; }
#define DSP_INT16 -15
#define DSP_INT32 -16
#define DSP_INT64 -17
#define DSP_STR -18
#define DSP_JSON -19

static size_t type_to_size_map[] = {0,
sizeof(float),
Expand Down Expand Up @@ -90,6 +96,16 @@ static int type_to_size(int type_id)
return (type_to_size_map[-type_id]);
}

#define HG_TRY(op, ret, jmp, estr, ...) \
do { \
hg_return_t hret = op; \
if(hret != HG_SUCCESS) { \
DEBUG_OUT(estr, ##__VA_ARGS__); \
err = ret; \
goto jmp; \
} \
} while(0);

#if defined(__cplusplus)
}
#endif
Expand Down
2 changes: 2 additions & 0 deletions include/dspaces-conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ int parse_conf_toml(const char *fname, struct ds_conf *conf);

void print_conf(struct ds_conf *conf);

char *dspaces_download_module(const char *name, const char *url, char *file);

#endif // __DSPACES_CONF_H
13 changes: 13 additions & 0 deletions include/dspaces-modules.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ struct dspaces_module {
typedef enum dspaces_module_arg_type {
DSPACES_ARG_REAL,
DSPACES_ARG_INT,
DSPACES_ARG_BOOL,
DSPACES_ARG_STR,
DSPACES_ARG_JSON,
DSPACES_ARG_NONE
} dspaces_mod_arg_type_t;

Expand Down Expand Up @@ -68,6 +70,14 @@ struct dspaces_module_ret {

int dspaces_init_mods(struct list_head *mods);

int dspaces_server_add_module(struct list_head *mods, const char *name,
const char *namespace, const char *url,
enum dspaces_mod_type type);

int build_module_args_from_dict(size_t dict_len, int8_t *types, char **keys,
void **vals,
struct dspaces_module_args **argsp);

int build_module_args_from_odsc(obj_descriptor *odsc,
struct dspaces_module_args **argsp);

Expand All @@ -91,4 +101,7 @@ struct dspaces_module_ret *dspaces_module_exec(struct dspaces_module *mod,

int dspaces_module_names(struct list_head *mods, char ***names);

int odsc_from_ret(struct dspaces_module_ret *ret, obj_descriptor **odsc_out,
char name[OD_MAX_NAME_LEN], int version);

#endif // __DSPACES_MODULES_H__
25 changes: 25 additions & 0 deletions include/dspaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,29 @@ struct dspaces_req {
int dspaces_get_req(dspaces_client_t client, struct dspaces_req *in_req,
struct dspaces_req *out_req, int timeout);

struct dspaces_mod_param {
int type;
char *key;
union {
float f;
double d;
int8_t b : 1;
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
uint8_t u8;
uint16_t u16;
uint32_t u32;
uint64_t u64;
char *s;
} val;
};

int dspaces_get_module(dspaces_client_t client, const char *module,
struct dspaces_mod_param *params, int num_param,
struct dspaces_req *out);

int dspaces_pexec(dspaces_client_t client, const char *var_name,
unsigned int ver, int ndim, uint64_t *lb, uint64_t *ub,
const char *fn, unsigned int fnsz, const char *fn_name,
Expand Down Expand Up @@ -467,6 +490,8 @@ int dspaces_get_modules(dspaces_client_t client, char ***mod_names);
long dspaces_register_simple(dspaces_client_t client, const char *type,
const char *name, const char *data, char **nspace);

int dspaces_add_module(dspaces_client_t client, const char *name, const char *namespace, const char *url);

#if defined(__cplusplus)
}
#endif
Expand Down
Loading