Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,24 @@ ifneq ($(SKIP_VEC_SETS),yes)
FINAL_CFLAGS+=-DINCLUDE_VEC_SETS=1
endif

# io_uring support (Linux 5.6+, requires liburing)
LIBURING_LIBS=
LIBURING_PKGCONFIG := $(shell $(PKG_CONFIG) --exists liburing 2>/dev/null && echo $$?)
ifeq ($(LIBURING_PKGCONFIG),0)
LIBURING_LIBS=$(shell $(PKG_CONFIG) --libs liburing)
LIBURING_CFLAGS=$(shell $(PKG_CONFIG) --cflags liburing)
else
LIBURING_LIBS=-luring
LIBURING_CFLAGS=
endif

ifeq ($(USE_IO_URING),yes)
ifeq ($(uname_S),Linux)
FINAL_CFLAGS+= -DHAVE_IO_URING $(LIBURING_CFLAGS)
FINAL_LIBS+= $(LIBURING_LIBS)
endif
endif

ifndef V
define MAKE_INSTALL
@printf ' %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$(1)$(ENDCOLOR) 1>&2
Expand Down Expand Up @@ -382,7 +400,7 @@ endif

REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o io_uring_batch.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
Expand Down Expand Up @@ -417,6 +435,7 @@ persist-settings: distclean
echo MALLOC=$(MALLOC) >> .make-settings
echo BUILD_TLS=$(BUILD_TLS) >> .make-settings
echo USE_SYSTEMD=$(USE_SYSTEMD) >> .make-settings
echo USE_IO_URING=$(USE_IO_URING) >> .make-settings
echo CFLAGS=$(CFLAGS) >> .make-settings
echo LDFLAGS=$(LDFLAGS) >> .make-settings
echo REDIS_CFLAGS=$(REDIS_CFLAGS) >> .make-settings
Expand Down
18 changes: 11 additions & 7 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@

/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#ifdef HAVE_IO_URING
#include "ae_io_uring.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#include "ae_select.c"
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
#endif
Expand Down
266 changes: 266 additions & 0 deletions src/ae_io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/* Linux io_uring based ae.c module with epoll fallback
*
* Copyright (c) 2024-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* This module uses io_uring multishot poll for event readiness notification.
* Requires Linux 5.6+ and liburing. If io_uring initialization fails at
* runtime (old kernel, insufficient resources), falls back to epoll
* transparently within the same binary.
*/

#include <liburing.h>
#include <sys/epoll.h>
#include <poll.h>

#define AE_IO_URING_RING_ENTRIES 4096

#define IOURING_USERDATA(fd, mask) (((uint64_t)(unsigned)(fd)) | ((uint64_t)(mask) << 32))
#define IOURING_USERDATA_FD(ud) ((int)((ud) & 0xFFFFFFFF))
#define IOURING_USERDATA_MASK(ud) ((int)((ud) >> 32))

#define IOURING_TAG_BATCH_READ 0x100
#define IOURING_TAG_BATCH_WRITE 0x200

typedef struct aeApiState {
int use_io_uring;
/* epoll fallback state */
int epfd;
struct epoll_event *epoll_events;
/* io_uring state */
struct io_uring ring;
int ring_entries;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
memset(state, 0, sizeof(*state));
state->epfd = -1;

int entries = eventLoop->setsize;
if (entries < AE_IO_URING_RING_ENTRIES)
entries = AE_IO_URING_RING_ENTRIES;

if (io_uring_queue_init(entries, &state->ring, 0) == 0) {
state->use_io_uring = 1;
state->ring_entries = entries;
eventLoop->apidata = state;
return 0;
}

/* io_uring unavailable, fall back to epoll */
state->use_io_uring = 0;
state->epoll_events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
if (!state->epoll_events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024);
if (state->epfd == -1) {
zfree(state->epoll_events);
zfree(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;
if (state->use_io_uring) return 0;
state->epoll_events = zrealloc(state->epoll_events,
sizeof(struct epoll_event) * setsize);
return 0;
}

static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
if (state->use_io_uring) {
io_uring_queue_exit(&state->ring);
} else {
close(state->epfd);
zfree(state->epoll_events);
}
zfree(state);
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;

if (state->use_io_uring) {
int old_mask = eventLoop->events[fd].mask;
int full_mask = old_mask | mask;

/* Cancel existing multishot poll if modifying */
if (old_mask != AE_NONE) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring);
if (!sqe) return -1;
io_uring_prep_poll_remove(sqe, IOURING_USERDATA(fd, old_mask));
io_uring_sqe_set_data64(sqe, 0);
io_uring_submit(&state->ring);

struct io_uring_cqe *cqe;
io_uring_wait_cqe(&state->ring, &cqe);
io_uring_cqe_seen(&state->ring, cqe);
}

struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring);
if (!sqe) return -1;

