From 166e4af7af6a9c230137c5cbc5ffcc303e3fb673 Mon Sep 17 00:00:00 2001 From: octo-patch Date: Sun, 19 Apr 2026 12:44:04 +0800 Subject: [PATCH 1/2] fix(wren-ai-service): make DDLChunker synchronous to fix Task passed to embedding node When Hamilton's AsyncDriver executes the indexing DAG, it wraps async nodes in asyncio Tasks. Under complex MDL schemas with many relationships, the async chunk node's Task was being passed unawaited to the downstream embedding node instead of the actual dict result, causing the embedder to receive an asyncio Task repr string rather than the document chunks. This makes DDLChunker.run() and its helpers synchronous, matching the pattern used by all other indexing pipelines (historical_question, table_description, project_meta). The async machinery in _model_preprocessor was unnecessary since MODEL_PREPROCESSORS is empty by default and all helper operations are CPU-bound string manipulations. Update tests to call chunker.run() synchronously accordingly. Fixes #2138 --- .../src/pipelines/indexing/db_schema.py | 23 ++++++++---------- .../pipelines/indexing/test_db_schema.py | 24 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/wren-ai-service/src/pipelines/indexing/db_schema.py b/wren-ai-service/src/pipelines/indexing/db_schema.py index 394d087b46..9b06b1a39c 100644 --- a/wren-ai-service/src/pipelines/indexing/db_schema.py +++ b/wren-ai-service/src/pipelines/indexing/db_schema.py @@ -1,4 +1,3 @@ -import asyncio import logging import sys import uuid @@ -29,7 +28,7 @@ @component class DDLChunker: @component.output_types(documents=List[Document]) - async def run( + def run( self, mdl: Dict[str, Any], column_batch_size: int, @@ -48,7 +47,7 @@ def _additional_meta() -> Dict[str, Any]: }, "content": chunk["payload"], } - for chunk in await self._get_ddl_commands( + for chunk in self._get_ddl_commands( **mdl, column_batch_size=column_batch_size ) ] @@ -63,7 +62,7 @@ def _additional_meta() -> Dict[str, Any]: ] } - async def _model_preprocessor( + def _model_preprocessor( self, models: List[Dict[str, Any]], **kwargs ) -> List[Dict[str, Any]]: def _column_preprocessor( @@ -81,9 +80,9 @@ def _column_preprocessor( **addition, } - async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: + def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: addition = { - key: await helper(model, **kwargs) + key: helper(model, **kwargs) for key, helper in helper.MODEL_PREPROCESSORS.items() if helper.condition(model, **kwargs) } @@ -100,11 +99,9 @@ async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: "primaryKey": model.get("primaryKey", ""), } - tasks = [_preprocessor(model, **kwargs) for model in models] - - return await asyncio.gather(*tasks) + return [_preprocessor(model, **kwargs) for model in models] - async def _get_ddl_commands( + def _get_ddl_commands( self, models: List[Dict[str, Any]], relationships: List[Dict[str, Any]], @@ -115,7 +112,7 @@ async def _get_ddl_commands( ) -> List[dict]: return ( self._convert_models_and_relationships( - await self._model_preprocessor(models, **kwargs), + self._model_preprocessor(models, **kwargs), relationships, column_batch_size, ) @@ -300,13 +297,13 @@ def validate_mdl(mdl_str: str, validator: MDLValidator) -> Dict[str, Any]: @observe(capture_input=False) -async def chunk( +def chunk( mdl: Dict[str, Any], chunker: DDLChunker, column_batch_size: int, project_id: Optional[str] = None, ) -> Dict[str, Any]: - return await chunker.run( + return chunker.run( mdl=mdl, column_batch_size=column_batch_size, project_id=project_id, diff --git a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py index 20bd8ac682..7d045e537a 100644 --- a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py +++ b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py @@ -13,7 +13,7 @@ async def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} - document = await chunker.run(mdl, column_batch_size=1) + document = chunker.run(mdl, column_batch_size=1) assert document == {"documents": []} @@ -35,7 +35,7 @@ async def test_single_model(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document: Document = actual["documents"][0] @@ -74,7 +74,7 @@ async def test_multiple_models(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_1: Document = actual["documents"][0] @@ -119,7 +119,7 @@ async def test_column_is_primary_key(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -164,7 +164,7 @@ async def test_column_with_properties(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -221,7 +221,7 @@ async def test_column_with_nested_columns(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -264,7 +264,7 @@ async def test_column_with_calculated_property(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -328,7 +328,7 @@ async def test_column_with_relationship(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 6 document_0: Document = actual["documents"][0] @@ -399,7 +399,7 @@ async def test_column_batch_size(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=2) + actual = chunker.run(mdl, column_batch_size=2) assert len(actual["documents"]) == 3 document_0: Document = actual["documents"][0] @@ -453,7 +453,7 @@ async def test_view(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -483,7 +483,7 @@ async def test_view_with_properties(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -518,7 +518,7 @@ async def test_metric(): } ], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] From 33999d1ab0d591b77dc41ea7d7286fab0d43eda6 Mon Sep 17 00:00:00 2001 From: octo-patch Date: Sun, 19 Apr 2026 13:27:23 +0800 Subject: [PATCH 2/2] test(wren-ai-service): drop async/asyncio from chunker-only tests DDLChunker.run() is now synchronous, so the chunker test cases no longer need pytest.mark.asyncio or async def. Only test_pipeline_run keeps async because it still awaits DBSchema.run. --- .../pipelines/indexing/test_db_schema.py | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py index 7d045e537a..a78d653ee2 100644 --- a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py +++ b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py @@ -8,8 +8,7 @@ from src.pipelines.indexing.db_schema import DBSchema, DDLChunker -@pytest.mark.asyncio -async def test_empty_mdl(): +def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} @@ -17,8 +16,7 @@ async def test_empty_mdl(): assert document == {"documents": []} -@pytest.mark.asyncio -async def test_single_model(): +def test_single_model(): chunker = DDLChunker() mdl = { "models": [ @@ -49,8 +47,7 @@ async def test_single_model(): ) -@pytest.mark.asyncio -async def test_multiple_models(): +def test_multiple_models(): chunker = DDLChunker() mdl = { "models": [ @@ -98,8 +95,7 @@ async def test_multiple_models(): ) -@pytest.mark.asyncio -async def test_column_is_primary_key(): +def test_column_is_primary_key(): chunker = DDLChunker() mdl = { "models": [ @@ -140,8 +136,7 @@ async def test_column_is_primary_key(): ) -@pytest.mark.asyncio -async def test_column_with_properties(): +def test_column_with_properties(): chunker = DDLChunker() mdl = { "models": [ @@ -195,8 +190,7 @@ async def test_column_with_properties(): ) -@pytest.mark.asyncio -async def test_column_with_nested_columns(): +def test_column_with_nested_columns(): chunker = DDLChunker() mdl = { "models": [ @@ -242,8 +236,7 @@ async def test_column_with_nested_columns(): ) -@pytest.mark.asyncio -async def test_column_with_calculated_property(): +def test_column_with_calculated_property(): chunker = DDLChunker() mdl = { "models": [ @@ -285,8 +278,7 @@ async def test_column_with_calculated_property(): ) -@pytest.mark.asyncio -async def test_column_with_relationship(): +def test_column_with_relationship(): chunker = DDLChunker() mdl = { "models": [ @@ -381,8 +373,7 @@ async def test_column_with_relationship(): ) -@pytest.mark.asyncio -async def test_column_batch_size(): +def test_column_batch_size(): chunker = DDLChunker() mdl = { "models": [ @@ -444,8 +435,7 @@ async def test_column_batch_size(): ) -@pytest.mark.asyncio -async def test_view(): +def test_view(): chunker = DDLChunker() mdl = { "models": [], @@ -468,8 +458,7 @@ async def test_view(): ) -@pytest.mark.asyncio -async def test_view_with_properties(): +def test_view_with_properties(): chunker = DDLChunker() mdl = { "models": [], @@ -498,8 +487,7 @@ async def test_view_with_properties(): ) -@pytest.mark.asyncio -async def test_metric(): +def test_metric(): chunker = DDLChunker() mdl = { "models": [],