diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index efb780f6fc..36c196ab19 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -82,14 +82,39 @@ jobs: - name: Install Databricks CLI uses: databricks/setup-cli@8db12393ac48926ab0074f3928f36c7abc441f90 # v0.292.0 + # `databricks labs install lsql` fetches the release list and source zipball from + # api.github.com *anonymously* (the CLI never reads GITHUB_TOKEN), so it routinely + # hits GitHub's 60 req/hr unauthenticated rate limit on shared runner IPs and fails + # with "403 Forbidden". To avoid that, pre-fetch everything with authenticated `gh` + # (1,000 req/hr per repository for the Actions GITHUB_TOKEN), seed the CLI's local cache + lib dir, then install with --offline so + # the CLI makes no GitHub API calls at all. - name: Install LSQL - run: | - databricks labs install lsql --log-level=trace - databricks labs installed - env: # this is a temporary hack + env: DATABRICKS_HOST: any DATABRICKS_TOKEN: any - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + set -euo pipefail + labs_dir="$HOME/.databricks/labs/lsql" + mkdir -p "$labs_dir/cache" "$labs_dir/lib" + + # Seed the version cache the CLI reads, so `install --offline` resolves "latest" + # without calling api.github.com. The cache format is {refreshed_at, data:[releases]}. + releases="$(gh api repos/databrickslabs/lsql/releases)" + latest_tag="$(printf '%s' "$releases" | jq -r '.[0].tag_name')" + printf '%s' "$releases" \ + | jq '{refreshed_at: (now | todate), data: .}' \ + > "$labs_dir/cache/databrickslabs-lsql-releases.json" + + # Download and unpack the release source into the lib dir (what the CLI would + # otherwise pull from the GitHub zipball endpoint). Strip the single top-level dir. 
+ tmp="$(mktemp -d)" + gh release download "$latest_tag" --repo databrickslabs/lsql --archive=zip --output "$tmp/lsql.zip" + unzip -q "$tmp/lsql.zip" -d "$tmp/unpacked" + cp -R "$(find "$tmp/unpacked" -mindepth 1 -maxdepth 1 -type d)"/. "$labs_dir/lib/" + + databricks labs install lsql --offline --log-level=trace + databricks labs installed - name: Reformat SQL queries run: databricks labs lsql fmt --normalize-case false --exclude tests/unit/source_code/samples/ diff --git a/docs/ucx/docs/reference/commands/index.mdx b/docs/ucx/docs/reference/commands/index.mdx index c00b7f2766..65c92da43b 100644 --- a/docs/ucx/docs/reference/commands/index.mdx +++ b/docs/ucx/docs/reference/commands/index.mdx @@ -836,6 +836,7 @@ access the configuration file from the command line. Here's the description of c * `database_to_catalog_mapping`: An optional dictionary mapping source database names to target catalog names. * `default_catalog`: An optional string representing the default catalog name. * `skip_tacl_migration`: Optional flag, allow skipping TACL migration when migrating tables or creating catalogs and schemas. + * `enable_uniform_iceberg`: Optional experimental flag (default `false`) that enables Delta UniForm (IcebergCompatV2) on migrated Delta tables for Iceberg interoperability. **Warning:** enabling this disables Deletion Vectors on each migrated Delta table and runs `REORG TABLE ... APPLY (PURGE)` on it as a prerequisite for UniForm. See the [Databricks documentation](https://docs.databricks.com/aws/en/delta/uniform#enable-iceberg-reads-on-an-existing-table). * `default_owner_group`: Assigns this group to all migrated objects (catalogs, databases, tables, views, etc.). The group has to be an account group and the user running the migration has to be a member of this group. * `log_level`: An optional string representing the log level. * `workspace_start_path`: A string representing the starting path for notebooks and directories crawler in the workspace. 
diff --git a/src/databricks/labs/ucx/config.py b/src/databricks/labs/ucx/config.py index 0a852282ea..9130afefc6 100644 --- a/src/databricks/labs/ucx/config.py +++ b/src/databricks/labs/ucx/config.py @@ -93,6 +93,9 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes # Skip TACL migration during table migration skip_tacl_migration: bool = False + # [EXPERIMENTAL] Enable Delta UniForm (Iceberg compatibility) on migrated Delta tables + enable_uniform_iceberg: bool = False + # Select SQL query statement disposition query_statement_disposition: Disposition = Disposition.INLINE diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index d0fcafbcbe..7ccde95ff9 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -124,6 +124,7 @@ def migrate_tables( hiveserde_in_place_migrate: bool = False, managed_table_external_storage: str = "CLONE", check_uc_table: bool = True, + enable_uniform_iceberg: bool = False, ): if managed_table_external_storage == "CONVERT_TO_EXTERNAL": self._spark = self._spark_session @@ -134,7 +135,11 @@ def migrate_tables( if what == What.VIEW: return self._migrate_views() return self._migrate_tables( - what, managed_table_external_storage.upper(), hiveserde_in_place_migrate, check_uc_table + what, + managed_table_external_storage.upper(), + hiveserde_in_place_migrate, + check_uc_table, + enable_uniform_iceberg, ) def _migrate_tables( @@ -143,6 +148,7 @@ def _migrate_tables( managed_table_external_storage: str, hiveserde_in_place_migrate: bool = False, check_uc_table: bool = True, + enable_uniform_iceberg: bool = False, ): tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table) tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate) @@ -154,6 +160,7 @@ def _migrate_tables( table, managed_table_external_storage, 
hiveserde_in_place_migrate, + enable_uniform_iceberg, ) ) Threads.strict("migrate tables", tasks) @@ -207,12 +214,16 @@ def _safe_migrate_table( src_table: TableToMigrate, managed_table_external_storage: str, hiveserde_in_place_migrate: bool = False, + enable_uniform_iceberg: bool = False, ) -> bool: if self._table_already_migrated(src_table.rule.as_uc_table_key): logger.info(f"Table {src_table.src.key} already migrated to {src_table.rule.as_uc_table_key}") return True try: - return self._migrate_table(src_table, managed_table_external_storage, hiveserde_in_place_migrate) + result = self._migrate_table(src_table, managed_table_external_storage, hiveserde_in_place_migrate) + if result and enable_uniform_iceberg and src_table.src.is_delta: + self._enable_uniform_iceberg(src_table.rule.as_uc_table_key) + return result except Exception as e: # pylint: disable=broad-exception-caught # Catching a Spark AnalysisException here, for which we do not have the dependency to catch explicitly pattern = ( # See https://github.com/databrickslabs/ucx/issues/2891 @@ -747,6 +758,23 @@ def _sql_add_migrated_comment(self, table: Table, target_table_key: str) -> str: """Docs: https://docs.databricks.com/en/data-governance/unity-catalog/migrate.html#add-comments-to-indicate-that-a-hive-table-has-been-migrated""" return f"COMMENT ON {table.kind} {escape_sql_identifier(table.key)} IS 'This {table.kind.lower()} is deprecated. Please use `{target_table_key}` instead of `{table.key}`.';" + def _enable_uniform_iceberg(self, target_table_key: str) -> None: + """Enables Delta UniForm (IcebergCompatV2) on the migrated table. 
+ See https://docs.databricks.com/aws/en/delta/uniform#enable-iceberg-reads-on-an-existing-table""" + logger.info(f"Enabling UniForm Iceberg compatibility on {target_table_key}") + escaped = escape_sql_identifier(target_table_key) + self._sql_backend.execute( + f"ALTER TABLE {escaped} " f"SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')" + ) + self._sql_backend.execute(f"REORG TABLE {escaped} APPLY (PURGE)") + self._sql_backend.execute( + f"ALTER TABLE {escaped} " + f"SET TBLPROPERTIES (" + f"'delta.columnMapping.mode' = 'name', " + f"'delta.universalFormat.enabledFormats' = 'iceberg', " + f"'delta.enableIcebergCompatV2' = 'true')" + ) + def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int) -> str: """Adds a property to the table indicating the source of the migration. This is used to track the source of the migration for auditing purposes. diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 640602a4d8..e8ed647d34 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -24,7 +24,9 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext): to the Unity Catalog. """ ctx.tables_migrator.migrate_tables( - what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage + what=What.EXTERNAL_SYNC, + managed_table_external_storage=ctx.config.managed_table_external_storage, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, ) @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table]) @@ -32,7 +34,10 @@ def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext): """This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog using deep clone. 
""" - ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA) + ctx.tables_migrator.migrate_tables( + what=What.DBFS_ROOT_DELTA, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, + ) @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table]) def migrate_dbfs_root_non_delta_tables( @@ -42,7 +47,10 @@ def migrate_dbfs_root_non_delta_tables( """This workflow task migrates non delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog using CTAS. """ - ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA) + ctx.tables_migrator.migrate_tables( + what=What.DBFS_ROOT_NON_DELTA, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, + ) @job_task( job_cluster="user_isolation", @@ -119,6 +127,7 @@ def migrate_hive_serde_in_place(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables( what=What.EXTERNAL_HIVESERDE, hiveserde_in_place_migrate=True, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, ) @job_task( @@ -180,6 +189,7 @@ def migrate_other_external_ctas(self, ctx: RuntimeContext): """This workflow task migrates non-SYNC supported and non HiveSerde external tables using CTAS""" ctx.tables_migrator.migrate_tables( what=What.EXTERNAL_NO_SYNC, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, ) @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables]) @@ -187,6 +197,7 @@ def migrate_hive_serde_ctas(self, ctx: RuntimeContext): """This workflow task migrates HiveSerde tables using CTAS""" ctx.tables_migrator.migrate_tables( what=What.EXTERNAL_HIVESERDE, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, ) @job_task( @@ -296,7 +307,10 @@ def __init__(self): @job_task(job_cluster="user_isolation", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental]) def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog 
using a Create Table statement.""" - ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) + ctx.tables_migrator.migrate_tables( + what=What.TABLE_IN_MOUNT, + enable_uniform_iceberg=ctx.config.enable_uniform_iceberg, + ) @job_task(job_cluster="user_isolation") def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: diff --git a/tests/integration/hive_metastore/test_migrate.py b/tests/integration/hive_metastore/test_migrate.py index bef03a4b27..d700f50e5c 100644 --- a/tests/integration/hive_metastore/test_migrate.py +++ b/tests/integration/hive_metastore/test_migrate.py @@ -948,3 +948,27 @@ def test_migration_index_deleted_source(make_table, runtime_ctx, sql_backend, ma f"failed-to-migrate: {src_table.schema_name}.{src_table.name} set as a source does no longer exist" ) assert any(expected_message in record.message for record in caplog.records) + + +@retried(on=[NotFound], timeout=timedelta(minutes=2)) +def test_migrate_managed_tables_with_uniform_iceberg(ws, sql_backend, runtime_ctx, make_catalog): + src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore") + src_managed_table = runtime_ctx.make_table( + catalog_name=src_schema.catalog_name, + schema_name=src_schema.name, + ) + + dst_catalog = make_catalog() + dst_schema = runtime_ctx.make_schema(catalog_name=dst_catalog.name, name=src_schema.name) + + rules = [Rule.from_src_dst(src_managed_table, dst_schema)] + + runtime_ctx.with_table_mapping_rules(rules) + runtime_ctx.with_dummy_resource_permission() + runtime_ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=True) + + target_table_properties = ws.tables.get(f"{dst_schema.full_name}.{src_managed_table.name}").properties + assert target_table_properties["upgraded_from"] == src_managed_table.full_name + assert target_table_properties["delta.columnMapping.mode"] == "name" + assert target_table_properties["delta.universalFormat.enabledFormats"] == "iceberg" + assert 
target_table_properties["delta.enableIcebergCompatV2"] == "true" diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index fb74f73742..d26f7c29e2 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1803,3 +1803,138 @@ def test_migrate_tables_handles_table_with_empty_column(caplog) -> None: migration_status_refresher.index.assert_not_called() # Only called when migrating view migrate_grants.apply.assert_not_called() # Errors before getting here external_locations.resolve_mount.assert_not_called() # Only called when migrating external table + + +def test_migrate_dbfs_root_delta_table_with_uniform_iceberg_enabled(ws, mock_pyspark): + errors = {} + rows = {} + backend = MockBackend(fails_on_first=errors, rows=rows) + table_crawler = TablesCrawler(backend, "inventory_database") + table_mapping = mock_table_mapping(["managed_dbfs"]) + migration_status_refresher = TableMigrationStatusRefresher(ws, backend, "inventory_database", table_crawler) + migrate_grants = create_autospec(MigrateGrants) + external_locations = create_autospec(ExternalLocations) + table_migrate = TablesMigrator( + table_crawler, + ws, + backend, + table_mapping, + migration_status_refresher, + migrate_grants, + external_locations, + ) + table_migrate.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=True) + + assert ( + "ALTER TABLE `ucx_default`.`db1_dst`.`managed_dbfs` " + "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')" + ) in backend.queries + assert "REORG TABLE `ucx_default`.`db1_dst`.`managed_dbfs` APPLY (PURGE)" in backend.queries + assert ( + "ALTER TABLE `ucx_default`.`db1_dst`.`managed_dbfs` " + "SET TBLPROPERTIES (" + "'delta.columnMapping.mode' = 'name', " + "'delta.universalFormat.enabledFormats' = 'iceberg', " + "'delta.enableIcebergCompatV2' = 'true')" + ) in backend.queries + migrate_grants.apply.assert_called() + 
external_locations.resolve_mount.assert_not_called() + + +def test_migrate_external_sync_table_with_uniform_iceberg_enabled(ws, mock_pyspark): + errors = {} + rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]} + backend = MockBackend(fails_on_first=errors, rows=rows) + crawler_backend = MockBackend(fails_on_first=errors, rows=rows) + table_crawler = TablesCrawler(crawler_backend, "inventory_database") + table_mapping = mock_table_mapping(["external_src"]) + migration_status_refresher = TableMigrationStatusRefresher(ws, backend, "inventory_database", table_crawler) + migrate_grants = create_autospec(MigrateGrants) + external_locations = create_autospec(ExternalLocations) + table_migrate = TablesMigrator( + table_crawler, + ws, + backend, + table_mapping, + migration_status_refresher, + migrate_grants, + external_locations, + ) + table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, enable_uniform_iceberg=True) + + assert ( + "ALTER TABLE `ucx_default`.`db1_dst`.`external_dst` " + "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')" + ) in backend.queries + assert "REORG TABLE `ucx_default`.`db1_dst`.`external_dst` APPLY (PURGE)" in backend.queries + assert ( + "ALTER TABLE `ucx_default`.`db1_dst`.`external_dst` " + "SET TBLPROPERTIES (" + "'delta.columnMapping.mode' = 'name', " + "'delta.universalFormat.enabledFormats' = 'iceberg', " + "'delta.enableIcebergCompatV2' = 'true')" + ) in backend.queries + migrate_grants.apply.assert_called() + external_locations.resolve_mount.assert_not_called() + + +def test_migrate_non_delta_table_with_uniform_iceberg_does_not_alter(ws, mock_pyspark): + errors = {} + rows = {} + backend = MockBackend(fails_on_first=errors, rows=rows) + table_crawler = TablesCrawler(backend, "inventory_database") + table_mapping = mock_table_mapping(["dbfs_parquet"]) + migration_status_refresher = TableMigrationStatusRefresher(ws, backend, "inventory_database", table_crawler) + migrate_grants = 
create_autospec(MigrateGrants) + external_locations = create_autospec(ExternalLocations) + table_migrate = TablesMigrator( + table_crawler, + ws, + backend, + table_mapping, + migration_status_refresher, + migrate_grants, + external_locations, + ) + table_migrate.migrate_tables(what=What.DBFS_ROOT_NON_DELTA, enable_uniform_iceberg=True) + + uniform_query = ( + "SET TBLPROPERTIES (" + "'delta.columnMapping.mode' = 'name', " + "'delta.universalFormat.enabledFormats' = 'iceberg', " + "'delta.enableIcebergCompatV2' = 'true')" + ) + assert all(uniform_query not in q for q in backend.queries) + migrate_grants.apply.assert_called() + external_locations.resolve_mount.assert_not_called() + + +def test_migrate_delta_table_without_uniform_iceberg_flag_does_not_alter(ws, mock_pyspark): + errors = {} + rows = {} + backend = MockBackend(fails_on_first=errors, rows=rows) + table_crawler = TablesCrawler(backend, "inventory_database") + table_mapping = mock_table_mapping(["managed_dbfs"]) + migration_status_refresher = TableMigrationStatusRefresher(ws, backend, "inventory_database", table_crawler) + migrate_grants = create_autospec(MigrateGrants) + external_locations = create_autospec(ExternalLocations) + table_migrate = TablesMigrator( + table_crawler, + ws, + backend, + table_mapping, + migration_status_refresher, + migrate_grants, + external_locations, + ) + table_migrate.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=False) + + uniform_query = ( + "SET TBLPROPERTIES (" + "'delta.columnMapping.mode' = 'name', " + "'delta.universalFormat.enabledFormats' = 'iceberg', " + "'delta.enableIcebergCompatV2' = 'true')" + ) + assert all(uniform_query not in q for q in backend.queries) + migrate_grants.apply.assert_called() + external_locations.resolve_mount.assert_not_called()