diff --git a/CHANGELOG.md b/CHANGELOG.md index b1f16dc..f9fea45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,20 @@ All notable changes to the RAJA project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.3.0] - 2026-03-19 + +### Added + +- **Lake Formation–native Iceberg catalog flow**: DataZone can now import Glue tables registered under Lake Formation. A new `seed_glue_tables.py` script creates and registers Glue databases and Iceberg tables, then imports them into DataZone as assets. +- **`seed_glue_tables.py` script**: New seeding script that provisions Glue databases, registers S3 locations with Lake Formation, creates Iceberg tables, and drives the DataZone Glue import flow end-to-end. +- **LF-native PoC scripts** (`scripts/lf_native_poc/`): Exploratory scripts for the Lake Formation–native Iceberg catalog path, including a throwaway-subscriber creation helper and a package-tag import proof-of-concept. + +### Changed + +- **Terraform: `DATAZONE_PROJECTS` declared in Terraform and fed back via `tf-outputs.json`**: The `DATAZONE_PROJECTS` env var is now sourced from Terraform outputs, eliminating config drift after domain recreation. +- **Lake Formation access granted to DataZone Glue import role**: The IAM role used by DataZone to import Glue assets now has the necessary Lake Formation permissions (`lakeformation:GetDataAccess`, `lakeformation:GrantPermissions`, etc.). +- **Seed scripts no longer use hardcoded project names**: `seed_glue_tables.py`, `seed_users.py`, and `seed_packages.py` derive project names from `seed-config.yaml` rather than hard-coded strings. 
+ ## [1.2.0] - 2026-03-18 ### Added diff --git a/infra/terraform/main.tf b/infra/terraform/main.tf index 8fbc460..9ae71ea 100644 --- a/infra/terraform/main.tf +++ b/infra/terraform/main.tf @@ -39,7 +39,7 @@ locals { [filesha256(local.rale_router_requirements)], [for source_file in fileset(local.rale_router_source_dir, "**") : filesha256("${local.rale_router_source_dir}/${source_file}") if !endswith(source_file, ".pyc")] ))) - lambda_pip_platform = var.lambda_architecture == "arm64" ? "aarch64-manylinux2014" : "x86_64-manylinux2014" + lambda_pip_platform = var.lambda_architecture == "arm64" ? "aarch64-manylinux2014" : "x86_64-manylinux2014" lambda_python_version = "3.14" envoy_source_dir = "${local.repo_root}/infra/envoy" @@ -73,18 +73,308 @@ locals { ARM64 = "ARM64 (linux/arm64)" X86_64 = "X86_64 (linux/amd64)" } - lambda_arn_prefix = "arn:aws:lambda:${var.aws_region}:${data.aws_caller_identity.current.account_id}:function" - control_plane_lambda_name = "${var.stack_name}-control-plane" - rale_authorizer_lambda_name = "${var.stack_name}-rale-authorizer" - rale_router_lambda_name = "${var.stack_name}-rale-router" - control_plane_lambda_arn = "${local.lambda_arn_prefix}:${local.control_plane_lambda_name}" - rale_authorizer_lambda_arn = "${local.lambda_arn_prefix}:${local.rale_authorizer_lambda_name}" - rale_router_lambda_arn = "${local.lambda_arn_prefix}:${local.rale_router_lambda_name}" - datazone_domain_exec_role = "${var.stack_name}-datazone-domain-execution" - datazone_domain_service_role = "${var.stack_name}-datazone-domain-service" - datazone_env_owner_role = "raja-dz-env-owner" - datazone_env_users_role = "raja-dz-env-users" - datazone_env_guests_role = "raja-dz-env-guests" + lambda_arn_prefix = "arn:aws:lambda:${var.aws_region}:${data.aws_caller_identity.current.account_id}:function" + control_plane_lambda_name = "${var.stack_name}-control-plane" + rale_authorizer_lambda_name = "${var.stack_name}-rale-authorizer" + rale_router_lambda_name = 
"${var.stack_name}-rale-router" + control_plane_lambda_arn = "${local.lambda_arn_prefix}:${local.control_plane_lambda_name}" + rale_authorizer_lambda_arn = "${local.lambda_arn_prefix}:${local.rale_authorizer_lambda_name}" + rale_router_lambda_arn = "${local.lambda_arn_prefix}:${local.rale_router_lambda_name}" + datazone_domain_exec_role = "${var.stack_name}-datazone-domain-execution" + datazone_domain_service_role = "${var.stack_name}-datazone-domain-service" + datazone_env_owner_role = "raja-dz-env-owner" + datazone_env_users_role = "raja-dz-env-users" + datazone_env_guests_role = "raja-dz-env-guests" + tooling_blueprint_id = "cjegf7f6kky6w7" + sagemaker_provisioning_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/service-role/AmazonSageMakerProvisioning-${data.aws_caller_identity.current.account_id}" + sagemaker_manage_access_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/service-role/AmazonSageMakerManageAccess-${var.aws_region}-${aws_datazone_domain.raja.id}" + tooling_regional_parameters = { + (var.aws_region) = { + AZs = join(",", local.azs) + S3Location = "s3://amazon-sagemaker-${data.aws_caller_identity.current.account_id}-${var.aws_region}-db47c3e6c85d" + Subnets = "subnet-09d384be5cc82f4a3,subnet-0c4d8951561fb21ea" + VpcId = "vpc-010008ef3cce35c0c" + } + } + + iceberg_enabled = var.iceberg_s3_bucket != "" + iceberg_source_database = "icebergdatabase-v9cxuqnwjj5a" + iceberg_native_database = "${var.stack_name}-iceberg-lf" + iceberg_table_names = ["package_entry", "package_manifest", "package_revision", "package_tag"] + iceberg_table_name_map = local.iceberg_enabled ? { for name in local.iceberg_table_names : name => name } : {} + iceberg_table_location_arns = local.iceberg_enabled ? 
{ + for table_name, table in data.aws_glue_catalog_table.iceberg_source : + table_name => replace(table.storage_descriptor[0].location, "s3://", "arn:aws:s3:::") + } : {} +} + +data "aws_glue_catalog_table" "iceberg_source" { + for_each = local.iceberg_table_name_map + database_name = local.iceberg_source_database + name = each.key +} + +resource "aws_glue_catalog_database" "iceberg_lf" { + count = local.iceberg_enabled ? 1 : 0 + name = local.iceberg_native_database +} + +resource "aws_glue_catalog_table" "iceberg_lf" { + for_each = local.iceberg_table_name_map + database_name = aws_glue_catalog_database.iceberg_lf[0].name + name = each.key + description = data.aws_glue_catalog_table.iceberg_source[each.key].description + owner = data.aws_glue_catalog_table.iceberg_source[each.key].owner + retention = data.aws_glue_catalog_table.iceberg_source[each.key].retention + table_type = data.aws_glue_catalog_table.iceberg_source[each.key].table_type + + parameters = data.aws_glue_catalog_table.iceberg_source[each.key].parameters + view_expanded_text = data.aws_glue_catalog_table.iceberg_source[each.key].view_expanded_text + view_original_text = data.aws_glue_catalog_table.iceberg_source[each.key].view_original_text + + storage_descriptor { + additional_locations = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].additional_locations + bucket_columns = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].bucket_columns + compressed = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].compressed + input_format = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].input_format + location = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].location + number_of_buckets = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].number_of_buckets + output_format = 
data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].output_format + parameters = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].parameters + stored_as_sub_directories = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].stored_as_sub_directories + + dynamic "columns" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].columns + content { + comment = columns.value.comment + name = columns.value.name + parameters = columns.value.parameters + type = columns.value.type + } + } + + dynamic "schema_reference" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].schema_reference + content { + schema_version_id = schema_reference.value.schema_version_id + schema_version_number = schema_reference.value.schema_version_number + + dynamic "schema_id" { + for_each = schema_reference.value.schema_id + content { + registry_name = schema_id.value.registry_name + schema_arn = schema_id.value.schema_arn + schema_name = schema_id.value.schema_name + } + } + } + } + + dynamic "ser_de_info" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].ser_de_info + content { + name = ser_de_info.value.name + parameters = ser_de_info.value.parameters + serialization_library = ser_de_info.value.serialization_library + } + } + + dynamic "skewed_info" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].skewed_info + content { + skewed_column_names = skewed_info.value.skewed_column_names + skewed_column_value_location_maps = skewed_info.value.skewed_column_value_location_maps + skewed_column_values = skewed_info.value.skewed_column_values + } + } + + dynamic "sort_columns" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].storage_descriptor[0].sort_columns + content { + column = sort_columns.value.column + sort_order = sort_columns.value.sort_order + } + } + } 
+ + dynamic "partition_keys" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].partition_keys + content { + comment = partition_keys.value.comment + name = partition_keys.value.name + parameters = partition_keys.value.parameters + type = partition_keys.value.type + } + } + + dynamic "target_table" { + for_each = data.aws_glue_catalog_table.iceberg_source[each.key].target_table + content { + catalog_id = target_table.value.catalog_id + database_name = target_table.value.database_name + name = target_table.value.name + region = target_table.value.region + } + } +} + +resource "null_resource" "enforce_iceberg_lf_native" { + count = local.iceberg_enabled ? 1 : 0 + + triggers = { + database_name = aws_glue_catalog_database.iceberg_lf[0].name + table_names = join(",", local.iceberg_table_names) + } + + provisioner "local-exec" { + command = <<-EOT + set -euo pipefail + aws glue update-database \ + --region "${var.aws_region}" \ + --name "${aws_glue_catalog_database.iceberg_lf[0].name}" \ + --database-input '{"Name":"${aws_glue_catalog_database.iceberg_lf[0].name}","CreateTableDefaultPermissions":[]}' + + if aws lakeformation list-permissions --region "${var.aws_region}" --resource '{"Database":{"Name":"${aws_glue_catalog_database.iceberg_lf[0].name}"}}' --query "PrincipalResourcePermissions[?Principal.DataLakePrincipalIdentifier=='IAM_ALLOWED_PRINCIPALS'] | length(@)" --output text | grep -Fqx "1"; then + aws lakeformation revoke-permissions \ + --region "${var.aws_region}" \ + --principal DataLakePrincipalIdentifier=IAM_ALLOWED_PRINCIPALS \ + --resource '{"Database":{"Name":"${aws_glue_catalog_database.iceberg_lf[0].name}"}}' \ + --permissions ALL + fi + + for table in ${join(" ", local.iceberg_table_names)}; do + if aws lakeformation list-permissions --region "${var.aws_region}" --resource "{\\"Table\\":{\\"DatabaseName\\":\\"${aws_glue_catalog_database.iceberg_lf[0].name}\\",\\"Name\\":\\"$table\\"}}" --query 
"PrincipalResourcePermissions[?Principal.DataLakePrincipalIdentifier=='IAM_ALLOWED_PRINCIPALS'] | length(@)" --output text | grep -Fqx "1"; then + aws lakeformation revoke-permissions \ + --region "${var.aws_region}" \ + --principal DataLakePrincipalIdentifier=IAM_ALLOWED_PRINCIPALS \ + --resource "{\\"Table\\":{\\"DatabaseName\\":\\"${aws_glue_catalog_database.iceberg_lf[0].name}\\",\\"Name\\":\\"$table\\"}}" \ + --permissions ALL + fi + done + EOT + } + + depends_on = [aws_glue_catalog_table.iceberg_lf] +} + +resource "null_resource" "ensure_iceberg_table_location" { + for_each = local.iceberg_table_location_arns + + triggers = { + resource_arn = each.value + } + + # The AWS provider exposes aws_lakeformation_resource but does not support import. + # This stack already has these registrations in-place, so ensure they exist via CLI + # and let Terraform manage only the grant side. + provisioner "local-exec" { + command = <<-EOT + set -euo pipefail + if aws lakeformation list-resources --region "${var.aws_region}" --query "ResourceInfoList[?ResourceArn=='${each.value}'].ResourceArn" --output text | grep -Fqx "${each.value}"; then + echo "Lake Formation resource already registered: ${each.value}" + else + aws lakeformation register-resource \ + --region "${var.aws_region}" \ + --resource-arn "${each.value}" \ + --use-service-linked-role \ + --hybrid-access-enabled + fi + EOT + } +} + +resource "aws_lakeformation_permissions" "iceberg_lf_dz_domain_data_location" { + for_each = local.iceberg_table_location_arns + principal = aws_datazone_domain.raja.domain_execution_role + + permissions = ["DATA_LOCATION_ACCESS"] + permissions_with_grant_option = ["DATA_LOCATION_ACCESS"] + + data_location { + arn = each.value + } + + depends_on = [ + null_resource.ensure_iceberg_table_location, + null_resource.enforce_iceberg_lf_native, + ] +} + +resource "aws_lakeformation_permissions" "iceberg_lf_dz_domain_db" { + count = local.iceberg_enabled ? 
1 : 0 + principal = aws_datazone_domain.raja.domain_execution_role + + permissions = ["ALL"] + permissions_with_grant_option = ["ALL"] + + database { + name = aws_glue_catalog_database.iceberg_lf[0].name + } + + depends_on = [null_resource.enforce_iceberg_lf_native] +} + +resource "aws_lakeformation_permissions" "iceberg_lf_dz_domain_tables" { + for_each = local.iceberg_table_name_map + principal = aws_datazone_domain.raja.domain_execution_role + + permissions = ["ALL"] + permissions_with_grant_option = ["ALL"] + + table { + database_name = aws_glue_catalog_database.iceberg_lf[0].name + name = each.key + } + + depends_on = [null_resource.enforce_iceberg_lf_native] +} + +# H2 test: grant the owner Lakehouse environment role grantable permissions so +# DataZone can delegate LF grants to subscriber environment roles. +resource "aws_lakeformation_permissions" "iceberg_lf_owner_env_data_location" { + for_each = local.iceberg_table_location_arns + principal = aws_iam_role.datazone_environment_owner.arn + + permissions = ["DATA_LOCATION_ACCESS"] + permissions_with_grant_option = ["DATA_LOCATION_ACCESS"] + + data_location { + arn = each.value + } + + depends_on = [ + null_resource.ensure_iceberg_table_location, + null_resource.enforce_iceberg_lf_native, + ] +} + +resource "aws_lakeformation_permissions" "iceberg_lf_owner_env_db" { + count = local.iceberg_enabled ? 
1 : 0 + principal = aws_iam_role.datazone_environment_owner.arn + + permissions = ["ALL"] + permissions_with_grant_option = ["ALL"] + + database { + name = aws_glue_catalog_database.iceberg_lf[0].name + } + + depends_on = [null_resource.enforce_iceberg_lf_native] +} + +resource "aws_lakeformation_permissions" "iceberg_lf_owner_env_tables" { + for_each = local.iceberg_table_name_map + principal = aws_iam_role.datazone_environment_owner.arn + + permissions = ["ALL"] + permissions_with_grant_option = ["ALL"] + + table { + database_name = aws_glue_catalog_database.iceberg_lf[0].name + name = each.key + } + + depends_on = [null_resource.enforce_iceberg_lf_native] } resource "null_resource" "build_raja_layer" { @@ -245,8 +535,8 @@ resource "aws_iam_role_policy" "datazone_domain_execution_blueprint" { Version = "2012-10-17" Statement = [ { - Effect = "Allow" - Action = ["s3:GetObject", "s3:GetObjectVersion"] + Effect = "Allow" + Action = ["s3:GetObject", "s3:GetObjectVersion"] Resource = "arn:aws:s3:::amazon-sagemaker-cf-templates-${var.aws_region}-*/*" }, { @@ -536,6 +826,17 @@ resource "aws_datazone_environment_blueprint_configuration" "default_data_lake" domain_id = aws_datazone_domain.raja.id environment_blueprint_id = "d6y5smpdi8x9lz" enabled_regions = [var.aws_region] + provisioning_role_arn = local.sagemaker_provisioning_role_arn + manage_access_role_arn = local.sagemaker_manage_access_role_arn +} + +resource "aws_datazone_environment_blueprint_configuration" "default_tooling" { + domain_id = aws_datazone_domain.raja.id + environment_blueprint_id = local.tooling_blueprint_id + enabled_regions = [var.aws_region] + provisioning_role_arn = local.sagemaker_provisioning_role_arn + manage_access_role_arn = local.sagemaker_manage_access_role_arn + regional_parameters = local.tooling_regional_parameters } resource "aws_datazone_environment_blueprint_configuration" "raja_registry" { @@ -760,6 +1061,7 @@ resource "aws_lambda_function" "control_plane" { RAJEE_TEST_BUCKET_NAME 
= aws_s3_bucket.rajee_test.bucket ECS_CLUSTER_NAME = "${var.stack_name}-rajee-cluster" ECS_SERVICE_NAME = "${var.stack_name}-rajee-service" + DATAZONE_PROJECTS = var.datazone_projects } } @@ -871,6 +1173,7 @@ resource "aws_lambda_function" "rale_authorizer" { RALE_AUDIENCE = "raja-rale" RAJA_ALLOW_ASSERTED_PRINCIPAL = var.auth_disabled ? "true" : "false" RAJA_TRUSTED_FORWARDER_ARNS = join(",", [aws_iam_role.rajee_task.arn, aws_iam_role.control_plane_lambda.arn]) + DATAZONE_PROJECTS = var.datazone_projects } } @@ -887,12 +1190,12 @@ resource "aws_lambda_function_url" "rale_authorizer" { } resource "aws_lambda_permission" "rale_authorizer_url_account" { - statement_id = "AllowAccountInvokeRaleAuthorizerUrl" - action = "lambda:InvokeFunctionUrl" - function_name = aws_lambda_function.rale_authorizer.function_name - principal = "*" - function_url_auth_type = "AWS_IAM" - source_account = data.aws_caller_identity.current.account_id + statement_id = "AllowAccountInvokeRaleAuthorizerUrl" + action = "lambda:InvokeFunctionUrl" + function_name = aws_lambda_function.rale_authorizer.function_name + principal = "*" + function_url_auth_type = "AWS_IAM" + source_account = data.aws_caller_identity.current.account_id } resource "aws_iam_role" "rale_router_lambda" { @@ -985,12 +1288,12 @@ resource "aws_lambda_function_url" "rale_router" { } resource "aws_lambda_permission" "rale_router_url_account" { - statement_id = "AllowAccountInvokeRaleRouterUrl" - action = "lambda:InvokeFunctionUrl" - function_name = aws_lambda_function.rale_router.function_name - principal = "*" - function_url_auth_type = "AWS_IAM" - source_account = data.aws_caller_identity.current.account_id + statement_id = "AllowAccountInvokeRaleRouterUrl" + action = "lambda:InvokeFunctionUrl" + function_name = aws_lambda_function.rale_router.function_name + principal = "*" + function_url_auth_type = "AWS_IAM" + source_account = data.aws_caller_identity.current.account_id } resource "aws_api_gateway_rest_api" "raja" { diff 
--git a/infra/terraform/outputs.tf b/infra/terraform/outputs.tf index d032854..3af7bf0 100644 --- a/infra/terraform/outputs.tf +++ b/infra/terraform/outputs.tf @@ -53,6 +53,11 @@ output "datazone_package_asset_type_revision" { value = aws_datazone_asset_type.quilt_package.revision } +output "iceberg_lf_database_name" { + description = "Lake Formation-native Glue database mirroring the Quilt Iceberg tables." + value = local.iceberg_enabled ? aws_glue_catalog_database.iceberg_lf[0].name : "" +} + output "control_plane_lambda_arn" { description = "Control plane Lambda ARN." value = aws_lambda_function.control_plane.arn diff --git a/infra/terraform/terraform.tfvars.example b/infra/terraform/terraform.tfvars.example index 9e6bbb5..321d1cd 100644 --- a/infra/terraform/terraform.tfvars.example +++ b/infra/terraform/terraform.tfvars.example @@ -14,3 +14,4 @@ certificate_arn = "" ecs_cpu_architecture = "ARM64" rajee_task_cpu = 256 rajee_task_memory = 512 +iceberg_s3_bucket = "" diff --git a/infra/terraform/variables.tf b/infra/terraform/variables.tf index 412afde..199a570 100644 --- a/infra/terraform/variables.tf +++ b/infra/terraform/variables.tf @@ -121,6 +121,12 @@ variable "registry_accessor_arns" { default = [] } +variable "iceberg_s3_bucket" { + description = "S3 bucket containing the Quilt Iceberg tables (without s3:// prefix)." + type = string + default = "" +} + variable "datazone_domain_name" { description = "Amazon DataZone domain name for the RAJA package-grant POC." type = string @@ -150,3 +156,9 @@ variable "datazone_package_asset_type" { type = string default = "QuiltPackage" } + +variable "datazone_projects" { + description = "JSON blob mapping project keys to DataZone project_id/environment_id/project_label. Populated by sagemaker_gaps.py after environments are created and fed back in via TF_VAR_datazone_projects on subsequent runs." 
+ type = string + default = "" +} diff --git a/pyproject.toml b/pyproject.toml index 52be489..db29f04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "raja" -version = "1.2.0" +version = "1.3.0" description = "Add your description here" readme = "README.md" authors = [ @@ -117,11 +117,12 @@ demo-package = { cmd = "pytest tests/integration/test_rajee_package_grant.py -v demo-translation = { cmd = "pytest tests/integration/test_rajee_translation_grant.py -v -s", help = "Run translation grant (TAJ-package) demonstrations" } # AWS deployment -deploy = { sequence = ["_terraform-apply", "_wait-rajee-stable", "_sagemaker-gaps", "_seed-users", "_seed-packages", "_show-outputs"], help = "Deploy standalone RAJA stack, wait for RAJEE, fill V2 gaps, and seed integration fixtures" } +deploy = { sequence = ["_terraform-apply", "_wait-rajee-stable", "_sagemaker-gaps", "_seed-users", "_seed-packages", "_seed-glue-tables", "_show-outputs"], help = "Deploy standalone RAJA stack, wait for RAJEE, fill V2 gaps, and seed integration fixtures" } deploy-fast = { sequence = ["deploy"], help = "Alias for Terraform deploy" } destroy = { sequence = ["_terraform-destroy"], help = "Destroy Terraform stack" } seed-users = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_users", help = "Seed integration test principals into DataZone" } seed-packages = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_packages", help = "Seed quilt3 packages into raja-poc-registry from raja-poc-test and publish the DataZone listing" } +seed-glue-tables = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_glue_tables", help = "Seed LF-native Glue table assets into DataZone and auto-approve subscriber project grants" } sagemaker-gaps = { shell = "set -a; [ -f .env ] && . 
./.env; set +a; uv run --extra aws python scripts/sagemaker_gaps.py", help = "Fill current SageMaker Unified Studio V2 / DataZone Terraform gaps" } # Docker image building @@ -146,11 +147,12 @@ _format = { cmd = "uv run --extra dev ruff format src tests infra lambda_handler _lint-fix = { cmd = "uv run --extra dev ruff check --fix src tests infra lambda_handlers", help = "Internal: fix lint issues" } _typecheck = { cmd = "uv run --extra dev mypy src", help = "Internal: run type checker" } _show-outputs = { cmd = "python scripts/show_outputs.py", help = "Internal: print deployment summary" } -_terraform-apply = { shell = "set -a; [ -f .env ] && . ./.env; set +a; if [ -z \"${RAJA_ADMIN_KEY:-}\" ]; then echo \"Missing RAJA_ADMIN_KEY\" >&2; exit 1; fi; export TF_VAR_raja_admin_key=${RAJA_ADMIN_KEY}; export TF_VAR_raja_default_principal_username=$(python3 -c 'import os; users=[u.strip() for u in os.environ.get(\"RAJA_USERS\", \"\").split(\",\") if u.strip()]; print(users[0] if users else \"\")'); cd infra/terraform && terraform init -input=false && terraform apply -auto-approve -input=false && terraform output -json | python3 -c \"import json,sys; print(json.dumps({k:v['value'] for k,v in json.load(sys.stdin).items()}))\" > ../tf-outputs.json", help = "Internal: deploy Terraform stack and persist outputs" } +_terraform-apply = { shell = "set -a; [ -f .env ] && . 
./.env; set +a; if [ -z \"${RAJA_ADMIN_KEY:-}\" ]; then echo \"Missing RAJA_ADMIN_KEY\" >&2; exit 1; fi; export TF_VAR_raja_admin_key=${RAJA_ADMIN_KEY}; export TF_VAR_raja_default_principal_username=$(python3 -c 'import os; users=[u.strip() for u in os.environ.get(\"RAJA_USERS\", \"\").split(\",\") if u.strip()]; print(users[0] if users else \"\")'); export TF_VAR_datazone_projects=$(python3 -c 'import json,os; d=json.load(open(\"infra/tf-outputs.json\")) if os.path.exists(\"infra/tf-outputs.json\") else {}; print(d.get(\"datazone_projects\", \"\"))'); cd infra/terraform && terraform init -input=false && terraform apply -auto-approve -input=false && terraform output -json | python3 -c \"import json,sys; print(json.dumps({k:v['value'] for k,v in json.load(sys.stdin).items()}))\" > ../tf-outputs.json", help = "Internal: deploy Terraform stack and persist outputs" } _wait-rajee-stable = { shell = "set -a; [ -f .env ] && . ./.env; set +a; aws ecs wait services-stable --cluster raja-standalone-rajee-cluster --services raja-standalone-rajee-service", help = "Internal: wait for the RAJEE ECS service to reach a stable state" } _sagemaker-gaps = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python scripts/sagemaker_gaps.py", help = "Internal: fill current SageMaker Unified Studio V2 / DataZone Terraform gaps" } _seed-users = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_users", help = "Internal: seed integration test principals into DataZone" } _seed-packages = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_packages", help = "Internal: seed quilt packages and DataZone listings" } +_seed-glue-tables = { shell = "set -a; [ -f .env ] && . ./.env; set +a; uv run --extra aws python -m scripts.seed_glue_tables", help = "Internal: seed Glue table assets and DataZone subscriptions" } _terraform-destroy = { shell = "set -a; [ -f .env ] && . 
./.env; set +a; if [ -z \"${RAJA_ADMIN_KEY:-}\" ]; then echo \"Missing RAJA_ADMIN_KEY\" >&2; exit 1; fi; export TF_VAR_raja_admin_key=${RAJA_ADMIN_KEY}; export TF_VAR_raja_default_principal_username=$(python3 -c 'import os; users=[u.strip() for u in os.environ.get(\"RAJA_USERS\", \"\").split(\",\") if u.strip()]; print(users[0] if users else \"\")'); cd infra/terraform && terraform init -input=false && terraform destroy -auto-approve -input=false && rm -f ../tf-outputs.json", help = "Internal: destroy Terraform stack" } [tool.mypy] diff --git a/scripts/lf_native_poc/README.md b/scripts/lf_native_poc/README.md new file mode 100644 index 0000000..9c9ea15 --- /dev/null +++ b/scripts/lf_native_poc/README.md @@ -0,0 +1,69 @@ +# LF-Native Import POC + +This folder contains the isolated probes that turned the LF-native +Iceberg/DataZone blocker from guesswork into a reproducible path. + +Entrypoints: + +- `python -m scripts.lf_native_poc.package_tag_import_poc` +- `python -m scripts.lf_native_poc.create_throwaway_subscriber` + +## What Actually Worked + +- Do not use manual `create_asset` Glue-table assets for LF-native tables. + They can reach `ACCEPTED` subscriptions without producing managed LF grants. +- Use a real DataZone-managed Glue import and subscribe against the imported + listings, not the manual ones. +- Give the owner Glue data source role Lake Formation access to the database, + every table, and every table location. This was the missing step that let the + data source import all four tables instead of only `package_tag`. +- Re-run the Glue data source import whenever DataZone has imported fewer than + the expected four tables. +- Keep the default Lakehouse blueprint healthy. Fresh subscriber projects only + worked after fixing the live Tooling and Lakehouse blueprint configs. +- The successful LF fulfillment shape is not a direct table grant to the + subscriber environment role ARN. 
DataZone writes conditional `SELECT` grants + on `712023778557:IAMPrincipals` scoped by `context.datazone.projectId`. + +## Final Working Path + +1. Mirror the four Iceberg tables into `raja-standalone-iceberg-lf`. +2. Ensure the owner project has a DataZone Glue data source for that database. +3. Grant the data source role LF `ALL` on the database and tables plus + `DATA_LOCATION_ACCESS` on all table locations. +4. Start or restart the DataZone Glue import. +5. Wait for imported listings for: + - `package_entry` + - `package_manifest` + - `package_revision` + - `package_tag` +6. Subscribe `bio` and `compute` against those imported listings. +7. Verify LF conditional `SELECT` grants exist for both subscriber project IDs. + +## Working Evidence + +- Imported listings now used by the main seed flow: + - `package_entry` -> `cll99ezfwkw8pz` + - `package_manifest` -> `6q5wgwn4bjha5j` + - `package_revision` -> `apj78613rljtpj` + - `package_tag` -> `5za88zhymk4qzr` +- Completed DataZone grant objects now exist for: + - `package_entry` + - `package_manifest` + - `package_revision` + - `package_tag` +- Final LF state now includes conditional `SELECT` grants for both subscriber + project IDs: + - `bm7eqh5dc6olrb` + - `b3byg401pnpjjb` + +## Cleanup + +- Delete the throwaway subscriber project `60st0m21xz0a3r` if it is no longer + needed. +- Old failed `package_tag` grant records from earlier experiments still exist in + DataZone history. They do not block the working path, but they are noise when + inspecting grant history. +- Terraform still does not fully own the live DataZone domain role selection if + `ignore_changes` remains on `domain_execution_role` / `service_role`. That is + worth reconciling separately so the production-shaped role choice is durable. 
diff --git a/scripts/lf_native_poc/__init__.py b/scripts/lf_native_poc/__init__.py new file mode 100644 index 0000000..0469b5b --- /dev/null +++ b/scripts/lf_native_poc/__init__.py @@ -0,0 +1 @@ +"""Standalone LF-native/DataZone import POCs.""" diff --git a/scripts/lf_native_poc/create_throwaway_subscriber.py b/scripts/lf_native_poc/create_throwaway_subscriber.py new file mode 100644 index 0000000..dcaedd7 --- /dev/null +++ b/scripts/lf_native_poc/create_throwaway_subscriber.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +"""Create a disposable DataZone subscriber project and Lakehouse environment.""" + +from __future__ import annotations + +import argparse +import json +import time +from datetime import datetime +from typing import Any + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError + +DEFAULT_DOMAIN_ID = "dzd-6w14ep5r5owwh3" +DEFAULT_PROFILE_NAME = "All capabilities" +FALLBACK_PROFILE_NAMES = ("All capabilities", "raja-default-profile") +LAKEHOUSE_BLUEPRINT_ID = "d6y5smpdi8x9lz" +LAKEHOUSE_ENVIRONMENT_NAME = "Lakehouse Database" + + +class ThrowawaySubscriberError(RuntimeError): + """Raised when the throwaway subscriber cannot be created.""" + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--domain-id", default=DEFAULT_DOMAIN_ID) + parser.add_argument("--region", default="us-east-1") + parser.add_argument("--profile-name", default=DEFAULT_PROFILE_NAME) + parser.add_argument("--name-prefix", default="raja-throwaway") + parser.add_argument( + "--description", + default="Throwaway subscriber project for LF-native imported Glue grant debugging", + ) + parser.add_argument("--timeout-seconds", type=int, default=900) + return parser.parse_args() + + +def _list_all(method: Any, key: str, **kwargs: Any) -> list[dict[str, Any]]: + paginator = method.__self__.get_paginator(method.__name__) + items: list[dict[str, Any]] = [] + for page in paginator.paginate(**kwargs): + 
page_items = page.get(key, []) + if isinstance(page_items, list): + items.extend(page_items) + return items + + +def _extract_provisioned_value(environment: dict[str, Any], name: str) -> str: + resources = environment.get("provisionedResources") or [] + if not isinstance(resources, list): + return "" + for resource in resources: + if not isinstance(resource, dict): + continue + if str(resource.get("name") or "") == name: + return str(resource.get("value") or "") + return "" + + +def _timestamped_name(prefix: str) -> str: + return f"{prefix}-{datetime.now().strftime('%Y%m%d-%H%M%S')}" + + +def _profile_env_config_ids(profile_detail: dict[str, Any]) -> set[str]: + configs = profile_detail.get("environmentConfigurations") or [] + if not isinstance(configs, list): + return set() + return { + str(env_config.get("environmentBlueprintId") or "") + for env_config in configs + if isinstance(env_config, dict) + } + + +def _find_project_profile( + client: Any, *, domain_id: str, profile_name: str +) -> tuple[str, dict[str, Any]]: + profiles = _list_all(client.list_project_profiles, "items", domainIdentifier=domain_id) + for profile in profiles: + if str(profile.get("name") or "") == profile_name: + profile_id = str(profile.get("id") or "") + if not profile_id: + break + detail = client.get_project_profile( + domainIdentifier=domain_id, + identifier=profile_id, + ) + return profile_id, detail + raise ThrowawaySubscriberError(f"project profile {profile_name!r} was not found") + + +def _choose_project_profile( + client: Any, *, domain_id: str, requested_profile_name: str +) -> tuple[str, str, dict[str, Any]]: + candidates = [requested_profile_name] + for fallback in FALLBACK_PROFILE_NAMES: + if fallback not in candidates: + candidates.append(fallback) + + first_found: tuple[str, str, dict[str, Any]] | None = None + for candidate in candidates: + try: + profile_id, detail = _find_project_profile( + client, domain_id=domain_id, profile_name=candidate + ) + except 
ThrowawaySubscriberError: + continue + if first_found is None: + first_found = (candidate, profile_id, detail) + if LAKEHOUSE_BLUEPRINT_ID in _profile_env_config_ids(detail): + return candidate, profile_id, detail + + if first_found is not None: + return first_found + raise ThrowawaySubscriberError("no usable project profile was found") + + +def _wait_for_project_ready( + client: Any, *, domain_id: str, project_id: str, timeout_seconds: int +) -> dict[str, Any]: + deadline = time.time() + timeout_seconds + while time.time() < deadline: + project = client.get_project(domainIdentifier=domain_id, identifier=project_id) + status = str(project.get("projectStatus") or project.get("status") or "") + if status in {"ACTIVE", "SUCCESS", ""}: + return project + time.sleep(5) + raise ThrowawaySubscriberError(f"project {project_id} did not become ready") + + +def _lakehouse_env_config( + profile: dict[str, Any], *, project_profile_id: str +) -> dict[str, Any]: + for env_config in profile.get("environmentConfigurations") or []: + if str(env_config.get("environmentBlueprintId") or "") == LAKEHOUSE_BLUEPRINT_ID: + return env_config + raise ThrowawaySubscriberError( + f"project profile {project_profile_id} has no Lakehouse environment configuration" + ) + + +def _find_lakehouse_environment( + client: Any, *, domain_id: str, project_id: str +) -> dict[str, Any] | None: + environments = _list_all( + client.list_environments, + "items", + domainIdentifier=domain_id, + projectIdentifier=project_id, + ) + for environment in environments: + if str(environment.get("environmentBlueprintId") or "") == LAKEHOUSE_BLUEPRINT_ID: + return environment + for environment in environments: + if str(environment.get("name") or "") == LAKEHOUSE_ENVIRONMENT_NAME: + return environment + return None + + +def _wait_for_environment_active( + client: Any, *, domain_id: str, environment_id: str, timeout_seconds: int +) -> dict[str, Any]: + deadline = time.time() + timeout_seconds + while time.time() < 
deadline: + environment = client.get_environment( + domainIdentifier=domain_id, + identifier=environment_id, + ) + status = str(environment.get("status") or "") + if status == "ACTIVE": + return environment + if status in {"CREATE_FAILED", "UPDATE_FAILED", "DELETE_FAILED"}: + raise ThrowawaySubscriberError( + f"environment {environment_id} entered terminal status {status}" + ) + time.sleep(10) + raise ThrowawaySubscriberError(f"environment {environment_id} did not become ACTIVE") + + +def _wait_for_auto_environment( + client: Any, *, domain_id: str, project_id: str, timeout_seconds: int +) -> dict[str, Any]: + deadline = time.time() + timeout_seconds + while time.time() < deadline: + environment = _find_lakehouse_environment( + client, domain_id=domain_id, project_id=project_id + ) + if environment is None: + time.sleep(10) + continue + environment_id = str(environment.get("id") or "") + if not environment_id: + time.sleep(10) + continue + return _wait_for_environment_active( + client, + domain_id=domain_id, + environment_id=environment_id, + timeout_seconds=max(30, int(deadline - time.time())), + ) + raise ThrowawaySubscriberError( + f"project {project_id} did not auto-create a Lakehouse environment" + ) + + +def main() -> None: + args = _parse_args() + client = boto3.client("datazone", region_name=args.region) + + project_name = _timestamped_name(args.name_prefix) + try: + profile_name, profile_id, profile = _choose_project_profile( + client, domain_id=args.domain_id, requested_profile_name=args.profile_name + ) + created_project = client.create_project( + domainIdentifier=args.domain_id, + name=project_name, + description=args.description, + projectProfileId=profile_id, + ) + project_id = str(created_project.get("id") or "") + if not project_id: + raise ThrowawaySubscriberError("create_project returned no project id") + + _wait_for_project_ready( + client, + domain_id=args.domain_id, + project_id=project_id, + timeout_seconds=args.timeout_seconds, + ) + + 
env_config = _lakehouse_env_config(profile, project_profile_id=profile_id) + env_config_id = str(env_config.get("id") or "") + deployment_mode = str(env_config.get("deploymentMode") or "") + + if deployment_mode == "ON_CREATE": + environment = _wait_for_auto_environment( + client, + domain_id=args.domain_id, + project_id=project_id, + timeout_seconds=args.timeout_seconds, + ) + environment_id = str(environment.get("id") or "") + else: + created_environment = client.create_environment( + domainIdentifier=args.domain_id, + projectIdentifier=project_id, + name=LAKEHOUSE_ENVIRONMENT_NAME, + environmentConfigurationId=env_config_id, + ) + environment_id = str(created_environment.get("id") or "") + if not environment_id: + raise ThrowawaySubscriberError("create_environment returned no environment id") + environment = _wait_for_environment_active( + client, + domain_id=args.domain_id, + environment_id=environment_id, + timeout_seconds=args.timeout_seconds, + ) + except (ClientError, BotoCoreError) as exc: + raise ThrowawaySubscriberError(str(exc)) from exc + + result = { + "project_name": project_name, + "project_id": project_id, + "project_profile_name": profile_name, + "project_profile_id": profile_id, + "environment_configuration_id": env_config_id, + "environment_deployment_mode": deployment_mode, + "environment_name": LAKEHOUSE_ENVIRONMENT_NAME, + "environment_id": environment_id, + "environment_user_role_arn": _extract_provisioned_value(environment, "userRoleArn"), + } + print(json.dumps(result, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/scripts/lf_native_poc/package_tag_import_poc.py b/scripts/lf_native_poc/package_tag_import_poc.py new file mode 100644 index 0000000..fdbf747 --- /dev/null +++ b/scripts/lf_native_poc/package_tag_import_poc.py @@ -0,0 +1,556 @@ +#!/usr/bin/env python3 +"""POC for DataZone-managed Glue import on the LF-native package_tag table.""" + +from __future__ import annotations + +import argparse +import json +import sys 
+import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError + +if __package__ in {None, ""}: + sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from scripts.seed_glue_tables import ( # noqa: E402 + _asset_search, + _ensure_glue_data_source, + _ensure_subscription, + _find_glue_data_source, + _get_asset, + _list_subscription_grants, +) +from scripts.tf_outputs import load_tf_outputs # noqa: E402 + + +class PocError(RuntimeError): + """Raised when the package_tag import POC cannot proceed.""" + + +@dataclass(frozen=True) +class PocContext: + region: str + account_id: str + domain_id: str + database_name: str + owner_project_id: str + subscriber_project_id: str + owner_environment_id: str + subscriber_environment_id: str + table_name: str + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--table-name", default="package_tag", help="Glue table to probe.") + parser.add_argument( + "--subscriber-key", + default="bio", + help="Logical subscriber project key from tf outputs (default: bio).", + ) + parser.add_argument( + "--grant-publisher-role", + action="store_true", + help="Grant LF access to the owner Glue data source role before rerunning import.", + ) + parser.add_argument( + "--restart-import", + action="store_true", + help="Start a fresh DataZone Glue data source run.", + ) + return parser.parse_args() + + +def _extract_provisioned_value(environment: dict[str, Any], name: str) -> str: + resources = environment.get("provisionedResources") or [] + if not isinstance(resources, list): + return "" + for resource in resources: + if not isinstance(resource, dict): + continue + if str(resource.get("name") or "") == name: + return str(resource.get("value") or "") + return "" + + +def _form_names(asset_detail: dict[str, Any]) -> list[str]: + forms = 
asset_detail.get("formsOutput") or [] + if not isinstance(forms, list): + return [] + names: list[str] = [] + for form in forms: + if not isinstance(form, dict): + continue + name = str(form.get("formName") or "") + if name: + names.append(name) + return names + + +def _asset_candidate_summary(asset_detail: dict[str, Any]) -> dict[str, Any]: + listing = asset_detail.get("listing") or {} + return { + "asset_id": str(asset_detail.get("id") or ""), + "name": str(asset_detail.get("name") or ""), + "created_by": str(asset_detail.get("createdBy") or ""), + "created_at": str(asset_detail.get("createdAt") or ""), + "external_identifier": str(asset_detail.get("externalIdentifier") or ""), + "listing_id": str(listing.get("listingId") or ""), + "listing_status": str(listing.get("listingStatus") or ""), + "form_names": _form_names(asset_detail), + } + + +def _score_asset_as_imported_candidate( + asset_detail: dict[str, Any], *, known_asset_ids: set[str] +) -> tuple[int, list[str]]: + score = 0 + reasons: list[str] = [] + asset_id = str(asset_detail.get("id") or "") + forms = set(_form_names(asset_detail)) + created_by = str(asset_detail.get("createdBy") or "") + + if asset_id and asset_id not in known_asset_ids: + score += 10 + reasons.append("new asset id after import run") + if "DataSourceReferenceForm" in forms: + score += 8 + reasons.append("has DataSourceReferenceForm") + if created_by == "SYSTEM": + score += 4 + reasons.append("created by SYSTEM") + if len(forms) > 1: + score += 2 + reasons.append("has multiple forms") + return score, reasons + + +def _get_context(args: argparse.Namespace) -> PocContext: + outputs = load_tf_outputs() + project_ids = outputs.get("datazone_project_ids") or {} + environment_ids = outputs.get("datazone_project_environment_ids") or {} + if not isinstance(project_ids, dict) or not isinstance(environment_ids, dict): + raise PocError( + "tf outputs are missing datazone_project_ids " + "or datazone_project_environment_ids" + ) + + account_id = 
str(boto3.client("sts").get_caller_identity()["Account"]) + region = "us-east-1" + domain_id = str(outputs.get("datazone_domain_id") or "") + database_name = str(outputs.get("iceberg_lf_database_name") or "") + owner_project_id = str(project_ids.get("alpha") or "") + subscriber_project_id = str(project_ids.get(args.subscriber_key) or "") + owner_environment_id = str(environment_ids.get("alpha") or "") + subscriber_environment_id = str(environment_ids.get(args.subscriber_key) or "") + + if not domain_id or not database_name: + raise PocError("tf outputs are missing datazone_domain_id or iceberg_lf_database_name") + if not owner_project_id or not subscriber_project_id: + raise PocError("tf outputs are missing owner or subscriber project IDs") + if not owner_environment_id or not subscriber_environment_id: + raise PocError("tf outputs are missing owner or subscriber environment IDs") + + return PocContext( + region=region, + account_id=account_id, + domain_id=domain_id, + database_name=database_name, + owner_project_id=owner_project_id, + subscriber_project_id=subscriber_project_id, + owner_environment_id=owner_environment_id, + subscriber_environment_id=subscriber_environment_id, + table_name=args.table_name, + ) + + +def _get_environment_role_arn(client: Any, *, domain_id: str, environment_id: str) -> str: + response = client.get_environment(domainIdentifier=domain_id, identifier=environment_id) + role_arn = _extract_provisioned_value(response, "userRoleArn") + if not role_arn: + raise PocError(f"environment {environment_id} is missing userRoleArn") + return role_arn + + +def _get_data_source_role_arn(client: Any, *, domain_id: str, data_source_id: str) -> str: + response = client.get_data_source(domainIdentifier=domain_id, identifier=data_source_id) + config = response.get("configuration") or {} + glue_config = config.get("glueRunConfiguration") or {} + role_arn = str(glue_config.get("dataAccessRole") or "") + if not role_arn: + raise PocError( + f"data source 
{data_source_id} is missing " + "glueRunConfiguration.dataAccessRole" + ) + return role_arn + + +def _get_glue_table(glue_client: Any, *, database_name: str, table_name: str) -> dict[str, Any]: + try: + return glue_client.get_table(DatabaseName=database_name, Name=table_name)["Table"] + except (ClientError, BotoCoreError) as exc: + raise PocError(f"failed to read Glue table {database_name}.{table_name}") from exc + + +def _s3_uri_to_arn(location: str) -> str: + if not location.startswith("s3://"): + raise PocError(f"expected s3:// location, got {location!r}") + bucket_and_key = location.removeprefix("s3://").rstrip("/") + bucket, _, key = bucket_and_key.partition("/") + if not bucket: + raise PocError(f"could not parse S3 bucket from {location!r}") + if not key: + return f"arn:aws:s3:::{bucket}" + return f"arn:aws:s3:::{bucket}/{key}" + + +def _grant_permission( + lf_client: Any, + *, + principal_arn: str, + resource: dict[str, Any], + permissions: list[str], + grant_permissions: list[str], +) -> None: + try: + lf_client.grant_permissions( + Principal={"DataLakePrincipalIdentifier": principal_arn}, + Resource=resource, + Permissions=permissions, + PermissionsWithGrantOption=grant_permissions, + ) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + message = str(exc) + if code == "AlreadyExistsException" or "Permissions modification is invalid" in message: + return + raise + + +def _ensure_publisher_lf_permissions( + *, + lf_client: Any, + principal_arn: str, + database_name: str, + table_name: str, + table_location: str, +) -> None: + data_location_arn = _s3_uri_to_arn(table_location) + _grant_permission( + lf_client, + principal_arn=principal_arn, + resource={"Database": {"Name": database_name}}, + permissions=["ALL"], + grant_permissions=["ALL"], + ) + _grant_permission( + lf_client, + principal_arn=principal_arn, + resource={"Table": {"DatabaseName": database_name, "Name": table_name}}, + permissions=["ALL"], + 
grant_permissions=["ALL"], + ) + _grant_permission( + lf_client, + principal_arn=principal_arn, + resource={"DataLocation": {"ResourceArn": data_location_arn}}, + permissions=["DATA_LOCATION_ACCESS"], + grant_permissions=["DATA_LOCATION_ACCESS"], + ) + + +def _start_import_run(client: Any, *, domain_id: str, data_source_id: str) -> str: + before = client.get_data_source(domainIdentifier=domain_id, identifier=data_source_id) + last_run_at = str(before.get("lastRunAt") or "") + client.start_data_source_run( + domainIdentifier=domain_id, + dataSourceIdentifier=data_source_id, + ) + return last_run_at + + +def _wait_for_import_run( + client: Any, + *, + domain_id: str, + data_source_id: str, + previous_last_run_at: str, + timeout_seconds: int = 300, +) -> dict[str, Any]: + deadline = time.time() + timeout_seconds + observed_new_run = False + while time.time() < deadline: + response = client.get_data_source(domainIdentifier=domain_id, identifier=data_source_id) + current_last_run_at = str(response.get("lastRunAt") or "") + current_last_run_status = str(response.get("lastRunStatus") or "") + if current_last_run_at and current_last_run_at != previous_last_run_at: + observed_new_run = True + if observed_new_run and current_last_run_status in {"SUCCESS", "FAILED"}: + return response + time.sleep(5) + raise PocError(f"timed out waiting for data source run {data_source_id}") + + +def _find_matching_assets( + client: Any, + *, + domain_id: str, + owner_project_id: str, + table_name: str, + database_name: str, +) -> list[dict[str, Any]]: + matches: list[dict[str, Any]] = [] + for item in _asset_search(client, domain_id, owner_project_id, table_name): + if str(item.get("name") or "") != table_name: + continue + asset_id = str(item.get("identifier") or "") + if not asset_id: + continue + detail = _get_asset(client, domain_id, asset_id) + if str(detail.get("typeIdentifier") or "") != "amazon.datazone.GlueTableAssetType": + continue + external_identifier = 
str(detail.get("externalIdentifier") or "") + if database_name not in external_identifier and table_name not in external_identifier: + continue + matches.append(detail) + return matches + + +def _choose_imported_asset( + assets: list[dict[str, Any]], *, known_asset_ids: set[str] +) -> tuple[dict[str, Any] | None, list[dict[str, Any]]]: + scored: list[tuple[int, dict[str, Any], list[str]]] = [] + for asset in assets: + score, reasons = _score_asset_as_imported_candidate(asset, known_asset_ids=known_asset_ids) + scored.append((score, asset, reasons)) + + scored.sort(key=lambda item: item[0], reverse=True) + summaries = [] + for score, asset, reasons in scored: + summary = _asset_candidate_summary(asset) + summary["import_score"] = score + summary["import_reasons"] = reasons + summaries.append(summary) + + if not scored or scored[0][0] <= 0: + return None, summaries + return scored[0][1], summaries + + +def _listing_id_for_asset(asset_detail: dict[str, Any]) -> str: + listing = asset_detail.get("listing") or {} + return str(listing.get("listingId") or "") + + +def _lf_permissions_for_table( + lf_client: Any, *, database_name: str, table_name: str +) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + resource = {"Table": {"DatabaseName": database_name, "Name": table_name}} + next_token: str | None = None + while True: + kwargs: dict[str, Any] = {"Resource": resource} + if next_token: + kwargs["NextToken"] = next_token + page = lf_client.list_permissions(**kwargs) + page_items = page.get("PrincipalResourcePermissions") or [] + if isinstance(page_items, list): + items.extend(page_items) + next_token = page.get("NextToken") + if not next_token: + return items + return items + + +def main() -> None: + args = _parse_args() + ctx = _get_context(args) + + datazone = boto3.client("datazone", region_name=ctx.region) + glue = boto3.client("glue", region_name=ctx.region) + lf = boto3.client("lakeformation", region_name=ctx.region) + + owner_role_arn = 
_get_environment_role_arn( + datazone, domain_id=ctx.domain_id, environment_id=ctx.owner_environment_id + ) + subscriber_role_arn = _get_environment_role_arn( + datazone, domain_id=ctx.domain_id, environment_id=ctx.subscriber_environment_id + ) + + _ensure_glue_data_source( + datazone, + domain_id=ctx.domain_id, + owner_project_id=ctx.owner_project_id, + database_name=ctx.database_name, + dry_run=False, + ) + data_source_name = f"{ctx.database_name}-datasource" + data_source = _find_glue_data_source( + datazone, ctx.domain_id, ctx.owner_project_id, data_source_name + ) + if data_source is None: + raise PocError(f"owner Glue data source {data_source_name!r} was not found") + data_source_id = str(data_source.get("dataSourceId") or data_source.get("id") or "") + if not data_source_id: + raise PocError(f"data source {data_source_name!r} is missing an identifier") + + data_source_role_arn = _get_data_source_role_arn( + datazone, domain_id=ctx.domain_id, data_source_id=data_source_id + ) + glue_table = _get_glue_table( + glue, database_name=ctx.database_name, table_name=ctx.table_name + ) + table_location = str((glue_table.get("StorageDescriptor") or {}).get("Location") or "") + if not table_location: + raise PocError(f"Glue table {ctx.table_name} is missing StorageDescriptor.Location") + + before_assets = _find_matching_assets( + datazone, + domain_id=ctx.domain_id, + owner_project_id=ctx.owner_project_id, + table_name=ctx.table_name, + database_name=ctx.database_name, + ) + before_asset_ids = {str(asset.get("id") or "") for asset in before_assets} + + print("=" * 60) + print("LF-native imported Glue asset POC") + print("=" * 60) + print(f"Table: {ctx.table_name}") + print(f"Database: {ctx.database_name}") + print(f"Data source: {data_source_name} ({data_source_id})") + print(f"Owner env role: {owner_role_arn}") + print(f"Data source role: {data_source_role_arn}") + print(f"Subscriber env role: {subscriber_role_arn}") + print(f"Table location: {table_location}") + 
print(f"Existing matching assets before run: {len(before_assets)}") + + if args.grant_publisher_role: + _ensure_publisher_lf_permissions( + lf_client=lf, + principal_arn=data_source_role_arn, + database_name=ctx.database_name, + table_name=ctx.table_name, + table_location=table_location, + ) + print("Publisher LF grants: ensured for data source role") + + run_result = datazone.get_data_source( + domainIdentifier=ctx.domain_id, identifier=data_source_id + ) + if args.restart_import: + previous_last_run_at = _start_import_run( + datazone, domain_id=ctx.domain_id, data_source_id=data_source_id + ) + print( + "Import run: started " + f"(previous lastRunAt={previous_last_run_at or 'none'})" + ) + run_result = _wait_for_import_run( + datazone, + domain_id=ctx.domain_id, + data_source_id=data_source_id, + previous_last_run_at=previous_last_run_at, + ) + print( + "Import run result: " + f"{run_result.get('lastRunStatus')} " + f"assetCount={run_result.get('lastRunAssetCount')}" + ) + if run_result.get("lastRunErrorMessage"): + print(f"Import run error: {json.dumps(run_result['lastRunErrorMessage'])}") + + after_assets = _find_matching_assets( + datazone, + domain_id=ctx.domain_id, + owner_project_id=ctx.owner_project_id, + table_name=ctx.table_name, + database_name=ctx.database_name, + ) + imported_asset, asset_summaries = _choose_imported_asset( + after_assets, known_asset_ids=before_asset_ids + ) + print("\nAsset candidates:") + print(json.dumps(asset_summaries, indent=2)) + + if imported_asset is None: + print( + "\nRESULT: No imported asset candidate was identified.\n" + "If the data source run succeeded, DataZone may be suppressing import because a\n" + "manual asset already exists for the same Glue table/external identifier." 
+ ) + return + + imported_listing_id = _listing_id_for_asset(imported_asset) + if not imported_listing_id: + raise PocError("imported asset candidate has no active listing") + + request_id = _ensure_subscription( + datazone, + domain_id=ctx.domain_id, + listing=type( + "Listing", + (), + { + "listing_id": imported_listing_id, + "listing_revision": "", + "asset_id": str(imported_asset.get("id") or ""), + "asset_revision": str(imported_asset.get("revision") or ""), + "name": ctx.table_name, + "owner_project_id": ctx.owner_project_id, + }, + )(), + project_id=ctx.subscriber_project_id, + dry_run=False, + force=False, + ) + print(f"\nSubscription request: {request_id}") + + time.sleep(10) + grants = _list_subscription_grants(datazone, ctx.domain_id, imported_listing_id) + table_permissions = _lf_permissions_for_table( + lf, database_name=ctx.database_name, table_name=ctx.table_name + ) + subscriber_entries = [ + item + for item in table_permissions + if str((item.get("Principal") or {}).get("DataLakePrincipalIdentifier") or "") + == subscriber_role_arn + ] + + print("\nDataZone grant objects:") + print(json.dumps(grants, indent=2, default=str)) + print("\nSubscriber LF table permissions:") + print(json.dumps(subscriber_entries, indent=2, default=str)) + + if grants and subscriber_entries: + print( + "\nRESULT: Imported-asset path produced both DataZone grant objects and\n" + "subscriber LF permissions. This strongly supports H1/H2 as the root cause." + ) + return + if grants and not subscriber_entries: + print( + "\nRESULT: DataZone created grant objects, but subscriber LF permissions are " + "still missing.\n" + "That points to downstream fulfillment failure after grant object creation." + ) + return + print( + "\nRESULT: Even after rerunning the managed Glue import, no grant objects or " + "subscriber LF\n" + "permissions appeared for the imported candidate." 
+ ) + + +if __name__ == "__main__": + try: + main() + except PocError as exc: + print(f"✗ {exc}", file=sys.stderr) + sys.exit(1) diff --git a/scripts/sagemaker_gaps.py b/scripts/sagemaker_gaps.py index d7ada5d..373a336 100644 --- a/scripts/sagemaker_gaps.py +++ b/scripts/sagemaker_gaps.py @@ -4,9 +4,8 @@ This script patches the current Terraform provider gaps for RAJA's V2 domain: 1. Ensure the default project profile exists and is ENABLED. -2. Resolve the three configured seed projects in the V2 domain. -3. Ensure the root domain unit grants CREATE_ASSET_TYPE to project owners. -4. Refresh infra/tf-outputs.json with discovered project IDs. +2. Ensure the root domain unit grants CREATE_ASSET_TYPE to project owners. +3. Refresh infra/tf-outputs.json with discovered environment IDs. It is safe to rerun. Existing resources are reused. """ @@ -40,7 +39,7 @@ DEFAULT_PROFILE_DESCRIPTION = "Default project profile for RAJA Terraform-managed V2 projects" SEED_CONFIG = load_seed_config() PROJECT_SPECS = { - project.project_name: { + project.key: { "name": project.display_name, "description": ( f"{project.display_name} project in the RAJA symmetric seed topology. 
" @@ -51,9 +50,11 @@ for project in SEED_CONFIG.projects } ENVIRONMENT_SPECS = { - project.project_name: f"raja-{project.key}-env" + project.key: f"raja-{project.key}-env" for project in SEED_CONFIG.projects } +LAKEHOUSE_BLUEPRINT_ID = "d6y5smpdi8x9lz" +LAKEHOUSE_ENVIRONMENT_NAME = "Lakehouse Database" CUSTOM_BLUEPRINT_CANDIDATE_NAMES = ( "raja-registry-blueprint", "raja-poc", @@ -149,16 +150,35 @@ def _get_context(args: argparse.Namespace) -> Context: ) -def _existing_project_id_from_outputs(outputs: dict[str, Any], project_name: str) -> str: - raw_projects = outputs.get("datazone_project_ids") - if isinstance(raw_projects, dict): - value = raw_projects.get(project_name) - if isinstance(value, str) and value: - return value - raw_legacy = outputs.get(f"datazone_{project_name}_project_id") - if isinstance(raw_legacy, str) and raw_legacy: - return raw_legacy - return "" +def _load_project_ids_from_outputs(outputs: dict[str, Any]) -> dict[str, str]: + """Read {project.key: project_id} from Terraform outputs. + + Prefer the canonical `datazone_project_ids` map written by this script. + Fall back to legacy flat Terraform output names for older stacks. 
+ """ + raw_project_ids = outputs.get("datazone_project_ids") + project_ids: dict[str, str] = {} + if isinstance(raw_project_ids, dict): + project_ids = { + str(key): str(value or "").strip() + for key, value in raw_project_ids.items() + if isinstance(key, str) + } + + legacy_key_map = { + "alpha": "datazone_owner_project_id", + "bio": "datazone_users_project_id", + "compute": "datazone_guests_project_id", + } + + resolved: dict[str, str] = {} + for project in SEED_CONFIG.projects: + resolved[project.key] = ( + project_ids.get(project.key) + or str(outputs.get(f"datazone_{project.project_name}_project_id") or "").strip() + or str(outputs.get(legacy_key_map.get(project.key, "")) or "").strip() + ) + return resolved def _build_datazone_projects_json( @@ -166,10 +186,10 @@ def _build_datazone_projects_json( environment_ids: dict[str, str], ) -> str: projects = { - project.project_name: { - "project_id": project_ids.get(project.project_name, ""), + project.key: { + "project_id": project_ids.get(project.key, ""), "project_label": project.display_name, - "environment_id": environment_ids.get(project.project_name, ""), + "environment_id": environment_ids.get(project.key, ""), } for project in SEED_CONFIG.projects } @@ -217,46 +237,6 @@ def _ensure_project_profile(client: Any, ctx: Context) -> str: return profile_id -def _list_projects(client: Any, ctx: Context) -> list[dict[str, Any]]: - return _list_all(client.list_projects, "items", domainIdentifier=ctx.domain_id) - - -def _ensure_projects(client: Any, ctx: Context, profile_id: str) -> dict[str, str]: - projects = _list_projects(client, ctx) - by_name = {str(project["name"]): project for project in projects} - by_id = {str(project["id"]): project for project in projects} - ensured: dict[str, str] = {} - - for key, spec in PROJECT_SPECS.items(): - existing = by_name.get(spec["name"]) - if existing is None: - existing_output_id = _existing_project_id_from_outputs(ctx.outputs, key) - candidate = 
by_id.get(existing_output_id) if existing_output_id else None - if candidate is not None and str(candidate.get("name") or "") == spec["name"]: - existing = candidate - if existing: - project_id = str(existing["id"]) - ensured[key] = project_id - print(f"Project {key}: {spec['name']} ({project_id})") - continue - - print(f"Project {key}: {spec['name']} (missing)") - if ctx.dry_run: - print(" [DRY-RUN] Would create V2 project") - ensured[key] = f"dry-run-{key}" - continue - - created = client.create_project( - domainIdentifier=ctx.domain_id, - name=spec["name"], - description=spec["description"], - projectProfileId=profile_id, - ) - project_id = str(created["id"]) - ensured[key] = project_id - print(f" Created project {project_id}") - - return ensured def _ensure_asset_type_grant(client: Any, ctx: Context) -> None: @@ -455,7 +435,7 @@ def _discover_environment_ids( ) -> dict[str, str]: discovered: dict[str, str] = {} - for key, environment_name in ENVIRONMENT_SPECS.items(): + for key, legacy_environment_name in ENVIRONMENT_SPECS.items(): project_id = project_ids.get(key, "") if not project_id: continue @@ -467,13 +447,38 @@ def _discover_environment_ids( domainIdentifier=ctx.domain_id, projectIdentifier=project_id, ) - by_name = {str(item.get("name") or ""): item for item in items} - item = by_name.get(environment_name) + + item = next( + ( + candidate + for candidate in items + if str(candidate.get("environmentBlueprintId") or "") == LAKEHOUSE_BLUEPRINT_ID + and str(candidate.get("status") or "") == "ACTIVE" + ), + None, + ) + if item is None: + item = next( + ( + candidate + for candidate in items + if str(candidate.get("name") or "") == LAKEHOUSE_ENVIRONMENT_NAME + and str(candidate.get("status") or "") == "ACTIVE" + ), + None, + ) + if item is None: + by_name = {str(candidate.get("name") or ""): candidate for candidate in items} + item = by_name.get(legacy_environment_name) if not item: - print(f"Environment {key}: {environment_name} (missing)") + print( + 
f"Environment {key}: {LAKEHOUSE_ENVIRONMENT_NAME} " + f"(missing; legacy name {legacy_environment_name!r} not found either)" + ) continue environment_project_id = str(item.get("projectId") or "") environment_id = str(item.get("id") or "") + environment_name = str(item.get("name") or "") if environment_project_id and project_id != environment_project_id: print( f"Environment {key}: {environment_name} ({environment_id}) " @@ -667,6 +672,7 @@ def _ensure_subscription_grant( print(f" Accepted subscription grant for {project_key}") + def _ensure_default_subscription_grants( client: Any, ctx: Context, project_ids: dict[str, str] ) -> None: @@ -676,8 +682,13 @@ def _ensure_default_subscription_grants( def _print_import_hints(ctx: Context, project_ids: dict[str, str]) -> None: print("\nTerraform import hints:") - for key, project_id in project_ids.items(): - print(f" terraform import aws_datazone_project.{key} {ctx.domain_id}:{project_id}") + for project in SEED_CONFIG.projects: + project_id = project_ids.get(project.key, "") + if project_id: + print( + f" terraform import aws_datazone_project.{project.project_name}" + f" {ctx.domain_id}:{project_id}" + ) def main() -> None: @@ -692,7 +703,7 @@ def main() -> None: if ctx.dry_run: print("Mode: DRY-RUN") profile_id = _ensure_project_profile(client, ctx) - project_ids = _ensure_projects(client, ctx, profile_id) + project_ids = _load_project_ids_from_outputs(ctx.outputs) _ensure_asset_type_grant(client, ctx) _ensure_default_subscription_grants(client, ctx, project_ids) ctx.custom_blueprint_id = _find_custom_blueprint_id(client, ctx) diff --git a/scripts/seed_glue_tables.py b/scripts/seed_glue_tables.py new file mode 100644 index 0000000..211e975 --- /dev/null +++ b/scripts/seed_glue_tables.py @@ -0,0 +1,1194 @@ +#!/usr/bin/env python3 +"""Register LF-native Iceberg Glue tables as DataZone assets and subscriptions.""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time 
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import boto3
from botocore.exceptions import BotoCoreError, ClientError

# Allow running as a plain script (`python scripts/seed_glue_tables.py`) by
# putting the repo root on sys.path so the `scripts.*` imports below resolve.
if __package__ in {None, ""}:
    sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from scripts.seed_config import load_seed_config, load_seed_state, write_seed_state
from scripts.tf_outputs import load_tf_outputs

# Loaded once at import time; supplies default_project and the project list
# used to derive owner/subscriber project IDs.
SEED_CONFIG = load_seed_config()

# DataZone built-in asset-type / form-type identifiers for Glue tables.
GLUE_TABLE_ASSET_TYPE = "amazon.datazone.GlueTableAssetType"
GLUE_TABLE_FORM_NAME = "GlueTableForm"
GLUE_TABLE_FORM_TYPE = "amazon.datazone.GlueTableFormType"
# The LF-native Iceberg tables this script registers and subscribes.
ICEBERG_TABLE_NAMES = (
    "package_entry",
    "package_manifest",
    "package_revision",
    "package_tag",
)


class SeedGlueTablesError(RuntimeError):
    """Raised when Glue-table DataZone seeding cannot complete."""


@dataclass(frozen=True)
class GlueAssetTypeConfig:
    """Resolved DataZone Glue asset-type and form-type metadata (names + revisions)."""

    asset_type_name: str
    asset_type_revision: str
    form_name: str
    form_type_name: str
    form_type_revision: str


@dataclass(frozen=True)
class DataZoneListing:
    """A published DataZone listing plus the asset backing it."""

    listing_id: str
    listing_revision: str
    asset_id: str
    asset_revision: str
    name: str
    owner_project_id: str


@dataclass(frozen=True)
class AssetCandidate:
    """A search hit with its full get_asset detail, used for candidate scoring."""

    asset_id: str
    asset_revision: str
    asset_detail: dict[str, Any]


def _parse_args() -> argparse.Namespace:
    """Parse CLI flags for dry-run/force/inspect modes and tf-outputs overrides."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--dry-run", action="store_true", help="Show planned actions only.")
    parser.add_argument(
        "--force",
        action="store_true",
        help=(
            "Revoke existing accepted subscriptions before re-creating them. "
            "Use this to re-trigger DataZone's LF grant machinery after infrastructure changes."
        ),
    )
    parser.add_argument(
        "--inspect",
        action="store_true",
        help=(
            "Inspect subscription grant state via DataZone API. "
            "Reads saved seed state and reports what grants DataZone believes it issued "
            "(GRANTED/PENDING/FAILED) vs the subscriptions that are ACCEPTED."
        ),
    )
    parser.add_argument(
        "--database-name",
        default="",
        help="Override the LF-native Glue database name instead of reading infra/tf-outputs.json.",
    )
    parser.add_argument(
        "--owner-project-id",
        default="",
        help="Override the default project ID instead of reading infra/tf-outputs.json.",
    )
    parser.add_argument(
        "--subscriber-project-ids",
        default="",
        help=(
            "Comma-separated subscriber project IDs. "
            "Defaults to non-default projects from infra/tf-outputs.json."
        ),
    )
    return parser.parse_args()


def _get_region() -> str:
    """Return the AWS region from the environment, defaulting to us-east-1."""
    return os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"


def _get_domain_id(outputs: dict[str, Any]) -> str:
    """Resolve the DataZone domain ID from env or Terraform outputs; raise if absent."""
    domain_id = str(
        os.environ.get("DATAZONE_DOMAIN_ID") or outputs.get("datazone_domain_id") or ""
    ).strip()
    if not domain_id:
        raise SeedGlueTablesError(
            "missing DATAZONE_DOMAIN_ID and infra/tf-outputs.json:datazone_domain_id"
        )
    return domain_id


def _get_database_name(args: argparse.Namespace, outputs: dict[str, Any]) -> str:
    """CLI override wins; otherwise read the LF database name from tf outputs (may be '')."""
    if args.database_name:
        return str(args.database_name).strip()
    return str(outputs.get("iceberg_lf_database_name") or "").strip()


def _get_owner_project_id(args: argparse.Namespace, outputs: dict[str, Any]) -> str:
    """Resolve the publishing (owner) project ID: CLI override or the default project's ID."""
    if args.owner_project_id:
        return str(args.owner_project_id).strip()
    project_ids: dict[str, Any] = outputs.get("datazone_project_ids") or {}
    owner_project_id = str(project_ids.get(SEED_CONFIG.default_project) or "").strip()
    if not owner_project_id:
        raise SeedGlueTablesError(
            f"missing project ID for default project {SEED_CONFIG.default_project!r}"
            " in infra/tf-outputs.json:datazone_project_ids"
        )
    return owner_project_id


def _get_subscriber_project_ids(args: argparse.Namespace, outputs: dict[str, Any]) -> list[str]:
    """Resolve subscriber project IDs: CLI CSV override, else all non-default projects."""
    if args.subscriber_project_ids:
        return [value.strip() for value in args.subscriber_project_ids.split(",") if value.strip()]

    project_ids: dict[str, Any] = outputs.get("datazone_project_ids") or {}
    return [
        str(project_ids[project.key])
        for project in SEED_CONFIG.projects
        if project.key != SEED_CONFIG.default_project and project_ids.get(project.key)
    ]
outputs.get("datazone_project_ids") or {} + return [ + str(project_ids[project.key]) + for project in SEED_CONFIG.projects + if project.key != SEED_CONFIG.default_project and project_ids.get(project.key) + ] + + +def _get_account_id() -> str: + return str(boto3.client("sts").get_caller_identity()["Account"]) + + +def _s3_uri_to_arn(location: str) -> str: + if not location.startswith("s3://"): + raise SeedGlueTablesError(f"expected s3:// location, got {location!r}") + bucket_and_key = location.removeprefix("s3://").rstrip("/") + bucket, _, key = bucket_and_key.partition("/") + if not bucket: + raise SeedGlueTablesError(f"could not parse S3 bucket from {location!r}") + if not key: + return f"arn:aws:s3:::{bucket}" + return f"arn:aws:s3:::{bucket}/{key}" + + +def _get_glue_asset_type_config(client: Any, domain_id: str) -> GlueAssetTypeConfig: + try: + response = client.get_asset_type( + domainIdentifier=domain_id, + identifier=GLUE_TABLE_ASSET_TYPE, + ) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError("failed to resolve DataZone Glue table asset type") from exc + + asset_type_revision = str(response.get("revision") or "").strip() + forms_output = response.get("formsOutput") or {} + if not asset_type_revision or not isinstance(forms_output, dict): + raise SeedGlueTablesError("Glue table asset type response was missing revision metadata") + + form_output = forms_output.get(GLUE_TABLE_FORM_NAME) + if not isinstance(form_output, dict): + raise SeedGlueTablesError("GlueTableForm is not available in this DataZone domain") + + form_type_name = str(form_output.get("typeName") or "").strip() + form_type_revision = str(form_output.get("typeRevision") or "").strip() + if not form_type_name or not form_type_revision: + raise SeedGlueTablesError("GlueTableForm metadata is incomplete in this DataZone domain") + + return GlueAssetTypeConfig( + asset_type_name=GLUE_TABLE_ASSET_TYPE, + asset_type_revision=asset_type_revision, + form_name=GLUE_TABLE_FORM_NAME, 
+ form_type_name=form_type_name, + form_type_revision=form_type_revision, + ) + + +def _build_external_identifier( + account_id: str, region: str, database_name: str, table_name: str +) -> str: + return f"glue://{account_id}/{region}/{database_name}/{table_name}" + + +def _asset_search( + client: Any, + domain_id: str, + owner_project_id: str, + search_text: str, +) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + next_token: str | None = None + while True: + kwargs: dict[str, Any] = { + "domainIdentifier": domain_id, + "maxResults": 50, + "owningProjectIdentifier": owner_project_id, + "searchScope": "ASSET", + "searchText": search_text, + } + if next_token: + kwargs["nextToken"] = next_token + try: + response = client.search(**kwargs) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError("failed to search DataZone assets") from exc + for item in response.get("items", []): + asset = item.get("assetItem") + if isinstance(asset, dict): + items.append(asset) + next_token = response.get("nextToken") + if not next_token: + return items + + +def _get_asset(client: Any, domain_id: str, asset_id: str) -> dict[str, Any]: + try: + response = client.get_asset(domainIdentifier=domain_id, identifier=asset_id) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError(f"failed to fetch DataZone asset {asset_id}") from exc + if not isinstance(response, dict): + raise SeedGlueTablesError(f"unexpected DataZone asset payload for {asset_id}") + return response + + +def _find_listing( + client: Any, + domain_id: str, + owner_project_id: str, + asset_type_name: str, + table_name: str, +) -> DataZoneListing | None: + for asset in _asset_search(client, domain_id, owner_project_id, table_name): + if str(asset.get("name") or "") != table_name: + continue + if str(asset.get("typeIdentifier") or "") != asset_type_name: + continue + if str(asset.get("owningProjectId") or "") != owner_project_id: + continue + asset_id = 
str(asset.get("identifier") or "") + if not asset_id: + continue + asset_detail = _get_asset(client, domain_id, asset_id) + listing = asset_detail.get("listing") or {} + return DataZoneListing( + listing_id=str(listing.get("listingId") or ""), + listing_revision="", + asset_id=asset_id, + asset_revision=str(asset_detail.get("revision") or ""), + name=table_name, + owner_project_id=owner_project_id, + ) + return None + + +def _form_names(asset_detail: dict[str, Any]) -> set[str]: + forms = asset_detail.get("formsOutput") or [] + if not isinstance(forms, list): + return set() + names = { + str(form.get("formName") or "") + for form in forms + if isinstance(form, dict) and form.get("formName") + } + return {name for name in names if name} + + +def _is_imported_glue_asset(asset_detail: dict[str, Any]) -> bool: + return "DataSourceReferenceForm" in _form_names(asset_detail) + + +def _asset_candidates( + client: Any, + domain_id: str, + owner_project_id: str, + asset_type_name: str, + table_name: str, +) -> list[AssetCandidate]: + candidates: list[AssetCandidate] = [] + for asset in _asset_search(client, domain_id, owner_project_id, table_name): + if str(asset.get("name") or "") != table_name: + continue + if str(asset.get("typeIdentifier") or "") != asset_type_name: + continue + if str(asset.get("owningProjectId") or "") != owner_project_id: + continue + asset_id = str(asset.get("identifier") or "") + if not asset_id: + continue + asset_detail = _get_asset(client, domain_id, asset_id) + candidates.append( + AssetCandidate( + asset_id=asset_id, + asset_revision=str(asset_detail.get("revision") or ""), + asset_detail=asset_detail, + ) + ) + return candidates + + +def _asset_score( + candidate: AssetCandidate, expected_external_identifier: str +) -> tuple[int, list[str]]: + score = 0 + reasons: list[str] = [] + forms = _form_names(candidate.asset_detail) + created_by = str(candidate.asset_detail.get("createdBy") or "") + external_identifier = 
str(candidate.asset_detail.get("externalIdentifier") or "") + + if "DataSourceReferenceForm" in forms: + score += 10 + reasons.append("has DataSourceReferenceForm") + if created_by == "SYSTEM": + score += 4 + reasons.append("created by SYSTEM") + if external_identifier == expected_external_identifier: + score += 2 + reasons.append("matches manual external identifier") + if external_identifier.startswith("arn:aws:glue:"): + score += 2 + reasons.append("matches imported Glue external identifier") + if "SubscriptionTermsForm" in forms: + score += 1 + reasons.append("has subscription terms") + return score, reasons + + +def _select_listing_candidate( + candidates: list[AssetCandidate], expected_external_identifier: str +) -> tuple[DataZoneListing | None, list[dict[str, Any]], bool]: + scored: list[tuple[int, AssetCandidate, list[str]]] = [] + for candidate in candidates: + score, reasons = _asset_score(candidate, expected_external_identifier) + scored.append((score, candidate, reasons)) + + scored.sort(key=lambda item: item[0], reverse=True) + summaries: list[dict[str, Any]] = [] + for score, candidate, reasons in scored: + listing = candidate.asset_detail.get("listing") or {} + summaries.append( + { + "asset_id": candidate.asset_id, + "listing_id": str(listing.get("listingId") or ""), + "external_identifier": str( + candidate.asset_detail.get("externalIdentifier") or "" + ), + "created_by": str(candidate.asset_detail.get("createdBy") or ""), + "forms": sorted(_form_names(candidate.asset_detail)), + "imported": _is_imported_glue_asset(candidate.asset_detail), + "score": score, + "reasons": reasons, + } + ) + + for _, candidate, _ in scored: + listing = candidate.asset_detail.get("listing") or {} + listing_id = str(listing.get("listingId") or "") + if not listing_id: + continue + return ( + DataZoneListing( + listing_id=listing_id, + listing_revision="", + asset_id=candidate.asset_id, + asset_revision=candidate.asset_revision, + 
name=str(candidate.asset_detail.get("name") or ""), + owner_project_id=str(candidate.asset_detail.get("owningProjectId") or ""), + ), + summaries, + _is_imported_glue_asset(candidate.asset_detail), + ) + return None, summaries, False + + +def _wait_for_imported_listing( + datazone_client: Any, + *, + domain_id: str, + owner_project_id: str, + asset_type_name: str, + account_id: str, + region: str, + database_name: str, + table_name: str, + timeout_seconds: int = 120, +) -> tuple[DataZoneListing | None, list[dict[str, Any]], bool]: + external_identifier = _build_external_identifier(account_id, region, database_name, table_name) + deadline = time.time() + timeout_seconds + last_summaries: list[dict[str, Any]] = [] + while time.time() < deadline: + listing, summaries, imported = _select_listing_candidate( + _asset_candidates( + datazone_client, + domain_id, + owner_project_id, + asset_type_name, + table_name, + ), + external_identifier, + ) + last_summaries = summaries + if listing is not None and imported: + return listing, summaries, imported + time.sleep(5) + return None, last_summaries, False + + +def _glue_table_form_content(glue_table: dict[str, Any], account_id: str, region: str) -> str: + storage_descriptor = glue_table.get("StorageDescriptor") or {} + columns = storage_descriptor.get("Columns") or [] + database_name = str(glue_table["DatabaseName"]) + table_name = str(glue_table["Name"]) + payload = { + "catalogId": account_id, + "region": region, + "databaseName": database_name, + "tableName": table_name, + "tableArn": f"arn:aws:glue:{region}:{account_id}:table/{database_name}/{table_name}", + "columns": [ + { + "columnName": str(column.get("Name") or ""), + "dataType": str(column.get("Type") or ""), + } + for column in columns + if column.get("Name") + ], + } + return json.dumps(payload, separators=(",", ":"), sort_keys=True) + + +def _ensure_listing( + datazone_client: Any, + *, + domain_id: str, + owner_project_id: str, + asset_type: GlueAssetTypeConfig, 
+ account_id: str, + region: str, + database_name: str, + table_name: str, + dry_run: bool, +) -> DataZoneListing: + external_identifier = _build_external_identifier(account_id, region, database_name, table_name) + existing, candidate_summaries, imported = _select_listing_candidate( + _asset_candidates( + datazone_client, + domain_id, + owner_project_id, + asset_type.asset_type_name, + table_name, + ), + external_identifier, + ) + if existing is not None: + selected = next( + ( + summary + for summary in candidate_summaries + if str(summary.get("listing_id") or "") == existing.listing_id + ), + None, + ) + if selected is not None: + print( + f"Listing {table_name}: selected {existing.listing_id}" + f" via {', '.join(selected.get('reasons') or ['existing asset'])}" + ) + else: + print(f"Listing {table_name}: present ({existing.listing_id})") + if imported: + return existing + + print(f"Listing {table_name}: waiting for imported Glue asset") + if dry_run: + return DataZoneListing( + listing_id=f"dry-run-{table_name}", + listing_revision="1", + asset_id=f"dry-run-asset-{table_name}", + asset_revision="1", + name=table_name, + owner_project_id=owner_project_id, + ) + + imported_listing, candidate_summaries, imported = _wait_for_imported_listing( + datazone_client, + domain_id=domain_id, + owner_project_id=owner_project_id, + asset_type_name=asset_type.asset_type_name, + account_id=account_id, + region=region, + database_name=database_name, + table_name=table_name, + ) + if imported_listing is not None and imported: + print(f" Imported listing {imported_listing.listing_id} is ready") + return imported_listing + + raise SeedGlueTablesError( + "expected a DataZone-imported Glue asset for " + f"{table_name}, but none was available after waiting. 
" + f"Candidates: {json.dumps(candidate_summaries, default=str)}" + ) + + +def _find_subscription_request( + client: Any, + domain_id: str, + listing_id: str, + project_id: str, + status: str, +) -> dict[str, Any] | None: + next_token: str | None = None + while True: + kwargs: dict[str, Any] = { + "domainIdentifier": domain_id, + "status": status, + "maxResults": 50, + "subscribedListingId": listing_id, + } + if next_token: + kwargs["nextToken"] = next_token + try: + response = client.list_subscription_requests(**kwargs) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError("failed to list DataZone subscription requests") from exc + for item in response.get("items", []): + if not isinstance(item, dict): + continue + principals = item.get("subscribedPrincipals", []) + listings = item.get("subscribedListings", []) + project_match = any( + isinstance(principal.get("project"), dict) + and principal["project"].get("id") == project_id + for principal in principals + ) + listing_match = any(listing.get("id") == listing_id for listing in listings) + if project_match and listing_match: + return item + next_token = response.get("nextToken") + if not next_token: + return None + + +def _get_subscription_request_status(client: Any, domain_id: str, request_id: str) -> str: + try: + response = client.get_subscription_request_details( + domainIdentifier=domain_id, + identifier=request_id, + ) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError( + f"failed to fetch subscription request {request_id}" + ) from exc + return str(response.get("status") or "") + + +def _find_active_subscription( + client: Any, + domain_id: str, + listing_id: str, + project_id: str, +) -> dict[str, Any] | None: + """Return the APPROVED subscription (not request) for a listing+project, or None.""" + next_token: str | None = None + while True: + kwargs: dict[str, Any] = { + "domainIdentifier": domain_id, + "subscribedListingId": listing_id, + "status": "APPROVED", + 
"maxResults": 50, + } + if next_token: + kwargs["nextToken"] = next_token + try: + response = client.list_subscriptions(**kwargs) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError( + f"failed to list subscriptions for listing {listing_id}" + ) from exc + for item in response.get("items", []): + subscriber = item.get("subscribedPrincipal") or {} + project = subscriber.get("project") or {} + if project.get("id") == project_id: + return item + next_token = response.get("nextToken") + if not next_token: + return None + + +def _revoke_subscription( + client: Any, + domain_id: str, + subscription_id: str, + table_name: str, +) -> None: + try: + client.revoke_subscription( + domainIdentifier=domain_id, + identifier=subscription_id, + ) + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError( + f"failed to revoke subscription {subscription_id} for {table_name}: {exc}" + ) from exc + print(f" Revoked subscription {subscription_id}") + + +def _ensure_subscription( + client: Any, + *, + domain_id: str, + listing: DataZoneListing, + project_id: str, + dry_run: bool, + force: bool = False, +) -> str: + label = f"Subscription {project_id}/{listing.name}" + + if force: + active = _find_active_subscription(client, domain_id, listing.listing_id, project_id) + if active is not None: + subscription_id = str(active.get("id") or "") + print(f"{label}: revoking ({subscription_id})") + if dry_run: + print(f" [DRY-RUN] Would revoke subscription {subscription_id}") + else: + _revoke_subscription(client, domain_id, subscription_id, listing.name) + + accepted = _find_subscription_request( + client, domain_id, listing.listing_id, project_id, "ACCEPTED" + ) + if accepted is not None and not force: + request_id = str(accepted.get("id") or "") + print(f"{label}: present ({request_id})") + return request_id + + print(f"{label}: missing") + if dry_run: + return f"dry-run-subscription-{project_id}-{listing.name}" + + pending = _find_subscription_request( + 
def _list_subscription_grants(
    client: Any, domain_id: str, listing_id: str
) -> list[dict[str, Any]]:
    """Collect every subscription grant object for *listing_id* across pages."""
    grants: list[dict[str, Any]] = []
    next_token: str | None = None
    while True:
        kwargs: dict[str, Any] = {
            "domainIdentifier": domain_id,
            "subscribedListingId": listing_id,
            "maxResults": 50,
        }
        if next_token:
            kwargs["nextToken"] = next_token
        try:
            response = client.list_subscription_grants(**kwargs)
        except (ClientError, BotoCoreError) as exc:
            raise SeedGlueTablesError(
                f"failed to list subscription grants for listing {listing_id}"
            ) from exc
        grants.extend(response.get("items", []))
        next_token = response.get("nextToken")
        if not next_token:
            return grants


def _inspect_subscription_grants(
    client: Any,
    *,
    domain_id: str,
    state: dict[str, Any],
) -> None:
    """Report what DataZone believes it granted for each Glue table subscription.

    Probes H4: accepted subscriptions may not have triggered DataZone-managed LF
    grants. If no grant objects exist at all, DataZone never attempted to issue LF
    permissions. If grants exist but have status GRANT_FAILED, DataZone tried and
    failed (useful for H2 investigation — likely a missing grantable permission on
    the publisher side).
    """
    # Seed state is written by a prior (non --inspect) run; without it there is
    # nothing to cross-reference.
    glue_state: dict[str, Any] = state.get("glue_tables") or {}
    if not glue_state:
        print("No Glue table seed state found — run without --inspect first.")
        return

    print("=" * 60)
    print("DataZone subscription grant inspection")
    print("=" * 60)

    any_missing = False
    for table_name, table_state in sorted(glue_state.items()):
        listing_id: str = str(table_state.get("listing_id") or "")
        subscription_ids: dict[str, str] = table_state.get("subscription_ids") or {}
        if not listing_id:
            print(f"\n{table_name}: no listing_id in seed state — skipping")
            continue

        print(f"\n{table_name} (listing {listing_id})")
        grants = _list_subscription_grants(client, domain_id, listing_id)
        if not grants:
            # No grant objects at all: DataZone never even started fulfillment.
            print(" ✗ No subscription grant objects found — DataZone never attempted LF grants")
            any_missing = True
            continue

        # Print each grant and its per-asset fulfillment status/failure cause.
        for grant in grants:
            grant_id = str(grant.get("id") or "")
            grant_status = str(grant.get("status") or "?")
            sub_id = str(grant.get("subscriptionId") or "")
            assets = grant.get("assets") or []
            print(f" Grant {grant_id} status={grant_status} subscription={sub_id}")
            for asset in assets:
                asset_id = str(asset.get("assetId") or "")
                asset_status = str(asset.get("status") or "?")
                failure_cause = asset.get("failureCause") or {}
                failure_msg = str(failure_cause.get("message") or "")
                line = f" asset={asset_id} status={asset_status}"
                if failure_msg:
                    line += f" failure={failure_msg}"
                print(line)

        # Cross-reference: warn about subscription request IDs that have no matching grant
        grant_sub_ids = {str(g.get("subscriptionId") or "") for g in grants}
        for project_id, request_id in sorted(subscription_ids.items()):
            if request_id not in grant_sub_ids:
                print(
                    f" ✗ Subscription request {request_id} (project {project_id})"
                    " has no matching grant object"
                )
                any_missing = True

    print("\n" + "=" * 60)
    if any_missing:
        print(
            "RESULT: At least one accepted subscription has no grant object.\n"
            "DataZone never initiated LF grant fulfillment for those subscriptions.\n"
            "This confirms H4 and narrows root cause to H1 (data-source linkage) or\n"
            "H2 (publisher-side grantable permissions)."
        )
    else:
        print(
            "RESULT: All subscriptions have grant objects — check statuses above for "
            "GRANT_FAILED."
        )
failure={failure_msg}" + print(line) + + # Cross-reference: warn about subscription request IDs that have no matching grant + grant_sub_ids = {str(g.get("subscriptionId") or "") for g in grants} + for project_id, request_id in sorted(subscription_ids.items()): + if request_id not in grant_sub_ids: + print( + f" ✗ Subscription request {request_id} (project {project_id})" + " has no matching grant object" + ) + any_missing = True + + print("\n" + "=" * 60) + if any_missing: + print( + "RESULT: At least one accepted subscription has no grant object.\n" + "DataZone never initiated LF grant fulfillment for those subscriptions.\n" + "This confirms H4 and narrows root cause to H1 (data-source linkage) or\n" + "H2 (publisher-side grantable permissions)." + ) + else: + print( + "RESULT: All subscriptions have grant objects — check statuses above for " + "GRANT_FAILED." + ) + + +DEFAULT_LAKEHOUSE_CONNECTION_NAME = "project.default_lakehouse" + + +def _find_glue_data_source( + client: Any, + domain_id: str, + project_id: str, + data_source_name: str, +) -> dict[str, Any] | None: + next_token: str | None = None + while True: + kwargs: dict[str, Any] = { + "domainIdentifier": domain_id, + "projectIdentifier": project_id, + "maxResults": 50, + "type": "GLUE", + } + if next_token: + kwargs["nextToken"] = next_token + try: + response = client.list_data_sources(**kwargs) + except (ClientError, BotoCoreError) as exc: + code = ( + exc.response.get("Error", {}).get("Code", "") + if isinstance(exc, ClientError) + else "" + ) + if code in ("AccessDeniedException", "ValidationException"): + print(f"Glue data source: list_data_sources unsupported ({code}) — skipping") + return None + raise + for item in response.get("items", []): + if str(item.get("name") or "") == data_source_name: + return item + next_token = response.get("nextToken") + if not next_token: + return None + + +def _find_project_connection_id( + client: Any, + domain_id: str, + project_id: str, + *, + connection_name: str, 
+ connection_type: str | None = None, +) -> str: + next_token: str | None = None + while True: + kwargs: dict[str, Any] = { + "domainIdentifier": domain_id, + "projectIdentifier": project_id, + "maxResults": 50, + } + if next_token: + kwargs["nextToken"] = next_token + response = client.list_connections(**kwargs) + for item in response.get("items", []): + if str(item.get("name") or "") != connection_name: + continue + if connection_type and str(item.get("type") or "") != connection_type: + continue + connection_id = str(item.get("connectionId") or item.get("id") or "") + if connection_id: + return connection_id + next_token = response.get("nextToken") + if not next_token: + return "" + + +def _get_data_source_role_arn(client: Any, *, domain_id: str, data_source_id: str) -> str: + response = client.get_data_source(domainIdentifier=domain_id, identifier=data_source_id) + config = response.get("configuration") or {} + glue_config = config.get("glueRunConfiguration") or {} + role_arn = str(glue_config.get("dataAccessRole") or "") + if not role_arn: + raise SeedGlueTablesError( + f"data source {data_source_id} is missing glueRunConfiguration.dataAccessRole" + ) + return role_arn + + +def _grant_lf_permissions( + lf_client: Any, + *, + principal_arn: str, + resource: dict[str, Any], + permissions: list[str], + grant_permissions: list[str], +) -> None: + try: + lf_client.grant_permissions( + Principal={"DataLakePrincipalIdentifier": principal_arn}, + Resource=resource, + Permissions=permissions, + PermissionsWithGrantOption=grant_permissions, + ) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + message = str(exc) + if code == "AlreadyExistsException" or "Permissions modification is invalid" in message: + return + raise SeedGlueTablesError( + f"failed to grant Lake Formation permissions to {principal_arn}: {exc}" + ) from exc + except BotoCoreError as exc: + raise SeedGlueTablesError( + f"failed to grant Lake Formation permissions to 
{principal_arn}: {exc}" + ) from exc + + +def _ensure_data_source_lf_permissions( + *, + datazone_client: Any, + glue_client: Any, + lf_client: Any, + domain_id: str, + data_source_id: str, + database_name: str, + table_names: tuple[str, ...], +) -> None: + role_arn = _get_data_source_role_arn( + datazone_client, domain_id=domain_id, data_source_id=data_source_id + ) + print(f"Glue data source role: {role_arn}") + + _grant_lf_permissions( + lf_client, + principal_arn=role_arn, + resource={"Database": {"Name": database_name}}, + permissions=["ALL"], + grant_permissions=["ALL"], + ) + + for table_name in table_names: + try: + glue_table = glue_client.get_table(DatabaseName=database_name, Name=table_name)["Table"] + except (ClientError, BotoCoreError) as exc: + raise SeedGlueTablesError( + f"failed to read Glue table {database_name}.{table_name}" + ) from exc + table_location = str((glue_table.get("StorageDescriptor") or {}).get("Location") or "") + if not table_location: + raise SeedGlueTablesError( + f"Glue table {database_name}.{table_name} is missing StorageDescriptor.Location" + ) + + _grant_lf_permissions( + lf_client, + principal_arn=role_arn, + resource={"Table": {"DatabaseName": database_name, "Name": table_name}}, + permissions=["ALL"], + grant_permissions=["ALL"], + ) + _grant_lf_permissions( + lf_client, + principal_arn=role_arn, + resource={"DataLocation": {"ResourceArn": _s3_uri_to_arn(table_location)}}, + permissions=["DATA_LOCATION_ACCESS"], + grant_permissions=["DATA_LOCATION_ACCESS"], + ) + + +def _wait_for_data_source_ready(client: Any, domain_id: str, data_source_id: str) -> bool: + deadline = time.time() + 120 + while time.time() < deadline: + response = client.get_data_source( + domainIdentifier=domain_id, + identifier=data_source_id, + ) + status = str(response.get("status") or "") + if status == "READY": + return True + if status in {"FAILED_CREATION", "FAILED_UPDATE", "FAILED_DELETION"}: + print(f" WARNING: data source {data_source_id} entered 
terminal status {status}") + return False + time.sleep(2) + print(f" WARNING: timed out waiting for data source {data_source_id} to become READY") + return False + + +def _start_data_source_run_if_ready(client: Any, domain_id: str, data_source_id: str) -> None: + if not _wait_for_data_source_ready(client, domain_id, data_source_id): + return + try: + client.start_data_source_run( + domainIdentifier=domain_id, + dataSourceIdentifier=data_source_id, + ) + print( + f" Started initial run for {data_source_id} — " + "check DataZone console for import status" + ) + except (ClientError, BotoCoreError) as exc: + print(f" WARNING: failed to start data source run: {exc}") + + +def _ensure_glue_data_source( + client: Any, + *, + domain_id: str, + owner_project_id: str, + database_name: str, + dry_run: bool, +) -> str: + """Ensure a DataZone-managed Glue data source exists for the LF-native Iceberg database.""" + data_source_name = f"{database_name}-datasource" + existing = _find_glue_data_source(client, domain_id, owner_project_id, data_source_name) + if existing is not None: + data_source_id = str(existing.get("dataSourceId") or existing.get("id") or "") + data_source_status = str(existing.get("status") or "?") + print(f"Glue data source: present ({data_source_id}) [{data_source_status}]") + if data_source_id and ( + int(existing.get("lastRunAssetCount") or 0) < len(ICEBERG_TABLE_NAMES) + ): + _start_data_source_run_if_ready(client, domain_id, data_source_id) + return data_source_id + + connection_id = _find_project_connection_id( + client, + domain_id, + owner_project_id, + connection_name=DEFAULT_LAKEHOUSE_CONNECTION_NAME, + connection_type="LAKEHOUSE", + ) + if not connection_id: + print( + "Glue data source: skipped " + f"({DEFAULT_LAKEHOUSE_CONNECTION_NAME!r} connection not found)" + ) + return "" + + print(f"Glue data source: creating {data_source_name!r}") + if dry_run: + print(" [DRY-RUN] Would create Glue data source and start initial run") + return "" + + try: + 
def main() -> None:
    """Drive the end-to-end Glue → DataZone seeding flow.

    Order matters: ensure the Glue data source, grant its role LF permissions,
    start an import run, then resolve listings and create/accept subscriptions,
    finally persisting the results to seed state.
    """
    args = _parse_args()
    outputs = load_tf_outputs()

    # --inspect is a read-only diagnostic path; it needs no Glue/LF clients.
    if args.inspect:
        region = _get_region()
        domain_id = _get_domain_id(outputs)
        state = load_seed_state()
        datazone_client = boto3.client("datazone", region_name=region)
        _inspect_subscription_grants(datazone_client, domain_id=domain_id, state=state)
        return

    # No LF-native database provisioned yet → nothing to seed.
    database_name = _get_database_name(args, outputs)
    if not database_name:
        print("Glue table seeding: skipped (iceberg_lf_database_name is empty)")
        return

    region = _get_region()
    domain_id = _get_domain_id(outputs)
    owner_project_id = _get_owner_project_id(args, outputs)
    # The owner project cannot subscribe to its own listings.
    subscriber_project_ids = [
        project_id
        for project_id in _get_subscriber_project_ids(args, outputs)
        if project_id != owner_project_id
    ]
    if not subscriber_project_ids:
        raise SeedGlueTablesError("no subscriber project IDs were configured")

    account_id = _get_account_id()
    datazone_client = boto3.client("datazone", region_name=region)
    glue_client = boto3.client("glue", region_name=region)
    lf_client = boto3.client("lakeformation", region_name=region)

    # Data source first: LF permissions for its run role, then an import run so
    # DataZone materializes the Glue tables as imported assets.
    data_source_id = _ensure_glue_data_source(
        datazone_client,
        domain_id=domain_id,
        owner_project_id=owner_project_id,
        database_name=database_name,
        dry_run=args.dry_run,
    )
    if data_source_id:
        _ensure_data_source_lf_permissions(
            datazone_client=datazone_client,
            glue_client=glue_client,
            lf_client=lf_client,
            domain_id=domain_id,
            data_source_id=data_source_id,
            database_name=database_name,
            table_names=ICEBERG_TABLE_NAMES,
        )
        _start_data_source_run_if_ready(datazone_client, domain_id, data_source_id)

    asset_type = _get_glue_asset_type_config(datazone_client, domain_id)

    print("=" * 60)
    print("Seeding DataZone Glue table assets")
    print(f"Domain: {domain_id}")
    print(f"Database: {database_name}")
    print(f"Owner project: {owner_project_id}")
    print(f"Subscribers: {', '.join(subscriber_project_ids)}")
    print(f"Asset type: {asset_type.asset_type_name}@{asset_type.asset_type_revision}")
    print(f"Glue table form: {asset_type.form_type_name}@{asset_type.form_type_revision}")
    if args.dry_run:
        print("Mode: DRY-RUN")
    if args.force:
        print("Mode: FORCE (revoke + re-subscribe)")
    print("=" * 60)

    state = load_seed_state()
    glue_state: dict[str, dict[str, Any]] = {}
    for table_name in ICEBERG_TABLE_NAMES:
        listing = _ensure_listing(
            datazone_client,
            domain_id=domain_id,
            owner_project_id=owner_project_id,
            asset_type=asset_type,
            account_id=account_id,
            region=region,
            database_name=database_name,
            table_name=table_name,
            dry_run=args.dry_run,
        )
        subscription_ids: dict[str, str] = {}
        for project_id in subscriber_project_ids:
            subscription_ids[project_id] = _ensure_subscription(
                datazone_client,
                domain_id=domain_id,
                listing=listing,
                project_id=project_id,
                dry_run=args.dry_run,
                force=args.force,
            )
        # Recorded per table so --inspect can cross-reference grants later.
        glue_state[table_name] = {
            "asset_id": listing.asset_id,
            "asset_revision": listing.asset_revision,
            "database_name": database_name,
            "external_identifier": _build_external_identifier(
                account_id, region, database_name, table_name
            ),
            "listing_id": listing.listing_id,
            "listing_revision": listing.listing_revision,
            "subscription_ids": subscription_ids,
        }

    # Dry-run never persists state; IDs above may be synthetic placeholders.
    if args.dry_run:
        print("Glue table seeding: dry-run complete")
        return

    state["glue_tables"] = glue_state
    write_seed_state(state)
    print("Glue table seeding: complete")


if __name__ == "__main__":
    try:
        main()
    except SeedGlueTablesError as exc:
        print(f"✗ {exc}", file=sys.stderr)
        sys.exit(1)
a/specs/48-use-sagemaker/08-lf-native-iceberg-catalog-blockers.md b/specs/48-use-sagemaker/08-lf-native-iceberg-catalog-blockers.md new file mode 100644 index 0000000..9dc6aa9 --- /dev/null +++ b/specs/48-use-sagemaker/08-lf-native-iceberg-catalog-blockers.md @@ -0,0 +1,251 @@ +# LF-Native Iceberg Catalog Blockers + +## Goal + +Document the remaining blocker after implementing the LF-native Iceberg catalog +POC on March 18, 2026: DataZone accepts subscriptions to manually created Glue +table assets, but subscriber Lakehouse environment roles still do not receive +usable Lake Formation grants. + +## Current State + +A new Glue database `raja-standalone-iceberg-lf` exists and mirrors the four +Quilt Iceberg tables: + +- `package_entry` +- `package_manifest` +- `package_revision` +- `package_tag` + +These tables are published into DataZone as +`amazon.datazone.GlueTableAssetType` assets and subscription requests were +created and reached `ACCEPTED`. + +## Facts + +### Infrastructure facts + +- The mirrored Glue database and tables were created successfully. +- The mirrored tables point at the expected Quilt Iceberg metadata and S3 + locations. +- The S3 table locations were already registered in Lake Formation at + per-table prefixes. +- The mirrored tables now show `IsRegisteredWithLakeFormation = true`. +- The database initially came up with `IAM_ALLOWED_PRINCIPALS`; this had to be + explicitly removed after create. 
+- Lake Formation grants exist for: + - `arn:aws:iam::712023778557:role/service-role/AmazonSageMakerDomainExecution` + - the operator principal used during apply +- Lake Formation grants do not exist for the subscriber environment roles: + - `arn:aws:iam::712023778557:role/datazone_usr_role_bm7eqh5dc6olrb_5vadl88wuqs59j` + - `arn:aws:iam::712023778557:role/datazone_usr_role_b3byg401pnpjjb_3zjkma8hy45u07` + +### DataZone facts + +- The domain exposes: + - asset type `amazon.datazone.GlueTableAssetType` + - asset type revision `24` + - form `GlueTableForm` + - form type `amazon.datazone.GlueTableFormType` + - form type revision `13` +- Manual `create_asset` works for Glue table assets in this domain. +- The `GlueTableForm` schema rejects a `description` field inside `columns`. +- Published listings were successfully created for all four mirrored tables. +- Subscription requests for users/guests projects were created. +- In some cases, subscription requests auto-transitioned to `ACCEPTED` before + an explicit accept call. +- Despite accepted subscriptions, no subscriber LF grants appeared on the + mirrored database or tables. + +### Environment facts + +- Owner, users, and guests projects each have active `Lakehouse Database` + environments. +- Each such environment has its own generated `userRoleArn`. +- The owner project has a Glue data source configured against its own generated + Glue DB: + - data source id `6i8xx7hn1vt2qv` + - type `GLUE` + - data access role + `arn:aws:iam::712023778557:role/datazone_usr_role_ag00w9am11jcx3_cgaw0vcx71emnb` +- That data source is not pointing at `raja-standalone-iceberg-lf`. + +## Confirmed Blocker + +Accepted subscriptions to manually created Glue table assets are not, in this +POC, resulting in the downstream DataZone-managed fulfillment step that should +materialize Lake Formation grants to subscriber Lakehouse environment roles. + +Important distinction: + +- `ACCEPTED` confirms the subscription decision was recorded. 
+- It does not, by itself, prove that DataZone completed environment grant + fulfillment. +- AWS documents that for managed Glue assets, approved subscriptions should be + automatically added to existing data lake environments in the subscriber + project. +- AWS separately documents an explicit `Add grant` action for cases where a new + environment is added later and needs access to an already-approved + subscription. +- AWS also documents optional row/column filter flows, but those are an + additional narrowing mechanism, not the default prerequisite for ordinary + full-table sharing. + +Therefore the current blocker is more precise than "acceptance is insufficient": +the system is reaching subscription acceptance, but not the expected managed +Lake Formation grant fulfillment for existing subscriber environments. + +## Hypotheses + +### H1: Missing data-source linkage + +DataZone may only auto-manage LF grants for Glue assets that originate from a +registered/published Glue data source import path, not arbitrary manual +`create_asset` calls. + +Why this is plausible: + +- The owner project already has a GLUE data source and a dedicated data access + role. +- AWS documentation for managed Glue assets is written around DataZone-managed + Glue publication, not arbitrary manual asset creation. +- The manually created assets are subscribable, but may not be recognized by + DataZone as eligible for managed fulfillment. + +### H2: Wrong grant principal on publisher side + +The critical grantable principal may not be the domain execution role alone. +DataZone may require the publishing environment's Glue data access role or +Lakehouse environment role to hold grantable LF permissions on the database, +tables, and data locations. + +Why this is plausible: + +- The owner Lakehouse data source explicitly uses + `datazone_usr_role_ag00w9am11jcx3_cgaw0vcx71emnb`. +- Granting only the domain execution role did not produce subscriber grants. 
+ +### H3: Manual asset payload is incomplete + +The manually created Glue assets may be missing internal metadata that DataZone +uses to connect an asset to the managed Glue subscription workflow. + +Why this is plausible: + +- The assets were publishable and subscribable, but no managed LF side effects + occurred. +- The `DataSourceReferenceForm` exists in the asset type, but was not + populated. +- The owner Glue data source has never imported these tables. +- A manually created asset may not carry the same source lineage metadata as an + imported managed Glue asset. + +### H4: Environment attachment step is required, but is not being triggered + +Accepted subscriptions may not automatically attach to existing Lakehouse +environments for these manually created assets, even though the request status +is `ACCEPTED`. + +Why this is plausible: + +- The subscriber roles received no LF permissions. +- AWS documentation distinguishes: + - automatic fulfillment to existing subscriber data lake environments for + managed Glue assets + - explicit `Add grant` for environments added after approval +- The current POC may be bypassing or failing the automatic fulfillment path. + +### H5: Filters are not the blocker, but grant mode may still matter + +The missing step is unlikely to be "define filters", but DataZone may still use +different fulfillment logic when a grant is represented as a filtered access +path versus a full-table share. + +Why this is plausible: + +- AWS documents filters as an explicit additional access-grant mode for managed + assets. +- No filter configuration was used here. +- The failure is occurring before any subscriber LF permissions appear at all, + which suggests the issue is more fundamental than row/column scoping. 
+ +## Non-Goals + +This follow-on work should not: + +- change Quilt's writer path yet +- redesign the RAJA package asset flow +- optimize sync of Iceberg metadata pointers +- generalize beyond the four POC tables + +## Suggested Next Steps + +1. Prove whether DataZone requires data-source-backed publication. + - Create a real owner-project Glue data source/import path for + `raja-standalone-iceberg-lf`. + - Re-publish one table through that managed path. + - Re-run one subscription and inspect LF grants. + +2. Identify the true publisher-side grant principal. + - Compare behavior when grantable LF permissions are given to: + - domain execution role + - owner Lakehouse environment `userRoleArn` + - owner Glue data source `dataAccessRole` + - Record the minimal working principal set. + +3. Test whether `DataSourceReferenceForm` is required. + - Inspect a Glue asset produced by a real DataZone Glue import. + - Compare its forms to the manually created asset. + - Especially check `DataSourceReferenceForm` and any internal forms tied to + source lineage. + +4. Verify environment attachment behavior explicitly. + - After accepted subscription, inspect whether the asset appears as granted + to subscriber Lakehouse environments. + - If not, determine whether: + - the asset is ineligible for automatic fulfillment + - an explicit `Add grant` action is required + - the `Add grant` path is only supported for managed imported Glue assets + +5. Reduce scope to a single-table spike. + - Use only `package_tag` until LF grants reach one subscriber role and + Athena can query it. + +6. Confirm whether filters change behavior. + - Test one managed Glue-table subscription with no filters. + - Test one managed Glue-table subscription with row/column filters only if + the no-filter path still fails. + - Treat filters as a diagnostic branch, not the default fix. + +7. Only after the above succeeds, update automation. + - Keep the current manual asset seeding isolated. 
+ - Replace it with the managed publication flow if that proves necessary. + +## Success Criteria for the Next Spec + +The next implementation is only complete when all of the following are true for +at least one mirrored table: + +- subscription status is `ACCEPTED` +- subscriber Lakehouse environment role has LF permissions on the mirrored + DB/table +- Athena can query the table while assuming the subscriber environment role +- no `IAM_ALLOWED_PRINCIPALS` permissions remain on the mirrored catalog + resources + +## Recommended Direction + +Bias toward proving H1 and H2 first: + +- managed Glue data source publication +- owner data access role grantability + +Then validate H4: + +- whether existing subscriber Lakehouse environments should already have been + fulfilled automatically +- whether `Add grant` is only relevant to post-approval environment additions + +Do not treat filters as the primary explanation unless a managed imported Glue +asset also fails without them. The current evidence points first to managed +fulfillment eligibility and/or publisher-side LF grantability. diff --git a/specs/48-use-sagemaker/09-lf-native-iceberg-catalog-h1-h2-tests.md b/specs/48-use-sagemaker/09-lf-native-iceberg-catalog-h1-h2-tests.md new file mode 100644 index 0000000..064890b --- /dev/null +++ b/specs/48-use-sagemaker/09-lf-native-iceberg-catalog-h1-h2-tests.md @@ -0,0 +1,169 @@ +# LF-Native Iceberg Catalog — H1/H2/H4 Tests + +## Context + +This spec follows `08-lf-native-iceberg-catalog-blockers.md`. That document +confirmed that accepted subscriptions to manually created `GlueTableAssetType` +assets did not produce Lake Formation grants to subscriber environment roles. +Four hypotheses (H1–H4) were left open. This spec records the work done to +probe them and where that work landed. 
+ +--- + +## What Was Done + +### Step 1 — H2 test: Owner env role LF grants (Terraform) + +**File:** `infra/terraform/main.tf` + +Added three new `aws_lakeformation_permissions` blocks for the owner Lakehouse +environment role (`aws_iam_role.datazone_environment_owner`), mirroring the +existing domain-execution-role grants: + +- `iceberg_lf_owner_env_data_location` — `DATA_LOCATION_ACCESS` with grant + option, one per table S3 location +- `iceberg_lf_owner_env_db` — `ALL` with grant option on the database +- `iceberg_lf_owner_env_tables` — `ALL` with grant option per table + +**Rationale:** DataZone may require the publishing environment's IAM role to +hold grantable LF permissions before it can delegate grants to subscriber +environment roles. Only the domain execution role was previously granted. + +### Step 2 — H4 probe: `--inspect` mode in `seed_glue_tables.py` + +**File:** `scripts/seed_glue_tables.py` + +Added `--inspect` flag. When passed, the script reads saved seed state and +calls `datazone.list_subscription_grants(subscribedListingId=...)` for each +table listing. It then: + +- Reports whether any grant objects exist at all +- Shows per-grant status (`GRANTED` / `GRANT_FAILED` / etc.) and per-asset + failure messages +- Cross-references subscription request IDs against grant object subscription + IDs to surface orphaned requests + +### Step 3 — H1 test: DataZone Glue data source in `sagemaker_gaps.py` + +**File:** `scripts/sagemaker_gaps.py` + +Added `_ensure_iceberg_lf_data_source()`, called at the end of the main +function after environments are discovered. It: + +1. Reads `iceberg_lf_database_name` from tf-outputs.json (skips if empty) +2. Gets the owner project's Lakehouse environment ID from the discovered + `environment_ids["owner"]` +3. Calls `datazone.list_data_sources()` to check for an existing data source + named `{database_name}-datasource` +4. 
Creates it via `datazone.create_data_source()` with `type="GLUE"`, + `publishOnImport=True`, and a relational filter covering all tables in the + LF-native database +5. Calls `datazone.start_data_source_run()` to trigger an initial import + +--- + +## What Was Tested + +`./poe test-all` was run on the standard dev deploy (the one without +`iceberg_s3_bucket` set). All 55 integration tests passed. + +--- + +## Where It Failed / What Was Blocked + +### Iceberg feature not enabled in dev deploy + +`iceberg_s3_bucket` is not set in the dev Terraform variables, so: + +- `iceberg_lf_database_name` is empty in tf-outputs.json +- `seed_glue_tables.py` prints `skipped (iceberg_lf_database_name is empty)` + and exits +- `sagemaker_gaps.py` prints `skipped (iceberg_lf_database_name not set in + outputs)` and skips data source creation + +Neither the H2 Terraform grants nor the H1 data source were exercised because +the Iceberg stack was not deployed in this session. + +### H4 confirmed: DataZone never attempted LF grant fulfillment + +`--inspect` was run against the existing seed state from the prior POC session +(the deploy that had `raja-standalone-iceberg-lf` and accepted subscriptions). +Output: + +``` +package_entry ✗ No subscription grant objects found +package_manifest ✗ No subscription grant objects found +package_revision ✗ No subscription grant objects found +package_tag ✗ No subscription grant objects found + +RESULT: DataZone never initiated LF grant fulfillment for those subscriptions. +This confirms H4 and narrows root cause to H1 or H2. +``` + +This is not a failed grant — no grant object was ever created. DataZone's LF +subscription machinery never fired at all for manually created Glue assets. 
+ +### Lakehouse environments missing in dev deploy + +`sagemaker_gaps.py` also revealed: + +``` +Environment owner: raja-alpha-env (skipped — project profile '4n0danlvurs0br' +has no raja-registry env config; re-create project with raja-default-profile) +Environment owner: raja-alpha-env (missing) +``` + +The owner/users/guests projects exist under a different profile +(`4n0danlvurs0br`) rather than `raja-default-profile`. Lakehouse environments +were never created for this profile. Even if `iceberg_s3_bucket` were set, +`_ensure_iceberg_lf_data_source` would skip because `owner_environment_id` +would be empty. + +--- + +## Net State + +| Hypothesis | Status | +|---|---| +| H1 (data-source linkage) | **Untested** — Iceberg stack not deployed in this session | +| H2 (publisher principal) | **Untested** — Iceberg stack not deployed in this session | +| H3 (asset payload incomplete) | **Untested** | +| H4 (no grant attempt at all) | **Confirmed** — `--inspect` shows zero grant objects | + +The new code is correct and in place. Testing H1 and H2 requires a deploy with +`iceberg_s3_bucket` set, pointing at the Quilt bucket that contains the +Iceberg tables, so that `raja-standalone-iceberg-lf` is created. + +--- + +## Prerequisites for Next Test Run + +1. Set `iceberg_s3_bucket` in `infra/terraform/terraform.tfvars` (or the + equivalent `.env`-driven variable) to the Quilt S3 bucket name (without + `s3://` prefix). +2. Run `terraform apply` — this will: + - Create the LF-native database and tables + - Apply the H2 owner-env-role LF grants +3. Run `python -m scripts.seed_glue_tables` to publish table assets and + create subscriptions. +4. Run `python -m scripts.sagemaker_gaps` — this will create the H1 Glue data + source and start the import run (requires owner Lakehouse environment to + exist first; see environment profile issue above). +5. Run `python -m scripts.seed_glue_tables --inspect` to check grant state. +6. 
Check LF permissions directly:
+   ```bash
+   aws lakeformation list-permissions \
+     --resource '{"Table":{"DatabaseName":"raja-standalone-iceberg-lf","Name":"package_tag"}}' \
+     --query 'PrincipalResourcePermissions[].Principal'
+   ```
+
+## Open Pre-condition
+
+The owner Lakehouse environment issue (`4n0danlvurs0br` profile) must be
+resolved before the H1 data source can be created via `sagemaker_gaps.py`.
+Options:
+
+- Re-create the owner project under `raja-default-profile` so the Lakehouse
+  environment blueprint is available, **or**
+- Hardcode the existing owner Lakehouse environment ID as a fallback in
+  `_ensure_iceberg_lf_data_source` (note: spec 08 records `6i8xx7hn1vt2qv` as the owner Glue *data source* id, not an environment id — confirm the actual environment ID before hardcoding).
diff --git a/tests/unit/test_lf_native_poc.py b/tests/unit/test_lf_native_poc.py
new file mode 100644
index 0000000..4816639
--- /dev/null
+++ b/tests/unit/test_lf_native_poc.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+from scripts.lf_native_poc.create_throwaway_subscriber import _timestamped_name
+from scripts.lf_native_poc.package_tag_import_poc import (
+    _extract_provisioned_value,
+    _score_asset_as_imported_candidate,
+)
+
+
+def test_extract_provisioned_value_returns_matching_value() -> None:
+    environment = {
+        "provisionedResources": [
+            {"name": "glueDBName", "value": "db1"},
+            {"name": "userRoleArn", "value": "arn:aws:iam::123:role/example"},
+        ]
+    }
+
+    assert _extract_provisioned_value(environment, "userRoleArn") == "arn:aws:iam::123:role/example"
+
+
+def test_score_asset_as_imported_candidate_prefers_new_asset_and_reference_form() -> None:
+    asset = {
+        "id": "new-asset",
+        "createdBy": "SYSTEM",
+        "formsOutput": [
+            {"formName": "GlueTableForm"},
+            {"formName": "DataSourceReferenceForm"},
+        ],
+    }
+
+    score, reasons = _score_asset_as_imported_candidate(asset, known_asset_ids={"old-asset"})
+
+    assert score > 0
+    assert "new asset id after import run" in reasons
+    assert "has DataSourceReferenceForm" in reasons
+    assert "created by SYSTEM" in reasons
+
+
+def test_timestamped_name_prefixes_value() -> None: + assert _timestamped_name("raja-throwaway").startswith("raja-throwaway-") diff --git a/uv.lock b/uv.lock index 15a8877..4bfec16 100644 --- a/uv.lock +++ b/uv.lock @@ -874,7 +874,7 @@ wheels = [ [[package]] name = "raja" -version = "1.2.0" +version = "1.3.0" source = { editable = "." } dependencies = [ { name = "awscrt" },