From af54a901af1bc68f936bca1ff776ef10bd4a07b6 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Sun, 22 Oct 2017 20:24:24 +0800 Subject: [PATCH] Export event loop to module Export event loop and certain networking related interfaces to module. A demonstration module 'hellonet.c' is added to demonstrate how to use these new set of module interfaces to write event driven commands without resorting to extra thread. --- src/module.c | 515 ++++++++++++++++++++++++++++++- src/modules/Makefile | 7 +- src/modules/hellonet.c | 672 +++++++++++++++++++++++++++++++++++++++++ src/redismodule.h | 65 ++++ 4 files changed, 1247 insertions(+), 12 deletions(-) create mode 100644 src/modules/hellonet.c diff --git a/src/module.c b/src/module.c index fda68b27..93288c42 100644 --- a/src/module.c +++ b/src/module.c @@ -39,14 +39,23 @@ * structures that are never exposed to Redis Modules, if not as void * pointers that have an API the module can call with them) * -------------------------------------------------------------------------- */ +/* attachment related to a specific module */ +struct RedisModuleAttachment { + void *data; + void (*destructor)(void *ctx, void *data); +}; +typedef struct RedisModuleAttachment RedisModuleAttachment; /* This structure represents a module inside the system. */ struct RedisModule { - void *handle; /* Module dlopen() handle. */ - char *name; /* Module name. */ - int ver; /* Module version. We use just progressive integers. */ - int apiver; /* Module API version as requested during initialization.*/ - list *types; /* Module data types. */ + void *handle; /* Module dlopen() handle. */ + char *name; /* Module name. */ + int ver; /* Module version. We use just progressive integers. */ + int apiver; /* Module API version as requested during initialization.*/ + list *types; /* Module data types. */ + client *client; /* Module default client when one is required under context where no real one exists */ + dict *attachments; /* Module attachments */ + RedisModuleAttachment default_attachment; /* default attachment that can be accessed faster */ }; typedef struct RedisModule RedisModule; @@ -659,6 +668,8 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->ver = ver; module->apiver = apiver; module->types = listCreate(); + module->attachments = NULL; + memset(&module->default_attachment,0,sizeof(module->default_attachment)); ctx->module = module; } @@ -3226,20 +3237,29 @@ RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) { * RM_LogIOError() * */ -void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) { +void RM_LogRawWithLevel(RedisModule *module, int level, const char *fmt, va_list ap) { char msg[LOG_MAX_LEN]; size_t name_len; - int level; + name_len = snprintf(msg,sizeof(msg),"<%s> ",module->name); + vsnprintf(msg+name_len,sizeof(msg)-name_len,fmt,ap); + serverLogRaw(level,msg); +} + +/* This is the low level function implementing both: + * + * RM_Log() + * RM_LogIOError() + * + */ +void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) { + int level; if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG; else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE; else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE; else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING; else level = LL_VERBOSE; /* Default. */ - - name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); - vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); - serverLogRaw(level,msg); + RM_LogRawWithLevel(module,level,fmt,ap); } /* Produces a log message to the standard Redis log, the format accepts @@ -3265,6 +3285,15 @@ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { va_end(ap); } +void RM_LogWithLevel(RedisModuleCtx *ctx, int level, const char *fmt, ...) { + if (!ctx->module) return; /* Can only log if module is initialized */ + + va_list ap; + va_start(ap,fmt); + RM_LogRawWithLevel(ctx->module,level,fmt,ap); + va_end(ap); +} + /* Log errors from RDB / AOF serialization callbacks. * * This function should be used when a callback is returning a critical @@ -3656,8 +3685,12 @@ void moduleLoadFromQueue(void) { } void moduleFreeModuleStructure(struct RedisModule *module) { + if (module->attachments != NULL) { + dictRelease(module->attachments); + } listRelease(module->types); sdsfree(module->name); + freeClient(module->client); zfree(module); } @@ -3691,6 +3724,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { /* Redis module loaded! Register it. */ dictAdd(modules,ctx.module->name,ctx.module); ctx.module->handle = handle; + ctx.module->client = createClient(-1); serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); moduleFreeContext(&ctx); return C_OK; @@ -3736,6 +3770,15 @@ int moduleUnload(sds name) { dictReleaseIterator(di); /* Unregister all the hooks. TODO: Yet no hooks support here. */ + if (module->attachments != NULL) { + dictEmpty(module->attachments,NULL); + } + if (module->default_attachment.destructor != NULL) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = module; + module->default_attachment.destructor(&ctx,module->default_attachment.data); + moduleFreeContext(&ctx); + } /* Unload the dynamic library. */ if (dlclose(module->handle) == -1) { @@ -3817,6 +3860,433 @@ size_t moduleCount(void) { return dictSize(modules); } +typedef void (*RedisModuleEventFinalizerProc)(RedisModuleCtx *context, void *clientData); +typedef void (*RedisModuleFileProc)(RedisModuleCtx *context, int fd, void *clientData, int mask); + +typedef struct RedisModuleFileEventData { + RedisModule *module; + int mask; + RedisModuleFileProc rproc; + RedisModuleFileProc wproc; + void *clientData; +} RedisModuleFileEventData; + +void moduleFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) +{ + int rfired = 0; + RedisModuleFileEventData *event = clientData; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + REDISMODULE_NOT_USED(eventLoop); + ctx.module = event->module; + /* use module's client when no real one exist within this context */ + ctx.client = ctx.module->client; + if (event->mask & mask & AE_READABLE) { + rfired = 1; + event->rproc(&ctx,fd,event->clientData,mask); + } + if (event->mask & mask & AE_WRITABLE) { + if (!rfired || event->wproc != event->rproc) + event->wproc(&ctx,fd,event->clientData,mask); + } + moduleFreeContext(&ctx); +} + +/* Create a file event watch for given event type, REDISMODULE_FILE_READABLE/REDISMODULE_FILE_WRITABLE + * are supported at this time. Event callback will be dispatched when the file descriptor is readable + * or writable if corresponding event type is watched. */ +int RM_CreateFileEvent(RedisModuleCtx *ctx, int fd, int mask, RedisModuleFileProc proc, void *clientData) { + RedisModuleFileEventData *event; + REDISMODULE_NOT_USED(ctx); + if (aeGetFileEvents(server.el,fd) == 0) { + if ((event = zcalloc(sizeof(RedisModuleFileEventData))) == NULL) { + return REDISMODULE_ERR; + } + } else { + event = server.el->events[fd].clientData; + } + event->module = ctx->module; + if (mask & AE_READABLE) event->rproc = proc; + if (mask & AE_WRITABLE) event->wproc = proc; + event->clientData = clientData; + if (aeCreateFileEvent(server.el,fd,mask,moduleFileProc,event) != AE_OK) { + if (event != server.el->events[fd].clientData) { + zfree(event); + } + return REDISMODULE_ERR; + } + event->mask |= mask; + return REDISMODULE_OK; +} + +/* Delete a previously watched file event handler */ +void RM_DeleteFileEvent(RedisModuleCtx *ctx, int fd, int mask) { + RedisModuleFileEventData *event; + REDISMODULE_NOT_USED(ctx); + if ((aeGetFileEvents(server.el,fd) & mask) == 0) { + return; + } + aeDeleteFileEvent(server.el,fd,mask); + event = server.el->events[fd].clientData; + event->mask = aeGetFileEvents(server.el,fd); + if (event->mask == 0) { + zfree(server.el->events[fd].clientData); + } +} + +typedef int (*RedisModuleTimeProc)(RedisModuleCtx *context, long long id, void *clientData); + +typedef struct RedisModuleTimeEventData { + RedisModule *module; + RedisModuleTimeProc proc; + RedisModuleEventFinalizerProc finalizer; + void *clientData; +} RedisModuleTimeEventData; + +int moduleTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { + RedisModuleTimeEventData *event = clientData; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + int rc; + REDISMODULE_NOT_USED(eventLoop); + ctx.module = event->module; + /* use module's client when no real one exist within this context */ + ctx.client = ctx.module->client; + rc = event->proc(&ctx,id,event->clientData); + moduleFreeContext(&ctx); + return rc; +} + +void moduleEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData) { + RedisModuleTimeEventData *event = clientData; + REDISMODULE_NOT_USED(eventLoop); + if (event->finalizer != NULL) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = event->module; + ctx.client = ctx.module->client; + event->finalizer(&ctx,event->clientData); + moduleFreeContext(&ctx); + } + zfree(event); +} + +/* Add a file event watch for given event type, REDISMODULE_FILE_READABLE/REDISMODULE_FILE_WRITABLE + * are supported at this time. REDISMODULE_FILE_READABLE event is fired when the file descriptor is + * ready for read without blocking */ +int RM_CreateTimeEvent(RedisModuleCtx *ctx, long long milliseconds, RedisModuleTimeProc proc, void *clientData, RedisModuleEventFinalizerProc finalizer, long long *id) { + RedisModuleTimeEventData *event = zcalloc(sizeof(RedisModuleTimeEventData)); + if (event == NULL) { + return REDISMODULE_ERR; + } + event->module = ctx->module; + event->proc = proc; + event->finalizer = finalizer; + event->clientData = clientData; + if ((*id = aeCreateTimeEvent(server.el,milliseconds,moduleTimeProc,event,moduleEventFinalizerProc)) == AE_ERR) { + zfree(event); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Delete an existing timer with id */ +void RM_DeleteTimeEvent(RedisModuleCtx *ctx, long long id) { + REDISMODULE_NOT_USED(ctx); + aeDeleteTimeEvent(server.el,id); +} + +/* Connect to a server using TCP in nonblocking way, return file descriptor of the new connection */ +int RM_TcpNonBlockConnect(RedisModuleCtx *ctx, const char *addr, int port, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetTcpNonBlockConnect(err,(char *)addr,port)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Connect to a server using unix domain socket in nonblocking way, return file descriptor of the new connection */ +int RM_UnixNonBlockConnect(RedisModuleCtx *ctx, const char *path, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetUnixNonBlockConnect(err,(char *)path)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Create an IPV4 TCP server socket and bind to the specified port and optional bind + * address with given backlog. Return file descriptor of the server socket */ +int RM_TcpServer(RedisModuleCtx *ctx, int port, const char *bindaddr, int backlog, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetTcpServer(err,port,(char *)bindaddr,backlog)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Create an IPV6 TCP server socket and bind to the specified port and optional bind + * address with given backlog. Return file descriptor of the server socket */ +int RM_Tcp6Server(RedisModuleCtx *ctx, int port, const char *bindaddr, int backlog, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetTcp6Server(err,port,(char *)bindaddr,backlog)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Create an unix domain socket server socket and bind to the specified path + * with given permission and backlog. Return file descriptor of the server socket */ +int RM_UnixServer(RedisModuleCtx *ctx, const char *path, mode_t perm, int backlog, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetUnixServer(err,(char *)path,perm,backlog)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Accept a connection on TCP server socket, return connected client's ip address + * and port number if ip argument and port argument are not NULL respectively */ +int RM_TcpAccept(RedisModuleCtx *ctx, int serversock, char *ip, size_t ip_len, int *port, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetTcpAccept(err,serversock,ip,ip_len,port)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Accept a new connection on unix domain server socket, */ +int RM_UnixAccept(RedisModuleCtx *ctx, int serversock, int *fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if ((*fd = anetUnixAccept(err,serversock)) == ANET_ERR) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Set a file descriptor to nonblocking mode */ +int RM_EnableNonBlock(RedisModuleCtx *ctx, int fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetNonBlock(err,fd) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Set a file descriptor to blocking mode */ +int RM_DisableNonBlock(RedisModuleCtx *ctx, int fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetBlock(err,fd) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Disable nagle's algorithm on given tcp socket */ +int RM_EnableTcpNoDelay(RedisModuleCtx *ctx, int fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetEnableTcpNoDelay(err,fd) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Enable nagle's algorithm on given tcp socket */ +int RM_DisableTcpNoDelay(RedisModuleCtx *ctx, int fd) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetDisableTcpNoDelay(err,fd) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Enable tcp keepalive and update keepalive interval on given tcp socket */ +int RM_TcpKeepAlive(RedisModuleCtx *ctx, int fd, int interval) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetKeepAlive(err,fd,interval) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Get peer name of a socket */ +int RM_PeerName(RedisModuleCtx *ctx, int fd, char *ip, size_t ip_len, int *port) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetPeerToString(fd,ip,ip_len,port) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s",err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Get sock name of a socket */ +int RM_SockName(RedisModuleCtx *ctx, int fd, char *ip, size_t ip_len, int *port) { + char err[ANET_ERR_LEN]; + REDISMODULE_NOT_USED(ctx); + if (anetSockName(fd,ip,ip_len,port) != ANET_OK) { + RM_LogWithLevel(ctx,LL_WARNING,"%s", err); + return REDISMODULE_ERR; + } else { + return REDISMODULE_OK; + } +} + +/* Create an authenticated and connected client directly to current redis server */ +int RM_CreateClient(RedisModuleCtx *ctx, int *fd) { + int pair[2]; + client *c; + REDISMODULE_NOT_USED(ctx); + if (socketpair(AF_UNIX,SOCK_STREAM,0,pair) < 0) { + RM_LogWithLevel(ctx,LL_WARNING,"Could not open socket pair to create pipe between redis server and module: %s",strerror(errno)); + return REDISMODULE_ERR; + } + if ((c = createClient(pair[0])) != NULL) { + c->authenticated = 1; + *fd = pair[1]; + return REDISMODULE_OK; + } else { + close(pair[1]); + return REDISMODULE_ERR; + } +} + +/* Free the directly connected client created from RedisModule_CreateClient */ +int RM_FreeClient(RedisModuleCtx *ctx, int client) { + REDISMODULE_NOT_USED(ctx); + close(client); + return REDISMODULE_OK; +} + +static void dictModuleAttachmentDestructor(void *privdata, void *val) { + RedisModuleAttachment *attachment = val; + REDISMODULE_NOT_USED(privdata); + if (attachment->destructor) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = privdata; + attachment->destructor(&ctx,attachment->data); + moduleFreeContext(&ctx); + } + zfree(attachment); +} + +static dictType moduleAttachmentDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictModuleAttachmentDestructor, /* val destructor */ +}; + +int RM_Detach(RedisModuleCtx *ctx, const char *key, size_t key_len); + +/* Attach an user data for current module under a specific key. use the same key + * with RedisModule_GetAttachment can access previously attached user data. + * When key is NULL and key_len is 0, the user data is treated specially, + * aka default user data, which can be accessed in a faster way then normal + * user data without dict lookup. When destructor is not null, it will be + * called with RedisModuleCtx and the user data arguments before the attachment + * is going to be removed. Typically when an user data is replaced by another + * one with the same key or it's detached by callingRedisModule_Detach, or + * when the module is unloaded if it is still alive */ +void RM_Attach(RedisModuleCtx *ctx, const char *key, size_t key_len, void *attachment, void (*destructor)(void *, void *)) { + if (key == NULL && key_len == 0) { + RM_Detach(ctx,NULL,0); + ctx->module->default_attachment.data = attachment; + ctx->module->default_attachment.destructor = destructor; + } else { + RedisModule *module = ctx->module; + RedisModuleAttachment *value = zmalloc(sizeof(RedisModuleAttachment)); + value->data = attachment; + value->destructor = destructor; + if (module->attachments == NULL) { + module->attachments = dictCreate(&moduleAttachmentDictType,module); + } + dictReplace(module->attachments,sdsnewlen(key,key_len),value); + } +} + +/* Get previously attached user data with given key, if key is NULL and key_len is 0, + * it returns the default user data without dict lookup */ +void *RM_GetAttachment(RedisModuleCtx *ctx, const char *key, size_t key_len) { + if (key == NULL && key_len == 0) { + return ctx->module->default_attachment.data; + } else if (ctx->module->attachments != NULL) { + sds k = sdsnewlen(key,key_len); + RedisModuleAttachment *value = dictFetchValue(ctx->module->attachments,k); + sdsfree(k); + if (value != NULL) { + return value->data; + } else { + return NULL; + } + } else { + return NULL; + } +} + +/* Detach a previously attached user data for current module, if key is NULL + * and key_len is 0, it tries to detach a special default user data which is + * faster to access. If a destructor is set with the user data, it will be + * called with RedisModuleCtx and user data as arguments to allow module + * writer to perform certain cleanup logic on the user data. */ +int RM_Detach(RedisModuleCtx *ctx, const char *key, size_t key_len) { + if (key == NULL && key_len == 0) { + RedisModuleAttachment *attachment = &ctx->module->default_attachment; + if (attachment->data != NULL) { + if (attachment->destructor) { + attachment->destructor(ctx,attachment->data); + } + memset(attachment,0,sizeof(*attachment)); + return REDISMODULE_OK; + } else { + return REDISMODULE_ERR; + } + } else if (ctx->module->attachments != NULL) { + sds k = sdsnewlen(key,key_len); + int rc = dictDelete(ctx->module->attachments,k); + sdsfree(k); + return rc == DICT_OK ? REDISMODULE_OK : REDISMODULE_ERR; + } else { + return REDISMODULE_ERR; + } +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -3929,4 +4399,27 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DigestAddStringBuffer); REGISTER_API(DigestAddLongLong); REGISTER_API(DigestEndSequence); + REGISTER_API(CreateFileEvent); + REGISTER_API(DeleteFileEvent); + REGISTER_API(CreateTimeEvent); + REGISTER_API(DeleteTimeEvent); + REGISTER_API(TcpNonBlockConnect); + REGISTER_API(UnixNonBlockConnect); + REGISTER_API(TcpServer); + REGISTER_API(Tcp6Server); + REGISTER_API(UnixServer); + REGISTER_API(TcpAccept); + REGISTER_API(UnixAccept); + REGISTER_API(EnableNonBlock); + REGISTER_API(DisableNonBlock); + REGISTER_API(EnableTcpNoDelay); + REGISTER_API(DisableTcpNoDelay); + REGISTER_API(TcpKeepAlive); + REGISTER_API(PeerName); + REGISTER_API(SockName); + REGISTER_API(CreateClient); + REGISTER_API(FreeClient); + REGISTER_API(Attach); + REGISTER_API(GetAttachment); + REGISTER_API(Detach); } diff --git a/src/modules/Makefile b/src/modules/Makefile index 066e65e9..491cd3b3 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so +all: helloworld.so hellotype.so helloblock.so hellonet.so testmodule.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -33,6 +33,11 @@ helloblock.xo: ../redismodule.h helloblock.so: helloblock.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc +hellonet.xo: ../redismodule.h + +hellonet.so: hellonet.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + testmodule.xo: ../redismodule.h testmodule.so: testmodule.xo diff --git a/src/modules/hellonet.c b/src/modules/hellonet.c new file mode 100644 index 00000000..f94770d6 --- /dev/null +++ b/src/modules/hellonet.c @@ -0,0 +1,672 @@ +/* Hellonet module -- An example of blocking command implementation using + * networking api + */ + +#define REDISMODULE_EXPERIMENTAL_API +#include "../redismodule.h" +#include +#include +#include +#include +#include + +const char *CRLF = "\r\n"; + +/* contextual data for hellonet module */ +typedef struct HelloNetContext HelloNetContext; +/* free context of hellonet module */ +static void HelloNetContext_Free(RedisModuleCtx *ctx, void *arg); + +/* for long lived resources bookkeeping */ +typedef struct HelloNetResource HelloNetResource; +/* close a given resource */ +typedef void (*HelloNetResourceCloser)(RedisModuleCtx *ctx, HelloNetResource *resource); +/* free all resources chained together with given closer */ +static void HelloNetResource_FreeAll(RedisModuleCtx *ctx, HelloNetResource *res, HelloNetResourceCloser closer); +/* free a certain resource with given id */ +static void HelloNetResource_Free(RedisModuleCtx *ctx, HelloNetResource **pres, HelloNetResourceCloser closer, long long id); +/* closer for timer */ +static void HelloNetTimer_Closer(RedisModuleCtx *ctx, HelloNetResource *resource); +/* closer for file event */ +static void HelloNetFd_EventCloser(RedisModuleCtx *ctx, HelloNetResource *resource); +/* closer for open file with events monitored */ +static void HelloNetFd_Closer(RedisModuleCtx *ctx, HelloNetResource *resource); +/* create a new resource and chain it to all resources */ +static void HelloNetResource_New(HelloNetResource **pres, long long id); +/* create a new timer resource */ +static void HelloNetTimer_New(RedisModuleCtx *ctx, long long timer); +/* free a timer resource */ +static void HelloNetTimer_Free(RedisModuleCtx *ctx, long long timer); +/* create a new file resource */ +static void HelloNetFd_New(RedisModuleCtx *ctx, int fd); +/* free a file resource */ +static void HelloNetFd_Free(RedisModuleCtx *ctx, int fd); +/* free a closed file resource */ +static void HelloNetFd_FreeClosed(RedisModuleCtx *ctx, int fd); + +/* buffer structure to help reading from and writing to file descriptor */ +typedef struct HelloNetBuffer HelloNetBuffer; +/* buffer flags */ +enum { + HELLONET_BUFFER_FLAG_WRAPPED = 1, +}; +/* read data from fd and transfer it to buffer */ +static int HelloNetBuffer_TransferTo(HelloNetBuffer *buffer, int fd); +/* read data from buffer and write it to fd */ +static int HelloNetBuffer_TransferFrom(HelloNetBuffer *buffer, int fd); +/* initialize an empty buffer */ +static void HelloNetBuffer_Init(HelloNetBuffer *buffer); +/* wrap existing data to buffer for reading */ +static void HelloNetBuffer_Wrap(HelloNetBuffer *buffer, const char *init); +/* rewind buffer for next reading */ +static void HelloNetBuffer_Rewind(HelloNetBuffer *buffer); +/* reset buffer for next writing */ +static void HelloNetBuffer_Reset(HelloNetBuffer *buffer); +/* free buffer's internal data */ +static void HelloNetBuffer_Free(HelloNetBuffer *buffer); + +/* data required to serve keys request */ +typedef struct HelloNetKeysRequest HelloNetKeysRequest; +/* free keys request data */ +static void HelloNetKeysRequest_Free(HelloNetKeysRequest *request); + +/* connector type for connecting to upstream redis server */ +typedef void (*HelloNetKeys_Connector)(RedisModuleCtx *ctx, HelloNetKeysRequest *request); +/* tcp upstream redis connector */ +static void HelloNetKeys_TcpConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request); +/* unix upstream redis connector */ +static void HelloNetKeys_UnixConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request); +/* direct client upstream redis connector */ +static void HelloNetKeys_ClientConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request); + +/* buffered info response data from redis */ +typedef struct HelloNetInfoResponse HelloNetInfoResponse; + +/* create server with given setup */ +typedef int (*HelloNetInfo_Server)(RedisModuleCtx *ctx, const char *address, int port, int backlog); +/* create a tcp server on bind address and port number with given backlog */ +static int HelloNetInfo_TcpServer(RedisModuleCtx *ctx, const char *address, int port, int backlog); +/* create an ipv6 tcp server on bind address and port number with given backlog */ +static int HelloNetInfo_Tcp6Server(RedisModuleCtx *ctx, const char *address, int port, int backlog); +/* create an unix server on path with given permission and backlog */ +static int HelloNetInfo_UnixServer(RedisModuleCtx *ctx, const char *path, int perm, int backlog); +/* acceptor for new client */ +typedef int (*HelloNetInfo_Acceptor)(RedisModuleCtx *ctx, int fd); +/* accept tcp client */ +static int HelloNetInfo_TcpAcceptor(RedisModuleCtx *ctx, int fd); +/* accept unix client */ +static int HelloNetInfo_UnixAcceptor(RedisModuleCtx *ctx, int fd); + +/* module context */ +struct HelloNetContext { + HelloNetResource *fd; + HelloNetResource *timer; +}; + +static void HelloNetContext_Free(RedisModuleCtx *ctx, void *arg) { + HelloNetContext *net = arg; + HelloNetResource_FreeAll(ctx, net->fd, HelloNetFd_Closer); + HelloNetResource_FreeAll(ctx, net->timer, HelloNetTimer_Closer); + RedisModule_Free(net); +} + +/* resource tracking logic */ +struct HelloNetResource { + long long id; + HelloNetResource *next; +}; + +static void HelloNetResource_FreeAll(RedisModuleCtx *ctx, HelloNetResource *res, HelloNetResourceCloser closer) { + HelloNetResource *next; + while (res != NULL) { + next = res->next; + closer(ctx, res); + RedisModule_Free(res); + res = next; + } +} + +static void HelloNetResource_Free(RedisModuleCtx *ctx, HelloNetResource **pres, HelloNetResourceCloser closer, long long id) { + HelloNetResource *res = *pres; + while (res != NULL) { + if (res->id == id) { + *pres = res->next; + break; + } else { + pres = &res->next; + } + } + if (res != NULL) { + closer(ctx, res); + RedisModule_Free(res); + } +} + +static void HelloNetTimer_Closer(RedisModuleCtx *ctx, HelloNetResource *resource) { + RedisModule_DeleteTimeEvent(ctx, resource->id); +} + +static void HelloNetFd_EventCloser(RedisModuleCtx *ctx, HelloNetResource *resource) { + RedisModule_DeleteFileEvent(ctx, resource->id, REDISMODULE_FILE_READABLE | REDISMODULE_FILE_WRITABLE); +} + +static void HelloNetFd_Closer(RedisModuleCtx *ctx, HelloNetResource *resource) { + HelloNetFd_EventCloser(ctx, resource); + close(resource->id); +} + +static void HelloNetResource_New(HelloNetResource **pres, long long id) { + HelloNetResource *res = RedisModule_Alloc(sizeof(*res)); + res->id = id; + res->next = *pres; + *pres = res; +} + +static void HelloNetTimer_New(RedisModuleCtx *ctx, long long timer) { + HelloNetContext *net = RedisModule_GetAttachment(ctx, NULL, 0); + HelloNetResource_New(&net->timer, timer); +} +static void HelloNetTimer_Free(RedisModuleCtx *ctx, long long timer) { + HelloNetContext *net = RedisModule_GetAttachment(ctx, NULL, 0); + HelloNetResource_Free(ctx, &net->timer, HelloNetTimer_Closer, timer); +} + +static void HelloNetFd_New(RedisModuleCtx *ctx, int fd) { + HelloNetContext *net = RedisModule_GetAttachment(ctx, NULL, 0); + HelloNetResource_New(&net->fd, fd); +} + +static void HelloNetFd_Free(RedisModuleCtx *ctx, int fd) { + HelloNetContext *net = RedisModule_GetAttachment(ctx, NULL, 0); + HelloNetResource_Free(ctx, &net->fd, HelloNetFd_Closer, fd); +} + +static void HelloNetFd_FreeClosed(RedisModuleCtx *ctx, int fd) { + HelloNetContext *net = RedisModule_GetAttachment(ctx, NULL, 0); + HelloNetResource_Free(ctx, &net->fd, HelloNetFd_EventCloser, fd); +} + +/* buffer logic to transfer from and to file descriptors */ +struct HelloNetBuffer { + int flags; + int rd; + int wr; + int capacity; + char *buffer; +}; + +static int HelloNetBuffer_TransferTo(HelloNetBuffer *buffer, int fd) { + int available, rd; + while (1) { + if ((available = buffer->capacity - buffer->wr) < 128) { + buffer->capacity += 4096; + /* leave one byte for \0 terminator */ + buffer->buffer = RedisModule_Realloc(buffer->buffer,buffer->capacity+1); + available += 4096; + } + if ((rd = read(fd,buffer->buffer+buffer->wr,available)) > 0) { + buffer->wr += rd; + } else { + break; + } + } + buffer->buffer[buffer->wr] = '\0'; + return rd < 0 && errno != EINTR && errno != EAGAIN ? REDISMODULE_ERR : REDISMODULE_OK; +} + +static int HelloNetBuffer_TransferFrom(HelloNetBuffer *buffer, int fd) { + int wr = write(fd,buffer->buffer+buffer->rd,buffer->wr-buffer->rd); + if (wr > 0) { + buffer->rd += wr; + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { + return REDISMODULE_ERR; + } + return REDISMODULE_OK; +} + +static void HelloNetBuffer_Init(HelloNetBuffer *buffer) { + memset(buffer, 0, sizeof(*buffer)); +} + +static void HelloNetBuffer_Wrap(HelloNetBuffer *buffer, const char *init) { + buffer->rd = 0; + buffer->capacity = buffer->wr = strlen(init); + buffer->buffer = (char *) init; + buffer->flags = HELLONET_BUFFER_FLAG_WRAPPED; +} + +static void HelloNetBuffer_Rewind(HelloNetBuffer *buffer) { + buffer->rd = 0; +} + +static void HelloNetBuffer_Reset(HelloNetBuffer *buffer) { + if ((buffer->flags & HELLONET_BUFFER_FLAG_WRAPPED) != 0) { + HelloNetBuffer_Init(buffer); + } else { + buffer->rd = buffer->wr = 0; + } +} + +static void HelloNetBuffer_Free(HelloNetBuffer *buffer) { + if ((buffer->flags & HELLONET_BUFFER_FLAG_WRAPPED) == 0) { + RedisModule_Free(buffer->buffer); + } + HelloNetBuffer_Init(buffer); +} + +/* bookkeeping data for hellonet.tcp/unix/client request */ +struct HelloNetKeysRequest { + HelloNetKeys_Connector connector; + const char *error; + char *address; + char client[32]; + size_t length; + int left; + HelloNetBuffer buffer; +}; + +static void HelloNetKeysRequest_Free(HelloNetKeysRequest *request) { + RedisModule_Free(request->address); + HelloNetBuffer_Free(&request->buffer); + RedisModule_Free(request); +} + +static int HelloNetKeys_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + HelloNetKeysRequest *request = RedisModule_GetBlockedClientPrivateData(ctx); + HelloNetBuffer *buffer = &request->buffer; + const char *crlf; + int length; + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + if (request->error) { + RedisModule_ReplyWithError(ctx,request->error); + } else { + HelloNetBuffer_Rewind(buffer); + while ((crlf = strstr(buffer->buffer+buffer->rd,CRLF)) != NULL) { + const char c = buffer->buffer[buffer->rd]; + if (c == '*') { + /* array size */ + RedisModule_ReplyWithArray(ctx,strtol(buffer->buffer+buffer->rd+1,NULL,10)); + } else if (c == '$') { + /* array element */ + length = strtol(buffer->buffer+buffer->rd+1,NULL,10); + RedisModule_ReplyWithStringBuffer(ctx,crlf+2,length); + crlf = strstr(crlf+2,CRLF); + } + buffer->rd = (crlf - buffer->buffer) + 2; + } + } + return REDISMODULE_OK; +} + +static int HelloNetKeys_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + char fmtbuff[32]; + size_t len = snprintf(fmtbuff,sizeof(fmtbuff),"%llu",RedisModule_GetClientId(ctx)); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_Detach(ctx,fmtbuff,len); + return RedisModule_ReplyWithError(ctx,"ERR Request timedout"); +} + +static void HelloNetKeys_FreeRequest(void *privdata) { + HelloNetKeysRequest_Free(privdata); +} + +static void HelloNetKeys_Respond(RedisModuleCtx *ctx, HelloNetKeysRequest *request) { + RedisModuleBlockedClient *blocked = RedisModule_GetAttachment(ctx,request->client,request->length); + if (blocked != NULL) { + RedisModule_Detach(ctx,request->client,request->length); + RedisModule_UnblockClient(blocked,request); + } else { + HelloNetKeysRequest_Free(request); + } +} + +static void HelloNetKeys_Error(RedisModuleCtx *ctx, HelloNetKeysRequest *request, const char *error) { + request->error = error; + HelloNetKeys_Respond(ctx, request); +} + +static void HelloNetKeys_ParseResponse(RedisModuleCtx *ctx, int fd, HelloNetKeysRequest *request) { + const char *crlf; + HelloNetBuffer *buffer = &request->buffer; + while ((crlf = strstr(buffer->buffer+buffer->rd,CRLF)) != NULL) { + const char c = buffer->buffer[buffer->rd]; + if (c == '*') { + /* array size */ + request->left = strtol(buffer->buffer+buffer->rd+1,NULL,10); + } else if (c == '$') { + /* array element */ + if ((crlf = strstr(crlf+2,CRLF)) == NULL) { + return; + } + request->left--; + } + buffer->rd = (crlf - buffer->buffer) + 2; + } + + if (request->left == 0) { + if (request->connector == HelloNetKeys_ClientConnector) { + RedisModule_FreeClient(ctx, fd); + } else { + close(fd); + } + HelloNetFd_FreeClosed(ctx, fd); + HelloNetKeys_Respond(ctx, request); + } +} + +static void HelloNetKeys_ReadResponse(RedisModuleCtx *ctx, int fd, void *clientData, int mask) { + HelloNetKeysRequest *request = clientData; + HelloNetBuffer *buffer = &request->buffer; + REDISMODULE_NOT_USED(mask); + HelloNetBuffer_Reset(buffer); + + if (HelloNetBuffer_TransferTo(buffer, fd) != REDISMODULE_OK) { + HelloNetFd_Free(ctx, fd); + HelloNetKeys_Error(ctx,request,"ERR Could not read response from upstream"); + } else { + HelloNetKeys_ParseResponse(ctx, fd, request); + } +} + +static void HelloNetKeys_WriteRequest(RedisModuleCtx *ctx, int fd, void *clientData, int mask) { + HelloNetKeysRequest *request = clientData; + HelloNetBuffer *buffer = &request->buffer; + REDISMODULE_NOT_USED(mask); + if (HelloNetBuffer_TransferFrom(buffer, fd) != REDISMODULE_OK) { + HelloNetKeys_Error(ctx,request,"ERR Could not write keys request to upstream"); + HelloNetFd_Free(ctx, fd); + } else if (buffer->rd == buffer->wr) { + RedisModule_CreateFileEvent(ctx,fd,REDISMODULE_FILE_READABLE,HelloNetKeys_ReadResponse,request); + RedisModule_DeleteFileEvent(ctx,fd,REDISMODULE_FILE_WRITABLE); + } +} + +static void HelloNetKeys_TcpConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request) { + int fd = 0, port = 6379; + char *c = strchr(request->address, ':'); + if (c != NULL) { + *c = '\0'; + port = strtol(c + 1, NULL, 10); + } + if (RedisModule_TcpNonBlockConnect(ctx, request->address, port, &fd) != REDISMODULE_OK) { + HelloNetKeys_Error(ctx,request,"ERR could not connect to redis server via tcp socket"); + } else { + char peerBuffer[256], localBuffer[256]; + int peerPort, localPort; + if (RedisModule_DisableTcpNoDelay(ctx, fd) != REDISMODULE_OK) { + RedisModule_Log(ctx, "warning", "Could not disable tcp no delay on fd %d", fd); + } + if (RedisModule_EnableTcpNoDelay(ctx, fd) != REDISMODULE_OK) { + RedisModule_Log(ctx, "warning", "Could not enable tcp no delay on fd %d", fd); + } + if (RedisModule_TcpKeepAlive(ctx, fd, 120) != REDISMODULE_OK) { + RedisModule_Log(ctx, "warning", "Could not enable tcp keepalive on fd %d", fd); + } + if (RedisModule_CreateFileEvent(ctx, fd, REDISMODULE_FILE_WRITABLE, HelloNetKeys_WriteRequest, request) == REDISMODULE_OK) { + RedisModule_PeerName(ctx, fd, peerBuffer, sizeof(peerBuffer), &peerPort); + RedisModule_SockName(ctx, fd, localBuffer, sizeof(localBuffer), &localPort); + RedisModule_Log(ctx, "notice", "TCP connection established to socket %s:%d from %s:%d", peerBuffer, peerPort, localBuffer, localPort); + HelloNetFd_New(ctx, fd); + } else { + HelloNetKeys_Error(ctx,request,"ERR could not watch for tcp connect finish event"); + } + } +} + +static void HelloNetKeys_UnixConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request) { + int fd = 0; + if (RedisModule_UnixNonBlockConnect(ctx, request->address, &fd) != REDISMODULE_OK) { + HelloNetKeys_Error(ctx,request,"ERR could not connect to redis server via unix socket"); + } else { + if (RedisModule_CreateFileEvent(ctx, fd, REDISMODULE_FILE_WRITABLE, HelloNetKeys_WriteRequest, request) == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "Unix connection established to %s", request->address); + HelloNetFd_New(ctx, fd); + } else { + HelloNetKeys_Error(ctx,request,"ERR could not watch for unix connect finish event"); + } + } +} + +static void HelloNetKeys_ClientConnector(RedisModuleCtx *ctx, HelloNetKeysRequest *request) { + int fd = 0; + if (RedisModule_CreateClient(ctx, &fd) != REDISMODULE_OK) { + HelloNetKeys_Error(ctx,request,"ERR could not create client to redis server"); + } else { + RedisModule_EnableNonBlock(ctx, fd); + if (RedisModule_CreateFileEvent(ctx, fd, REDISMODULE_FILE_WRITABLE, HelloNetKeys_WriteRequest, request) == REDISMODULE_OK) { + HelloNetFd_New(ctx, fd); + } else { + HelloNetKeys_Error(ctx,request,"ERR could not watch client fd"); + } + } +} + +static int HelloNetKeys_Delayed(RedisModuleCtx *ctx, long long id, void *clientData) { + HelloNetKeysRequest *request = clientData; + request->connector(ctx, request); + HelloNetTimer_Free(ctx, id); + return REDISMODULE_TIME_NOMORE; +} + +/* hellonet.tcp/unix/client
-- Block for seconds, + * then reply with all keys. Timeout is the command timeout, so that you can test + * what happens when the delay is greater than the timeout. */ +static int HelloNetKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, HelloNetKeys_Connector connector) { + size_t len; + long long delay, timeout, timer; + const char *address, *err = NULL; + RedisModuleBlockedClient *bc = NULL; + HelloNetKeysRequest *request = NULL; + if (argc != 4) return RedisModule_WrongArity(ctx); + if (RedisModule_StringToLongLong(argv[2],&delay) != REDISMODULE_OK) { + err = "ERR invalid delay"; + goto error; + } + + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK) { + err = "ERR invalid timeout"; + goto error; + } + address = RedisModule_StringPtrLen(argv[1], &len); + request = RedisModule_Calloc(1, sizeof(*request)); + request->connector = connector; + request->address = RedisModule_Alloc(len + 1); + memcpy(request->address, address, len + 1); + request->length = snprintf(request->client, sizeof(request->client), "%llu", RedisModule_GetClientId(ctx));; + HelloNetBuffer_Wrap(&request->buffer, "*2\r\n$4\r\nkeys\r\n$1\r\n*\r\n"); + RedisModule_Attach(ctx, request->client, request->length, + RedisModule_BlockClient(ctx,HelloNetKeys_Reply,HelloNetKeys_Timeout,HelloNetKeys_FreeRequest,timeout), NULL); + if (RedisModule_CreateTimeEvent(ctx, delay, HelloNetKeys_Delayed, request, NULL, &timer) == REDISMODULE_OK) { + HelloNetTimer_New(ctx, timer); + return REDISMODULE_OK; + } + err = "ERR could not create delay timer"; +error: + if (bc != NULL) { + RedisModule_AbortBlock(bc); + } + if (request != NULL) { + HelloNetKeysRequest_Free(request); + } + return RedisModule_ReplyWithError(ctx,err); +} + +/* hellonet.tcpserver/tcp6server/unixserver response */ +struct HelloNetInfoResponse { + HelloNetBuffer buffer; + RedisModuleCallReply *reply; +}; + +static int HelloNetTcp_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetKeys_RedisCommand(ctx, argv, argc, HelloNetKeys_TcpConnector); +} + +static int HelloNetUnix_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetKeys_RedisCommand(ctx, argv, argc, HelloNetKeys_UnixConnector); +} + +static int HelloNetClient_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetKeys_RedisCommand(ctx, argv, argc, HelloNetKeys_ClientConnector); +} + +static void HelloNetInfo_Echo(RedisModuleCtx *ctx, int fd, void *clientData, int mask) { + char address[256]; + HelloNetInfoResponse *response = clientData; + HelloNetBuffer *buffer = &response->buffer; + int port; + REDISMODULE_NOT_USED(mask); + if (HelloNetBuffer_TransferFrom(buffer, fd) != REDISMODULE_OK || buffer->rd == buffer->wr) { + goto cleanup; + } + return; +cleanup: + REDISMODULE_NOT_USED(mask); + RedisModule_PeerName(ctx, fd, address, sizeof(address), &port); + HelloNetFd_Free(ctx, fd); + RedisModule_Log(ctx, "notice", "Closed connection from %s:%d", address, port); + RedisModule_FreeCallReply(response->reply); + HelloNetBuffer_Free(&response->buffer); + RedisModule_Free(response); +} + +static int HelloNetInfo_TcpAcceptor(RedisModuleCtx *ctx, int fd) { + int client, port; + char ip[128]; + if (RedisModule_TcpAccept(ctx, fd, ip, sizeof(ip), &port, &client) != REDISMODULE_OK) { + return 0; + } else { + RedisModule_Log(ctx, "notice", "Accepted tcp connection from %s:%d", ip, port); + return client; + } +} + +static int HelloNetInfo_UnixAcceptor(RedisModuleCtx *ctx, int fd) { + int client; + if (RedisModule_UnixAccept(ctx, fd, &client) != REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "Accepted unix connection"); + return 0; + } else { + return client; + } +} + +static void HelloNetInfo_Accept(RedisModuleCtx *ctx, int fd, void *clientData, int mask) { + HelloNetInfoResponse *response; + HelloNetInfo_Acceptor acceptor = clientData; + int client = acceptor(ctx, fd); + size_t len; + REDISMODULE_NOT_USED(mask); + if (client <= 0) { + return; + } + response = RedisModule_Alloc(sizeof(*response)); + response->reply = RedisModule_Call(ctx, "info", ""); + HelloNetBuffer_Wrap(&response->buffer, RedisModule_CallReplyStringPtr(response->reply, &len)); + if (RedisModule_CreateFileEvent(ctx, client, REDISMODULE_FILE_WRITABLE, HelloNetInfo_Echo, response) == REDISMODULE_OK) { + HelloNetFd_New(ctx, client); + } else { + close(client); + } +} + +static int HelloNetInfo_TcpServer(RedisModuleCtx *ctx, const char *address, int port, int backlog) { + int fd = 0; + RedisModule_TcpServer(ctx, port, strlen(address) == 0 ? NULL : address, backlog, &fd); + return fd; +} + +static int HelloNetInfo_Tcp6Server(RedisModuleCtx *ctx, const char *address, int port, int backlog) { + int fd = 0; + RedisModule_Tcp6Server(ctx, port, strlen(address) == 0 ? NULL : address, backlog, &fd); + return fd; +} + +static int HelloNetInfo_UnixServer(RedisModuleCtx *ctx, const char *path, int perm, int backlog) { + int fd = 0; + RedisModule_UnixServer(ctx, path, perm, backlog, &fd); + return fd; +} + +/* hellonet.tcpserver/tcp6server/unixserver
+ * create server on given address and echo redis info to connected clients */ +static int HelloNetServer_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, + int argc, HelloNetInfo_Server server, HelloNetInfo_Acceptor acceptor) { + size_t len; + const char *address, *err = NULL; + long long port, backlog; + int fd = 0; + if (argc != 4) return RedisModule_WrongArity(ctx); + if (RedisModule_StringToLongLong(argv[2],&port) != REDISMODULE_OK) { + err = "ERR invalid backlog"; + goto error; + } + if (RedisModule_StringToLongLong(argv[3],&backlog) != REDISMODULE_OK) { + err = "ERR invalid backlog"; + goto error; + } + address = RedisModule_StringPtrLen(argv[1], &len); + if ((fd = server(ctx, address, port, backlog)) <= 0) { + err = "ERR could not create server"; + goto error; + } + if (RedisModule_CreateFileEvent(ctx, fd, REDISMODULE_FILE_READABLE, HelloNetInfo_Accept, acceptor) == REDISMODULE_OK) { + HelloNetFd_New(ctx, fd); + RedisModule_ReplyWithLongLong(ctx, fd); + RedisModule_Log(ctx, "notice", "Info server is now ready to accept connections at %s:%lld with backlog of %lld", address, port, backlog); + return REDISMODULE_OK; + } + err = "ERR could not wait for new connection"; +error: + if (fd != 0) { + close(fd); + } + return RedisModule_ReplyWithError(ctx,err); +} + +static int HelloNetTcpServer_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetServer_RedisCommand(ctx, argv, argc, HelloNetInfo_TcpServer, HelloNetInfo_TcpAcceptor); +} + +static int HelloNetTcp6Server_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetServer_RedisCommand(ctx, argv, argc, HelloNetInfo_Tcp6Server, HelloNetInfo_TcpAcceptor); +} + +static int HelloNetUnixServer_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + return HelloNetServer_RedisCommand(ctx, argv, argc, HelloNetInfo_UnixServer, HelloNetInfo_UnixAcceptor); +} + +/* This function must be present on each Redis module. It is used in order to + * register the commands into the Redis server. */ +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"hellonet",1,REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"hellonet.tcp", + HelloNetTcp_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellonet.unix", + HelloNetUnix_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellonet.client", + HelloNetClient_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"hellonet.tcpserver", + HelloNetTcpServer_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellonet.tcp6server", + HelloNetTcp6Server_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellonet.unixserver", + HelloNetUnixServer_RedisCommand,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_Attach(ctx, NULL, 0, RedisModule_Calloc(1, sizeof(HelloNetContext)), HelloNetContext_Free); + + return REDISMODULE_OK; +} diff --git a/src/redismodule.h b/src/redismodule.h index 7fc0fec4..f2a89ed0 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -58,6 +58,16 @@ #define REDISMODULE_HASH_CFIELDS (1<<2) #define REDISMODULE_HASH_EXISTS (1<<3) +#ifdef REDISMODULE_EXPERIMENTAL_API +/* File Events */ +#define REDISMODULE_FILE_NONE 0 +#define REDISMODULE_FILE_READABLE 1 +#define REDISMODULE_FILE_WRITABLE 2 + +/* Time Events */ +#define REDISMODULE_TIME_NOMORE (-1) +#endif + /* A special pointer that we can use between the core and the module to signal * field deletion, and that is impossible to be a valid pointer. */ #define REDISMODULE_HASH_DELETE ((RedisModuleString*)(long)1) @@ -95,6 +105,13 @@ typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value); + +#ifdef REDISMODULE_EXPERIMENTAL_API +typedef void (*RedisModuleFinalizer)(RedisModuleCtx *context, void *clientData); +typedef void (*RedisModuleFileProc)(RedisModuleCtx *context, int fd, void *clientData, int mask); +typedef int (*RedisModuleTimeProc)(RedisModuleCtx *context, long long id, void *clientData); +#endif + #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { uint64_t version; @@ -224,6 +241,30 @@ RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModu void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx); void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx); + +int REDISMODULE_API_FUNC(RedisModule_CreateFileEvent)(RedisModuleCtx *ctx, int fd, int mask, RedisModuleFileProc proc, void *clientData); +void REDISMODULE_API_FUNC(RedisModule_DeleteFileEvent)(RedisModuleCtx *ctx, int fd, int mask); +int REDISMODULE_API_FUNC(RedisModule_CreateTimeEvent)(RedisModuleCtx *ctx, long long milliseconds, RedisModuleTimeProc proc, void *clientData, RedisModuleFinalizer finalizer, long long *id); +void REDISMODULE_API_FUNC(RedisModule_DeleteTimeEvent)(RedisModuleCtx *ctx, long long id); +int REDISMODULE_API_FUNC(RedisModule_TcpNonBlockConnect)(RedisModuleCtx *ctx, const char *addr, int port, int *fd); +int REDISMODULE_API_FUNC(RedisModule_UnixNonBlockConnect)(RedisModuleCtx *ctx, const char *path, int *fd); +int REDISMODULE_API_FUNC(RedisModule_TcpServer)(RedisModuleCtx *ctx, int port, const char *bindaddr, int backlog, int *fd); +int REDISMODULE_API_FUNC(RedisModule_Tcp6Server)(RedisModuleCtx *ctx, int port, const char *bindaddr, int backlog, int *fd); +int REDISMODULE_API_FUNC(RedisModule_UnixServer)(RedisModuleCtx *ctx, const char *path, mode_t perm, int backlog, int *fd); +int REDISMODULE_API_FUNC(RedisModule_TcpAccept)(RedisModuleCtx *ctx, int serversock, char *ip, size_t ip_len, int *port, int *fd); +int REDISMODULE_API_FUNC(RedisModule_UnixAccept)(RedisModuleCtx *ctx, int serversock, int *fd); +int REDISMODULE_API_FUNC(RedisModule_EnableNonBlock)(RedisModuleCtx *ctx, int fd); +int REDISMODULE_API_FUNC(RedisModule_DisableNonBlock)(RedisModuleCtx *ctx, int fd); +int REDISMODULE_API_FUNC(RedisModule_EnableTcpNoDelay)(RedisModuleCtx *ctx, int fd); +int REDISMODULE_API_FUNC(RedisModule_DisableTcpNoDelay)(RedisModuleCtx *ctx, int fd); +int REDISMODULE_API_FUNC(RedisModule_TcpKeepAlive)(RedisModuleCtx *ctx, int fd, int interval); +int REDISMODULE_API_FUNC(RedisModule_PeerName)(RedisModuleCtx *ctx, int fd, char *ip, size_t ip_len, int *port); +int REDISMODULE_API_FUNC(RedisModule_SockName)(RedisModuleCtx *ctx, int fd, char *ip, size_t ip_len, int *port); +int REDISMODULE_API_FUNC(RedisModule_CreateClient)(RedisModuleCtx *ctx, int *fd); +int REDISMODULE_API_FUNC(RedisModule_FreeClient)(RedisModuleCtx *ctx, int client); +void REDISMODULE_API_FUNC(RedisModule_Attach)(RedisModuleCtx *ctx, const char *key, size_t key_len, void *attachment, RedisModuleFinalizer free); +void *REDISMODULE_API_FUNC(RedisModule_GetAttachment)(RedisModuleCtx *ctx, const char *key, size_t key_len); +int REDISMODULE_API_FUNC(RedisModule_Detach)(RedisModuleCtx *ctx, const char *key, size_t key_len); #endif /* This is included inline inside each Redis module. */ @@ -342,6 +383,30 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(IsBlockedTimeoutRequest); REDISMODULE_GET_API(GetBlockedClientPrivateData); REDISMODULE_GET_API(AbortBlock); + + REDISMODULE_GET_API(CreateFileEvent); + REDISMODULE_GET_API(DeleteFileEvent); + REDISMODULE_GET_API(CreateTimeEvent); + REDISMODULE_GET_API(DeleteTimeEvent); + REDISMODULE_GET_API(TcpNonBlockConnect); + REDISMODULE_GET_API(UnixNonBlockConnect); + REDISMODULE_GET_API(TcpServer); + REDISMODULE_GET_API(Tcp6Server); + REDISMODULE_GET_API(UnixServer); + REDISMODULE_GET_API(TcpAccept); + REDISMODULE_GET_API(UnixAccept); + REDISMODULE_GET_API(EnableNonBlock); + REDISMODULE_GET_API(DisableNonBlock); + REDISMODULE_GET_API(EnableTcpNoDelay); + REDISMODULE_GET_API(DisableTcpNoDelay); + REDISMODULE_GET_API(TcpKeepAlive); + REDISMODULE_GET_API(PeerName); + REDISMODULE_GET_API(SockName); + REDISMODULE_GET_API(CreateClient); + REDISMODULE_GET_API(FreeClient); + REDISMODULE_GET_API(Attach); + REDISMODULE_GET_API(GetAttachment); + REDISMODULE_GET_API(Detach); #endif RedisModule_SetModuleAttribs(ctx,name,ver,apiver);