Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/session/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@stello-ai/session",
"version": "0.7.0",
"version": "0.7.1",
"description": "Session layer for Stello — conversation topology engine",
"license": "Apache-2.0",
"author": "Stello Contributors",
Expand Down
124 changes: 124 additions & 0 deletions packages/session/src/__tests__/abort.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { describe, it, expect } from 'vitest'
import { makeSession } from './helpers.js'
import { loadSession } from '../create-session.js'
import { InMemoryStorageAdapter } from '../mocks/in-memory-storage.js'
import type { LLMAdapter, LLMChunk, LLMCompleteOptions, LLMResult, Message } from '../types/llm.js'

/** 让 fetch-style adapter 监听 signal 的最小 LLMAdapter */
Expand Down Expand Up @@ -157,3 +159,125 @@ describe('Session.stream() AbortSignal', () => {
expect(llm.calls[0]!.signal).toBe(controller.signal)
})
})

/**
* 当 tool 执行被 abort 中断后,storage 中会残留 assistant(toolCalls) 但缺对应 tool 结果。
* 下一次 send/stream 加载历史送给 LLM 时,必须把这种孤儿组过滤掉,
* 否则 OpenAI-compat adapter 会因协议不一致返回 400(assistant 有 tool_calls 缺响应)。
*/
describe('orphaned tool_calls sanitization (abort recovery)', () => {
/** 直接往 storage 注入一个携带 orphan 历史的 session */
async function seedSession(
storage: InMemoryStorageAdapter,
sessionId: string,
records: Message[],
): Promise<void> {
const now = new Date().toISOString()
await storage.putSession({
id: sessionId,
label: 'Test',
role: 'standard',
status: 'active',
tags: [],
metadata: {},
createdAt: now,
updatedAt: now,
})
for (const rec of records) {
await storage.appendRecord(sessionId, rec)
}
}

/** 抓取 LLM 收到的 prompt messages 的 mock adapter */
function createCapturingLLM(reply: LLMResult = { content: 'ok' }): LLMAdapter & { calls: Message[][] } {
const calls: Message[][] = []
const adapter: LLMAdapter = {
maxContextTokens: 1_000_000,
async complete(messages) {
calls.push(messages)
return reply
},
}
return Object.assign(adapter, { calls })
}

it('tail orphan:abort 留下 assistant(toolCalls) 后下一轮 send 不应把孤儿带进 prompt', async () => {
const storage = new InMemoryStorageAdapter()
const sessionId = 'tail-orphan'
await seedSession(storage, sessionId, [
{ role: 'user', content: 'do X' },
{
role: 'assistant',
content: '',
toolCalls: [{ id: 'tc-tail', name: 'foo', input: {} }],
},
])

const llm = createCapturingLLM()
const session = await loadSession(sessionId, { storage, llm })
expect(session).not.toBeNull()
await session!.send('follow-up')

const hasOrphan = llm.calls[0]!.some(
(m) => m.role === 'assistant' && m.toolCalls?.some((tc) => tc.id === 'tc-tail'),
)
expect(hasOrphan).toBe(false)
// 跟进的 user 消息应当是最末一条
expect(llm.calls[0]![llm.calls[0]!.length - 1]).toMatchObject({ role: 'user', content: 'follow-up' })
})

it('middle orphan:被夹在干净消息中间的 orphan 也必须过滤(仅裁尾不够)', async () => {
const storage = new InMemoryStorageAdapter()
const sessionId = 'middle-orphan'
await seedSession(storage, sessionId, [
{ role: 'user', content: 'first' },
{
role: 'assistant',
content: '',
toolCalls: [{ id: 'tc-orphan', name: 'foo', input: {} }],
},
{ role: 'user', content: 'second' },
{ role: 'assistant', content: 'clean response' },
])

const llm = createCapturingLLM()
const session = await loadSession(sessionId, { storage, llm })
expect(session).not.toBeNull()
await session!.send('third')

const hasOrphan = llm.calls[0]!.some(
(m) => m.role === 'assistant' && m.toolCalls?.some((tc) => tc.id === 'tc-orphan'),
)
expect(hasOrphan).toBe(false)
// 干净的 assistant 应保留
const hasClean = llm.calls[0]!.some((m) => m.role === 'assistant' && m.content === 'clean response')
expect(hasClean).toBe(true)
})

it('完整 tool call 组应原样保留(不能误伤)', async () => {
const storage = new InMemoryStorageAdapter()
const sessionId = 'complete-group'
await seedSession(storage, sessionId, [
{ role: 'user', content: 'do Y' },
{
role: 'assistant',
content: '',
toolCalls: [{ id: 'tc-ok', name: 'bar', input: {} }],
},
{ role: 'tool', content: 'result', toolCallId: 'tc-ok' },
{ role: 'assistant', content: 'final' },
])

const llm = createCapturingLLM()
const session = await loadSession(sessionId, { storage, llm })
expect(session).not.toBeNull()
await session!.send('next')

const hasAssistantWithCall = llm.calls[0]!.some(
(m) => m.role === 'assistant' && m.toolCalls?.some((tc) => tc.id === 'tc-ok'),
)
const hasToolResult = llm.calls[0]!.some((m) => m.role === 'tool' && m.toolCallId === 'tc-ok')
expect(hasAssistantWithCall).toBe(true)
expect(hasToolResult).toBe(true)
})
})
49 changes: 47 additions & 2 deletions packages/session/src/context-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,49 @@ import type { Message, LLMAdapter } from './types/llm.js'
import type { SessionStorage } from './types/storage.js'
import type { CompressFn } from './types/functions.js'

