From 91fef9536293dcc782594d142bee46648f86fb3c Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Sun, 19 Apr 2026 10:21:53 +0300 Subject: [PATCH 1/5] rpc : refactor the RPC transport (#21998) * rpc : refactor the RPC transport Move all transport related code into a separate file and use the socket_t interface to hide all transport implementation details. * fix win32 * better socket_t construction --- ggml/src/ggml-rpc/CMakeLists.txt | 1 + ggml/src/ggml-rpc/ggml-rpc.cpp | 806 +++---------------------------- ggml/src/ggml-rpc/transport.cpp | 683 ++++++++++++++++++++++++++ ggml/src/ggml-rpc/transport.h | 34 ++ 4 files changed, 782 insertions(+), 742 deletions(-) create mode 100644 ggml/src/ggml-rpc/transport.cpp create mode 100644 ggml/src/ggml-rpc/transport.h diff --git a/ggml/src/ggml-rpc/CMakeLists.txt b/ggml/src/ggml-rpc/CMakeLists.txt index 8671ce5ceaf..40e11fead63 100644 --- a/ggml/src/ggml-rpc/CMakeLists.txt +++ b/ggml/src/ggml-rpc/CMakeLists.txt @@ -2,6 +2,7 @@ message(STATUS "Using RPC backend") ggml_add_backend_library(ggml-rpc ggml-rpc.cpp + transport.cpp ) if (WIN32) diff --git a/ggml/src/ggml-rpc/ggml-rpc.cpp b/ggml/src/ggml-rpc/ggml-rpc.cpp index 017ef0af360..2ded7397868 100644 --- a/ggml/src/ggml-rpc/ggml-rpc.cpp +++ b/ggml/src/ggml-rpc/ggml-rpc.cpp @@ -2,6 +2,7 @@ #include "ggml-impl.h" #include "ggml-backend-impl.h" #include "ggml-cpp.h" +#include "transport.h" #include #include @@ -12,35 +13,11 @@ #include #include #include -#ifdef _WIN32 -# define WIN32_LEAN_AND_MEAN -# ifndef NOMINMAX -# define NOMINMAX -# endif -# include -# include -#else -# include -# include -# include -# include -# include -# include -# include -#endif #include #include #include #include -#ifdef GGML_RPC_RDMA -# include -# include -# ifndef _WIN32 -# include -# endif -#endif // GGML_RPC_RDMA - static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); #define LOG_DBG(...) \ @@ -49,128 +26,6 @@ static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); namespace fs = std::filesystem; -static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB - -#ifdef _WIN32 -typedef SOCKET sockfd_t; -using ssize_t = __int64; -#else -typedef int sockfd_t; -#endif - -// cross-platform socket - -#ifdef GGML_RPC_RDMA -static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock) -static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB -static constexpr size_t RDMA_GID_SIZE = 16; // RoCE GID / IB GID is always 16 bytes -using rdma_gid_t = std::array; - -struct rdma_conn { - struct ibv_context * ctx = nullptr; - struct ibv_pd * pd = nullptr; - struct ibv_cq * scq = nullptr; // send completions - struct ibv_cq * rcq = nullptr; // recv completions - struct ibv_qp * qp = nullptr; - - void * tx_buf = nullptr; - struct ibv_mr * tx_mr = nullptr; - - void * rx_buf = nullptr; // RDMA_RX_DEPTH × RDMA_CHUNK contiguous - struct ibv_mr * rx_mr = nullptr; - int rx_head = 0; - - uint32_t max_inline = 0; - - uint8_t * rx_slot(int i) const { - return static_cast(rx_buf) + static_cast(i) * RDMA_CHUNK; - } - - bool post_rx(int i) { - struct ibv_sge sge = {}; - sge.addr = (uintptr_t)rx_slot(i); - sge.length = RDMA_CHUNK; - sge.lkey = rx_mr->lkey; - struct ibv_recv_wr wr = {}, * bad = nullptr; - wr.wr_id = (uint64_t)i; - wr.sg_list = &sge; - wr.num_sge = 1; - return ibv_post_recv(qp, &wr, &bad) == 0; - } - - ~rdma_conn() { - if (tx_mr) ibv_dereg_mr(tx_mr); - if (rx_mr) ibv_dereg_mr(rx_mr); - free(tx_buf); - free(rx_buf); - if (qp) ibv_destroy_qp(qp); - if (scq) ibv_destroy_cq(scq); - if (rcq) ibv_destroy_cq(rcq); - if (pd) ibv_dealloc_pd(pd); - if (ctx) ibv_close_device(ctx); - } -}; - -// Local RDMA parameters captured during the probe phase and later consumed -// by rdma_activate() after the remote side's caps arrive via HELLO. -struct rdma_local_info { - uint32_t qpn = 0; - uint32_t psn = 0; - uint8_t gid[RDMA_GID_SIZE] = {}; - uint8_t ib_port = 0; - int gid_idx = 0; - enum ibv_mtu path_mtu = IBV_MTU_1024; -}; -#endif // GGML_RPC_RDMA - -// conn_caps size for transport-agnostic capability exchange -static constexpr size_t RPC_CONN_CAPS_SIZE = 24; - -// conn_caps RDMA layout helper -#ifdef GGML_RPC_RDMA -struct rdma_caps { - uint32_t qpn; - uint32_t psn; - uint8_t gid[RDMA_GID_SIZE]; -}; -static_assert(sizeof(rdma_caps) == RPC_CONN_CAPS_SIZE, "rdma_caps must match conn_caps size"); -#endif // GGML_RPC_RDMA - -// Forward declarations for transport function pointers -struct socket_t; -static bool tcp_send_impl(socket_t * sock, const void * data, size_t size); -static bool tcp_recv_impl(socket_t * sock, void * data, size_t size); - -struct socket_t { - sockfd_t fd; - bool (*fn_send)(socket_t *, const void *, size_t) = tcp_send_impl; - bool (*fn_recv)(socket_t *, void *, size_t) = tcp_recv_impl; -#ifdef GGML_RPC_RDMA - std::unique_ptr rdma; - rdma_local_info rdma_local = {}; -#endif // GGML_RPC_RDMA - socket_t(sockfd_t fd) : fd(fd) {} - ~socket_t() { -#ifdef GGML_RPC_RDMA - rdma.reset(); -#endif // GGML_RPC_RDMA - LOG_DBG("[%s] closing socket %d\n", __func__, this->fd); -#ifdef _WIN32 - if (fd != INVALID_SOCKET) closesocket(this->fd); -#else - if (fd >= 0) close(this->fd); -#endif - } - - // Advertise local transport capabilities into conn_caps. - // May probe RDMA and store the probe on this socket for update_caps. - void get_caps(uint8_t * caps); - - // Activate transport upgrade based on remote conn_caps using the probe - // previously stored by get_caps. - void update_caps(const uint8_t * remote_caps); -}; - // macro for nicer error messages on server crash #define RPC_STATUS_ASSERT(x) if (!(x)) GGML_ABORT("Remote RPC server crashed or returned malformed response") @@ -403,540 +258,27 @@ static uint64_t fnv_hash(const uint8_t * data, size_t len) { return hash; } -static std::shared_ptr make_socket(sockfd_t fd) { -#ifdef _WIN32 - if (fd == INVALID_SOCKET) { - return nullptr; - } -#else - if (fd < 0) { - return nullptr; - } -#endif - return std::make_shared(fd); -} - -static bool set_no_delay(sockfd_t sockfd) { - int flag = 1; - // set TCP_NODELAY to disable Nagle's algorithm - int ret = setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int)); - return ret == 0; -} - -static bool set_reuse_addr(sockfd_t sockfd) { - int flag = 1; - int ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof(int)); - return ret == 0; -} - -static std::shared_ptr socket_connect(const char * host, int port) { - struct sockaddr_in addr; - auto sockfd = socket(AF_INET, SOCK_STREAM, 0); - auto sock_ptr = make_socket(sockfd); - if (sock_ptr == nullptr) { - return nullptr; - } - if (!set_no_delay(sockfd)) { - GGML_LOG_ERROR("Failed to set TCP_NODELAY\n"); - return nullptr; - } - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - struct hostent * server = gethostbyname(host); - if (server == NULL) { - GGML_LOG_ERROR("Cannot resolve host '%s'\n", host); - return nullptr; - } - memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length); - if (connect(sock_ptr->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - return nullptr; - } - return sock_ptr; -} - -static std::shared_ptr socket_accept(sockfd_t srv_sockfd) { - auto client_socket_fd = accept(srv_sockfd, NULL, NULL); - auto client_socket = make_socket(client_socket_fd); - if (client_socket == nullptr) { - return nullptr; - } - if (!set_no_delay(client_socket_fd)) { - GGML_LOG_ERROR("Failed to set TCP_NODELAY\n"); - return nullptr; - } - return client_socket; -} - -static std::shared_ptr create_server_socket(const char * host, int port) { - auto sockfd = socket(AF_INET, SOCK_STREAM, 0); - auto sock = make_socket(sockfd); - if (sock == nullptr) { - return nullptr; - } - if (!set_reuse_addr(sockfd)) { - GGML_LOG_ERROR("Failed to set SO_REUSEADDR\n"); - return nullptr; - } - if (inet_addr(host) == INADDR_NONE) { - GGML_LOG_ERROR("Invalid host address: %s\n", host); - return nullptr; - } - struct sockaddr_in serv_addr; - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = inet_addr(host); - serv_addr.sin_port = htons(port); - - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - return nullptr; - } - if (listen(sockfd, 1) < 0) { - return nullptr; - } - return sock; -} - -static bool send_data(sockfd_t sockfd, const void * data, size_t size) { - size_t bytes_sent = 0; - while (bytes_sent < size) { - size_t size_to_send = std::min(size - bytes_sent, MAX_CHUNK_SIZE); - ssize_t n = send(sockfd, (const char *)data + bytes_sent, size_to_send, 0); - if (n < 0) { - GGML_LOG_ERROR("send failed (bytes_sent=%zu, size_to_send=%zu)\n", - bytes_sent, size_to_send); - return false; - } - bytes_sent += (size_t)n; - } - return true; -} - -static bool recv_data(sockfd_t sockfd, void * data, size_t size) { - size_t bytes_recv = 0; - while (bytes_recv < size) { - size_t size_to_recv = std::min(size - bytes_recv, MAX_CHUNK_SIZE); - ssize_t n = recv(sockfd, (char *)data + bytes_recv, size_to_recv, 0); - if (n < 0) { - GGML_LOG_ERROR("recv failed (bytes_recv=%zu, size_to_recv=%zu)\n", - bytes_recv, size_to_recv); - return false; - } - if (n == 0) { - LOG_DBG("recv returned 0 (peer closed?)\n"); - return false; - } - bytes_recv += (size_t)n; - } - return true; -} - -// TCP transport implementations (for function-pointer dispatch) - -static bool tcp_send_impl(socket_t * sock, const void * data, size_t size) { - return send_data(sock->fd, data, size); -} - -static bool tcp_recv_impl(socket_t * sock, void * data, size_t size) { - return recv_data(sock->fd, data, size); -} - -// RDMA transport (performance-optimized, auto-negotiated) - -#ifdef GGML_RPC_RDMA - -static bool rdma_send_impl(socket_t * sock, const void * data, size_t size); -static bool rdma_recv_impl(socket_t * sock, void * data, size_t size); - -static inline bool tcp_peer_closed(int fd) { - if (fd < 0) return false; -#ifndef _WIN32 - struct pollfd pfd = { fd, POLLIN | POLLRDHUP, 0 }; - int r = poll(&pfd, 1, 0); - return r > 0 && (pfd.revents & (POLLHUP | POLLERR | POLLRDHUP)); -#else - return false; -#endif -} - -static inline bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc, int tcp_fd) { - for (uint64_t s = 0; ; s++) { - int n = ibv_poll_cq(cq, 1, wc); - if (n > 0) { - if (wc->status != IBV_WC_SUCCESS) { - GGML_LOG_ERROR("RDMA CQ wc error: status=%d (%s) vendor_err=0x%x\n", - wc->status, ibv_wc_status_str(wc->status), wc->vendor_err); - } - return wc->status == IBV_WC_SUCCESS; - } - if (n < 0) return false; - if ((s & 0xFFFFF) == 0 && s > 0) { - if (tcp_peer_closed(tcp_fd)) { - return false; - } - } - } -} - -static bool rdma_send(rdma_conn * c, const void * data, size_t size, int tcp_fd) { - const uint8_t * src = (const uint8_t *)data; - size_t rem = size; - while (rem > 0) { - size_t chunk = std::min(rem, RDMA_CHUNK); - - struct ibv_sge sge = {}; - struct ibv_send_wr wr = {}, * bad = nullptr; - wr.opcode = IBV_WR_SEND; - wr.sg_list = &sge; - wr.num_sge = 1; - - if (chunk <= c->max_inline) { - sge.addr = (uintptr_t)src; - sge.length = chunk; - wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_INLINE; - } else { - memcpy(c->tx_buf, src, chunk); - sge.addr = (uintptr_t)c->tx_buf; - sge.length = chunk; - sge.lkey = c->tx_mr->lkey; - wr.send_flags = IBV_SEND_SIGNALED; - } - - if (ibv_post_send(c->qp, &wr, &bad) != 0) return false; - struct ibv_wc wc; - if (!rdma_poll(c->scq, &wc, tcp_fd)) return false; - - src += chunk; - rem -= chunk; - } - return true; -} - - -static bool rdma_recv(rdma_conn * c, void * data, size_t size, int tcp_fd) { - uint8_t * dst = (uint8_t *)data; - size_t rem = size; - while (rem > 0) { - struct ibv_wc wc; - if (!rdma_poll(c->rcq, &wc, tcp_fd)) return false; - - int slot = (int)wc.wr_id; - size_t got = wc.byte_len; - memcpy(dst, c->rx_slot(slot), got); - - if (!c->post_rx(slot)) return false; - - dst += got; - rem -= got; - } - return true; -} - -static bool rdma_send_impl(socket_t * sock, const void * data, size_t size) { - return rdma_send(sock->rdma.get(), data, size, sock->fd); -} - -static bool rdma_recv_impl(socket_t * sock, void * data, size_t size) { - return rdma_recv(sock->rdma.get(), data, size, sock->fd); -} - -// Build a RoCE GID-shaped 16-byte target from a TCP socket's local address. -// Used to match the socket's local IP against the kernel's GID table so that -// a single memcmp handles IPv4, IPv4-mapped IPv6, and native IPv6 uniformly: -// AF_INET -> ::ffff:a.b.c.d (bytes 10-11 = 0xff, last 4 = IPv4) -// AF_INET6 (IPv4-mapped) -> ::ffff:a.b.c.d (already in GID shape) -// AF_INET6 (native v6) -> the 16-byte IPv6 address as-is -// Returns std::nullopt on unsupported family or getsockname failure. -static std::optional rdma_build_target_gid(sockfd_t tcp_fd) { - sockaddr_storage addr = {}; - socklen_t addr_len = sizeof(addr); - if (getsockname(tcp_fd, reinterpret_cast(&addr), &addr_len) != 0) { - return std::nullopt; - } - rdma_gid_t target = {}; - if (addr.ss_family == AF_INET) { - const auto * a = reinterpret_cast(&addr); - target[10] = 0xff; - target[11] = 0xff; - memcpy(&target[12], &a->sin_addr, 4); - return target; - } - if (addr.ss_family == AF_INET6) { - const auto * a = reinterpret_cast(&addr); - memcpy(target.data(), &a->sin6_addr, RDMA_GID_SIZE); - return target; - } - return std::nullopt; -} - -static rdma_conn * rdma_probe(sockfd_t tcp_fd, rdma_local_info * out) { - const char * dev_env = std::getenv("GGML_RDMA_DEV"); - const char * gid_env = std::getenv("GGML_RDMA_GID"); - - auto target_gid = rdma_build_target_gid(tcp_fd); - if (!target_gid) { - return nullptr; - } - - const uint8_t ib_port = 1; - int num_devs = 0; - ibv_device ** devs = ibv_get_device_list(&num_devs); - if (!devs || num_devs == 0) return nullptr; - - ibv_context * ibctx = nullptr; - const char * matched_dev = nullptr; - int gid_idx = gid_env ? atoi(gid_env) : -1; - int gid_version = IBV_GID_TYPE_IB; // 0 = unknown/IB - - for (int d = 0; d < num_devs; d++) { - const char * dn = ibv_get_device_name(devs[d]); - if (dev_env && strcmp(dev_env, dn) != 0) continue; - - ibv_context * ctx = ibv_open_device(devs[d]); - if (!ctx) continue; - - ibv_port_attr pa; - if (ibv_query_port(ctx, ib_port, &pa) != 0) { ibv_close_device(ctx); continue; } - - int found_gid = gid_idx; - int found_version = IBV_GID_TYPE_IB; - if (found_gid < 0) { - // Find a GID on this port whose bytes equal the local TCP address - // (IPv4 or IPv6). Prefer RoCE v2 (UDP/IP, L3-routable) over v1 - // (raw Ethernet, same-L2 only) so silent hangs on L3-routed paths - // are avoided. ibv_query_gid_ex returns gid+type in one call. - int v2_idx = -1; - int v1_idx = -1; - for (int i = 0; i < pa.gid_tbl_len; i++) { - ibv_gid_entry entry = {}; - if (ibv_query_gid_ex(ctx, ib_port, i, &entry, 0) != 0) continue; - if (memcmp(entry.gid.raw, target_gid->data(), RDMA_GID_SIZE) != 0) continue; - if (entry.gid_type == IBV_GID_TYPE_ROCE_V2 && v2_idx < 0) { - v2_idx = i; - } else if (entry.gid_type == IBV_GID_TYPE_ROCE_V1 && v1_idx < 0) { - v1_idx = i; - } - } - if (v2_idx >= 0) { - found_gid = v2_idx; - found_version = IBV_GID_TYPE_ROCE_V2; - } else if (v1_idx >= 0) { - found_gid = v1_idx; - found_version = IBV_GID_TYPE_ROCE_V1; - } - } else { - // Explicit GID index from GGML_RDMA_GID — fetch its type for logging. - ibv_gid_entry entry = {}; - if (ibv_query_gid_ex(ctx, ib_port, found_gid, &entry, 0) == 0) { - found_version = entry.gid_type; - } - } - if (found_gid >= 0) { - ibctx = ctx; - gid_idx = found_gid; - gid_version = found_version; - matched_dev = dn; - out->path_mtu = pa.active_mtu; - break; - } - ibv_close_device(ctx); - } - ibv_free_device_list(devs); - if (!ibctx) return nullptr; - - out->ib_port = ib_port; - out->gid_idx = gid_idx; - - // unique_ptr owns ibctx and every subsequent resource via ~rdma_conn(), - // so each failure path is a plain `return nullptr;`. - auto c = std::make_unique(); - c->ctx = ibctx; - - c->pd = ibv_alloc_pd(ibctx); - if (!c->pd) return nullptr; - - c->scq = ibv_create_cq(ibctx, 16, nullptr, nullptr, 0); - c->rcq = ibv_create_cq(ibctx, RDMA_RX_DEPTH + 4, nullptr, nullptr, 0); - if (!c->scq || !c->rcq) return nullptr; - - ibv_qp_init_attr qia = {}; - qia.send_cq = c->scq; - qia.recv_cq = c->rcq; - qia.qp_type = IBV_QPT_RC; - qia.cap.max_send_wr = 4; - qia.cap.max_recv_wr = RDMA_RX_DEPTH + 4; - qia.cap.max_send_sge = 1; - qia.cap.max_recv_sge = 1; - qia.cap.max_inline_data = 256; - - c->qp = ibv_create_qp(c->pd, &qia); - if (!c->qp) return nullptr; - c->max_inline = qia.cap.max_inline_data; - - c->tx_buf = aligned_alloc(4096, RDMA_CHUNK); - c->rx_buf = aligned_alloc(4096, static_cast(RDMA_RX_DEPTH) * RDMA_CHUNK); - if (!c->tx_buf || !c->rx_buf) return nullptr; - - c->tx_mr = ibv_reg_mr(c->pd, c->tx_buf, RDMA_CHUNK, IBV_ACCESS_LOCAL_WRITE); - c->rx_mr = ibv_reg_mr(c->pd, c->rx_buf, static_cast(RDMA_RX_DEPTH) * RDMA_CHUNK, - IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); - if (!c->tx_mr || !c->rx_mr) return nullptr; - - ibv_gid local_gid; - if (ibv_query_gid(ibctx, ib_port, gid_idx, &local_gid) != 0) return nullptr; - - out->qpn = c->qp->qp_num; - out->psn = c->qp->qp_num & 0xffffff; - memcpy(out->gid, &local_gid, RDMA_GID_SIZE); - - const char * ver_str = ""; - if (gid_version == IBV_GID_TYPE_ROCE_V2) { - ver_str = " RoCEv2"; - } else if (gid_version == IBV_GID_TYPE_ROCE_V1) { - ver_str = " RoCEv1"; - } - GGML_LOG_INFO("RDMA probed: dev=%s gid=%d%s qpn=%u inline=%u\n", - matched_dev, gid_idx, ver_str, out->qpn, c->max_inline); - return c.release(); -} - -// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET->INIT->pre-post->RTR->RTS. -// On success, the connection is live and ready for rdma_send/rdma_recv. -static bool rdma_activate(rdma_conn * c, const rdma_local_info * local, - uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) { - // RESET -> INIT - { - struct ibv_qp_attr a = {}; - a.qp_state = IBV_QPS_INIT; - a.port_num = local->ib_port; - a.pkey_index = 0; - a.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE; - if (ibv_modify_qp(c->qp, &a, - IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { - return false; - } - } - - for (int i = 0; i < RDMA_RX_DEPTH; i++) { - if (!c->post_rx(i)) return false; - } - - // INIT -> RTR - { - struct ibv_qp_attr a = {}; - a.qp_state = IBV_QPS_RTR; - a.path_mtu = local->path_mtu; - a.dest_qp_num = remote_qpn; - a.rq_psn = remote_psn; - a.max_dest_rd_atomic = 1; - a.min_rnr_timer = 1; - a.ah_attr.is_global = 1; - memcpy(&a.ah_attr.grh.dgid, remote_gid, RDMA_GID_SIZE); - a.ah_attr.grh.hop_limit = 1; - a.ah_attr.grh.sgid_index = local->gid_idx; - a.ah_attr.dlid = 0; - a.ah_attr.port_num = local->ib_port; - if (ibv_modify_qp(c->qp, &a, - IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | - IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER) != 0) { - return false; - } - } - - // RTR -> RTS - { - struct ibv_qp_attr a = {}; - a.qp_state = IBV_QPS_RTS; - a.timeout = 14; - a.retry_cnt = 7; - a.rnr_retry = 7; - a.sq_psn = local->psn; - a.max_rd_atomic = 1; - if (ibv_modify_qp(c->qp, &a, - IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | - IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC) != 0) { - return false; - } - } - - GGML_LOG_INFO("RDMA activated: qpn=%u->%u mtu=%d rx_depth=%d\n", - local->qpn, remote_qpn, 128 << local->path_mtu, RDMA_RX_DEPTH); - return true; -} - -#endif // GGML_RPC_RDMA - -// --------------------------------------------------------------------------- -// socket_t transport capability methods -// --------------------------------------------------------------------------- - -void socket_t::get_caps(uint8_t * caps) { - memset(caps, 0, RPC_CONN_CAPS_SIZE); -#ifdef GGML_RPC_RDMA - rdma_local = {}; - rdma.reset(rdma_probe(fd, &rdma_local)); - if (rdma) { - rdma_caps rc = {}; - rc.qpn = rdma_local.qpn; - rc.psn = rdma_local.psn; - memcpy(rc.gid, rdma_local.gid, RDMA_GID_SIZE); - memcpy(caps, &rc, sizeof(rc)); - } -#endif // GGML_RPC_RDMA -} - -void socket_t::update_caps(const uint8_t * remote_caps) { -#ifdef GGML_RPC_RDMA - if (!rdma) { - return; - } - rdma_caps rc = {}; - memcpy(&rc, remote_caps, sizeof(rc)); - if (rc.qpn == 0) { - rdma.reset(); - return; - } - if (rdma_activate(rdma.get(), &rdma_local, rc.qpn, rc.psn, rc.gid)) { - fn_send = rdma_send_impl; - fn_recv = rdma_recv_impl; - } else { - GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n"); - rdma.reset(); - } -#else - (void)remote_caps; -#endif // GGML_RPC_RDMA -} - -// unified transport dispatch (via function pointers) - -static bool send_data(socket_t * sock, const void * data, size_t size) { - return sock->fn_send(sock, data, size); -} - -static bool recv_data(socket_t * sock, void * data, size_t size) { - return sock->fn_recv(sock, data, size); -} - -static bool send_msg(socket_t * sock, const void * msg, size_t msg_size) { - if (!send_data(sock, &msg_size, sizeof(msg_size))) { +static bool send_msg(socket_ptr sock, const void * msg, size_t msg_size) { + if (!sock->send_data(&msg_size, sizeof(msg_size))) { return false; } - return send_data(sock, msg, msg_size); + return sock->send_data(msg, msg_size); } -static bool recv_msg(socket_t * sock, void * msg, size_t msg_size) { +static bool recv_msg(socket_ptr sock, void * msg, size_t msg_size) { uint64_t size; - if (!recv_data(sock, &size, sizeof(size))) { + if (!sock->recv_data(&size, sizeof(size))) { return false; } if (size != msg_size) { return false; } - return recv_data(sock, msg, msg_size); + return sock->recv_data(msg, msg_size); } -static bool recv_msg(socket_t * sock, std::vector & input) { +static bool recv_msg(socket_ptr sock, std::vector & input) { uint64_t size; - if (!recv_data(sock, &size, sizeof(size))) { + if (!sock->recv_data(&size, sizeof(size))) { return false; } try { @@ -945,7 +287,7 @@ static bool recv_msg(socket_t * sock, std::vector & input) { GGML_LOG_ERROR("Failed to allocate input buffer of size %" PRIu64 "\n", size); return false; } - return recv_data(sock, input.data(), size); + return sock->recv_data(input.data(), size); } static bool parse_endpoint(const std::string & endpoint, std::string & host, int & port) { @@ -964,15 +306,15 @@ static bool parse_endpoint(const std::string & endpoint, std::string & host, int // RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) | // No response -static bool send_rpc_cmd(const std::shared_ptr & sock, enum rpc_cmd cmd, const void * input, size_t input_size) { +static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size) { uint8_t cmd_byte = cmd; - if (!send_data(sock.get(), &cmd_byte, sizeof(cmd_byte))) { + if (!sock->send_data(&cmd_byte, sizeof(cmd_byte))) { return false; } - if (!send_data(sock.get(), &input_size, sizeof(input_size))) { + if (!sock->send_data(&input_size, sizeof(input_size))) { return false; } - if (!send_data(sock.get(), input, input_size)) { + if (!sock->send_data(input, input_size)) { return false; } return true; @@ -980,18 +322,18 @@ static bool send_rpc_cmd(const std::shared_ptr & sock, enum rpc_cmd cm // RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) | // RPC response: | response_size (8 bytes) | response_data (response_size bytes) | -static bool send_rpc_cmd(const std::shared_ptr & sock, enum rpc_cmd cmd, const void * input, size_t input_size, void * output, size_t output_size) { +static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size, void * output, size_t output_size) { if (!send_rpc_cmd(sock, cmd, input, input_size)) { return false; } uint64_t out_size; - if (!recv_data(sock.get(), &out_size, sizeof(out_size))) { + if (!sock->recv_data(&out_size, sizeof(out_size))) { return false; } if (out_size != output_size) { return false; } - if (!recv_data(sock.get(), output, output_size)) { + if (!sock->recv_data(output, output_size)) { return false; } return true; @@ -1025,7 +367,6 @@ static std::shared_ptr get_socket(const std::string & endpoint) { static std::mutex mutex; std::lock_guard lock(mutex); static std::unordered_map> sockets; - static bool initialized = false; auto it = sockets.find(endpoint); if (it != sockets.end()) { @@ -1040,19 +381,10 @@ static std::shared_ptr get_socket(const std::string & endpoint) { return nullptr; } -#ifdef _WIN32 - if (!initialized) { - WSADATA wsaData; - int res = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (res != 0) { - return nullptr; - } - initialized = true; + if (!rpc_transport_init()) { + return nullptr; } -#else - GGML_UNUSED(initialized); -#endif - auto sock = socket_connect(host.c_str(), port); + auto sock = socket_t::connect(host.c_str(), port); if (sock == nullptr) { return nullptr; } @@ -2110,10 +1442,10 @@ rpc_server::~rpc_server() { } static void rpc_serve_client(const std::vector & backends, const char * cache_dir, - socket_t * sockfd) { + socket_ptr sock) { rpc_server server(backends, cache_dir); uint8_t cmd; - if (!recv_data(sockfd, &cmd, 1)) { + if (!sock->recv_data(&cmd, 1)) { return; } if (cmd != RPC_CMD_HELLO) { @@ -2123,7 +1455,7 @@ static void rpc_serve_client(const std::vector & backends, const // Read input_size and validate protocol version uint64_t hello_input_size; - if (!recv_data(sockfd, &hello_input_size, sizeof(hello_input_size))) { + if (!sock->recv_data(&hello_input_size, sizeof(hello_input_size))) { return; } @@ -2134,24 +1466,22 @@ static void rpc_serve_client(const std::vector & backends, const } rpc_msg_hello_req req = {}; - if (!recv_data(sockfd, &req, sizeof(req))) { + if (!sock->recv_data(&req, sizeof(req))) { return; } rpc_msg_hello_rsp rsp = {}; server.hello(rsp); - // Advertise server transport capabilities based on client's caps - sockfd->get_caps(rsp.conn_caps); - - if (!send_msg(sockfd, &rsp, sizeof(rsp))) { + sock->get_caps(rsp.conn_caps); + if (!send_msg(sock, &rsp, sizeof(rsp))) { return; } // Activate transport upgrade using client's caps - sockfd->update_caps(req.conn_caps); + sock->update_caps(req.conn_caps); while (true) { - if (!recv_data(sockfd, &cmd, 1)) { + if (!sock->recv_data(&cmd, 1)) { break; } if (cmd >= RPC_CMD_COUNT) { @@ -2165,115 +1495,115 @@ static void rpc_serve_client(const std::vector & backends, const return; } case RPC_CMD_DEVICE_COUNT: { - if (!recv_msg(sockfd, nullptr, 0)) { + if (!recv_msg(sock, nullptr, 0)) { return; } rpc_msg_device_count_rsp response; response.device_count = backends.size(); - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_ALLOC_BUFFER: { rpc_msg_alloc_buffer_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_alloc_buffer_rsp response; if (!server.alloc_buffer(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_GET_ALLOC_SIZE: { rpc_msg_get_alloc_size_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_get_alloc_size_rsp response; if (!server.get_alloc_size(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_GET_ALIGNMENT: { rpc_msg_get_alignment_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_get_alignment_rsp response; if (!server.get_alignment(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_GET_MAX_SIZE: { rpc_msg_get_max_size_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_get_max_size_rsp response; if (!server.get_max_size(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_BUFFER_GET_BASE: { rpc_msg_buffer_get_base_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_buffer_get_base_rsp response; if (!server.buffer_get_base(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_FREE_BUFFER: { rpc_msg_free_buffer_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } if (!server.free_buffer(request)) { return; } - if (!send_msg(sockfd, nullptr, 0)) { + if (!send_msg(sock, nullptr, 0)) { return; } break; } case RPC_CMD_BUFFER_CLEAR: { rpc_msg_buffer_clear_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } if (!server.buffer_clear(request)) { return; } - if (!send_msg(sockfd, nullptr, 0)) { + if (!send_msg(sock, nullptr, 0)) { return; } break; } case RPC_CMD_SET_TENSOR: { std::vector input; - if (!recv_msg(sockfd, input)) { + if (!recv_msg(sock, input)) { return; } if (!server.set_tensor(input)) { @@ -2283,62 +1613,62 @@ static void rpc_serve_client(const std::vector & backends, const } case RPC_CMD_SET_TENSOR_HASH: { rpc_msg_set_tensor_hash_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_set_tensor_hash_rsp response; if (!server.set_tensor_hash(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_INIT_TENSOR: { rpc_msg_init_tensor_req request; - if (!recv_msg(sockfd, &request,sizeof(request))) { + if (!recv_msg(sock, &request,sizeof(request))) { return; } if (!server.init_tensor(request)) { return; } - if (!send_msg(sockfd, nullptr, 0)) { + if (!send_msg(sock, nullptr, 0)) { return; } break; } case RPC_CMD_GET_TENSOR: { rpc_msg_get_tensor_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } std::vector response; if (!server.get_tensor(request, response)) { return; } - if (!send_msg(sockfd, response.data(), response.size())) { + if (!send_msg(sock, response.data(), response.size())) { return; } break; } case RPC_CMD_COPY_TENSOR: { rpc_msg_copy_tensor_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_copy_tensor_rsp response; if (!server.copy_tensor(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; } case RPC_CMD_GRAPH_COMPUTE: { std::vector input; - if (!recv_msg(sockfd, input)) { + if (!recv_msg(sock, input)) { return; } if (!server.graph_compute(input)) { @@ -2348,7 +1678,7 @@ static void rpc_serve_client(const std::vector & backends, const } case RPC_CMD_GRAPH_RECOMPUTE: { rpc_msg_graph_recompute_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } if (!server.graph_recompute(request)) { @@ -2358,14 +1688,14 @@ static void rpc_serve_client(const std::vector & backends, const } case RPC_CMD_GET_DEVICE_MEMORY: { rpc_msg_get_device_memory_req request; - if (!recv_msg(sockfd, &request, sizeof(request))) { + if (!recv_msg(sock, &request, sizeof(request))) { return; } rpc_msg_get_device_memory_rsp response; if (!server.get_device_memory(request, response)) { return; } - if (!send_msg(sockfd, &response, sizeof(response))) { + if (!send_msg(sock, &response, sizeof(response))) { return; } break; @@ -2424,36 +1754,28 @@ void ggml_backend_rpc_start_server(const char * endpoint, const char * cache_dir #else printf(" transport : TCP\n"); #endif // GGML_RPC_RDMA -#ifdef _WIN32 - { - WSADATA wsaData; - int res = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (res != 0) { - fprintf(stderr, "WSAStartup failed: %d\n", res); - return; - } + if (!rpc_transport_init()) { + fprintf(stderr, "Failed to initialize RPC transport\n"); + return; } -#endif - auto server_socket = create_server_socket(host.c_str(), port); + auto server_socket = socket_t::create_server(host.c_str(), port); if (server_socket == nullptr) { fprintf(stderr, "Failed to create server socket\n"); return; } while (true) { - auto client_socket = socket_accept(server_socket->fd); + auto client_socket = server_socket->accept(); if (client_socket == nullptr) { fprintf(stderr, "Failed to accept client connection\n"); return; } printf("Accepted client connection\n"); fflush(stdout); - rpc_serve_client(backends, cache_dir, client_socket.get()); + rpc_serve_client(backends, cache_dir, client_socket); printf("Client connection closed\n"); fflush(stdout); } -#ifdef _WIN32 - WSACleanup(); -#endif + rpc_transport_shutdown(); for (auto backend : backends) { ggml_backend_free(backend); } diff --git a/ggml/src/ggml-rpc/transport.cpp b/ggml/src/ggml-rpc/transport.cpp new file mode 100644 index 00000000000..a728152421f --- /dev/null +++ b/ggml/src/ggml-rpc/transport.cpp @@ -0,0 +1,683 @@ +#include "transport.h" +#include "ggml-impl.h" + +#ifdef _WIN32 +# define WIN32_LEAN_AND_MEAN +# ifndef NOMINMAX +# define NOMINMAX +# endif +# include +# include +#else +# include +# include +# include +# include +# include +# include +# include +#endif +#include +#include +#include + +#ifdef GGML_RPC_RDMA +# include +# include +# ifndef _WIN32 +# include +# endif +#endif // GGML_RPC_RDMA + +#ifdef _WIN32 +typedef SOCKET sockfd_t; +using ssize_t = __int64; +#else +typedef int sockfd_t; +#endif + +static const char * RPC_DEBUG = std::getenv("GGML_RPC_DEBUG"); + +#define LOG_DBG(...) \ + do { if (RPC_DEBUG) GGML_LOG_DEBUG(__VA_ARGS__); } while (0) + +#ifdef GGML_RPC_RDMA +static constexpr size_t RDMA_CHUNK = 256 * 1024; // 256 KiB per send/recv (fits default 8 MiB memlock) +static constexpr int RDMA_RX_DEPTH = 24; // pre-posted recv ring: 24 × 256 KiB = 6 MiB +static constexpr size_t RDMA_GID_SIZE = 16; // RoCE GID / IB GID is always 16 bytes +using rdma_gid_t = std::array; + +struct rdma_conn { + struct ibv_context * ctx = nullptr; + struct ibv_pd * pd = nullptr; + struct ibv_cq * scq = nullptr; // send completions + struct ibv_cq * rcq = nullptr; // recv completions + struct ibv_qp * qp = nullptr; + + void * tx_buf = nullptr; + struct ibv_mr * tx_mr = nullptr; + + void * rx_buf = nullptr; // RDMA_RX_DEPTH × RDMA_CHUNK contiguous + struct ibv_mr * rx_mr = nullptr; + int rx_head = 0; + + uint32_t max_inline = 0; + + uint8_t * rx_slot(int i) const { + return static_cast(rx_buf) + static_cast(i) * RDMA_CHUNK; + } + + bool post_rx(int i) { + struct ibv_sge sge = {}; + sge.addr = (uintptr_t)rx_slot(i); + sge.length = RDMA_CHUNK; + sge.lkey = rx_mr->lkey; + struct ibv_recv_wr wr = {}, * bad = nullptr; + wr.wr_id = (uint64_t)i; + wr.sg_list = &sge; + wr.num_sge = 1; + return ibv_post_recv(qp, &wr, &bad) == 0; + } + + ~rdma_conn() { + if (tx_mr) ibv_dereg_mr(tx_mr); + if (rx_mr) ibv_dereg_mr(rx_mr); + free(tx_buf); + free(rx_buf); + if (qp) ibv_destroy_qp(qp); + if (scq) ibv_destroy_cq(scq); + if (rcq) ibv_destroy_cq(rcq); + if (pd) ibv_dealloc_pd(pd); + if (ctx) ibv_close_device(ctx); + } +}; + +// Local RDMA parameters captured during the probe phase and later consumed +// by rdma_activate() after the remote side's caps arrive via HELLO. +struct rdma_local_info { + uint32_t qpn = 0; + uint32_t psn = 0; + uint8_t gid[RDMA_GID_SIZE] = {}; + uint8_t ib_port = 0; + int gid_idx = 0; + enum ibv_mtu path_mtu = IBV_MTU_1024; +}; + +struct rdma_caps { + uint32_t qpn; + uint32_t psn; + uint8_t gid[RDMA_GID_SIZE]; +}; + +static_assert(sizeof(rdma_caps) == RPC_CONN_CAPS_SIZE, "rdma_caps must match conn_caps size"); + +#endif // GGML_RPC_RDMA + +struct socket_t::impl { + impl(sockfd_t fd) : use_rdma(false), fd(fd) {} + ~impl(); + bool send_data(const void * data, size_t size); + bool recv_data(void * data, size_t size); + void get_caps(uint8_t * local_caps); + void update_caps(const uint8_t * remote_caps); + +#ifdef GGML_RPC_RDMA + bool tcp_peer_closed(); + std::optional rdma_build_target_gid(); + bool rdma_probe(); + bool rdma_activate(uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid); + bool rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc); + bool rdma_send(const void * data, size_t size); + bool rdma_recv(void * data, size_t size); + + std::unique_ptr rdma; + rdma_local_info rdma_local = {}; +#endif // GGML_RPC_RDMA + bool use_rdma; + sockfd_t fd; +}; + +socket_t::impl::~impl() { +#ifdef GGML_RPC_RDMA + rdma.reset(); +#endif // GGML_RPC_RDMA + LOG_DBG("[%s] closing socket %d\n", __func__, this->fd); +#ifdef _WIN32 + if (fd != INVALID_SOCKET) closesocket(this->fd); +#else + if (fd >= 0) close(this->fd); +#endif +} + +#ifdef GGML_RPC_RDMA + +bool socket_t::impl::tcp_peer_closed() { + if (fd < 0) return false; +#ifndef _WIN32 + struct pollfd pfd = { fd, POLLIN | POLLRDHUP, 0 }; + int r = poll(&pfd, 1, 0); + return r > 0 && (pfd.revents & (POLLHUP | POLLERR | POLLRDHUP)); +#else + return false; +#endif +} + +// Build a RoCE GID-shaped 16-byte target from a TCP socket's local address. +// Used to match the socket's local IP against the kernel's GID table so that +// a single memcmp handles IPv4, IPv4-mapped IPv6, and native IPv6 uniformly: +// AF_INET -> ::ffff:a.b.c.d (bytes 10-11 = 0xff, last 4 = IPv4) +// AF_INET6 (IPv4-mapped) -> ::ffff:a.b.c.d (already in GID shape) +// AF_INET6 (native v6) -> the 16-byte IPv6 address as-is +// Returns std::nullopt on unsupported family or getsockname failure. +std::optional socket_t::impl::rdma_build_target_gid() { + sockaddr_storage addr = {}; + socklen_t addr_len = sizeof(addr); + if (getsockname(fd, reinterpret_cast(&addr), &addr_len) != 0) { + return std::nullopt; + } + rdma_gid_t target = {}; + if (addr.ss_family == AF_INET) { + const auto * a = reinterpret_cast(&addr); + target[10] = 0xff; + target[11] = 0xff; + memcpy(&target[12], &a->sin_addr, 4); + return target; + } + if (addr.ss_family == AF_INET6) { + const auto * a = reinterpret_cast(&addr); + memcpy(target.data(), &a->sin6_addr, RDMA_GID_SIZE); + return target; + } + return std::nullopt; +} + +bool socket_t::impl::rdma_probe() { + const char * dev_env = std::getenv("GGML_RDMA_DEV"); + const char * gid_env = std::getenv("GGML_RDMA_GID"); + + auto target_gid = rdma_build_target_gid(); + if (!target_gid) { + return false; + } + + const uint8_t ib_port = 1; + int num_devs = 0; + ibv_device ** devs = ibv_get_device_list(&num_devs); + if (!devs || num_devs == 0) return false; + + ibv_context * ibctx = nullptr; + const char * matched_dev = nullptr; + int gid_idx = gid_env ? atoi(gid_env) : -1; + int gid_version = IBV_GID_TYPE_IB; // 0 = unknown/IB + + for (int d = 0; d < num_devs; d++) { + const char * dn = ibv_get_device_name(devs[d]); + if (dev_env && strcmp(dev_env, dn) != 0) continue; + + ibv_context * ctx = ibv_open_device(devs[d]); + if (!ctx) continue; + + ibv_port_attr pa; + if (ibv_query_port(ctx, ib_port, &pa) != 0) { ibv_close_device(ctx); continue; } + + int found_gid = gid_idx; + int found_version = IBV_GID_TYPE_IB; + if (found_gid < 0) { + // Find a GID on this port whose bytes equal the local TCP address + // (IPv4 or IPv6). Prefer RoCE v2 (UDP/IP, L3-routable) over v1 + // (raw Ethernet, same-L2 only) so silent hangs on L3-routed paths + // are avoided. ibv_query_gid_ex returns gid+type in one call. + int v2_idx = -1; + int v1_idx = -1; + for (int i = 0; i < pa.gid_tbl_len; i++) { + ibv_gid_entry entry = {}; + if (ibv_query_gid_ex(ctx, ib_port, i, &entry, 0) != 0) continue; + if (memcmp(entry.gid.raw, target_gid->data(), RDMA_GID_SIZE) != 0) continue; + if (entry.gid_type == IBV_GID_TYPE_ROCE_V2 && v2_idx < 0) { + v2_idx = i; + } else if (entry.gid_type == IBV_GID_TYPE_ROCE_V1 && v1_idx < 0) { + v1_idx = i; + } + } + if (v2_idx >= 0) { + found_gid = v2_idx; + found_version = IBV_GID_TYPE_ROCE_V2; + } else if (v1_idx >= 0) { + found_gid = v1_idx; + found_version = IBV_GID_TYPE_ROCE_V1; + } + } else { + // Explicit GID index from GGML_RDMA_GID — fetch its type for logging. + ibv_gid_entry entry = {}; + if (ibv_query_gid_ex(ctx, ib_port, found_gid, &entry, 0) == 0) { + found_version = entry.gid_type; + } + } + if (found_gid >= 0) { + ibctx = ctx; + gid_idx = found_gid; + gid_version = found_version; + matched_dev = dn; + rdma_local.path_mtu = pa.active_mtu; + break; + } + ibv_close_device(ctx); + } + ibv_free_device_list(devs); + if (!ibctx) return false; + + rdma_local.ib_port = ib_port; + rdma_local.gid_idx = gid_idx; + + rdma = std::make_unique(); + rdma->ctx = ibctx; + + rdma->pd = ibv_alloc_pd(ibctx); + if (!rdma->pd) return false; + + rdma->scq = ibv_create_cq(ibctx, 16, nullptr, nullptr, 0); + rdma->rcq = ibv_create_cq(ibctx, RDMA_RX_DEPTH + 4, nullptr, nullptr, 0); + if (!rdma->scq || !rdma->rcq) return false; + + ibv_qp_init_attr qia = {}; + qia.send_cq = rdma->scq; + qia.recv_cq = rdma->rcq; + qia.qp_type = IBV_QPT_RC; + qia.cap.max_send_wr = 4; + qia.cap.max_recv_wr = RDMA_RX_DEPTH + 4; + qia.cap.max_send_sge = 1; + qia.cap.max_recv_sge = 1; + qia.cap.max_inline_data = 256; + + rdma->qp = ibv_create_qp(rdma->pd, &qia); + if (!rdma->qp) return false; + rdma->max_inline = qia.cap.max_inline_data; + + rdma->tx_buf = aligned_alloc(4096, RDMA_CHUNK); + rdma->rx_buf = aligned_alloc(4096, static_cast(RDMA_RX_DEPTH) * RDMA_CHUNK); + if (!rdma->tx_buf || !rdma->rx_buf) return false; + + rdma->tx_mr = ibv_reg_mr(rdma->pd, rdma->tx_buf, RDMA_CHUNK, IBV_ACCESS_LOCAL_WRITE); + rdma->rx_mr = ibv_reg_mr(rdma->pd, rdma->rx_buf, static_cast(RDMA_RX_DEPTH) * RDMA_CHUNK, + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); + if (!rdma->tx_mr || !rdma->rx_mr) return false; + + ibv_gid local_gid; + if (ibv_query_gid(ibctx, ib_port, gid_idx, &local_gid) != 0) return false; + + rdma_local.qpn = rdma->qp->qp_num; + rdma_local.psn = rdma->qp->qp_num & 0xffffff; + memcpy(&rdma_local.gid, &local_gid, RDMA_GID_SIZE); + + const char * ver_str = ""; + if (gid_version == IBV_GID_TYPE_ROCE_V2) { + ver_str = " RoCEv2"; + } else if (gid_version == IBV_GID_TYPE_ROCE_V1) { + ver_str = " RoCEv1"; + } + GGML_LOG_INFO("RDMA probed: dev=%s gid=%d%s qpn=%u inline=%u\n", + matched_dev, gid_idx, ver_str, rdma_local.qpn, rdma->max_inline); + return true; +} + +// Phase 2: Given remote QPN/PSN/GID, transition QP: RESET->INIT->pre-post->RTR->RTS. +// On success, the connection is live and ready for rdma_send/rdma_recv. +bool socket_t::impl::rdma_activate(uint32_t remote_qpn, uint32_t remote_psn, const uint8_t * remote_gid) { + // RESET -> INIT + { + struct ibv_qp_attr a = {}; + a.qp_state = IBV_QPS_INIT; + a.port_num = rdma_local.ib_port; + a.pkey_index = 0; + a.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE; + if (ibv_modify_qp(rdma->qp, &a, + IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) { + return false; + } + } + + for (int i = 0; i < RDMA_RX_DEPTH; i++) { + if (!rdma->post_rx(i)) return false; + } + + // INIT -> RTR + { + struct ibv_qp_attr a = {}; + a.qp_state = IBV_QPS_RTR; + a.path_mtu = rdma_local.path_mtu; + a.dest_qp_num = remote_qpn; + a.rq_psn = remote_psn; + a.max_dest_rd_atomic = 1; + a.min_rnr_timer = 1; + a.ah_attr.is_global = 1; + memcpy(&a.ah_attr.grh.dgid, remote_gid, RDMA_GID_SIZE); + a.ah_attr.grh.hop_limit = 1; + a.ah_attr.grh.sgid_index = rdma_local.gid_idx; + a.ah_attr.dlid = 0; + a.ah_attr.port_num = rdma_local.ib_port; + if (ibv_modify_qp(rdma->qp, &a, + IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER) != 0) { + return false; + } + } + + // RTR -> RTS + { + struct ibv_qp_attr a = {}; + a.qp_state = IBV_QPS_RTS; + a.timeout = 14; + a.retry_cnt = 7; + a.rnr_retry = 7; + a.sq_psn = rdma_local.psn; + a.max_rd_atomic = 1; + if (ibv_modify_qp(rdma->qp, &a, + IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC) != 0) { + return false; + } + } + + GGML_LOG_INFO("RDMA activated: qpn=%u->%u mtu=%d rx_depth=%d\n", + rdma_local.qpn, remote_qpn, 128 << rdma_local.path_mtu, RDMA_RX_DEPTH); + return true; +} + +bool socket_t::impl::rdma_poll(struct ibv_cq * cq, struct ibv_wc * wc) { + for (uint64_t s = 0; ; s++) { + int n = ibv_poll_cq(cq, 1, wc); + if (n > 0) { + if (wc->status != IBV_WC_SUCCESS) { + GGML_LOG_ERROR("RDMA CQ wc error: status=%d (%s) vendor_err=0x%x\n", + wc->status, ibv_wc_status_str(wc->status), wc->vendor_err); + } + return wc->status == IBV_WC_SUCCESS; + } + if (n < 0) return false; + if ((s & 0xFFFFF) == 0 && s > 0) { + if (tcp_peer_closed()) { + return false; + } + } + } +} + +bool socket_t::impl::rdma_send(const void * data, size_t size) { + rdma_conn * c = rdma.get(); + const uint8_t * src = (const uint8_t *)data; + size_t rem = size; + while (rem > 0) { + size_t chunk = std::min(rem, RDMA_CHUNK); + + struct ibv_sge sge = {}; + struct ibv_send_wr wr = {}, * bad = nullptr; + wr.opcode = IBV_WR_SEND; + wr.sg_list = &sge; + wr.num_sge = 1; + + if (chunk <= c->max_inline) { + sge.addr = (uintptr_t)src; + sge.length = chunk; + wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_INLINE; + } else { + memcpy(c->tx_buf, src, chunk); + sge.addr = (uintptr_t)c->tx_buf; + sge.length = chunk; + sge.lkey = c->tx_mr->lkey; + wr.send_flags = IBV_SEND_SIGNALED; + } + + if (ibv_post_send(c->qp, &wr, &bad) != 0) return false; + struct ibv_wc wc; + if (!rdma_poll(c->scq, &wc)) return false; + + src += chunk; + rem -= chunk; + } + return true; +} + +bool socket_t::impl::rdma_recv(void * data, size_t size) { + rdma_conn * c = rdma.get(); + uint8_t * dst = (uint8_t *)data; + size_t rem = size; + while (rem > 0) { + struct ibv_wc wc; + if (!rdma_poll(c->rcq, &wc)) return false; + + int slot = (int)wc.wr_id; + size_t got = wc.byte_len; + memcpy(dst, c->rx_slot(slot), got); + + if (!c->post_rx(slot)) return false; + + dst += got; + rem -= got; + } + return true; +} + +#endif // GGML_RPC_RDMA + +bool socket_t::impl::send_data(const void * data, size_t size) { +#ifdef GGML_RPC_RDMA + if (use_rdma) { + return rdma_send(data, size); + } +#endif + size_t bytes_sent = 0; + while (bytes_sent < size) { + size_t size_to_send = std::min(size - bytes_sent, MAX_CHUNK_SIZE); + ssize_t n = send(fd, (const char *)data + bytes_sent, size_to_send, 0); + if (n < 0) { + GGML_LOG_ERROR("send failed (bytes_sent=%zu, size_to_send=%zu)\n", + bytes_sent, size_to_send); + return false; + } + bytes_sent += (size_t)n; + } + return true; +} + +bool socket_t::impl::recv_data(void * data, size_t size) { +#ifdef GGML_RPC_RDMA + if (use_rdma) { + return rdma_recv(data, size); + } +#endif + size_t bytes_recv = 0; + while (bytes_recv < size) { + size_t size_to_recv = std::min(size - bytes_recv, MAX_CHUNK_SIZE); + ssize_t n = recv(fd, (char *)data + bytes_recv, size_to_recv, 0); + if (n < 0) { + GGML_LOG_ERROR("recv failed (bytes_recv=%zu, size_to_recv=%zu)\n", + bytes_recv, size_to_recv); + return false; + } + if (n == 0) { + LOG_DBG("recv returned 0 (peer closed?)\n"); + return false; + } + bytes_recv += (size_t)n; + } + return true; +} + +void socket_t::impl::get_caps(uint8_t * local_caps) { + memset(local_caps, 0, RPC_CONN_CAPS_SIZE); +#ifdef GGML_RPC_RDMA + rdma_local = {}; + if (rdma_probe()) { + rdma_caps rc = {}; + rc.qpn = rdma_local.qpn; + rc.psn = rdma_local.psn; + memcpy(rc.gid, rdma_local.gid, RDMA_GID_SIZE); + memcpy(local_caps, &rc, sizeof(rc)); + } else { + rdma.reset(); + } +#endif // GGML_RPC_RDMA +} + +void socket_t::impl::update_caps(const uint8_t * remote_caps) { +#ifdef GGML_RPC_RDMA + if (!rdma) { + return; + } + rdma_caps rc = {}; + memcpy(&rc, remote_caps, sizeof(rc)); + if (rc.qpn == 0) { + rdma.reset(); + return; + } + if (rdma_activate(rc.qpn, rc.psn, rc.gid)) { + use_rdma = true; + } else { + GGML_LOG_ERROR("RDMA activate failed, staying on TCP\n"); + rdma.reset(); + } +#else + (void)remote_caps; +#endif // GGML_RPC_RDMA +} + + +///////////////////////////////////////////////////////////////////////////// + +socket_t::socket_t(std::unique_ptr p) : pimpl(std::move(p)) {} + +socket_t::~socket_t() = default; + +bool socket_t::send_data(const void * data, size_t size) { + return pimpl->send_data(data, size); +} + +bool socket_t::recv_data(void * data, size_t size) { + return pimpl->recv_data(data, size); +} + +void socket_t::get_caps(uint8_t * local_caps) { + return pimpl->get_caps(local_caps); +} + +void socket_t::update_caps(const uint8_t * remote_caps) { + return pimpl->update_caps(remote_caps); +} + +static bool is_valid_fd(sockfd_t sockfd) { +#ifdef _WIN32 + return sockfd != INVALID_SOCKET; +#else + return sockfd >= 0; +#endif +} + +static bool set_no_delay(sockfd_t sockfd) { + int flag = 1; + // set TCP_NODELAY to disable Nagle's algorithm + int ret = setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int)); + return ret == 0; +} + +static bool set_reuse_addr(sockfd_t sockfd) { + int flag = 1; + int ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof(int)); + return ret == 0; +} + +socket_ptr socket_t::accept() { + auto client_socket_fd = ::accept(pimpl->fd, NULL, NULL); + if (!is_valid_fd(client_socket_fd)) { + return nullptr; + } + if (!set_no_delay(client_socket_fd)) { + GGML_LOG_ERROR("Failed to set TCP_NODELAY\n"); + return nullptr; + } + return socket_ptr(new socket_t(std::make_unique(client_socket_fd))); +} + +socket_ptr socket_t::create_server(const char * host, int port) { + auto sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (!is_valid_fd(sockfd)) { + return nullptr; + } + if (!set_reuse_addr(sockfd)) { + GGML_LOG_ERROR("Failed to set SO_REUSEADDR\n"); + return nullptr; + } + if (inet_addr(host) == INADDR_NONE) { + GGML_LOG_ERROR("Invalid host address: %s\n", host); + return nullptr; + } + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host); + serv_addr.sin_port = htons(port); + + if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + return nullptr; + } + if (listen(sockfd, 1) < 0) { + return nullptr; + } + return socket_ptr(new socket_t(std::make_unique(sockfd))); +} + +socket_ptr socket_t::connect(const char * host, int port) { + auto sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (!is_valid_fd(sockfd)) { + return nullptr; + } + if (!set_no_delay(sockfd)) { + GGML_LOG_ERROR("Failed to set TCP_NODELAY\n"); + return nullptr; + } + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + struct hostent * server = gethostbyname(host); + if (server == NULL) { + GGML_LOG_ERROR("Cannot resolve host '%s'\n", host); + return nullptr; + } + memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length); + if (::connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + return nullptr; + } + return socket_ptr(new socket_t(std::make_unique(sockfd))); +} + +#ifdef _WIN32 +static std::mutex g_rpc_transport_mu; +static bool g_rpc_transport_wsa_started = false; +#endif + +bool rpc_transport_init() { +#ifdef _WIN32 + std::lock_guard lock(g_rpc_transport_mu); + if (g_rpc_transport_wsa_started) { + return true; + } + WSADATA wsaData; + int res = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (res != 0) { + return false; + } + g_rpc_transport_wsa_started = true; + return true; +#else + return true; +#endif +} + +void rpc_transport_shutdown() { +#ifdef _WIN32 + std::lock_guard lock(g_rpc_transport_mu); + if (!g_rpc_transport_wsa_started) { + return; + } + WSACleanup(); + g_rpc_transport_wsa_started = false; +#endif +} diff --git a/ggml/src/ggml-rpc/transport.h b/ggml/src/ggml-rpc/transport.h new file mode 100644 index 00000000000..73b85cc530a --- /dev/null +++ b/ggml/src/ggml-rpc/transport.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include + +struct socket_t; +typedef std::shared_ptr socket_ptr; + +static constexpr size_t MAX_CHUNK_SIZE = 1024ull * 1024ull * 1024ull; // 1 GiB +static constexpr size_t RPC_CONN_CAPS_SIZE = 24; + +struct socket_t { + ~socket_t(); + + bool send_data(const void * data, size_t size); + bool recv_data(void * data, size_t size); + + socket_ptr accept(); + + void get_caps(uint8_t * local_caps); + void update_caps(const uint8_t * remote_caps); + + static socket_ptr create_server(const char * host, int port); + static socket_ptr connect(const char * host, int port); + +private: + struct impl; + explicit socket_t(std::unique_ptr p); + std::unique_ptr pimpl; +}; + +bool rpc_transport_init(); +void rpc_transport_shutdown(); From 455d8e4be85aaae75195435a513a2a5dbef0136c Mon Sep 17 00:00:00 2001 From: Sascha Rogmann <59577610+srogmann@users.noreply.github.com> Date: Sun, 19 Apr 2026 09:24:06 +0200 Subject: [PATCH 2/5] server : speculative checkpointing (#19493) * server : speculative decoding using checkpoints * server : fix draft check with checkpoints * server : rename spec vars * server : log levels * server : refactored spec logic to speculative.cpp * server : renamed spec checkpoints option * server : fix spec checkpoints, logging * speculative : checkpoints with draft model, logging * server : n_tokens_cur and create_checkpoint in draft * server : fix server_speculative_callback (slot.id) * spec : fix ngram-map/begin idx_last_check * spec : init ckpt (begin() wasn't called) * chore: update webui build output * server : restore sampler in spec checkpoint and clear mem * cont : avoid --spec-use-checkpoints argument * cont : remove server_prompt_checkpoint_with_size * spec : rename (leave_draft_state) * cont : clean-up * cont : do not ignore partial drafts even if the are short * cont : spec callback owned by session * cont : simplify * cont : avoid empty speculative session * cont : simplify * cont : simplify * cont : enable mtmd speculative decoding * cont : keep the spec sampler alive * cont : simplify * cont : fix nullptr deref + draft checkpoints * cont : remove common_speculative_accept_response * cont : remove callback * cont : simplify * cont : minor * cont : simplify * cont : fix accepted number --------- Co-authored-by: Georgi Gerganov --- common/chat.cpp | 2 +- common/common.h | 4 +- common/ngram-map.cpp | 8 +- common/speculative.cpp | 162 ++++++++++++-- common/speculative.h | 14 +- tools/server/server-common.cpp | 16 +- tools/server/server-common.h | 4 +- tools/server/server-context.cpp | 374 ++++++++++++++++++++------------ tools/server/server-task.cpp | 4 +- tools/server/server-task.h | 11 + 10 files changed, 420 insertions(+), 179 deletions(-) diff --git a/common/chat.cpp b/common/chat.cpp index e27b6c3413c..e424206af86 100644 --- a/common/chat.cpp +++ b/common/chat.cpp @@ -2334,7 +2334,7 @@ common_chat_msg common_chat_peg_parse(const common_peg_arena & src_pars ? input : params.generation_prompt + input; - LOG_DBG("Parsing PEG input with format %s: %s\n", common_chat_format_name(params.format), effective_input.c_str()); + //LOG_DBG("Parsing PEG input with format %s: %s\n", common_chat_format_name(params.format), effective_input.c_str()); common_peg_parse_flags flags = COMMON_PEG_PARSE_FLAG_LENIENT; if (params.debug) { diff --git a/common/common.h b/common/common.h index 81c26955656..4c36e85e0bc 100644 --- a/common/common.h +++ b/common/common.h @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -303,7 +302,7 @@ struct common_params_speculative { // general-purpose speculative decoding parameters int32_t n_max = 16; // maximum number of tokens to draft during speculative decoding - int32_t n_min = 0; // minimum number of draft tokens to use for speculative decoding + int32_t n_min = 0; // minimum number of draft tokens to use for speculative decoding float p_split = 0.1f; // speculative decoding split probability float p_min = 0.75f; // minimum speculative decoding probability (greedy) @@ -312,6 +311,7 @@ struct common_params_speculative { uint16_t ngram_size_n = 12; // ngram size for lookup uint16_t ngram_size_m = 48; // mgram size for speculative tokens uint16_t ngram_min_hits = 1; // minimum hits at ngram/mgram lookup for mgram to be proposed + bool use_checkpoints = false; // use checkpoints to rewind in token history of recurrent models std::shared_ptr ngram_mod; diff --git a/common/ngram-map.cpp b/common/ngram-map.cpp index ebf771a24a7..8e3978f7ed0 100644 --- a/common/ngram-map.cpp +++ b/common/ngram-map.cpp @@ -208,7 +208,7 @@ void common_ngram_map_begin( count_keys, count_keys_del, count_values_del, count_map_entries_upd); } - map.idx_last_check = (map.size_last_begin > 0) ? map.size_last_begin - 1 : 0; + map.idx_last_check = size_begin; map.size_last_begin = size_begin; } @@ -231,7 +231,7 @@ void common_ngram_map_draft(common_ngram_map & map, GGML_ABORT("%s: cur_len exceeds UINT32_MAX: %zu", __func__, cur_len); } - if (map.idx_last_check > cur_len) { + if (map.idx_last_check > cur_len) { // Should not happen because of common_ngram_map_begin(). GGML_ABORT("%s: map.idx_last_check > cur_len: %zu > %zu", __func__, map.idx_last_check, cur_len); } @@ -386,7 +386,7 @@ void common_ngram_map_draft(common_ngram_map & map, LOG_DBG("%s: key_idx = %zu, key_offset = %zu, key_num = %d, draft.size = %zu\n", __func__, curr_key.key_idx, key_offset, curr_key.key_num, draft.size()); - map.last_draft_created = false; + map.last_draft_created = true; map.last_draft_key_idx = key_offset; map.last_draft_value_idx = 0; // value 0 is used for simple mode return; @@ -524,7 +524,7 @@ void common_ngram_map_accept(common_ngram_map & map, uint16_t n_accepted) { struct common_ngram_map_value & curr_value = curr_key.values[val_idx]; // value used for draft generation. // update the value statistics - LOG_INF("common_ngram_map_send_accepted: n_accepted = %d, prev value_num = %d\n", + LOG_DBG("common_ngram_map_send_accepted: n_accepted = %d, prev value_num = %d\n", n_accepted, curr_value.n_accepted); curr_value.n_accepted = n_accepted; } diff --git a/common/speculative.cpp b/common/speculative.cpp index 3e68c38e49c..1789560eeaa 100644 --- a/common/speculative.cpp +++ b/common/speculative.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #define SPEC_VOCAB_MAX_SIZE_DIFFERENCE 128 #define SPEC_VOCAB_CHECK_START_TOKEN_ID 5 @@ -144,10 +145,28 @@ struct common_speculative_state { virtual void accept(uint16_t n_accepted) = 0; }; +struct common_speculative_checkpoint { + llama_pos pos_min = 0; + llama_pos pos_max = 0; + + int64_t n_tokens = 0; + + std::vector data; + + size_t size() const { + return data.size(); + } + + size_t ckpt_size = 0; +}; + struct common_speculative_state_draft : public common_speculative_state { llama_context * ctx_tgt; // only used for retokenizing from ctx_dft llama_context * ctx_dft; + struct common_speculative_checkpoint ckpt; + bool use_checkpoint; + common_sampler * smpl; llama_batch batch; @@ -160,10 +179,12 @@ struct common_speculative_state_draft : public common_speculative_state { enum common_speculative_type type, llama_context * ctx_tgt, llama_context * ctx_dft, - const std::vector> & replacements) + const std::vector> & replacements, + bool use_checkpoint) : common_speculative_state(type) , ctx_tgt(ctx_tgt) , ctx_dft(ctx_dft) + , use_checkpoint(use_checkpoint) { batch = llama_batch_init(llama_n_batch(ctx_dft), 0, 1); smpl = nullptr; @@ -218,7 +239,48 @@ struct common_speculative_state_draft : public common_speculative_state { } void begin(const llama_tokens & prompt) override { - GGML_UNUSED(prompt); + if (use_checkpoint && ckpt.size() > 0) { + // delete checkpoint + LOG_DBG("%s: delete checkpoint, prompt.size=%zu, pos_min=%d, pos_max=%d, n_tokens=%" PRId64 ", size=%.3f MiB\n", + __func__, prompt.size(), ckpt.pos_min, ckpt.pos_max, ckpt.n_tokens, (float) ckpt.data.size() / 1024 / 1024); + ckpt.pos_min = 0; + ckpt.pos_max = 0; + ckpt.n_tokens = 0; + ckpt.ckpt_size = 0; + ckpt.data.clear(); + } + } + + size_t draft_create_checkpoint(int n_tokens_prompt, int n_tokens_batch) { + int slot_id = 0; + const size_t checkpoint_size = llama_state_seq_get_size_ext(ctx_dft, slot_id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + + ckpt.pos_min = llama_memory_seq_pos_min(llama_get_memory(ctx_dft), slot_id); + ckpt.pos_max = llama_memory_seq_pos_max(llama_get_memory(ctx_dft), slot_id); + ckpt.n_tokens = n_tokens_prompt - n_tokens_batch; + ckpt.data.resize(checkpoint_size); + + const size_t n = llama_state_seq_get_data_ext(ctx_dft, ckpt.data.data(), checkpoint_size, slot_id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + if (n != checkpoint_size) { + GGML_ABORT("checkpoint size mismatch: expected %zu, got %zu\n", checkpoint_size, n); + } + + LOG_DBG("%s: pos_min = %d, pos_max = %d, size = %.3f MiB\n", __func__, + ckpt.pos_min, ckpt.pos_max, (float) ckpt.data.size() / 1024 / 1024); + return n; + } + + size_t draft_restore_checkpoint(size_t ckpt_size_part_expected) { + int slot_id = 0; + LOG_DBG("%s: pos_min = %d, pos_max = %d\n", __func__, ckpt.pos_min, ckpt.pos_max); + const size_t n = llama_state_seq_set_data_ext(ctx_dft, ckpt.data.data(), ckpt.size(), slot_id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + if (n != ckpt_size_part_expected) { + GGML_ABORT("%s: failed to restore context checkpoint (pos_min=%d, pos_max=%d, size=%zu, get_data_ext->%zu, set_data_ext->%zu", + __func__, ckpt.pos_min, ckpt.pos_max, ckpt.size(), ckpt_size_part_expected, n); + } + llama_memory_seq_rm(llama_get_memory(ctx_dft), slot_id, ckpt.pos_max + 1, -1); + + return n; } void draft( @@ -236,8 +298,8 @@ struct common_speculative_state_draft : public common_speculative_state { auto * mem_dft = llama_get_memory(ctx_dft); - int reuse_i = 0; - int reuse_n = 0; + int reuse_i = 0; // index of part to be reused in prompt_dft + int reuse_n = 0; // length of part to be reused in prompt_dft const int n_ctx = llama_n_ctx(ctx_dft) - params.n_max; @@ -287,18 +349,26 @@ struct common_speculative_state_draft : public common_speculative_state { } } - LOG_DBG("%s: reuse_i = %d, reuse_n = %d, prompt = %d\n", __func__, reuse_i, reuse_n, (int) prompt_dft.size()); + LOG_DBG("%s: reuse_i = %d, reuse_n = %d, #prompt_dft = %zu, #prompt_cur = %zu\n", + __func__, reuse_i, reuse_n, prompt_dft.size(), prompt_cur.size()); + if (use_checkpoint && ckpt.ckpt_size == 0 && reuse_n > 0) { + LOG_DBG("%s: no checkpoint available, no reuse, (reuse_i=%d, reuse_n=%d) -> (0, 0)\n", + __func__, reuse_i, reuse_n); + reuse_i = 0; + reuse_n = 0; + } result.clear(); result.reserve(params.n_max); - if (reuse_n == 0) { + bool needs_ckpt = use_checkpoint && prompt_dft.size() > 0; + if (reuse_n == 0 || (use_checkpoint && reuse_i > 0)) { llama_memory_clear(mem_dft, false); prompt_dft.clear(); } else { // this happens when a previous draft has been discarded (for example, due to being too small), but the // target model agreed with it. in this case, we simply pass back the previous results to save compute - if (reuse_i + reuse_n < (int) prompt_dft.size() && prompt_dft[reuse_i + reuse_n] == id_last) { + if (reuse_i + reuse_n < (int64_t) prompt_dft.size() && prompt_dft[reuse_i + reuse_n] == id_last) { for (int i = reuse_i + reuse_n + 1; i < (int) prompt_dft.size(); ++i) { result.push_back(prompt_dft[i]); @@ -310,19 +380,50 @@ struct common_speculative_state_draft : public common_speculative_state { return; } + bool do_restore = false; + if (prompt_dft.size() > prompt_cur.size() && reuse_i + reuse_n < (int64_t) prompt_dft.size()) { + // This can happen after a partial acceptance (speculative decoding with checkpoints) + LOG_DBG("%s: #prompt_dft=%zu, #prompt_cur=%zu, shorten draft\n", + __func__, prompt_dft.size(), prompt_cur.size()); + prompt_dft.resize(prompt_cur.size()); + do_restore = true; + } + if (reuse_i > 0) { - llama_memory_seq_rm (mem_dft, 0, 0, reuse_i); + bool is_removed = llama_memory_seq_rm (mem_dft, 0, 0, reuse_i); + if (!is_removed) { + LOG_ERR("%s: llama_memory_seq_rm failed, reuse_i=%d\n", __func__, reuse_i); + } llama_memory_seq_add(mem_dft, 0, reuse_i, -1, -reuse_i); prompt_dft.erase(prompt_dft.begin(), prompt_dft.begin() + reuse_i); } - if (reuse_n < (int) prompt_dft.size()) { - llama_memory_seq_rm (mem_dft, 0, reuse_n, -1); - prompt_dft.erase(prompt_dft.begin() + reuse_n, prompt_dft.end()); + if (reuse_n < (int) prompt_dft.size() || do_restore) { + if (use_checkpoint) { + if (ckpt.n_tokens > (int64_t) prompt_dft.size()) { + LOG_INF("%s: checkpoint is too large, prompt_tgt.size=%zu, ckpt.n_tokens=%" PRId64 ", reuse_n=%d, prompt_dft.size=%zu\n", + __func__, prompt_tgt.size(), ckpt.n_tokens, reuse_n, prompt_dft.size()); + } + draft_restore_checkpoint(ckpt.ckpt_size); + reuse_n = ckpt.n_tokens; + prompt_dft.resize(reuse_n); + needs_ckpt = false; + } else { + bool is_removed = llama_memory_seq_rm (mem_dft, 0, reuse_n, -1); + if (!is_removed) { + LOG_ERR("%s: llama_memory_seq_rm failed, reuse_n=%d, prompt_dft.size=%zu\n", + __func__, reuse_n, prompt_dft.size()); + } + prompt_dft.erase(prompt_dft.begin() + reuse_n, prompt_dft.end()); + } } } + if (needs_ckpt) { + ckpt.ckpt_size = draft_create_checkpoint(prompt_dft.size(), batch.n_tokens); + } + // prepare a batch to evaluate any new tokens in the prompt common_batch_clear(batch); @@ -337,7 +438,11 @@ struct common_speculative_state_draft : public common_speculative_state { if (batch.n_tokens > 0) { //LOG_DBG("%s: draft prompt batch: %s\n", __func__, string_from(ctx, batch).c_str()); - llama_decode(ctx_dft, batch); + int ret = llama_decode(ctx_dft, batch); + if (ret != 0 && ret != 1) { + LOG_WRN("%s: llama_decode returned %d, prompt_cur.size=%zu\n", + __func__, ret, prompt_cur.size()); + } } const llama_pos n_past = prompt_dft.size(); @@ -351,7 +456,11 @@ struct common_speculative_state_draft : public common_speculative_state { LOG_DBG("%s: draft prompt: %s\n", __func__, string_from(ctx_dft, prompt_dft).c_str()); - llama_decode(ctx_dft, batch); + int ret = llama_decode(ctx_dft, batch); + if (ret != 0 && ret != 1) { + LOG_WRN("%s: llama_decode returned %d, prompt_cur.size=%zu, prompt_dft.size=%zu\n", + __func__, ret, prompt_cur.size(), prompt_dft.size()); + } common_sampler_reset(smpl); @@ -387,7 +496,11 @@ struct common_speculative_state_draft : public common_speculative_state { common_batch_add(batch, id, n_past + i + 1, { 0 }, true); // evaluate the drafted tokens on the draft model - llama_decode(ctx_dft, batch); + ret = llama_decode(ctx_dft, batch); + if (ret != 0) { + LOG_WRN("%s: llama_decode[%d] returned %d, prompt_cur.size=%zu, prompt_dft.size=%zu\n", + __func__, i, ret, prompt_cur.size(), prompt_dft.size()); + } prompt_dft.push_back(id); } @@ -739,6 +852,7 @@ struct common_speculative_state_ngram_cache : public common_speculative_state { struct common_speculative { std::vector> impls; // list of implementations to use and their states + common_speculative_state * curr_impl = nullptr; // current implementation in use (for stats) }; @@ -798,13 +912,13 @@ enum common_speculative_type common_speculative_type_from_name(const std::string return it->second; } -bool common_speculative_is_compat(llama_context * ctx_tgt) { +common_speculative_compat_type common_speculative_is_compat(llama_context * ctx_tgt) { auto * mem = llama_get_memory(ctx_tgt); if (mem == nullptr) { - return false; + return COMMON_SPECULATIVE_COMPAT_TYPE_NO; } - bool res = true; + common_speculative_compat_type res = COMMON_SPECULATIVE_COMPAT_TYPE_FULL; llama_memory_clear(mem, true); @@ -816,14 +930,14 @@ bool common_speculative_is_compat(llama_context * ctx_tgt) { int ret = llama_decode(ctx_tgt, llama_batch_get_one(tmp.data(), tmp.size())); if (ret != 0) { LOG_ERR("%s: llama_decode() failed: %d\n", __func__, ret); - res = false; + res = COMMON_SPECULATIVE_COMPAT_TYPE_NO; goto done; } // try to remove the last tokens if (!llama_memory_seq_rm(mem, 0, 1, -1)) { LOG_WRN("%s: the target context does not support partial sequence removal\n", __func__); - res = false; + res = COMMON_SPECULATIVE_COMPAT_TYPE_CKPT; goto done; } @@ -909,9 +1023,10 @@ common_speculative * common_speculative_init( break; case COMMON_SPECULATIVE_TYPE_DRAFT: { impls.push_back(std::make_unique(config.type, - /* .ctx_tgt = */ ctx_tgt, - /* .ctx_dft = */ ctx_dft, - /* .replacements = */ params.replacements + /* .ctx_tgt = */ ctx_tgt, + /* .ctx_dft = */ ctx_dft, + /* .replacements = */ params.replacements, + /* .use_checkpoint= */ params.use_checkpoints // TODO: this should be based on the draft model! )); break; } @@ -966,7 +1081,8 @@ common_speculative * common_speculative_init( } auto * result = new common_speculative { - /* .impls = */ std::move(impls) + /* .impls = */ std::move(impls), + /* .curr_impl = */ nullptr, }; return result; diff --git a/common/speculative.h b/common/speculative.h index 876cde3d180..cbe6e5bdb73 100644 --- a/common/speculative.h +++ b/common/speculative.h @@ -14,9 +14,15 @@ enum common_speculative_type common_speculative_type_from_name(const std::string // convert type to string std::string common_speculative_type_to_str(enum common_speculative_type type); +enum common_speculative_compat_type { + COMMON_SPECULATIVE_COMPAT_TYPE_NO = 0, + COMMON_SPECULATIVE_COMPAT_TYPE_FULL = 1, + COMMON_SPECULATIVE_COMPAT_TYPE_CKPT = 2, +}; + // check if the llama_context is compatible for speculative decoding // note: clears the memory of the context -bool common_speculative_is_compat(llama_context * ctx_tgt); +common_speculative_compat_type common_speculative_is_compat(llama_context * ctx_tgt); common_speculative * common_speculative_init( common_params_speculative & params, @@ -39,3 +45,9 @@ void common_speculative_accept(common_speculative * spec, uint16_t n_accepted); // print statistics about the speculative decoding void common_speculative_print_stats(const common_speculative * spec); + +struct common_speculative_deleter { + void operator()(common_speculative * s) { common_speculative_free(s); } +}; + +typedef std::unique_ptr common_speculative_ptr; diff --git a/tools/server/server-common.cpp b/tools/server/server-common.cpp index f66b1f2557c..cae64884b36 100644 --- a/tools/server/server-common.cpp +++ b/tools/server/server-common.cpp @@ -391,15 +391,25 @@ void server_tokens::push_back(server_tokens & tokens) { } void server_tokens::insert(const llama_tokens & inp_tokens) { - GGML_ASSERT(!has_mtmd); // only allow this if mtmd is disabled tokens.insert(tokens.end(), inp_tokens.begin(), inp_tokens.end()); } -const llama_tokens & server_tokens::get_text_tokens() const { - GGML_ASSERT(!has_mtmd); // only allow this if mtmd is disabled +const llama_tokens & server_tokens::get_tokens() const { + GGML_ASSERT(!has_mtmd); return tokens; } +llama_tokens server_tokens::get_text_tokens() const { + llama_tokens res; + res.reserve(tokens.size()); + for (llama_token t : tokens) { + if (t != LLAMA_TOKEN_NULL) { + res.push_back(t); + } + } + return res; +} + void server_tokens::set_token(llama_pos pos, llama_token id) { GGML_ASSERT(!has_mtmd); // only allow this if mtmd is disabled tokens[pos] = id; diff --git a/tools/server/server-common.h b/tools/server/server-common.h index 57545aa53ed..093a43453c2 100644 --- a/tools/server/server-common.h +++ b/tools/server/server-common.h @@ -190,7 +190,9 @@ struct server_tokens { void insert(const llama_tokens & inp_tokens); // for compatibility with speculative decoding, ctx shift, slot save/load - const llama_tokens & get_text_tokens() const; + const llama_tokens & get_tokens() const; + + llama_tokens get_text_tokens() const; // for compatibility with speculative decoding void set_token(llama_pos pos, llama_token id); diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 2488f81b8a6..70ebcc225e3 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1,3 +1,4 @@ + #include "server-context.h" #include "server-common.h" #include "server-http.h" @@ -19,6 +20,7 @@ #include #include #include +#include // fix problem with std::min and std::max #if defined(_WIN32) @@ -33,6 +35,31 @@ using json = nlohmann::ordered_json; constexpr int HTTP_POLLING_SECONDS = 1; +static server_prompt_checkpoint server_get_checkpoint(llama_context * ctx, int id, int64_t n_tokens, llama_pos pos_min = -1, llama_pos pos_max = -1) { + if (pos_min == -1) { + pos_min = llama_memory_seq_pos_min(llama_get_memory(ctx), id); + } + if (pos_max == -1) { + pos_max = llama_memory_seq_pos_max(llama_get_memory(ctx), id); + } + + const size_t checkpoint_size = llama_state_seq_get_size_ext(ctx, id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + + auto cur = server_prompt_checkpoint { + /*.pos_min = */ pos_min, + /*.pos_max = */ pos_max, + /*.n_tokens = */ n_tokens, + /*.data = */ std::vector(checkpoint_size), + }; + + const size_t n = llama_state_seq_get_data_ext(ctx, cur.data.data(), checkpoint_size, id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + if (n != checkpoint_size) { + GGML_ABORT("checkpoint size mismatch: expected %zu, got %zu\n", checkpoint_size, n); + } + + return cur; +} + // state diagram: https://github.com/ggml-org/llama.cpp/pull/9283 enum slot_state { SLOT_STATE_IDLE, @@ -57,7 +84,12 @@ struct server_slot { // multimodal mtmd_context * mctx = nullptr; - common_speculative * spec = nullptr; + // speculative decoding + llama_tokens spec_draft; + std::vector spec_i_batch; + server_prompt_checkpoint spec_ckpt; + common_speculative_ptr spec; + // TODO: move members that belong to the task (such as `generated_text`, `has_new_line`) to task_results_state // see https://github.com/ggml-org/llama.cpp/pull/18283#issuecomment-3710175837 @@ -83,11 +115,6 @@ struct server_slot { std::string debug_generated_text; llama_tokens generated_tokens; - // idx of draft tokens in the main batch - // non-empty if we went to evaluate draft tokens - // ref: https://github.com/ggml-org/llama.cpp/pull/17808 - std::vector i_batch_dft; - std::vector generated_token_probs; bool has_next_token = true; @@ -147,8 +174,7 @@ struct server_slot { common_sampler_ptr smpl; - llama_token sampled; // in speculative mode, this is the last accepted token - llama_tokens drafted; + llama_token sampled; // in speculative mode, this is the last accepted token // stats size_t n_sent_text = 0; // number of sent text character @@ -178,8 +204,11 @@ struct server_slot { stopping_word = ""; n_sent_text = 0; - drafted.clear(); - i_batch_dft.clear(); + if (can_speculate()) { + spec_draft.clear(); + spec_i_batch.clear(); + spec_ckpt.clear(); + } generated_tokens.clear(); generated_token_probs.clear(); json_schema = json(); @@ -300,6 +329,85 @@ struct server_slot { return n_draft_max; } + void update_batch(llama_batch & batch) { + const int n_draft_max = get_n_draft_max(); + if (n_draft_max > 0) { + GGML_ASSERT(can_speculate()); + + // generate draft tokens in speculative decoding mode + // TODO: rework to have a single draft llama_context shared across all slots [TAG_SERVER_SPEC_REWORK] + // perform the speculative drafting for all sequences at the same time in a single batch + const llama_tokens & tokens = prompt.tokens.get_text_tokens(); + + const auto & params_spec = task->params.speculative; + + if (!spec_draft.empty()) { + // we have a previous (partial) draft to reuse + if (task->params.speculative.use_checkpoints) { + GGML_ASSERT(!spec_ckpt.empty()); + } + } else { + GGML_ASSERT(spec_i_batch.empty()); + + // generate a new draft + spec_draft = common_speculative_draft(spec.get(), params_spec, tokens, sampled); + + if (spec_draft.size() > (size_t) n_draft_max) { + SLT_WRN(*this, "draft size %d exceeds max %d, truncating\n", (int) spec_draft.size(), n_draft_max); + spec_draft.resize(n_draft_max); + } + + if (spec_draft.size() < (size_t) params_spec.n_min) { + SLT_DBG(*this, "ignoring small draft: %d < %d\n", (int) spec_draft.size(), params_spec.n_min); + spec_draft.clear(); + } + + if (!spec_draft.empty() && params_spec.use_checkpoints) { + const auto n_tokens = prompt.tokens.size(); + + auto & ckpt = spec_ckpt; + + ckpt = server_get_checkpoint(ctx, this->id, n_tokens); + + SLT_DBG(*this, "created speculative checkpoint (pos_min = %d, pos_max = %d, n_tokens = %zu, size = %.3f MiB)\n", + ckpt.pos_min, ckpt.pos_max, n_tokens, (float) ckpt.data.size() / 1024 / 1024); + } + } + + GGML_ASSERT(spec_draft.size() <= (size_t) n_draft_max); + } + + if (spec_draft.empty()) { + // no speculative decoding + i_batch = batch.n_tokens; + + common_batch_add(batch, sampled, prompt.tokens.pos_next(), { this->id }, true); + + SLT_DBG(*this, "slot decode token, id=%d, n_ctx = %d, n_tokens = %d, truncated = %d\n", + sampled, n_ctx, prompt.n_tokens(), truncated); + } else { + SLT_DBG(*this, "generate_draft: id=%d, #tokens=%zu, #draft=%zu, pos_next=%d\n", + sampled, prompt.tokens.size(), spec_draft.size(), prompt.tokens.pos_next()); + + GGML_ASSERT(spec_i_batch.empty()); + + spec_i_batch.push_back(batch.n_tokens); + for (size_t i = 0; i < spec_draft.size(); i++) { + spec_i_batch.push_back(batch.n_tokens + i + 1); + } + + auto pos0 = prompt.tokens.pos_next(); + + common_batch_add(batch, sampled, pos0++, { this->id }, true); + for (auto token : spec_draft) { + common_batch_add(batch, token, pos0++, { this->id }, true); + } + } + + prompt.tokens.push_back(sampled); + prompt.tokens.insert(spec_draft); + } + void release() { if (is_processing()) { GGML_ASSERT(task); @@ -400,7 +508,7 @@ struct server_slot { ); } - common_speculative_print_stats(spec); + common_speculative_print_stats(spec.get()); } json to_json(bool only_metrics = false) const { @@ -591,16 +699,17 @@ struct server_context_impl { void destroy() { llama_init.reset(); + ctx = nullptr; model = nullptr; mtmd_free(mctx); mctx = nullptr; - // Clear any sampling context for (server_slot & slot : slots) { - common_speculative_free(slot.spec); - slot.spec = nullptr; + if (slot.can_speculate()) { + slot.spec.reset(); + } } llama_batch_free(batch); @@ -642,9 +751,6 @@ struct server_context_impl { llama_init = common_init_from_params(params_base); - // propagate model-metadata sampling defaults back to caller - params.sampling = params_base.sampling; - model = llama_init->model(); ctx = llama_init->context(); @@ -660,6 +766,7 @@ struct server_context_impl { add_bos_token = llama_vocab_get_add_bos(vocab); if (params_base.speculative.has_dft()) { + // TODO speculative: move to common/speculative.cpp? SRV_INF("loading draft model '%s'\n", params_base.speculative.mparams_dft.path.c_str()); const auto & params_spec = params_base.speculative; @@ -727,11 +834,6 @@ struct server_context_impl { params_base.n_cache_reuse = 0; SRV_WRN("%s\n", "cache_reuse is not supported by multimodal, it will be disabled"); } - - if (params_base.speculative.type != COMMON_SPECULATIVE_TYPE_NONE) { - params_base.speculative.type = COMMON_SPECULATIVE_TYPE_NONE; - SRV_WRN("%s\n", "speculative decoding is not supported by multimodal, it will be disabled"); - } } if (!llama_memory_can_shift(llama_get_memory(ctx))) { @@ -769,14 +871,23 @@ struct server_context_impl { slots.clear(); - const bool can_spec = common_speculative_is_compat(ctx); - if (!can_spec) { + const auto spec_type = common_speculative_is_compat(ctx); + if (spec_type == COMMON_SPECULATIVE_COMPAT_TYPE_NO) { SRV_WRN("%s", "speculative decoding not supported by this context\n"); } + if (spec_type == COMMON_SPECULATIVE_COMPAT_TYPE_CKPT) { + SRV_WRN("%s", "speculative decoding will use checkpoints\n"); + params_base.speculative.use_checkpoints = true; + } + // initialize slots for (int i = 0; i < params_base.n_parallel; i++) { - server_slot slot; + slots.emplace_back(); + } + + for (int i = 0; i < params_base.n_parallel; i++) { + server_slot & slot = slots[i]; slot.id = i; slot.ctx = ctx; @@ -786,16 +897,11 @@ struct server_context_impl { slot.prompt.tokens.has_mtmd = mctx != nullptr; // try speculative decoding - if (can_spec) { - slot.spec = common_speculative_init(params_base.speculative, slot.ctx); + if (spec_type != COMMON_SPECULATIVE_COMPAT_TYPE_NO) { + slot.spec.reset(common_speculative_init(params_base.speculative, slot.ctx)); + if (slot.spec) { - if (mctx) { - SRV_ERR("%s\n", "speculative decoding is not supported with multimodal"); - return false; - } SLT_INF(slot, "%s", "speculative decoding context initialized\n"); - } else { - SLT_INF(slot, "%s", "speculative decoding context not initialized\n"); } } @@ -806,8 +912,6 @@ struct server_context_impl { }; slot.reset(); - - slots.push_back(std::move(slot)); } { @@ -854,6 +958,9 @@ struct server_context_impl { model_aliases = params_base.model_alias; model_tags = params_base.model_tags; + // propagate new defaults back to caller + params = params_base; + if (!is_resume) { return init(); } @@ -1197,7 +1304,7 @@ struct server_context_impl { backend_sampling &= task.params.sampling.backend_sampling; // TODO: speculative decoding requires multiple samples per batch - not supported yet - backend_sampling &= !(slot.spec && task.params.speculative.n_max > 0); + backend_sampling &= !(slot.can_speculate() && task.params.speculative.n_max > 0); // TODO: getting post/pre sampling logits is not yet supported with backend sampling backend_sampling &= !need_logits; @@ -1703,6 +1810,26 @@ struct server_context_impl { return true; } + // n_tokens_cur: the number of tokens added to the batch for the current slot + void create_checkpoint(server_slot & slot, const int64_t n_tokens_cur, llama_pos pos_min, llama_pos pos_max) { + while (slot.prompt.checkpoints.size() >= (size_t) params_base.n_ctx_checkpoints) { + // make room for the new checkpoint, if needed + const auto & cur = slot.prompt.checkpoints.front(); + + SLT_WRN(slot, "erasing old context checkpoint (pos_min = %d, pos_max = %d, n_tokens = %" PRId64 ", size = %.3f MiB)\n", + cur.pos_min, cur.pos_max, cur.n_tokens, (float) cur.data.size() / 1024 / 1024); + + slot.prompt.checkpoints.erase(slot.prompt.checkpoints.begin()); + } + + const auto & cur = slot.prompt.checkpoints.emplace_back(server_get_checkpoint(ctx, slot.id, slot.prompt.n_tokens() - n_tokens_cur, pos_min, pos_max)); + + SLT_WRN(slot, + "created context checkpoint %d of %d (pos_min = %d, pos_max = %d, n_tokens = %" PRId64 ", size = %.3f MiB)\n", + (int) slot.prompt.checkpoints.size(), params_base.n_ctx_checkpoints, cur.pos_min, + cur.pos_max, cur.n_tokens, (float) cur.data.size() / 1024 / 1024); + } + void process_single_task(server_task && task) { switch (task.type) { case SERVER_TASK_TYPE_COMPLETION: @@ -1854,7 +1981,7 @@ struct server_context_impl { std::string filename = task.slot_action.filename; std::string filepath = task.slot_action.filepath; - const llama_tokens & tokens = slot->prompt.tokens.get_text_tokens(); + const llama_tokens & tokens = slot->prompt.tokens.get_tokens(); const size_t nwrite = llama_state_seq_save_file(ctx, filepath.c_str(), slot->id, tokens.data(), token_count); const int64_t t_end = ggml_time_us(); @@ -2061,7 +2188,7 @@ struct server_context_impl { { GGML_ASSERT(!slot.prompt.tokens.has_mtmd); - llama_tokens new_tokens = slot.prompt.tokens.get_text_tokens(); // copy + llama_tokens new_tokens = slot.prompt.tokens.get_tokens(); // copy for (size_t i = n_keep + n_discard; i < new_tokens.size(); i++) { new_tokens[i - n_discard] = new_tokens[i]; } @@ -2100,61 +2227,7 @@ struct server_context_impl { continue; } - // generate draft tokens in speculative decoding mode - // TODO: rework to have a single draft llama_context shared across all slots [TAG_SERVER_SPEC_REWORK] - // perform the speculative drafting for all sequences at the same time in a single batch - const int n_draft_max = slot.get_n_draft_max(); - if (n_draft_max > 0) { - if (mctx) { - // we should never reach this, as speculative is automatically disabled if mmproj is loaded - GGML_ABORT("not supported by multimodal"); - } - - const llama_tokens & cached_text_tokens = slot.prompt.tokens.get_text_tokens(); - - const auto & params_spec = slot.task->params.speculative; - - llama_tokens draft = common_speculative_draft(slot.spec, params_spec, cached_text_tokens, slot.sampled); - - if (draft.size() > (size_t) n_draft_max) { - SLT_WRN(slot, "draft size %d exceeds max %d, truncating\n", (int) draft.size(), n_draft_max); - draft.resize(n_draft_max); - } - - // add the sampled token to the batch - slot.i_batch_dft.push_back(batch.n_tokens); - common_batch_add(batch, slot.sampled, slot.prompt.tokens.pos_next(), { slot.id }, true); - slot.prompt.tokens.push_back(slot.sampled); - - if (slot.task->params.speculative.n_min > (int) draft.size()) { - SLT_DBG(slot, "ignoring small draft: %d < %d\n", (int) draft.size(), slot.task->params.speculative.n_min); - // fallback to normal decoding - slot.i_batch = slot.i_batch_dft[0]; - slot.drafted.clear(); - slot.i_batch_dft.clear(); - } else { - // keep track of total number of drafted tokens tested - slot.n_draft_total += draft.size(); - - // add all drafted tokens to the batch - for (size_t i = 0; i < draft.size(); i++) { - slot.i_batch_dft.push_back(batch.n_tokens); - common_batch_add(batch, draft[i], slot.prompt.tokens.pos_next(), { slot.id }, true); - slot.prompt.tokens.push_back(draft[i]); - } - slot.drafted = std::move(draft); - } - } else { - // no speculative decoding - slot.i_batch = batch.n_tokens; - - common_batch_add(batch, slot.sampled, slot.prompt.tokens.pos_next(), { slot.id }, true); - - slot.prompt.tokens.push_back(slot.sampled); - - SLT_DBG(slot, "slot decode token, n_ctx = %d, n_tokens = %d, truncated = %d\n", - slot.n_ctx, slot.prompt.n_tokens(), slot.truncated); - } + slot.update_batch(batch); } // process in chunks of params.n_batch @@ -2651,40 +2724,12 @@ struct server_context_impl { // no need to create checkpoints that are too close together do_checkpoint = do_checkpoint && (slot.prompt.checkpoints.empty() || slot.prompt.n_tokens() - n_tokens_cur > slot.prompt.checkpoints.back().n_tokens + 64); + SLT_DBG(slot, "main/do_checkpoint = %s, pos_min = %d, pos_max = %d\n", do_checkpoint ? "yes" : "no", pos_min, pos_max); // note: we create the checkpoint before calling llama_decode(), so the current batch is not // yet processed and therefore it is not part of the checkpoint. if (do_checkpoint) { - while (slot.prompt.checkpoints.size() >= (size_t) params_base.n_ctx_checkpoints) { - // make room for the new checkpoint, if needed - const auto & cur = slot.prompt.checkpoints.front(); - - SLT_WRN(slot, - "erasing old context checkpoint (pos_min = %d, pos_max = %d, n_tokens = %" PRId64 - ", size = %.3f MiB)\n", - cur.pos_min, cur.pos_max, cur.n_tokens, (float) cur.data.size() / 1024 / 1024); - - slot.prompt.checkpoints.erase(slot.prompt.checkpoints.begin()); - } - - const size_t checkpoint_size = - llama_state_seq_get_size_ext(ctx, slot.id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); - - auto & cur = slot.prompt.checkpoints.emplace_back(server_prompt_checkpoint{ - /*.pos_min = */ pos_min, - /*.pos_max = */ pos_max, - /*.n_tokens = */ slot.prompt.n_tokens() - n_tokens_cur, - /*.data = */ std::vector(checkpoint_size), - }); - - llama_state_seq_get_data_ext(ctx, cur.data.data(), checkpoint_size, slot.id, - LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); - - SLT_WRN(slot, - "created context checkpoint %d of %d (pos_min = %d, pos_max = %d, n_tokens = %" PRId64 - ", size = %.3f MiB)\n", - (int) slot.prompt.checkpoints.size(), params_base.n_ctx_checkpoints, cur.pos_min, - cur.pos_max, cur.n_tokens, (float) cur.data.size() / 1024 / 1024); + create_checkpoint(slot, n_tokens_cur, pos_min, pos_max); } } @@ -2856,19 +2901,19 @@ struct server_context_impl { slot.state = SLOT_STATE_GENERATING; if (slot.can_speculate()) { - common_speculative_begin(slot.spec, slot.prompt.tokens.get_text_tokens()); + common_speculative_begin(slot.spec.get(), slot.prompt.tokens.get_text_tokens()); } } else if (slot.state != SLOT_STATE_GENERATING) { continue; // continue loop of slots } - if (slot.i_batch_dft.size() > 0) { + if (slot.can_speculate() && !slot.spec_draft.empty()) { continue; // sample using speculative decoding } const int tok_idx = slot.i_batch - i; - llama_token id = common_sampler_sample(slot.smpl.get(), ctx, tok_idx); + llama_token id = common_sampler_sample(slot.smpl.get(), slot.ctx, tok_idx); slot.i_batch = -1; @@ -2889,7 +2934,7 @@ struct server_context_impl { completion_token_output result; result.tok = id; - result.text_to_send = common_token_to_piece(ctx, result.tok, accept_special_token(slot, result.tok)); + result.text_to_send = common_token_to_piece(slot.ctx, result.tok, accept_special_token(slot, result.tok)); result.prob = 1.0f; // TODO: set it here instead of doing inside populate_token_probs if (slot.task->params.sampling.n_probs > 0) { @@ -2909,43 +2954,86 @@ struct server_context_impl { // speculative decoding - main model sample and accept for (auto & slot : slots) { - if (slot.state != SLOT_STATE_GENERATING || slot.i_batch_dft.empty()) { + if (slot.state != SLOT_STATE_GENERATING || !slot.can_speculate() || slot.spec_draft.empty()) { continue; } - const size_t n_draft = slot.drafted.size(); + // save the original draft size + const size_t n_draft = slot.spec_draft.size(); + + GGML_ASSERT(n_draft > 0); + + // verify and try to accept the draft + { + const auto & params_spec = slot.task->params.speculative; + + common_sampler_ptr smpl_save(common_sampler_clone(slot.smpl.get())); + + GGML_ASSERT(slot.spec_i_batch.size() == n_draft + 1); + auto accepted = common_sampler_sample_and_accept_n(slot.smpl.get(), slot.ctx, slot.spec_i_batch, slot.spec_draft); + slot.spec_i_batch.clear(); + + SLT_DBG(slot, "%s: n_draft=%zu, accepted=%zu\n", __func__, slot.spec_draft.size(), accepted.size()); + + GGML_ASSERT(accepted.size() >= 1); + + // check for partial draft acceptance + if (accepted.size() < slot.spec_draft.size() + 1) { + if (params_spec.use_checkpoints) { + // partial acceptance is not supported by the context -> truncate the draft and restore the state + slot.spec_draft = std::move(accepted); + + auto & ckpt = slot.spec_ckpt; + + SLT_DBG(slot, "restoring speculative checkpoint (pos_min = %d, pos_max = %d, size = %zu)\n", ckpt.pos_min, ckpt.pos_max, ckpt.size()); + + const size_t n = llama_state_seq_set_data_ext(slot.ctx, ckpt.data.data(), ckpt.size(), slot.id, LLAMA_STATE_SEQ_FLAGS_PARTIAL_ONLY); + if (n != ckpt.size()) { + GGML_ABORT("%s: failed to restore context checkpoint (pos_min=%d, pos_max=%d, size=%zu, get_data_ext->%zu, set_data_ext->%zu", + __func__, ckpt.pos_min, ckpt.pos_max, ckpt.size(), ckpt.size(), n); + } + + llama_memory_seq_rm(llama_get_memory(slot.ctx), slot.id, ckpt.pos_max + 1, -1); - // the accepted tokens from the speculation - const auto ids = common_sampler_sample_and_accept_n(slot.smpl.get(), ctx, slot.i_batch_dft, slot.drafted); - slot.i_batch_dft.clear(); - slot.drafted.clear(); + slot.prompt.tokens.keep_first(ckpt.n_tokens); + slot.smpl = std::move(smpl_save); + + continue; + } + + LOG_DBG("%s: partial acceptance: %zu < %zu\n", __func__, accepted.size(), slot.spec_draft.size()); + } + + common_speculative_accept(slot.spec.get(), accepted.size() - 1); + + slot.spec_draft = std::move(accepted); + } const int64_t t_current = ggml_time_us(); - slot.n_decoded += ids.size(); + const auto ids = std::move(slot.spec_draft); + slot.n_decoded += ids.size(); slot.t_token_generation = std::max(1, t_current - slot.t_start_generation) / 1e3; // update how many tokens out of those tested were accepted slot.n_draft_accepted += ids.size() - 1; - - // inform the speculative decoding about the number of accepted tokens - common_speculative_accept(slot.spec, ids.size() - 1); - - // rollback to the state before sampling the draft tokens - slot.prompt.tokens.keep_first(slot.prompt.n_tokens() - n_draft); + slot.n_draft_total += n_draft; // add accepted tokens to the prompt + slot.prompt.tokens.keep_first(slot.prompt.n_tokens() - n_draft); slot.prompt.tokens.insert({ids.begin(), ids.end() - 1}); + slot.sampled = ids.back(); // last accepted token + SLT_DBG(slot, "add accepted tokens: sampled=%d, ids.size=%zu, n_draft=%zu\n", slot.sampled, ids.size(), n_draft); - llama_memory_seq_rm(llama_get_memory(ctx), slot.id, slot.prompt.n_tokens(), -1); + llama_memory_seq_rm(llama_get_memory(slot.ctx), slot.id, slot.prompt.n_tokens(), -1); for (size_t i = 0; i < ids.size(); ++i) { completion_token_output result; result.tok = ids[i]; - result.text_to_send = common_token_to_piece(ctx, result.tok, accept_special_token(slot, result.tok)); + result.text_to_send = common_token_to_piece(slot.ctx, result.tok, accept_special_token(slot, result.tok)); result.prob = 1.0f; // set later // TODO: set result.probs @@ -3665,7 +3753,7 @@ void server_routes::init_routes() { params.n_predict, meta->slot_n_ctx, params.spm_infill, - tokenized_prompts[0].get_text_tokens() // TODO: this could maybe be multimodal. + tokenized_prompts[0].get_tokens() // TODO: this could maybe be multimodal. ); std::vector files; // dummy diff --git a/tools/server/server-task.cpp b/tools/server/server-task.cpp index 4fb953b4920..2187b8d21b5 100644 --- a/tools/server/server-task.cpp +++ b/tools/server/server-task.cpp @@ -162,7 +162,7 @@ common_chat_msg task_result_state::update_chat_msg( bool filter_tool_calls) { generated_text += text_added; auto msg_prv_copy = chat_msg; - SRV_DBG("Parsing chat message: %s\n", generated_text.c_str()); + //SRV_DBG("Parsing chat message: %s\n", generated_text.c_str()); auto new_msg = common_chat_parse( generated_text, is_partial, @@ -304,6 +304,8 @@ task_params server_task::params_from_json_cmpl( params.sampling.backend_sampling = json_value(data, "backend_sampling", defaults.sampling.backend_sampling); params.post_sampling_probs = json_value(data, "post_sampling_probs", defaults.post_sampling_probs); + params.speculative = defaults.speculative; + params.speculative.n_min = json_value(data, "speculative.n_min", defaults.speculative.n_min); params.speculative.n_max = json_value(data, "speculative.n_max", defaults.speculative.n_max); params.speculative.p_min = json_value(data, "speculative.p_min", defaults.speculative.p_min); diff --git a/tools/server/server-task.h b/tools/server/server-task.h index 95f39207b18..289e1fb8d24 100644 --- a/tools/server/server-task.h +++ b/tools/server/server-task.h @@ -576,6 +576,17 @@ struct server_prompt_checkpoint { size_t size() const { return data.size(); } + + bool empty() const { + return data.empty(); + } + + void clear() { + pos_min = 0; + pos_max = 0; + n_tokens = 0; + data.clear(); + } }; struct server_prompt { From 09b4efa95f8223f6d168f046c80fe4529a68117b Mon Sep 17 00:00:00 2001 From: texasich <101962694+texasich@users.noreply.github.com> Date: Sun, 19 Apr 2026 02:25:05 -0500 Subject: [PATCH 3/5] cmake: remove CMP0194 policy to restore MSVC builds (#21934) #21630 added the CMP0194 NEW policy to silence a CMake warning, but on Windows runners it caused CMake to prefer the MinGW toolchain for ASM and broke MSVC builds. Reverting only that policy block restores the previous working behavior. The CMake 4.1+ warning comes back, but that is cosmetic and does not break any platform. Reported-by: oobabooga Refs: #21630 Co-authored-by: texasich --- ggml/CMakeLists.txt | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ggml/CMakeLists.txt b/ggml/CMakeLists.txt index 6b65ecd6e5c..a0eb9204eab 100644 --- a/ggml/CMakeLists.txt +++ b/ggml/CMakeLists.txt @@ -1,11 +1,5 @@ cmake_minimum_required(VERSION 3.14...3.28) # for add_link_options and implicit target directories. -# ref: https://cmake.org/cmake/help/latest/policy/CMP0194.html -# MSVC is not a valid assembler for the ASM language. -# Set to NEW to avoid a warning on CMake 4.1+ with MSVC. -if (POLICY CMP0194) - cmake_policy(SET CMP0194 NEW) -endif() project("ggml" C CXX ASM) ### GGML Version From 8685e7b07582957924393d912fffe6f4588e235e Mon Sep 17 00:00:00 2001 From: Dowon Date: Sun, 19 Apr 2026 16:25:39 +0900 Subject: [PATCH 4/5] convert : support sentence-transformer 5.4 config files (#22087) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * convert : support sentence-transformer 5.4 config files * fix: embeddinggemma * fix: mapping Co-authored-by: Sigbjørn Skjæret * fix: pooling_mode Co-authored-by: Sigbjørn Skjæret --------- Co-authored-by: Sigbjørn Skjæret --- convert_hf_to_gguf.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/convert_hf_to_gguf.py b/convert_hf_to_gguf.py index 2df5e94fe23..5b4fb79fc1b 100755 --- a/convert_hf_to_gguf.py +++ b/convert_hf_to_gguf.py @@ -1850,20 +1850,28 @@ def _try_set_pooling_type(self) -> None: with open(module_path, encoding="utf-8") as f: modules = json.load(f) for mod in modules: - if mod["type"] == "sentence_transformers.models.Pooling": + if mod["type"].endswith("Pooling"): pooling_path = mod["path"] break + mode_mapping = { + "mean": gguf.PoolingType.MEAN, + "cls": gguf.PoolingType.CLS, + "lasttoken": gguf.PoolingType.LAST, + } + # get pooling type if pooling_path is not None: with open(self.dir_model / pooling_path / "config.json", encoding="utf-8") as f: pooling = json.load(f) - if pooling["pooling_mode_mean_tokens"]: + if pooling.get("pooling_mode_mean_tokens"): pooling_type = gguf.PoolingType.MEAN - elif pooling["pooling_mode_cls_token"]: + elif pooling.get("pooling_mode_cls_token"): pooling_type = gguf.PoolingType.CLS - elif pooling["pooling_mode_lasttoken"]: + elif pooling.get("pooling_mode_lasttoken"): pooling_type = gguf.PoolingType.LAST + elif (pooling_mode := pooling.get("pooling_mode")) in mode_mapping: + pooling_type = mode_mapping[pooling_mode] else: raise NotImplementedError("Only MEAN, CLS, and LAST pooling types supported") self.gguf_writer.add_pooling_type(pooling_type) @@ -7180,7 +7188,7 @@ def __init__(self, *args, **kwargs): with open(modules_file, encoding="utf-8") as modules_json_file: mods = json.load(modules_json_file) for mod in mods: - if mod["type"] == "sentence_transformers.models.Dense": + if mod["type"].endswith("Dense"): mod_path = mod["path"] # check if model.safetensors file for Dense layer exists model_tensors_file = self.dir_model / mod_path / "model.safetensors" From 037bfe38d0297001869df87150286952ae94cb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sigbj=C3=B8rn=20Skj=C3=A6ret?= Date: Sun, 19 Apr 2026 09:32:08 +0200 Subject: [PATCH 5/5] ci : install spirv-headers for vulkan-cross (#22109) --- .github/workflows/build-cross.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build-cross.yml b/.github/workflows/build-cross.yml index 74508129ac5..aef45afdeac 100644 --- a/.github/workflows/build-cross.yml +++ b/.github/workflows/build-cross.yml @@ -246,6 +246,7 @@ jobs: apt-get install -y --no-install-recommends \ build-essential \ glslc \ + spirv-headers \ gcc-14-loongarch64-linux-gnu \ g++-14-loongarch64-linux-gnu \ libvulkan-dev:loong64