Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 136 additions & 80 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,58 @@ export class MemoryStore {
private ftsIndexCreated = false;
private lastFtsError: string | null = null;

// Per-ID mutex locks to prevent concurrent delete+add in update()
private updateLocks = new Map<string, Promise<void>>();
// Global write lock for bulkDelete to prevent concurrent updates
private bulkDeleteLock: Promise<void> = Promise.resolve();

constructor(private readonly config: StoreConfig) { }

/**
* Per-ID mutex: serializes concurrent update() calls on the same memory ID.
* Uses Promise chaining — no external mutex library needed.
*/
private async withUpdateLock<T>(id: string, fn: () => Promise<T>): Promise<T> {
// Wait for any in-progress bulkDelete to finish first
await this.bulkDeleteLock;

const prev = this.updateLocks.get(id) ?? Promise.resolve();
let resolve: () => void;
const next = new Promise<void>(r => { resolve = r; });
this.updateLocks.set(id, next);
try {
await prev;
return await fn();
} finally {
resolve!();
// Clean up only if no newer lock was queued for this ID
if (this.updateLocks.get(id) === next) {
this.updateLocks.delete(id);
}
}
}

/**
* Global write lock for bulkDelete: blocks all concurrent update() calls
* while bulk deletion is in progress.
*/
private async withBulkDeleteLock<T>(fn: () => Promise<T>): Promise<T> {
const prev = this.bulkDeleteLock;
let resolve: () => void;
this.bulkDeleteLock = new Promise<void>(r => { resolve = r; });
try {
await prev;
// Also wait for all in-flight per-ID update locks to drain
const pending = Array.from(this.updateLocks.values());
if (pending.length > 0) {
await Promise.all(pending);
}
return await fn();
} finally {
resolve!();
}
}

get dbPath(): string {
return this.config.dbPath;
}
Expand Down Expand Up @@ -694,7 +744,7 @@ export class MemoryStore {
): Promise<MemoryEntry | null> {
await this.ensureInitialized();

// Support both full UUID and short prefix (8+ hex chars), same as delete()
// Validate ID format before acquiring lock
const uuidRegex =
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
const prefixRegex = /^[0-9a-f]{8,}$/i;
Expand All @@ -705,68 +755,71 @@ export class MemoryStore {
throw new Error(`Invalid memory ID format: ${id}`);
}

let rows: any[];
if (isFullId) {
const safeId = escapeSqlLiteral(id);
rows = await this.table!.query()
.where(`id = '${safeId}'`)
.limit(1)
.toArray();
} else {
// Prefix match
const all = await this.table!.query()
.select([
"id",
"text",
"vector",
"category",
"scope",
"importance",
"timestamp",
"metadata",
])
.limit(1000)
.toArray();
rows = all.filter((r: any) => (r.id as string).startsWith(id));
if (rows.length > 1) {
throw new Error(
`Ambiguous prefix "${id}" matches ${rows.length} memories. Use a longer prefix or full ID.`,
);
// Per-ID mutex: serializes concurrent update() on the same memory
return this.withUpdateLock(id, async () => {
let rows: any[];
if (isFullId) {
const safeId = escapeSqlLiteral(id);
rows = await this.table!.query()
.where(`id = '${safeId}'`)
.limit(1)
.toArray();
} else {
// Prefix match
const all = await this.table!.query()
.select([
"id",
"text",
"vector",
"category",
"scope",
"importance",
"timestamp",
"metadata",
])
.limit(1000)
.toArray();
rows = all.filter((r: any) => (r.id as string).startsWith(id));
if (rows.length > 1) {
throw new Error(
`Ambiguous prefix "${id}" matches ${rows.length} memories. Use a longer prefix or full ID.`,
);
}
}
}

if (rows.length === 0) return null;
if (rows.length === 0) return null;

const row = rows[0];
const rowScope = (row.scope as string | undefined) ?? "global";
const row = rows[0];
const rowScope = (row.scope as string | undefined) ?? "global";

// Check scope permissions
if (
scopeFilter &&
scopeFilter.length > 0 &&
!scopeFilter.includes(rowScope)
) {
throw new Error(`Memory ${id} is outside accessible scopes`);
}
// Check scope permissions
if (
scopeFilter &&
scopeFilter.length > 0 &&
!scopeFilter.includes(rowScope)
) {
throw new Error(`Memory ${id} is outside accessible scopes`);
}

// Build updated entry, preserving original timestamp
const updated: MemoryEntry = {
id: row.id as string,
text: updates.text ?? (row.text as string),
vector: updates.vector ?? Array.from(row.vector as Iterable<number>),
category: updates.category ?? (row.category as MemoryEntry["category"]),
scope: rowScope,
importance: updates.importance ?? Number(row.importance),
timestamp: Number(row.timestamp), // preserve original
metadata: updates.metadata ?? ((row.metadata as string) || "{}"),
};
// Build updated entry, preserving original timestamp
const updated: MemoryEntry = {
id: row.id as string,
text: updates.text ?? (row.text as string),
vector: updates.vector ?? Array.from(row.vector as Iterable<number>),
category: updates.category ?? (row.category as MemoryEntry["category"]),
scope: rowScope,
importance: updates.importance ?? Number(row.importance),
timestamp: Number(row.timestamp), // preserve original
metadata: updates.metadata ?? ((row.metadata as string) || "{}"),
};

// LanceDB doesn't support in-place update; delete + re-add
const resolvedId = escapeSqlLiteral(row.id as string);
await this.table!.delete(`id = '${resolvedId}'`);
await this.table!.add([updated]);
// LanceDB doesn't support in-place update; delete + re-add
const resolvedId = escapeSqlLiteral(row.id as string);
await this.table!.delete(`id = '${resolvedId}'`);
await this.table!.add([updated]);

return updated;
return updated;
});
}

async bulkDelete(
Expand All @@ -775,37 +828,40 @@ export class MemoryStore {
): Promise<number> {
await this.ensureInitialized();

const conditions: string[] = [];
// Global write lock: blocks concurrent update() calls during bulk deletion
return this.withBulkDeleteLock(async () => {
const conditions: string[] = [];

if (scopeFilter.length > 0) {
const scopeConditions = scopeFilter
.map((scope) => `scope = '${escapeSqlLiteral(scope)}'`)
.join(" OR ");
conditions.push(`(${scopeConditions})`);
}
if (scopeFilter.length > 0) {
const scopeConditions = scopeFilter
.map((scope) => `scope = '${escapeSqlLiteral(scope)}'`)
.join(" OR ");
conditions.push(`(${scopeConditions})`);
}

if (beforeTimestamp) {
conditions.push(`timestamp < ${beforeTimestamp}`);
}
if (beforeTimestamp) {
conditions.push(`timestamp < ${beforeTimestamp}`);
}

if (conditions.length === 0) {
throw new Error(
"Bulk delete requires at least scope or timestamp filter for safety",
);
}
if (conditions.length === 0) {
throw new Error(
"Bulk delete requires at least scope or timestamp filter for safety",
);
}

const whereClause = conditions.join(" AND ");
const whereClause = conditions.join(" AND ");

// Count first
const countResults = await this.table!.query().where(whereClause).toArray();
const deleteCount = countResults.length;
// Count first
const countResults = await this.table!.query().where(whereClause).toArray();
const deleteCount = countResults.length;

// Then delete
if (deleteCount > 0) {
await this.table!.delete(whereClause);
}
// Then delete
if (deleteCount > 0) {
await this.table!.delete(whereClause);
}

return deleteCount;
return deleteCount;
});
}

get hasFtsSupport(): boolean {
Expand Down
102 changes: 102 additions & 0 deletions src/tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,107 @@ export function registerMemoryListTool(
);
}

// ============================================================================
// Purge Scope Tool
// ============================================================================

export function registerMemoryPurgeScopeTool(
api: OpenClawPluginApi,
context: ToolContext,
) {
api.registerTool(
(toolCtx) => {
const agentId = resolveAgentId((toolCtx as any)?.agentId, context.agentId) ?? "main";
return {
name: "memory_purge_scope",
label: "Memory Purge Scope",
description:
"Permanently delete ALL memories in a specific scope. This is a DESTRUCTIVE and IRREVERSIBLE operation. " +
"You must set confirm=true to execute. Use with extreme caution — typically for user/agent cleanup.",
parameters: Type.Object({
scope: Type.String({
description: "The scope to purge (e.g. 'agent:ent_user-xxx_default')",
}),
confirm: Type.Boolean({
description: "Must be true to execute. Safety guard against accidental purge.",
}),
}),
async execute(_toolCallId, params) {
const { scope, confirm } = params as {
scope: string;
confirm: boolean;
};

if (!confirm) {
return {
content: [
{
type: "text",
text: "Purge aborted: confirm must be true to execute this destructive operation.",
},
],
details: { error: "not_confirmed" },
};
}

if (!scope || !scope.trim()) {
return {
content: [
{ type: "text", text: "Purge aborted: scope must be a non-empty string." },
],
details: { error: "invalid_scope" },
};
}

// Access control: only allow purging scopes this agent can access
if (!context.scopeManager.isAccessible(scope, agentId)) {
return {
content: [
{
type: "text",
text: `Access denied: scope "${scope}" is not accessible by agent "${agentId}".`,
},
],
details: {
error: "scope_access_denied",
requestedScope: scope,
agentId,
},
};
}

try {
const deletedCount = await context.store.bulkDelete([scope]);
return {
content: [
{
type: "text",
text: `Purged scope "${scope}": ${deletedCount} memor${deletedCount === 1 ? "y" : "ies"} deleted.`,
},
],
details: {
scope,
deletedCount,
},
};
} catch (error) {
return {
content: [
{
type: "text",
text: `Failed to purge scope "${scope}": ${error instanceof Error ? error.message : String(error)}`,
},
],
details: { error: "purge_failed", message: String(error) },
};
}
},
};
},
{ name: "memory_purge_scope" },
);
}

// ============================================================================
// Tool Registration Helper
// ============================================================================
Expand All @@ -1220,6 +1321,7 @@ export function registerAllMemoryTools(
if (options.enableManagementTools) {
registerMemoryStatsTool(api, context);
registerMemoryListTool(api, context);
registerMemoryPurgeScopeTool(api, context);
}
if (options.enableSelfImprovementTools !== false) {
registerSelfImprovementLogTool(api, context);
Expand Down