diff --git a/src/store.ts b/src/store.ts index 454141e..b1c0cf7 100644 --- a/src/store.ts +++ b/src/store.ts @@ -158,8 +158,58 @@ export class MemoryStore { private ftsIndexCreated = false; private lastFtsError: string | null = null; + // Per-ID mutex locks to prevent concurrent delete+add in update() + private updateLocks = new Map>(); + // Global write lock for bulkDelete to prevent concurrent updates + private bulkDeleteLock: Promise = Promise.resolve(); + constructor(private readonly config: StoreConfig) { } + /** + * Per-ID mutex: serializes concurrent update() calls on the same memory ID. + * Uses Promise chaining — no external mutex library needed. + */ + private async withUpdateLock(id: string, fn: () => Promise): Promise { + // Wait for any in-progress bulkDelete to finish first + await this.bulkDeleteLock; + + const prev = this.updateLocks.get(id) ?? Promise.resolve(); + let resolve: () => void; + const next = new Promise(r => { resolve = r; }); + this.updateLocks.set(id, next); + try { + await prev; + return await fn(); + } finally { + resolve!(); + // Clean up only if no newer lock was queued for this ID + if (this.updateLocks.get(id) === next) { + this.updateLocks.delete(id); + } + } + } + + /** + * Global write lock for bulkDelete: blocks all concurrent update() calls + * while bulk deletion is in progress. + */ + private async withBulkDeleteLock(fn: () => Promise): Promise { + const prev = this.bulkDeleteLock; + let resolve: () => void; + this.bulkDeleteLock = new Promise(r => { resolve = r; }); + try { + await prev; + // Also wait for all in-flight per-ID update locks to drain + const pending = Array.from(this.updateLocks.values()); + if (pending.length > 0) { + await Promise.all(pending); + } + return await fn(); + } finally { + resolve!(); + } + } + get dbPath(): string { return this.config.dbPath; } @@ -694,7 +744,7 @@ export class MemoryStore { ): Promise { await this.ensureInitialized(); - // Support both full UUID and short prefix (8+ hex chars), same as delete() + // Validate ID format before acquiring lock const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; const prefixRegex = /^[0-9a-f]{8,}$/i; @@ -705,68 +755,71 @@ export class MemoryStore { throw new Error(`Invalid memory ID format: ${id}`); } - let rows: any[]; - if (isFullId) { - const safeId = escapeSqlLiteral(id); - rows = await this.table!.query() - .where(`id = '${safeId}'`) - .limit(1) - .toArray(); - } else { - // Prefix match - const all = await this.table!.query() - .select([ - "id", - "text", - "vector", - "category", - "scope", - "importance", - "timestamp", - "metadata", - ]) - .limit(1000) - .toArray(); - rows = all.filter((r: any) => (r.id as string).startsWith(id)); - if (rows.length > 1) { - throw new Error( - `Ambiguous prefix "${id}" matches ${rows.length} memories. Use a longer prefix or full ID.`, - ); + // Per-ID mutex: serializes concurrent update() on the same memory + return this.withUpdateLock(id, async () => { + let rows: any[]; + if (isFullId) { + const safeId = escapeSqlLiteral(id); + rows = await this.table!.query() + .where(`id = '${safeId}'`) + .limit(1) + .toArray(); + } else { + // Prefix match + const all = await this.table!.query() + .select([ + "id", + "text", + "vector", + "category", + "scope", + "importance", + "timestamp", + "metadata", + ]) + .limit(1000) + .toArray(); + rows = all.filter((r: any) => (r.id as string).startsWith(id)); + if (rows.length > 1) { + throw new Error( + `Ambiguous prefix "${id}" matches ${rows.length} memories. Use a longer prefix or full ID.`, + ); + } } - } - if (rows.length === 0) return null; + if (rows.length === 0) return null; - const row = rows[0]; - const rowScope = (row.scope as string | undefined) ?? "global"; + const row = rows[0]; + const rowScope = (row.scope as string | undefined) ?? "global"; - // Check scope permissions - if ( - scopeFilter && - scopeFilter.length > 0 && - !scopeFilter.includes(rowScope) - ) { - throw new Error(`Memory ${id} is outside accessible scopes`); - } + // Check scope permissions + if ( + scopeFilter && + scopeFilter.length > 0 && + !scopeFilter.includes(rowScope) + ) { + throw new Error(`Memory ${id} is outside accessible scopes`); + } - // Build updated entry, preserving original timestamp - const updated: MemoryEntry = { - id: row.id as string, - text: updates.text ?? (row.text as string), - vector: updates.vector ?? Array.from(row.vector as Iterable), - category: updates.category ?? (row.category as MemoryEntry["category"]), - scope: rowScope, - importance: updates.importance ?? Number(row.importance), - timestamp: Number(row.timestamp), // preserve original - metadata: updates.metadata ?? ((row.metadata as string) || "{}"), - }; + // Build updated entry, preserving original timestamp + const updated: MemoryEntry = { + id: row.id as string, + text: updates.text ?? (row.text as string), + vector: updates.vector ?? Array.from(row.vector as Iterable), + category: updates.category ?? (row.category as MemoryEntry["category"]), + scope: rowScope, + importance: updates.importance ?? Number(row.importance), + timestamp: Number(row.timestamp), // preserve original + metadata: updates.metadata ?? ((row.metadata as string) || "{}"), + }; - // LanceDB doesn't support in-place update; delete + re-add - const resolvedId = escapeSqlLiteral(row.id as string); - await this.table!.delete(`id = '${resolvedId}'`); - await this.table!.add([updated]); + // LanceDB doesn't support in-place update; delete + re-add + const resolvedId = escapeSqlLiteral(row.id as string); + await this.table!.delete(`id = '${resolvedId}'`); + await this.table!.add([updated]); - return updated; + return updated; + }); } async bulkDelete( @@ -775,37 +828,40 @@ export class MemoryStore { ): Promise { await this.ensureInitialized(); - const conditions: string[] = []; + // Global write lock: blocks concurrent update() calls during bulk deletion + return this.withBulkDeleteLock(async () => { + const conditions: string[] = []; - if (scopeFilter.length > 0) { - const scopeConditions = scopeFilter - .map((scope) => `scope = '${escapeSqlLiteral(scope)}'`) - .join(" OR "); - conditions.push(`(${scopeConditions})`); - } + if (scopeFilter.length > 0) { + const scopeConditions = scopeFilter + .map((scope) => `scope = '${escapeSqlLiteral(scope)}'`) + .join(" OR "); + conditions.push(`(${scopeConditions})`); + } - if (beforeTimestamp) { - conditions.push(`timestamp < ${beforeTimestamp}`); - } + if (beforeTimestamp) { + conditions.push(`timestamp < ${beforeTimestamp}`); + } - if (conditions.length === 0) { - throw new Error( - "Bulk delete requires at least scope or timestamp filter for safety", - ); - } + if (conditions.length === 0) { + throw new Error( + "Bulk delete requires at least scope or timestamp filter for safety", + ); + } - const whereClause = conditions.join(" AND "); + const whereClause = conditions.join(" AND "); - // Count first - const countResults = await this.table!.query().where(whereClause).toArray(); - const deleteCount = countResults.length; + // Count first + const countResults = await this.table!.query().where(whereClause).toArray(); + const deleteCount = countResults.length; - // Then delete - if (deleteCount > 0) { - await this.table!.delete(whereClause); - } + // Then delete + if (deleteCount > 0) { + await this.table!.delete(whereClause); + } - return deleteCount; + return deleteCount; + }); } get hasFtsSupport(): boolean { diff --git a/src/tools.ts b/src/tools.ts index 0575432..29000dc 100644 --- a/src/tools.ts +++ b/src/tools.ts @@ -1198,6 +1198,107 @@ export function registerMemoryListTool( ); } +// ============================================================================ +// Purge Scope Tool +// ============================================================================ + +export function registerMemoryPurgeScopeTool( + api: OpenClawPluginApi, + context: ToolContext, +) { + api.registerTool( + (toolCtx) => { + const agentId = resolveAgentId((toolCtx as any)?.agentId, context.agentId) ?? "main"; + return { + name: "memory_purge_scope", + label: "Memory Purge Scope", + description: + "Permanently delete ALL memories in a specific scope. This is a DESTRUCTIVE and IRREVERSIBLE operation. " + + "You must set confirm=true to execute. Use with extreme caution — typically for user/agent cleanup.", + parameters: Type.Object({ + scope: Type.String({ + description: "The scope to purge (e.g. 'agent:ent_user-xxx_default')", + }), + confirm: Type.Boolean({ + description: "Must be true to execute. Safety guard against accidental purge.", + }), + }), + async execute(_toolCallId, params) { + const { scope, confirm } = params as { + scope: string; + confirm: boolean; + }; + + if (!confirm) { + return { + content: [ + { + type: "text", + text: "Purge aborted: confirm must be true to execute this destructive operation.", + }, + ], + details: { error: "not_confirmed" }, + }; + } + + if (!scope || !scope.trim()) { + return { + content: [ + { type: "text", text: "Purge aborted: scope must be a non-empty string." }, + ], + details: { error: "invalid_scope" }, + }; + } + + // Access control: only allow purging scopes this agent can access + if (!context.scopeManager.isAccessible(scope, agentId)) { + return { + content: [ + { + type: "text", + text: `Access denied: scope "${scope}" is not accessible by agent "${agentId}".`, + }, + ], + details: { + error: "scope_access_denied", + requestedScope: scope, + agentId, + }, + }; + } + + try { + const deletedCount = await context.store.bulkDelete([scope]); + return { + content: [ + { + type: "text", + text: `Purged scope "${scope}": ${deletedCount} memor${deletedCount === 1 ? "y" : "ies"} deleted.`, + }, + ], + details: { + scope, + deletedCount, + }, + }; + } catch (error) { + return { + content: [ + { + type: "text", + text: `Failed to purge scope "${scope}": ${error instanceof Error ? error.message : String(error)}`, + }, + ], + details: { error: "purge_failed", message: String(error) }, + }; + } + }, + }; + }, + { name: "memory_purge_scope" }, + ); +} + // ============================================================================ // Tool Registration Helper // ============================================================================ @@ -1220,6 +1321,7 @@ export function registerAllMemoryTools( if (options.enableManagementTools) { registerMemoryStatsTool(api, context); registerMemoryListTool(api, context); + registerMemoryPurgeScopeTool(api, context); } if (options.enableSelfImprovementTools !== false) { registerSelfImprovementLogTool(api, context);