From 166e4af7af6a9c230137c5cbc5ffcc303e3fb673 Mon Sep 17 00:00:00 2001 From: octo-patch Date: Sun, 19 Apr 2026 12:44:04 +0800 Subject: [PATCH 1/3] fix(wren-ai-service): make DDLChunker synchronous to fix Task passed to embedding node When Hamilton's AsyncDriver executes the indexing DAG, it wraps async nodes in asyncio Tasks. Under complex MDL schemas with many relationships, the async chunk node's Task was being passed unawaited to the downstream embedding node instead of the actual dict result, causing the embedder to receive an asyncio Task repr string rather than the document chunks. This makes DDLChunker.run() and its helpers synchronous, matching the pattern used by all other indexing pipelines (historical_question, table_description, project_meta). The async machinery in _model_preprocessor was unnecessary since MODEL_PREPROCESSORS is empty by default and all helper operations are CPU-bound string manipulations. Update tests to call chunker.run() synchronously accordingly. Fixes #2138 --- .../src/pipelines/indexing/db_schema.py | 23 ++++++++---------- .../pipelines/indexing/test_db_schema.py | 24 +++++++++---------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/wren-ai-service/src/pipelines/indexing/db_schema.py b/wren-ai-service/src/pipelines/indexing/db_schema.py index 394d087b46..9b06b1a39c 100644 --- a/wren-ai-service/src/pipelines/indexing/db_schema.py +++ b/wren-ai-service/src/pipelines/indexing/db_schema.py @@ -1,4 +1,3 @@ -import asyncio import logging import sys import uuid @@ -29,7 +28,7 @@ @component class DDLChunker: @component.output_types(documents=List[Document]) - async def run( + def run( self, mdl: Dict[str, Any], column_batch_size: int, @@ -48,7 +47,7 @@ def _additional_meta() -> Dict[str, Any]: }, "content": chunk["payload"], } - for chunk in await self._get_ddl_commands( + for chunk in self._get_ddl_commands( **mdl, column_batch_size=column_batch_size ) ] @@ -63,7 +62,7 @@ def _additional_meta() -> Dict[str, Any]: ] } - async def _model_preprocessor( + def _model_preprocessor( self, models: List[Dict[str, Any]], **kwargs ) -> List[Dict[str, Any]]: def _column_preprocessor( @@ -81,9 +80,9 @@ def _column_preprocessor( **addition, } - async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: + def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: addition = { - key: await helper(model, **kwargs) + key: helper(model, **kwargs) for key, helper in helper.MODEL_PREPROCESSORS.items() if helper.condition(model, **kwargs) } @@ -100,11 +99,9 @@ async def _preprocessor(model: Dict[str, Any], **kwargs) -> Dict[str, Any]: "primaryKey": model.get("primaryKey", ""), } - tasks = [_preprocessor(model, **kwargs) for model in models] - - return await asyncio.gather(*tasks) + return [_preprocessor(model, **kwargs) for model in models] - async def _get_ddl_commands( + def _get_ddl_commands( self, models: List[Dict[str, Any]], relationships: List[Dict[str, Any]], @@ -115,7 +112,7 @@ async def _get_ddl_commands( ) -> List[dict]: return ( self._convert_models_and_relationships( - await self._model_preprocessor(models, **kwargs), + self._model_preprocessor(models, **kwargs), relationships, column_batch_size, ) @@ -300,13 +297,13 @@ def validate_mdl(mdl_str: str, validator: MDLValidator) -> Dict[str, Any]: @observe(capture_input=False) -async def chunk( +def chunk( mdl: Dict[str, Any], chunker: DDLChunker, column_batch_size: int, project_id: Optional[str] = None, ) -> Dict[str, Any]: - return await chunker.run( + return chunker.run( mdl=mdl, column_batch_size=column_batch_size, project_id=project_id, diff --git a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py index 20bd8ac682..7d045e537a 100644 --- a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py +++ b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py @@ -13,7 +13,7 @@ async def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} - document = await chunker.run(mdl, column_batch_size=1) + document = chunker.run(mdl, column_batch_size=1) assert document == {"documents": []} @@ -35,7 +35,7 @@ async def test_single_model(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document: Document = actual["documents"][0] @@ -74,7 +74,7 @@ async def test_multiple_models(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_1: Document = actual["documents"][0] @@ -119,7 +119,7 @@ async def test_column_is_primary_key(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -164,7 +164,7 @@ async def test_column_with_properties(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -221,7 +221,7 @@ async def test_column_with_nested_columns(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -264,7 +264,7 @@ async def test_column_with_calculated_property(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 2 document_0: Document = actual["documents"][0] @@ -328,7 +328,7 @@ async def test_column_with_relationship(): "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 6 document_0: Document = actual["documents"][0] @@ -399,7 +399,7 @@ async def test_column_batch_size(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=2) + actual = chunker.run(mdl, column_batch_size=2) assert len(actual["documents"]) == 3 document_0: Document = actual["documents"][0] @@ -453,7 +453,7 @@ async def test_view(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -483,7 +483,7 @@ async def test_view_with_properties(): "relationships": [], "metrics": [], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] @@ -518,7 +518,7 @@ async def test_metric(): } ], } - actual = await chunker.run(mdl, column_batch_size=1) + actual = chunker.run(mdl, column_batch_size=1) assert len(actual["documents"]) == 1 document_0: Document = actual["documents"][0] From 33999d1ab0d591b77dc41ea7d7286fab0d43eda6 Mon Sep 17 00:00:00 2001 From: octo-patch Date: Sun, 19 Apr 2026 13:27:23 +0800 Subject: [PATCH 2/3] test(wren-ai-service): drop async/asyncio from chunker-only tests DDLChunker.run() is now synchronous, so the chunker test cases no longer need pytest.mark.asyncio or async def. Only test_pipeline_run keeps async because it still awaits DBSchema.run. --- .../pipelines/indexing/test_db_schema.py | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py index 7d045e537a..a78d653ee2 100644 --- a/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py +++ b/wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py @@ -8,8 +8,7 @@ from src.pipelines.indexing.db_schema import DBSchema, DDLChunker -@pytest.mark.asyncio -async def test_empty_mdl(): +def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} @@ -17,8 +16,7 @@ async def test_empty_mdl(): assert document == {"documents": []} -@pytest.mark.asyncio -async def test_single_model(): +def test_single_model(): chunker = DDLChunker() mdl = { "models": [ @@ -49,8 +47,7 @@ async def test_single_model(): ) -@pytest.mark.asyncio -async def test_multiple_models(): +def test_multiple_models(): chunker = DDLChunker() mdl = { "models": [ @@ -98,8 +95,7 @@ async def test_multiple_models(): ) -@pytest.mark.asyncio -async def test_column_is_primary_key(): +def test_column_is_primary_key(): chunker = DDLChunker() mdl = { "models": [ @@ -140,8 +136,7 @@ async def test_column_is_primary_key(): ) -@pytest.mark.asyncio -async def test_column_with_properties(): +def test_column_with_properties(): chunker = DDLChunker() mdl = { "models": [ @@ -195,8 +190,7 @@ async def test_column_with_properties(): ) -@pytest.mark.asyncio -async def test_column_with_nested_columns(): +def test_column_with_nested_columns(): chunker = DDLChunker() mdl = { "models": [ @@ -242,8 +236,7 @@ async def test_column_with_nested_columns(): ) -@pytest.mark.asyncio -async def test_column_with_calculated_property(): +def test_column_with_calculated_property(): chunker = DDLChunker() mdl = { "models": [ @@ -285,8 +278,7 @@ async def test_column_with_calculated_property(): ) -@pytest.mark.asyncio -async def test_column_with_relationship(): +def test_column_with_relationship(): chunker = DDLChunker() mdl = { "models": [ @@ -381,8 +373,7 @@ async def test_column_with_relationship(): ) -@pytest.mark.asyncio -async def test_column_batch_size(): +def test_column_batch_size(): chunker = DDLChunker() mdl = { "models": [ @@ -444,8 +435,7 @@ async def test_column_batch_size(): ) -@pytest.mark.asyncio -async def test_view(): +def test_view(): chunker = DDLChunker() mdl = { "models": [], @@ -468,8 +458,7 @@ async def test_view(): ) -@pytest.mark.asyncio -async def test_view_with_properties(): +def test_view_with_properties(): chunker = DDLChunker() mdl = { "models": [], @@ -498,8 +487,7 @@ async def test_view_with_properties(): ) -@pytest.mark.asyncio -async def test_metric(): +def test_metric(): chunker = DDLChunker() mdl = { "models": [], From 6f57f005a30350ff9b162b8533190e97d966c32f Mon Sep 17 00:00:00 2001 From: octo-patch Date: Sat, 25 Apr 2026 09:50:23 +0800 Subject: [PATCH 3/3] fix(wren-ai-service): expose batch_size in LitellmEmbedderProvider to cap embedding API calls (fixes #2031) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The document embedder's batch_size (how many texts are sent per embedding API call) was hardcoded to 32 inside AsyncDocumentEmbedder and not reachable from config.yaml. Users with embedding providers that enforce a lower batch limit (e.g. max 10) had no way to reduce it — setting column_indexing_batch_size in the settings section only controls DDL chunking, not the embedding API batch size. Add batch_size as an explicit, named parameter to LitellmEmbedderProvider and forward it to get_document_embedder(). Users can now set it per-model in the embedder section of config.yaml: type: embedder provider: litellm_embedder models: - model: openai/text-embedding-v4 alias: default batch_size: 10 # cap API call size for providers with batch limits Also document the option with a commented example in config.qwen3.yaml. Co-Authored-By: Octopus --- wren-ai-service/docs/config_examples/config.qwen3.yaml | 3 +++ wren-ai-service/src/providers/embedder/litellm.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/wren-ai-service/docs/config_examples/config.qwen3.yaml b/wren-ai-service/docs/config_examples/config.qwen3.yaml index 0ebaed162a..f42530025d 100644 --- a/wren-ai-service/docs/config_examples/config.qwen3.yaml +++ b/wren-ai-service/docs/config_examples/config.qwen3.yaml @@ -66,6 +66,9 @@ models: alias: default api_base: https://api.openai.com/v1 # change this according to your embedding model timeout: 120 + # batch_size controls how many documents are sent per embedding API call (default: 32). + # Lower this value if your embedding provider enforces a maximum batch size. + # batch_size: 32 --- type: engine diff --git a/wren-ai-service/src/providers/embedder/litellm.py b/wren-ai-service/src/providers/embedder/litellm.py index 4d051e3284..aa98509323 100644 --- a/wren-ai-service/src/providers/embedder/litellm.py +++ b/wren-ai-service/src/providers/embedder/litellm.py @@ -172,12 +172,14 @@ def __init__( ] = None, # e.g. EMBEDDER_OPENAI_API_KEY, EMBEDDER_ANTHROPIC_API_KEY, etc. api_base: Optional[str] = None, timeout: float = 120.0, + batch_size: int = 32, # number of documents sent per embedding API call **kwargs, ): self._api_key = os.getenv(api_key_name) if api_key_name else None self._api_base = remove_trailing_slash(api_base) if api_base else None self._embedding_model = model self._timeout = timeout + self._batch_size = batch_size if "provider" in kwargs: del kwargs["provider"] self._kwargs = kwargs @@ -197,5 +199,6 @@ def get_document_embedder(self): api_base_url=self._api_base, model=self._embedding_model, timeout=self._timeout, + batch_size=self._batch_size, **self._kwargs, )