diff --git a/doc/nn_tcp.adoc b/doc/nn_tcp.adoc
index f5bd024bc..3edec8c96 100644
--- a/doc/nn_tcp.adoc
+++ b/doc/nn_tcp.adoc
@@ -53,6 +53,16 @@ NN_TCP_NODELAY::
     delaying of TCP acknowledgments. Using this option improves latency at
     the expense of throughput. Type of this option is int. Default value is 0.
 
+NN_TCP_QUICKACK::
+    This option, when set to 1, requests immediate sending of TCP
+    acknowledgments on every connection the socket currently has established.
+    Unlike NN_TCP_NODELAY the setting is not permanent: the kernel may switch
+    back to delayed acknowledgments on its own, so set the option again after
+    each nn_recv() for which an immediate ACK is wanted. The option is
+    Linux-specific; on other platforms, or when the socket has no established
+    TCP connection, nn_setsockopt() fails with ENOPROTOOPT. Type of this
+    option is int.
+
 EXAMPLE
 -------
 
diff --git a/src/core/sock.c b/src/core/sock.c
index 8348d30e3..60c3e3e5b 100644
--- a/src/core/sock.c
+++ b/src/core/sock.c
@@ -25,6 +25,7 @@
 
 #include "../protocol.h"
 #include "../transport.h"
+#include "../tcp.h"
 
 #include "sock.h"
 #include "global.h"
