diff --git a/DS4CACHE.md b/DS4CACHE.md
new file mode 100644
index 000000000..6f6aab329
--- /dev/null
+++ b/DS4CACHE.md
@@ -0,0 +1,159 @@
+# DS4C In-Memory Cache
+
+In-memory point checkpoints and live tail-cache for single-session DS4C.
+
+> **Design target**: Designed for machines with **256 GB or more** of unified/system
+> memory. The default checkpoint budget alone is 8192 MiB; enabling tail-cache
+> adds further per-request memory via the rolling swa_shard ring. On smaller
+> machines evaluate memory pressure before enabling.
+
+## Overview
+
+Two complementary caches, both 128-token-aligned and correctness-proven:
+
+| Cache | Purpose | Interval | Storage |
+|-------|---------|----------|---------|
+| **Point checkpoint** | Coarse persistent cache | 4096 tokens | Fixed-size array, sorted by position |
+| **Tail cache** | Dense near-frontier cache | 128 tokens | 18-slot rolling ring |
+
+Neither changes the model's raw-SWA window or attention geometry. Both use
+exact token-prefix matching (SHA1 hash), not rendered-text SHA1.
+
+![Point checkpoint](ds4c-point-checkpoint.png)
+
+![Tail cache](ds4c-tail-cache.png)
+
+## Point checkpoint
+
+### Design
+
+- Captured at `POINT_CHECKPOINT_INTERVAL_TOKENS` boundaries (default 4096)
+- Token-prefix exact match (SHA1 of token IDs)
+- Sorted array, insert by position, evict oldest when over budget
+- Memory budget: `--kv-point-checkpoint-space-mb` (default 8192 MiB)
+
+### Budget and eviction
+
+Budget must support at least **10 checkpoints** to be useful. A budget too small
+for this many entries defeats the purpose — the cache would evict entries faster
+than they accumulate meaningful reuse.
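The 10-checkpoint minimum can be checked with a little arithmetic before choosing a budget. The following is a minimal sketch, assuming the per-entry estimate this section gives (`N_LAYER × min(point, N_SWA) × N_HEAD_DIM × 4` bytes for the raw-SWA rows). The helper names `swa_tensor_bytes` and `entries_that_fit` are hypothetical and do not exist in the patch, and the model geometry is passed in as parameters rather than assumed.

```c
#include <stdint.h>

/* Hypothetical helper: approximate raw-SWA tensor bytes for one checkpoint,
 * i.e. n_layer x min(point, n_swa) x n_head_dim x sizeof(float).
 * Token IDs, logits and frontier counters are not included here. */
uint64_t swa_tensor_bytes(uint32_t n_layer, uint32_t n_swa,
                          uint32_t n_head_dim, uint32_t point) {
    uint32_t rows = point < n_swa ? point : n_swa;  /* capped at the SWA window */
    return (uint64_t)n_layer * rows * n_head_dim * sizeof(float);
}

/* Hypothetical helper: number of whole entries of entry_bytes that fit
 * into budget_bytes. Returns 0 for a zero-sized entry to avoid dividing
 * by zero. */
uint64_t entries_that_fit(uint64_t budget_bytes, uint64_t entry_bytes) {
    return entry_bytes == 0 ? 0 : budget_bytes / entry_bytes;
}
```

With the default 8192 MiB budget, passing an entry size of 650 MiB or 500 MiB to `entries_that_fit` yields 12 or 16 whole entries, so both ends of the quoted per-entry range clear the 10-entry minimum.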
+
+Per-checkpoint size depends on model geometry and context depth:
+```
+swa_shard bytes ≈ N_LAYER × min(point, N_SWA) × N_HEAD_DIM × 4
+                + token IDs + logits + frontier counters
+```
+
+Typical sizes for common configurations:
+
+| Context | N_SWA | Approx per entry | 10-entry minimum |
+|---------|-------|------------------|------------------|
+| 32K | 4096 | ~500–650 MiB | ~5–6.5 GiB |
+| 128K | 4096 | ~500–650 MiB | ~5–6.5 GiB |
+
+The swa_shard saves only raw sliding-window attention rows (not compressed KV),
+so it caps at `N_SWA` regardless of context depth. The default 8192 MiB budget
+covers ~12–16 entries for typical models.
+
+Eviction is **FIFO by position** (oldest first). Entries are sorted by ascending
+token position; when budget is exceeded during insert, `entries[0]` (lowest
+position = oldest) is evicted. Buffers are freed on eviction, not preserved.
+No LRU scoring — position ordering is sufficient for monotonic checkpoint
+progression.
+
+### Lifecycle
+
+```
+request → tokenize → scan checkpoints → SHA1 match?
+  hit  → load swa_shard(K)
+       → invalidate entries > K (metadata clear, preserve buffers)
+       → rebuild verify hash from live session
+       → build effective prompt then warm prefill
+  miss → fall through to tail cache / cold prefill
+
+prefill done → position % 4096 == 0 → capture swa_shard → insert sorted
+```
+
+### Compile-time geometry
+
+`POINT_CHECKPOINT_INTERVAL_TOKENS`:
+- `0` = disabled (also disables tail cache)
+- `>= 2048` and `% 128 == 0` = valid
+
+Startup validation refuses incompatible combinations (e.g. tail cache enabled
+with point checkpoints disabled).
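The geometry rules above can be expressed as a small validation helper. This is an illustrative sketch only, not the startup check in `ds4_server.c`: the function names `interval_is_valid` and `geometry_is_compatible` are hypothetical, and only the constants 128 and 2048 are taken from the rules listed here.

```c
#include <stdbool.h>

/* Constants taken from the compile-time geometry rules. */
#define ALIGNMENT_TOKENS    128
#define MIN_INTERVAL_TOKENS 2048

/* Hypothetical helper: true if `interval` is an accepted value for
 * POINT_CHECKPOINT_INTERVAL_TOKENS. 0 means disabled; any other value
 * must be at least 2048 and a multiple of 128. */
bool interval_is_valid(int interval) {
    if (interval == 0) return true;
    return interval >= MIN_INTERVAL_TOKENS &&
           interval % ALIGNMENT_TOKENS == 0;
}

/* Hypothetical helper: true if the server may start with this combination.
 * A tail cache (lookback_tokens > 0) needs point checkpoints enabled. */
bool geometry_is_compatible(int interval, int lookback_tokens) {
    if (!interval_is_valid(interval)) return false;
    if (interval == 0 && lookback_tokens > 0) return false;
    return true;
}
```

Under these assumptions, an interval of 4096 with a 2048-token lookback is accepted, while an interval of 0 combined with any positive lookback is refused, which mirrors the incompatible combination named above.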
+
+## Tail cache
+
+### Design
+
+- 18-slot rolling ring, 128-token spacing
+- Covers `[latest_point - lookback_tokens, latest_point]`
+- `tail_rows = lookback_tokens + 2 × min_rewind_tokens`
+- Pointwise hit only: exact token-prefix match at a stored point with non-empty suffix
+- No arbitrary rewind, no range restore, no LCP match
+
+### Lifecycle
+
+```
+request → scan tail ring → pointwise exact match?
+  hit  → load swa_shard(K)
+       → triple verify (count + token[i] vs stored + token[i] vs prompt)
+       → reset ALL tail metadata (keep buffers)
+       → build effective prompt then warm prefill
+  miss → fall through to point checkpoint
+
+decode progress → store 128-aligned frontiers (rolling window)
+                → trim points older than lookback window (metadata invalid only)
+
+X-C0NR-MOTIF: on → skip decode-time cache writes (control traffic)
+```
+
+### Ring dynamics
+
+- `next` slot overwrites oldest (round-robin)
+- Window trim invalidates entries before `latest_point - lookback_tokens`
+- Full reset after any hit (single-track constraint)
+
+## Runtime metadata
+
+### `GET /__ds4/runtime`
+
+Returns JSON with server identity, model name, context size, and log file paths
+for downstream observability:
+
+```json
+{"ok":true, "server":"ds4-server", "model":"deepseek-v4-flash",
+ "ctx":32768,
+ "logs":{"stdout":"/path/to/server.log", "stderr":"/path/to/server.err"}}
+```
+
+### RSS reporting
+
+Uses `task_vm_info_data_t.phys_footprint` (macOS Activity Monitor equivalent),
+which includes Metal GPU buffer allocations unlike `resident_size`.
+ +## CLI flags + +| Flag | Default | Meaning | +|------|---------|---------| +| `--kv-point-checkpoint-space-mb N` | 8192 | Point checkpoint memory budget; 0=unlimited | +| `--kv-prefix-lookback-tokens N` | 0 | Enable tail cache over last N tokens | +| `--kv-prefix-lookback-min-rewind-tokens N` | 128 | Minimum rewind distance for tail hit | +| `--kv-cache-verbose` | off | Log active checkpoint metadata after events | + +## Invariants + +- 128-aligned positions only +- Token-prefix exact match (not text SHA1) +- `swa_shard` restore copies raw SWA + frontier counters, no tensor repair +- Semantic invalidation clears metadata, preserves buffers +- Buffer lifetime separate from semantic validity +- Motif decode updates skipped via `X-C0NR-MOTIF: on` + +## Tested + +- Apple Silicon M3 Ultra, 512 GB unified memory +- Metal backend only (CPU backend untested) +- `ds4_server_unit_tests_run()` passes all checkpoint/tail-cache tests +- `make ds4-server ds4_test && ds4_test` passes diff --git a/ds4.c b/ds4.c index 39694470d..b0ab6a6a9 100644 --- a/ds4.c +++ b/ds4.c @@ -7275,6 +7275,13 @@ typedef struct { uint32_t head_dim; } ds4_kv_cache; +uint32_t ds4_tail_swa_rows(uint32_t ctx_size) { + uint32_t rows = DS4_N_SWA; + if (rows > ctx_size) rows = ctx_size; + if (rows == 0) rows = 1; + return rows; +} + static uint32_t ds4_default_raw_cap(uint32_t ctx_size) { uint32_t raw_cap = DS4_N_SWA; if (raw_cap > ctx_size) raw_cap = ctx_size; @@ -18833,6 +18840,284 @@ void ds4_session_snapshot_free(ds4_session_snapshot *snap) { memset(snap, 0, sizeof(*snap)); } +void ds4_session_swa_shard_free(ds4_session_swa_shard *shard) { + if (!shard) return; + free(shard->ptr); + memset(shard, 0, sizeof(*shard)); +} + +/* ========================================================================= + * SWA shard — partial raw-SWA KV data. It can be restored only onto a + * compatible trunk and is not a standalone session snapshot. 
+ * ========================================================================= */ + +static uint64_t session_swa_payload_live_tensor_bytes(const ds4_gpu_graph *g, uint32_t checkpoint_len) { + uint64_t bytes = 0; + const uint32_t raw_live = session_raw_live_rows(g, checkpoint_len); + for (uint32_t il = 0; il < DS4_N_LAYER; il++) { + bytes += (uint64_t)raw_live * DS4_N_HEAD_DIM * sizeof(float); + } + return bytes; +} + +static uint64_t ds4_session_swa_shard_payload_bytes_at(ds4_session *s, uint32_t checkpoint_len) { + if (!s || !s->checkpoint_valid) return 0; + if (ds4_session_is_cpu(s)) return 0; /* SWA shard not supported for CPU path */ + const ds4_gpu_graph *g = &s->graph; + uint64_t bytes = (uint64_t)DS4_SESSION_PAYLOAD_U32_FIELDS * sizeof(uint32_t); + bytes += (uint64_t)checkpoint_len * sizeof(int); /* token IDs */ + bytes += (uint64_t)DS4_N_VOCAB * sizeof(float); /* logits */ + bytes += (uint64_t)DS4_N_LAYER * sizeof(uint32_t); /* n_comp frontier */ + bytes += (uint64_t)DS4_N_LAYER * sizeof(uint32_t); /* n_index_comp frontier */ + bytes += session_swa_payload_live_tensor_bytes(g, checkpoint_len); + return bytes; +} + +uint64_t ds4_session_swa_shard_payload_bytes(ds4_session *s) { + if (!s || !s->checkpoint_valid) return 0; + return ds4_session_swa_shard_payload_bytes_at(s, (uint32_t)s->checkpoint.len); +} + +int ds4_session_save_swa_shard_at(ds4_session *s, int point, ds4_session_swa_shard *shard, char *err, size_t errlen) { + if (!s || !shard) { + payload_set_err(err, errlen, "invalid swa shard save"); + return 1; + } + if (!s->checkpoint_valid || point <= 0 || point > s->checkpoint.len) { + payload_set_err(err, errlen, "invalid swa shard point"); + return 1; + } + if (ds4_session_is_cpu(s)) { + payload_set_err(err, errlen, "SWA shard not supported on CPU backend"); + return 1; + } +#ifndef DS4_NO_GPU + if (ds4_gpu_synchronize() == 0) { + payload_set_err(err, errlen, "failed to synchronize accelerator before swa shard"); + return 1; + } + ds4_gpu_graph *g = 
&s->graph; + const uint32_t checkpoint_len = (uint32_t)point; + const uint32_t raw_live = session_raw_live_rows(g, checkpoint_len); + const uint32_t raw_first = checkpoint_len - raw_live; + if ((uint32_t)s->checkpoint.len - raw_first > g->raw_cap) { + payload_set_err(err, errlen, "swa shard point is outside live raw cache window"); + return 1; + } + + uint32_t header[DS4_SESSION_PAYLOAD_U32_FIELDS] = { + DS4_SESSION_PAYLOAD_MAGIC, + DS4_SESSION_PAYLOAD_VERSION, + (uint32_t)s->ctx_size, + s->prefill_cap, + g->raw_cap, + g->raw_window, + g->comp_cap, + checkpoint_len, + DS4_N_LAYER, + DS4_N_HEAD_DIM, + DS4_N_INDEXER_HEAD_DIM, + DS4_N_VOCAB, + raw_live, + }; + + const uint64_t bytes = ds4_session_swa_shard_payload_bytes_at(s, checkpoint_len); + if (shard->cap < bytes) { + uint8_t *p = realloc(shard->ptr, (size_t)bytes); + if (!p) { + payload_set_err(err, errlen, "out of memory allocating swa shard"); + return 1; + } + shard->ptr = p; + shard->cap = bytes; + } + + FILE *fp = fmemopen(shard->ptr, (size_t)bytes, "wb"); + if (!fp) { + payload_set_err(err, errlen, "failed to open memory stream for swa shard"); + return 1; + } + int rc = 0; + for (uint32_t i = 0; rc == 0 && i < DS4_SESSION_PAYLOAD_U32_FIELDS; i++) + rc = payload_write_u32(fp, header[i], err, errlen); + for (uint32_t i = 0; rc == 0 && i < checkpoint_len; i++) + rc = payload_write_u32(fp, (uint32_t)s->checkpoint.v[i], err, errlen); + if (rc == 0) + rc = payload_write_bytes(fp, s->logits, (uint64_t)DS4_N_VOCAB * sizeof(float), err, errlen); + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + const uint32_t ratio = ds4_layer_compress_ratio(il); + const uint32_t n_comp = ratio == 0 ? 0 : checkpoint_len / ratio; + rc = payload_write_u32(fp, n_comp, err, errlen); + } + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + const uint32_t ratio = ds4_layer_compress_ratio(il); + const uint32_t n_index_comp = ratio == 4 ? 
checkpoint_len / ratio : 0; + rc = payload_write_u32(fp, n_index_comp, err, errlen); + } + + uint8_t *buf = NULL; + if (rc == 0) buf = xmalloc(DS4_SESSION_IO_CHUNK); + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + for (uint32_t r = 0; rc == 0 && r < raw_live; r++) { + const uint32_t pos = raw_first + r; + const uint32_t phys = pos % g->raw_cap; + rc = payload_write_tensor_span(fp, + g->layer_raw_cache[il], + (uint64_t)phys * DS4_N_HEAD_DIM * sizeof(float), + (uint64_t)DS4_N_HEAD_DIM * sizeof(float), + buf, DS4_SESSION_IO_CHUNK, err, errlen); + } + } + free(buf); + if (fclose(fp) != 0 && rc == 0) { + payload_set_err(err, errlen, "failed to finalize swa shard"); + return 1; + } + if (rc != 0) return 1; + shard->len = bytes; + return 0; +#else + payload_set_err(err, errlen, "GPU support not compiled in"); + return 1; +#endif +} + +int ds4_session_save_swa_shard(ds4_session *s, ds4_session_swa_shard *shard, char *err, size_t errlen) { + if (!s || !s->checkpoint_valid) { + payload_set_err(err, errlen, "invalid swa shard save"); + return 1; + } + return ds4_session_save_swa_shard_at(s, s->checkpoint.len, shard, err, errlen); +} + +int ds4_session_load_swa_shard(ds4_session *s, const ds4_session_swa_shard *shard, char *err, size_t errlen) { + if (!s || !shard || !shard->ptr || shard->len == 0) { + payload_set_err(err, errlen, "invalid swa shard load"); + return 1; + } + if (ds4_session_is_cpu(s)) { + payload_set_err(err, errlen, "SWA shard not supported on CPU backend"); + return 1; + } +#ifndef DS4_NO_GPU + if (shard->len > (uint64_t)SIZE_MAX) { + payload_set_err(err, errlen, "swa shard is too large for this platform"); + return 1; + } + FILE *fp = fmemopen((void *)shard->ptr, (size_t)shard->len, "rb"); + if (!fp) { + payload_set_err(err, errlen, "failed to open memory stream for swa shard restore"); + return 1; + } + + uint64_t remaining = shard->len; + uint32_t h[DS4_SESSION_PAYLOAD_U32_FIELDS]; + int rc = 0; + for (uint32_t i = 0; rc == 0 && i < 
DS4_SESSION_PAYLOAD_U32_FIELDS; i++) + rc = payload_read_u32(fp, &h[i], &remaining, err, errlen); + if (rc != 0) { fclose(fp); return 1; } + if (h[0] != DS4_SESSION_PAYLOAD_MAGIC || h[1] != DS4_SESSION_PAYLOAD_VERSION) { + fclose(fp); + payload_set_err(err, errlen, "unsupported swa shard version"); + return 1; + } + const uint32_t saved_tokens = h[7]; + const uint32_t saved_raw_window = h[5]; + const uint32_t saved_raw_cap = h[4]; + const uint32_t saved_raw_live = h[12]; + if (h[8] != DS4_N_LAYER || h[9] != DS4_N_HEAD_DIM || + h[10] != DS4_N_INDEXER_HEAD_DIM || h[11] != DS4_N_VOCAB) + { + fclose(fp); + payload_set_err(err, errlen, "swa shard was written for a different DS4 layout"); + return 1; + } + + ds4_gpu_graph *g = &s->graph; + if (saved_raw_window != g->raw_window) { + fclose(fp); + payload_set_err(err, errlen, "swa shard graph chunk layout does not match current runtime"); + return 1; + } + const uint32_t expected_raw_live = saved_tokens < saved_raw_window ? saved_tokens : saved_raw_window; + if (saved_raw_cap == 0 || saved_raw_live != expected_raw_live || saved_raw_live > g->raw_cap) { + fclose(fp); + payload_set_err(err, errlen, "swa shard raw ring layout does not match"); + return 1; + } + + token_vec new_checkpoint = {0}; + for (uint32_t i = 0; rc == 0 && i < saved_tokens; i++) { + uint32_t tok = 0; + rc = payload_read_u32(fp, &tok, &remaining, err, errlen); + if (rc == 0) token_vec_push(&new_checkpoint, (int)tok); + } + if (rc == 0) + rc = payload_read_bytes(fp, s->logits, (uint64_t)DS4_N_VOCAB * sizeof(float), &remaining, err, errlen); + + uint32_t n_comp[DS4_MAX_LAYER]; + uint32_t n_index_comp[DS4_MAX_LAYER]; + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + rc = payload_read_u32(fp, &n_comp[il], &remaining, err, errlen); + if (rc == 0 && n_comp[il] > g->layer_comp_cap[il]) { + rc = 1; + payload_set_err(err, errlen, "swa shard has invalid compressed row count"); + } + } + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + rc = 
payload_read_u32(fp, &n_index_comp[il], &remaining, err, errlen); + if (rc == 0 && n_index_comp[il] > g->layer_comp_cap[il]) { + rc = 1; + payload_set_err(err, errlen, "swa shard has invalid indexer row count"); + } + } + + if (rc == 0 && ds4_gpu_synchronize() == 0) { + token_vec_free(&new_checkpoint); + fclose(fp); + payload_set_err(err, errlen, "failed to synchronize accelerator before swa shard restore"); + return 1; + } + + s->checkpoint_valid = false; + s->mtp_draft_valid = false; + g->mtp_n_raw = 0; + + uint8_t *buf = NULL; + if (rc == 0) buf = xmalloc(DS4_SESSION_IO_CHUNK); + for (uint32_t il = 0; rc == 0 && il < DS4_N_LAYER; il++) { + const uint32_t raw_first = saved_tokens - saved_raw_live; + for (uint32_t r = 0; rc == 0 && r < saved_raw_live; r++) { + const uint32_t pos = raw_first + r; + const uint32_t phys = pos % g->raw_cap; + rc = payload_read_tensor_span(fp, + g->layer_raw_cache[il], + (uint64_t)phys * DS4_N_HEAD_DIM * sizeof(float), + (uint64_t)DS4_N_HEAD_DIM * sizeof(float), + buf, DS4_SESSION_IO_CHUNK, &remaining, err, errlen); + } + g->layer_n_comp[il] = n_comp[il]; + g->layer_n_index_comp[il] = n_index_comp[il]; + } + free(buf); + + if (rc != 0) { + token_vec_free(&new_checkpoint); + fclose(fp); + return 1; + } + + token_vec_free(&s->checkpoint); + s->checkpoint = new_checkpoint; + s->checkpoint_valid = true; + fclose(fp); + return 0; +#else + payload_set_err(err, errlen, "GPU support not compiled in"); + return 1; +#endif +} + void ds4_engine_dump_tokens(ds4_engine *e, const ds4_tokens *tokens) { dump_tokens(&e->vocab, tokens); } diff --git a/ds4.h b/ds4.h index 7b7233c36..66e5642b2 100644 --- a/ds4.h +++ b/ds4.h @@ -131,6 +131,12 @@ typedef struct { uint64_t bytes; } ds4_session_payload_file; +typedef struct { + uint8_t *ptr; + uint64_t len; + uint64_t cap; +} ds4_session_swa_shard; + int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt); void ds4_engine_close(ds4_engine *e); void ds4_engine_summary(ds4_engine *e); @@ -231,6 
+237,7 @@ ds4_session_rewrite_result ds4_session_rewrite_from_common( ds4_session *s, const ds4_tokens *prompt, int common, char *err, size_t errlen); int ds4_session_common_prefix(ds4_session *s, const ds4_tokens *prompt); +uint32_t ds4_tail_swa_rows(uint32_t ctx_size); int ds4_session_argmax(ds4_session *s); int ds4_session_argmax_excluding(ds4_session *s, int excluded_id); int ds4_sample_logits(const float *logits, int n_vocab, float temperature, @@ -297,6 +304,14 @@ int ds4_session_load_payload(ds4_session *s, FILE *fp, uint64_t payload_bytes, c int ds4_session_save_snapshot(ds4_session *s, ds4_session_snapshot *snap, char *err, size_t errlen); int ds4_session_load_snapshot(ds4_session *s, const ds4_session_snapshot *snap, char *err, size_t errlen); void ds4_session_snapshot_free(ds4_session_snapshot *snap); +void ds4_session_swa_shard_free(ds4_session_swa_shard *shard); + +/* SWA shard: partial raw-SWA data that can be restored only onto a + * compatible trunk. This is not a standalone session snapshot. 
*/ +uint64_t ds4_session_swa_shard_payload_bytes(ds4_session *s); +int ds4_session_save_swa_shard(ds4_session *s, ds4_session_swa_shard *shard, char *err, size_t errlen); +int ds4_session_save_swa_shard_at(ds4_session *s, int point, ds4_session_swa_shard *shard, char *err, size_t errlen); +int ds4_session_load_swa_shard(ds4_session *s, const ds4_session_swa_shard *shard, char *err, size_t errlen); uint64_t ds4_session_layer_payload_bytes(ds4_session *s, uint32_t layer_start, diff --git a/ds4_server.c b/ds4_server.c index f5c96e885..a814a63ec 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -38,6 +38,7 @@ #include #include #include +#include static volatile sig_atomic_t g_stop_requested = 0; static volatile sig_atomic_t g_listen_fd = -1; @@ -604,6 +605,7 @@ typedef struct { ds4_think_mode think_mode; bool has_tools; bool prompt_preserves_reasoning; + bool skip_decode_cache_updates; /* X-C0NR-MOTIF: on */ /* For /v1/responses: emit reasoning_summary_* events / fields only when the * client opted in via reasoning.summary. Other APIs leave this false; the * field is ignored on those code paths. 
*/ @@ -4655,10 +4657,10 @@ static bool try_repair_dsml(const char *s, size_t len, buf *out) { size_t d; if ((d = strlen(ts)) && !strncmp(p, ts, d)) { tos++; p += d; } else if ((d = strlen(te)) && !strncmp(p, te, d)) { toe++; p += d; } - else if ((d = strlen(is)) && !strncmp(p, is, d)) { ios++; p += d; } - else if ((d = strlen(ie)) && !strncmp(p, ie, d)) { ioe++; p += d; } else if ((d = strlen(ps)) && !strncmp(p, ps, d)) { pos++; p += d; } else if ((d = strlen(pe)) && !strncmp(p, pe, d)) { poe++; p += d; } + else if ((d = strlen(is)) && !strncmp(p, is, d)) { ios++; p += d; } + else if ((d = strlen(ie)) && !strncmp(p, ie, d)) { ioe++; p += d; } else p++; } if (tos == toe && ios == ioe && pos == poe) return false; @@ -7593,6 +7595,19 @@ static double now_sec(void) { return (double)ts.tv_sec + (double)ts.tv_nsec * 1e-9; } +/* Get current physical memory footprint in MiB (equivalent to Activity + * Monitor's "Memory" column on macOS). Uses phys_footprint which includes + * Metal GPU buffer allocations, mmap-backed pages, and wired memory — + * unlike resident_size which excludes clean file-backed pages. */ +static double get_rss_mib(void) { + task_vm_info_data_t info; + mach_msg_type_number_t count = TASK_VM_INFO_COUNT; + if (task_info(mach_task_self(), TASK_VM_INFO, + (task_info_t)&info, &count) != KERN_SUCCESS) + return 0.0; + return (double)info.phys_footprint / (1024.0 * 1024.0); +} + static void server_log(ds4_log_type type, const char *fmt, ...) 
{ time_t now = time(NULL); struct tm tm; @@ -7694,6 +7709,50 @@ typedef struct { size_t visible_len; } visible_live_state; +#define POINT_CHECKPOINT_ALIGNMENT_TOKENS 128 +#define POINT_CHECKPOINT_MIN_INTERVAL_TOKENS 2048 +#ifndef POINT_CHECKPOINT_INTERVAL_TOKENS +#define POINT_CHECKPOINT_INTERVAL_TOKENS 4096 +#endif +#define POINT_CHECKPOINT_DEFAULT_SPACE_MB 8192 + +#define PREFIX_LOOKBACK_MAX_TOKENS 2048 +#define PREFIX_LOOKBACK_DEFAULT_MIN_REWIND 128 +#define PREFIX_LOOKBACK_RING_SLOTS 18 + +typedef struct { + int point; /* absolute token position */ + int offset; + bool valid; /* semantic validity; buffers may remain allocated */ + char token_prefix_hash[41]; /* SHA1(token_ids[0..point)) at capture time */ + char verify_hash[41]; /* SHA1(live[0..point)) — rebuilt after each invalidation */ + ds4_tokens tokens; /* exact token IDs at capture time, for prefix comparison */ + ds4_session_swa_shard swa_shard; /* partial raw-SWA data for this point */ + time_t created_at; + int hits; +} point_checkpoint_entry; + +typedef struct { + int lookback_tokens; + int min_rewind_tokens; + int tail_rows; + int hits; + int count; + int next; + int last_store_point; + uint64_t total_bytes; + point_checkpoint_entry entries[PREFIX_LOOKBACK_RING_SLOTS]; +} prefix_lookback_cache; + +typedef struct { + int interval; /* N = capture at every N-token boundary */ + uint64_t max_bytes; /* memory budget in bytes; 0 = unlimited (one entry) */ + point_checkpoint_entry *entries; + int count; + int capacity; + uint64_t total_bytes; +} point_checkpoint_cache; + static bool id_list_contains(const stop_list *ids, const char *id); static void id_list_push_unique(stop_list *ids, const char *id); @@ -7707,6 +7766,8 @@ struct server { live_tool_state anthropic_live; visible_live_state thinking_live; bool disable_exact_dsml_tool_replay; + bool cache_verbose; + bool point_checkpoint_replay_boundary_token; bool enable_cors; pthread_mutex_t tool_mu; pthread_mutex_t mu; @@ -7720,6 +7781,8 @@ struct server { 
FILE *trace; pthread_mutex_t trace_mu; uint64_t trace_seq; + prefix_lookback_cache prefix_lookback; + point_checkpoint_cache point_checkpoint; }; /* Jobs are stack-owned by the client thread. The worker signals completion @@ -7734,6 +7797,98 @@ struct job { job *next; }; +static void checkpoint_time_str(time_t t, char *buf, size_t len) { + if (!buf || len == 0) return; + if (t <= 0) { + snprintf(buf, len, "-"); + return; + } + struct tm tm; + localtime_r(&t, &tm); + strftime(buf, len, "%Y-%m-%dT%H:%M:%S%z", &tm); +} + +static long checkpoint_age_seconds(time_t t) { + time_t now = time(NULL); + return t > 0 && now >= t ? (long)(now - t) : 0; +} + +static bool is_point_checkpoint_boundary(const server *s, int point) { + return s && s->point_checkpoint.interval > 0 && point > 0 && + point % s->point_checkpoint.interval == 0; +} + +static int prefix_lookback_effective_tokens(const prefix_lookback_cache *c) { + if (!c || c->lookback_tokens <= 0) return 0; + return c->lookback_tokens < PREFIX_LOOKBACK_MAX_TOKENS ? + c->lookback_tokens : PREFIX_LOOKBACK_MAX_TOKENS; +} + +static void point_checkpoint_refresh_offsets(point_checkpoint_cache *c) { + if (!c) return; + for (int i = 0; i < c->count; i++) c->entries[i].offset = i; +} + +static int point_checkpoint_find_index(const point_checkpoint_cache *c, + int point, + const char hash[41]) { + if (!c || !hash) return -1; + for (int i = 0; i < c->count; i++) { + const point_checkpoint_entry *e = &c->entries[i]; + if (e->point == point && strncmp(e->token_prefix_hash, hash, 40) == 0) + return i; + } + return -1; +} + +static void log_active_checkpoints(server *s, const char *reason) { + if (!s || !s->cache_verbose) return; + prefix_lookback_cache *pl = &s->prefix_lookback; + point_checkpoint_cache *pc = &s->point_checkpoint; + server_log(DS4_LOG_KVCACHE, + "ds4-server: active checkpoints reason=%s prefix_lookback_tokens=%d min_rewind=%d tail_rows=%d prefix_hits=%d point=%d/%d point_bytes=%llu", + reason ? 
reason : "?", + pl->lookback_tokens, + pl->min_rewind_tokens, + pl->tail_rows, + pl->hits, + pc->count, + pc->capacity, + (unsigned long long)pc->total_bytes); + for (int i = 0; i < pc->count; i++) { + point_checkpoint_entry *e = &pc->entries[i]; + char ts[40]; + checkpoint_time_str(e->created_at, ts, sizeof(ts)); + server_log(DS4_LOG_KVCACHE, + "ds4-server: active point checkpoint offset=%d point=%d created_at=%s age=%lds hits=%d tokens=%d bytes=%llu hash=%.12s", + e->offset, + e->point, + ts, + checkpoint_age_seconds(e->created_at), + e->hits, + e->tokens.len, + (unsigned long long)e->swa_shard.cap, + e->token_prefix_hash); + } + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry *e = &pl->entries[i]; + if (!e->valid || !e->swa_shard.ptr) continue; + char ts[40]; + checkpoint_time_str(e->created_at, ts, sizeof(ts)); + server_log(DS4_LOG_KVCACHE, + "ds4-server: active tail checkpoint class=tail offset=%d point=%d point_boundary=%d created_at=%s age=%lds hits=%d tokens=%d bytes=%llu hash=%.12s", + e->offset, + e->point, + is_point_checkpoint_boundary(s, e->point) ? 1 : 0, + ts, + checkpoint_age_seconds(e->created_at), + e->hits, + e->tokens.len, + (unsigned long long)e->swa_shard.cap, + e->token_prefix_hash); + } +} + /* ========================================================================= * Tool Call Text Memory. 
* ========================================================================= @@ -8626,6 +8781,556 @@ static void build_prompt_from_exact_prefix_and_text_suffix( engine, exact_prefix, suffix_text, out); } +static void sha1_token_prefix_hex(const ds4_tokens *tokens, int prefix_len, char out[41]); +static void point_checkpoint_entry_free(point_checkpoint_entry *entry); +static void point_checkpoint_entry_invalidate(point_checkpoint_entry *entry); + +static void prefix_lookback_free(prefix_lookback_cache *c) { + if (!c) return; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry_free(&c->entries[i]); + } + memset(c, 0, sizeof(*c)); +} + +static void point_checkpoint_rebuild_index(server *s); +static int point_checkpoint_first_token_mismatch(const point_checkpoint_entry *e, + const ds4_tokens *prompt); +static void point_checkpoint_log_token_mismatch(const char *where, + const point_checkpoint_entry *e, + const ds4_tokens *prompt); + +static void prefix_lookback_refresh_offsets(prefix_lookback_cache *c) { + if (!c) return; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) c->entries[i].offset = i; +} + +static int prefix_lookback_valid_count(const prefix_lookback_cache *c) { + if (!c) return 0; + int n = 0; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + if (c->entries[i].valid && c->entries[i].swa_shard.ptr) n++; + } + return n; +} + +static int prefix_lookback_latest_valid_point(const prefix_lookback_cache *c) { + if (!c) return 0; + int point = 0; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + const point_checkpoint_entry *e = &c->entries[i]; + if (e->valid && e->swa_shard.ptr && e->point > point) point = e->point; + } + return point; +} + +static void prefix_lookback_refresh_state(prefix_lookback_cache *c) { + if (!c) return; + c->count = prefix_lookback_valid_count(c); + c->last_store_point = prefix_lookback_latest_valid_point(c); + prefix_lookback_refresh_offsets(c); +} + +static void 
prefix_lookback_reset_metadata(prefix_lookback_cache *c) { + if (!c) return; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry_invalidate(&c->entries[i]); + } + prefix_lookback_refresh_state(c); +} + +static int prefix_lookback_spacing(const prefix_lookback_cache *c) { + (void)c; + return 128; +} + +static bool prefix_lookback_has_nearby_point(const prefix_lookback_cache *c, int point) { + if (!c || point <= 0) return false; + int spacing = prefix_lookback_spacing(c); + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + const point_checkpoint_entry *e = &c->entries[i]; + if (!e->valid || !e->swa_shard.ptr) continue; + int d = e->point > point ? e->point - point : point - e->point; + if (d < spacing) return true; + } + return false; +} + +static void prefix_lookback_trim_window(prefix_lookback_cache *c, int latest_point) { + if (!c || c->lookback_tokens <= 0 || latest_point <= 0) return; + int floor = latest_point - prefix_lookback_effective_tokens(c); + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry *e = &c->entries[i]; + if (e->valid && e->point < floor) point_checkpoint_entry_invalidate(e); + } + prefix_lookback_refresh_state(c); +} + +static int prefix_lookback_try_load(server *s, const ds4_tokens *prompt, + ds4_tokens *effective_prompt, char *err, size_t errlen) { + prefix_lookback_cache *c = s ? 
&s->prefix_lookback : NULL; + if (!s || !c || c->lookback_tokens <= 0 || !prompt || prompt->len <= 0) { + (void)err; (void)errlen; + return 0; + } + point_checkpoint_entry *best = NULL; + point_checkpoint_entry *nearest_mismatch = NULL; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry *e = &c->entries[i]; + int mismatch_pos = point_checkpoint_first_token_mismatch(e, prompt); + const char *skip_reason = NULL; + if (!e->valid) skip_reason = "invalid"; + else if (!e->swa_shard.ptr) skip_reason = "no_swa_shard"; + else if (e->point <= 0) skip_reason = "bad_point"; + else if (e->point > (int)prompt->len) skip_reason = "point_gt_prompt"; + else if (e->point == (int)prompt->len) skip_reason = "no_suffix"; + else if (e->point % 128 != 0) skip_reason = "unaligned"; + else if (mismatch_pos < e->point) { + if (!nearest_mismatch) nearest_mismatch = e; + continue; + } + if (skip_reason) continue; + if (mismatch_pos == e->point && (!best || e->point > best->point)) best = e; + } + if (!best) { + if (nearest_mismatch) point_checkpoint_log_token_mismatch("tail-lookup", nearest_mismatch, prompt); + return 0; + } + int checkpoint_point = best->point; + int restore_point = checkpoint_point; + server_log(DS4_LOG_KVCACHE, + "ds4-server: tail checkpoint hit class=tail offset=%d point=%d prompt=%d suffix=%d point_boundary=%d hash=%s bytes=%llu", + best->offset, + best->point, + prompt->len, + prompt->len - best->point, + is_point_checkpoint_boundary(s, best->point) ? 
1 : 0, + best->token_prefix_hash, + (unsigned long long)best->swa_shard.cap); + if (ds4_session_load_swa_shard(s->session, &best->swa_shard, err, errlen) != 0) + return 0; + const ds4_tokens *loaded = ds4_session_tokens(s->session); + if (!loaded || loaded->len < checkpoint_point) { + snprintf(err, errlen, "tail checkpoint load token mismatch"); + ds4_session_invalidate(s->session); + return 0; + } + for (int i = 0; i < checkpoint_point; i++) { + if (i >= (int)prompt->len || loaded->v[i] != best->tokens.v[i] || loaded->v[i] != prompt->v[i]) { + snprintf(err, errlen, "tail checkpoint token-id mismatch at %d", i); + ds4_session_invalidate(s->session); + return 0; + } + } + ds4_tokens_copy(effective_prompt, loaded); + for (int i = restore_point; i < (int)prompt->len; i++) + ds4_tokens_push(effective_prompt, prompt->v[i]); + best->hits++; + c->hits++; + prefix_lookback_reset_metadata(c); + return restore_point; +} + +static bool prefix_lookback_store_at(server *s, const ds4_tokens *tokens, int position, const char *ctx) { + if (!s || !tokens || s->prefix_lookback.lookback_tokens <= 0) return false; + if (position <= 0 || position % 128 != 0) return false; + if (position > tokens->len) return false; + prefix_lookback_cache *c = &s->prefix_lookback; + char live_hash[41] = {0}; + sha1_token_prefix_hex(tokens, position, live_hash); + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + point_checkpoint_entry *e = &c->entries[i]; + if (e->valid && e->point == position && + strncmp(e->token_prefix_hash, live_hash, 40) == 0) + return true; + } + int slot = c->next % PREFIX_LOOKBACK_RING_SLOTS; + c->next = (slot + 1) % PREFIX_LOOKBACK_RING_SLOTS; + point_checkpoint_entry *entry = &c->entries[slot]; + uint64_t old_cap = entry->swa_shard.cap; + char err[160] = {0}; + if (ds4_session_save_swa_shard_at(s->session, position, &entry->swa_shard, err, sizeof(err)) != 0) { + server_log(DS4_LOG_WARNING, + "ds4-server: tail checkpoint swa capture failed at %d: %s", + position, err); 
+ return false; + } + tokens_copy_prefix(&entry->tokens, tokens, position); + entry->point = position; + entry->valid = true; + memcpy(entry->token_prefix_hash, live_hash, sizeof(entry->token_prefix_hash)); + memcpy(entry->verify_hash, live_hash, sizeof(entry->verify_hash)); + entry->created_at = time(NULL); + entry->hits = 0; + if (entry->swa_shard.cap > old_cap) c->total_bytes += entry->swa_shard.cap - old_cap; + else if (old_cap > entry->swa_shard.cap && c->total_bytes >= old_cap - entry->swa_shard.cap) + c->total_bytes -= old_cap - entry->swa_shard.cap; + c->last_store_point = position; + prefix_lookback_trim_window(c, position); + server_log(DS4_LOG_KVCACHE, + "ds4-server: tail checkpoint stored class=tail offset=%d point=%d point_boundary=%d hash=%s bytes=%llu total=%llu ctx=%s", + entry->offset, entry->point, + is_point_checkpoint_boundary(s, entry->point) ? 1 : 0, + entry->token_prefix_hash, + (unsigned long long)entry->swa_shard.cap, + (unsigned long long)c->total_bytes, + ctx ? ctx : "-"); + log_active_checkpoints(s, "tail-store"); + return true; +} + +static void prefix_lookback_store_window(server *s, const ds4_tokens *tokens, const char *ctx) { + if (!s || !tokens || s->prefix_lookback.lookback_tokens <= 0) return; + prefix_lookback_cache *c = &s->prefix_lookback; + int end = tokens->len - (tokens->len % 128); + if (end <= 0) return; + int spacing = prefix_lookback_spacing(c); + int floor = end - prefix_lookback_effective_tokens(c); + int start = c->last_store_point > 0 ? 
c->last_store_point + spacing : floor; + if (start < floor) start = floor; + if (start < 128) start = 128; + start += (128 - (start % 128)) % 128; + for (int point = start; point <= end; point += spacing) { + if (prefix_lookback_has_nearby_point(c, point)) continue; + prefix_lookback_store_at(s, tokens, point, ctx); + } +} + +static void prefix_lookback_maybe_store_current(server *s, const char *ctx) { + if (!s || s->prefix_lookback.lookback_tokens <= 0) return; + const ds4_tokens *tokens = ds4_session_tokens(s->session); + if (!tokens || tokens->len <= 0) return; + prefix_lookback_store_window(s, tokens, ctx); +} + +/* ========================================================================= + * Phase-4 in-memory pointwise checkpoint cache + * ========================================================================= */ + +static void sha1_token_prefix_hex(const ds4_tokens *tokens, int prefix_len, char out[41]) { + ds4_kvstore_sha1_bytes_hex(tokens->v, (size_t)prefix_len * sizeof(int), out); +} + +static void point_checkpoint_entry_free(point_checkpoint_entry *entry) { + if (!entry) return; + ds4_tokens_free(&entry->tokens); + ds4_session_swa_shard_free(&entry->swa_shard); + memset(entry, 0, sizeof(*entry)); +} + +static void point_checkpoint_entry_invalidate(point_checkpoint_entry *entry) { + if (!entry) return; + entry->valid = false; + entry->tokens.len = 0; + entry->swa_shard.len = 0; + entry->token_prefix_hash[0] = '\0'; + entry->verify_hash[0] = '\0'; + entry->created_at = 0; + entry->hits = 0; +} + +static void point_checkpoint_cache_free(point_checkpoint_cache *c) { + if (!c) return; + for (int i = 0; i < c->count; i++) point_checkpoint_entry_free(&c->entries[i]); + free(c->entries); + memset(c, 0, sizeof(*c)); +} + +static void point_checkpoint_evict_oldest(point_checkpoint_cache *c) { + if (!c || c->count <= 0) return; + c->total_bytes -= c->entries[0].swa_shard.cap; + point_checkpoint_entry_free(&c->entries[0]); + if (c->count > 1) + memmove(c->entries, 
c->entries + 1, (size_t)(c->count - 1) * sizeof(point_checkpoint_entry)); + c->count--; + if (c->entries) memset(&c->entries[c->count], 0, sizeof(c->entries[c->count])); + point_checkpoint_refresh_offsets(c); +} + +static void point_checkpoint_invalidate_after(point_checkpoint_cache *c, int point) { + if (!c || c->count <= 0) return; + for (int i = 0; i < c->count; i++) + if (c->entries[i].point > point) point_checkpoint_entry_invalidate(&c->entries[i]); + point_checkpoint_refresh_offsets(c); +} + +static void point_checkpoint_rebuild_index(server *s) { + point_checkpoint_cache *c = &s->point_checkpoint; + if (!c || c->count <= 0) return; + const ds4_tokens *live = ds4_session_tokens(s->session); + if (!live || live->len <= 0) return; + for (int i = 0; i < c->count; i++) { + point_checkpoint_entry *e = &c->entries[i]; + if (!e->valid || !e->swa_shard.ptr) continue; + if (e->point > live->len) { + memset(e->verify_hash, 0, sizeof(e->verify_hash)); + continue; + } + sha1_token_prefix_hex(live, e->point, e->verify_hash); + } +} + +static bool point_checkpoint_insert(point_checkpoint_cache *c, point_checkpoint_entry *entry) { + if (!c || !entry) return false; + uint64_t cost = entry->swa_shard.cap; + int idx = 0; + while (idx < c->count && c->entries[idx].point < entry->point) idx++; + if (idx < c->count && c->entries[idx].point == entry->point) { + if (c->total_bytes >= c->entries[idx].swa_shard.cap) c->total_bytes -= c->entries[idx].swa_shard.cap; + else c->total_bytes = 0; + point_checkpoint_entry_free(&c->entries[idx]); + c->entries[idx] = *entry; + c->total_bytes += cost; + memset(entry, 0, sizeof(*entry)); + point_checkpoint_refresh_offsets(c); + return true; + } + /* Evict oldest entries while adding this one would exceed the budget; max_bytes == 0 disables the budget check, so nothing is evicted. */ + while (c->count > 0 && c->max_bytes > 0 && c->total_bytes + cost > c->max_bytes) + point_checkpoint_evict_oldest(c); + idx = 0; + while (idx < c->count && c->entries[idx].point < entry->point) idx++; + if 
(c->count >= c->capacity) { + int new_cap = c->capacity ? c->capacity * 2 : 4; + point_checkpoint_entry *new_entries = realloc(c->entries, + (size_t)new_cap * sizeof(point_checkpoint_entry)); + if (!new_entries) { point_checkpoint_entry_free(entry); return false; } + c->entries = new_entries; + c->capacity = new_cap; + } + memmove(c->entries + idx + 1, c->entries + idx, + (size_t)(c->count - idx) * sizeof(point_checkpoint_entry)); + c->entries[idx] = *entry; + c->count++; + c->total_bytes += cost; + memset(entry, 0, sizeof(*entry)); + point_checkpoint_refresh_offsets(c); + return true; +} + +static int point_checkpoint_first_token_mismatch(const point_checkpoint_entry *e, + const ds4_tokens *prompt) { + if (!e || !prompt) return 0; + int limit = e->tokens.len < prompt->len ? e->tokens.len : prompt->len; + if (limit > e->point) limit = e->point; + for (int i = 0; i < limit; i++) { + if (e->tokens.v[i] != prompt->v[i]) return i; + } + return limit; +} + +static void point_checkpoint_log_token_mismatch(const char *where, + const point_checkpoint_entry *e, + const ds4_tokens *prompt) { + if (!e || !prompt) return; + int pos = point_checkpoint_first_token_mismatch(e, prompt); + int ckpt = pos < e->tokens.len ? e->tokens.v[pos] : -1; + int incoming = pos < prompt->len ? prompt->v[pos] : -1; + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint token-id mismatch %s point=%d at=%d checkpoint=%d prompt=%d prompt_len=%d", + where ? 
where : "?", e->point, pos, ckpt, incoming, prompt->len); +} + +static void point_checkpoint_capture_maybe(server *s, int position) { + if (!s || s->point_checkpoint.interval <= 0) return; + if (position <= 0) return; + int interval = s->point_checkpoint.interval; + if (position % interval != 0) return; + point_checkpoint_cache *c = &s->point_checkpoint; + const ds4_tokens *live = ds4_session_tokens(s->session); + if (!live || live->len < position) return; + char live_hash[41] = {0}; + sha1_token_prefix_hex(live, position, live_hash); + if (c->count > 0) { + for (int i = 0; i < c->count; i++) { + if (c->entries[i].valid && c->entries[i].point == position && + strncmp(live_hash, c->entries[i].token_prefix_hash, 40) == 0) + return; + } + } + int existing = -1; + for (int i = 0; i < c->count; i++) { + if (c->entries[i].point == position) { + existing = i; + break; + } + } + if (existing >= 0) { + point_checkpoint_entry *entry = &c->entries[existing]; + uint64_t old_cap = entry->swa_shard.cap; + char err[160] = {0}; + if (ds4_session_save_swa_shard(s->session, &entry->swa_shard, err, sizeof(err)) != 0) { + server_log(DS4_LOG_WARNING, + "ds4-server: point checkpoint swa capture failed at %d: %s", + position, err); + return; + } + tokens_copy_prefix(&entry->tokens, live, position); + entry->valid = true; + memcpy(entry->token_prefix_hash, live_hash, sizeof(entry->token_prefix_hash)); + memcpy(entry->verify_hash, live_hash, sizeof(entry->verify_hash)); + entry->created_at = time(NULL); + entry->hits = 0; + if (entry->swa_shard.cap > old_cap) c->total_bytes += entry->swa_shard.cap - old_cap; + char ts[40]; + checkpoint_time_str(entry->created_at, ts, sizeof(ts)); + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint stored offset=%d point=%d created_at=%s hash=%s bytes=%llu total=%llu interval=%d reuse=1", + entry->offset, entry->point, ts, entry->token_prefix_hash, + (unsigned long long)entry->swa_shard.cap, + (unsigned long long)c->total_bytes, + c->interval); + 
log_active_checkpoints(s, "point-store"); + return; + } + point_checkpoint_entry entry = {0}; + entry.point = position; + char err[160] = {0}; + if (ds4_session_save_swa_shard(s->session, &entry.swa_shard, err, sizeof(err)) != 0) { + server_log(DS4_LOG_WARNING, + "ds4-server: point checkpoint swa capture failed at %d: %s", + position, err); + return; + } + tokens_copy_prefix(&entry.tokens, live, position); + entry.valid = true; + memcpy(entry.token_prefix_hash, live_hash, sizeof(entry.token_prefix_hash)); + entry.created_at = time(NULL); + entry.hits = 0; + memcpy(entry.verify_hash, live_hash, sizeof(entry.verify_hash)); + int stored_point = entry.point; + char stored_hash[41] = {0}; + memcpy(stored_hash, entry.token_prefix_hash, sizeof(stored_hash)); + time_t stored_created_at = entry.created_at; + uint64_t stored_bytes = entry.swa_shard.cap; + if (point_checkpoint_insert(c, &entry)) { + int stored_offset = point_checkpoint_find_index(c, stored_point, stored_hash); + char ts[40]; + checkpoint_time_str(stored_created_at, ts, sizeof(ts)); + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint stored offset=%d point=%d created_at=%s hash=%s bytes=%llu total=%llu interval=%d", + stored_offset, stored_point, ts, stored_hash, + (unsigned long long)stored_bytes, + (unsigned long long)c->total_bytes, + c->interval); + log_active_checkpoints(s, "point-store"); + } +} + +static int point_checkpoint_try_load(server *s, const ds4_tokens *prompt, + ds4_tokens *effective_prompt, char *err, size_t errlen) { + point_checkpoint_cache *c = &s->point_checkpoint; + if (!c || c->interval <= 0 || c->count <= 0) { + (void)err; (void)errlen; + return 0; + } + /* Point checkpoints are RAM SWA shards keyed by the incoming + * prompt's token prefix. Do not cap them by the current live LCP: a replay + * with clamped max_tokens can diverge in sampled decode while still having + * deeper prefill checkpoints from the previous full prompt. 
*/ + { + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint manifest start count=%d prompt=%d", + c->count, (int)prompt->len); + point_checkpoint_entry *best = NULL; + point_checkpoint_entry *nearest_mismatch = NULL; + for (int i = c->count - 1; i >= 0; i--) { + point_checkpoint_entry *e = &c->entries[i]; + int mismatch_pos = point_checkpoint_first_token_mismatch(e, prompt); + const char *skip_reason = NULL; + if (!e->valid) skip_reason = "invalid"; + else if (!e->swa_shard.ptr) skip_reason = "no_swa_shard"; + else if (e->point <= 0) skip_reason = "bad_point"; + else if (e->point > (int)prompt->len) skip_reason = "point_gt_prompt"; + else if (mismatch_pos < e->point) { + int ckpt = mismatch_pos < e->tokens.len ? e->tokens.v[mismatch_pos] : -1; + int prom = mismatch_pos < (int)prompt->len ? prompt->v[mismatch_pos] : -1; + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint manifest scan offset=%d point=%d skip=mismatch at=%d ckpt=%d prompt=%d", + e->offset, e->point, mismatch_pos, ckpt, prom); + if (!nearest_mismatch) nearest_mismatch = e; + } + if (skip_reason) { + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint manifest scan offset=%d point=%d skip=%s prompt=%d", + e->offset, e->point, skip_reason, (int)prompt->len); + continue; + } + if (mismatch_pos == e->point) { + best = e; + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint manifest hit offset=%d point=%d tokens=%d bytes=%llu", + e->offset, e->point, e->tokens.len, + (unsigned long long)e->swa_shard.cap); + break; + } + } + if (!best) { + if (nearest_mismatch) + point_checkpoint_log_token_mismatch("lookup", nearest_mismatch, prompt); + server_log(DS4_LOG_WARNING, + "ds4-server: point checkpoint miss count=%d prompt=%d no_match", + c->count, (int)prompt->len); + return 0; + } + int checkpoint_point = best->point; + if (checkpoint_point <= 0) return 0; + if (ds4_session_load_swa_shard(s->session, &best->swa_shard, err, errlen) != 0) + return 0; + const ds4_tokens *loaded = 
ds4_session_tokens(s->session); + if (!loaded || loaded->len < checkpoint_point) { + snprintf(err, errlen, "point checkpoint load token mismatch"); + ds4_session_invalidate(s->session); + return 0; + } + for (int i = 0; i < checkpoint_point; i++) { + if (i >= (int)prompt->len || loaded->v[i] != best->tokens.v[i] || loaded->v[i] != prompt->v[i]) { + int loaded_id = loaded && i < loaded->len ? loaded->v[i] : -1; + int ckpt_id = i < best->tokens.len ? best->tokens.v[i] : -1; + int prompt_id = i < prompt->len ? prompt->v[i] : -1; + snprintf(err, errlen, + "point checkpoint token-id mismatch at %d loaded=%d checkpoint=%d prompt=%d", + i, loaded_id, ckpt_id, prompt_id); + ds4_session_invalidate(s->session); + return 0; + } + } + int restore_point = checkpoint_point; + if (s->point_checkpoint_replay_boundary_token) { + restore_point = checkpoint_point - 1; + ds4_session_rewind(s->session, restore_point); + loaded = ds4_session_tokens(s->session); + } + ds4_tokens_copy(effective_prompt, loaded); + for (int i = restore_point; i < (int)prompt->len; i++) + ds4_tokens_push(effective_prompt, prompt->v[i]); + best->hits++; + /* Higher checkpoints belong to the previous token track; warm prefill + * must regenerate them with the current prompt's hashes and token IDs. + * The optional boundary-token replay is experimental; the default path + * restores the exact checkpoint point. */ + point_checkpoint_invalidate_after(c, restore_point); + point_checkpoint_rebuild_index(s); + if (s->cache_verbose) { + char ts[40]; + checkpoint_time_str(best->created_at, ts, sizeof(ts)); + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint hit checkpoint offset=%d point=%d restore=%d replay_boundary=%d created_at=%s age=%lds hits=%d tokens=%d bytes=%llu hash=%.12s", + best->offset, + best->point, + restore_point, + s->point_checkpoint_replay_boundary_token ? 
1 : 0, + ts, + checkpoint_age_seconds(best->created_at), + best->hits, + best->tokens.len, + (unsigned long long)best->swa_shard.cap, + best->token_prefix_hash); + } + return restore_point; + } +} + static int kv_cache_store_len(const kv_disk_cache *kc, int tokens) { return ds4_kvstore_store_len(kc, tokens); } @@ -8830,6 +9535,24 @@ static int kv_cache_try_load(server *s, const request *req, req && req->api == API_RESPONSES); } + + + + + + + + + + + + + + + + + + static int live_text_prefix_prompt(server *s, const request *req, ds4_tokens *effective_prompt) { if (!s || !req || !req->prompt_text || !effective_prompt) return 0; @@ -9206,6 +9929,8 @@ static uint64_t trace_begin( (unsigned long long)j->req.seed); fprintf(s->trace, "stream_include_usage: %d\n", j->req.stream_include_usage ? 1 : 0); + fprintf(s->trace, "skip_decode_cache_updates: %d\n", + j->req.skip_decode_cache_updates ? 1 : 0); trace_write_cache_diag(s, cache_diag, &j->req.tool_replay, cached, cache_source, disk_cached, disk_path); if (j->req.raw_body) { @@ -9305,6 +10030,7 @@ typedef struct { const char *phase; bool has_tools; bool responses_protocol; + bool skip_decode_cache_updates; double t0; double last_t; int last_current; @@ -9345,6 +10071,7 @@ static void log_flags(char *buf, size_t len, bool responses_protocol, } static void log_decode_progress(req_kind kind, int prompt_tokens, int completion, + int max_tokens, bool responses_protocol, bool tools, bool thinking, bool dsml_start, bool dsml_end, @@ -9356,6 +10083,7 @@ static void log_decode_progress(req_kind kind, int prompt_tokens, int completion const int interval_tokens = completion - *last_completion; const double chunk_tps = interval_s > 0.0 ? (double)interval_tokens / interval_s : 0.0; const double avg_tps = elapsed > 0.0 ? (double)completion / elapsed : 0.0; + const double pct = max_tokens > 0 ? 
100.0 * (double)completion / (double)max_tokens : 0.0; char ctx[48]; request_ctx_span(ctx, sizeof(ctx), prompt_tokens + *last_completion, @@ -9364,10 +10092,12 @@ static void log_decode_progress(req_kind kind, int prompt_tokens, int completion log_flags(flags, sizeof(flags), responses_protocol, tools, thinking, dsml_start, dsml_end); server_log(DS4_LOG_GENERATION, - "ds4-server: %s ctx=%s gen=%d%s%s decoding chunk=%.2f t/s avg=%.2f t/s %.3fs", + "ds4-server: %s ctx=%s gen=%d max=%d pct=%.1f%%%s%s decoding chunk=%.2f t/s avg=%.2f t/s %.3fs", kind == REQ_CHAT ? "chat" : "completion", ctx, completion, + max_tokens, + pct, flags[0] ? " " : "", flags, chunk_tps, @@ -9575,6 +10305,7 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot if (p->seen && current == p->last_current) { if (p->srv && current > p->cached_tokens) { kv_cache_maybe_store_continued(p->srv); + prefix_lookback_maybe_store_current(p->srv, p->ctx); } return; } @@ -9616,6 +10347,8 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot elapsed); if (p->srv && current > p->cached_tokens) { kv_cache_maybe_store_continued(p->srv); + prefix_lookback_maybe_store_current(p->srv, p->ctx); + point_checkpoint_capture_maybe(p->srv, current); } } @@ -9827,6 +10560,7 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct .cached_tokens = loaded, .phase = "tool checkpoint rebuild", .has_tools = j->req.has_tools, + .skip_decode_cache_updates = j->req.skip_decode_cache_updates, .t0 = rebuild_t0, .fd = j->fd, .stream = j->req.stream, @@ -9916,21 +10650,56 @@ static void generate_job(server *s, job *j) { ds4_tokens effective_prompt = {0}; const ds4_tokens *prompt_for_sync = &j->req.prompt; const bool responses_protocol = j->req.api == API_RESPONSES; - bool responses_live_continuation = false; - bool anthropic_live_continuation = false; - bool thinking_live_continuation = false; - const char *responses_live_match = NULL; - int 
responses_live_match_ids = 0; - int anthropic_live_match_ids = 0; + int cached = 0; + const char *cache_source = "none"; + if (cached == 0) { + char restore_err[160] = {0}; + cached = prefix_lookback_try_load(s, &j->req.prompt, &effective_prompt, + restore_err, sizeof(restore_err)); + if (cached > 0) { + cache_source = "memory-prefix-lookback"; + prompt_for_sync = &effective_prompt; + server_log(DS4_LOG_KVCACHE, + "ds4-server: prefix lookback hit tokens=%d", cached); + } else if (restore_err[0]) { + server_log(DS4_LOG_WARNING, + "ds4-server: prefix lookback restore failed: %s", + restore_err); + } + } + /* Phase-4: pointwise in-memory checkpoint fallback after Phase-3 miss */ + if (cached == 0 && s->point_checkpoint.interval > 0) { + char checkpoint_err[160] = {0}; + cached = point_checkpoint_try_load(s, &j->req.prompt, &effective_prompt, + checkpoint_err, sizeof(checkpoint_err)); + if (cached > 0) { + cache_source = "memory-point-checkpoint"; + prompt_for_sync = &effective_prompt; + server_log(DS4_LOG_KVCACHE, + "ds4-server: point checkpoint hit tokens=%d", cached); + } else if (checkpoint_err[0]) { + server_log(DS4_LOG_WARNING, + "ds4-server: point checkpoint restore failed: %s", + checkpoint_err); + } + } /* Responses gets the first chance to continue from live state. This is * the whole point of the API shape: a request that is bound to prior live * output by visible transcript or tool call ids does not need to prove an * exact token-prefix match. Exact token/text/disk matching remains the * fallback when the live state is absent or no longer describes the * request. */ - int cached = responses_live_visible_prefix_prompt(s, &j->req, old_pos, - &effective_prompt); - const char *cache_source = cached > 0 ? 
"responses-visible" : "none"; + bool responses_live_continuation = false; + bool anthropic_live_continuation = false; + bool thinking_live_continuation = false; + const char *responses_live_match = NULL; + int responses_live_match_ids = 0; + int anthropic_live_match_ids = 0; + if (cached == 0) { + cached = responses_live_visible_prefix_prompt(s, &j->req, old_pos, + &effective_prompt); + cache_source = cached > 0 ? "responses-visible" : "none"; + } if (cached > 0) { responses_live_match = "visible-prefix"; if (responses_live_matches_request(s, &j->req.responses_live_call_ids, @@ -10027,6 +10796,20 @@ static void generate_job(server *s, job *j) { prompt_for_sync = &effective_prompt; } } + + + + + + + + + + + + + + const bool responses_reasoning_state_preserved = cached > 0 && ((!strcmp(cache_source, "responses-visible") || @@ -10049,6 +10832,16 @@ static void generate_job(server *s, job *j) { cache_source, disk_cached, disk_cache_path); char ctx_span[48]; request_ctx_span(ctx_span, sizeof(ctx_span), cached, prompt_tokens); + server_log(DS4_LOG_KVCACHE, + "ds4-server: cache decision source=%s cached=%d prompt=%d suffix=%d motif=%s skip_decode_cache_updates=%d", + cache_source, + cached, + prompt_tokens, + prompt_tokens > cached ? prompt_tokens - cached : 0, + j->req.skip_decode_cache_updates ? "on" : "off", + j->req.skip_decode_cache_updates ? 1 : 0); + server_log(DS4_LOG_KVCACHE, + "ds4-server: rss %.1f MiB", get_rss_mib()); server_prefill_progress progress = { .srv = s, .kind = j->req.kind, @@ -10056,6 +10849,7 @@ static void generate_job(server *s, job *j) { .cached_tokens = cached, .has_tools = j->req.has_tools, .responses_protocol = responses_protocol, + .skip_decode_cache_updates = j->req.skip_decode_cache_updates, .t0 = t0, .fd = j->fd, .stream = j->req.stream, @@ -10192,7 +10986,9 @@ static void generate_job(server *s, job *j) { ctx_span, req_flags[0] ? 
" " : "", req_flags, - now_sec() - t0); + now_sec() - t0); + prefix_lookback_store_window(s, prompt_for_sync, ctx_span); + point_checkpoint_capture_maybe(s, prompt_for_sync->len); if (cold_store_len == prompt_for_sync->len) { if (kv_cache_store_live_prefix(s, prompt_for_sync, cold_store_len, "cold")) { kv_cache_note_store(&s->kv, cold_store_len); @@ -10299,6 +11095,7 @@ static void generate_job(server *s, job *j) { thinking_gates_tool_markers && thinking.inside; dsml_decode_tracker dsml_tracker; dsml_decode_tracker_init(&dsml_tracker); + bool logged_decode_cache_skip = false; while (!g_stop_requested && completion < max_tokens && ds4_session_pos(s->session) < ds4_session_ctx(s->session)) { @@ -10307,6 +11104,14 @@ static void generate_job(server *s, job *j) { const bool in_tool_call = dsml_decode_state_is_tool(dsml_state); if (!(j->req.kind == REQ_CHAT && j->req.has_tools && (saw_tool_start || in_tool_call))) { kv_cache_maybe_store_continued(s); + if (!j->req.skip_decode_cache_updates) { + prefix_lookback_maybe_store_current(s, ctx_span); + } else if (!logged_decode_cache_skip) { + server_log(DS4_LOG_KVCACHE, + "ds4-server: decode cache updates skipped motif=on ctx=%s", + ctx_span); + logged_decode_cache_skip = true; + } } float temperature = j->req.temperature; int top_k = j->req.top_k; @@ -10485,6 +11290,7 @@ static void generate_job(server *s, job *j) { if (completion >= next_decode_log) { log_decode_progress(j->req.kind, prompt_tokens, completion, + max_tokens, responses_protocol, j->req.has_tools, thinking.inside, @@ -10598,6 +11404,7 @@ static void generate_job(server *s, job *j) { if (completion > last_decode_log_completion) { log_decode_progress(j->req.kind, prompt_tokens, completion, + max_tokens, responses_protocol, j->req.has_tools, thinking.inside, @@ -10968,6 +11775,7 @@ typedef struct { char path[256]; char *body; size_t body_len; + bool x_c0nr_motif; /* X-C0NR-MOTIF: on */ } http_request; static void http_request_free(http_request *r) { @@ -11002,6 
+11810,23 @@ static long content_length(const char *h, size_t n) { return 0; } +static bool header_x_c0nr_motif(const char *h, size_t n) { + const char *p = h, *end = h + n; + while (p < end) { + const char *line = p; + while (p < end && *p != '\n') p++; + size_t len = (size_t)(p - line); + if (len && line[len - 1] == '\r') len--; + if (len >= 14 && strncasecmp(line, "X-C0NR-MOTIF:", 13) == 0) { + const char *v = line + 13; + while (v < line + len && isspace((unsigned char)*v)) v++; + return strncasecmp(v, "on", 2) == 0; + } + if (p < end) p++; + } + return false; +} + static bool read_http_request(int fd, http_request *r) { buf b = {0}; ssize_t hend = -1; @@ -11029,6 +11854,8 @@ static bool read_http_request(int fd, http_request *r) { char *q = strchr(r->path, '?'); if (q) *q = '\0'; + r->x_c0nr_motif = header_x_c0nr_motif(b.ptr, (size_t)hend); + long clen = content_length(b.ptr, (size_t)hend); if (clen < 0 || (size_t)clen > max_body) goto fail; while (b.len < (size_t)hend + (size_t)clen) { @@ -11120,6 +11947,36 @@ static bool send_models(server *s, int fd) { return ok; } +static bool fd_path_json(buf *b, const char *key, int fd) { +#ifdef F_GETPATH + char pathbuf[PATH_MAX]; + if (fcntl(fd, F_GETPATH, pathbuf) == 0 && pathbuf[0]) { + json_escape(b, key); + buf_putc(b, ':'); + json_escape(b, pathbuf); + return true; + } +#else + (void)fd; +#endif + return false; +} + +static bool send_ds4_runtime(server *s, int fd) { + buf b = {0}; + buf_puts(&b, "{\"ok\":true,\"server\":\"ds4-server\",\"model\":"); + json_escape(&b, ds4_engine_model_name(s->engine)); + buf_printf(&b, ",\"ctx\":%d,\"logs\":{", ds4_session_ctx(s->session)); + bool any = fd_path_json(&b, "stdout", STDOUT_FILENO); + if (any) buf_putc(&b, ','); + if (fd_path_json(&b, "stderr", STDERR_FILENO)) any = true; + (void)any; + buf_puts(&b, "}}\n"); + bool ok = http_response(fd, s->enable_cors, 200, "application/json", b.ptr); + buf_free(&b); + return ok; +} + static void client_done(server *s) { 
pthread_mutex_lock(&s->mu); if (s->clients > 0) s->clients--; @@ -11152,6 +12009,11 @@ static void *client_main(void *arg) { http_request_free(&hr); goto done; } + if (!strcmp(hr.method, "GET") && !strcmp(hr.path, "/__ds4/runtime")) { + send_ds4_runtime(s, fd); + http_request_free(&hr); + goto done; + } const char *model_path_prefix = "/v1/models/"; const size_t model_path_prefix_len = strlen(model_path_prefix); if (!strcmp(hr.method, "GET") && @@ -11185,6 +12047,8 @@ static void *client_main(void *arg) { goto done; } if (ok) req.raw_body = xstrndup(hr.body, hr.body_len); + if (ok && !strcmp(hr.path, "/v1/chat/completions")) + req.skip_decode_cache_updates = hr.x_c0nr_motif; http_request_free(&hr); if (!ok) { http_error(fd, s->enable_cors, 400, err); @@ -11283,7 +12147,12 @@ typedef struct { const char *kv_disk_dir; uint64_t kv_disk_space_mb; kv_cache_options kv_cache; + int prefix_lookback_tokens; + int prefix_lookback_min_rewind_tokens; + int point_checkpoint_space_mb; bool kv_cache_reject_different_quant; + bool cache_verbose; + bool point_checkpoint_replay_boundary_token; bool disable_exact_dsml_tool_replay; int tool_memory_max_ids; bool enable_cors; @@ -11345,6 +12214,8 @@ static void server_close_resources(server *s) { s->trace = NULL; } kv_cache_close(&s->kv); + prefix_lookback_free(&s->prefix_lookback); + point_checkpoint_cache_free(&s->point_checkpoint); tool_memory_free(&s->tool_mem); live_tool_state_free(&s->responses_live); live_tool_state_free(&s->anthropic_live); @@ -11382,6 +12253,25 @@ static ds4_backend default_server_backend(void) { #endif } +static void validate_checkpoint_geometry(const server_config *c) { + if (POINT_CHECKPOINT_INTERVAL_TOKENS != 0) { + if (POINT_CHECKPOINT_INTERVAL_TOKENS < POINT_CHECKPOINT_MIN_INTERVAL_TOKENS || + POINT_CHECKPOINT_INTERVAL_TOKENS % POINT_CHECKPOINT_ALIGNMENT_TOKENS != 0) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: POINT_CHECKPOINT_INTERVAL_TOKENS must be 0 (disabled) or a multiple of %d and >= %d", + 
POINT_CHECKPOINT_ALIGNMENT_TOKENS, + POINT_CHECKPOINT_MIN_INTERVAL_TOKENS); + exit(2); + } + return; + } + if (c && c->prefix_lookback_tokens > 0) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: --kv-prefix-lookback-tokens requires point checkpoints; POINT_CHECKPOINT_INTERVAL_TOKENS=0 disables tail cache"); + exit(2); + } +} + static server_config parse_options(int argc, char **argv) { server_config c = { .engine = { @@ -11395,6 +12285,8 @@ static server_config parse_options(int argc, char **argv) { .ctx_size = 32768, .default_tokens = 393216, .tool_memory_max_ids = DS4_TOOL_MEMORY_DEFAULT_MAX_IDS, + .prefix_lookback_min_rewind_tokens = PREFIX_LOOKBACK_DEFAULT_MIN_REWIND, + .point_checkpoint_space_mb = POINT_CHECKPOINT_DEFAULT_SPACE_MB, }; c.kv_cache = kv_cache_default_options(); @@ -11464,6 +12356,21 @@ static server_config parse_options(int argc, char **argv) { c.kv_cache.boundary_align_tokens = parse_nonneg_int_arg(need_arg(&i, argc, argv, arg), arg); } else if (!strcmp(arg, "--kv-cache-reject-different-quant")) { c.kv_cache_reject_different_quant = true; + } else if (!strcmp(arg, "--kv-prefix-lookback-tokens")) { + c.prefix_lookback_tokens = parse_nonneg_int_arg(need_arg(&i, argc, argv, arg), arg); + } else if (!strcmp(arg, "--kv-prefix-lookback-min-rewind-tokens") || + !strcmp(arg, "--kv-prefix-lookback-align-tokens")) { + c.prefix_lookback_min_rewind_tokens = parse_nonneg_int_arg(need_arg(&i, argc, argv, arg), arg); + } else if (!strcmp(arg, "--kv-prefix-lookback-entries")) { + (void)need_arg(&i, argc, argv, arg); + server_log(DS4_LOG_WARNING, + "ds4-server: --kv-prefix-lookback-entries is ignored; prefix lookback uses the live SWA tail"); + } else if (!strcmp(arg, "--kv-point-checkpoint-space-mb")) { + c.point_checkpoint_space_mb = parse_nonneg_int_arg(need_arg(&i, argc, argv, arg), arg); + } else if (!strcmp(arg, "--kv-point-checkpoint-replay-boundary-token")) { + c.point_checkpoint_replay_boundary_token = true; + } else if (!strcmp(arg, 
"--kv-cache-verbose")) { + c.cache_verbose = true; } else if (!strcmp(arg, "--disable-exact-dsml-tool-replay")) { c.disable_exact_dsml_tool_replay = true; } else if (!strcmp(arg, "--tool-memory-max-ids")) { @@ -11518,6 +12425,7 @@ static server_config parse_options(int argc, char **argv) { server_log(DS4_LOG_DEFAULT, "ds4-server: %s", dist_err); exit(2); } + validate_checkpoint_geometry(&c); return c; } @@ -11565,12 +12473,29 @@ int main(int argc, char **argv) { s.session = session; s.default_tokens = cfg.default_tokens; s.disable_exact_dsml_tool_replay = cfg.disable_exact_dsml_tool_replay; + s.cache_verbose = cfg.cache_verbose; + s.point_checkpoint_replay_boundary_token = cfg.point_checkpoint_replay_boundary_token; s.tool_mem.max_entries = cfg.tool_memory_max_ids; s.enable_cors = cfg.enable_cors; + s.prefix_lookback.lookback_tokens = cfg.prefix_lookback_tokens; + s.prefix_lookback.min_rewind_tokens = cfg.prefix_lookback_min_rewind_tokens > 0 ? + cfg.prefix_lookback_min_rewind_tokens : PREFIX_LOOKBACK_DEFAULT_MIN_REWIND; + s.prefix_lookback.tail_rows = s.prefix_lookback.lookback_tokens > 0 ? + s.prefix_lookback.lookback_tokens + 2 * s.prefix_lookback.min_rewind_tokens : 0; + s.point_checkpoint.interval = POINT_CHECKPOINT_INTERVAL_TOKENS; + s.point_checkpoint.max_bytes = cfg.point_checkpoint_space_mb > 0 ? 
+ (uint64_t)cfg.point_checkpoint_space_mb * 1024ULL * 1024ULL : 0; if (cfg.kv_disk_dir) { kv_cache_open(&s.kv, cfg.kv_disk_dir, cfg.kv_disk_space_mb, cfg.kv_cache_reject_different_quant, cfg.kv_cache); } + if (s.prefix_lookback.lookback_tokens > 0) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: live prefix lookback enabled tokens=%d min_rewind=%d tail_rows=%d", + s.prefix_lookback.lookback_tokens, + s.prefix_lookback.min_rewind_tokens, + s.prefix_lookback.tail_rows); + } if (s.disable_exact_dsml_tool_replay) { server_log(DS4_LOG_DEFAULT, "ds4-server: exact DSML tool replay disabled; tool history uses canonical JSON rendering"); @@ -15456,6 +16381,308 @@ static void test_thinking_canonical_non_thinking_mode_noop(void) { chat_msgs_free(&msgs); } +static ds4_tokens test_tokens_range(int n, int base) { + ds4_tokens t = {0}; + for (int i = 0; i < n; i++) ds4_tokens_push(&t, base + i); + return t; +} + +static point_checkpoint_entry test_checkpoint_entry(int point, int base, size_t bytes) { + point_checkpoint_entry e = {0}; + e.point = point; + e.valid = true; + e.swa_shard.ptr = malloc(bytes); + e.swa_shard.len = bytes; + e.swa_shard.cap = bytes; + memset(e.swa_shard.ptr, base & 0xff, bytes); + e.tokens = test_tokens_range(point, base); + sha1_token_prefix_hex(&e.tokens, point, e.token_prefix_hash); + memcpy(e.verify_hash, e.token_prefix_hash, sizeof(e.verify_hash)); + return e; +} + +static void test_prefix_lookback_ring_keeps_window_points(void) { + prefix_lookback_cache c = {.lookback_tokens = 4096, .min_rewind_tokens = 128}; + for (int i = 0; i < PREFIX_LOOKBACK_RING_SLOTS; i++) { + c.entries[i] = test_checkpoint_entry((i + 1) * 128, 1000 + i * 1000, 4096); + } + prefix_lookback_trim_window(&c, PREFIX_LOOKBACK_RING_SLOTS * 128); + TEST_ASSERT(prefix_lookback_valid_count(&c) == 17); + TEST_ASSERT(!c.entries[0].valid); + for (int i = 1; i < PREFIX_LOOKBACK_RING_SLOTS; i++) TEST_ASSERT(c.entries[i].valid); + prefix_lookback_free(&c); +} + +static void 
test_prefix_lookback_reset_metadata_preserves_buffers(void) { + prefix_lookback_cache c = {.lookback_tokens = 4096, .min_rewind_tokens = 128}; + c.entries[0] = test_checkpoint_entry(128, 1000, 4096); + c.entries[1] = test_checkpoint_entry(256, 2000, 8192); + c.entries[2] = test_checkpoint_entry(384, 3000, 16384); + c.last_store_point = 384; + uint8_t *p1 = c.entries[1].swa_shard.ptr; + uint8_t *p2 = c.entries[2].swa_shard.ptr; + prefix_lookback_reset_metadata(&c); + TEST_ASSERT(!c.entries[0].valid); + TEST_ASSERT(!c.entries[1].valid); + TEST_ASSERT(!c.entries[2].valid); + TEST_ASSERT(c.entries[1].swa_shard.ptr == p1); + TEST_ASSERT(c.entries[2].swa_shard.ptr == p2); + TEST_ASSERT(c.entries[1].swa_shard.cap == 8192); + TEST_ASSERT(c.entries[2].swa_shard.cap == 16384); + TEST_ASSERT(c.entries[1].swa_shard.len == 0); + TEST_ASSERT(c.entries[2].tokens.len == 0); + TEST_ASSERT(prefix_lookback_valid_count(&c) == 0); + TEST_ASSERT(c.last_store_point == 0); + prefix_lookback_free(&c); +} + +static void test_prefix_lookback_pointwise_exact_match_only(void) { + point_checkpoint_entry e = test_checkpoint_entry(256, 1000, 4096); + ds4_tokens prompt = test_tokens_range(384, 1000); + TEST_ASSERT(point_checkpoint_first_token_mismatch(&e, &prompt) == 256); + prompt.v[255] = 999999; + TEST_ASSERT(point_checkpoint_first_token_mismatch(&e, &prompt) == 255); + point_checkpoint_entry_free(&e); + ds4_tokens_free(&prompt); +} + +static void test_prefix_lookback_disabled_by_default(void) { + prefix_lookback_cache c = {0}; + server s = {0}; + s.prefix_lookback = c; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(384, 1000); + ds4_tokens effective = {0}; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_accepts_small_prompt_tail(void) { + server s = {0}; + s.prefix_lookback = 
(prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(384, 1000); + ds4_tokens effective = {0}; + prompt.v[128] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_ignores_generated_tail_when_prompt_tail_matches(void) { + server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(480, 1000); + ds4_tokens effective = {0}; + prompt.v[384] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_accepts_min_rewind(void) { + server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(384, 1000); + ds4_tokens effective = {0}; + prompt.v[128] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_accepts_unaligned_point_in_window(void) { + server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(4300, 1000); + ds4_tokens effective = {0}; + prompt.v[2500] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + + + +static void test_prefix_lookback_rejects_outside_window(void) { + 
server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(4300, 1000); + ds4_tokens effective = {0}; + prompt.v[1900] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_rejects_full_prefix(void) { + server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(384, 1000); + ds4_tokens effective = {0}; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +static void test_prefix_lookback_options_do_not_enable_disk_cache(void) { + char *argv[] = { + "ds4-server", + "--kv-prefix-lookback-tokens", "4096", + "--kv-prefix-lookback-min-rewind-tokens", "128", + }; + server_config cfg = parse_options((int)(sizeof(argv) / sizeof(argv[0])), argv); + TEST_ASSERT(cfg.prefix_lookback_tokens == 4096); + TEST_ASSERT(cfg.prefix_lookback_min_rewind_tokens == 128); + TEST_ASSERT(cfg.kv_disk_dir == NULL); +} + +static void test_prefix_lookback_rejects_below_min_rewind(void) { + server s = {0}; + s.prefix_lookback = (prefix_lookback_cache){.lookback_tokens = 2048, .min_rewind_tokens = 128}; + char err[32] = {0}; + ds4_tokens prompt = test_tokens_range(384, 1000); + ds4_tokens effective = {0}; + prompt.v[200] = 999999; + TEST_ASSERT(prefix_lookback_try_load(&s, &prompt, &effective, err, sizeof(err)) == 0); + TEST_ASSERT(effective.len == 0); + ds4_tokens_free(&prompt); + ds4_tokens_free(&effective); +} + +/* Phase-4 point checkpoint unit tests */ + +static void test_point_checkpoint_disabled_by_default(void) { + /* Default interval is 4096; test that 
0 explicitly means disabled */ + point_checkpoint_cache c = {0}; + c.interval = 0; + /* capture_maybe should be a no-op when interval==0; tested via smoke */ + TEST_ASSERT(c.interval == 0); + point_checkpoint_cache_free(&c); +} + +static void test_point_checkpoint_rejects_without_entries(void) { + /* With interval set but no entries captured, count should be 0 */ + point_checkpoint_cache c = {.interval = 4096, .max_bytes = 0}; + TEST_ASSERT(c.count == 0); + TEST_ASSERT(c.entries == NULL); + point_checkpoint_cache_free(&c); +} + +static void test_point_checkpoint_eviction(void) { + /* Test budget enforcement: insert 3 entries with 1 MiB cap → e1 is evicted, e2 and e3 survive */ + point_checkpoint_cache c = {.interval = 4096, .max_bytes = 1048576}; /* 1 MiB */ + ds4_session_swa_shard s1 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; /* 512K */ + ds4_session_swa_shard s2 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; + ds4_session_swa_shard s3 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; + memset(s1.ptr, 1, s1.cap); + memset(s2.ptr, 2, s2.cap); + memset(s3.ptr, 3, s3.cap); + point_checkpoint_entry e1 = {.point = 4096, .swa_shard = s1}; + point_checkpoint_entry e2 = {.point = 8192, .swa_shard = s2}; + point_checkpoint_entry e3 = {.point = 12288, .swa_shard = s3}; + TEST_ASSERT(point_checkpoint_insert(&c, &e1)); + TEST_ASSERT(c.count == 1); + TEST_ASSERT(point_checkpoint_insert(&c, &e2)); + /* 512K+512K=1MiB, fits budget */ + TEST_ASSERT(c.count == 2); + TEST_ASSERT(point_checkpoint_insert(&c, &e3)); + /* Adding 3rd 512K exceeds 1MiB → evicts e1 (oldest) */ + TEST_ASSERT(c.count == 2); + TEST_ASSERT(c.entries[0].point == 8192); + TEST_ASSERT(c.entries[1].point == 12288); + point_checkpoint_cache_free(&c); +} + +static void test_point_checkpoint_semantic_invalidate_preserves_buffers(void) { + point_checkpoint_cache c = {.interval = 4096, .max_bytes = 0}; + ds4_session_swa_shard s1 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; +
ds4_session_swa_shard s2 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; + ds4_session_swa_shard s3 = {.ptr = malloc(524288), .len = 524288, .cap = 524288}; + memset(s1.ptr, 1, s1.cap); + memset(s2.ptr, 2, s2.cap); + memset(s3.ptr, 3, s3.cap); + point_checkpoint_entry e1 = {.point = 4096, .valid = true, .swa_shard = s1}; + point_checkpoint_entry e2 = {.point = 8192, .valid = true, .swa_shard = s2}; + point_checkpoint_entry e3 = {.point = 12288, .valid = true, .swa_shard = s3}; + TEST_ASSERT(point_checkpoint_insert(&c, &e1)); + TEST_ASSERT(point_checkpoint_insert(&c, &e2)); + TEST_ASSERT(point_checkpoint_insert(&c, &e3)); + uint8_t *p2 = c.entries[1].swa_shard.ptr; + uint8_t *p3 = c.entries[2].swa_shard.ptr; + uint64_t total = c.total_bytes; + + point_checkpoint_invalidate_after(&c, 4096); + TEST_ASSERT(c.count == 3); + TEST_ASSERT(c.total_bytes == total); + TEST_ASSERT(c.entries[0].valid); + TEST_ASSERT(!c.entries[1].valid); + TEST_ASSERT(!c.entries[2].valid); + TEST_ASSERT(c.entries[1].swa_shard.ptr == p2); + TEST_ASSERT(c.entries[2].swa_shard.ptr == p3); + TEST_ASSERT(c.entries[1].swa_shard.cap == 524288); + TEST_ASSERT(c.entries[2].swa_shard.cap == 524288); + TEST_ASSERT(c.entries[1].swa_shard.len == 0); + TEST_ASSERT(c.entries[2].tokens.len == 0); + TEST_ASSERT(c.entries[1].token_prefix_hash[0] == '\0'); + TEST_ASSERT(c.entries[2].verify_hash[0] == '\0'); + point_checkpoint_cache_free(&c); +} + +static void test_point_checkpoint_cli_parsing(void) { + char *argv[] = { + "ds4-server", + "--kv-point-checkpoint-space-mb", "8192", + }; + server_config cfg = parse_options((int)(sizeof(argv) / sizeof(argv[0])), argv); + TEST_ASSERT(cfg.point_checkpoint_space_mb == 8192); + TEST_ASSERT(cfg.kv_disk_dir == NULL); +} + +static void test_point_checkpoint_compile_time_geometry(void) { + TEST_ASSERT(POINT_CHECKPOINT_INTERVAL_TOKENS == 0 || + POINT_CHECKPOINT_INTERVAL_TOKENS >= POINT_CHECKPOINT_MIN_INTERVAL_TOKENS); + TEST_ASSERT(POINT_CHECKPOINT_INTERVAL_TOKENS == 0 
|| + POINT_CHECKPOINT_INTERVAL_TOKENS % POINT_CHECKPOINT_ALIGNMENT_TOKENS == 0); +} + +static void test_point_checkpoint_free_empty(void) { + point_checkpoint_cache c = {0}; + point_checkpoint_cache_free(&c); + TEST_ASSERT(c.count == 0); + TEST_ASSERT(c.entries == NULL); +} + +static void test_point_checkpoint_sha1_token_prefix(void) { + /* Verify SHA1 of known token IDs produces deterministic output */ + ds4_tokens tokens = {0}; + ds4_tokens_push(&tokens, 42); + ds4_tokens_push(&tokens, 100); + ds4_tokens_push(&tokens, 2048); + char hash1[41] = {0}, hash2[41] = {0}; + sha1_token_prefix_hex(&tokens, 3, hash1); + sha1_token_prefix_hex(&tokens, 3, hash2); + TEST_ASSERT(strncmp(hash1, hash2, 40) == 0); + /* Different prefix length = different hash */ + char hash_partial[41] = {0}; + sha1_token_prefix_hex(&tokens, 2, hash_partial); + TEST_ASSERT(strncmp(hash1, hash_partial, 40) != 0); + ds4_tokens_free(&tokens); +} + static void ds4_server_unit_tests_run(void) { test_request_defaults_use_min_p_filtering(); test_reasoning_effort_mapping(); @@ -15555,6 +16782,26 @@ static void ds4_server_unit_tests_run(void) { test_kv_cache_eviction_score_decays_stale_hits(); test_kv_cache_eviction_decayed_hits_tie_break_by_age(); test_kv_cache_eviction_keeps_aligned_continued_frontiers(); + test_prefix_lookback_ring_keeps_window_points(); + test_prefix_lookback_reset_metadata_preserves_buffers(); + test_prefix_lookback_pointwise_exact_match_only(); + test_prefix_lookback_disabled_by_default(); + test_prefix_lookback_accepts_small_prompt_tail(); + test_prefix_lookback_ignores_generated_tail_when_prompt_tail_matches(); + test_prefix_lookback_accepts_min_rewind(); + test_prefix_lookback_accepts_unaligned_point_in_window(); + test_prefix_lookback_rejects_outside_window(); + test_prefix_lookback_rejects_full_prefix(); + test_prefix_lookback_options_do_not_enable_disk_cache(); + test_prefix_lookback_rejects_below_min_rewind(); + test_point_checkpoint_disabled_by_default(); + 
test_point_checkpoint_rejects_without_entries(); + test_point_checkpoint_eviction(); + test_point_checkpoint_semantic_invalidate_preserves_buffers(); + test_point_checkpoint_cli_parsing(); + test_point_checkpoint_compile_time_geometry(); + test_point_checkpoint_free_empty(); + test_point_checkpoint_sha1_token_prefix(); } #ifndef DS4_SERVER_TEST_NO_MAIN diff --git a/ds4c-point-checkpoint.png b/ds4c-point-checkpoint.png new file mode 100644 index 000000000..b823e9424 Binary files /dev/null and b/ds4c-point-checkpoint.png differ diff --git a/ds4c-tail-cache.png b/ds4c-tail-cache.png new file mode 100644 index 000000000..c36162650 Binary files /dev/null and b/ds4c-tail-cache.png differ
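Reviewer note: the budget behaviour that `test_point_checkpoint_eviction` exercises (sorted insert, evict the lowest position first, free the evicted buffer) can be reproduced in isolation. The sketch below is a minimal stand-in and not the code from this patch. Every `sketch_*` name, the fixed 64-slot array, and the treatment of `max_bytes == 0` as "no limit" are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_MAX_ENTRIES 64 /* hypothetical fixed capacity */

/* Hypothetical stand-in for a checkpoint entry: a token position plus one owned buffer. */
typedef struct {
    int point;
    uint8_t *buf;
    size_t len;
} sketch_entry;

typedef struct {
    sketch_entry entries[SKETCH_MAX_ENTRIES]; /* kept sorted by ascending point */
    int count;
    uint64_t total_bytes;
    uint64_t max_bytes; /* assumption: 0 means "no byte budget" */
} sketch_cache;

/* Allocate an entry of the given size; the buffer contents are irrelevant here. */
static sketch_entry sketch_make(int point, size_t len) {
    sketch_entry e = {.point = point, .buf = malloc(len), .len = len};
    return e;
}

/* Drop entries[0] (lowest position = oldest) and free its buffer. */
static void sketch_evict_oldest(sketch_cache *c) {
    assert(c->count > 0);
    c->total_bytes -= c->entries[0].len;
    free(c->entries[0].buf);
    memmove(&c->entries[0], &c->entries[1],
            (size_t)(c->count - 1) * sizeof(c->entries[0]));
    c->count--;
}

/* Insert sorted by position, evicting from the front until the new entry fits.
 * On success the cache owns e->buf; on failure the caller keeps ownership. */
static bool sketch_insert(sketch_cache *c, const sketch_entry *e) {
    if (c->max_bytes > 0 && e->len > c->max_bytes) return false; /* can never fit */
    while (c->count > 0 &&
           (c->count == SKETCH_MAX_ENTRIES ||
            (c->max_bytes > 0 && c->total_bytes + e->len > c->max_bytes))) {
        sketch_evict_oldest(c);
    }
    int i = c->count;
    while (i > 0 && c->entries[i - 1].point > e->point) {
        c->entries[i] = c->entries[i - 1]; /* shift larger positions right */
        i--;
    }
    c->entries[i] = *e;
    c->count++;
    c->total_bytes += e->len;
    return true;
}

/* Release every remaining buffer. */
static void sketch_free(sketch_cache *c) {
    while (c->count > 0) sketch_evict_oldest(c);
}
```

With a 1 MiB budget and three 512 KiB entries at positions 4096, 8192 and 12288, the first two inserts fit exactly and the third evicts position 4096, which is the same outcome the unit test asserts. Because entries only ever leave from the front of the sorted array, no per-entry score is needed in this sketch.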