Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,39 @@ jobs:
- name: Install Databricks CLI
uses: databricks/setup-cli@8db12393ac48926ab0074f3928f36c7abc441f90 # v0.292.0

# `databricks labs install lsql` fetches the release list and source zipball from
# api.github.com *anonymously* (the CLI never reads GITHUB_TOKEN), so it routinely
# hits GitHub's 60 req/hr unauthenticated rate limit on shared runner IPs and fails
# with "403 Forbidden". To avoid that, pre-fetch everything with authenticated `gh`
# (5000 req/hr), seed the CLI's local cache + lib dir, then install with --offline so
# the CLI makes no GitHub API calls at all.
- name: Install LSQL
  env:
    DATABRICKS_HOST: any
    DATABRICKS_TOKEN: any
    GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
    GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
  run: |
    set -euo pipefail
    labs_dir="$HOME/.databricks/labs/lsql"
    mkdir -p "$labs_dir/cache" "$labs_dir/lib"

    # Seed the version cache the CLI reads, so `install --offline` resolves "latest"
    # without calling api.github.com. The cache format is {refreshed_at, data:[releases]}.
    releases="$(gh api repos/databrickslabs/lsql/releases)"
    # `jq -e` exits non-zero when the result is null, so an empty or malformed release
    # list stops the step here instead of trying to download a release called "null".
    latest_tag="$(printf '%s' "$releases" | jq -er '.[0].tag_name')"
    printf '%s' "$releases" \
      | jq '{refreshed_at: (now | todate), data: .}' \
      > "$labs_dir/cache/databrickslabs-lsql-releases.json"

    # Download and unpack the release source into the lib dir (what the CLI would
    # otherwise pull from the GitHub zipball endpoint). Strip the single top-level dir.
    tmp="$(mktemp -d)"
    gh release download "$latest_tag" --repo databrickslabs/lsql --archive=zip --output "$tmp/lsql.zip"
    unzip -q "$tmp/lsql.zip" -d "$tmp/unpacked"
    cp -R "$(find "$tmp/unpacked" -mindepth 1 -maxdepth 1 -type d)"/. "$labs_dir/lib/"

    databricks labs install lsql --offline --log-level=trace
    databricks labs installed