@@ -296,6 +297,43 @@ static int nn_sock_setopt_inner (struct nn_sock *self, int level,
 
     /*  Transport-specific options. */
     if (level < NN_SOL_SOCKET) {
+        /*  NN_TCP_QUICKACK is not stored in the transport's option set: it
+            is a one-shot request that has to reach connections which are
+            already established. Forward it to every endpoint that
+            implements the optional 'setopt' operation (at present only the
+            TCP bind and connect endpoints do); each such endpoint applies
+            it to its own active connection(s). */
+        if (level == NN_TCP && option == NN_TCP_QUICKACK) {
+            struct nn_list_item *it;
+            struct nn_ep *ep;
+            int rc;
+            int applied = 0;
+            int err = -ENOPROTOOPT;
+
+            /*  Reject malformed values up front so that the caller gets
+                EINVAL even when no connection is currently active. */
+            if (optvallen != sizeof (int))
+                return -EINVAL;
+
+            for (it = nn_list_begin (&self->eps);
+                  it != nn_list_end (&self->eps);
+                  it = nn_list_next (&self->eps, it)) {
+                ep = nn_cont (it, struct nn_ep, item);
+                if (ep->ops.setopt == NULL)
+                    continue;
+                rc = ep->ops.setopt (ep->tran, option, optval, optvallen);
+                if (rc == 0)
+                    applied = 1;
+                else if (rc != -ENOPROTOOPT)
+                    err = rc;
+            }
+
+            /*  Succeed if at least one connection accepted the option.
+                Otherwise prefer a genuine error over the generic
+                "nothing to apply it to" ENOPROTOOPT. */
+            return applied ? 0 : err;
+        }
+
         optset = nn_sock_optset (self, level);
         if (!optset)
             return -ENOPROTOOPT;
diff --git a/src/core/symbol.c b/src/core/symbol.c
index 2a404b9ae..343605548 100644
--- a/src/core/symbol.c
+++ b/src/core/symbol.c
@@ -121,6 +121,7 @@ static const struct nn_symbol_properties sym_value_names [] = {
     NN_SYM(NN_REQ_RESEND_IVL, TRANSPORT_OPTION, INT, MILLISECONDS),
     NN_SYM(NN_SURVEYOR_DEADLINE, TRANSPORT_OPTION, INT, MILLISECONDS),
     NN_SYM(NN_TCP_NODELAY, TRANSPORT_OPTION, INT, BOOLEAN),
+    NN_SYM(NN_TCP_QUICKACK, TRANSPORT_OPTION, INT, BOOLEAN),
     NN_SYM(NN_WS_MSG_TYPE, TRANSPORT_OPTION, INT, NONE),
 
     NN_SYM(NN_DONTWAIT, FLAG, NONE, NONE),
diff --git a/src/tcp.h b/src/tcp.h
index 1d9077655..8abfab669 100644
--- a/src/tcp.h
+++ b/src/tcp.h
@@ -30,6 +30,7 @@ extern "C" {
 #define NN_TCP -3
 
 #define NN_TCP_NODELAY 1
+#define NN_TCP_QUICKACK 2
 
 #ifdef __cplusplus
 }
diff --git a/src/transport.h b/src/transport.h
index 8f89432f3..86da0ad94 100644
--- a/src/transport.h
+++ b/src/transport.h
@@ -76,6 +76,12 @@ struct nn_ep_ops {
 
     /*  Deallocate the endpoint object. It will already have been stopped. */
     void (*destroy) (void *);
+
+    /*  Optional (may be NULL). Applies an option to the connection(s) the
+        endpoint currently has established. Returns 0 on success or a
+        negative errno value; -ENOPROTOOPT if the option is unknown or
+        there is no established connection to apply it to. */
+    int (*setopt) (void *, int option, const void *optval, size_t optvallen);
 };
 
 /*  Set up an ep for use by a transport. The final opaque argument is passed
diff --git a/src/transports/tcp/atcp.c b/src/transports/tcp/atcp.c
index 2f9433123..bc7b7ddac 100644
--- a/src/transports/tcp/atcp.c
+++ b/src/transports/tcp/atcp.c
@@ -108,6 +108,48 @@ void nn_atcp_stop (struct nn_atcp *self)
     nn_fsm_stop (&self->fsm);
 }
 
+/*  Applies 'option' to the OS socket of this accepted connection. Only
+    NN_TCP_QUICKACK is supported. Returns 0 on success, -ENOPROTOOPT if the
+    option is unknown, unavailable on this platform or the connection is not
+    ACTIVE, -EINVAL if the value is not an int, else the negated errno from
+    setsockopt(). */
+int nn_atcp_setopt (struct nn_atcp *self, int option, const void *optval,
+    size_t optvallen)
+{
+#if defined TCP_QUICKACK
+    int val;
+    int rc;
+
+    if (option != NN_TCP_QUICKACK)
+        return -ENOPROTOOPT;
+
+    /*  Only an established (ACTIVE) connection is touched. */
+    if (self->state != NN_ATCP_STATE_ACTIVE)
+        return -ENOPROTOOPT;
+
+    if (optvallen != sizeof (int))
+        return -EINVAL;
+    val = *(const int*) optval;
+
+    /*  nn_usock_setsockopt() asserts that the socket is still in the
+        STARTING or ACCEPTED state, so it cannot be used on an established
+        connection; call the OS directly on the underlying descriptor. */
+    rc = setsockopt (self->usock.s, IPPROTO_TCP, TCP_QUICKACK,
+        &val, sizeof (val));
+    if (rc != 0)
+        return -errno;
+
+    return 0;
+#else
+    /*  TCP_QUICKACK is a Linux-specific socket option. */
+    (void) self;
+    (void) option;
+    (void) optval;
+    (void) optvallen;
+    return -ENOPROTOOPT;
+#endif
+}
+
 static void nn_atcp_shutdown (struct nn_fsm *self, int src, int type,
     NN_UNUSED void *srcptr)
 {
diff --git a/src/transports/tcp/atcp.h b/src/transports/tcp/atcp.h
index ee9cf45d7..202545f54 100644
--- a/src/transports/tcp/atcp.h
+++ b/src/transports/tcp/atcp.h
@@ -76,6 +76,8 @@ void nn_atcp_term (struct nn_atcp *self);
 int nn_atcp_isidle (struct nn_atcp *self);
 void nn_atcp_start (struct nn_atcp *self, struct nn_usock *listener);
 void nn_atcp_stop (struct nn_atcp *self);
+int nn_atcp_setopt (struct nn_atcp *self, int option, const void *optval,
+    size_t optvallen);
 
 #endif
 
diff --git a/src/transports/tcp/btcp.c b/src/transports/tcp/btcp.c
index 1fdef0366..091051044 100644
--- a/src/transports/tcp/btcp.c
+++ b/src/transports/tcp/btcp.c
@@ -86,9 +86,12 @@ struct nn_btcp {
 /*  nn_ep virtual interface implementation. */
 static void nn_btcp_stop (void *);
 static void nn_btcp_destroy (void *);
+static int nn_btcp_setopt (void *, int option, const void *optval,
+    size_t optvallen);
 const struct nn_ep_ops nn_btcp_ep_ops = {
     nn_btcp_stop,
-    nn_btcp_destroy
+    nn_btcp_destroy,
+    nn_btcp_setopt
 };
 
 /*  Private functions. */
@@ -188,6 +191,34 @@ static void nn_btcp_destroy (void *self)
     nn_free (btcp);
 }
 
+/*  nn_ep 'setopt' operation of the bind endpoint. Offers the option to every
+    accepted connection in the 'atcps' list. Returns 0 if at least one
+    connection accepted it; otherwise the error reported by a connection,
+    or -ENOPROTOOPT when there was nothing to apply the option to. */
+static int nn_btcp_setopt (void *self, int option, const void *optval,
+    size_t optvallen)
+{
+    struct nn_btcp *btcp = self;
+    struct nn_list_item *it;
+    struct nn_atcp *atcp;
+    int rc;
+    int applied = 0;
+    int err = -ENOPROTOOPT;
+
+    for (it = nn_list_begin (&btcp->atcps);
+          it != nn_list_end (&btcp->atcps);
+          it = nn_list_next (&btcp->atcps, it)) {
+        atcp = nn_cont (it, struct nn_atcp, item);
+        rc = nn_atcp_setopt (atcp, option, optval, optvallen);
+        if (rc == 0)
+            applied = 1;
+        else if (rc != -ENOPROTOOPT)
+            err = rc;
+    }
+
+    return applied ? 0 : err;
+}
+
 static void nn_btcp_shutdown (struct nn_fsm *self, int src, int type,
     void *srcptr)
 {
diff --git a/src/transports/tcp/ctcp.c b/src/transports/tcp/ctcp.c
index eedcc2d0f..d473d73c8 100644
--- a/src/transports/tcp/ctcp.c
+++ b/src/transports/tcp/ctcp.c
@@ -95,9 +95,12 @@ struct nn_ctcp {
 /*  nn_ep virtual interface implementation. */
 static void nn_ctcp_stop (void *);
 static void nn_ctcp_destroy (void *);
+static int nn_ctcp_setopt (void *, int option, const void *optval,
+    size_t optvallen);
 const struct nn_ep_ops nn_ctcp_ep_ops = {
     nn_ctcp_stop,
-    nn_ctcp_destroy
+    nn_ctcp_destroy,
+    nn_ctcp_setopt
 };
 
 /*  Private functions. */
@@ -222,6 +225,49 @@ static void nn_ctcp_destroy (void *self)
     nn_free (ctcp);
 }
 
+/*  nn_ep 'setopt' operation of the connect endpoint; applies 'option' to the
+    OS socket of its single outgoing connection. Only NN_TCP_QUICKACK is
+    supported. Returns 0 on success, -ENOPROTOOPT if the option is unknown,
+    unavailable on this platform or the connection is not ACTIVE, -EINVAL if
+    the value is not an int, else the negated errno from setsockopt(). */
+static int nn_ctcp_setopt (void *self, int option, const void *optval,
+    size_t optvallen)
+{
+#if defined TCP_QUICKACK
+    struct nn_ctcp *ctcp = self;
+    int val;
+    int rc;
+
+    if (option != NN_TCP_QUICKACK)
+        return -ENOPROTOOPT;
+
+    /*  Only an established (ACTIVE) connection is touched. */
+    if (ctcp->state != NN_CTCP_STATE_ACTIVE)
+        return -ENOPROTOOPT;
+
+    if (optvallen != sizeof (int))
+        return -EINVAL;
+    val = *(const int*) optval;
+
+    /*  nn_usock_setsockopt() asserts that the socket is still in the
+        STARTING or ACCEPTED state, so it cannot be used on an established
+        connection; call the OS directly on the underlying descriptor. */
+    rc = setsockopt (ctcp->usock.s, IPPROTO_TCP, TCP_QUICKACK,
+        &val, sizeof (val));
+    if (rc != 0)
+        return -errno;
+
+    return 0;
+#else
+    /*  TCP_QUICKACK is a Linux-specific socket option. */
+    (void) self;
+    (void) option;
+    (void) optval;
+    (void) optvallen;
+    return -ENOPROTOOPT;
+#endif
+}
+
 static void nn_ctcp_shutdown (struct nn_fsm *self, int src, int type,
     NN_UNUSED void *srcptr)
 {
diff --git a/tests/tcp.c b/tests/tcp.c
index 0d950aef5..55ded8395 100644
--- a/tests/tcp.c
+++ b/tests/tcp.c
@@ -220,5 +220,30 @@ int main (int argc, const char *argv[])
     nn_sleep (100);
     test_close (sc);
 
+#if defined __linux__
+    /*  Test TCP_QUICKACK socket option (Linux-only). */
+    sb = test_socket (AF_SP, NN_PAIR);
+    test_bind (sb, socket_address);
+    sc = test_socket (AF_SP, NN_PAIR);
+    test_connect (sc, socket_address);
+    nn_sleep (100);
+
+    /*  Send and receive a message so that the connection is established. */
+    test_send (sc, "HELLO");
+    test_recv (sb, "HELLO");
+
+    /*  Set TCP_QUICKACK after recv - should succeed on active connection. */
+    opt = 1;
+    rc = nn_setsockopt (sb, NN_TCP, NN_TCP_QUICKACK, &opt, sizeof (opt));
+    errno_assert (rc == 0);
+
+    /*  Try setting it on the sender side too. */
+    rc = nn_setsockopt (sc, NN_TCP, NN_TCP_QUICKACK, &opt, sizeof (opt));
+    errno_assert (rc == 0);
+
+    test_close (sb);
+    test_close (sc);
+#endif
+
     return 0;
 }