Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
535 changes: 535 additions & 0 deletions docs/tech-specs/knowledge-core-completeness.md

Large diffs are not rendered by default.

254 changes: 250 additions & 4 deletions tests/unit/test_cores/test_knowledge_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from unittest.mock import call

from trustgraph.cores.knowledge import KnowledgeManager
from trustgraph.schema import KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term, EntityEmbeddings, IRI, LITERAL
from trustgraph.schema import (
KnowledgeResponse, Triples, GraphEmbeddings, Metadata, Triple, Term,
EntityEmbeddings, IRI, LITERAL,
LibraryMetadata, LibraryBlob,
LibrarianResponse, DocumentMetadata,
)


@pytest.fixture
Expand Down Expand Up @@ -373,11 +378,252 @@ async def test_delete_kg_core(self, knowledge_manager):
mock_respond = AsyncMock()

await knowledge_manager.delete_kg_core(mock_request, mock_respond, "test-user")

# Verify table store was called correctly
knowledge_manager.table_store.delete_kg_core.assert_called_once_with("test-user", "test-doc-id")

# Verify response
mock_respond.assert_called_once()
response = mock_respond.call_args[0][0]
assert response.error is None
assert response.error is None


class TestKnowledgeManagerLibraryDownload:
"""Test get_kg_core streaming of library documents."""

@pytest.fixture
def manager_with_librarian(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
mock_librarian = AsyncMock()
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="test_user",
cassandra_password="test_pass",
keyspace="test_keyspace",
flow_config=mock_flow_config,
librarian=mock_librarian,
)
manager.table_store = AsyncMock()
return manager

@pytest.mark.asyncio
async def test_get_kg_core_streams_library_docs(self, manager_with_librarian):
mock_request = Mock()
mock_request.id = "root-doc"
mock_respond = AsyncMock()

manager_with_librarian.table_store.get_triples = AsyncMock()
manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()

root_meta = DocumentMetadata(
id="root-doc", kind="application/pdf", title="Test PDF",
document_type="source",
)
child_meta = DocumentMetadata(
id="chunk-1", kind="text/plain", title="Chunk 1",
parent_id="root-doc", document_type="chunk",
)

manager_with_librarian.librarian.fetch_document_metadata.return_value = root_meta
manager_with_librarian.librarian.request.return_value = LibrarianResponse(
document_metadatas=[child_meta],
)
manager_with_librarian.librarian.fetch_document_content.side_effect = [
b"cm9vdCBjb250ZW50",
b"Y2h1bmsgY29udGVudA==",
]

await manager_with_librarian.get_kg_core(
mock_request, mock_respond, "test-user"
)

responses = [c[0][0] for c in mock_respond.call_args_list]

lm_responses = [r for r in responses if r.library_metadata is not None]
lb_responses = [r for r in responses if r.library_blob is not None]
eos_responses = [r for r in responses if r.eos is True]

assert len(lm_responses) == 2
assert lm_responses[0].library_metadata.id == "root-doc"
assert lm_responses[0].library_metadata.document_type == "source"
assert lm_responses[1].library_metadata.id == "chunk-1"
assert lm_responses[1].library_metadata.parent_id == "root-doc"

assert len(lb_responses) == 2
assert lb_responses[0].library_blob.id == "root-doc"
assert lb_responses[0].library_blob.data == b"cm9vdCBjb250ZW50"
assert lb_responses[1].library_blob.id == "chunk-1"

assert len(eos_responses) == 1

@pytest.mark.asyncio
async def test_get_kg_core_no_librarian_skips_library(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="u", cassandra_password="p",
keyspace="ks", flow_config=mock_flow_config,
)
manager.table_store = AsyncMock()
manager.table_store.get_triples = AsyncMock()
manager.table_store.get_graph_embeddings = AsyncMock()

mock_request = Mock()
mock_request.id = "doc-1"
mock_respond = AsyncMock()

await manager.get_kg_core(mock_request, mock_respond, "w")

responses = [c[0][0] for c in mock_respond.call_args_list]
assert all(r.library_metadata is None for r in responses)
assert all(r.library_blob is None for r in responses)

@pytest.mark.asyncio
async def test_get_kg_core_librarian_metadata_failure_is_graceful(
self, manager_with_librarian,
):
mock_request = Mock()
mock_request.id = "missing-doc"
mock_respond = AsyncMock()

manager_with_librarian.table_store.get_triples = AsyncMock()
manager_with_librarian.table_store.get_graph_embeddings = AsyncMock()
manager_with_librarian.librarian.fetch_document_metadata.side_effect = (
RuntimeError("not found")
)

await manager_with_librarian.get_kg_core(
mock_request, mock_respond, "test-user"
)