/**
* 从历史中移除所有不完整的 tool call 组:
* 任何 `assistant(toolCalls=[A,B,...])` 后续若缺少与之 `toolCallId` 对应的 `tool` 消息,
* 则整组(assistant + 已写入的 tool 消息)一并丢弃;同时丢弃没有匹配 assistant 的孤立 tool 消息。
*
* 触发场景:
* - tool 执行被 AbortSignal 中断 → assistant 已写入但 tool result 永远不再回灌(最常见)
* - 进程崩溃 / 手动改库 / 旧版 bug 残留
*
* 这是历史→prompt 的不变量保证:送给 LLM 的 messages 必须满足 OpenAI/Anthropic 协议
* 对 tool call group 完整性的要求,否则 OpenAI-compat adapter 会返回 400。
*/
export function removeIncompleteToolCallGroups(records: Message[]): Message[] {
const result: Message[] = []
let i = 0
while (i < records.length) {
const rec = records[i]!
if (rec.role === 'assistant' && rec.toolCalls && rec.toolCalls.length > 0) {
const expectedIds = new Set(rec.toolCalls.map((tc) => tc.id))
let j = i + 1
while (j < records.length && records[j]!.role === 'tool') {
const t = records[j]!
if (t.toolCallId) expectedIds.delete(t.toolCallId)
j++
}
if (expectedIds.size === 0) {
for (let k = i; k < j; k++) result.push(records[k]!)
}
// 不完整 → 整组丢弃
i = j
continue
}
if (rec.role === 'tool') {
// 没有前导 assistant(toolCalls) 的孤立 tool 消息 → 丢弃
i++
continue
}
result.push(rec)
i++
}
return result
}

