Skip to content

Commit 6ae7bdd

Browse files
dcramerclaude
andcommitted
Fix memory extraction misattribution, double-extraction, and contradiction bugs
- Skip speaker_info label for pre-labeled history messages (starting with @) in _format_conversation to prevent double-labeling misattribution - Add touch_debounce() to MemoryPostprocessService and wire it through RPC extraction handlers to prevent postprocess double-extraction - Add _conflicts_with_self_fact() guard in process_extracted_facts() to drop third-party person_facts that contradict authoritative self-facts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c92fb80 commit 6ae7bdd

7 files changed

Lines changed: 422 additions & 1 deletion

File tree

src/ash/integrations/memory.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def register_rpc_methods(self, server, context: IntegrationContext) -> None:
5252
components.memory_manager,
5353
memory_extractor=components.memory_extractor,
5454
sessions_path=context.sessions_path,
55+
postprocess_service=self._postprocess,
5556
)
5657

5758
async def on_message_postprocess(

src/ash/memory/extractor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,9 @@ def _format_conversation(
756756
text = text[:2000] + "..."
757757

758758
if msg.role == Role.USER:
759-
if speaker_info:
759+
# Skip speaker_info for pre-labeled history messages (already
760+
# contain @username: prefix from chat history loading).
761+
if speaker_info and not text.lstrip().startswith("@"):
760762
label = speaker_info.format_label()
761763
lines.append(f"<user>\n{label}: {text}\n</user>")
762764
else:

src/ash/memory/postprocess.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ def __init__(
4848
self._confidence_threshold = confidence_threshold
4949
self._last_extraction_time: float | None = None
5050

51+
def touch_debounce(self) -> None:
52+
"""Mark an extraction as having just occurred.
53+
54+
Called by RPC extraction handlers so the postprocess debounce timer
55+
is aware that extraction already happened, preventing double-extraction.
56+
"""
57+
self._last_extraction_time = time.time()
58+
5159
def maybe_schedule(
5260
self,
5361
*,

src/ash/memory/processing.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,75 @@ async def build_owner_names_for_speaker(
351351
return owner_names
352352

353353

354+
async def _conflicts_with_self_fact(
355+
store: Store,
356+
content: str,
357+
subject_person_ids: list[str],
358+
speaker_person_id: str | None,
359+
) -> bool:
360+
"""Check if a person_fact conflicts with an existing authoritative self-fact.
361+
362+
When a third party claims something about a subject (person_fact), and the
363+
subject already has a self-fact (stated by themselves) on the same topic,
364+
the third-party claim should be dropped to preserve subject authority.
365+
366+
Only applies when the speaker is NOT the subject (i.e. third-party claims).
367+
"""
368+
from ash.graph.edges import get_memories_about_person
369+
from ash.store.trust import classify_trust
370+
from ash.store.types import get_assertion
371+
372+
if not subject_person_ids:
373+
return False
374+
375+
# If the speaker is the subject, this is a self-fact update, not a conflict
376+
if speaker_person_id and speaker_person_id in subject_person_ids:
377+
return False
378+
379+
# Find existing self-facts about each subject
380+
for pid in subject_person_ids:
381+
memory_ids = get_memories_about_person(store._graph, pid)
382+
for mid in memory_ids:
383+
memory = store._graph.memories.get(mid)
384+
if not memory or memory.superseded_at or memory.archived_at:
385+
continue
386+
387+
# Must be a self-fact (speaker is the subject)
388+
trust = classify_trust(store._graph, mid)
389+
if trust != "fact":
390+
continue
391+
392+
# Also verify assertion kind is SELF_FACT if assertion exists
393+
assertion = get_assertion(memory)
394+
if assertion and assertion.assertion_kind != AssertionKind.SELF_FACT:
395+
continue
396+
397+
# Check semantic similarity — is the new claim about the same topic?
398+
try:
399+
query_embedding = await store._embeddings.embed(content)
400+
similar = store._index.search(query_embedding, limit=5)
401+
for found_id, similarity in similar:
402+
if found_id == mid and similarity >= 0.75:
403+
logger.info(
404+
"person_fact_blocked_by_self_fact",
405+
extra={
406+
"fact.content": content[:80],
407+
"self_fact.id": mid,
408+
"self_fact.content": memory.content[:80],
409+
"similarity": similarity,
410+
},
411+
)
412+
return True
413+
except Exception:
414+
logger.debug(
415+
"self_fact_conflict_check_failed",
416+
extra={"person_id": pid},
417+
exc_info=True,
418+
)
419+
420+
return False
421+
422+
354423
async def process_extracted_facts(
355424
facts: list[ExtractedFact],
356425
store: Store,
@@ -536,6 +605,17 @@ async def process_extracted_facts(
536605
)
537606
assertion = downgrade_assertion_to_context(assertion)
538607

608+
# Guard: drop third-party person_facts that contradict authoritative
609+
# self-facts from the subject. See specs/memory/index.md.
610+
if (
611+
assertion.assertion_kind == AssertionKind.PERSON_FACT
612+
and subject_person_ids
613+
and await _conflicts_with_self_fact(
614+
store, fact.content, subject_person_ids, stated_by_pid
615+
)
616+
):
617+
continue
618+
539619
# DM sensitivity floor: ephemeral types get minimum PERSONAL
540620
# in private chats as defense-in-depth against cross-context leakage
541621
effective_sensitivity = fact.sensitivity

src/ash/rpc/methods/memory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
if TYPE_CHECKING:
2020
from ash.memory.extractor import MemoryExtractor
21+
from ash.memory.postprocess import MemoryPostprocessService
2122
from ash.rpc.server import RPCServer
2223
from ash.store.store import Store
2324

@@ -31,6 +32,7 @@ def register_memory_methods(
3132
memory_manager: "Store",
3233
memory_extractor: "MemoryExtractor | None" = None,
3334
sessions_path: Path | None = None,
35+
postprocess_service: "MemoryPostprocessService | None" = None,
3436
) -> None:
3537
"""Register memory-related RPC methods.
3638
@@ -39,6 +41,7 @@ def register_memory_methods(
3941
memory_manager: Store instance.
4042
memory_extractor: Optional extractor for fact classification/extraction.
4143
sessions_path: Path to sessions directory (for memory.extract).
44+
postprocess_service: Optional postprocess service for debounce coordination.
4245
"""
4346

4447
async def _build_username_lookup() -> dict[str, str]:
@@ -496,6 +499,11 @@ async def _extract_and_store_from_messages(
496499
chat_type=chat_type,
497500
)
498501

502+
# Touch postprocess debounce so the background extraction timer
503+
# knows an RPC extraction just occurred, preventing double-extraction.
504+
if postprocess_service and stored_ids:
505+
postprocess_service.touch_debounce()
506+
499507
return {"stored": len(stored_ids)}
500508

501509
async def memory_extract_from_messages(params: dict[str, Any]) -> dict[str, Any]:

tests/test_memory_extractor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,3 +961,51 @@ def test_aliases_skips_invalid_inner_types(self, extractor):
961961

962962
assert len(facts) == 1
963963
assert facts[0].aliases == {}
964+
965+
966+
class TestFormatConversation:
967+
"""Tests for _format_conversation speaker labeling behavior."""
968+
969+
@pytest.fixture
970+
def extractor(self):
971+
return MemoryExtractor(
972+
llm=MagicMock(),
973+
model="test-model",
974+
confidence_threshold=0.7,
975+
)
976+
977+
def test_skips_speaker_info_for_at_prefixed_messages(self, extractor):
978+
"""Pre-labeled history messages (starting with @) should not get speaker_info prepended."""
979+
speaker = SpeakerInfo(username="sksembhi", display_name="SK")
980+
messages = [
981+
Message(role=Role.USER, content="@evanpurkhiser (Evan): I'm 6'2\""),
982+
]
983+
result = extractor._format_conversation(messages, speaker_info=speaker)
984+
# Should NOT prepend @sksembhi label — message is already labeled
985+
assert "@sksembhi" not in result
986+
assert "@evanpurkhiser (Evan): I'm 6'2\"" in result
987+
988+
def test_adds_speaker_info_for_unprefixed_messages(self, extractor):
989+
"""Unprefixed user messages should get speaker_info label prepended."""
990+
speaker = SpeakerInfo(username="sksembhi", display_name="SK")
991+
messages = [
992+
Message(role=Role.USER, content="Hello world"),
993+
]
994+
result = extractor._format_conversation(messages, speaker_info=speaker)
995+
assert "@sksembhi (SK): Hello world" in result
996+
997+
def test_mixed_history_and_current_messages(self, extractor):
998+
"""History (pre-labeled) and current (unlabeled) messages should be handled correctly."""
999+
speaker = SpeakerInfo(username="sksembhi", display_name="SK")
1000+
messages = [
1001+
Message(role=Role.USER, content="@evanpurkhiser (Evan): I'm 6'2\""),
1002+
Message(role=Role.ASSISTANT, content="Got it!"),
1003+
Message(role=Role.USER, content="Evan's height is 5'2\""),
1004+
]
1005+
result = extractor._format_conversation(messages, speaker_info=speaker)
1006+
# History message: no speaker_info prepended
1007+
assert "@sksembhi" not in result.split("</user>")[0]
1008+
# Current message: speaker_info prepended
1009+
assert "@sksembhi (SK): Evan's height is 5'2\"" in result
1010+
# History message preserved
1011+
assert "@evanpurkhiser (Evan): I'm 6'2\"" in result

0 commit comments

Comments
 (0)