From d30061e95afb75de2aca66d77056b92656678482 Mon Sep 17 00:00:00 2001 From: Omer Celik Date: Sun, 7 Jun 2026 22:57:27 +0100 Subject: [PATCH] agent: add ACP stdio mode --- Makefile | 29 +- README.md | 4 + ds4_acp.c | 507 ++++++++++++++++++++++++++++++++ ds4_acp.h | 36 +++ ds4_agent.c | 735 ++++++++++++++++++++++++++++++++++++++++++++++- ds4_help.c | 1 + tests/ds4_test.c | 69 +++++ 7 files changed, 1358 insertions(+), 23 deletions(-) create mode 100644 ds4_acp.c create mode 100644 ds4_acp.h diff --git a/Makefile b/Makefile index 72ed73bee..ef6ce1e7f 100644 --- a/Makefile +++ b/Makefile @@ -57,15 +57,15 @@ ds4-bench: ds4_bench.o ds4_help.o $(CORE_OBJS) ds4-eval: ds4_eval.o ds4_help.o $(CORE_OBJS) $(CC) $(CFLAGS) -o $@ ds4_eval.o ds4_help.o $(CORE_OBJS) $(METAL_LDLIBS) -ds4-agent: ds4_agent.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) - $(CC) $(CFLAGS) -o $@ ds4_agent.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) $(METAL_LDLIBS) +ds4-agent: ds4_agent.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) + $(CC) $(CFLAGS) -o $@ ds4_agent.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) $(METAL_LDLIBS) -cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS) +cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS) $(CC) $(CFLAGS) -o ds4 ds4_cli_cpu.o ds4_help.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_help.o ds4_kvstore.o rax.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-bench ds4_bench_cpu.o ds4_help.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-eval ds4_eval_cpu.o ds4_help.o $(CPU_CORE_OBJS) $(LDLIBS) - $(CC) $(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) + $(CC) 
$(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) cuda-regression: @echo "cuda-regression requires a CUDA build" @@ -107,15 +107,15 @@ ds4-bench: ds4_bench.o ds4_help.o $(CORE_OBJS) ds4-eval: ds4_eval.o ds4_help.o $(CORE_OBJS) $(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS) -ds4-agent: ds4_agent.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) +ds4-agent: ds4_agent.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CORE_OBJS) $(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS) -cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS) +cpu: ds4_cli_cpu.o ds4_server_cpu.o ds4_bench_cpu.o ds4_eval_cpu.o ds4_agent_cpu.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o rax.o $(CPU_CORE_OBJS) $(CC) $(CFLAGS) -o ds4 ds4_cli_cpu.o ds4_help.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-server ds4_server_cpu.o ds4_help.o ds4_kvstore.o rax.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-bench ds4_bench_cpu.o ds4_help.o $(CPU_CORE_OBJS) $(LDLIBS) $(CC) $(CFLAGS) -o ds4-eval ds4_eval_cpu.o ds4_help.o $(CPU_CORE_OBJS) $(LDLIBS) - $(CC) $(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) + $(CC) $(CFLAGS) -o ds4-agent ds4_agent_cpu.o ds4_acp.o ds4_help.o ds4_web.o ds4_kvstore.o linenoise.o $(CPU_CORE_OBJS) $(LDLIBS) cuda-regression: tests/cuda_long_context_smoke ./tests/cuda_long_context_smoke @@ -145,16 +145,19 @@ ds4_bench.o: ds4_bench.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_eval.o: ds4_eval.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h $(CC) $(CFLAGS) -c -o $@ ds4_eval.c -ds4_agent.o: ds4_agent.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h ds4_web.h linenoise.h +ds4_agent.o: ds4_agent.c ds4.h ds4_acp.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h ds4_web.h linenoise.h $(CC) 
$(CFLAGS) -c -o $@ ds4_agent.c +ds4_acp.o: ds4_acp.c ds4_acp.h + $(CC) $(CFLAGS) -c -o $@ ds4_acp.c + ds4_web.o: ds4_web.c ds4_web.h $(CC) $(CFLAGS) -c -o $@ ds4_web.c ds4_kvstore.o: ds4_kvstore.c ds4_kvstore.h ds4.h ds4_ssd.h $(CC) $(CFLAGS) -c -o $@ ds4_kvstore.c -ds4_test.o: tests/ds4_test.c ds4_server.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h rax.h +ds4_test.o: tests/ds4_test.c ds4_server.c ds4_acp.h ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h rax.h $(CC) $(CFLAGS) -Wno-unused-function -c -o $@ tests/ds4_test.c tests/cuda_long_context_smoke.o: tests/cuda_long_context_smoke.c ds4_gpu.h @@ -181,7 +184,7 @@ ds4_bench_cpu.o: ds4_bench.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_eval_cpu.o: ds4_eval.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h $(CC) $(CFLAGS) -DDS4_NO_GPU -c -o $@ ds4_eval.c -ds4_agent_cpu.o: ds4_agent.c ds4.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h ds4_web.h linenoise.h +ds4_agent_cpu.o: ds4_agent.c ds4.h ds4_acp.h ds4_ssd.h ds4_distributed.h ds4_help.h ds4_kvstore.h ds4_web.h linenoise.h $(CC) $(CFLAGS) -DDS4_NO_GPU -c -o $@ ds4_agent.c ds4_metal.o: ds4_metal.m ds4_gpu.h $(METAL_SRCS) @@ -193,11 +196,11 @@ ds4_cuda.o: ds4_cuda.cu ds4_gpu.h ds4_iq2_tables_cuda.inc tests/cuda_long_context_smoke: tests/cuda_long_context_smoke.o ds4_cuda.o $(NVCC) $(NVCCFLAGS) -o $@ $^ $(CUDA_LDLIBS) -ds4_test: ds4_test.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) +ds4_test: ds4_test.o ds4_acp.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) ifeq ($(UNAME_S),Darwin) - $(CC) $(CFLAGS) -o $@ ds4_test.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) $(METAL_LDLIBS) + $(CC) $(CFLAGS) -o $@ ds4_test.o ds4_acp.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) $(METAL_LDLIBS) else - $(NVCC) $(NVCCFLAGS) -o $@ ds4_test.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) $(CUDA_LDLIBS) + $(NVCC) $(NVCCFLAGS) -o $@ ds4_test.o ds4_acp.o ds4_help.o ds4_kvstore.o rax.o $(CORE_OBJS) $(CUDA_LDLIBS) endif test: ds4_test ds4-eval q4k-dot-test 
diff --git a/README.md b/README.md index fcd259385..d42438f7f 100644 --- a/README.md +++ b/README.md @@ -587,6 +587,10 @@ stripped session rebuilds the KV cache by prefilling the saved text. Use `--chdir /path/to/ds4` when launching `ds4-agent` from another directory, so relative runtime files such as `metal/*.metal` resolve from the project tree. +`ds4-agent --acp` runs the agent over the Agent Client Protocol using +newline-delimited JSON-RPC on stdin/stdout. In this mode stdout is reserved for +protocol messages; diagnostics continue to go to stderr. + However while the system already works, there is a lot of work to do in order to make it ready for prime time. When finally the agent will reach the wanted shape, we will *likely* split the server and the client creating a stateful diff --git a/ds4_acp.c b/ds4_acp.c new file mode 100644 index 000000000..7c62ecb1c --- /dev/null +++ b/ds4_acp.c @@ -0,0 +1,507 @@ +#include "ds4_acp.h" + +#include +#include +#include +#include +#include + +#define DS4_ACP_JSON_MAX_NESTING 256 + +typedef struct { + char *ptr; + size_t len; + size_t cap; +} acp_buf; + +static void acp_oom(const char *what) { + perror(what); + exit(1); +} + +static void acp_buf_append(acp_buf *b, const char *s, size_t n) { + if (!n) return; + if (b->len + n + 1 > b->cap) { + size_t cap = b->cap ? 
b->cap * 2 : 256; + while (cap < b->len + n + 1) cap *= 2; + char *p = realloc(b->ptr, cap); + if (!p) acp_oom("ds4-acp: realloc"); + b->ptr = p; + b->cap = cap; + } + memcpy(b->ptr + b->len, s, n); + b->len += n; + b->ptr[b->len] = '\0'; +} + +static void acp_buf_puts(acp_buf *b, const char *s) { + acp_buf_append(b, s, strlen(s)); +} + +static void acp_buf_putc(acp_buf *b, char c) { + acp_buf_append(b, &c, 1); +} + +static char *acp_buf_take(acp_buf *b) { + if (!b->ptr) { + char *p = malloc(1); + if (!p) acp_oom("ds4-acp: malloc"); + p[0] = '\0'; + return p; + } + char *p = b->ptr; + memset(b, 0, sizeof(*b)); + return p; +} + +static char *acp_xstrndup(const char *s, size_t n) { + char *p = malloc(n + 1); + if (!p) acp_oom("ds4-acp: malloc"); + memcpy(p, s, n); + p[n] = '\0'; + return p; +} + +static bool acp_utf8_valid(const char *s, size_t n) { + size_t i = 0; + while (i < n) { + unsigned char c = (unsigned char)s[i++]; + if (c < 0x80) continue; + int need = 0; + if (c >= 0xc2 && c <= 0xdf) need = 1; + else if (c >= 0xe0 && c <= 0xef) need = 2; + else if (c >= 0xf0 && c <= 0xf4) need = 3; + else return false; + if (i + (size_t)need > n) return false; + unsigned char c1 = (unsigned char)s[i]; + if (c == 0xe0 && c1 < 0xa0) return false; + if (c == 0xed && c1 >= 0xa0) return false; + if (c == 0xf0 && c1 < 0x90) return false; + if (c == 0xf4 && c1 >= 0x90) return false; + for (int j = 0; j < need; j++) { + unsigned char cc = (unsigned char)s[i + (size_t)j]; + if ((cc & 0xc0) != 0x80) return false; + } + i += (size_t)need; + } + return true; +} + +void ds4_acp_json_ws(const char **p) { + while (**p && isspace((unsigned char)**p)) (*p)++; +} + +static bool json_lit(const char **p, const char *lit) { + size_t n = strlen(lit); + if (strncmp(*p, lit, n) != 0) return false; + *p += n; + return true; +} + +static int json_hex(char c) { + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'a' && c <= 'f') return c - 'a' + 10; + if (c >= 'A' && c <= 'F') return c - 'A' + 
10; + return -1; +} + +static bool json_u16(const char **p, uint32_t *out) { + if ((*p)[0] != '\\' || (*p)[1] != 'u') return false; + uint32_t cp = 0; + for (int i = 0; i < 4; i++) { + int h = json_hex((*p)[2 + i]); + if (h < 0) return false; + cp = (cp << 4) | (uint32_t)h; + } + *p += 6; + *out = cp; + return true; +} + +static void json_utf8(acp_buf *b, uint32_t cp) { + char tmp[4]; + if (cp <= 0x7f) { + tmp[0] = (char)cp; + acp_buf_append(b, tmp, 1); + } else if (cp <= 0x7ff) { + tmp[0] = (char)(0xc0 | (cp >> 6)); + tmp[1] = (char)(0x80 | (cp & 0x3f)); + acp_buf_append(b, tmp, 2); + } else if (cp <= 0xffff) { + tmp[0] = (char)(0xe0 | (cp >> 12)); + tmp[1] = (char)(0x80 | ((cp >> 6) & 0x3f)); + tmp[2] = (char)(0x80 | (cp & 0x3f)); + acp_buf_append(b, tmp, 3); + } else { + tmp[0] = (char)(0xf0 | (cp >> 18)); + tmp[1] = (char)(0x80 | ((cp >> 12) & 0x3f)); + tmp[2] = (char)(0x80 | ((cp >> 6) & 0x3f)); + tmp[3] = (char)(0x80 | (cp & 0x3f)); + acp_buf_append(b, tmp, 4); + } +} + +bool ds4_acp_json_string(const char **p, char **out) { + ds4_acp_json_ws(p); + if (**p != '"') return false; + (*p)++; + acp_buf b = {0}; + while (**p && **p != '"') { + unsigned char c = (unsigned char)**p; + if (c < 0x20) goto fail; + if (c == '\\') { + (*p)++; + switch (**p) { + case '"': acp_buf_putc(&b, '"'); (*p)++; break; + case '\\': acp_buf_putc(&b, '\\'); (*p)++; break; + case '/': acp_buf_putc(&b, '/'); (*p)++; break; + case 'b': acp_buf_putc(&b, '\b'); (*p)++; break; + case 'f': acp_buf_putc(&b, '\f'); (*p)++; break; + case 'n': acp_buf_putc(&b, '\n'); (*p)++; break; + case 'r': acp_buf_putc(&b, '\r'); (*p)++; break; + case 't': acp_buf_putc(&b, '\t'); (*p)++; break; + case 'u': { + (*p)--; + uint32_t cp, lo = 0; + if (!json_u16(p, &cp)) goto fail; + if (cp >= 0xd800 && cp <= 0xdbff) { + const char *q = *p; + if (!json_u16(&q, &lo) || + lo < 0xdc00 || lo > 0xdfff) + goto fail; + *p = q; + cp = 0x10000 + (((cp - 0xd800) << 10) | (lo - 0xdc00)); + } else if (cp >= 0xdc00 && cp <= 
0xdfff) { + goto fail; + } + json_utf8(&b, cp); + break; + } + default: + goto fail; + } + } else { + acp_buf_putc(&b, (char)c); + (*p)++; + } + } + if (**p != '"') goto fail; + (*p)++; + *out = acp_buf_take(&b); + return true; + +fail: + free(b.ptr); + return false; +} + +static bool json_number(const char **p) { + const char *s = *p; + if (*s == '-') s++; + if (*s == '0') { + s++; + } else if (isdigit((unsigned char)*s)) { + while (isdigit((unsigned char)*s)) s++; + } else { + return false; + } + if (*s == '.') { + s++; + if (!isdigit((unsigned char)*s)) return false; + while (isdigit((unsigned char)*s)) s++; + } + if (*s == 'e' || *s == 'E') { + s++; + if (*s == '+' || *s == '-') s++; + if (!isdigit((unsigned char)*s)) return false; + while (isdigit((unsigned char)*s)) s++; + } + *p = s; + return true; +} + +static bool json_skip_value_depth(const char **p, int depth); + +static bool json_skip_array_depth(const char **p, int depth) { + if (depth >= DS4_ACP_JSON_MAX_NESTING) return false; + ds4_acp_json_ws(p); + if (**p != '[') return false; + (*p)++; + ds4_acp_json_ws(p); + if (**p == ']') { + (*p)++; + return true; + } + while (**p) { + if (!json_skip_value_depth(p, depth + 1)) return false; + ds4_acp_json_ws(p); + if (**p == ']') { + (*p)++; + return true; + } + if (**p != ',') return false; + (*p)++; + } + return false; +} + +static bool json_skip_object_depth(const char **p, int depth) { + if (depth >= DS4_ACP_JSON_MAX_NESTING) return false; + ds4_acp_json_ws(p); + if (**p != '{') return false; + (*p)++; + ds4_acp_json_ws(p); + if (**p == '}') { + (*p)++; + return true; + } + while (**p) { + char *key = NULL; + if (!ds4_acp_json_string(p, &key)) return false; + free(key); + ds4_acp_json_ws(p); + if (**p != ':') return false; + (*p)++; + if (!json_skip_value_depth(p, depth + 1)) return false; + ds4_acp_json_ws(p); + if (**p == '}') { + (*p)++; + return true; + } + if (**p != ',') return false; + (*p)++; + } + return false; +} + +static bool 
json_skip_value_depth(const char **p, int depth) { + ds4_acp_json_ws(p); + if (**p == '"') { + char *s = NULL; + bool ok = ds4_acp_json_string(p, &s); + free(s); + return ok; + } + if (**p == '{') return json_skip_object_depth(p, depth); + if (**p == '[') return json_skip_array_depth(p, depth); + if (json_lit(p, "true") || json_lit(p, "false") || json_lit(p, "null")) + return true; + return json_number(p); +} + +bool ds4_acp_json_skip_value(const char **p) { + return json_skip_value_depth(p, 0); +} + +bool ds4_acp_json_raw_value(const char **p, char **out) { + ds4_acp_json_ws(p); + const char *start = *p; + if (!ds4_acp_json_skip_value(p)) return false; + *out = acp_xstrndup(start, (size_t)(*p - start)); + return true; +} + +static bool json_id_value(const char *raw) { + const char *p = raw; + ds4_acp_json_ws(&p); + if (*p == '"' || *p == '-' || isdigit((unsigned char)*p)) return true; + return !strcmp(p, "null"); +} + +bool ds4_acp_object_get_raw(const char *json, const char *key, char **out) { + const char *p = json; + ds4_acp_json_ws(&p); + if (*p != '{') return false; + p++; + ds4_acp_json_ws(&p); + while (*p && *p != '}') { + char *k = NULL; + if (!ds4_acp_json_string(&p, &k)) return false; + ds4_acp_json_ws(&p); + if (*p != ':') { + free(k); + return false; + } + p++; + if (!strcmp(k, key)) { + free(k); + return ds4_acp_json_raw_value(&p, out); + } + free(k); + if (!ds4_acp_json_skip_value(&p)) return false; + ds4_acp_json_ws(&p); + if (*p == ',') { + p++; + ds4_acp_json_ws(&p); + } else if (*p != '}') { + return false; + } + } + return false; +} + +bool ds4_acp_object_get_string(const char *json, const char *key, char **out) { + char *raw = NULL; + *out = NULL; + if (!ds4_acp_object_get_raw(json, key, &raw)) return false; + const char *p = raw; + bool ok = ds4_acp_json_string(&p, out); + ds4_acp_json_ws(&p); + if (*p) ok = false; + if (!ok) { + free(*out); + *out = NULL; + } + free(raw); + return ok; +} + +void ds4_acp_request_free(ds4_acp_request *r) { + 
free(r->id_json); + free(r->method); + free(r->params_json); + memset(r, 0, sizeof(*r)); +} + +ds4_acp_parse_result ds4_acp_parse_request(const char *json, + ds4_acp_request *out, + char *err, size_t err_len) { + memset(out, 0, sizeof(*out)); + const char *p = json; + bool saw_jsonrpc = false; + bool saw_method = false; + bool jsonrpc_ok = false; + ds4_acp_json_ws(&p); + if (*p != '{') { + snprintf(err, err_len, "expected JSON object"); + return DS4_ACP_PARSE_JSON; + } + p++; + ds4_acp_json_ws(&p); + while (*p && *p != '}') { + char *key = NULL; + if (!ds4_acp_json_string(&p, &key)) { + snprintf(err, err_len, "invalid object key"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + ds4_acp_json_ws(&p); + if (*p != ':') { + free(key); + snprintf(err, err_len, "expected ':'"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + p++; + if (!strcmp(key, "jsonrpc")) { + char *v = NULL; + if (!ds4_acp_json_string(&p, &v)) { + free(key); + snprintf(err, err_len, "invalid jsonrpc"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_REQUEST; + } + saw_jsonrpc = true; + jsonrpc_ok = !strcmp(v, "2.0"); + free(v); + } else if (!strcmp(key, "id")) { + free(out->id_json); + out->id_json = NULL; + if (!ds4_acp_json_raw_value(&p, &out->id_json)) { + free(key); + snprintf(err, err_len, "invalid id"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + out->has_id = true; + } else if (!strcmp(key, "method")) { + free(out->method); + out->method = NULL; + if (!ds4_acp_json_string(&p, &out->method)) { + free(key); + snprintf(err, err_len, "invalid method"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_REQUEST; + } + saw_method = true; + } else if (!strcmp(key, "params")) { + free(out->params_json); + out->params_json = NULL; + if (!ds4_acp_json_raw_value(&p, &out->params_json)) { + free(key); + snprintf(err, err_len, "invalid params"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + out->has_params = true; + } else if 
(!ds4_acp_json_skip_value(&p)) { + free(key); + snprintf(err, err_len, "invalid value"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + free(key); + ds4_acp_json_ws(&p); + if (*p == ',') { + p++; + ds4_acp_json_ws(&p); + } else if (*p != '}') { + snprintf(err, err_len, "expected ',' or '}'"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + } + if (*p != '}') { + snprintf(err, err_len, "unterminated object"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + p++; + ds4_acp_json_ws(&p); + if (*p) { + snprintf(err, err_len, "trailing data"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_JSON; + } + if (!saw_jsonrpc || !jsonrpc_ok || !saw_method || + (out->has_id && !json_id_value(out->id_json))) + { + snprintf(err, err_len, "invalid JSON-RPC request"); + ds4_acp_request_free(out); + return DS4_ACP_PARSE_REQUEST; + } + return DS4_ACP_PARSE_OK; +} + +char *ds4_acp_json_escape(const char *s, size_t n) { + acp_buf b = {0}; + bool valid_utf8 = acp_utf8_valid(s, n); + acp_buf_putc(&b, '"'); + for (size_t i = 0; i < n; i++) { + unsigned char c = (unsigned char)s[i]; + switch (c) { + case '"': acp_buf_puts(&b, "\\\""); break; + case '\\': acp_buf_puts(&b, "\\\\"); break; + case '\b': acp_buf_puts(&b, "\\b"); break; + case '\f': acp_buf_puts(&b, "\\f"); break; + case '\n': acp_buf_puts(&b, "\\n"); break; + case '\r': acp_buf_puts(&b, "\\r"); break; + case '\t': acp_buf_puts(&b, "\\t"); break; + default: + if (c < 0x20) { + char tmp[8]; + snprintf(tmp, sizeof(tmp), "\\u%04x", c); + acp_buf_puts(&b, tmp); + } else if (!valid_utf8 && c >= 0x80) { + char tmp[8]; + snprintf(tmp, sizeof(tmp), "\\u%04x", c); + acp_buf_puts(&b, tmp); + } else { + acp_buf_putc(&b, (char)c); + } + break; + } + } + acp_buf_putc(&b, '"'); + return acp_buf_take(&b); +} diff --git a/ds4_acp.h b/ds4_acp.h new file mode 100644 index 000000000..bcfb4434f --- /dev/null +++ b/ds4_acp.h @@ -0,0 +1,36 @@ +#ifndef DS4_ACP_H +#define DS4_ACP_H + +#include 
+#include + +typedef enum { + DS4_ACP_PARSE_OK, + DS4_ACP_PARSE_JSON, + DS4_ACP_PARSE_REQUEST, +} ds4_acp_parse_result; + +typedef struct { + bool has_id; + char *id_json; + char *method; + bool has_params; + char *params_json; +} ds4_acp_request; + +void ds4_acp_request_free(ds4_acp_request *r); +ds4_acp_parse_result ds4_acp_parse_request(const char *json, + ds4_acp_request *out, + char *err, size_t err_len); + +void ds4_acp_json_ws(const char **p); +bool ds4_acp_json_string(const char **p, char **out); +bool ds4_acp_json_skip_value(const char **p); +bool ds4_acp_json_raw_value(const char **p, char **out); + +bool ds4_acp_object_get_string(const char *json, const char *key, char **out); +bool ds4_acp_object_get_raw(const char *json, const char *key, char **out); + +char *ds4_acp_json_escape(const char *s, size_t n); + +#endif diff --git a/ds4_agent.c b/ds4_agent.c index bbd62e9de..7fcbe9c37 100644 --- a/ds4_agent.c +++ b/ds4_agent.c @@ -1,4 +1,5 @@ #include "ds4.h" +#include "ds4_acp.h" #include "ds4_distributed.h" #include "ds4_help.h" #include "ds4_kvstore.h" @@ -65,6 +66,7 @@ typedef struct { agent_generation_options gen; const char *chdir_path; bool non_interactive; + bool acp; } agent_config; typedef enum { @@ -95,6 +97,14 @@ typedef struct { typedef struct agent_bash_job agent_bash_job; +typedef enum { + AGENT_TURN_NONE, + AGENT_TURN_END, + AGENT_TURN_MAX_TOKENS, + AGENT_TURN_CANCELLED, + AGENT_TURN_ERROR, +} agent_turn_result; + typedef struct { ds4_engine *engine; agent_config *cfg; @@ -111,12 +121,14 @@ typedef struct { pthread_t thread; pthread_mutex_t mu; pthread_cond_t cond; + pthread_mutex_t acp_mu; int wake_fd[2]; FILE *trace; bool wake_pending; bool stop; bool interrupt; bool initialized; + agent_turn_result last_turn_result; bool save_requested; bool compact_requested; bool power_requested; @@ -145,6 +157,8 @@ typedef struct { bool more_valid; agent_bash_job *bash_jobs; int next_bash_job_id; + char acp_session_id[64]; + int next_acp_tool_call_id; } 
agent_worker; static unsigned agent_next_prefill_label(void); @@ -318,6 +332,7 @@ static void agent_trace(agent_worker *w, const char *fmt, ...); static void agent_trace_text(agent_worker *w, const char *label, const char *text, size_t len); static void agent_publish_system_status(agent_worker *w, const char *msg); +static void agent_acp_notify_assistant(agent_worker *w, const char *s, size_t n); static int agent_web_confirm(void *privdata, const char *message, char *err, size_t err_len); static void agent_web_log(void *privdata, const char *message); @@ -408,6 +423,20 @@ static char *agent_input_buf_take(agent_input_buf *b) { return p; } +static char *agent_input_buf_take_line(agent_input_buf *b) { + if (!b->ptr) return NULL; + char *nl = memchr(b->ptr, '\n', b->len); + if (!nl) return NULL; + size_t len = (size_t)(nl - b->ptr); + if (len && b->ptr[len - 1] == '\r') len--; + char *line = xstrndup(b->ptr, len); + size_t used = (size_t)(nl - b->ptr) + 1; + memmove(b->ptr, b->ptr + used, b->len - used); + b->len -= used; + b->ptr[b->len] = '\0'; + return line; +} + static void agent_input_buf_free(agent_input_buf *b) { free(b->ptr); memset(b, 0, sizeof(*b)); @@ -557,6 +586,8 @@ static agent_config parse_options(int argc, char **argv) { c.gen.prompt = need_arg(&i, argc, argv, arg); } else if (!strcmp(arg, "--non-interactive")) { c.non_interactive = true; + } else if (!strcmp(arg, "--acp")) { + c.acp = true; } else if (!strcmp(arg, "-sys") || !strcmp(arg, "--system")) { c.gen.system = need_arg(&i, argc, argv, arg); } else if (!strcmp(arg, "--trace")) { @@ -662,6 +693,10 @@ static agent_config parse_options(int argc, char **argv) { if (c.engine.directional_steering_file && !steering_scale_set) c.engine.directional_steering_ffn = 1.0f; + if (c.acp && c.non_interactive) { + fprintf(stderr, "ds4-agent: --acp cannot be combined with --non-interactive\n"); + exit(2); + } char dist_err[256]; if (ds4_dist_prepare_engine_options(&c.engine.distributed, &c.engine, @@ -1073,6 
+1108,7 @@ static void agent_wake_locked(agent_worker *w) { * to the terminal, which keeps linenoise redraws serialized in one place. */ static void agent_publish(agent_worker *w, const char *s, size_t n) { if (!n) return; + if (w->cfg && w->cfg->acp) return; pthread_mutex_lock(&w->mu); if (w->out_len + n + 1 > w->out_cap) { size_t cap = w->out_cap ? w->out_cap * 2 : 4096; @@ -1130,11 +1166,93 @@ static void agent_set_error(agent_worker *w, const char *msg) { w->status.state = AGENT_WORKER_ERROR; w->status.prefill_tps = 0.0; w->status.greedy_sampling = false; + w->last_turn_result = AGENT_TURN_ERROR; snprintf(w->status.error, sizeof(w->status.error), "%s", msg ? msg : "unknown error"); agent_wake_locked(w); pthread_mutex_unlock(&w->mu); } +static void agent_set_turn_result(agent_worker *w, agent_turn_result result) { + pthread_mutex_lock(&w->mu); + w->last_turn_result = result; + agent_wake_locked(w); + pthread_mutex_unlock(&w->mu); +} + +static const char *agent_turn_stop_reason(agent_turn_result result) { + switch (result) { + case AGENT_TURN_MAX_TOKENS: return "max_tokens"; + case AGENT_TURN_CANCELLED: return "cancelled"; + case AGENT_TURN_END: return "end_turn"; + case AGENT_TURN_ERROR: + case AGENT_TURN_NONE: + default: return NULL; + } +} + +static void agent_acp_write_all_locked(const char *s, size_t n) { + write_all(STDOUT_FILENO, s, n); +} + +static void agent_acp_puts_locked(const char *s) { + agent_acp_write_all_locked(s, strlen(s)); +} + +static void agent_acp_write_response(agent_worker *w, const char *id_json, + const char *result_json) { + pthread_mutex_lock(&w->acp_mu); + agent_acp_puts_locked("{\"jsonrpc\":\"2.0\",\"id\":"); + agent_acp_write_all_locked(id_json ? id_json : "null", + strlen(id_json ? id_json : "null")); + agent_acp_puts_locked(",\"result\":"); + agent_acp_write_all_locked(result_json ? result_json : "{}", + strlen(result_json ? 
result_json : "{}")); + agent_acp_puts_locked("}\n"); + pthread_mutex_unlock(&w->acp_mu); +} + +static void agent_acp_write_error(agent_worker *w, const char *id_json, + int code, const char *message) { + char codebuf[32]; + snprintf(codebuf, sizeof(codebuf), "%d", code); + char *qmsg = ds4_acp_json_escape(message ? message : "error", + strlen(message ? message : "error")); + pthread_mutex_lock(&w->acp_mu); + agent_acp_puts_locked("{\"jsonrpc\":\"2.0\",\"id\":"); + agent_acp_write_all_locked(id_json ? id_json : "null", + strlen(id_json ? id_json : "null")); + agent_acp_puts_locked(",\"error\":{\"code\":"); + agent_acp_write_all_locked(codebuf, strlen(codebuf)); + agent_acp_puts_locked(",\"message\":"); + agent_acp_write_all_locked(qmsg, strlen(qmsg)); + agent_acp_puts_locked("}}\n"); + pthread_mutex_unlock(&w->acp_mu); + free(qmsg); +} + +static void agent_acp_write_request_error(agent_worker *w, + const ds4_acp_request *req, + int code, const char *message) { + if (req->has_id) agent_acp_write_error(w, req->id_json, code, message); +} + +static void agent_acp_notify_assistant(agent_worker *w, const char *s, size_t n) { + if (!w->cfg || !w->cfg->acp || !w->acp_session_id[0] || !n) return; + char *qsid = ds4_acp_json_escape(w->acp_session_id, strlen(w->acp_session_id)); + char *qtext = ds4_acp_json_escape(s, n); + pthread_mutex_lock(&w->acp_mu); + agent_acp_puts_locked( + "{\"jsonrpc\":\"2.0\",\"method\":\"session/update\",\"params\":{\"sessionId\":"); + agent_acp_write_all_locked(qsid, strlen(qsid)); + agent_acp_puts_locked( + ",\"update\":{\"sessionUpdate\":\"agent_message_chunk\",\"content\":{\"type\":\"text\",\"text\":"); + agent_acp_write_all_locked(qtext, strlen(qtext)); + agent_acp_puts_locked("}}}}\n"); + pthread_mutex_unlock(&w->acp_mu); + free(qsid); + free(qtext); +} + /* ============================================================================ * Trace Logging * ============================================================================ @@ -1613,6 
+1731,7 @@ static char *agent_tail_capture_take(agent_tail_capture *t, size_t *len) { static void renderer_write(agent_token_renderer *r, const char *s, size_t n) { if (r->capture) agent_tail_capture_append(r->capture, s, n); + else if (r->worker->cfg && r->worker->cfg->acp) agent_acp_notify_assistant(r->worker, s, n); else agent_publish(r->worker, s, n); } @@ -1660,6 +1779,7 @@ static void renderer_restore_text_attrs(agent_token_renderer *r) { } static void renderer_write_complete_char_raw(agent_token_renderer *r, const char *s, size_t n) { + if (r->worker->cfg && r->worker->cfg->acp && r->in_think) return; bool styled = r->use_color && renderer_has_text_attrs(r); if (styled && !r->color_open) { renderer_set_text_attrs(r); @@ -2729,6 +2849,10 @@ static const char *agent_tool_param_color(agent_tool_param_kind kind) { } static void agent_tool_viz_write(agent_stream_renderer *sr, const char *s, size_t n) { + if (sr->renderer->worker->cfg && sr->renderer->worker->cfg->acp) { + for (size_t i = 0; i < n; i++) sr->viz.last_output_newline = s[i] == '\n'; + return; + } renderer_plain(sr->renderer, s, n); for (size_t i = 0; i < n; i++) sr->viz.last_output_newline = s[i] == '\n'; } @@ -3439,11 +3563,13 @@ static void agent_stream_text(agent_stream_renderer *sr, const char *text, size_ agent_stream_flush_start_tail(sr); sr->in_think = false; sr->renderer->in_think = false; - renderer_reset_color(sr->renderer); - if (!sr->renderer->last_output_newline) + if (!sr->renderer->worker->cfg->acp) { + renderer_reset_color(sr->renderer); + if (!sr->renderer->last_output_newline) + renderer_write(sr->renderer, "\n", 1); renderer_write(sr->renderer, "\n", 1); - renderer_write(sr->renderer, "\n", 1); - sr->renderer->last_output_newline = true; + sr->renderer->last_output_newline = true; + } sr->post_think_gap = true; i += strlen(think_close); continue; @@ -3996,6 +4122,7 @@ static void agent_worker_build_system_tokens(agent_worker *w, ds4_tokens *out) { } static void 
agent_publish_system_status(agent_worker *w, const char *msg) { + if (w->cfg->acp) return; if (w->cfg->non_interactive) return; if (isatty(STDOUT_FILENO)) { static const char marker[] = "\x1b[33m✦ \x1b[38;5;218m"; @@ -4032,7 +4159,7 @@ static void agent_publishf_system_status(agent_worker *w, const char *fmt, ...) static int agent_web_confirm(void *privdata, const char *message, char *err, size_t err_len) { agent_worker *w = privdata; - if (!w || w->cfg->non_interactive) { + if (!w || w->cfg->non_interactive || w->cfg->acp) { snprintf(err, err_len, "visible Chrome browser startup requires interactive approval"); return 0; @@ -4105,6 +4232,7 @@ static void worker_answer_web_approval(agent_worker *w, bool allow, * after the tool result is appended, so the next model input can contain both * the tool observation and the user's pending correction. */ static char *worker_request_queued_user_drain(agent_worker *w) { + if (w->cfg->acp) return NULL; pthread_mutex_lock(&w->mu); w->queued_user_drain_pending = true; w->queued_user_drain_answered = false; @@ -7035,6 +7163,120 @@ static pid_t agent_tool_pid(const agent_tool_call *call) { /* Execute one parsed DSML tool call and return the text that will be appended as * the tool-role result. UI visualization already happened while streaming; this * function is only about side effects and the model-visible observation. 
*/ +static const char *agent_acp_tool_kind(const char *name) { + if (!name) return "other"; + if (!strcmp(name, "read") || !strcmp(name, "more") || !strcmp(name, "list")) + return "read"; + if (!strcmp(name, "write") || !strcmp(name, "edit")) + return "edit"; + if (!strcmp(name, "search")) return "search"; + if (!strcmp(name, "google_search")) return "search"; + if (!strcmp(name, "visit_page")) return "fetch"; + if (!strcmp(name, "bash") || !strcmp(name, "bash_status") || + !strcmp(name, "bash_stop")) + return "execute"; + return "other"; +} + +static void agent_acp_tool_title(const agent_tool_call *call, + char *buf, size_t len) { + const char *name = call->name ? call->name : "tool"; + const char *detail = agent_tool_arg_value(call, "path"); + if (!detail) detail = agent_tool_arg_value(call, "command"); + if (!detail) detail = agent_tool_arg_value(call, "query"); + if (!detail) detail = agent_tool_arg_value(call, "url"); + if (detail && detail[0]) + snprintf(buf, len, "%s %s", name, detail); + else + snprintf(buf, len, "%s", name); +} + +static char *agent_acp_tool_raw_input(const agent_tool_call *call) { + agent_buf b = {0}; + char *qname = ds4_acp_json_escape(call->name ? call->name : "", + strlen(call->name ? call->name : "")); + agent_buf_puts(&b, "{\"name\":"); + agent_buf_puts(&b, qname); + agent_buf_puts(&b, ",\"args\":{"); + free(qname); + for (int i = 0; i < call->argc; i++) { + if (i) agent_buf_puts(&b, ","); + const char *name = call->args[i].name ? call->args[i].name : ""; + const char *value = call->args[i].value ? 
call->args[i].value : ""; + char *qarg = ds4_acp_json_escape(name, strlen(name)); + char *qval = ds4_acp_json_escape(value, strlen(value)); + agent_buf_puts(&b, qarg); + agent_buf_puts(&b, ":"); + agent_buf_puts(&b, qval); + free(qarg); + free(qval); + } + agent_buf_puts(&b, "}}"); + return agent_buf_take(&b); +} + +/* ACP tool updates mirror the existing DSML tool execution: one tool_call when + * DS4 starts running the tool, followed by a tool_call_update with the final + * status and output. */ +static void agent_acp_notify_tool(agent_worker *w, const char *session_update, + const char *tool_id, + const agent_tool_call *call, + const char *status, + const char *output) { + if (!w->cfg->acp || !w->acp_session_id[0]) return; + char title[512]; + agent_acp_tool_title(call, title, sizeof(title)); + char *qsid = ds4_acp_json_escape(w->acp_session_id, strlen(w->acp_session_id)); + char *qid = ds4_acp_json_escape(tool_id, strlen(tool_id)); + char *qtitle = ds4_acp_json_escape(title, strlen(title)); + char *qkind = ds4_acp_json_escape(agent_acp_tool_kind(call->name), + strlen(agent_acp_tool_kind(call->name))); + char *qstatus = ds4_acp_json_escape(status, strlen(status)); + char *raw_input = agent_acp_tool_raw_input(call); + char *qout = output ? 
ds4_acp_json_escape(output, strlen(output)) : NULL; + + pthread_mutex_lock(&w->acp_mu); + agent_acp_puts_locked( + "{\"jsonrpc\":\"2.0\",\"method\":\"session/update\",\"params\":{\"sessionId\":"); + agent_acp_write_all_locked(qsid, strlen(qsid)); + agent_acp_puts_locked(",\"update\":{\"sessionUpdate\":"); + char *qupdate = ds4_acp_json_escape(session_update, strlen(session_update)); + agent_acp_write_all_locked(qupdate, strlen(qupdate)); + free(qupdate); + agent_acp_puts_locked(",\"toolCallId\":"); + agent_acp_write_all_locked(qid, strlen(qid)); + if (!strcmp(session_update, "tool_call")) { + agent_acp_puts_locked(",\"title\":"); + agent_acp_write_all_locked(qtitle, strlen(qtitle)); + agent_acp_puts_locked(",\"kind\":"); + agent_acp_write_all_locked(qkind, strlen(qkind)); + agent_acp_puts_locked(",\"rawInput\":"); + agent_acp_write_all_locked(raw_input, strlen(raw_input)); + } + agent_acp_puts_locked(",\"status\":"); + agent_acp_write_all_locked(qstatus, strlen(qstatus)); + if (qout) { + agent_acp_puts_locked(",\"rawOutput\":{\"text\":"); + agent_acp_write_all_locked(qout, strlen(qout)); + agent_acp_puts_locked("}"); + /* ToolCallContent wraps a normal ContentBlock, so this outer type is + * "content" while the inner block keeps its own "text" discriminator. 
*/ + agent_acp_puts_locked(",\"content\":[{\"type\":\"content\",\"content\":{\"type\":\"text\",\"text\":"); + agent_acp_write_all_locked(qout, strlen(qout)); + agent_acp_puts_locked("}}]"); + } + agent_acp_puts_locked("}}}\n"); + pthread_mutex_unlock(&w->acp_mu); + + free(qsid); + free(qid); + free(qtitle); + free(qkind); + free(qstatus); + free(raw_input); + free(qout); +} + static char *agent_execute_tool_call(agent_worker *w, const agent_tool_call *call) { agent_buf result = {0}; if (!call->name) return xstrdup("Tool error: missing tool name\n"); @@ -7100,7 +7342,16 @@ static char *agent_execute_tool_call(agent_worker *w, const agent_tool_call *cal static char *agent_execute_tool_calls(agent_worker *w, const agent_tool_calls *calls) { agent_buf all = {0}; for (int i = 0; i < calls->len; i++) { + char tool_id[64]; + snprintf(tool_id, sizeof(tool_id), "tool_%d", ++w->next_acp_tool_call_id); + agent_acp_notify_tool(w, "tool_call", tool_id, &calls->v[i], + "pending", NULL); + agent_acp_notify_tool(w, "tool_call_update", tool_id, &calls->v[i], + "in_progress", NULL); char *res = agent_execute_tool_call(w, &calls->v[i]); + agent_acp_notify_tool(w, "tool_call_update", tool_id, &calls->v[i], + !strncmp(res, "Tool error:", 11) ? + "failed" : "completed", res); char hdr[128]; snprintf(hdr, sizeof(hdr), "Tool result %d (%s):\n", i + 1, calls->v[i].name ? calls->v[i].name : "unknown"); @@ -7543,6 +7794,7 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { if (agent_err_is_interrupted(compact_err)) { worker_clear_interrupt(w); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); return 0; } agent_set_error(w, compact_err[0] ? compact_err : "context compaction failed"); @@ -7575,6 +7827,11 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { * after a DSML stanza completes we terminate that assistant message, append * the tool result as a tool message, then ask the model to continue. 
*/ for (int tool_round = 0; ; tool_round++) { + if (worker_should_interrupt(w)) { + agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); + return 0; + } if (tool_round > 0 && !agent_worker_compact_if_needed(w, "soft limit before tool continuation", compact_err, sizeof(compact_err))) @@ -7582,6 +7839,7 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { if (agent_err_is_interrupted(compact_err)) { worker_clear_interrupt(w); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); return 0; } agent_set_error(w, compact_err[0] ? compact_err : "context compaction failed"); @@ -7631,6 +7889,7 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { ds4_tokens_push(&w->transcript, ds4_token_eos(w->engine)); worker_clear_interrupt(w); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); return 0; } if (sync_rc != 0) { @@ -7732,8 +7991,8 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { break; } } - bool interrupted = worker_should_interrupt(w); + bool hit_max_tokens = !interrupted && generated >= max_tokens; agent_stream_text(&stream, NULL, 0, true); renderer_finish(&renderer); worker_set_greedy_sampling(w, false); @@ -7743,6 +8002,7 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { agent_publish_system_status(w, "Stopped by user"); worker_clear_interrupt(w); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); return 0; } if (stream.dsml_in_think) { @@ -7768,6 +8028,8 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { if (!got_tool && !malformed_tool && !early_tool_error) { agent_dsml_parser_free(&dsml); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, hit_max_tokens ? 
+ AGENT_TURN_MAX_TOKENS : AGENT_TURN_END); return 0; } @@ -7803,6 +8065,7 @@ static int worker_run_turn(agent_worker *w, const char *user_text) { if (agent_err_is_interrupted(compact_err)) { worker_clear_interrupt(w); agent_set_status(w, AGENT_WORKER_IDLE); + agent_set_turn_result(w, AGENT_TURN_CANCELLED); return 0; } agent_set_error(w, compact_err[0] ? compact_err : "context compaction failed"); @@ -8059,6 +8322,7 @@ static bool worker_submit(agent_worker *w, const char *text) { w->status.generated = 0; w->status.gen_tps = 0.0; w->status.greedy_sampling = false; + w->last_turn_result = AGENT_TURN_NONE; pthread_cond_signal(&w->cond); } pthread_mutex_unlock(&w->mu); @@ -8125,6 +8389,13 @@ static void worker_get_status(agent_worker *w, agent_status *status) { pthread_mutex_unlock(&w->mu); } +static agent_turn_result worker_turn_result(agent_worker *w) { + pthread_mutex_lock(&w->mu); + agent_turn_result result = w->last_turn_result; + pthread_mutex_unlock(&w->mu); + return result; +} + static bool worker_is_idle(agent_worker *w) { pthread_mutex_lock(&w->mu); bool idle = w->initialized && @@ -9308,6 +9579,7 @@ static int agent_worker_init(agent_worker *w, ds4_engine *engine, agent_config * w->wake_fd[1] = -1; pthread_mutex_init(&w->mu, NULL); pthread_cond_init(&w->cond, NULL); + pthread_mutex_init(&w->acp_mu, NULL); w->status.state = AGENT_WORKER_IDLE; if (pipe(w->wake_fd) != 0) return -1; int old_flags; @@ -9366,6 +9638,7 @@ static void agent_worker_free(agent_worker *w) { if (w->trace) fclose(w->trace); free(w->cmd_text); free(w->out); + pthread_mutex_destroy(&w->acp_mu); pthread_cond_destroy(&w->cond); pthread_mutex_destroy(&w->mu); } @@ -9502,6 +9775,447 @@ static int agent_read_stdin_available(agent_input_buf *in, bool *eof) { } } +static void agent_acp_make_session_id(agent_worker *w) { + uint64_t bits[3]; + bits[0] = (uint64_t)time(NULL); + bits[1] = (uint64_t)getpid(); + bits[2] = (uint64_t)clock(); + char sha[41]; + ds4_kvstore_sha1_bytes_hex(bits, 
sizeof(bits), sha); + snprintf(w->acp_session_id, sizeof(w->acp_session_id), "ds4-%s", sha); +} + +static bool agent_acp_wait_initialized(agent_worker *w, char *err, size_t err_len) { + for (;;) { + agent_status st; + if (worker_is_initialized(w, &st)) { + if (st.state == AGENT_WORKER_ERROR) { + snprintf(err, err_len, "%s", + st.error[0] ? st.error : "worker initialization failed"); + return false; + } + return true; + } + struct pollfd pfd = {.fd = w->wake_fd[0], .events = POLLIN}; + int pr = poll(&pfd, 1, -1); + if (pr < 0) { + if (errno == EINTR) continue; + snprintf(err, err_len, "poll failed: %s", strerror(errno)); + return false; + } + if (pfd.revents & POLLIN) { + char *out = NULL; + size_t out_len = 0; + drain_wake_fd(w->wake_fd[0]); + worker_consume(w, &out, &out_len, &st); + free(out); + } + } +} + +static bool agent_acp_check_session(agent_worker *w, const char *params, + char *err, size_t err_len) { + char *session_id = NULL; + if (!params || !ds4_acp_object_get_string(params, "sessionId", &session_id)) { + snprintf(err, err_len, "missing sessionId"); + return false; + } + bool ok = w->acp_session_id[0] && !strcmp(session_id, w->acp_session_id); + if (!ok) snprintf(err, err_len, "unknown sessionId"); + free(session_id); + return ok; +} + +static bool agent_acp_json_empty_array(const char *json) { + const char *p = json; + ds4_acp_json_ws(&p); + if (*p != '[') return false; + p++; + ds4_acp_json_ws(&p); + if (*p != ']') return false; + p++; + ds4_acp_json_ws(&p); + return *p == '\0'; +} + +static bool agent_acp_protocol_version_param(const char *params) { + char *raw = NULL; + if (!params || !ds4_acp_object_get_raw(params, "protocolVersion", &raw)) + return false; + const char *p = raw; + ds4_acp_json_ws(&p); + bool ok = false; + if (isdigit((unsigned char)*p)) { + errno = 0; + char *end = NULL; + long version = strtol(p, &end, 10); + const char *q = end; + ds4_acp_json_ws(&q); + ok = errno == 0 && *q == '\0' && version >= 0 && version <= 65535; + } + 
free(raw); + return ok; +} + +static bool agent_acp_append_prompt_block(agent_buf *out, const char *block, + char *err, size_t err_len) { + char *type = NULL; + if (!ds4_acp_object_get_string(block, "type", &type)) { + snprintf(err, err_len, "prompt block missing type"); + return false; + } + if (!strcmp(type, "text")) { + char *text = NULL; + if (!ds4_acp_object_get_string(block, "text", &text)) { + free(type); + snprintf(err, err_len, "text block missing text"); + return false; + } + if (out->len && text[0]) agent_buf_puts(out, "\n"); + agent_buf_puts(out, text); + free(text); + free(type); + return true; + } + if (!strcmp(type, "resource_link")) { + char *uri = NULL; + char *name = NULL; + if (!ds4_acp_object_get_string(block, "uri", &uri) || + !ds4_acp_object_get_string(block, "name", &name)) + { + free(type); + free(uri); + free(name); + snprintf(err, err_len, "resource_link block missing name or uri"); + return false; + } + if (out->len) agent_buf_puts(out, "\n"); + agent_buf_puts(out, "[resource_link"); + if (name && name[0]) { + agent_buf_puts(out, " "); + agent_buf_puts(out, name); + } + agent_buf_puts(out, ": "); + agent_buf_puts(out, uri); + agent_buf_puts(out, "]"); + free(uri); + free(name); + free(type); + return true; + } + snprintf(err, err_len, "unsupported prompt block type: %s", type); + free(type); + return false; +} + +/* ACP prompt blocks can carry richer content than DS4's native text prompt. + * The first ACP pass accepts the required text and resource_link variants and + * flattens them into the same user text path used by the terminal agent. 
*/ +static bool agent_acp_flatten_prompt(const char *params, + char **prompt_out, + char *err, size_t err_len) { + char *prompt = NULL; + if (!params || !ds4_acp_object_get_raw(params, "prompt", &prompt)) { + snprintf(err, err_len, "missing prompt"); + return false; + } + const char *p = prompt; + ds4_acp_json_ws(&p); + if (*p != '[') { + free(prompt); + snprintf(err, err_len, "prompt must be an array"); + return false; + } + p++; + ds4_acp_json_ws(&p); + agent_buf out = {0}; + while (*p && *p != ']') { + char *block = NULL; + if (!ds4_acp_json_raw_value(&p, &block)) { + free(prompt); + free(out.ptr); + snprintf(err, err_len, "invalid prompt block"); + return false; + } + bool ok = agent_acp_append_prompt_block(&out, block, err, err_len); + free(block); + if (!ok) { + free(prompt); + free(out.ptr); + return false; + } + ds4_acp_json_ws(&p); + if (*p == ',') { + p++; + ds4_acp_json_ws(&p); + } else if (*p != ']') { + free(prompt); + free(out.ptr); + snprintf(err, err_len, "invalid prompt array"); + return false; + } + } + if (*p != ']' || out.len == 0) { + free(prompt); + free(out.ptr); + snprintf(err, err_len, "empty or unterminated prompt"); + return false; + } + free(prompt); + *prompt_out = agent_buf_take(&out); + return true; +} + +static void agent_acp_finish_prompt_if_needed(agent_worker *worker, + bool *prompt_active, + char **prompt_id_json) { + if (!*prompt_active) return; + agent_status st; + worker_get_status(worker, &st); + agent_turn_result result = worker_turn_result(worker); + if (result == AGENT_TURN_NONE || + (st.state != AGENT_WORKER_IDLE && st.state != AGENT_WORKER_ERROR)) + { + return; + } + /* ACP has no generic error stop reason; worker failures are reported as + * JSON-RPC internal errors instead of successful prompt responses. */ + if (result == AGENT_TURN_ERROR || st.state == AGENT_WORKER_ERROR) { + agent_acp_write_error(worker, *prompt_id_json, -32603, + st.error[0] ? 
st.error : "agent turn failed"); + } else { + const char *stop_reason = agent_turn_stop_reason(result); + if (!stop_reason) { + agent_acp_write_error(worker, *prompt_id_json, -32603, + "agent turn ended without stop reason"); + } else { + char response[96]; + snprintf(response, sizeof(response), "{\"stopReason\":\"%s\"}", + stop_reason); + agent_acp_write_response(worker, *prompt_id_json, response); + } + } + free(*prompt_id_json); + *prompt_id_json = NULL; + *prompt_active = false; +} + +static void agent_acp_handle_request(agent_worker *worker, + ds4_acp_request *req, + bool *initialized, + bool *session_created, + bool *prompt_active, + char **prompt_id_json) { + char err[256] = {0}; + if (!strcmp(req->method, "initialize")) { + if (!req->has_params || + !agent_acp_protocol_version_param(req->params_json)) + { + agent_acp_write_request_error(worker, req, -32602, + "initialize requires protocolVersion"); + return; + } + *initialized = true; + if (req->has_id) { + agent_acp_write_response(worker, req->id_json, + "{\"protocolVersion\":1," + "\"agentInfo\":{\"name\":\"ds4-agent\",\"version\":\"0\"}," + "\"agentCapabilities\":{}}"); + } + return; + } + + if (!strcmp(req->method, "session/new")) { + char *cwd = NULL; + char *mcp_servers = NULL; + if (!*initialized) { + agent_acp_write_request_error(worker, req, -32600, + "initialize must be called first"); + return; + } + if (!req->has_params || + !ds4_acp_object_get_string(req->params_json, "cwd", &cwd) || + cwd[0] != '/') + { + free(cwd); + agent_acp_write_request_error(worker, req, -32602, + "session/new requires absolute cwd"); + return; + } + if (!ds4_acp_object_get_raw(req->params_json, "mcpServers", + &mcp_servers) || + !agent_acp_json_empty_array(mcp_servers)) + { + free(cwd); + free(mcp_servers); + agent_acp_write_request_error(worker, req, -32602, + "session/new requires empty mcpServers"); + return; + } + free(mcp_servers); + if (*session_created) { + free(cwd); + agent_acp_write_request_error(worker, req, 
-32603, + "ACP session already exists"); + return; + } + if (chdir(cwd) != 0) { + snprintf(err, sizeof(err), "failed to chdir to %s: %s", + cwd, strerror(errno)); + free(cwd); + agent_acp_write_request_error(worker, req, -32603, err); + return; + } + free(cwd); + if (!agent_acp_wait_initialized(worker, err, sizeof(err))) { + agent_acp_write_request_error(worker, req, -32603, err); + return; + } + agent_acp_make_session_id(worker); + *session_created = true; + if (req->has_id) { + char *qsid = ds4_acp_json_escape(worker->acp_session_id, + strlen(worker->acp_session_id)); + agent_buf b = {0}; + agent_buf_puts(&b, "{\"sessionId\":"); + agent_buf_puts(&b, qsid); + agent_buf_puts(&b, "}"); + char *res = agent_buf_take(&b); + agent_acp_write_response(worker, req->id_json, res); + free(res); + free(qsid); + } + return; + } + + if (!strcmp(req->method, "session/prompt")) { + char *prompt = NULL; + if (!*session_created || + !agent_acp_check_session(worker, req->params_json, err, sizeof(err))) + { + agent_acp_write_request_error(worker, req, -32602, + err[0] ? err : "invalid session"); + return; + } + if (!req->has_id) return; + if (*prompt_active) { + agent_acp_write_request_error(worker, req, -32603, + "agent is busy"); + return; + } + if (!agent_acp_flatten_prompt(req->params_json, &prompt, + err, sizeof(err))) + { + agent_acp_write_request_error(worker, req, -32602, err); + return; + } + if (!worker_submit(worker, prompt)) { + free(prompt); + agent_acp_write_request_error(worker, req, -32603, + "agent is not ready"); + return; + } + free(prompt); + *prompt_active = true; + *prompt_id_json = req->id_json; + req->id_json = NULL; + req->has_id = false; + return; + } + + if (!strcmp(req->method, "session/cancel")) { + if (!req->has_params || + !agent_acp_check_session(worker, req->params_json, err, sizeof(err))) + { + if (req->has_id) + agent_acp_write_error(worker, req->id_json, -32602, + err[0] ? 
err : "invalid session");
+            return;
+        }
+        worker_interrupt(worker);
+        if (req->has_id) agent_acp_write_response(worker, req->id_json, "{}");
+        return;
+    }
+
+    if (req->has_id)
+        agent_acp_write_error(worker, req->id_json, -32601, "method not found");
+}
+
+static int run_agent_acp(ds4_engine *engine, agent_config *cfg) {
+    agent_worker worker;
+    if (agent_worker_init(&worker, engine, cfg) != 0) return 1;
+
+    int old_stdin_flags = -1;
+    set_nonblock(STDIN_FILENO, true, &old_stdin_flags);
+
+    bool initialized = false;
+    bool session_created = false;
+    bool prompt_active = false;
+    bool stdin_eof = false;
+    char *prompt_id_json = NULL;
+    agent_input_buf in = {0};
+    int rc = 0;
+
+    while (!stdin_eof || prompt_active) {
+        struct pollfd fds[2] = {
+            {.fd = stdin_eof ? -1 : STDIN_FILENO, .events = POLLIN}, /* after EOF pass fd<0 so poll() skips stdin; POLLHUP is reported even with events==0 and would spin this loop while a prompt is still running */
+            {.fd = worker.wake_fd[0], .events = POLLIN},
+        };
+        int pr = poll(fds, 2, -1);
+        if (pr < 0) {
+            if (errno == EINTR) continue;
+            perror("ds4-agent: poll");
+            rc = 1;
+            break;
+        }
+        if (fds[0].revents & (POLLIN | POLLHUP)) {
+            if (agent_read_stdin_available(&in, &stdin_eof) != 0) {
+                rc = 1;
+                break;
+            }
+            char *line;
+            while ((line = agent_input_buf_take_line(&in)) != NULL) {
+                if (!line[0]) {
+                    free(line);
+                    continue;
+                }
+                ds4_acp_request req;
+                char err[256] = {0};
+                ds4_acp_parse_result parsed =
+                    ds4_acp_parse_request(line, &req, err, sizeof(err));
+                if (parsed != DS4_ACP_PARSE_OK) {
+                    agent_acp_write_error(&worker, NULL,
+                        parsed == DS4_ACP_PARSE_JSON ? -32700 : -32600,
+                        err[0] ? 
err : "invalid JSON-RPC request"); + free(line); + continue; + } + agent_acp_handle_request(&worker, &req, &initialized, + &session_created, &prompt_active, + &prompt_id_json); + ds4_acp_request_free(&req); + free(line); + } + } + if (fds[1].revents & POLLIN) { + char *out = NULL; + size_t out_len = 0; + agent_status st; + drain_wake_fd(worker.wake_fd[0]); + worker_consume(&worker, &out, &out_len, &st); + free(out); + } + agent_acp_finish_prompt_if_needed(&worker, &prompt_active, + &prompt_id_json); + } + + free(prompt_id_json); + agent_input_buf_free(&in); + if (old_stdin_flags >= 0) fcntl(STDIN_FILENO, F_SETFL, old_stdin_flags); + agent_worker_free(&worker); + return rc; +} + /* Headless mode is intentionally just another front-end for the same worker. * With -p/--prompt it is a one-shot execution. Without -p it becomes a small * stdin protocol: announce readiness on stderr, collect bytes until stdin has @@ -10088,12 +10802,13 @@ int main(int argc, char **argv) { memset(&sa, 0, sizeof(sa)); sigemptyset(&sa.sa_mask); sa.sa_handler = agent_sigint_handler; - bool sigint_installed = !cfg.non_interactive && + bool sigint_installed = !cfg.non_interactive && !cfg.acp && sigaction(SIGINT, &sa, &old_int) == 0; - int rc = cfg.non_interactive ? - run_agent_non_interactive(engine, &cfg) : - run_agent(engine, &cfg); + int rc = cfg.acp ? run_agent_acp(engine, &cfg) : + (cfg.non_interactive ? + run_agent_non_interactive(engine, &cfg) : + run_agent(engine, &cfg)); if (sigint_installed) sigaction(SIGINT, &old_int, NULL); ds4_engine_close(engine); diff --git a/ds4_help.c b/ds4_help.c index 92f184a96..9b3653b62 100644 --- a/ds4_help.c +++ b/ds4_help.c @@ -273,6 +273,7 @@ static void print_agent_specific(FILE *fp, const help_colors *c) { title(fp, c, "Agent Options"); opt(fp, c, "-p, --prompt TEXT", "Submit an initial prompt after startup."); opt(fp, c, "--non-interactive", "Run without TUI. 
With -p: one turn; without -p: repeated stdin prompts."); + opt(fp, c, "--acp", "Run Agent Client Protocol JSON-RPC over stdin/stdout."); opt(fp, c, "-sys, --system TEXT", "Extra system prompt. Empty disables extra text."); opt(fp, c, "--trace FILE", "Write prompt, token, and DSML debug trace."); opt(fp, c, "--chdir DIR", "Change working directory before loading runtime assets."); diff --git a/tests/ds4_test.c b/tests/ds4_test.c index 35af6bafe..543fbc2e4 100644 --- a/tests/ds4_test.c +++ b/tests/ds4_test.c @@ -1,6 +1,7 @@ #define DS4_SERVER_TEST #define DS4_SERVER_TEST_NO_MAIN #include "../ds4_server.c" +#include "../ds4_acp.h" #ifndef DS4_NO_GPU #include "../ds4_gpu.h" #include @@ -1801,6 +1802,73 @@ static void test_server_unit_group(void) { ds4_server_unit_tests_run(); } +static void test_acp_jsonrpc_parser(void) { + ds4_acp_request r; + char err[128]; + TEST_ASSERT(ds4_acp_parse_request( + "{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"initialize\",\"params\":{\"protocolVersion\":1}}", + &r, err, sizeof(err)) == DS4_ACP_PARSE_OK); + TEST_ASSERT(r.has_id); + TEST_ASSERT(!strcmp(r.id_json, "7")); + TEST_ASSERT(!strcmp(r.method, "initialize")); + TEST_ASSERT(r.has_params); + ds4_acp_request_free(&r); + + TEST_ASSERT(ds4_acp_parse_request( + "{\"jsonrpc\":\"2.0\",\"id\":\"abc\",\"method\":\"session/new\",\"params\":{\"cwd\":\"/tmp\"}}", + &r, err, sizeof(err)) == DS4_ACP_PARSE_OK); + TEST_ASSERT(!strcmp(r.id_json, "\"abc\"")); + char *cwd = NULL; + TEST_ASSERT(ds4_acp_object_get_string(r.params_json, "cwd", &cwd)); + TEST_ASSERT(!strcmp(cwd, "/tmp")); + free(cwd); + ds4_acp_request_free(&r); + + TEST_ASSERT(ds4_acp_parse_request("{bad", &r, err, sizeof(err)) == + DS4_ACP_PARSE_JSON); + TEST_ASSERT(ds4_acp_parse_request("{\"jsonrpc\":\"2.0\",\"id\":1}", &r, + err, sizeof(err)) == + DS4_ACP_PARSE_REQUEST); +} + +static void test_acp_json_strings(void) { + const char *p = "\"a\\n\\\"b\\u0021\""; + char *s = NULL; + TEST_ASSERT(ds4_acp_json_string(&p, &s)); + 
TEST_ASSERT(!strcmp(s, "a\n\"b!")); + free(s); + + p = "\"\\ud83d\\ude00\""; + s = NULL; + TEST_ASSERT(ds4_acp_json_string(&p, &s)); + TEST_ASSERT(!strcmp(s, "\xf0\x9f\x98\x80")); + free(s); + + p = "\"\\ud800\""; + s = NULL; + TEST_ASSERT(!ds4_acp_json_string(&p, &s)); + p = "\"\\ud800\\u0041\""; + s = NULL; + TEST_ASSERT(!ds4_acp_json_string(&p, &s)); + p = "\"\\udc00\""; + s = NULL; + TEST_ASSERT(!ds4_acp_json_string(&p, &s)); + + char *q = ds4_acp_json_escape("a\n\"b\\", strlen("a\n\"b\\")); + TEST_ASSERT(!strcmp(q, "\"a\\n\\\"b\\\\\"")); + free(q); + + const char bad_utf8[] = {(char)0xff, 0}; + q = ds4_acp_json_escape(bad_utf8, 1); + TEST_ASSERT(!strcmp(q, "\"\\u00ff\"")); + free(q); +} + +static void test_acp_unit_group(void) { + test_acp_jsonrpc_parser(); + test_acp_json_strings(); +} + typedef void (*test_fn)(void); typedef struct { @@ -1822,6 +1890,7 @@ static const ds4_test_entry test_entries[] = { {"--streaming-decode-prefill-correctness", "streaming-decode-prefill-correctness", "streaming decode-style cold prefill drift and repeatability", test_streaming_decode_prefill_correctness}, #endif {"--server", "server", "server parser/rendering/cache unit tests", test_server_unit_group}, + {"--acp", "acp", "ACP JSON-RPC parser and writer unit tests", test_acp_unit_group}, }; static void test_print_help(const char *prog) {