/** 内置默认压缩提示词 */
const BUILTIN_COMPRESS_PROMPT = `你是对话压缩助手。请将以下对话历史压缩为一段简洁的摘要,保留关键上下文信息。
要求:
Expand Down Expand Up @@ -155,7 +198,8 @@ export async function assembleSessionContext(
const userTimestamp = new Date().toISOString()
const userMessage: Message = { role: 'user', content: userContent, timestamp: userTimestamp }

const history = await storage.listRecords(sessionId)
// 净化历史:移除中断/崩溃残留的不完整 tool call 组,保证送给 LLM 的 prompt 协议合法
const history = removeIncompleteToolCallGroups(await storage.listRecords(sessionId))

// 估算全量 token 数
const fullMessages = [...prefixMessages, ...history, userMessage]
Expand Down Expand Up @@ -266,7 +310,8 @@ export async function assembleMainSessionContext(
const userTimestamp = new Date().toISOString()
const userMessage: Message = { role: 'user', content: userContent, timestamp: userTimestamp }

const history = await storage.listRecords(sessionId)
// 净化历史:与 assembleSessionContext 对称,避免不完整 tool call 组流入 prompt
const history = removeIncompleteToolCallGroups(await storage.listRecords(sessionId))

// 估算全量 token 数
const fullMessages = [...prefixMessages, ...history, userMessage]
Expand Down
54 changes: 11 additions & 43 deletions packages/session/src/create-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,7 @@ import { SessionArchivedError } from './types/session-api.js'
import type { SessionMeta, SessionMetaUpdate, ForkOptions } from './types/session.js'
import type { Message } from './types/llm.js'
import type { CreateSessionOptions, LoadSessionOptions, SendResult, StreamResult } from './types/functions.js'
import { assembleSessionContext, buildSessionIdentityMessages, createBuiltinCompressFn, type CompressionCache } from './context-utils.js'

/** 裁掉尾部不完整的 tool call 组(assistant 有 toolCalls 但缺少对应 tool 结果) */
function trimIncompleteToolCallGroup(records: Message[]): Message[] {
if (records.length === 0) return records
let end = records.length
while (end > 0) {
const last = records[end - 1]!
if (last.role === 'assistant' && last.toolCalls && last.toolCalls.length > 0) {
// assistant 有 toolCalls 但后面没有 tool 消息 → 裁掉
end--
continue
}
if (last.role === 'tool') {
// tool 消息,向前找到对应的 assistant
let assistantIdx = end - 2
while (assistantIdx >= 0 && records[assistantIdx]!.role === 'tool') {
assistantIdx--
}
if (assistantIdx >= 0) {
const assistant = records[assistantIdx]!
if (assistant.role === 'assistant' && assistant.toolCalls && assistant.toolCalls.length > 0) {
const expectedIds = new Set(assistant.toolCalls.map(tc => tc.id))
for (let j = assistantIdx + 1; j < end; j++) {
const rec = records[j]!
if (rec.role === 'tool' && rec.toolCallId) {
expectedIds.delete(rec.toolCallId)
}
}
if (expectedIds.size > 0) {
// 不完整 → 裁掉整个组
end = assistantIdx
continue
}
}
}
}
break
}
return end === records.length ? records : records.slice(0, end)
}
import { assembleSessionContext, buildSessionIdentityMessages, createBuiltinCompressFn, removeIncompleteToolCallGroups, type CompressionCache } from './context-utils.js'

interface ToolResultEnvelope {
toolResults: Array<{
Expand Down Expand Up @@ -114,6 +74,9 @@ async function assembleSessionReplayContext(
messages.push({ role: 'system', content: memory })
}

// 注意:此处刻意不调用 removeIncompleteToolCallGroups。
// replay 路径会把"assistant(toolCalls) + 由 envelope 合成的 tool 消息"拼接成完整组,
// 在加载阶段过早裁剪反而会把回灌目标删掉。完整组校验放在拼接后由调用方做。
const history = await storage.listRecords(sessionId)
messages.push(...history)
return { messages, insightConsumed }
Expand Down Expand Up @@ -227,6 +190,9 @@ function buildSession(
if (replayContext.insightConsumed) {
await storage.clearInsight(currentMeta.id)
}
// 替换为 replay 上下文后,原 assembled.messages 里的 sanitize 不再生效;
// 在拼好"assistant + tool 结果"完整组之后,再做一次孤儿组清理(防御中段 orphan)。
promptMessages = removeIncompleteToolCallGroups(promptMessages)
}

// 调 LLM — adapter 抛 AbortError 时直接向上传播,下方 L3 写入分支整体跳过
Expand Down Expand Up @@ -299,6 +265,8 @@ function buildSession(
if (replayContext.insightConsumed) {
await storage.clearInsight(currentMeta.id)
}
// 拼好完整组之后再清孤儿,防御中段 orphan(与 send() 对称)
promptMessages = removeIncompleteToolCallGroups(promptMessages)
}

if (!options.llm) {
Expand Down Expand Up @@ -440,8 +408,8 @@ function buildSession(
if (ctx !== 'none') {
const parentRecords = await storage.listRecords(currentMeta.id)
const selected = ctx === 'inherit' ? parentRecords : await ctx(parentRecords)
// 裁掉尾部不完整的 tool call 组(fork 发生在 tool 执行中,tool result 还没写入
const records = trimIncompleteToolCallGroup(selected)
// 净化掉不完整的 tool call 组(fork tool 执行中、或父历史里夹有中段 orphan 时都需要
const records = removeIncompleteToolCallGroups(selected)
for (const record of records) {
await storage.appendRecord(childId, record)
}
Expand Down