From 85b0cf7b0a0747790d30c1776760df0dd431f8cd Mon Sep 17 00:00:00 2001 From: sorphwer Date: Sun, 1 Mar 2026 11:46:52 +0800 Subject: [PATCH 1/5] Add property type migration to Weaviate migration script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Handle uuid→text conversion for document_id/doc_id and remove spurious moduleConfig from chunk_index during schema migration. This fixes property type incompatibilities that could cause issues even when vectorConfig is already correct. Fixes the following failure scenarios in the old script: 1. Schema type mismatch: Old script copies properties as-is, preserving uuid type for document_id/doc_id. Dify expects text type, so the migrated collection appears successful but Dify fails at runtime. 2. UUID object insertion failure: When source collection has uuid-typed fields, the Weaviate client returns Python UUID objects. Writing these into text-typed fields causes batch insert errors, leading to data loss or migration abort. 3. moduleConfig rejection: Stale moduleConfig on chunk_index from older Weaviate versions can cause collection creation to fail on newer Weaviate, aborting migration entirely. 4. Partial migration blindspot: Collections already migrated for vectorConfig but still carrying wrong property types were skipped with "NEW SCHEMA (skip)", leaving silent incompatibilities. 
Co-Authored-By: Claude Opus 4.6 --- assets/migrate_weaviate_collections.py | 137 ++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 16 deletions(-) diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index f0bb95f62..fba52218c 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -17,6 +17,7 @@ """ import os +import requests import weaviate from weaviate.classes.config import Configure, VectorDistances import sys @@ -33,8 +34,31 @@ WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1]) +def check_properties_need_migration(schema: Dict[str, Any]) -> bool: + """ + Check if collection properties need migration: + - document_id or doc_id has uuid type (should be text) + - chunk_index has moduleConfig (should not have it) + """ + properties = schema.get("properties", []) + for prop in properties: + prop_name = prop.get("name", "") + + # Check if document_id or doc_id is uuid type + if prop_name in ["document_id", "doc_id"]: + if prop.get("dataType") == ["uuid"]: + return True + + # Check if chunk_index has moduleConfig + if prop_name == "chunk_index": + if "moduleConfig" in prop: + return True + + return False + + def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]: - """Identify collections that need migration (those without vectorConfig)""" + """Identify collections that need migration (those without vectorConfig OR with wrong property types)""" collections_to_migrate = [] all_collections = client.collections.list_all() @@ -48,12 +72,29 @@ def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]: collection = client.collections.get(collection_name) config = collection.config.get() - # Check if this collection has the old schema + # Check if this collection has the old schema (no vectorConfig) if config.vector_config is None: collections_to_migrate.append(collection_name) - print(f" - {collection_name}: OLD SCHEMA (needs 
migration)") - else: - print(f" - {collection_name}: NEW SCHEMA (skip)") + print(f" - {collection_name}: OLD SCHEMA - no vectorConfig (needs migration)") + continue + + # Also check if properties need migration (uuid -> text conversion) + try: + response = requests.get( + f"{WEAVIATE_ENDPOINT}/v1/schema/{collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, + ) + + if response.status_code == 200: + schema = response.json() + if check_properties_need_migration(schema): + collections_to_migrate.append(collection_name) + print(f" - {collection_name}: PROPERTY TYPE MISMATCH (needs migration)") + continue + except Exception as e: + print(f" - {collection_name}: Error checking schema: {e}") + + print(f" - {collection_name}: OK (skip)") return collections_to_migrate @@ -62,8 +103,6 @@ def get_collection_schema( client: weaviate.WeaviateClient, collection_name: str ) -> Dict[str, Any]: """Get the full schema of a collection via REST API""" - import requests - response = requests.get( f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, @@ -75,12 +114,58 @@ def get_collection_schema( raise Exception(f"Failed to get schema: {response.text}") +def transform_properties(properties: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Transform properties to match the new schema format: + - document_id: uuid -> text, indexFilterable=true, indexSearchable=true, tokenization="word" + - doc_id: uuid -> text, indexFilterable=true, indexSearchable=true, tokenization="word" + - chunk_index: remove moduleConfig, ensure indexFilterable=true, indexSearchable=false + """ + transformed = [] + + for prop in properties: + new_prop = prop.copy() + prop_name = prop.get("name", "") + + # Convert document_id and doc_id from uuid to text + if prop_name in ["document_id", "doc_id"]: + if prop.get("dataType") == ["uuid"]: + print(f" Converting {prop_name}: uuid -> text (with text search enabled)") + 
new_prop["dataType"] = ["text"] + new_prop["indexFilterable"] = True + new_prop["indexRangeFilters"] = False + new_prop["indexSearchable"] = True + new_prop["tokenization"] = "word" + + # Remove auto-schema description + if "description" in new_prop: + del new_prop["description"] + + # Fix chunk_index: remove moduleConfig, ensure proper index settings + if prop_name == "chunk_index": + if "moduleConfig" in new_prop: + print(f" Removing moduleConfig from {prop_name}") + del new_prop["moduleConfig"] + + # Ensure correct index settings + new_prop["indexFilterable"] = True + new_prop["indexRangeFilters"] = False + new_prop["indexSearchable"] = False + + # Remove moduleConfig from any other properties if present + elif "moduleConfig" in new_prop: + print(f" Removing moduleConfig from {prop_name}") + del new_prop["moduleConfig"] + + transformed.append(new_prop) + + return transformed + + def create_new_collection( client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any] ) -> str: """Create a new collection with updated schema using REST API""" - import requests - # Generate new collection name new_name = f"{old_name}_migrated" @@ -107,9 +192,10 @@ def create_new_collection( "properties": [], } - # Copy properties from old schema + # Copy and transform properties from old schema if "properties" in schema: - new_schema["properties"] = schema["properties"] + print(" Transforming properties...") + new_schema["properties"] = transform_properties(schema["properties"]) # Create collection via REST API response = requests.post( @@ -125,11 +211,32 @@ def create_new_collection( return new_name +def transform_property_values(properties: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform property values during migration: + - Convert UUID objects to strings for document_id and doc_id + """ + from uuid import UUID + + transformed = {} + + for key, value in properties.items(): + # Convert UUID to string for document_id and doc_id + if key in ["document_id", "doc_id"] 
and value is not None: + if isinstance(value, UUID): + transformed[key] = str(value) + else: + transformed[key] = str(value) if value else value + else: + transformed[key] = value + + return transformed + + def migrate_collection_data( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ) -> int: """Migrate data from old collection to new collection using cursor-based pagination""" - old_collection = client.collections.get(old_collection_name) new_collection = client.collections.get(new_collection_name) @@ -159,8 +266,8 @@ def migrate_collection_data( # Use batch insert for efficiency with new_collection.batch.dynamic() as batch: for obj in objects: - # Prepare properties - properties = obj.properties + # Prepare and transform properties (uuid -> text conversion) + properties = transform_property_values(obj.properties) # Add object with vector batch.add_object( @@ -223,8 +330,6 @@ def replace_old_collection( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ): """Replace old collection with migrated one by recreating with original name""" - import requests - print(f"\nReplacing old collection with migrated data...") # Step 1: Delete old collection From ca8b9c4ec538688b4cd9df061c648fcbecf85721 Mon Sep 17 00:00:00 2001 From: sorphwer Date: Sun, 1 Mar 2026 11:57:21 +0800 Subject: [PATCH 2/5] Add connection configuration comments with usage examples Document how to configure Weaviate connection for both in-container and local (port-forward) scenarios, and clarify derived values. 
Co-Authored-By: Claude Opus 4.6 --- assets/migrate_weaviate_collections.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index fba52218c..b0d5312e5 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -24,11 +24,35 @@ import time from typing import List, Dict, Any -# Configuration +# ============================================================================= +# Connection Configuration +# ============================================================================= +# This script reads Weaviate connection info from environment variables, +# making it suitable for running inside the Dify Worker container where +# these variables are already set. +# +# If running outside the container (e.g. locally with kubectl port-forward), +# set the environment variables before running: +# +# export WEAVIATE_ENDPOINT="http://localhost:18080" +# export WEAVIATE_GRPC_ENDPOINT="grpc://localhost:50051" +# export WEAVIATE_API_KEY="your-api-key" +# python migrate_weaviate_collections.py +# +# Or override the defaults directly below: +# +# WEAVIATE_ENDPOINT format: http://: (REST API endpoint) +# WEAVIATE_GRPC_ENDPOINT format: grpc://: (gRPC endpoint) +# WEAVIATE_API_KEY: The API key configured in your Weaviate instance +# BATCH_SIZE: Number of objects per batch during migration (default: 1000) +# ============================================================================= WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080") WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051") WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih") BATCH_SIZE = 1000 + +# Derived values — parsed from the endpoints above. +# These are used by the Weaviate Python client (connect_to_local) and REST calls. 
WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0] WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1]) WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1]) From a0824d2df09428fbb8bb918dcc33d749a2d521ae Mon Sep 17 00:00:00 2001 From: sorphwer Date: Sun, 1 Mar 2026 13:48:54 +0800 Subject: [PATCH 3/5] Fix data loss risk: verify copy before deleting migrated collection Reorder replace_old_collection to prevent data loss on failure: - Fetch schema BEFORE deleting anything - Wrap data copy in try/except to preserve migrated collection on error - Add count verification after copy, keep migrated as backup on mismatch - Only delete the migrated collection after full verification passes - Print recovery instructions (collection name) on every failure path Co-Authored-By: Claude Opus 4.6 --- assets/migrate_weaviate_collections.py | 129 ++++++++++++++++--------- 1 file changed, 86 insertions(+), 43 deletions(-) diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index b0d5312e5..5abe709bd 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -353,11 +353,31 @@ def verify_migration( def replace_old_collection( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ): - """Replace old collection with migrated one by recreating with original name""" + """ + Replace old collection with migrated one by recreating with original name. + + Safety: The old collection is deleted once the migrated schema is fetched; the + migrated collection (the full copy) is deleted only AFTER the recreated one is + populated and verified, and is kept on any failure. The user can re-run the script or recover manually. 
+ """ print(f"\nReplacing old collection with migrated data...") - # Step 1: Delete old collection - print(f" Step 1: Deleting old collection...") + # Step 1: Get schema from migrated collection + print(f" Step 1: Getting schema from migrated collection...") + schema_response = requests.get( + f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, + ) + if schema_response.status_code != 200: + raise Exception( + f"Failed to get migrated collection schema: {schema_response.text}" + ) + schema = schema_response.json() + + # Step 2: Delete old collection to free the name + # This is required because Weaviate does not support rename. + # The migrated collection still holds a full copy of the data. + print(f" Step 2: Deleting old collection (migrated copy is safe)...") response = requests.delete( f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, @@ -367,27 +387,22 @@ def replace_old_collection( else: print(f" Deleted") - # Step 2: Get schema from migrated collection - print(f" Step 2: Getting schema from migrated collection...") - schema_response = requests.get( - f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", - headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, - ) - schema = schema_response.json() - schema["class"] = old_collection_name - # Step 3: Create collection with original name and new schema print(f" Step 3: Creating collection with original name...") + schema["class"] = old_collection_name create_response = requests.post( f"{WEAVIATE_ENDPOINT}/v1/schema", json=schema, headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, ) if create_response.status_code not in [200, 201]: + print(f" FAILED to create collection: {create_response.text}") + print(f" DATA IS SAFE in: {new_collection_name}") + print(f" You can retry or recover manually.") raise Exception(f"Failed to create collection: {create_response.text}") print(f" 
Created") - # Step 4: Copy data to collection with original name using cursor-based pagination + # Step 4: Copy data from migrated collection to the newly created one print(f" Step 4: Copying data to original collection name...") migrated_collection = client.collections.get(new_collection_name) new_collection = client.collections.get(old_collection_name) @@ -395,50 +410,78 @@ def replace_old_collection( total_copied = 0 cursor = None - while True: - # Fetch batch of objects using cursor-based pagination - if cursor is None: - # First batch - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE - ) - else: - # Subsequent batches using cursor - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE, after=cursor - ) + try: + while True: + # Fetch batch of objects using cursor-based pagination + if cursor is None: + response = migrated_collection.query.fetch_objects( + include_vector=True, limit=BATCH_SIZE + ) + else: + response = migrated_collection.query.fetch_objects( + include_vector=True, limit=BATCH_SIZE, after=cursor + ) - objects = response.objects + objects = response.objects - if not objects: - break + if not objects: + break - # Use batch insert for efficiency - with new_collection.batch.dynamic() as batch: - for obj in objects: - batch.add_object( - properties=obj.properties, vector=obj.vector, uuid=obj.uuid - ) + # Use batch insert for efficiency + with new_collection.batch.dynamic() as batch: + for obj in objects: + batch.add_object( + properties=obj.properties, vector=obj.vector, uuid=obj.uuid + ) - total_copied += len(objects) - print(f" Copied {total_copied} objects...") + total_copied += len(objects) + print(f" Copied {total_copied} objects...") - # Update cursor for next iteration - if len(objects) < BATCH_SIZE: - break - else: - cursor = objects[-1].uuid + # Update cursor for next iteration + if len(objects) < BATCH_SIZE: + break + else: + cursor = objects[-1].uuid + except 
Exception as e: + print(f" COPY INTERRUPTED after {total_copied} objects: {e}") + print(f" DATA IS SAFE in: {new_collection_name}") + print(f" You can re-run this script to retry.") + raise print(f" Total copied: {total_copied} objects") - # Step 5: Delete the temporary migrated collection - print(f" Step 5: Cleaning up temporary migrated collection...") + # Step 5: Verify copy before cleaning up + print(f" Step 5: Verifying copy...") + migrated_agg = migrated_collection.aggregate.over_all(total_count=True) + new_agg = new_collection.aggregate.over_all(total_count=True) + + if migrated_agg.total_count != new_agg.total_count: + print( + f" WARNING: Count mismatch! " + f"Migrated: {migrated_agg.total_count}, New: {new_agg.total_count}" + ) + print(f" Keeping {new_collection_name} as backup for safety.") + print( + f"\n PARTIAL SUCCESS: {old_collection_name} created with {new_agg.total_count} objects, " + f"but {new_collection_name} retained due to count mismatch." + ) + return False + + print(f" Verified: {new_agg.total_count} objects match.") + + # Step 6: Only now delete the migrated collection — everything is confirmed safe + print(f" Step 6: Cleaning up temporary migrated collection...") response = requests.delete( f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, ) if response.status_code == 200: print(f" Cleaned up") + else: + print( + f" Warning: Could not delete {new_collection_name}: {response.text}" + ) + print(f" You can delete it manually later.") print( f"\n SUCCESS! 
{old_collection_name} now has the new schema with {total_copied} objects" From fb07aa1397f04704b13007834b8a9253f2e95c30 Mon Sep 17 00:00:00 2001 From: sorphwer Date: Fri, 27 Mar 2026 16:27:54 +0800 Subject: [PATCH 4/5] refactor: use Weaviate iterator and fixed-size batch per official recommendation Replace manual cursor-based fetch_objects pagination with collection.iterator() and switch batch.dynamic() to batch.fixed_size() to prevent overwhelming the server during co-located migrations. Addresses review feedback from Weaviate team. --- assets/migrate_weaviate_collections.py | 108 +++++++------------------ 1 file changed, 31 insertions(+), 77 deletions(-) diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index 5abe709bd..ed245b2bc 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -3,8 +3,8 @@ This script: - Identifies collections with old schema (no vectorConfig) - Creates new collections with proper vectorConfig including "default" named vector -- Migrates data using cursor-based pagination (efficient for large datasets) -- Uses batch operations for fast inserts +- Migrates data using Weaviate iterator (recommended for reading all objects) +- Uses fixed-size batch operations for reliable inserts - Preserves all object properties and vectors Note: - This is a community-edited version of the draft of the script presented by the Dify Team. 
@@ -260,60 +260,35 @@ def transform_property_values(properties: Dict[str, Any]) -> Dict[str, Any]: def migrate_collection_data( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ) -> int: - """Migrate data from old collection to new collection using cursor-based pagination""" + """Migrate data from old collection to new collection using iterator + fixed-size batch""" old_collection = client.collections.get(old_collection_name) new_collection = client.collections.get(new_collection_name) total_migrated = 0 - cursor = None print(f"Migrating data from {old_collection_name} to {new_collection_name}") - while True: - # Fetch batch of objects using cursor-based pagination - if cursor is None: - # First batch - response = old_collection.query.fetch_objects( - limit=BATCH_SIZE, include_vector=True + # Use fixed-size batch to avoid overwhelming the server during migration + with new_collection.batch.fixed_size(batch_size=BATCH_SIZE) as batch: + # Use iterator (recommended by Weaviate) instead of manual cursor pagination + for obj in old_collection.iterator(include_vector=True): + # Prepare and transform properties (uuid -> text conversion) + properties = transform_property_values(obj.properties) + + # Add object with vector + batch.add_object( + properties=properties, + vector=( + obj.vector["default"] + if isinstance(obj.vector, dict) + else obj.vector + ), + uuid=obj.uuid, ) - else: - # Subsequent batches using cursor - response = old_collection.query.fetch_objects( - limit=BATCH_SIZE, include_vector=True, after=cursor - ) - - objects = response.objects - - if not objects: - break - - # Use batch insert for efficiency - with new_collection.batch.dynamic() as batch: - for obj in objects: - # Prepare and transform properties (uuid -> text conversion) - properties = transform_property_values(obj.properties) - - # Add object with vector - batch.add_object( - properties=properties, - vector=( - obj.vector["default"] - if isinstance(obj.vector, 
dict) - else obj.vector - ), - uuid=obj.uuid, - ) - - total_migrated += len(objects) - print(f" Migrated {total_migrated} objects...") - # Update cursor for next iteration - if len(objects) < BATCH_SIZE: - # Last batch - break - else: - # Get the last object's UUID for cursor - cursor = objects[-1].uuid + total_migrated += 1 + if total_migrated % BATCH_SIZE == 0: + print(f" Migrated {total_migrated} objects...") print(f" Total migrated: {total_migrated} objects") return total_migrated @@ -408,40 +383,19 @@ def replace_old_collection( new_collection = client.collections.get(old_collection_name) total_copied = 0 - cursor = None try: - while True: - # Fetch batch of objects using cursor-based pagination - if cursor is None: - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE - ) - else: - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE, after=cursor + # Use fixed-size batch to avoid overwhelming the server during migration + with new_collection.batch.fixed_size(batch_size=BATCH_SIZE) as batch: + # Use iterator (recommended by Weaviate) instead of manual cursor pagination + for obj in migrated_collection.iterator(include_vector=True): + batch.add_object( + properties=obj.properties, vector=obj.vector, uuid=obj.uuid ) - objects = response.objects - - if not objects: - break - - # Use batch insert for efficiency - with new_collection.batch.dynamic() as batch: - for obj in objects: - batch.add_object( - properties=obj.properties, vector=obj.vector, uuid=obj.uuid - ) - - total_copied += len(objects) - print(f" Copied {total_copied} objects...") - - # Update cursor for next iteration - if len(objects) < BATCH_SIZE: - break - else: - cursor = objects[-1].uuid + total_copied += 1 + if total_copied % BATCH_SIZE == 0: + print(f" Copied {total_copied} objects...") except Exception as e: print(f" COPY INTERRUPTED after {total_copied} objects: {e}") print(f" DATA IS SAFE in: 
{new_collection_name}") From 74362aae6255c2983e9e1ae898040e2ae9d10b18 Mon Sep 17 00:00:00 2001 From: sorphwer Date: Fri, 27 Mar 2026 17:01:06 +0800 Subject: [PATCH 5/5] fix: remove misleading BATCH_SIZE env var comment (hardcoded, not configurable) --- assets/migrate_weaviate_collections.py | 1 - 1 file changed, 1 deletion(-) diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index ed245b2bc..859abc4e2 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -44,7 +44,6 @@ # WEAVIATE_ENDPOINT format: http://: (REST API endpoint) # WEAVIATE_GRPC_ENDPOINT format: grpc://: (gRPC endpoint) # WEAVIATE_API_KEY: The API key configured in your Weaviate instance -# BATCH_SIZE: Number of objects per batch during migration (default: 1000) # ============================================================================= WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080") WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051")