responses = [c[0][0] for c in mock_respond.call_args_list]
assert all(r.library_metadata is None for r in responses)
assert any(r.eos for r in responses)


class TestKnowledgeManagerLibraryUpload:
"""Test put_kg_core handling of library metadata and blob records."""

@pytest.fixture
def manager_with_librarian(self, mock_flow_config):
with patch('trustgraph.cores.knowledge.KnowledgeTableStore'):
mock_librarian = AsyncMock()
manager = KnowledgeManager(
cassandra_host=["localhost"],
cassandra_username="u", cassandra_password="p",
keyspace="ks", flow_config=mock_flow_config,
librarian=mock_librarian,
)
manager.table_store = AsyncMock()
return manager

@pytest.mark.asyncio
async def test_put_metadata_then_blob_calls_librarian(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.return_value = LibrarianResponse()

# First call: metadata
req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="doc-1", kind="application/pdf", title="Test",
document_type="source",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")

# Metadata is buffered, librarian not called yet
manager_with_librarian.librarian.request.assert_not_called()

# Second call: blob
req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(
id="doc-1", data=b"dGVzdA==",
)
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")

# Now librarian should have been called with add-document
manager_with_librarian.librarian.request.assert_called_once()
call_args = manager_with_librarian.librarian.request.call_args[0][0]
assert call_args.operation == "add-document"
assert call_args.document_metadata.id == "doc-1"
assert call_args.document_metadata.kind == "application/pdf"
assert call_args.content == b"dGVzdA=="

@pytest.mark.asyncio
async def test_put_child_document_uses_add_child_operation(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.return_value = LibrarianResponse()

req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="chunk-1", kind="text/plain", title="Chunk",
parent_id="doc-1", document_type="chunk",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")

req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="chunk-1", data=b"Y2h1bms=")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")

call_args = manager_with_librarian.librarian.request.call_args[0][0]
assert call_args.operation == "add-child-document"
assert call_args.document_metadata.parent_id == "doc-1"

@pytest.mark.asyncio
async def test_put_blob_without_metadata_logs_warning(
self, manager_with_librarian,
):
mock_respond = AsyncMock()

req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="orphan", data=b"data")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")

# Librarian should not be called for orphan blob
manager_with_librarian.librarian.request.assert_not_called()

@pytest.mark.asyncio
async def test_put_existing_document_is_graceful(
self, manager_with_librarian,
):
mock_respond = AsyncMock()
manager_with_librarian.librarian.request.side_effect = RuntimeError(
"Document already exists"
)

req_meta = Mock()
req_meta.triples = None
req_meta.graph_embeddings = None
req_meta.library_metadata = LibraryMetadata(
id="doc-1", kind="application/pdf", title="Test",
document_type="source",
)
req_meta.library_blob = None
await manager_with_librarian.put_kg_core(req_meta, mock_respond, "ws")

req_blob = Mock()
req_blob.triples = None
req_blob.graph_embeddings = None
req_blob.library_metadata = None
req_blob.library_blob = LibraryBlob(id="doc-1", data=b"data")
await manager_with_librarian.put_kg_core(req_blob, mock_respond, "ws")

# Should not raise — "already exists" is handled gracefully
33 changes: 32 additions & 1 deletion tests/unit/test_tables/test_knowledge_table_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,15 @@ class TestGetTriples:
@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_row_converts_to_triples(self, mock_async_execute_paged):
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri)
# row[3] is a list of (s_val, s_uri, p_val, p_uri, o_val, o_uri, graph)
fake_row = (
None, None, None,
[
(
"http://example.org/alice", True,
"http://example.org/knows", True,
"http://example.org/bob", True,
"urn:graph:source",
),
],
)
Expand Down Expand Up @@ -191,3 +192,33 @@ async def receiver(msg):
assert t.s.iri == "http://example.org/alice"
assert t.p.iri == "http://example.org/knows"
assert t.o.iri == "http://example.org/bob"
assert t.g == "urn:graph:source"

@pytest.mark.asyncio
@patch('trustgraph.tables.knowledge.async_execute_paged', new_callable=AsyncMock)
async def test_empty_graph_name_becomes_none(self, mock_async_execute_paged):
fake_row = (
None, None, None,
[
(
"http://example.org/alice", True,
"http://example.org/knows", True,
"http://example.org/bob", True,
"",
),
],
)

store = _make_store()
store.cassandra = Mock()
store.get_triples_stmt = Mock()
mock_async_execute_paged.return_value = [[fake_row]]

received = []

async def receiver(msg):
received.append(msg)

await store.get_triples("w", "d", receiver)

assert received[0].triples[0].g is None
Loading
Loading