unsigned poll_mask = 0;
if (full_mask & AE_READABLE) poll_mask |= POLLIN;
if (full_mask & AE_WRITABLE) poll_mask |= POLLOUT;

io_uring_prep_poll_multishot(sqe, fd, poll_mask);
io_uring_sqe_set_data64(sqe, IOURING_USERDATA(fd, full_mask));
io_uring_submit(&state->ring);
return 0;
}

/* epoll fallback */
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;

if (state->use_io_uring) {
int old_mask = eventLoop->events[fd].mask;
int new_mask = old_mask & (~delmask);

struct io_uring_sqe *sqe = io_uring_get_sqe(&state->ring);
if (!sqe) return;
io_uring_prep_poll_remove(sqe, IOURING_USERDATA(fd, old_mask));
io_uring_sqe_set_data64(sqe, 0);
io_uring_submit(&state->ring);

struct io_uring_cqe *cqe;
io_uring_wait_cqe(&state->ring, &cqe);
io_uring_cqe_seen(&state->ring, cqe);

if (new_mask != AE_NONE) {
sqe = io_uring_get_sqe(&state->ring);
if (!sqe) return;

unsigned poll_mask = 0;
if (new_mask & AE_READABLE) poll_mask |= POLLIN;
if (new_mask & AE_WRITABLE) poll_mask |= POLLOUT;

io_uring_prep_poll_multishot(sqe, fd, poll_mask);
io_uring_sqe_set_data64(sqe, IOURING_USERDATA(fd, new_mask));
io_uring_submit(&state->ring);
}
return;
}

/* epoll fallback */
struct epoll_event ee = {0};
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
} else {
epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
}
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;

if (state->use_io_uring) {
int numevents = 0;
struct io_uring_cqe *cqe;

if (tvp != NULL) {
struct __kernel_timespec ts;
ts.tv_sec = tvp->tv_sec;
ts.tv_nsec = tvp->tv_usec * 1000;
int ret = io_uring_submit_and_wait_timeout(&state->ring, &cqe,
1, &ts, NULL);
if (ret < 0 && ret != -ETIME && ret != -EINTR)
return 0;
} else {
int ret = io_uring_submit_and_wait(&state->ring, 1);
if (ret < 0 && ret != -EINTR)
return 0;
}

unsigned head;
int seen = 0;
io_uring_for_each_cqe(&state->ring, head, cqe) {
uint64_t ud = io_uring_cqe_get_data64(cqe);
int tag = IOURING_USERDATA_MASK(ud);
seen++;

/* Skip cancel completions and batch I/O completions */
if (ud == 0 || tag >= IOURING_TAG_BATCH_READ)
continue;

int fd = IOURING_USERDATA_FD(ud);
int res = cqe->res;
int mask = 0;

if (res < 0) {
mask = AE_READABLE | AE_WRITABLE;
} else {
if (res & POLLIN) mask |= AE_READABLE;
if (res & POLLOUT) mask |= AE_WRITABLE;
if (res & POLLERR) mask |= AE_READABLE | AE_WRITABLE;
if (res & POLLHUP) mask |= AE_READABLE | AE_WRITABLE;
}

if (mask && numevents < eventLoop->setsize) {
eventLoop->fired[numevents].fd = fd;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
io_uring_cq_advance(&state->ring, seen);
return numevents;
}

/* epoll fallback */
int retval, numevents = 0;
retval = epoll_wait(state->epfd, state->epoll_events, eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->epoll_events + j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} else if (retval == -1 && errno != EINTR) {
panic("aeApiPoll: epoll_wait, %s", strerror(errno));
}
return numevents;
}

static char *aeApiName(void) {
return "io_uring";
}
6 changes: 6 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3206,6 +3206,12 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
#ifdef HAVE_IO_URING
createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 1, NULL, NULL),
createBoolConfig("io-uring-batch-writes", NULL, IMMUTABLE_CONFIG, server.io_uring_batch_writes, 1, NULL, NULL),
createBoolConfig("io-uring-batch-reads", NULL, IMMUTABLE_CONFIG, server.io_uring_batch_reads, 0, NULL, NULL),
createIntConfig("io-uring-sq-size", NULL, IMMUTABLE_CONFIG, 64, 16384, server.io_uring_sq_size, 1024, INTEGER_CONFIG, NULL, NULL),
#endif
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, PREFETCH_BATCH_MAX_SIZE, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */
Expand Down
2 changes: 2 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
/* Test for polling API */
#ifdef __linux__
#define HAVE_EPOLL 1
/* HAVE_IO_URING is defined via Makefile when USE_IO_URING=yes.
* Requires Linux 5.6+ and liburing. */
#endif

/* Test for accept4() */
Expand Down
Loading
Loading