diff --git a/assets/migrate_weaviate_collections.py b/assets/migrate_weaviate_collections.py index f0bb95f62..859abc4e2 100644 --- a/assets/migrate_weaviate_collections.py +++ b/assets/migrate_weaviate_collections.py @@ -3,8 +3,8 @@ This script: - Identifies collections with old schema (no vectorConfig) - Creates new collections with proper vectorConfig including "default" named vector -- Migrates data using cursor-based pagination (efficient for large datasets) -- Uses batch operations for fast inserts +- Migrates data using Weaviate iterator (recommended for reading all objects) +- Uses fixed-size batch operations for reliable inserts - Preserves all object properties and vectors Note: - This is a community-edited version of the draft of the script presented by the Dify Team. @@ -17,24 +17,71 @@ """ import os +import requests import weaviate from weaviate.classes.config import Configure, VectorDistances import sys import time from typing import List, Dict, Any -# Configuration +# ============================================================================= +# Connection Configuration +# ============================================================================= +# This script reads Weaviate connection info from environment variables, +# making it suitable for running inside the Dify Worker container where +# these variables are already set. +# +# If running outside the container (e.g. locally with kubectl port-forward), +# set the environment variables before running: +# +# export WEAVIATE_ENDPOINT="http://localhost:18080" +# export WEAVIATE_GRPC_ENDPOINT="grpc://localhost:50051" +# export WEAVIATE_API_KEY="your-api-key" +# python migrate_weaviate_collections.py +# +# Or override the defaults directly below: +# +# WEAVIATE_ENDPOINT format: http://: (REST API endpoint) +# WEAVIATE_GRPC_ENDPOINT format: grpc://: (gRPC endpoint) +# WEAVIATE_API_KEY: The API key configured in your Weaviate instance +# ============================================================================= WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080") WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051") WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih") BATCH_SIZE = 1000 + +# Derived values — parsed from the endpoints above. +# These are used by the Weaviate Python client (connect_to_local) and REST calls. WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0] WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1]) WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1]) +def check_properties_need_migration(schema: Dict[str, Any]) -> bool: + """ + Check if collection properties need migration: + - document_id or doc_id has uuid type (should be text) + - chunk_index has moduleConfig (should not have it) + """ + properties = schema.get("properties", []) + for prop in properties: + prop_name = prop.get("name", "") + + # Check if document_id or doc_id is uuid type + if prop_name in ["document_id", "doc_id"]: + if prop.get("dataType") == ["uuid"]: + return True + + # Check if chunk_index has moduleConfig + if prop_name == "chunk_index": + if "moduleConfig" in prop: + return True + + return False + + def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]: - """Identify collections that need migration (those without vectorConfig)""" + """Identify collections that need migration (those without vectorConfig OR with wrong property types)""" collections_to_migrate = [] all_collections = client.collections.list_all() @@ -48,12 +95,29 @@ def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]: collection = client.collections.get(collection_name) config = collection.config.get() - # Check if this collection has the old schema + # Check if this collection has the old schema (no vectorConfig) if config.vector_config is None: collections_to_migrate.append(collection_name) - print(f" - {collection_name}: OLD SCHEMA (needs migration)") - else: - print(f" - {collection_name}: NEW SCHEMA (skip)") + print(f" - {collection_name}: OLD SCHEMA - no vectorConfig (needs migration)") + continue + + # Also check if properties need migration (uuid -> text conversion) + try: + response = requests.get( + f"{WEAVIATE_ENDPOINT}/v1/schema/{collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, + ) + + if response.status_code == 200: + schema = response.json() + if check_properties_need_migration(schema): + collections_to_migrate.append(collection_name) + print(f" - {collection_name}: PROPERTY TYPE MISMATCH (needs migration)") + continue + except Exception as e: + print(f" - {collection_name}: Error checking schema: {e}") + + print(f" - {collection_name}: OK (skip)") return collections_to_migrate @@ -62,8 +126,6 @@ def get_collection_schema( client: weaviate.WeaviateClient, collection_name: str ) -> Dict[str, Any]: """Get the full schema of a collection via REST API""" - import requests - response = requests.get( f"http://{WEAVIATE_HOST}:{WEAVIATE_PORT}/v1/schema/{collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, @@ -75,12 +137,58 @@ def get_collection_schema( raise Exception(f"Failed to get schema: {response.text}") +def transform_properties(properties: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Transform properties to match the new schema format: + - document_id: uuid -> text, indexFilterable=true, indexSearchable=true, tokenization="word" + - doc_id: uuid -> text, indexFilterable=true, indexSearchable=true, tokenization="word" + - chunk_index: remove moduleConfig, ensure indexFilterable=true, indexSearchable=false + """ + transformed = [] + + for prop in properties: + new_prop = prop.copy() + prop_name = prop.get("name", "") + + # Convert document_id and doc_id from uuid to text + if prop_name in ["document_id", "doc_id"]: + if prop.get("dataType") == ["uuid"]: + print(f" Converting {prop_name}: uuid -> text (with text search enabled)") + new_prop["dataType"] = ["text"] + new_prop["indexFilterable"] = True + new_prop["indexRangeFilters"] = False + new_prop["indexSearchable"] = True + new_prop["tokenization"] = "word" + + # Remove auto-schema description + if "description" in new_prop: + del new_prop["description"] + + # Fix chunk_index: remove moduleConfig, ensure proper index settings + if prop_name == "chunk_index": + if "moduleConfig" in new_prop: + print(f" Removing moduleConfig from {prop_name}") + del new_prop["moduleConfig"] + + # Ensure correct index settings + new_prop["indexFilterable"] = True + new_prop["indexRangeFilters"] = False + new_prop["indexSearchable"] = False + + # Remove moduleConfig from any other properties if present + elif "moduleConfig" in new_prop: + print(f" Removing moduleConfig from {prop_name}") + del new_prop["moduleConfig"] + + transformed.append(new_prop) + + return transformed + + def create_new_collection( client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any] ) -> str: """Create a new collection with updated schema using REST API""" - import requests - # Generate new collection name new_name = f"{old_name}_migrated" @@ -107,9 +215,10 @@ def create_new_collection( "properties": [], } - # Copy properties from old schema + # Copy and transform properties from old schema if "properties" in schema: - new_schema["properties"] = schema["properties"] + print(" Transforming properties...") + new_schema["properties"] = transform_properties(schema["properties"]) # Create collection via REST API response = requests.post( @@ -125,64 +234,60 @@ def create_new_collection( return new_name +def transform_property_values(properties: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform property values during migration: + - Convert UUID objects to strings for document_id and doc_id + """ + from uuid import UUID + + transformed = {} + + for key, value in properties.items(): + # Convert UUID to string for document_id and doc_id + if key in ["document_id", "doc_id"] and value is not None: + if isinstance(value, UUID): + transformed[key] = str(value) + else: + transformed[key] = str(value) if value else value + else: + transformed[key] = value + + return transformed + + def migrate_collection_data( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ) -> int: - """Migrate data from old collection to new collection using cursor-based pagination""" - + """Migrate data from old collection to new collection using iterator + fixed-size batch""" old_collection = client.collections.get(old_collection_name) new_collection = client.collections.get(new_collection_name) total_migrated = 0 - cursor = None print(f"Migrating data from {old_collection_name} to {new_collection_name}") - while True: - # Fetch batch of objects using cursor-based pagination - if cursor is None: - # First batch - response = old_collection.query.fetch_objects( - limit=BATCH_SIZE, include_vector=True + # Use fixed-size batch to avoid overwhelming the server during migration + with new_collection.batch.fixed_size(batch_size=BATCH_SIZE) as batch: + # Use iterator (recommended by Weaviate) instead of manual cursor pagination + for obj in old_collection.iterator(include_vector=True): + # Prepare and transform properties (uuid -> text conversion) + properties = transform_property_values(obj.properties) + + # Add object with vector + batch.add_object( + properties=properties, + vector=( + obj.vector["default"] + if isinstance(obj.vector, dict) + else obj.vector + ), + uuid=obj.uuid, ) - else: - # Subsequent batches using cursor - response = old_collection.query.fetch_objects( - limit=BATCH_SIZE, include_vector=True, after=cursor - ) - - objects = response.objects - - if not objects: - break - # Use batch insert for efficiency - with new_collection.batch.dynamic() as batch: - for obj in objects: - # Prepare properties - properties = obj.properties - - # Add object with vector - batch.add_object( - properties=properties, - vector=( - obj.vector["default"] - if isinstance(obj.vector, dict) - else obj.vector - ), - uuid=obj.uuid, - ) - - total_migrated += len(objects) - print(f" Migrated {total_migrated} objects...") - - # Update cursor for next iteration - if len(objects) < BATCH_SIZE: - # Last batch - break - else: - # Get the last object's UUID for cursor - cursor = objects[-1].uuid + total_migrated += 1 + if total_migrated % BATCH_SIZE == 0: + print(f" Migrated {total_migrated} objects...") print(f" Total migrated: {total_migrated} objects") return total_migrated @@ -222,13 +327,31 @@ def verify_migration( def replace_old_collection( client: weaviate.WeaviateClient, old_collection_name: str, new_collection_name: str ): - """Replace old collection with migrated one by recreating with original name""" - import requests + """ + Replace old collection with migrated one by recreating with original name. + Safety: The old collection is only deleted AFTER the new one is fully created, + populated, and verified. If any step fails, both collections are preserved so + no data is lost. The user can re-run the script or recover manually. + """ print(f"\nReplacing old collection with migrated data...") - # Step 1: Delete old collection - print(f" Step 1: Deleting old collection...") + # Step 1: Get schema from migrated collection + print(f" Step 1: Getting schema from migrated collection...") + schema_response = requests.get( + f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", + headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, + ) + if schema_response.status_code != 200: + raise Exception( + f"Failed to get migrated collection schema: {schema_response.text}" + ) + schema = schema_response.json() + + # Step 2: Delete old collection to free the name + # This is required because Weaviate does not support rename. + # The migrated collection still holds a full copy of the data. + print(f" Step 2: Deleting old collection (migrated copy is safe)...") response = requests.delete( f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, @@ -238,78 +361,80 @@ def replace_old_collection( else: print(f" Deleted") - # Step 2: Get schema from migrated collection - print(f" Step 2: Getting schema from migrated collection...") - schema_response = requests.get( - f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", - headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, - ) - schema = schema_response.json() - schema["class"] = old_collection_name - # Step 3: Create collection with original name and new schema print(f" Step 3: Creating collection with original name...") + schema["class"] = old_collection_name create_response = requests.post( f"{WEAVIATE_ENDPOINT}/v1/schema", json=schema, headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, ) if create_response.status_code not in [200, 201]: + print(f" FAILED to create collection: {create_response.text}") + print(f" DATA IS SAFE in: {new_collection_name}") + print(f" You can retry or recover manually.") raise Exception(f"Failed to create collection: {create_response.text}") print(f" Created") - # Step 4: Copy data to collection with original name using cursor-based pagination + # Step 4: Copy data from migrated collection to the newly created one print(f" Step 4: Copying data to original collection name...") migrated_collection = client.collections.get(new_collection_name) new_collection = client.collections.get(old_collection_name) total_copied = 0 - cursor = None - - while True: - # Fetch batch of objects using cursor-based pagination - if cursor is None: - # First batch - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE - ) - else: - # Subsequent batches using cursor - response = migrated_collection.query.fetch_objects( - include_vector=True, limit=BATCH_SIZE, after=cursor - ) - objects = response.objects - - if not objects: - break - - # Use batch insert for efficiency - with new_collection.batch.dynamic() as batch: - for obj in objects: + try: + # Use fixed-size batch to avoid overwhelming the server during migration + with new_collection.batch.fixed_size(batch_size=BATCH_SIZE) as batch: + # Use iterator (recommended by Weaviate) instead of manual cursor pagination + for obj in migrated_collection.iterator(include_vector=True): batch.add_object( properties=obj.properties, vector=obj.vector, uuid=obj.uuid ) - total_copied += len(objects) - print(f" Copied {total_copied} objects...") - - # Update cursor for next iteration - if len(objects) < BATCH_SIZE: - break - else: - cursor = objects[-1].uuid + total_copied += 1 + if total_copied % BATCH_SIZE == 0: + print(f" Copied {total_copied} objects...") + except Exception as e: + print(f" COPY INTERRUPTED after {total_copied} objects: {e}") + print(f" DATA IS SAFE in: {new_collection_name}") + print(f" You can re-run this script to retry.") + raise print(f" Total copied: {total_copied} objects") - # Step 5: Delete the temporary migrated collection - print(f" Step 5: Cleaning up temporary migrated collection...") + # Step 5: Verify copy before cleaning up + print(f" Step 5: Verifying copy...") + migrated_agg = migrated_collection.aggregate.over_all(total_count=True) + new_agg = new_collection.aggregate.over_all(total_count=True) + + if migrated_agg.total_count != new_agg.total_count: + print( + f" WARNING: Count mismatch! " + f"Migrated: {migrated_agg.total_count}, New: {new_agg.total_count}" + ) + print(f" Keeping {new_collection_name} as backup for safety.") + print( + f"\n PARTIAL SUCCESS: {old_collection_name} created with {new_agg.total_count} objects, " + f"but {new_collection_name} retained due to count mismatch." + ) + return False + + print(f" Verified: {new_agg.total_count} objects match.") + + # Step 6: Only now delete the migrated collection — everything is confirmed safe + print(f" Step 6: Cleaning up temporary migrated collection...") response = requests.delete( f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}", headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"}, ) if response.status_code == 200: print(f" Cleaned up") + else: + print( + f" Warning: Could not delete {new_collection_name}: {response.text}" + ) + print(f" You can delete it manually later.") print( f"\n SUCCESS! {old_collection_name} now has the new schema with {total_copied} objects"