- name: Reformat SQL queries
run: databricks labs lsql fmt --normalize-case false --exclude tests/unit/source_code/samples/
Expand Down
1 change: 1 addition & 0 deletions docs/ucx/docs/reference/commands/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ access the configuration file from the command line. Here's the description of c
* `database_to_catalog_mapping`: An optional dictionary mapping source database names to target catalog names.
* `default_catalog`: An optional string representing the default catalog name.
* `skip_tacl_migration`: Optional flag that allows skipping TACL migration when migrating tables or creating catalogs and schemas.
* `enable_uniform_iceberg`: Optional experimental flag (default `false`). When set, enables Delta UniForm (IcebergCompatV2) on migrated Delta tables for Iceberg interoperability. **Warning:** enabling this disables Deletion Vectors and runs `REORG TABLE ... APPLY (PURGE)` on each migrated table as a prerequisite for UniForm. See the [Databricks documentation](https://docs.databricks.com/aws/en/delta/uniform#enable-iceberg-reads-on-an-existing-table).
* `default_owner_group`: Assigns this group to all migrated objects (catalogs, databases, tables, views, etc.). The group has to be an account group and the user running the migration has to be a member of this group.
* `log_level`: An optional string representing the log level.
* `workspace_start_path`: A string representing the starting path for notebooks and directories crawler in the workspace.
Expand Down
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes
# Skip TACL migration during table migration
skip_tacl_migration: bool = False

# [EXPERIMENTAL] Enable Delta UniForm (Iceberg compatibility) on migrated Delta tables
enable_uniform_iceberg: bool = False

# Select SQL query statement disposition
query_statement_disposition: Disposition = Disposition.INLINE

Expand Down
32 changes: 30 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def migrate_tables(
hiveserde_in_place_migrate: bool = False,
managed_table_external_storage: str = "CLONE",
check_uc_table: bool = True,
enable_uniform_iceberg: bool = False,
):
if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
self._spark = self._spark_session
Expand All @@ -134,7 +135,11 @@ def migrate_tables(
if what == What.VIEW:
return self._migrate_views()
return self._migrate_tables(
what, managed_table_external_storage.upper(), hiveserde_in_place_migrate, check_uc_table
what,
managed_table_external_storage.upper(),
hiveserde_in_place_migrate,
check_uc_table,
enable_uniform_iceberg,
)

def _migrate_tables(
Expand All @@ -143,6 +148,7 @@ def _migrate_tables(
managed_table_external_storage: str,
hiveserde_in_place_migrate: bool = False,
check_uc_table: bool = True,
enable_uniform_iceberg: bool = False,
):
tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table)
tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
Expand All @@ -154,6 +160,7 @@ def _migrate_tables(
table,
managed_table_external_storage,
hiveserde_in_place_migrate,
enable_uniform_iceberg,
)
)
Threads.strict("migrate tables", tasks)
Expand Down Expand Up @@ -207,12 +214,16 @@ def _safe_migrate_table(
src_table: TableToMigrate,
managed_table_external_storage: str,
hiveserde_in_place_migrate: bool = False,
enable_uniform_iceberg: bool = False,
) -> bool:
if self._table_already_migrated(src_table.rule.as_uc_table_key):
logger.info(f"Table {src_table.src.key} already migrated to {src_table.rule.as_uc_table_key}")
return True
try:
return self._migrate_table(src_table, managed_table_external_storage, hiveserde_in_place_migrate)
result = self._migrate_table(src_table, managed_table_external_storage, hiveserde_in_place_migrate)
if result and enable_uniform_iceberg and src_table.src.is_delta:
self._enable_uniform_iceberg(src_table.rule.as_uc_table_key)
return result
except Exception as e: # pylint: disable=broad-exception-caught
# Catching a Spark AnalysisException here, for which we do not have the dependency to catch explicitly
pattern = ( # See https://github.com/databrickslabs/ucx/issues/2891
Expand Down Expand Up @@ -747,6 +758,23 @@ def _sql_add_migrated_comment(self, table: Table, target_table_key: str) -> str:
"""Docs: https://docs.databricks.com/en/data-governance/unity-catalog/migrate.html#add-comments-to-indicate-that-a-hive-table-has-been-migrated"""
return f"COMMENT ON {table.kind} {escape_sql_identifier(table.key)} IS 'This {table.kind.lower()} is deprecated. Please use `{target_table_key}` instead of `{table.key}`.';"

def _enable_uniform_iceberg(self, target_table_key: str) -> None:
    """Turn on Delta UniForm (IcebergCompatV2) for the table identified by `target_table_key`.

    Three statements are sent to the SQL backend, in this order: disable deletion vectors,
    `REORG TABLE ... APPLY (PURGE)`, then set the column-mapping and UniForm table properties.
    The table key is escaped before it is embedded in any statement.
    See https://docs.databricks.com/aws/en/delta/uniform#enable-iceberg-reads-on-an-existing-table"""
    logger.info(f"Enabling UniForm Iceberg compatibility on {target_table_key}")
    table = escape_sql_identifier(target_table_key)
    uniform_properties = ", ".join(
        (
            "'delta.columnMapping.mode' = 'name'",
            "'delta.universalFormat.enabledFormats' = 'iceberg'",
            "'delta.enableIcebergCompatV2' = 'true'",
        )
    )
    # Order matters: the deletion-vector switch and the purge run before UniForm is enabled.
    statements = (
        f"ALTER TABLE {table} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')",
        f"REORG TABLE {table} APPLY (PURGE)",
        f"ALTER TABLE {table} SET TBLPROPERTIES ({uniform_properties})",
    )
    for statement in statements:
        self._sql_backend.execute(statement)

def _sql_alter_from(self, table: Table, target_table_key: str, ws_id: int) -> str:
"""Adds a property to the table indicating the source of the migration.
This is used to track the source of the migration for auditing purposes.
Expand Down
22 changes: 18 additions & 4 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
to the Unity Catalog.
"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
what=What.EXTERNAL_SYNC,
managed_table_external_storage=ctx.config.managed_table_external_storage,
enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
)

@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table])
def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
    """This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
    using deep clone.
    """
    # Exactly one migration call, carrying the workspace-level UniForm setting from the config.
    ctx.tables_migrator.migrate_tables(
        what=What.DBFS_ROOT_DELTA,
        enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
    )

@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table])
def migrate_dbfs_root_non_delta_tables(
Expand All @@ -42,7 +47,10 @@ def migrate_dbfs_root_non_delta_tables(
"""This workflow task migrates non delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
using CTAS.
"""
ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA)
ctx.tables_migrator.migrate_tables(
what=What.DBFS_ROOT_NON_DELTA,
enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
)

@job_task(
job_cluster="user_isolation",
Expand Down Expand Up @@ -119,6 +127,7 @@ def migrate_hive_serde_in_place(self, ctx: RuntimeContext):
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_HIVESERDE,
hiveserde_in_place_migrate=True,
enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
)

@job_task(
Expand Down Expand Up @@ -180,13 +189,15 @@ def migrate_other_external_ctas(self, ctx: RuntimeContext):
"""This workflow task migrates non-SYNC supported and non HiveSerde external tables using CTAS"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_NO_SYNC,
enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
)

@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
def migrate_hive_serde_ctas(self, ctx: RuntimeContext):
    """This workflow task migrates HiveSerde tables using CTAS"""
    migrator = ctx.tables_migrator
    # The UniForm setting comes from the workspace config and is passed through unchanged.
    uniform_enabled = ctx.config.enable_uniform_iceberg
    migrator.migrate_tables(what=What.EXTERNAL_HIVESERDE, enable_uniform_iceberg=uniform_enabled)

@job_task(
Expand Down Expand Up @@ -296,7 +307,10 @@ def __init__(self):
@job_task(job_cluster="user_isolation", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental])
def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
    """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
    # Exactly one migration call, carrying the workspace-level UniForm setting from the config.
    ctx.tables_migrator.migrate_tables(
        what=What.TABLE_IN_MOUNT,
        enable_uniform_iceberg=ctx.config.enable_uniform_iceberg,
    )

@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,3 +948,27 @@ def test_migration_index_deleted_source(make_table, runtime_ctx, sql_backend, ma
f"failed-to-migrate: {src_table.schema_name}.{src_table.name} set as a source does no longer exist"
)
assert any(expected_message in record.message for record in caplog.records)


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_managed_tables_with_uniform_iceberg(ws, sql_backend, runtime_ctx, make_catalog):
    """After migrating with `enable_uniform_iceberg=True` the target table carries the UniForm properties."""
    hms_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
    hms_table = runtime_ctx.make_table(catalog_name=hms_schema.catalog_name, schema_name=hms_schema.name)

    uc_catalog = make_catalog()
    uc_schema = runtime_ctx.make_schema(catalog_name=uc_catalog.name, name=hms_schema.name)

    runtime_ctx.with_table_mapping_rules([Rule.from_src_dst(hms_table, uc_schema)])
    runtime_ctx.with_dummy_resource_permission()
    runtime_ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=True)

    migrated_table = ws.tables.get(f"{uc_schema.full_name}.{hms_table.name}")
    properties = migrated_table.properties
    # Checked in the same order as listed: migration marker first, then the UniForm settings.
    expected_properties = {
        "upgraded_from": hms_table.full_name,
        "delta.columnMapping.mode": "name",
        "delta.universalFormat.enabledFormats": "iceberg",
        "delta.enableIcebergCompatV2": "true",
    }
    for key, expected_value in expected_properties.items():
        assert properties[key] == expected_value
135 changes: 135 additions & 0 deletions tests/unit/hive_metastore/test_table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1803,3 +1803,138 @@ def test_migrate_tables_handles_table_with_empty_column(caplog) -> None:
migration_status_refresher.index.assert_not_called() # Only called when migrating view
migrate_grants.apply.assert_not_called() # Errors before getting here
external_locations.resolve_mount.assert_not_called() # Only called when migrating external table


def test_migrate_dbfs_root_delta_table_with_uniform_iceberg_enabled(ws, mock_pyspark):
    """With the flag on, migrating `managed_dbfs` issues all three UniForm statements for the target table."""
    sql = MockBackend(fails_on_first={}, rows={})
    crawler = TablesCrawler(sql, "inventory_database")
    mapping = mock_table_mapping(["managed_dbfs"])
    status_refresher = TableMigrationStatusRefresher(ws, sql, "inventory_database", crawler)
    grants = create_autospec(MigrateGrants)
    locations = create_autospec(ExternalLocations)
    migrator = TablesMigrator(crawler, ws, sql, mapping, status_refresher, grants, locations)

    migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=True)

    target = "`ucx_default`.`db1_dst`.`managed_dbfs`"
    expected_queries = [
        f"ALTER TABLE {target} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')",
        f"REORG TABLE {target} APPLY (PURGE)",
        f"ALTER TABLE {target} SET TBLPROPERTIES ("
        "'delta.columnMapping.mode' = 'name', "
        "'delta.universalFormat.enabledFormats' = 'iceberg', "
        "'delta.enableIcebergCompatV2' = 'true')",
    ]
    for query in expected_queries:
        assert query in sql.queries
    grants.apply.assert_called()
    locations.resolve_mount.assert_not_called()


def test_migrate_external_sync_table_with_uniform_iceberg_enabled(ws, mock_pyspark):
    """With the flag on, a successful SYNC of `external_src` is followed by the three UniForm statements."""
    no_failures = {}
    # Any SYNC statement answers with a single SUCCESS row.
    sync_rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
    sql = MockBackend(fails_on_first=no_failures, rows=sync_rows)
    # The crawler gets its own backend, so only the migrator's statements land in `sql.queries`.
    crawler = TablesCrawler(MockBackend(fails_on_first=no_failures, rows=sync_rows), "inventory_database")
    mapping = mock_table_mapping(["external_src"])
    status_refresher = TableMigrationStatusRefresher(ws, sql, "inventory_database", crawler)
    grants = create_autospec(MigrateGrants)
    locations = create_autospec(ExternalLocations)
    migrator = TablesMigrator(crawler, ws, sql, mapping, status_refresher, grants, locations)

    migrator.migrate_tables(what=What.EXTERNAL_SYNC, enable_uniform_iceberg=True)

    target = "`ucx_default`.`db1_dst`.`external_dst`"
    expected_queries = [
        f"ALTER TABLE {target} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')",
        f"REORG TABLE {target} APPLY (PURGE)",
        f"ALTER TABLE {target} SET TBLPROPERTIES ("
        "'delta.columnMapping.mode' = 'name', "
        "'delta.universalFormat.enabledFormats' = 'iceberg', "
        "'delta.enableIcebergCompatV2' = 'true')",
    ]
    for query in expected_queries:
        assert query in sql.queries
    grants.apply.assert_called()
    locations.resolve_mount.assert_not_called()


def test_migrate_non_delta_table_with_uniform_iceberg_does_not_alter(ws, mock_pyspark):
    """Even with the flag on, migrating `dbfs_parquet` must not emit the UniForm TBLPROPERTIES clause."""
    sql = MockBackend(fails_on_first={}, rows={})
    crawler = TablesCrawler(sql, "inventory_database")
    mapping = mock_table_mapping(["dbfs_parquet"])
    status_refresher = TableMigrationStatusRefresher(ws, sql, "inventory_database", crawler)
    grants = create_autospec(MigrateGrants)
    locations = create_autospec(ExternalLocations)
    migrator = TablesMigrator(crawler, ws, sql, mapping, status_refresher, grants, locations)

    migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA, enable_uniform_iceberg=True)

    uniform_properties = ", ".join(
        [
            "'delta.columnMapping.mode' = 'name'",
            "'delta.universalFormat.enabledFormats' = 'iceberg'",
            "'delta.enableIcebergCompatV2' = 'true'",
        ]
    )
    uniform_clause = f"SET TBLPROPERTIES ({uniform_properties})"
    # Substring match: the clause must not appear inside any statement that was executed.
    assert not any(uniform_clause in query for query in sql.queries)
    grants.apply.assert_called()
    locations.resolve_mount.assert_not_called()


def test_migrate_delta_table_without_uniform_iceberg_flag_does_not_alter(ws, mock_pyspark):
    """With the flag off, migrating `managed_dbfs` must not emit the UniForm TBLPROPERTIES clause."""
    sql = MockBackend(fails_on_first={}, rows={})
    crawler = TablesCrawler(sql, "inventory_database")
    mapping = mock_table_mapping(["managed_dbfs"])
    status_refresher = TableMigrationStatusRefresher(ws, sql, "inventory_database", crawler)
    grants = create_autospec(MigrateGrants)
    locations = create_autospec(ExternalLocations)
    migrator = TablesMigrator(crawler, ws, sql, mapping, status_refresher, grants, locations)

    migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, enable_uniform_iceberg=False)

    uniform_properties = ", ".join(
        [
            "'delta.columnMapping.mode' = 'name'",
            "'delta.universalFormat.enabledFormats' = 'iceberg'",
            "'delta.enableIcebergCompatV2' = 'true'",
        ]
    )
    uniform_clause = f"SET TBLPROPERTIES ({uniform_properties})"
    # Substring match: the clause must not appear inside any statement that was executed.
    assert not any(uniform_clause in query for query in sql.queries)
    grants.apply.assert_called()
    locations.resolve_mount.assert_not_called()
Loading